Fix EventPublisherFactory logic for disabled KV cache events (#27419)
Signed-off-by: Bradley <bradley.b.pitt@gmail.com>
This commit is contained in:
@@ -263,3 +263,52 @@ def test_data_parallel_rank_tagging(publisher_config):
|
|||||||
pub_1.shutdown()
|
pub_1.shutdown()
|
||||||
sub_0.close()
|
sub_0.close()
|
||||||
sub_1.close()
|
sub_1.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_event_publisher_factory():
|
||||||
|
"""Test event publisher factory creation behavior under different configurations"""
|
||||||
|
from vllm.config.kv_events import KVEventsConfig
|
||||||
|
from vllm.distributed.kv_events import ZmqEventPublisher
|
||||||
|
|
||||||
|
# test config is None
|
||||||
|
publisher = EventPublisherFactory.create(None, DP_RANK)
|
||||||
|
assert isinstance(publisher, NullEventPublisher)
|
||||||
|
publisher.shutdown()
|
||||||
|
|
||||||
|
# test disable kv cache events
|
||||||
|
config = KVEventsConfig(
|
||||||
|
enable_kv_cache_events=False,
|
||||||
|
publisher="zmq", # Even if zmq is specified, should return NullEventPublisher
|
||||||
|
endpoint="tcp://localhost:5557",
|
||||||
|
)
|
||||||
|
publisher = EventPublisherFactory.create(config, DP_RANK)
|
||||||
|
assert isinstance(publisher, NullEventPublisher)
|
||||||
|
publisher.shutdown()
|
||||||
|
|
||||||
|
# test zmq publisher
|
||||||
|
config = KVEventsConfig(
|
||||||
|
enable_kv_cache_events=True,
|
||||||
|
publisher="zmq",
|
||||||
|
endpoint="inproc://test-factory-true",
|
||||||
|
)
|
||||||
|
publisher = EventPublisherFactory.create(config, DP_RANK)
|
||||||
|
assert isinstance(publisher, ZmqEventPublisher)
|
||||||
|
publisher.shutdown()
|
||||||
|
|
||||||
|
# test unknown publisher
|
||||||
|
with pytest.raises(ValueError, match="Input should be"):
|
||||||
|
KVEventsConfig(
|
||||||
|
enable_kv_cache_events=True,
|
||||||
|
publisher="unknown_publisher",
|
||||||
|
endpoint="tcp://localhost:5557",
|
||||||
|
)
|
||||||
|
|
||||||
|
# test publisher not specified
|
||||||
|
config = KVEventsConfig(
|
||||||
|
enable_kv_cache_events=True,
|
||||||
|
# publisher not specified, should default to "zmq"
|
||||||
|
endpoint="tcp://localhost:5557",
|
||||||
|
)
|
||||||
|
publisher = EventPublisherFactory.create(config, DP_RANK)
|
||||||
|
assert isinstance(publisher, ZmqEventPublisher)
|
||||||
|
publisher.shutdown()
|
||||||
|
|||||||
@@ -353,7 +353,11 @@ class EventPublisherFactory:
|
|||||||
cls, config: KVEventsConfig | None, data_parallel_rank: int = 0
|
cls, config: KVEventsConfig | None, data_parallel_rank: int = 0
|
||||||
) -> EventPublisher:
|
) -> EventPublisher:
|
||||||
"""Create publisher from a config mapping."""
|
"""Create publisher from a config mapping."""
|
||||||
if config is None or config.publisher == "null":
|
if (
|
||||||
|
config is None
|
||||||
|
or not config.enable_kv_cache_events
|
||||||
|
or config.publisher == "null"
|
||||||
|
):
|
||||||
return NullEventPublisher()
|
return NullEventPublisher()
|
||||||
|
|
||||||
config_dict = asdict(config)
|
config_dict = asdict(config)
|
||||||
|
|||||||
Reference in New Issue
Block a user