[BugFix] scheduler: Fix ordering preserving of skipped requests (#32173)
Signed-off-by: Or Ozeri <oro@il.ibm.com>
This commit is contained in:
@@ -3349,3 +3349,28 @@ def test_ec_connector_allocate_encoder_tokens_with_external_load(use_kv_connecto
|
||||
# ==============================================================================
|
||||
# EPD (Encoder-Prefill-Decode) Encoder-cache-specific tests end
|
||||
# ==============================================================================
|
||||
|
||||
|
||||
def test_prepend_skipped_requests_order():
|
||||
scheduler = create_scheduler(max_num_seqs=1, use_kv_connector=True)
|
||||
requests = create_requests(num_requests=4)
|
||||
for request in requests:
|
||||
scheduler.add_request(request)
|
||||
|
||||
# 4 requests waiting, capture their order
|
||||
expected_waiting_reqs = list(scheduler.waiting)
|
||||
|
||||
# simulate first 2 waiting requests are waiting for remote KVs
|
||||
for req in expected_waiting_reqs[:2]:
|
||||
req.status = RequestStatus.WAITING_FOR_REMOTE_KVS
|
||||
|
||||
# schedule step
|
||||
# expect the first 2 waiting to be skipped, the third running,
|
||||
# and the fourth waiting
|
||||
scheduler.schedule()
|
||||
|
||||
# pop the third request which is expected to be running
|
||||
expected_waiting_reqs.pop(2)
|
||||
|
||||
# verify waiting order is preserved
|
||||
assert list(scheduler.waiting) == expected_waiting_reqs
|
||||
|
||||
@@ -71,11 +71,6 @@ class RequestQueue(ABC):
|
||||
"""Iterate over the queue according to the policy."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def __reversed__(self) -> Iterator[Request]:
|
||||
"""Iterate over the queue in reverse order."""
|
||||
pass
|
||||
|
||||
|
||||
class FCFSRequestQueue(deque[Request], RequestQueue):
|
||||
"""A first-come-first-served queue that supports deque operations."""
|
||||
@@ -100,8 +95,12 @@ class FCFSRequestQueue(deque[Request], RequestQueue):
|
||||
|
||||
def prepend_requests(self, requests: RequestQueue) -> None:
|
||||
"""Prepend all requests from another queue to the front of this
|
||||
queue."""
|
||||
self.extendleft(reversed(requests))
|
||||
queue.
|
||||
|
||||
Note: The requests will be prepended in reverse order of their
|
||||
appearance in the `requests` queue.
|
||||
"""
|
||||
self.extendleft(requests)
|
||||
|
||||
def remove_request(self, request: Request) -> None:
|
||||
"""Remove a specific request from the queue."""
|
||||
@@ -128,10 +127,6 @@ class FCFSRequestQueue(deque[Request], RequestQueue):
|
||||
"""Iterate over the queue according to FCFS policy."""
|
||||
return super().__iter__()
|
||||
|
||||
def __reversed__(self) -> Iterator[Request]:
|
||||
"""Iterate over the queue in reverse order."""
|
||||
return super().__reversed__()
|
||||
|
||||
|
||||
class PriorityRequestQueue(RequestQueue):
|
||||
"""
|
||||
@@ -202,10 +197,6 @@ class PriorityRequestQueue(RequestQueue):
|
||||
while heap_copy:
|
||||
yield heapq.heappop(heap_copy)
|
||||
|
||||
def __reversed__(self) -> Iterator[Request]:
|
||||
"""Iterate over the queue in reverse priority order."""
|
||||
return reversed(list(self))
|
||||
|
||||
|
||||
def create_request_queue(policy: SchedulingPolicy) -> RequestQueue:
|
||||
"""Create request queue based on scheduling policy."""
|
||||
|
||||
Reference in New Issue
Block a user