NVIDIA Model Optimizer branch: nvfp4_experts_only PTQ for DeepSeek V4 Pro

This commit is contained in:
2026-05-07 00:11:31 +00:00
parent c40607053b
commit b32bb2e84d
7 changed files with 214 additions and 1191 deletions

131
README.md
View File

@@ -1,106 +1,75 @@
# DeepSeek V4 Pro → NVFP4 conversion kit
# DeepSeek V4 Pro → NVFP4 via NVIDIA Model Optimizer
Two paths for converting `sgl-project/DeepSeek-V4-Pro-FP8` (the uniform-FP8 repackage of the original mixed-precision V4 Pro) into NVFP4 for Blackwell inference.
Fallback quantization path using NVIDIA's official Model Optimizer (`nvidia-modelopt`) PTQ pipeline.
## Why this branch
Path A (custom streaming FP8→NVFP4) is weight-only W4A16. If it doesn't produce good enough accuracy, NVIDIA's Model Optimizer provides data-driven calibration with proper activation scales, and is the officially supported path for DeepSeek V3/V4 NVFP4.
## What's here
| File | Purpose |
| --- | --- |
| `inspect_model.py` | Run this first. Prints tensor name patterns, dtypes, FP8 scaling block sizes, and counts of MoE expert/router/norm tensors so you know exactly what you're dealing with before any conversion. |
| `fp8_to_nvfp4_streaming.py` | **Path A.** Pure tensor-level streaming FP8 → NVFP4 conversion. No model loading, no calibration, weight-only. Low memory, fast, deterministic. Recommended for first run. |
| `quantize_llmcompressor.py` | **Path B.** `llm-compressor` oneshot with sequential pipeline + activation calibration. Produces W4A4 with calibrated activation scales. Higher quality on activation-sensitive ops but riskier given V4 is two weeks old. |
| `verify_nvfp4.py` | Loads the produced NVFP4 checkpoint, runs a basic forward pass through one block, checks for NaN/Inf, and dumps a few generated tokens via vLLM. |
| `quantize_modelopt.py` | PTQ via `nvidia-modelopt` with `NVFP4_EXPERTS_ONLY` config |
## Hardware assumptions
## Quantization config
- 8× B200 baremetal, 1.5 TB HBM total
- 2.7 TB system RAM
- ≥10 TB free NVMe at `~/nvidia-meeting/`
Using `nvfp4_experts_only` — NVIDIA's recommended config for MoE models. This quantizes only the expert MLP layers (`mlp.experts` / `block_sparse_moe`) while keeping attention QKV projections in higher precision. Options:
## Prereqs
- `nvfp4_experts_only` — Experts only (recommended for MoE)
- `nvfp4_mlp_only` — All MLP layers (experts + shared)
- `nvfp4` — Full model NVFP4 (riskier for attention)
## Prerequisites
```bash
source ~/nvidia-meeting/venv/bin/activate
pip install --upgrade torch safetensors transformers tqdm
pip install --upgrade llmcompressor compressed-tensors # only needed for Path B
pip install --upgrade vllm # only needed for verify
# Use the TensorRT-LLM docker if possible:
# docker run --gpus all -it nvcr.io/nvidia/tensorrt-llm/release:1.2.0 bash
# Otherwise pip install:
pip install -U "nvidia-modelopt[hf]"
pip install compressed-tensors fire flash-attn transformers_stream_generator zstandard
# Note: requires transformers<5.0 for modelopt compatibility
```
You'll likely need `transformers` from source for V4 architecture support, and `trust_remote_code=True` everywhere. Stock pip versions may not load V4 yet.
## Recommended order tonight
## Usage
```bash
cd ~/nvidia-meeting
# On the B200 node (8× B200, 2.7 TB RAM)
cd /root/nvidia-meeting
source venv/bin/activate
# 1. Inspect the FP8 source — 30 seconds, no GPU needed.
python inspect_model.py DeepSeek-V4-Pro-FP8 | tee inspect.log
# Using BF16 source weights (preferred for modelopt calibration)
python quantize_modelopt.py \
--model /root/nvidia-meeting/DeepSeek-V4-Pro \
--export_dir /root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4-modelopt \
--qformat nvfp4_experts_only \
--tp 8 \
--calib_size 256
# 2. Path A streaming conversion — should run in 2-6 hours dominated by NVMe I/O.
python fp8_to_nvfp4_streaming.py \
--src DeepSeek-V4-Pro-FP8 \
--dst DeepSeek-V4-Pro-NVFP4-streaming \
--workers 8 \
2>&1 | tee path_a.log
# 3. Quick sanity check — does it load and forward-pass?
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming
# 4. Path B (overnight). Run only after Path A succeeds. 24-72 hours.
python quantize_llmcompressor.py \
--src DeepSeek-V4-Pro-FP8 \
--dst DeepSeek-V4-Pro-NVFP4-llmcompressor \
--num-samples 256 \
--max-seq-len 4096 \
2>&1 | tee path_b.log
# Using FP8 source (modelopt handles dequant internally)
python quantize_modelopt.py \
--model /root/nvidia-meeting/DeepSeek-V4-Pro-FP8 \
--export_dir /root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4-modelopt-fp8src \
--qformat nvfp4_experts_only \
--tp 8 \
--calib_size 256
```
## Path A — what it does
## Low-memory options
1. Reads `model.safetensors.index.json` to map every tensor to its shard.
2. Classifies every tensor:
- **Preserve** (copied bit-for-bit): `lm_head`, `embed_tokens`, MoE router gates (`*.mlp.gate`), all norms, V4-specific attention indexer/scoring tensors, mHC residual mixing weights.
- **Quantize**: any FP8 weight that has a corresponding `*.weight_scale_inv` companion (i.e. real GEMM weights).
3. For every quantizable weight:
- Dequantizes FP8 E4M3 → FP32 using the source's per-block scales (auto-detects 128×128 blocks).
- Computes NVFP4 dual scales: per-tensor `weight_scale_2 = amax / (6.0 * 448.0)` and per-16-element-block `weight_scale = block_amax / (6.0 * weight_scale_2)` cast to FP8 E4M3.
- Quantizes FP32 → E2M1 representable values `{0, ±0.5, ±1, ±1.5, ±2, ±3, ±4, ±6}`.
- Packs two 4-bit values per `uint8` byte.
4. **MoE pair handling**: detects `gate_proj` (w1) + `up_proj` (w3) of each expert and computes a joint `weight_scale_2` across both, since vLLM's fused MoE kernel requires them to share that global scale.
5. Streams output to new shards (~5 GB each) with a fresh `model.safetensors.index.json` and copies all non-tensor files (config, tokenizer, etc.) verbatim.
If you hit OOM during calibration:
**This is weight-only NVFP4.** Activation quantization is not done here — you get W4A16 effective behavior at runtime unless your inference engine generates dynamic per-group activation scales. vLLM does generate per-group activation scales dynamically at inference, so this is fine for most use cases.
- `--use_seq_device_map` — sequential device mapping across GPUs
- `--low_memory_mode` — compress weights before calibration (FP8/NVFP4 only)
## Path B — what it does
## Output
1. Loads the FP8 model via `transformers` with `device_map="auto"` and the offload folder pointing at NVMe. With 2.7 TB RAM, the FP8 weights (~865 GB) sit in RAM; activations and per-layer BF16 promotion happen on the B200s.
2. Loads a calibration set (default 256 samples of `HuggingFaceH4/ultrachat_200k`).
3. Runs `llm-compressor` `oneshot` with `pipeline="sequential"` so only one transformer block is materialized in BF16 on GPU at a time.
4. `moe_calibrate_all_experts=True` ensures every routed expert gets calibration signal even when natural routing wouldn't pick it.
5. The recipe targets `Linear` with NVFP4 and the same ignore list as Path A (lm_head, embed, router gates, norms, indexer, mHC).
6. Saves with `save_compressed=True` in `compressed-tensors` format.
Exports a **Unified HuggingFace checkpoint** compatible with:
- TensorRT-LLM (PyTorch and C++ backends)
- vLLM
- SGLang
**The known risks for Path B on V4 specifically:**
## Expected runtime
- V4 architecture is brand new. `llm-compressor` may not have a registered MoE wrapper for V4 — you may need to call `replace_modules_for_calibration` with the actual V4 MoE class name (the script has a TODO and a fallback path).
- Sequential pipeline may not handle CSA/HCA hybrid attention if the attention forward isn't a simple linear chain. If you see weird offload errors during calibration, the indexer/scoring tensors are likely the culprit.
- Calibration cache for 256 routed experts × all V4 layers can be hundreds of GB. Watch `nvidia-smi` and `free -h` during the first 30 minutes.
## Things to discuss with the NVIDIA engineer
1. **NVFP4 packing convention.** My converter packs as `byte = elem0 | (elem1 << 4)` (low nibble first). Verify this matches what TensorRT-LLM / cutlass NVFP4 kernels expect. If reversed, just flip in `pack_fp4()`.
2. **Joint scaling extension.** I implement joint `weight_scale_2` for `gate_proj`/`up_proj` pairs. Ask whether `down_proj` also benefits, or whether all three experts in a fused MoE block should share — recipes have varied.
3. **mHC residual weights.** I preserve them in FP8/BF16 conservatively. If NVIDIA has actually quantized these somewhere internally, drop them out of the ignore list to recover memory.
4. **CSA + HCA indexer/scoring tensors.** I preserve these blindly based on the V3.2 DSA precedent. Ask whether V4's compressed-sparse / heavily-compressed attention has analogous "cannot quantize" tensors and what the canonical regex is.
5. **W4A4 vs W4A16 for V4 Pro.** Path A is W4A16-equivalent; Path B is W4A4. For a 1.6T MoE with extreme long-context, ask which is internally recommended for first deployment.
6. **`modelopt` vs `llm-compressor` for V4.** RedHat shipped V4-*Flash* NVFP4 via `llm-compressor`. Why not Pro yet? Find out if there's a known-bad layer or just compute time.
## Output sizes to expect
- FP8 source: ~865 GB
- Path A NVFP4 output: ~430470 GB (about 2× compression vs FP8 source; experts dominate, norms/embeds add a bit back)
- Path B NVFP4 output: similar, plus activation scale metadata
## Resumability
Path A is checkpoint-resumable per shard — if it dies mid-run, re-running picks up from the next unwritten output shard. Path B is **not** resumable mid-calibration; if it crashes you restart.
24-72 hours for full calibration on 8× B200 with 256 calibration samples.

View File

@@ -1,540 +0,0 @@
#!/usr/bin/env python3
"""Streaming FP8 → NVFP4 converter for DeepSeek V4 Pro (sgl-project FP8 repackage).
Path A: pure tensor-level conversion. No model loading via transformers, no
calibration. Reads FP8 safetensors shards, dequantizes per-block FP8 to FP32,
re-quantizes to NVFP4 (E2M1 packed in uint8 with FP8 E4M3 per-block scales and
an FP32 per-tensor global scale), and writes new shards.
Key behaviors:
- Joint global scale_2 across (gate_proj, up_proj) pairs of each expert,
required for vLLM fused MoE kernels.
- Preserves lm_head, embeddings, MoE router gates, norms, V4 indexer/scoring,
and mHC residual mixing weights at original precision.
- Streams shard-by-shard. Peak working memory is one tensor pair dequantized
to FP32 (a few hundred MB at most for the largest weights).
- Resumable per output shard.
NVFP4 format reference:
value = packed_fp4 * weight_scale * weight_scale_2
where:
packed_fp4: E2M1 in {0, ±0.5, ±1, ±1.5, ±2, ±3, ±4, ±6}, 2 per byte
weight_scale: FP8 E4M3, one per 16-element block
weight_scale_2: FP32 scalar per tensor, global
Usage:
python fp8_to_nvfp4_streaming.py \\
--src DeepSeek-V4-Pro-FP8 \\
--dst DeepSeek-V4-Pro-NVFP4-streaming \\
--workers 8
Optional:
--gpu N Use CUDA device N for the math (default: 0; -1 for CPU)
--shard-size-gb 5 Target output shard size
--dry-run Print what would be done; don't write
"""
import argparse
import json
import re
import shutil
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import torch
from safetensors import safe_open
from safetensors.torch import save_file
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Classification: which tensors do we quantize, which do we preserve?
# ---------------------------------------------------------------------------
PRESERVE_REGEXES = [
r".*lm_head.*",
r".*embed_tokens.*",
r".*\.(mlp|ffn)\.gate(\.weight)?$", # MoE router (NOT gate_proj)
r".*norm.*",
r".*indexer.*", # V3.2 DSA / V4 CSA indexer
r".*hyper_conn.*", # V4 mHC
r".*\.mhc.*",
r".*hc_attn.*", # V4 hyper-connection attn
r".*hc_ffn.*", # V4 hyper-connection ffn
r".*hc_head.*", # V4 hyper-connection head
r".*scoring.*",
r".*attn_sink.*", # V4 attention sink
r".*compressor\.ape.*", # V4 compressor absolute pos encoding
r".*tid2eid.*", # V4 MoE token-to-expert mapping
r".*\.bias$", # any biases
]
PRESERVE_RE = re.compile("|".join(f"(?:{p})" for p in PRESERVE_REGEXES))
# Identify expert pairs that need joint global scale
EXPERT_PAIR_RE = re.compile(r"(.*experts\.\d+)\.(w1|w3)\.weight$")
def is_preserve(name: str) -> bool:
return bool(PRESERVE_RE.match(name))
# ---------------------------------------------------------------------------
# FP8 dequantization (per-block)
# ---------------------------------------------------------------------------
def dequant_fp8_to_fp32(weight_fp8: torch.Tensor, scale_inv: torch.Tensor) -> torch.Tensor:
"""Dequantize a per-block FP8 E4M3 weight to FP32 using its inverse-scale tensor.
DeepSeek convention: weight_scale_inv stores the dequant scale (multiply by it
to recover FP32). Block size is inferred from shape ratios — typically 128x128.
"""
assert weight_fp8.dim() == 2, f"Expected 2D weight, got shape {weight_fp8.shape}"
M, N = weight_fp8.shape
if scale_inv.dim() == 0:
# Per-tensor scale
return weight_fp8.float() * scale_inv.float()
if scale_inv.dim() == 1:
# Per-row or per-col — unusual for DeepSeek but handle it
if scale_inv.numel() == M:
return weight_fp8.float() * scale_inv.float().unsqueeze(1)
if scale_inv.numel() == N:
return weight_fp8.float() * scale_inv.float().unsqueeze(0)
raise ValueError(f"Cannot align 1D scale_inv {scale_inv.shape} to weight {weight_fp8.shape}")
# 2D block scaling
sm, sn = scale_inv.shape
bm = (M + sm - 1) // sm
bn = (N + sn - 1) // sn
scale_full = scale_inv.float().repeat_interleave(bm, dim=0).repeat_interleave(bn, dim=1)
scale_full = scale_full[:M, :N]
return weight_fp8.float() * scale_full
# ---------------------------------------------------------------------------
# NVFP4 quantization
# ---------------------------------------------------------------------------
FP4_E2M1_VALUES = torch.tensor(
[0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0], dtype=torch.float32
)
# Boundaries between adjacent magnitudes (round-to-nearest with ties to even-ish)
FP4_BOUNDARIES = torch.tensor(
[0.25, 0.75, 1.25, 1.75, 2.5, 3.5, 5.0], dtype=torch.float32
)
FP4_MAX = 6.0
FP8_E4M3_MAX = 448.0
def round_to_fp4_e2m1_index(x: torch.Tensor) -> torch.Tensor:
"""Round x to nearest FP4 E2M1 representable, return 4-bit index in [0..15].
Index encoding: bit 3 = sign, bits 0..2 = magnitude index into FP4_E2M1_VALUES.
"""
sign = (x < 0).to(torch.uint8)
abs_x = x.abs().clamp_(max=FP4_MAX)
# searchsorted is fast on GPU; uses float32
boundaries = FP4_BOUNDARIES.to(x.device)
mag_idx = torch.searchsorted(boundaries, abs_x.contiguous()).to(torch.uint8)
return (sign << 3) | mag_idx
def quantize_to_nvfp4(
x_fp32: torch.Tensor,
scale_2: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
"""Quantize an FP32 weight to NVFP4 given a (possibly joint) global scale.
Args:
x_fp32: [M, N] FP32 tensor, N must be divisible by 16
scale_2: scalar FP32 tensor
Returns:
packed: [M, N//2] uint8, two FP4 values per byte (low nibble first)
weight_scale: [M, N//16] FP8 E4M3 per-block scales
"""
M, N = x_fp32.shape
if N % 16 != 0:
raise ValueError(f"NVFP4 requires N % 16 == 0; got {x_fp32.shape}")
# Per-block (16-element) amax
blocks = x_fp32.view(M, N // 16, 16)
block_amax = blocks.abs().amax(dim=-1) # [M, N//16]
# Per-block scale in FP32, then cast to FP8 E4M3 (this is the lossy step)
block_scale_fp32 = block_amax / (FP4_MAX * scale_2)
# Avoid zeros — produces NaN on dequant. Clamp tiny scales.
block_scale_fp32 = block_scale_fp32.clamp_(min=1e-30)
block_scale_fp8 = block_scale_fp32.to(torch.float8_e4m3fn)
# Recover the effective scale that the kernel will actually use
effective = scale_2 * block_scale_fp8.float() # [M, N//16]
# Quantize values: divide, clamp, round to E2M1
scaled = blocks / effective.unsqueeze(-1).clamp_(min=1e-30)
fp4_idx = round_to_fp4_e2m1_index(scaled) # [M, N//16, 16] uint8
fp4_idx = fp4_idx.view(M, N).contiguous()
# Pack two nibbles per byte: low = even-index element, high = odd-index element
low = fp4_idx[:, ::2]
high = fp4_idx[:, 1::2]
packed = (low | (high << 4)).to(torch.uint8)
return packed, block_scale_fp8
def compute_global_scale(*tensors_fp32: torch.Tensor) -> torch.Tensor:
"""Compute joint NVFP4 global scale_2 across one or more FP32 tensors.
scale_2 = amax / (FP4_MAX * FP8_E4M3_MAX)
"""
amax = torch.stack([t.abs().max() for t in tensors_fp32]).max()
scale_2 = amax / (FP4_MAX * FP8_E4M3_MAX)
# Avoid zero
return scale_2.clamp_(min=1e-30).float()
# ---------------------------------------------------------------------------
# Sharded output writer
# ---------------------------------------------------------------------------
class ShardedSafetensorsWriter:
"""Writes tensors to a sequence of safetensors shards, building an index map."""
def __init__(self, out_dir: Path, max_shard_bytes: int):
self.out_dir = out_dir
self.out_dir.mkdir(parents=True, exist_ok=True)
self.max_shard_bytes = max_shard_bytes
self.current = {} # name -> tensor (CPU)
self.current_bytes = 0
self.shard_idx = 0
self.weight_map: dict[str, str] = {} # name -> shard filename
self.shard_filenames: list[str] = []
def _flush(self):
if not self.current:
return
self.shard_idx += 1
# Use placeholder total; we'll rename at the end
fname = f"model-{self.shard_idx:05d}-of-PLACEHOLDER.safetensors"
path = self.out_dir / fname
save_file(self.current, str(path))
for name in self.current:
self.weight_map[name] = fname
self.shard_filenames.append(fname)
self.current.clear()
self.current_bytes = 0
def add(self, name: str, tensor: torch.Tensor):
# safetensors requires CPU tensors and contiguous
t = tensor.detach().cpu().contiguous()
size = t.numel() * t.element_size()
if self.current and self.current_bytes + size > self.max_shard_bytes:
self._flush()
self.current[name] = t
self.current_bytes += size
def close(self):
self._flush()
# Now rename shards to use proper of-N suffix
total = len(self.shard_filenames)
new_map = {}
for old_fname in self.shard_filenames:
idx = int(old_fname.split("-")[1])
new_fname = f"model-{idx:05d}-of-{total:05d}.safetensors"
(self.out_dir / old_fname).rename(self.out_dir / new_fname)
new_map[old_fname] = new_fname
# Patch weight_map
self.weight_map = {k: new_map[v] for k, v in self.weight_map.items()}
return self.weight_map
# ---------------------------------------------------------------------------
# Shard-level conversion plan
# ---------------------------------------------------------------------------
def build_plan(src_dir: Path):
"""Build the conversion plan from index.json.
Returns:
weight_map: name -> shard filename
shard_to_names: shard filename -> list of names in that shard
expert_pair_groups: list of (group_name, name_w1, name_w3)
For each expert, the gate_proj/up_proj pair gets a shared scale_2.
solo_quantize: list of names to quantize independently
preserve: list of names to copy unchanged
"""
with open(src_dir / "model.safetensors.index.json") as f:
index = json.load(f)
weight_map = index["weight_map"]
shard_to_names = defaultdict(list)
for name, fn in weight_map.items():
shard_to_names[fn].append(name)
# Gather all weight tensor names (those with .weight suffix)
all_weights = [n for n in weight_map if n.endswith(".weight")]
# Identify expert pairs
expert_pairs = defaultdict(dict) # base -> {"gate_proj": name, "up_proj": name}
for n in all_weights:
m = EXPERT_PAIR_RE.match(n)
if m:
base, kind = m.group(1), m.group(2)
expert_pairs[base][kind] = n
paired_names = set()
expert_pair_groups = []
for base, parts in expert_pairs.items():
if "w1" in parts and "w3" in parts:
expert_pair_groups.append((base, parts["w1"], parts["w3"]))
paired_names.add(parts["w1"])
paired_names.add(parts["w3"])
# Classify everything else
solo_quantize = []
preserve = []
scale_companions = [] # .scale tensors that get consumed during dequant
for n in weight_map:
if n.endswith(".scale") and n.replace(".scale", ".weight") in weight_map:
scale_companions.append(n)
continue
if n in paired_names:
continue
if is_preserve(n):
preserve.append(n)
continue
# Anything else with .weight gets quantized solo, otherwise preserved
if n.endswith(".weight"):
solo_quantize.append(n)
else:
preserve.append(n)
return {
"weight_map": weight_map,
"shard_to_names": dict(shard_to_names),
"expert_pair_groups": expert_pair_groups,
"solo_quantize": solo_quantize,
"preserve": preserve,
"scale_companions": scale_companions,
}
# ---------------------------------------------------------------------------
# Tensor loading helpers
# ---------------------------------------------------------------------------
class ShardCache:
"""Lazy per-shard safe_open cache so we don't re-open shards repeatedly."""
def __init__(self, src_dir: Path, max_open: int = 4):
self.src_dir = src_dir
self.max_open = max_open
self.handles: dict[str, "safe_open"] = {}
def get(self, shard_fname: str):
if shard_fname in self.handles:
return self.handles[shard_fname]
if len(self.handles) >= self.max_open:
# Drop one
old_fn = next(iter(self.handles))
self.handles[old_fn].__exit__(None, None, None)
del self.handles[old_fn]
h = safe_open(self.src_dir / shard_fname, framework="pt")
h.__enter__()
self.handles[shard_fname] = h
return h
def close(self):
for h in self.handles.values():
h.__exit__(None, None, None)
self.handles.clear()
def load_weight_and_scale(cache: ShardCache, weight_map, name):
"""Load an FP8 weight with its scale companion (if any)."""
weight = cache.get(weight_map[name]).get_tensor(name)
scale_name = name.replace(".weight", ".scale")
scale = None
if scale_name in weight_map:
try:
scale = cache.get(weight_map[scale_name]).get_tensor(scale_name)
except Exception:
# Scale listed in index but not in shard (BF16 weights have no scale)
pass
return weight, scale
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--src", required=True, help="Source FP8 model directory")
ap.add_argument("--dst", required=True, help="Output NVFP4 model directory")
ap.add_argument("--gpu", type=int, default=0, help="CUDA device, -1 for CPU")
ap.add_argument("--shard-size-gb", type=float, default=5.0)
ap.add_argument("--workers", type=int, default=4,
help="Concurrent tensor-conversion workers (lots of small tensors benefit; "
"actual GPU compute is serialized by torch)")
ap.add_argument("--dry-run", action="store_true")
args = ap.parse_args()
src = Path(args.src).resolve()
dst = Path(args.dst).resolve()
if not (src / "model.safetensors.index.json").exists():
sys.exit(f"No index.json at {src}")
device = torch.device(f"cuda:{args.gpu}" if args.gpu >= 0 and torch.cuda.is_available() else "cpu")
print(f"Compute device: {device}")
# Move FP4_BOUNDARIES to device once
global FP4_BOUNDARIES
FP4_BOUNDARIES = FP4_BOUNDARIES.to(device)
print("Building conversion plan...")
plan = build_plan(src)
n_pairs = len(plan["expert_pair_groups"])
n_solo = len(plan["solo_quantize"])
n_preserve = len(plan["preserve"])
n_scales = len(plan["scale_companions"])
print(f" Expert pair groups (joint scale_2): {n_pairs:,}")
print(f" Solo quantize tensors: {n_solo:,}")
print(f" Preserved tensors: {n_preserve:,}")
print(f" Scale companions consumed: {n_scales:,}")
if args.dry_run:
print("\nDry run — exiting before any writes.")
return
dst.mkdir(parents=True, exist_ok=True)
cache = ShardCache(src, max_open=8)
writer = ShardedSafetensorsWriter(dst, max_shard_bytes=int(args.shard_size_gb * 1024**3))
weight_map = plan["weight_map"]
t_start = time.time()
# ------------------------------------------------------------------
# 1. Preserved tensors — copy unchanged
# ------------------------------------------------------------------
for name in tqdm(plan["preserve"], desc="Preserve", unit="tensor"):
t = cache.get(weight_map[name]).get_tensor(name)
writer.add(name, t)
# ------------------------------------------------------------------
# 2. Expert pairs — joint scale_2 across (gate_proj, up_proj)
# ------------------------------------------------------------------
for base, name_w1, name_w3 in tqdm(plan["expert_pair_groups"], desc="Expert pairs", unit="pair"):
w1_fp8, s1 = load_weight_and_scale(cache, weight_map, name_w1)
w3_fp8, s3 = load_weight_and_scale(cache, weight_map, name_w3)
with torch.no_grad():
w1 = dequant_fp8_to_fp32(w1_fp8.to(device), s1.to(device)) if s1 is not None else w1_fp8.float().to(device)
w3 = dequant_fp8_to_fp32(w3_fp8.to(device), s3.to(device)) if s3 is not None else w3_fp8.float().to(device)
scale_2 = compute_global_scale(w1, w3)
packed1, blk1 = quantize_to_nvfp4(w1, scale_2)
packed3, blk3 = quantize_to_nvfp4(w3, scale_2)
writer.add(name_w1, packed1)
writer.add(name_w1.replace(".weight", ".weight_scale"), blk1)
writer.add(name_w1.replace(".weight", ".weight_scale_2"), scale_2)
writer.add(name_w3, packed3)
writer.add(name_w3.replace(".weight", ".weight_scale"), blk3)
writer.add(name_w3.replace(".weight", ".weight_scale_2"), scale_2)
# ------------------------------------------------------------------
# 3. Solo quantize tensors — independent scale_2 per tensor
# ------------------------------------------------------------------
for name in tqdm(plan["solo_quantize"], desc="Solo quantize", unit="tensor"):
w_fp8, s = load_weight_and_scale(cache, weight_map, name)
with torch.no_grad():
if s is not None:
w = dequant_fp8_to_fp32(w_fp8.to(device), s.to(device))
else:
# Already non-FP8 (e.g. BF16), just upcast
w = w_fp8.float().to(device)
scale_2 = compute_global_scale(w)
packed, blk = quantize_to_nvfp4(w, scale_2)
writer.add(name, packed)
writer.add(name.replace(".weight", ".weight_scale"), blk)
writer.add(name.replace(".weight", ".weight_scale_2"), scale_2)
# Finalize shards & index
final_weight_map = writer.close()
cache.close()
# ------------------------------------------------------------------
# 4. Write model.safetensors.index.json
# ------------------------------------------------------------------
total_size = sum(
(dst / fn).stat().st_size for fn in set(final_weight_map.values())
)
new_index = {
"metadata": {"total_size": total_size},
"weight_map": final_weight_map,
}
with open(dst / "model.safetensors.index.json", "w") as f:
json.dump(new_index, f, indent=2)
# ------------------------------------------------------------------
# 5. Copy non-tensor files (config, tokenizer, etc.)
# ------------------------------------------------------------------
for fname in src.iterdir():
if fname.is_dir():
# encoding/, inference/, assets/ — copy whole tree
dst_sub = dst / fname.name
if not dst_sub.exists():
shutil.copytree(fname, dst_sub)
continue
if fname.suffix == ".safetensors":
continue
if fname.name == "model.safetensors.index.json":
continue
shutil.copy2(fname, dst / fname.name)
# ------------------------------------------------------------------
# 6. Patch config.json with quantization metadata so loaders know
# ------------------------------------------------------------------
cfg_path = dst / "config.json"
if cfg_path.exists():
with open(cfg_path) as f:
cfg = json.load(f)
cfg["quantization_config"] = {
"quant_method": "compressed-tensors",
"format": "nvfp4-pack-quantized",
"config_groups": {
"group_0": {
"targets": ["Linear"],
"weights": {
"num_bits": 4,
"type": "float",
"strategy": "tensor_group",
"group_size": 16,
"symmetric": True,
},
}
},
"ignore": PRESERVE_REGEXES,
}
with open(cfg_path, "w") as f:
json.dump(cfg, f, indent=2)
elapsed = time.time() - t_start
print(f"\nDone in {elapsed/3600:.2f}h")
print(f"Output: {dst}")
print(f"Total size: {total_size/1024**3:.1f} GB across {len(set(final_weight_map.values()))} shards")
if __name__ == "__main__":
main()

View File

@@ -1,173 +0,0 @@
#!/usr/bin/env python3
"""Inspect a DeepSeek FP8 model directory and report on tensor structure.
Usage: python inspect_model.py <model_dir>
Prints:
- Total tensor count and dtype histogram
- Sample of tensor names by category (lm_head, embeddings, attention, MoE experts, norms, etc.)
- FP8 block scaling structure (block size detection)
- MoE expert layer count and routing structure
- Any "unusual" tensors that need manual classification
"""
import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
from safetensors import safe_open
# Patterns we'd preserve (skip quantization on)
PRESERVE_PATTERNS = [
(re.compile(r".*lm_head.*"), "lm_head"),
(re.compile(r".*embed_tokens.*"), "embeddings"),
(re.compile(r".*\.mlp\.gate(\.weight)?$"), "moe_router_gate"),
(re.compile(r".*norm.*"), "normalization"),
(re.compile(r".*indexer.*"), "attention_indexer"), # V3.2 DSA / V4 CSA?
(re.compile(r".*hyper_conn.*"), "mhc_hyper_conn"), # V4 mHC
(re.compile(r".*mhc.*"), "mhc_other"),
(re.compile(r".*scoring.*"), "scoring"),
]
# Patterns for MoE expert weights (these are what we WILL quantize)
EXPERT_PATTERNS = [
(re.compile(r".*experts\.\d+\.gate_proj.*"), "expert_gate_proj"),
(re.compile(r".*experts\.\d+\.up_proj.*"), "expert_up_proj"),
(re.compile(r".*experts\.\d+\.down_proj.*"), "expert_down_proj"),
(re.compile(r".*shared_experts?\.gate_proj.*"), "shared_gate_proj"),
(re.compile(r".*shared_experts?\.up_proj.*"), "shared_up_proj"),
(re.compile(r".*shared_experts?\.down_proj.*"), "shared_down_proj"),
]
def categorize(name):
for pat, cat in PRESERVE_PATTERNS:
if pat.match(name):
return ("preserve", cat)
for pat, cat in EXPERT_PATTERNS:
if pat.match(name):
return ("quantize_expert", cat)
if name.endswith(".weight_scale_inv"):
return ("scale_metadata", "fp8_block_scale")
if name.endswith(".weight"):
return ("quantize_other", "linear_weight")
return ("other", "uncategorized")
def main():
ap = argparse.ArgumentParser()
ap.add_argument("model_dir")
ap.add_argument("--show-samples", type=int, default=5,
help="How many sample names to show per category")
args = ap.parse_args()
model_dir = Path(args.model_dir)
index_path = model_dir / "model.safetensors.index.json"
if not index_path.exists():
print(f"ERROR: {index_path} not found", file=sys.stderr)
sys.exit(1)
with open(index_path) as f:
index = json.load(f)
weight_map = index["weight_map"]
total_size = index.get("metadata", {}).get("total_size")
print(f"=== {model_dir} ===")
print(f"Total tensors: {len(weight_map):,}")
print(f"Total shards: {len(set(weight_map.values()))}")
if total_size:
print(f"Reported size: {total_size / 1024**3:.1f} GB")
print()
# Categorize names (cheap, no tensor loading)
categories = defaultdict(list)
for name in weight_map:
kind, cat = categorize(name)
categories[(kind, cat)].append(name)
print("=== Tensor categorization ===")
for (kind, cat), names in sorted(categories.items()):
print(f" [{kind:18s}] {cat:25s} count={len(names):,}")
for n in names[: args.show_samples]:
print(f" {n}")
if len(names) > args.show_samples:
print(f" ... and {len(names) - args.show_samples} more")
print()
# Inspect dtypes and FP8 block scaling on a sample shard
sample_shard = model_dir / sorted(set(weight_map.values()))[0]
print(f"=== Sampling dtypes from {sample_shard.name} ===")
dtype_hist = Counter()
fp8_block_sizes = Counter()
weight_with_scale = []
with safe_open(sample_shard, framework="pt") as f:
names_in_shard = list(f.keys())
for name in names_in_shard:
t = f.get_tensor(name)
dtype_hist[str(t.dtype)] += 1
# Check for FP8 weight + scale_inv pair
if name.endswith(".weight") and t.dtype.is_floating_point and t.element_size() == 1:
scale_name = name.replace(".weight", ".weight_scale_inv")
if scale_name in names_in_shard:
scale_t = f.get_tensor(scale_name)
bm = t.shape[0] / scale_t.shape[0] if scale_t.dim() == 2 else None
bn = t.shape[1] / scale_t.shape[1] if scale_t.dim() == 2 and t.dim() == 2 else None
fp8_block_sizes[(bm, bn)] += 1
if len(weight_with_scale) < 3:
weight_with_scale.append((name, t.shape, t.dtype, scale_t.shape, scale_t.dtype))
print(" Dtype histogram (this shard only):")
for d, c in dtype_hist.most_common():
print(f" {d:20s} {c:,}")
print()
print(" FP8 block-scale dimensions detected:")
for (bm, bn), c in fp8_block_sizes.most_common():
print(f" block_size = ({bm}, {bn}) count={c}")
print()
print(" Sample FP8 weight + scale_inv pairs:")
for name, wshape, wdt, sshape, sdt in weight_with_scale:
print(f" {name}")
print(f" weight: shape={tuple(wshape)} dtype={wdt}")
print(f" scale: shape={tuple(sshape)} dtype={sdt}")
# MoE structure summary
print()
print("=== MoE structure summary ===")
layer_experts = defaultdict(set)
for name in weight_map:
m = re.match(r".*layers\.(\d+)\..*experts\.(\d+)\..*", name)
if m:
layer_experts[int(m.group(1))].add(int(m.group(2)))
if layer_experts:
layer_count = len(layer_experts)
expert_counts = [len(v) for v in layer_experts.values()]
print(f" Layers with MoE experts: {layer_count}")
print(f" Experts per layer: min={min(expert_counts)} max={max(expert_counts)}")
print(f" Sample layer 0 experts: {sorted(list(layer_experts[min(layer_experts)]))[:5]}...")
else:
print(" No '.experts.N.' pattern found — MoE structure may use different naming.")
# Flag uncategorized for human review
print()
print("=== Uncategorized tensors (review these manually) ===")
uncat = categories.get(("other", "uncategorized"), [])
if uncat:
print(f" {len(uncat):,} tensors:")
for n in uncat[:20]:
print(f" {n}")
if len(uncat) > 20:
print(f" ... and {len(uncat) - 20} more")
else:
print(" None — every tensor matched a known pattern.")
if __name__ == "__main__":
main()

View File

@@ -1,218 +0,0 @@
#!/usr/bin/env python3
"""Path B: llm-compressor oneshot NVFP4 quantization for DeepSeek V4 Pro.
Uses sequential pipeline + activation calibration to produce W4A4 NVFP4 with
calibrated activation global scales. Higher quality than the streaming converter
on activation-sensitive ops, at the cost of much longer wall time and more
fragility on a brand-new architecture.
Memory plan with 2.7 TB host RAM + 8x B200 (1.5 TB HBM):
- FP8 base resident in CPU RAM: ~865 GB
- One transformer block on GPU at a time: ~10-30 GB HBM
- Activation calibration cache: tens to a few hundred GB
- Headroom: ~1.5+ TB RAM, ~1.4+ TB HBM
Critical: this loads the model with trust_remote_code=True. V4 architecture is
brand new; expect to need:
- transformers from source (or recent main)
- llm-compressor from source
- The V4 modeling code in DeepSeek-V4-Pro-FP8/inference/ to be importable
Usage:
python quantize_llmcompressor.py \\
--src DeepSeek-V4-Pro-FP8 \\
--dst DeepSeek-V4-Pro-NVFP4-llmcompressor \\
--num-samples 256 \\
--max-seq-len 4096
"""
import argparse
import os
import sys
from pathlib import Path
import torch
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--src", required=True, help="Source FP8 model directory")
ap.add_argument("--dst", required=True, help="Output NVFP4 model directory")
ap.add_argument("--num-samples", type=int, default=256)
ap.add_argument("--max-seq-len", type=int, default=4096)
ap.add_argument("--calibration-dataset", default="HuggingFaceH4/ultrachat_200k")
ap.add_argument(
"--offload-folder", default="/root/nvidia-meeting/.offload",
help="NVMe folder for accelerate disk-offload spillover (rarely needed at 2.7TB RAM)",
)
ap.add_argument(
"--no-activation-quant", action="store_true",
help="Quantize weights only (no activation calibration). Faster, closer to Path A."
)
args = ap.parse_args()
src = Path(args.src).resolve()
dst = Path(args.dst).resolve()
if not (src / "config.json").exists():
sys.exit(f"No config.json at {src}")
Path(args.offload_folder).mkdir(parents=True, exist_ok=True)
# Heavy imports happen here so --help is fast
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
from llmcompressor import oneshot
from llmcompressor.modifiers.quantization import QuantizationModifier
# ----------------------------------------------------------------------
# 1. Load model
# ----------------------------------------------------------------------
print(f"Loading {src} ...")
print(" This will take several minutes — FP8 base is ~865 GB.")
# We want FP8 weights to stay as FP8 on CPU and only be promoted to BF16
# when each block goes to GPU during sequential calibration. The exact
# behavior depends on transformers' V4 modeling code — if it auto-dequants
# on load, expect 3.2 TB BF16 in RAM and you'll spill. Watch `free -h`.
tokenizer = AutoTokenizer.from_pretrained(src, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
src,
torch_dtype="auto",
device_map="cpu", # all on CPU; sequential pipeline moves blocks to GPU
trust_remote_code=True,
offload_folder=args.offload_folder,
)
print(f" Model class: {type(model).__name__}")
print(f" Param count: {sum(p.numel() for p in model.parameters()):,}")
# ----------------------------------------------------------------------
# 2. MoE handling — replace_modules_for_calibration
# ----------------------------------------------------------------------
# On Llama4/Qwen3-MoE, llm-compressor needs a wrapper class that exposes
# every expert during calibration (otherwise routed-only experts never see
# data). For DeepSeek V4 the MoE class name is something like
# `DeepseekV4MoE`. Try the canonical entrypoint first; fall back gracefully.
try:
from llmcompressor.modeling import replace_modules_for_calibration
print("Replacing MoE modules for calibration...")
replace_modules_for_calibration(model)
except ImportError:
print("WARN: replace_modules_for_calibration not available in this "
"llm-compressor version. Routed-only experts may not see "
"calibration data, lowering NVFP4 quality on rare experts.")
except Exception as e:
print(f"WARN: replace_modules_for_calibration failed: {e}")
print(" You may need to register a custom MoE wrapper for V4. "
"Find the MoE class name in DeepSeek-V4-Pro-FP8/inference/ and "
"register it via llmcompressor.modeling.register_module_replacement.")
# ----------------------------------------------------------------------
# 3. Calibration dataset
# ----------------------------------------------------------------------
print(f"Loading calibration dataset {args.calibration_dataset} ...")
ds = load_dataset(args.calibration_dataset, split="train_sft")
ds = ds.shuffle(seed=42).select(range(args.num_samples))
def preprocess(example):
# Use the model's chat template if it has one; ultrachat samples have a
# 'messages' field already in the OpenAI shape.
if "messages" in example:
try:
text = tokenizer.apply_chat_template(
example["messages"], tokenize=False, add_generation_prompt=False
)
except Exception:
text = "\n".join(m.get("content", "") for m in example["messages"])
else:
text = example.get("text") or example.get("prompt") or ""
return {"text": text}
ds = ds.map(preprocess, remove_columns=ds.column_names)
def tokenize(example):
return tokenizer(
example["text"],
truncation=True,
max_length=args.max_seq_len,
padding=False,
return_tensors=None,
)
ds = ds.map(tokenize, remove_columns=["text"])
# ----------------------------------------------------------------------
# 4. Recipe
# ----------------------------------------------------------------------
# NVFP4 W4A4 by default. The ignore list mirrors Path A's preserve list:
# output head, embeddings, MoE router gates (NOT gate_proj!), norms, and
# V4-specific attention indexer / mHC residual mixing weights.
ignore = [
"re:.*lm_head",
"re:.*embed_tokens$",
"re:.*\\.mlp\\.gate$",
"re:.*\\.mlp\\.gate\\.weight$",
"re:.*norm.*",
"re:.*indexer.*",
"re:.*hyper_conn.*",
"re:.*\\.mhc.*",
"re:.*scoring.*",
]
if args.no_activation_quant:
print("Recipe: NVFP4 weight-only (W4A16 effective)")
recipe = QuantizationModifier(
targets="Linear",
scheme="NVFP4A16", # weight-only variant
ignore=ignore,
)
else:
print("Recipe: NVFP4 W4A4 with activation calibration")
recipe = QuantizationModifier(
targets="Linear",
scheme="NVFP4",
ignore=ignore,
)
# ----------------------------------------------------------------------
# 5. Run oneshot — sequential pipeline is the key for memory
# ----------------------------------------------------------------------
print("Starting oneshot calibration + quantization (this is the long part)...")
print(f" num_samples={args.num_samples}, max_seq_len={args.max_seq_len}")
print(f" Watch with: watch -n 5 'free -h && nvidia-smi --query-gpu=memory.used,memory.free --format=csv'")
oneshot(
model=model,
dataset=ds,
recipe=recipe,
max_seq_length=args.max_seq_len,
num_calibration_samples=args.num_samples,
# Sequential pipeline: one block at a time on GPU, rest on CPU.
pipeline="sequential",
# Calibrate every expert, even routed-only ones that wouldn't see traffic.
moe_calibrate_all_experts=True,
)
# ----------------------------------------------------------------------
# 6. Save compressed
# ----------------------------------------------------------------------
print(f"Saving compressed checkpoint to {dst} ...")
dst.mkdir(parents=True, exist_ok=True)
model.save_pretrained(str(dst), save_compressed=True)
tokenizer.save_pretrained(str(dst))
# Copy any extra files that save_pretrained doesn't (encoding/, inference/, PDF)
import shutil
for fname in src.iterdir():
if fname.is_dir() and fname.name in {"encoding", "inference", "assets"}:
dst_sub = dst / fname.name
if not dst_sub.exists():
shutil.copytree(fname, dst_sub)
elif fname.suffix in {".pdf", ".md"} and not (dst / fname.name).exists():
shutil.copy2(fname, dst / fname.name)
print("Done.")
print(f"Output: {dst}")
if __name__ == "__main__":
main()

157
quantize_modelopt.py Normal file
View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""NVIDIA Model Optimizer PTQ for DeepSeek V4 Pro → NVFP4.
Uses nvidia-modelopt's official PTQ pipeline with NVFP4Experts-Only config,
which quantizes only MoE expert layers while keeping attention QKV in higher
precision — the recommended approach for DeepSeek MoE models.
Output is a Unified HuggingFace checkpoint deployable on TRT-LLM / vLLM / SGLang.
Usage:
python quantize_modelopt.py \
--model /root/nvidia-meeting/DeepSeek-V4-Pro \
--export_dir /root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4-modelopt \
--qformat nvfp4_experts_only \
--tp 8 \
--calib_size 256
For the FP8 source variant, just change --model path. modelopt handles
dequantization internally.
"""
import argparse
import os
import random
import time
import numpy as np
import torch
import modelopt.torch.opt as mto
import modelopt.torch.quantization as mtq
from modelopt.torch.export import export_hf_checkpoint
from modelopt.torch.utils.dataset_utils import create_forward_loop
from transformers import AutoModelForCausalLM, AutoTokenizer
mto.enable_huggingface_checkpointing()
QUANT_CONFIGS = {
"nvfp4": mtq.NVFP4_DEFAULT_CFG,
"nvfp4_experts_only": mtq.NVFP4_EXPERTS_ONLY_CFG,
"nvfp4_mlp_only": mtq.NVFP4_MLP_ONLY_CFG,
"nvfp4_omlp_only": mtq.NVFP4_OMLP_ONLY_CFG,
"fp8": mtq.FP8_DEFAULT_CFG,
}
def main():
ap = argparse.ArgumentParser(description="Model Optimizer PTQ for DeepSeek V4 Pro")
ap.add_argument("--model", required=True, help="Path to HF model (BF16 or FP8)")
ap.add_argument("--export_dir", required=True, help="Output directory for quantized checkpoint")
ap.add_argument("--qformat", default="nvfp4_experts_only",
choices=list(QUANT_CONFIGS.keys()),
help="Quantization format (default: nvfp4_experts_only for MoE)")
ap.add_argument("--kv_cache_qformat", default="fp8_cast",
help="KV cache quantization (default: fp8_cast, fast no-calib)")
ap.add_argument("--tp", type=int, default=8, help="Tensor parallelism for export")
ap.add_argument("--calib_size", type=int, nargs="+", default=[256],
help="Calibration dataset size (per dataset)")
ap.add_argument("--batch_size", type=int, default=1, help="Calibration batch size")
ap.add_argument("--calib_seq", type=int, default=4096, help="Max calibration sequence length")
ap.add_argument("--trust_remote_code", action="store_true", default=True,
help="Trust remote code (required for V4)")
ap.add_argument("--use_seq_device_map", action="store_true",
help="Use sequential device map for low-memory calibration")
ap.add_argument("--low_memory_mode", action="store_true",
help="Compress weights before calibration (FP8/NVFP4 only)")
args = ap.parse_args()
print(f"=== Model Optimizer PTQ ===")
print(f" Model: {args.model}")
print(f" QFormat: {args.qformat}")
print(f" KV Cache: {args.kv_cache_qformat}")
print(f" TP: {args.tp}")
print(f" Calib: {args.calib_size} samples, seq_len={args.calib_seq}")
print()
# Seed everything
random.seed(1234)
np.random.seed(1234)
torch.manual_seed(1234)
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(
args.model,
trust_remote_code=args.trust_remote_code,
padding_side="left",
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load model
print("Loading model...")
model_kwargs = {
"trust_remote_code": args.trust_remote_code,
"torch_dtype": torch.bfloat16,
}
if args.use_seq_device_map:
model_kwargs["device_map"] = "auto"
model = AutoModelForCausalLM.from_pretrained(args.model, **model_kwargs)
if not args.use_seq_device_map:
model = model.cuda()
# Build calibration dataloader
print("Building calibration dataset...")
calib_dataloader = create_forward_loop(
model,
dataloader=get_dataloader(
tokenizer=tokenizer,
calib_size=args.calib_size,
batch_size=args.batch_size,
calib_seq=args.calib_seq,
),
)
# Quantize
quant_cfg = QUANT_CONFIGS[args.qformat]
print(f"Running PTQ with {args.qformat}...")
t0 = time.time()
model = mtq.quantize(model, quant_cfg, calib_dataloader)
elapsed = time.time() - t0
print(f"Quantization complete in {elapsed/60:.1f} min")
# Export
print(f"Exporting to {args.export_dir} ...")
with torch.inference_mode():
export_hf_checkpoint(
model,
args.export_dir,
tokenizer=tokenizer,
export_tensorrt_llm_plugins=True,
)
print(f"Done. Output at {args.export_dir}")
def get_dataloader(tokenizer, calib_size, batch_size, calib_seq):
"""Create calibration dataloader using modelopt's built-in dataset utils."""
from modelopt.torch.utils.dataset_utils import get_dataset_dataloader
return get_dataset_dataloader(
tokenizer=tokenizer,
num_samples=calib_size[0],
batch_size=batch_size,
seq_len=calib_seq,
)
if __name__ == "__main__":
main()

7
requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
compressed-tensors<0.15.0
nvidia-modelopt[hf]
fire
flash-attn>=2.6.0
transformers<5.0
transformers_stream_generator
zstandard

View File

@@ -1,179 +0,0 @@
#!/usr/bin/env python3
"""Sanity check an NVFP4 DeepSeek V4 Pro checkpoint.
Two modes:
1) --tensor-only (default): no model loading. Just inspects the safetensors
shards: confirms NVFP4 packing structure (uint8 weight + FP8 weight_scale
+ FP32 weight_scale_2), checks for NaN/Inf in scales, samples a few
dequantizations to confirm they look plausible.
2) --vllm: tries to load the model with vLLM and generate a few tokens.
Requires vLLM with NVFP4 support (SM100+ Blackwell GPU).
Usage:
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming --vllm
"""
import argparse
import json
import sys
from pathlib import Path
import torch
from safetensors import safe_open
FP4_E2M1_VALUES = torch.tensor(
[0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0,
-0.0, -0.5, -1.0, -1.5, -2.0, -3.0, -4.0, -6.0],
dtype=torch.float32,
)
def unpack_fp4(packed: torch.Tensor) -> torch.Tensor:
"""Reverse the (low | high<<4) byte pack into a [M, N] tensor of FP4 indices."""
low = packed & 0x0F
high = (packed >> 4) & 0x0F
M, N_half = packed.shape
out = torch.empty(M, N_half * 2, dtype=torch.uint8)
out[:, ::2] = low
out[:, 1::2] = high
return out
def dequant_nvfp4(packed_uint8, weight_scale_fp8, weight_scale_2_fp32):
"""Reconstruct FP32 values from NVFP4 storage."""
fp4_idx = unpack_fp4(packed_uint8)
values = FP4_E2M1_VALUES[fp4_idx.long()] # [M, N]
M, N = values.shape
# Per-block scale broadcast back over 16 elements
scale_blocks = weight_scale_fp8.float() # [M, N//16]
scale_per_elem = scale_blocks.unsqueeze(-1).expand(-1, -1, 16).reshape(M, N)
return values * scale_per_elem * weight_scale_2_fp32.float()
def tensor_only_check(model_dir: Path):
index_path = model_dir / "model.safetensors.index.json"
if not index_path.exists():
sys.exit(f"No index.json at {model_dir}")
with open(index_path) as f:
index = json.load(f)
weight_map = index["weight_map"]
# Find one quantized weight to sample
sample = None
for name, fn in weight_map.items():
if name.endswith(".weight") and (name.replace(".weight", ".weight_scale") in weight_map):
sample = name
break
if not sample:
sys.exit("Couldn't find an NVFP4-quantized weight (expected *.weight_scale companion).")
print(f"Sampling: {sample}")
shard_fn = weight_map[sample]
scale_name = sample.replace(".weight", ".weight_scale")
scale_2_name = sample.replace(".weight", ".weight_scale_2")
scale_shard = weight_map[scale_name]
scale_2_shard = weight_map[scale_2_name]
def open_get(fn, name):
with safe_open(model_dir / fn, framework="pt") as f:
return f.get_tensor(name)
packed = open_get(shard_fn, sample)
weight_scale = open_get(scale_shard, scale_name)
weight_scale_2 = open_get(scale_2_shard, scale_2_name)
print(f" packed: shape={tuple(packed.shape)} dtype={packed.dtype}")
print(f" weight_scale: shape={tuple(weight_scale.shape)} dtype={weight_scale.dtype}")
print(f" weight_scale_2: shape={tuple(weight_scale_2.shape)} dtype={weight_scale_2.dtype} "
f"value={weight_scale_2.float().item():.6e}")
# Structural assertions
M = packed.shape[0]
assert packed.dtype == torch.uint8, f"packed should be uint8, got {packed.dtype}"
assert weight_scale.dtype == torch.float8_e4m3fn, \
f"weight_scale should be FP8 E4M3, got {weight_scale.dtype}"
assert weight_scale.shape == (M, packed.shape[1] * 2 // 16), \
f"weight_scale shape {weight_scale.shape} doesn't match expected (M, N/16)"
# Check for NaN/Inf in scales
s_fp32 = weight_scale.float()
assert torch.isfinite(s_fp32).all(), "weight_scale contains NaN/Inf"
assert torch.isfinite(weight_scale_2.float()).all(), "weight_scale_2 is NaN/Inf"
print(f" scales: all finite ✓")
print(f" weight_scale stats: min={s_fp32.min().item():.3e} max={s_fp32.max().item():.3e} "
f"mean={s_fp32.mean().item():.3e}")
# Spot-check dequantization
print("\nDequantizing first 4x32 block for visual check:")
rec = dequant_nvfp4(packed[:4, :16], weight_scale[:4, :2], weight_scale_2)
print(rec)
assert torch.isfinite(rec).all(), "Dequantized values contain NaN/Inf"
print(f" dequant: all finite ✓")
print(f" dequant range: [{rec.min().item():.4f}, {rec.max().item():.4f}]")
# Count what's quantized vs preserved across the whole model
quantized_weights = []
preserved = []
for name in weight_map:
if name.endswith(".weight"):
if name.replace(".weight", ".weight_scale") in weight_map:
quantized_weights.append(name)
else:
preserved.append(name)
print(f"\nWhole-model summary:")
print(f" Quantized .weight tensors: {len(quantized_weights):,}")
print(f" Preserved .weight tensors: {len(preserved):,}")
print(f" Total tensors in index: {len(weight_map):,}")
# Show a few preserved names to confirm the right things stayed in higher precision
print(f"\n Sample preserved tensors (should be lm_head, embed, gates, norms, etc.):")
for n in preserved[:10]:
print(f" {n}")
def vllm_check(model_dir: Path):
print("Loading model with vLLM... (requires Blackwell GPU + vLLM with NVFP4 support)")
from vllm import LLM, SamplingParams
llm = LLM(
model=str(model_dir),
trust_remote_code=True,
quantization="compressed-tensors",
dtype="auto",
tensor_parallel_size=8,
max_model_len=8192,
)
sampling = SamplingParams(temperature=1.0, top_p=1.0, max_tokens=64)
prompts = [
"Write a short poem about quantization:",
"What is 17 * 23?",
"Explain MoE routing in one sentence.",
]
outputs = llm.generate(prompts, sampling)
for o in outputs:
print("=" * 60)
print("PROMPT:", o.prompt)
print("OUTPUT:", o.outputs[0].text)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("model_dir")
ap.add_argument("--vllm", action="store_true")
args = ap.parse_args()
model_dir = Path(args.model_dir)
tensor_only_check(model_dir)
if args.vllm:
print("\n" + "=" * 60)
vllm_check(model_dir)
if __name__ == "__main__":
main()