[Frontend] Implement robust video frame recovery for corrupted videos (#29197)

Signed-off-by: cmartinez <cmartinez@roblox.com>
Signed-off-by: vSeamar <cmartinez@roblox.com>
This commit is contained in:
vSeamar
2026-01-06 17:13:24 -08:00
committed by GitHub
parent 364a8bc6dc
commit 6f351548b2
4 changed files with 421 additions and 15 deletions

View File

@@ -689,6 +689,31 @@ Full example: [examples/online_serving/openai_chat_completion_client_for_multimo
export VLLM_VIDEO_FETCH_TIMEOUT=<timeout>
```
#### Video Frame Recovery
For improved robustness when processing potentially corrupted or truncated video files, vLLM supports optional frame recovery using a dynamic window forward-scan approach. When enabled, if a target frame fails to load during sequential reading, the next successfully grabbed frame (before the next target frame) will be used in its place.
To enable video frame recovery, pass the `frame_recovery` parameter via `--media-io-kwargs`:
```bash
# Example: Enable frame recovery
vllm serve Qwen/Qwen3-VL-30B-A3B-Instruct \
--media-io-kwargs '{"video": {"frame_recovery": true}}'
```
**Parameters:**
- `frame_recovery`: Boolean flag to enable forward-scan recovery. When `true`, failed frames are recovered using the next available frame within the dynamic window (up to the next target frame). Default is `false`.
**How it works:**
1. The system reads frames sequentially
2. If a target frame fails to grab, it's marked as "failed"
3. The next successfully grabbed frame (before reaching the next target) is used to recover the failed frame
4. This approach handles both mid-video corruption and end-of-video truncation
Works with common video formats like MP4 when using OpenCV backends.
#### Custom RGBA Background Color
To use a custom background color for RGBA images, pass the `rgba_background_color` parameter via `--media-io-kwargs`:

View File

@@ -299,3 +299,212 @@ def test_video_media_io_backend_env_var_fallback(monkeypatch: pytest.MonkeyPatch
frames_missing, metadata_missing = videoio_missing.load_bytes(b"test")
np.testing.assert_array_equal(frames_missing, FAKE_OUTPUT_2)
assert metadata_missing["video_backend"] == "test_video_backend_override_2"
# ============================================================================
# Frame Recovery Tests
# ============================================================================
def test_video_recovery_simulated_failures(monkeypatch: pytest.MonkeyPatch):
"""
Test that frame recovery correctly uses the next valid frame when
target frames fail to load.
Uses corrupted.mp4 and mocks VideoCapture.grab() to fail on specific
frame indices (in addition to the real corruption at frame 17), then
verifies recovery produces more frames.
"""
import cv2
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv")
# Load corrupted.mp4 (26 frames, frame 17 is genuinely corrupted)
video_path = ASSETS_DIR / "corrupted.mp4"
with open(video_path, "rb") as f:
video_data = f.read()
# Simulate additional failures on frames 3 and 10
# (in addition to the real corruption at frame 17)
fail_on_frames = {3, 10}
# Store original VideoCapture class
original_video_capture = cv2.VideoCapture
class MockVideoCapture:
"""Wrapper that simulates grab() failures on specific frames."""
def __init__(self, *args, **kwargs):
self._cap = original_video_capture(*args, **kwargs)
self._current_frame = -1
def grab(self):
self._current_frame += 1
if self._current_frame in fail_on_frames:
return False # Simulate failure
return self._cap.grab()
def retrieve(self):
return self._cap.retrieve()
def get(self, prop):
return self._cap.get(prop)
def isOpened(self):
return self._cap.isOpened()
def release(self):
return self._cap.release()
# Patch cv2.VideoCapture
m.setattr(cv2, "VideoCapture", MockVideoCapture)
loader = VIDEO_LOADER_REGISTRY.load("opencv")
# Use num_frames=8 which samples: [0, 3, 7, 10, 14, 17, 21, 25]
# Frame 3: mocked failure, recovery window [3, 7) -> use frame 4
# Frame 10: mocked failure, recovery window [10, 14) -> use frame 11
# Frame 17: real corruption, recovery window [17, 21) -> use frame 18
# Test WITHOUT recovery - should have fewer frames due to failures
frames_no_recovery, meta_no = loader.load_bytes(
video_data, num_frames=8, frame_recovery=False
)
# Test WITH recovery - should recover using next valid frames
frames_with_recovery, meta_yes = loader.load_bytes(
video_data, num_frames=8, frame_recovery=True
)
# With recovery should have MORE frames than without
# Without: 5 frames (3, 10, 17 all fail)
# With: 8 frames (all recovered)
assert frames_with_recovery.shape[0] > frames_no_recovery.shape[0], (
f"Recovery should produce more frames. "
f"Without: {frames_no_recovery.shape[0]}, "
f"With: {frames_with_recovery.shape[0]}"
)
# Verify metadata consistency
assert frames_no_recovery.shape[0] == len(meta_no["frames_indices"])
assert frames_with_recovery.shape[0] == len(meta_yes["frames_indices"])
# Verify temporal order is preserved
assert meta_yes["frames_indices"] == sorted(meta_yes["frames_indices"])
def test_video_recovery_with_corrupted_file(monkeypatch: pytest.MonkeyPatch):
"""
Test frame recovery with an actual corrupted video file using sparse sampling.
This test uses corrupted.mp4 which has genuine H.264 codec errors on
frame 17. With num_frames=8, the target frames are [0, 3, 7, 10, 14, 17, 21, 25].
Frame 17 is corrupted but frames 18-20 are readable, so recovery can use
frame 18 to fill in for the failed frame 17.
This test verifies:
1. Without recovery: frame 17 is skipped (7 frames loaded)
2. With recovery: frame 18 fills in for frame 17 (8 frames loaded)
3. Recovery produces MORE frames than without recovery
4. Metadata is consistent with loaded frames
"""
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv")
corrupted_video_path = ASSETS_DIR / "corrupted.mp4"
with open(corrupted_video_path, "rb") as f:
video_data = f.read()
loader = VIDEO_LOADER_REGISTRY.load("opencv")
# Use num_frames=8 which makes frame 17 a target with recovery window [17, 21)
# Target frames: [0, 3, 7, 10, 14, 17, 21, 25]
# Frame 17 is corrupted, but frames 18-20 are readable for recovery
# Test without recovery - frame 17 will be skipped
frames_no_recovery, meta_no_recovery = loader.load_bytes(
video_data, num_frames=8, frame_recovery=False
)
# Test with recovery - frame 18 should fill in for frame 17
frames_with_recovery, meta_with_recovery = loader.load_bytes(
video_data, num_frames=8, frame_recovery=True
)
# Verify metadata consistency for both modes
assert frames_no_recovery.shape[0] == len(meta_no_recovery["frames_indices"]), (
"Frame count must match indices without recovery"
)
assert frames_with_recovery.shape[0] == len(
meta_with_recovery["frames_indices"]
), "Frame count must match indices with recovery"
# KEY ASSERTION: Recovery should produce MORE frames than without recovery
# Without recovery: 7 frames (frame 17 skipped)
# With recovery: 8 frames (frame 18 used for frame 17)
assert frames_with_recovery.shape[0] > frames_no_recovery.shape[0], (
f"Recovery should produce more frames with sparse sampling. "
f"Got {frames_with_recovery.shape[0]} with recovery vs "
f"{frames_no_recovery.shape[0]} without"
)
# Verify we got all 8 requested frames with recovery
assert frames_with_recovery.shape[0] == 8, (
f"With recovery, should load all 8 requested frames. "
f"Got {frames_with_recovery.shape[0]}"
)
# Verify the video metadata is correct
expected_total_frames = 26
assert meta_with_recovery["total_num_frames"] == expected_total_frames, (
f"Expected {expected_total_frames} total frames in metadata"
)
def test_video_recovery_dynamic_backend(monkeypatch: pytest.MonkeyPatch):
"""
Test that frame_recovery works with the dynamic video backend.
The dynamic backend samples frames based on fps/duration rather than
loading all frames. This test verifies recovery works in that context.
"""
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv_dynamic")
corrupted_video_path = ASSETS_DIR / "corrupted.mp4"
with open(corrupted_video_path, "rb") as f:
video_data = f.read()
loader = VIDEO_LOADER_REGISTRY.load("opencv_dynamic")
# Test without recovery
frames_no_recovery, meta_no = loader.load_bytes(
video_data, fps=2, max_duration=10, frame_recovery=False
)
# Test with frame_recovery enabled
frames_with_recovery, meta_with = loader.load_bytes(
video_data, fps=2, max_duration=10, frame_recovery=True
)
# Verify basic properties
assert frames_no_recovery.shape[0] > 0, (
"Should load some frames without recovery"
)
assert frames_with_recovery.shape[0] > 0, (
"Should load some frames with recovery"
)
assert "do_sample_frames" in meta_with
assert meta_with["do_sample_frames"] is False # Dynamic backend always False
assert frames_with_recovery.shape[0] == len(meta_with["frames_indices"])
# Key assertion: recovery should help when corrupted frames are sampled
# We expect recovery to produce >= frames than without recovery
assert frames_with_recovery.shape[0] >= frames_no_recovery.shape[0], (
f"Recovery should produce at least as many frames. "
f"Got {frames_with_recovery.shape[0]} with recovery vs "
f"{frames_no_recovery.shape[0]} without"
)

View File

@@ -867,7 +867,7 @@ class RandomMultiModalDataset(RandomDataset):
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = 30 # frames per second
with NamedTemporaryFile(suffix=".mp4", delete_on_close=False) as temp_file:
with NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
temp_path = temp_file.name
# Create video writer

View File

@@ -6,12 +6,15 @@ from abc import abstractmethod
from functools import partial
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
import numpy as np
import numpy.typing as npt
from PIL import Image
if TYPE_CHECKING:
import cv2
from vllm import envs
from vllm.logger import init_logger
from vllm.utils.registry import ExtensionManager
@@ -63,6 +66,127 @@ class VideoLoader:
) -> tuple[npt.NDArray, dict[str, Any]]:
raise NotImplementedError
@staticmethod
def _can_use_for_recovery(
idx: int,
failed_frames: list[int],
next_target_map: dict[int, int],
total_frames: int,
) -> bool:
"""Check if current frame can recover the oldest failed frame."""
if not failed_frames:
return False
oldest_failed = failed_frames[0]
limit = next_target_map.get(oldest_failed, total_frames)
return idx < limit
@staticmethod
def _read_frames_with_recovery(
cap: "cv2.VideoCapture",
frame_indices: list[int],
total_frames: int,
) -> tuple[npt.NDArray, list[int], dict[int, int]]:
"""
Read frames with dynamic window forward-scan recovery.
When a target frame fails to load, the next successfully grabbed
frame (before the next target frame) will be used to recover it.
Args:
cap: OpenCV VideoCapture object
frame_indices: Sorted list of target frame indices to load
total_frames: Total number of frames in the video
Returns:
Tuple of (frames_array, valid_frame_indices, recovered_map)
- frames_array: Array of loaded frames
- valid_frame_indices: List of frame indices that were loaded
- recovered_map: Dict mapping recovered_idx -> source_idx
"""
import cv2
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
assert width > 0 and height > 0, (
f"Invalid video frame size: width={width}, height={height}"
)
frame_idx_set = set(frame_indices)
max_frame_idx = frame_indices[-1] if frame_indices else 0
# Build map: target_idx -> next_target_idx (for recovery window)
next_target_map: dict[int, int] = {}
for k in range(len(frame_indices) - 1):
next_target_map[frame_indices[k]] = frame_indices[k + 1]
next_target_map[frame_indices[-1]] = total_frames
frames_list: list[npt.NDArray] = []
valid_frame_indices: list[int] = []
failed_frames_idx: list[int] = []
recovered_map: dict[int, int] = {}
i = 0
for idx in range(max_frame_idx + 1):
is_target_frame = idx in frame_idx_set
# Attempt to grab the current frame
ok = cap.grab()
if not ok:
if is_target_frame:
logger.warning(
"Failed to grab frame %d during video loading.",
idx,
)
failed_frames_idx.append(idx)
continue
# Check if we should retrieve: target frame OR can recover a failed one
can_recover = VideoLoader._can_use_for_recovery(
idx, failed_frames_idx, next_target_map, total_frames
)
if is_target_frame or can_recover:
ret, frame = cap.retrieve()
if ret and frame is not None and frame.size > 0:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frames_list.append(rgb_frame)
valid_frame_indices.append(idx)
i += 1
if can_recover:
recovered_idx = failed_frames_idx.pop(0)
recovered_map[recovered_idx] = idx
logger.info(
"Recovered frame %d using frame %d (delay: %d)",
recovered_idx,
idx,
idx - recovered_idx,
)
elif is_target_frame:
logger.warning(
"Failed to retrieve frame %d during video loading.",
idx,
)
failed_frames_idx.append(idx)
# Log any remaining failed frames
for failed_idx in failed_frames_idx:
logger.warning(
"Frame %d could not be recovered (end of video).",
failed_idx,
)
# Stack frames
if frames_list:
frames = np.stack(frames_list)
else:
frames = np.empty((0, height, width, 3), dtype=np.uint8)
return frames, valid_frame_indices, recovered_map
@staticmethod
def _read_frames(
cap,
@@ -142,8 +266,23 @@ class OpenCVVideoBackend(VideoLoader):
data: bytes,
num_frames: int = -1,
fps: int = -1,
max_duration: int = 300,
frame_recovery: bool = False,
**kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
"""
Load video frames from bytes.
Args:
data: Raw video bytes
num_frames: Target number of frames to sample (-1 for all)
fps: Target FPS for sampling (-1 for original)
max_duration: Maximum duration (unused in base backend)
frame_recovery: Enable forward-scan recovery for failed frames
Returns:
Tuple of (frames_array, metadata_dict)
"""
import cv2
backend = cls().get_cv2_video_api()
@@ -172,11 +311,22 @@ class OpenCVVideoBackend(VideoLoader):
)
frame_idx = uniform_sampled_frames.tolist()
# Convert to set for O(1) lookup performance
frame_idx_set = set(frame_idx)
frames, valid_num_frames, valid_frame_indices = cls._read_frames(
cap, frame_idx_set, num_frames_to_sample, max(frame_idx)
)
if frame_recovery:
frames, valid_frame_indices, recovered_map = cls._read_frames_with_recovery(
cap, frame_idx, total_frames_num
)
valid_num_frames = len(valid_frame_indices)
if recovered_map:
logger.info(
"Frame recovery: %d frames recovered using forward scan.",
len(recovered_map),
)
else:
frame_idx_set = set(frame_idx)
frames, valid_num_frames, valid_frame_indices = cls._read_frames(
cap, frame_idx_set, num_frames_to_sample, max(frame_idx)
)
# Use transformers transformers.video_utils.VideoMetadata format
# NOTE(Isotr0py): For models like Qwen3-VL/GLM4.5V, this metadata
@@ -204,8 +354,22 @@ class OpenCVDynamicVideoBackend(OpenCVVideoBackend):
num_frames: int = -1,
fps: int = 2,
max_duration: int = 300,
frame_recovery: bool = False,
**kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
"""
Load video frames with dynamic sampling based on duration.
Args:
data: Raw video bytes
num_frames: Not used in dynamic backend
fps: Target FPS for sampling (default: 2)
max_duration: Maximum video duration to process (default: 300s)
frame_recovery: Enable forward-scan recovery for failed frames
Returns:
Tuple of (frames_array, metadata_dict)
"""
import cv2
backend = cls().get_cv2_video_api()
@@ -245,14 +409,22 @@ class OpenCVDynamicVideoBackend(OpenCVVideoBackend):
}
)
# Convert to set for O(1) lookup performance
frame_indices_set = set(frame_indices_list)
frames, valid_num_frames, valid_frame_indices = cls._read_frames(
cap,
frame_indices_set,
len(frame_indices_list),
total_frames_num - 1,
)
if frame_recovery:
frames, valid_frame_indices, recovered_map = cls._read_frames_with_recovery(
cap, frame_indices_list, total_frames_num
)
valid_num_frames = len(valid_frame_indices)
if recovered_map:
logger.info(
"Frame recovery: %d frames recovered using forward scan.",
len(recovered_map),
)
else:
frame_indices_set = set(frame_indices_list)
frames, valid_num_frames, valid_frame_indices = cls._read_frames(
cap, frame_indices_set, len(frame_indices_list), total_frames_num - 1
)
# Use transformers transformers.video_utils.VideoMetadata format
metadata = {