[CI] Fix "2 Node Tests (4 GPUs in total)" (#31090)

Signed-off-by: Lucas Wilkinson <lwilkins@redhat.com>
This commit is contained in:
Lucas Wilkinson
2025-12-21 21:32:40 -05:00
committed by GitHub
parent 9d701e90d8
commit 7e065eba59
4 changed files with 51 additions and 27 deletions

View File

@@ -14,19 +14,19 @@ Multi-node:
--model="ibm-research/PowerMoE-3b" \
-dp=2 \
-tp=2 \
--nnodes=2 \
--node-rank=0 \
--master-addr=10.99.48.128 \
--master-port=13345
--dp-num-nodes=2 \
--dp-node-rank=0 \
--dp-master-addr=10.99.48.128 \
--dp-master-port=13345
Node 1:
python examples/offline_inference/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
-dp=2 \
-tp=2 \
--nnodes=2 \
--node-rank=1 \
--master-addr=10.99.48.128 \
--master-port=13345
--dp-num-nodes=2 \
--dp-node-rank=1 \
--dp-master-addr=10.99.48.128 \
--dp-master-port=13345
"""
import os
@@ -48,7 +48,31 @@ def create_parser():
enable_expert_parallel=True,
)
# Add timeout (not in EngineArgs)
# Add DP-specific args (separate from engine args to avoid conflicts)
parser.add_argument(
"--dp-num-nodes",
type=int,
default=1,
help="Total number of nodes for data parallel.",
)
parser.add_argument(
"--dp-node-rank",
type=int,
default=0,
help="Rank of the current node for data parallel.",
)
parser.add_argument(
"--dp-master-addr",
type=str,
default="",
help="Master node IP address for DP coordination.",
)
parser.add_argument(
"--dp-master-port",
type=int,
default=0,
help="Master node port for DP coordination.",
)
parser.add_argument(
"--timeout",
type=int,
@@ -132,26 +156,26 @@ if __name__ == "__main__":
parser = create_parser()
args = vars(parser.parse_args())
# Extract DP-specific args
# Extract DP-specific args (pop to remove from engine_args)
dp_size = args.pop("data_parallel_size")
nnodes = args.get("nnodes", 1)
node_rank = args.get("node_rank", 0)
master_addr = args.get("master_addr", "")
master_port = args.get("master_port", 0)
dp_num_nodes = args.pop("dp_num_nodes")
dp_node_rank = args.pop("dp_node_rank")
dp_master_addr = args.pop("dp_master_addr")
dp_master_port = args.pop("dp_master_port")
timeout = args.pop("timeout")
# Remaining args are engine args
engine_args = args
if nnodes == 1:
if dp_num_nodes == 1:
dp_master_ip = "127.0.0.1"
dp_master_port = get_open_port()
dp_master_port_val = get_open_port()
else:
dp_master_ip = master_addr
dp_master_port = master_port
dp_master_ip = dp_master_addr
dp_master_port_val = dp_master_port
assert dp_size % nnodes == 0, "dp_size should be divisible by nnodes"
dp_per_node = dp_size // nnodes
assert dp_size % dp_num_nodes == 0, "dp_size should be divisible by dp_num_nodes"
dp_per_node = dp_size // dp_num_nodes
from multiprocessing import Process
@@ -162,7 +186,7 @@ if __name__ == "__main__":
procs = []
for local_dp_rank, global_dp_rank in enumerate(
range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)
range(dp_node_rank * dp_per_node, (dp_node_rank + 1) * dp_per_node)
):
proc = Process(
target=main,
@@ -171,7 +195,7 @@ if __name__ == "__main__":
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
dp_master_port_val,
engine_args,
),
)