[Bugfix][ROCm] running new process using spawn method for rocm in tests. (#14810)

Signed-off-by: vllmellm <vllm.ellm@embeddedllm.com>
Signed-off-by: tjtanaa <tunjian.tan@embeddedllm.com>
Co-authored-by: TJian <tunjian.tan@embeddedllm.com>
Co-authored-by: Cyrus Leung <cyrus.tl.leung@gmail.com>
This commit is contained in:
vllmellm
2025-03-17 19:33:35 +08:00
committed by GitHub
parent 6eaf1e5c52
commit 2bb0e1a799
21 changed files with 174 additions and 99 deletions

View File

@@ -7,12 +7,14 @@ import os
import signal
import subprocess
import sys
import tempfile
import time
import warnings
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Literal, Optional, Union
import cloudpickle
import openai
import pytest
import requests
@@ -703,6 +705,78 @@ def fork_new_process_for_each_test(
return wrapper
def spawn_new_process_for_each_test(
f: Callable[_P, None]) -> Callable[_P, None]:
"""Decorator to spawn a new process for each test function.
"""
@functools.wraps(f)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
# Check if we're already in a subprocess
if os.environ.get('RUNNING_IN_SUBPROCESS') == '1':
# If we are, just run the function directly
return f(*args, **kwargs)
import torch.multiprocessing as mp
with suppress(RuntimeError):
mp.set_start_method('spawn')
# Get the module
module_name = f.__module__
# Create a process with environment variable set
env = os.environ.copy()
env['RUNNING_IN_SUBPROCESS'] = '1'
with tempfile.TemporaryDirectory() as tempdir:
output_filepath = os.path.join(tempdir, "new_process.tmp")
# `cloudpickle` allows pickling complex functions directly
input_bytes = cloudpickle.dumps((f, output_filepath))
cmd = [sys.executable, "-m", f"{module_name}"]
returned = subprocess.run(cmd,
input=input_bytes,
capture_output=True,
env=env)
# check if the subprocess is successful
try:
returned.check_returncode()
except Exception as e:
# wrap raised exception to provide more information
raise RuntimeError(f"Error raised in subprocess:\n"
f"{returned.stderr.decode()}") from e
return wrapper
def create_new_process_for_each_test(
method: Optional[Literal["spawn", "fork"]] = None
) -> Callable[[Callable[_P, None]], Callable[_P, None]]:
"""Creates a decorator that runs each test function in a new process.
Args:
method: The process creation method. Can be either "spawn" or "fork".
If not specified,
it defaults to "spawn" on ROCm platforms and "fork" otherwise.
Returns:
A decorator to run test functions in separate processes.
"""
if method is None:
method = "spawn" if current_platform.is_rocm() else "fork"
assert method in ["spawn",
"fork"], "Method must be either 'spawn' or 'fork'"
if method == "fork":
return fork_new_process_for_each_test
return spawn_new_process_for_each_test
def large_gpu_mark(min_gb: int) -> pytest.MarkDecorator:
"""
Get a pytest mark, which skips the test if the GPU doesn't meet
@@ -762,7 +836,7 @@ def multi_gpu_test(*, num_gpus: int):
marks = multi_gpu_marks(num_gpus=num_gpus)
def wrapper(f: Callable[_P, None]) -> Callable[_P, None]:
func = fork_new_process_for_each_test(f)
func = create_new_process_for_each_test()(f)
for mark in reversed(marks):
func = mark(func)