Compare commits


30 Commits

Author SHA1 Message Date
Woosuk Kwon
2e0b6e7757 Bump up to v0.2.7 (#2337)
2024-01-03 17:35:56 -08:00
Woosuk Kwon
941767127c Revert the changes in test_cache (#2335) 2024-01-03 17:32:05 -08:00
Ronen Schaffer
74d8d77626 Remove unused const TIMEOUT_TO_PREVENT_DEADLOCK (#2321) 2024-01-03 15:49:07 -08:00
Zhuohan Li
fd4ea8ef5c Use NCCL instead of ray for control-plane communication to remove serialization overhead (#2221) 2024-01-03 11:30:22 -08:00
Ronen Schaffer
1066cbd152 Remove deprecated parameter: concurrency_count (#2315) 2024-01-03 09:56:21 -08:00
Woosuk Kwon
6ef00b03a2 Enable CUDA graph for GPTQ & SqueezeLLM (#2318) 2024-01-03 09:52:29 -08:00
Roy
9140561059 [Minor] Fix typo and remove unused code (#2305) 2024-01-02 19:23:15 -08:00
Jee Li
77af974b40 [FIX] Support non-zero CUDA devices in custom kernels (#1959) 2024-01-02 19:09:59 -08:00
Jong-hun Shin
4934d49274 Support GPT-NeoX Models without attention biases (#2301) 2023-12-30 11:42:04 -05:00
Zhuohan Li
358c328d69 [BUGFIX] Fix communication test (#2285) 2023-12-27 17:18:11 -05:00
Zhuohan Li
4aaafdd289 [BUGFIX] Fix the path of test prompts (#2273) 2023-12-26 10:37:21 -08:00
Zhuohan Li
66b108d142 [BUGFIX] Fix API server test (#2270) 2023-12-26 10:37:06 -08:00
Zhuohan Li
e0ff920001 [BUGFIX] Do not return ignored sentences twice in async llm engine (#2258) 2023-12-26 13:41:09 +08:00
blueceiling
face83c7ec [Docs] Add "About" Heading to README.md (#2260) 2023-12-25 16:37:07 -08:00
Shivam Thakkar
1db83e31a2 [Docs] Update installation instructions to include CUDA 11.8 xFormers (#2246) 2023-12-22 23:20:02 -08:00
Woosuk Kwon
a1b9cb2a34 [BugFix] Fix recovery logic for sequence group (#2186) 2023-12-20 21:52:37 -08:00
Woosuk Kwon
3a4fd5ca59 Disable Ray usage stats collection (#2206) 2023-12-20 21:52:08 -08:00
Ronen Schaffer
c17daa9f89 [Docs] Fix broken links (#2222) 2023-12-20 12:43:42 -08:00
Antoni Baum
bd29cf3d3a Remove Sampler copy stream (#2209) 2023-12-20 00:04:33 -08:00
Hanzhi Zhou
31bff69151 Make _prepare_sample non-blocking and use pinned memory for input buffers (#2207) 2023-12-19 16:52:46 -08:00
Woosuk Kwon
ba4f826738 [BugFix] Fix weight loading for Mixtral with TP (#2208) 2023-12-19 16:16:11 -08:00
avideci
de60a3fb93 Added DeciLM-7b and DeciLM-7b-instruct (#2062) 2023-12-19 02:29:33 -08:00
Woosuk Kwon
21d5daa4ac Add warning on CUDA graph memory usage (#2182) 2023-12-18 18:16:17 -08:00
Suhong Moon
290e015c6c Update Help Text for --gpu-memory-utilization Argument (#2183) 2023-12-18 11:33:24 -08:00
kliuae
1b7c791d60 [ROCm] Fixes for GPTQ on ROCm (#2180) 2023-12-18 10:41:04 -08:00
JohnSaxon
bbe4466fd9 [Minor] Fix typo (#2166)
Co-authored-by: John-Saxon <zhang.xiangxuan@oushu.com>
2023-12-17 23:28:49 -08:00
Harry Mellor
08133c4d1a Add SSL arguments to API servers (#2109) 2023-12-18 10:56:23 +08:00
Woosuk Kwon
76a7983b23 [BugFix] Fix RoPE kernel on long sequences (#2164) 2023-12-17 17:09:10 -08:00
Woosuk Kwon
8041b7305e [BugFix] Raise error when max_model_len is larger than KV cache (#2163) 2023-12-17 17:08:23 -08:00
Suhong Moon
3ec8c25cd0 [Docs] Update documentation for gpu-memory-utilization option (#2162) 2023-12-17 10:51:57 -08:00
67 changed files with 876 additions and 428 deletions

View File

@@ -27,7 +27,7 @@ Easy, fast, and cheap LLM serving for everyone
 - [2023/06] We officially released vLLM! FastChat-vLLM integration has powered [LMSYS Vicuna and Chatbot Arena](https://chat.lmsys.org) since mid-April. Check out our [blog post](https://vllm.ai).
 ---
+## About
 vLLM is a fast and easy-to-use library for LLM inference and serving.
 vLLM is fast with:
@@ -54,6 +54,7 @@ vLLM seamlessly supports many Hugging Face models, including the following archi
 - Baichuan & Baichuan2 (`baichuan-inc/Baichuan2-13B-Chat`, `baichuan-inc/Baichuan-7B`, etc.)
 - BLOOM (`bigscience/bloom`, `bigscience/bloomz`, etc.)
 - ChatGLM (`THUDM/chatglm2-6b`, `THUDM/chatglm3-6b`, etc.)
+- DeciLM (`Deci/DeciLM-7B`, `Deci/DeciLM-7B-instruct`, etc.)
 - Falcon (`tiiuae/falcon-7b`, `tiiuae/falcon-40b`, `tiiuae/falcon-rw-7b`, etc.)
 - GPT-2 (`gpt2`, `gpt2-xl`, etc.)
 - GPT BigCode (`bigcode/starcoder`, `bigcode/gpt_bigcode-santacoder`, etc.)

View File

@@ -1,5 +1,6 @@
-#include <torch/extension.h>
 #include <ATen/cuda/CUDAContext.h>
+#include <torch/extension.h>
+#include <c10/cuda/CUDAGuard.h>
 #include "cuda_compat.h"
 #include "dispatch_utils.h"
@@ -36,6 +37,7 @@ void silu_and_mul(
   dim3 grid(num_tokens);
   dim3 block(std::min(d, 1024));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     input.scalar_type(),
@@ -71,6 +73,7 @@ __global__ void activation_kernel(
   int64_t num_tokens = input.numel() / d;                                \
   dim3 grid(num_tokens);                                                 \
   dim3 block(std::min(d, 1024));                                         \
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(input));      \
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();          \
   VLLM_DISPATCH_FLOATING_TYPES(                                          \
     input.scalar_type(),                                                 \
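
Note: the two lines added in these hunks, and repeated below for the attention, cache, layernorm, and positional-encoding kernels, are the core of the non-zero-device fix (#1959): pin the current CUDA device to the device that actually holds the input before asking ATen for the current stream, so the kernel launches on cuda:N instead of whichever device happens to be current. A minimal standalone sketch of the pattern follows; the `scale` op and its kernel are illustrative, not part of the diff:

#include <torch/extension.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>

__global__ void scale_kernel(float* __restrict__ x, float s, int n) {
  const int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) x[i] *= s;
}

// Hypothetical launcher following the pattern these hunks introduce.
void scale(torch::Tensor& input, float s) {
  // Make `input`'s device current for the duration of this scope; a no-op
  // when the tensor already lives on the current device.
  const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
  // Only after the guard is in place does "current stream" refer to the
  // stream of the tensor's own device.
  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
  const int n = static_cast<int>(input.numel());
  dim3 grid((n + 255) / 256);
  dim3 block(256);
  scale_kernel<<<grid, block, 0, stream>>>(input.data_ptr<float>(), s, n);
}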

View File

@@ -21,6 +21,7 @@
 #include <torch/extension.h>
 #include <ATen/cuda/CUDAContext.h>
+#include <c10/cuda/CUDAGuard.h>
 #include "attention_dtypes.h"
 #include "attention_utils.cuh"
@@ -616,6 +617,7 @@ void paged_attention_v1_launcher(
   dim3 grid(num_heads, num_seqs, 1);
   dim3 block(NUM_THREADS);
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   switch (head_size) {
     // NOTE(woosuk): To reduce the compilation time, we only compile for the
@@ -784,6 +786,7 @@ void paged_attention_v2_launcher(
   int reduce_shared_mem_size = 2 * max_num_partitions * sizeof(float);
   dim3 block(NUM_THREADS);
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   switch (head_size) {
     // NOTE(woosuk): To reduce the compilation time, we only compile for the

View File

@@ -1,5 +1,6 @@
 #include <torch/extension.h>
 #include <ATen/cuda/CUDAContext.h>
+#include <c10/cuda/CUDAGuard.h>
 #include "cuda_compat.h"
 #include "dispatch_utils.h"
@@ -33,6 +34,7 @@ void swap_blocks(
   char *dst_ptr = static_cast<char*>(dst.data_ptr());
   const int64_t block_size_in_bytes = src.element_size() * src[0].numel();
+  const at::cuda::OptionalCUDAGuard device_guard(src_device);
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   // NOTE(woosuk): This can be slow if the number of blocks is large.
   for (const auto& pair : block_mapping) {
@@ -127,6 +129,7 @@ void copy_blocks(
   const int numel_per_block = key_caches[0][0].numel();
   dim3 grid(num_layers, num_pairs);
   dim3 block(std::min(1024, numel_per_block));
+  const at::cuda::OptionalCUDAGuard device_guard(cache_device);
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     key_caches[0].scalar_type(), "copy_blocks_kernel", ([&] {
@@ -207,6 +210,7 @@ void reshape_and_cache(
   dim3 grid(num_tokens);
   dim3 block(std::min(num_heads * head_size, 512));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     key.scalar_type(),
@@ -367,6 +371,7 @@ void gather_cached_kv(
   dim3 grid(num_tokens);
   dim3 block(std::min(num_heads * head_size, 512));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     key.scalar_type(),

View File

@@ -1,5 +1,6 @@
 #include <torch/extension.h>
 #include <ATen/cuda/CUDAContext.h>
+#include <c10/cuda/CUDAGuard.h>
 #include "dispatch_utils.h"
 #include "reduction_utils.cuh"
@@ -76,6 +77,7 @@ void rms_norm(
   dim3 grid(num_tokens);
   dim3 block(std::min(hidden_size, 1024));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     input.scalar_type(),
@@ -101,6 +103,7 @@ void fused_add_rms_norm(
   dim3 grid(num_tokens);
   dim3 block(std::min(hidden_size, 1024));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(input));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     input.scalar_type(),

View File

@@ -1,5 +1,6 @@
 #include <torch/extension.h>
 #include <ATen/cuda/CUDAContext.h>
+#include <c10/cuda/CUDAGuard.h>
 #include "cuda_compat.h"
 #include "dispatch_utils.h"
@@ -43,8 +44,8 @@ __global__ void rotary_embedding_kernel(
   scalar_t* __restrict__ key,                  // [batch_size, seq_len, num_kv_heads, head_size] or [num_tokens, num_kv_heads, head_size]
   const scalar_t* __restrict__ cos_sin_cache,  // [max_position, 2, rot_dim // 2]
   const int rot_dim,
-  const int query_stride,
-  const int key_stride,
+  const int64_t query_stride,
+  const int64_t key_stride,
   const int num_heads,
   const int num_kv_heads,
   const int head_size) {
@@ -60,7 +61,7 @@ __global__ void rotary_embedding_kernel(
   const int nq = num_heads * embed_dim;
   for (int i = threadIdx.x; i < nq; i += blockDim.x) {
     const int head_idx = i / embed_dim;
-    const int token_head = token_idx * query_stride + head_idx * head_size;
+    const int64_t token_head = token_idx * query_stride + head_idx * head_size;
     const int rot_offset = i % embed_dim;
     apply_rotary_embedding<scalar_t, IS_NEOX>(query + token_head, cos_ptr,
                                               sin_ptr, rot_offset, embed_dim);
@@ -69,7 +70,7 @@ __global__ void rotary_embedding_kernel(
   const int nk = num_kv_heads * embed_dim;
   for (int i = threadIdx.x; i < nk; i += blockDim.x) {
     const int head_idx = i / embed_dim;
-    const int token_head = token_idx * key_stride + head_idx * head_size;
+    const int64_t token_head = token_idx * key_stride + head_idx * head_size;
     const int rot_offset = i % embed_dim;
     apply_rotary_embedding<scalar_t, IS_NEOX>(key + token_head, cos_ptr,
                                               sin_ptr, rot_offset, embed_dim);
@@ -89,11 +90,12 @@ void rotary_embedding(
   int rot_dim = cos_sin_cache.size(1);
   int num_heads = query.size(-1) / head_size;
   int num_kv_heads = key.size(-1) / head_size;
-  int query_stride = query.stride(-2);
-  int key_stride = key.stride(-2);
+  int64_t query_stride = query.stride(-2);
+  int64_t key_stride = key.stride(-2);
   dim3 grid(num_tokens);
   dim3 block(std::min(num_heads * rot_dim / 2, 512));
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(query));
   const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
   VLLM_DISPATCH_FLOATING_TYPES(
     query.scalar_type(),
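
Note: the int to int64_t changes above are the long-sequence RoPE fix (#2164). With 32-bit arithmetic, `token_idx * query_stride` wraps as soon as the flattened token count times the per-token stride exceeds 2^31 - 1, and the kernel then reads and writes from a bogus offset. A small host-side illustration with hypothetical but realistic sizes:

#include <cstdint>
#include <cstdio>

int main() {
  // e.g. 300k flattened tokens, 64 heads of head size 128 => stride 8192.
  const int64_t token_idx = 300000;
  const int64_t query_stride = 64 * 128;

  const int64_t offset64 = token_idx * query_stride;  // 2,457,600,000
  // What the old `int` arithmetic would have produced: wraps past INT_MAX.
  const int32_t offset32 = static_cast<int32_t>(offset64);

  printf("int64 offset: %lld\n", (long long)offset64);  // correct
  printf("int32 offset: %d\n", offset32);               // negative garbage
  return 0;
}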

View File

@@ -28,6 +28,7 @@ namespace gptq {
 #define DIVIDE(x, size) (((x) + (size) - 1) / (size))
 #if defined(USE_ROCM)
+#include <hipblas/hipblas.h>
 __host__ __forceinline__ hipblasStatus_t __compat_hipblasHgemm(hipblasHandle_t handle,
                                                                hipblasOperation_t transA,
                                                                hipblasOperation_t transB,
@@ -286,7 +287,8 @@ void gemm_half_q_half_cuda_part
   fp_gemm_half_q_half_gptq_kernel kernel = pick_gemm_half_q_half_gptq_kernel(true, m_count);
-  kernel<<<gridDim, blockDim>>>
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  kernel<<<gridDim, blockDim, 0, stream>>>
   (
     a,
     b_q_weight,
@@ -433,7 +435,8 @@ void reconstruct_exllama
   gridDim.y = DIVIDE(height, BLOCK_KN_SIZE);
   gridDim.x = DIVIDE(width, BLOCK_KN_SIZE);
-  reconstruct_exllama_kernel<<<gridDim, blockDim>>>
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  reconstruct_exllama_kernel<<<gridDim, blockDim, 0, stream>>>
   (
     b_q_weight,
     b_q_perm,
@@ -520,12 +523,21 @@ __global__ void gemm_half_q_half_alt_kernel(
       zeros_tmp[tmp_k] = zero;
     }
     for (int m = 0; m < b_end; m++) {
+#ifndef USE_ROCM
       res2 = {};
+#else
+      res2.x = __half_as_ushort(__float2half(0));
+      res2.y = __half_as_ushort(__float2half(0));
+#endif
      res2 = __hfma2(__hfma2(deq2[(tmp >> 0) & 0xff][off], scales_tmp[0], zeros_tmp[0]), blockvec[m][k + 0], res2);
      res2 = __hfma2(__hfma2(deq2[(tmp >> 8) & 0xff][off], scales_tmp[1], zeros_tmp[1]), blockvec[m][k + 1], res2);
      res2 = __hfma2(__hfma2(deq2[(tmp >> 16) & 0xff][off], scales_tmp[2], zeros_tmp[2]), blockvec[m][k + 2], res2);
      res2 = __hfma2(__hfma2(deq2[(tmp >> 24) & 0xff][off], scales_tmp[3], zeros_tmp[3]), blockvec[m][k + 3], res2);
+#ifndef USE_ROCM
      res[m] = __hadd(res[m], __hadd(res2.x, res2.y));
+#else
+     res[m] = __hadd(res[m], __hadd(__ushort_as_half(res2.x), __ushort_as_half(res2.y)));
+#endif
    }
    i += width;
    k += 4;
@@ -557,7 +569,8 @@ void gemm_half_q_half_alt
   gridDim.y = DIVIDE(size_m, BLOCK_M_SIZE_MAX);
   gridDim.z = DIVIDE(size_k, BLOCK_KN_SIZE);
-  gemm_half_q_half_alt_kernel<<<gridDim, blockDim>>>
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  gemm_half_q_half_alt_kernel<<<gridDim, blockDim, 0, stream>>>
   (
     (const half2*) a,
     b_q_weight,
@@ -629,7 +642,8 @@ void reconstruct_gptq
   blockDim.y = 1;
   gridDim.y = DIVIDE(height, 8);
   gridDim.x = DIVIDE(width, BLOCK_KN_SIZE);
-  reconstruct_gptq_kernel<<<gridDim, blockDim>>>
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  reconstruct_gptq_kernel<<<gridDim, blockDim, 0, stream>>>
   (
     b_q_weight,
     b_gptq_scales,
@@ -784,7 +798,8 @@ void shuffle_exllama_weight
   gridDim.x = DIVIDE(width, THREADS_X);
   gridDim.y = height / 8;
-  make_sequential_kernel<<<gridDim, blockDim>>>
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  make_sequential_kernel<<<gridDim, blockDim, 0, stream>>>
   (
     q_weight,
     new_qweight,
@@ -803,7 +818,8 @@ void shuffle_exllama_weight
   blockDim.y = 1;
   gridDim.x = DIVIDE(width, THREADS_X);
   gridDim.y = 1;
-  shuffle_kernel<<<gridDim, blockDim>>>(q_weight, height, width);
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  shuffle_kernel<<<gridDim, blockDim, 0, stream>>>(q_weight, height, width);
 }
 } // namespace gptq
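
Note: the USE_ROCM branches above handle a representation difference rather than streams: this code treats the two lanes of the `half2` accumulator as `unsigned short` on ROCm, so zero-initialization and the final horizontal add go through the bit-cast intrinsics. A hedged sketch of the same idea as a helper, assuming (as the diff does) a toolchain with that lane layout:

#include <cuda_fp16.h>  // hipified to the HIP fp16 header on ROCm builds

// Zero-initialize a half2 accumulator portably, mirroring the #ifdef above.
__device__ __forceinline__ half2 zero_half2() {
#ifndef USE_ROCM
  half2 z = {};  // value-init zeroes both half lanes on CUDA
  return z;
#else
  half2 z;
  // Assumed ROCm layout: lanes exposed as unsigned short, so 0.0h must be
  // bit-cast into each lane rather than assigned directly.
  z.x = __half_as_ushort(__float2half(0.0f));
  z.y = __half_as_ushort(__float2half(0.0f));
  return z;
#endif
}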

View File

@@ -7,6 +7,7 @@
 // half-tensor
 #include <c10/cuda/CUDAStream.h>
 #include <ATen/cuda/CUDATensorMethods.cuh>
+#include <c10/cuda/CUDAGuard.h>
 #define BLOCKWIDTH 128
 #define BLOCKHEIGHT4 16
@@ -200,7 +201,9 @@ void squeezellm_gemm(
   );
   dim3 threads(BLOCKWIDTH);
-  vllm::squeezellm::NUQ4MatMulKernel<<<blocks, threads>>>(
+  const at::cuda::OptionalCUDAGuard device_guard(device_of(vec));
+  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
+  vllm::squeezellm::NUQ4MatMulKernel<<<blocks, threads, 0, stream>>>(
 #ifndef USE_ROCM
     (half2*) vec.data<at::Half>(),
 #else
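
Note: launching on at::cuda::getCurrentCUDAStream() instead of the legacy default stream (the bare `<<<grid, block>>>` form) is also what lets these quantized GEMM kernels be captured into CUDA graphs, which is why the "Enable CUDA graph for GPTQ & SqueezeLLM" commit can drop the enforce-eager fallback in the vllm/config.py hunk further down. A self-contained sketch of why the explicit stream argument matters for capture; the kernel and sizes are illustrative:

#include <cuda_runtime.h>

__global__ void add_one(float* x, int n) {
  const int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) x[i] += 1.0f;
}

int main() {
  const int n = 1 << 20;
  float* d = nullptr;
  cudaMalloc(&d, n * sizeof(float));
  cudaMemset(d, 0, n * sizeof(float));

  cudaStream_t stream;
  cudaStreamCreate(&stream);

  // Stream capture records only work submitted to the capturing stream.
  // A launch written as <<<grid, block>>> goes to the legacy default
  // stream and invalidates the capture, hence the explicit stream
  // argument added to every launch in the hunks above.
  cudaGraph_t graph;
  cudaGraphExec_t exec;
  cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
  add_one<<<(n + 255) / 256, 256, 0, stream>>>(d, n);
  cudaStreamEndCapture(stream, &graph);
  cudaGraphInstantiateWithFlags(&exec, graph, 0);

  cudaGraphLaunch(exec, stream);  // replay the captured kernel
  cudaStreamSynchronize(stream);

  cudaGraphExecDestroy(exec);
  cudaGraphDestroy(graph);
  cudaStreamDestroy(stream);
  cudaFree(d);
  return 0;
}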

View File

@@ -116,6 +116,7 @@ Alternatively, if you plan to install vLLM-ROCm on a local machine or start from
 - `ROCm <https://rocm.docs.amd.com/en/latest/deploy/linux/index.html>`_
 - `Pytorch <https://pytorch.org/>`_
+- `hipBLAS <https://rocm.docs.amd.com/projects/hipBLAS/en/latest/install.html>`_
 1. Install `flash attention for ROCm <https://github.com/ROCmSoftwarePlatform/flash-attention/tree/flash_attention_for_rocm>`_

View File

@@ -42,6 +42,10 @@ You can install vLLM using pip:
    $ pip uninstall torch -y
    $ pip install torch --upgrade --index-url https://download.pytorch.org/whl/cu118
+   $ # Re-install xFormers with CUDA 11.8.
+   $ pip uninstall xformers -y
+   $ pip install --upgrade xformers --index-url https://download.pytorch.org/whl/cu118
 .. _build_from_source:

View File

@@ -58,11 +58,10 @@ Next, you need to rewrite the :code:`forward` methods of your model by following
 +        positions: torch.Tensor,
 +        kv_caches: List[KVCache],
 +        input_metadata: InputMetadata,
-+        cache_events: Optional[List[torch.cuda.Event]],
-+    ) -> SamplerOutput:
++    ) -> Optional[SamplerOutput]:
-3. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
-4. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
+1. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
+2. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
 .. note::
     Currently, vLLM supports the basic multi-head attention mechanism and its variant with rotary positional embeddings.

View File

@@ -89,9 +89,11 @@ Below, you can find an explanation of every engine argument for vLLM:
     CPU swap space size (GiB) per GPU.
-.. option:: --gpu-memory-utilization <percentage>
+.. option:: --gpu-memory-utilization <fraction>
-    The percentage of GPU memory to be used for the model executor.
+    The fraction of GPU memory to be used for the model executor, which can range from 0 to 1.
+    For example, a value of 0.5 would imply 50% GPU memory utilization.
+    If unspecified, will use the default value of 0.9.
 .. option:: --max-num-batched-tokens <tokens>

View File

@@ -23,6 +23,9 @@ Alongside each architecture, we include some popular models that use it.
   * - :code:`ChatGLMModel`
     - ChatGLM
     - :code:`THUDM/chatglm2-6b`, :code:`THUDM/chatglm3-6b`, etc.
+  * - :code:`DeciLMForCausalLM`
+    - DeciLM
+    - :code:`Deci/DeciLM-7B`, :code:`Deci/DeciLM-7B-instruct`, etc.
   * - :code:`BloomForCausalLM`
     - BLOOM, BLOOMZ, BLOOMChat
     - :code:`bigscience/bloom`, :code:`bigscience/bloomz`, etc.
@@ -90,7 +93,7 @@ Alternatively, you can raise an issue on our `GitHub <https://github.com/vllm-pr
 If vLLM successfully generates text, it indicates that your model is supported.
 .. tip::
-    To use models from `ModelScope <www.modelscope.cn>`_ instead of HuggingFace Hub, set an environment variable:
+    To use models from `ModelScope <https://www.modelscope.cn>`_ instead of HuggingFace Hub, set an environment variable:
 .. code-block:: shell

View File

@@ -28,4 +28,4 @@ To run inference on a single or multiple GPUs, use ``VLLM`` class from ``langcha
     print(llm("What is the capital of France ?"))
-Please refer to this `Tutorial <https://github.com/langchain-ai/langchain/blob/master/docs/extras/integrations/llms/vllm.ipynb>`_ for more details.
+Please refer to this `Tutorial <https://github.com/langchain-ai/langchain/blob/master/docs/docs/integrations/llms/vllm.ipynb>`_ for more details.

View File

@@ -47,6 +47,6 @@ if __name__ == "__main__":
     args = parser.parse_args()
     demo = build_demo()
-    demo.queue(concurrency_count=100).launch(server_name=args.host,
+    demo.queue().launch(server_name=args.host,
                         server_port=args.port,
                         share=True)

View File

@@ -3,8 +3,6 @@ typing-extensions>=4.8.0
 starlette
 psutil
 ray >= 2.5.1
-pandas  # Required for Ray data.
-pyarrow  # Required for Ray data.
 sentencepiece  # Required for LLaMA tokenizer.
 numpy
 tokenizers>=0.15.0

View File

@@ -1,8 +1,6 @@
 ninja  # For faster builds.
 psutil
 ray >= 2.5.1
-pandas  # Required for Ray data.
-pyarrow  # Required for Ray data.
 sentencepiece  # Required for LLaMA tokenizer.
 numpy
 torch == 2.1.2

View File

@@ -219,13 +219,13 @@ vllm_extension_sources = [
     "csrc/activation_kernels.cu",
     "csrc/layernorm_kernels.cu",
     "csrc/quantization/squeezellm/quant_cuda_kernel.cu",
+    "csrc/quantization/gptq/q_gemm.cu",
     "csrc/cuda_utils_kernels.cu",
     "csrc/pybind.cpp",
 ]
 if _is_cuda():
     vllm_extension_sources.append("csrc/quantization/awq/gemm_kernels.cu")
-    vllm_extension_sources.append("csrc/quantization/gptq/q_gemm.cu")
 vllm_extension = CUDAExtension(
     name="vllm._C",

View File

@@ -8,11 +8,11 @@ import pytest
 import requests
-def _query_server(prompt: str) -> dict:
+def _query_server(prompt: str, max_tokens: int = 5) -> dict:
     response = requests.post("http://localhost:8000/generate",
                              json={
                                  "prompt": prompt,
-                                 "max_tokens": 100,
+                                 "max_tokens": max_tokens,
                                  "temperature": 0,
                                  "ignore_eos": True
                              })
@@ -20,6 +20,10 @@ def _query_server(prompt: str) -> dict:
     return response.json()
+def _query_server_long(prompt: str) -> dict:
+    return _query_server(prompt, max_tokens=500)
 @pytest.fixture
 def api_server():
     script_path = Path(__file__).parent.joinpath(
@@ -44,13 +48,14 @@ def test_api_server(api_server):
     """
     with Pool(32) as pool:
         # Wait until the server is ready
-        prompts = ["Hello world"] * 1
+        prompts = ["warm up"] * 1
         result = None
         while not result:
             try:
-                for _ in pool.map(_query_server, prompts):
+                for r in pool.map(_query_server, prompts):
+                    result = r
                     break
-            except Exception:
+            except requests.exceptions.ConnectionError:
                 time.sleep(1)
     # Actual tests start here
@@ -63,12 +68,14 @@ def test_api_server(api_server):
     assert num_aborted_requests == 0
     # Try with 100 prompts
-    prompts = ["Hello world"] * 100
+    prompts = ["test prompt"] * 100
     for result in pool.map(_query_server, prompts):
         assert result
+    with Pool(32) as pool:
         # Cancel requests
-        pool.map_async(_query_server, prompts)
+        prompts = ["canceled requests"] * 100
+        pool.map_async(_query_server_long, prompts)
         time.sleep(0.01)
         pool.terminate()
         pool.join()
@@ -81,6 +88,6 @@ def test_api_server(api_server):
     # check that server still runs after cancellations
     with Pool(32) as pool:
         # Try with 100 prompts
-        prompts = ["Hello world"] * 100
+        prompts = ["test prompt after canceled"] * 100
         for result in pool.map(_query_server, prompts):
             assert result

View File

@@ -8,8 +8,9 @@ from transformers import AutoModelForCausalLM
 from vllm import LLM, SamplingParams
 from vllm.transformers_utils.tokenizer import get_tokenizer
-_TEST_PROMPTS = ["prompts/example.txt"]
-_LONG_PROMPTS = ["prompts/summary.txt"]
+_TEST_DIR = os.path.dirname(__file__)
+_TEST_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "example.txt")]
+_LONG_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "summary.txt")]
 def _read_prompts(filename: str) -> str:
@@ -24,7 +25,7 @@ def _read_prompts(filename: str) -> str:
 def example_prompts() -> List[str]:
     prompts = []
     for filename in _TEST_PROMPTS:
-        prompts += _read_prompts(os.path.join("tests", filename))
+        prompts += _read_prompts(filename)
     return prompts
@@ -32,7 +33,7 @@ def example_prompts() -> List[str]:
 def example_long_prompts() -> List[str]:
     prompts = []
     for filename in _LONG_PROMPTS:
-        prompts += _read_prompts(os.path.join("tests", filename))
+        prompts += _read_prompts(filename)
     return prompts

View File

@@ -8,7 +8,7 @@ import pytest
 import torch
 from vllm.config import ParallelConfig
-from vllm.engine.ray_utils import get_open_port
+from vllm.utils import get_open_port
 from vllm.model_executor.parallel_utils.communication_op import (
     tensor_model_parallel_all_reduce,
     tensor_model_parallel_all_gather,

View File

@@ -12,6 +12,7 @@ def create_kv_caches(
     head_size: int,
     dtype: torch.dtype,
     seed: int,
+    device: str,
 ) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
@@ -23,7 +24,7 @@ def create_kv_caches(
     for _ in range(num_layers):
         key_cache = torch.empty(size=key_cache_shape,
                                 dtype=dtype,
-                                device='cuda')
+                                device=device)
         key_cache.uniform_(-scale, scale)
         key_caches.append(key_cache)
@@ -32,7 +33,7 @@ def create_kv_caches(
     for _ in range(num_layers):
         value_cache = torch.empty(size=value_cache_shape,
                                   dtype=dtype,
-                                  device='cuda')
+                                  device=device)
         value_cache.uniform_(-scale, scale)
         value_caches.append(value_cache)
     return key_caches, value_caches

View File

@@ -7,22 +7,26 @@ DTYPES = [torch.half, torch.bfloat16, torch.float]
 NUM_TOKENS = [7, 83, 2048]  # Arbitrary values for testing
 D = [512, 4096, 5120, 13824]  # Arbitrary values for testing
 SEEDS = [0]
+DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
 @pytest.mark.parametrize("num_tokens", NUM_TOKENS)
 @pytest.mark.parametrize("d", D)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_silu_and_mul(
     num_tokens: int,
     d: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
-    x = torch.randn(num_tokens, 2 * d, dtype=dtype, device="cuda")
+    gpu_id = f"cuda:{device}"
+    x = torch.randn(num_tokens, 2 * d, dtype=dtype, device=gpu_id)
     layer = SiluAndMul()
     out = layer(x)
     ref_out = layer._forward(x)
@@ -33,16 +37,19 @@ def test_silu_and_mul(
 @pytest.mark.parametrize("d", D)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_gelu_new(
     num_tokens: int,
     d: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
-    x = torch.randn(num_tokens, d, dtype=dtype, device="cuda")
+    gpu_id = f"cuda:{device}"
+    x = torch.randn(num_tokens, d, dtype=dtype, device=gpu_id)
     layer = NewGELU()
     out = layer(x)
     ref_out = layer._forward(x)
@@ -53,15 +60,18 @@ def test_gelu_new(
 @pytest.mark.parametrize("d", D)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 def test_gelu_fast(
     num_tokens: int,
     d: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
-    x = torch.randn(num_tokens, d, dtype=dtype, device="cuda")
+    gpu_id = f"cuda:{device}"
+    x = torch.randn(num_tokens, d, dtype=dtype, device=gpu_id)
     layer = FastGELU()
     out = layer(x)
     ref_out = layer._forward(x)

View File

@@ -24,6 +24,7 @@ HEAD_SIZES = [64, 80, 96, 112, 128, 256]
 BLOCK_SIZES = [16, 32]
 USE_ALIBI = [False, True]
 SEEDS = [0]
+DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
 def ref_masked_attention(
@@ -87,7 +88,7 @@ def ref_single_query_cached_kv_attention(
         alibi_bias = None
         if alibi_slopes is not None:
             # Create the ALiBi bias used in the paged attention kernel.
-            position_ids = torch.arange(context_len, device="cuda").int()
+            position_ids = torch.arange(context_len, device=query.device).int()
             alibi_bias = (position_ids - context_len + 1).float()
             alibi_bias = alibi_slopes.view(-1, 1, 1) * alibi_bias.view(
                 1, 1, -1)
@@ -105,6 +106,7 @@ def ref_single_query_cached_kv_attention(
 @pytest.mark.parametrize("block_size", BLOCK_SIZES)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 def test_paged_attention(
     kv_cache_factory,
     version: str,
@@ -115,18 +117,19 @@ def test_paged_attention(
     block_size: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     random.seed(seed)
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
+    gpu_id = f"cuda:{device}"
     scale = float(1.0 / (head_size**0.5))
     num_query_heads, num_kv_heads = num_heads
     query = torch.empty(num_seqs,
                         num_query_heads,
                         head_size,
                         dtype=dtype,
-                        device="cuda")
+                        device=gpu_id)
     query.uniform_(-scale, scale)
     assert num_query_heads % num_kv_heads == 0
@@ -135,12 +138,12 @@ def test_paged_attention(
     if use_alibi:
         alibi_slopes = torch.randn(num_query_heads,
                                    dtype=torch.float,
-                                   device="cuda")
+                                   device=gpu_id)
     context_lens = [random.randint(1, MAX_SEQ_LEN) for _ in range(num_seqs)]
     context_lens[-1] = MAX_SEQ_LEN
     max_context_len = max(context_lens)
-    context_lens = torch.tensor(context_lens, dtype=torch.int, device="cuda")
+    context_lens = torch.tensor(context_lens, dtype=torch.int, device=gpu_id)
     # Create the block tables.
     max_num_blocks_per_seq = (max_context_len + block_size - 1) // block_size
@@ -151,12 +154,12 @@ def test_paged_attention(
             for _ in range(max_num_blocks_per_seq)
         ]
         block_tables.append(block_table)
-    block_tables = torch.tensor(block_tables, dtype=torch.int, device="cuda")
+    block_tables = torch.tensor(block_tables, dtype=torch.int, device=gpu_id)
     # Create the KV caches.
     key_caches, value_caches = kv_cache_factory(NUM_BLOCKS, block_size, 1,
                                                 num_kv_heads, head_size, dtype,
-                                                seed)
+                                                seed, gpu_id)
     key_cache, value_cache = key_caches[0], value_caches[0]
     # Call the paged attention kernel.
@@ -249,7 +252,7 @@ def ref_multi_query_kv_attention(
         attn_mask = torch.triu(torch.ones(seq_len, seq_len, dtype=dtype),
                                diagonal=1)
         attn_mask = attn_mask * torch.finfo(dtype).min
-        attn_mask = attn_mask.to(dtype=dtype, device="cuda")
+        attn_mask = attn_mask.to(dtype=dtype, device=query.device)
         ref_output = ref_masked_attention(
             query[start_idx:end_idx],
@@ -269,6 +272,7 @@ def ref_multi_query_kv_attention(
 @pytest.mark.parametrize("head_size", HEAD_SIZES)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_multi_query_kv_attention(
     num_seqs: int,
@@ -276,11 +280,12 @@ def test_multi_query_kv_attention(
     head_size: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     random.seed(seed)
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
+    gpu_id = f"cuda:{device}"
     # MAX_SEQ_LEN sometimes causes OOM in the reference implementation.
     # As the xformers library is already tested with its own tests, we can use
     # a smaller MAX_SEQ_LEN here.
@@ -294,7 +299,7 @@ def test_multi_query_kv_attention(
                       num_query_heads + 2 * num_kv_heads,
                       head_size,
                       dtype=dtype,
-                      device="cuda")
+                      device=gpu_id)
     qkv.uniform_(-scale, scale)
     query, key, value = qkv.split(
         [num_query_heads, num_kv_heads, num_kv_heads], dim=1)

View File

@@ -14,6 +14,7 @@ BLOCK_SIZES = [8, 16, 32]
 NUM_BLOCKS = [1024, 36000]  # Arbitrary values for testing
 NUM_MAPPINGS = [256]  # Arbitrary values for testing
 SEEDS = [0]
+DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
 @pytest.mark.parametrize("num_mappings", NUM_MAPPINGS)
@@ -24,6 +25,7 @@ SEEDS = [0]
 @pytest.mark.parametrize("num_blocks", NUM_BLOCKS)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_copy_blocks(
     kv_cache_factory,
@@ -35,11 +37,12 @@ def test_copy_blocks(
     num_blocks: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     random.seed(seed)
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
+    gpu_id = f"cuda:{device}"
     # Generate random block mappings where each source block is mapped to two
     # destination blocks.
     assert 2 * num_mappings <= num_blocks
@@ -56,7 +59,7 @@ def test_copy_blocks(
     # Create the KV caches.
     key_caches, value_caches = kv_cache_factory(num_blocks, block_size,
                                                 num_layers, num_heads,
-                                                head_size, dtype, seed)
+                                                head_size, dtype, seed, gpu_id)
     # Clone the KV caches.
     cloned_key_caches = [key_cache.clone() for key_cache in key_caches]
@@ -88,6 +91,7 @@ def test_copy_blocks(
 @pytest.mark.parametrize("num_blocks", NUM_BLOCKS)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_reshape_and_cache(
     kv_cache_factory,
@@ -98,28 +102,29 @@ def test_reshape_and_cache(
     num_blocks: int,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     random.seed(seed)
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
+    gpu_id = f"cuda:{device}"
     # Create a random slot mapping.
     num_slots = block_size * num_blocks
     slot_mapping = random.sample(range(num_slots), num_tokens)
-    slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device="cuda")
+    slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device=gpu_id)
     qkv = torch.randn(num_tokens,
                       3,
                       num_heads,
                       head_size,
                       dtype=dtype,
-                      device="cuda")
+                      device=gpu_id)
     _, key, value = qkv.unbind(dim=1)
     # Create the KV caches.
     key_caches, value_caches = kv_cache_factory(num_blocks, block_size, 1,
                                                 num_heads, head_size, dtype,
-                                                seed)
+                                                seed, gpu_id)
     key_cache, value_cache = key_caches[0], value_caches[0]
     # Clone the KV caches.

View File

@@ -8,6 +8,7 @@ NUM_TOKENS = [7, 83, 4096]  # Arbitrary values for testing
 HIDDEN_SIZES = [768, 5120, 8192]  # Arbitrary values for testing
 ADD_RESIDUAL = [False, True]
 SEEDS = [0]
+DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
 @pytest.mark.parametrize("num_tokens", NUM_TOKENS)
@@ -15,6 +16,7 @@ SEEDS = [0]
 @pytest.mark.parametrize("add_residual", ADD_RESIDUAL)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_rms_norm(
     num_tokens: int,
@@ -22,14 +24,15 @@ def test_rms_norm(
     add_residual: bool,
     dtype: torch.dtype,
     seed: int,
+    device: int,
 ) -> None:
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
-    layer = RMSNorm(hidden_size).to(dtype).cuda()
+    gpu_id = f"cuda:{device}"
+    layer = RMSNorm(hidden_size).to(dtype=dtype, device=gpu_id)
     layer.weight.data.normal_(mean=1.0, std=0.1)
     scale = 1 / (2 * hidden_size)
-    x = torch.randn(num_tokens, hidden_size, dtype=dtype, device="cuda")
+    x = torch.randn(num_tokens, hidden_size, dtype=dtype, device=gpu_id)
     x *= scale
     residual = torch.randn_like(x) * scale if add_residual else None

View File

@@ -13,6 +13,7 @@ NUM_HEADS = [7, 17]  # Arbitrary values for testing
 BATCH_SIZES = [1, 5]  # Arbitrary values for testing
 SEQ_LENS = [11, 8192]  # Arbitrary values for testing
 SEEDS = [0]
+DEVICES = [i for i in range(1 if torch.cuda.device_count() == 1 else 2)]
 @pytest.mark.parametrize("is_neox_style", IS_NEOX_STYLE)
@@ -23,6 +24,7 @@ SEEDS = [0]
 @pytest.mark.parametrize("rotary_dim", ROTARY_DIMS)
 @pytest.mark.parametrize("dtype", DTYPES)
 @pytest.mark.parametrize("seed", SEEDS)
+@pytest.mark.parametrize("device", DEVICES)
 @torch.inference_mode()
 def test_rotary_embedding(
     is_neox_style: bool,
@@ -33,6 +35,7 @@ def test_rotary_embedding(
     rotary_dim: Optional[int],
     dtype: torch.dtype,
     seed: int,
+    device: int,
     max_position: int = 8192,
     base: int = 10000,
 ) -> None:
@@ -40,20 +43,20 @@ def test_rotary_embedding(
         rotary_dim = head_size
     torch.random.manual_seed(seed)
     torch.cuda.manual_seed(seed)
+    gpu_id = f"cuda:{device}"
     if rotary_dim is None:
         rotary_dim = head_size
     rope = get_rope(head_size, rotary_dim, max_position, base, is_neox_style)
-    rope = rope.to(dtype).cuda()
+    rope = rope.to(dtype=dtype, device=gpu_id)
     positions = torch.randint(0,
                               max_position, (batch_size, seq_len),
-                              device="cuda")
+                              device=gpu_id)
     query = torch.randn(batch_size,
                         seq_len,
                         num_heads * head_size,
                         dtype=dtype,
-                        device="cuda")
+                        device=gpu_id)
     key = torch.randn_like(query)
     # NOTE(woosuk): The reference implementation should be executed first

View File

@@ -8,6 +8,7 @@ MODELS = [
     "facebook/opt-125m",
     "meta-llama/Llama-2-7b-hf",
     "mistralai/Mistral-7B-v0.1",
+    "Deci/DeciLM-7b",
     "tiiuae/falcon-7b",
     "gpt2",
     "bigcode/tiny_starcoder_py",

View File

@@ -33,8 +33,9 @@ def test_prepare_prompt():
         expected_selected_token_indices.append(selected_token_start_idx +
                                                prompt_len - 1)
         selected_token_start_idx += max_seq_len
-    input_tokens, input_positions, _ = model_runner._prepare_prompt(
-        seq_group_metadata_list)
+    input_tokens, input_positions, _, return_prompt_lens = (
+        model_runner._prepare_prompt(seq_group_metadata_list))
+    assert return_prompt_lens == prompt_lens
     sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list,
                                                      prompt_lens)
     assert input_tokens.shape == (batch_size, max_seq_len)

View File

@@ -8,7 +8,7 @@ from vllm.entrypoints.llm import LLM
 from vllm.outputs import CompletionOutput, RequestOutput
 from vllm.sampling_params import SamplingParams
-__version__ = "0.2.6"
+__version__ = "0.2.7"
 __all__ = [
     "LLM",

View File

@@ -112,24 +112,20 @@ class ModelConfig:
         supported_load_format = [
             "auto", "pt", "safetensors", "npcache", "dummy"
         ]
-        rocm_not_supported_load_format = ["safetensors"]
+        rocm_not_supported_load_format = []
         if load_format not in supported_load_format:
             raise ValueError(
                 f"Unknown load format: {self.load_format}. Must be one of "
                 "'auto', 'pt', 'safetensors', 'npcache', or 'dummy'.")
-        if is_hip():
-            if load_format in ["safetensors"]:
-                rocm_supported_load_format = [
-                    f for f in supported_load_format
-                    if (f not in rocm_not_supported_load_format)
-                ]
-                raise ValueError(
-                    f"load format \'{load_format}\' is not supported in ROCm. "
-                    f"Supported load format are "
-                    f"{rocm_supported_load_format}")
-            # Force ROCm to load from pt weights if nothing specific is set
-            if load_format == "auto":
-                load_format = "pt"
+        if is_hip() and load_format in rocm_not_supported_load_format:
+            rocm_supported_load_format = [
+                f for f in supported_load_format
+                if (f not in rocm_not_supported_load_format)
+            ]
+            raise ValueError(
+                f"load format \'{load_format}\' is not supported in ROCm. "
+                f"Supported load format are "
+                f"{rocm_supported_load_format}")
         # TODO: Remove this check once HF updates the pt weights of Mixtral.
         architectures = getattr(self.hf_config, "architectures", [])
@@ -149,7 +145,7 @@ class ModelConfig:
     def _verify_quantization(self) -> None:
         supported_quantization = ["awq", "gptq", "squeezellm"]
-        rocm_not_supported_quantization = ["awq", "gptq"]
+        rocm_not_supported_quantization = ["awq"]
         if self.quantization is not None:
             self.quantization = self.quantization.lower()
@@ -185,12 +181,6 @@ class ModelConfig:
         self.max_context_len_to_capture = self.max_model_len
         self.max_context_len_to_capture = min(self.max_context_len_to_capture,
                                               self.max_model_len)
-        if (self.quantization in ["gptq", "squeezellm"]
-                and not self.enforce_eager):
-            # Related issue: https://github.com/vllm-project/vllm/issues/2147
-            logger.warning(f"{self.quantization} does not support CUDA graph "
-                           "yet. Disabling CUDA graph.")
-            self.enforce_eager = True
     def verify_with_parallel_config(
         self,

View File

@@ -103,7 +103,7 @@ class BlockSpaceManager:
     def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
         # FIXME(woosuk): Here we assume that all sequences in the group share
         # the same prompt. This may not be true for preempted sequences.
-        seq = seq_group.get_seqs()[0]
+        seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
         num_required_blocks = len(seq.logical_token_blocks)
         if self.block_sliding_window is not None:
             num_required_blocks = min(num_required_blocks,
@@ -122,7 +122,7 @@ class BlockSpaceManager:
     def allocate(self, seq_group: SequenceGroup) -> None:
         # NOTE: Here we assume that all sequences in the group have the same
         # prompt.
-        seq = seq_group.get_seqs()[0]
+        seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
         # Allocate new physical token blocks that will store the prompt tokens.
         block_table: BlockTable = []
@@ -137,7 +137,7 @@ class BlockSpaceManager:
             block_table.append(block)
         # Assign the block table for each sequence.
-        for seq in seq_group.get_seqs():
+        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
             self.block_tables[seq.seq_id] = block_table.copy()
     def can_append_slot(self, seq_group: SequenceGroup) -> bool:

View File

@@ -139,15 +139,17 @@ class Scheduler:
             while self.waiting:
                 seq_group = self.waiting[0]
-                assert seq_group.num_seqs() == 1, (
+                waiting_seqs = seq_group.get_seqs(
+                    status=SequenceStatus.WAITING)
+                assert len(waiting_seqs) == 1, (
                     "Waiting sequence group should have only one prompt "
                     "sequence.")
-                num_prompt_tokens = seq_group.get_seqs()[0].get_len()
+                num_prompt_tokens = waiting_seqs[0].get_len()
                 if num_prompt_tokens > self.prompt_limit:
                     logger.warning(
                         f"Input prompt ({num_prompt_tokens} tokens) is too long"
                         f" and exceeds limit of {self.prompt_limit}")
-                    for seq in seq_group.get_seqs():
+                    for seq in waiting_seqs:
                         seq.status = SequenceStatus.FINISHED_IGNORED
                     ignored_seq_groups.append(seq_group)
                     self.waiting.pop(0)
@@ -161,7 +163,7 @@ class Scheduler:
                     logger.warning(
                         f"Input prompt ({num_prompt_tokens} tokens) is too long"
                         f" and exceeds the capacity of block_manager")
-                    for seq in seq_group.get_seqs():
+                    for seq in waiting_seqs:
                         seq.status = SequenceStatus.FINISHED_IGNORED
                     ignored_seq_groups.append(seq_group)
                     self.waiting.pop(0)
@@ -317,7 +319,7 @@ class Scheduler:
     def _allocate(self, seq_group: SequenceGroup) -> None:
         self.block_manager.allocate(seq_group)
-        for seq in seq_group.get_seqs():
+        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            seq.status = SequenceStatus.RUNNING
     def _append_slot(

View File

@@ -156,11 +156,13 @@ class EngineArgs:
                             type=int,
                             default=EngineArgs.swap_space,
                             help='CPU swap space size (GiB) per GPU')
-        parser.add_argument('--gpu-memory-utilization',
-                            type=float,
-                            default=EngineArgs.gpu_memory_utilization,
-                            help='the percentage of GPU memory to be used for'
-                            'the model executor')
+        parser.add_argument(
+            '--gpu-memory-utilization',
+            type=float,
+            default=EngineArgs.gpu_memory_utilization,
+            help='the fraction of GPU memory to be used for '
+            'the model executor, which can range from 0 to 1. '
+            'If unspecified, will use the default value of 0.9.')
         parser.add_argument('--max-num-batched-tokens',
                             type=int,
                             default=EngineArgs.max_num_batched_tokens,
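
Note: as written, argparse accepts any float for --gpu-memory-utilization; the documented [0, 1] range is not enforced. A minimal sketch of a range-checked variant (the `fraction` helper is hypothetical, not part of vLLM):

import argparse

def fraction(value: str) -> float:
    # Hypothetical helper: reject values outside [0, 1].
    f = float(value)
    if not 0.0 <= f <= 1.0:
        raise argparse.ArgumentTypeError(f"{value} is not in [0, 1]")
    return f

parser = argparse.ArgumentParser()
parser.add_argument('--gpu-memory-utilization', type=fraction, default=0.9)
print(parser.parse_args(['--gpu-memory-utilization', '0.8']))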

View File

@@ -183,49 +183,53 @@ class _AsyncLLMEngine(LLMEngine):
         and updates the scheduler with the model outputs. Finally, it decodes
         the sequences and returns the newly generated results.
         """
-        seq_group_metadata_list, scheduler_outputs, ignored = self._schedule()
-        if scheduler_outputs.is_empty():
-            return ignored
-
-        # Execute the model.
-        output = await self._run_workers_async(
-            "execute_model",
-            seq_group_metadata_list=seq_group_metadata_list,
-            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
-            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
-            blocks_to_copy=scheduler_outputs.blocks_to_copy,
-        )
-
-        return self._process_model_outputs(output, scheduler_outputs) + ignored
+        seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
+
+        if not scheduler_outputs.is_empty():
+            # Execute the model.
+            all_outputs = await self._run_workers_async(
+                "execute_model",
+                driver_kwargs={
+                    "seq_group_metadata_list": seq_group_metadata_list,
+                    "blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
+                    "blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
+                    "blocks_to_copy": scheduler_outputs.blocks_to_copy,
+                })
+
+            # Only the driver worker returns the sampling results.
+            output = all_outputs[0]
+        else:
+            output = []
+
+        return self._process_model_outputs(output, scheduler_outputs)

     async def _run_workers_async(
         self,
         method: str,
         *args,
-        get_all_outputs: bool = False,
+        driver_args: Optional[List[Any]] = None,
+        driver_kwargs: Optional[Dict[str, Any]] = None,
         **kwargs,
     ) -> Any:
         """Runs the given method on all workers."""
         coros = []
-        for worker in self.workers:
-            if self.parallel_config.worker_use_ray:
-                coros.append(
-                    worker.execute_method.remote(method, *args, **kwargs))
-            else:
-                executor = getattr(worker, method)
-                coros.append(asyncio.get_event_loop().run_in_executor(
-                    None, partial(executor, *args, **kwargs)))
+
+        if driver_args is None:
+            driver_args = args
+        if driver_kwargs is None:
+            driver_kwargs = kwargs
+
+        # Run the driver worker asynchronously.
+        driver_executor = getattr(self.driver_worker, method)
+        coros.append(asyncio.get_event_loop().run_in_executor(
+            None, partial(driver_executor, *driver_args, **driver_kwargs)))
+
+        # Run the ray workers asynchronously.
+        for worker in self.workers:
+            coros.append(worker.execute_method.remote(method, *args, **kwargs))

         all_outputs = await asyncio.gather(*coros)
-
-        if get_all_outputs:
-            return all_outputs
-
-        # Make sure all workers have the same results.
-        output = all_outputs[0]
-        for other_output in all_outputs[1:]:
-            assert output == other_output
-        return output
+        return all_outputs

 class AsyncLLMEngine:
@@ -490,13 +494,12 @@ class AsyncLLMEngine:
         engine_configs = engine_args.create_engine_configs()
         parallel_config = engine_configs[2]
         # Initialize the cluster.
-        distributed_init_method, placement_group = initialize_cluster(
-            parallel_config, engine_args.engine_use_ray)
+        placement_group = initialize_cluster(parallel_config,
+                                             engine_args.engine_use_ray)
         # Create the async LLM engine.
         engine = cls(parallel_config.worker_use_ray,
                      engine_args.engine_use_ray,
                      *engine_configs,
-                     distributed_init_method,
                      placement_group,
                      log_requests=not engine_args.disable_log_requests,
                      log_stats=not engine_args.disable_log_stats,
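
Note: the new `_run_workers_async` runs the driver worker in the event loop's default thread pool while the Ray workers run as remote tasks, then awaits everything together. A minimal self-contained sketch of the same idea, with plain callables standing in for the driver worker and the Ray actors (all names here are illustrative):

import asyncio
from functools import partial

def driver_execute(step: int) -> str:
    # Stands in for the driver worker's method (a blocking call in a thread).
    return f"driver result for step {step}"

async def remote_execute(step: int) -> str:
    # Stands in for a Ray actor's execute_method.remote(...).
    await asyncio.sleep(0)
    return f"worker result for step {step}"

async def run_all(step: int):
    loop = asyncio.get_running_loop()
    coros = []
    # Driver first, in a thread so it does not block the event loop.
    coros.append(loop.run_in_executor(None, partial(driver_execute, step)))
    # Then the (simulated) remote workers.
    coros.append(remote_execute(step))
    all_outputs = await asyncio.gather(*coros)
    return all_outputs[0]  # only the driver returns sampling results

print(asyncio.run(run_all(1)))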

View File

@@ -1,7 +1,9 @@
 import copy
+from collections import defaultdict
+import os
 import time
-from functools import partial
-from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
+from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple,
+                    Union)

 from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
                          SchedulerConfig)
@@ -13,14 +15,12 @@ from vllm.logger import init_logger
 from vllm.outputs import RequestOutput
 from vllm.sampling_params import SamplingParams
 from vllm.sequence import (SamplerOutput, Sequence, SequenceGroup,
-                           SequenceGroupMetadata, SequenceGroupOutput,
-                           SequenceOutput, SequenceStatus)
+                           SequenceGroupOutput, SequenceOutput, SequenceStatus)
 from vllm.transformers_utils.tokenizer import (detokenize_incrementally,
                                                get_tokenizer)
-from vllm.utils import Counter
+from vllm.utils import Counter, set_cuda_visible_devices, get_ip, get_open_port

 if ray:
-    from ray.air.util.torch_dist import init_torch_dist_process_group
     from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

 if TYPE_CHECKING:
@@ -53,8 +53,6 @@ class LLMEngine:
             management.
         parallel_config: The configuration related to distributed execution.
         scheduler_config: The configuration related to the request scheduler.
-        distributed_init_method: The initialization method for distributed
-            execution. See `torch.distributed.init_process_group` for details.
         placement_group: Ray placement group for distributed execution.
             Required for distributed execution.
         log_stats: Whether to log statistics.
@@ -66,7 +64,6 @@ class LLMEngine:
         cache_config: CacheConfig,
         parallel_config: ParallelConfig,
         scheduler_config: SchedulerConfig,
-        distributed_init_method: str,
         placement_group: Optional["PlacementGroup"],
         log_stats: bool,
     ) -> None:
@@ -105,9 +102,13 @@ class LLMEngine:
         # Create the parallel GPU workers.
         if self.parallel_config.worker_use_ray:
+            # Disable Ray usage stats collection.
+            ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
+            if ray_usage != "1":
+                os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
             self._init_workers_ray(placement_group)
         else:
-            self._init_workers(distributed_init_method)
+            self._init_workers()

         # Profile the memory usage and initialize the cache.
         self._init_cache()
@@ -122,7 +123,7 @@ class LLMEngine:
         # List of (timestamp, num_tokens)
         self.num_generation_tokens: List[Tuple[float, int]] = []

-    def _init_workers(self, distributed_init_method: str):
+    def _init_workers(self):
         # Lazy import the Worker to avoid importing torch.cuda/xformers
         # before CUDA_VISIBLE_DEVICES is set in the Worker
         from vllm.worker.worker import Worker
@@ -131,70 +132,122 @@ class LLMEngine:
             "Ray is required if parallel_config.world_size > 1.")

         self.workers: List[Worker] = []
-        worker = Worker(
+        distributed_init_method = f"tcp://{get_ip()}:{get_open_port()}"
+        self.driver_worker = Worker(
             self.model_config,
             self.parallel_config,
             self.scheduler_config,
-            0,
-            distributed_init_method,
+            local_rank=0,
+            rank=0,
+            distributed_init_method=distributed_init_method,
+            is_driver_worker=True,
         )
-        self.workers.append(worker)
-        self._run_workers(
-            "init_model",
-            get_all_outputs=True,
-        )
-        self._run_workers(
-            "load_model",
-            get_all_outputs=True,
-            max_concurrent_workers=self.parallel_config.
-            max_parallel_loading_workers,
-        )
+        self._run_workers("init_model")
+        self._run_workers("load_model")

     def _init_workers_ray(self, placement_group: "PlacementGroup",
                           **ray_remote_kwargs):
+        if self.parallel_config.tensor_parallel_size == 1:
+            num_gpus = self.cache_config.gpu_memory_utilization
+        else:
+            num_gpus = 1
+
+        self.driver_dummy_worker: RayWorkerVllm = None
+        self.workers: List[RayWorkerVllm] = []
+
+        driver_ip = get_ip()
+        for bundle_id, bundle in enumerate(placement_group.bundle_specs):
+            if not bundle.get("GPU", 0):
+                continue
+            scheduling_strategy = PlacementGroupSchedulingStrategy(
+                placement_group=placement_group,
+                placement_group_capture_child_tasks=True,
+                placement_group_bundle_index=bundle_id,
+            )
+            worker = ray.remote(
+                num_cpus=0,
+                num_gpus=num_gpus,
+                scheduling_strategy=scheduling_strategy,
+                **ray_remote_kwargs,
+            )(RayWorkerVllm).remote(self.model_config.trust_remote_code)
+
+            worker_ip = ray.get(worker.get_node_ip.remote())
+            if worker_ip == driver_ip and self.driver_dummy_worker is None:
+                # If the worker is on the same node as the driver, we use it
+                # as the resource holder for the driver process.
+                self.driver_dummy_worker = worker
+            else:
+                self.workers.append(worker)
+
+        if self.driver_dummy_worker is None:
+            raise ValueError(
+                "Ray does not allocate any GPUs on the driver node. Consider "
+                "adjusting the Ray placement group or running the driver on a "
+                "GPU node.")
+
+        driver_node_id, driver_gpu_ids = ray.get(
+            self.driver_dummy_worker.get_node_and_gpu_ids.remote())
+        worker_node_and_gpu_ids = ray.get(
+            [worker.get_node_and_gpu_ids.remote() for worker in self.workers])
+
+        node_workers = defaultdict(list)
+        node_gpus = defaultdict(list)
+
+        node_workers[driver_node_id].append(0)
+        node_gpus[driver_node_id].extend(driver_gpu_ids)
+        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids,
+                                               start=1):
+            node_workers[node_id].append(i)
+            node_gpus[node_id].extend(gpu_ids)
+        for node_id, gpu_ids in node_gpus.items():
+            node_gpus[node_id] = sorted(gpu_ids)
+
+        # Set CUDA_VISIBLE_DEVICES for the driver.
+        set_cuda_visible_devices(node_gpus[driver_node_id])
+        for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids):
+            worker.set_cuda_visible_devices.remote(node_gpus[node_id])
+
+        distributed_init_method = f"tcp://{driver_ip}:{get_open_port()}"
+
         # Lazy import the Worker to avoid importing torch.cuda/xformers
         # before CUDA_VISIBLE_DEVICES is set in the Worker
         from vllm.worker.worker import Worker

-        self.workers: List[Worker] = []
-        for bundle in placement_group.bundle_specs:
-            if not bundle.get("GPU", 0):
-                continue
-            if self.parallel_config.tensor_parallel_size == 1:
-                num_gpus = self.cache_config.gpu_memory_utilization
-            else:
-                num_gpus = 1
-            worker = ray.remote(
-                num_cpus=0,
-                num_gpus=num_gpus,
-                scheduling_strategy=PlacementGroupSchedulingStrategy(
-                    placement_group=placement_group,
-                    placement_group_capture_child_tasks=True),
-                **ray_remote_kwargs,
-            )(RayWorkerVllm).remote(self.model_config.trust_remote_code)
-            self.workers.append(worker)

         # Initialize torch distributed process group for the workers.
-        init_torch_dist_process_group(self.workers, backend="nccl")
         model_config = copy.deepcopy(self.model_config)
         parallel_config = copy.deepcopy(self.parallel_config)
         scheduler_config = copy.deepcopy(self.scheduler_config)
-        self._run_workers("init_worker",
-                          get_all_outputs=True,
-                          worker_init_fn=lambda: Worker(
-                              model_config,
-                              parallel_config,
-                              scheduler_config,
-                              None,
-                              None,
-                          ))
-        self._run_workers(
-            "init_model",
-            get_all_outputs=True,
-        )
+
+        for rank, (worker, (node_id,
+                            _)) in enumerate(zip(self.workers,
+                                                 worker_node_and_gpu_ids),
+                                             start=1):
+            local_rank = node_workers[node_id].index(rank)
+            worker.init_worker.remote(
+                lambda rank=rank, local_rank=local_rank: Worker(
+                    model_config,
+                    parallel_config,
+                    scheduler_config,
+                    local_rank,
+                    rank,
+                    distributed_init_method,
+                ))
+
+        driver_rank = 0
+        driver_local_rank = node_workers[driver_node_id].index(driver_rank)
+        self.driver_worker = Worker(
+            model_config,
+            parallel_config,
+            scheduler_config,
+            driver_local_rank,
+            driver_rank,
+            distributed_init_method,
+            is_driver_worker=True,
+        )
+
+        self._run_workers("init_model")
         self._run_workers(
             "load_model",
-            get_all_outputs=True,
             max_concurrent_workers=self.parallel_config.
             max_parallel_loading_workers,
         )
@@ -208,7 +261,6 @@ class LLMEngine:
         # Get the maximum number of blocks that can be allocated on GPU and CPU.
         num_blocks = self._run_workers(
             "profile_num_available_blocks",
-            get_all_outputs=True,
             block_size=self.cache_config.block_size,
             gpu_memory_utilization=self.cache_config.gpu_memory_utilization,
             cpu_swap_space=self.cache_config.swap_space_bytes,
@@ -227,6 +279,14 @@ class LLMEngine:
             raise ValueError("No available memory for the cache blocks. "
                              "Try increasing `gpu_memory_utilization` when "
                              "initializing the engine.")
+        max_seq_len = self.cache_config.block_size * num_gpu_blocks
+        if self.model_config.max_model_len > max_seq_len:
+            raise ValueError(
+                f"The model's max seq len ({self.model_config.max_model_len}) "
+                "is larger than the maximum number of tokens that can be "
+                f"stored in KV cache ({max_seq_len}). Try increasing "
+                "`gpu_memory_utilization` or decreasing `max_model_len` when "
+                "initializing the engine.")

         self.cache_config.num_gpu_blocks = num_gpu_blocks
         self.cache_config.num_cpu_blocks = num_cpu_blocks
@@ -244,11 +304,9 @@ class LLMEngine:
         engine_configs = engine_args.create_engine_configs()
         parallel_config = engine_configs[2]
         # Initialize the cluster.
-        distributed_init_method, placement_group = initialize_cluster(
-            parallel_config)
+        placement_group = initialize_cluster(parallel_config)
         # Create the LLM engine.
         engine = cls(*engine_configs,
-                     distributed_init_method,
                      placement_group,
                      log_stats=not engine_args.disable_log_stats)
         return engine
@@ -315,16 +373,6 @@ class LLMEngine:
         """Returns True if there are unfinished requests."""
         return self.scheduler.has_unfinished_seqs()

-    def _schedule(
-        self
-    ) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs,
-               List[RequestOutput]]:
-        seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
-        return seq_group_metadata_list, scheduler_outputs, [
-            RequestOutput.from_seq_group(seq_group)
-            for seq_group in scheduler_outputs.ignored_seq_groups
-        ]
-
     def _check_beam_search_early_stopping(
         self,
         early_stopping: Union[bool, str],
@@ -573,18 +621,23 @@ class LLMEngine:
         and updates the scheduler with the model outputs. Finally, it decodes
         the sequences and returns the newly generated results.
         """
-        seq_group_metadata_list, scheduler_outputs, ignored = self._schedule()
-        if scheduler_outputs.is_empty():
-            return ignored
-
-        # Execute the model.
-        output = self._run_workers(
-            "execute_model",
-            seq_group_metadata_list=seq_group_metadata_list,
-            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
-            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
-            blocks_to_copy=scheduler_outputs.blocks_to_copy,
-        )
+        seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
+
+        if not scheduler_outputs.is_empty():
+            # Execute the model.
+            all_outputs = self._run_workers(
+                "execute_model",
+                driver_kwargs={
+                    "seq_group_metadata_list": seq_group_metadata_list,
+                    "blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
+                    "blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
+                    "blocks_to_copy": scheduler_outputs.blocks_to_copy,
+                })
+
+            # Only the driver worker returns the sampling results.
+            output = all_outputs[0]
+        else:
+            output = []

         return self._process_model_outputs(output, scheduler_outputs)
@@ -712,53 +765,38 @@ class LLMEngine:
                 seq.status = SequenceStatus.FINISHED_STOPPED
             return

-    def _run_workers_in_batch(
-        self,
-        workers,
-        method: str,
-        *args,
-        **kwargs,
-    ):
-        all_outputs = []
-        for worker in workers:
-            if self.parallel_config.worker_use_ray:
-                executor = partial(worker.execute_method.remote, method)
-            else:
-                executor = getattr(worker, method)
-
-            output = executor(*args, **kwargs)
-            all_outputs.append(output)
-        if self.parallel_config.worker_use_ray:
-            all_outputs = ray.get(all_outputs)
-        return all_outputs
-
     def _run_workers(
         self,
         method: str,
         *args,
-        get_all_outputs: bool = False,
+        driver_args: Optional[List[Any]] = None,
+        driver_kwargs: Optional[Dict[str, Any]] = None,
         max_concurrent_workers: Optional[int] = None,
         **kwargs,
     ) -> Any:
         """Runs the given method on all workers."""
-        all_outputs = []
         if max_concurrent_workers:
-            work_groups = [
-                self.workers[i:i + max_concurrent_workers]
-                for i in range(0, len(self.workers), max_concurrent_workers)
-            ]
-        else:
-            work_groups = [self.workers]
-
-        for workers in work_groups:
-            all_outputs.extend(
-                self._run_workers_in_batch(workers, method, *args, **kwargs))
-
-        if get_all_outputs:
-            return all_outputs
-
-        # Make sure all workers have the same results.
-        output = all_outputs[0]
-        for other_output in all_outputs[1:]:
-            assert output == other_output
-        return output
+            raise NotImplementedError(
+                "max_concurrent_workers is not supported yet.")
+
+        # Start the ray workers first.
+        ray_worker_outputs = [
+            worker.execute_method.remote(method, *args, **kwargs)
+            for worker in self.workers
+        ]
+
+        if driver_args is None:
+            driver_args = args
+        if driver_kwargs is None:
+            driver_kwargs = kwargs
+
+        # Start the driver worker after all the ray workers.
+        driver_worker_output = getattr(self.driver_worker,
+                                       method)(*driver_args, **driver_kwargs)
+
+        # Get the results of the ray workers.
+        if self.workers:
+            ray_worker_outputs = ray.get(ray_worker_outputs)
+
+        return [driver_worker_output] + ray_worker_outputs
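
Note: the rewritten `_run_workers` drops the old cross-check of per-worker outputs and fixes the dispatch order: kick off the Ray workers, run the driver in-process, then collect the Ray results. A rough stand-alone sketch of that ordering with plain functions in place of Ray actors (illustrative only):

from typing import Any, Callable, List

def run_workers(driver: Callable[..., Any],
                workers: List[Callable[..., Any]],
                *args, **kwargs) -> List[Any]:
    # 1. "Launch" the workers first; here they are just deferred calls,
    #    standing in for worker.execute_method.remote(...).
    pending = [lambda w=w: w(*args, **kwargs) for w in workers]
    # 2. Run the driver synchronously in this process; with NCCL, its
    #    collective ops rendezvous with the workers launched above.
    driver_output = driver(*args, **kwargs)
    # 3. Collect the worker results (ray.get(...) in the real engine).
    worker_outputs = [p() for p in pending]
    # Callers read all_outputs[0]: the driver's output comes first.
    return [driver_output] + worker_outputs

print(run_workers(lambda x: x + 1, [lambda x: x + 2], 40))  # [41, 42]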

View File

@@ -1,16 +1,15 @@
-from typing import Optional, Tuple, TYPE_CHECKING
+from typing import Optional, List, Tuple, TYPE_CHECKING

 from vllm.config import ParallelConfig
 from vllm.logger import init_logger
-from vllm.utils import get_open_port, is_hip
+from vllm.utils import is_hip, set_cuda_visible_devices, get_ip

 logger = init_logger(__name__)

 try:
     import ray
-    from ray.air.util.torch_dist import TorchDistributedWorker

-    class RayWorkerVllm(TorchDistributedWorker):
+    class RayWorkerVllm:
         """Ray wrapper for vllm.worker.Worker, allowing Worker to be
         lazily initialized after Ray sets CUDA_VISIBLE_DEVICES."""
@@ -30,12 +29,22 @@ try:
             executor = getattr(self, method)
             return executor(*args, **kwargs)

+        def get_node_ip(self) -> str:
+            return get_ip()
+
+        def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]:
+            node_id = ray.get_runtime_context().get_node_id()
+            gpu_ids = ray.get_gpu_ids()
+            return node_id, gpu_ids
+
+        def set_cuda_visible_devices(self, device_ids) -> None:
+            set_cuda_visible_devices(device_ids)
+
 except ImportError as e:
     logger.warning(f"Failed to import Ray with {e!r}. "
                    "For distributed inference, please install Ray with "
                    "`pip install ray pandas pyarrow`.")
     ray = None
-    TorchDistributedWorker = None
     RayWorkerVllm = None

 if TYPE_CHECKING:
@@ -75,13 +84,11 @@ def initialize_cluster(
     ray.init(address=ray_address, ignore_reinit_error=True)

     if not parallel_config.worker_use_ray:
-        # Initialize cluster locally.
-        port = get_open_port()
-        # We need to setup the distributed init method to make sure
-        # the distributed megatron code (e.g., get world size) works correctly.
-        distributed_init_method = f"tcp://localhost:{port}"
-        return distributed_init_method, None
+        assert parallel_config.world_size == 1, (
+            "Ray is required if parallel_config.world_size > 1.")
+        return None

+    # Create placement group for worker processes
     current_placement_group = ray.util.get_current_placement_group()
     if current_placement_group:
         # We are in a placement group
@@ -106,12 +113,12 @@ def initialize_cluster(
             "The number of required GPUs exceeds the total number of "
             "available GPUs in the cluster.")
         # Create a new placement group
-        current_placement_group = ray.util.placement_group([{
-            "GPU": 1
-        }] * parallel_config.world_size)
+        placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
+        current_placement_group = ray.util.placement_group(
+            placement_group_specs)
         # Wait until PG is ready - this will block until all
         # requested resources are available, and will timeout
         # if they cannot be provisioned.
         ray.get(current_placement_group.ready(), timeout=1800)

-    return None, current_placement_group
+    return current_placement_group
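
Note: `initialize_cluster` now returns only the placement group; the torch.distributed init method is derived later from the driver's IP. A minimal sketch of reserving one GPU bundle per worker with Ray (assumes a running Ray cluster, or a local machine, with enough GPUs):

import ray

ray.init()

world_size = 2  # e.g. the tensor parallel size
placement_group_specs = [{"GPU": 1}] * world_size
pg = ray.util.placement_group(placement_group_specs)
# Block until every bundle is reserved; raises if the timeout elapses.
ray.get(pg.ready(), timeout=1800)
print(pg.bundle_specs)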

View File

@@ -12,7 +12,6 @@ from vllm.sampling_params import SamplingParams
 from vllm.utils import random_uuid

 TIMEOUT_KEEP_ALIVE = 5  # seconds.
-TIMEOUT_TO_PREVENT_DEADLOCK = 1  # seconds.
 app = FastAPI()
 engine = None

@@ -73,6 +72,8 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--host", type=str, default=None)
     parser.add_argument("--port", type=int, default=8000)
+    parser.add_argument("--ssl-keyfile", type=str, default=None)
+    parser.add_argument("--ssl-certfile", type=str, default=None)
     parser = AsyncEngineArgs.add_cli_args(parser)
     args = parser.parse_args()

@@ -83,4 +84,6 @@ if __name__ == "__main__":
                 host=args.host,
                 port=args.port,
                 log_level="debug",
-                timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
+                timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
+                ssl_keyfile=args.ssl_keyfile,
+                ssl_certfile=args.ssl_certfile)
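
Note: with the two new flags, TLS terminates in uvicorn itself. A minimal sketch of serving a FastAPI app over HTTPS (the certificate paths are placeholders and must point to real files):

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health():
    return {"status": "ok"}

if __name__ == "__main__":
    uvicorn.run(app,
                host="0.0.0.0",
                port=8000,
                ssl_keyfile="/path/to/key.pem",    # placeholder path
                ssl_certfile="/path/to/cert.pem")  # placeholder path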

View File

@@ -80,6 +80,14 @@ def parse_args():
         default="assistant",
         help="The role name to return if "
        "`request.add_generation_prompt=true`.")
+    parser.add_argument("--ssl-keyfile",
+                        type=str,
+                        default=None,
+                        help="The file path to the SSL key file")
+    parser.add_argument("--ssl-certfile",
+                        type=str,
+                        default=None,
+                        help="The file path to the SSL cert file")
     parser = AsyncEngineArgs.add_cli_args(parser)
     return parser.parse_args()

@@ -744,4 +752,6 @@ if __name__ == "__main__":
                 host=args.host,
                 port=args.port,
                 log_level="info",
-                timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
+                timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
+                ssl_keyfile=args.ssl_keyfile,
+                ssl_certfile=args.ssl_certfile)

View File

@@ -1,4 +1,4 @@
-from typing import List, Optional
+from typing import Optional

 import torch

@@ -16,28 +16,27 @@ class InputMetadata:
     def __init__(
         self,
-        prompt_lens: List[int],
+        is_prompt: bool,
         slot_mapping: torch.Tensor,
         max_context_len: Optional[int],
         context_lens: Optional[torch.Tensor],
         block_tables: Optional[torch.Tensor],
         use_cuda_graph: bool,
     ) -> None:
-        self.prompt_lens = prompt_lens
+        self.is_prompt = is_prompt
         self.max_context_len = max_context_len
         self.slot_mapping = slot_mapping
         self.context_lens = context_lens
         self.block_tables = block_tables
         self.use_cuda_graph = use_cuda_graph

-        self.is_prompt = len(prompt_lens) > 0
-
         # Set during the execution of the first attention op.
         # FIXME(woosuk): This is a hack.
         self.attn_bias = None

     def __repr__(self) -> str:
         return ("InputMetadata("
-                f"prompt_lens={self.prompt_lens}, "
+                f"is_prompt={self.is_prompt}, "
                 f"max_context_len={self.max_context_len}, "
                 f"slot_mapping={self.slot_mapping}, "
                 f"context_lens={self.context_lens}, "

View File

@@ -5,7 +5,7 @@ import torch
 import torch.nn as nn

 from vllm.model_executor.parallel_utils.communication_op import (
-    tensor_model_parallel_all_gather)
+    tensor_model_parallel_gather)
 from vllm.model_executor.sampling_metadata import SamplingMetadata, SamplingTensors
 from vllm.sampling_params import SamplingParams, SamplingType
 from vllm.sequence import (PromptLogprobs, SampleLogprobs, SamplerOutput,
@@ -30,7 +30,6 @@ class Sampler(nn.Module):
     def __init__(self, vocab_size: int) -> None:
         super().__init__()
         self.vocab_size = vocab_size
-        self._copy_stream: torch.cuda.Stream = torch.cuda.Stream()

     def forward(
         self,
@@ -38,7 +37,7 @@ class Sampler(nn.Module):
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
         embedding_bias: Optional[torch.Tensor] = None,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         # Get the hidden states that we use for sampling.
         hidden_states = _prune_hidden_states(hidden_states, sampling_metadata)
@@ -46,19 +45,23 @@ class Sampler(nn.Module):
         logits = _get_logits(hidden_states, embedding, embedding_bias,
                              self.vocab_size)

+        # Only perform sampling in the driver worker.
+        # Note: `_get_logits` is still distributed across TP workers because
+        # the `embedding` weight is distributed across TP workers.
+        # TODO(zhuohan): Change the get_logits part to a separate stage.
+        if not sampling_metadata.perform_sampling:
+            return None
+
+        assert logits is not None
         _, vocab_size = logits.shape

         # Apply logits processors (if any).
         logits = _apply_logits_processors(logits, sampling_metadata)

-        # Prepare sampling tensors in another stream to overlap
-        # CPU<->GPU data transfer with GPU computation in forward pass.
-        with torch.cuda.stream(self._copy_stream):
-            (sampling_tensors, do_penalties, do_top_p_top_k,
-             do_min_p) = SamplingTensors.from_sampling_metadata(
-                 sampling_metadata, vocab_size, logits.device, logits.dtype)
-
-        torch.cuda.current_stream().wait_stream(self._copy_stream)
+        # Prepare sampling tensors with pinned memory to avoid blocking.
+        (sampling_tensors, do_penalties, do_top_p_top_k,
+         do_min_p) = SamplingTensors.from_sampling_metadata(
+             sampling_metadata, vocab_size, logits.device, logits.dtype)

         # Apply presence and frequency penalties.
         if do_penalties:
@@ -97,14 +100,15 @@ class Sampler(nn.Module):
 def _get_logits(hidden_states: torch.Tensor, embedding: torch.Tensor,
                 embedding_bias: Optional[torch.Tensor],
-                vocab_size: int) -> torch.Tensor:
+                vocab_size: int) -> Optional[torch.Tensor]:
     # Get the logits for the next tokens.
     logits = torch.matmul(hidden_states, embedding.t())
     if embedding_bias is not None:
         logits += embedding_bias
-    logits = tensor_model_parallel_all_gather(logits)
+    logits = tensor_model_parallel_gather(logits)
     # Remove paddings in vocab (if any).
-    logits = logits[:, :vocab_size]
+    if logits is not None:
+        logits = logits[:, :vocab_size]
     return logits
@@ -117,27 +121,6 @@ def _prune_hidden_states(
         sampling_metadata.selected_token_indices)

-def _get_prompt_and_output_tokens(
-    sampling_metadata: SamplingMetadata,
-) -> Tuple[List[List[int]], List[List[int]]]:
-    prompt_tokens: List[List[int]] = []
-    output_tokens: List[List[int]] = []
-    for i, seq_group in enumerate(sampling_metadata.seq_groups):
-        seq_ids, sampling_params = seq_group
-        if (i < sampling_metadata.num_prompts
-                and sampling_params.prompt_logprobs is not None):
-            # NOTE: prompt token positions do not need output tokens to
-            # compute penalties.
-            prompt_len = sampling_metadata.prompt_lens[i]
-            prompt_tokens.extend([] for _ in range(prompt_len - 1))
-            output_tokens.extend([] for _ in range(prompt_len - 1))
-        for seq_id in seq_ids:
-            seq_data = sampling_metadata.seq_data[seq_id]
-            prompt_tokens.append(seq_data.prompt_token_ids)
-            output_tokens.append(seq_data.output_token_ids)
-    return prompt_tokens, output_tokens
-
 def _get_bin_counts_and_mask(
     tokens: torch.Tensor,
     vocab_size: int,
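
Note: the removed side stream is replaced by pinned (page-locked) host memory, which makes the host-to-device copy asynchronous with respect to the CPU without a dedicated stream. A tiny sketch of the underlying PyTorch mechanism (requires a CUDA device; illustrative, not vLLM's exact code):

import torch

# A pinned host tensor: required for a truly asynchronous H2D copy.
host = torch.randn(1024, 1024, pin_memory=True)
# non_blocking=True returns control to the CPU immediately; the copy is
# queued on the current CUDA stream.
device = host.to("cuda", non_blocking=True)
result = torch.mm(device, device)  # ordered after the copy on the same stream
torch.cuda.synchronize()
print(result.shape)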

View File

@@ -17,6 +17,7 @@ _MODELS = {
     "BloomForCausalLM": ("bloom", "BloomForCausalLM"),
     "ChatGLMModel": ("chatglm", "ChatGLMForCausalLM"),
     "ChatGLMForConditionalGeneration": ("chatglm", "ChatGLMForCausalLM"),
+    "DeciLMForCausalLM": ("decilm", "DeciLMForCausalLM"),
     "FalconForCausalLM": ("falcon", "FalconForCausalLM"),
     "GPT2LMHeadModel": ("gpt2", "GPT2LMHeadModel"),
     "GPTBigCodeForCausalLM": ("gpt_bigcode", "GPTBigCodeForCausalLM"),

View File

@@ -298,7 +298,7 @@ class AquilaForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -313,7 +313,7 @@ class BaiChuanBaseForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -290,7 +290,7 @@ class BloomForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -349,7 +349,7 @@ class ChatGLMForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -0,0 +1,123 @@
+# coding=utf-8
+# Adapted from
+# https://github.com/huggingface/transformers/blob/v4.28.0/src/transformers/models/llama/modeling_llama.py
+# Copyright 2023 DeciAI Research Team. All rights reserved.
+# Copyright 2023 The vLLM team.
+# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved.
+#
+# This code is based on MistralAI GPT-NeoX library and the GPT-NeoX
+# and OPT implementations in this library. It has been modified from its
+# original forms to accommodate minor architectural differences compared
+# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Inference-only DeciLM model compatible with HuggingFace weights."""
+
+from typing import Optional
+
+import torch
+from transformers import PretrainedConfig
+
+from vllm.model_executor.layers.linear import LinearMethodBase
+from vllm.model_executor.models.llama import LlamaForCausalLM
+from vllm.model_executor.weight_utils import (default_weight_loader,
+                                              hf_model_weights_iterator)
+
+
+class DeciLMForCausalLM(LlamaForCausalLM):
+    """
+    Implementation for https://huggingface.co/Deci/DeciLM-7b-instruct.
+    Based on the llama executor.
+
+    The main difference is that DeciLM uses Variable Grouped Query Attention.
+    The constant number of GQA heads in the decoder is overridden with a value
+    per layer.
+
+    Usually, in the HuggingFace implementation, instead of
+    "config.num_key_value_heads", we use
+    "config.num_key_value_heads_per_layer[i]" which varies.
+
+    Currently, PagedAttention does not work well with variable GQA, so we
+    normalize the weights upon loading, and use uniform GQA with the max value
+    instead.
+    """
+
+    def __init__(
+        self,
+        config: Optional[PretrainedConfig] = None,
+        linear_method: Optional[LinearMethodBase] = None,
+    ) -> None:
+        config.num_key_value_heads = max(config.num_key_value_heads_per_layer)
+        delattr(config, "num_key_value_heads_per_layer")
+        super().__init__(config=config, linear_method=linear_method)
+
+    def load_weights(self,
+                     model_name_or_path: str,
+                     cache_dir: Optional[str] = None,
+                     load_format: str = "auto",
+                     revision: Optional[str] = None):
+        stacked_params_mapping = [
+            # (param_name, shard_name, shard_id)
+            ("qkv_proj", "q_proj", "q"),
+            ("qkv_proj", "k_proj", "k"),
+            ("qkv_proj", "v_proj", "v"),
+            ("gate_up_proj", "gate_proj", 0),
+            ("gate_up_proj", "up_proj", 1),
+        ]
+        params_dict = dict(self.named_parameters())
+        for name, loaded_weight in hf_model_weights_iterator(
+                model_name_or_path, cache_dir, load_format, revision):
+            if "rotary_emb.inv_freq" in name:
+                continue
+
+            if "k_proj" in name or "v_proj" in name:
+                loaded_weight = self._degroup_weight(loaded_weight)
+
+            for (param_name, weight_name, shard_id) in stacked_params_mapping:
+                if weight_name not in name:
+                    continue
+                name = name.replace(weight_name, param_name)
+                # Skip loading extra bias for GPTQ models.
+                if name.endswith(".bias") and name not in params_dict:
+                    continue
+                param = params_dict[name]
+                weight_loader = param.weight_loader
+                weight_loader(param, loaded_weight, shard_id)
+                break
+            else:
+                # Skip loading extra bias for GPTQ models.
+                if name.endswith(".bias") and name not in params_dict:
+                    continue
+                param = params_dict[name]
+                weight_loader = getattr(param, "weight_loader",
+                                        default_weight_loader)
+                weight_loader(param, loaded_weight)
+
+    def _degroup_weight(self, loaded_weight: torch.Tensor) -> torch.Tensor:
+        hidden_size = self.config.hidden_size
+        head_size = self.config.hidden_size // self.config.num_attention_heads
+        target_num_kv_heads = self.config.num_key_value_heads
+        num_kv_heads = loaded_weight.shape[0] // head_size
+        n_repeats = target_num_kv_heads / num_kv_heads
+        assert n_repeats == int(n_repeats)
+
+        n_repeats = int(n_repeats)
+        loaded_weight = loaded_weight.view(num_kv_heads, head_size,
+                                           hidden_size)
+        loaded_weight = torch.repeat_interleave(loaded_weight,
+                                                repeats=n_repeats,
+                                                dim=0)
+        loaded_weight = loaded_weight.reshape(target_num_kv_heads * head_size,
+                                              hidden_size)
+
+        return loaded_weight
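
Note: a stand-alone numeric check of the degrouping above — a k_proj/v_proj weight for 2 KV heads is repeated into 4 uniform KV heads (sizes are illustrative):

import torch

hidden_size, head_size = 8, 2
num_kv_heads, target_num_kv_heads = 2, 4  # 4 = max heads across layers

w = torch.arange(num_kv_heads * head_size * hidden_size,
                 dtype=torch.float32).reshape(num_kv_heads * head_size,
                                              hidden_size)
n_repeats = target_num_kv_heads // num_kv_heads
out = w.view(num_kv_heads, head_size, hidden_size)
out = torch.repeat_interleave(out, repeats=n_repeats, dim=0)
out = out.reshape(target_num_kv_heads * head_size, hidden_size)
print(w.shape, "->", out.shape)  # torch.Size([4, 8]) -> torch.Size([8, 8])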

View File

@@ -394,7 +394,7 @@ class FalconForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -235,7 +235,7 @@ class GPT2LMHeadModel(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -254,7 +254,7 @@ class GPTBigCodeForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -240,7 +240,7 @@ class GPTJForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata, self.lm_head.bias)
         return next_tokens

View File

@@ -54,6 +54,7 @@ class GPTNeoXAttention(nn.Module):
         self.total_num_heads = config.num_attention_heads
         self.hidden_size = config.hidden_size
         self.head_size = self.hidden_size // self.total_num_heads
+        self.bias = getattr(config, "attention_bias", True)

         tensor_model_parallel_world_size = (
             get_tensor_model_parallel_world_size())
@@ -65,11 +66,13 @@ class GPTNeoXAttention(nn.Module):
             config.hidden_size,
             self.head_size,
             self.total_num_heads,
+            bias=self.bias,
             linear_method=linear_method,
         )
         self.dense = RowParallelLinear(
             config.hidden_size,
             config.hidden_size,
+            bias=self.bias,
             linear_method=linear_method,
         )
         scaling = self.head_size**-0.5
@@ -252,7 +255,7 @@ class GPTNeoXForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.embed_out.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -255,7 +255,7 @@ class InternLMForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -291,7 +291,7 @@ class LlamaForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -287,7 +287,7 @@ class MistralForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -49,7 +49,6 @@ from vllm.model_executor.parallel_utils.parallel_state import (
 from vllm.model_executor.sampling_metadata import SamplingMetadata
 from vllm.model_executor.weight_utils import (default_weight_loader,
                                               hf_model_weights_iterator)
-from vllm.model_executor.utils import set_weight_attrs
 from vllm.sequence import SamplerOutput

 KVCache = Tuple[torch.Tensor, torch.Tensor]
@@ -94,30 +93,6 @@ class MixtralMLP(nn.Module):
         return current_hidden_states

-class DummyModule(nn.Module):
-
-    def __init__(self) -> None:
-        super().__init__()
-
-        self.w1 = nn.Linear(0, 0, bias=False)
-        self.w2 = nn.Linear(0, 0, bias=False)
-        self.w3 = nn.Linear(0, 0, bias=False)
-
-        set_weight_attrs(self.w1.weight,
-                         {"weight_loader": self.dummy_weight_loader})
-        set_weight_attrs(self.w2.weight,
-                         {"weight_loader": self.dummy_weight_loader})
-        set_weight_attrs(self.w3.weight,
-                         {"weight_loader": self.dummy_weight_loader})
-
-    def forward(self, *args, **kwargs) -> None:
-        raise NotImplementedError()
-
-    def dummy_weight_loader(self, *args, **kwargs) -> None:  # pylint: disable=unused-argument
-        # Noop
-        return
-
 class MixtralMoE(nn.Module):

     def __init__(
@@ -147,7 +122,7 @@ class MixtralMoE(nn.Module):
                        config.hidden_size,
                        config.intermediate_size,
                        linear_method=linear_method)
-            if idx in self.expert_indicies else DummyModule()
+            if idx in self.expert_indicies else None
             for idx in range(self.num_total_experts)
         ])
         self.gate = ReplicatedLinear(config.hidden_size,
@@ -345,7 +320,7 @@ class MixtralModel(nn.Module):
         positions: torch.Tensor,
         kv_caches: List[KVCache],
         input_metadata: InputMetadata,
-    ) -> SamplerOutput:
+    ) -> torch.Tensor:
         hidden_states = self.embed_tokens(input_ids)
         residual = None
         for i in range(len(self.layers)):
@@ -386,7 +361,7 @@ class MixtralForCausalLM(nn.Module):
         self,
         hidden_states: Optional[torch.Tensor],
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens
@@ -427,6 +402,10 @@ class MixtralForCausalLM(nn.Module):
             # Skip loading extra bias for GPTQ models.
             if name.endswith(".bias") and name not in params_dict:
                 continue
+            # Skip experts that are not assigned to this worker.
+            if ("block_sparse_moe.experts." in name
+                    and name not in params_dict):
+                continue
             param = params_dict[name]
             weight_loader = getattr(param, "weight_loader",
                                     default_weight_loader)
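
Note: the Mixtral change keeps only this worker's experts as real modules (`None` elsewhere) and skips expert weights that are absent from `params_dict`. A rough sketch of the partitioning idea (names are illustrative, not vLLM's exact API):

import numpy as np

num_total_experts, tp_size = 8, 2

def expert_indices_for(rank: int):
    # Even split of experts across tensor-parallel ranks (illustrative).
    return np.array_split(range(num_total_experts), tp_size)[rank].tolist()

rank = 0
local = set(expert_indices_for(rank))
experts = [f"expert_{i}" if i in local else None
           for i in range(num_total_experts)]
print(experts)  # half real experts, half None on this rank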

View File

@@ -276,7 +276,7 @@ class MPTForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -309,7 +309,7 @@ class OPTForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head_weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -280,7 +280,7 @@ class PhiForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         head = self.lm_head.linear
         next_tokens = self.sampler(head.weight, hidden_states,
                                    sampling_metadata, head.bias)

View File

@@ -247,7 +247,7 @@ class QWenLMHeadModel(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -286,7 +286,7 @@ class YiForCausalLM(nn.Module):
         self,
         hidden_states: torch.Tensor,
         sampling_metadata: SamplingMetadata,
-    ) -> SamplerOutput:
+    ) -> Optional[SamplerOutput]:
         next_tokens = self.sampler(self.lm_head.weight, hidden_states,
                                    sampling_metadata)
         return next_tokens

View File

@@ -1,6 +1,7 @@
 import torch

 from vllm.model_executor.parallel_utils.parallel_state import (
+    get_tensor_model_parallel_rank,
     get_tensor_model_parallel_world_size,
     get_tensor_model_parallel_group,
 )
@@ -45,3 +46,61 @@ def tensor_model_parallel_all_gather(input_, dim=-1):
                                          (world_size * input_size[dim], ) +
                                          input_size[dim + 1:])
     return output_tensor
+
+
+def tensor_model_parallel_gather(input_, dst=0, dim=-1):
+    """Gather the input tensor across model parallel group.
+
+    NOTE: We assume that the input tensor is on the same device across
+    all the ranks.
+    """
+    world_size = get_tensor_model_parallel_world_size()
+    # Bypass the function if we are using only 1 GPU.
+    if world_size == 1:
+        return input_
+    assert -input_.dim() <= dim < input_.dim(), (
+        f"Invalid dim ({dim}) for input tensor with shape {input_.size()}")
+    if dim < 0:
+        # Convert negative dim to positive.
+        dim += input_.dim()
+    # Allocate output tensor.
+    if get_tensor_model_parallel_rank() == dst:
+        gather_list = [torch.empty_like(input_) for _ in range(world_size)]
+    else:
+        gather_list = None
+    # Gather.
+    torch.distributed.gather(input_,
+                             gather_list,
+                             dst=dst,
+                             group=get_tensor_model_parallel_group())
+    if get_tensor_model_parallel_rank() == dst:
+        output_tensor = torch.cat(gather_list, dim=dim)
+    else:
+        output_tensor = None
+    return output_tensor
+
+
+def broadcast(input_, src=0):
+    """Broadcast the input tensor."""
+    world_size = torch.distributed.get_world_size()
+    assert 0 <= src < world_size, f"Invalid src rank ({src})"
+    # Bypass the function if we are using only 1 GPU.
+    if world_size == 1:
+        return input_
+    # Broadcast.
+    torch.distributed.broadcast(input_, src=src)
+    return input_
+
+
+def broadcast_object_list(obj_list, src=0):
+    """Broadcast the input object list."""
+    world_size = torch.distributed.get_world_size()
+    assert 0 <= src < world_size, f"Invalid src rank ({src})"
+    # Bypass the function if we are using only 1 GPU.
+    if world_size == 1:
+        return obj_list
+    # Broadcast.
+    torch.distributed.broadcast_object_list(obj_list, src=src)
+    return obj_list
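
Note: unlike all-gather, `torch.distributed.gather` materializes the result only on the destination rank; every other rank gets `None` back from the helper above, which is why the sampler now checks `if logits is not None`. A self-contained two-process demo using the gloo backend (illustrative):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank: int, world_size: int):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    t = torch.tensor([float(rank)])
    gather_list = ([torch.empty_like(t) for _ in range(world_size)]
                   if rank == 0 else None)
    dist.gather(t, gather_list, dst=0)
    # Only rank 0 holds the gathered tensors; other ranks see None.
    print(rank, gather_list)
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(run, args=(2,), nprocs=2)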

View File

@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple

 import torch
@@ -18,24 +18,29 @@ class SamplingMetadata:
         seq_data: Seq_id -> SequenceData.
         prompt_lens: Lengths of prompts.
         selected_token_indices: Token indices selected for sampling.
-        categorized_sample_indices: SamplingType -> token indicies to sample.
+        categorized_sample_indices: SamplingType -> token indices to sample.
+        perform_sampling: Whether to perform sampling. This option is used to
+            make sampling happen only in the driver worker, and to disable
+            sampling in the other worker processes.
     """

     def __init__(
         self,
-        seq_groups: List[Tuple[List[int], SamplingParams]],
-        seq_data: Dict[int, SequenceData],
-        prompt_lens: List[int],
+        seq_groups: Optional[List[Tuple[List[int], SamplingParams]]],
+        seq_data: Optional[Dict[int, SequenceData]],
+        prompt_lens: Optional[List[int]],
         selected_token_indices: torch.Tensor,
-        categorized_sample_indices: Dict[SamplingType, torch.Tensor],
+        categorized_sample_indices: Optional[Dict[SamplingType,
+                                                  torch.Tensor]],
+        perform_sampling: bool = True,
     ) -> None:
         self.seq_groups = seq_groups
         self.seq_data = seq_data
         self.prompt_lens = prompt_lens
         self.selected_token_indices = selected_token_indices
         self.categorized_sample_indices = categorized_sample_indices
+        self.perform_sampling = perform_sampling

-        self.num_prompts = len(prompt_lens)
+        self.num_prompts = len(prompt_lens) if prompt_lens is not None else 0

     def __repr__(self) -> str:
         return (
@@ -44,7 +49,8 @@ class SamplingMetadata:
             f"seq_data={self.seq_data}, "
             f"prompt_lens={self.prompt_lens}, "
             f"selected_token_indices={self.selected_token_indices}, "
-            f"categorized_sample_indices={self.categorized_sample_indices})")
+            f"categorized_sample_indices={self.categorized_sample_indices}, "
+            f"perform_sampling={self.perform_sampling})")
@dataclass @dataclass
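For orientation, the two construction paths this change enables look roughly as follows. This is a hedged sketch: the `SequenceData([1, 2, 3])` call and all tensor values are illustrative placeholders, not taken from this diff, and the import paths follow the ones visible in the hunks below.

import torch

from vllm.model_executor import SamplingMetadata
from vllm.sampling_params import SamplingParams, SamplingType
from vllm.sequence import SequenceData

# Driver worker: full metadata; sampling runs here
# (perform_sampling defaults to True).
driver_metadata = SamplingMetadata(
    seq_groups=[([0], SamplingParams(temperature=0.0))],
    seq_data={0: SequenceData([1, 2, 3])},  # placeholder prompt token ids
    prompt_lens=[3],
    selected_token_indices=torch.tensor([2], dtype=torch.long),
    categorized_sample_indices={
        SamplingType.GREEDY: torch.tensor([0], dtype=torch.int)
    },
)

# Non-driver worker: only the indices needed for the forward pass;
# perform_sampling=False skips sampling on this rank.
worker_metadata = SamplingMetadata(
    seq_groups=None,
    seq_data=None,
    prompt_lens=None,
    selected_token_indices=torch.tensor([2], dtype=torch.long),
    categorized_sample_indices=None,
    perform_sampling=False,
)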

View File

@@ -100,7 +100,7 @@ class SamplingParams:
         temperature: float = 1.0,
         top_p: float = 1.0,
         top_k: int = -1,
-        min_p: int = 0.0,
+        min_p: float = 0.0,
         use_beam_search: bool = False,
         length_penalty: float = 1.0,
         early_stopping: Union[bool, str] = False,
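The hunk above corrects the `min_p` annotation from `int` to `float`. As a reminder of the semantics, min-p sampling keeps only the tokens whose probability is at least `min_p` times that of the most likely token. A hedged numeric sketch of that filtering rule (not vLLM's actual sampler code):

import torch

probs = torch.tensor([0.60, 0.25, 0.10, 0.05])
min_p = 0.2
# Keep tokens with prob >= min_p * max prob: 0.2 * 0.60 = 0.12 here,
# so only the first two tokens survive.
keep = probs >= min_p * probs.max()
filtered = torch.where(keep, probs, torch.zeros_like(probs))
filtered = filtered / filtered.sum()  # renormalize over the kept tokens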

View File

@@ -1,7 +1,9 @@
 import enum
+import os
 import socket
 import uuid
 from platform import uname
+from typing import List

 import psutil
 import torch
@@ -55,7 +57,15 @@ def in_wsl() -> bool:
     return "microsoft" in " ".join(uname()).lower()


-def get_open_port():
+def get_ip() -> str:
+    return socket.gethostbyname(socket.gethostname())
+
+
+def get_open_port() -> int:
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.bind(("", 0))
         return s.getsockname()[1]
+
+
+def set_cuda_visible_devices(device_ids: List[int]) -> None:
+    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, device_ids))

View File

@@ -1,5 +1,5 @@
 import time
-from typing import Dict, List, Tuple, Union
+from typing import Dict, List, Optional, Tuple, Union

 import numpy as np
 import torch
@@ -8,8 +8,11 @@ import torch.nn as nn

 from vllm.config import ModelConfig, ParallelConfig, SchedulerConfig
 from vllm.logger import init_logger
 from vllm.model_executor import get_model, InputMetadata, SamplingMetadata
+from vllm.model_executor.parallel_utils.communication_op import (
+    broadcast, broadcast_object_list)
 from vllm.sampling_params import SamplingParams, SamplingType
 from vllm.sequence import SamplerOutput, SequenceData, SequenceGroupMetadata
+from vllm.utils import in_wsl

 logger = init_logger(__name__)
@@ -27,10 +30,12 @@ class ModelRunner:
         model_config: ModelConfig,
         parallel_config: ParallelConfig,
         scheduler_config: SchedulerConfig,
+        is_driver_worker: bool = False,
     ):
         self.model_config = model_config
         self.parallel_config = parallel_config
         self.scheduler_config = scheduler_config
+        self.is_driver_worker = is_driver_worker

         # model_config can be None in tests/samplers/test_sampler.py.
         # FIXME(woosuk): This is a hack to make the tests work. Refactor this.
@@ -52,6 +57,8 @@ class ModelRunner:
         # The shape of the cached block table will be
         # (max batch size to capture, max context len to capture / block size).
         self.graph_block_tables = None  # Set after initial profiling.
+        # cache in_wsl result
+        self.in_wsl = in_wsl()

     def load_model(self) -> None:
         self.model = get_model(self.model_config)
@@ -67,7 +74,7 @@ class ModelRunner:
     def _prepare_prompt(
         self,
         seq_group_metadata_list: List[SequenceGroupMetadata],
-    ) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata]:
+    ) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata, List[int]]:
         assert len(seq_group_metadata_list) > 0
         input_tokens: List[List[int]] = []
         input_positions: List[List[int]] = []
@@ -132,14 +139,14 @@ class ModelRunner:
                                                  dtype=torch.long)

         input_metadata = InputMetadata(
-            prompt_lens=prompt_lens,
+            is_prompt=True,
             slot_mapping=slot_mapping,
             max_context_len=None,
             context_lens=None,
             block_tables=None,
             use_cuda_graph=False,
         )
-        return input_tokens, input_positions, input_metadata
+        return input_tokens, input_positions, input_metadata, prompt_lens

     def _prepare_decode(
         self,
@@ -200,27 +207,24 @@ class ModelRunner:
                 block_tables.append([])
             batch_size = graph_batch_size

-        # When using CUDA graph, we don't need to make the tensors on the GPU
-        # because they will be eventually copied to the designated GPU buffer.
-        device = "cpu" if use_captured_graph else "cuda"
         input_tokens = _make_tensor_with_pad(input_tokens,
                                              max_len=1,
                                              pad=0,
                                              dtype=torch.long,
-                                             device=device)
+                                             device="cuda")
         input_positions = _make_tensor_with_pad(input_positions,
                                                 max_len=1,
                                                 pad=0,
                                                 dtype=torch.long,
-                                                device=device)
+                                                device="cuda")
         slot_mapping = _make_tensor_with_pad(slot_mapping,
                                              max_len=1,
                                              pad=_PAD_SLOT_ID,
                                              dtype=torch.long,
-                                             device=device)
+                                             device="cuda")
         context_lens = torch.tensor(context_lens,
                                     dtype=torch.int,
-                                    device=device)
+                                    device="cuda")

         if use_captured_graph:
             # The shape of graph_block_tables is
@@ -229,17 +233,18 @@ class ModelRunner:
             for i, block_table in enumerate(block_tables):
                 if block_table:
                     input_block_tables[i, :len(block_table)] = block_table
-            block_tables = torch.from_numpy(input_block_tables).to(device)
+            block_tables = torch.tensor(input_block_tables, device="cuda")
         else:
             block_tables = _make_tensor_with_pad(
                 block_tables,
                 max_len=max_context_len,
                 pad=0,
                 dtype=torch.int,
+                device="cuda",
             )

         input_metadata = InputMetadata(
-            prompt_lens=[],
+            is_prompt=False,
             slot_mapping=slot_mapping,
             max_context_len=max_context_len,
             context_lens=context_lens,
@@ -297,11 +302,11 @@ class ModelRunner:
                         categorized_sample_indices_start_idx + num_seqs))
                 categorized_sample_indices_start_idx += num_seqs

-        selected_token_indices = torch.tensor(selected_token_indices,
-                                              dtype=torch.long,
-                                              device="cuda")
+        selected_token_indices = _async_h2d(selected_token_indices,
+                                            dtype=torch.long,
+                                            pin_memory=not self.in_wsl)
         categorized_sample_indices = {
-            t: torch.tensor(seq_ids, dtype=torch.int, device="cuda")
+            t: _async_h2d(seq_ids, dtype=torch.int, pin_memory=not self.in_wsl)
             for t, seq_ids in categorized_sample_indices.items()
         }
@@ -318,25 +323,127 @@ class ModelRunner:
         )
         return sampling_metadata

+    def prepare_input_tensors(
+        self,
+        seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
+    ) -> Tuple[torch.Tensor, torch.Tensor, InputMetadata, SamplingMetadata]:
+        if self.is_driver_worker:
+            # NOTE: We assume that the sequences in the group are either
+            # all prompts or all decodes.
+            is_prompt = seq_group_metadata_list[0].is_prompt
+            # Prepare input tensors.
+            if is_prompt:
+                (input_tokens, input_positions, input_metadata,
+                 prompt_lens) = self._prepare_prompt(seq_group_metadata_list)
+            else:
+                (input_tokens, input_positions, input_metadata
+                 ) = self._prepare_decode(seq_group_metadata_list)
+                prompt_lens = []
+            sampling_metadata = self._prepare_sample(seq_group_metadata_list,
+                                                     prompt_lens)
+
+            def get_size_or_none(x: Optional[torch.Tensor]):
+                return x.size() if x is not None else None
+
+            # Broadcast the input data. For each input tensor, we first
+            # broadcast its shape and then broadcast the tensor itself, to
+            # avoid the high serialization cost of pickling tensor contents.
+            py_data = {
+                "input_tokens_size":
+                input_tokens.size(),
+                "input_positions_size":
+                input_positions.size(),
+                "is_prompt":
+                input_metadata.is_prompt,
+                "slot_mapping_size":
+                get_size_or_none(input_metadata.slot_mapping),
+                "max_context_len":
+                input_metadata.max_context_len,
+                "context_lens_size":
+                get_size_or_none(input_metadata.context_lens),
+                "block_tables_size":
+                get_size_or_none(input_metadata.block_tables),
+                "use_cuda_graph":
+                input_metadata.use_cuda_graph,
+                "selected_token_indices_size":
+                sampling_metadata.selected_token_indices.size(),
+            }
+            broadcast_object_list([py_data], src=0)
+            # TODO(zhuohan): Combine the broadcasts or set async_op=True.
+            broadcast(input_tokens, src=0)
+            broadcast(input_positions, src=0)
+            if input_metadata.slot_mapping is not None:
+                broadcast(input_metadata.slot_mapping, src=0)
+            if input_metadata.context_lens is not None:
+                broadcast(input_metadata.context_lens, src=0)
+            if input_metadata.block_tables is not None:
+                broadcast(input_metadata.block_tables, src=0)
+            broadcast(sampling_metadata.selected_token_indices, src=0)
+        else:
+            receiving_list = [None]
+            broadcast_object_list(receiving_list, src=0)
+            py_data = receiving_list[0]
+            input_tokens = torch.empty(*py_data["input_tokens_size"],
+                                       dtype=torch.long,
+                                       device="cuda")
+            broadcast(input_tokens, src=0)
+            input_positions = torch.empty(*py_data["input_positions_size"],
+                                          dtype=torch.long,
+                                          device="cuda")
+            broadcast(input_positions, src=0)
+            if py_data["slot_mapping_size"] is not None:
+                slot_mapping = torch.empty(*py_data["slot_mapping_size"],
+                                           dtype=torch.long,
+                                           device="cuda")
+                broadcast(slot_mapping, src=0)
+            else:
+                slot_mapping = None
+            if py_data["context_lens_size"] is not None:
+                context_lens = torch.empty(*py_data["context_lens_size"],
+                                           dtype=torch.int,
+                                           device="cuda")
+                broadcast(context_lens, src=0)
+            else:
+                context_lens = None
+            if py_data["block_tables_size"] is not None:
+                block_tables = torch.empty(*py_data["block_tables_size"],
+                                           dtype=torch.int,
+                                           device="cuda")
+                broadcast(block_tables, src=0)
+            else:
+                block_tables = None
+            selected_token_indices = torch.empty(
+                *py_data["selected_token_indices_size"],
+                dtype=torch.long,
+                device="cuda")
+            broadcast(selected_token_indices, src=0)
+            input_metadata = InputMetadata(
+                is_prompt=py_data["is_prompt"],
+                slot_mapping=slot_mapping,
+                max_context_len=py_data["max_context_len"],
+                context_lens=context_lens,
+                block_tables=block_tables,
+                use_cuda_graph=py_data["use_cuda_graph"],
+            )
+            sampling_metadata = SamplingMetadata(
+                seq_groups=None,
+                seq_data=None,
+                prompt_lens=None,
+                selected_token_indices=selected_token_indices,
+                categorized_sample_indices=None,
+                perform_sampling=False,
+            )
+
+        return input_tokens, input_positions, input_metadata, sampling_metadata
+
     @torch.inference_mode()
     def execute_model(
         self,
-        seq_group_metadata_list: List[SequenceGroupMetadata],
+        seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
         kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
-    ) -> SamplerOutput:
-        # NOTE: We assume that all sequences in the group are all prompts or
-        # all decodes.
-        is_prompt = seq_group_metadata_list[0].is_prompt
-        # Prepare input tensors.
-        if is_prompt:
-            inputs = self._prepare_prompt(seq_group_metadata_list)
-            input_tokens, input_positions, input_metadata = inputs
-        else:
-            inputs = self._prepare_decode(seq_group_metadata_list)
-            input_tokens, input_positions, input_metadata = inputs
-        sampling_metadata = self._prepare_sample(seq_group_metadata_list,
-                                                 input_metadata.prompt_lens)
+    ) -> Optional[SamplerOutput]:
+        input_tokens, input_positions, input_metadata, sampling_metadata = (
+            self.prepare_input_tensors(seq_group_metadata_list))

         # Execute the model.
         if input_metadata.use_cuda_graph:
             graph_batch_size = input_tokens.shape[0]
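The new prepare_input_tensors sends each tensor in two phases: the shapes travel once as a pickled python object, and the payloads then go over the collective backend into preallocated GPU buffers, so tensor contents are never pickled. A hedged standalone sketch of that protocol (illustrative function names; an initialized process group is assumed):

import torch
import torch.distributed


def send_tensor_from_driver(t: torch.Tensor) -> None:
    # Phase 1: broadcast only the shape as a python object.
    torch.distributed.broadcast_object_list([tuple(t.size())], src=0)
    # Phase 2: broadcast the payload itself.
    torch.distributed.broadcast(t, src=0)


def recv_tensor_on_worker(dtype: torch.dtype) -> torch.Tensor:
    meta = [None]
    torch.distributed.broadcast_object_list(meta, src=0)
    # Allocate a buffer of the advertised shape, then fill it in place.
    buf = torch.empty(*meta[0], dtype=dtype, device="cuda")
    torch.distributed.broadcast(buf, src=0)
    return buf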
@@ -395,6 +502,9 @@ class ModelRunner:
             "unexpected consequences if the model is not static. To "
             "run the model in eager mode, set 'enforce_eager=True' or "
             "use '--enforce-eager' in the CLI.")
+        logger.info("CUDA graphs can take an additional 1~3 GiB of memory "
+                    "per GPU. If you are running out of memory, consider "
+                    "decreasing `gpu_memory_utilization` or enforcing eager mode.")

         start_time = time.perf_counter()
         # Prepare dummy inputs. These will be reused for all batch sizes.
@@ -412,7 +522,7 @@ class ModelRunner:
         for batch_size in reversed(_BATCH_SIZES_TO_CAPTURE):
             # Create dummy input_metadata.
             input_metadata = InputMetadata(
-                prompt_lens=[],
+                is_prompt=False,
                 slot_mapping=slot_mapping[:batch_size],
                 max_context_len=self.max_context_len_to_capture,
                 context_lens=context_lens[:batch_size],
@@ -499,11 +609,14 @@ class CUDAGraphRunner:
         del kv_caches

         # Copy the input tensors to the input buffers.
-        self.input_buffers["input_ids"].copy_(input_ids)
-        self.input_buffers["positions"].copy_(positions)
-        self.input_buffers["slot_mapping"].copy_(input_metadata.slot_mapping)
-        self.input_buffers["context_lens"].copy_(input_metadata.context_lens)
-        self.input_buffers["block_tables"].copy_(input_metadata.block_tables)
+        self.input_buffers["input_ids"].copy_(input_ids, non_blocking=True)
+        self.input_buffers["positions"].copy_(positions, non_blocking=True)
+        self.input_buffers["slot_mapping"].copy_(input_metadata.slot_mapping,
+                                                 non_blocking=True)
+        self.input_buffers["context_lens"].copy_(input_metadata.context_lens,
+                                                 non_blocking=True)
+        self.input_buffers["block_tables"].copy_(input_metadata.block_tables,
+                                                 non_blocking=True)

         # Run the graph.
         self.graph.replay()
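The non_blocking copies above feed the static buffers that a captured CUDA graph replays over. A hedged sketch of the underlying capture/replay pattern in plain PyTorch (generic API usage, not vLLM's CUDAGraphRunner):

import torch

static_in = torch.zeros(8, device="cuda")

# Warm up outside the graph (recommended before capture).
_ = static_in * 2.0
torch.cuda.synchronize()

# Capture: records the kernels; tensors allocated during capture persist.
g = torch.cuda.CUDAGraph()
with torch.cuda.graph(g):
    static_out = static_in * 2.0

# Replay: overwrite the static input in place, then rerun the same kernels.
static_in.copy_(torch.arange(8, dtype=torch.float, device="cuda"),
                non_blocking=True)
g.replay()
# static_out now holds 0, 2, 4, ..., 14.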
@@ -526,9 +639,13 @@ def _make_tensor_with_pad(
     pad: int,
     dtype: torch.dtype,
     device: Union[str, torch.device] = "cuda",
+    pin_memory: bool = False,
 ) -> torch.Tensor:
     padded_x = [_pad_to_max(x_i, max_len, pad) for x_i in x]
-    return torch.tensor(padded_x, dtype=dtype, device=device)
+    return torch.tensor(padded_x,
+                        dtype=dtype,
+                        device=device,
+                        pin_memory=pin_memory and str(device) == "cpu")


 def _get_graph_batch_size(batch_size: int) -> int:
@@ -538,3 +655,8 @@ def _get_graph_batch_size(batch_size: int) -> int:
         return 4
     else:
         return (batch_size + 7) // 8 * 8
+
+
+def _async_h2d(data: list, dtype, pin_memory):
+    t = torch.tensor(data, dtype=dtype, pin_memory=pin_memory)
+    return t.to(device="cuda", non_blocking=True)
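_async_h2d relies on page-locked host memory so the host-to-device copy can be queued asynchronously. A minimal sketch of the same pattern in plain PyTorch:

import torch

# Pinned (page-locked) CPU tensor: required for a truly async H2D copy.
cpu_t = torch.tensor([1, 2, 3, 4], dtype=torch.long, pin_memory=True)
gpu_t = cpu_t.to(device="cuda", non_blocking=True)

# The copy is ordered on the current CUDA stream, so later kernels on
# that stream see the transferred data without an explicit synchronize.
out = gpu_t * 2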

View File

@@ -8,6 +8,8 @@ import torch.distributed
 from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
                          SchedulerConfig)
 from vllm.model_executor import set_random_seed
+from vllm.model_executor.parallel_utils.communication_op import (
+    broadcast_object_list)
 from vllm.model_executor.parallel_utils.parallel_state import (
     initialize_model_parallel)
 from vllm.sequence import SamplerOutput, SequenceGroupMetadata
@@ -28,17 +30,23 @@ class Worker:
         model_config: ModelConfig,
         parallel_config: ParallelConfig,
         scheduler_config: SchedulerConfig,
-        rank: Optional[int] = None,
-        distributed_init_method: Optional[str] = None,
+        local_rank: int,
+        rank: int,
+        distributed_init_method: str,
+        is_driver_worker: bool = False,
     ) -> None:
         self.model_config = model_config
         self.parallel_config = parallel_config
         self.scheduler_config = scheduler_config
+        self.local_rank = local_rank
         self.rank = rank
         self.distributed_init_method = distributed_init_method
+        self.is_driver_worker = is_driver_worker
+        if self.is_driver_worker:
+            assert self.rank == 0, "The driver worker must have rank 0."

         self.model_runner = ModelRunner(model_config, parallel_config,
-                                        scheduler_config)
+                                        scheduler_config, is_driver_worker)
         # Uninitialized cache engine. Will be initialized by
         # self.init_cache_engine().
         self.cache_config = None
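Under the new signature, constructing the driver worker looks roughly like this hedged sketch. The function name is illustrative and the config objects are assumed to be built elsewhere by the engine; only the keyword arguments come from this diff:

from vllm.config import ModelConfig, ParallelConfig, SchedulerConfig
from vllm.utils import get_ip, get_open_port
from vllm.worker.worker import Worker


def make_driver_worker(model_config: ModelConfig,
                       parallel_config: ParallelConfig,
                       scheduler_config: SchedulerConfig) -> Worker:
    # The driver binds to rank 0 on the first local GPU; the rendezvous
    # address reuses the get_ip()/get_open_port() helpers added above.
    return Worker(
        model_config,
        parallel_config,
        scheduler_config,
        local_rank=0,
        rank=0,
        distributed_init_method=f"tcp://{get_ip()}:{get_open_port()}",
        is_driver_worker=True,
    )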
@@ -57,13 +65,7 @@ class Worker:
         # This env var set by Ray causes exceptions with graph building.
         os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
-        # Env vars will be set by Ray.
-        self.rank = self.rank if self.rank is not None else int(
-            os.getenv("RANK", "-1"))
-        local_rank = int(os.getenv("LOCAL_RANK", "0"))
-        self.device = torch.device(f"cuda:{local_rank}")
-        if self.rank < 0:
-            raise ValueError("Invalid or unspecified rank.")
+        self.device = torch.device(f"cuda:{self.local_rank}")
         torch.cuda.set_device(self.device)

         _check_if_gpu_supports_dtype(self.model_config.dtype)
@@ -125,14 +127,12 @@ class Worker:
         # the model initialization and profiling.
         set_random_seed(self.model_config.seed)

-    @torch.inference_mode()
-    def execute_model(
+    def cache_swap(
         self,
-        seq_group_metadata_list: List[SequenceGroupMetadata],
         blocks_to_swap_in: Dict[int, int],
         blocks_to_swap_out: Dict[int, int],
         blocks_to_copy: Dict[int, List[int]],
-    ) -> SamplerOutput:
+    ) -> None:
         # Issue cache operations.
         issued_cache_op = False
         if blocks_to_swap_in:
@@ -152,8 +152,38 @@ class Worker:
         if cache_events is not None:
             for event in cache_events:
                 event.wait()

+    @torch.inference_mode()
+    def execute_model(
+        self,
+        seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None,
+        blocks_to_swap_in: Optional[Dict[int, int]] = None,
+        blocks_to_swap_out: Optional[Dict[int, int]] = None,
+        blocks_to_copy: Optional[Dict[int, List[int]]] = None,
+    ) -> Optional[SamplerOutput]:
+        if self.is_driver_worker:
+            assert seq_group_metadata_list is not None
+            num_seq_groups = len(seq_group_metadata_list)
+            assert blocks_to_swap_in is not None
+            assert blocks_to_swap_out is not None
+            assert blocks_to_copy is not None
+            block_swapping_info = [
+                blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy
+            ]
+            broadcast_object_list([num_seq_groups] + block_swapping_info,
+                                  src=0)
+        else:
+            # num_seq_groups, blocks_to_swap_in, blocks_to_swap_out,
+            # blocks_to_copy (4 elements)
+            recv_data = [None] * 4
+            broadcast_object_list(recv_data, src=0)
+            num_seq_groups = recv_data[0]
+            block_swapping_info = recv_data[1:]
+
+        self.cache_swap(*block_swapping_info)
+
         # If there is no input, we don't need to execute the model.
-        if not seq_group_metadata_list:
+        if num_seq_groups == 0:
             return {}
+
         output = self.model_runner.execute_model(seq_group_metadata_list,