[Refactor] Rename WAITING_FOR_FSM to WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR (#38048)
Signed-off-by: yewentao256 <zhyanwentao@126.com>
This commit is contained in:
@@ -3652,7 +3652,7 @@ def test_prepend_skipped_requests_order():
|
|||||||
assert waiting_reqs == expected_waiting_reqs
|
assert waiting_reqs == expected_waiting_reqs
|
||||||
|
|
||||||
|
|
||||||
def test_remote_kv_promotion_keeps_fcfs_with_fsm_prefix():
|
def test_remote_kv_promotion_keeps_fcfs_with_grammar_prefix():
|
||||||
scheduler = create_scheduler(max_num_seqs=1)
|
scheduler = create_scheduler(max_num_seqs=1)
|
||||||
scheduler.connector = Mock()
|
scheduler.connector = Mock()
|
||||||
scheduler.connector.get_num_new_matched_tokens.return_value = (0, False)
|
scheduler.connector.get_num_new_matched_tokens.return_value = (0, False)
|
||||||
@@ -3661,19 +3661,20 @@ def test_remote_kv_promotion_keeps_fcfs_with_fsm_prefix():
|
|||||||
for request in requests:
|
for request in requests:
|
||||||
scheduler.add_request(request)
|
scheduler.add_request(request)
|
||||||
|
|
||||||
req_fsm_1, req_fsm_2, req_remote, req_tail = list(scheduler.waiting)
|
req_grammar_1, req_grammar_2, req_remote, req_tail = list(scheduler.waiting)
|
||||||
|
|
||||||
# simulate two FSM requests at the waiting head that become ready now.
|
# simulate two structured-output grammar requests at the waiting head
|
||||||
req_fsm_1.status = RequestStatus.WAITING_FOR_FSM
|
# that become ready now.
|
||||||
req_fsm_1.structured_output_request = Mock(grammar=object())
|
req_grammar_1.status = RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR
|
||||||
req_fsm_2.status = RequestStatus.WAITING_FOR_FSM
|
req_grammar_1.structured_output_request = Mock(grammar=object())
|
||||||
req_fsm_2.structured_output_request = Mock(grammar=object())
|
req_grammar_2.status = RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR
|
||||||
|
req_grammar_2.structured_output_request = Mock(grammar=object())
|
||||||
|
|
||||||
# simulate a remote-KV request that is ready to be promoted now.
|
# simulate a remote-KV request that is ready to be promoted now.
|
||||||
req_remote.status = RequestStatus.WAITING_FOR_REMOTE_KVS
|
req_remote.status = RequestStatus.WAITING_FOR_REMOTE_KVS
|
||||||
scheduler.waiting.remove_requests([req_fsm_1, req_fsm_2, req_remote])
|
scheduler.waiting.remove_requests([req_grammar_1, req_grammar_2, req_remote])
|
||||||
scheduler.skipped_waiting.add_request(req_fsm_1)
|
scheduler.skipped_waiting.add_request(req_grammar_1)
|
||||||
scheduler.skipped_waiting.add_request(req_fsm_2)
|
scheduler.skipped_waiting.add_request(req_grammar_2)
|
||||||
scheduler.skipped_waiting.add_request(req_remote)
|
scheduler.skipped_waiting.add_request(req_remote)
|
||||||
scheduler.finished_recving_kv_req_ids.add(req_remote.request_id)
|
scheduler.finished_recving_kv_req_ids.add(req_remote.request_id)
|
||||||
scheduler._update_waiting_for_remote_kv = Mock()
|
scheduler._update_waiting_for_remote_kv = Mock()
|
||||||
@@ -3681,13 +3682,13 @@ def test_remote_kv_promotion_keeps_fcfs_with_fsm_prefix():
|
|||||||
output = scheduler.schedule()
|
output = scheduler.schedule()
|
||||||
|
|
||||||
assert output.scheduled_new_reqs
|
assert output.scheduled_new_reqs
|
||||||
assert output.scheduled_new_reqs[0].req_id == req_fsm_1.request_id
|
assert output.scheduled_new_reqs[0].req_id == req_grammar_1.request_id
|
||||||
waiting_req_ids = [
|
waiting_req_ids = [
|
||||||
req.request_id
|
req.request_id
|
||||||
for req in list(scheduler.skipped_waiting) + list(scheduler.waiting)
|
for req in list(scheduler.skipped_waiting) + list(scheduler.waiting)
|
||||||
]
|
]
|
||||||
assert waiting_req_ids == [
|
assert waiting_req_ids == [
|
||||||
req_fsm_2.request_id,
|
req_grammar_2.request_id,
|
||||||
req_remote.request_id,
|
req_remote.request_id,
|
||||||
req_tail.request_id,
|
req_tail.request_id,
|
||||||
]
|
]
|
||||||
@@ -3700,28 +3701,32 @@ def test_fcfs_mixed_skipped_waiting_types_keep_order():
|
|||||||
mk_req = lambda req_id, num_tokens=1: create_requests( # noqa: E731
|
mk_req = lambda req_id, num_tokens=1: create_requests( # noqa: E731
|
||||||
num_requests=1, num_tokens=num_tokens, req_ids=[req_id]
|
num_requests=1, num_tokens=num_tokens, req_ids=[req_id]
|
||||||
)[0]
|
)[0]
|
||||||
req_fsm, req_remote, req_stream = mk_req("fsm"), mk_req("remote"), mk_req("stream")
|
req_grammar, req_remote, req_stream = (
|
||||||
|
mk_req("grammar"),
|
||||||
|
mk_req("remote"),
|
||||||
|
mk_req("stream"),
|
||||||
|
)
|
||||||
req_regular, req_tail = mk_req("regular", 20), mk_req("tail")
|
req_regular, req_tail = mk_req("regular", 20), mk_req("tail")
|
||||||
req_fsm.status = RequestStatus.WAITING_FOR_FSM
|
req_grammar.status = RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR
|
||||||
req_fsm.structured_output_request = Mock(grammar=None)
|
req_grammar.structured_output_request = Mock(grammar=None)
|
||||||
req_remote.status = RequestStatus.WAITING_FOR_REMOTE_KVS
|
req_remote.status = RequestStatus.WAITING_FOR_REMOTE_KVS
|
||||||
req_stream.status = RequestStatus.WAITING_FOR_STREAMING_REQ
|
req_stream.status = RequestStatus.WAITING_FOR_STREAMING_REQ
|
||||||
|
|
||||||
for req in (req_fsm, req_remote, req_stream, req_regular, req_tail):
|
for req in (req_grammar, req_remote, req_stream, req_regular, req_tail):
|
||||||
scheduler.add_request(req)
|
scheduler.add_request(req)
|
||||||
scheduler.schedule()
|
scheduler.schedule()
|
||||||
assert list(scheduler.skipped_waiting) == [req_fsm, req_remote, req_stream]
|
assert list(scheduler.skipped_waiting) == [req_grammar, req_remote, req_stream]
|
||||||
|
|
||||||
scheduler.finish_requests(req_regular.request_id, RequestStatus.FINISHED_ABORTED)
|
scheduler.finish_requests(req_regular.request_id, RequestStatus.FINISHED_ABORTED)
|
||||||
assert not scheduler.running
|
assert not scheduler.running
|
||||||
|
|
||||||
req_fsm.structured_output_request = Mock(grammar=object())
|
req_grammar.structured_output_request = Mock(grammar=object())
|
||||||
scheduler.finished_recving_kv_req_ids.add(req_remote.request_id)
|
scheduler.finished_recving_kv_req_ids.add(req_remote.request_id)
|
||||||
req_stream.status = RequestStatus.WAITING
|
req_stream.status = RequestStatus.WAITING
|
||||||
|
|
||||||
second_output = scheduler.schedule()
|
second_output = scheduler.schedule()
|
||||||
expected_order = [
|
expected_order = [
|
||||||
req_fsm.request_id,
|
req_grammar.request_id,
|
||||||
req_remote.request_id,
|
req_remote.request_id,
|
||||||
req_stream.request_id,
|
req_stream.request_id,
|
||||||
req_tail.request_id,
|
req_tail.request_id,
|
||||||
|
|||||||
@@ -6,7 +6,10 @@ from vllm.v1.request import RequestStatus
|
|||||||
def test_request_status_fmt_str():
|
def test_request_status_fmt_str():
|
||||||
"""Test that the string representation of RequestStatus is correct."""
|
"""Test that the string representation of RequestStatus is correct."""
|
||||||
assert f"{RequestStatus.WAITING}" == "WAITING"
|
assert f"{RequestStatus.WAITING}" == "WAITING"
|
||||||
assert f"{RequestStatus.WAITING_FOR_FSM}" == "WAITING_FOR_FSM"
|
assert (
|
||||||
|
f"{RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR}"
|
||||||
|
== "WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR"
|
||||||
|
)
|
||||||
assert f"{RequestStatus.WAITING_FOR_REMOTE_KVS}" == "WAITING_FOR_REMOTE_KVS"
|
assert f"{RequestStatus.WAITING_FOR_REMOTE_KVS}" == "WAITING_FOR_REMOTE_KVS"
|
||||||
assert f"{RequestStatus.WAITING_FOR_STREAMING_REQ}" == "WAITING_FOR_STREAMING_REQ"
|
assert f"{RequestStatus.WAITING_FOR_STREAMING_REQ}" == "WAITING_FOR_STREAMING_REQ"
|
||||||
assert f"{RequestStatus.RUNNING}" == "RUNNING"
|
assert f"{RequestStatus.RUNNING}" == "RUNNING"
|
||||||
|
|||||||
@@ -1550,7 +1550,7 @@ class Scheduler(SchedulerInterface):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _is_blocked_waiting_status(status: RequestStatus) -> bool:
|
def _is_blocked_waiting_status(status: RequestStatus) -> bool:
|
||||||
return status in (
|
return status in (
|
||||||
RequestStatus.WAITING_FOR_FSM,
|
RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR,
|
||||||
RequestStatus.WAITING_FOR_REMOTE_KVS,
|
RequestStatus.WAITING_FOR_REMOTE_KVS,
|
||||||
RequestStatus.WAITING_FOR_STREAMING_REQ,
|
RequestStatus.WAITING_FOR_STREAMING_REQ,
|
||||||
)
|
)
|
||||||
@@ -2084,7 +2084,7 @@ class Scheduler(SchedulerInterface):
|
|||||||
request.status = RequestStatus.WAITING
|
request.status = RequestStatus.WAITING
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if request.status == RequestStatus.WAITING_FOR_FSM:
|
if request.status == RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR:
|
||||||
structured_output_req = request.structured_output_request
|
structured_output_req = request.structured_output_request
|
||||||
if not (structured_output_req and structured_output_req.grammar):
|
if not (structured_output_req and structured_output_req.grammar):
|
||||||
return False
|
return False
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ class Request:
|
|||||||
assert sampling_params.max_tokens is not None
|
assert sampling_params.max_tokens is not None
|
||||||
self.max_tokens = sampling_params.max_tokens
|
self.max_tokens = sampling_params.max_tokens
|
||||||
if self.structured_output_request is not None:
|
if self.structured_output_request is not None:
|
||||||
self.status = RequestStatus.WAITING_FOR_FSM
|
self.status = RequestStatus.WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR
|
||||||
|
|
||||||
if sampling_params.extra_args is not None:
|
if sampling_params.extra_args is not None:
|
||||||
self.kv_transfer_params = sampling_params.extra_args.get(
|
self.kv_transfer_params = sampling_params.extra_args.get(
|
||||||
@@ -296,7 +296,7 @@ class RequestStatus(enum.IntEnum):
|
|||||||
"""Status of a request."""
|
"""Status of a request."""
|
||||||
|
|
||||||
WAITING = enum.auto()
|
WAITING = enum.auto()
|
||||||
WAITING_FOR_FSM = enum.auto()
|
WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR = enum.auto()
|
||||||
WAITING_FOR_REMOTE_KVS = enum.auto()
|
WAITING_FOR_REMOTE_KVS = enum.auto()
|
||||||
WAITING_FOR_STREAMING_REQ = enum.auto()
|
WAITING_FOR_STREAMING_REQ = enum.auto()
|
||||||
RUNNING = enum.auto()
|
RUNNING = enum.auto()
|
||||||
|
|||||||
@@ -42,7 +42,8 @@ class StructuredOutputManager:
|
|||||||
|
|
||||||
# When in external_launcher mode, async grammar compilation causes deadlocks
|
# When in external_launcher mode, async grammar compilation causes deadlocks
|
||||||
# due to external_launcher mode having a scheduler for each TP rank.
|
# due to external_launcher mode having a scheduler for each TP rank.
|
||||||
# Async grammar compilation causes the WAITING_FOR_FSM → WAITING transition to
|
# Async grammar compilation causes the
|
||||||
|
# WAITING_FOR_STRUCTURED_OUTPUT_GRAMMAR → WAITING transition to
|
||||||
# happen at different times on different TP ranks,
|
# happen at different times on different TP ranks,
|
||||||
# breaking the determinism assumption that external_launcher relies on.
|
# breaking the determinism assumption that external_launcher relies on.
|
||||||
self._use_async_grammar_compilation = (
|
self._use_async_grammar_compilation = (
|
||||||
|
|||||||
Reference in New Issue
Block a user