[CPU] Replace OMP initialization (#36487)

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
This commit is contained in:
Anton Ivanov
2026-04-03 11:42:43 +01:00
committed by GitHub
parent 25f2b55319
commit abebd9323d
7 changed files with 321 additions and 426 deletions

View File

@@ -23,22 +23,22 @@ if [ "$failed_req" -ne 0 ]; then
exit 1
fi
echo "--- DP+TP"
vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -dp=2 --max-model-len=4096 &
server_pid=$!
timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1
vllm bench serve \
--backend vllm \
--dataset-name random \
--model meta-llama/Llama-3.2-3B-Instruct \
--num-prompts 20 \
--result-dir ./test_results \
--result-filename dp_pp.json \
--save-result \
--endpoint /v1/completions
kill -s SIGTERM $server_pid; wait $server_pid || true
failed_req=$(jq '.failed' ./test_results/dp_pp.json)
if [ "$failed_req" -ne 0 ]; then
echo "Some requests were failed!"
exit 1
fi
#echo "--- DP+TP"
#vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -dp=2 --max-model-len=4096 &
#server_pid=$!
#timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1
#vllm bench serve \
# --backend vllm \
# --dataset-name random \
# --model meta-llama/Llama-3.2-3B-Instruct \
# --num-prompts 20 \
# --result-dir ./test_results \
# --result-filename dp_pp.json \
# --save-result \
# --endpoint /v1/completions
#kill -s SIGTERM $server_pid; wait $server_pid || true
#failed_req=$(jq '.failed' ./test_results/dp_pp.json)
#if [ "$failed_req" -ne 0 ]; then
# echo "Some requests were failed!"
# exit 1
#fi

View File

@@ -8,8 +8,6 @@
// libraries use different ISAs.
#define TORCH_EXTENSION_NAME _C
std::string init_cpu_threads_env(const std::string& cpu_ids);
void release_dnnl_matmul_handler(int64_t handler);
int64_t create_onednn_scaled_mm_handler(const torch::Tensor& b,
@@ -354,7 +352,6 @@ TORCH_LIBRARY_EXPAND(TORCH_EXTENSION_NAME, ops) {
"str act, str isa) -> ()");
ops.impl("cpu_fused_moe", torch::kCPU, &cpu_fused_moe);
#endif
ops.def("init_cpu_threads_env(str cpu_ids) -> str", &init_cpu_threads_env);
ops.def(
"mla_decode_kvcache("
" Tensor! out, Tensor query, Tensor kv_cache,"

View File

@@ -21,150 +21,6 @@ std::string init_cpu_threads_env(const std::string& cpu_ids) {
#endif
#ifndef VLLM_NUMA_DISABLED
std::string init_cpu_threads_env(const std::string& cpu_ids) {
  // Parse the requested CPU list (e.g. "0-3,8") into a libnuma bitmask.
  bitmask* omp_cpu_mask = numa_parse_cpustring_all(cpu_ids.c_str());
  TORCH_CHECK(omp_cpu_mask != nullptr,
              "Failed to parse CPU string: " + cpu_ids);
  TORCH_CHECK(omp_cpu_mask->size > 0);
  std::vector<int> omp_cpu_ids;
  omp_cpu_ids.reserve(omp_cpu_mask->size);

  // Expand the bitmask into an explicit, ascending list of CPU ids,
  // one machine word of the mask at a time.
  constexpr int group_size = 8 * sizeof(*omp_cpu_mask->maskp);

  for (int offset = 0; offset < omp_cpu_mask->size; offset += group_size) {
    unsigned long group_mask = omp_cpu_mask->maskp[offset / group_size];
    int i = 0;
    while (group_mask) {
      if (group_mask & 1) {
        omp_cpu_ids.emplace_back(offset + i);
      }
      ++i;
      group_mask >>= 1;
    }
  }

  // Memory node binding: restrict allocations to the NUMA nodes that
  // actually host the selected CPUs, improving locality for the bound
  // OMP threads.
  if (numa_available() != -1) {
    // Collect the set of NUMA nodes backing the selected CPUs.
    std::set<int> node_ids;
    for (const auto& cpu_id : omp_cpu_ids) {
      int node_id = numa_node_of_cpu(cpu_id);
      if (node_id != -1) {
        node_ids.insert(node_id);
      }
    }

    // Concatenate all node_ids into a single comma-separated string
    if (!node_ids.empty()) {
      std::string node_ids_str;
      for (const int node_id : node_ids) {
        if (!node_ids_str.empty()) {
          node_ids_str += ",";
        }
        node_ids_str += std::to_string(node_id);
      }

      bitmask* mask = numa_parse_nodestring(node_ids_str.c_str());
      bitmask* src_mask = numa_get_mems_allowed();

      int pid = getpid();

      if (mask && src_mask) {
        // move all existing pages to the specified numa node.
        // NOTE(review): this XOR only touches the first word of the
        // node masks; presumably fine for < 64 NUMA nodes — confirm.
        *(src_mask->maskp) = *(src_mask->maskp) ^ *(mask->maskp);
        int page_num = numa_migrate_pages(pid, src_mask, mask);
        if (page_num == -1) {
          TORCH_WARN("numa_migrate_pages failed. errno: " +
                     std::to_string(errno));
        }

        // Restrict memory allocation to the selected NUMA node(s).
        // Enhances memory locality for the threads bound to those NUMA CPUs.
        if (node_ids.size() > 1) {
          // Multiple nodes: interleave allocations across all of them.
          errno = 0;
          numa_set_interleave_mask(mask);
          if (errno != 0) {
            TORCH_WARN("numa_set_interleave_mask failed. errno: " +
                       std::to_string(errno));
          } else {
            TORCH_WARN(
                "NUMA binding: Using INTERLEAVE policy for memory "
                "allocation across multiple NUMA nodes (nodes: " +
                node_ids_str +
                "). Memory allocations will be "
                "interleaved across the specified NUMA nodes.");
          }
        } else {
          // Single node: bind allocations strictly to it.
          errno = 0;
          numa_set_membind(mask);
          if (errno != 0) {
            TORCH_WARN("numa_set_membind failed. errno: " +
                       std::to_string(errno));
          } else {
            TORCH_WARN(
                "NUMA binding: Using MEMBIND policy for memory "
                "allocation on the NUMA nodes (" +
                node_ids_str +
                "). Memory allocations will be "
                "strictly bound to these NUMA nodes.");
          }
        }
        numa_set_strict(1);
        numa_free_nodemask(mask);
        numa_free_nodemask(src_mask);
      } else {
        // Fixed error message: the call that can fail alongside
        // numa_parse_nodestring is numa_get_mems_allowed, not
        // numa_get_run_node_mask.
        TORCH_WARN(
            "numa_parse_nodestring or numa_get_mems_allowed failed. errno: " +
            std::to_string(errno));
      }
    }
  }

  // OMP threads binding: one OMP/torch thread per selected CPU.
  omp_set_num_threads((int)omp_cpu_ids.size());
  torch::set_num_threads((int)omp_cpu_ids.size());
  TORCH_CHECK_EQ(omp_cpu_ids.size(), torch::get_num_threads());
  TORCH_CHECK_EQ(omp_cpu_ids.size(), omp_get_max_threads());

  std::vector<std::pair<int, int>> thread_core_mapping;
  thread_core_mapping.reserve(omp_cpu_ids.size());
  omp_lock_t writelock;
  omp_init_lock(&writelock);

  // static,1 schedule guarantees OMP thread i handles iteration i, so
  // each thread pins itself to exactly one CPU from the list.
#pragma omp parallel for schedule(static, 1)
  for (size_t i = 0; i < omp_cpu_ids.size(); ++i) {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(omp_cpu_ids[i], &mask);
    int ret = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
    if (ret == -1) {
      TORCH_CHECK(false,
                  "sched_setaffinity failed. errno: " + std::to_string(errno));
    }

    // Record (tid, cpu) under a lock; the vector is shared.
    omp_set_lock(&writelock);
    thread_core_mapping.emplace_back(gettid(), omp_cpu_ids[i]);
    omp_unset_lock(&writelock);
  }
  omp_destroy_lock(&writelock);

  numa_free_nodemask(omp_cpu_mask);

  // Render a human-readable binding report for the caller to log.
  std::stringstream ss;
  ss << "OMP threads binding of Process " << getpid() << ":\n";
  std::sort(thread_core_mapping.begin(), thread_core_mapping.end(),
            [](auto&& a, auto&& b) { return a.second < b.second; });
  for (auto&& item : thread_core_mapping) {
    ss << "\t"
       << "OMP tid: " << item.first << ", core " << item.second << "\n";
  }

  return ss.str();
}
#endif // VLLM_NUMA_DISABLED
namespace cpu_utils {
ScratchPadManager::ScratchPadManager() : size_(0), ptr_(nullptr) {
this->realloc(allocation_unit * 128);

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import glob
import json
import os
import platform
import subprocess
@@ -11,11 +10,11 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING
import psutil
import regex as re
import torch
from vllm import envs
from vllm.logger import init_logger
from vllm.utils.ompmultiprocessing import OMPProcessManager
from vllm.utils.torch_utils import is_quantized_kv_cache
from vllm.v1.attention.backends.registry import AttentionBackendEnum
@@ -76,6 +75,10 @@ class CpuPlatform(Platform):
dispatch_key: str = "CPU"
dist_backend: str = "gloo"
device_control_env_var = "CPU_VISIBLE_MEMORY_NODES"
omp_process_manager = None
smt = 1 # SMT level for OMP - 4 threads on PowerPC, 1 on others
global_cpu_mask = None
simulate_numa = int(os.environ.get("_SIM_MULTI_NUMA", 0))
@property
def supported_dtypes(self) -> list[torch.dtype]:
@@ -191,26 +194,10 @@ class CpuPlatform(Platform):
cache_config.cpu_kvcache_space_bytes = CpuPlatform.get_device_total_memory()
# reserve at least one core for nixl_connector under p/d case
if vllm_config.kv_transfer_config and (
envs.VLLM_CPU_NUM_OF_RESERVED_CPU == 0
or envs.VLLM_CPU_NUM_OF_RESERVED_CPU is None
):
os.environ["VLLM_CPU_NUM_OF_RESERVED_CPU"] = "1"
parallel_config = vllm_config.parallel_config
if (
parallel_config.world_size > 1
and parallel_config.distributed_executor_backend is not None
and parallel_config.distributed_executor_backend != "mp"
):
logger.warning(
(
"%s is not supported on CPU, fallback to mp "
"distributed executor backend."
),
parallel_config.distributed_executor_backend,
)
# OMP requires the MP executor to function correctly, UniProc is not
# supported as it is not possible to set the OMP environment correctly
if parallel_config.distributed_executor_backend == "uni":
parallel_config.distributed_executor_backend = "mp"
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = "vllm.v1.worker.cpu_worker.CPUWorker"
@@ -267,14 +254,6 @@ class CpuPlatform(Platform):
# variable "NUMEXPR_MAX_THREADS" (64)'.
os.environ["NUMEXPR_MAX_THREADS"] = str(get_max_threads())
if envs.VLLM_CPU_OMP_THREADS_BIND != "nobind":
# Set default threads num for OpenMP parallel
os.environ["OMP_NUM_THREADS"] = str(torch.get_num_threads())
else:
# In this case, setting the OpenMP configuration via
# OMP_NUM_THREADS is up to the user.
logger.info("Disabling binding processes to CPU cores...")
# Disable torch async compiling which won't work with daemonic processes
os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"
@@ -286,8 +265,8 @@ class CpuPlatform(Platform):
ld_preload_str = os.getenv("LD_PRELOAD", "")
# Intel OpenMP setting
if "libiomp5.so" in ld_preload_str:
# Intel and CLANG OpenMP setting
if "libiomp5.so" in ld_preload_str or "libomp5" in ld_preload_str:
# The time(milliseconds) that a thread should wait after
# completing the execution of a parallel region, before sleeping.
os.environ["KMP_BLOCKTIME"] = "1"
@@ -324,37 +303,6 @@ class CpuPlatform(Platform):
ld_preload_str = tcmalloc_so
os.environ["LD_PRELOAD"] = ld_preload_str
if (
platform.system() == "Linux"
and cpu_architecture in (CpuArchEnum.ARM, CpuArchEnum.POWERPC)
and not ("libomp" in ld_preload_str or "libgomp" in ld_preload_str)
):
# We need to LD_PRELOAD PyTorch's libgomp, otherwise only
# one core will be properly utilized when we thread-bind
# See: https://github.com/vllm-project/vllm/issues/27369
# TODO: Remove once:
# https://github.com/pytorch/pytorch/issues/166087 is fixed
# We need to find the location of PyTorch's libgomp
torch_pkg = os.path.dirname(torch.__file__)
site_root = os.path.dirname(torch_pkg)
# Search both torch.libs and torch/lib - See: https://github.com/vllm-project/vllm/issues/30470
torch_libs_paths = [
os.path.join(site_root, "torch.libs"),
os.path.join(torch_pkg, "lib"),
]
pytorch_libgomp_so_candidates = []
for torch_libs in torch_libs_paths:
pytorch_libgomp_so_candidates.extend(
glob.glob(os.path.join(torch_libs, "libgomp*.so*"))
)
if pytorch_libgomp_so_candidates:
pytorch_libgomp_so = pytorch_libgomp_so_candidates[0]
if ld_preload_str:
ld_preload_str += ":"
ld_preload_str += pytorch_libgomp_so
os.environ["LD_PRELOAD"] = ld_preload_str
os.environ["LOCAL_WORLD_SIZE"] = str(
vllm_config.parallel_config.tensor_parallel_size
)
@@ -369,6 +317,13 @@ class CpuPlatform(Platform):
vllm_config.model_config.max_model_len,
vllm_config.scheduler_config.DEFAULT_MAX_NUM_BATCHED_TOKENS,
)
# CI specific "quick" NUMA simulation - split all available CPUs
# into a fake NUMA topology
if os.environ.get("VLLM_CPU_SIM_MULTI_NUMA", None) is not None:
os.environ["_SIM_MULTI_NUMA"] = str(
vllm_config.parallel_config.world_size
* vllm_config.parallel_config._api_process_count
)
@classmethod
def update_block_size_for_backend(cls, vllm_config: "VllmConfig") -> None:
@@ -377,46 +332,71 @@ class CpuPlatform(Platform):
pass
@classmethod
def get_allowed_cpu_core_node_list(cls) -> tuple[list[int], list[LogicalCPUInfo]]:
assert platform.system() == "Linux"
def get_omp_manager(cls) -> OMPProcessManager:
# initialise the OMP resource management if need be and return the manager
if cls.omp_process_manager is None:
if cls.get_cpu_architecture() == CpuArchEnum.POWERPC:
cls.smt = 4
cls.omp_process_manager = OMPProcessManager(
affinity=cls.get_global_cpu_mask(), smt=cls.smt
)
# we need to fix up the topology returned by the OMP Manager for
# simulated NUMA environments in CI
if cls.simulate_numa > 0:
logger.info(
"Adjusting numa topology to resemble at least %d nodes",
int(cls.simulate_numa),
)
om = cls.omp_process_manager
while len(om.omp_places) < cls.simulate_numa:
new_omp_places = []
touched = False
for omp_place in om.omp_places:
if len(omp_place["mask"]) > 1:
touched = True
cpu_list = sorted(list(omp_place["mask"]))
new_omp_places.append(
{
"mask": set(cpu_list[0 : int(len(cpu_list) / 2)]),
"available": True,
}
)
new_omp_places.append(
{
"mask": set(cpu_list[int(len(cpu_list) / 2) :]),
"available": True,
}
)
if touched:
om.omp_places = new_omp_places
else:
raise ValueError(
"Cannot split the existing NUMA topology to match "
"simulation requirements"
)
# Init LogicalCPUInfo from lscpu
lscpu_output = subprocess.check_output(
"lscpu -J -e=CPU,CORE,NODE", shell=True, text=True
return cls.omp_process_manager
@classmethod
def get_global_cpu_mask(cls) -> set[int]:
# get global cpu mask
if cls.global_cpu_mask is None:
cls.global_cpu_mask = os.sched_getaffinity(0)
return cls.global_cpu_mask
@classmethod
def reserve_cpus(cls, reserve: set[int]) -> bool:
# remove CPUs from global mask, for now there is no "release" mechanism
if cls.omp_process_manager is not None:
for place in cls.omp_process_manager.omp_places:
if not place["available"]:
return False
cls.global_cpu_mask = cls.get_global_cpu_mask() - reserve
# reinitialize OMP resource management
cls.omp_process_manager = OMPProcessManager(
affinity=cls.global_cpu_mask, smt=cls.smt
)
lscpu_output = re.sub(r'"node":\s*-\s*(,|\n)', r'"node": 0\1', lscpu_output)
logical_cpu_list: list[LogicalCPUInfo] = json.loads(
lscpu_output, object_hook=LogicalCPUInfo.json_decoder
)["cpus"]
# Filter CPUs with invalid attributes
logical_cpu_list = [
x
for x in logical_cpu_list
if -1 not in (x.id, x.physical_core, x.numa_node)
]
# Filter allowed CPUs
if hasattr(os, "sched_getaffinity"):
allowed_cpu_id_list = os.sched_getaffinity(0)
else:
raise NotImplementedError("Unsupported OS")
logical_cpu_list = [x for x in logical_cpu_list if x.id in allowed_cpu_id_list]
# Get allowed NUMA nodes
allowed_numa_nodes = set()
for x in logical_cpu_list:
allowed_numa_nodes.add(x.numa_node) # type: ignore
allowed_numa_nodes_list = sorted(allowed_numa_nodes)
env_key = CpuPlatform.device_control_env_var
if env_key in os.environ and os.environ[env_key] != "":
visible_nodes = [int(s) for s in os.environ[env_key].split(",")]
allowed_numa_nodes_list = [
x for x in sorted(list(set(visible_nodes))) if x in allowed_numa_nodes
]
return allowed_numa_nodes_list, logical_cpu_list
return True
@classmethod
def discover_numa_topology(cls) -> list[list[int]]:

View File

@@ -0,0 +1,174 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""OMP Aware Multiprocessing manager for running multiprocessing.Process()
Copyright (c) 2026 Red Hat Inc
Copyright (c) 2026 Cambridge Greys Ltd
"""
import json
import os
import subprocess
def _int(arg):
"""Relaxed parsing of ints which handles a - instead of a number.
The lscpu json may contain that for nodes in some cases. If that
is the case we parse it to zero
"""
try:
if int(arg) >= 0:
return int(arg)
except ValueError:
pass
return 0
def parse_mask(mask):
    """Expand an "X-Y,Z"-style CPU list string into a set of CPU ids.

    Raises IndexError for reversed ranges (e.g. "5-3"); a plain token
    that is not a valid integer raises ValueError.
    """
    cpus = set()
    for token in mask.split(","):
        pieces = token.split("-")
        if len(pieces) == 2 and pieces[0].isdigit() and pieces[1].isdigit():
            low, high = int(pieces[0]), int(pieces[1])
            if low > high:
                raise IndexError("Invalid Indexes for cpu ranges")
            cpus.update(range(low, high + 1))
        else:
            # Single CPU id (including negative values, which the
            # range branch above deliberately does not match).
            cpus.add(int(token))
    return cpus
def enumerate_resources(resource_map, mask=None, allowed=None):
    """Enumerate system resources.

    Groups the lscpu "cpus" entries by cpu id, physical core and NUMA
    node, keeping only CPUs present in the affinity set (optionally
    intersected with ``mask``) and, when CPU_VISIBLE_MEMORY_NODES is
    set, only CPUs on the listed memory nodes.
    """
    if allowed is None:
        allowed = os.sched_getaffinity(0)
    if mask is not None:
        allowed = allowed & mask
    try:
        allowed_nodes = parse_mask(os.environ["CPU_VISIBLE_MEMORY_NODES"])
    except KeyError:
        # No node restriction requested.
        allowed_nodes = None

    lscpu: dict[str, dict] = {"cpus": {}, "cores": {}, "nodes": {}}
    for cpu in resource_map["cpus"]:
        cpunum = int(cpu["cpu"])
        if cpunum < 0 or cpunum not in allowed:
            continue
        if allowed_nodes is not None and _int(cpu["node"]) not in allowed_nodes:
            continue
        lscpu["cpus"][cpunum] = [cpu]
        lscpu["cores"].setdefault(_int(cpu["core"]), []).append(cpu)
        lscpu["nodes"].setdefault(_int(cpu["node"]), []).append(cpu)
    return lscpu
def produce_cpu_list(cpus, smt=1):
    """Produce a CPU list with/without SMT pairs - main cpu list case.

    Keeps at most ``smt`` logical CPUs per physical core from the
    ``cpus`` mapping (cpu id -> [lscpu entry]).

    Fix: the original implementation compared already-selected *CPU
    ids* against each candidate's *core id* (`cpu == value[0]["core"]`),
    so the SMT cap only worked when cpu ids happened to coincide with
    core ids (and never when lscpu reports core ids as strings).  We
    now count selections per core, matching produce_cpu_sublist's
    core-to-core comparison.
    """
    mask: list[int] = []
    picked_per_core: dict = {}
    for key, value in cpus.items():
        core = value[0]["core"]
        taken = picked_per_core.get(core, 0)
        if taken < smt:
            picked_per_core[core] = taken + 1
            mask.append(int(key))
    return {"mask": set(mask), "available": True}
def produce_cpu_sublist(scpus, smt=1):
    """Produce a CPU list with/without SMT pairs - resource leaf case.

    Filters the lscpu entries in ``scpus`` so that a CPU is skipped
    when another CPU of the same core was already picked and smt <= 1
    (the duplicate check stops at the first sibling found, so it can
    contribute at most 1 to the count).
    """
    chosen: list[dict] = []
    for candidate in scpus:
        sibling_found = 1 if any(
            int(prev["core"]) == int(candidate["core"]) for prev in chosen
        ) else 0
        if sibling_found < smt:
            chosen.append(candidate)
    return {"mask": {int(entry["cpu"]) for entry in chosen}, "available": True}
def create_omp_places(resources, strategy, smt=True):
    """Parse CPU topology and generate possible CPU masks.

    ``strategy`` selects the grouping: "all" produces a single place
    covering every CPU, "cores"/"nodes" produce one place per physical
    core / NUMA node respectively.
    """
    if strategy == "all":
        return [produce_cpu_list(resources["cpus"], smt)]
    if strategy == "cores":
        return [
            produce_cpu_sublist(group, smt)
            for group in resources["cores"].values()
        ]
    if strategy == "nodes":
        return [
            produce_cpu_sublist(group, smt)
            for group in resources["nodes"].values()
        ]
    raise NotImplementedError("Unknown strategy")
# pylint: disable=too-few-public-methods
class OMPProcessManager:
    """OMP aware wrapper to run mp Process()"""

    def __init__(self, strategy="nodes", smt=1, mock=None, affinity=None):
        """Discover the CPU topology and pre-compute OMP places.

        Topology comes from ``lscpu -Je`` (or from the JSON file named
        by ``mock`` for testing).  Setting VLLM_CPU_OMP_THREADS_BIND to
        "nobind" disables all OMP environment management; otherwise its
        "|"-separated CPU masks restrict which CPUs are considered.
        """
        self.strategy = strategy
        self.smt = smt
        self.omp_places = []
        bind_spec = os.environ.get("VLLM_CPU_OMP_THREADS_BIND", None)
        self.setup_omp = bind_spec != "nobind"
        if not self.setup_omp:
            return
        if bind_spec is not None:
            masks = [parse_mask(spec) for spec in bind_spec.split("|")]
        else:
            masks = [None]
        if mock is None:
            data = subprocess.run(
                ["lscpu", "-Je"], check=True, capture_output=True
            ).stdout
        else:
            with open(mock, mode="rb") as handle:
                data = handle.read()
        topology = json.loads(data)
        places = []
        for cpu_mask in masks:
            grouped = enumerate_resources(topology, cpu_mask, affinity)
            places.extend(create_omp_places(grouped, strategy, smt))
        # Largest places first, ties broken by highest CPU id.  The
        # tuple key matches the original zero-padded string sort for
        # any realistic CPU count (< 10000).
        self.omp_places = sorted(
            places,
            key=lambda place: (len(place["mask"]), max(place["mask"])),
            reverse=True,
        )

    def run(self, what, *args, **kwargs):
        """Run arg with correct OMP environment"""
        if not self.setup_omp:
            return what(*args, **kwargs)
        # Claim the first still-available place, export the OMP binding
        # environment for the child, and invoke the callable.
        for place in self.omp_places:
            if not place["available"]:
                continue
            reserve = int(os.environ.get("VLLM_CPU_NUM_OF_RESERVED_CPU", 0))
            place["available"] = False
            # pylint: disable=consider-using-f-string
            os.environ["OMP_PLACES"] = "{}".format(place["mask"])
            os.environ["OMP_NUM_THREADS"] = "{}".format(
                len(place["mask"]) - reserve
            )
            os.environ["OMP_PROC_BIND"] = "TRUE"
            return what(*args, **kwargs)
        raise IndexError("Out of OMP places")

View File

@@ -119,7 +119,6 @@ class MultiprocExecutor(Executor):
f"_parallel_size ({pcp_size}). "
)
# Set multiprocessing envs
set_multiprocessing_worker_envs()
# use the loopback address get_loopback_ip() for communication.
@@ -172,16 +171,31 @@ class MultiprocExecutor(Executor):
for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank)
unready_worker_handle = WorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
distributed_init_method=distributed_init_method,
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
inherited_fds=inherited_fds,
)
if current_platform.is_cpu():
om = current_platform.get_omp_manager()
logger.info("Configured OMP PLACES %s", str(om.omp_places))
unready_worker_handle = om.run(
WorkerProc.make_worker_process,
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
distributed_init_method=distributed_init_method,
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
inherited_fds=inherited_fds,
)
else:
unready_worker_handle = WorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
distributed_init_method=distributed_init_method,
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
inherited_fds=inherited_fds,
)
unready_workers.append(unready_worker_handle)
if inherited_fds is not None:
inherited_fds.append(unready_worker_handle.death_writer.fileno())
@@ -1000,24 +1014,26 @@ def set_multiprocessing_worker_envs():
_maybe_force_spawn()
# Configure thread parallelism if OMP_NUM_THREADS isn't set
#
# Helps to avoid CPU contention. The default of spawning a thread per
# core combined with multiprocessing for each GPU can have a negative
# impact on performance. The contention is amplified when running in a
# container where CPU limits can cause throttling.
default_omp_num_threads = 1
if (
"OMP_NUM_THREADS" not in os.environ
and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
):
logger.warning_once(
"Reducing Torch parallelism from %d threads to %d to avoid "
"unnecessary CPU contention. Set OMP_NUM_THREADS in the "
"external environment to tune this value as needed.",
current_parallelism,
default_omp_num_threads,
scope="local",
)
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)
if not current_platform.is_cpu():
# Configure thread parallelism if OMP_NUM_THREADS isn't set
#
# Helps to avoid CPU contention. The default of spawning a thread per
# core combined with multiprocessing for each GPU can have a negative
# impact on performance. The contention is amplified when running in a
# container where CPU limits can cause throttling.
default_omp_num_threads = 1
if (
"OMP_NUM_THREADS" not in os.environ
and (current_parallelism := torch.get_num_threads())
> default_omp_num_threads
):
logger.warning_once(
"Reducing Torch parallelism from %d threads to %d to avoid "
"unnecessary CPU contention. Set OMP_NUM_THREADS in the "
"external environment to tune this value as needed.",
current_parallelism,
default_omp_num_threads,
scope="local",
)
os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
torch.set_num_threads(default_omp_num_threads)

View File

@@ -1,18 +1,14 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import platform
import sys
from collections.abc import Callable
from typing import Any
import torch
from vllm import envs
from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.platforms import CpuArchEnum, current_platform
from vllm.platforms.cpu import CpuPlatform, LogicalCPUInfo
from vllm.profiler.wrapper import TorchProfilerWrapper
from vllm.utils.torch_utils import set_random_seed
from vllm.v1.worker.cpu_model_runner import CPUModelRunner
@@ -71,44 +67,6 @@ class CPUWorker(Worker):
if current_platform.get_cpu_architecture() == CpuArchEnum.X86:
check_preloaded_libs("libiomp")
# Setup OpenMP threads affinity.
omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
# Under numa binding some cores reserved for kv transfer in nixl_connector.py
if omp_cpuids == "auto" and platform.system() == "Linux":
cpu_arch = current_platform.get_cpu_architecture()
if cpu_arch in (CpuArchEnum.POWERPC, CpuArchEnum.S390X):
# For S390X/POWERPC SMT-8/4/2
self.local_omp_cpuid = self._get_autobind_cpu_ids(
lambda cpus: [cpu for cpu in cpus if cpu.id % 8 < 4]
)
elif cpu_arch == CpuArchEnum.X86:
# For x86 SMT-2, use 1 CPU per core
self.local_omp_cpuid = self._get_autobind_cpu_ids(
lambda cpus: cpus[-1:]
)
elif cpu_arch == CpuArchEnum.ARM:
# For AArch64, no SMT
self.local_omp_cpuid = self._get_autobind_cpu_ids(lambda cpus: cpus)
else:
self.local_omp_cpuid = "nobind"
elif omp_cpuids == "nobind":
self.local_omp_cpuid = "nobind"
else:
local_dp_rank = self.parallel_config.data_parallel_rank_local
omp_cpuids_list = omp_cpuids.split("|")
if local_dp_rank is not None:
world_size = self.parallel_config.world_size
omp_cpuids_list = omp_cpuids_list[
local_dp_rank * world_size : (local_dp_rank + 1) * world_size
]
self.local_omp_cpuid = omp_cpuids_list[self.rank]
if self.local_omp_cpuid != "nobind":
ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)
if ret:
logger.info(ret)
# After the thread binding, changing thread num is not allowed
def skip_set_num_threads(x: int):
logger.warning(
"CPU backend doesn't allow to use "
@@ -153,92 +111,6 @@ class CPUWorker(Worker):
self.model_runner.warming_up_model()
return self.compilation_config.compilation_time
def _get_autobind_cpu_ids(
    self, cpu_selector: Callable[[list[LogicalCPUInfo]], list[LogicalCPUInfo]]
) -> str:
    """
    Return CPU ids to bind based on NUMA nodes.

    Currently for rank N, only CPU ids on the N-th node in available NUMA
    node list will be selected.

    Args:
        cpu_selector: a callable object to select CPUs from a CPU list
        of a physical core. The input is a LogicalCPUInfo list, sorted by
        the LogicalCPUInfo.id. A selected LogicalCPUInfo list should be
        returned.

    Returns:
        A comma-separated string of the selected CPU ids.
    """
    # simulate multiple numa nodes, for testing
    sim_multi_numa_nodes = os.environ.get("VLLM_CPU_SIM_MULTI_NUMA", "0") != "0"
    allowed_numa_nodes, logical_cpu_list = (
        CpuPlatform.get_allowed_cpu_core_node_list()
    )
    local_world_size = self.parallel_config.local_world_size
    # Without simulation there must be at least one real NUMA node per
    # local worker, otherwise manual binding is required.
    assert len(allowed_numa_nodes) >= local_world_size or sim_multi_numa_nodes, (
        f"Not enough allowed NUMA nodes to bind threads of "
        f"{local_world_size} local CPUWorkers. "
        f"Allowed NUMA nodes are {allowed_numa_nodes}. "
        "Please try to bind threads manually."
    )
    if not sim_multi_numa_nodes:
        # Get CPUs on NUMA node `allowed_numa_nodes[local_rank]`
        selected_numa_node = allowed_numa_nodes[self.local_rank]  # type: ignore
        logical_cpu_list = [
            x for x in logical_cpu_list if x.numa_node == selected_numa_node
        ]
    else:
        # This is a bit tricky because the internal DP size
        # is always 1 for non-MoE models
        world_size_across_dp = (
            self.parallel_config.world_size
            * self.parallel_config._api_process_count
        )
        assert len(logical_cpu_list) >= world_size_across_dp
        # Carve the CPU list into equal contiguous slices (ordered by
        # NUMA node) and take the slice for this rank's global position.
        logical_cpu_list = sorted(logical_cpu_list, key=lambda x: x.numa_node)
        sim_cpu_num_per_node = len(logical_cpu_list) // world_size_across_dp
        assert self.parallel_config.data_parallel_rank_local is not None
        start_idx = (
            self.local_rank
            + self.parallel_config.world_size
            * self.parallel_config.data_parallel_rank_local
        ) * sim_cpu_num_per_node
        logical_cpu_list = logical_cpu_list[
            start_idx : (start_idx + sim_cpu_num_per_node)
        ]

    # Select CPUs from each physical core via cpu_selector
    core_to_cpus: dict[int, list[LogicalCPUInfo]] = {}
    for cpu_info in logical_cpu_list:
        if cpu_info.physical_core not in core_to_cpus:
            core_to_cpus[cpu_info.physical_core] = []
        core_to_cpus[cpu_info.physical_core].append(cpu_info)
    logical_cpu_list = []
    for cpu_list in core_to_cpus.values():
        # cpu_selector receives each core's CPUs sorted by id.
        cpu_list = sorted(cpu_list, key=lambda x: x.id)
        logical_cpu_list.extend(cpu_selector(cpu_list))
    logical_cpu_list = sorted(logical_cpu_list, key=lambda x: x.id)

    # Reserve CPUs for other processes
    reserve_cpu_num = envs.VLLM_CPU_NUM_OF_RESERVED_CPU
    if reserve_cpu_num is None:
        # Default: reserve one CPU when multiple workers share the host.
        need_reserve = (
            self.parallel_config.world_size > 1
            or self.parallel_config.data_parallel_size_local > 1
        )
        reserve_cpu_num = 1 if need_reserve else 0
    assert len(logical_cpu_list) > reserve_cpu_num, (
        f"VLLM_CPU_NUM_OF_RESERVED_CPU ({reserve_cpu_num}) "
        f"should less than {len(logical_cpu_list)}."
    )
    if reserve_cpu_num != 0:
        # Drop the highest-numbered CPUs; they stay free for others.
        logical_cpu_list = logical_cpu_list[:-reserve_cpu_num]

    logger.info(
        "auto thread-binding list (id, physical core): %s",
        [(x.id, x.physical_core) for x in logical_cpu_list],
    )
    return ",".join([str(x.id) for x in logical_cpu_list])
def profile(self, is_start: bool = True, profile_prefix: str | None = None):
if self.profiler is None:
raise RuntimeError("Profiler is not enabled.")