Compare commits

..

30 Commits

Author SHA1 Message Date
Woosuk Kwon
2e0b6e7757 Bump up to v0.2.7 (#2337)
2024-01-03 17:35:56 -08:00
Woosuk Kwon
941767127c Revert the changes in test_cache (#2335) 2024-01-03 17:32:05 -08:00
Ronen Schaffer
74d8d77626 Remove unused const TIMEOUT_TO_PREVENT_DEADLOCK (#2321) 2024-01-03 15:49:07 -08:00
Zhuohan Li
fd4ea8ef5c Use NCCL instead of ray for control-plane communication to remove serialization overhead (#2221) 2024-01-03 11:30:22 -08:00
Ronen Schaffer
1066cbd152 Remove deprecated parameter: concurrency_count (#2315) 2024-01-03 09:56:21 -08:00
Woosuk Kwon
6ef00b03a2 Enable CUDA graph for GPTQ & SqueezeLLM (#2318) 2024-01-03 09:52:29 -08:00
Roy
9140561059 [Minor] Fix typo and remove unused code (#2305) 2024-01-02 19:23:15 -08:00
Jee Li
77af974b40 [FIX] Support non-zero CUDA devices in custom kernels (#1959) 2024-01-02 19:09:59 -08:00
Jong-hun Shin
4934d49274 Support GPT-NeoX Models without attention biases (#2301) 2023-12-30 11:42:04 -05:00
Zhuohan Li
358c328d69 [BUGFIX] Fix communication test (#2285) 2023-12-27 17:18:11 -05:00
Zhuohan Li
4aaafdd289 [BUGFIX] Fix the path of test prompts (#2273) 2023-12-26 10:37:21 -08:00
Zhuohan Li
66b108d142 [BUGFIX] Fix API server test (#2270) 2023-12-26 10:37:06 -08:00
Zhuohan Li
e0ff920001 [BUGFIX] Do not return ignored sentences twice in async llm engine (#2258) 2023-12-26 13:41:09 +08:00
blueceiling
face83c7ec [Docs] Add "About" Heading to README.md (#2260) 2023-12-25 16:37:07 -08:00
Shivam Thakkar
1db83e31a2 [Docs] Update installation instructions to include CUDA 11.8 xFormers (#2246) 2023-12-22 23:20:02 -08:00
Woosuk Kwon
a1b9cb2a34 [BugFix] Fix recovery logic for sequence group (#2186) 2023-12-20 21:52:37 -08:00
Woosuk Kwon
3a4fd5ca59 Disable Ray usage stats collection (#2206) 2023-12-20 21:52:08 -08:00
Ronen Schaffer
c17daa9f89 [Docs] Fix broken links (#2222) 2023-12-20 12:43:42 -08:00
Antoni Baum
bd29cf3d3a Remove Sampler copy stream (#2209) 2023-12-20 00:04:33 -08:00
Hanzhi Zhou
31bff69151 Make _prepare_sample non-blocking and use pinned memory for input buffers (#2207) 2023-12-19 16:52:46 -08:00
Woosuk Kwon
ba4f826738 [BugFix] Fix weight loading for Mixtral with TP (#2208) 2023-12-19 16:16:11 -08:00
avideci
de60a3fb93 Added DeciLM-7b and DeciLM-7b-instruct (#2062) 2023-12-19 02:29:33 -08:00
Woosuk Kwon
21d5daa4ac Add warning on CUDA graph memory usage (#2182) 2023-12-18 18:16:17 -08:00
Suhong Moon
290e015c6c Update Help Text for --gpu-memory-utilization Argument (#2183) 2023-12-18 11:33:24 -08:00
kliuae
1b7c791d60 [ROCm] Fixes for GPTQ on ROCm (#2180) 2023-12-18 10:41:04 -08:00
JohnSaxon
bbe4466fd9 [Minor] Fix typo (#2166)
Co-authored-by: John-Saxon <zhang.xiangxuan@oushu.com>
2023-12-17 23:28:49 -08:00
Harry Mellor
08133c4d1a Add SSL arguments to API servers (#2109) 2023-12-18 10:56:23 +08:00
Woosuk Kwon
76a7983b23 [BugFix] Fix RoPE kernel on long sequences (#2164) 2023-12-17 17:09:10 -08:00
Woosuk Kwon
8041b7305e [BugFix] Raise error when max_model_len is larger than KV cache (#2163) 2023-12-17 17:08:23 -08:00
Suhong Moon
3ec8c25cd0 [Docs] Update documentation for gpu-memory-utilization option (#2162) 2023-12-17 10:51:57 -08:00
67 changed files with 876 additions and 428 deletions

View File

@@ -27,7 +27,7 @@ Easy, fast, and cheap LLM serving for everyone
- [2023/06] We officially released vLLM! FastChat-vLLM integration has powered [LMSYS Vicuna and Chatbot Arena](https://chat.lmsys.org) since mid-April. Check out our [blog post](https://vllm.ai).
---
## About
vLLM is a fast and easy-to-use library for LLM inference and serving.
vLLM is fast with:
@@ -54,6 +54,7 @@ vLLM seamlessly supports many Hugging Face models, including the following archi
- Baichuan & Baichuan2 (`baichuan-inc/Baichuan2-13B-Chat`, `baichuan-inc/Baichuan-7B`, etc.)
- BLOOM (`bigscience/bloom`, `bigscience/bloomz`, etc.)
- ChatGLM (`THUDM/chatglm2-6b`, `THUDM/chatglm3-6b`, etc.)
- DeciLM (`Deci/DeciLM-7B`, `Deci/DeciLM-7B-instruct`, etc.)
- Falcon (`tiiuae/falcon-7b`, `tiiuae/falcon-40b`, `tiiuae/falcon-rw-7b`, etc.)
- GPT-2 (`gpt2`, `gpt2-xl`, etc.)
- GPT BigCode (`bigcode/starcoder`, `bigcode/gpt_bigcode-santacoder`, etc.)

View File

@@ -1,5 +1,6 @@
#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <torch/extension.h>
#include <c10/cuda/CUDAGuard.h>
#include "cuda_compat.h"
#include "dispatch_utils.h"
@@ -36,6 +37,7 @@ void silu_and_mul(
dim3 grid(num_tokens);
dim3 block(std::min(d, 1024));
const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
input.scalar_type(),
@@ -71,6 +73,7 @@ __global__ void activation_kernel(
int64_t num_tokens = input.numel() / d; \
dim3 grid(num_tokens); \
dim3 block(std::min(d, 1024)); \
const at::cuda::OptionalCUDAGuard device_guard(device_of(input)); \
const cudaStream_t stream = at::cuda::getCurrentCUDAStream(); \
VLLM_DISPATCH_FLOATING_TYPES( \
input.scalar_type(), \

View File

@@ -21,6 +21,7 @@
#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include "attention_dtypes.h"
#include "attention_utils.cuh"
@@ -616,6 +617,7 @@ void paged_attention_v1_launcher(
dim3 grid(num_heads, num_seqs, 1);
dim3 block(NUM_THREADS);
const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
switch (head_size) {
// NOTE(woosuk): To reduce the compilation time, we only compile for the
@@ -784,6 +786,7 @@ void paged_attention_v2_launcher(
int reduce_shared_mem_size = 2 * max_num_partitions * sizeof(float);
dim3 block(NUM_THREADS);
const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
switch (head_size) {
// NOTE(woosuk): To reduce the compilation time, we only compile for the

View File

@@ -1,5 +1,6 @@
#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include "cuda_compat.h"
#include "dispatch_utils.h"
@@ -33,6 +34,7 @@ void swap_blocks(
char *dst_ptr = static_cast<char*>(dst.data_ptr());
const int64_t block_size_in_bytes = src.element_size() * src[0].numel();
const at::cuda::OptionalCUDAGuard device_guard(src_device);
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
// NOTE(woosuk): This can be slow if the number of blocks is large.
for (const auto& pair : block_mapping) {
@@ -127,6 +129,7 @@ void copy_blocks(
const int numel_per_block = key_caches[0][0].numel();
dim3 grid(num_layers, num_pairs);
dim3 block(std::min(1024, numel_per_block));
const at::cuda::OptionalCUDAGuard device_guard(cache_device);
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
key_caches[0].scalar_type(), "copy_blocks_kernel", ([&] {
@@ -207,6 +210,7 @@ void reshape_and_cache(
dim3 grid(num_tokens);
dim3 block(std::min(num_heads * head_size, 512));
const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
key.scalar_type(),
@@ -367,6 +371,7 @@ void gather_cached_kv(
dim3 grid(num_tokens);
dim3 block(std::min(num_heads * head_size, 512));
const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
key.scalar_type(),

View File

@@ -1,5 +1,6 @@
#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include "dispatch_utils.h"
#include "reduction_utils.cuh"
@@ -76,6 +77,7 @@ void rms_norm(
dim3 grid(num_tokens);
dim3 block(std::min(hidden_size, 1024));
const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
input.scalar_type(),
@@ -101,6 +103,7 @@ void fused_add_rms_norm(
dim3 grid(num_tokens);
dim3 block(std::min(hidden_size, 1024));
const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
input.scalar_type(),

View File

@@ -1,5 +1,6 @@
#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include "cuda_compat.h"
#include "dispatch_utils.h"
@@ -43,8 +44,8 @@ __global__ void rotary_embedding_kernel(
scalar_t* __restrict__ key, // [batch_size, seq_len, num_kv_heads, head_size] or [num_tokens, num_kv_heads, head_size]
const scalar_t* __restrict__ cos_sin_cache, // [max_position, 2, rot_dim // 2]
const int rot_dim,
const int query_stride,
const int key_stride,
const int64_t query_stride,
const int64_t key_stride,
const int num_heads,
const int num_kv_heads,
const int head_size) {
@@ -60,7 +61,7 @@ __global__ void rotary_embedding_kernel(
const int nq = num_heads * embed_dim;
for (int i = threadIdx.x; i < nq; i += blockDim.x) {
const int head_idx = i / embed_dim;
const int token_head = token_idx * query_stride + head_idx * head_size;
const int64_t token_head = token_idx * query_stride + head_idx * head_size;
const int rot_offset = i % embed_dim;
apply_rotary_embedding<scalar_t, IS_NEOX>(query + token_head, cos_ptr,
sin_ptr, rot_offset, embed_dim);
@@ -69,7 +70,7 @@ __global__ void rotary_embedding_kernel(
const int nk = num_kv_heads * embed_dim;
for (int i = threadIdx.x; i < nk; i += blockDim.x) {
const int head_idx = i / embed_dim;
const int token_head = token_idx * key_stride + head_idx * head_size;
const int64_t token_head = token_idx * key_stride + head_idx * head_size;
const int rot_offset = i % embed_dim;
apply_rotary_embedding<scalar_t, IS_NEOX>(key + token_head, cos_ptr,
sin_ptr, rot_offset, embed_dim);
@@ -89,11 +90,12 @@ void rotary_embedding(
int rot_dim = cos_sin_cache.size(1);
int num_heads = query.size(-1) / head_size;
int num_kv_heads = key.size(-1) / head_size;
int query_stride = query.stride(-2);
int key_stride = key.stride(-2);
int64_t query_stride = query.stride(-2);
int64_t key_stride = key.stride(-2);
dim3 grid(num_tokens);
dim3 block(std::min(num_heads * rot_dim / 2, 512));
const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
VLLM_DISPATCH_FLOATING_TYPES(
query.scalar_type(),

View File

@@ -28,6 +28,7 @@ namespace gptq {
#define DIVIDE(x, size) (((x) + (size) - 1) / (size))
#if defined(USE_ROCM)
#include <hipblas/hipblas.h>
__host__ __forceinline__ hipblasStatus_t __compat_hipblasHgemm(hipblasHandle_t handle,
hipblasOperation_t transA,
hipblasOperation_t transB,
@@ -286,7 +287,8 @@ void gemm_half_q_half_cuda_part
fp_gemm_half_q_half_gptq_kernel kernel = pick_gemm_half_q_half_gptq_kernel(true, m_count);
kernel<<<gridDim, blockDim>>>
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
kernel<<<gridDim, blockDim, 0, stream>>>
(
a,
b_q_weight,
@@ -433,7 +435,8 @@ void reconstruct_exllama
gridDim.y = DIVIDE(height, BLOCK_KN_SIZE);
gridDim.x = DIVIDE(width, BLOCK_KN_SIZE);
reconstruct_exllama_kernel<<<gridDim, blockDim>>>
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
reconstruct_exllama_kernel<<<gridDim, blockDim, 0, stream>>>
(
b_q_weight,
b_q_perm,
@@ -520,12 +523,21 @@ __global__ void gemm_half_q_half_alt_kernel(
zeros_tmp[tmp_k] = zero;
}
for (int m = 0; m < b_end; m++) {
#ifndef USE_ROCM
res2 = {};
#else
res2.x = __half_as_ushort(__float2half(0));
res2.y = __half_as_ushort(__float2half(0));
#endif
res2 = __hfma2(__hfma2(deq2[(tmp >> 0) & 0xff][off], scales_tmp[0], zeros_tmp[0]), blockvec[m][k + 0], res2);
res2 = __hfma2(__hfma2(deq2[(tmp >> 8) & 0xff][off], scales_tmp[1], zeros_tmp[1]), blockvec[m][k + 1], res2);
res2 = __hfma2(__hfma2(deq2[(tmp >> 16) & 0xff][off], scales_tmp[2], zeros_tmp[2]), blockvec[m][k + 2], res2);
res2 = __hfma2(__hfma2(deq2[(tmp >> 24) & 0xff][off], scales_tmp[3], zeros_tmp[3]), blockvec[m][k + 3], res2);
#ifndef USE_ROCM
res[m] = __hadd(res[m], __hadd(res2.x, res2.y));
#else
res[m] = __hadd(res[m], __hadd(__ushort_as_half(res2.x), __ushort_as_half(res2.y)));
#endif
}
i += width;
k += 4;
@@ -557,7 +569,8 @@ void gemm_half_q_half_alt
gridDim.y = DIVIDE(size_m, BLOCK_M_SIZE_MAX);
gridDim.z = DIVIDE(size_k, BLOCK_KN_SIZE);
gemm_half_q_half_alt_kernel<<<gridDim, blockDim>>>
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
gemm_half_q_half_alt_kernel<<<gridDim, blockDim, 0, stream>>>
(
(const half2*) a,
b_q_weight,
@@ -629,7 +642,8 @@ void reconstruct_gptq
blockDim.y = 1;
gridDim.y = DIVIDE(height, 8);
gridDim.x = DIVIDE(width, BLOCK_KN_SIZE);
reconstruct_gptq_kernel<<<gridDim, blockDim>>>
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
reconstruct_gptq_kernel<<<gridDim, blockDim, 0, stream>>>
(
b_q_weight,
b_gptq_scales,
@@ -784,7 +798,8 @@ void shuffle_exllama_weight
gridDim.x = DIVIDE(width, THREADS_X);
gridDim.y = height / 8;
make_sequential_kernel<<<gridDim, blockDim>>>
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
make_sequential_kernel<<<gridDim, blockDim, 0, stream>>>
(
q_weight,
new_qweight,
@@ -803,7 +818,8 @@ void shuffle_exllama_weight
blockDim.y = 1;
gridDim.x = DIVIDE(width, THREADS_X);
gridDim.y = 1;
shuffle_kernel<<<gridDim, blockDim>>>(q_weight, height, width);
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
shuffle_kernel<<<gridDim, blockDim, 0, stream>>>(q_weight, height, width);
}
} // namespace gptq

View File

@@ -7,6 +7,7 @@
// half-tensor
#include <c10/cuda/CUDAStream.h>
#include <ATen/cuda/CUDATensorMethods.cuh>
#include <c10/cuda/CUDAGuard.h>
#define BLOCKWIDTH 128
#define BLOCKHEIGHT4 16
@@ -200,7 +201,9 @@ void squeezellm_gemm(
);
dim3 threads(BLOCKWIDTH);
vllm::squeezellm::NUQ4MatMulKernel<<<blocks, threads>>>(
const at::cuda::OptionalCUDAGuard device_guard(device_of(vec));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
vllm::squeezellm::NUQ4MatMulKernel<<<blocks, threads, 0, stream>>>(
#ifndef USE_ROCM
(half2*) vec.data<at::Half>(),
#else

View File

@@ -116,6 +116,7 @@ Alternatively, if you plan to install vLLM-ROCm on a local machine or start from
- `ROCm <https://rocm.docs.amd.com/en/latest/deploy/linux/index.html>`_
- `Pytorch <https://pytorch.org/>`_
- `hipBLAS <https://rocm.docs.amd.com/projects/hipBLAS/en/latest/install.html>`_
1. Install `flash attention for ROCm <https://github.com/ROCmSoftwarePlatform/flash-attention/tree/flash_attention_for_rocm>`_

View File

@@ -42,6 +42,10 @@ You can install vLLM using pip:
$ pip uninstall torch -y
$ pip install torch --upgrade --index-url https://download.pytorch.org/whl/cu118
$ # Re-install xFormers with CUDA 11.8.
$ pip uninstall xformers -y
$ pip install --upgrade xformers --index-url https://download.pytorch.org/whl/cu118
.. _build_from_source:

View File

@@ -58,11 +58,10 @@ Next, you need to rewrite the :code:`forward` methods of your model by following
+ positions: torch.Tensor,
+ kv_caches: List[KVCache],
+ input_metadata: InputMetadata,
+ cache_events: Optional[List[torch.cuda.Event]],
+) -> SamplerOutput:
+) -> Optional[SamplerOutput]:
3. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
4. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
1. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
2. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
.. note::
Currently, vLLM supports the basic multi-head attention mechanism and its variant with rotary positional embeddings.
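As context for the signature shown above, a minimal sketch of a rewritten forward method after this doc change; the KVCache, InputMetadata, and SamplerOutput names are taken from the hunk itself, the class name is a placeholder, and everything else is an assumption rather than the exact vLLM API:

from typing import List, Optional

import torch


class MyModelForCausalLM(torch.nn.Module):  # placeholder class name

    def forward(
        self,
        input_ids: torch.Tensor,    # flattened, shape [num_tokens]
        positions: torch.Tensor,    # flattened, shape [num_tokens]
        kv_caches: List["KVCache"],
        input_metadata: "InputMetadata",
        cache_events: Optional[List[torch.cuda.Event]],
    ) -> Optional["SamplerOutput"]:
        # The attention op inside the model is expected to be PagedAttention,
        # PagedAttentionWithRoPE, or PagedAttentionWithALiBi, as noted above.
        ...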

View File

@@ -89,9 +89,11 @@ Below, you can find an explanation of every engine argument for vLLM:
CPU swap space size (GiB) per GPU.
.. option:: --gpu-memory-utilization <percentage>
.. option:: --gpu-memory-utilization <fraction>
The percentage of GPU memory to be used for the model executor.
The fraction of GPU memory to be used for the model executor, which can range from 0 to 1.
For example, a value of 0.5 would imply 50% GPU memory utilization.
If unspecified, will use the default value of 0.9.
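To make the fraction semantics concrete, a hedged example of the matching keyword on the Python entry point; `gpu_memory_utilization` is assumed to mirror this CLI flag, and the model name is only an example:

from vllm import LLM

# Roughly equivalent to --gpu-memory-utilization 0.5: reserve about half of
# the GPU's memory for the model executor (weights, activations, KV cache).
llm = LLM(model="facebook/opt-125m", gpu_memory_utilization=0.5)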
.. option:: --max-num-batched-tokens <tokens>

View File

@@ -23,6 +23,9 @@ Alongside each architecture, we include some popular models that use it.
* - :code:`ChatGLMModel`
- ChatGLM
- :code:`THUDM/chatglm2-6b`, :code:`THUDM/chatglm3-6b`, etc.
* - :code:`DeciLMForCausalLM`
- DeciLM
- :code:`Deci/DeciLM-7B`, :code:`Deci/DeciLM-7B-instruct`, etc.
* - :code:`BloomForCausalLM`
- BLOOM, BLOOMZ, BLOOMChat
- :code:`bigscience/bloom`, :code:`bigscience/bloomz`, etc.
@@ -90,7 +93,7 @@ Alternatively, you can raise an issue on our `GitHub <https://github.com/vllm-pr
If vLLM successfully generates text, it indicates that your model is supported.
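The check described above can be sketched as follows; the model id is a placeholder, and the output indexing assumes RequestOutput objects whose .outputs list holds CompletionOutput entries with a .text field, which this diff does not itself confirm:

from vllm import LLM

llm = LLM(model="your-org/your-model")  # placeholder model name or path
outputs = llm.generate("Hello, my name is")
# If text is generated without errors, the architecture is supported.
print(outputs[0].outputs[0].text)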
.. tip::
To use models from `ModelScope <www.modelscope.cn>`_ instead of HuggingFace Hub, set an environment variable:
To use models from `ModelScope <https://www.modelscope.cn>`_ instead of HuggingFace Hub, set an environment variable:
.. code-block:: shell

View File

@@ -28,4 +28,4 @@ To run inference on a single or multiple GPUs, use ``VLLM`` class from ``langcha
print(llm("What is the capital of France ?"))
Please refer to this `Tutorial <https://github.com/langchain-ai/langchain/blob/master/docs/extras/integrations/llms/vllm.ipynb>`_ for more details.
Please refer to this `Tutorial <https://github.com/langchain-ai/langchain/blob/master/docs/docs/integrations/llms/vllm.ipynb>`_ for more details.
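For completeness, a hedged sketch of the LangChain integration the tutorial above covers; the import path and the model id are assumptions, not confirmed by this diff:

from langchain.llms import VLLM  # assumed import path for LangChain's wrapper

llm = VLLM(model="facebook/opt-125m")  # placeholder model id
print(llm("What is the capital of France ?"))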

View File

@@ -47,6 +47,6 @@ if __name__ == "__main__":
args = parser.parse_args()
demo = build_demo()
demo.queue(concurrency_count=100).launch(server_name=args.host,
server_port=args.port,
share=True)
demo.queue().launch(server_name=args.host,
server_port=args.port,
share=True)

View File

@@ -3,8 +3,6 @@ typing-extensions>=4.8.0
starlette
psutil
ray >= 2.5.1
pandas # Required for Ray data.
pyarrow # Required for Ray data.
sentencepiece # Required for LLaMA tokenizer.
numpy
tokenizers>=0.15.0

View File

@@ -1,8 +1,6 @@
ninja # For faster builds.
psutil
ray >= 2.5.1
pandas # Required for Ray data.
pyarrow # Required for Ray data.
sentencepiece # Required for LLaMA tokenizer.
numpy
torch == 2.1.2

View File

@@ -219,13 +219,13 @@ vllm_extension_sources = [
"csrc/activation_kernels.cu",
"csrc/layernorm_kernels.cu",
"csrc/quantization/squeezellm/quant_cuda_kernel.cu",
"csrc/quantization/gptq/q_gemm.cu",
"csrc/cuda_utils_kernels.cu",
"csrc/pybind.cpp",
]
if _is_cuda():
vllm_extension_sources.append("csrc/quantization/awq/gemm_kernels.cu")
vllm_extension_sources.append("csrc/quantization/gptq/q_gemm.cu")
vllm_extension = CUDAExtension(
name="vllm._C",

View File

@@ -8,11 +8,11 @@ import pytest
import requests
def _query_server(prompt: str) -> dict:
def _query_server(prompt: str, max_tokens: int = 5) -> dict:
response = requests.post("http://localhost:8000/generate",
json={
"prompt": prompt,
"max_tokens": 100,
"max_tokens": max_tokens,
"temperature": 0,
"ignore_eos": True
})
@@ -20,6 +20,10 @@ def _query_server(prompt: str) -> dict:
return response.json()
def _query_server_long(prompt: str) -> dict:
return _query_server(prompt, max_tokens=500)
@pytest.fixture
def api_server():
script_path = Path(__file__).parent.joinpath(
@@ -44,13 +48,14 @@ def test_api_server(api_server):
"""
with Pool(32) as pool:
# Wait until the server is ready
prompts = ["Hello world"] * 1
prompts = ["warm up"] * 1
result = None
while not result:
try:
for _ in pool.map(_query_server, prompts):
for r in pool.map(_query_server, prompts):
result = r
break
except Exception:
except requests.exceptions.ConnectionError:
time.sleep(1)
# Actual tests start here
@@ -63,12 +68,14 @@ def test_api_server(api_server):
assert num_aborted_requests == 0
# Try with 100 prompts
prompts = ["Hello world"] * 100
prompts = ["test prompt"] * 100
for result in pool.map(_query_server, prompts):
assert result
with Pool(32) as pool:
# Cancel requests
pool.map_async(_query_server, prompts)
prompts = ["canceled requests"] * 100
pool.map_async(_query_server_long, prompts)
time.sleep(0.01)
pool.terminate()
pool.join()
@@ -81,6 +88,6 @@ def test_api_server(api_server):
# check that server still runs after cancellations
with Pool(32) as pool:
# Try with 100 prompts
prompts = ["Hello world"] * 100
prompts = ["test prompt after canceled"] * 100
for result in pool.map(_query_server, prompts):
assert result

View File

@@ -8,8 +8,9 @@ from transformers import AutoModelForCausalLM
from vllm import LLM, SamplingParams
from vllm.transformers_utils.tokenizer import get_tokenizer
_TEST_PROMPTS = ["prompts/example.txt"]
_LONG_PROMPTS = ["prompts/summary.txt"]
_TEST_DIR = os.path.dirname(__file__)
_TEST_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "example.txt")]
_LONG_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "summary.txt")]
def _read_prompts(filename: str) -> str:
@@ -24,7 +25,7 @@ def _read_prompts(filename: str) -> str:
def example_prompts() -> List[str]:
prompts = []
for filename in _TEST_PROMPTS:
prompts += _read_prompts(os.path.join("tests", filename))
prompts += _read_prompts(filename)
return prompts
@@ -32,7 +33,7 @@ def example_prompts() -> List[str]:
def example_long_prompts() -> List[str]:
prompts = []
for filename in _LONG_PROMPTS:
prompts += _read_prompts(os.path.join("tests", filename))
prompts += _read_prompts(filename)
return prompts

View File

@@ -8,7 +8,7 @@ import pytest
import torch
from vllm.config import ParallelConfig
from vllm.engine.ray_utils import get_open_port
from vllm.utils import get_open_port
from vllm.model_executor.parallel_utils.communication_op import (
tensor_model_parallel_all_reduce,
tensor_model_parallel_all_gather,

View File

@@ -12,6 +12,7 @@ def create_kv_caches(
head_size: int,
dtype: torch.dtype,
seed: int,
device: str,
) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
@@ -23,7 +24,7 @@ def create_kv_caches(
for _ in range(num_layers):
key_cache = torch.empty(size=key_cache_shape,
dtype=dtype,
device='cuda')
device=device)
key_cache.uniform_(-scale, scale)
key_caches.append(key_cache)
@@ -32,7 +33,7 @@ def create_kv_caches(
for _ in range(num_layers):
value_cache = torch.empty(size=value_cache_shape,
dtype=dtype,
device='cuda')
device=device)
value_cache.uniform_(-scale, scale)
value_caches.append(value_cache)
return key_caches, value_caches

View File

@@ -7,22 +7,26 @@ DTYPES = [torch.half, torch.bfloat16, torch.float]
NUM_TOKENS = [7, 83, 2048] # Arbitrary values for testing
D = [512, 4096, 5120, 13824] # Arbitrary values for testing
SEEDS = [0]
DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
@pytest.mark.parametrize("num_tokens", NUM_TOKENS)
@pytest.mark.parametrize("d", D)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_silu_and_mul(
num_tokens: int,
d: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
x = torch.randn(num_tokens, 2 * d, dtype=dtype, device="cuda")
gpu_id = f"cuda:{device}"
x = torch.randn(num_tokens, 2 * d, dtype=dtype, device=gpu_id)
layer = SiluAndMul()
out = layer(x)
ref_out = layer._forward(x)
@@ -33,16 +37,19 @@ def test_silu_and_mul(
@pytest.mark.parametrize("d", D)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_gelu_new(
num_tokens: int,
d: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
x = torch.randn(num_tokens, d, dtype=dtype, device="cuda")
gpu_id = f"cuda:{device}"
x = torch.randn(num_tokens, d, dtype=dtype, device=gpu_id)
layer = NewGELU()
out = layer(x)
ref_out = layer._forward(x)
@@ -53,15 +60,18 @@ def test_gelu_new(
@pytest.mark.parametrize("d", D)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
def test_gelu_fast(
num_tokens: int,
d: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
x = torch.randn(num_tokens, d, dtype=dtype, device="cuda")
gpu_id = f"cuda:{device}"
x = torch.randn(num_tokens, d, dtype=dtype, device=gpu_id)
layer = FastGELU()
out = layer(x)
ref_out = layer._forward(x)

View File

@@ -24,6 +24,7 @@ HEAD_SIZES = [64, 80, 96, 112, 128, 256]
BLOCK_SIZES = [16, 32]
USE_ALIBI = [False, True]
SEEDS = [0]
DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
def ref_masked_attention(
@@ -87,7 +88,7 @@ def ref_single_query_cached_kv_attention(
alibi_bias = None
if alibi_slopes is not None:
# Create the ALiBi bias used in the paged attention kernel.
position_ids = torch.arange(context_len, device="cuda").int()
position_ids = torch.arange(context_len, device=query.device).int()
alibi_bias = (position_ids - context_len + 1).float()
alibi_bias = alibi_slopes.view(-1, 1, 1) * alibi_bias.view(
1, 1, -1)
@@ -105,6 +106,7 @@ def ref_single_query_cached_kv_attention(
@pytest.mark.parametrize("block_size", BLOCK_SIZES)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
def test_paged_attention(
kv_cache_factory,
version: str,
@@ -115,18 +117,19 @@ def test_paged_attention(
block_size: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
random.seed(seed)
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
gpu_id = f"cuda:{device}"
scale = float(1.0 / (head_size**0.5))
num_query_heads, num_kv_heads = num_heads
query = torch.empty(num_seqs,
num_query_heads,
head_size,
dtype=dtype,
device="cuda")
device=gpu_id)
query.uniform_(-scale, scale)
assert num_query_heads % num_kv_heads == 0
@@ -135,12 +138,12 @@ def test_paged_attention(
if use_alibi:
alibi_slopes = torch.randn(num_query_heads,
dtype=torch.float,
device="cuda")
device=gpu_id)
context_lens = [random.randint(1, MAX_SEQ_LEN) for _ in range(num_seqs)]
context_lens[-1] = MAX_SEQ_LEN
max_context_len = max(context_lens)
context_lens = torch.tensor(context_lens, dtype=torch.int, device="cuda")
context_lens = torch.tensor(context_lens, dtype=torch.int, device=gpu_id)
# Create the block tables.
max_num_blocks_per_seq = (max_context_len + block_size - 1) // block_size
@@ -151,12 +154,12 @@ def test_paged_attention(
for _ in range(max_num_blocks_per_seq)
]
block_tables.append(block_table)
block_tables = torch.tensor(block_tables, dtype=torch.int, device="cuda")
block_tables = torch.tensor(block_tables, dtype=torch.int, device=gpu_id)
# Create the KV caches.
key_caches, value_caches = kv_cache_factory(NUM_BLOCKS, block_size, 1,
num_kv_heads, head_size, dtype,
seed)
seed, gpu_id)
key_cache, value_cache = key_caches[0], value_caches[0]
# Call the paged attention kernel.
@@ -249,7 +252,7 @@ def ref_multi_query_kv_attention(
attn_mask = torch.triu(torch.ones(seq_len, seq_len, dtype=dtype),
diagonal=1)
attn_mask = attn_mask * torch.finfo(dtype).min
attn_mask = attn_mask.to(dtype=dtype, device="cuda")
attn_mask = attn_mask.to(dtype=dtype, device=query.device)
ref_output = ref_masked_attention(
query[start_idx:end_idx],
@@ -269,6 +272,7 @@ def ref_multi_query_kv_attention(
@pytest.mark.parametrize("head_size", HEAD_SIZES)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_multi_query_kv_attention(
num_seqs: int,
@@ -276,11 +280,12 @@ def test_multi_query_kv_attention(
head_size: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
random.seed(seed)
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
gpu_id = f"cuda:{device}"
# MAX_SEQ_LEN sometimes causes OOM in the reference implementation.
# As the xformers library is already tested with its own tests, we can use
# a smaller MAX_SEQ_LEN here.
@@ -294,7 +299,7 @@ def test_multi_query_kv_attention(
num_query_heads + 2 * num_kv_heads,
head_size,
dtype=dtype,
device="cuda")
device=gpu_id)
qkv.uniform_(-scale, scale)
query, key, value = qkv.split(
[num_query_heads, num_kv_heads, num_kv_heads], dim=1)

View File

@@ -14,6 +14,7 @@ BLOCK_SIZES = [8, 16, 32]
NUM_BLOCKS = [1024, 36000] # Arbitrary values for testing
NUM_MAPPINGS = [256] # Arbitrary values for testing
SEEDS = [0]
DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
@pytest.mark.parametrize("num_mappings", NUM_MAPPINGS)
@@ -24,6 +25,7 @@ SEEDS = [0]
@pytest.mark.parametrize("num_blocks", NUM_BLOCKS)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_copy_blocks(
kv_cache_factory,
@@ -35,11 +37,12 @@ def test_copy_blocks(
num_blocks: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
random.seed(seed)
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
gpu_id = f"cuda:{device}"
# Generate random block mappings where each source block is mapped to two
# destination blocks.
assert 2 * num_mappings <= num_blocks
@@ -56,7 +59,7 @@ def test_copy_blocks(
# Create the KV caches.
key_caches, value_caches = kv_cache_factory(num_blocks, block_size,
num_layers, num_heads,
head_size, dtype, seed)
head_size, dtype, seed, gpu_id)
# Clone the KV caches.
cloned_key_caches = [key_cache.clone() for key_cache in key_caches]
@@ -88,6 +91,7 @@ def test_copy_blocks(
@pytest.mark.parametrize("num_blocks", NUM_BLOCKS)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_reshape_and_cache(
kv_cache_factory,
@@ -98,28 +102,29 @@ def test_reshape_and_cache(
num_blocks: int,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
random.seed(seed)
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
gpu_id = f"cuda:{device}"
# Create a random slot mapping.
num_slots = block_size * num_blocks
slot_mapping = random.sample(range(num_slots), num_tokens)
slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device="cuda")
slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device=gpu_id)
qkv = torch.randn(num_tokens,
3,
num_heads,
head_size,
dtype=dtype,
device="cuda")
device=gpu_id)
_, key, value = qkv.unbind(dim=1)
# Create the KV caches.
key_caches, value_caches = kv_cache_factory(num_blocks, block_size, 1,
num_heads, head_size, dtype,
seed)
seed, gpu_id)
key_cache, value_cache = key_caches[0], value_caches[0]
# Clone the KV caches.

View File

@@ -8,6 +8,7 @@ NUM_TOKENS = [7, 83, 4096] # Arbitrary values for testing
HIDDEN_SIZES = [768, 5120, 8192] # Arbitrary values for testing
ADD_RESIDUAL = [False, True]
SEEDS = [0]
DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
@pytest.mark.parametrize("num_tokens", NUM_TOKENS)
@@ -15,6 +16,7 @@ SEEDS = [0]
@pytest.mark.parametrize("add_residual", ADD_RESIDUAL)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_rms_norm(
num_tokens: int,
@@ -22,14 +24,15 @@ def test_rms_norm(
add_residual: bool,
dtype: torch.dtype,
seed: int,
device: int,
) -> None:
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
layer = RMSNorm(hidden_size).to(dtype).cuda()
gpu_id = f"cuda:{device}"
layer = RMSNorm(hidden_size).to(dtype=dtype, device=gpu_id)
layer.weight.data.normal_(mean=1.0, std=0.1)
scale = 1 / (2 * hidden_size)
x = torch.randn(num_tokens, hidden_size, dtype=dtype, device="cuda")
x = torch.randn(num_tokens, hidden_size, dtype=dtype, device=gpu_id)
x *= scale
residual = torch.randn_like(x) * scale if add_residual else None

View File

@@ -13,6 +13,7 @@ NUM_HEADS = [7, 17] # Arbitrary values for testing
BATCH_SIZES = [1, 5] # Arbitrary values for testing
SEQ_LENS = [11, 8192] # Arbitrary values for testing
SEEDS = [0]
DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
@pytest.mark.parametrize("is_neox_style", IS_NEOX_STYLE)
@@ -23,6 +24,7 @@ SEEDS = [0]
@pytest.mark.parametrize("rotary_dim", ROTARY_DIMS)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_rotary_embedding(
is_neox_style: bool,
@@ -33,6 +35,7 @@ def test_rotary_embedding(
rotary_dim: Optional[int],
dtype: torch.dtype,
seed: int,
device: int,
max_position: int = 8192,
base: int = 10000,
) -> None:
@@ -40,20 +43,20 @@ def test_rotary_embedding(
rotary_dim = head_size
torch.random.manual_seed(seed)
torch.cuda.manual_seed(seed)
gpu_id = f"cuda:{device}"
if rotary_dim is None:
rotary_dim = head_size
rope = get_rope(head_size, rotary_dim, max_position, base, is_neox_style)
rope = rope.to(dtype).cuda()
rope = rope.to(dtype=dtype, device=gpu_id)
positions = torch.randint(0,
max_position, (batch_size, seq_len),
device="cuda")
device=gpu_id)
query = torch.randn(batch_size,
seq_len,
num_heads * head_size,
dtype=dtype,
device="cuda")
device=gpu_id)
key = torch.randn_like(query)
# NOTE(woosuk): The reference implementation should be executed first

View File

@@ -8,6 +8,7 @@ MODELS = [
"facebook/opt-125m",
"meta-llama/Llama-2-7b-hf",
"mistralai/Mistral-7B-v0.1",
"Deci/DeciLM-7b",
"tiiuae/falcon-7b",
"gpt2",
"bigcode/tiny_starcoder_py",

View File

@@ -33,8 +33,9 @@ def test_prepare_prompt():
expected_selected_token_indices.append(selected_token_start_idx +
prompt_len - 1)
selected_token_start_idx += max_seq_len
input_tokens, input_positions, _ = model_runner._prepare_prompt(
seq_group_metadata_list)
input_tokens, input_positions, _, return_prompt_lens = (
model_runner._prepare_prompt(seq_group_metadata_list))
assert return_prompt_lens == prompt_lens
sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list,
prompt_lens)
assert input_tokens.shape == (batch_size, max_seq_len)

View File

@@ -8,7 +8,7 @@ from vllm.entrypoints.llm import LLM
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import SamplingParams
__version__ = "0.2.6"
__version__ = "0.2.7"
__all__ = [
"LLM",

View File

@@ -112,24 +112,20 @@ class ModelConfig:
supported_load_format = [
"auto", "pt", "safetensors", "npcache", "dummy"
]
rocm_not_supported_load_format = ["safetensors"]
rocm_not_supported_load_format = []
if load_format not in supported_load_format:
raise ValueError(
f"Unknown load format: {self.load_format}. Must be one of "
"'auto', 'pt', 'safetensors', 'npcache', or 'dummy'.")
if is_hip():
if load_format in ["safetensors"]:
rocm_supported_load_format = [
f for f in supported_load_format
if (f not in rocm_not_supported_load_format)
]
raise ValueError(
f"load format \'{load_format}\' is not supported in ROCm. "
f"Supported load format are "
f"{rocm_supported_load_format}")
# Force ROCm to load from pt weights if nothing specific is set
if load_format == "auto":
load_format = "pt"
if is_hip() and load_format in rocm_not_supported_load_format:
rocm_supported_load_format = [
f for f in supported_load_format
if (f not in rocm_not_supported_load_format)
]
raise ValueError(
f"load format \'{load_format}\' is not supported in ROCm. "
f"Supported load format are "
f"{rocm_supported_load_format}")
# TODO: Remove this check once HF updates the pt weights of Mixtral.
architectures = getattr(self.hf_config, "architectures", [])
@@ -149,7 +145,7 @@ class ModelConfig:
def _verify_quantization(self) -> None:
supported_quantization = ["awq", "gptq", "squeezellm"]
rocm_not_supported_quantization = ["awq", "gptq"]
rocm_not_supported_quantization = ["awq"]
if self.quantization is not None:
self.quantization = self.quantization.lower()
@@ -185,12 +181,6 @@ class ModelConfig:
self.max_context_len_to_capture = self.max_model_len
self.max_context_len_to_capture = min(self.max_context_len_to_capture,
self.max_model_len)
if (self.quantization in ["gptq", "squeezellm"]
and not self.enforce_eager):
# Related issue: https://github.com/vllm-project/vllm/issues/2147
logger.warning(f"{self.quantization} does not support CUDA graph "
"yet. Disabling CUDA graph.")
self.enforce_eager = True
def verify_with_parallel_config(
self,
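Because the block above that forced enforce_eager for GPTQ and SqueezeLLM has been removed (commit 6ef00b03a2, "Enable CUDA graph for GPTQ & SqueezeLLM"), these quantized models can now run with CUDA graphs by default; a hedged usage sketch with a placeholder model id:

from vllm import LLM

# enforce_eager=True is no longer forced for GPTQ models; CUDA graph capture
# is used unless the user disables it explicitly.
llm = LLM(model="some-org/some-model-gptq", quantization="gptq")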

View File

@@ -103,7 +103,7 @@ class BlockSpaceManager:
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
# FIXME(woosuk): Here we assume that all sequences in the group share
# the same prompt. This may not be true for preempted sequences.
seq = seq_group.get_seqs()[0]
seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
num_required_blocks = len(seq.logical_token_blocks)
if self.block_sliding_window is not None:
num_required_blocks = min(num_required_blocks,
@@ -122,7 +122,7 @@ class BlockSpaceManager:
def allocate(self, seq_group: SequenceGroup) -> None:
# NOTE: Here we assume that all sequences in the group have the same
# prompt.
seq = seq_group.get_seqs()[0]
seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
# Allocate new physical token blocks that will store the prompt tokens.
block_table: BlockTable = []
@@ -137,7 +137,7 @@ class BlockSpaceManager:
block_table.append(block)
# Assign the block table for each sequence.
for seq in seq_group.get_seqs():
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
self.block_tables[seq.seq_id] = block_table.copy()
def can_append_slot(self, seq_group: SequenceGroup) -> bool:

View File

@@ -139,15 +139,17 @@ class Scheduler:
while self.waiting:
seq_group = self.waiting[0]
assert seq_group.num_seqs() == 1, (
waiting_seqs = seq_group.get_seqs(
status=SequenceStatus.WAITING)
assert len(waiting_seqs) == 1, (
"Waiting sequence group should have only one prompt "
"sequence.")
num_prompt_tokens = seq_group.get_seqs()[0].get_len()
num_prompt_tokens = waiting_seqs[0].get_len()
if num_prompt_tokens > self.prompt_limit:
logger.warning(
f"Input prompt ({num_prompt_tokens} tokens) is too long"
f" and exceeds limit of {self.prompt_limit}")
for seq in seq_group.get_seqs():
for seq in waiting_seqs:
seq.status = SequenceStatus.FINISHED_IGNORED
ignored_seq_groups.append(seq_group)
self.waiting.pop(0)
@@ -161,7 +163,7 @@ class Scheduler:
logger.warning(
f"Input prompt ({num_prompt_tokens} tokens) is too long"
f" and exceeds the capacity of block_manager")
for seq in seq_group.get_seqs():
for seq in waiting_seqs:
seq.status = SequenceStatus.FINISHED_IGNORED
ignored_seq_groups.append(seq_group)
self.waiting.pop(0)
@@ -317,7 +319,7 @@ class Scheduler:
def _allocate(self, seq_group: SequenceGroup) -> None:
self.block_manager.allocate(seq_group)
for seq in seq_group.get_seqs():
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
seq.status = SequenceStatus.RUNNING
def _append_slot(

View File

@@ -156,11 +156,13 @@ class EngineArgs:
type=int,
default=EngineArgs.swap_space,
help='CPU swap space size (GiB) per GPU')
parser.add_argument('--gpu-memory-utilization',
type=float,
default=EngineArgs.gpu_memory_utilization,
help='the percentage of GPU memory to be used for'
'the model executor')
parser.add_argument(
'--gpu-memory-utilization',
type=float,
default=EngineArgs.gpu_memory_utilization,
help='the fraction of GPU memory to be used for '
'the model executor, which can range from 0 to 1.'
'If unspecified, will use the default value of 0.9.')
parser.add_argument('--max-num-batched-tokens',
type=int,
default=EngineArgs.max_num_batched_tokens,

View File

@@ -183,49 +183,53 @@ class _AsyncLLMEngine(LLMEngine):
and updates the scheduler with the model outputs. Finally, it decodes
the sequences and returns the newly generated results.
"""
seq_group_metadata_list, scheduler_outputs, ignored = self._schedule()
if scheduler_outputs.is_empty():
return ignored
seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
# Execute the model.
output = await self._run_workers_async(
"execute_model",
seq_group_metadata_list=seq_group_metadata_list,
blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
blocks_to_copy=scheduler_outputs.blocks_to_copy,
)
if not scheduler_outputs.is_empty():
# Execute the model.
all_outputs = await self._run_workers_async(
"execute_model",
driver_kwargs={
"seq_group_metadata_list": seq_group_metadata_list,
"blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
"blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
"blocks_to_copy": scheduler_outputs.blocks_to_copy,
})
return self._process_model_outputs(output, scheduler_outputs) + ignored
# Only the driver worker returns the sampling results.
output = all_outputs[0]
else:
output = []
return self._process_model_outputs(output, scheduler_outputs)
async def _run_workers_async(
self,
method: str,
*args,
get_all_outputs: bool = False,
driver_args: Optional[List[Any]] = None,
driver_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Any:
"""Runs the given method on all workers."""
coros = []
if driver_args is None:
driver_args = args
if driver_kwargs is None:
driver_kwargs = kwargs
# Run the driver worker asynchronously.
driver_executor = getattr(self.driver_worker, method)
coros.append(asyncio.get_event_loop().run_in_executor(
None, partial(driver_executor, *driver_args, **driver_kwargs)))
# Run the ray workers asynchronously.
for worker in self.workers:
if self.parallel_config.worker_use_ray:
coros.append(
worker.execute_method.remote(method, *args, **kwargs))
else:
executor = getattr(worker, method)
coros.append(asyncio.get_event_loop().run_in_executor(
None, partial(executor, *args, **kwargs)))
coros.append(worker.execute_method.remote(method, *args, **kwargs))
all_outputs = await asyncio.gather(*coros)
if get_all_outputs:
return all_outputs
# Make sure all workers have the same results.
output = all_outputs[0]
for other_output in all_outputs[1:]:
assert output == other_output
return output
return all_outputs
class AsyncLLMEngine:
@@ -490,13 +494,12 @@ class AsyncLLMEngine:
engine_configs = engine_args.create_engine_configs()
parallel_config = engine_configs[2]
# Initialize the cluster.
distributed_init_method, placement_group = initialize_cluster(
parallel_config, engine_args.engine_use_ray)
placement_group = initialize_cluster(parallel_config,
engine_args.engine_use_ray)
# Create the async LLM engine.
engine = cls(parallel_config.worker_use_ray,
engine_args.engine_use_ray,
*engine_configs,
distributed_init_method,
placement_group,
log_requests=not engine_args.disable_log_requests,
log_stats=not engine_args.disable_log_stats,

View File

@@ -1,7 +1,9 @@
import copy
from collections import defaultdict
import os
import time
from functools import partial
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple,
Union)
from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
SchedulerConfig)
@@ -13,14 +15,12 @@ from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import SamplingParams
from vllm.sequence import (SamplerOutput, Sequence, SequenceGroup,
SequenceGroupMetadata, SequenceGroupOutput,
SequenceOutput, SequenceStatus)
SequenceGroupOutput, SequenceOutput, SequenceStatus)
from vllm.transformers_utils.tokenizer import (detokenize_incrementally,
get_tokenizer)
from vllm.utils import Counter
from vllm.utils import Counter, set_cuda_visible_devices, get_ip, get_open_port
if ray:
from ray.air.util.torch_dist import init_torch_dist_process_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
if TYPE_CHECKING:
@@ -53,8 +53,6 @@ class LLMEngine:
management.
parallel_config: The configuration related to distributed execution.
scheduler_config: The configuration related to the request scheduler.
distributed_init_method: The initialization method for distributed
execution. See `torch.distributed.init_process_group` for details.
placement_group: Ray placement group for distributed execution.
Required for distributed execution.
log_stats: Whether to log statistics.
@@ -66,7 +64,6 @@ class LLMEngine:
cache_config: CacheConfig,
parallel_config: ParallelConfig,
scheduler_config: SchedulerConfig,
distributed_init_method: str,
placement_group: Optional["PlacementGroup"],
log_stats: bool,
) -> None:
@@ -105,9 +102,13 @@ class LLMEngine:
# Create the parallel GPU workers.
if self.parallel_config.worker_use_ray:
# Disable Ray usage stats collection.
ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
if ray_usage != "1":
os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
self._init_workers_ray(placement_group)
else:
self._init_workers(distributed_init_method)
self._init_workers()
# Profile the memory usage and initialize the cache.
self._init_cache()
@@ -122,7 +123,7 @@ class LLMEngine:
# List of (timestamp, num_tokens)
self.num_generation_tokens: List[Tuple[float, int]] = []
def _init_workers(self, distributed_init_method: str):
def _init_workers(self):
# Lazy import the Worker to avoid importing torch.cuda/xformers
# before CUDA_VISIBLE_DEVICES is set in the Worker
from vllm.worker.worker import Worker
@@ -131,70 +132,122 @@ class LLMEngine:
"Ray is required if parallel_config.world_size > 1.")
self.workers: List[Worker] = []
worker = Worker(
distributed_init_method = f"tcp://{get_ip()}:{get_open_port()}"
self.driver_worker = Worker(
self.model_config,
self.parallel_config,
self.scheduler_config,
0,
distributed_init_method,
)
self.workers.append(worker)
self._run_workers(
"init_model",
get_all_outputs=True,
)
self._run_workers(
"load_model",
get_all_outputs=True,
max_concurrent_workers=self.parallel_config.
max_parallel_loading_workers,
local_rank=0,
rank=0,
distributed_init_method=distributed_init_method,
is_driver_worker=True,
)
self._run_workers("init_model")
self._run_workers("load_model")
def _init_workers_ray(self, placement_group: "PlacementGroup",
**ray_remote_kwargs):
if self.parallel_config.tensor_parallel_size == 1:
num_gpus = self.cache_config.gpu_memory_utilization
else:
num_gpus = 1
self.driver_dummy_worker: RayWorkerVllm = None
self.workers: List[RayWorkerVllm] = []
driver_ip = get_ip()
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
if not bundle.get("GPU", 0):
continue
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=bundle_id,
)
worker = ray.remote(
num_cpus=0,
num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs,
)(RayWorkerVllm).remote(self.model_config.trust_remote_code)
worker_ip = ray.get(worker.get_node_ip.remote())
if worker_ip == driver_ip and self.driver_dummy_worker is None:
# If the worker is on the same node as the driver, we use it
# as the resource holder for the driver process.
self.driver_dummy_worker = worker
else:
self.workers.append(worker)
if self.driver_dummy_worker is None:
raise ValueError(
"Ray does not allocate any GPUs on the driver node. Consider "
"adjusting the Ray placement group or running the driver on a "
"GPU node.")
driver_node_id, driver_gpu_ids = ray.get(
self.driver_dummy_worker.get_node_and_gpu_ids.remote())
worker_node_and_gpu_ids = ray.get(
[worker.get_node_and_gpu_ids.remote() for worker in self.workers])
node_workers = defaultdict(list)
node_gpus = defaultdict(list)
node_workers[driver_node_id].append(0)
node_gpus[driver_node_id].extend(driver_gpu_ids)
for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids,
start=1):
node_workers[node_id].append(i)
node_gpus[node_id].extend(gpu_ids)
for node_id, gpu_ids in node_gpus.items():
node_gpus[node_id] = sorted(gpu_ids)
# Set CUDA_VISIBLE_DEVICES for the driver.
set_cuda_visible_devices(node_gpus[driver_node_id])
for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids):
worker.set_cuda_visible_devices.remote(node_gpus[node_id])
distributed_init_method = f"tcp://{driver_ip}:{get_open_port()}"
# Lazy import the Worker to avoid importing torch.cuda/xformers
# before CUDA_VISIBLE_DEVICES is set in the Worker
from vllm.worker.worker import Worker
self.workers: List[Worker] = []
for bundle in placement_group.bundle_specs:
if not bundle.get("GPU", 0):
continue
if self.parallel_config.tensor_parallel_size == 1:
num_gpus = self.cache_config.gpu_memory_utilization
else:
num_gpus = 1
worker = ray.remote(
num_cpus=0,
num_gpus=num_gpus,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True),
**ray_remote_kwargs,
)(RayWorkerVllm).remote(self.model_config.trust_remote_code)
self.workers.append(worker)
# Initialize torch distributed process group for the workers.
init_torch_dist_process_group(self.workers, backend="nccl")
model_config = copy.deepcopy(self.model_config)
parallel_config = copy.deepcopy(self.parallel_config)
scheduler_config = copy.deepcopy(self.scheduler_config)
self._run_workers("init_worker",
get_all_outputs=True,
worker_init_fn=lambda: Worker(
model_config,
parallel_config,
scheduler_config,
None,
None,
))
self._run_workers(
"init_model",
get_all_outputs=True,
for rank, (worker, (node_id,
_)) in enumerate(zip(self.workers,
worker_node_and_gpu_ids),
start=1):
local_rank = node_workers[node_id].index(rank)
worker.init_worker.remote(
lambda rank=rank, local_rank=local_rank: Worker(
model_config,
parallel_config,
scheduler_config,
local_rank,
rank,
distributed_init_method,
))
driver_rank = 0
driver_local_rank = node_workers[driver_node_id].index(driver_rank)
self.driver_worker = Worker(
model_config,
parallel_config,
scheduler_config,
driver_local_rank,
driver_rank,
distributed_init_method,
is_driver_worker=True,
)
self._run_workers("init_model")
self._run_workers(
"load_model",
get_all_outputs=True,
max_concurrent_workers=self.parallel_config.
max_parallel_loading_workers,
)
@@ -208,7 +261,6 @@ class LLMEngine:
# Get the maximum number of blocks that can be allocated on GPU and CPU.
num_blocks = self._run_workers(
"profile_num_available_blocks",
get_all_outputs=True,
block_size=self.cache_config.block_size,
gpu_memory_utilization=self.cache_config.gpu_memory_utilization,
cpu_swap_space=self.cache_config.swap_space_bytes,
@@ -227,6 +279,14 @@ class LLMEngine:
raise ValueError("No available memory for the cache blocks. "
"Try increasing `gpu_memory_utilization` when "
"initializing the engine.")
max_seq_len = self.cache_config.block_size * num_gpu_blocks
if self.model_config.max_model_len > max_seq_len:
raise ValueError(
f"The model's max seq len ({self.model_config.max_model_len}) "
"is larger than the maximum number of tokens that can be "
f"stored in KV cache ({max_seq_len}). Try increasing "
"`gpu_memory_utilization` or decreasing `max_model_len` when "
"initializing the engine.")
self.cache_config.num_gpu_blocks = num_gpu_blocks
self.cache_config.num_cpu_blocks = num_cpu_blocks
@@ -244,11 +304,9 @@ class LLMEngine:
engine_configs = engine_args.create_engine_configs()
parallel_config = engine_configs[2]
# Initialize the cluster.
distributed_init_method, placement_group = initialize_cluster(
parallel_config)
placement_group = initialize_cluster(parallel_config)
# Create the LLM engine.
engine = cls(*engine_configs,
distributed_init_method,
placement_group,
log_stats=not engine_args.disable_log_stats)
return engine
@@ -315,16 +373,6 @@ class LLMEngine:
"""Returns True if there are unfinished requests."""
return self.scheduler.has_unfinished_seqs()
def _schedule(
self
) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs,
List[RequestOutput]]:
seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
return seq_group_metadata_list, scheduler_outputs, [
RequestOutput.from_seq_group(seq_group)
for seq_group in scheduler_outputs.ignored_seq_groups
]
def _check_beam_search_early_stopping(
self,
early_stopping: Union[bool, str],
@@ -573,18 +621,23 @@ class LLMEngine:
and updates the scheduler with the model outputs. Finally, it decodes
the sequences and returns the newly generated results.
"""
seq_group_metadata_list, scheduler_outputs, ignored = self._schedule()
if scheduler_outputs.is_empty():
return ignored
seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
# Execute the model.
output = self._run_workers(
"execute_model",
seq_group_metadata_list=seq_group_metadata_list,
blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
blocks_to_copy=scheduler_outputs.blocks_to_copy,
)
if not scheduler_outputs.is_empty():
# Execute the model.
all_outputs = self._run_workers(
"execute_model",
driver_kwargs={
"seq_group_metadata_list": seq_group_metadata_list,
"blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
"blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
"blocks_to_copy": scheduler_outputs.blocks_to_copy,
})
# Only the driver worker returns the sampling results.
output = all_outputs[0]
else:
output = []
return self._process_model_outputs(output, scheduler_outputs)
@@ -712,53 +765,38 @@ class LLMEngine:
seq.status = SequenceStatus.FINISHED_STOPPED
return
def _run_workers_in_batch(
self,
workers,
method: str,
*args,
**kwargs,
):
all_outputs = []
for worker in workers:
if self.parallel_config.worker_use_ray:
executor = partial(worker.execute_method.remote, method)
else:
executor = getattr(worker, method)
output = executor(*args, **kwargs)
all_outputs.append(output)
if self.parallel_config.worker_use_ray:
all_outputs = ray.get(all_outputs)
return all_outputs
def _run_workers(
self,
method: str,
*args,
get_all_outputs: bool = False,
driver_args: Optional[List[Any]] = None,
driver_kwargs: Optional[Dict[str, Any]] = None,
max_concurrent_workers: Optional[int] = None,
**kwargs,
) -> Any:
"""Runs the given method on all workers."""
all_outputs = []
if max_concurrent_workers:
work_groups = [
self.workers[i:i + max_concurrent_workers]
for i in range(0, len(self.workers), max_concurrent_workers)
]
else:
work_groups = [self.workers]
raise NotImplementedError(
"max_concurrent_workers is not supported yet.")
for workers in work_groups:
all_outputs.extend(
self._run_workers_in_batch(workers, method, *args, **kwargs))
# Start the ray workers first.
ray_worker_outputs = [
worker.execute_method.remote(method, *args, **kwargs)
for worker in self.workers
]
if get_all_outputs:
return all_outputs
if driver_args is None:
driver_args = args
if driver_kwargs is None:
driver_kwargs = kwargs
# Make sure all workers have the same results.
output = all_outputs[0]
for other_output in all_outputs[1:]:
assert output == other_output
return output
# Start the driver worker after all the ray workers.
driver_worker_output = getattr(self.driver_worker,
method)(*driver_args, **driver_kwargs)
# Get the results of the ray workers.
if self.workers:
ray_worker_outputs = ray.get(ray_worker_outputs)
return [driver_worker_output] + ray_worker_outputs

View File

@@ -1,16 +1,15 @@
from typing import Optional, Tuple, TYPE_CHECKING
from typing import Optional, List, Tuple, TYPE_CHECKING
from vllm.config import ParallelConfig
from vllm.logger import init_logger
from vllm.utils import get_open_port, is_hip
from vllm.utils import is_hip, set_cuda_visible_devices, get_ip
logger = init_logger(__name__)
try:
import ray
from ray.air.util.torch_dist import TorchDistributedWorker
class RayWorkerVllm(TorchDistributedWorker):
class RayWorkerVllm:
"""Ray wrapper for vllm.worker.Worker, allowing Worker to be
lazily initialized after Ray sets CUDA_VISIBLE_DEVICES."""
@@ -30,12 +29,22 @@ try:
executor = getattr(self, method)
return executor(*args, **kwargs)
def get_node_ip(self) -> str:
return get_ip()
def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]:
node_id = ray.get_runtime_context().get_node_id()
gpu_ids = ray.get_gpu_ids()
return node_id, gpu_ids
def set_cuda_visible_devices(self, device_ids) -> None:
set_cuda_visible_devices(device_ids)
except ImportError as e:
logger.warning(f"Failed to import Ray with {e!r}. "
"For distributed inference, please install Ray with "
"`pip install ray pandas pyarrow`.")
ray = None
TorchDistributedWorker = None
RayWorkerVllm = None
if TYPE_CHECKING:
@@ -75,13 +84,11 @@ def initialize_cluster(
ray.init(address=ray_address, ignore_reinit_error=True)
if not parallel_config.worker_use_ray:
# Initialize cluster locally.
port = get_open_port()
# We need to setup the distributed init method to make sure
# the distributed megatron code (e.g., get world size) works correctly.
distributed_init_method = f"tcp://localhost:{port}"
return distributed_init_method, None
assert parallel_config.world_size == 1, (
"Ray is required if parallel_config.world_size > 1.")
return None
# Create placement group for worker processes
current_placement_group = ray.util.get_current_placement_group()
if current_placement_group:
# We are in a placement group
@@ -106,12 +113,12 @@ def initialize_cluster(
"The number of required GPUs exceeds the total number of "
"available GPUs in the cluster.")
# Create a new placement group
current_placement_group = ray.util.placement_group([{
"GPU": 1
}] * parallel_config.world_size)
placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
current_placement_group = ray.util.placement_group(
placement_group_specs)
# Wait until PG is ready - this will block until all
# requested resources are available, and will timeout
# if they cannot be provisioned.
ray.get(current_placement_group.ready(), timeout=1800)
return None, current_placement_group
return current_placement_group
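A minimal, hypothetical example of creating such a placement group outside the engine (assumes Ray is installed and the cluster actually has `world_size` GPUs; otherwise the ready() call times out):

import ray

ray.init(ignore_reinit_error=True)
world_size = 2  # illustrative tensor-parallel size
placement_group_specs = [{"GPU": 1}] * world_size
pg = ray.util.placement_group(placement_group_specs)
# Blocks until every 1-GPU bundle can be scheduled; raises if the 30-minute
# timeout expires before the resources are provisioned.
ray.get(pg.ready(), timeout=1800)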

View File

@@ -12,7 +12,6 @@ from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid
TIMEOUT_KEEP_ALIVE = 5 # seconds.
TIMEOUT_TO_PREVENT_DEADLOCK = 1 # seconds.
app = FastAPI()
engine = None
@@ -73,6 +72,8 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default=None)
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--ssl-keyfile", type=str, default=None)
parser.add_argument("--ssl-certfile", type=str, default=None)
parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args()
@@ -83,4 +84,6 @@ if __name__ == "__main__":
host=args.host,
port=args.port,
log_level="debug",
timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
ssl_keyfile=args.ssl_keyfile,
ssl_certfile=args.ssl_certfile)

View File

@@ -80,6 +80,14 @@ def parse_args():
default="assistant",
help="The role name to return if "
"`request.add_generation_prompt=true`.")
parser.add_argument("--ssl-keyfile",
type=str,
default=None,
help="The file path to the SSL key file")
parser.add_argument("--ssl-certfile",
type=str,
default=None,
help="The file path to the SSL cert file")
parser = AsyncEngineArgs.add_cli_args(parser)
return parser.parse_args()
@@ -744,4 +752,6 @@ if __name__ == "__main__":
host=args.host,
port=args.port,
log_level="info",
timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
ssl_keyfile=args.ssl_keyfile,
ssl_certfile=args.ssl_certfile)

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Optional
import torch
@@ -16,28 +16,27 @@ class InputMetadata:
def __init__(
self,
prompt_lens: List[int],
is_prompt: bool,
slot_mapping: torch.Tensor,
max_context_len: Optional[int],
context_lens: Optional[torch.Tensor],
block_tables: Optional[torch.Tensor],
use_cuda_graph: bool,
) -> None:
self.prompt_lens = prompt_lens
self.is_prompt = is_prompt
self.max_context_len = max_context_len
self.slot_mapping = slot_mapping
self.context_lens = context_lens
self.block_tables = block_tables
self.use_cuda_graph = use_cuda_graph
self.is_prompt = len(prompt_lens) > 0
# Set during the execution of the first attention op.
# FIXME(woosuk): This is a hack.
self.attn_bias = None
def __repr__(self) -> str:
return ("InputMetadata("
f"prompt_lens={self.prompt_lens}, "
f"is_prompt={self.is_prompt}, "
f"max_context_len={self.max_context_len}, "
f"slot_mapping={self.slot_mapping}, "
f"context_lens={self.context_lens}, "

View File

@@ -5,7 +5,7 @@ import torch
import torch.nn as nn
from vllm.model_executor.parallel_utils.communication_op import (
tensor_model_parallel_all_gather)
tensor_model_parallel_gather)
from vllm.model_executor.sampling_metadata import SamplingMetadata, SamplingTensors
from vllm.sampling_params import SamplingParams, SamplingType
from vllm.sequence import (PromptLogprobs, SampleLogprobs, SamplerOutput,
@@ -30,7 +30,6 @@ class Sampler(nn.Module):
def __init__(self, vocab_size: int) -> None:
super().__init__()
self.vocab_size = vocab_size
self._copy_stream: torch.cuda.Stream = torch.cuda.Stream()
def forward(
self,
@@ -38,7 +37,7 @@ class Sampler(nn.Module):
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
embedding_bias: Optional[torch.Tensor] = None,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
# Get the hidden states that we use for sampling.
hidden_states = _prune_hidden_states(hidden_states, sampling_metadata)
@@ -46,19 +45,23 @@ class Sampler(nn.Module):
logits = _get_logits(hidden_states, embedding, embedding_bias,
self.vocab_size)
# Only perform sampling in the driver worker.
# Note: `_get_logits` is still distributed across TP workers because
# the `embedding` weight is distributed across TP workers.
# TODO(zhuohan): Change the get_logits part to a separate stage.
if not sampling_metadata.perform_sampling:
return None
assert logits is not None
_, vocab_size = logits.shape
# Apply logits processors (if any).
logits = _apply_logits_processors(logits, sampling_metadata)
# Prepare sampling tensors in another stream to overlap
# CPU<->GPU data transfer with GPU computation in forward pass.
with torch.cuda.stream(self._copy_stream):
(sampling_tensors, do_penalties, do_top_p_top_k,
do_min_p) = SamplingTensors.from_sampling_metadata(
sampling_metadata, vocab_size, logits.device, logits.dtype)
torch.cuda.current_stream().wait_stream(self._copy_stream)
# Prepare sampling tensors with pinned memory to avoid blocking.
(sampling_tensors, do_penalties, do_top_p_top_k,
do_min_p) = SamplingTensors.from_sampling_metadata(
sampling_metadata, vocab_size, logits.device, logits.dtype)
# Apply presence and frequency penalties.
if do_penalties:
@@ -97,14 +100,15 @@ class Sampler(nn.Module):
def _get_logits(hidden_states: torch.Tensor, embedding: torch.Tensor,
embedding_bias: Optional[torch.Tensor],
vocab_size: int) -> torch.Tensor:
vocab_size: int) -> Optional[torch.Tensor]:
# Get the logits for the next tokens.
logits = torch.matmul(hidden_states, embedding.t())
if embedding_bias is not None:
logits += embedding_bias
logits = tensor_model_parallel_all_gather(logits)
logits = tensor_model_parallel_gather(logits)
# Remove paddings in vocab (if any).
logits = logits[:, :vocab_size]
if logits is not None:
logits = logits[:, :vocab_size]
return logits
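Because tensor_model_parallel_gather only produces a tensor on the destination rank, _get_logits and the sampler may now return None on non-driver workers, which is why the model sample() signatures further below change to Optional[SamplerOutput]. A toy sketch of the resulting contract (greedy argmax sampling and the function name are illustrative, not the library's API):

from typing import Optional

import torch

def sample_if_driver(logits: Optional[torch.Tensor],
                     perform_sampling: bool) -> Optional[torch.Tensor]:
    # Non-driver tensor-parallel ranks receive no gathered logits and skip sampling.
    if not perform_sampling or logits is None:
        return None
    return torch.argmax(logits, dim=-1)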
@@ -117,27 +121,6 @@ def _prune_hidden_states(
sampling_metadata.selected_token_indices)
def _get_prompt_and_output_tokens(
sampling_metadata: SamplingMetadata,
) -> Tuple[List[List[int]], List[List[int]]]:
prompt_tokens: List[List[int]] = []
output_tokens: List[List[int]] = []
for i, seq_group in enumerate(sampling_metadata.seq_groups):
seq_ids, sampling_params = seq_group
if (i < sampling_metadata.num_prompts
and sampling_params.prompt_logprobs is not None):
# NOTE: prompt token positions do not need output tokens to
# compute penalties.
prompt_len = sampling_metadata.prompt_lens[i]
prompt_tokens.extend([] for _ in range(prompt_len - 1))
output_tokens.extend([] for _ in range(prompt_len - 1))
for seq_id in seq_ids:
seq_data = sampling_metadata.seq_data[seq_id]
prompt_tokens.append(seq_data.prompt_token_ids)
output_tokens.append(seq_data.output_token_ids)
return prompt_tokens, output_tokens
def _get_bin_counts_and_mask(
tokens: torch.Tensor,
vocab_size: int,

View File

@@ -17,6 +17,7 @@ _MODELS = {
"BloomForCausalLM": ("bloom", "BloomForCausalLM"),
"ChatGLMModel": ("chatglm", "ChatGLMForCausalLM"),
"ChatGLMForConditionalGeneration": ("chatglm", "ChatGLMForCausalLM"),
"DeciLMForCausalLM": ("decilm", "DeciLMForCausalLM"),
"FalconForCausalLM": ("falcon", "FalconForCausalLM"),
"GPT2LMHeadModel": ("gpt2", "GPT2LMHeadModel"),
"GPTBigCodeForCausalLM": ("gpt_bigcode", "GPTBigCodeForCausalLM"),

View File

@@ -298,7 +298,7 @@ class AquilaForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -313,7 +313,7 @@ class BaiChuanBaseForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -290,7 +290,7 @@ class BloomForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -349,7 +349,7 @@ class ChatGLMForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -0,0 +1,123 @@
# coding=utf-8
# Adapted from
# https://github.com/huggingface/transformers/blob/v4.28.0/src/transformers/models/llama/modeling_llama.py
# Copyright 2023 DeciAI Research Team. All rights reserved.
# Copyright 2023 The vLLM team.
# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved.
#
# This code is based on MistralAI GPT-NeoX library and the GPT-NeoX
# and OPT implementations in this library. It has been modified from its
# original forms to accommodate minor architectural differences compared
# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Inference-only DeciLM model compatible with HuggingFace weights."""
from typing import Optional
import torch
from transformers import PretrainedConfig
from vllm.model_executor.layers.linear import LinearMethodBase
from vllm.model_executor.models.llama import LlamaForCausalLM
from vllm.model_executor.weight_utils import (default_weight_loader,
hf_model_weights_iterator)
class DeciLMForCausalLM(LlamaForCausalLM):
"""
Implementation for https://huggingface.co/Deci/DeciLM-7b-instruct.
Based on the llama executor.
The main difference is that DeciLM uses Variable Grouped Query Attention.
The constant number of GQA heads in the decoder is overridden with a value
per layer.
In the HuggingFace implementation, instead of a single
"config.num_key_value_heads", the model reads
"config.num_key_value_heads_per_layer[i]", which varies per layer.
Currently, PagedAttention does not work well with variable GQA, so we
normalize the weights upon loading, and use uniform GQA with the max value
instead.
"""
def __init__(
self,
config: Optional[PretrainedConfig] = None,
linear_method: Optional[LinearMethodBase] = None,
) -> None:
config.num_key_value_heads = max(config.num_key_value_heads_per_layer)
delattr(config, "num_key_value_heads_per_layer")
super().__init__(config=config, linear_method=linear_method)
def load_weights(self,
model_name_or_path: str,
cache_dir: Optional[str] = None,
load_format: str = "auto",
revision: Optional[str] = None):
stacked_params_mapping = [
# (param_name, shard_name, shard_id)
("qkv_proj", "q_proj", "q"),
("qkv_proj", "k_proj", "k"),
("qkv_proj", "v_proj", "v"),
("gate_up_proj", "gate_proj", 0),
("gate_up_proj", "up_proj", 1),
]
params_dict = dict(self.named_parameters())
for name, loaded_weight in hf_model_weights_iterator(
model_name_or_path, cache_dir, load_format, revision):
if "rotary_emb.inv_freq" in name:
continue
if "k_proj" in name or "v_proj" in name:
loaded_weight = self._degroup_weight(loaded_weight)
for (param_name, weight_name, shard_id) in stacked_params_mapping:
if weight_name not in name:
continue
name = name.replace(weight_name, param_name)
# Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict:
continue
param = params_dict[name]
weight_loader = param.weight_loader
weight_loader(param, loaded_weight, shard_id)
break
else:
# Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict:
continue
param = params_dict[name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)
weight_loader(param, loaded_weight)
def _degroup_weight(self, loaded_weight: torch.Tensor) -> torch.Tensor:
hidden_size = self.config.hidden_size
head_size = self.config.hidden_size // self.config.num_attention_heads
target_num_kv_heads = self.config.num_key_value_heads
num_kv_heads = loaded_weight.shape[0] // head_size
n_repeats = target_num_kv_heads / num_kv_heads
assert n_repeats == int(n_repeats)
n_repeats = int(n_repeats)
loaded_weight = loaded_weight.view(num_kv_heads, head_size,
hidden_size)
loaded_weight = torch.repeat_interleave(loaded_weight,
repeats=n_repeats,
dim=0)
loaded_weight = loaded_weight.reshape(target_num_kv_heads * head_size,
hidden_size)
return loaded_weight
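The degrouping above can be sanity-checked in isolation: each KV head slice is repeated until the layer matches the maximum head count, preserving the flattened (num_kv_heads * head_size, hidden_size) layout. A small standalone check with made-up sizes:

import torch

head_size, hidden_size = 4, 8
num_kv_heads, target_num_kv_heads = 2, 4

weight = torch.randn(num_kv_heads * head_size, hidden_size)
n_repeats = target_num_kv_heads // num_kv_heads
weight = weight.view(num_kv_heads, head_size, hidden_size)
weight = torch.repeat_interleave(weight, repeats=n_repeats, dim=0)
weight = weight.reshape(target_num_kv_heads * head_size, hidden_size)
assert weight.shape == (target_num_kv_heads * head_size, hidden_size)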

View File

@@ -394,7 +394,7 @@ class FalconForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -235,7 +235,7 @@ class GPT2LMHeadModel(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -254,7 +254,7 @@ class GPTBigCodeForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -240,7 +240,7 @@ class GPTJForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata, self.lm_head.bias)
return next_tokens

View File

@@ -54,6 +54,7 @@ class GPTNeoXAttention(nn.Module):
self.total_num_heads = config.num_attention_heads
self.hidden_size = config.hidden_size
self.head_size = self.hidden_size // self.total_num_heads
self.bias = getattr(config, "attention_bias", True)
tensor_model_parallel_world_size = (
get_tensor_model_parallel_world_size())
@@ -65,11 +66,13 @@ class GPTNeoXAttention(nn.Module):
config.hidden_size,
self.head_size,
self.total_num_heads,
bias=self.bias,
linear_method=linear_method,
)
self.dense = RowParallelLinear(
config.hidden_size,
config.hidden_size,
bias=self.bias,
linear_method=linear_method,
)
scaling = self.head_size**-0.5
@@ -252,7 +255,7 @@ class GPTNeoXForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.embed_out.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -255,7 +255,7 @@ class InternLMForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -291,7 +291,7 @@ class LlamaForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -287,7 +287,7 @@ class MistralForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -49,7 +49,6 @@ from vllm.model_executor.parallel_utils.parallel_state import (
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.model_executor.weight_utils import (default_weight_loader,
hf_model_weights_iterator)
from vllm.model_executor.utils import set_weight_attrs
from vllm.sequence import SamplerOutput
KVCache = Tuple[torch.Tensor, torch.Tensor]
@@ -94,30 +93,6 @@ class MixtralMLP(nn.Module):
return current_hidden_states
class DummyModule(nn.Module):
def __init__(self) -> None:
super().__init__()
self.w1 = nn.Linear(0, 0, bias=False)
self.w2 = nn.Linear(0, 0, bias=False)
self.w3 = nn.Linear(0, 0, bias=False)
set_weight_attrs(self.w1.weight,
{"weight_loader": self.dummy_weight_loader})
set_weight_attrs(self.w2.weight,
{"weight_loader": self.dummy_weight_loader})
set_weight_attrs(self.w3.weight,
{"weight_loader": self.dummy_weight_loader})
def forward(self, *args, **kwargs) -> None:
raise NotImplementedError()
def dummy_weight_loader(self, *args, **kwargs) -> None: # pylint: disable=unused-argument
# Noop
return
class MixtralMoE(nn.Module):
def __init__(
@@ -147,7 +122,7 @@ class MixtralMoE(nn.Module):
config.hidden_size,
config.intermediate_size,
linear_method=linear_method)
if idx in self.expert_indicies else DummyModule()
if idx in self.expert_indicies else None
for idx in range(self.num_total_experts)
])
self.gate = ReplicatedLinear(config.hidden_size,
@@ -345,7 +320,7 @@ class MixtralModel(nn.Module):
positions: torch.Tensor,
kv_caches: List[KVCache],
input_metadata: InputMetadata,
) -> SamplerOutput:
) -> torch.Tensor:
hidden_states = self.embed_tokens(input_ids)
residual = None
for i in range(len(self.layers)):
@@ -386,7 +361,7 @@ class MixtralForCausalLM(nn.Module):
self,
hidden_states: Optional[torch.Tensor],
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens
@@ -427,6 +402,10 @@ class MixtralForCausalLM(nn.Module):
# Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict:
continue
# Skip experts that are not assigned to this worker.
if ("block_sparse_moe.experts." in name
and name not in params_dict):
continue
param = params_dict[name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)

View File

@@ -276,7 +276,7 @@ class MPTForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -309,7 +309,7 @@ class OPTForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head_weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -280,7 +280,7 @@ class PhiForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
head = self.lm_head.linear
next_tokens = self.sampler(head.weight, hidden_states,
sampling_metadata, head.bias)

View File

@@ -247,7 +247,7 @@ class QWenLMHeadModel(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -286,7 +286,7 @@ class YiForCausalLM(nn.Module):
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
) -> Optional[SamplerOutput]:
next_tokens = self.sampler(self.lm_head.weight, hidden_states,
sampling_metadata)
return next_tokens

View File

@@ -1,6 +1,7 @@
import torch
from vllm.model_executor.parallel_utils.parallel_state import (
get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
get_tensor_model_parallel_group,
)
@@ -45,3 +46,61 @@ def tensor_model_parallel_all_gather(input_, dim=-1):
(world_size * input_size[dim], ) +
input_size[dim + 1:])
return output_tensor
def tensor_model_parallel_gather(input_, dst=0, dim=-1):
"""Gather the input tensor across model parallel group.
NOTE: We assume that the input tensor is on the same device across
all the ranks.
"""
world_size = get_tensor_model_parallel_world_size()
# Bypass the function if we are using only 1 GPU.
if world_size == 1:
return input_
assert -input_.dim() <= dim < input_.dim(), (
f"Invalid dim ({dim}) for input tensor with shape {input_.size()}")
if dim < 0:
# Convert negative dim to positive.
dim += input_.dim()
# Allocate output tensor.
if get_tensor_model_parallel_rank() == dst:
gather_list = [torch.empty_like(input_) for _ in range(world_size)]
else:
gather_list = None
# Gather.
torch.distributed.gather(input_,
gather_list,
dst=dst,
group=get_tensor_model_parallel_group())
if get_tensor_model_parallel_rank() == dst:
output_tensor = torch.cat(gather_list, dim=dim)
else:
output_tensor = None
return output_tensor
def broadcast(input_, src=0):
"""Broadcast the input tensor."""
world_size = torch.distributed.get_world_size()
assert 0 <= src < world_size, f"Invalid src rank ({src})"
# Bypass the function if we are using only 1 GPU.
if world_size == 1:
return input_
# Broadcast.
torch.distributed.broadcast(input_, src=src)
return input_
def broadcast_object_list(obj_list, src=0):
"""Broadcast the input object list."""
world_size = torch.distributed.get_world_size()
assert 0 <= src < world_size, f"Invalid src rank ({src})"
# Bypass the function if we are using only 1 GPU.
if world_size == 1:
return obj_list
# Broadcast.
torch.distributed.broadcast_object_list(obj_list, src=src)
return obj_list
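Unlike tensor_model_parallel_all_gather, the new gather only materializes the concatenated tensor on the destination rank; the broadcast helpers push data the other way. A hypothetical two-process usage sketch using plain torch.distributed (assumes a launch such as `torchrun --nproc_per_node=2 this_file.py`; the gloo backend is chosen only so it runs without GPUs):

import torch
import torch.distributed as dist

def demo() -> None:
    dist.init_process_group(backend="gloo")
    rank, world_size = dist.get_rank(), dist.get_world_size()
    shard = torch.full((2,), float(rank))
    # Only the destination rank allocates a gather list; other ranks pass None.
    gather_list = ([torch.empty_like(shard) for _ in range(world_size)]
                   if rank == 0 else None)
    dist.gather(shard, gather_list, dst=0)
    if rank == 0:
        print(torch.cat(gather_list))  # tensor([0., 0., 1., 1.]) with 2 ranks
    # Small picklable metadata travels via broadcast_object_list.
    meta = [{"step": 1}] if rank == 0 else [None]
    dist.broadcast_object_list(meta, src=0)
    assert meta[0] == {"step": 1}

if __name__ == "__main__":
    demo()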

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple
import torch
@@ -18,24 +18,29 @@ class SamplingMetadata:
seq_data: Seq_id -> SequenceData.
prompt_lens: Lengths of prompts.
selected_token_indices: Token indices selected for sampling.
categorized_sample_indices: SamplingType -> token indicies to sample.
categorized_sample_indices: SamplingType -> token indices to sample.
perform_sampling: Whether to perform sampling. This option is used to
make sampling happen only in the driver worker and to disable
sampling in the other worker processes.
"""
def __init__(
self,
seq_groups: List[Tuple[List[int], SamplingParams]],
seq_data: Dict[int, SequenceData],
prompt_lens: List[int],
seq_groups: Optional[List[Tuple[List[int], SamplingParams]]],
seq_data: Optional[Dict[int, SequenceData]],
prompt_lens: Optional[List[int]],
selected_token_indices: torch.Tensor,
categorized_sample_indices: Dict[SamplingType, torch.Tensor],
categorized_sample_indices: Optional[Dict[SamplingType, torch.Tensor]],
perform_sampling: bool = True,
) -> None:
self.seq_groups = seq_groups
self.seq_data = seq_data
self.prompt_lens = prompt_lens
self.selected_token_indices = selected_token_indices
self.categorized_sample_indices = categorized_sample_indices
self.perform_sampling = perform_sampling
self.num_prompts = len(prompt_lens)
self.num_prompts = len(prompt_lens) if prompt_lens is not None else 0
def __repr__(self) -> str:
return (
@@ -44,7 +49,8 @@ class SamplingMetadata:
f"seq_data={self.seq_data}, "
f"prompt_lens={self.prompt_lens}, "
f"selected_token_indices={self.selected_token_indices}, "
f"categorized_sample_indices={self.categorized_sample_indices})")
f"categorized_sample_indices={self.categorized_sample_indices}), "
f"perform_sampling={self.perform_sampling})")
@dataclass

View File

@@ -100,7 +100,7 @@ class SamplingParams:
temperature: float = 1.0,
top_p: float = 1.0,
top_k: int = -1,
min_p: int = 0.0,
min_p: float = 0.0,
use_beam_search: bool = False,
length_penalty: float = 1.0,
early_stopping: Union[bool, str] = False,

View File

@@ -1,7 +1,9 @@
import enum
import os
import socket
import uuid
from platform import uname
from typing import List
import psutil
import torch
@@ -55,7 +57,15 @@ def in_wsl() -> bool:
return "microsoft" in " ".join(uname()).lower()
def get_open_port():
def get_ip() -> str:
return socket.gethostbyname(socket.gethostname())
def get_open_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
def set_cuda_visible_devices(device_ids: List[int]) -> None:
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, device_ids))

View File

@@ -1,5 +1,5 @@
import time
from typing import Dict, List, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import torch
@@ -8,8 +8,11 @@ import torch.nn as nn
from vllm.config import ModelConfig, ParallelConfig, SchedulerConfig
from vllm.logger import init_logger
from vllm.model_executor import get_model, InputMetadata, SamplingMetadata
from vllm.model_executor.parallel_utils.communication_op import (
broadcast, broadcast_object_list)
from vllm.sampling_params import SamplingParams, SamplingType
from vllm.sequence import SamplerOutput, SequenceData, SequenceGroupMetadata
from vllm.utils import in_wsl
logger = init_logger(__name__)
@@ -27,10 +30,12 @@ class ModelRunner:
model_config: ModelConfig,
parallel_config: ParallelConfig,
scheduler_config: SchedulerConfig,
is_driver_worker: bool = False,
):
self.model_config = model_config
self.parallel_config = parallel_config
self.scheduler_config = scheduler_config
self.is_driver_worker = is_driver_worker
# model_config can be None in tests/samplers/test_sampler.py.
# FIXME(woosuk): This is a hack to make the tests work. Refactor this.
@@ -52,6 +57,8 @@ class ModelRunner:
# The shape of the cached block table will be
# (max batch size to capture, max context len to capture / block size).
self.graph_block_tables = None # Set after initial profiling.
# cache in_wsl result
self.in_wsl = in_wsl()
def load_model(self) -> None:
self.model = get_model(self.model_config)
@@ -67,7 +74,7 @@ class ModelRunner:
def _prepare_prompt(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata]:
) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata, List[int]]:
assert len(seq_group_metadata_list) > 0
input_tokens: List[List[int]] = []
input_positions: List[List[int]] = []
@@ -132,14 +139,14 @@ class ModelRunner:
dtype=torch.long)
input_metadata = InputMetadata(
prompt_lens=prompt_lens,
is_prompt=True,
slot_mapping=slot_mapping,
max_context_len=None,
context_lens=None,
block_tables=None,
use_cuda_graph=False,
)
return input_tokens, input_positions, input_metadata
return input_tokens, input_positions, input_metadata, prompt_lens
def _prepare_decode(
self,
@@ -200,27 +207,24 @@ class ModelRunner:
block_tables.append([])
batch_size = graph_batch_size
# When using CUDA graph, we don't need to make the tensors on the GPU
# because they will be eventually copied to the designated GPU buffer.
device = "cpu" if use_captured_graph else "cuda"
input_tokens = _make_tensor_with_pad(input_tokens,
max_len=1,
pad=0,
dtype=torch.long,
device=device)
device="cuda")
input_positions = _make_tensor_with_pad(input_positions,
max_len=1,
pad=0,
dtype=torch.long,
device=device)
device="cuda")
slot_mapping = _make_tensor_with_pad(slot_mapping,
max_len=1,
pad=_PAD_SLOT_ID,
dtype=torch.long,
device=device)
device="cuda")
context_lens = torch.tensor(context_lens,
dtype=torch.int,
device=device)
device="cuda")
if use_captured_graph:
# The shape of graph_block_tables is
@@ -229,17 +233,18 @@ class ModelRunner:
for i, block_table in enumerate(block_tables):
if block_table:
input_block_tables[i, :len(block_table)] = block_table
block_tables = torch.from_numpy(input_block_tables).to(device)
block_tables = torch.tensor(input_block_tables, device="cuda")
else:
block_tables = _make_tensor_with_pad(
block_tables,
max_len=max_context_len,
pad=0,
dtype=torch.int,
device="cuda",
)
input_metadata = InputMetadata(
prompt_lens=[],
is_prompt=False,
slot_mapping=slot_mapping,
max_context_len=max_context_len,
context_lens=context_lens,
@@ -297,11 +302,11 @@ class ModelRunner:
categorized_sample_indices_start_idx + num_seqs))
categorized_sample_indices_start_idx += num_seqs
selected_token_indices = torch.tensor(selected_token_indices,
dtype=torch.long,
device="cuda")
selected_token_indices = _async_h2d(selected_token_indices,
dtype=torch.long,
pin_memory=not self.in_wsl)
categorized_sample_indices = {
t: torch.tensor(seq_ids, dtype=torch.int, device="cuda")
t: _async_h2d(seq_ids, dtype=torch.int, pin_memory=not self.in_wsl)
for t, seq_ids in categorized_sample_indices.items()
}
@@ -318,25 +323,127 @@ class ModelRunner:
)
return sampling_metadata
def prepare_input_tensors(
self,
seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata, SamplingMetadata]:
if self.is_driver_worker:
# NOTE: We assume that the sequences in the group are either all
# prompts or all decodes.
is_prompt = seq_group_metadata_list[0].is_prompt
# Prepare input tensors.
if is_prompt:
(input_tokens, input_positions, input_metadata,
prompt_lens) = self._prepare_prompt(seq_group_metadata_list)
else:
(input_tokens, input_positions, input_metadata
) = self._prepare_decode(seq_group_metadata_list)
prompt_lens = []
sampling_metadata = self._prepare_sample(seq_group_metadata_list,
prompt_lens)
def get_size_or_none(x: Optional[torch.Tensor]):
return x.size() if x is not None else None
# Broadcast the input data. For each input tensor, we first broadcast
# its shape and then broadcast the tensor itself to avoid the high
# serialization cost.
py_data = {
"input_tokens_size":
input_tokens.size(),
"input_positions_size":
input_positions.size(),
"is_prompt":
input_metadata.is_prompt,
"slot_mapping_size":
get_size_or_none(input_metadata.slot_mapping),
"max_context_len":
input_metadata.max_context_len,
"context_lens_size":
get_size_or_none(input_metadata.context_lens),
"block_tables_size":
get_size_or_none(input_metadata.block_tables),
"use_cuda_graph":
input_metadata.use_cuda_graph,
"selected_token_indices_size":
sampling_metadata.selected_token_indices.size(),
}
broadcast_object_list([py_data], src=0)
# TODO(zhuohan): Combine the broadcasts or set async_op=True.
broadcast(input_tokens, src=0)
broadcast(input_positions, src=0)
if input_metadata.slot_mapping is not None:
broadcast(input_metadata.slot_mapping, src=0)
if input_metadata.context_lens is not None:
broadcast(input_metadata.context_lens, src=0)
if input_metadata.block_tables is not None:
broadcast(input_metadata.block_tables, src=0)
broadcast(sampling_metadata.selected_token_indices, src=0)
else:
receiving_list = [None]
broadcast_object_list(receiving_list, src=0)
py_data = receiving_list[0]
input_tokens = torch.empty(*py_data["input_tokens_size"],
dtype=torch.long,
device="cuda")
broadcast(input_tokens, src=0)
input_positions = torch.empty(*py_data["input_positions_size"],
dtype=torch.long,
device="cuda")
broadcast(input_positions, src=0)
if py_data["slot_mapping_size"] is not None:
slot_mapping = torch.empty(*py_data["slot_mapping_size"],
dtype=torch.long,
device="cuda")
broadcast(slot_mapping, src=0)
else:
slot_mapping = None
if py_data["context_lens_size"] is not None:
context_lens = torch.empty(*py_data["context_lens_size"],
dtype=torch.int,
device="cuda")
broadcast(context_lens, src=0)
else:
context_lens = None
if py_data["block_tables_size"] is not None:
block_tables = torch.empty(*py_data["block_tables_size"],
dtype=torch.int,
device="cuda")
broadcast(block_tables, src=0)
else:
block_tables = None
selected_token_indices = torch.empty(
*py_data["selected_token_indices_size"],
dtype=torch.long,
device="cuda")
broadcast(selected_token_indices, src=0)
input_metadata = InputMetadata(
is_prompt=py_data["is_prompt"],
slot_mapping=slot_mapping,
max_context_len=py_data["max_context_len"],
context_lens=context_lens,
block_tables=block_tables,
use_cuda_graph=py_data["use_cuda_graph"],
)
sampling_metadata = SamplingMetadata(
seq_groups=None,
seq_data=None,
prompt_lens=None,
selected_token_indices=selected_token_indices,
categorized_sample_indices=None,
perform_sampling=False,
)
return input_tokens, input_positions, input_metadata, sampling_metadata
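The driver/non-driver split above follows a simple recipe: broadcast the cheap Python metadata (shapes, flags) with broadcast_object_list, then broadcast each tensor into a buffer pre-allocated from those shapes, so large tensors are never pickled. A stripped-down sketch of that recipe (assumes an initialized process group and a CUDA device; the helper name is illustrative):

from typing import Optional

import torch
import torch.distributed as dist

def bcast_tensor(tensor: Optional[torch.Tensor] = None, src: int = 0) -> torch.Tensor:
    """Driver passes the tensor; other ranks pass None and get a copy back."""
    if dist.get_rank() == src:
        meta = [{"size": tuple(tensor.size()), "dtype": tensor.dtype}]
        dist.broadcast_object_list(meta, src=src)  # cheap: only shape/dtype
        dist.broadcast(tensor, src=src)            # bulk data, no pickling
        return tensor
    meta = [None]
    dist.broadcast_object_list(meta, src=src)
    buf = torch.empty(meta[0]["size"], dtype=meta[0]["dtype"], device="cuda")
    dist.broadcast(buf, src=src)
    return buf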
@torch.inference_mode()
def execute_model(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
) -> SamplerOutput:
# NOTE: We assume that all sequences in the group are all prompts or
# all decodes.
is_prompt = seq_group_metadata_list[0].is_prompt
# Prepare input tensors.
if is_prompt:
inputs = self._prepare_prompt(seq_group_metadata_list)
input_tokens, input_positions, input_metadata = inputs
else:
inputs = self._prepare_decode(seq_group_metadata_list)
input_tokens, input_positions, input_metadata = inputs
sampling_metadata = self._prepare_sample(seq_group_metadata_list,
input_metadata.prompt_lens)
) -> Optional[SamplerOutput]:
input_tokens, input_positions, input_metadata, sampling_metadata = (
self.prepare_input_tensors(seq_group_metadata_list))
# Execute the model.
if input_metadata.use_cuda_graph:
graph_batch_size = input_tokens.shape[0]
@@ -395,6 +502,9 @@ class ModelRunner:
"unexpected consequences if the model is not static. To "
"run the model in eager mode, set 'enforce_eager=True' or "
"use '--enforce-eager' in the CLI.")
logger.info("CUDA graphs can take additional 1~3 GiB memory per GPU. "
"If you are running out of memory, consider decreasing "
"`gpu_memory_utilization` or enforcing eager mode.")
start_time = time.perf_counter()
# Prepare dummy inputs. These will be reused for all batch sizes.
@@ -412,7 +522,7 @@ class ModelRunner:
for batch_size in reversed(_BATCH_SIZES_TO_CAPTURE):
# Create dummy input_metadata.
input_metadata = InputMetadata(
prompt_lens=[],
is_prompt=False,
slot_mapping=slot_mapping[:batch_size],
max_context_len=self.max_context_len_to_capture,
context_lens=context_lens[:batch_size],
@@ -499,11 +609,14 @@ class CUDAGraphRunner:
del kv_caches
# Copy the input tensors to the input buffers.
self.input_buffers["input_ids"].copy_(input_ids)
self.input_buffers["positions"].copy_(positions)
self.input_buffers["slot_mapping"].copy_(input_metadata.slot_mapping)
self.input_buffers["context_lens"].copy_(input_metadata.context_lens)
self.input_buffers["block_tables"].copy_(input_metadata.block_tables)
self.input_buffers["input_ids"].copy_(input_ids, non_blocking=True)
self.input_buffers["positions"].copy_(positions, non_blocking=True)
self.input_buffers["slot_mapping"].copy_(input_metadata.slot_mapping,
non_blocking=True)
self.input_buffers["context_lens"].copy_(input_metadata.context_lens,
non_blocking=True)
self.input_buffers["block_tables"].copy_(input_metadata.block_tables,
non_blocking=True)
# Run the graph.
self.graph.replay()
@@ -526,9 +639,13 @@ def _make_tensor_with_pad(
pad: int,
dtype: torch.dtype,
device: Union[str, torch.device] = "cuda",
pin_memory: bool = False,
) -> torch.Tensor:
padded_x = [_pad_to_max(x_i, max_len, pad) for x_i in x]
return torch.tensor(padded_x, dtype=dtype, device=device)
return torch.tensor(padded_x,
dtype=dtype,
device=device,
pin_memory=pin_memory and str(device) == "cpu")
def _get_graph_batch_size(batch_size: int) -> int:
@@ -538,3 +655,8 @@ def _get_graph_batch_size(batch_size: int) -> int:
return 4
else:
return (batch_size + 7) // 8 * 8
def _async_h2d(data: list, dtype, pin_memory):
t = torch.tensor(data, dtype=dtype, pin_memory=pin_memory)
return t.to(device="cuda", non_blocking=True)
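_async_h2d works because the host tensor is allocated in pinned (page-locked) memory, which lets the host-to-device copy run asynchronously on the current CUDA stream instead of blocking the CPU. A minimal illustration of the same idea (requires a CUDA device; the data and sizes are arbitrary):

import torch

data = list(range(1024))
host = torch.tensor(data, dtype=torch.long, pin_memory=True)
device_tensor = host.to(device="cuda", non_blocking=True)
# The copy is ordered on the current stream; synchronize before relying on it
# from the host side.
torch.cuda.current_stream().synchronize()
assert device_tensor.device.type == "cuda"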

View File

@@ -8,6 +8,8 @@ import torch.distributed
from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
SchedulerConfig)
from vllm.model_executor import set_random_seed
from vllm.model_executor.parallel_utils.communication_op import (
broadcast_object_list)
from vllm.model_executor.parallel_utils.parallel_state import (
initialize_model_parallel)
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
@@ -28,17 +30,23 @@ class Worker:
model_config: ModelConfig,
parallel_config: ParallelConfig,
scheduler_config: SchedulerConfig,
rank: Optional[int] = None,
distributed_init_method: Optional[str] = None,
local_rank: int,
rank: int,
distributed_init_method: str,
is_driver_worker: bool = False,
) -> None:
self.model_config = model_config
self.parallel_config = parallel_config
self.scheduler_config = scheduler_config
self.local_rank = local_rank
self.rank = rank
self.distributed_init_method = distributed_init_method
self.is_driver_worker = is_driver_worker
if self.is_driver_worker:
assert self.rank == 0, "The driver worker must have rank 0."
self.model_runner = ModelRunner(model_config, parallel_config,
scheduler_config)
scheduler_config, is_driver_worker)
# Uninitialized cache engine. Will be initialized by
# self.init_cache_engine().
self.cache_config = None
@@ -57,13 +65,7 @@ class Worker:
# This env var set by Ray causes exceptions with graph building.
os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
# Env vars will be set by Ray.
self.rank = self.rank if self.rank is not None else int(
os.getenv("RANK", "-1"))
local_rank = int(os.getenv("LOCAL_RANK", "0"))
self.device = torch.device(f"cuda:{local_rank}")
if self.rank < 0:
raise ValueError("Invalid or unspecified rank.")
self.device = torch.device(f"cuda:{self.local_rank}")
torch.cuda.set_device(self.device)
_check_if_gpu_supports_dtype(self.model_config.dtype)
@@ -125,14 +127,12 @@ class Worker:
# the model initialization and profiling.
set_random_seed(self.model_config.seed)
@torch.inference_mode()
def execute_model(
def cache_swap(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
blocks_to_swap_in: Dict[int, int],
blocks_to_swap_out: Dict[int, int],
blocks_to_copy: Dict[int, List[int]],
) -> SamplerOutput:
) -> None:
# Issue cache operations.
issued_cache_op = False
if blocks_to_swap_in:
@@ -152,8 +152,38 @@ class Worker:
if cache_events is not None:
for event in cache_events:
event.wait()
@torch.inference_mode()
def execute_model(
self,
seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None,
blocks_to_swap_in: Optional[Dict[int, int]] = None,
blocks_to_swap_out: Optional[Dict[int, int]] = None,
blocks_to_copy: Optional[Dict[int, List[int]]] = None,
) -> Optional[SamplerOutput]:
if self.is_driver_worker:
assert seq_group_metadata_list is not None
num_seq_groups = len(seq_group_metadata_list)
assert blocks_to_swap_in is not None
assert blocks_to_swap_out is not None
assert blocks_to_copy is not None
block_swapping_info = [
blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy
]
broadcast_object_list([num_seq_groups] + block_swapping_info,
src=0)
else:
# num_seq_groups, blocks_to_swap_in, blocks_to_swap_out,
# blocks_to_copy (4 elements)
recv_data = [None] * 4
broadcast_object_list(recv_data, src=0)
num_seq_groups = recv_data[0]
block_swapping_info = recv_data[1:]
self.cache_swap(*block_swapping_info)
# If there is no input, we don't need to execute the model.
if not seq_group_metadata_list:
if num_seq_groups == 0:
return {}
output = self.model_runner.execute_model(seq_group_metadata_list,