[Disagg] Support large batch size in proxy server and update NixlConnector doc for DP (#28782)
Signed-off-by: Ming Yang <minos.future@gmail.com>
This commit is contained in:
@@ -146,6 +146,8 @@ python tests/v1/kv_connector/nixl_integration/toy_proxy_server.py \
|
|||||||
--decoder-ports 8000 8000
|
--decoder-ports 8000 8000
|
||||||
```
|
```
|
||||||
|
|
||||||
|
For multi-host DP deployments, you only need to provide the host/port of the head instances.
|
||||||
|
|
||||||
### KV Role Options
|
### KV Role Options
|
||||||
|
|
||||||
- **kv_producer**: For prefiller instances that generate KV caches
|
- **kv_producer**: For prefiller instances that generate KV caches
|
||||||
|
|||||||
@@ -26,9 +26,21 @@ async def lifespan(app: FastAPI):
|
|||||||
)
|
)
|
||||||
|
|
||||||
app.state.prefill_client = httpx.AsyncClient(
|
app.state.prefill_client = httpx.AsyncClient(
|
||||||
timeout=None, base_url=prefiller_base_url
|
timeout=None,
|
||||||
|
base_url=prefiller_base_url,
|
||||||
|
limits=httpx.Limits(
|
||||||
|
max_connections=None,
|
||||||
|
max_keepalive_connections=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
app.state.decode_client = httpx.AsyncClient(
|
||||||
|
timeout=None,
|
||||||
|
base_url=decoder_base_url,
|
||||||
|
limits=httpx.Limits(
|
||||||
|
max_connections=None,
|
||||||
|
max_keepalive_connections=None,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
app.state.decode_client = httpx.AsyncClient(timeout=None, base_url=decoder_base_url)
|
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
@@ -105,6 +117,11 @@ async def send_request_to_service(
|
|||||||
headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}
|
headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}
|
||||||
response = await client.post(endpoint, json=req_data, headers=headers)
|
response = await client.post(endpoint, json=req_data, headers=headers)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
|
# read/consume the response body to release the connection
|
||||||
|
# otherwise, it would raise httpx.ReadError
|
||||||
|
await response.aread()
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -30,7 +30,14 @@ async def lifespan(app: FastAPI):
|
|||||||
prefiller_base_url = f"http://{host}:{port}/v1"
|
prefiller_base_url = f"http://{host}:{port}/v1"
|
||||||
app.state.prefill_clients.append(
|
app.state.prefill_clients.append(
|
||||||
{
|
{
|
||||||
"client": httpx.AsyncClient(timeout=None, base_url=prefiller_base_url),
|
"client": httpx.AsyncClient(
|
||||||
|
timeout=None,
|
||||||
|
base_url=prefiller_base_url,
|
||||||
|
limits=httpx.Limits(
|
||||||
|
max_connections=None,
|
||||||
|
max_keepalive_connections=None,
|
||||||
|
),
|
||||||
|
),
|
||||||
"host": host,
|
"host": host,
|
||||||
"port": port,
|
"port": port,
|
||||||
"id": i,
|
"id": i,
|
||||||
@@ -42,7 +49,14 @@ async def lifespan(app: FastAPI):
|
|||||||
decoder_base_url = f"http://{host}:{port}/v1"
|
decoder_base_url = f"http://{host}:{port}/v1"
|
||||||
app.state.decode_clients.append(
|
app.state.decode_clients.append(
|
||||||
{
|
{
|
||||||
"client": httpx.AsyncClient(timeout=None, base_url=decoder_base_url),
|
"client": httpx.AsyncClient(
|
||||||
|
timeout=None,
|
||||||
|
base_url=decoder_base_url,
|
||||||
|
limits=httpx.Limits(
|
||||||
|
max_connections=None,
|
||||||
|
max_keepalive_connections=None,
|
||||||
|
),
|
||||||
|
),
|
||||||
"host": host,
|
"host": host,
|
||||||
"port": port,
|
"port": port,
|
||||||
"id": i,
|
"id": i,
|
||||||
@@ -169,6 +183,10 @@ async def send_request_to_service(
|
|||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
|
# read/consume the response body to release the connection
|
||||||
|
# otherwise, it would raise httpx.ReadError
|
||||||
|
await response.aread()
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
@@ -206,6 +224,7 @@ async def _handle_completions(api: str, request: Request):
|
|||||||
|
|
||||||
# Extract the needed fields
|
# Extract the needed fields
|
||||||
response_json = response.json()
|
response_json = response.json()
|
||||||
|
await response.aclose() # CRITICAL: Release connection back to pool
|
||||||
kv_transfer_params = response_json.get("kv_transfer_params", {})
|
kv_transfer_params = response_json.get("kv_transfer_params", {})
|
||||||
if kv_transfer_params:
|
if kv_transfer_params:
|
||||||
req_data["kv_transfer_params"] = kv_transfer_params
|
req_data["kv_transfer_params"] = kv_transfer_params
|
||||||
|
|||||||
Reference in New Issue
Block a user