nvfp4-megamoe-kernel/dsv4/_archive/cache/prepare_forward.py
biondizzle f3b551956d Cleanup Step 2: Archive Lineage P code, fix broken imports
- Move dead dsv4/ modules to dsv4/_archive/ (52 files)
  - model/{dsv4,mtp,layer,layer_schedule}
  - layers/{embedding,attention,ffn,norm} (kept linear,mhc,router,moe,shared_expert,grouped_linear - live)
  - cache/*, kernels/cache/*, kernels/indexer/{csa_indexer,score_topk,compute_valid_lens}
  - kernels/router/{nvfp4_fused_router,dense_router_decode_kernel,dense_router_prefill}
  - ops/{topk,topk_select,rope,router}, loader/{hf_checkpoint,layout_convert}
  - reference/{attention,compressor,csa_attention,moe_pipeline}
  - kernels/compressor/{compress_tail,csa_hca}
- Restore dsv4/ops/{router,custom_ops}.py (needed by live layers)
- Fix dsv4/kernels/{indexer,compressor,attention}/__init__.py (removed broken imports)
- Remove preload_all() from loader.py (dead, referenced nonexistent .cu file)
- Fix loader.py docstring (fused_amax_quantize_nvfp4 → quantize_nvfp4_from_buffer)
- Move broken tests to tests/e2e_archive/
  - test_fused_router, production_values_test, e2e/{one_layer,model_construction,csa_hca}
- vLLM has 0 imports of dsv4 (Step 0 confirmed)
2026-06-02 19:27:07 +00:00

84 lines
3.5 KiB
Python

"""Pre-forward block allocation.

Runs between captured graphs. Computes how many new compressed entries
will be produced by this forward (deterministic from positions), allocates
the required physical blocks, and updates block tables.

After this runs, the captured graph can perform flushes by writing to
already-resolved (request, layer, logical_block) -> physical_block
mappings. No allocation inside the graph.
"""
from __future__ import annotations

import torch

from dsv4.cache.manager import KVCacheManager
from dsv4.model.layer_schedule import AttentionType


def prepare_forward(
    manager: KVCacheManager,
    request_slots: torch.Tensor,     # [B] state cache slots
    positions_before: torch.Tensor,  # [B] absolute position BEFORE this forward
    positions_after: torch.Tensor,   # [B] absolute position AFTER this forward
) -> None:
    """Pre-allocate any blocks that will be needed by flushes in this forward.

    Pure CPU/GPU bookkeeping: runs between captures, not in the hot path.
    For each compressed layer, works out how many flushes happen per
    request and allocates blocks to cover them.
    """
    for layer_idx, spec in enumerate(manager.schedule):
        if spec.attn == AttentionType.SWA:
            continue  # No classical pool, no flushes.
        schema = manager.schemas[layer_idx]
        alloc = manager.allocators[layer_idx]
        if alloc is None:
            continue
        # Compression ratio: one compressed entry per `m` positions.
        m = (manager.config.csa_compression_ratio
             if spec.attn == AttentionType.CSA
             else manager.config.hca_compression_ratio)
        epb = schema.entries_per_block

        # How many compressed entries are NEWLY produced per request?
        #   = floor(positions_after / m) - floor(positions_before / m)
        entries_after = (positions_after // m).to(torch.int64)
        entries_before = (positions_before // m).to(torch.int64)
        new_entries = entries_after - entries_before  # [B] int64

        # For each request, figure out how many new blocks are needed.
        # A block holds `epb` entries. If there are already some entries
        # in the current (open) block, they take some slots.
        for b in range(request_slots.numel()):
            n_new = int(new_entries[b])
            if n_new == 0:
                continue
            req_slot = int(request_slots[b])

            # How many entries are already in the current open block?
            existing_blocks = int(manager.block_lens[layer_idx][req_slot])
            entries_in_open_block = (
                int(entries_before[b]) % epb if existing_blocks > 0 else 0
            )
            slots_remaining_in_open = (
                epb - entries_in_open_block if entries_in_open_block > 0 else 0
            )

            # How many new blocks do we need?
            if entries_in_open_block == 0 and existing_blocks == 0:
                # Fresh request: no open block yet.
                blocks_needed = (n_new + epb - 1) // epb
            elif slots_remaining_in_open >= n_new:
                # Fits in the current open block.
                blocks_needed = 0
            else:
                # Need additional blocks beyond the current open one.
                overflow = n_new - slots_remaining_in_open
                blocks_needed = (overflow + epb - 1) // epb
            if blocks_needed == 0:
                continue

            # Append the newly acquired physical blocks to this request's table.
            ids = alloc.acquire(blocks_needed)
            existing = int(manager.block_lens[layer_idx][req_slot])
            manager.block_tables[layer_idx][req_slot, existing:existing + blocks_needed] = ids
            manager.block_lens[layer_idx][req_slot] = existing + blocks_needed
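
The per-request block arithmetic above can be checked in isolation with plain integers. The following is a minimal sketch, not part of the repository: `new_entries` and `blocks_needed` are hypothetical helper names that mirror the branch logic of `prepare_forward`, and `blocks_needed_closed_form` rests on an assumption the source does not state, namely that `block_lens` always equals `ceil(entries_before / epb)` when the function is called.

```python
def new_entries(pos_before: int, pos_after: int, m: int) -> int:
    """Compressed entries produced when moving from pos_before to pos_after."""
    return pos_after // m - pos_before // m


def blocks_needed(entries_before: int, entries_after: int,
                  existing_blocks: int, epb: int) -> int:
    """Hypothetical plain-int mirror of the branch logic in prepare_forward."""
    n_new = entries_after - entries_before
    if n_new == 0:
        return 0
    in_open = entries_before % epb if existing_blocks > 0 else 0
    remaining = epb - in_open if in_open > 0 else 0
    if in_open == 0 and existing_blocks == 0:
        return (n_new + epb - 1) // epb      # fresh request
    if remaining >= n_new:
        return 0                             # fits in the open block
    return (n_new - remaining + epb - 1) // epb  # overflow into new blocks


def blocks_needed_closed_form(entries_after: int, existing_blocks: int,
                              epb: int) -> int:
    """Assumes existing_blocks == ceil(entries_before / epb) (not in the source)."""
    return (entries_after + epb - 1) // epb - existing_blocks


# Worked example: ratio m = 4, 8 entries per block, positions 30 -> 70.
m, epb = 4, 8
before, after = 30 // m, 70 // m             # 7 and 17 entries
print(new_entries(30, 70, m))                # 10
print(blocks_needed(before, after, 1, epb))  # 2
```

In the worked example the single existing block holds 7 entries, so one new entry fills it and the remaining 9 need two more blocks, for 3 blocks covering 17 entries. Under the stated assumption the branch logic and the closed form agree, which may be a useful invariant to assert in a unit test.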