[Misc] Update disaggregation benchmark scripts and test logs (#11456)

Signed-off-by: Jiaxin Shan <seedjeffwan@gmail.com>
This commit is contained in:
Jiaxin Shan
2024-12-24 22:58:48 -08:00
committed by GitHub
parent 9832e5572a
commit fc601665eb
6 changed files with 43 additions and 29 deletions

View File

@@ -48,7 +48,7 @@ def test_run(my_rank, buffer, device):
assert buffer.buffer_size == 0
assert len(buffer.buffer) == 0
print("Test run passed!")
print("My rank: %d, Test run passed!" % (my_rank))
def stress_test(my_rank, buf, device):
@@ -108,7 +108,7 @@ def stress_test(my_rank, buf, device):
else:
torch.distributed.send(torch.tensor([n]), 0)
print("Passed stress test!")
print("My rank: %d, Passed stress test!" % (my_rank))
if __name__ == "__main__":

View File

@@ -1,3 +1,8 @@
#!/bin/bash
RANK=0 python test_lookup_buffer.py &
RANK=1 python test_lookup_buffer.py &
RANK=0 python3 test_lookup_buffer.py &
PID0=$!
RANK=1 python3 test_lookup_buffer.py &
PID1=$!
wait $PID0
wait $PID1

View File

@@ -10,39 +10,42 @@ from vllm.distributed.kv_transfer.kv_pipe.pynccl_pipe import PyNcclPipe
def test_run(my_rank, pipe):
print(f"rank {my_rank} test_run starts....")
# test run
x = torch.tensor([1]).to(pipe.device)
y = torch.tensor([[2., 3., 4., 8.]]).to(pipe.device)
if my_rank == 0:
pipe.send_tensor(x)
print("sent tensor x")
print(f"rank {my_rank} sent tensor x")
pipe.send_tensor(y)
print("sent tensor y")
print(f"rank {my_rank} sent tensor y")
x2 = pipe.recv_tensor()
print("received x2 = ", x2)
print(f"rank {my_rank} received x2 = ", x2)
y2 = pipe.recv_tensor()
print("received y2 = ", x2)
print(f"rank {my_rank} received y2 = ", x2)
else:
x2 = pipe.recv_tensor()
print("received x2 = ", x2)
print(f"rank {my_rank} received x2 = ", x2)
y2 = pipe.recv_tensor()
print("received y2 = ", x2)
print(f"rank {my_rank} received y2 = ", x2)
pipe.send_tensor(x)
print("sent tensor x")
print(f"rank {my_rank} sent tensor x")
pipe.send_tensor(y)
print("sent tensor y")
print(f"rank {my_rank} sent tensor y")
assert torch.allclose(x, x2)
assert torch.allclose(y, y2)
print(f"rank {my_rank} test_run passed!")
def stress_test(my_rank, pipe):
torch.distributed.barrier()
print(f"rank {my_rank} stress_test starts....")
tensors: List[torch.Tensor] = []
torch.distributed.barrier()
torch.manual_seed(0)
for i in tqdm(range(500)):
@@ -86,7 +89,6 @@ def stress_test(my_rank, pipe):
def latency_test(my_rank, pipe, nelement, ntensor):
latencies = []
torch.distributed.barrier()
@@ -149,6 +151,7 @@ if __name__ == "__main__":
)
test_run(my_rank, pipe)
stress_test(my_rank, pipe)
# Use this function if you want to test the latency of pipe impl.

View File

@@ -1,3 +1,9 @@
#!/bin/bash
RANK=0 python3 test_send_recv.py &
RANK=1 python3 test_send_recv.py &
PID0=$!
RANK=1 python3 test_send_recv.py &
PID1=$!
wait $PID0
wait $PID1