diff --git a/tests/v1/kv_offload/test_cpu_manager.py b/tests/v1/kv_offload/test_cpu_manager.py index eea0367bf..a9a8e21d6 100644 --- a/tests/v1/kv_offload/test_cpu_manager.py +++ b/tests/v1/kv_offload/test_cpu_manager.py @@ -15,6 +15,7 @@ from vllm.v1.kv_offload.abstract import ( from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy from vllm.v1.kv_offload.mediums import CPULoadStoreSpec +from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager @dataclass @@ -243,335 +244,300 @@ def test_cpu_manager(): ) -def test_arc_manager_basic(): - """ - Tests CPUOffloadingManager with arc policy. - Verifies that ARC handles store, load, and lookup operations correctly. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) +class TestARCPolicy: + """Unit tests for CPUOffloadingManager with ARC eviction policy.""" - # prepare store [1, 2] - prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2])) - verify_store_output( - prepare_store_output, - ExpectedPrepareStoreOutput( - block_hashes_to_store=[1, 2], - store_block_ids=[0, 1], - block_hashes_evicted=[], - ), - ) + def _make_manager( + self, num_blocks: int = 4, enable_events: bool = True + ) -> tuple[CPUOffloadingManager, ARCCachePolicy]: + manager = CPUOffloadingManager( + block_size=256, + num_blocks=num_blocks, + cache_policy="arc", + enable_events=enable_events, + ) + policy = manager._policy + assert isinstance(policy, ARCCachePolicy) + return manager, policy - # lookup [1, 2] -> not ready - assert arc_manager.lookup(to_hashes([1, 2])) == 0 + def test_basic(self): + """ + Tests CPUOffloadingManager with arc policy. + Verifies that ARC handles store, load, and lookup operations correctly. + """ + cpu_manager, arc_policy = self._make_manager() - # no events so far - assert list(arc_manager.take_events()) == [] + # prepare store [1, 2] + prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2])) + verify_store_output( + prepare_store_output, + ExpectedPrepareStoreOutput( + block_hashes_to_store=[1, 2], + store_block_ids=[0, 1], + block_hashes_evicted=[], + ), + ) - # complete store [1, 2] - arc_manager.complete_store(to_hashes([1, 2])) - verify_events( - arc_manager.take_events(), block_size=block_size, expected_stores=({1, 2},) - ) + # lookup [1, 2] -> not ready + assert cpu_manager.lookup(to_hashes([1, 2])) == 0 - # lookup [1, 2] - assert arc_manager.lookup(to_hashes([1])) == 1 - assert arc_manager.lookup(to_hashes([1, 2])) == 2 - assert arc_manager.lookup(to_hashes([1, 2, 3])) == 2 + # no events so far + assert list(cpu_manager.take_events()) == [] - # blocks should be in T1 (recent) - assert len(arc_policy.t1) == 2 - assert len(arc_policy.t2) == 0 + # complete store [1, 2] + cpu_manager.complete_store(to_hashes([1, 2])) + verify_events( + cpu_manager.take_events(), block_size=256, expected_stores=({1, 2},) + ) + # lookup [1, 2] + assert cpu_manager.lookup(to_hashes([1])) == 1 + assert cpu_manager.lookup(to_hashes([1, 2])) == 2 + assert cpu_manager.lookup(to_hashes([1, 2, 3])) == 2 -def test_arc_manager_t1_to_t2_promotion(): - """ - Tests that accessing a block in T1 promotes it to T2 (frequent). - This is a key feature of ARC's adaptive behavior. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # blocks should be in T1 (recent) + assert len(arc_policy.t1) == 2 + assert len(arc_policy.t2) == 0 - # store and complete block 1 - arc_manager.prepare_store(to_hashes([1])) - arc_manager.complete_store(to_hashes([1])) + def test_t1_to_t2_promotion(self): + """ + Tests that accessing a block in T1 promotes it to T2 (frequent). + This is a key feature of ARC's adaptive behavior. + """ + cpu_manager, arc_policy = self._make_manager(enable_events=False) - # block 1 starts in T1 (recent) - assert to_hashes([1])[0] in arc_policy.t1 - assert to_hashes([1])[0] not in arc_policy.t2 + # store and complete block 1 + cpu_manager.prepare_store(to_hashes([1])) + cpu_manager.complete_store(to_hashes([1])) - # touch block 1 (simulate second access) - arc_manager.touch(to_hashes([1])) + # block 1 starts in T1 (recent) + assert to_hashes([1])[0] in arc_policy.t1 + assert to_hashes([1])[0] not in arc_policy.t2 - # block 1 should now be in T2 (frequent) - assert to_hashes([1])[0] not in arc_policy.t1 - assert to_hashes([1])[0] in arc_policy.t2 + # touch block 1 (simulate second access) + cpu_manager.touch(to_hashes([1])) + # block 1 should now be in T2 (frequent) + assert to_hashes([1])[0] not in arc_policy.t1 + assert to_hashes([1])[0] in arc_policy.t2 -def test_arc_manager_eviction_with_load(): - """ - Tests ARC eviction behavior similar to LRU test. - Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True - ) + def test_eviction_with_load(self): + """ + Tests ARC eviction behavior similar to LRU test. + Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted. + """ + cpu_manager, _ = self._make_manager() - # prepare and complete store [1, 2, 3, 4] - prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) - verify_store_output( - prepare_store_output, - ExpectedPrepareStoreOutput( - block_hashes_to_store=[1, 2, 3, 4], - store_block_ids=[0, 1, 2, 3], - block_hashes_evicted=[], - ), - ) - arc_manager.complete_store(to_hashes([1, 2, 3, 4])) + # prepare and complete store [1, 2, 3, 4] + prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2, 3, 4])) + verify_store_output( + prepare_store_output, + ExpectedPrepareStoreOutput( + block_hashes_to_store=[1, 2, 3, 4], + store_block_ids=[0, 1, 2, 3], + block_hashes_evicted=[], + ), + ) + cpu_manager.complete_store(to_hashes([1, 2, 3, 4])) - # prepare load [2, 3] (increases ref_cnt) - prepare_load_output = arc_manager.prepare_load(to_hashes([2, 3])) - verify_load_output(prepare_load_output, [1, 2]) + # prepare load [2, 3] (increases ref_cnt) + prepare_load_output = cpu_manager.prepare_load(to_hashes([2, 3])) + verify_load_output(prepare_load_output, [1, 2]) - # prepare store [5, 6, 7] with [2, 3] being loaded - # should fail because [2, 3] have ref_cnt > 0 - assert arc_manager.prepare_store(to_hashes([5, 6, 7])) is None + # prepare store [5, 6, 7] with [2, 3] being loaded + # should fail because [2, 3] have ref_cnt > 0 + assert cpu_manager.prepare_store(to_hashes([5, 6, 7])) is None - # complete load [2, 3] - arc_manager.complete_load(to_hashes([2, 3])) + # complete load [2, 3] + cpu_manager.complete_load(to_hashes([2, 3])) - # now prepare store [5, 6, 7] should succeed - # ARC will evict blocks one at a time from T1 as needed - prepare_store_output = arc_manager.prepare_store(to_hashes([5, 6, 7])) - assert prepare_store_output is not None - # Should successfully evict enough blocks to make room (at least 1) - assert len(prepare_store_output.block_hashes_evicted) >= 1 + # now prepare store [5, 6, 7] should succeed + # ARC will evict blocks one at a time from T1 as needed + prepare_store_output = cpu_manager.prepare_store(to_hashes([5, 6, 7])) + assert prepare_store_output is not None + # Should successfully evict enough blocks to make room (at least 1) + assert len(prepare_store_output.block_hashes_evicted) >= 1 + def test_adaptive_target(self): + """ + Tests ARC's adaptive target adjustment via ghost lists. + When a block in B1 (ghost list) is accessed, target_t1_size increases. + When a block in B2 is accessed, target_t1_size decreases. + """ + cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False) -def test_arc_manager_adaptive_target(): - """ - Tests ARC's adaptive target adjustment via ghost lists. - When a block in B1 (ghost list) is accessed, target_t1_size increases. - When a block in B2 is accessed, target_t1_size decreases. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # store blocks 1, 2 (fills cache) + cpu_manager.prepare_store(to_hashes([1, 2])) + cpu_manager.complete_store(to_hashes([1, 2])) - # store blocks 1, 2 (fills cache) - arc_manager.prepare_store(to_hashes([1, 2])) - arc_manager.complete_store(to_hashes([1, 2])) + initial_target = arc_policy.target_t1_size - initial_target = arc_policy.target_t1_size + # store block 3, evicting block 1 (moves to B1 ghost list) + cpu_manager.prepare_store(to_hashes([3])) + cpu_manager.complete_store(to_hashes([3])) - # store block 3, evicting block 1 (moves to B1 ghost list) - arc_manager.prepare_store(to_hashes([3])) - arc_manager.complete_store(to_hashes([3])) + # block 1 should be in B1 (ghost list) + assert to_hashes([1])[0] in arc_policy.b1 - # block 1 should be in B1 (ghost list) - assert to_hashes([1])[0] in arc_policy.b1 + # touch block 1 (cache miss, but in B1) + # this should increase target_t1_size (favor recency) + cpu_manager.touch(to_hashes([1])) - # touch block 1 (cache miss, but in B1) - # this should increase target_t1_size (favor recency) - arc_manager.touch(to_hashes([1])) + # target should have increased + assert arc_policy.target_t1_size > initial_target - # target should have increased - assert arc_policy.target_t1_size > initial_target + def test_t1_t2_eviction_policy(self): + """ + Tests that ARC evicts from T1 or T2 based on target_t1_size. + If |T1| >= target_t1_size, evict from T1, otherwise from T2. + """ + cpu_manager, arc_policy = self._make_manager(enable_events=False) + # store blocks 1, 2, 3, 4 + cpu_manager.prepare_store(to_hashes([1, 2, 3, 4])) + cpu_manager.complete_store(to_hashes([1, 2, 3, 4])) -def test_arc_manager_t1_t2_eviction_policy(): - """ - Tests that ARC evicts from T1 or T2 based on target_t1_size. - If |T1| >= target_t1_size, evict from T1, otherwise from T2. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # promote blocks 3, 4 to T2 by touching them + cpu_manager.touch(to_hashes([3, 4])) - # store blocks 1, 2, 3, 4 - arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) - arc_manager.complete_store(to_hashes([1, 2, 3, 4])) + # now: T1 = {1, 2}, T2 = {3, 4} + assert len(arc_policy.t1) == 2 + assert len(arc_policy.t2) == 2 - # promote blocks 3, 4 to T2 by touching them - arc_manager.touch(to_hashes([3, 4])) + # set target_t1_size to prefer evicting from T1 + # (when |T1| >= target, evict from T1) + arc_policy.target_t1_size = 1 - # now: T1 = {1, 2}, T2 = {3, 4} - assert len(arc_policy.t1) == 2 - assert len(arc_policy.t2) == 2 + # store block 5, should evict from T1 (block 1, LRU in T1) + output = cpu_manager.prepare_store(to_hashes([5])) + assert output is not None + assert to_hashes([1]) == output.block_hashes_evicted - # set target_t1_size to prefer evicting from T1 - # (when |T1| >= target, evict from T1) - arc_policy.target_t1_size = 1 + cpu_manager.complete_store(to_hashes([5])) - # store block 5, should evict from T1 (block 1, LRU in T1) - output = arc_manager.prepare_store(to_hashes([5])) - assert output is not None - assert to_hashes([1]) == output.block_hashes_evicted + # block 1 should be in B1 (ghost list) + assert to_hashes([1])[0] in arc_policy.b1 + # block 5 should be in T1 + assert to_hashes([5])[0] in arc_policy.t1 - arc_manager.complete_store(to_hashes([5])) + def test_ghost_list_bounds(self): + """ + Tests that ghost lists (B1, B2) don't grow unbounded. + They should be capped at cache_capacity. + """ + cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False) - # block 1 should be in B1 (ghost list) - assert to_hashes([1])[0] in arc_policy.b1 - # block 5 should be in T1 - assert to_hashes([5])[0] in arc_policy.t1 + # fill cache with blocks 1, 2 + cpu_manager.prepare_store(to_hashes([1, 2])) + cpu_manager.complete_store(to_hashes([1, 2])) + # store many blocks to fill ghost lists + for i in range(3, 20): + cpu_manager.prepare_store(to_hashes([i])) + cpu_manager.complete_store(to_hashes([i])) -def test_arc_manager_ghost_list_bounds(): - """ - Tests that ghost lists (B1, B2) don't grow unbounded. - They should be capped at cache_capacity. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # ghost lists should not exceed cache_capacity + assert len(arc_policy.b1) <= arc_policy.cache_capacity + assert len(arc_policy.b2) <= arc_policy.cache_capacity - # fill cache with blocks 1, 2 - arc_manager.prepare_store(to_hashes([1, 2])) - arc_manager.complete_store(to_hashes([1, 2])) + def test_touch_ordering(self): + """ + Tests that touch() correctly updates access patterns. + Similar to LRU test but verifies T1/T2 ordering. + """ + cpu_manager, arc_policy = self._make_manager() - # store many blocks to fill ghost lists - for i in range(3, 20): - arc_manager.prepare_store(to_hashes([i])) - arc_manager.complete_store(to_hashes([i])) + # store blocks 1, 2, 3, 4 + cpu_manager.prepare_store(to_hashes([1, 2, 3, 4])) + cpu_manager.complete_store(to_hashes([1, 2, 3, 4])) - # ghost lists should not exceed cache_capacity - assert len(arc_policy.b1) <= arc_policy.cache_capacity - assert len(arc_policy.b2) <= arc_policy.cache_capacity + # promote 3, 4 to T2 + cpu_manager.touch(to_hashes([3, 4])) + # T1 = {1, 2}, T2 = {3, 4} + # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2 + cpu_manager.touch(to_hashes([1, 3, 4])) -def test_arc_manager_touch_ordering(): - """ - Tests that touch() correctly updates access patterns. - Similar to LRU test but verifies T1/T2 ordering. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent) + assert len(arc_policy.t1) == 1 + assert len(arc_policy.t2) == 3 - # store blocks 1, 2, 3, 4 - arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) - arc_manager.complete_store(to_hashes([1, 2, 3, 4])) + # store block 5, should evict from T1 (block 2, only one in T1) + prepare_store_output = cpu_manager.prepare_store(to_hashes([5])) + verify_store_output( + prepare_store_output, + ExpectedPrepareStoreOutput( + block_hashes_to_store=[5], + store_block_ids=[1], # reuses block 2's storage + block_hashes_evicted=[2], + ), + ) - # promote 3, 4 to T2 - arc_manager.touch(to_hashes([3, 4])) + def test_failed_store(self): + """ + Tests that failed store operations clean up correctly. + Similar to LRU test but for ARC. + """ + cpu_manager, arc_policy = self._make_manager() - # T1 = {1, 2}, T2 = {3, 4} - # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2 - arc_manager.touch(to_hashes([1, 3, 4])) + # store blocks 1, 2, 3, 4 + cpu_manager.prepare_store(to_hashes([1, 2, 3, 4])) + cpu_manager.complete_store(to_hashes([1, 2, 3, 4])) - # T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent) - assert len(arc_policy.t1) == 1 - assert len(arc_policy.t2) == 3 + # prepare store block 5 (will evict block 1) + prepare_store_output = cpu_manager.prepare_store(to_hashes([5])) + assert prepare_store_output is not None + assert len(prepare_store_output.block_hashes_evicted) == 1 - # store block 5, should evict from T1 (block 2, only one in T1) - prepare_store_output = arc_manager.prepare_store(to_hashes([5])) - verify_store_output( - prepare_store_output, - ExpectedPrepareStoreOutput( - block_hashes_to_store=[5], - store_block_ids=[1], # reuses block 2's storage - block_hashes_evicted=[2], - ), - ) + # complete store with failure + cpu_manager.complete_store(to_hashes([5]), success=False) + # block 5 should not be in cache + assert cpu_manager.lookup(to_hashes([5])) == 0 + # block 5 should not be in T1 or T2 + assert to_hashes([5])[0] not in arc_policy.t1 + assert to_hashes([5])[0] not in arc_policy.t2 -def test_arc_manager_failed_store(): - """ - Tests that failed store operations clean up correctly. - Similar to LRU test but for ARC. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # evicted block should still be gone (in B1 ghost list) + evicted_hash = prepare_store_output.block_hashes_evicted[0] + assert evicted_hash in arc_policy.b1 - # store blocks 1, 2, 3, 4 - arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) - arc_manager.complete_store(to_hashes([1, 2, 3, 4])) + def test_full_scenario(self): + """ + Comprehensive test covering multiple ARC operations in sequence. + Similar to the full LRU test but adapted for ARC behavior. + """ + cpu_manager, arc_policy = self._make_manager() - # prepare store block 5 (will evict block 1) - prepare_store_output = arc_manager.prepare_store(to_hashes([5])) - assert prepare_store_output is not None - assert len(prepare_store_output.block_hashes_evicted) == 1 + # store [1, 2] + cpu_manager.prepare_store(to_hashes([1, 2])) + cpu_manager.complete_store(to_hashes([1, 2])) - # complete store with failure - arc_manager.complete_store(to_hashes([5]), success=False) + # store [3, 4, 5] -> evicts [1] + prepare_store_output = cpu_manager.prepare_store(to_hashes([3, 4, 5])) + assert prepare_store_output is not None + assert len(prepare_store_output.block_hashes_evicted) == 1 + cpu_manager.complete_store(to_hashes([3, 4, 5])) - # block 5 should not be in cache - assert arc_manager.lookup(to_hashes([5])) == 0 - # block 5 should not be in T1 or T2 - assert to_hashes([5])[0] not in arc_policy.t1 - assert to_hashes([5])[0] not in arc_policy.t2 + # promote some blocks to T2 + cpu_manager.touch(to_hashes([2, 3])) - # evicted block should still be gone (in B1 ghost list) - evicted_hash = prepare_store_output.block_hashes_evicted[0] - assert evicted_hash in arc_policy.b1 + # T1 has {4, 5}, T2 has {2, 3} + assert len(arc_policy.t1) == 2 + assert len(arc_policy.t2) == 2 + # store [6] -> should evict from T1 (4 is oldest in T1) + prepare_store_output = cpu_manager.prepare_store(to_hashes([6])) + assert prepare_store_output is not None + cpu_manager.complete_store(to_hashes([6])) -def test_arc_manager_full_scenario(): - """ - Comprehensive test covering multiple ARC operations in sequence. - Similar to the full LRU test but adapted for ARC behavior. - """ - block_size = 256 - arc_manager = CPUOffloadingManager( - block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True - ) - arc_policy = arc_manager._policy - assert isinstance(arc_policy, ARCCachePolicy) + # verify blocks 2, 3 (in T2) are still present + assert cpu_manager.lookup(to_hashes([2])) == 1 + assert cpu_manager.lookup(to_hashes([3])) == 1 - # store [1, 2] - arc_manager.prepare_store(to_hashes([1, 2])) - arc_manager.complete_store(to_hashes([1, 2])) - - # store [3, 4, 5] -> evicts [1] - prepare_store_output = arc_manager.prepare_store(to_hashes([3, 4, 5])) - assert prepare_store_output is not None - assert len(prepare_store_output.block_hashes_evicted) == 1 - arc_manager.complete_store(to_hashes([3, 4, 5])) - - # promote some blocks to T2 - arc_manager.touch(to_hashes([2, 3])) - - # T1 has {4, 5}, T2 has {2, 3} - assert len(arc_policy.t1) == 2 - assert len(arc_policy.t2) == 2 - - # store [6] -> should evict from T1 (4 is oldest in T1) - prepare_store_output = arc_manager.prepare_store(to_hashes([6])) - assert prepare_store_output is not None - arc_manager.complete_store(to_hashes([6])) - - # verify blocks 2, 3 (in T2) are still present - assert arc_manager.lookup(to_hashes([2])) == 1 - assert arc_manager.lookup(to_hashes([3])) == 1 - - # verify events - events = list(arc_manager.take_events()) - assert len(events) > 0 # should have store and eviction events + # verify events + events = list(cpu_manager.take_events()) + assert len(events) > 0 # should have store and eviction events def test_filter_reused_manager(): @@ -583,8 +549,6 @@ def test_filter_reused_manager(): block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True ) - from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager - manager = FilterReusedOffloadingManager( backing=lru_manager, store_threshold=2, max_tracker_size=3 ) diff --git a/vllm/v1/kv_offload/reuse_manager.py b/vllm/v1/kv_offload/reuse_manager.py index daf6c65cd..3c372c5d9 100644 --- a/vllm/v1/kv_offload/reuse_manager.py +++ b/vllm/v1/kv_offload/reuse_manager.py @@ -93,9 +93,8 @@ class FilterReusedOffloadingManager(OffloadingManager): ] # Delegate to the backing manager with only the eligible hashes. - # Passing an empty list is intentional and safe — both - # LRUOffloadingManager and ARCOffloadingManager handle it correctly, - # returning a PrepareStoreOutput with empty lists. + # Passing an empty list is intentional and safe — CPUOffloadingManager + # handles it correctly, returning a PrepareStoreOutput with empty lists. return self._backing.prepare_store(eligible) # ------------------------------------------------------------------