|
|
|
|
@@ -1,21 +1,16 @@
|
|
|
|
|
import enum
|
|
|
|
|
import os
|
|
|
|
|
import pickle
|
|
|
|
|
import time
|
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
|
|
|
|
from cacheflow.core.block_manager import BlockSpaceManager
|
|
|
|
|
from cacheflow.logger import init_logger
|
|
|
|
|
from cacheflow.core.policy import PolicyFactory
|
|
|
|
|
from cacheflow.sampling_params import SamplingParams
|
|
|
|
|
from cacheflow.sequence import Sequence
|
|
|
|
|
from cacheflow.sequence import SequenceGroup
|
|
|
|
|
from cacheflow.sequence import SequenceGroupMetadata
|
|
|
|
|
from cacheflow.sequence import SequenceOutputs
|
|
|
|
|
from cacheflow.sequence import SequenceStatus
|
|
|
|
|
|
|
|
|
|
from cacheflow.sequence import (Sequence, SequenceGroup, SequenceGroupMetadata,
|
|
|
|
|
SequenceOutputs, SequenceStatus)
|
|
|
|
|
|
|
|
|
|
logger = init_logger(__name__)
|
|
|
|
|
|
|
|
|
|
_LOGGING_INTERVAL_SEC = 10
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -129,7 +124,6 @@ class Scheduler:
|
|
|
|
|
|
|
|
|
|
# Swap in the sequence groups in the SWAPPED state if possible.
|
|
|
|
|
self.swapped = self.policy.sort_by_priority(now, self.swapped)
|
|
|
|
|
# FCFS
|
|
|
|
|
while self.swapped and not blocks_to_swap_out:
|
|
|
|
|
seq_group = self.swapped[0]
|
|
|
|
|
# If the sequence group has been preempted in this step, stop.
|
|
|
|
|
@@ -162,7 +156,9 @@ class Scheduler:
|
|
|
|
|
# This is because we want to bound the amount of CPU memory taken by
|
|
|
|
|
# the swapped sequence groups.
|
|
|
|
|
if not self.swapped:
|
|
|
|
|
self.waiting = self.policy.sort_by_priority(now, self.waiting)
|
|
|
|
|
# Optimization: We do not sort the waiting queue since the preempted
|
|
|
|
|
# sequence groups are added to the front and the new sequence groups
|
|
|
|
|
# are added to the back.
|
|
|
|
|
while self.waiting:
|
|
|
|
|
seq_group = self.waiting[0]
|
|
|
|
|
# If the sequence group has been preempted in this step, stop.
|
|
|
|
|
@@ -347,7 +343,6 @@ class Scheduler:
|
|
|
|
|
self.block_manager.allocate(seq_group)
|
|
|
|
|
for seq in seq_group.seqs:
|
|
|
|
|
seq.status = SequenceStatus.RUNNING
|
|
|
|
|
# FIXME(woosuk): Support interactive generation.
|
|
|
|
|
if seq_group.group_id not in self.num_steps:
|
|
|
|
|
self.num_steps[seq_group.group_id] = 0
|
|
|
|
|
|
|
|
|
|
@@ -404,7 +399,9 @@ class Scheduler:
|
|
|
|
|
for seq in seqs:
|
|
|
|
|
seq.status = SequenceStatus.WAITING
|
|
|
|
|
self.block_manager.free(seq)
|
|
|
|
|
self.waiting.append(seq_group)
|
|
|
|
|
# NOTE: For FCFS, we insert the preempted sequence group to the front
|
|
|
|
|
# of the waiting queue.
|
|
|
|
|
self.waiting.insert(0, seq_group)
|
|
|
|
|
|
|
|
|
|
def _preempt_by_swap(
|
|
|
|
|
self,
|
|
|
|
|
|