[Cleanup] Move scheduler get_routed_experts logic to separate method (#32706)
Signed-off-by: Nick Hill <nickhill123@gmail.com>
@@ -1197,28 +1197,7 @@ class Scheduler(SchedulerInterface):
 
             routed_experts = None
             if stopped:
-                if self.vllm_config.model_config.enable_return_routed_experts:
-                    kv_blocks = self.kv_cache_manager.get_blocks(request.request_id)
-                    block_ids = kv_blocks.get_block_ids()[0]
-                    num_tokens = request.num_tokens - 1
-
-                    # compute slot mapping
-                    block_ids_array = np.array(block_ids, dtype=np.int32)
-                    num_blocks = len(block_ids)
-                    block_size = self.block_size
-
-                    # generate block offsets
-                    block_offsets = np.arange(0, block_size)
-
-                    # compute slot mapping: slot = block_id * block_size + offset
-                    slot_mapping = (
-                        block_offsets.reshape((1, block_size))
-                        + block_ids_array.reshape((num_blocks, 1)) * block_size
-                    ).flatten()[:num_tokens]
-
-                    routed_experts = self.routed_experts_reader.get_routed_experts(
-                        indices=slot_mapping
-                    )
+                routed_experts = self._get_routed_experts(request)
                 kv_transfer_params = self._free_request(request)
                 if status_before_stop == RequestStatus.RUNNING:
                     stopped_running_reqs.add(request)
@@ -1250,7 +1229,12 @@ class Scheduler(SchedulerInterface):
 
             # Get prompt logprobs for this request.
             prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id)
-            if new_token_ids or pooler_output is not None or kv_transfer_params:
+            if (
+                new_token_ids
+                or pooler_output is not None
+                or kv_transfer_params
+                or stopped
+            ):
                 # Add EngineCoreOutput for this Request.
                 outputs[request.client_index].append(
                     EngineCoreOutput(
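Note: the new "or stopped" disjunct presumably exists so that a request which stops without producing new token IDs, pooler output, or KV-transfer params still emits an EngineCoreOutput; otherwise the routed_experts gathered at stop time would be silently dropped.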
@@ -1351,6 +1335,30 @@ class Scheduler(SchedulerInterface):
 
         return engine_core_outputs
 
+    def _get_routed_experts(self, request: Request) -> np.ndarray | None:
+        if not self.vllm_config.model_config.enable_return_routed_experts:
+            return None
+
+        kv_blocks = self.kv_cache_manager.get_blocks(request.request_id)
+        block_ids = kv_blocks.get_block_ids()[0]
+        num_tokens = request.num_tokens - 1
+
+        # compute slot mapping
+        block_ids_array = np.array(block_ids, dtype=np.int32)
+        num_blocks = len(block_ids)
+        block_size = self.block_size
+
+        # generate block offsets
+        block_offsets = np.arange(0, block_size)
+
+        # compute slot mapping: slot = block_id * block_size + offset
+        slot_mapping = (
+            block_offsets.reshape((1, block_size))
+            + block_ids_array.reshape((num_blocks, 1)) * block_size
+        ).flatten()[:num_tokens]
+
+        return self.routed_experts_reader.get_routed_experts(indices=slot_mapping)
+
     def _update_request_with_output(
         self, request: Request, new_token_ids: list[int]
     ) -> tuple[list[int], bool]:
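As a sanity check on the slot-mapping arithmetic in _get_routed_experts, here is a minimal standalone sketch of the same broadcast, using made-up block IDs and a toy block size (both hypothetical; in the scheduler they come from the KV cache manager and self.block_size). Each block of size B owns the contiguous slots [block_id * B, block_id * B + B), and the (num_blocks, 1) x (1, block_size) broadcast enumerates those slots in token order before truncating to num_tokens:

    import numpy as np

    block_size = 4            # toy value; the real value is self.block_size
    block_ids = [7, 2]        # hypothetical blocks assigned to one request
    num_tokens = 6            # mirrors request.num_tokens - 1

    block_ids_array = np.array(block_ids, dtype=np.int32)
    num_blocks = len(block_ids)
    block_offsets = np.arange(0, block_size)

    # Row i of the broadcast sum holds the block_size slots of block_ids[i];
    # flattening row-by-row yields the slots in token order.
    slot_mapping = (
        block_offsets.reshape((1, block_size))
        + block_ids_array.reshape((num_blocks, 1)) * block_size
    ).flatten()[:num_tokens]

    print(slot_mapping)  # [28 29 30 31  8  9]

The truncation matters because the last block is generally only partially filled; the request.num_tokens - 1 presumably excludes the slot of the token currently being generated.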