diff --git a/vllm/v1/worker/cpu_worker.py b/vllm/v1/worker/cpu_worker.py index 0bd3e580b..d31991b5b 100644 --- a/vllm/v1/worker/cpu_worker.py +++ b/vllm/v1/worker/cpu_worker.py @@ -13,12 +13,20 @@ from vllm.logger import init_logger from vllm.model_executor.utils import set_random_seed from vllm.platforms import CpuArchEnum, current_platform from vllm.sequence import IntermediateTensors +from vllm.utils import PlaceholderModule from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.worker.cpu_model_runner import CPUModelRunner from vllm.v1.worker.gpu_worker import (Worker, init_worker_distributed_environment) +try: + import psutil + from numa import info +except ImportError: + psutil = PlaceholderModule("psutil") # type: ignore[assignment] + numa = PlaceholderModule("numa") # type: ignore[assignment] + logger = init_logger(__name__) @@ -37,6 +45,8 @@ class CPUWorker(Worker): is_driver_worker=is_driver_worker) self.parallel_config.disable_custom_all_reduce = True + self.manually_bind_threads_suggestion = ( + "To get better performance, please try to manually bind threads.") def init_device(self): # Setup OpenMP threads affinity. @@ -112,50 +122,111 @@ class CPUWorker(Worker): assert isinstance(output, ModelRunnerOutput) return output if self.is_driver_worker else None + def warn_inability_to_detect_numa(self) -> None: + logger.warning( + "Auto thread-binding failed due to the " + "inability to detect numa nodes. %s", + self.manually_bind_threads_suggestion) + + def warn_lack_of_numa_and_psutil(self) -> None: + logger.warning( + "Auto thread-binding failed due to " + "the lack of package numa and psutil. %s", + self.manually_bind_threads_suggestion) + + def warn_world_size_too_large(self, world_size: int, + node_to_cpus_len: int) -> None: + logger.warning( + "Auto thread-binding failed due to " + "world size: %d being larger than " + "allowed NUMA nodes number: %d. %s", world_size, node_to_cpus_len, + self.manually_bind_threads_suggestion) + + def get_cpus_allow_list_and_numa_size(self): + cpus_allow_list = psutil.Process().cpu_affinity() + numa_size = info.get_num_configured_nodes() + return cpus_allow_list, numa_size + + def auto_thread_binding_based_on_numa_nodes(self, world_size: int, + rank_to_cpus: str) -> str: + cpu_count = psutil.cpu_count(logical=False) + cpus_allow_list, numa_size = self.get_cpus_allow_list_and_numa_size() + if not numa_size: + self.warn_inability_to_detect_numa() + return rank_to_cpus + + cpu_count_per_numa = cpu_count // numa_size + num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU, + cpu_count_per_numa // 2) + + node_to_cpus = [] + for i in range(numa_size): + node_intersect = set( + info.node_to_cpus(i)).intersection(cpus_allow_list) + if bool(node_intersect): + node_to_cpus.append(list(node_intersect)) + + node_to_cpus_len = len(node_to_cpus) + if world_size > node_to_cpus_len: + self.warn_world_size_too_large(world_size, node_to_cpus_len) + else: + end = cpu_count_per_numa - num_of_reserved_cpu + rank_to_cpus_list = node_to_cpus[self.rank][:end] + rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list) + logger.info("auto thread-binding list: %s", rank_to_cpus) + return rank_to_cpus + + def libnuma_and_psutil_found(self) -> bool: + libnuma_found = util.find_spec("numa") is not None + psutil_found = util.find_spec("psutil") is not None + + return libnuma_found and psutil_found + def get_cpus_id_binding_based_on_numa_nodes(self) -> str: """Return CPUs id binding based on NUMA nodes. """ rank_to_cpus = self.local_omp_cpuid # Setup OpenMP thread affinity based on NUMA nodes automatically world_size = self.vllm_config.parallel_config.world_size - libnuma_found = util.find_spec("numa") is not None - psutil_found = util.find_spec("psutil") is not None - if libnuma_found and psutil_found: - import psutil - from numa import info - cpu_count = psutil.cpu_count(logical=False) - cpus_allow_list = psutil.Process().cpu_affinity() - numa_size = info.get_num_configured_nodes() - cpu_count_per_numa = cpu_count // numa_size + if self.libnuma_and_psutil_found(): + rank_to_cpus = self.auto_thread_binding_based_on_numa_nodes( + world_size, rank_to_cpus) + else: + self.warn_lack_of_numa_and_psutil() + return rank_to_cpus + + def select_threads_per_power_core(self, + node_cpu_ids: list[int]) -> list[int]: + return [cpu for cpu in node_cpu_ids if cpu % 8 < 4] + + def auto_thread_binding_based_on_numa_nodes_ppc64le( + self, world_size: int, rank_to_cpus: str) -> str: + cpus_allow_list, numa_size = self.get_cpus_allow_list_and_numa_size() + if not numa_size: + self.warn_inability_to_detect_numa() + return rank_to_cpus + + node_to_cpus = [] + for i in range(numa_size): + node_intersect = set( + info.node_to_cpus(i)).intersection(cpus_allow_list) + if bool(node_intersect): + node_to_cpus.append(sorted(list(node_intersect))) + + node_to_cpus_len = len(node_to_cpus) + if world_size > node_to_cpus_len: + self.warn_world_size_too_large(world_size, node_to_cpus_len) + else: + node_cpus_this_rank = node_to_cpus[self.rank] + node_cpus_this_rank = self.select_threads_per_power_core( + node_cpus_this_rank) + cpu_count_per_numa = len(node_cpus_this_rank) num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU, cpu_count_per_numa // 2) - - # check allow node_to_cpus list - node_to_cpus = [] - for i in range(numa_size): - node_intersect = set( - info.node_to_cpus(i)).intersection(cpus_allow_list) - if bool(node_intersect): - node_to_cpus.append(list(node_intersect)) - - if world_size > len(node_to_cpus): - logger.error( - "Auto thread-binding failed due to " - "world size: %d is larger than " - "allowed NUMA nodes number: %d." - "Please try to bind threads manually.", world_size, - len(node_to_cpus)) - else: - end = cpu_count_per_numa - num_of_reserved_cpu - rank_to_cpus_list = node_to_cpus[self.rank][:end] - rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list) - logger.info("auto thread-binding list: %s", rank_to_cpus) - else: - logger.warning( - "Auto thread-binding is not supported due to " - "the lack of package numa and psutil," - "fallback to no thread-binding. To get better performance," - "please try to manually bind threads.") + end = cpu_count_per_numa - num_of_reserved_cpu + rank_to_cpus_list = node_cpus_this_rank[:end] + rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list) + logger.info("ppc64le thread-binding list: %s", rank_to_cpus) return rank_to_cpus def get_cpus_id_binding_based_on_numa_nodes_ppc64le(self) -> str: @@ -166,48 +237,11 @@ class CPUWorker(Worker): performance by avoiding oversubscription of logical CPUs on Power. """ - def select_threads_per_power_core(node_cpu_ids): - return [cpu for cpu in node_cpu_ids if cpu % 8 < 4] - rank_to_cpus = self.local_omp_cpuid world_size = self.vllm_config.parallel_config.world_size - libnuma_found = util.find_spec("numa") is not None - psutil_found = util.find_spec("psutil") is not None - if libnuma_found and psutil_found: - import psutil - from numa import info - cpus_allow_list = psutil.Process().cpu_affinity() - numa_size = info.get_num_configured_nodes() - - node_to_cpus = [] - for i in range(numa_size): - node_intersect = set( - info.node_to_cpus(i)).intersection(cpus_allow_list) - if bool(node_intersect): - node_to_cpus.append(sorted(list(node_intersect))) - - if world_size > len(node_to_cpus): - logger.error( - "Auto thread-binding failed due to " - "world size: %d is larger than " - "allowed NUMA nodes number: %d." - "Please try to bind threads manually.", world_size, - len(node_to_cpus)) - else: - node_cpus_this_rank = node_to_cpus[self.rank] - node_cpus_this_rank = select_threads_per_power_core( - node_cpus_this_rank) - cpu_count_per_numa = len(node_cpus_this_rank) - num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU, - cpu_count_per_numa // 2) - end = cpu_count_per_numa - num_of_reserved_cpu - rank_to_cpus_list = node_cpus_this_rank[:end] - rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list) - logger.info("ppc64le thread-binding list: %s", rank_to_cpus) + if self.libnuma_and_psutil_found(): + rank_to_cpus = self.auto_thread_binding_based_on_numa_nodes_ppc64le( + world_size, rank_to_cpus) else: - logger.warning( - "Auto thread-binding is not supported due to " - "the lack of package numa and psutil," - "fallback to no thread-binding. To get better performance," - "please try to manually bind threads.") + self.warn_lack_of_numa_and_psutil() return rank_to_cpus