diff --git a/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh b/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh
index d90540316..4ff15067a 100644
--- a/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh
+++ b/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh
@@ -23,22 +23,22 @@ if [ "$failed_req" -ne 0 ]; then
   exit 1
 fi

-echo "--- DP+TP"
-vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -dp=2 --max-model-len=4096 &
-server_pid=$!
-timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1
-vllm bench serve \
-  --backend vllm \
-  --dataset-name random \
-  --model meta-llama/Llama-3.2-3B-Instruct \
-  --num-prompts 20 \
-  --result-dir ./test_results \
-  --result-filename dp_pp.json \
-  --save-result \
-  --endpoint /v1/completions
-kill -s SIGTERM $server_pid; wait $server_pid || true
-failed_req=$(jq '.failed' ./test_results/dp_pp.json)
-if [ "$failed_req" -ne 0 ]; then
-  echo "Some requests were failed!"
-  exit 1
-fi
+#echo "--- DP+TP"
+#vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -dp=2 --max-model-len=4096 &
+#server_pid=$!
+#timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1
+#vllm bench serve \
+#  --backend vllm \
+#  --dataset-name random \
+#  --model meta-llama/Llama-3.2-3B-Instruct \
+#  --num-prompts 20 \
+#  --result-dir ./test_results \
+#  --result-filename dp_pp.json \
+#  --save-result \
+#  --endpoint /v1/completions
+#kill -s SIGTERM $server_pid; wait $server_pid || true
+#failed_req=$(jq '.failed' ./test_results/dp_pp.json)
+#if [ "$failed_req" -ne 0 ]; then
+#  echo "Some requests were failed!"
+#  exit 1
+#fi
diff --git a/csrc/cpu/torch_bindings.cpp b/csrc/cpu/torch_bindings.cpp
index a1d7d361d..fbc9c6524 100644
--- a/csrc/cpu/torch_bindings.cpp
+++ b/csrc/cpu/torch_bindings.cpp
@@ -8,8 +8,6 @@
 // libraries use different ISAs.
 #define TORCH_EXTENSION_NAME _C

-std::string init_cpu_threads_env(const std::string& cpu_ids);
-
 void release_dnnl_matmul_handler(int64_t handler);

 int64_t create_onednn_scaled_mm_handler(const torch::Tensor& b,
@@ -354,7 +352,6 @@ TORCH_LIBRARY_EXPAND(TORCH_EXTENSION_NAME, ops) {
       "str act, str isa) -> ()");
   ops.impl("cpu_fused_moe", torch::kCPU, &cpu_fused_moe);
 #endif
-  ops.def("init_cpu_threads_env(str cpu_ids) -> str", &init_cpu_threads_env);
   ops.def(
       "mla_decode_kvcache("
      "   Tensor! out, Tensor query, Tensor kv_cache,"
diff --git a/csrc/cpu/utils.cpp b/csrc/cpu/utils.cpp
index 3c133a0c5..55d0a3d17 100644
--- a/csrc/cpu/utils.cpp
+++ b/csrc/cpu/utils.cpp
@@ -21,150 +21,6 @@ std::string init_cpu_threads_env(const std::string& cpu_ids) {

 #endif

-#ifndef VLLM_NUMA_DISABLED
-std::string init_cpu_threads_env(const std::string& cpu_ids) {
-  bitmask* omp_cpu_mask = numa_parse_cpustring_all(cpu_ids.c_str());
-  TORCH_CHECK(omp_cpu_mask != nullptr,
-              "Failed to parse CPU string: " + cpu_ids);
-  TORCH_CHECK(omp_cpu_mask->size > 0);
-  std::vector<int> omp_cpu_ids;
-  omp_cpu_ids.reserve(omp_cpu_mask->size);
-
-  constexpr int group_size = 8 * sizeof(*omp_cpu_mask->maskp);
-
-  for (int offset = 0; offset < omp_cpu_mask->size; offset += group_size) {
-    unsigned long group_mask = omp_cpu_mask->maskp[offset / group_size];
-    int i = 0;
-    while (group_mask) {
-      if (group_mask & 1) {
-        omp_cpu_ids.emplace_back(offset + i);
-      }
-      ++i;
-      group_mask >>= 1;
-    }
-  }
-
-  // Memory node binding
-  if (numa_available() != -1) {
-    std::set<int> node_ids;
-    for (const auto& cpu_id : omp_cpu_ids) {
-      int node_id = numa_node_of_cpu(cpu_id);
-      if (node_id != -1) {
-        node_ids.insert(node_id);
-      }
-    }
-    // Concatenate all node_ids into a single comma-separated string
-    if (!node_ids.empty()) {
-      std::string node_ids_str;
-      for (const int node_id : node_ids) {
-        if (!node_ids_str.empty()) {
-          node_ids_str += ",";
-        }
-        node_ids_str += std::to_string(node_id);
-      }
-
-      bitmask* mask = numa_parse_nodestring(node_ids_str.c_str());
-      bitmask* src_mask = numa_get_mems_allowed();
-
-      int pid = getpid();
-
-      if (mask && src_mask) {
-        // move all existing pages to the specified numa node.
-        *(src_mask->maskp) = *(src_mask->maskp) ^ *(mask->maskp);
-        int page_num = numa_migrate_pages(pid, src_mask, mask);
-        if (page_num == -1) {
-          TORCH_WARN("numa_migrate_pages failed. errno: " +
-                     std::to_string(errno));
-        }
-
-        // Restrict memory allocation to the selected NUMA node(s).
-        // Enhances memory locality for the threads bound to those NUMA CPUs.
-        if (node_ids.size() > 1) {
-          errno = 0;
-          numa_set_interleave_mask(mask);
-          if (errno != 0) {
-            TORCH_WARN("numa_set_interleave_mask failed. errno: " +
-                       std::to_string(errno));
-          } else {
-            TORCH_WARN(
-                "NUMA binding: Using INTERLEAVE policy for memory "
-                "allocation across multiple NUMA nodes (nodes: " +
-                node_ids_str +
-                "). Memory allocations will be "
-                "interleaved across the specified NUMA nodes.");
-          }
-        } else {
-          errno = 0;
-          numa_set_membind(mask);
-          if (errno != 0) {
-            TORCH_WARN("numa_set_membind failed. errno: " +
-                       std::to_string(errno));
-          } else {
-            TORCH_WARN(
-                "NUMA binding: Using MEMBIND policy for memory "
-                "allocation on the NUMA nodes (" +
-                node_ids_str +
-                "). Memory allocations will be "
-                "strictly bound to these NUMA nodes.");
-          }
-        }
-
-        numa_set_strict(1);
-
-        numa_free_nodemask(mask);
-        numa_free_nodemask(src_mask);
-      } else {
-        TORCH_WARN(
-            "numa_parse_nodestring or numa_get_run_node_mask failed. errno: " +
-            std::to_string(errno));
-      }
-    }
-  }
-
-  // OMP threads binding
-  omp_set_num_threads((int)omp_cpu_ids.size());
-  torch::set_num_threads((int)omp_cpu_ids.size());
-  TORCH_CHECK_EQ(omp_cpu_ids.size(), torch::get_num_threads());
-  TORCH_CHECK_EQ(omp_cpu_ids.size(), omp_get_max_threads());
-
-  std::vector<std::pair<int, int>> thread_core_mapping;
-  thread_core_mapping.reserve(omp_cpu_ids.size());
-  omp_lock_t writelock;
-  omp_init_lock(&writelock);
-
-  #pragma omp parallel for schedule(static, 1)
-  for (size_t i = 0; i < omp_cpu_ids.size(); ++i) {
-    cpu_set_t mask;
-    CPU_ZERO(&mask);
-    CPU_SET(omp_cpu_ids[i], &mask);
-    int ret = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
-    if (ret == -1) {
-      TORCH_CHECK(false,
-                  "sched_setaffinity failed. errno: " + std::to_string(errno));
-    }
-
-    omp_set_lock(&writelock);
-    thread_core_mapping.emplace_back(gettid(), omp_cpu_ids[i]);
-    omp_unset_lock(&writelock);
-  }
-
-  omp_destroy_lock(&writelock);
-
-  numa_free_nodemask(omp_cpu_mask);
-
-  std::stringstream ss;
-  ss << "OMP threads binding of Process " << getpid() << ":\n";
-  std::sort(thread_core_mapping.begin(), thread_core_mapping.end(),
-            [](auto&& a, auto&& b) { return a.second < b.second; });
-  for (auto&& item : thread_core_mapping) {
-    ss << "\t"
-       << "OMP tid: " << item.first << ", core " << item.second << "\n";
-  }
-
-  return ss.str();
-}
-#endif  // VLLM_NUMA_DISABLED
-
 namespace cpu_utils {
 ScratchPadManager::ScratchPadManager() : size_(0), ptr_(nullptr) {
   this->realloc(allocation_unit * 128);
diff --git a/vllm/platforms/cpu.py b/vllm/platforms/cpu.py
index ff6b22e55..272714486 100644
--- a/vllm/platforms/cpu.py
+++ b/vllm/platforms/cpu.py
@@ -2,7 +2,6 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project

 import glob
-import json
 import os
 import platform
 import subprocess
@@ -11,11 +10,11 @@ from dataclasses import dataclass
 from typing import TYPE_CHECKING

 import psutil
-import regex as re
 import torch

 from vllm import envs
 from vllm.logger import init_logger
+from vllm.utils.ompmultiprocessing import OMPProcessManager
 from vllm.utils.torch_utils import is_quantized_kv_cache
 from vllm.v1.attention.backends.registry import AttentionBackendEnum

@@ -76,6 +75,10 @@ class CpuPlatform(Platform):
     dispatch_key: str = "CPU"
     dist_backend: str = "gloo"
     device_control_env_var = "CPU_VISIBLE_MEMORY_NODES"
+    omp_process_manager = None
+    smt = 1  # SMT level for OMP - 4 threads on PowerPC, 1 on others
+    global_cpu_mask = None
+    simulate_numa = int(os.environ.get("_SIM_MULTI_NUMA", 0))

     @property
     def supported_dtypes(self) -> list[torch.dtype]:
@@ -191,26 +194,10 @@ class CpuPlatform(Platform):
             cache_config.cpu_kvcache_space_bytes = CpuPlatform.get_device_total_memory()

-        # reserve at least one core for nixl_connector under p/d case
-        if vllm_config.kv_transfer_config and (
-            envs.VLLM_CPU_NUM_OF_RESERVED_CPU == 0
-            or envs.VLLM_CPU_NUM_OF_RESERVED_CPU is None
-        ):
-            os.environ["VLLM_CPU_NUM_OF_RESERVED_CPU"] = "1"
-
         parallel_config = vllm_config.parallel_config
-        if (
-            parallel_config.world_size > 1
-            and parallel_config.distributed_executor_backend is not None
-            and parallel_config.distributed_executor_backend != "mp"
-        ):
-            logger.warning(
-                (
-                    "%s is not supported on CPU, fallback to mp "
-                    "distributed executor backend."
-                ),
-                parallel_config.distributed_executor_backend,
-            )
+        # OMP requires the MP executor to function correctly; uniproc is not
+        # supported, as the OMP environment cannot be set up correctly for it.
+        if parallel_config.distributed_executor_backend == "uni":
             parallel_config.distributed_executor_backend = "mp"
         if parallel_config.worker_cls == "auto":
             parallel_config.worker_cls = "vllm.v1.worker.cpu_worker.CPUWorker"

@@ -267,14 +254,6 @@ class CpuPlatform(Platform):
         # variable "NUMEXPR_MAX_THREADS" (64)'.
         os.environ["NUMEXPR_MAX_THREADS"] = str(get_max_threads())

-        if envs.VLLM_CPU_OMP_THREADS_BIND != "nobind":
-            # Set default threads num for OpenMP parallel
-            os.environ["OMP_NUM_THREADS"] = str(torch.get_num_threads())
-        else:
-            # In this case, setting the OpenMP configuration via
-            # OMP_NUM_THREADS is up to the user.
-            logger.info("Disabling binding processes to CPU cores...")
-
         # Disable torch async compiling which won't work with daemonic processes
         os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"

@@ -286,8 +265,8 @@ class CpuPlatform(Platform):

         ld_preload_str = os.getenv("LD_PRELOAD", "")

-        # Intel OpenMP setting
-        if "libiomp5.so" in ld_preload_str:
+        # Intel and Clang (LLVM) OpenMP settings
+        if "libiomp5.so" in ld_preload_str or "libomp" in ld_preload_str:
             # The time(milliseconds) that a thread should wait after
             # completing the execution of a parallel region, before sleeping.
             os.environ["KMP_BLOCKTIME"] = "1"
@@ -324,37 +303,6 @@ class CpuPlatform(Platform):
             ld_preload_str = tcmalloc_so
             os.environ["LD_PRELOAD"] = ld_preload_str

-        if (
-            platform.system() == "Linux"
-            and cpu_architecture in (CpuArchEnum.ARM, CpuArchEnum.POWERPC)
-            and not ("libomp" in ld_preload_str or "libgomp" in ld_preload_str)
-        ):
-            # We need to LD_PRELOAD PyTorch's libgomp, otherwise only
-            # one core will be properly utilized when we thread-bind
-            # See: https://github.com/vllm-project/vllm/issues/27369
-            # TODO: Remove once:
-            # https://github.com/pytorch/pytorch/issues/166087 is fixed
-
-            # We need to find the location of PyTorch's libgomp
-            torch_pkg = os.path.dirname(torch.__file__)
-            site_root = os.path.dirname(torch_pkg)
-            # Search both torch.libs and torch/lib - See: https://github.com/vllm-project/vllm/issues/30470
-            torch_libs_paths = [
-                os.path.join(site_root, "torch.libs"),
-                os.path.join(torch_pkg, "lib"),
-            ]
-            pytorch_libgomp_so_candidates = []
-            for torch_libs in torch_libs_paths:
-                pytorch_libgomp_so_candidates.extend(
-                    glob.glob(os.path.join(torch_libs, "libgomp*.so*"))
-                )
-            if pytorch_libgomp_so_candidates:
-                pytorch_libgomp_so = pytorch_libgomp_so_candidates[0]
-                if ld_preload_str:
-                    ld_preload_str += ":"
-                ld_preload_str += pytorch_libgomp_so
-                os.environ["LD_PRELOAD"] = ld_preload_str
-
         os.environ["LOCAL_WORLD_SIZE"] = str(
             vllm_config.parallel_config.tensor_parallel_size
         )
@@ -369,6 +317,13 @@ class CpuPlatform(Platform):
             vllm_config.model_config.max_model_len,
             vllm_config.scheduler_config.DEFAULT_MAX_NUM_BATCHED_TOKENS,
         )
+        # CI-specific "quick" NUMA simulation: split all available CPUs
+        # into a fake NUMA topology.
+        if os.environ.get("VLLM_CPU_SIM_MULTI_NUMA", None) is not None:
+            os.environ["_SIM_MULTI_NUMA"] = str(
+                vllm_config.parallel_config.world_size
+                * vllm_config.parallel_config._api_process_count
+            )

     @classmethod
     def update_block_size_for_backend(cls, vllm_config: "VllmConfig") -> None:
@@ -377,46 +332,71 @@ class CpuPlatform(Platform):
         pass

     @classmethod
-    def get_allowed_cpu_core_node_list(cls) -> tuple[list[int], list[LogicalCPUInfo]]:
-        assert platform.system() == "Linux"
+    def get_omp_manager(cls) -> OMPProcessManager:
+        # initialise the OMP resource management if need be and return the manager
+        if cls.omp_process_manager is None:
+            if cls.get_cpu_architecture() == CpuArchEnum.POWERPC:
+                cls.smt = 4
+            cls.omp_process_manager = OMPProcessManager(
+                affinity=cls.get_global_cpu_mask(), smt=cls.smt
+            )
+            # we need to fix up the topology returned by the OMP manager for
+            # simulated NUMA environments in CI
+            if cls.simulate_numa > 0:
+                logger.info(
+                    "Adjusting NUMA topology to resemble at least %d nodes",
+                    int(cls.simulate_numa),
+                )
+                om = cls.omp_process_manager
+                while len(om.omp_places) < cls.simulate_numa:
+                    new_omp_places = []
+                    touched = False
+                    for omp_place in om.omp_places:
+                        if len(omp_place["mask"]) > 1:
+                            touched = True
+                            cpu_list = sorted(omp_place["mask"])
+                            half = len(cpu_list) // 2
+                            new_omp_places.append(
+                                {"mask": set(cpu_list[:half]), "available": True}
+                            )
+                            new_omp_places.append(
+                                {"mask": set(cpu_list[half:]), "available": True}
+                            )
+                        else:
+                            # keep single-CPU places rather than dropping them
+                            new_omp_places.append(omp_place)
+                    if touched:
+                        om.omp_places = new_omp_places
+                    else:
+                        raise ValueError(
+                            "Cannot split the existing NUMA topology to match "
+                            "simulation requirements"
+                        )

-        # Init LogicalCPUInfo from lscpu
-        lscpu_output = subprocess.check_output(
-            "lscpu -J -e=CPU,CORE,NODE", shell=True, text=True
+        return cls.omp_process_manager
+
+    @classmethod
+    def get_global_cpu_mask(cls) -> set[int]:
+        # get the global CPU mask
+        if cls.global_cpu_mask is None:
+            cls.global_cpu_mask = os.sched_getaffinity(0)
+        return cls.global_cpu_mask
+
+    @classmethod
+    def reserve_cpus(cls, reserve: set[int]) -> bool:
+        # remove CPUs from the global mask; for now there is no "release" mechanism
+        if cls.omp_process_manager is not None:
+            for place in cls.omp_process_manager.omp_places:
+                if not place["available"]:
+                    return False
+            cls.global_cpu_mask = cls.get_global_cpu_mask() - reserve
+            # reinitialize OMP resource management
+            cls.omp_process_manager = OMPProcessManager(
+                affinity=cls.global_cpu_mask, smt=cls.smt
             )
-        lscpu_output = re.sub(r'"node":\s*-\s*(,|\n)', r'"node": 0\1', lscpu_output)
-        logical_cpu_list: list[LogicalCPUInfo] = json.loads(
-            lscpu_output, object_hook=LogicalCPUInfo.json_decoder
-        )["cpus"]
-
-        # Filter CPUs with invalid attributes
-        logical_cpu_list = [
-            x
-            for x in logical_cpu_list
-            if -1 not in (x.id, x.physical_core, x.numa_node)
-        ]
-
-        # Filter allowed CPUs
-        if hasattr(os, "sched_getaffinity"):
-            allowed_cpu_id_list = os.sched_getaffinity(0)
-        else:
-            raise NotImplementedError("Unsupported OS")
-        logical_cpu_list = [x for x in logical_cpu_list if x.id in allowed_cpu_id_list]
-
-        # Get allowed NUMA nodes
-        allowed_numa_nodes = set()
-        for x in logical_cpu_list:
-            allowed_numa_nodes.add(x.numa_node)  # type: ignore
-        allowed_numa_nodes_list = sorted(allowed_numa_nodes)
-
-        env_key = CpuPlatform.device_control_env_var
-        if env_key in os.environ and os.environ[env_key] != "":
-            visible_nodes = [int(s) for s in os.environ[env_key].split(",")]
-            allowed_numa_nodes_list = [
-                x for x in sorted(list(set(visible_nodes))) if x in allowed_numa_nodes
-            ]
-
-        return allowed_numa_nodes_list, logical_cpu_list
+        return True

     @classmethod
     def discover_numa_topology(cls) -> list[list[int]]:
diff --git a/vllm/utils/ompmultiprocessing.py b/vllm/utils/ompmultiprocessing.py
new file mode 100644
index 000000000..f2273e7e3
--- /dev/null
+++ b/vllm/utils/ompmultiprocessing.py
@@ -0,0 +1,174 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""OMP-aware multiprocessing manager for running multiprocessing.Process()
+Copyright (c) 2026 Red Hat Inc
+Copyright (c) 2026 Cambridge Greys Ltd
+"""
+
+import json
+import os
+import subprocess
+
+
+def _int(arg):
+    """Relaxed parsing of ints which handles a - instead of a number.
+    The lscpu JSON may contain that for nodes in some cases. If that
+    is the case we parse it to zero.
+    """
+    try:
+        if int(arg) >= 0:
+            return int(arg)
+    except ValueError:
+        pass
+    return 0
+
+
+def parse_mask(mask):
+    """Expand an X-Y,Z CPU list specification into a set of CPU ids"""
+    result = []
+    for token in mask.split(","):
+        try:
+            start, finish = token.split("-")
+            if int(start) > int(finish):
+                raise IndexError("Invalid indexes for CPU ranges")
+            for cpu in range(int(start), int(finish) + 1):
+                result.append(cpu)
+        except ValueError:
+            result.append(int(token))
+    return set(result)
+
+
+def enumerate_resources(resource_map, mask=None, allowed=None):
+    """Enumerate system resources"""
+    if allowed is None:
+        allowed = os.sched_getaffinity(0)
+    if mask is not None:
+        allowed = allowed & mask
+
+    try:
+        allowed_nodes = parse_mask(os.environ["CPU_VISIBLE_MEMORY_NODES"])
+    except KeyError:
+        allowed_nodes = None
+
+    lscpu: dict[str, dict] = {"cpus": {}, "cores": {}, "nodes": {}}
+    for cpu in resource_map["cpus"]:
+        cpunum = int(cpu["cpu"])
+        if (
+            cpunum in allowed
+            and cpunum >= 0
+            and (allowed_nodes is None or _int(cpu["node"]) in allowed_nodes)
+        ):
+            lscpu["cpus"][cpunum] = [cpu]
+            core = _int(cpu["core"])
+            if lscpu["cores"].get(core, None) is None:
+                lscpu["cores"][core] = [cpu]
+            else:
+                lscpu["cores"][core].append(cpu)
+            node = _int(cpu["node"])
+            if lscpu["nodes"].get(node, None) is None:
+                lscpu["nodes"][node] = [cpu]
+            else:
+                lscpu["nodes"][node].append(cpu)
+    return lscpu
+
+
+def produce_cpu_list(cpus, smt=1):
+    """Produce a CPU list with/without SMT pairs - main cpu list case"""
+    mask: list[int] = []
+    seen_cores: list[int] = []
+    for key, value in cpus.items():
+        # compare core ids, not CPU ids, so that at most `smt` logical
+        # CPUs are kept per physical core
+        core = _int(value[0]["core"])
+        if seen_cores.count(core) < smt:
+            seen_cores.append(core)
+            mask.append(int(key))
+    return {"mask": set(mask), "available": True}
+
+
+def produce_cpu_sublist(scpus, smt=1):
+    """Produce a CPU list with/without SMT pairs - resource leaf case"""
+    cpu_list: list[dict] = []
+    for value in scpus:
+        # count how many CPUs of this core have been kept already
+        exists = sum(
+            1 for cpu in cpu_list if int(cpu["core"]) == int(value["core"])
+        )
+        if exists < smt:
+            cpu_list.append(value)
+    mask = []
+    for cpu in cpu_list:
+        mask.append(int(cpu["cpu"]))
+
+    return {"mask": set(mask), "available": True}
+
+
+def create_omp_places(resources, strategy, smt=1):
+    """Parse CPU topology and generate possible CPU masks"""
+    omp_places = []
+    if strategy == "all":
+        omp_places.append(produce_cpu_list(resources["cpus"], smt))
+    elif strategy == "cores":
+        for value in resources["cores"].values():
+            omp_places.append(produce_cpu_sublist(value, smt))
+    elif strategy == "nodes":
+        for value in resources["nodes"].values():
+            omp_places.append(produce_cpu_sublist(value, smt))
+    else:
+        raise NotImplementedError("Unknown strategy")
+
+    return omp_places
+
+
+# pylint: disable=too-few-public-methods
+class OMPProcessManager:
+    """OMP-aware wrapper to run mp Process()"""
+
+    def __init__(self, strategy="nodes", smt=1, mock=None, affinity=None):
+        self.strategy = strategy
+        self.smt = smt
+        self.omp_places = []
+        vllm_mask = os.environ.get("VLLM_CPU_OMP_THREADS_BIND", None)
+        self.setup_omp = vllm_mask != "nobind"
+        if self.setup_omp:
+            omp_places = []
+            if vllm_mask is not None:
+                masks = []
+                for spec in vllm_mask.split("|"):
+                    masks.append(parse_mask(spec))
+            else:
+                masks = [None]
+            if mock is None:
+                data = subprocess.run(
+                    ["lscpu", "-Je"], check=True, capture_output=True
+                ).stdout
+            else:
+                with open(mock, mode="rb") as jf:
+                    data = jf.read()
+            lscpu = json.loads(data)
+            for mask in masks:
+                resources = enumerate_resources(lscpu, mask, affinity)
+                omp_places.extend(create_omp_places(resources, strategy, smt))
+            self.omp_places = sorted(
+                omp_places,
+                key=lambda p: (len(p["mask"]), max(p["mask"])),
+                reverse=True,
+            )
+
+    def run(self, what, *args, **kwargs):
+        """Run what(*args, **kwargs) with the correct OMP environment"""
+        if self.setup_omp:
+            for place in self.omp_places:
+                if place["available"]:
+                    reserve = int(os.environ.get("VLLM_CPU_NUM_OF_RESERVED_CPU", 0))
+                    place["available"] = False
+                    # emit the OMP place list in a deterministic order rather
+                    # than relying on the repr of a set
+                    # pylint: disable=consider-using-f-string
+                    os.environ["OMP_PLACES"] = "{{{}}}".format(
+                        ",".join(str(cpu) for cpu in sorted(place["mask"]))
+                    )
+                    os.environ["OMP_NUM_THREADS"] = "{}".format(
+                        len(place["mask"]) - reserve
+                    )
+                    os.environ["OMP_PROC_BIND"] = "TRUE"
+                    return what(*args, **kwargs)
+            raise IndexError("Out of OMP places")
+        return what(*args, **kwargs)
diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index 2c99d28fd..ac61ded79 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -119,7 +119,6 @@ class MultiprocExecutor(Executor):
                 f"_parallel_size ({pcp_size}). "
             )

-        # Set multiprocessing envs
         set_multiprocessing_worker_envs()

         # use the loopback address get_loopback_ip() for communication.
@@ -172,16 +171,31 @@ class MultiprocExecutor(Executor):
             for local_rank in range(self.local_world_size):
                 global_rank = global_start_rank + local_rank
                 is_driver_worker = self._is_driver_worker(global_rank)
-                unready_worker_handle = WorkerProc.make_worker_process(
-                    vllm_config=self.vllm_config,
-                    local_rank=local_rank,
-                    rank=global_rank,
-                    distributed_init_method=distributed_init_method,
-                    input_shm_handle=scheduler_output_handle,
-                    shared_worker_lock=shared_worker_lock,
-                    is_driver_worker=is_driver_worker,
-                    inherited_fds=inherited_fds,
-                )
+                if current_platform.is_cpu():
+                    om = current_platform.get_omp_manager()
+                    logger.info("Configured OMP places: %s", str(om.omp_places))
+                    unready_worker_handle = om.run(
+                        WorkerProc.make_worker_process,
+                        vllm_config=self.vllm_config,
+                        local_rank=local_rank,
+                        rank=global_rank,
+                        distributed_init_method=distributed_init_method,
+                        input_shm_handle=scheduler_output_handle,
+                        shared_worker_lock=shared_worker_lock,
+                        is_driver_worker=is_driver_worker,
+                        inherited_fds=inherited_fds,
+                    )
+                else:
+                    unready_worker_handle = WorkerProc.make_worker_process(
+                        vllm_config=self.vllm_config,
+                        local_rank=local_rank,
+                        rank=global_rank,
+                        distributed_init_method=distributed_init_method,
+                        input_shm_handle=scheduler_output_handle,
+                        shared_worker_lock=shared_worker_lock,
+                        is_driver_worker=is_driver_worker,
+                        inherited_fds=inherited_fds,
+                    )
                 unready_workers.append(unready_worker_handle)
                 if inherited_fds is not None:
                     inherited_fds.append(unready_worker_handle.death_writer.fileno())
@@ -1000,24 +1014,26 @@ def set_multiprocessing_worker_envs():

     _maybe_force_spawn()

-    # Configure thread parallelism if OMP_NUM_THREADS isn't set
-    #
-    # Helps to avoid CPU contention. The default of spawning a thread per
-    # core combined with multiprocessing for each GPU can have a negative
-    # impact on performance. The contention is amplified when running in a
-    # container where CPU limits can cause throttling.
-    default_omp_num_threads = 1
-    if (
-        "OMP_NUM_THREADS" not in os.environ
-        and (current_parallelism := torch.get_num_threads()) > default_omp_num_threads
-    ):
-        logger.warning_once(
-            "Reducing Torch parallelism from %d threads to %d to avoid "
-            "unnecessary CPU contention. Set OMP_NUM_THREADS in the "
-            "external environment to tune this value as needed.",
-            current_parallelism,
-            default_omp_num_threads,
-            scope="local",
-        )
-        os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
-        torch.set_num_threads(default_omp_num_threads)
+    if not current_platform.is_cpu():
+        # Configure thread parallelism if OMP_NUM_THREADS isn't set
+        #
+        # Helps to avoid CPU contention. The default of spawning a thread per
+        # core combined with multiprocessing for each GPU can have a negative
+        # impact on performance. The contention is amplified when running in a
+        # container where CPU limits can cause throttling.
+        default_omp_num_threads = 1
+        if (
+            "OMP_NUM_THREADS" not in os.environ
+            and (current_parallelism := torch.get_num_threads())
+            > default_omp_num_threads
+        ):
+            logger.warning_once(
+                "Reducing Torch parallelism from %d threads to %d to avoid "
+                "unnecessary CPU contention. Set OMP_NUM_THREADS in the "
+                "external environment to tune this value as needed.",
+                current_parallelism,
+                default_omp_num_threads,
+                scope="local",
+            )
+            os.environ["OMP_NUM_THREADS"] = str(default_omp_num_threads)
+            torch.set_num_threads(default_omp_num_threads)
diff --git a/vllm/v1/worker/cpu_worker.py b/vllm/v1/worker/cpu_worker.py
index 2547751c0..e759be30d 100644
--- a/vllm/v1/worker/cpu_worker.py
+++ b/vllm/v1/worker/cpu_worker.py
@@ -1,18 +1,14 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 import os
-import platform
 import sys
-from collections.abc import Callable
 from typing import Any

 import torch

-from vllm import envs
 from vllm.config import VllmConfig
 from vllm.logger import init_logger
 from vllm.platforms import CpuArchEnum, current_platform
-from vllm.platforms.cpu import CpuPlatform, LogicalCPUInfo
 from vllm.profiler.wrapper import TorchProfilerWrapper
 from vllm.utils.torch_utils import set_random_seed
 from vllm.v1.worker.cpu_model_runner import CPUModelRunner
@@ -71,44 +67,6 @@ class CPUWorker(Worker):
         if current_platform.get_cpu_architecture() == CpuArchEnum.X86:
             check_preloaded_libs("libiomp")

-        # Setup OpenMP threads affinity.
-        omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
-        # Under numa binding some cores reserved for kv transfer in nixl_connector.py
-        if omp_cpuids == "auto" and platform.system() == "Linux":
-            cpu_arch = current_platform.get_cpu_architecture()
-            if cpu_arch in (CpuArchEnum.POWERPC, CpuArchEnum.S390X):
-                # For S390X/POWERPC SMT-8/4/2
-                self.local_omp_cpuid = self._get_autobind_cpu_ids(
-                    lambda cpus: [cpu for cpu in cpus if cpu.id % 8 < 4]
-                )
-            elif cpu_arch == CpuArchEnum.X86:
-                # For x86 SMT-2, use 1 CPU per core
-                self.local_omp_cpuid = self._get_autobind_cpu_ids(
-                    lambda cpus: cpus[-1:]
-                )
-            elif cpu_arch == CpuArchEnum.ARM:
-                # For AArch64, no SMT
-                self.local_omp_cpuid = self._get_autobind_cpu_ids(lambda cpus: cpus)
-            else:
-                self.local_omp_cpuid = "nobind"
-        elif omp_cpuids == "nobind":
-            self.local_omp_cpuid = "nobind"
-        else:
-            local_dp_rank = self.parallel_config.data_parallel_rank_local
-            omp_cpuids_list = omp_cpuids.split("|")
-            if local_dp_rank is not None:
-                world_size = self.parallel_config.world_size
-                omp_cpuids_list = omp_cpuids_list[
-                    local_dp_rank * world_size : (local_dp_rank + 1) * world_size
-                ]
-            self.local_omp_cpuid = omp_cpuids_list[self.rank]
-
-        if self.local_omp_cpuid != "nobind":
-            ret = torch.ops._C.init_cpu_threads_env(self.local_omp_cpuid)
-            if ret:
-                logger.info(ret)
-
-        # After the thread binding, changing thread num is not allowed
         def skip_set_num_threads(x: int):
             logger.warning(
                 "CPU backend doesn't allow to use "
@@ -153,92 +111,6 @@ class CPUWorker(Worker):
             self.model_runner.warming_up_model()
         return self.compilation_config.compilation_time

-    def _get_autobind_cpu_ids(
-        self, cpu_selector: Callable[[list[LogicalCPUInfo]], list[LogicalCPUInfo]]
-    ) -> str:
-        """
-        Return CPU ids to bind based on NUMA nodes.
-        Currently for rank N, only CPU ids on the N-th node in available NUMA
-        node list will be selected.
-        Args:
-            cpu_selector: a callable object to select CPUs from a CPU list
-            of a physical core. The input is a LogicalCPUInfo list, sorted by
-            the LogicalCPUInfo.id. A selected LogicalCPUInfo list should be
-            returned.
-        """
-        # simulate multiple numa nodes, for testing
-        sim_multi_numa_nodes = os.environ.get("VLLM_CPU_SIM_MULTI_NUMA", "0") != "0"
-
-        allowed_numa_nodes, logical_cpu_list = (
-            CpuPlatform.get_allowed_cpu_core_node_list()
-        )
-        local_world_size = self.parallel_config.local_world_size
-        assert len(allowed_numa_nodes) >= local_world_size or sim_multi_numa_nodes, (
-            f"Not enough allowed NUMA nodes to bind threads of "
-            f"{local_world_size} local CPUWorkers. "
-            f"Allowed NUMA nodes are {allowed_numa_nodes}. "
-            "Please try to bind threads manually."
-        )
-
-        if not sim_multi_numa_nodes:
-            # Get CPUs on NUMA node `allowed_numa_nodes[local_rank]`
-            selected_numa_node = allowed_numa_nodes[self.local_rank]  # type: ignore
-            logical_cpu_list = [
-                x for x in logical_cpu_list if x.numa_node == selected_numa_node
-            ]
-        else:
-            # This is a bit tricky because the internal DP size
-            # is always 1 for non-MoE models
-            world_size_across_dp = (
-                self.parallel_config.world_size
-                * self.parallel_config._api_process_count
-            )
-            assert len(logical_cpu_list) >= world_size_across_dp
-            logical_cpu_list = sorted(logical_cpu_list, key=lambda x: x.numa_node)
-            sim_cpu_num_per_node = len(logical_cpu_list) // world_size_across_dp
-            assert self.parallel_config.data_parallel_rank_local is not None
-            start_idx = (
-                self.local_rank
-                + self.parallel_config.world_size
-                * self.parallel_config.data_parallel_rank_local
-            ) * sim_cpu_num_per_node
-            logical_cpu_list = logical_cpu_list[
-                start_idx : (start_idx + sim_cpu_num_per_node)
-            ]
-
-        # Select CPUs from each physical core via cpu_selector
-        core_to_cpus: dict[int, list[LogicalCPUInfo]] = {}
-        for cpu_info in logical_cpu_list:
-            if cpu_info.physical_core not in core_to_cpus:
-                core_to_cpus[cpu_info.physical_core] = []
-            core_to_cpus[cpu_info.physical_core].append(cpu_info)
-        logical_cpu_list = []
-        for cpu_list in core_to_cpus.values():
-            cpu_list = sorted(cpu_list, key=lambda x: x.id)
-            logical_cpu_list.extend(cpu_selector(cpu_list))
-        logical_cpu_list = sorted(logical_cpu_list, key=lambda x: x.id)
-
-        # Reserve CPUs for other processes
-        reserve_cpu_num = envs.VLLM_CPU_NUM_OF_RESERVED_CPU
-        if reserve_cpu_num is None:
-            need_reserve = (
-                self.parallel_config.world_size > 1
-                or self.parallel_config.data_parallel_size_local > 1
-            )
-            reserve_cpu_num = 1 if need_reserve else 0
-        assert len(logical_cpu_list) > reserve_cpu_num, (
-            f"VLLM_CPU_NUM_OF_RESERVED_CPU ({reserve_cpu_num}) "
-            f"should less than {len(logical_cpu_list)}."
-        )
-        if reserve_cpu_num != 0:
-            logical_cpu_list = logical_cpu_list[:-reserve_cpu_num]
-
-        logger.info(
-            "auto thread-binding list (id, physical core): %s",
-            [(x.id, x.physical_core) for x in logical_cpu_list],
-        )
-        return ",".join([str(x.id) for x in logical_cpu_list])
-
     def profile(self, is_start: bool = True, profile_prefix: str | None = None):
         if self.profiler is None:
             raise RuntimeError("Profiler is not enabled.")
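As a sanity check of the new module's behaviour, here is a minimal sketch driving `OMPProcessManager` through its `mock=` hook instead of a live `lscpu`. The 8-CPU topology (2 NUMA nodes, 2 cores per node, SMT-2) and the temp-file plumbing are invented for illustration; the constructor arguments, `omp_places` layout, and `run()` semantics follow the module added above. It assumes `VLLM_CPU_OMP_THREADS_BIND` and `CPU_VISIBLE_MEMORY_NODES` are unset.

```python
import json
import os
import tempfile

from vllm.utils.ompmultiprocessing import OMPProcessManager

# Keep the demo deterministic: no user-supplied bind mask or node filter.
os.environ.pop("VLLM_CPU_OMP_THREADS_BIND", None)
os.environ.pop("CPU_VISIBLE_MEMORY_NODES", None)

# Invented topology: CPUs 0-7, core = cpu // 2, node = cpu // 4.
fake_lscpu = {"cpus": [{"cpu": c, "core": c // 2, "node": c // 4} for c in range(8)]}

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as jf:
    json.dump(fake_lscpu, jf)
    mock_path = jf.name

# affinity stands in for os.sched_getaffinity(0) so the sketch does not
# depend on the host; smt=1 keeps one logical CPU per physical core.
om = OMPProcessManager(strategy="nodes", smt=1, mock=mock_path, affinity=set(range(8)))
print(om.omp_places)
# [{'mask': {4, 6}, 'available': True}, {'mask': {0, 2}, 'available': True}]
# (places are sorted by size, then highest CPU id, descending)

def report():
    print(os.environ["OMP_PLACES"], os.environ["OMP_NUM_THREADS"])

om.run(report)  # claims the first free place: OMP_PLACES={4,6}, 2 threads
om.run(report)  # the next call gets the remaining place: {0,2}
os.unlink(mock_path)
```

In the executor this is exactly the pattern used above: `om.run(WorkerProc.make_worker_process, ...)` marks one place as consumed and exports `OMP_PLACES`, `OMP_NUM_THREADS` (less any `VLLM_CPU_NUM_OF_RESERVED_CPU`), and `OMP_PROC_BIND=TRUE` into the environment that the spawned worker inherits; once every place is taken, a further `run()` raises `IndexError("Out of OMP places")`.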