Compare commits


17 Commits

Author SHA1 Message Date
Jee Jee Li
b6553be1bc [Misc] Slight improvement of the BNB (#19418)
Signed-off-by: Jee Jee Li <pandaleefree@gmail.com>
Co-authored-by: Isotr0py <2037008807@qq.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-06-10 13:51:49 +00:00
youkaichao
64a9af5afa Simplify ep kernels installation (#19412)
Signed-off-by: youkaichao <youkaichao@gmail.com>
2025-06-10 20:06:08 +08:00
Li, Jiang
e4248849ec [BugFix][CPU] Fix CPU CI by ignore collecting test_pixtral (#19411)
Signed-off-by: jiang.li <jiang1.li@intel.com>
2025-06-10 12:02:40 +00:00
Rachel Guo
467bef18a3 [BugFix][FlashInfer] Fix attention backend interface mismatch with unexpected keyword use_irope (#19134)
Signed-off-by: Yunqiu Guo <guorachel@meta.com>
2025-06-10 16:48:51 +08:00
Isotr0py
5f1ac1e1d1 Revert "[v1] Add fp32 support to v1 engine through flex attn" (#19404) 2025-06-10 01:30:20 -07:00
Louie Tsai
9368cc90b2 Automatically bind CPU OMP Threads of a rank to CPU ids of a NUMA node. (#17930)
Signed-off-by: Tsai, Louie <louie.tsai@intel.com>
Co-authored-by: Li, Jiang <bigpyj64@gmail.com>
2025-06-10 06:22:05 +00:00
Anna Pendleton
32b3946bb4 Add clear documentation around the impact of debugging flag (#19369)
Signed-off-by: Anna Pendleton <pendleton@google.com>
2025-06-10 06:16:09 +00:00
Reid
6b1391ca7e [Misc] refactor neuron_multimodal and profiling (#19397)
Signed-off-by: reidliu41 <reid201711@gmail.com>
Co-authored-by: reidliu41 <reid201711@gmail.com>
2025-06-10 06:12:42 +00:00
Russell Bryant
a3f66e75d1 Add security warning to bug report template (#19365)
Signed-off-by: Russell Bryant <rbryant@redhat.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-10 06:06:36 +00:00
Lukas Geiger
319cb1e351 [Core] Batch multi modal input using pinned memory (#19169)
Signed-off-by: Lukas Geiger <lukas.geiger94@gmail.com>
2025-06-10 13:44:59 +08:00
Li Wang
1efef71645 [Bugfix] Fix modelscope token passed in (#19389)
Signed-off-by: wangli <wangli858794774@gmail.com>
Signed-off-by: Jee Jee Li <pandaleefree@gmail.com>
Co-authored-by: Jee Jee Li <pandaleefree@gmail.com>
2025-06-10 13:39:37 +08:00
Nick Hill
646d62f636 [Core] Use tuple for kv cache group block ids (#19175)
Signed-off-by: Nick Hill <nhill@redhat.com>
2025-06-10 07:01:17 +02:00
Reid
6cd4ae8acd [Frontend] Add tqdm_leave_pbar to control progress bar visibility (#19357)
Signed-off-by: reidliu41 <reid201711@gmail.com>
Co-authored-by: reidliu41 <reid201711@gmail.com>
2025-06-10 04:55:09 +00:00
Harry Mellor
c016047ed7 Fix docs/mkdocs/hooks/remove_announcement.py (#19382) 2025-06-09 21:36:54 -07:00
XiongfeiWei
9af6d22e4c Use xla flag to improve the quantized model performance (#19303)
Signed-off-by: Xiongfei Wei <isaacwxf23@gmail.com>
2025-06-10 01:28:45 +00:00
Tianyu Guo
4589b94032 [Bugfix] Fix benchmark_moe.py (#19016)
Signed-off-by: Tianyu Guo <guoty9@mail2.sysu.edu.cn>
2025-06-09 18:04:36 -07:00
Ye (Charlotte) Qi
cc867be19c [V1] Reuse V0's memory_profiling util for gpu worker memory profiling (#19312)
Signed-off-by: Ye (Charlotte) Qi <yeq@meta.com>
2025-06-10 08:40:01 +08:00
43 changed files with 477 additions and 372 deletions

View File

@@ -43,7 +43,10 @@ function cpu_tests() {
pytest -v -s tests/kernels/attention/test_mla_decode_cpu.py -m cpu_model
pytest -v -s tests/models/language/generation -m cpu_model
pytest -v -s tests/models/language/pooling -m cpu_model
pytest -v -s tests/models/multimodal/generation --ignore=tests/models/multimodal/generation/test_mllama.py -m cpu_model"
pytest -v -s tests/models/multimodal/generation \
--ignore=tests/models/multimodal/generation/test_mllama.py \
--ignore=tests/models/multimodal/generation/test_pixtral.py \
-m cpu_model"
# Run compressed-tensor test
docker exec cpu-test-"$NUMA_NODE" bash -c "

View File

@@ -8,6 +8,16 @@ body:
attributes:
value: >
#### Before submitting an issue, please make sure the issue hasn't been already addressed by searching through [the existing and past issues](https://github.com/vllm-project/vllm/issues?q=is%3Aissue+sort%3Acreated-desc+).
- type: markdown
attributes:
value: |
⚠️ **SECURITY WARNING:** Please review any text you paste to ensure it does not contain sensitive information such as:
- API tokens or keys (e.g., Hugging Face tokens, OpenAI API keys)
- Passwords or authentication credentials
- Private URLs or endpoints
- Personal or confidential data
Consider redacting or replacing sensitive values with placeholders like `<YOUR_TOKEN_HERE>` when sharing configuration or code examples.
- type: textarea
attributes:
label: Your current environment

View File

@@ -7,7 +7,6 @@ import time
from contextlib import nullcontext
from datetime import datetime
from itertools import product
from types import SimpleNamespace
from typing import Any, TypedDict
import ray
@@ -43,7 +42,7 @@ def benchmark_config(
use_fp8_w8a8: bool,
use_int8_w8a16: bool,
num_iters: int = 100,
block_quant_shape: List[int] = None,
block_quant_shape: list[int] = None,
use_deep_gemm: bool = False,
) -> float:
init_dtype = torch.float16 if use_fp8_w8a8 else dtype
@@ -400,7 +399,7 @@ class BenchmarkWorker:
dtype: torch.dtype,
use_fp8_w8a8: bool,
use_int8_w8a16: bool,
block_quant_shape: List[int] = None,
block_quant_shape: list[int] = None,
use_deep_gemm: bool = False,
) -> tuple[dict[str, int], float]:
current_platform.seed_everything(self.seed)
@@ -532,7 +531,7 @@ def save_configs(
dtype: torch.dtype,
use_fp8_w8a8: bool,
use_int8_w8a16: bool,
block_quant_shape: List[int],
block_quant_shape: list[int],
) -> None:
dtype_str = get_config_dtype_str(
dtype, use_int8_w8a16=use_int8_w8a16, use_fp8_w8a8=use_fp8_w8a8
@@ -563,7 +562,6 @@ def main(args: argparse.Namespace):
config = get_config(model=args.model, trust_remote_code=args.trust_remote_code)
if args.model_prefix:
config = getattr(config, args.model_prefix)
config = SimpleNamespace(**config)
if config.architectures[0] == "DbrxForCausalLM":
E = config.ffn_config.moe_num_experts
@@ -595,11 +593,7 @@ def main(args: argparse.Namespace):
shard_intermediate_size = 2 * intermediate_size // args.tp_size
hidden_size = config.hidden_size
dtype = (
torch.float16
if current_platform.is_rocm()
else getattr(torch, config.torch_dtype)
)
dtype = torch.float16 if current_platform.is_rocm() else config.torch_dtype
use_fp8_w8a8 = args.dtype == "fp8_w8a8"
use_int8_w8a16 = args.dtype == "int8_w8a16"
block_quant_shape = get_weight_block_size_safety(config)

View File

@@ -110,8 +110,9 @@ vLLM CPU backend supports the following vLLM features:
## Related runtime environment variables
- `VLLM_CPU_KVCACHE_SPACE`: specify the KV Cache size (e.g, `VLLM_CPU_KVCACHE_SPACE=40` means 40 GiB space for KV cache), larger setting will allow vLLM running more requests in parallel. This parameter should be set based on the hardware configuration and memory management pattern of users.
- `VLLM_CPU_OMP_THREADS_BIND`: specify the CPU cores dedicated to the OpenMP threads. For example, `VLLM_CPU_OMP_THREADS_BIND=0-31` means there will be 32 OpenMP threads bound on 0-31 CPU cores. `VLLM_CPU_OMP_THREADS_BIND=0-31|32-63` means there will be 2 tensor parallel processes, 32 OpenMP threads of rank0 are bound on 0-31 CPU cores, and the OpenMP threads of rank1 are bound on 32-63 CPU cores.
- `VLLM_CPU_KVCACHE_SPACE`: specify the KV Cache size (e.g, `VLLM_CPU_KVCACHE_SPACE=40` means 40 GiB space for KV cache), larger setting will allow vLLM running more requests in parallel. This parameter should be set based on the hardware configuration and memory management pattern of users. Default value is `0`.
- `VLLM_CPU_OMP_THREADS_BIND`: specify the CPU cores dedicated to the OpenMP threads. For example, `VLLM_CPU_OMP_THREADS_BIND=0-31` means there will be 32 OpenMP threads bound on 0-31 CPU cores. `VLLM_CPU_OMP_THREADS_BIND=0-31|32-63` means there will be 2 tensor parallel processes, 32 OpenMP threads of rank0 are bound on 0-31 CPU cores, and the OpenMP threads of rank1 are bound on 32-63 CPU cores. By setting to `auto`, the OpenMP threads of each rank are bound to the CPU cores in each NUMA node. By setting to `all`, the OpenMP threads of each rank uses all CPU cores available on the system. Default value is `auto`.
- `VLLM_CPU_NUM_OF_RESERVED_CPU`: specify the number of CPU cores which are not dedicated to the OpenMP threads for each rank. The variable only takes effect when VLLM_CPU_OMP_THREADS_BIND is set to `auto`. Default value is `0`.
- `VLLM_CPU_MOE_PREPACK`: whether to use prepack for MoE layer. This will be passed to `ipex.llm.modules.GatedMLPMOE`. Default is `1` (True). On unsupported CPUs, you might need to set this to `0` (False).
## Performance tips
@@ -133,7 +134,15 @@ export VLLM_CPU_OMP_THREADS_BIND=0-29
vllm serve facebook/opt-125m
```
- If using vLLM CPU backend on a machine with hyper-threading, it is recommended to bind only one OpenMP thread on each physical CPU core using `VLLM_CPU_OMP_THREADS_BIND`. On a hyper-threading enabled platform with 16 logical CPU cores / 8 physical CPU cores:
or using default auto thread binding:
```console
export VLLM_CPU_KVCACHE_SPACE=40
export VLLM_CPU_NUM_OF_RESERVED_CPU=2
vllm serve facebook/opt-125m
```
- If using vLLM CPU backend on a machine with hyper-threading, it is recommended to bind only one OpenMP thread on each physical CPU core using `VLLM_CPU_OMP_THREADS_BIND` or using auto thread binding feature by default. On a hyper-threading enabled platform with 16 logical CPU cores / 8 physical CPU cores:
```console
$ lscpu -e # check the mapping between logical CPU cores and physical CPU cores
@@ -178,6 +187,12 @@ $ python examples/offline_inference/basic/basic.py
VLLM_CPU_KVCACHE_SPACE=40 VLLM_CPU_OMP_THREADS_BIND="0-31|32-63" vllm serve meta-llama/Llama-2-7b-chat-hf -tp=2 --distributed-executor-backend mp
```
or using default auto thread binding:
```console
VLLM_CPU_KVCACHE_SPACE=40 vllm serve meta-llama/Llama-2-7b-chat-hf -tp=2 --distributed-executor-backend mp
```
- For each thread id list in `VLLM_CPU_OMP_THREADS_BIND`, users should guarantee threads in the list belong to a same NUMA node.
- Meanwhile, users should also take care of memory capacity of each NUMA node. The memory usage of each TP rank is the sum of `weight shard size` and `VLLM_CPU_KVCACHE_SPACE`, if it exceeds the capacity of a single NUMA node, TP worker will be killed due to out-of-memory.
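As a rough worked example of this budget (all numbers below are hypothetical, not taken from this diff):

```python
# Rough per-rank memory estimate for the CPU backend (hypothetical numbers).
# Each TP rank needs its weight shard plus VLLM_CPU_KVCACHE_SPACE, and the
# sum must fit within a single NUMA node to avoid the OOM kill noted above.
model_weights_gib = 14        # e.g., a 7B model in bf16 (2 bytes/param)
tp_size = 2                   # one TP rank per NUMA node
kv_cache_space_gib = 40       # VLLM_CPU_KVCACHE_SPACE=40

per_rank_gib = model_weights_gib / tp_size + kv_cache_space_gib
numa_node_capacity_gib = 64   # check the real value with `numactl --hardware`

print(f"per-rank memory: {per_rank_gib:.1f} GiB")
assert per_rank_gib < numa_node_capacity_gib, "TP worker risks an OOM kill"
```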

View File

@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
from pathlib import Path
from typing import Literal
@@ -8,10 +9,9 @@ def on_startup(command: Literal["build", "gh-deploy", "serve"], dirty: bool):
# see https://docs.readthedocs.io/en/stable/reference/environment-variables.html # noqa
if os.getenv('READTHEDOCS_VERSION_TYPE') == "tag":
# remove the warning banner if the version is a tagged release
docs_dir = os.path.dirname(__file__)
announcement_path = os.path.join(docs_dir,
"mkdocs/overrides/main.html")
mkdocs_dir = Path(__file__).parent.parent
announcement_path = mkdocs_dir / "overrides/main.html"
# The file might be removed already if the build is triggered multiple
# times (readthedocs build both HTML and PDF versions separately)
if os.path.exists(announcement_path):
if announcement_path.exists():
os.remove(announcement_path)

View File

@@ -40,7 +40,7 @@ If other strategies don't solve the problem, it's likely that the vLLM instance
- `export VLLM_LOGGING_LEVEL=DEBUG` to turn on more logging.
- `export CUDA_LAUNCH_BLOCKING=1` to identify which CUDA kernel is causing the problem.
- `export NCCL_DEBUG=TRACE` to turn on more logging for NCCL.
- `export VLLM_TRACE_FUNCTION=1` to record all function calls for inspection in the log files to tell which function crashes or hangs.
- `export VLLM_TRACE_FUNCTION=1` to record all function calls for inspection in the log files to tell which function crashes or hangs. Do not use this flag unless absolutely needed for debugging, it will cause significant delays in startup time.
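A minimal sketch of enabling the same flags from Python instead of the shell, assuming they are set before vLLM is imported so they take effect at startup:

```python
# Minimal sketch: enable the debugging flags above programmatically.
import os

os.environ["VLLM_LOGGING_LEVEL"] = "DEBUG"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["NCCL_DEBUG"] = "TRACE"
# Only enable function tracing when absolutely needed: it slows startup.
os.environ["VLLM_TRACE_FUNCTION"] = "1"

from vllm import LLM  # noqa: E402

llm = LLM(model="facebook/opt-125m")
```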
## Incorrect network setup

View File

@@ -64,7 +64,7 @@ def print_outputs(outputs):
print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
if __name__ == "__main__":
def main():
assert (
len(PROMPTS) == len(IMAGES) == len(SAMPLING_PARAMS)
), f"""Text, image prompts and sampling parameters should have the
@@ -104,3 +104,7 @@ if __name__ == "__main__":
# test batch-size = 4
outputs = llm.generate(batched_inputs, batched_sample_params)
print_outputs(outputs)
if __name__ == "__main__":
main()

View File

@@ -70,7 +70,7 @@ def main(args: argparse.Namespace):
return
if __name__ == "__main__":
def parse_args():
parser = FlexibleArgumentParser(
description="Benchmark the latency of processing a single batch of "
"requests till completion."
@@ -102,5 +102,9 @@ if __name__ == "__main__":
)
parser = EngineArgs.add_cli_args(parser)
args = parser.parse_args()
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
main(args)

View File

@@ -27,3 +27,5 @@ triton==3.2.0; platform_machine == "x86_64"
# Intel Extension for PyTorch, only for x86_64 CPUs
intel-openmp==2024.2.1; platform_machine == "x86_64"
intel_extension_for_pytorch==2.7.0; platform_machine == "x86_64"
py-libnuma; platform_system != "Darwin"
psutil; platform_system != "Darwin"

View File

@@ -183,34 +183,6 @@ def test_env(
assert backend.get_name() == expected
@pytest.mark.parametrize("device", ["cpu", "cuda"])
@pytest.mark.parametrize("use_v1", [True, False])
def test_fp32_fallback(
device: str,
use_v1: bool,
monkeypatch: pytest.MonkeyPatch,
):
"""Test attention backend selection with fp32."""
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1" if use_v1 else "0")
if device == "cpu":
with patch("vllm.attention.selector.current_platform",
CpuPlatform()):
backend = get_attn_backend(16, torch.float32, torch.float32,
16, False)
assert (backend.get_name() == "TORCH_SDPA_VLLM_V1"
if use_v1 else "TORCH_SDPA")
elif device == "cuda":
with patch("vllm.attention.selector.current_platform",
CudaPlatform()):
backend = get_attn_backend(16, torch.float32, torch.float32,
16, False)
assert (backend.get_name() == "FLEX_ATTENTION"
if use_v1 else "XFORMERS")
def test_flash_attn(monkeypatch: pytest.MonkeyPatch):
"""Test FlashAttn validation."""
# TODO: When testing for v1, pipe in `use_v1` as an argument to

View File

@@ -117,7 +117,7 @@ def test_prefill(hash_algo):
blocks = manager.allocate_slots(req0, 55,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
# Check full block metadata
parent_block_hash = None
@@ -141,13 +141,13 @@ def test_prefill(hash_algo):
req1 = make_request("1", common_token_ids + unique_token_ids)
computed_blocks, num_computed_tokens = manager.get_computed_blocks(req1)
assert len(manager.req_to_block_hashes[req1.request_id]) == 3
assert computed_blocks.get_block_ids() == [[1, 2, 3]]
assert computed_blocks.get_block_ids() == ([1, 2, 3], )
assert num_computed_tokens == 3 * 16
num_new_tokens = 53 - 3 * 16
blocks = manager.allocate_slots(req1, num_new_tokens,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[5]]
assert blocks.get_block_ids() == ([5], )
for block in computed_blocks.blocks[0]:
assert block.ref_cnt == 2
@@ -175,13 +175,13 @@ def test_prefill(hash_algo):
req2 = make_request("2", common_token_ids + unique_token_ids)
computed_blocks, num_computed_tokens = manager.get_computed_blocks(req2)
assert len(manager.req_to_block_hashes[req2.request_id]) == 3
assert computed_blocks.get_block_ids() == [[1, 2, 3]]
assert computed_blocks.get_block_ids() == ([1, 2, 3], )
assert num_computed_tokens == 3 * 16
num_new_tokens = 53 - 3 * 16
blocks = manager.allocate_slots(req2, num_new_tokens,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[6]]
assert blocks.get_block_ids() == ([6], )
# Although we only have 6 free blocks, we have 8 blocks in
# the free block queue due to lazy removal.
@@ -205,7 +205,7 @@ def test_prefill(hash_algo):
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
# This block ID order also checks the eviction order.
assert blocks.get_block_ids() == [[7, 8, 9, 10, 4, 5, 6, 3, 2, 1]]
assert blocks.get_block_ids() == ([7, 8, 9, 10, 4, 5, 6, 3, 2, 1], )
assert manager.block_pool.free_block_queue.num_free_blocks == 0
assert manager.block_pool.free_block_queue.free_list_head is None
assert manager.block_pool.free_block_queue.free_list_tail is None
@@ -236,8 +236,8 @@ def test_prefill_hybrid_model():
blocks = manager.allocate_slots(req0, 55,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4], [5, 6, 7, 8],
[9, 10, 11, 12]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], [5, 6, 7,
8], [9, 10, 11, 12])
# Check full block metadata
parent_block_hash = None
@@ -263,14 +263,14 @@ def test_prefill_hybrid_model():
req1 = make_request("1", common_token_ids + unique_token_ids)
computed_blocks, num_computed_tokens = manager.get_computed_blocks(req1)
assert len(manager.req_to_block_hashes[req1.request_id]) == 3
assert computed_blocks.get_block_ids() == [[1, 2, 3], [0, 6, 7],
[0, 10, 11]]
assert computed_blocks.get_block_ids() == ([1, 2, 3], [0, 6,
7], [0, 10, 11])
assert num_computed_tokens == 3 * 16
num_new_tokens = 53 - 3 * 16
blocks = manager.allocate_slots(req1, num_new_tokens,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[13], [14], [15]]
assert blocks.get_block_ids() == ([13], [14], [15])
for block_per_group in computed_blocks.blocks:
for block in block_per_group:
if block != manager.block_pool.null_block:
@@ -374,7 +374,7 @@ def test_prefill_plp():
blocks = manager.allocate_slots(req0, 55,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
req0_block_hashes = [b.block_hash for b in blocks.blocks[0]]
# Check full block metadata
@@ -400,13 +400,13 @@ def test_prefill_plp():
req1 = make_request("1", common_token_ids + unique_token_ids)
computed_blocks, num_computed_tokens = manager.get_computed_blocks(req1)
assert len(manager.req_to_block_hashes[req1.request_id]) == 3
assert computed_blocks.get_block_ids() == [[1, 2, 3]]
assert computed_blocks.get_block_ids() == ([1, 2, 3], )
assert num_computed_tokens == 3 * 16
num_new_tokens = 53 - 3 * 16
blocks = manager.allocate_slots(req1, num_new_tokens,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[5]]
assert blocks.get_block_ids() == ([5], )
for block in computed_blocks.blocks[0]:
assert block.ref_cnt == 2
@@ -444,7 +444,7 @@ def test_prefill_plp():
block_ids = blocks.get_block_ids()
# Duplicate cached blocks have different ids but same hashes vs request #0
assert [b.block_hash for b in blocks.blocks[0]] == req0_block_hashes
assert block_ids != [[1, 2, 3, 4]]
assert block_ids != ([1, 2, 3, 4], )
# Request #2 block hashes are valid since request #0 hashes are.
# Check block reference counts.
@@ -474,7 +474,7 @@ def test_decode():
blocks = manager.allocate_slots(req0, 55,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
# Append slots without allocating a new block.
req0.num_computed_tokens = 55
@@ -546,12 +546,12 @@ def test_evict():
# Touch the first 2 blocks.
req2 = make_request("2", list(range(2 * 16 + 3)))
computed_blocks, num_computed_tokens = manager.get_computed_blocks(req2)
assert computed_blocks.get_block_ids() == [[1, 2]]
assert computed_blocks.get_block_ids() == ([1, 2], )
assert num_computed_tokens == 2 * 16
blocks = manager.allocate_slots(req2, 3,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[10]]
assert blocks.get_block_ids() == ([10], )
assert manager.block_pool.free_block_queue.num_free_blocks == 7
@@ -865,7 +865,7 @@ def test_mm_prefix_caching():
blocks = manager.allocate_slots(req0, 59,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
req0.num_computed_tokens = 59
# Append slots without allocating a new block.
@@ -926,7 +926,7 @@ def test_cache_key_salting():
blocks = manager.allocate_slots(req0, 59,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
req0.num_computed_tokens = 59
# Append slots without allocating a new block.
@@ -1042,7 +1042,7 @@ def test_reset_prefix_cache():
all_token_ids = full_block_token_ids + unique_token_ids
req0 = make_request("0", all_token_ids)
blocks = manager.allocate_slots(req0, 55)
assert blocks.get_block_ids() == [[1, 2, 3, 4]]
assert blocks.get_block_ids() == ([1, 2, 3, 4], )
unique_token_ids = [4] * 7
all_token_ids = full_block_token_ids + unique_token_ids
@@ -1053,7 +1053,7 @@ def test_reset_prefix_cache():
blocks = manager.allocate_slots(req1, 7,
len(computed_blocks.blocks[0]) * 16,
computed_blocks)
assert blocks.get_block_ids() == [[5]]
assert blocks.get_block_ids() == ([5], )
# Failed to reset prefix cache because some blocks are not freed yet.
assert not manager.reset_prefix_cache()

View File

@@ -71,7 +71,7 @@ def _schedule_new_request(*req_ids: str) -> SchedulerOutput:
mm_hashes=[],
mm_positions=[],
sampling_params=SamplingParams(),
block_ids=[[0]], # block_ids should be list[list[int]]
block_ids=([0], ), # block_ids should be tuple[list[int]]
num_computed_tokens=0,
lora_request=None,
))
@@ -116,10 +116,10 @@ def _is_req_state_block_table_match(model_runner, req_id: str) -> bool:
# This is safe since we currently only use single KV cache groups
block_table = multi_group_block_table[0]
# req_state.block_ids is now list[list[int]] for MultiGroupBlockTable
# req_state.block_ids is now tuple[list[int], ...] for MultiGroupBlockTable
# Extract the first group's block IDs
if isinstance(req_state.block_ids[0], list):
# New format: list[list[int]] - extract first group
# New format: tuple[list[int], ...] - extract first group
req_block_ids = req_state.block_ids[0]
else:
# Legacy format: list[int] - use directly
@@ -210,7 +210,7 @@ def test_update_states_request_resumed(model_runner):
req_id=req_id,
resumed_from_preemption=False,
new_token_ids=[],
new_block_ids=[[]],
new_block_ids=([], ),
num_computed_tokens=0,
)

View File

@@ -203,7 +203,7 @@ def _construct_cached_request_state(req_id_suffix: int):
sampling_params=_create_sampling_params(),
mm_inputs=[],
mm_positions=[],
block_ids=[[]],
block_ids=([], ),
generator=None,
num_computed_tokens=len(output_token_ids),
output_token_ids=output_token_ids,

View File

@@ -4,7 +4,6 @@
import random
import pytest
import torch
from vllm.attention import Attention
from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
@@ -123,7 +122,7 @@ def _schedule_new_request(*req_ids: str) -> SchedulerOutput:
mm_hashes=[],
mm_positions=[],
sampling_params=SamplingParams(),
block_ids=[[0]],
block_ids=([0], ),
num_computed_tokens=0,
lora_request=None,
))
@@ -251,7 +250,7 @@ def test_update_states_request_resumed(model_runner):
req_id=req_id,
resumed_from_preemption=False,
new_token_ids=[],
new_block_ids=[[]],
new_block_ids=([], ),
num_computed_tokens=0,
)
@@ -400,7 +399,6 @@ def test_load_model_weights_inplace(dist_init, model_runner, model_runner_2):
def test_init_kv_cache_with_kv_sharing_invalid_target_layer_order():
torch.set_default_dtype(torch.float16)
layer_0 = "model.layers.0.self_attn.attn"
layer_1 = "model.layers.1.self_attn.attn"
error_msg = f"{layer_1} must come before the current layer"
@@ -429,7 +427,6 @@ def test_init_kv_cache_with_kv_sharing_invalid_target_layer_order():
def test_init_kv_cache_with_kv_sharing_target_layer_not_exist():
torch.set_default_dtype(torch.float16)
layer_0 = "model.layers.0.self_attn.attn"
layer_1 = "model.layers.1.self_attn.attn"
invalid_layer = "model.layers.0.cross_attn.attn"
@@ -458,7 +455,6 @@ def test_init_kv_cache_with_kv_sharing_target_layer_not_exist():
def test_init_kv_cache_with_kv_sharing_target_same_as_current():
torch.set_default_dtype(torch.float16)
layer_0 = "model.layers.0.self_attn.attn"
layer_1 = "model.layers.1.self_attn.attn"
error_msg = f"{layer_1} cannot be the same as the current layer"
@@ -487,7 +483,6 @@ def test_init_kv_cache_with_kv_sharing_target_same_as_current():
def test_init_kv_cache_without_kv_sharing():
torch.set_default_dtype(torch.float16)
layer_0 = "model.layers.0.self_attn.attn"
layer_1 = "model.layers.1.self_attn.attn"
vllm_config = get_vllm_config()
@@ -555,7 +550,6 @@ def test_init_kv_cache_without_kv_sharing():
def test_init_kv_cache_with_kv_sharing_valid():
torch.set_default_dtype(torch.float16)
layer_0 = "model.layers.0.self_attn.attn"
layer_1 = "model.layers.1.self_attn.attn"
vllm_config = get_vllm_config()

View File

@@ -1,11 +1,10 @@
Large-scale cluster-level expert parallel, as described in the [DeepSeek-V3 Technical Report](http://arxiv.org/abs/2412.19437), is an efficient way to deploy sparse MoE models with many experts. However, such deployment requires many components beyond a normal Python package, including system package support and system driver support. It is impossible to bundle all these components into a Python package.
Here we break down the requirements in 3 steps:
Here we break down the requirements in 2 steps:
1. Build and install the Python libraries (both [pplx-kernels](https://github.com/ppl-ai/pplx-kernels) and [DeepEP](https://github.com/deepseek-ai/DeepEP)), including necessary dependencies like NVSHMEM. This step does not require any privileged access. Any user can do this.
2. Build and install the system libraries (GDR Copy). This step requires root access. You can do it inside a Docker container so that they can be shipped as a single image.
3. Build and install the system drivers (GDR Copy, and necessary modifications to NVIDIA driver to enable IBGDA). This step requires root access, and must be done on the host machine.
2. Configure NVIDIA driver to enable IBGDA. This step requires root access, and must be done on the host machine.
2 and 3 are necessary for multi-node deployment.
2 is necessary for multi-node deployment.
All scripts accept a positional argument as workspace path for staging the build, defaulting to `$(pwd)/ep_kernels_workspace`.
@@ -21,7 +20,6 @@ bash install_python_libraries.sh
```bash
bash install_python_libraries.sh
sudo bash install_system_libraries.sh
sudo bash install_system_drivers.sh
sudo bash configure_system_drivers.sh
sudo reboot # Reboot is required to load the new driver
```

View File

@@ -0,0 +1,7 @@
set -ex
# turn on IBGDA
echo 'options nvidia NVreg_EnableStreamMemOPs=1 NVreg_RegistryDwords="PeerMappingOverride=1;"' | tee -a /etc/modprobe.d/nvidia.conf
update-initramfs -u
echo "Please reboot the system to apply the changes"

View File

@@ -13,16 +13,6 @@ fi
# install dependencies if not installed
pip3 install cmake torch ninja
# build gdrcopy, required by nvshmem
pushd $WORKSPACE
wget https://github.com/NVIDIA/gdrcopy/archive/refs/tags/v2.4.4.tar.gz
mkdir -p gdrcopy_src
tar -xvf v2.4.4.tar.gz -C gdrcopy_src --strip-components=1
pushd gdrcopy_src
make -j$(nproc)
make prefix=$WORKSPACE/gdrcopy_install install
popd
# build nvshmem
pushd $WORKSPACE
mkdir -p nvshmem_src
@@ -34,26 +24,30 @@ git init
git apply -vvv nvshmem.patch
# assume CUDA_HOME is set correctly
export GDRCOPY_HOME=$WORKSPACE/gdrcopy_install
if [ -z "$CUDA_HOME" ]; then
echo "CUDA_HOME is not set, please set it to your CUDA installation directory."
exit 1
fi
# disable all features except IBGDA
export NVSHMEM_IBGDA_SUPPORT=1
export NVSHMEM_SHMEM_SUPPORT=0
export NVSHMEM_UCX_SUPPORT=0
export NVSHMEM_USE_NCCL=0
export NVSHMEM_IBGDA_SUPPORT=1
export NVSHMEM_PMIX_SUPPORT=0
export NVSHMEM_TIMEOUT_DEVICE_POLLING=0
export NVSHMEM_USE_GDRCOPY=1
export NVSHMEM_IBRC_SUPPORT=1
# remove MPI dependency
export NVSHMEM_USE_GDRCOPY=0
export NVSHMEM_IBRC_SUPPORT=0
export NVSHMEM_BUILD_TESTS=0
export NVSHMEM_BUILD_EXAMPLES=0
export NVSHMEM_MPI_SUPPORT=0
export NVSHMEM_BUILD_HYDRA_LAUNCHER=0
export NVSHMEM_BUILD_TXZ_PACKAGE=0
export NVSHMEM_TIMEOUT_DEVICE_POLLING=0
cmake -S . -B $WORKSPACE/nvshmem_build/ -DCMAKE_INSTALL_PREFIX=$WORKSPACE/nvshmem_install
cd $WORKSPACE/nvshmem_build/
make -j$(nproc)
make install
cmake -G Ninja -S . -B $WORKSPACE/nvshmem_build/ -DCMAKE_INSTALL_PREFIX=$WORKSPACE/nvshmem_install
cmake --build $WORKSPACE/nvshmem_build/ --target install
popd

View File

@@ -1,24 +0,0 @@
set -ex
# prepare workspace directory
WORKSPACE=$1
if [ -z "$WORKSPACE" ]; then
export WORKSPACE=$(pwd)/ep_kernels_workspace
fi
if [ ! -d "$WORKSPACE" ]; then
mkdir -p $WORKSPACE
fi
# build and install gdrcopy driver
pushd $WORKSPACE
cd gdrcopy_src
./insmod.sh
# run gdrcopy_copybw to test the installation
$WORKSPACE/gdrcopy_install/bin/gdrcopy_copybw
# turn on IBGDA
echo 'options nvidia NVreg_EnableStreamMemOPs=1 NVreg_RegistryDwords="PeerMappingOverride=1;"' | tee -a /etc/modprobe.d/nvidia.conf
update-initramfs -u
echo "Please reboot the system to apply the changes"

View File

@@ -1,18 +0,0 @@
set -ex
# prepare workspace directory
WORKSPACE=$1
if [ -z "$WORKSPACE" ]; then
export WORKSPACE=$(pwd)/ep_kernels_workspace
fi
if [ ! -d "$WORKSPACE" ]; then
mkdir -p $WORKSPACE
fi
# build and install gdrcopy system packages
pushd $WORKSPACE
cd gdrcopy_src/packages
apt install devscripts -y
CUDA=${CUDA_HOME:-/usr/local/cuda} ./build-deb-packages.sh
dpkg -i *.deb

View File

@@ -1337,6 +1337,13 @@ class EngineArgs:
recommend_to_remove=False)
return False
# Only Fp16 and Bf16 dtypes since we only support FA.
V1_SUPPORTED_DTYPES = [torch.bfloat16, torch.float16]
if model_config.dtype not in V1_SUPPORTED_DTYPES:
_raise_or_fallback(feature_name=f"--dtype {model_config.dtype}",
recommend_to_remove=False)
return False
# No Embedding Models so far.
if model_config.task not in ["generate"]:
_raise_or_fallback(feature_name=f"--task {model_config.task}",

View File

@@ -281,7 +281,7 @@ class LLM:
sampling_params: Optional[Union[SamplingParams,
Sequence[SamplingParams]]] = None,
*,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -297,7 +297,7 @@ class LLM:
sampling_params: Optional[Union[SamplingParams,
list[SamplingParams]]] = None,
prompt_token_ids: Optional[list[int]] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -313,7 +313,7 @@ class LLM:
sampling_params: Optional[Union[SamplingParams,
list[SamplingParams]]] = None,
prompt_token_ids: Optional[list[list[int]]] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -330,7 +330,7 @@ class LLM:
list[SamplingParams]]] = None,
*,
prompt_token_ids: list[int],
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -347,7 +347,7 @@ class LLM:
list[SamplingParams]]] = None,
*,
prompt_token_ids: list[list[int]],
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -362,7 +362,7 @@ class LLM:
prompts: None,
sampling_params: None,
prompt_token_ids: Union[list[int], list[list[int]]],
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -382,7 +382,7 @@ class LLM:
sampling_params: Optional[Union[SamplingParams,
Sequence[SamplingParams]]] = None,
prompt_token_ids: Optional[Union[list[int], list[list[int]]]] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
guided_options_request: Optional[Union[LLMGuidedOptions,
@@ -404,7 +404,10 @@ class LLM:
When it is a single value, it is applied to every prompt.
When it is a list, the list must have the same length as the
prompts and it is paired one by one with the prompt.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
prompt_adapter_request: Prompt Adapter request to use for
generation, if any.
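A minimal usage sketch of the callable form added here (the model and prompt are illustrative):

```python
# Minimal usage sketch for the callable `use_tqdm` form.
from functools import partial

from tqdm import tqdm
from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m")  # illustrative model
params = SamplingParams(temperature=0.8, max_tokens=32)

# Remove the progress bar when finished instead of leaving it on screen.
outputs = llm.generate(["Hello, my name is"], params,
                       use_tqdm=partial(tqdm, leave=False))
```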
@@ -678,7 +681,7 @@ class LLM:
list[list[ChatCompletionMessageParam]]],
sampling_params: Optional[Union[SamplingParams,
list[SamplingParams]]] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[LoRARequest] = None,
chat_template: Optional[str] = None,
chat_template_content_format: ChatTemplateContentFormatOption = "auto",
@@ -709,7 +712,10 @@ class LLM:
is a single value, it is applied to every prompt. When it
is a list, the list must have the same length as the
prompts and it is paired one by one with the prompt.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
chat_template: The template to use for structuring the chat.
If not provided, the model's default chat template will be used.
@@ -823,7 +829,7 @@ class LLM:
Sequence[PoolingParams]]] = None,
*,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -838,7 +844,7 @@ class LLM:
Sequence[PoolingParams]]] = None,
prompt_token_ids: Optional[list[int]] = None,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -853,7 +859,7 @@ class LLM:
Sequence[PoolingParams]]] = None,
prompt_token_ids: Optional[list[list[int]]] = None,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -869,7 +875,7 @@ class LLM:
*,
prompt_token_ids: list[int],
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -885,7 +891,7 @@ class LLM:
*,
prompt_token_ids: list[list[int]],
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -899,7 +905,7 @@ class LLM:
pooling_params: None,
prompt_token_ids: Union[list[int], list[list[int]]],
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -918,7 +924,7 @@ class LLM:
Sequence[PoolingParams]]] = None,
prompt_token_ids: Optional[Union[list[int], list[list[int]]]] = None,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[PoolingRequestOutput]:
@@ -935,7 +941,10 @@ class LLM:
for more details about the format of each prompts.
pooling_params: The pooling parameters for pooling. If None, we
use the default pooling parameters.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
prompt_adapter_request: Prompt Adapter request to use for
generation, if any.
@@ -1005,7 +1014,7 @@ class LLM:
/,
*,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
pooling_params: Optional[Union[PoolingParams,
Sequence[PoolingParams]]] = None,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
@@ -1024,7 +1033,10 @@ class LLM:
for more details about the format of each prompts.
pooling_params: The pooling parameters for pooling. If None, we
use the default pooling parameters.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
prompt_adapter_request: Prompt Adapter request to use for
generation, if any.
@@ -1051,7 +1063,7 @@ class LLM:
prompts: Union[PromptType, Sequence[PromptType]],
/,
*,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[ClassificationRequestOutput]:
@@ -1066,7 +1078,10 @@ class LLM:
prompts: The prompts to the LLM. You may pass a sequence of prompts
for batch inference. See [PromptType][vllm.inputs.PromptType]
for more details about the format of each prompts.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
prompt_adapter_request: Prompt Adapter request to use for
generation, if any.
@@ -1092,7 +1107,7 @@ class LLM:
text_1: list[Union[str, TextPrompt, TokensPrompt]],
text_2: list[Union[str, TextPrompt, TokensPrompt]],
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[ScoringRequestOutput]:
@@ -1126,7 +1141,7 @@ class LLM:
text_1: list[str],
text_2: list[str],
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[ScoringRequestOutput]:
@@ -1178,7 +1193,7 @@ class LLM:
/,
*,
truncate_prompt_tokens: Optional[int] = None,
use_tqdm: bool = True,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[list[LoRARequest], LoRARequest]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
) -> list[ScoringRequestOutput]:
@@ -1198,7 +1213,10 @@ class LLM:
text_2: The texts to pair with the query to form the input
to the LLM. See [PromptType][vllm.inputs.PromptType] for
more details about the format of each prompts.
use_tqdm: Whether to use tqdm to display the progress bar.
use_tqdm: If `True`, shows a tqdm progress bar.
If a callable (e.g., `functools.partial(tqdm, leave=False)`),
it is used to create the progress bar.
If `False`, no progress bar is created.
lora_request: LoRA request to use for generation, if any.
prompt_adapter_request: Prompt Adapter request to use for
generation, if any.
@@ -1379,7 +1397,7 @@ class LLM:
params: Union[SamplingParams, Sequence[SamplingParams], PoolingParams,
Sequence[PoolingParams]],
*,
use_tqdm: bool,
use_tqdm: Union[bool, Callable[..., tqdm]] = True,
lora_request: Optional[Union[Sequence[LoRARequest], LoRARequest]],
prompt_adapter_request: Optional[PromptAdapterRequest],
tokenization_kwargs: Optional[dict[str, Any]] = None,
@@ -1417,7 +1435,8 @@ class LLM:
# Add requests to the engine.
it = prompts
if use_tqdm:
it = tqdm(it, desc="Adding requests")
tqdm_func = use_tqdm if callable(use_tqdm) else tqdm
it = tqdm_func(it, desc="Adding requests")
for i, prompt in enumerate(it):
self._add_request(
@@ -1474,12 +1493,15 @@ class LLM:
return params
def _run_engine(
self, *, use_tqdm: bool
self,
*,
use_tqdm: Union[bool, Callable[..., tqdm]] = True
) -> list[Union[RequestOutput, PoolingRequestOutput]]:
# Initialize tqdm.
if use_tqdm:
num_requests = self.llm_engine.get_num_unfinished_requests()
pbar = tqdm(
tqdm_func = use_tqdm if callable(use_tqdm) else tqdm
pbar = tqdm_func(
total=num_requests,
desc="Processed prompts",
dynamic_ncols=True,

View File

@@ -44,6 +44,7 @@ if TYPE_CHECKING:
VLLM_PP_LAYER_PARTITION: Optional[str] = None
VLLM_CPU_KVCACHE_SPACE: int = 0
VLLM_CPU_OMP_THREADS_BIND: str = ""
VLLM_CPU_NUM_OF_RESERVED_CPU: int = 0
VLLM_CPU_MOE_PREPACK: bool = True
VLLM_XLA_CACHE_PATH: str = os.path.join(VLLM_CACHE_ROOT, "xla_cache")
VLLM_XLA_CHECK_RECOMPILATION: bool = False
@@ -422,7 +423,12 @@ environment_variables: dict[str, Callable[[], Any]] = {
# (CPU backend only) CPU core ids bound by OpenMP threads, e.g., "0-31",
# "0,1,2", "0-31,33". CPU cores of different ranks are separated by '|'.
"VLLM_CPU_OMP_THREADS_BIND":
lambda: os.getenv("VLLM_CPU_OMP_THREADS_BIND", "all"),
lambda: os.getenv("VLLM_CPU_OMP_THREADS_BIND", "auto"),
# (CPU backend only) CPU cores not used by OMP threads .
# Those CPU cores will not be used by OMP threads of a rank.
"VLLM_CPU_NUM_OF_RESERVED_CPU":
lambda: int(os.getenv("VLLM_CPU_NUM_OF_RESERVED_CPU", "0")),
# (CPU backend only) whether to use prepack for MoE layer. This will be
# passed to ipex.llm.modules.GatedMLPMOE. On unsupported CPUs, you might

View File

@@ -71,9 +71,7 @@ class BitsAndBytesConfig(QuantizationConfig):
@staticmethod
def get_config_filenames() -> list[str]:
return [
"adapter_config.json",
]
return []
@classmethod
def from_config(cls, config: dict[str, Any]) -> "BitsAndBytesConfig":

View File

@@ -392,7 +392,8 @@ class BitsAndBytesModelLoader(BaseModelLoader):
def _get_bnb_target_modules(self, model: nn.Module) -> None:
for name, module in model.named_modules():
if isinstance(module, (LinearBase, )):
if (isinstance(module, LinearBase) and
hasattr(module.quant_method, "quant_config")):
if modules_info := self.modules_mapping.get_sub_modules(name):
# Map vllm's names to transformers's names.
rep_name, sub_modules = modules_info

View File

@@ -680,7 +680,8 @@ class MultiModalKwargs(UserDict[str, NestedTensors]):
return self._items_by_modality.keys()
@staticmethod
def _try_stack(nested_tensors: NestedTensors) -> NestedTensors:
def _try_stack(nested_tensors: NestedTensors,
pin_memory: bool = False) -> NestedTensors:
"""
Stack the inner dimensions that have the same shape in
a nested list of tensors.
@@ -697,7 +698,9 @@ class MultiModalKwargs(UserDict[str, NestedTensors]):
if isinstance(nested_tensors, (int, float)):
return torch.tensor(nested_tensors)
stacked = [MultiModalKwargs._try_stack(t) for t in nested_tensors]
stacked = [
MultiModalKwargs._try_stack(t, pin_memory) for t in nested_tensors
]
if not is_list_of(stacked, torch.Tensor, check="all"):
# Only tensors (not lists) can be stacked.
return stacked
@@ -713,10 +716,16 @@ class MultiModalKwargs(UserDict[str, NestedTensors]):
# The tensors have incompatible shapes and can't be stacked.
return tensors_
return torch.stack(tensors_)
outputs = torch.empty(len(tensors_),
*tensors_[0].shape,
dtype=tensors_[0].dtype,
device=tensors_[0].device,
pin_memory=pin_memory)
return torch.stack(tensors_, out=outputs)
@staticmethod
def batch(inputs_list: list["MultiModalKwargs"]) -> BatchedTensorInputs:
def batch(inputs_list: list["MultiModalKwargs"],
pin_memory: bool = False) -> BatchedTensorInputs:
"""
Batch multiple inputs together into a dictionary.
@@ -738,7 +747,7 @@ class MultiModalKwargs(UserDict[str, NestedTensors]):
item_lists[k].append(v)
return {
k: MultiModalKwargs._try_stack(item_list)
k: MultiModalKwargs._try_stack(item_list, pin_memory)
for k, item_list in item_lists.items()
}
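A standalone sketch of the technique, outside the vLLM code path: stacking into a pinned host buffer lets a later host-to-device copy run asynchronously.

```python
# Standalone sketch of pinned-memory batching (not the vLLM code path).
import torch

tensors = [torch.randn(3, 224, 224) for _ in range(8)]

# Allocate a pinned (page-locked) host buffer and stack directly into it.
out = torch.empty(len(tensors), *tensors[0].shape,
                  dtype=tensors[0].dtype,
                  pin_memory=torch.cuda.is_available())
batch = torch.stack(tensors, out=out)

if torch.cuda.is_available():
    # non_blocking=True only overlaps with compute when the source is pinned.
    batch_gpu = batch.to("cuda", non_blocking=True)
```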

View File

@@ -208,6 +208,9 @@ class CpuPlatform(Platform):
# Disable torch async compiling which won't work with daemonic processes
os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"
# Share the cpusets list among ranks by spawning process instead
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
# Intel OpenMP setting
ld_prealod_str = os.getenv("LD_PRELOAD", "")
if "libiomp5.so" in ld_prealod_str:

View File

@@ -233,10 +233,6 @@ class CudaPlatformBase(Platform):
logger.info_once("Using Triton backend on V1 engine.")
return ("vllm.v1.attention.backends."
"triton_attn.TritonAttentionBackend")
if dtype not in (torch.float16, torch.bfloat16):
logger.info_once(
f"Using FlexAttenion backend for {dtype} on V1 engine.")
return "vllm.v1.attention.backends.flex_attention.FlexAttentionBackend" # noqa: E501
if cls.is_device_capability(100):
# Prefer FlashInfer for V1 on Blackwell GPUs if installed
try:

View File

@@ -143,7 +143,9 @@ def list_repo_files(
modelscope_list_repo_files)
return modelscope_list_repo_files(repo_id,
revision=revision,
token=token)
token=os.getenv(
"MODELSCOPE_API_TOKEN",
None))
return hf_list_repo_files(repo_id,
revision=revision,
repo_type=repo_type,

View File

@@ -2269,6 +2269,8 @@ def kill_process_tree(pid: int):
class MemorySnapshot:
"""Memory snapshot."""
torch_peak: int = 0
free_memory: int = 0
total_memory: int = 0
cuda_memory: int = 0
torch_memory: int = 0
non_torch_memory: int = 0
@@ -2288,8 +2290,8 @@ class MemorySnapshot:
self.torch_peak = torch.cuda.memory_stats().get(
"allocated_bytes.all.peak", 0)
self.cuda_memory = torch.cuda.mem_get_info(
)[1] - torch.cuda.mem_get_info()[0]
self.free_memory, self.total_memory = torch.cuda.mem_get_info()
self.cuda_memory = self.total_memory - self.free_memory
# torch.cuda.memory_reserved() is how many bytes
# PyTorch gets from cuda (by calling cudaMalloc, etc.)
@@ -2302,6 +2304,8 @@ class MemorySnapshot:
def __sub__(self, other: MemorySnapshot) -> MemorySnapshot:
return MemorySnapshot(
torch_peak=self.torch_peak - other.torch_peak,
free_memory=self.free_memory - other.free_memory,
total_memory=self.total_memory - other.total_memory,
cuda_memory=self.cuda_memory - other.cuda_memory,
torch_memory=self.torch_memory - other.torch_memory,
non_torch_memory=self.non_torch_memory - other.non_torch_memory,
@@ -2323,6 +2327,16 @@ class MemoryProfilingResult:
after_profile: MemorySnapshot = field(default_factory=MemorySnapshot)
profile_time: float = 0.0
def __repr__(self) -> str:
return (f"Memory profiling takes {self.profile_time:.2f} seconds. "
f"Total non KV cache memory: "
f"{(self.non_kv_cache_memory / GiB_bytes):.2f}GiB; "
f"torch peak memory increase: "
f"{(self.torch_peak_increase / GiB_bytes):.2f}GiB; "
f"non-torch forward increase memory: "
f"{(self.non_torch_increase / GiB_bytes):.2f}GiB; "
f"weights memory: {(self.weights_memory / GiB_bytes):.2f}GiB.")
@contextlib.contextmanager
def memory_profiling(

View File

@@ -508,7 +508,12 @@ class FlashInferImpl(AttentionImpl):
logits_soft_cap: Optional[float] = None,
attn_type: AttentionType = AttentionType.DECODER,
kv_sharing_target_layer_name: Optional[int] = None,
use_irope: bool = False,
) -> None:
if use_irope:
logger.warning_once(
"Using irope in FlashInfer is not supported yet, it will fall"
" back to global attention for long context.")
self.num_heads = num_heads
self.head_size = head_size
self.scale = float(scale)

View File

@@ -89,8 +89,8 @@ class BlockPool:
BlockHashWithGroupId(block_hash, group_id))
if not cached_blocks_one_group:
return None
first_block_id = next(iter(cached_blocks_one_group))
cached_blocks.append(cached_blocks_one_group[first_block_id])
first_block = next(iter(cached_blocks_one_group.values()))
cached_blocks.append(first_block)
return cached_blocks
def cache_full_blocks(
@@ -260,7 +260,7 @@ class BlockPool:
return True
return False
def touch(self, blocks: list[list[KVCacheBlock]]) -> None:
def touch(self, blocks: tuple[list[KVCacheBlock], ...]) -> None:
"""Touch a block increases its reference count by 1, and may remove
the block from the free queue. This is used when a block is hit by
another request with the same prefix.
@@ -299,7 +299,7 @@ class BlockPool:
bool: True if the prefix cache is successfully reset,
False otherwise.
"""
num_used_blocks = (self.num_gpu_blocks - self.get_num_free_blocks())
num_used_blocks = self.num_gpu_blocks - self.get_num_free_blocks()
if num_used_blocks != 1: # The null block is always marked as used
logger.warning(
"Failed to reset prefix cache because some "

View File

@@ -5,8 +5,7 @@ from typing import Callable, Optional
from vllm.v1.core.block_pool import BlockPool
from vllm.v1.core.kv_cache_utils import BlockHash, KVCacheBlock
from vllm.v1.core.single_type_kv_cache_manager import (
FullAttentionManager, SingleTypeKVCacheManager,
get_manager_for_kv_cache_spec)
FullAttentionManager, get_manager_for_kv_cache_spec)
from vllm.v1.kv_cache_interface import FullAttentionSpec, KVCacheConfig
from vllm.v1.request import Request
@@ -30,25 +29,21 @@ class KVCacheCoordinator(ABC):
self.block_pool = BlockPool(kv_cache_config.num_blocks, enable_caching,
enable_kv_cache_events)
self.single_type_managers: list[SingleTypeKVCacheManager] = []
# Needs special handling for find_longest_cache_hit if eagle is enabled
self.use_eagle = use_eagle
for i in range(len(self.kv_cache_config.kv_cache_groups)):
kv_cache_spec = self.kv_cache_config.kv_cache_groups[
i].kv_cache_spec
self.single_type_managers.append(
get_manager_for_kv_cache_spec(
kv_cache_spec=kv_cache_spec,
block_pool=self.block_pool,
kv_cache_group_id=i,
caching_hash_fn=caching_hash_fn,
))
self.single_type_managers = tuple(
get_manager_for_kv_cache_spec(
kv_cache_spec=kv_cache_group.kv_cache_spec,
block_pool=self.block_pool,
kv_cache_group_id=i,
caching_hash_fn=caching_hash_fn,
) for i, kv_cache_group in enumerate(
self.kv_cache_config.kv_cache_groups))
def get_num_blocks_to_allocate(
self, request_id: str, num_tokens: int,
new_computed_blocks: list[list[KVCacheBlock]]) -> int:
new_computed_blocks: tuple[list[KVCacheBlock], ...]) -> int:
"""
Get the number of blocks needed to be allocated for the request.
@@ -70,7 +65,7 @@ class KVCacheCoordinator(ABC):
def save_new_computed_blocks(
self, request_id: str,
new_computed_blocks: list[list[KVCacheBlock]]) -> None:
new_computed_blocks: tuple[list[KVCacheBlock], ...]) -> None:
"""
Add the new computed blocks to the request.
@@ -84,7 +79,7 @@ class KVCacheCoordinator(ABC):
new_computed_blocks[i])
def allocate_new_blocks(self, request_id: str,
num_tokens: int) -> list[list[KVCacheBlock]]:
num_tokens: int) -> tuple[list[KVCacheBlock], ...]:
"""
Allocate new blocks for the request to give it at least `num_tokens`
token slots.
@@ -97,11 +92,9 @@ class KVCacheCoordinator(ABC):
Returns:
The new allocated blocks.
"""
new_blocks = []
for manager in self.single_type_managers:
new_blocks.append(
manager.allocate_new_blocks(request_id, num_tokens))
return new_blocks
return tuple(
manager.allocate_new_blocks(request_id, num_tokens)
for manager in self.single_type_managers)
def cache_blocks(self, request: Request, block_hashes: list[BlockHash],
num_computed_tokens: int) -> None:
@@ -159,19 +152,20 @@ class KVCacheCoordinator(ABC):
for manager in self.single_type_managers:
manager.remove_skipped_blocks(request_id, num_computed_tokens)
def get_blocks(self, request_id: str) -> list[list[KVCacheBlock]]:
def get_blocks(self, request_id: str) -> tuple[list[KVCacheBlock], ...]:
"""
Get the blocks for the request.
"""
return [
return tuple(
manager.req_to_blocks.get(request_id) or []
for manager in self.single_type_managers
]
for manager in self.single_type_managers)
@abstractmethod
def find_longest_cache_hit(
self, block_hashes: list[BlockHash],
max_cache_hit_length: int) -> tuple[list[list[KVCacheBlock]], int]:
self,
block_hashes: list[BlockHash],
max_cache_hit_length: int,
) -> tuple[tuple[list[KVCacheBlock], ...], int]:
pass
@@ -195,8 +189,10 @@ class UnitaryKVCacheCoordinator(KVCacheCoordinator):
"UnitaryKVCacheCoordinator assumes only one kv cache group")
def find_longest_cache_hit(
self, block_hashes: list[BlockHash],
max_cache_hit_length: int) -> tuple[list[list[KVCacheBlock]], int]:
self,
block_hashes: list[BlockHash],
max_cache_hit_length: int,
) -> tuple[tuple[list[KVCacheBlock], ...], int]:
hit_blocks = self.single_type_managers[0].find_longest_cache_hit(
block_hashes=block_hashes,
max_length=max_cache_hit_length,
@@ -275,11 +271,24 @@ class HybridKVCacheCoordinator(KVCacheCoordinator):
"KVCacheCoordinator assumes the block_size of full attention "
"layers is divisible by other layers now.")
if max(self.full_attention_group_ids) < min(self.other_group_ids):
self.full_attn_first = True
elif max(self.other_group_ids) < min(self.full_attention_group_ids):
self.full_attn_first = False
else:
raise ValueError(
"HybridKVCacheCoordinator assumes the full "
"attention group ids and other attention group ids "
"do not interleave, either full attention group ids "
"are before other attention group ids or vice versa."
"This is for simplifying merging hit_blocks_full_attn and "
"hit_blocks_other_attn to hit_blocks.")
def find_longest_cache_hit(
self,
block_hashes: list[BlockHash],
max_cache_hit_length: int,
) -> tuple[list[list[KVCacheBlock]], int]:
) -> tuple[tuple[list[KVCacheBlock], ...], int]:
"""
Find the longest cache hit for the request.
@@ -318,27 +327,25 @@ class HybridKVCacheCoordinator(KVCacheCoordinator):
))
hit_length = len(hit_blocks_other_attn[0]) * self.other_block_size
# NOTE: the prefix cache hit length must be a multiply of block_size as
# NOTE: the prefix cache hit length must be a multiple of block_size as
# we don't support partial block cache hit yet. The cache hit length
# of other attention is ensured to be a multiply of the block size of
# of other attention is ensured to be a multiple of the block size of
# full attention layers in current implementation, because hit_length is
# a multiply of other attention's block size, and other attention's
# block size is a multiply of full attention's block size (verified in
# a multiple of other attention's block size, and other attention's
# block size is a multiple of full attention's block size (verified in
# `verify_and_split_kv_cache_groups`).
assert hit_length % self.full_attention_block_size == 0
# Truncate the full attention cache hit to the length of the
# cache hit of the other attention.
for i in range(len(hit_blocks_full_attn)):
del hit_blocks_full_attn[i][hit_length //
self.full_attention_block_size:]
for group_hit_blocks in hit_blocks_full_attn:
del group_hit_blocks[hit_length // self.full_attention_block_size:]
# Merge the hit blocks of full attention and other attention.
hit_blocks = hit_blocks_other_attn
for group_id, blocks in enumerate(hit_blocks_full_attn):
# NOTE: there is only one full attention group in most cases. So
# the time complexity of insert is fine.
hit_blocks.insert(group_id, blocks)
if self.full_attn_first:
hit_blocks = hit_blocks_full_attn + hit_blocks_other_attn
else:
hit_blocks = hit_blocks_other_attn + hit_blocks_full_attn
return hit_blocks, hit_length
@@ -351,8 +358,6 @@ def get_kv_cache_coordinator(
use_eagle, enable_caching,
caching_hash_fn,
enable_kv_cache_events)
else:
return HybridKVCacheCoordinator(kv_cache_config, max_model_len,
use_eagle, enable_caching,
caching_hash_fn,
enable_kv_cache_events)
return HybridKVCacheCoordinator(kv_cache_config, max_model_len, use_eagle,
enable_caching, caching_hash_fn,
enable_kv_cache_events)

View File

@@ -21,11 +21,11 @@ logger = init_logger(__name__)
@dataclass
class KVCacheBlocks:
"""
The allocation result of KVCacheManager, which works as the interface between
Scheduler and KVCacheManager to hide KVCacheManager's internal data
structure from the Scheduler.
"""
blocks: list[list[KVCacheBlock]]
blocks: tuple[list[KVCacheBlock], ...]
"""
blocks[i][j] refers to the i-th kv_cache_group and the j-th block of tokens.
We don't use block of tokens as the outer dimension because it assumes all
@@ -37,21 +37,19 @@ class KVCacheBlocks:
def __add__(self, other: "KVCacheBlocks") -> "KVCacheBlocks":
"""Adds two KVCacheBlocks instances."""
return KVCacheBlocks(
[blk1 + blk2 for blk1, blk2 in zip(self.blocks, other.blocks)])
tuple(blk1 + blk2
for blk1, blk2 in zip(self.blocks, other.blocks)))
def get_block_ids(self) -> list[list[int]]:
def get_block_ids(self) -> tuple[list[int], ...]:
"""
Converts the KVCacheBlocks instance to block_ids.
Returns:
list[list[int]]: A two-level list where
* the outer list corresponds to KV cache groups
tuple[list[int], ...]: A tuple of lists where
* the outer tuple corresponds to KV cache groups
* each inner list contains the block_ids of the blocks in that group
"""
block_ids = []
for group in self.blocks:
block_ids.append([blk.block_id for blk in group])
return block_ids
return tuple([blk.block_id for blk in group] for group in self.blocks)
def get_unhashed_block_ids(self) -> list[int]:
"""Get block_ids of unhashed blocks from KVCacheBlocks instance."""
@@ -63,7 +61,7 @@ class KVCacheBlocks:
def new_empty(self) -> "KVCacheBlocks":
"""Creates a new KVCacheBlocks instance with no blocks."""
return KVCacheBlocks([[] for _ in range(len(self.blocks))])
return KVCacheBlocks(tuple([] for _ in range(len(self.blocks))))
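A self-contained toy (assumed shapes mirroring KVCacheBlocks, not the real class) showing why the outer dimension is now a fixed tuple of kv cache groups: addition concatenates the per-group lists while the group count stays constant, and the block-id conversion preserves that outer structure.

    from dataclasses import dataclass

    @dataclass
    class Block:
        block_id: int

    a = ([Block(1), Block(2)], [Block(7)])   # two kv cache groups
    b = ([Block(3)], [Block(8)])
    merged = tuple(g1 + g2 for g1, g2 in zip(a, b))      # mirrors __add__
    block_ids = tuple([blk.block_id for blk in g] for g in merged)
    assert block_ids == ([1, 2, 3], [7, 8])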
class KVCacheManager:
@@ -232,9 +230,8 @@ class KVCacheManager:
if new_computed_blocks is not None:
new_computed_block_list = new_computed_blocks.blocks
else:
new_computed_block_list = [
[] for _ in range(len(self.kv_cache_config.kv_cache_groups))
]
new_computed_block_list = tuple(
[] for _ in range(len(self.kv_cache_config.kv_cache_groups)))
# Free the blocks that are skipped during the attention computation
# (e.g., tokens outside the sliding window).
@@ -267,7 +264,7 @@ class KVCacheManager:
if self.enable_caching:
self.block_pool.touch(new_computed_block_list)
else:
assert all(not blocks for blocks in new_computed_block_list), (
assert not any(new_computed_block_list), (
"Computed blocks should be empty when "
"prefix caching is disabled")
@@ -378,17 +375,18 @@ class KVCacheManager:
"""
return self.block_pool.take_events()
def get_block_ids(self, request_id: str) -> list[list[int]]:
def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
"""Get the block ids of a request."""
return KVCacheBlocks(
self.coordinator.get_blocks(request_id)).get_block_ids()
def cache_blocks(self, request: Request, block_hashes: list[BlockHash],
num_computed_tokens: int) -> None:
def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
"""Cache the blocks for the request."""
block_hashes = self.req_to_block_hashes[request.request_id]
self.coordinator.cache_blocks(request, block_hashes,
num_computed_tokens)
def create_empty_block_list(self) -> KVCacheBlocks:
"""Creates a new KVCacheBlocks instance with no blocks."""
return KVCacheBlocks([[] for _ in range(self.num_kv_cache_groups)])
return KVCacheBlocks(tuple([]
for _ in range(self.num_kv_cache_groups)))

View File

@@ -27,7 +27,7 @@ class NewRequestData:
mm_hashes: list[str]
mm_positions: list[PlaceholderRange]
sampling_params: SamplingParams
block_ids: list[list[int]]
block_ids: tuple[list[int], ...]
num_computed_tokens: int
lora_request: Optional[LoRARequest]
@@ -35,7 +35,7 @@ class NewRequestData:
def from_request(
cls,
request: Request,
block_ids: list[list[int]],
block_ids: tuple[list[int], ...],
) -> NewRequestData:
return cls(
req_id=request.request_id,
@@ -86,7 +86,7 @@ class CachedRequestData:
# request's block IDs instead of appending to the existing block IDs.
resumed_from_preemption: bool
new_token_ids: list[int]
new_block_ids: list[list[int]]
new_block_ids: tuple[list[int], ...]
num_computed_tokens: int
@classmethod
@@ -95,7 +95,7 @@ class CachedRequestData:
request: Request,
resumed_from_preemption: bool,
new_token_ids: list[int],
new_block_ids: list[list[int]],
new_block_ids: tuple[list[int], ...],
) -> CachedRequestData:
return cls(
req_id=request.request_id,

View File

@@ -180,7 +180,7 @@ class Scheduler(SchedulerInterface):
# uses structured decoding.
structured_output_request_ids: dict[str, int] = {}
req_to_new_block_ids: dict[str, list[list[int]]] = {}
req_to_new_block_ids: dict[str, tuple[list[int], ...]] = {}
num_scheduled_tokens: dict[str, int] = {}
token_budget = self.max_num_scheduled_tokens
# Encoder-related.
@@ -471,7 +471,7 @@ class Scheduler(SchedulerInterface):
token_budget -= num_new_tokens
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_computed_tokens
# Count the number of prifix cached tokens.
# Count the number of prefix cached tokens.
if request.num_cached_tokens < 0:
request.num_cached_tokens = num_computed_tokens
# Encoder-related.
@@ -588,7 +588,7 @@ class Scheduler(SchedulerInterface):
request: Request,
num_scheduled_tokens: int,
num_scheduled_spec_tokens: int,
new_block_ids: list[list[int]],
new_block_ids: tuple[list[int], ...],
resumed_from_preemption: bool,
) -> CachedRequestData:
# OPTIMIZATION: Cache the CachedRequestData objects to avoid creating
@@ -1015,11 +1015,7 @@ class Scheduler(SchedulerInterface):
num_computed_tokens = min(num_computed_tokens, request.num_tokens)
if num_computed_tokens == request.num_tokens:
num_computed_tokens -= 1
self.kv_cache_manager.cache_blocks(
request,
self.kv_cache_manager.req_to_block_hashes[request.request_id],
num_computed_tokens,
)
self.kv_cache_manager.cache_blocks(request, num_computed_tokens)
# Update the request state for scheduling.
request.num_computed_tokens = num_computed_tokens

View File

@@ -197,7 +197,7 @@ class SingleTypeKVCacheManager(ABC):
block_pool: BlockPool,
kv_cache_spec: KVCacheSpec,
use_eagle: bool,
) -> list[list[KVCacheBlock]]:
) -> tuple[list[KVCacheBlock], ...]:
"""
Get the longest cache hit prefix of the blocks that is not longer than
`max_length`. The prefix should be a common prefix hit for all the
@@ -222,7 +222,7 @@ class SingleTypeKVCacheManager(ABC):
element is a list of cached blocks for the i-th kv cache group
in `kv_cache_group_ids`.
For example, sliding window manager should return a list like
[[NULL, NULL, KVCacheBlock(7), KVCacheBlock(8)]] for block size 4
([NULL, NULL, KVCacheBlock(7), KVCacheBlock(8)]) for block size 4
and sliding window 8 and len(kv_cache_group_ids) = 1.
"""
@@ -254,27 +254,25 @@ class FullAttentionManager(SingleTypeKVCacheManager):
block_pool: BlockPool,
kv_cache_spec: KVCacheSpec,
use_eagle: bool,
) -> list[list[KVCacheBlock]]:
) -> tuple[list[KVCacheBlock], ...]:
assert isinstance(kv_cache_spec, FullAttentionSpec), (
"FullAttentionManager can only be used for full attention groups")
computed_blocks: list[list[KVCacheBlock]] = [
[] for _ in range(len(kv_cache_group_ids))
]
computed_blocks: tuple[list[KVCacheBlock], ...] = tuple(
[] for _ in range(len(kv_cache_group_ids)))
max_num_blocks = max_length // kv_cache_spec.block_size
for i in range(max_num_blocks):
block_hash = block_hashes[i]
for i, block_hash in zip(range(max_num_blocks), block_hashes):
# block_hashes is a chain of block hashes. If a block hash is not
# in the cached_block_hash_to_id, the following block hashes are
# not computed yet for sure.
if cached_block := block_pool.get_cached_block(
block_hash, kv_cache_group_ids):
for j in range(len(kv_cache_group_ids)):
computed_blocks[j].append(cached_block[j])
for computed, cached in zip(computed_blocks, cached_block):
computed.append(cached)
else:
break
if use_eagle and len(computed_blocks[0]) > 0:
for j in range(len(kv_cache_group_ids)):
computed_blocks[j].pop()
if use_eagle and computed_blocks[0]:
for computed in computed_blocks:
computed.pop()
return computed_blocks
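A toy sketch of the prefix scan above, with a hypothetical hash-to-block mapping standing in for the block pool: since block_hashes forms a chain, the first miss ends the hit and everything after it is guaranteed uncached.

    cached = {"h0": "blk0", "h1": "blk1"}    # hypothetical hash -> block cache
    block_hashes = ["h0", "h1", "h2", "h3"]
    max_num_blocks = 3

    hits = []
    for _, h in zip(range(max_num_blocks), block_hashes):
        if (blk := cached.get(h)) is not None:
            hits.append(blk)
        else:
            break
    assert hits == ["blk0", "blk1"]   # "h2" misses, so the scan stops early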
def remove_skipped_blocks(self, request_id: str,
@@ -311,7 +309,7 @@ class SlidingWindowManager(SingleTypeKVCacheManager):
block_pool: BlockPool,
kv_cache_spec: KVCacheSpec,
use_eagle: bool,
) -> list[list[KVCacheBlock]]:
) -> tuple[list[KVCacheBlock], ...]:
assert isinstance(kv_cache_spec, SlidingWindowSpec), (
"SlidingWindowManager can only be used for sliding window groups")
@@ -332,23 +330,23 @@ class SlidingWindowManager(SingleTypeKVCacheManager):
# sliding_window_contiguous_blocks),
# which is good for low cache hit rate scenarios.
max_num_blocks = max_length // kv_cache_spec.block_size
computed_blocks = [[block_pool.null_block] * max_num_blocks
for _ in range(len(kv_cache_group_ids))]
computed_blocks = tuple([block_pool.null_block] * max_num_blocks
for _ in range(len(kv_cache_group_ids)))
num_contiguous_blocks = 0
match_found = False
# Search from right to left and early stop when a match is found.
for i in range(max_num_blocks - 1, -1, -1):
if cached_block := block_pool.get_cached_block(
block_hashes[i], kv_cache_group_ids):
for j in range(len(kv_cache_group_ids)):
computed_blocks[j][i] = cached_block[j]
for computed, cached in zip(computed_blocks, cached_block):
computed[i] = cached
num_contiguous_blocks += 1
if (num_contiguous_blocks >= sliding_window_contiguous_blocks):
if num_contiguous_blocks >= sliding_window_contiguous_blocks:
# Trim the trailing blocks.
# E.g., [NULL, NULL, 8, 3, NULL, 9] -> [NULL, NULL, 8, 3]
# when sliding_window_contiguous_blocks=2.
for j in range(len(kv_cache_group_ids)):
del computed_blocks[j][i + num_contiguous_blocks:]
for computed in computed_blocks:
del computed[i + num_contiguous_blocks:]
match_found = True
break
else:
@@ -356,11 +354,11 @@ class SlidingWindowManager(SingleTypeKVCacheManager):
if not match_found:
# The first `num_contiguous_blocks` is a cache hit even if
# `num_contiguous_blocks < sliding_window_contiguous_blocks`.
for j in range(len(kv_cache_group_ids)):
del computed_blocks[j][num_contiguous_blocks:]
if use_eagle and len(computed_blocks[0]) > 0:
for j in range(len(kv_cache_group_ids)):
computed_blocks[j].pop()
for computed in computed_blocks:
del computed[num_contiguous_blocks:]
if use_eagle and computed_blocks[0]:
for computed in computed_blocks:
computed.pop()
return computed_blocks
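A toy sketch of the right-to-left search above for a single group, using hypothetical cached indices: pad with NULL, stop at the first run of `need` contiguous hits, and trim the trailing blocks, reproducing the [NULL, NULL, 8, 3] example from the comments.

    NULL = None
    cached = {2: "blk8", 3: "blk3", 5: "blk9"}   # hypothetical index -> block
    max_num_blocks, need = 6, 2                  # need: contiguous blocks wanted

    computed = [NULL] * max_num_blocks
    run = 0
    for i in range(max_num_blocks - 1, -1, -1):  # search from right to left
        if i in cached:
            computed[i] = cached[i]
            run += 1
            if run >= need:
                del computed[i + run:]           # trim the trailing blocks
                break
        else:
            run = 0
    else:
        del computed[run:]   # no long-enough run: keep only the prefix hits
    assert computed == [NULL, NULL, "blk8", "blk3"]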
def remove_skipped_blocks(self, request_id: str,

View File

@@ -112,11 +112,12 @@ class MultiGroupBlockTable:
for block_size in block_sizes
]
def append_row(self, block_ids: list[list[int]], row_idx: int) -> None:
def append_row(self, block_ids: tuple[list[int], ...],
row_idx: int) -> None:
for i, block_table in enumerate(self.block_tables):
block_table.append_row(block_ids[i], row_idx)
def add_row(self, block_ids: list[list[int]], row_idx: int) -> None:
def add_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
for i, block_table in enumerate(self.block_tables):
block_table.add_row(block_ids[i], row_idx)
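A toy dispatch mirroring the loops above (hypothetical ToyBlockTable, not the real block table): each kv cache group keeps its own table, and group i receives block_ids[i] for the same request row.

    class ToyBlockTable:
        def __init__(self) -> None:
            self.rows: dict[int, list[int]] = {}

        def add_row(self, ids: list[int], row_idx: int) -> None:
            self.rows[row_idx] = list(ids)

    tables = [ToyBlockTable(), ToyBlockTable()]
    block_ids = ([1, 2], [7])                # one entry per kv cache group
    for table, ids in zip(tables, block_ids):
        table.add_row(ids, row_idx=0)
    assert tables[1].rows[0] == [7]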

View File

@@ -1,5 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
import os
from importlib import util
from typing import Optional
import torch
@@ -38,10 +39,14 @@ class CPUWorker(Worker):
def init_device(self):
# Setup OpenMP threads affinity.
omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
if omp_cpuids == "all":
self.local_omp_cpuid = "all"
self.local_omp_cpuid = "all"
if omp_cpuids == "auto":
self.local_omp_cpuid = self.get_cpus_id_binding_based_on_numa_nodes(
)
else:
self.local_omp_cpuid = omp_cpuids.split("|")[self.rank]
if self.local_omp_cpuid != "all":
ret = torch.ops._C_utils.init_cpu_threads_env(self.local_omp_cpuid)
if ret:
logger.info(ret)
@@ -99,3 +104,49 @@ class CPUWorker(Worker):
assert isinstance(output, ModelRunnerOutput)
return output if self.is_driver_worker else None
def get_cpus_id_binding_based_on_numa_nodes(self) -> str:
"""Return CPUs id binding based on NUMA nodes.
"""
rank_to_cpus = self.local_omp_cpuid
# Setup OpenMP thread affinity based on NUMA nodes automatically
world_size = self.vllm_config.parallel_config.world_size
libnuma_found = util.find_spec("numa") is not None
psutil_found = util.find_spec("psutil") is not None
if libnuma_found and psutil_found:
import psutil
from numa import info
cpu_count = psutil.cpu_count(logical=False)
cpus_allow_list = psutil.Process().cpu_affinity()
numa_size = info.get_num_configured_nodes()
cpu_count_per_numa = cpu_count // numa_size
num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
cpu_count_per_numa // 2)
# check allow node_to_cpus list
node_to_cpus = []
for i in range(numa_size):
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(list(node_intersect))
if world_size > len(node_to_cpus):
logger.error(
"Auto thread-binding failed due to "
"world size: %d is larger than "
"allowed NUMA nodes number: %d."
"Please try to bind threads manually.", world_size,
len(node_to_cpus))
else:
end = cpu_count_per_numa - num_of_reserved_cpu
rank_to_cpus_list = node_to_cpus[self.rank][:end]
rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
logger.info("auto thread-binding list: %s", rank_to_cpus)
else:
logger.warning(
"Auto thread-binding is not supported due to "
"the lack of package numa and psutil,"
"fallback to no thread-binding. To get better performance,"
"please try to manually bind threads.")
return rank_to_cpus
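A worked toy example of the binding arithmetic above (numbers are illustrative, not measured): with 16 physical cores across 2 NUMA nodes and 2 reserved cores per node, rank 1 gets the first 6 allowed cores of node 1.

    cpu_count, numa_size = 16, 2      # physical cores, NUMA nodes
    reserved_env = 2                  # stand-in for VLLM_CPU_NUM_OF_RESERVED_CPU
    node_to_cpus = [list(range(0, 8)), list(range(8, 16))]

    cpu_count_per_numa = cpu_count // numa_size                # 8
    num_reserved = min(reserved_env, cpu_count_per_numa // 2)  # 2
    end = cpu_count_per_numa - num_reserved                    # 6
    rank = 1
    rank_to_cpus = ",".join(str(c) for c in node_to_cpus[rank][:end])
    assert rank_to_cpus == "8,9,10,11,12,13"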

View File

@@ -30,7 +30,7 @@ class CachedRequestState:
sampling_params: SamplingParams
generator: Optional[torch.Generator]
block_ids: list[list[int]]
block_ids: tuple[list[int], ...]
num_computed_tokens: int
output_token_ids: list[int]

View File

@@ -962,7 +962,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
encoder_outputs = []
for grouped_mm_inputs in grouped_mm_inputs_list:
batched_mm_inputs = MultiModalKwargs.batch(grouped_mm_inputs)
batched_mm_inputs = MultiModalKwargs.batch(
grouped_mm_inputs, pin_memory=self.pin_memory)
batched_mm_inputs = MultiModalKwargs.as_kwargs(
batched_mm_inputs,
device=self.device,
@@ -1989,7 +1990,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
).multi_modal_data
batched_dummy_mm_inputs = MultiModalKwargs.batch(
[dummy_mm_kwargs] * max_num_mm_items)
[dummy_mm_kwargs] * max_num_mm_items,
pin_memory=self.pin_memory)
batched_dummy_mm_inputs = MultiModalKwargs.as_kwargs(
batched_dummy_mm_inputs,
device=self.device,
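A generic PyTorch sketch (not vLLM code) of what the pin_memory flag buys: page-locked host tensors can be copied to the GPU asynchronously, so the host-to-device transfer of the batched multimodal inputs can overlap with other work.

    import torch

    # Pinned (page-locked) host tensor: the batched inputs live here first.
    host = torch.empty(4, 1024, pin_memory=torch.cuda.is_available())
    if torch.cuda.is_available():
        # non_blocking=True is only truly asynchronous for pinned memory.
        dev = host.to("cuda", non_blocking=True)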

View File

@@ -22,7 +22,7 @@ from vllm.lora.request import LoRARequest
from vllm.model_executor import set_random_seed
from vllm.platforms import current_platform
from vllm.sequence import IntermediateTensors
from vllm.utils import GiB_bytes
from vllm.utils import GiB_bytes, MemorySnapshot, memory_profiling
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.utils import report_usage_stats
@@ -130,20 +130,22 @@ class Worker(WorkerBase):
_check_if_gpu_supports_dtype(self.model_config.dtype)
gc.collect()
torch.cuda.empty_cache()
self.init_gpu_memory, total_gpu_memory = torch.cuda.mem_get_info()
requested_memory = (total_gpu_memory *
self.cache_config.gpu_memory_utilization)
if self.init_gpu_memory < requested_memory:
# take current memory snapshot
self.init_snapshot = MemorySnapshot()
self.requested_memory = (self.init_snapshot.total_memory *
self.cache_config.gpu_memory_utilization)
if self.init_snapshot.free_memory < self.requested_memory:
GiB = lambda b: round(b / GiB_bytes, 2)
raise ValueError(
f"Free memory on device ({GiB(self.init_gpu_memory)}/"
f"{GiB(total_gpu_memory)} GiB) on startup is less than "
f"desired GPU memory utilization "
f"Free memory on device "
f"({GiB(self.init_snapshot.free_memory)}/"
f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
f"is less than desired GPU memory utilization "
f"({self.cache_config.gpu_memory_utilization}, "
f"{GiB(requested_memory)} GiB). Decrease GPU memory "
f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
f"utilization or reduce GPU memory used by other processes."
)
else:
raise RuntimeError(
f"Not support device type: {self.device_config.device}")
@@ -192,57 +194,39 @@ class Worker(WorkerBase):
"""
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
GiB = lambda b: b / GiB_bytes
_, total_gpu_memory = torch.cuda.mem_get_info()
# Execute a forward pass with dummy inputs to profile the memory usage
# of the model.
self.model_runner.profile_run()
with memory_profiling(
self.init_snapshot,
weights_memory=int(
self.model_runner.model_memory_usage)) as profile_result:
self.model_runner.profile_run()
free_gpu_memory, _ = torch.cuda.mem_get_info()
free_gpu_memory = profile_result.after_profile.free_memory
# NOTE(woosuk): Here we assume that the other processes using the same
# GPU did not change their memory usage during the profiling.
assert self.init_gpu_memory > free_gpu_memory, (
assert self.init_snapshot.free_memory > free_gpu_memory, (
"Error in memory profiling. "
f"Initial free memory {self.init_gpu_memory/GiB_bytes} GiB, "
f"current free memory {free_gpu_memory/GiB_bytes} GiB. "
f"This happens when the GPU memory was not properly cleaned up "
f"before initializing the vLLM instance.")
f"Initial free memory {GiB(self.init_snapshot.free_memory)} GiB, "
f"current free memory {GiB(free_gpu_memory)} GiB. "
"This happens when other processes sharing the same container "
"release GPU memory while vLLM is profiling during initialization. "
"To fix this, ensure consistent GPU memory allocation or "
"isolate vLLM in its own container.")
available_kv_cache_memory = self.requested_memory \
- profile_result.non_kv_cache_memory
# Get the peak memory allocation recorded by torch
peak_torch_memory = torch.cuda.memory_stats(
)["allocated_bytes.all.peak"]
# Check for any memory left around that may have been allocated on the
# gpu outside of `torch`. NCCL operations, for example, can use a few
# GB during a forward pass.
torch.cuda.empty_cache()
torch_allocated_bytes = torch.cuda.memory_stats(
)["allocated_bytes.all.current"]
# Reset after emptying torch cache
free_gpu_memory = torch.cuda.mem_get_info()[0]
# Total forward allocation (current) is equal to the diff in free memory
fwd_alloc_bytes = self.init_gpu_memory - free_gpu_memory
# We assume current non-torch allocation is equal to peak
non_torch_alloc_bytes = max(0, fwd_alloc_bytes - torch_allocated_bytes)
# Total forward allocation (peak) is peak torch + non-torch
peak_memory = peak_torch_memory + non_torch_alloc_bytes
available_kv_cache_memory = (
total_gpu_memory * self.cache_config.gpu_memory_utilization -
peak_memory)
GiB = lambda b: b / GiB_bytes
logger.debug(
"Initial free memory: %.2f GiB, free memory: %.2f GiB, "
"total GPU memory: %.2f GiB", GiB(self.init_gpu_memory),
GiB(free_gpu_memory), GiB(total_gpu_memory))
logger.debug(
"Peak torch memory: %.2f GiB, non-torch forward-pass memory: "
"%.2f GiB, available KVCache memory: %.2f GiB",
GiB(peak_torch_memory), GiB(non_torch_alloc_bytes),
GiB(available_kv_cache_memory))
"requested GPU memory: %.2f GiB",
GiB(self.init_snapshot.free_memory), GiB(free_gpu_memory),
GiB(self.requested_memory))
logger.debug(profile_result)
logger.info("Available KV cache memory: %.2f GiB",
GiB(available_kv_cache_memory))
gc.collect()
return int(available_kv_cache_memory)
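Worked toy arithmetic for the KV-cache budget above (illustrative numbers, not measurements): the requested budget is total memory times gpu_memory_utilization, and the profiled non-KV-cache footprint is subtracted from it.

    GiB = 1024**3
    total_memory = 80 * GiB                  # hypothetical device
    gpu_memory_utilization = 0.9
    non_kv_cache_memory = 18 * GiB           # weights + activations, profiled

    requested = total_memory * gpu_memory_utilization           # 72 GiB
    available_kv_cache_memory = requested - non_kv_cache_memory
    assert available_kv_cache_memory == 54 * GiB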

View File

@@ -101,7 +101,10 @@ class TPUWorker:
# fix this. It will be removed after the bug in XLA compiler is fixed.
os.environ["LIBTPU_INIT_ARGS"] = (
os.environ.get("LIBTPU_INIT_ARGS", "") +
" --xla_tpu_force_1d_allreduce_at_chunk_count=1")
" --xla_tpu_force_1d_allreduce_at_chunk_count=1"
" --xla_jf_conv_input_fusion=False")
# --xla_jf_conv_input_fusion=False is used to improve the perf of
# quantized matmul.
torch.set_grad_enabled(False)
torch.set_default_dtype(self.model_config.dtype)

View File

@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""A CPU worker class."""
import os
from importlib import util
from typing import Dict, List, Optional, Set, Tuple, Type
import torch
@@ -156,8 +157,10 @@ class CPUWorker(LocalOrDistributedWorkerBase):
# Setup OpenMP threads affinity.
omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
if omp_cpuids == "all":
self.local_omp_cpuid = "all"
self.local_omp_cpuid = "all"
if omp_cpuids == "auto":
self.local_omp_cpuid = self.get_cpus_id_binding_based_on_numa_nodes(
)
else:
self.local_omp_cpuid = omp_cpuids.split("|")[rank]
@@ -399,3 +402,49 @@ class CPUWorker(LocalOrDistributedWorkerBase):
return CPUCacheEngine.get_cache_block_size(
self.cache_config.block_size, self.cache_config.cache_dtype,
self.model_config, self.parallel_config)
def get_cpus_id_binding_based_on_numa_nodes(self) -> str:
"""Return CPUs id binding based on NUMA nodes.
"""
rank_to_cpus = self.local_omp_cpuid
# Setup OpenMP thread affinity based on NUMA nodes automatically
world_size = self.vllm_config.parallel_config.world_size
libnuma_found = util.find_spec("numa") is not None
psutil_found = util.find_spec("psutil") is not None
if libnuma_found and psutil_found:
import psutil
from numa import info
cpu_count = psutil.cpu_count(logical=False)
cpus_allow_list = psutil.Process().cpu_affinity()
numa_size = info.get_num_configured_nodes()
cpu_count_per_numa = cpu_count // numa_size
num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
cpu_count_per_numa // 2)
# check allow node_to_cpus list
node_to_cpus = []
for i in range(numa_size):
node_intersect = set(
info.node_to_cpus(i)).intersection(cpus_allow_list)
if bool(node_intersect):
node_to_cpus.append(list(node_intersect))
if world_size > len(node_to_cpus):
logger.error(
"Auto thread-binding failed due to "
"world size: %d is larger than "
"allowed NUMA nodes number: %d."
"Please try to bind threads manually.", world_size,
len(node_to_cpus))
else:
end = cpu_count_per_numa - num_of_reserved_cpu
rank_to_cpus_list = node_to_cpus[self.rank][:end]
rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
logger.info("auto thread-binding list: %s", rank_to_cpus)
else:
logger.warning(
"Auto thread-binding is not supported due to "
"the lack of package numa and psutil,"
"fallback to no thread-binding. To get better performance,"
"please try to manually bind threads.")
return rank_to_cpus