[Bugfix] Fix engine crash when --scheduling-policy=priority is used with n>1 (#29764)
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
Signed-off-by: Nick Hill <nhill@redhat.com>
Co-authored-by: Nick Hill <nhill@redhat.com>
(cherry picked from commit 0a9caca9f5)
This commit is contained in:
@@ -219,7 +219,17 @@ def test_priority_scheduling_blast(
|
|||||||
vllm_config=scheduler.vllm_config,
|
vllm_config=scheduler.vllm_config,
|
||||||
)
|
)
|
||||||
scheduler.add_request(req)
|
scheduler.add_request(req)
|
||||||
|
num_initial_requests = 2
|
||||||
|
for _ in range(num_initial_requests):
|
||||||
|
req = _create_random_request(
|
||||||
|
max_tokens_range=(1, max_output_tokens),
|
||||||
|
num_tokens_range=(1, max_input_tokens),
|
||||||
|
arrival_time_range=(0, 0),
|
||||||
|
priority_range=(4, 4),
|
||||||
|
num_mm_item_range=(0, 2),
|
||||||
|
vllm_config=scheduler.vllm_config,
|
||||||
|
)
|
||||||
|
scheduler.add_request(req)
|
||||||
for _ in range(20000):
|
for _ in range(20000):
|
||||||
if len(scheduler.waiting) == 0:
|
if len(scheduler.waiting) == 0:
|
||||||
num_new_requests = random.randint(0, 2)
|
num_new_requests = random.randint(0, 2)
|
||||||
|
|||||||
@@ -137,31 +137,30 @@ class PriorityRequestQueue(RequestQueue):
|
|||||||
"""
|
"""
|
||||||
A priority queue that supports heap operations.
|
A priority queue that supports heap operations.
|
||||||
|
|
||||||
Requests with a smaller value of `priority` are processed first.
|
Respects the ordering defined in the Request class, where
|
||||||
|
requests with a smaller value of `priority` are processed first.
|
||||||
If multiple requests have the same priority, the one with the earlier
|
If multiple requests have the same priority, the one with the earlier
|
||||||
`arrival_time` is processed first.
|
`arrival_time` is processed first.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._heap: list[tuple[int, float, Request]] = []
|
self._heap: list[Request] = []
|
||||||
|
|
||||||
def add_request(self, request: Request) -> None:
|
def add_request(self, request: Request) -> None:
|
||||||
"""Add a request to the queue according to priority policy."""
|
"""Add a request to the queue according to priority policy."""
|
||||||
heapq.heappush(self._heap, (request.priority, request.arrival_time, request))
|
heapq.heappush(self._heap, request)
|
||||||
|
|
||||||
def pop_request(self) -> Request:
|
def pop_request(self) -> Request:
|
||||||
"""Pop a request from the queue according to priority policy."""
|
"""Pop a request from the queue according to priority policy."""
|
||||||
if not self._heap:
|
if not self._heap:
|
||||||
raise IndexError("pop from empty heap")
|
raise IndexError("pop from empty heap")
|
||||||
_, _, request = heapq.heappop(self._heap)
|
return heapq.heappop(self._heap)
|
||||||
return request
|
|
||||||
|
|
||||||
def peek_request(self) -> Request:
|
def peek_request(self) -> Request:
|
||||||
"""Peek at the next request in the queue without removing it."""
|
"""Peek at the next request in the queue without removing it."""
|
||||||
if not self._heap:
|
if not self._heap:
|
||||||
raise IndexError("peek from empty heap")
|
raise IndexError("peek from empty heap")
|
||||||
_, _, request = self._heap[0]
|
return self._heap[0]
|
||||||
return request
|
|
||||||
|
|
||||||
def prepend_request(self, request: Request) -> None:
|
def prepend_request(self, request: Request) -> None:
|
||||||
"""Add a request to the queue according to priority policy.
|
"""Add a request to the queue according to priority policy.
|
||||||
@@ -180,15 +179,13 @@ class PriorityRequestQueue(RequestQueue):
|
|||||||
|
|
||||||
def remove_request(self, request: Request) -> None:
|
def remove_request(self, request: Request) -> None:
|
||||||
"""Remove a specific request from the queue."""
|
"""Remove a specific request from the queue."""
|
||||||
self._heap = [(p, t, r) for p, t, r in self._heap if r != request]
|
self._heap.remove(request)
|
||||||
heapq.heapify(self._heap)
|
heapq.heapify(self._heap)
|
||||||
|
|
||||||
def remove_requests(self, requests: Iterable[Request]) -> None:
|
def remove_requests(self, requests: Iterable[Request]) -> None:
|
||||||
"""Remove multiple specific requests from the queue."""
|
"""Remove multiple specific requests from the queue."""
|
||||||
requests_to_remove = set(requests)
|
requests_to_remove = requests if isinstance(requests, set) else set(requests)
|
||||||
self._heap = [
|
self._heap = [r for r in self._heap if r not in requests_to_remove]
|
||||||
(p, t, r) for p, t, r in self._heap if r not in requests_to_remove
|
|
||||||
]
|
|
||||||
heapq.heapify(self._heap)
|
heapq.heapify(self._heap)
|
||||||
|
|
||||||
def __bool__(self) -> bool:
|
def __bool__(self) -> bool:
|
||||||
@@ -203,8 +200,7 @@ class PriorityRequestQueue(RequestQueue):
|
|||||||
"""Iterate over the queue according to priority policy."""
|
"""Iterate over the queue according to priority policy."""
|
||||||
heap_copy = self._heap[:]
|
heap_copy = self._heap[:]
|
||||||
while heap_copy:
|
while heap_copy:
|
||||||
_, _, request = heapq.heappop(heap_copy)
|
yield heapq.heappop(heap_copy)
|
||||||
yield request
|
|
||||||
|
|
||||||
def __reversed__(self) -> Iterator[Request]:
|
def __reversed__(self) -> Iterator[Request]:
|
||||||
"""Iterate over the queue in reverse priority order."""
|
"""Iterate over the queue in reverse priority order."""
|
||||||
|
|||||||
@@ -227,6 +227,19 @@ class Request:
|
|||||||
events, self.events = self.events, []
|
events, self.events = self.events, []
|
||||||
return events
|
return events
|
||||||
|
|
||||||
|
def __lt__(self, other: "Request") -> bool:
|
||||||
|
"""
|
||||||
|
Compare two requests based on priority, arrival time, and request ID.
|
||||||
|
Used in priority scheduling.
|
||||||
|
"""
|
||||||
|
if self.priority != other.priority:
|
||||||
|
return self.priority < other.priority
|
||||||
|
if self.arrival_time != other.arrival_time:
|
||||||
|
return self.arrival_time < other.arrival_time
|
||||||
|
if self.request_id != other.request_id:
|
||||||
|
return self.request_id < other.request_id
|
||||||
|
return id(self) < id(other)
|
||||||
|
|
||||||
|
|
||||||
class RequestStatus(enum.IntEnum):
|
class RequestStatus(enum.IntEnum):
|
||||||
"""Status of a request."""
|
"""Status of a request."""
|
||||||
|
|||||||
Reference in New Issue
Block a user