From 75e01a39a16c1f39d5e2cf37edb6525df1a76c9f Mon Sep 17 00:00:00 2001 From: Shengqi Chen Date: Thu, 9 Apr 2026 00:55:24 +0800 Subject: [PATCH] [Feature] NUMA binding support for GPU workers (#38635) Signed-off-by: Shengqi Chen Co-authored-by: Jason Li Co-authored-by: Roger Wang --- docs/configuration/optimization.md | 74 ++++++ tests/engine/test_arg_utils.py | 23 ++ tests/utils_/test_numa_utils.py | 132 ++++++++++ tests/utils_/test_system_utils.py | 10 +- vllm/config/parallel.py | 67 ++++++ vllm/engine/arg_utils.py | 13 + vllm/platforms/cuda.py | 113 +++++++++ vllm/utils/numa_utils.py | 317 +++++++++++++++++++++++++ vllm/utils/numa_wrapper.sh | 25 ++ vllm/utils/system_utils.py | 5 + vllm/v1/engine/core.py | 8 +- vllm/v1/engine/utils.py | 27 ++- vllm/v1/executor/multiproc_executor.py | 10 +- 13 files changed, 817 insertions(+), 7 deletions(-) create mode 100644 tests/utils_/test_numa_utils.py create mode 100644 vllm/utils/numa_utils.py create mode 100755 vllm/utils/numa_wrapper.sh diff --git a/docs/configuration/optimization.md b/docs/configuration/optimization.md index 56329a6ed..26eda1246 100644 --- a/docs/configuration/optimization.md +++ b/docs/configuration/optimization.md @@ -140,6 +140,80 @@ Data parallelism replicates the entire model across multiple GPU sets and proces Data parallelism can be combined with the other parallelism strategies and is set by `data_parallel_size=N`. Note that MoE layers will be sharded according to the product of the tensor parallel size and data parallel size. +### NUMA Binding for Multi-Socket GPU Nodes + +On multi-socket GPU servers, GPU worker processes can lose performance if their +CPU execution and memory allocation drift away from the NUMA node nearest to the +GPU. vLLM can pin each worker with `numactl` before the Python subprocess starts, +so the interpreter, imports, and early allocator state are created with the +desired NUMA policy from the beginning. + +Use `--numa-bind` to enable the feature. By default, vLLM auto-detects the +GPU-to-NUMA mapping and uses `--cpunodebind= --membind=` for each +worker. When you need a custom CPU policy, add `--numa-bind-cpus` and vLLM will +switch to `--physcpubind= --membind=`. + +These `--numa-bind*` options only apply to GPU execution processes. They do not +configure the CPU backend's separate thread-affinity controls. Automatic +GPU-to-NUMA detection is currently implemented for CUDA/NVML-based platforms; +other GPU backends must provide explicit binding lists if they use these +options. + +`--numa-bind-nodes` takes one non-negative NUMA node index per visible GPU, in +the same order as the GPU indices. +`--numa-bind-cpus` takes one `numactl` CPU list per visible GPU, in the same +order as the GPU indices. Each CPU list must use +`numactl --physcpubind` syntax such as `0-3`, `0,2,4-7`, or `16-31,48-63`. + +```bash +# Auto-detect NUMA nodes for visible GPUs +vllm serve meta-llama/Llama-3.1-8B-Instruct \ + --tensor-parallel-size 4 \ + --numa-bind + +# Explicit NUMA-node mapping +vllm serve meta-llama/Llama-3.1-8B-Instruct \ + --tensor-parallel-size 4 \ + --numa-bind \ + --numa-bind-nodes 0 0 1 1 + +# Explicit CPU pinning, useful for PCT or other high-frequency core layouts +vllm serve meta-llama/Llama-3.1-8B-Instruct \ + --tensor-parallel-size 4 \ + --numa-bind \ + --numa-bind-nodes 0 0 1 1 \ + --numa-bind-cpus 0-3 4-7 48-51 52-55 +``` + +Notes: + +- CLI usage forces multiprocessing to use the `spawn` method automatically. If you enable NUMA binding through the Python API, also set `VLLM_WORKER_MULTIPROC_METHOD=spawn`. +- Automatic detection relies on NVML and NUMA support from the host. If it cannot determine the mapping reliably, pass `--numa-bind-nodes` explicitly. +- Explicit `--numa-bind-nodes` and `--numa-bind-cpus` values must be valid `numactl` inputs. vLLM does a small amount of validation, but the effective binding semantics are still determined by `numactl`. +- The current implementation binds GPU execution processes such as `EngineCore` and multiprocessing workers. It does not apply NUMA binding to frontend API server processes or the DP coordinator. +- In containerized environments, NUMA policy syscalls may require extra permissions, such as `--cap-add SYS_NICE` when running via `docker run`. + +### CPU Backend Thread Affinity + +The CPU backend uses a different mechanism from `--numa-bind`. CPU execution is +configured through CPU-specific environment variables such as +`VLLM_CPU_OMP_THREADS_BIND`, `VLLM_CPU_NUM_OF_RESERVED_CPU`, and +`CPU_VISIBLE_MEMORY_NODES`, rather than the GPU-oriented `--numa-bind*` CLI +options. + +By default, `VLLM_CPU_OMP_THREADS_BIND=auto` derives OpenMP placement from the +available CPU and NUMA topology for each CPU worker. To override the automatic +policy, set `VLLM_CPU_OMP_THREADS_BIND` explicitly using the CPU list format +documented for the CPU backend, or use `nobind` to disable this behavior. + +For the current CPU backend setup and tuning guidance, see: + +- [Related runtime environment variables](../getting_started/installation/cpu.md#related-runtime-environment-variables) +- [How to decide `VLLM_CPU_OMP_THREADS_BIND`](../getting_started/installation/cpu.md#how-to-decide-vllm_cpu_omp_threads_bind) + +The GPU-only `--numa-bind`, `--numa-bind-nodes`, and `--numa-bind-cpus` options +do not configure CPU worker affinity. + ### Batch-level DP for Multi-Modal Encoders By default, TP is used to shard the weights of multi-modal encoders just like for language decoders, diff --git a/tests/engine/test_arg_utils.py b/tests/engine/test_arg_utils.py index a0f4bf970..9f03078a4 100644 --- a/tests/engine/test_arg_utils.py +++ b/tests/engine/test_arg_utils.py @@ -525,6 +525,29 @@ def test_human_readable_model_len(): parser.parse_args(["--max-model-len", invalid]) +def test_numa_bind_args(): + parser = EngineArgs.add_cli_args(FlexibleArgumentParser()) + args = parser.parse_args( + [ + "--numa-bind", + "--numa-bind-nodes", + "0", + "0", + "1", + "1", + "--numa-bind-cpus", + "0-3", + "4-7", + "8-11", + "12-15", + ] + ) + engine_args = EngineArgs.from_cli_args(args=args) + assert engine_args.numa_bind is True + assert engine_args.numa_bind_nodes == [0, 0, 1, 1] + assert engine_args.numa_bind_cpus == ["0-3", "4-7", "8-11", "12-15"] + + def test_ir_op_priority(): from vllm.config.kernel import IrOpPriorityConfig, KernelConfig diff --git a/tests/utils_/test_numa_utils.py b/tests/utils_/test_numa_utils.py new file mode 100644 index 000000000..9f6fa85da --- /dev/null +++ b/tests/utils_/test_numa_utils.py @@ -0,0 +1,132 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import os +from types import SimpleNamespace + +import pytest + +from vllm.config import ParallelConfig +from vllm.utils import numa_utils + + +def _make_config(**parallel_kwargs): + parallel_defaults = dict( + numa_bind=False, + numa_bind_nodes=None, + numa_bind_cpus=None, + distributed_executor_backend="mp", + data_parallel_backend="mp", + nnodes_within_dp=1, + data_parallel_rank_local=0, + data_parallel_index=0, + pipeline_parallel_size=1, + tensor_parallel_size=1, + ) + parallel_defaults.update(parallel_kwargs) + parallel_config = SimpleNamespace(**parallel_defaults) + return SimpleNamespace(parallel_config=parallel_config) + + +def test_get_numactl_args_with_node_binding(): + vllm_config = _make_config(numa_bind=True, numa_bind_nodes=[0, 1]) + assert ( + numa_utils._get_numactl_args(vllm_config, local_rank=1) + == "--cpunodebind=1 --membind=1" + ) + + +def test_get_numactl_args_with_cpu_binding(): + vllm_config = _make_config( + numa_bind=True, + numa_bind_nodes=[0, 1], + numa_bind_cpus=["0-3", "4-7"], + ) + assert ( + numa_utils._get_numactl_args(vllm_config, local_rank=1) + == "--physcpubind=4-7 --membind=1" + ) + + +def test_get_numactl_args_uses_dp_offset(): + vllm_config = _make_config( + numa_bind=True, + numa_bind_nodes=[0, 0, 1, 1], + data_parallel_rank_local=1, + pipeline_parallel_size=1, + tensor_parallel_size=2, + ) + assert ( + numa_utils._get_numactl_args(vllm_config, local_rank=1) + == "--cpunodebind=1 --membind=1" + ) + + +def test_get_numactl_args_requires_detectable_nodes(monkeypatch): + vllm_config = _make_config(numa_bind=True) + monkeypatch.setattr(numa_utils, "get_auto_numa_nodes", lambda: None) + with pytest.raises(RuntimeError): + numa_utils._get_numactl_args(vllm_config, local_rank=0) + + +def test_log_numactl_show(monkeypatch): + log_lines = [] + + def fake_debug(msg, *args): + log_lines.append(msg % args) + + monkeypatch.setattr(numa_utils.logger, "debug", fake_debug) + monkeypatch.setattr( + numa_utils.subprocess, + "run", + lambda *args, **kwargs: SimpleNamespace( + stdout="policy: bind\nphyscpubind: 0 1 2 3\n", returncode=0 + ), + ) + + assert numa_utils._log_numactl_show("Worker_0") is True + assert log_lines == [ + "Worker_0 affinity: policy: bind, physcpubind: 0 1 2 3", + ] + + +def test_get_numactl_executable_points_to_fixed_wrapper(monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/numactl") + executable, debug_str = numa_utils._get_numactl_executable() + assert executable.endswith("/vllm/utils/numa_wrapper.sh") + assert "_VLLM_INTERNAL_NUMACTL_ARGS" in debug_str + + +def test_set_numa_wrapper_env_restores_previous_values(): + os.environ[numa_utils._NUMACTL_ARGS_ENV] = "old-args" + os.environ[numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV] = "old-python" + + with numa_utils._set_numa_wrapper_env("new-args", "new-python"): + assert os.environ[numa_utils._NUMACTL_ARGS_ENV] == "new-args" + assert os.environ[numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV] == "new-python" + + assert os.environ[numa_utils._NUMACTL_ARGS_ENV] == "old-args" + assert os.environ[numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV] == "old-python" + + +def test_set_numa_wrapper_env_clears_values_when_unset(): + os.environ.pop(numa_utils._NUMACTL_ARGS_ENV, None) + os.environ.pop(numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV, None) + + with numa_utils._set_numa_wrapper_env("new-args", "new-python"): + assert os.environ[numa_utils._NUMACTL_ARGS_ENV] == "new-args" + assert os.environ[numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV] == "new-python" + + assert numa_utils._NUMACTL_ARGS_ENV not in os.environ + assert numa_utils._NUMACTL_PYTHON_EXECUTABLE_ENV not in os.environ + + +def test_parallel_config_validates_numa_bind_nodes(): + with pytest.raises(ValueError, match="non-negative"): + ParallelConfig(numa_bind_nodes=[0, -1]) + + +@pytest.mark.parametrize("cpuset", ["", "abc", "1-", "4-1", "1,,2", "1:2"]) +def test_parallel_config_rejects_invalid_numa_bind_cpus(cpuset): + with pytest.raises(ValueError, match="numa_bind_cpus"): + ParallelConfig(numa_bind_cpus=[cpuset]) diff --git a/tests/utils_/test_system_utils.py b/tests/utils_/test_system_utils.py index 3d1b1fc4c..5ef55877a 100644 --- a/tests/utils_/test_system_utils.py +++ b/tests/utils_/test_system_utils.py @@ -1,10 +1,11 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import os import tempfile from pathlib import Path -from vllm.utils.system_utils import unique_filepath +from vllm.utils.system_utils import _maybe_force_spawn, unique_filepath def test_unique_filepath(): @@ -17,3 +18,10 @@ def test_unique_filepath(): paths.add(path) assert len(paths) == 10 assert len(list(Path(temp_dir).glob("*.txt"))) == 10 + + +def test_numa_bind_forces_spawn(monkeypatch): + monkeypatch.delenv("VLLM_WORKER_MULTIPROC_METHOD", raising=False) + monkeypatch.setattr("sys.argv", ["vllm", "serve", "--numa-bind"]) + _maybe_force_spawn() + assert os.environ["VLLM_WORKER_MULTIPROC_METHOD"] == "spawn" diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 107dcfa27..0b5c97ba0 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -6,6 +6,7 @@ import socket from collections.abc import Callable from typing import TYPE_CHECKING, Any, Literal, overload +import regex as re import torch from pydantic import Field, field_validator, model_validator from torch.distributed import ProcessGroup, ReduceOp, Store @@ -28,6 +29,7 @@ else: Executor = Any logger = init_logger(__name__) +_NUMACTL_CPUSET_PATTERN = re.compile(r"^\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*$") ExpertPlacementStrategy = Literal["linear", "round_robin"] DistributedExecutorBackend = Literal["ray", "mp", "uni", "external_launcher"] @@ -259,6 +261,27 @@ class ParallelConfig: nnodes: int = 1 """num of nodes for multi-node distributed inference when distributed_executor_backend is mp.""" + numa_bind: bool = False + """Enable NUMA binding for GPU worker subprocesses.""" + numa_bind_nodes: list[int] | None = None + """NUMA node to bind each GPU worker to. + + Specify one NUMA node per visible GPU, for example `[0, 0, 1, 1]` + for a 4-GPU system with GPUs 0-1 on NUMA node 0 and GPUs 2-3 on + NUMA node 1. If unset and `numa_bind=True`, vLLM auto-detects the + GPU-to-NUMA topology. The values are passed to `numactl --membind` + and `--cpunodebind`, so they must be valid `numactl` NUMA node indices. + """ + numa_bind_cpus: list[str] | None = None + """Optional CPU lists to bind each GPU worker to. + + Specify one CPU list per visible GPU, for example + `["0-3", "4-7", "8-11", "12-15"]`. When set, vLLM uses + `numactl --physcpubind` instead of `--cpunodebind`. This is useful + for custom policies such as binding to PCT or other high-frequency cores. + Each entry must use `numactl --physcpubind` CPU-list syntax, for example + `"0-3"` or `"0,2,4-7"`. + """ distributed_timeout_seconds: int | None = None """Timeout in seconds for distributed operations (e.g., init_process_group). @@ -345,6 +368,43 @@ class ParallelConfig: """Skip validation if the value is `None` when initialisation is delayed.""" return None if value is None else handler(value) + @field_validator("numa_bind_nodes") + @classmethod + def _validate_numa_bind_nodes(cls, value: list[int] | None) -> list[int] | None: + if value is None: + return None + if not value: + raise ValueError("numa_bind_nodes must not be empty.") + if any(node < 0 for node in value): + raise ValueError("numa_bind_nodes must contain non-negative integers.") + return value + + @field_validator("numa_bind_cpus") + @classmethod + def _validate_numa_bind_cpus(cls, value: list[str] | None) -> list[str] | None: + if value is None: + return None + if not value: + raise ValueError("numa_bind_cpus must not be empty.") + + for cpuset in value: + if not cpuset: + raise ValueError("numa_bind_cpus entries must not be empty.") + if not _NUMACTL_CPUSET_PATTERN.fullmatch(cpuset): + raise ValueError( + "numa_bind_cpus entries must use numactl CPU list syntax, " + "for example '0-3' or '0,2,4-7'." + ) + for part in cpuset.split(","): + if "-" not in part: + continue + start_str, end_str = part.split("-", 1) + if int(start_str) > int(end_str): + raise ValueError( + f"numa_bind_cpus ranges must be ascending, but got '{cpuset}'." + ) + return value + @model_validator(mode="after") def _validate_parallel_config(self) -> Self: if self._api_process_rank >= self._api_process_count: @@ -373,6 +433,13 @@ class ParallelConfig: "data_parallel_external_lb can only be set when data_parallel_size > 1" ) + if not self.numa_bind and ( + self.numa_bind_nodes is not None or self.numa_bind_cpus is not None + ): + raise ValueError( + "numa_bind_nodes and numa_bind_cpus require numa_bind=True." + ) + if self.enable_eplb: if not current_platform.is_cuda_alike(): raise ValueError( diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 90e10c1cb..55c87bf35 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -416,6 +416,9 @@ class EngineArgs: nnodes: int = ParallelConfig.nnodes node_rank: int = ParallelConfig.node_rank distributed_timeout_seconds: int | None = ParallelConfig.distributed_timeout_seconds + numa_bind: bool = ParallelConfig.numa_bind + numa_bind_nodes: list[int] | None = ParallelConfig.numa_bind_nodes + numa_bind_cpus: list[str] | None = ParallelConfig.numa_bind_cpus tensor_parallel_size: int = ParallelConfig.tensor_parallel_size prefill_context_parallel_size: int = ParallelConfig.prefill_context_parallel_size decode_context_parallel_size: int = ParallelConfig.decode_context_parallel_size @@ -861,6 +864,13 @@ class EngineArgs: "--distributed-timeout-seconds", **parallel_kwargs["distributed_timeout_seconds"], ) + parallel_group.add_argument("--numa-bind", **parallel_kwargs["numa_bind"]) + parallel_group.add_argument( + "--numa-bind-nodes", **parallel_kwargs["numa_bind_nodes"] + ) + parallel_group.add_argument( + "--numa-bind-cpus", **parallel_kwargs["numa_bind_cpus"] + ) parallel_group.add_argument( "--tensor-parallel-size", "-tp", **parallel_kwargs["tensor_parallel_size"] ) @@ -1826,6 +1836,9 @@ class EngineArgs: cp_kv_cache_interleave_size=self.cp_kv_cache_interleave_size, _api_process_count=self._api_process_count, _api_process_rank=self._api_process_rank, + numa_bind=self.numa_bind, + numa_bind_nodes=self.numa_bind_nodes, + numa_bind_cpus=self.numa_bind_cpus, ) speculative_config = self.create_speculative_config( diff --git a/vllm/platforms/cuda.py b/vllm/platforms/cuda.py index 10dd1b869..2e73c56b3 100644 --- a/vllm/platforms/cuda.py +++ b/vllm/platforms/cuda.py @@ -653,6 +653,111 @@ class NvmlCudaPlatform(CudaPlatformBase): handle = pynvml.nvmlDeviceGetHandleByIndex(device_id) return pynvml.nvmlDeviceGetName(handle) + @classmethod + @with_nvml_context + def get_device_numa_node(cls, device_id: int = 0) -> int | None: + """Get the NUMA node ID for a GPU device.""" + physical_device_id = cls.device_id_to_physical_device_id(device_id) + handle = pynvml.nvmlDeviceGetHandleByIndex(physical_device_id) + + try: + return pynvml.nvmlDeviceGetNumaNodeId(handle) + except Exception: + pass + + try: + cpu_ids = cls._get_device_cpu_affinity(handle) + if cpu_ids: + numa_node = cls._get_numa_node_for_cpu(cpu_ids[0]) + if numa_node is not None: + logger.debug( + "Determined NUMA node %d for GPU %d via CPU affinity", + numa_node, + device_id, + ) + return numa_node + except Exception as e: + logger.warning("Failed to get NUMA node for GPU %d: %s", device_id, e) + + return None + + @classmethod + def _get_device_cpu_affinity(cls, handle) -> list[int]: + """Get the list of CPU IDs associated with a GPU via NVML.""" + cpu_count = os.cpu_count() + if cpu_count is None: + return [] + + cpu_set_size = (cpu_count + 63) // 64 + cpu_affinity_mask = pynvml.nvmlDeviceGetCpuAffinity(handle, cpu_set_size) + + cpu_ids = [] + for i, mask in enumerate(cpu_affinity_mask): + for bit in range(64): + cpu_id = i * 64 + bit + if cpu_id >= cpu_count: + break + if mask & (1 << bit): + cpu_ids.append(cpu_id) + return cpu_ids + + @classmethod + def _get_numa_node_for_cpu(cls, cpu_id: int) -> int | None: + """Determine which NUMA node a CPU belongs to.""" + from pathlib import Path + + node_path = Path("/sys/devices/system/node") + if not node_path.exists(): + return None + + for node_dir in node_path.iterdir(): + if not node_dir.name.startswith("node"): + continue + try: + node_id = int(node_dir.name[4:]) + cpulist_file = node_dir / "cpulist" + if cpulist_file.exists(): + cpulist = cpulist_file.read_text().strip() + if cls._cpu_in_cpulist(cpu_id, cpulist): + return node_id + except (ValueError, OSError): + continue + return None + + @classmethod + def _cpu_in_cpulist(cls, cpu_id: int, cpulist: str) -> bool: + """Check if a CPU ID is in a cpulist string such as '0-3,8-11'.""" + for part in cpulist.split(","): + part = part.strip() + if "-" in part: + start, end = part.split("-", 1) + if int(start) <= cpu_id <= int(end): + return True + elif part.isdigit() and int(part) == cpu_id: + return True + return False + + @classmethod + @with_nvml_context + def get_all_device_numa_nodes(cls) -> list[int] | None: + """Get NUMA nodes for all visible GPU devices.""" + try: + numa_nodes = [] + for device_id in range(cls.device_count()): + numa_node = cls.get_device_numa_node(device_id) + if numa_node is None: + logger.warning( + "Could not detect NUMA node for GPU %d, " + "disabling automatic NUMA binding", + device_id, + ) + return None + numa_nodes.append(numa_node) + return numa_nodes + except Exception as e: + logger.warning("Failed to get NUMA nodes for GPUs: %s", e) + return None + @classmethod @with_nvml_context def log_warnings(cls): @@ -695,6 +800,14 @@ class NonNvmlCudaPlatform(CudaPlatformBase): ) return False + @classmethod + def get_device_numa_node(cls, device_id: int = 0) -> int | None: + return None + + @classmethod + def get_all_device_numa_nodes(cls) -> list[int] | None: + return None + # Autodetect either NVML-enabled or non-NVML platform # based on whether NVML is available. diff --git a/vllm/utils/numa_utils.py b/vllm/utils/numa_utils.py new file mode 100644 index 000000000..4e1addad9 --- /dev/null +++ b/vllm/utils/numa_utils.py @@ -0,0 +1,317 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""NUMA binding utilities for vLLM worker processes. + +Adapted in part from SGLang's NUMA helper implementation: +https://github.com/sgl-project/sglang/blob/ba6d54d0f08f82f42b8224908ae2459a496b31b3/python/sglang/srt/utils/numa_utils.py +""" + +import ctypes +import logging +import multiprocessing +import os +import subprocess +from contextlib import contextmanager +from functools import cache +from pathlib import Path +from typing import TYPE_CHECKING + +import psutil + +from vllm import envs + +if TYPE_CHECKING: + from vllm.config import VllmConfig + +logger = logging.getLogger(__name__) +_NUMACTL_ARGS_ENV = "_VLLM_INTERNAL_NUMACTL_ARGS" +_NUMACTL_PYTHON_EXECUTABLE_ENV = "_VLLM_INTERNAL_NUMACTL_PYTHON_EXECUTABLE" + + +@cache +def get_libnuma(): + libnuma = None + for libnuma_so in ["libnuma.so", "libnuma.so.1"]: + try: + libnuma = ctypes.CDLL(libnuma_so) + except OSError: + libnuma = None + if libnuma is not None: + break + return libnuma + + +def _can_set_mempolicy() -> bool: + """Check whether the current process can use NUMA memory policy syscalls.""" + try: + libnuma = get_libnuma() + if libnuma is None or libnuma.numa_available() < 0: + return False + mode = ctypes.c_int() + ret = libnuma.get_mempolicy( + ctypes.byref(mode), None, ctypes.c_ulong(0), None, ctypes.c_ulong(0) + ) + return ret == 0 + except Exception: + return False + + +def _is_auto_numa_available() -> bool: + """Check whether automatic GPU-to-NUMA detection should be attempted.""" + from vllm.platforms import current_platform + + if not current_platform.is_cuda_alike(): + return False + + if not os.path.isdir("/sys/devices/system/node/node1"): + return False + + try: + process = psutil.Process(os.getpid()) + cpu_affinity = process.cpu_affinity() + cpu_count = psutil.cpu_count() + if cpu_count is not None and cpu_affinity != list(range(cpu_count)): + logger.warning( + "CPU affinity is already constrained for this process. " + "Skipping automatic NUMA binding; pass --numa-bind-nodes " + "explicitly to override." + ) + return False + except (AttributeError, NotImplementedError, psutil.Error): + pass + + if not _can_set_mempolicy(): + logger.warning( + "User lacks permission to set NUMA memory policy. " + "Automatic NUMA detection may not work; if you are using Docker, " + "try adding --cap-add SYS_NICE." + ) + return False + + if not hasattr(current_platform, "get_all_device_numa_nodes"): + logger.warning( + "Platform %s does not support automatic NUMA detection", + type(current_platform).__name__, + ) + return False + + return True + + +@cache +def get_auto_numa_nodes() -> list[int] | None: + """Auto-detect NUMA nodes for all visible GPUs.""" + from vllm.platforms import current_platform + + if not _is_auto_numa_available(): + return None + + numa_nodes = current_platform.get_all_device_numa_nodes() + if numa_nodes is not None: + logger.info("Auto-detected NUMA nodes for GPUs: %s", numa_nodes) + return numa_nodes + + +def _get_gpu_index( + parallel_config, local_rank: int, dp_local_rank: int | None = None +) -> int: + """Compute the physical GPU index used for NUMA lookup.""" + if ( + parallel_config.distributed_executor_backend not in ("ray", "external_launcher") + and parallel_config.data_parallel_backend != "ray" + and parallel_config.nnodes_within_dp == 1 + ): + if dp_local_rank is None: + dp_local_rank = parallel_config.data_parallel_rank_local + if dp_local_rank is None: + dp_local_rank = parallel_config.data_parallel_index + + tp_pp_world_size = ( + parallel_config.pipeline_parallel_size + * parallel_config.tensor_parallel_size + ) + return local_rank + dp_local_rank * tp_pp_world_size + + return local_rank + + +def _get_numa_node(parallel_config, gpu_index: int) -> int: + numa_nodes = parallel_config.numa_bind_nodes + if numa_nodes is None: + numa_nodes = get_auto_numa_nodes() + if numa_nodes is None: + raise RuntimeError( + "NUMA binding was requested, but vLLM could not detect the " + "GPU-to-NUMA topology automatically. Pass --numa-bind-nodes " + "explicitly or disable --numa-bind." + ) + parallel_config.numa_bind_nodes = numa_nodes + + if gpu_index >= len(numa_nodes): + raise ValueError( + f"GPU index {gpu_index} exceeds numa_bind_nodes size {len(numa_nodes)}. " + "Ensure the binding lists cover every visible GPU." + ) + + return numa_nodes[gpu_index] + + +def _get_cpu_binding(parallel_config, gpu_index: int) -> str | None: + cpu_bindings = parallel_config.numa_bind_cpus + if cpu_bindings is None: + return None + + if gpu_index >= len(cpu_bindings): + raise ValueError( + f"GPU index {gpu_index} exceeds numa_bind_cpus size " + f"{len(cpu_bindings)}. Ensure the binding lists cover every visible GPU." + ) + + return cpu_bindings[gpu_index] + + +def _get_numactl_args( + vllm_config: "VllmConfig", + local_rank: int, + dp_local_rank: int | None = None, + process_kind: str = "worker", +) -> str | None: + parallel_config = vllm_config.parallel_config + if not parallel_config.numa_bind: + return None + + gpu_index = _get_gpu_index(parallel_config, local_rank, dp_local_rank) + numa_node = _get_numa_node(parallel_config, gpu_index) + cpu_binding = _get_cpu_binding(parallel_config, gpu_index) + + if cpu_binding is not None: + bind_arg = f"--physcpubind={cpu_binding}" + logger.info( + "Binding %s subprocess (local_rank=%s, gpu_index=%s) to CPUs %s and NUMA node %s", # noqa: E501 + process_kind, + local_rank, + gpu_index, + cpu_binding, + numa_node, + ) + else: + bind_arg = f"--cpunodebind={numa_node}" + logger.info( + "Binding %s subprocess (local_rank=%s, gpu_index=%s) to NUMA node %s", + process_kind, + local_rank, + gpu_index, + numa_node, + ) + + return f"{bind_arg} --membind={numa_node}" + + +def _log_numactl_show(label: str) -> bool: + try: + result = subprocess.run( + ["numactl", "--show"], + check=True, + capture_output=True, + text=True, + ) + except (FileNotFoundError, subprocess.CalledProcessError) as e: + logger.warning("Failed to run `numactl --show` for %s: %s", label, e) + return False + + output = result.stdout.strip() + if not output: + logger.warning("`numactl --show` returned no output for %s", label) + return False + + summary = ", ".join(line.strip() for line in output.splitlines() if line.strip()) + logger.debug("%s affinity: %s", label, summary) + return True + + +def log_current_affinity_state(label: str) -> None: + """Log the process's effective NUMA affinity state.""" + _log_numactl_show(label) + + +@contextmanager +def configure_subprocess( + vllm_config: "VllmConfig", + local_rank: int, + dp_local_rank: int | None = None, + process_kind: str = "worker", +): + """Temporarily replace the multiprocessing executable with a numactl wrapper.""" + numactl_args = _get_numactl_args( + vllm_config, local_rank, dp_local_rank, process_kind + ) + if numactl_args is None: + yield + return + + executable, debug_str = _get_numactl_executable() + python_executable = os.fsdecode(multiprocessing.spawn.get_executable()) + with ( + _set_numa_wrapper_env(numactl_args, python_executable), + _mp_set_executable(executable, debug_str), + ): + yield + + +def _get_numactl_executable() -> tuple[str, str]: + """Return the fixed wrapper executable used to launch numactl.""" + from shutil import which + + if which("numactl") is None: + raise RuntimeError( + "numactl is required for NUMA binding but is not installed or " + "not available on PATH." + ) + + script_path = Path(__file__).with_name("numa_wrapper.sh") + return str(script_path), f"{script_path} via {_NUMACTL_ARGS_ENV}" + + +@contextmanager +def _set_numa_wrapper_env(numactl_args: str, python_executable: str): + old_numactl_args = os.environ.get(_NUMACTL_ARGS_ENV) + old_python_executable = os.environ.get(_NUMACTL_PYTHON_EXECUTABLE_ENV) + os.environ[_NUMACTL_ARGS_ENV] = numactl_args + os.environ[_NUMACTL_PYTHON_EXECUTABLE_ENV] = python_executable + try: + yield + finally: + if old_numactl_args is None: + os.environ.pop(_NUMACTL_ARGS_ENV, None) + else: + os.environ[_NUMACTL_ARGS_ENV] = old_numactl_args + + if old_python_executable is None: + os.environ.pop(_NUMACTL_PYTHON_EXECUTABLE_ENV, None) + else: + os.environ[_NUMACTL_PYTHON_EXECUTABLE_ENV] = old_python_executable + + +@contextmanager +def _mp_set_executable(executable: str, debug_str: str): + start_method = envs.VLLM_WORKER_MULTIPROC_METHOD + if start_method != "spawn": + logger.warning( + "NUMA binding requires spawn method but got '%s'. " + "NUMA binding will be ineffective. " + "Set VLLM_WORKER_MULTIPROC_METHOD=spawn to enable NUMA binding.", + start_method, + ) + yield + return + + old_executable = os.fsdecode(multiprocessing.spawn.get_executable()) + multiprocessing.spawn.set_executable(executable) + try: + yield + finally: + assert os.fsdecode(multiprocessing.spawn.get_executable()) == executable, ( + "Executable was changed during NUMA binding context: " + f"expected {executable}, got {multiprocessing.spawn.get_executable()}" + ) + multiprocessing.spawn.set_executable(old_executable) diff --git a/vllm/utils/numa_wrapper.sh b/vllm/utils/numa_wrapper.sh new file mode 100755 index 000000000..541801ed5 --- /dev/null +++ b/vllm/utils/numa_wrapper.sh @@ -0,0 +1,25 @@ +#!/bin/sh + +if [ -z "${_VLLM_INTERNAL_NUMACTL_ARGS:-}" ]; then + echo "_VLLM_INTERNAL_NUMACTL_ARGS is not set" >&2 + exit 1 +fi + +if [ -z "${_VLLM_INTERNAL_NUMACTL_PYTHON_EXECUTABLE:-}" ]; then + echo "_VLLM_INTERNAL_NUMACTL_PYTHON_EXECUTABLE is not set" >&2 + exit 1 +fi + +if ! command -v numactl >/dev/null 2>&1; then + echo "numactl is not available on PATH" >&2 + exit 1 +fi + +case "${_VLLM_INTERNAL_NUMACTL_ARGS}" in + *[![:alnum:]\ \-\_=,./]*) + echo "Invalid characters in _VLLM_INTERNAL_NUMACTL_ARGS" >&2 + exit 1 + ;; +esac + +exec numactl ${_VLLM_INTERNAL_NUMACTL_ARGS} "${_VLLM_INTERNAL_NUMACTL_PYTHON_EXECUTABLE}" "$@" diff --git a/vllm/utils/system_utils.py b/vllm/utils/system_utils.py index ca29dfd72..db904c29e 100644 --- a/vllm/utils/system_utils.py +++ b/vllm/utils/system_utils.py @@ -140,6 +140,11 @@ def _maybe_force_spawn(): os.environ["RAY_ADDRESS"] = ray.get_runtime_context().gcs_address reasons.append("In a Ray actor and can only be spawned") + # Force spawn if NUMA binding is enabled via --numa-bind. + # NUMA binding uses executable hijacking which requires spawn + if "--numa-bind" in sys.argv: + reasons.append("NUMA binding requires spawn method") + if cuda_is_initialized(): reasons.append("CUDA is initialized") elif xpu_is_initialized(): diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 0fa59579e..ff5f924a1 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -30,6 +30,7 @@ from vllm.multimodal import MULTIMODAL_REGISTRY from vllm.tasks import POOLING_TASKS, SupportedTask from vllm.tracing import instrument, maybe_init_worker_tracer from vllm.transformers_utils.config import maybe_register_config_serialize_by_value +from vllm.utils import numa_utils from vllm.utils.gc_utils import ( freeze_gc_heap, maybe_attach_gc_debug_callback, @@ -1055,6 +1056,8 @@ class EngineCoreProc(EngineCore): set_process_title(process_title) maybe_init_worker_tracer("vllm.engine_core", "engine_core", process_title) decorate_logs() + if parallel_config.numa_bind: + numa_utils.log_current_affinity_state(process_title) if data_parallel and vllm_config.kv_transfer_config is not None: # modify the engine_id and append the local_dp_rank to it to ensure @@ -1255,8 +1258,9 @@ class EngineCoreProc(EngineCore): return output = UtilityOutput(call_id) # Lazily look-up utility method so that failure will be handled/returned. - get_result = lambda: (method := getattr(self, method_name)) and method( - *self._convert_msgspec_args(method, args) + get_result = lambda: ( + (method := getattr(self, method_name)) + and method(*self._convert_msgspec_args(method, args)) ) enqueue_output = lambda out: self.output_queue.put_nowait( (client_idx, EngineCoreOutputs(utility_output=out)) diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py index 0ce0ed88e..a378236c5 100644 --- a/vllm/v1/engine/utils.py +++ b/vllm/v1/engine/utils.py @@ -22,6 +22,7 @@ from vllm.config import CacheConfig, ParallelConfig, VllmConfig from vllm.logger import init_logger from vllm.platforms import current_platform from vllm.ray.ray_env import get_env_vars_to_copy +from vllm.utils import numa_utils from vllm.utils.network_utils import get_open_zmq_ipc_path, zmq_socket_ctx from vllm.utils.system_utils import get_mp_context from vllm.v1.engine.coordinator import DPCoordinator @@ -141,13 +142,33 @@ class CoreEngineProcManager: # Adjust device control in DP for non-CUDA platforms # as well as external and ray launchers # For CUDA platforms, we use torch.accelerator.set_device_index()() + device_control_context: contextlib.AbstractContextManager[None] = ( + contextlib.nullcontext() + ) if is_dp and ( not current_platform.is_cuda_alike() or vllm_config.parallel_config.use_ray ): - with set_device_control_env_var(vllm_config, local_dp_rank): - proc.start() - else: + device_control_context = set_device_control_env_var( + vllm_config, local_dp_rank + ) + + with ( + device_control_context, + numa_utils.configure_subprocess( + # EngineCore itself does not have a TP/PP-local rank. + # When DP is enabled, set_device_control_env_var() + # narrows visible devices to this DP shard first, so + # local_rank=0 means "the first local GPU in this + # shard". The actual TP/PP worker processes spawned by + # the executor are bound separately with their own + # local_rank values. + vllm_config, + local_rank=0, + dp_local_rank=local_dp_rank, + process_kind="EngineCore", + ), + ): proc.start() finally: # Kill other procs if not all are running. diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index ac61ded79..3eccce722 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -44,6 +44,7 @@ from vllm.envs import enable_envs_cache from vllm.logger import init_logger from vllm.platforms import current_platform from vllm.tracing import instrument, maybe_init_worker_tracer +from vllm.utils import numa_utils from vllm.utils.network_utils import ( get_distributed_init_method, get_ip, @@ -688,7 +689,12 @@ class WorkerProc: daemon=True, ) - proc.start() + # Apply NUMA binding if configured + with numa_utils.configure_subprocess( + vllm_config, local_rank, process_kind="worker" + ): + proc.start() + # Close child ends of pipes here in the parent ready_writer.close() death_reader.close() @@ -839,6 +845,8 @@ class WorkerProc: worker = WorkerProc(*args, **kwargs) assert worker.worker_response_mq is not None + if kwargs["vllm_config"].parallel_config.numa_bind: + numa_utils.log_current_affinity_state(f"Worker_{worker.rank}") worker.monitor_death_pipe(death_pipe, shutdown_requested)