1 Commits

13 changed files with 1218 additions and 3879 deletions

View File

@@ -1,30 +0,0 @@
# DeepSeek V4 NVFP4 vLLM + DeepGEMM Mega MoE
# Extends the vLLM dream-build container with our custom DeepGEMM kernel
FROM atl.vultrcr.com/vllm/vllm-with-lmcache:dream-build
# Install build essentials
RUN apt-get update && apt-get install -y git screen cmake && rm -rf /var/lib/apt/lists/*
# Clone and build DeepGEMM with NVFP4 mega_moe kernel
# CACHE_BUSTER: increment to force fresh clone
RUN git clone -b nvfp4-mega-moe https://sweetapi.com/biondizzle/DeepGEMM.git /root/DeepGEMM && PATCH_CACHE_BUSTER=70
# Build DeepGEMM with proper CUDA/NVRTC paths
ENV CPATH="/usr/local/lib/python3.12/dist-packages/flashinfer/data/cutlass/include:/usr/local/lib/python3.12/dist-packages/nvidia/cu13/include:/usr/local/cuda-13.0/include:${CPATH}"
ENV PYTHONPATH="/root/DeepGEMM:${PYTHONPATH}"
# NVRTC lives in the pip nvidia/cu13 package, but the linker expects it in cuda/lib64
# Create a symlink so -lnvrtc resolves
RUN ln -sf /usr/local/lib/python3.12/dist-packages/nvidia/cu13/lib/libnvrtc.so.13 /usr/local/cuda/lib64/libnvrtc.so && PATCH_CACHE_BUSTER=70
RUN cd /root/DeepGEMM && python3 setup.py build_ext --inplace && PATCH_CACHE_BUSTER=69
# Bust cache for patch changes — ARG before COPY ensures layer invalidation
ARG PATCH_CACHE_BUSTER=70
# Copy our DeepSeek V4 patch over vLLM's model file
COPY patches/deepseek_v4.py /usr/local/lib/python3.12/dist-packages/vllm/model_executor/models/deepseek_v4.py
# Copy the NVFP4 staging kernel (BF16→E2M1+UE4M3 quantization for activations)
COPY patches/staging_kernel.py /usr/local/lib/python3.12/dist-packages/vllm/model_executor/models/staging_kernel.py
# Verify everything imports
RUN python3 -c "import deep_gemm; print('DeepGEMM NVFP4 OK')" && \
python3 -c "import vllm; print('vLLM OK')"

310
README.md
View File

@@ -1,216 +1,106 @@
# DeepSeek V4 Pro → NVFP4 Quantization + vLLM Serving
# DeepSeek V4 Pro → NVFP4 conversion kit
Full NVFP4 quantization of DeepSeek V4 Pro and vLLM serving on 8× NVIDIA B200 GPUs.
Two paths for converting `sgl-project/DeepSeek-V4-Pro-FP8` (the uniform-FP8 repackage of the original mixed-precision V4 Pro) into NVFP4 for Blackwell inference.
## Quick Status
| Component | Status |
|-----------|--------|
| NVFP4 Quantization | ✅ 881GB (Run 11), modelopt 0.45.0.dev64 |
| Weight Loading | ✅ 95 safetensors shards, all 8 TP ranks |
| Dequant Verification | ✅ Bit-exact match against official dequant (0.0 relative error) |
| NVFP4→FP8 Conversion (wo_a) | ✅ DeepGEMM block-scale format |
| NVFP4→BF16 Dequantization | ✅ 305 attn/shared, 91 compressor layers |
| Compressor Reconstruction | ✅ Separate kv_proj/gate_proj → fused_wkv_wgate |
| MoE Expert Serving (MegaMoE) | 🔧 Kernel builds & runs on sm_100a, debugging illegal CUDA access |
| Output Quality | 🔧 Under investigation |
## B200 Node
- **IP**: `45.76.247.107`
- **User**: `root`
- **Password**: see `.env`
- **GPUs**: 8× NVIDIA B200 (SM100a)
- **Model weights**: `/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4/`
- **BF16 reference**: `/root/nvidia-meeting/DeepSeek-V4-Pro-BF16/`
## Repositories
| Repo | Branch | Purpose |
|------|--------|---------|
| `deepseek-v4-quant` | `modelopt-nvfp4` | Main repo: patches, quantize, serve scripts |
| `DeepGEMM` | `nvfp4-mega-moe` | NVFP4 mega_moe kernel fork |
## Architecture
```
DeepSeek V4 Pro (1.2T params, 61 layers)
├── MLA Attention (61 layers)
│ ├── fused_wqa_wkv → BF16 (UnquantizedLinearMethod)
│ ├── wo_a → FP8 (DeepGEMM block-scale, BMM einsum)
│ ├── wo_b → BF16 (UnquantizedLinearMethod)
│ └── compressor.fused_wkv_wgate → BF16 (reconstructed from NVFP4)
├── MoE Experts (256 experts per layer, 61 layers)
│ └── MegaMoE path → NVFP4 (DeepGEMM mxf4nvf4, native block16)
└── Shared Expert → FP8 (Fp8LinearMethod, DeepGEMM)
```
## NVFP4 Format (Confirmed)
| Field | Format | Notes |
|-------|--------|-------|
| Weights | E2M1 packed uint8 | 2 values per byte |
| Block scales | `torch.float8_e4m3fn` (UE4M3) | Standard NVFP4 spec, group_size=16 |
| Global scales | `torch.float32` (weight_scale_2) | **Scalar per expert** (`torch.Size([])`) |
| Dequant | `value = packed_E2M1 * block_scale * global_scale` | Block scale range [0, 448] |
**Key finding**: The checkpoint stores block scales as `torch.float8_e4m3fn` (UE4M3), NOT UE8M0.
`.to(torch.float32)` is the correct conversion. The shift-by-23 trick was wrong — it was
applying an E8M0→float conversion to E4M3 bytes, producing garbage.
## Dequant Verification
We verified the dequant path is bit-exact against the official reference:
```python
W_bf16 = dequantize_fp4_weight(W_int, S)
y_ours = W_bf16 @ x.bfloat16()
y_ref = official_expert_forward(W_int, S, x)
print((y_ours - y_ref).abs().max() / y_ref.abs().mean())
```
Result:
```
Max abs diff: 0.00000000
Mean abs diff: 0.00000000
Relative error: 0.000000
Matmul max diff: 0.00000000
```
## Running
### 1. Quantize
```bash
# On B200 node, in screen
screen -S quantize
cd /root/nvidia-meeting
bash run_quantize_nvfp4.sh
# ~7 hours, $161 per run
```
### 2. Build Container
```bash
# From this repo
bash build_push.sh
# Always build in screen: screen -S build
```
The Dockerfile:
1. Extends `atl.vultrcr.com/vllm/vllm-with-lmcache:dream-build`
2. Clones DeepGEMM (`nvfp4-mega-moe` branch) and builds
3. Copies `patches/deepseek_v4.py` over vLLM's model file
### 3. Serve
```bash
# On B200 node
cd /root/nvidia-meeting
docker compose up -d
# Check logs
docker logs -f nvidia-meeting-vllm-1
# Test
curl http://localhost:8000/v1/models
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model": "/model", "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 50}'
```
### vLLM Flags
```
--trust-remote-code
--kv-cache-dtype fp8
--block-size 256
--enable-expert-parallel
--tensor-parallel-size 8
--compilation-config {"cudagraph_mode":"FULL_AND_PIECEWISE","custom_ops":["all"]}
--attention_config.use_fp4_indexer_cache=True
--tokenizer-mode deepseek_v4
--tool-call-parser deepseek_v4
--enable-auto-tool-choice
--reasoning-parser deepseek_v4
--speculative_config {"method":"mtp","num_speculative_tokens":2}
```
## NVFP4 Mega MoE Kernel
### What We Built
A native NVFP4 mega_moe kernel in our DeepGEMM fork. Weights stay in E2M1 packed format
and use `kind::mxf4nvf4.block_scale.scale_vec::4X` MMA directly on SM100a (B200).
**This is novel — NVIDIA has not done NVFP4→vLLM integration.**
### Kernel Architecture
| Parameter | Value |
|-----------|-------|
| PTX instruction | `tcgen05.mma.kind::mxf4nvf4.block_scale.scale_vec::4X` |
| kGranK | 16 (NVFP4 native block_size) |
| Weight format | E2M1 packed uint8 (unchanged from checkpoint) |
| Block scales | UE4M3 (float8_e4m3fn), native — no conversion needed |
| Global scales | Folded into block scales before packing |
| Instruction desc | `float_ue4m3_t` |
| SF layout | block16, scale_vec::4X |
| UTCCP stride | i*8 (4X layout) |
| kNumSFUint32 | kHidden / 64 (4 UE4M3 per int32) |
| recipe | (1, 1, 16) |
| Target arch | `sm_100a` (the `a` suffix is **required**) |
### Python API
- `fp8_nvfp4_mega_moe()` — entry point, recipe=(1,1,16)
- `transform_nvfp4_weights_for_mega_moe()` — fold global scales, pack UE4M3→int32, TMA-align
- `get_symm_buffer_for_nvfp4_mega_moe()` — 2× SF buffer vs MXFP4
### C++ Bindings
- `csrc/apis/mega_nvfp4.hpp` — kGranK=16, SF stride K/16, packed E2M1 hidden/2
- `csrc/jit_kernels/impls/sm100_fp8_nvfp4_mega_moe.hpp` — host-side TMA descriptors
- `deep_gemm/include/deep_gemm/impls/sm100_fp8_nvfp4_mega_moe.cuh` — kernel
### Full FP4 Pipeline
The `mxf4nvf4` instruction is FP4×FP4 — both activations (A) and weights (B) must be E2M1 packed.
A Triton staging kernel quantizes BF16 activations → E2M1 packed uint8 + UE4M3 block16 scales
before the GEMM. The L1 epilogue outputs UE4M3 activation scales directly (float→e4m3 cast).
## Bugs Found and Fixed
| # | Bug | Impact | Fix |
|---|-----|--------|-----|
| 1 | DeepGEMM `sf.dim()` crash | Server crash | `deepgemm_post_process_fp8_weight_block` for block-scale format |
| 2 | Block scale dtype `float8_e4m3fn` | Crash | Use `float32` for block-scale tensor |
| 3 | Missing `deepgemm_post_process` args | Crash | Pass `quant_block_shape`, `use_e8m0` |
| 4 | Compressor indexer shape mismatch | Crash | `.indexer.` sub-path in checkpoint keys |
| 5 | All-ones block scale | Garbage output | `torch.full(..., fp8_scale)` not `torch.ones` |
| 6 | `fused_skip_regex` skipping q_b/o_a/o_b scales | Garbage output | Remove non-fused scale entries from skip list |
| 7 | UE8M0 shift-by-23 applied to E4M3 scales | Garbled output | Checkpoint is standard UE4M3 — use `.to(float32)` (shift-by-23 was wrong) |
| 8 | wo_a BF16→NVFP4 on-the-fly used UE8M0 encoding | Scrambled attention | Produce UE4M3 directly: `.clamp(0, 448).to(float8_e4m3fn)` |
| 9 | FP8 activations fed to mxf4nvf4 (FP4×FP4 instruction) | Crash/garbled | Full FP4 pipeline: activations are E2M1 packed + UE4M3 scales |
| 10 | Staging kernel SF pack: shift ≥32 is UB | Half the activation scales zeroed | Split into 2 int32 writes per k_block (groups 0-3, 4-7) |
| 11 | Staging kernel wrote unpacked E2M1 (1 byte/elem) into packed buffer | 2× buffer overflow | Pack even/odd nibble pairs, write BLOCK_K//2 bytes |
| 12 | `compute-sanitizer` build running during debug | Slow (50×), masking timing | Remove sanitizer, rebuild |
## Files
## What's here
| File | Purpose |
|------|---------|
| `patches/deepseek_v4.py` | Main patch: NVFP4 weight loading, dequant, staging kernel, MegaMoE |
| `patches/staging_kernel.py` | Reference copy of Triton staging kernel (live copy is in deepseek_v4.py) |
| `scripts/dequant_fp8_to_bf16.py` | BF16 dequantization utility |
| `scripts/quantize_nvfp4.py` | NVFP4 quantization runner |
| `scripts/serve_vllm.py` | Standalone vLLM server launcher |
| `Dockerfile` | Container build (extends dream-build with DeepGEMM + patch) |
| `docker-compose.yml` | Production serve config |
| `build_push.sh` | Build, push to CR, update docker-compose |
| --- | --- |
| `inspect_model.py` | Run this first. Prints tensor name patterns, dtypes, FP8 scaling block sizes, and counts of MoE expert/router/norm tensors so you know exactly what you're dealing with before any conversion. |
| `fp8_to_nvfp4_streaming.py` | **Path A.** Pure tensor-level streaming FP8 → NVFP4 conversion. No model loading, no calibration, weight-only. Low memory, fast, deterministic. Recommended for first run. |
| `quantize_llmcompressor.py` | **Path B.** `llm-compressor` oneshot with sequential pipeline + activation calibration. Produces W4A4 with calibrated activation scales. Higher quality on activation-sensitive ops but riskier given V4 is two weeks old. |
| `verify_nvfp4.py` | Loads the produced NVFP4 checkpoint, runs a basic forward pass through one block, checks for NaN/Inf, and dumps a few generated tokens via vLLM. |
## HARD RULES
## Hardware assumptions
- **NEVER convert DeepSeek MoE experts to MXFP4.** Experts stay in NVFP4. Period.
- **The checkpoint is UE4M3 (float8_e4m3fn), NOT UE8M0.** Never use shift-by-23 on these bytes.
- **Target `sm_100a`, not `sm_100`.** The `a` suffix is required for mxf4nvf4 instructions.
- 8× B200 baremetal, 1.5 TB HBM total
- 2.7 TB system RAM
- ≥10 TB free NVMe at `~/nvidia-meeting/`
## Prereqs
```bash
source ~/nvidia-meeting/venv/bin/activate
pip install --upgrade torch safetensors transformers tqdm
pip install --upgrade llmcompressor compressed-tensors # only needed for Path B
pip install --upgrade vllm # only needed for verify
```
You'll likely need `transformers` from source for V4 architecture support, and `trust_remote_code=True` everywhere. Stock pip versions may not load V4 yet.
## Recommended order tonight
```bash
cd ~/nvidia-meeting
# 1. Inspect the FP8 source — 30 seconds, no GPU needed.
python inspect_model.py DeepSeek-V4-Pro-FP8 | tee inspect.log
# 2. Path A streaming conversion — should run in 2-6 hours dominated by NVMe I/O.
python fp8_to_nvfp4_streaming.py \
--src DeepSeek-V4-Pro-FP8 \
--dst DeepSeek-V4-Pro-NVFP4-streaming \
--workers 8 \
2>&1 | tee path_a.log
# 3. Quick sanity check — does it load and forward-pass?
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming
# 4. Path B (overnight). Run only after Path A succeeds. 24-72 hours.
python quantize_llmcompressor.py \
--src DeepSeek-V4-Pro-FP8 \
--dst DeepSeek-V4-Pro-NVFP4-llmcompressor \
--num-samples 256 \
--max-seq-len 4096 \
2>&1 | tee path_b.log
```
## Path A — what it does
1. Reads `model.safetensors.index.json` to map every tensor to its shard.
2. Classifies every tensor:
- **Preserve** (copied bit-for-bit): `lm_head`, `embed_tokens`, MoE router gates (`*.mlp.gate`), all norms, V4-specific attention indexer/scoring tensors, mHC residual mixing weights.
- **Quantize**: any FP8 weight that has a corresponding `*.weight_scale_inv` companion (i.e. real GEMM weights).
3. For every quantizable weight:
- Dequantizes FP8 E4M3 → FP32 using the source's per-block scales (auto-detects 128×128 blocks).
- Computes NVFP4 dual scales: per-tensor `weight_scale_2 = amax / (6.0 * 448.0)` and per-16-element-block `weight_scale = block_amax / (6.0 * weight_scale_2)` cast to FP8 E4M3.
- Quantizes FP32 → E2M1 representable values `{0, ±0.5, ±1, ±1.5, ±2, ±3, ±4, ±6}`.
- Packs two 4-bit values per `uint8` byte.
4. **MoE pair handling**: detects `gate_proj` (w1) + `up_proj` (w3) of each expert and computes a joint `weight_scale_2` across both, since vLLM's fused MoE kernel requires them to share that global scale.
5. Streams output to new shards (~5 GB each) with a fresh `model.safetensors.index.json` and copies all non-tensor files (config, tokenizer, etc.) verbatim.
**This is weight-only NVFP4.** Activation quantization is not done here — you get W4A16 effective behavior at runtime unless your inference engine generates dynamic per-group activation scales. vLLM does generate per-group activation scales dynamically at inference, so this is fine for most use cases.
## Path B — what it does
1. Loads the FP8 model via `transformers` with `device_map="auto"` and the offload folder pointing at NVMe. With 2.7 TB RAM, the FP8 weights (~865 GB) sit in RAM; activations and per-layer BF16 promotion happen on the B200s.
2. Loads a calibration set (default 256 samples of `HuggingFaceH4/ultrachat_200k`).
3. Runs `llm-compressor` `oneshot` with `pipeline="sequential"` so only one transformer block is materialized in BF16 on GPU at a time.
4. `moe_calibrate_all_experts=True` ensures every routed expert gets calibration signal even when natural routing wouldn't pick it.
5. The recipe targets `Linear` with NVFP4 and the same ignore list as Path A (lm_head, embed, router gates, norms, indexer, mHC).
6. Saves with `save_compressed=True` in `compressed-tensors` format.
**The known risks for Path B on V4 specifically:**
- V4 architecture is brand new. `llm-compressor` may not have a registered MoE wrapper for V4 — you may need to call `replace_modules_for_calibration` with the actual V4 MoE class name (the script has a TODO and a fallback path).
- Sequential pipeline may not handle CSA/HCA hybrid attention if the attention forward isn't a simple linear chain. If you see weird offload errors during calibration, the indexer/scoring tensors are likely the culprit.
- Calibration cache for 256 routed experts × all V4 layers can be hundreds of GB. Watch `nvidia-smi` and `free -h` during the first 30 minutes.
## Things to discuss with the NVIDIA engineer
1. **NVFP4 packing convention.** My converter packs as `byte = elem0 | (elem1 << 4)` (low nibble first). Verify this matches what TensorRT-LLM / cutlass NVFP4 kernels expect. If reversed, just flip in `pack_fp4()`.
2. **Joint scaling extension.** I implement joint `weight_scale_2` for `gate_proj`/`up_proj` pairs. Ask whether `down_proj` also benefits, or whether all three experts in a fused MoE block should share — recipes have varied.
3. **mHC residual weights.** I preserve them in FP8/BF16 conservatively. If NVIDIA has actually quantized these somewhere internally, drop them out of the ignore list to recover memory.
4. **CSA + HCA indexer/scoring tensors.** I preserve these blindly based on the V3.2 DSA precedent. Ask whether V4's compressed-sparse / heavily-compressed attention has analogous "cannot quantize" tensors and what the canonical regex is.
5. **W4A4 vs W4A16 for V4 Pro.** Path A is W4A16-equivalent; Path B is W4A4. For a 1.6T MoE with extreme long-context, ask which is internally recommended for first deployment.
6. **`modelopt` vs `llm-compressor` for V4.** RedHat shipped V4-*Flash* NVFP4 via `llm-compressor`. Why not Pro yet? Find out if there's a known-bad layer or just compute time.
## Output sizes to expect
- FP8 source: ~865 GB
- Path A NVFP4 output: ~430470 GB (about 2× compression vs FP8 source; experts dominate, norms/embeds add a bit back)
- Path B NVFP4 output: similar, plus activation scale metadata
## Resumability
Path A is checkpoint-resumable per shard — if it dies mid-run, re-running picks up from the next unwritten output shard. Path B is **not** resumable mid-calibration; if it crashes you restart.

View File

@@ -1,35 +0,0 @@
services:
vllm:
#image: atl.vultrcr.com/vllm/vllm-dsv4-nvfp4:latest
build:
context: .
pull_policy: always
ports:
- "8000:8000"
environment:
- OMP_NUM_THREADS=128
#- VLLM_USE_FLASHINFER_MOE_FP4=1 # What the fuck is this!?
command:
- /model
- --trust-remote-code
#- --kv-cache-dtype=fp8 # maybe we just let it figure its own shit out
#- --block-size=256
- --enable-expert-parallel
- --tensor-parallel-size=8
- --enforce-eager
#- --compilation-config={"cudagraph_mode":"FULL_AND_PIECEWISE","custom_ops":["all"]}
#- --attention_config.use_fp4_indexer_cache=True
- --tokenizer-mode=deepseek_v4
#- --speculative_config={"method":"mtp","num_speculative_tokens":2}
- --host=0.0.0.0
- --port=8000
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
volumes:
- /root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4:/model:ro

548
fp8_to_nvfp4_streaming.py Normal file
View File

@@ -0,0 +1,548 @@
#!/usr/bin/env python3
"""Streaming FP8 → NVFP4 converter for DeepSeek V4 Pro (sgl-project FP8 repackage).
Path A: pure tensor-level conversion. No model loading via transformers, no
calibration. Reads FP8 safetensors shards, dequantizes per-block FP8 to FP32,
re-quantizes to NVFP4 (E2M1 packed in uint8 with FP8 E4M3 per-block scales and
an FP32 per-tensor global scale), and writes new shards.
Key behaviors:
- Joint global scale_2 across (gate_proj, up_proj) pairs of each expert,
required for vLLM fused MoE kernels.
- Preserves lm_head, embeddings, MoE router gates, norms, V4 indexer/scoring,
and mHC residual mixing weights at original precision.
- Streams shard-by-shard. Peak working memory is one tensor pair dequantized
to FP32 (a few hundred MB at most for the largest weights).
- Resumable per output shard.
NVFP4 format reference:
value = packed_fp4 * weight_scale * weight_scale_2
where:
packed_fp4: E2M1 in {0, ±0.5, ±1, ±1.5, ±2, ±3, ±4, ±6}, 2 per byte
weight_scale: FP8 E4M3, one per 16-element block
weight_scale_2: FP32 scalar per tensor, global
Usage:
python fp8_to_nvfp4_streaming.py \\
--src DeepSeek-V4-Pro-FP8 \\
--dst DeepSeek-V4-Pro-NVFP4-streaming \\
--workers 8
Optional:
--gpu N Use CUDA device N for the math (default: 0; -1 for CPU)
--shard-size-gb 5 Target output shard size
--dry-run Print what would be done; don't write
"""
import argparse
import json
import re
import shutil
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import torch
from safetensors import safe_open
from safetensors.torch import save_file
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Classification: which tensors do we quantize, which do we preserve?
# ---------------------------------------------------------------------------
# NVFP4-everything: only preserve 1D/non-weight tensors that can't be NVFP4
PRESERVE_REGEXES = [
r".*embed_tokens.*", # embeddings (kept in original precision)
r".*\.(mlp|ffn)\.gate(\.weight)?$", # MoE router (1D or small gate, not a GEMM weight)
r".*norm.*", # all norms (1D)
r".*indexer.*", # V4 CSA indexer (non-GEMM)
r".*scoring.*", # V4 scoring tensors
r".*attn_sink.*", # V4 attention sink (scalar/1D)
r".*compressor\.ape.*", # V4 compressor APE (1D)
r".*tid2eid.*", # V4 MoE token-to-expert mapping (1D)
r".*\.bias$", # any biases
r".*hc_attn_base.*", # V4 hyper-connection scalars
r".*hc_attn_fn.*",
r".*hc_ffn_base.*",
r".*hc_ffn_fn.*",
r".*hc_head_scale.*",
r".*compressor\.wgate\.weight$", # V4 compressor gate (small, preserve)
r".*compressor\.wkv\.weight$", # V4 compressor KV proj (small, preserve)
r".*indexer\.wq_b\.weight$", # V4 indexer projections (small, preserve)
r".*indexer\.wkv\.weight$",
r".*indexer\.compressor\.wkv\.weight$",
r".*indexer\.gate_proj\.weight$",
r".*indexer\.compressor\.wgate\.weight$",
r".*indexer\.q_b_proj\.weight$",
]
PRESERVE_RE = re.compile("|".join(f"(?:{p})" for p in PRESERVE_REGEXES))
# Identify expert pairs that need joint global scale
EXPERT_PAIR_RE = re.compile(r"(.*experts\.\d+)\.(w1|w3)\.weight$")
def is_preserve(name: str) -> bool:
return bool(PRESERVE_RE.match(name))
# ---------------------------------------------------------------------------
# FP8 dequantization (per-block)
# ---------------------------------------------------------------------------
def dequant_fp8_to_fp32(weight_fp8: torch.Tensor, scale_inv: torch.Tensor) -> torch.Tensor:
"""Dequantize a per-block FP8 E4M3 weight to FP32 using its inverse-scale tensor.
DeepSeek convention: weight_scale_inv stores the dequant scale (multiply by it
to recover FP32). Block size is inferred from shape ratios — typically 128x128.
"""
assert weight_fp8.dim() == 2, f"Expected 2D weight, got shape {weight_fp8.shape}"
M, N = weight_fp8.shape
if scale_inv.dim() == 0:
# Per-tensor scale
return weight_fp8.float() * scale_inv.float()
if scale_inv.dim() == 1:
# Per-row or per-col — unusual for DeepSeek but handle it
if scale_inv.numel() == M:
return weight_fp8.float() * scale_inv.float().unsqueeze(1)
if scale_inv.numel() == N:
return weight_fp8.float() * scale_inv.float().unsqueeze(0)
raise ValueError(f"Cannot align 1D scale_inv {scale_inv.shape} to weight {weight_fp8.shape}")
# 2D block scaling
sm, sn = scale_inv.shape
bm = (M + sm - 1) // sm
bn = (N + sn - 1) // sn
scale_full = scale_inv.float().repeat_interleave(bm, dim=0).repeat_interleave(bn, dim=1)
scale_full = scale_full[:M, :N]
return weight_fp8.float() * scale_full
# ---------------------------------------------------------------------------
# NVFP4 quantization
# ---------------------------------------------------------------------------
FP4_E2M1_VALUES = torch.tensor(
[0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0], dtype=torch.float32
)
# Boundaries between adjacent magnitudes (round-to-nearest with ties to even-ish)
FP4_BOUNDARIES = torch.tensor(
[0.25, 0.75, 1.25, 1.75, 2.5, 3.5, 5.0], dtype=torch.float32
)
FP4_MAX = 6.0
FP8_E4M3_MAX = 448.0
def round_to_fp4_e2m1_index(x: torch.Tensor) -> torch.Tensor:
"""Round x to nearest FP4 E2M1 representable, return 4-bit index in [0..15].
Index encoding: bit 3 = sign, bits 0..2 = magnitude index into FP4_E2M1_VALUES.
"""
sign = (x < 0).to(torch.uint8)
abs_x = x.abs().clamp_(max=FP4_MAX)
# searchsorted is fast on GPU; uses float32
boundaries = FP4_BOUNDARIES.to(x.device)
mag_idx = torch.searchsorted(boundaries, abs_x.contiguous()).to(torch.uint8)
return (sign << 3) | mag_idx
def quantize_to_nvfp4(
x_fp32: torch.Tensor,
scale_2: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
"""Quantize an FP32 weight to NVFP4 given a (possibly joint) global scale.
Args:
x_fp32: [M, N] FP32 tensor, N must be divisible by 16
scale_2: scalar FP32 tensor
Returns:
packed: [M, N//2] uint8, two FP4 values per byte (low nibble first)
weight_scale: [M, N//16] FP8 E4M3 per-block scales
"""
M, N = x_fp32.shape
if N % 16 != 0:
raise ValueError(f"NVFP4 requires N % 16 == 0; got {x_fp32.shape}")
# Per-block (16-element) amax
blocks = x_fp32.view(M, N // 16, 16)
block_amax = blocks.abs().amax(dim=-1) # [M, N//16]
# Per-block scale in FP32, then cast to FP8 E4M3 (this is the lossy step)
block_scale_fp32 = block_amax / (FP4_MAX * scale_2)
# Avoid zeros — produces NaN on dequant. Clamp tiny scales.
block_scale_fp32 = block_scale_fp32.clamp_(min=1e-30)
block_scale_fp8 = block_scale_fp32.to(torch.float8_e4m3fn)
# Recover the effective scale that the kernel will actually use
effective = scale_2 * block_scale_fp8.float() # [M, N//16]
# Quantize values: divide, clamp, round to E2M1
scaled = blocks / effective.unsqueeze(-1).clamp_(min=1e-30)
fp4_idx = round_to_fp4_e2m1_index(scaled) # [M, N//16, 16] uint8
fp4_idx = fp4_idx.view(M, N).contiguous()
# Pack two nibbles per byte: low = even-index element, high = odd-index element
low = fp4_idx[:, ::2]
high = fp4_idx[:, 1::2]
packed = (low | (high << 4)).to(torch.uint8)
return packed, block_scale_fp8
def compute_global_scale(*tensors_fp32: torch.Tensor) -> torch.Tensor:
"""Compute joint NVFP4 global scale_2 across one or more FP32 tensors.
scale_2 = amax / (FP4_MAX * FP8_E4M3_MAX)
"""
amax = torch.stack([t.abs().max() for t in tensors_fp32]).max()
scale_2 = amax / (FP4_MAX * FP8_E4M3_MAX)
# Avoid zero
return scale_2.clamp_(min=1e-30).float()
# ---------------------------------------------------------------------------
# Sharded output writer
# ---------------------------------------------------------------------------
class ShardedSafetensorsWriter:
"""Writes tensors to a sequence of safetensors shards, building an index map."""
def __init__(self, out_dir: Path, max_shard_bytes: int):
self.out_dir = out_dir
self.out_dir.mkdir(parents=True, exist_ok=True)
self.max_shard_bytes = max_shard_bytes
self.current = {} # name -> tensor (CPU)
self.current_bytes = 0
self.shard_idx = 0
self.weight_map: dict[str, str] = {} # name -> shard filename
self.shard_filenames: list[str] = []
def _flush(self):
if not self.current:
return
self.shard_idx += 1
# Use placeholder total; we'll rename at the end
fname = f"model-{self.shard_idx:05d}-of-PLACEHOLDER.safetensors"
path = self.out_dir / fname
save_file(self.current, str(path))
for name in self.current:
self.weight_map[name] = fname
self.shard_filenames.append(fname)
self.current.clear()
self.current_bytes = 0
def add(self, name: str, tensor: torch.Tensor):
# safetensors requires CPU tensors and contiguous
t = tensor.detach().cpu().contiguous()
size = t.numel() * t.element_size()
if self.current and self.current_bytes + size > self.max_shard_bytes:
self._flush()
self.current[name] = t
self.current_bytes += size
def close(self):
self._flush()
# Now rename shards to use proper of-N suffix
total = len(self.shard_filenames)
new_map = {}
for old_fname in self.shard_filenames:
idx = int(old_fname.split("-")[1])
new_fname = f"model-{idx:05d}-of-{total:05d}.safetensors"
(self.out_dir / old_fname).rename(self.out_dir / new_fname)
new_map[old_fname] = new_fname
# Patch weight_map
self.weight_map = {k: new_map[v] for k, v in self.weight_map.items()}
return self.weight_map
# ---------------------------------------------------------------------------
# Shard-level conversion plan
# ---------------------------------------------------------------------------
def build_plan(src_dir: Path):
"""Build the conversion plan from index.json.
Returns:
weight_map: name -> shard filename
shard_to_names: shard filename -> list of names in that shard
expert_pair_groups: list of (group_name, name_w1, name_w3)
For each expert, the gate_proj/up_proj pair gets a shared scale_2.
solo_quantize: list of names to quantize independently
preserve: list of names to copy unchanged
"""
with open(src_dir / "model.safetensors.index.json") as f:
index = json.load(f)
weight_map = index["weight_map"]
shard_to_names = defaultdict(list)
for name, fn in weight_map.items():
shard_to_names[fn].append(name)
# Gather all weight tensor names (those with .weight suffix)
all_weights = [n for n in weight_map if n.endswith(".weight")]
# Identify expert pairs
expert_pairs = defaultdict(dict) # base -> {"gate_proj": name, "up_proj": name}
for n in all_weights:
m = EXPERT_PAIR_RE.match(n)
if m:
base, kind = m.group(1), m.group(2)
expert_pairs[base][kind] = n
paired_names = set()
expert_pair_groups = []
for base, parts in expert_pairs.items():
if "w1" in parts and "w3" in parts:
expert_pair_groups.append((base, parts["w1"], parts["w3"]))
paired_names.add(parts["w1"])
paired_names.add(parts["w3"])
# Classify everything else
solo_quantize = []
preserve = []
scale_companions = [] # .scale tensors that get consumed during dequant
for n in weight_map:
if n.endswith(".scale") and n.replace(".scale", ".weight") in weight_map:
scale_companions.append(n)
continue
if n in paired_names:
continue
if is_preserve(n):
preserve.append(n)
continue
# Anything else with .weight gets quantized solo, otherwise preserved
if n.endswith(".weight"):
solo_quantize.append(n)
else:
preserve.append(n)
return {
"weight_map": weight_map,
"shard_to_names": dict(shard_to_names),
"expert_pair_groups": expert_pair_groups,
"solo_quantize": solo_quantize,
"preserve": preserve,
"scale_companions": scale_companions,
}
# ---------------------------------------------------------------------------
# Tensor loading helpers
# ---------------------------------------------------------------------------
class ShardCache:
"""Lazy per-shard safe_open cache so we don't re-open shards repeatedly."""
def __init__(self, src_dir: Path, max_open: int = 4):
self.src_dir = src_dir
self.max_open = max_open
self.handles: dict[str, "safe_open"] = {}
def get(self, shard_fname: str):
if shard_fname in self.handles:
return self.handles[shard_fname]
if len(self.handles) >= self.max_open:
# Drop one
old_fn = next(iter(self.handles))
self.handles[old_fn].__exit__(None, None, None)
del self.handles[old_fn]
h = safe_open(self.src_dir / shard_fname, framework="pt")
h.__enter__()
self.handles[shard_fname] = h
return h
def close(self):
for h in self.handles.values():
h.__exit__(None, None, None)
self.handles.clear()
def load_weight_and_scale(cache: ShardCache, weight_map, name):
"""Load an FP8 weight with its scale companion (if any)."""
weight = cache.get(weight_map[name]).get_tensor(name)
scale_name = name.replace(".weight", ".scale")
scale = None
if scale_name in weight_map:
try:
scale = cache.get(weight_map[scale_name]).get_tensor(scale_name)
except Exception:
# Scale listed in index but not in shard (BF16 weights have no scale)
pass
return weight, scale
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--src", required=True, help="Source FP8 model directory")
ap.add_argument("--dst", required=True, help="Output NVFP4 model directory")
ap.add_argument("--gpu", type=int, default=0, help="CUDA device, -1 for CPU")
ap.add_argument("--shard-size-gb", type=float, default=5.0)
ap.add_argument("--workers", type=int, default=4,
help="Concurrent tensor-conversion workers (lots of small tensors benefit; "
"actual GPU compute is serialized by torch)")
ap.add_argument("--dry-run", action="store_true")
args = ap.parse_args()
src = Path(args.src).resolve()
dst = Path(args.dst).resolve()
if not (src / "model.safetensors.index.json").exists():
sys.exit(f"No index.json at {src}")
device = torch.device(f"cuda:{args.gpu}" if args.gpu >= 0 and torch.cuda.is_available() else "cpu")
print(f"Compute device: {device}")
# Move FP4_BOUNDARIES to device once
global FP4_BOUNDARIES
FP4_BOUNDARIES = FP4_BOUNDARIES.to(device)
print("Building conversion plan...")
plan = build_plan(src)
n_pairs = len(plan["expert_pair_groups"])
n_solo = len(plan["solo_quantize"])
n_preserve = len(plan["preserve"])
n_scales = len(plan["scale_companions"])
print(f" Expert pair groups (joint scale_2): {n_pairs:,}")
print(f" Solo quantize tensors: {n_solo:,}")
print(f" Preserved tensors: {n_preserve:,}")
print(f" Scale companions consumed: {n_scales:,}")
if args.dry_run:
print("\nDry run — exiting before any writes.")
return
dst.mkdir(parents=True, exist_ok=True)
cache = ShardCache(src, max_open=8)
writer = ShardedSafetensorsWriter(dst, max_shard_bytes=int(args.shard_size_gb * 1024**3))
weight_map = plan["weight_map"]
t_start = time.time()
# ------------------------------------------------------------------
# 1. Preserved tensors — copy unchanged
# ------------------------------------------------------------------
for name in tqdm(plan["preserve"], desc="Preserve", unit="tensor"):
t = cache.get(weight_map[name]).get_tensor(name)
writer.add(name, t)
# ------------------------------------------------------------------
# 2. Expert pairs — joint scale_2 across (gate_proj, up_proj)
# ------------------------------------------------------------------
for base, name_w1, name_w3 in tqdm(plan["expert_pair_groups"], desc="Expert pairs", unit="pair"):
w1_fp8, s1 = load_weight_and_scale(cache, weight_map, name_w1)
w3_fp8, s3 = load_weight_and_scale(cache, weight_map, name_w3)
with torch.no_grad():
w1 = dequant_fp8_to_fp32(w1_fp8.to(device), s1.to(device)) if s1 is not None else w1_fp8.float().to(device)
w3 = dequant_fp8_to_fp32(w3_fp8.to(device), s3.to(device)) if s3 is not None else w3_fp8.float().to(device)
scale_2 = compute_global_scale(w1, w3)
packed1, blk1 = quantize_to_nvfp4(w1, scale_2)
packed3, blk3 = quantize_to_nvfp4(w3, scale_2)
writer.add(name_w1, packed1)
writer.add(name_w1.replace(".weight", ".weight_scale"), blk1)
writer.add(name_w1.replace(".weight", ".weight_scale_2"), scale_2)
writer.add(name_w3, packed3)
writer.add(name_w3.replace(".weight", ".weight_scale"), blk3)
writer.add(name_w3.replace(".weight", ".weight_scale_2"), scale_2)
# ------------------------------------------------------------------
# 3. Solo quantize tensors — independent scale_2 per tensor
# ------------------------------------------------------------------
for name in tqdm(plan["solo_quantize"], desc="Solo quantize", unit="tensor"):
w_fp8, s = load_weight_and_scale(cache, weight_map, name)
with torch.no_grad():
if s is not None:
w = dequant_fp8_to_fp32(w_fp8.to(device), s.to(device))
else:
# Already non-FP8 (e.g. BF16), just upcast
w = w_fp8.float().to(device)
scale_2 = compute_global_scale(w)
packed, blk = quantize_to_nvfp4(w, scale_2)
writer.add(name, packed)
writer.add(name.replace(".weight", ".weight_scale"), blk)
writer.add(name.replace(".weight", ".weight_scale_2"), scale_2)
# Finalize shards & index
final_weight_map = writer.close()
cache.close()
# ------------------------------------------------------------------
# 4. Write model.safetensors.index.json
# ------------------------------------------------------------------
total_size = sum(
(dst / fn).stat().st_size for fn in set(final_weight_map.values())
)
new_index = {
"metadata": {"total_size": total_size},
"weight_map": final_weight_map,
}
with open(dst / "model.safetensors.index.json", "w") as f:
json.dump(new_index, f, indent=2)
# ------------------------------------------------------------------
# 5. Copy non-tensor files (config, tokenizer, etc.)
# ------------------------------------------------------------------
for fname in src.iterdir():
if fname.is_dir():
# encoding/, inference/, assets/ — copy whole tree
dst_sub = dst / fname.name
if not dst_sub.exists():
shutil.copytree(fname, dst_sub)
continue
if fname.suffix == ".safetensors":
continue
if fname.name == "model.safetensors.index.json":
continue
shutil.copy2(fname, dst / fname.name)
# ------------------------------------------------------------------
# 6. Patch config.json with quantization metadata so loaders know
# ------------------------------------------------------------------
cfg_path = dst / "config.json"
if cfg_path.exists():
with open(cfg_path) as f:
cfg = json.load(f)
cfg["quantization_config"] = {
"quant_method": "compressed-tensors",
"format": "nvfp4-pack-quantized",
"config_groups": {
"group_0": {
"targets": ["Linear"],
"weights": {
"num_bits": 4,
"type": "float",
"strategy": "tensor_group",
"group_size": 16,
"symmetric": True,
},
}
},
"ignore": PRESERVE_REGEXES,
}
with open(cfg_path, "w") as f:
json.dump(cfg, f, indent=2)
elapsed = time.time() - t_start
print(f"\nDone in {elapsed/3600:.2f}h")
print(f"Output: {dst}")
print(f"Total size: {total_size/1024**3:.1f} GB across {len(set(final_weight_map.values()))} shards")
if __name__ == "__main__":
main()

173
inspect_model.py Normal file
View File

@@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""Inspect a DeepSeek FP8 model directory and report on tensor structure.
Usage: python inspect_model.py <model_dir>
Prints:
- Total tensor count and dtype histogram
- Sample of tensor names by category (lm_head, embeddings, attention, MoE experts, norms, etc.)
- FP8 block scaling structure (block size detection)
- MoE expert layer count and routing structure
- Any "unusual" tensors that need manual classification
"""
import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
from safetensors import safe_open
# Patterns we'd preserve (skip quantization on)
PRESERVE_PATTERNS = [
(re.compile(r".*lm_head.*"), "lm_head"),
(re.compile(r".*embed_tokens.*"), "embeddings"),
(re.compile(r".*\.mlp\.gate(\.weight)?$"), "moe_router_gate"),
(re.compile(r".*norm.*"), "normalization"),
(re.compile(r".*indexer.*"), "attention_indexer"), # V3.2 DSA / V4 CSA?
(re.compile(r".*hyper_conn.*"), "mhc_hyper_conn"), # V4 mHC
(re.compile(r".*mhc.*"), "mhc_other"),
(re.compile(r".*scoring.*"), "scoring"),
]
# Patterns for MoE expert weights (these are what we WILL quantize)
EXPERT_PATTERNS = [
(re.compile(r".*experts\.\d+\.gate_proj.*"), "expert_gate_proj"),
(re.compile(r".*experts\.\d+\.up_proj.*"), "expert_up_proj"),
(re.compile(r".*experts\.\d+\.down_proj.*"), "expert_down_proj"),
(re.compile(r".*shared_experts?\.gate_proj.*"), "shared_gate_proj"),
(re.compile(r".*shared_experts?\.up_proj.*"), "shared_up_proj"),
(re.compile(r".*shared_experts?\.down_proj.*"), "shared_down_proj"),
]
def categorize(name):
for pat, cat in PRESERVE_PATTERNS:
if pat.match(name):
return ("preserve", cat)
for pat, cat in EXPERT_PATTERNS:
if pat.match(name):
return ("quantize_expert", cat)
if name.endswith(".weight_scale_inv"):
return ("scale_metadata", "fp8_block_scale")
if name.endswith(".weight"):
return ("quantize_other", "linear_weight")
return ("other", "uncategorized")
def main():
ap = argparse.ArgumentParser()
ap.add_argument("model_dir")
ap.add_argument("--show-samples", type=int, default=5,
help="How many sample names to show per category")
args = ap.parse_args()
model_dir = Path(args.model_dir)
index_path = model_dir / "model.safetensors.index.json"
if not index_path.exists():
print(f"ERROR: {index_path} not found", file=sys.stderr)
sys.exit(1)
with open(index_path) as f:
index = json.load(f)
weight_map = index["weight_map"]
total_size = index.get("metadata", {}).get("total_size")
print(f"=== {model_dir} ===")
print(f"Total tensors: {len(weight_map):,}")
print(f"Total shards: {len(set(weight_map.values()))}")
if total_size:
print(f"Reported size: {total_size / 1024**3:.1f} GB")
print()
# Categorize names (cheap, no tensor loading)
categories = defaultdict(list)
for name in weight_map:
kind, cat = categorize(name)
categories[(kind, cat)].append(name)
print("=== Tensor categorization ===")
for (kind, cat), names in sorted(categories.items()):
print(f" [{kind:18s}] {cat:25s} count={len(names):,}")
for n in names[: args.show_samples]:
print(f" {n}")
if len(names) > args.show_samples:
print(f" ... and {len(names) - args.show_samples} more")
print()
# Inspect dtypes and FP8 block scaling on a sample shard
sample_shard = model_dir / sorted(set(weight_map.values()))[0]
print(f"=== Sampling dtypes from {sample_shard.name} ===")
dtype_hist = Counter()
fp8_block_sizes = Counter()
weight_with_scale = []
with safe_open(sample_shard, framework="pt") as f:
names_in_shard = list(f.keys())
for name in names_in_shard:
t = f.get_tensor(name)
dtype_hist[str(t.dtype)] += 1
# Check for FP8 weight + scale_inv pair
if name.endswith(".weight") and t.dtype.is_floating_point and t.element_size() == 1:
scale_name = name.replace(".weight", ".weight_scale_inv")
if scale_name in names_in_shard:
scale_t = f.get_tensor(scale_name)
bm = t.shape[0] / scale_t.shape[0] if scale_t.dim() == 2 else None
bn = t.shape[1] / scale_t.shape[1] if scale_t.dim() == 2 and t.dim() == 2 else None
fp8_block_sizes[(bm, bn)] += 1
if len(weight_with_scale) < 3:
weight_with_scale.append((name, t.shape, t.dtype, scale_t.shape, scale_t.dtype))
print(" Dtype histogram (this shard only):")
for d, c in dtype_hist.most_common():
print(f" {d:20s} {c:,}")
print()
print(" FP8 block-scale dimensions detected:")
for (bm, bn), c in fp8_block_sizes.most_common():
print(f" block_size = ({bm}, {bn}) count={c}")
print()
print(" Sample FP8 weight + scale_inv pairs:")
for name, wshape, wdt, sshape, sdt in weight_with_scale:
print(f" {name}")
print(f" weight: shape={tuple(wshape)} dtype={wdt}")
print(f" scale: shape={tuple(sshape)} dtype={sdt}")
# MoE structure summary
print()
print("=== MoE structure summary ===")
layer_experts = defaultdict(set)
for name in weight_map:
m = re.match(r".*layers\.(\d+)\..*experts\.(\d+)\..*", name)
if m:
layer_experts[int(m.group(1))].add(int(m.group(2)))
if layer_experts:
layer_count = len(layer_experts)
expert_counts = [len(v) for v in layer_experts.values()]
print(f" Layers with MoE experts: {layer_count}")
print(f" Experts per layer: min={min(expert_counts)} max={max(expert_counts)}")
print(f" Sample layer 0 experts: {sorted(list(layer_experts[min(layer_experts)]))[:5]}...")
else:
print(" No '.experts.N.' pattern found — MoE structure may use different naming.")
# Flag uncategorized for human review
print()
print("=== Uncategorized tensors (review these manually) ===")
uncat = categories.get(("other", "uncategorized"), [])
if uncat:
print(f" {len(uncat):,} tensors:")
for n in uncat[:20]:
print(f" {n}")
if len(uncat) > 20:
print(f" ... and {len(uncat) - 20} more")
else:
print(" None — every tensor matched a known pattern.")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -1,270 +0,0 @@
"""
NVFP4 staging kernel — full FP4 (E2M1) activations + UE4M3 block16 scales.
The mxf4nvf4 PTX instruction requires BOTH A and B to be FP4 (E2M1 packed).
This kernel quantizes BF16 activations → E2M1 packed uint8 with UE4M3 scales.
"""
import triton
import triton.language as tl
import torch
@triton.jit
def _deepseek_v4_stage_mega_moe_inputs_kernel(
hidden_states,
x_fp4, # uint8, shape (M, K//2) — E2M1 packed, 2 values per byte
x_sf, # int32, shape (M, K//64) — UE4M3 packed, 4 scales per int32
topk_ids,
topk_weights,
topk_idx_out,
topk_weights_out,
hidden_stride_m: tl.constexpr,
hidden_stride_k: tl.constexpr,
x_stride_m: tl.constexpr,
x_stride_k: tl.constexpr,
x_sf_stride_m: tl.constexpr,
x_sf_stride_k: tl.constexpr,
topk_ids_stride_m: tl.constexpr,
topk_ids_stride_k: tl.constexpr,
topk_weights_stride_m: tl.constexpr,
topk_weights_stride_k: tl.constexpr,
topk_idx_stride_m: tl.constexpr,
topk_idx_stride_k: tl.constexpr,
topk_weights_out_stride_m: tl.constexpr,
topk_weights_out_stride_k: tl.constexpr,
hidden_size: tl.constexpr,
top_k: tl.constexpr,
BLOCK_K: tl.constexpr, # 128 elements (loaded from hidden)
GROUP_K: tl.constexpr, # 16 (NVFP4 group_size)
BLOCK_TOPK: tl.constexpr,
) -> None:
token_id = tl.program_id(0)
k_block_id = tl.program_id(1)
k_offsets = k_block_id * BLOCK_K + tl.arange(0, BLOCK_K)
k_mask = k_offsets < hidden_size
hidden = tl.load(
hidden_states + token_id * hidden_stride_m + k_offsets * hidden_stride_k,
mask=k_mask,
other=0.0,
).to(tl.float32)
num_groups: tl.constexpr = BLOCK_K // GROUP_K # 8
hidden_groups = tl.reshape(hidden, [num_groups, GROUP_K])
abs_groups = tl.reshape(tl.abs(hidden), [num_groups, GROUP_K])
amax = tl.max(abs_groups, axis=1)
amax = tl.maximum(amax, 1.0e-4)
# ---- UE4M3 scale computation ----
# scale = amax / 6.0 (E2M1 max value = 6)
# Then quantize scale to UE4M3 format
scale = amax / 6.0
scale_bits = scale.to(tl.uint32, bitcast=True)
scale_exp = (scale_bits >> 23) & 0xFF
scale_mant = scale_bits & 0x7FFFFF
# Convert FP32 → E4M3 manually (with subnormal support)
# FP32 bias=127, E4M3 bias=7 → raw exp = scale_exp - 120
e4m3_exp_raw = scale_exp - 120 # can be negative → subnormal
# Normal path: exp >= 1, just truncate mantissa top 3 bits
# RNE rounding: need guard (bit 19), sticky (OR of bits 18:0), and LSB of result
normal_mant = scale_mant >> 20
guard_bit = (scale_mant >> 19) & 1
sticky_bit = tl.where((scale_mant & 0x7FFFF) != 0, 1, 0) # OR of bits [18:0]
result_lsb = normal_mant & 1
# RNE: round up if (guard=1 and sticky=1) or (guard=1 and sticky=0 and lsb=1)
round_up = guard_bit & (sticky_bit | result_lsb)
normal_mant = normal_mant + round_up
normal_exp = e4m3_exp_raw
# Subnormal path: exp_raw <= 0
# Insert implicit leading 1 and right-shift by (1 - exp_raw)
# E4M3 subnormal: value = (mant/8) * 2^(1-7) = (mant/8) * 2^-6
# So we need: (1 + mant_fp32/2^23) * 2^(exp_raw - 7) = (shifted_mant/8) * 2^-6
# shifted_mant = (implicit_1 | mant_fp32) >> (1 - exp_raw - 1) then take top 3 bits
shift = 1 - e4m3_exp_raw # positive when subnormal
mant_with_leading = (0x800000 | scale_mant) # insert implicit 1
# Right-shift to get into the 3-bit E4M3 mantissa window
# We want bits [shift+19 : shift+23) of mant_with_leading for 3 mantissa bits + 1 round bit
subnormal_mant = (mant_with_leading >> (shift.to(tl.int32) + 20)) & 0x7
sub_guard_bit = (mant_with_leading >> (shift.to(tl.int32) + 19)) & 1
# Sticky: OR of all bits below the guard bit in the shifted result
# shift ≤ 8 in practice (amax floor = 1e-4 → scale ≈ 2^-15 → exp_raw ≈ -7), so mask ≤ 2^27
sub_sticky_mask = (1 << (shift.to(tl.int32) + 19)) - 1
sub_sticky_bit = tl.where((mant_with_leading & sub_sticky_mask) != 0, 1, 0)
sub_result_lsb = subnormal_mant & 1
sub_round_up = sub_guard_bit & (sub_sticky_bit | sub_result_lsb)
subnormal_mant = subnormal_mant + sub_round_up
is_normal = e4m3_exp_raw >= 1
e4m3_mant = tl.where(is_normal, normal_mant, subnormal_mant)
e4m3_exp = tl.where(is_normal, normal_exp, 0) # exp=0 for subnormals
# Handle mantissa overflow after rounding
overflow = e4m3_mant >= 8
e4m3_mant = tl.where(overflow, 0, e4m3_mant)
e4m3_exp = tl.where(overflow, e4m3_exp + 1, e4m3_exp)
e4m3_exp = tl.maximum(e4m3_exp, 0)
e4m3_exp = tl.minimum(e4m3_exp, 15)
scale_e4m3_bits = (e4m3_exp << 3) | e4m3_mant
# Reconstruct dequantized scale by decoding the STORED E4M3 bits.
# This guarantees the E2M1 quantization divides by exactly the value
# the CUDA kernel will multiply back — same bits, single decode, no
# possibility of encode/decode disagreement.
stored_exp = (scale_e4m3_bits >> 3) & 0xF
stored_mant = scale_e4m3_bits & 0x7
e4m3_exp_for_recon = tl.maximum(stored_exp.to(tl.int32) - 7, -126)
two_pow_exp_bits = (e4m3_exp_for_recon + 127).to(tl.uint32) << 23
two_pow_exp = two_pow_exp_bits.to(tl.float32, bitcast=True)
normal_value = (1.0 + stored_mant.to(tl.float32) / 8.0) * two_pow_exp
subnormal_value = (stored_mant.to(tl.float32) / 8.0) * 0.015625
e4m3_value = tl.where(stored_exp == 0, subnormal_value, normal_value)
# ---- E2M1 FP4 quantization (unpacked, 1 byte/element) ----
# E2M1 LUT (unsigned): [0, 0.5, 1, 1.5, 2, 3, 4, 6]
# Nearest-neighbor using thresholds (midpoints between consecutive values)
scaled = hidden_groups * (1.0 / tl.maximum(e4m3_value, 1e-6))[:, None]
# Clamp to E2M1 range [-6, 6]
scaled = tl.maximum(scaled, -6.0)
scaled = tl.minimum(scaled, 6.0)
abs_s = tl.abs(scaled)
# Thresholds: midpoints between [0, 0.5, 1, 1.5, 2, 3, 4, 6]
# [0, 0.25, 0.75, 1.25, 1.75, 2.5, 3.5, 5.0, INF]
e2m1_idx = tl.where(abs_s < 0.25, 0,
tl.where(abs_s < 0.75, 1,
tl.where(abs_s < 1.25, 2,
tl.where(abs_s < 1.75, 3,
tl.where(abs_s < 2.5, 4,
tl.where(abs_s < 3.5, 5,
tl.where(abs_s < 5.0, 6, 7)))))))
sign_bit = (scaled < 0).to(tl.int32)
e2m1_4bit = (sign_bit << 3) | e2m1_idx # 4-bit: (sign << 3) | index
# Pack E2M1 pairs into single bytes (2 per byte, low nibble first)
# mxf4nvf4 reads FP4 packed from SMEM — must match kernel's TMA layout
e2m1_flat = tl.reshape(e2m1_4bit, [BLOCK_K])
e2m1_lo = e2m1_flat[0::2] # even indices → low nibble
e2m1_hi = e2m1_flat[1::2] # odd indices → high nibble
e2m1_packed = (e2m1_hi << 4 | e2m1_lo).to(tl.uint8) # [BLOCK_K // 2]
k_offsets_out = k_block_id * (BLOCK_K // 2) + tl.arange(0, BLOCK_K // 2)
k_mask_out = k_offsets_out < (hidden_size // 2)
tl.store(
x_fp4 + token_id * x_stride_m + k_offsets_out * x_stride_k,
e2m1_packed,
mask=k_mask_out,
)
# Pack UE4M3 bytes into int32 (NVFP4: group_size=16, 4 groups per 64 elements)
# 8 groups per k_block of 128 → 2 int32s per k_block
# int32 can only pack 4 bytes (shifts >= 32 are UB), so split into two packs
scale_offsets = tl.arange(0, num_groups) # [0..7]
first_half = scale_offsets < 4 # groups 0-3 → int32[0]
second_half = scale_offsets >= 4 # groups 4-7 → int32[1]
packed_lo = tl.sum(
tl.where(first_half, scale_e4m3_bits.to(tl.int32) << (scale_offsets * 8), 0),
axis=0,
).to(tl.int32)
packed_hi = tl.sum(
tl.where(second_half, scale_e4m3_bits.to(tl.int32) << ((scale_offsets - 4) * 8), 0),
axis=0,
).to(tl.int32)
# Write 2 int32s per k_block: x_sf shape is (M, K//64) = (M, num_k_blocks * 2)
sf_base = token_id * x_sf_stride_m + k_block_id * 2 * x_sf_stride_k
tl.store(x_sf + sf_base, packed_lo)
tl.store(x_sf + sf_base + x_sf_stride_k, packed_hi)
if k_block_id == 0:
topk_offsets = tl.arange(0, BLOCK_TOPK)
topk_mask = topk_offsets < top_k
ids = tl.load(
topk_ids + token_id * topk_ids_stride_m + topk_offsets * topk_ids_stride_k,
mask=topk_mask,
other=0,
).to(tl.int64)
tl.store(
topk_idx_out
+ token_id * topk_idx_stride_m
+ topk_offsets * topk_idx_stride_k,
ids,
mask=topk_mask,
)
weights = tl.load(
topk_weights
+ token_id * topk_weights_stride_m
+ topk_offsets * topk_weights_stride_k,
mask=topk_mask,
other=0.0,
)
tl.store(
topk_weights_out
+ token_id * topk_weights_out_stride_m
+ topk_offsets * topk_weights_out_stride_k,
weights,
mask=topk_mask,
)
def _stage_deepseek_v4_mega_moe_inputs(
hidden_states: torch.Tensor,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
x_fp4: torch.Tensor, # uint8, shape (M, K//2)
x_sf: torch.Tensor, # int32, shape (M, K//64)
topk_idx_out: torch.Tensor,
topk_weights_out: torch.Tensor,
) -> None:
num_tokens, hidden_size = hidden_states.shape
if num_tokens == 0:
return
if hidden_size % 128 != 0:
raise ValueError(
"DeepSeek V4 MegaMoE input staging requires hidden_size to be "
"a multiple of 128."
)
top_k = topk_ids.shape[1]
if topk_weights.shape != topk_ids.shape:
raise ValueError(
"DeepSeek V4 MegaMoE input staging requires topk_weights and "
"topk_ids to have the same shape."
)
block_k = 128
grid = (num_tokens, triton.cdiv(hidden_size, block_k))
block_topk = triton.next_power_of_2(top_k)
_deepseek_v4_stage_mega_moe_inputs_kernel[grid](
hidden_states,
x_fp4,
x_sf,
topk_ids,
topk_weights,
topk_idx_out,
topk_weights_out,
hidden_states.stride(0),
hidden_states.stride(1),
x_fp4.stride(0),
x_fp4.stride(1),
x_sf.stride(0),
x_sf.stride(1),
topk_ids.stride(0),
topk_ids.stride(1),
topk_weights.stride(0),
topk_weights.stride(1),
topk_idx_out.stride(0),
topk_idx_out.stride(1),
topk_weights_out.stride(0),
topk_weights_out.stride(1),
hidden_size,
top_k,
BLOCK_K=block_k,
GROUP_K=16, # NVFP4: group_size=16 (scale_vec::4X)
BLOCK_TOPK=block_topk,
num_warps=4,
)

218
quantize_llmcompressor.py Normal file
View File

@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""Path B: llm-compressor oneshot NVFP4 quantization for DeepSeek V4 Pro.
Uses sequential pipeline + activation calibration to produce W4A4 NVFP4 with
calibrated activation global scales. Higher quality than the streaming converter
on activation-sensitive ops, at the cost of much longer wall time and more
fragility on a brand-new architecture.
Memory plan with 2.7 TB host RAM + 8x B200 (1.5 TB HBM):
- FP8 base resident in CPU RAM: ~865 GB
- One transformer block on GPU at a time: ~10-30 GB HBM
- Activation calibration cache: tens to a few hundred GB
- Headroom: ~1.5+ TB RAM, ~1.4+ TB HBM
Critical: this loads the model with trust_remote_code=True. V4 architecture is
brand new; expect to need:
- transformers from source (or recent main)
- llm-compressor from source
- The V4 modeling code in DeepSeek-V4-Pro-FP8/inference/ to be importable
Usage:
python quantize_llmcompressor.py \\
--src DeepSeek-V4-Pro-FP8 \\
--dst DeepSeek-V4-Pro-NVFP4-llmcompressor \\
--num-samples 256 \\
--max-seq-len 4096
"""
import argparse
import os
import sys
from pathlib import Path
import torch
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--src", required=True, help="Source FP8 model directory")
ap.add_argument("--dst", required=True, help="Output NVFP4 model directory")
ap.add_argument("--num-samples", type=int, default=256)
ap.add_argument("--max-seq-len", type=int, default=4096)
ap.add_argument("--calibration-dataset", default="HuggingFaceH4/ultrachat_200k")
ap.add_argument(
"--offload-folder", default="/root/nvidia-meeting/.offload",
help="NVMe folder for accelerate disk-offload spillover (rarely needed at 2.7TB RAM)",
)
ap.add_argument(
"--no-activation-quant", action="store_true",
help="Quantize weights only (no activation calibration). Faster, closer to Path A."
)
args = ap.parse_args()
src = Path(args.src).resolve()
dst = Path(args.dst).resolve()
if not (src / "config.json").exists():
sys.exit(f"No config.json at {src}")
Path(args.offload_folder).mkdir(parents=True, exist_ok=True)
# Heavy imports happen here so --help is fast
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
from llmcompressor import oneshot
from llmcompressor.modifiers.quantization import QuantizationModifier
# ----------------------------------------------------------------------
# 1. Load model
# ----------------------------------------------------------------------
print(f"Loading {src} ...")
print(" This will take several minutes — FP8 base is ~865 GB.")
# We want FP8 weights to stay as FP8 on CPU and only be promoted to BF16
# when each block goes to GPU during sequential calibration. The exact
# behavior depends on transformers' V4 modeling code — if it auto-dequants
# on load, expect 3.2 TB BF16 in RAM and you'll spill. Watch `free -h`.
tokenizer = AutoTokenizer.from_pretrained(src, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
src,
torch_dtype="auto",
device_map="cpu", # all on CPU; sequential pipeline moves blocks to GPU
trust_remote_code=True,
offload_folder=args.offload_folder,
)
print(f" Model class: {type(model).__name__}")
print(f" Param count: {sum(p.numel() for p in model.parameters()):,}")
# ----------------------------------------------------------------------
# 2. MoE handling — replace_modules_for_calibration
# ----------------------------------------------------------------------
# On Llama4/Qwen3-MoE, llm-compressor needs a wrapper class that exposes
# every expert during calibration (otherwise routed-only experts never see
# data). For DeepSeek V4 the MoE class name is something like
# `DeepseekV4MoE`. Try the canonical entrypoint first; fall back gracefully.
try:
from llmcompressor.modeling import replace_modules_for_calibration
print("Replacing MoE modules for calibration...")
replace_modules_for_calibration(model)
except ImportError:
print("WARN: replace_modules_for_calibration not available in this "
"llm-compressor version. Routed-only experts may not see "
"calibration data, lowering NVFP4 quality on rare experts.")
except Exception as e:
print(f"WARN: replace_modules_for_calibration failed: {e}")
print(" You may need to register a custom MoE wrapper for V4. "
"Find the MoE class name in DeepSeek-V4-Pro-FP8/inference/ and "
"register it via llmcompressor.modeling.register_module_replacement.")
# ----------------------------------------------------------------------
# 3. Calibration dataset
# ----------------------------------------------------------------------
print(f"Loading calibration dataset {args.calibration_dataset} ...")
ds = load_dataset(args.calibration_dataset, split="train_sft")
ds = ds.shuffle(seed=42).select(range(args.num_samples))
def preprocess(example):
# Use the model's chat template if it has one; ultrachat samples have a
# 'messages' field already in the OpenAI shape.
if "messages" in example:
try:
text = tokenizer.apply_chat_template(
example["messages"], tokenize=False, add_generation_prompt=False
)
except Exception:
text = "\n".join(m.get("content", "") for m in example["messages"])
else:
text = example.get("text") or example.get("prompt") or ""
return {"text": text}
ds = ds.map(preprocess, remove_columns=ds.column_names)
def tokenize(example):
return tokenizer(
example["text"],
truncation=True,
max_length=args.max_seq_len,
padding=False,
return_tensors=None,
)
ds = ds.map(tokenize, remove_columns=["text"])
# ----------------------------------------------------------------------
# 4. Recipe
# ----------------------------------------------------------------------
# NVFP4 W4A4 by default. The ignore list mirrors Path A's preserve list:
# output head, embeddings, MoE router gates (NOT gate_proj!), norms, and
# V4-specific attention indexer / mHC residual mixing weights.
ignore = [
"re:.*lm_head",
"re:.*embed_tokens$",
"re:.*\\.mlp\\.gate$",
"re:.*\\.mlp\\.gate\\.weight$",
"re:.*norm.*",
"re:.*indexer.*",
"re:.*hyper_conn.*",
"re:.*\\.mhc.*",
"re:.*scoring.*",
]
if args.no_activation_quant:
print("Recipe: NVFP4 weight-only (W4A16 effective)")
recipe = QuantizationModifier(
targets="Linear",
scheme="NVFP4A16", # weight-only variant
ignore=ignore,
)
else:
print("Recipe: NVFP4 W4A4 with activation calibration")
recipe = QuantizationModifier(
targets="Linear",
scheme="NVFP4",
ignore=ignore,
)
# ----------------------------------------------------------------------
# 5. Run oneshot — sequential pipeline is the key for memory
# ----------------------------------------------------------------------
print("Starting oneshot calibration + quantization (this is the long part)...")
print(f" num_samples={args.num_samples}, max_seq_len={args.max_seq_len}")
print(f" Watch with: watch -n 5 'free -h && nvidia-smi --query-gpu=memory.used,memory.free --format=csv'")
oneshot(
model=model,
dataset=ds,
recipe=recipe,
max_seq_length=args.max_seq_len,
num_calibration_samples=args.num_samples,
# Sequential pipeline: one block at a time on GPU, rest on CPU.
pipeline="sequential",
# Calibrate every expert, even routed-only ones that wouldn't see traffic.
moe_calibrate_all_experts=True,
)
# ----------------------------------------------------------------------
# 6. Save compressed
# ----------------------------------------------------------------------
print(f"Saving compressed checkpoint to {dst} ...")
dst.mkdir(parents=True, exist_ok=True)
model.save_pretrained(str(dst), save_compressed=True)
tokenizer.save_pretrained(str(dst))
# Copy any extra files that save_pretrained doesn't (encoding/, inference/, PDF)
import shutil
for fname in src.iterdir():
if fname.is_dir() and fname.name in {"encoding", "inference", "assets"}:
dst_sub = dst / fname.name
if not dst_sub.exists():
shutil.copytree(fname, dst_sub)
elif fname.suffix in {".pdf", ".md"} and not (dst / fname.name).exists():
shutil.copy2(fname, dst / fname.name)
print("Done.")
print(f"Output: {dst}")
if __name__ == "__main__":
main()

View File

@@ -1,7 +0,0 @@
compressed-tensors<0.15.0
nvidia-modelopt[hf]
fire
flash-attn>=2.6.0
transformers<5.0
transformers_stream_generator
zstandard

View File

@@ -1,276 +0,0 @@
#!/usr/bin/env python3
"""
Complete dequantization of DeepSeek V4 Pro mixed-precision to pure BF16.
Handles ALL compressed tensor types found in the mixed-precision model:
1. FP8 attention weights (float8_e4m3fn + float8_e8m0fnu block scales)
- weight × scale_expanded → BF16
- 128×128 block quantization
2. FP4 (E2M1) expert weights (int8 packed + float8_e8m0fnu block scales)
- Unpack 2 FP4 values per int8 byte (lower nibble first, upper second)
- Dequantize via E2M1 LUT lookup × scale_expanded → BF16
- Per-row, 32-column block scaling (MXFP4 microscaling format)
- Output dimensions are 2× the stored dimensions
- Verified: nibble index 0 vs 8 ratio = 0.996 (FP4 -0.0 vs +0.0),
NOT INT4 where index 8 = -8 would be rare
3. FP8 shared expert weights (float8_e4m3fn + float8_e8m0fnu block scales)
- Same as FP8 attention dequantization
After dequantization, all weights are pure BF16. FP8Linear.forward() sees
element_size() > 1 and falls back to F.linear(), avoiding broken FP8 kernels
on Blackwell GPUs. The model can then be loaded by modelopt without shape
mismatches.
"""
import os, glob, json, shutil, sys, time
from safetensors import safe_open
from safetensors.torch import save_file
import torch
FP8_WEIGHT_DTYPE = torch.float8_e4m3fn
FP8_SCALE_DTYPE = torch.float8_e8m0fnu
BLOCK_SIZE_FP8 = (128, 128)
FP4_BLOCK_SIZE = 32 # columns per scale value for MXFP4 expert weights
# E2M1 FP4 lookup table (MXFP4 microscaling format)
# Index 0-7: positive values (sign=0, 2-bit exp, 1-bit mantissa)
# Index 8-15: negative values (sign=1)
# Mapping: 0→0, 1→0.5, 2→1, 3→1.5, 4→2, 5→3, 6→4, 7→6
FP4_E2M1_LUT = torch.tensor([
0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0,
-0.0, -0.5, -1.0, -1.5, -2.0, -3.0, -4.0, -6.0,
], dtype=torch.float32)
def dequantize_fp8_weight(fp8_weight: torch.Tensor, scale: torch.Tensor) -> torch.Tensor:
"""Dequantize block-wise FP8 weight to BF16.
fp8_weight: (out_features, in_features) float8_e4m3fn
scale: (out_features//128, in_features//128) float8_e8m0fnu
"""
scale_f32 = scale.float()
out_features, in_features = fp8_weight.shape
scale_expanded = scale_f32.repeat_interleave(BLOCK_SIZE_FP8[0], dim=0).repeat_interleave(BLOCK_SIZE_FP8[1], dim=1)
scale_expanded = scale_expanded[:out_features, :in_features]
weight_bf16 = fp8_weight.float() * scale_expanded
return weight_bf16.to(torch.bfloat16)
def dequantize_fp4_weight(int8_packed: torch.Tensor, scale: torch.Tensor) -> torch.Tensor:
"""Dequantize MXFP4 (E2M1) expert weight to BF16.
FP4 values are packed 2-per-byte into int8 tensors.
Lower nibble (bits 0-3) is the first value, upper nibble (bits 4-7) is the second.
E2M1 format: 1 sign + 2 exponent + 1 mantissa bit.
Scale is per-row with 32-column blocks (float8_e8m0fnu, MX microscaling).
Output dimensions are 2× the stored dimensions.
int8_packed: (out_features, in_features//2) int8
scale: (out_features, in_features//32) float8_e8m0fnu
returns: (out_features, in_features) bfloat16
"""
lut = FP4_E2M1_LUT.to(int8_packed.device)
# Unpack nibble indices
lower_idx = (int8_packed & 0x0F).long() # 0-15
upper_idx = ((int8_packed >> 4) & 0x0F).long() # 0-15
# LUT lookup
lower = lut[lower_idx] # float32
upper = lut[upper_idx] # float32
out_features = int8_packed.shape[0]
in_features_full = int8_packed.shape[1] * 2 # 2× expansion
# Expand scale: (out_features, in_features//32) → (out_features, in_features)
scale_f32 = scale.float()
scale_expanded = scale_f32.repeat_interleave(FP4_BLOCK_SIZE, dim=1)
scale_expanded = scale_expanded[:, :in_features_full]
# Interleave lower and upper nibbles
unpacked = torch.empty(out_features, in_features_full, dtype=torch.float32, device=int8_packed.device)
unpacked[:, 0::2] = lower
unpacked[:, 1::2] = upper
# Dequantize: FP4 value × E8M0 scale
bf16_weight = (unpacked * scale_expanded).to(torch.bfloat16)
return bf16_weight
def dequantize_model(model_dir: str, out_dir: str):
os.makedirs(out_dir, exist_ok=True)
# Copy non-safetensor files
print("Copying metadata files...")
for f in os.listdir(model_dir):
fp = os.path.join(model_dir, f)
if not f.endswith(".safetensors") and os.path.isfile(fp):
shutil.copy2(fp, os.path.join(out_dir, f))
print(f" Copied {f}")
safetensor_files = sorted(glob.glob(os.path.join(model_dir, "*.safetensors")))
total_shards = len(safetensor_files)
print(f"Found {total_shards} shards")
# First pass: build scale-key → weight-key mapping
print("\nScanning for weight+scale pairs...")
scale_to_weight = {}
for f in safetensor_files:
with safe_open(f, framework="pt") as sf:
for key in sf.keys():
if key.endswith(".scale"):
weight_key = key[:-len(".scale")] + ".weight"
scale_to_weight[key] = weight_key
weight_to_scale = {v: k for k, v in scale_to_weight.items()}
print(f"Found {len(scale_to_weight)} weight+scale pairs")
# Classify weights by type (sample first 2 shards)
fp4_weight_keys = set()
fp8_weight_keys = set()
scale_keys = set(scale_to_weight.keys())
for f in safetensor_files[:2]:
with safe_open(f, framework="pt") as sf:
for key in sf.keys():
if key in weight_to_scale:
t = sf.get_tensor(key)
if t.dtype == torch.int8:
fp4_weight_keys.add(key)
elif t.dtype == FP8_WEIGHT_DTYPE:
fp8_weight_keys.add(key)
print(f" FP4 (E2M1) expert weights (packed): ~{len(fp4_weight_keys)} per shard")
print(f" FP8 attention/shared-expert weights: ~{len(fp8_weight_keys)} per shard")
# Second pass: dequantize and save
stats = {"fp4_dequantized": 0, "fp8_dequantized": 0, "scales_removed": 0, "unchanged": 0}
start_time = time.time()
for i, f in enumerate(safetensor_files):
shard_start = time.time()
tensors = {}
scales_in_shard = {}
with safe_open(f, framework="pt") as sf:
keys = list(sf.keys())
# First: collect scales
for key in keys:
if key in scale_keys:
t = sf.get_tensor(key)
scales_in_shard[key] = t
# Second: process weights and other tensors
for key in keys:
if key in scale_keys:
continue # handled separately
t = sf.get_tensor(key)
if key in weight_to_scale and t.dtype == torch.int8:
# FP4 (E2M1) packed expert weight (MXFP4 microscaling)
scale_key = weight_to_scale[key]
scale = scales_in_shard.get(scale_key)
if scale is None:
print(f" WARNING: scale {scale_key} not in same shard as {key}")
tensors[key] = t # keep as-is
continue
bf16 = dequantize_fp4_weight(t, scale)
tensors[key] = bf16
stats["fp4_dequantized"] += 1
del scales_in_shard[scale_key]
stats["scales_removed"] += 1
elif key in weight_to_scale and t.dtype == FP8_WEIGHT_DTYPE:
# FP8 weight (attention or shared expert)
scale_key = weight_to_scale[key]
scale = scales_in_shard.get(scale_key)
if scale is None:
print(f" WARNING: scale {scale_key} not in same shard as {key}")
tensors[key] = t
continue
bf16 = dequantize_fp8_weight(t, scale)
tensors[key] = bf16
stats["fp8_dequantized"] += 1
del scales_in_shard[scale_key]
stats["scales_removed"] += 1
else:
# Regular tensor (BF16, FP32, int64, etc.) - keep as-is
tensors[key] = t
stats["unchanged"] += 1
# Remove unused scales
for sk in scales_in_shard:
stats["scales_removed"] += 1
out_path = os.path.join(out_dir, os.path.basename(f))
if os.path.exists(out_path) and os.path.getsize(out_path) > 0:
# Resume: skip already-dequantized shards
print(f"[{i+1}/{total_shards}] Skipping (already done): {os.path.basename(f)}")
del tensors, scales_in_shard
continue
save_file(tensors, out_path)
shard_time = time.time() - shard_start
elapsed = time.time() - start_time
rate = (i + 1) / elapsed if elapsed > 0 else 0
eta = (total_shards - i - 1) / rate if rate > 0 else 0
print(f"[{i+1}/{total_shards}] {os.path.basename(f)} "
f"({stats['fp4_dequantized']} fp4, {stats['fp8_dequantized']} fp8, "
f"{stats['scales_removed']} scales rm) "
f"[{shard_time:.1f}s, ETA: {eta/60:.0f}min]")
del tensors, scales_in_shard
# Update config
cfg_path = os.path.join(out_dir, "config.json")
if os.path.exists(cfg_path):
cfg = json.load(open(cfg_path))
cfg["torch_dtype"] = "bfloat16"
cfg["_experts_implementation"] = "eager"
if "quantization_config" in cfg:
del cfg["quantization_config"]
json.dump(cfg, open(cfg_path, "w"), indent=2)
print(f"\nUpdated config.json: torch_dtype=bfloat16, _experts_implementation=eager")
total_time = time.time() - start_time
print(f"\nDone in {total_time/60:.1f} minutes!")
print(f" FP4 expert weights dequantized: {stats['fp4_dequantized']}")
print(f" FP8 weights dequantized: {stats['fp8_dequantized']}")
print(f" Scale tensors removed: {stats['scales_removed']}")
print(f" Unchanged tensors: {stats['unchanged']}")
# Verify no FP8/INT8 remaining
print("\nVerifying...")
remaining_compressed = 0
for f in sorted(glob.glob(os.path.join(out_dir, "*.safetensors")))[:5]:
with safe_open(f, framework="pt") as sf:
for key in sf.keys():
t = sf.get_tensor(key)
if t.dtype in (torch.float8_e8m0fnu, torch.float8_e4m3fn, torch.int8):
remaining_compressed += 1
if remaining_compressed <= 5:
print(f" REMAINING: {key} {t.dtype} {t.shape}")
if remaining_compressed == 0:
print(" ✅ No compressed tensors remaining — model is pure BF16!")
else:
print(f" ⚠️ {remaining_compressed} compressed tensors still present")
out_size = sum(os.path.getsize(os.path.join(out_dir, f)) for f in os.listdir(out_dir) if f.endswith(".safetensors"))
print(f"Output size: {out_size / 1e12:.2f} TB")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Complete dequantization of DeepSeek V4 Pro to BF16")
parser.add_argument("model_dir", help="Path to mixed-precision model")
parser.add_argument("out_dir", help="Path to write dequantized BF16 model")
args = parser.parse_args()
dequantize_model(args.model_dir, args.out_dir)

View File

@@ -1,587 +0,0 @@
#!/usr/bin/env python3
"""
DeepSeek V4 Pro → NVFP4 quantization — defensive edition.
This script:
1. Applies runtime patches for GPU tensor safety (before modelopt runs)
2. Calls the SAME hf_ptq.py pipeline that the shell script uses
3. After calibration, snapshots amax to CPU and saves model state
The key insight: we don't rewrite the pipeline. We let hf_ptq do its thing
with all its args, defaults, and edge cases handled correctly. We just add
our defensive patches and post-calibration saves.
Must be run from the modelopt example directory:
cd /root/nvidia-meeting/modelopt-repo/examples/llm_ptq
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py
Usage:
# Full run (calibrate + export):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py
# Re-run export only (after a calibration save exists):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py --export-only
# Validate saved calibration state (check amax values):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py --validate-only
"""
import argparse
import gc
import os
import sys
import time
import torch
# ── Config ──────────────────────────────────────────────────────────────────
MODEL = "/root/nvidia-meeting/DeepSeek-V4-Pro-BF16"
QUANT = "nvfp4"
TP = 8
CALIB_SIZE = 128
CALIB_SEQ = 512
KV_CACHE_QUANT = "fp8_cast"
GPU_MEM_PCT = 0.7
HF_TOKEN = "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"
# Paths
EXAMPLE_DIR = "/root/nvidia-meeting/modelopt-repo/examples/llm_ptq"
EXPORT_DIR = "/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4"
CALIB_SAVE_PATH = "/root/nvidia-meeting/v4_nvfp4_calibrated_state.pt"
AMAX_SNAPSHOT_PATH = "/root/nvidia-meeting/v4_nvfp4_amax_snapshots.pt"
def apply_patches():
"""Apply runtime patches for V4 compatibility and GPU tensor safety.
Root cause of all export crashes: use_seq_device_map keeps model weights on GPU
for 5+ hours during calibration. By export time, CUDA's memory allocator has
recycled the underlying memory, so any read of those GPU tensors triggers
cudaErrorIllegalAddress.
Fix strategy: patch at the EARLIEST possible entry points to force stale GPU
tensors to CPU before any downstream code reads them. This covers the full
chain of execution we traced through the export path:
_process_quantized_modules
→ _export_quantized_weight (or _export_fused_experts)
→ get_weight_scaling_factor
→ get_weights_scaling_factor_from_quantizer (reads weight, _amax, global_amax)
→ NVFP4QTensor.get_weights_scaling_factor (dynamic: reduce_block_amax on weight)
→ get_weight_scaling_factor_2 (reads _amax, global_amax)
→ get_activation_scaling_factor (reads _amax) [already patched]
→ to_quantized_weight (reads weight, does .to(weight.device) on scaling factors)
→ weight.to(dtype) (reads weight)
By forcing weight to CPU in Patch 4 (_export_quantized_weight), ALL downstream
.to(weight.device) calls resolve to CPU. Patches 5-8 are belt-and-suspenders.
"""
from modelopt.torch.quantization.nn.modules import tensor_quantizer as tq_module
from modelopt.torch.quantization.qtensor import nvfp4_tensor
from modelopt.torch.export import quant_utils
from modelopt.torch.quantization.utils import quantizer_attr_names as _quantizer_attr_names
import modelopt.torch.export.unified_export_hf as uehf
# ══════════════════════════════════════════════════════════════════════
# Patch 1: load_calib_amax — force _amax to CPU immediately after calibration
# This runs during calibration, right after each quantizer finishes.
# ══════════════════════════════════════════════════════════════════════
orig_load_calib_amax = tq_module.TensorQuantizer.load_calib_amax
def patched_load_calib_amax(self, *args, **kwargs):
orig_load_calib_amax(self, *args, **kwargs)
if hasattr(self, '_amax') and self._amax is not None:
self._amax = self._amax.cpu()
tq_module.TensorQuantizer.load_calib_amax = patched_load_calib_amax
print("✓ Patch 1: TensorQuantizer.load_calib_amax → force _amax to CPU")
# ══════════════════════════════════════════════════════════════════════
# Patch 2: export_amax — CPU safety net at export time
# ══════════════════════════════════════════════════════════════════════
orig_export_amax = tq_module.TensorQuantizer.export_amax
def patched_export_amax(self):
if hasattr(self, '_amax') and self._amax is not None and self._amax.is_cuda:
self._amax = self._amax.cpu()
return orig_export_amax(self)
tq_module.TensorQuantizer.export_amax = patched_export_amax
print("✓ Patch 2: TensorQuantizer.export_amax → CPU fallback")
# ══════════════════════════════════════════════════════════════════════
# Patch 3: get_activation_scaling_factor — CPU + clamp
# ══════════════════════════════════════════════════════════════════════
@classmethod
def patched_get_activation_scaling_factor(cls, quantizer):
if not quantizer.is_enabled:
return None
try:
amax = quantizer.export_amax()
except (torch.cuda.CudaError, RuntimeError) as e:
print(f" WARNING: export_amax() failed ({e}), attempting CPU recovery...")
if hasattr(quantizer, '_amax') and quantizer._amax is not None:
quantizer._amax = quantizer._amax.cpu()
amax = quantizer.export_amax()
if amax is None:
return None
amax = amax.cpu()
activation_scaling_factor = amax.float() / (quantizer.maxbound * 448.0)
if not torch.all(activation_scaling_factor > 0):
n_bad = (activation_scaling_factor <= 0).sum().item()
n_total = activation_scaling_factor.numel()
print(f" WARNING: {n_bad}/{n_total} activation scaling factors <= 0, clamping")
activation_scaling_factor = activation_scaling_factor.clamp(min=torch.finfo(torch.float32).tiny)
return activation_scaling_factor
nvfp4_tensor.NVFP4QTensor.get_activation_scaling_factor = patched_get_activation_scaling_factor
print("✓ Patch 3: NVFP4QTensor.get_activation_scaling_factor → CPU + clamp")
# ══════════════════════════════════════════════════════════════════════
# Patch 4: _export_quantized_weight — THE KEY PATCH
#
# This is the entry point for exporting each quantized module. It reads
# `weight = getattr(sub_module, weight_name)` which is on a stale GPU.
# By moving weight to CPU right here, ALL downstream functions are safe:
# - get_weight_scaling_factor: weight.device is now CPU
# - get_weights_scaling_factor: operates on CPU weight
# - to_quantized_weight: .to(weight.device) stays on CPU
# - weight.to(dtype): CPU cast
# We also force all quantizer state to CPU for the same reason.
# ══════════════════════════════════════════════════════════════════════
orig_export_quantized_weight = uehf._export_quantized_weight
def patched_export_quantized_weight(sub_module, dtype, weight_name="weight"):
# Move weight to CPU (stale GPU → safe CPU)
weight = getattr(sub_module, weight_name, None)
if weight is not None and isinstance(weight, torch.Tensor) and weight.is_cuda:
try:
weight_cpu = weight.cpu()
with torch.no_grad():
setattr(sub_module, weight_name, torch.nn.Parameter(weight_cpu))
except (torch.cuda.CudaError, RuntimeError) as e:
print(f" WARNING: weight.cpu() failed for {weight_name} ({e})")
raise
# Force all quantizer state to CPU
qattrs = _quantizer_attr_names(weight_name)
for qattr in [qattrs.weight_quantizer, qattrs.input_quantizer, qattrs.output_quantizer]:
if not qattr:
continue
quantizer = getattr(sub_module, qattr, None)
if quantizer is None:
continue
for attr in ['_amax', '_pre_quant_scale', 'global_amax', '_global_amax']:
val = getattr(quantizer, attr, None)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
try:
setattr(quantizer, attr, val.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
# Handle SequentialQuantizer (W4A8 path)
if hasattr(quantizer, 'quantizers'):
for sub_q in quantizer.quantizers:
for attr in ['_amax', '_pre_quant_scale', 'global_amax', '_global_amax']:
val = getattr(sub_q, attr, None)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
try:
setattr(sub_q, attr, val.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
return orig_export_quantized_weight(sub_module, dtype, weight_name)
uehf._export_quantized_weight = patched_export_quantized_weight
print("✓ Patch 4: _export_quantized_weight → force weight + quantizer state to CPU")
# ══════════════════════════════════════════════════════════════════════
# Patch 5: _export_fused_experts — same treatment for MoE expert weights
# DeepseekV4Experts go through this different code path.
# ══════════════════════════════════════════════════════════════════════
orig_export_fused_experts = uehf._export_fused_experts
def patched_export_fused_experts(sub_module, dtype):
# Force all expert weights to CPU
for name, param in list(sub_module.named_parameters()):
if isinstance(param, torch.Tensor) and param.is_cuda:
try:
with torch.no_grad():
setattr(sub_module, name, torch.nn.Parameter(param.cpu()))
except (torch.cuda.CudaError, RuntimeError):
pass
# Force all buffers to CPU
for name, buf in list(sub_module.named_buffers()):
if isinstance(buf, torch.Tensor) and buf.is_cuda:
try:
sub_module.register_buffer(name, buf.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
# Force all quantizer state to CPU
for mod in sub_module.modules():
for attr in ['_amax', '_pre_quant_scale', 'global_amax', '_global_amax']:
val = getattr(mod, attr, None)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
try:
setattr(mod, attr, val.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
return orig_export_fused_experts(sub_module, dtype)
uehf._export_fused_experts = patched_export_fused_experts
print("✓ Patch 5: _export_fused_experts → force expert weights + quantizer state to CPU")
# ══════════════════════════════════════════════════════════════════════
# Patch 6: to_quantized_weight — force scaling factors to CPU
# This does .to(weight.device) on scaling factors. With weight now on
# CPU (Patch 4), this should be a no-op, but belt-and-suspenders.
# ══════════════════════════════════════════════════════════════════════
orig_to_quantized_weight = quant_utils.to_quantized_weight
def patched_to_quantized_weight(weight, weights_scaling_factor, quantization,
weights_scaling_factor2=None, block_size=None):
if isinstance(weight, torch.Tensor) and weight.is_cuda:
weight = weight.cpu()
if weights_scaling_factor is not None and isinstance(weights_scaling_factor, torch.Tensor) and weights_scaling_factor.is_cuda:
weights_scaling_factor = weights_scaling_factor.cpu()
if weights_scaling_factor2 is not None and isinstance(weights_scaling_factor2, torch.Tensor) and weights_scaling_factor2.is_cuda:
weights_scaling_factor2 = weights_scaling_factor2.cpu()
return orig_to_quantized_weight(weight, weights_scaling_factor, quantization,
weights_scaling_factor2, block_size)
quant_utils.to_quantized_weight = patched_to_quantized_weight
print("✓ Patch 6: to_quantized_weight → force all tensors to CPU")
# ══════════════════════════════════════════════════════════════════════
# Patch 7: get_weight_scaling_factor — force weight + quantizer to CPU
# Belt and suspenders: Patch 4 should handle this, but this is also
# called from other code paths.
# ══════════════════════════════════════════════════════════════════════
orig_get_weight_scaling_factor = quant_utils.get_weight_scaling_factor
def patched_get_weight_scaling_factor(module, weight_name="weight"):
weight = getattr(module, weight_name, None)
if weight is not None and isinstance(weight, torch.Tensor) and weight.is_cuda:
try:
with torch.no_grad():
setattr(module, weight_name, torch.nn.Parameter(weight.cpu()))
except (torch.cuda.CudaError, RuntimeError) as e:
print(f" WARNING: weight.cpu() failed in get_weight_scaling_factor ({e})")
raise
weight_quantizer = getattr(module, _quantizer_attr_names(weight_name).weight_quantizer, None)
if weight_quantizer is not None:
for attr in ['_amax', '_pre_quant_scale', 'global_amax', '_global_amax']:
val = getattr(weight_quantizer, attr, None)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
try:
setattr(weight_quantizer, attr, val.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
return orig_get_weight_scaling_factor(module, weight_name)
quant_utils.get_weight_scaling_factor = patched_get_weight_scaling_factor
print("✓ Patch 7: get_weight_scaling_factor → force weight + quantizer to CPU")
# ══════════════════════════════════════════════════════════════════════
# Patch 8: get_weight_scaling_factor_2 — force quantizer to CPU
# ══════════════════════════════════════════════════════════════════════
orig_get_weight_scaling_factor_2 = quant_utils.get_weight_scaling_factor_2
def patched_get_weight_scaling_factor_2(module, weight_name="weight"):
weight_quantizer = getattr(module, _quantizer_attr_names(weight_name).weight_quantizer, None)
if weight_quantizer is not None:
for attr in ['_amax', '_pre_quant_scale', 'global_amax', '_global_amax']:
val = getattr(weight_quantizer, attr, None)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
try:
setattr(weight_quantizer, attr, val.cpu())
except (torch.cuda.CudaError, RuntimeError):
pass
return orig_get_weight_scaling_factor_2(module, weight_name)
quant_utils.get_weight_scaling_factor_2 = patched_get_weight_scaling_factor_2
print("✓ Patch 8: get_weight_scaling_factor_2 → force quantizer to CPU")
def snapshot_amax_to_cpu(model, snapshot_path):
"""Walk all quantizers, copy _amax to CPU, save to disk."""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
print(f"\nSnapshotting quantizer _amax to CPU...")
t0 = time.time()
snapshots = {}
n_moved = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
if hasattr(module, '_amax') and module._amax is not None:
amax_cpu = module._amax.detach().cpu().clone()
snapshots[name] = amax_cpu
module._amax.data.copy_(amax_cpu)
n_moved += 1
torch.save(snapshots, snapshot_path)
size_mb = os.path.getsize(snapshot_path) / (1024**2)
print(f"✓ Snapshotted {n_moved} quantizer _amax tensors to CPU ({time.time()-t0:.1f}s)")
print(f" Saved to: {snapshot_path} ({size_mb:.1f} MB)")
return snapshots
def restore_amax_from_snapshot(model, snapshot_path):
"""Restore _amax from a previously saved CPU snapshot."""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
print(f"Restoring _amax from snapshot: {snapshot_path}")
snapshots = torch.load(snapshot_path, map_location='cpu')
n_restored = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
if name in snapshots and hasattr(module, '_amax'):
module._amax.data.copy_(snapshots[name].to(module._amax.device))
n_restored += 1
print(f"✓ Restored {n_restored} _amax tensors from snapshot")
def force_all_amax_to_cpu(model):
"""Force ALL quantizer tensors to CPU."""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
count = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
for attr in ['_amax', '_pre_quant_scale', '_global_amax']:
if hasattr(module, attr):
val = getattr(module, attr)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
setattr(module, attr, val.cpu())
count += 1
print(f"✓ Forced {count} quantizer tensors to CPU")
def save_calibrated_state(model, path):
"""Save model state dict after calibration."""
print(f"\n{'='*60}")
print(f"SAVING CALIBRATED STATE → {path}")
print(f"{'='*60}")
start = time.time()
state = {
'model_state_dict': model.state_dict(),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
}
torch.save(state, path)
size_gb = os.path.getsize(path) / (1024**3)
print(f"✓ Saved calibrated state: {size_gb:.1f} GB ({time.time()-start:.0f}s)")
print(f" Path: {path}")
print(f" Re-run with --export-only to retry export.\n")
def run_calibration(model_path, export_dir, calib_save_path, amax_snapshot_path, calib_size, calib_seq):
"""Full pipeline: parse args via hf_ptq → load → quantize → snapshot → save → export."""
os.chdir(EXAMPLE_DIR)
sys.path.insert(0, EXAMPLE_DIR)
os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
from hf_ptq import parse_args, main as hf_main
apply_patches()
# ── Build args using hf_ptq's own parser ──
# This guarantees ALL attributes exist with correct defaults.
# We temporarily replace sys.argv so parse_args() sees our config.
saved_argv = sys.argv
sys.argv = [
"hf_ptq.py",
"--pyt_ckpt_path", model_path,
"--qformat", QUANT,
"--calib_size", str(calib_size),
"--calib_seq", str(calib_seq),
"--kv_cache_qformat", KV_CACHE_QUANT,
"--inference_tensor_parallel", str(TP),
"--export_path", export_dir,
"--trust_remote_code",
"--use_seq_device_map",
"--gpu_max_mem_percentage", str(GPU_MEM_PCT),
"--batch_size", "0",
]
args = parse_args()
sys.argv = saved_argv
# Apply the same post-parse conversions that hf_ptq's __main__ block does
# (these normally run between parse_args() and main() in the original script,
# but since we call main() directly, we have to do them ourselves)
args.dataset = args.dataset.split(",") if isinstance(args.dataset, str) else args.dataset
args.calib_size = [int(num_sample) for num_sample in args.calib_size.split(",")]
# ── Post-calibration hook ──
# We monkey-patch export_quantized to add our defensive saves before export.
import hf_ptq
orig_export_quantized = hf_ptq.export_quantized
def patched_export_quantized(exp_args, full_model, language_model, model_type,
tokenizer, default_padding_side, default_pad_token):
"""Wrapper that snapshots amax and saves state before calling the real export."""
print("\n" + "="*60)
print("POST-CALIBRATION: Snapshotting amax and saving state")
print("="*60)
# Snapshot amax to CPU
snapshot_amax_to_cpu(language_model, amax_snapshot_path)
# Force all quantizer state to CPU
force_all_amax_to_cpu(language_model)
# Free GPU memory
torch.cuda.empty_cache()
gc.collect()
# Save calibrated state
save_calibrated_state(language_model, calib_save_path)
# Now run the real export
orig_export_quantized(exp_args, full_model, language_model, model_type,
tokenizer, default_padding_side, default_pad_token)
hf_ptq.export_quantized = patched_export_quantized
print("✓ Hooked export_quantized with amax snapshot + state save")
# ── Run hf_ptq's full pipeline ──
# This handles model loading, quantization, calibration, and export
# using the exact same code path as the shell script.
hf_main(args)
def run_export_only(calib_save_path, amax_snapshot_path, model_path, export_dir):
"""Load saved calibration state and run export only."""
os.chdir(EXAMPLE_DIR)
sys.path.insert(0, EXAMPLE_DIR)
os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
apply_patches()
from example_utils import get_model, get_tokenizer
print(f"Loading model from {model_path}...")
model = get_model(
model_path,
device="cpu",
trust_remote_code=True,
)
tokenizer = get_tokenizer(model_path, trust_remote_code=True)
print(f"Loading calibrated state from {calib_save_path}...")
state = torch.load(calib_save_path, map_location='cpu')
model.load_state_dict(state['model_state_dict'])
print(f"✓ Loaded calibrated state (saved at {state['timestamp']})")
force_all_amax_to_cpu(model)
if amax_snapshot_path and os.path.exists(amax_snapshot_path):
restore_amax_from_snapshot(model, amax_snapshot_path)
torch.cuda.empty_cache()
gc.collect()
from modelopt.torch.export import export_hf_checkpoint
from hf_ptq import load_mtp_weights, copy_custom_model_files
print(f"\n{'='*60}")
print(f"EXPORTING → {export_dir}")
print(f"{'='*60}")
t0 = time.time()
try:
mtp_layer_prefixes, mtp_state_dict = load_mtp_weights(model, model_path)
if mtp_layer_prefixes:
model._mtp_layer_prefixes = mtp_layer_prefixes
export_hf_checkpoint(model, export_dir=export_dir, extra_state_dict=mtp_state_dict)
tokenizer.save_pretrained(export_dir)
copy_custom_model_files(model_path, export_dir, True)
print(f"\n✓ Export complete in {time.time()-t0:.0f}s → {export_dir}")
except Exception as e:
print(f"\n✗ EXPORT FAILED: {e}")
print(f" Calibrated state: {CALIB_SAVE_PATH}")
print(f" Amax snapshots: {AMAX_SNAPSHOT_PATH}")
raise
def run_validate(calib_save_path, amax_snapshot_path):
"""Validate saved calibration state — check amax values are valid."""
print(f"\nValidating calibration state...")
if os.path.exists(amax_snapshot_path):
snapshots = torch.load(amax_snapshot_path, map_location='cpu')
n_total = len(snapshots)
n_valid = n_zero = n_nan = n_neg = 0
for name, amax in snapshots.items():
if torch.any(torch.isnan(amax)):
n_nan += 1
elif torch.any(amax < 0):
n_neg += 1
elif torch.all(amax == 0):
n_zero += 1
else:
n_valid += 1
print(f"\nAmax snapshot validation:")
print(f" Total: {n_total} Valid: {n_valid} Zero: {n_zero} Neg: {n_neg} NaN: {n_nan}")
if n_valid == n_total:
print(f"\n✓ All {n_total} amax snapshots are valid!")
else:
print(f"\n{n_total - n_valid} quantizers have invalid amax!")
else:
print(f"✗ No amax snapshot found at {amax_snapshot_path}")
if os.path.exists(calib_save_path):
size_gb = os.path.getsize(calib_save_path) / (1024**3)
print(f"\nCalibrated state: {calib_save_path} ({size_gb:.1f} GB)")
else:
print(f"\n✗ No calibrated state found at {calib_save_path}")
def main():
parser = argparse.ArgumentParser(description="DeepSeek V4 Pro NVFP4 Quantization")
parser.add_argument("--export-only", action="store_true",
help="Skip calibration, load saved state and run export only")
parser.add_argument("--validate-only", action="store_true",
help="Validate saved calibration state without running anything")
parser.add_argument("--model", default=MODEL, help="Path to BF16 model")
parser.add_argument("--export-dir", default=EXPORT_DIR, help="Export output directory")
parser.add_argument("--calib-save", default=CALIB_SAVE_PATH, help="Calibration state save path")
parser.add_argument("--amax-snapshot", default=AMAX_SNAPSHOT_PATH, help="Amax snapshot path")
parser.add_argument("--calib-size", type=int, default=CALIB_SIZE, help="Calibration samples")
parser.add_argument("--calib-seq", type=int, default=CALIB_SEQ, help="Calibration sequence length")
args = parser.parse_args()
if args.validate_only:
run_validate(args.calib_save, args.amax_snapshot)
elif args.export_only:
if not os.path.exists(args.calib_save):
print(f"ERROR: No calibration state found at {args.calib_save}")
sys.exit(1)
run_export_only(args.calib_save, args.amax_snapshot, args.model, args.export_dir)
else:
run_calibration(args.model, args.export_dir, args.calib_save,
args.amax_snapshot, args.calib_size, args.calib_seq)
if __name__ == "__main__":
main()

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""
DeepSeek V4 Pro NVFP4 — vLLM OpenAI-compatible server.
Run from the venv on the B200 node:
source /root/nvidia-meeting/venv/bin/activate
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/serve_vllm.py
Or in the background:
nohup python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/serve_vllm.py \
> /root/nvidia-meeting/vllm_serve.log 2>&1 &
"""
import subprocess
import sys
# ── Patch: Add compress_ratios to DeepseekV4Config ──────────────────────────
# transformers 5.8.0 renamed compress_ratios → compress_rates (dict format).
# vllm 0.20.2 still expects compress_ratios as a list indexed by layer_id.
# We patch the Config class to expose compress_ratios as a property that
# converts the new dict format back to the list format vllm expects.
import transformers
try:
from transformers import DeepseekV4Config
_orig_init = DeepseekV4Config.__init__
def _patched_init(self, *args, **kwargs):
_orig_init(self, *args, **kwargs)
# If compress_ratios already exists as a list, leave it alone
if hasattr(self, 'compress_ratios') and isinstance(self.compress_ratios, list):
return
# Convert compress_rates dict → compress_ratios list
if hasattr(self, 'compress_rates') and isinstance(self.compress_rates, dict):
rates = self.compress_rates
# Build per-layer list from the dict schema
# V4 pattern: layers 0-1=128, then alternating 4/128, last=0
n_layers = getattr(self, 'num_hidden_layers', 61)
cr = rates.get('compressed_sparse_attention', 4)
hr = rates.get('heavily_compressed_attention', 128)
ratios = []
for i in range(n_layers):
if i < 2:
ratios.append(hr)
elif i == n_layers - 1:
ratios.append(0)
else:
ratios.append(cr if i % 2 == 0 else hr)
self.compress_ratios = ratios
elif hasattr(self, 'compress_rates') and isinstance(self.compress_rates, list):
self.compress_ratios = self.compress_rates
DeepseekV4Config.__init__ = _patched_init
print("✓ Patched DeepseekV4Config.__init__ to add compress_ratios")
except ImportError:
print("⚠ DeepseekV4Config not found, skipping compress_ratios patch")
MODEL = "/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4"
# These flags are critical for V4 — do not change without understanding why:
# --trust-remote-code V4 needs custom modeling code
# --kv-cache-dtype fp8 Match our kv_cache_qformat=fp8_cast quantization
# --block-size 256 V4 recommended block size
# --enable-expert-parallel Distribute expert computation across GPUs (critical for 256-expert MoE)
# --tensor-parallel-size 8 8× B200
# --compilation-config CUDA graphs for throughput — FULL_AND_PIECEWISE + all custom ops
# --attention_config FP4 indexer cache for V4 MLA attention
# --moe-backend deep_gemm_mega_moe — optimized MoE kernel for Blackwell
# --tokenizer-mode deepseek_v4 — V4-specific tokenizer
# --tool-call-parser deepseek_v4 — native tool calling
# --enable-auto-tool-choice Auto tool choice for function calling
# --reasoning-parser deepseek_v4 — reasoning/thinking output parsing
# --speculative_config MTP speculative decoding (2 speculative tokens)
cmd = [
sys.executable, "-m", "vllm.entrypoints.openai.api_server",
"--model", MODEL,
"--trust-remote-code",
"--kv-cache-dtype", "fp8",
"--block-size", "256",
"--enable-expert-parallel",
"--tensor-parallel-size", "8",
"--compilation-config", '{"cudagraph_mode":"FULL_AND_PIECEWISE", "custom_ops":["all"]}',
"--attention_config.use_fp4_indexer_cache=True",
"--moe-backend", "deep_gemm_mega_moe", # WARN: No NVFP4 mega_moe kernel. Use docker-compose (omits this flag) instead.
"--tokenizer-mode", "deepseek_v4",
"--tool-call-parser", "deepseek_v4",
"--enable-auto-tool-choice",
"--reasoning-parser", "deepseek_v4",
"--speculative_config", '{"method":"mtp","num_speculative_tokens":2}',
"--host", "0.0.0.0",
"--port", "8000",
]
print(f"Starting vLLM server for {MODEL}")
print(f"Command: {' '.join(cmd)}")
print(f"Log: /root/nvidia-meeting/vllm_serve.log")
print()
sys.exit(subprocess.call(cmd))

179
verify_nvfp4.py Normal file
View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""Sanity check an NVFP4 DeepSeek V4 Pro checkpoint.
Two modes:
1) --tensor-only (default): no model loading. Just inspects the safetensors
shards: confirms NVFP4 packing structure (uint8 weight + FP8 weight_scale
+ FP32 weight_scale_2), checks for NaN/Inf in scales, samples a few
dequantizations to confirm they look plausible.
2) --vllm: tries to load the model with vLLM and generate a few tokens.
Requires vLLM with NVFP4 support (SM100+ Blackwell GPU).
Usage:
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming
python verify_nvfp4.py DeepSeek-V4-Pro-NVFP4-streaming --vllm
"""
import argparse
import json
import sys
from pathlib import Path
import torch
from safetensors import safe_open
FP4_E2M1_VALUES = torch.tensor(
[0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0,
-0.0, -0.5, -1.0, -1.5, -2.0, -3.0, -4.0, -6.0],
dtype=torch.float32,
)
def unpack_fp4(packed: torch.Tensor) -> torch.Tensor:
"""Reverse the (low | high<<4) byte pack into a [M, N] tensor of FP4 indices."""
low = packed & 0x0F
high = (packed >> 4) & 0x0F
M, N_half = packed.shape
out = torch.empty(M, N_half * 2, dtype=torch.uint8)
out[:, ::2] = low
out[:, 1::2] = high
return out
def dequant_nvfp4(packed_uint8, weight_scale_fp8, weight_scale_2_fp32):
"""Reconstruct FP32 values from NVFP4 storage."""
fp4_idx = unpack_fp4(packed_uint8)
values = FP4_E2M1_VALUES[fp4_idx.long()] # [M, N]
M, N = values.shape
# Per-block scale broadcast back over 16 elements
scale_blocks = weight_scale_fp8.float() # [M, N//16]
scale_per_elem = scale_blocks.unsqueeze(-1).expand(-1, -1, 16).reshape(M, N)
return values * scale_per_elem * weight_scale_2_fp32.float()
def tensor_only_check(model_dir: Path):
index_path = model_dir / "model.safetensors.index.json"
if not index_path.exists():
sys.exit(f"No index.json at {model_dir}")
with open(index_path) as f:
index = json.load(f)
weight_map = index["weight_map"]
# Find one quantized weight to sample
sample = None
for name, fn in weight_map.items():
if name.endswith(".weight") and (name.replace(".weight", ".weight_scale") in weight_map):
sample = name
break
if not sample:
sys.exit("Couldn't find an NVFP4-quantized weight (expected *.weight_scale companion).")
print(f"Sampling: {sample}")
shard_fn = weight_map[sample]
scale_name = sample.replace(".weight", ".weight_scale")
scale_2_name = sample.replace(".weight", ".weight_scale_2")
scale_shard = weight_map[scale_name]
scale_2_shard = weight_map[scale_2_name]
def open_get(fn, name):
with safe_open(model_dir / fn, framework="pt") as f:
return f.get_tensor(name)
packed = open_get(shard_fn, sample)
weight_scale = open_get(scale_shard, scale_name)
weight_scale_2 = open_get(scale_2_shard, scale_2_name)
print(f" packed: shape={tuple(packed.shape)} dtype={packed.dtype}")
print(f" weight_scale: shape={tuple(weight_scale.shape)} dtype={weight_scale.dtype}")
print(f" weight_scale_2: shape={tuple(weight_scale_2.shape)} dtype={weight_scale_2.dtype} "
f"value={weight_scale_2.float().item():.6e}")
# Structural assertions
M = packed.shape[0]
assert packed.dtype == torch.uint8, f"packed should be uint8, got {packed.dtype}"
assert weight_scale.dtype == torch.float8_e4m3fn, \
f"weight_scale should be FP8 E4M3, got {weight_scale.dtype}"
assert weight_scale.shape == (M, packed.shape[1] * 2 // 16), \
f"weight_scale shape {weight_scale.shape} doesn't match expected (M, N/16)"
# Check for NaN/Inf in scales
s_fp32 = weight_scale.float()
assert torch.isfinite(s_fp32).all(), "weight_scale contains NaN/Inf"
assert torch.isfinite(weight_scale_2.float()).all(), "weight_scale_2 is NaN/Inf"
print(f" scales: all finite ✓")
print(f" weight_scale stats: min={s_fp32.min().item():.3e} max={s_fp32.max().item():.3e} "
f"mean={s_fp32.mean().item():.3e}")
# Spot-check dequantization
print("\nDequantizing first 4x32 block for visual check:")
rec = dequant_nvfp4(packed[:4, :16], weight_scale[:4, :2], weight_scale_2)
print(rec)
assert torch.isfinite(rec).all(), "Dequantized values contain NaN/Inf"
print(f" dequant: all finite ✓")
print(f" dequant range: [{rec.min().item():.4f}, {rec.max().item():.4f}]")
# Count what's quantized vs preserved across the whole model
quantized_weights = []
preserved = []
for name in weight_map:
if name.endswith(".weight"):
if name.replace(".weight", ".weight_scale") in weight_map:
quantized_weights.append(name)
else:
preserved.append(name)
print(f"\nWhole-model summary:")
print(f" Quantized .weight tensors: {len(quantized_weights):,}")
print(f" Preserved .weight tensors: {len(preserved):,}")
print(f" Total tensors in index: {len(weight_map):,}")
# Show a few preserved names to confirm the right things stayed in higher precision
print(f"\n Sample preserved tensors (should be lm_head, embed, gates, norms, etc.):")
for n in preserved[:10]:
print(f" {n}")
def vllm_check(model_dir: Path):
print("Loading model with vLLM... (requires Blackwell GPU + vLLM with NVFP4 support)")
from vllm import LLM, SamplingParams
llm = LLM(
model=str(model_dir),
trust_remote_code=True,
quantization="compressed-tensors",
dtype="auto",
tensor_parallel_size=8,
max_model_len=8192,
)
sampling = SamplingParams(temperature=1.0, top_p=1.0, max_tokens=64)
prompts = [
"Write a short poem about quantization:",
"What is 17 * 23?",
"Explain MoE routing in one sentence.",
]
outputs = llm.generate(prompts, sampling)
for o in outputs:
print("=" * 60)
print("PROMPT:", o.prompt)
print("OUTPUT:", o.outputs[0].text)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("model_dir")
ap.add_argument("--vllm", action="store_true")
args = ap.parse_args()
model_dir = Path(args.model_dir)
tensor_only_check(model_dir)
if args.vllm:
print("\n" + "=" * 60)
vllm_check(model_dir)
if __name__ == "__main__":
main()