[Benchmark] Simplify SLA scan (#35306)

Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
Cyrus Leung
2026-02-26 14:35:41 +08:00
committed by GitHub
parent 186ea22efe
commit d3a51da92a
8 changed files with 253 additions and 799 deletions

View File

@@ -4,6 +4,11 @@ This section guides you through running benchmark tests with the extensive datas
It's a living document, updated as new features and datasets become available.
!!! tip
The benchmarks described on this page are mainly for evaluating specific vLLM features as well as regression testing.
For benchmarking production vLLM servers, we recommend [GuideLLM](https://github.com/vllm-project/guidellm), an established performance benchmarking framework with live progress updates and automatic report generation. It is also more flexible than `vllm bench serve` in terms of dataset loading, request formatting, and workload patterns.
## Dataset Overview
<style>

View File

@@ -1,10 +1,15 @@
# Parameter Sweeps
`vllm bench sweep` is a suite of commands designed to run benchmarks across multiple configurations and compare them by visualizing the results.
## Online Benchmark
### Basic
`vllm bench sweep serve` automatically starts `vllm serve` and runs `vllm bench serve` to evaluate vLLM over multiple configurations.
`vllm bench sweep serve` starts `vllm serve` and iteratively runs `vllm bench serve` for each server configuration.
!!! tip
If you only need to run benchmarks for a single server configuration, consider using [GuideLLM](https://github.com/vllm-project/guidellm), an established performance benchmarking framework with live progress updates and automatic report generation. It is also more flexible than `vllm bench serve` in terms of dataset loading, request formatting, and workload patterns.
Follow these steps to run the script:
@@ -50,14 +55,17 @@ Follow these steps to run the script:
```json
[
{
"_benchmark_name": "scenario_A",
"random_input_len": 128,
"random_output_len": 32
},
{
"_benchmark_name": "scenario_B",
"random_input_len": 256,
"random_output_len": 64
},
{
"_benchmark_name": "scenario_C",
"random_input_len": 512,
"random_output_len": 128
}
@@ -77,6 +85,8 @@ vllm bench sweep serve \
-o benchmarks/results
```
By default, each parameter combination is benchmarked 3 times to make the results more reliable. You can adjust the number of runs by setting `--num-runs`.
!!! important
If both `--serve-params` and `--bench-params` are passed, the script will iterate over the Cartesian product between them.
You can use `--dry-run` to preview the commands to be run.
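As an illustration of the iteration order, the following minimal sketch pairs every serve combination with every bench combination. The parameter names and values are assumptions chosen for the example, and this is not the script's actual implementation:

```python
from itertools import product

# Hypothetical contents of the --serve-params and --bench-params files.
serve_params = [{"max_num_seqs": 32}, {"max_num_seqs": 64}]
bench_params = [
    {"random_input_len": 128, "random_output_len": 32},
    {"random_input_len": 256, "random_output_len": 64},
]

# Cartesian product: each serve combination is benchmarked
# against each bench combination.
combinations = list(product(serve_params, bench_params))

for serve_comb, bench_comb in combinations:
    print(serve_comb, bench_comb)
```

With 2 serve combinations and 2 bench combinations, this yields 4 benchmarked pairs, each of which is then repeated `--num-runs` times.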
@@ -86,60 +96,40 @@ vllm bench sweep serve \
In case you are using a custom `--serve-cmd`, you can override the commands used for resetting the state by setting `--after-bench-cmd`.
!!! note
By default, each parameter combination is run 3 times to make the results more reliable. You can adjust the number of runs by setting `--num-runs`.
You should set `_benchmark_name` to provide a human-readable name for parameter combinations involving many variables.
This becomes mandatory if the file name would otherwise exceed the maximum path length allowed by the filesystem.
!!! tip
You can use the `--resume` option to continue the parameter sweep if one of the runs failed.
### SLA auto-tuner
You can use the `--resume` option to continue the parameter sweep if an unexpected error occurs, e.g., a timeout when connecting to HF Hub.
`vllm bench sweep serve_sla` is a wrapper over `vllm bench sweep serve` that tunes either the request rate or concurrency (choose using `--sla-variable`) in order to satisfy the SLA constraints given by `--sla-params`.
### SLA Scanner
For example, to ensure E2E latency within different target values for 99% of requests:
```json
[
{
"p99_e2el_ms": "<=200"
},
{
"p99_e2el_ms": "<=500"
},
{
"p99_e2el_ms": "<=1000"
},
{
"p99_e2el_ms": "<=2000"
}
]
```
`vllm bench sweep serve_sla` is a variant of `vllm bench sweep serve` that scans through values of request rate or concurrency (choose using `--sla-variable`) in order to find the tradeoff between latency and throughput. The results can then be [visualized](#visualization) to determine the feasible SLAs.
Example command:
```bash
vllm bench sweep serve_sla \
--serve-cmd 'vllm serve meta-llama/Llama-2-7b-chat-hf' \
--bench-cmd 'vllm bench serve --model meta-llama/Llama-2-7b-chat-hf --backend vllm --endpoint /v1/completions --dataset-name sharegpt --dataset-path benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json' \
--bench-cmd 'vllm bench serve --model meta-llama/Llama-2-7b-chat-hf --backend vllm --endpoint /v1/completions --dataset-name sharegpt --dataset-path benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json --num-prompts 100' \
--serve-params benchmarks/serve_hparams.json \
--bench-params benchmarks/bench_hparams.json \
--sla-params benchmarks/sla_hparams.json \
--sla-variable max_concurrency \
--bench-params benchmarks/bench_hparams.json \
-o benchmarks/results
```
The algorithm for adjusting the SLA variable is as follows:
The algorithm for scanning through different values of `sla_variable` can be summarized as follows:
1. Run the benchmark once with maximum possible QPS, and once with minimum possible QPS. For each run, calculate the distance of the SLA metrics from their targets, resulting in data points of QPS vs SLA distance.
2. Perform spline interpolation between the data points to estimate the QPS that results in zero SLA distance.
3. Run the benchmark with the estimated QPS and add the resulting data point to the history.
4. Repeat Steps 2 and 3 until the maximum QPS that passes SLA and the minimum QPS that fails SLA in the history are close enough to each other.
1. Run the benchmark once with `sla_variable = 1` to simulate serial inference. This results in the lowest possible latency and throughput.
2. Run the benchmark once with `sla_variable = num_prompts` to simulate batch inference over the whole dataset. This results in the highest possible latency and throughput.
3. Estimate the maximum value of `sla_variable` that can be supported by the server without oversaturating it.
4. Use the remaining iterations to run the benchmark at intermediate values of `sla_variable`, spaced uniformly between the two extremes.
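Steps 3 and 4 can be sketched roughly as follows. This is a simplified illustration, not the actual implementation: the helper names are hypothetical, the metric keys `request_throughput` and `mean_e2el_ms` are assumed to be present in each benchmark result, and plain Python arithmetic stands in for any numerical library. For `max_concurrency`, the sustained value is estimated using Little's law (concurrency = throughput × mean latency).

```python
import math


def estimate_sla_value(run: dict[str, float], sla_variable: str) -> float:
    """Estimate the value of `sla_variable` that the server actually sustained."""
    throughput = run["request_throughput"]  # requests per second
    if sla_variable == "request_rate":
        return throughput
    if sla_variable == "max_concurrency":
        # Little's law: concurrency = throughput * mean latency (in seconds)
        return throughput * run["mean_e2el_ms"] / 1000
    raise ValueError(f"Unsupported SLA variable: {sla_variable}")


def plan_intermediate_values(
    serial_run: dict[str, float],
    batch_run: dict[str, float],
    sla_variable: str,
    sla_iters: int,
) -> list[int]:
    """Pick the values to benchmark between the serial and batch runs."""
    if sla_iters < 2:
        raise ValueError("`sla_iters` should be at least 2")

    low = math.ceil(estimate_sla_value(serial_run, sla_variable))
    high = math.floor(estimate_sla_value(batch_run, sla_variable))

    # `sla_iters` evenly spaced points including both ends; the ends are
    # skipped because the serial and batch runs were already benchmarked.
    step = (high - low) / (sla_iters - 1)
    points = [low + i * step for i in range(1, sla_iters - 1)]

    # Round to integers and drop duplicates when the range is small.
    return sorted({round(p) for p in points})


# Made-up results from the serial (step 1) and batch (step 2) runs.
serial_run = {"request_throughput": 1.0, "mean_e2el_ms": 1000.0}
batch_run = {"request_throughput": 41.0, "mean_e2el_ms": 2000.0}

print(plan_intermediate_values(serial_run, batch_run, "request_rate", 5))
```

Because the two boundary runs count towards the iteration budget, a budget of 5 iterations leaves 3 intermediate values, and a budget of 2 leaves none.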
!!! important
SLA tuning is applied over each combination of `--serve-params`, `--bench-params`, and `--sla-params`.
You can override the number of iterations in the algorithm by setting `--sla-iters`.
For a given combination of `--serve-params` and `--bench-params`, we share the benchmark results across `--sla-params` to avoid rerunning benchmarks with the same SLA variable value.
!!! tip
This is our equivalent of [GuideLLM's `--profile sweep`](https://github.com/vllm-project/guidellm/blob/v0.5.3/src/guidellm/benchmark/profiles.py#L575).
### Startup
## Startup Benchmark
`vllm bench sweep startup` runs `vllm bench startup` across parameter combinations to compare cold/warm startup time for different engine settings.
@@ -202,15 +192,28 @@ vllm bench sweep startup \
`vllm bench sweep plot` can be used to plot performance curves from parameter sweep results.
Example command:
Control the variables to plot via `--var-x` and `--var-y`, optionally applying `--filter-by` and `--bin-by` to the values. The plot is organized according to `--fig-by`, `--row-by`, `--col-by`, and `--curve-by`.
Example commands for visualizing [SLA Scanner](#sla-scanner) results:
```bash
# Latency increases as the request rate increases
vllm bench sweep plot benchmarks/results/<timestamp> \
--var-x max_concurrency \
--var-x request_rate \
--var-y p99_ttft_ms \
--row-by random_input_len \
--col-by random_output_len \
--curve-by api_server_count,max_num_batched_tokens \
--filter-by 'max_concurrency<=1024'
--curve-by max_num_seqs,max_num_batched_tokens \
--filter-by 'request_rate<=128'
# Tradeoff between latency and throughput
vllm bench sweep plot benchmarks/results/<timestamp> \
--var-x request_throughput \
--var-y median_ttft_ms \
--row-by random_input_len \
--col-by random_output_len \
--curve-by max_num_seqs,max_num_batched_tokens \
--filter-by 'request_rate<=128'
```
!!! tip
@@ -233,3 +236,6 @@ Example:
vllm bench sweep plot_pareto benchmarks/results/<timestamp> \
--label-by max_concurrency,tensor_parallel_size,pipeline_parallel_size
```
!!! tip
You can use `--dry-run` to preview the figures to be plotted.

View File

@@ -1,298 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import json
from collections.abc import Callable
from pathlib import Path
from unittest.mock import patch
from vllm.benchmarks.sweep.param_sweep import ParameterSweepItem
from vllm.benchmarks.sweep.serve_sla import _get_sla_run_path, solve_sla
from vllm.benchmarks.sweep.server import ServerProcess
from vllm.benchmarks.sweep.sla_sweep import (
SLACriterionBase,
SLALessThan,
SLALessThanOrEqualTo,
SLASweepItem,
)
def _set_return_value(
var2metric: Callable[[ParameterSweepItem], list[dict[str, float]]],
):
"""
Create a patch for run_sla with a specific function
indicating the relationship between the benchmark combination
(which includes the SLA variable) and the SLA criterion.
"""
def mock_run_sla(
server: ServerProcess | None,
bench_cmd: list[str],
*,
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
iter_path: Path,
num_runs: int,
dry_run: bool,
):
iter_data = var2metric(bench_comb)
summary_path = _get_sla_run_path(iter_path, run_number=None)
summary_path.parent.mkdir(parents=True, exist_ok=True)
with summary_path.open("w") as f:
json.dump(iter_data, f, indent=4)
return iter_data
return patch("vllm.benchmarks.sweep.serve_sla.run_sla", side_effect=mock_run_sla)
def _var2metric_linear():
def wrapped(bench_comb):
x = float(bench_comb["request_rate"])
y = x
return [{"request_throughput": y}]
return wrapped
def _var2metric_concave(elbow_point: float):
def wrapped(bench_comb):
x = float(bench_comb["request_rate"])
if x < elbow_point:
y = 0.5 * (x - elbow_point) + elbow_point
else:
y = 1.5 * (x - elbow_point) + elbow_point
return [{"request_throughput": y}]
return wrapped
def _var2metric_convex(elbow_point: float):
def wrapped(bench_comb):
x = float(bench_comb["request_rate"])
if x < elbow_point:
y = 1.5 * (x - elbow_point) + elbow_point
else:
y = 0.5 * (x - elbow_point) + elbow_point
return [{"request_throughput": y}]
return wrapped
def _var2metric_quadratic(y_intercept: float):
def wrapped(bench_comb):
x = float(bench_comb["request_rate"])
y = y_intercept + 0.1 * x**2
return [{"request_throughput": y}]
return wrapped
def _var2metric_sqrt(y_intercept: float):
def wrapped(bench_comb):
x = float(bench_comb["request_rate"])
y = y_intercept + 10 * x**0.5
return [{"request_throughput": y}]
return wrapped
def _run_solve_sla(
var2metric: Callable[[ParameterSweepItem], list[dict[str, float]]],
criterion: SLACriterionBase,
base_path: Path,
min_value: int = 1,
max_value: int = 100,
):
with _set_return_value(var2metric):
result = solve_sla(
server=None,
bench_cmd=[],
serve_comb=ParameterSweepItem(),
bench_comb=ParameterSweepItem(),
sla_comb=SLASweepItem({"request_throughput": criterion}),
base_path=base_path,
num_runs=1,
dry_run=False,
sla_variable="request_rate",
sla_min_value=min_value,
sla_max_value=max_value,
)
assert result is not None
return result
def test_solve_linear_sla_le(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_linear(),
SLALessThanOrEqualTo(target=32),
tmp_path,
)
assert history.get_max_passing() == 32
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
32: True,
33: False,
}
def test_solve_linear_sla_lt(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_linear(),
SLALessThan(target=32),
tmp_path,
)
assert history.get_max_passing() == 31
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
31: True,
32: False,
}
def test_solve_linear_sla_oob(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_linear(),
SLALessThanOrEqualTo(target=32),
tmp_path,
min_value=64,
)
assert history.get_max_passing() == 64
assert history.get_min_failing() == 64
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
64: False,
}
def test_solve_concave_sla_le(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_concave(elbow_point=32),
SLALessThanOrEqualTo(target=24),
tmp_path,
)
assert history.get_max_passing() == 16
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
7: True,
13: True,
15: True,
16: True,
17: False,
}
def test_solve_convex_sla_le(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_convex(elbow_point=32),
SLALessThanOrEqualTo(target=24),
tmp_path,
)
assert history.get_max_passing() == 26
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
48: False,
30: False,
24: True,
26: True,
27: False,
}
def test_solve_quadratic_sla_le(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_quadratic(y_intercept=10),
SLALessThanOrEqualTo(target=50),
tmp_path,
)
assert history.get_max_passing() == 20
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
4: True,
20: True,
21: False,
}
def test_solve_sqrt_sla_le(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_sqrt(y_intercept=10),
SLALessThanOrEqualTo(target=100),
tmp_path,
)
assert history.get_max_passing() == 81
assert {val: margin <= 0 for val, margin in history.items()} == {
100: False,
1: True,
89: False,
81: True,
82: False,
}
def test_solve_reuse_history(tmp_path):
sla_data, history = _run_solve_sla(
_var2metric_linear(),
SLALessThanOrEqualTo(target=10),
tmp_path,
min_value=1,
max_value=20,
)
assert history.get_max_passing() == 10
assert {val: margin <= 0 for val, margin in history.items()} == {
20: False,
1: True,
10: True,
11: False,
}
sla_data, history = _run_solve_sla(
_var2metric_linear(),
SLALessThanOrEqualTo(target=30),
tmp_path,
min_value=21,
max_value=40,
)
assert history.get_max_passing() == 30
assert {val: margin <= 0 for val, margin in history.items()} == {
# Items from the past run
# (the margins are different because the target changed)
20: True,
1: True,
10: True,
11: True,
# Items from this run
40: False,
30: True,
31: False,
}

View File

@@ -576,7 +576,7 @@ class SweepPlotArgs:
parser.add_argument(
"--var-y",
type=str,
default="p99_e2el_ms",
default="p99_ttft_ms",
help="The variable for the y-axis",
)
parser.add_argument(

View File

@@ -92,7 +92,8 @@ def run_benchmark(
run_data: dict[str, object]
if output_path.exists():
print("Found existing results. Skipping.")
print("Found existing results.")
print("[SKIPPED BENCHMARK]")
with output_path.open("rb") as f:
run_data = json.load(f)
@@ -167,6 +168,43 @@ def _comb_needs_server(
return False
def server_ctx(
serve_cmd: list[str],
after_bench_cmd: list[str],
*,
show_stdout: bool,
serve_comb: ParameterSweepItem,
bench_params: ParameterSweep,
output_dir: Path,
dry_run: bool,
server_ready_timeout: int = 300,
):
if not _comb_needs_server(serve_comb, bench_params, output_dir):
return contextlib.nullcontext()
return run_server(
serve_cmd,
after_bench_cmd,
show_stdout=show_stdout,
serve_overrides=serve_comb,
dry_run=dry_run,
server_ready_timeout=server_ready_timeout,
)
def _comb_is_valid(
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
link_vars: list[tuple[str, str]],
) -> bool:
return all(
serve_key in serve_comb
and bench_key in bench_comb
and serve_comb[serve_key] == bench_comb[bench_key]
for serve_key, bench_key in link_vars
)
def run_comb(
server: ServerProcess | None,
bench_cmd: list[str],
@@ -176,7 +214,11 @@ def run_comb(
base_path: Path,
num_runs: int,
dry_run: bool,
link_vars: list[tuple[str, str]],
):
if not _comb_is_valid(serve_comb, bench_comb, link_vars):
return None
comb_data = list[dict[str, object]]()
for run_number in range(num_runs):
@@ -208,37 +250,27 @@ def run_combs(
after_bench_cmd: list[str],
*,
show_stdout: bool,
server_ready_timeout: int,
serve_params: ParameterSweep,
bench_params: ParameterSweep,
output_dir: Path,
num_runs: int,
dry_run: bool,
links: list[tuple[str, str]],
server_ready_timeout: int = 300,
link_vars: list[tuple[str, str]],
):
all_data = list[dict[str, object]]()
for serve_comb in serve_params:
with (
run_server(
serve_cmd,
after_bench_cmd,
show_stdout=show_stdout,
serve_overrides=serve_comb,
dry_run=dry_run,
server_ready_timeout=server_ready_timeout,
)
if _comb_needs_server(serve_comb, bench_params, output_dir)
else contextlib.nullcontext()
with server_ctx(
serve_cmd,
after_bench_cmd,
show_stdout=show_stdout,
serve_comb=serve_comb,
bench_params=bench_params,
output_dir=output_dir,
dry_run=dry_run,
server_ready_timeout=server_ready_timeout,
) as server:
for bench_comb in bench_params:
should_run = all(
serve_key in serve_comb
and bench_key in bench_comb
and serve_comb[serve_key] == bench_comb[bench_key]
for serve_key, bench_key in links
)
if not should_run:
continue
base_path = _get_comb_base_path(output_dir, serve_comb, bench_comb)
comb_data = run_comb(
@@ -249,6 +281,7 @@ def run_combs(
base_path=base_path,
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
)
if comb_data is not None:
@@ -269,14 +302,14 @@ class SweepServeArgs:
bench_cmd: list[str]
after_bench_cmd: list[str]
show_stdout: bool
server_ready_timeout: int
serve_params: ParameterSweep
bench_params: ParameterSweep
output_dir: Path
num_runs: int
dry_run: bool
resume: str | None
link_vars: list[tuple[str, str]] | None
server_ready_timeout: int
link_vars: list[tuple[str, str]]
parser_name: ClassVar[str] = "serve"
parser_help: ClassVar[str] = "Run vLLM server benchmark under multiple settings."
@@ -300,7 +333,9 @@ class SweepServeArgs:
else:
# i.e.: run bench_cmd without any modification
bench_params = ParameterSweep.from_records([{}])
link_vars = cls.parse_link_vars(args.link_vars)
num_runs = args.num_runs
if num_runs < 1:
raise ValueError("`num_runs` should be at least 1.")
@@ -437,13 +472,13 @@ def run_main(args: SweepServeArgs):
bench_cmd=args.bench_cmd,
after_bench_cmd=args.after_bench_cmd,
show_stdout=args.show_stdout,
server_ready_timeout=args.server_ready_timeout,
serve_params=args.serve_params,
bench_params=args.bench_params,
output_dir=output_dir,
num_runs=args.num_runs,
dry_run=args.dry_run,
links=args.link_vars,
server_ready_timeout=args.server_ready_timeout,
link_vars=args.link_vars,
)
except BaseException as exc:
raise RuntimeError(

View File

@@ -1,306 +1,162 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import argparse
import contextlib
import json
import math
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import ClassVar, Literal, get_args
import numpy as np
from typing_extensions import assert_never
from vllm.utils.import_utils import PlaceholderModule
from .param_sweep import ParameterSweep, ParameterSweepItem
from .serve import SweepServeArgs, run_benchmark, run_server
from .serve import (
SweepServeArgs,
_get_comb_base_path,
run_comb,
server_ctx,
)
from .server import ServerProcess
from .sla_sweep import SLASweep, SLASweepItem
from .utils import sanitize_filename
try:
import pandas as pd
except ImportError:
pd = PlaceholderModule("pandas")
try:
from scipy.interpolate import PchipInterpolator
except ImportError:
PchipInterpolator = (
PlaceholderModule("scipy")
.placeholder_attr("interpolate")
.placeholder_attr("PchipInterpolator")
)
def _get_sla_base_path(
output_dir: Path,
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
):
parts = list[str]()
if serve_comb:
parts.extend(("SERVE-", serve_comb.as_text(sep="-")))
if bench_comb:
parts.extend(("BENCH-", bench_comb.as_text(sep="-")))
return output_dir / sanitize_filename("-".join(parts))
def _get_sla_iter_path(
base_path: Path,
sla_comb: SLASweepItem,
sla_variable: str,
sla_value: int | None,
):
if sla_value is None:
prefix = sla_comb.as_text(sep="-")
return base_path / f"SLA--{prefix}.json"
return base_path / f"{sla_variable}={sla_value}"
def _get_sla_run_path(iter_path: Path, run_number: int | None):
if run_number is None:
return iter_path / "summary.json"
return iter_path / f"run={run_number}.json"
def _iter_sla_val_paths(base_path: Path, sla_variable: str):
for iter_path in base_path.glob(f"{sla_variable}=*"):
sla_value = int(iter_path.name.removeprefix(f"{sla_variable}="))
summary_path = iter_path / "summary.json"
if summary_path.exists():
yield sla_value, summary_path
def _sla_needs_server(
serve_comb: ParameterSweepItem,
bench_combs: ParameterSweep,
sla_combs: SLASweep,
sla_variable: str,
output_dir: Path,
):
for bench_comb in bench_combs:
base_path = _get_sla_base_path(output_dir, serve_comb, bench_comb)
for sla_comb in sla_combs:
if not _get_sla_iter_path(
base_path,
sla_comb,
sla_variable,
sla_value=None,
).exists():
return True
return False
def run_sla(
server: ServerProcess | None,
bench_cmd: list[str],
*,
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
iter_path: Path,
num_runs: int,
dry_run: bool,
):
iter_data = list[dict[str, object]]()
for run_number in range(num_runs):
run_data = run_benchmark(
server,
bench_cmd,
serve_overrides=serve_comb,
bench_overrides=bench_comb,
run_number=run_number,
output_path=_get_sla_run_path(iter_path, run_number),
dry_run=dry_run,
)
if run_data is not None:
iter_data.append(run_data)
if dry_run:
return None
with _get_sla_run_path(iter_path, run_number=None).open("w") as f:
json.dump(iter_data, f, indent=4)
return iter_data
SLAVariable = Literal["request_rate", "max_concurrency"]
class SLAHistory(dict[int, float]):
def __init__(self, min_value: int, max_value: int) -> None:
super().__init__()
def _estimate_sla_value(run_data: dict[str, object], sla_variable: SLAVariable):
request_throughput = float(run_data["request_throughput"]) # type: ignore
if sla_variable == "request_rate":
return request_throughput
if sla_variable == "max_concurrency":
mean_latency_ms = float(run_data["mean_e2el_ms"]) # type: ignore
return request_throughput * mean_latency_ms / 1000
self.min_value = min_value
self.max_value = max_value
def get_xy(self) -> tuple[list[int], list[float]]:
xs = list[int]()
ys = list[float]()
for x, y in sorted(self.items()):
xs.append(x)
ys.append(y)
return xs, ys
def get_max_passing(self) -> float:
return max(
(val for val, margin in self.items() if margin <= 0),
default=self.min_value,
)
def get_min_failing(self) -> float:
return min(
(val for val, margin in self.items() if margin > 0),
default=self.max_value,
)
assert_never(sla_variable)
def _compute_margin(
sla_comb: SLASweepItem,
iter_data: list[dict[str, object]],
):
assert iter_data, "Summary should not be empty"
iter_data_mean = {
k: sum(float(run_data[k]) for run_data in iter_data) / len(iter_data) # type: ignore
for k in sla_comb
}
sla_margins = [
criterion.print_and_compute_margin(iter_data_mean, k)
for k, criterion in sla_comb.items()
]
return max(sla_margins)
def _estimate_sla_avg(runs: list[dict[str, object]], sla_variable: SLAVariable):
return sum(_estimate_sla_value(run, sla_variable) for run in runs) / len(runs)
def solve_sla(
def run_comb_sla(
server: ServerProcess | None,
bench_cmd: list[str],
*,
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
sla_comb: SLASweepItem,
base_path: Path,
output_dir: Path,
num_runs: int,
dry_run: bool,
link_vars: list[tuple[str, str]],
sla_variable: SLAVariable,
sla_min_value: int = 1,
sla_max_value: int = 8192, # The value that represents infinite QPS
):
sla_data = list[dict[str, object]]()
history = SLAHistory(min_value=sla_min_value, max_value=sla_max_value)
sla_value: int,
) -> list[dict[str, object]] | None:
bench_comb_sla = bench_comb | {sla_variable: sla_value}
# Use results from previous runs
for past_sla_value, path in _iter_sla_val_paths(base_path, sla_variable):
with path.open("rb") as f:
past_iter_data = json.load(f)
sla_data.append(past_iter_data)
history[past_sla_value] = _compute_margin(sla_comb, past_iter_data)
# NOTE: We don't use equality here to be more robust against noisy results
while history.get_max_passing() + 1 < history.get_min_failing():
if max(history, default=sla_min_value) < sla_max_value:
val = sla_max_value
elif min(history, default=sla_max_value) > sla_min_value:
val = sla_min_value
else:
spl = PchipInterpolator(*history.get_xy(), extrapolate=False)
spl_roots = spl.solve()
if len(spl_roots) == 0:
# Fallback to binary search
val = int((history.get_max_passing() + history.get_min_failing()) / 2)
else:
val = int(spl_roots[0])
if val in history:
# Cover both sides (floor and ceil) of the root to be sure
# that it is indeed the target value
val += 1
val = max(sla_min_value, min(val, sla_max_value))
print(f"Testing {sla_variable}: {val} req/s")
iter_data = run_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb | {sla_variable: val},
iter_path=_get_sla_iter_path(base_path, sla_comb, sla_variable, val),
num_runs=num_runs,
dry_run=dry_run,
)
if iter_data is None:
return None
margin = _compute_margin(sla_comb, iter_data)
if margin <= 0:
print(f"SLA criteria are met. ({margin=:.2f})")
else:
print(f"SLA criteria are not met. ({margin=:.2f})")
sla_data.extend(iter_data)
history[val] = margin
return sla_data, history
return run_comb(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb_sla,
base_path=_get_comb_base_path(output_dir, serve_comb, bench_comb_sla),
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
)
def search_sla(
def explore_sla(
server: ServerProcess | None,
bench_cmd: list[str],
*,
serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem,
sla_comb: SLASweepItem,
sla_variable: SLAVariable,
base_path: Path,
sla_iters: int,
output_dir: Path,
num_runs: int,
dry_run: bool,
link_vars: list[tuple[str, str]],
):
print("[SLA START]")
print(f"Serve parameters: {serve_comb.as_text() or '(None)'}")
print(f"Bench parameters: {bench_comb.as_text() or '(None)'}")
print(f"SLA criteria: {sla_comb.as_text()}")
print(f"Number of SLA iterations: {sla_iters}")
result = solve_sla(
if sla_iters < 2:
raise ValueError("`sla_iters` should be at least 2")
serial_comb_data = run_comb_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb,
sla_comb=sla_comb,
base_path=base_path,
output_dir=output_dir,
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
sla_variable=sla_variable,
sla_value=1,
)
if result is None:
assert dry_run
print("Omitting SLA search.")
print("[SLA END]")
batch_comb_data = run_comb_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb,
output_dir=output_dir,
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
sla_variable=sla_variable,
sla_value=int(bench_comb.get("num_prompts", 1000)), # type: ignore
)
if serial_comb_data is None or batch_comb_data is None:
if dry_run:
print("Omitting intermediate SLA iterations.")
print("[SLA END]")
return
sla_data, sla_history = result
sla_value = sla_history.get_max_passing()
print(f"Maximum {sla_variable} for SLA: {sla_value} req/s.")
serial_sla_value = math.ceil(_estimate_sla_avg(serial_comb_data, sla_variable))
print(f"Serial inference: {sla_variable}={serial_sla_value}")
with _get_sla_iter_path(
base_path,
sla_comb,
sla_variable,
sla_value=None,
).open("w") as f:
json.dump(sla_data, f, indent=4)
batch_sla_value = math.floor(_estimate_sla_avg(batch_comb_data, sla_variable))
print(f"Batch inference: {sla_variable}={batch_sla_value}")
# Avoid duplicated runs for intermediate values if the range between
# `serial_sla_value` and `batch_sla_value` is small
inter_sla_values = np.linspace(serial_sla_value, batch_sla_value, sla_iters)[1:-1]
inter_sla_values = sorted(set(map(round, inter_sla_values)))
inter_combs_data: list[dict[str, object]] = []
for inter_sla_value in inter_sla_values:
print(f"Exploring: {sla_variable}={inter_sla_value}")
inter_comb_data = run_comb_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb,
output_dir=output_dir,
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
sla_variable=sla_variable,
sla_value=inter_sla_value,
)
if inter_comb_data is not None:
inter_combs_data.extend(inter_comb_data)
print("[SLA END]")
return sla_data
return serial_comb_data + inter_combs_data + batch_comb_data
def run_slas(
@@ -309,13 +165,15 @@ def run_slas(
after_bench_cmd: list[str],
*,
show_stdout: bool,
server_ready_timeout: int,
serve_params: ParameterSweep,
bench_params: ParameterSweep,
sla_params: SLASweep,
sla_variable: SLAVariable,
sla_iters: int,
output_dir: Path,
num_runs: int,
dry_run: bool,
link_vars: list[tuple[str, str]],
):
if any(bench_comb.has_param(sla_variable) for bench_comb in bench_params):
raise ValueError(
@@ -325,41 +183,32 @@ def run_slas(
all_data = list[dict[str, object]]()
for serve_comb in serve_params:
with (
run_server(
serve_cmd,
after_bench_cmd,
show_stdout=show_stdout,
serve_overrides=serve_comb,
dry_run=dry_run,
)
if _sla_needs_server(
serve_comb,
bench_params,
sla_params,
sla_variable,
output_dir,
)
else contextlib.nullcontext()
with server_ctx(
serve_cmd,
after_bench_cmd,
show_stdout=show_stdout,
server_ready_timeout=server_ready_timeout,
serve_comb=serve_comb,
bench_params=bench_params,
output_dir=output_dir,
dry_run=dry_run,
) as server:
for bench_comb in bench_params:
for sla_comb in sla_params:
base_path = _get_sla_base_path(output_dir, serve_comb, bench_comb)
comb_data = explore_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb,
sla_variable=sla_variable,
sla_iters=sla_iters,
output_dir=output_dir,
num_runs=num_runs,
dry_run=dry_run,
link_vars=link_vars,
)
comb_data = search_sla(
server,
bench_cmd,
serve_comb=serve_comb,
bench_comb=bench_comb,
sla_comb=sla_comb,
sla_variable=sla_variable,
base_path=base_path,
num_runs=num_runs,
dry_run=dry_run,
)
if comb_data is not None:
all_data.extend(comb_data)
if comb_data is not None:
all_data.extend(comb_data)
if dry_run:
return None
@@ -372,26 +221,23 @@ def run_slas(
@dataclass
class SweepServeSLAArgs(SweepServeArgs):
sla_params: SLASweep
sla_variable: SLAVariable
sla_iters: int
parser_name: ClassVar[str] = "serve_sla"
parser_help: ClassVar[str] = "Tune a variable to meet SLAs under multiple settings."
parser_help: ClassVar[str] = (
"Explore the latency-throughput space for determining SLAs."
)
@classmethod
def from_cli_args(cls, args: argparse.Namespace):
# NOTE: Don't use super() as `from_cli_args` calls `cls()`
base_args = SweepServeArgs.from_cli_args(args)
if args.sla_params:
sla_params = SLASweep.read_json(args.sla_params)
else:
sla_params = SLASweep.from_records([])
return cls(
**asdict(base_args),
sla_params=sla_params,
sla_variable=args.sla_variable,
sla_iters=args.sla_iters,
)
@classmethod
@@ -399,25 +245,20 @@ class SweepServeSLAArgs(SweepServeArgs):
parser = super().add_cli_args(parser)
sla_group = parser.add_argument_group("sla options")
sla_group.add_argument(
"--sla-params",
type=str,
required=True,
help="Path to JSON file containing a list of SLA constraints to satisfy. "
'Each constraint is expressed in `{"<KEY>": "<OP><VALUE>"}` format, '
'e.g.: `{"p99_e2el_ms": "<=500"}` means that '
"the E2E latency should be less than 500ms 99%% of the time. "
"Setting this option runs this script in SLA mode, which searches for "
"the maximum `sla_variable` that satisfies the constraints for "
"each combination of `serve_params`, `bench_params`, and `sla_params`.",
)
sla_group.add_argument(
"--sla-variable",
type=str,
choices=get_args(SLAVariable),
default="request_rate",
help="Whether to tune request rate or maximum concurrency to satisfy "
"the SLA constraints.",
help="The variable to adjust in each iteration.",
)
sla_group.add_argument(
"--sla-iters",
type=int,
default=10,
help="Number of iterations used to explore the latency-throughput space. "
"This includes the first two iterations used to interpolate the value of "
"`sla_variable` for remaining iterations.",
)
return parser
@@ -436,13 +277,15 @@ def run_main(args: SweepServeSLAArgs):
bench_cmd=args.bench_cmd,
after_bench_cmd=args.after_bench_cmd,
show_stdout=args.show_stdout,
server_ready_timeout=args.server_ready_timeout,
serve_params=args.serve_params,
bench_params=args.bench_params,
sla_params=args.sla_params,
sla_variable=args.sla_variable,
sla_iters=args.sla_iters,
output_dir=output_dir,
num_runs=args.num_runs,
dry_run=args.dry_run,
link_vars=args.link_vars,
)
except BaseException as exc:
raise RuntimeError(

View File

@@ -1,138 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import json
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing_extensions import override
SLA_EPS = 1e-8
"""Offset used to differentiate margins for equality checks."""
@dataclass
class SLACriterionBase(ABC):
target: float
@abstractmethod
def compute_margin(self, actual: float) -> float:
"""
Return a negative value or `0` if this criterion is met;
otherwise a positive value indicating the distance to the target.
"""
raise NotImplementedError
@abstractmethod
def format_cond(self, lhs: str) -> str:
raise NotImplementedError
def print_and_compute_margin(
self,
metrics: dict[str, float],
metrics_key: str,
) -> float:
metric = metrics[metrics_key]
margin = self.compute_margin(metric)
cond = self.format_cond(f"{metrics_key} = {metric:.2f}")
print(f"Validating SLA: {cond} | " + ("PASSED" if margin <= 0 else "FAILED"))
return margin
@dataclass
class SLALessThan(SLACriterionBase):
@override
def compute_margin(self, actual: float) -> float:
return actual + SLA_EPS - self.target
@override
def format_cond(self, lhs: str) -> str:
return f"{lhs}<{self.target:.2f}"
@dataclass
class SLALessThanOrEqualTo(SLACriterionBase):
@override
def compute_margin(self, actual: float) -> float:
return actual - self.target
@override
def format_cond(self, lhs: str) -> str:
return f"{lhs}<={self.target:.2f}"
@dataclass
class SLAGreaterThan(SLACriterionBase):
@override
def compute_margin(self, actual: float) -> float:
return self.target + SLA_EPS - actual
@override
def format_cond(self, lhs: str) -> str:
return f"{lhs}>{self.target:.2f}"
@dataclass
class SLAGreaterThanOrEqualTo(SLACriterionBase):
@override
def compute_margin(self, actual: float) -> float:
return self.target - actual
@override
def format_cond(self, lhs: str) -> str:
return f"{lhs}>={self.target:.2f}"
# NOTE: The ordering is important! Match longer op_keys first
SLA_CRITERIA: dict[str, type[SLACriterionBase]] = {
"<=": SLALessThanOrEqualTo,
">=": SLAGreaterThanOrEqualTo,
"<": SLALessThan,
">": SLAGreaterThan,
}
class SLASweep(list["SLASweepItem"]):
@classmethod
def read_json(cls, filepath: os.PathLike):
with open(filepath, "rb") as f:
records = json.load(f)
return cls.from_records(records)
@classmethod
def from_records(cls, records: list[dict[str, str]]):
if not isinstance(records, list):
raise TypeError(
f"The SLA sweep should be a list of dictionaries, "
f"but found type: {type(records)}"
)
return cls(SLASweepItem.from_record(record) for record in records)
class SLASweepItem(dict[str, SLACriterionBase]):
@classmethod
def from_record(cls, record: dict[str, str]):
sla_criteria: dict[str, SLACriterionBase] = {}
for metric_key, metric_value in record.items():
for op_key in SLA_CRITERIA:
if metric_value.startswith(op_key):
sla_criteria[metric_key] = SLA_CRITERIA[op_key](
float(metric_value.removeprefix(op_key))
)
break
else:
raise ValueError(
f"Invalid operator for "
f"SLA constraint '{metric_key}={metric_value}'. "
f"Valid operators are: {sorted(SLA_CRITERIA)}",
)
return cls(sla_criteria)
def as_text(self, sep: str = ", ") -> str:
return sep.join(v.format_cond(k) for k, v in self.items())

View File

@@ -151,7 +151,8 @@ def run_benchmark(
print(f"Output file: {output_path}")
if output_path.exists():
print("Found existing results. Skipping.")
print("Found existing results.")
print("[SKIPPED BENCHMARK]")
with output_path.open("r", encoding="utf-8") as f:
run_data = json.load(f)