[V0 Deprecation] Remove VLLM_USE_V1 from tests (#26341)
Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
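
Every test in the diff below gets the same mechanical treatment: the pytest.MonkeyPatch fixture and the monkeypatch.context() block that set VLLM_USE_V1=1 are dropped, and the test body is dedented one level. A minimal sketch of the pattern, illustrative only (run_engine is a hypothetical stand-in for each real test body):

import pytest

def run_engine():
    pass  # hypothetical stand-in for the real test body

# Before: the test forced the V1 engine through an env var.
def test_engine_before(monkeypatch: pytest.MonkeyPatch):
    with monkeypatch.context() as m:
        m.setenv("VLLM_USE_V1", "1")
        run_engine()

# After: V1 is the only engine, so the env var and the fixture go away
# and the body moves up one indentation level.
def test_engine_after():
    run_engine()
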
@@ -46,188 +46,184 @@ def make_request() -> EngineCoreRequest:
 @create_new_process_for_each_test()
-def test_engine_core(monkeypatch: pytest.MonkeyPatch):
-    with monkeypatch.context() as m:
-        m.setenv("VLLM_USE_V1", "1")
-        """Setup the EngineCore."""
-        engine_args = EngineArgs(model=MODEL_NAME)
-        vllm_config = engine_args.create_engine_config()
-        executor_class = Executor.get_class(vllm_config)
+def test_engine_core():
+    """Setup the EngineCore."""
+    engine_args = EngineArgs(model=MODEL_NAME)
+    vllm_config = engine_args.create_engine_config()
+    executor_class = Executor.get_class(vllm_config)

-        with set_default_torch_num_threads(1):
-            engine_core = EngineCore(
-                vllm_config=vllm_config, executor_class=executor_class, log_stats=True
-            )
-        """Test basic request lifecycle."""
+    with set_default_torch_num_threads(1):
+        engine_core = EngineCore(
+            vllm_config=vllm_config, executor_class=executor_class, log_stats=True
+        )
+    """Test basic request lifecycle."""

-        # First request.
-        engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
-        assert len(engine_core.scheduler.waiting) == 1
-        assert len(engine_core.scheduler.running) == 0
+    # First request.
+    engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
+    assert len(engine_core.scheduler.waiting) == 1
+    assert len(engine_core.scheduler.running) == 0

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 1
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 1

-        # Second request.
-        engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
-        assert len(engine_core.scheduler.waiting) == 1
-        assert len(engine_core.scheduler.running) == 1
+    # Second request.
+    engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
+    assert len(engine_core.scheduler.waiting) == 1
+    assert len(engine_core.scheduler.running) == 1

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 2
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 2

-        # Add two requests in a row.
-        engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
-        engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
-        assert len(engine_core.scheduler.waiting) == 2
-        assert len(engine_core.scheduler.running) == 2
+    # Add two requests in a row.
+    engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
+    engine_core.add_request(*engine_core.preprocess_add_request(make_request()))
+    assert len(engine_core.scheduler.waiting) == 2
+    assert len(engine_core.scheduler.running) == 2

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 4
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 4

-        # Loop through until they are all done.
-        while (outs := engine_core.step()[0].get(0)) and outs.outputs:
-            pass
+    # Loop through until they are all done.
+    while (outs := engine_core.step()[0].get(0)) and outs.outputs:
+        pass

-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 0
-        """Test abort cycle."""
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 0
+    """Test abort cycle."""

-        # Basic abort.
-        req = make_request()
-        request_id = req.request_id
+    # Basic abort.
+    req = make_request()
+    request_id = req.request_id

-        engine_core.add_request(*engine_core.preprocess_add_request(req))
-        assert len(engine_core.scheduler.waiting) == 1
-        assert len(engine_core.scheduler.running) == 0
-        assert engine_core.scheduler.has_unfinished_requests()
-        assert not engine_core.scheduler.has_finished_requests()
+    engine_core.add_request(*engine_core.preprocess_add_request(req))
+    assert len(engine_core.scheduler.waiting) == 1
+    assert len(engine_core.scheduler.running) == 0
+    assert engine_core.scheduler.has_unfinished_requests()
+    assert not engine_core.scheduler.has_finished_requests()

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 1
-        assert engine_core.scheduler.has_unfinished_requests()
-        assert not engine_core.scheduler.has_finished_requests()
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 1
+    assert engine_core.scheduler.has_unfinished_requests()
+    assert not engine_core.scheduler.has_finished_requests()

-        engine_core.abort_requests([request_id])
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 0
-        assert not engine_core.scheduler.has_unfinished_requests()
-        assert engine_core.scheduler.has_finished_requests()
+    engine_core.abort_requests([request_id])
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 0
+    assert not engine_core.scheduler.has_unfinished_requests()
+    assert engine_core.scheduler.has_finished_requests()

-        _ = engine_core.step()
-        assert not engine_core.scheduler.has_unfinished_requests()
-        assert not engine_core.scheduler.has_finished_requests()
+    _ = engine_core.step()
+    assert not engine_core.scheduler.has_unfinished_requests()
+    assert not engine_core.scheduler.has_finished_requests()

-        # Add, step, abort 1 of the 3.
-        req0 = make_request()
-        req1 = make_request()
-        req2 = make_request()
+    # Add, step, abort 1 of the 3.
+    req0 = make_request()
+    req1 = make_request()
+    req2 = make_request()

-        engine_core.add_request(*engine_core.preprocess_add_request(req0))
-        engine_core.add_request(*engine_core.preprocess_add_request(req1))
-        assert len(engine_core.scheduler.waiting) == 2
-        assert len(engine_core.scheduler.running) == 0
+    engine_core.add_request(*engine_core.preprocess_add_request(req0))
+    engine_core.add_request(*engine_core.preprocess_add_request(req1))
+    assert len(engine_core.scheduler.waiting) == 2
+    assert len(engine_core.scheduler.running) == 0

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 2
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 2

-        engine_core.add_request(*engine_core.preprocess_add_request(req2))
-        assert len(engine_core.scheduler.waiting) == 1
-        assert len(engine_core.scheduler.running) == 2
+    engine_core.add_request(*engine_core.preprocess_add_request(req2))
+    assert len(engine_core.scheduler.waiting) == 1
+    assert len(engine_core.scheduler.running) == 2

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 3
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 3

-        # Abort just one.
-        engine_core.abort_requests([req1.request_id])
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 2
+    # Abort just one.
+    engine_core.abort_requests([req1.request_id])
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 2

-        _ = engine_core.step()
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 2
+    _ = engine_core.step()
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 2

-        # Abort the other requests at the same time.
-        engine_core.abort_requests([req2.request_id, req0.request_id])
-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 0
+    # Abort the other requests at the same time.
+    engine_core.abort_requests([req2.request_id, req0.request_id])
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 0

-        # Sending duplicate requests with same request_id
-        req0 = make_request()
-        req1 = make_request()
-        req0.request_id = req1.request_id = "test"
-        engine_core.add_request(*engine_core.preprocess_add_request(req0))
+    # Sending duplicate requests with same request_id
+    req0 = make_request()
+    req1 = make_request()
+    req0.request_id = req1.request_id = "test"
+    engine_core.add_request(*engine_core.preprocess_add_request(req0))

-        while (outs := engine_core.step()[0].get(0)) and outs.outputs:
-            pass
+    while (outs := engine_core.step()[0].get(0)) and outs.outputs:
+        pass

-        engine_core.add_request(*engine_core.preprocess_add_request(req1))
-        while (outs := engine_core.step()[0].get(0)) and outs.outputs:
-            pass
+    engine_core.add_request(*engine_core.preprocess_add_request(req1))
+    while (outs := engine_core.step()[0].get(0)) and outs.outputs:
+        pass

-        assert len(engine_core.scheduler.waiting) == 0
-        assert len(engine_core.scheduler.running) == 0
+    assert len(engine_core.scheduler.waiting) == 0
+    assert len(engine_core.scheduler.running) == 0


 @create_new_process_for_each_test()
-def test_engine_core_advanced_sampling(monkeypatch: pytest.MonkeyPatch):
+def test_engine_core_advanced_sampling():
     """
     A basic end-to-end test to verify that the engine functions correctly
     when additional sampling parameters, such as top_p, min_tokens, and
     presence_penalty, are set.
     """
-    with monkeypatch.context() as m:
-        m.setenv("VLLM_USE_V1", "1")
-        """Setup the EngineCore."""
-        engine_args = EngineArgs(model=MODEL_NAME)
-        vllm_config = engine_args.create_engine_config()
-        executor_class = Executor.get_class(vllm_config)
+    """Setup the EngineCore."""
+    engine_args = EngineArgs(model=MODEL_NAME)
+    vllm_config = engine_args.create_engine_config()
+    executor_class = Executor.get_class(vllm_config)

-        with set_default_torch_num_threads(1):
-            engine_core = EngineCore(
-                vllm_config=vllm_config, executor_class=executor_class, log_stats=True
-            )
-        """Test basic request lifecycle."""
-        # First request.
-        request: EngineCoreRequest = make_request()
-        request.sampling_params = SamplingParams(
-            min_tokens=4,
-            presence_penalty=1.0,
-            frequency_penalty=1.0,
-            repetition_penalty=0.1,
-            stop_token_ids=[1001, 1002],
-        )
-        engine_core.add_request(*engine_core.preprocess_add_request(request))
+    with set_default_torch_num_threads(1):
+        engine_core = EngineCore(
+            vllm_config=vllm_config, executor_class=executor_class, log_stats=True
+        )
+    """Test basic request lifecycle."""
+    # First request.
+    request: EngineCoreRequest = make_request()
+    request.sampling_params = SamplingParams(
+        min_tokens=4,
+        presence_penalty=1.0,
+        frequency_penalty=1.0,
+        repetition_penalty=0.1,
+        stop_token_ids=[1001, 1002],
+    )
+    engine_core.add_request(*engine_core.preprocess_add_request(request))

-        def _check_engine_state():
-            assert len(engine_core.scheduler.waiting) == 1
-            assert len(engine_core.scheduler.running) == 0
-            # Loop through until they are all done.
-            while (outs := engine_core.step()[0].get(0)) and outs.outputs:
-                pass
-            assert len(engine_core.scheduler.waiting) == 0
-            assert len(engine_core.scheduler.running) == 0
+    def _check_engine_state():
+        assert len(engine_core.scheduler.waiting) == 1
+        assert len(engine_core.scheduler.running) == 0
+        # Loop through until they are all done.
+        while (outs := engine_core.step()[0].get(0)) and outs.outputs:
+            pass
+        assert len(engine_core.scheduler.waiting) == 0
+        assert len(engine_core.scheduler.running) == 0

-        _check_engine_state()
+    _check_engine_state()

-        # Second request.
-        request2 = make_request()
-        request2.sampling_params = SamplingParams(
-            top_p=0.99,
-            top_k=50,
-        )
-        engine_core.add_request(*engine_core.preprocess_add_request(request2))
-        _check_engine_state()
+    # Second request.
+    request2 = make_request()
+    request2.sampling_params = SamplingParams(
+        top_p=0.99,
+        top_k=50,
+    )
+    engine_core.add_request(*engine_core.preprocess_add_request(request2))
+    _check_engine_state()


 @create_new_process_for_each_test()
-def test_engine_core_concurrent_batches(monkeypatch: pytest.MonkeyPatch):
+def test_engine_core_concurrent_batches():
     """
     Test that the engine can handle multiple concurrent batches.
     """
@@ -272,173 +268,163 @@ def test_engine_core_concurrent_batches(monkeypatch: pytest.MonkeyPatch):
             if hasattr(self, "thread_pool"):
                 self.thread_pool.shutdown(wait=False)

-    with monkeypatch.context() as m:
-        m.setenv("VLLM_USE_V1", "1")
-
-        engine_args = EngineArgs(
-            model=MODEL_NAME,
-            # To test concurrent batches.
-            max_num_seqs=2,
-            # Avoid all requests being scheduled at once.
-            enable_prefix_caching=False,
-            max_num_batched_tokens=10,
-            # Reduce startup time.
-            enforce_eager=True,
-        )
-        vllm_config = engine_args.create_engine_config()
-        with set_default_torch_num_threads(1):
-            engine_core = EngineCore(
-                vllm_config=vllm_config, log_stats=False, executor_class=DummyExecutor
-            )
-        assert engine_core.batch_queue is not None
-
-        # Add two requests in a row. Each request has 12 prompt tokens.
-        req0 = make_request_with_max_tokens("0", 5)
-        engine_core.add_request(*engine_core.preprocess_add_request(req0))
-        req1 = make_request_with_max_tokens("1", 5)
-        engine_core.add_request(*engine_core.preprocess_add_request(req1))
-
-        # Schedule Batch 1: (10, req0)
-        assert engine_core.step_with_batch_queue()[0] is None
-        assert len(engine_core.batch_queue) == 1
-        scheduler_output = engine_core.batch_queue[-1][1]
-        assert scheduler_output.num_scheduled_tokens["0"] == 10
-        # num_computed_tokens should have been updated immediately.
-        assert engine_core.scheduler.requests[req0.request_id].num_computed_tokens == 10
-
-        # Schedule Batch 2: (2, req0), (8, req1)
-        assert engine_core.step_with_batch_queue()[0] == {}
-        assert len(engine_core.batch_queue) == 1
-        scheduler_output = engine_core.batch_queue[-1][1]
-        assert scheduler_output.num_scheduled_tokens["0"] == 2
-        assert scheduler_output.num_scheduled_tokens["1"] == 8
-        # num_computed_tokens should have been updated immediately.
-        assert engine_core.scheduler.requests["0"].num_computed_tokens == 12
-        assert engine_core.scheduler.requests["1"].num_computed_tokens == 8
-
-        assert engine_core.scheduler.get_num_unfinished_requests() == 2
-
-        # Finish Batch 1 and schedule Batch 3: (4, req1).
-        # Note that req0 cannot be scheduled
-        # because it is in the decoding stage now.
-        engine_core.step_with_batch_queue()
-        assert len(engine_core.batch_queue) == 1
-        scheduler_output = engine_core.batch_queue[-1][1]
-        assert scheduler_output.num_scheduled_tokens["1"] == 4
-
-        # Finish Batch 2. Get first token of req0.
-        # Schedule Batch 4: (1, req0).
-        output = engine_core.step_with_batch_queue()[0].get(0)
-        assert output is not None
-        assert len(output.outputs) == 1
-        assert engine_core.scheduler.requests[req0.request_id].num_tokens == 13
-        scheduler_output = engine_core.batch_queue[-1][1]
-        assert scheduler_output.num_scheduled_tokens["0"] == 1
-
-        # Finish Batch 3. Get first token of req1. Schedule Batch 5: (1, req1).
-        output = engine_core.step_with_batch_queue()[0].get(0)
-        assert output is not None
-        assert len(output.outputs) == 1
-        assert engine_core.scheduler.requests[req1.request_id].num_tokens == 13
-        scheduler_output = engine_core.batch_queue[-1][1]
-        assert scheduler_output.num_scheduled_tokens["1"] == 1
-
-        # Loop until req0 is finished.
-        req_id = 0
-        expected_num_tokens = [
-            engine_core.scheduler.requests["0"].num_tokens + 1,
-            engine_core.scheduler.requests["1"].num_tokens + 1,
-        ]
-        while engine_core.scheduler.get_num_unfinished_requests() == 2:
-            output = engine_core.step_with_batch_queue()[0]
-            # Every step consumes an output.
-            assert output is not None
-            assert len(output[0].outputs) == 1
-            if req_id in engine_core.scheduler.requests:
-                assert (
-                    engine_core.scheduler.requests[req_id].num_tokens
-                    == expected_num_tokens[req_id]
-                )
-            expected_num_tokens[req_id] += 1
-            req_id = (req_id + 1) % 2
+    engine_args = EngineArgs(
+        model=MODEL_NAME,
+        # To test concurrent batches.
+        max_num_seqs=2,
+        # Avoid all requests being scheduled at once.
+        enable_prefix_caching=False,
+        max_num_batched_tokens=10,
+        # Reduce startup time.
+        enforce_eager=True,
+    )
+    vllm_config = engine_args.create_engine_config()
+    with set_default_torch_num_threads(1):
+        engine_core = EngineCore(
+            vllm_config=vllm_config, log_stats=False, executor_class=DummyExecutor
+        )
+    assert engine_core.batch_queue is not None
+
+    # Add two requests in a row. Each request has 12 prompt tokens.
+    req0 = make_request_with_max_tokens("0", 5)
+    engine_core.add_request(*engine_core.preprocess_add_request(req0))
+    req1 = make_request_with_max_tokens("1", 5)
+    engine_core.add_request(*engine_core.preprocess_add_request(req1))
+
+    # Schedule Batch 1: (10, req0)
+    assert engine_core.step_with_batch_queue()[0] is None
+    assert len(engine_core.batch_queue) == 1
+    scheduler_output = engine_core.batch_queue[-1][1]
+    assert scheduler_output.num_scheduled_tokens["0"] == 10
+    # num_computed_tokens should have been updated immediately.
+    assert engine_core.scheduler.requests[req0.request_id].num_computed_tokens == 10
+
+    # Schedule Batch 2: (2, req0), (8, req1)
+    assert engine_core.step_with_batch_queue()[0] == {}
+    assert len(engine_core.batch_queue) == 1
+    scheduler_output = engine_core.batch_queue[-1][1]
+    assert scheduler_output.num_scheduled_tokens["0"] == 2
+    assert scheduler_output.num_scheduled_tokens["1"] == 8
+    # num_computed_tokens should have been updated immediately.
+    assert engine_core.scheduler.requests["0"].num_computed_tokens == 12
+    assert engine_core.scheduler.requests["1"].num_computed_tokens == 8
+
+    assert engine_core.scheduler.get_num_unfinished_requests() == 2
+
+    # Finish Batch 1 and schedule Batch 3: (4, req1).
+    # Note that req0 cannot be scheduled
+    # because it is in the decoding stage now.
+    engine_core.step_with_batch_queue()
+    assert len(engine_core.batch_queue) == 1
+    scheduler_output = engine_core.batch_queue[-1][1]
+    assert scheduler_output.num_scheduled_tokens["1"] == 4
+
+    # Finish Batch 2. Get first token of req0.
+    # Schedule Batch 4: (1, req0).
+    output = engine_core.step_with_batch_queue()[0].get(0)
+    assert output is not None
+    assert len(output.outputs) == 1
+    assert engine_core.scheduler.requests[req0.request_id].num_tokens == 13
+    scheduler_output = engine_core.batch_queue[-1][1]
+    assert scheduler_output.num_scheduled_tokens["0"] == 1
+
+    # Finish Batch 3. Get first token of req1. Schedule Batch 5: (1, req1).
+    output = engine_core.step_with_batch_queue()[0].get(0)
+    assert output is not None
+    assert len(output.outputs) == 1
+    assert engine_core.scheduler.requests[req1.request_id].num_tokens == 13
+    scheduler_output = engine_core.batch_queue[-1][1]
+    assert scheduler_output.num_scheduled_tokens["1"] == 1
+
+    # Loop until req0 is finished.
+    req_id = 0
+    expected_num_tokens = [
+        engine_core.scheduler.requests["0"].num_tokens + 1,
+        engine_core.scheduler.requests["1"].num_tokens + 1,
+    ]
+    while engine_core.scheduler.get_num_unfinished_requests() == 2:
+        output = engine_core.step_with_batch_queue()[0]
+        # Every step consumes an output.
+        assert output is not None
+        assert len(output[0].outputs) == 1
+        if req_id in engine_core.scheduler.requests:
+            assert (
+                engine_core.scheduler.requests[req_id].num_tokens
+                == expected_num_tokens[req_id]
+            )
+        expected_num_tokens[req_id] += 1
+        req_id = (req_id + 1) % 2


 @multi_gpu_test(num_gpus=2)
-def test_engine_core_tp(monkeypatch: pytest.MonkeyPatch):
+def test_engine_core_tp():
     """
     Test that the engine can properly initialize workers with tensor parallelism
     """

-    with monkeypatch.context() as m:
-        m.setenv("VLLM_USE_V1", "1")
-        """Setup the EngineCore."""
-        engine_args = EngineArgs(
-            model=MODEL_NAME,
-            tensor_parallel_size=2,
-            # Reduce startup time.
-            enforce_eager=True,
-        )
-        vllm_config = engine_args.create_engine_config()
-        executor_class = Executor.get_class(vllm_config)
+    """Setup the EngineCore."""
+    engine_args = EngineArgs(
+        model=MODEL_NAME,
+        tensor_parallel_size=2,
+        # Reduce startup time.
+        enforce_eager=True,
+    )
+    vllm_config = engine_args.create_engine_config()
+    executor_class = Executor.get_class(vllm_config)

-        with set_default_torch_num_threads(1):
-            engine_core = EngineCore(
-                vllm_config=vllm_config, executor_class=executor_class, log_stats=True
-            )
-
-        def get_worker_cache_config_field(worker, key: str):
-            return getattr(worker.cache_config, key)
-
-        num_gpu_blocks = engine_core.collective_rpc(
-            get_worker_cache_config_field, args=("num_gpu_blocks",)
-        )
-        num_cpu_blocks = engine_core.collective_rpc(
-            get_worker_cache_config_field, args=("num_cpu_blocks",)
-        )
-        assert all(x is not None for x in num_gpu_blocks)
-        assert all(x is not None for x in num_cpu_blocks)
+    with set_default_torch_num_threads(1):
+        engine_core = EngineCore(
+            vllm_config=vllm_config, executor_class=executor_class, log_stats=True
+        )
+
+    def get_worker_cache_config_field(worker, key: str):
+        return getattr(worker.cache_config, key)
+
+    num_gpu_blocks = engine_core.collective_rpc(
+        get_worker_cache_config_field, args=("num_gpu_blocks",)
+    )
+    num_cpu_blocks = engine_core.collective_rpc(
+        get_worker_cache_config_field, args=("num_cpu_blocks",)
+    )
+    assert all(x is not None for x in num_gpu_blocks)
+    assert all(x is not None for x in num_cpu_blocks)


 @create_new_process_for_each_test()
-def test_engine_core_invalid_request_id_type(monkeypatch: pytest.MonkeyPatch):
+def test_engine_core_invalid_request_id_type():
     """Test that the engine raises TypeError for non-string request_id."""
-    with monkeypatch.context() as m:
-        m.setenv("VLLM_USE_V1", "1")
-        engine_args = EngineArgs(model=MODEL_NAME)
-        vllm_config = engine_args.create_engine_config()
-        executor_class = Executor.get_class(vllm_config)
+    engine_args = EngineArgs(model=MODEL_NAME)
+    vllm_config = engine_args.create_engine_config()
+    executor_class = Executor.get_class(vllm_config)

-        with set_default_torch_num_threads(1):
-            engine_core = EngineCore(
-                vllm_config=vllm_config, executor_class=executor_class, log_stats=True
-            )
+    with set_default_torch_num_threads(1):
+        engine_core = EngineCore(
+            vllm_config=vllm_config, executor_class=executor_class, log_stats=True
+        )

-        # Test with UUID object (common mistake)
-        uuid_request = make_request()
-        uuid_request.request_id = uuid.uuid4()  # UUID object instead of string
+    # Test with UUID object (common mistake)
+    uuid_request = make_request()
+    uuid_request.request_id = uuid.uuid4()  # UUID object instead of string

-        with pytest.raises(TypeError, match="request_id must be a string, got.*UUID"):
-            engine_core.add_request(*engine_core.preprocess_add_request(uuid_request))
+    with pytest.raises(TypeError, match="request_id must be a string, got.*UUID"):
+        engine_core.add_request(*engine_core.preprocess_add_request(uuid_request))

-        # Test with integer
-        int_request = make_request()
-        int_request.request_id = 12345
+    # Test with integer
+    int_request = make_request()
+    int_request.request_id = 12345

-        with pytest.raises(TypeError, match="request_id must be a string, got.*int"):
-            engine_core.add_request(*engine_core.preprocess_add_request(int_request))
+    with pytest.raises(TypeError, match="request_id must be a string, got.*int"):
+        engine_core.add_request(*engine_core.preprocess_add_request(int_request))

-        # Test with None
-        none_request = make_request()
-        none_request.request_id = None
+    # Test with None
+    none_request = make_request()
+    none_request.request_id = None

-        with pytest.raises(
-            TypeError, match="request_id must be a string, got.*NoneType"
-        ):
-            engine_core.add_request(*engine_core.preprocess_add_request(none_request))
+    with pytest.raises(TypeError, match="request_id must be a string, got.*NoneType"):
+        engine_core.add_request(*engine_core.preprocess_add_request(none_request))

-        # Verify engine is still functional after errors
-        valid_request = make_request()
-        engine_core.add_request(*engine_core.preprocess_add_request(valid_request))
-        assert len(engine_core.scheduler.waiting) == 1
-        assert len(engine_core.scheduler.running) == 0
+    # Verify engine is still functional after errors
+    valid_request = make_request()
+    engine_core.add_request(*engine_core.preprocess_add_request(valid_request))
+    assert len(engine_core.scheduler.waiting) == 1
+    assert len(engine_core.scheduler.running) == 0
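
A note on the drain loop that recurs in the tests above: the expression while (outs := engine_core.step()[0].get(0)) and outs.outputs: pass keeps stepping the engine until client 0 stops producing outputs. A self-contained sketch of the idiom, with a dummy step() standing in for engine_core.step() (hypothetical, not vLLM's API):

from dataclasses import dataclass, field

@dataclass
class Outputs:
    outputs: list = field(default_factory=list)

remaining = 3  # pretend three more tokens are in flight

def step():
    # Dummy stand-in: returns an (outputs_by_client, ...) tuple the way the
    # tests index engine_core.step()[0].get(0); client 0 gets one token per
    # step until none remain, then an empty Outputs.
    global remaining
    remaining -= 1
    return ({0: Outputs(["tok"]) if remaining >= 0 else Outputs()}, None)

# The walrus operator binds the lookup and tests it in one expression; the
# loop ends once client 0 has no entry or its .outputs list is empty.
while (outs := step()[0].get(0)) and outs.outputs:
    pass
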