Files
deepseek-v4-quant/scripts/quantize_nvfp4.py

356 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
DeepSeek V4 Pro NVFP4 quantization.

Runs the full ModelOpt PTQ pipeline in-process (not wrapping the shell script),
saves model state after calibration (so we don't lose 6 hours of work to an
export crash), and patches the export path to handle stale GPU tensors.

Usage:
    # Full run (calibrate + export):
    python3 scripts/quantize_nvfp4.py

    # Re-run export only (after a calibration save exists):
    python3 scripts/quantize_nvfp4.py --export-only

Pipeline:
    1. Load BF16 model with sequential device map
    2. Patch modelopt for V4 compatibility
    3. Quantize + calibrate (5-6 hours)
    4. SAVE model state to disk checkpoint so export failures don't waste calibration
    5. Export to HF safetensors
"""
import argparse
import copy
import os
import sys
import time
import warnings
import torch
# ── Config ──────────────────────────────────────────────────────────────────
# Source checkpoint and quantization recipe.
MODEL = "/root/nvidia-meeting/DeepSeek-V4-Pro-BF16"
QUANT = "nvfp4"
TP = 8
# Calibration sample count and per-sample sequence length.
CALIB_SIZE = 128
CALIB_SEQ = 512
KV_CACHE_QUANT = "fp8_cast"
GPU_MEM_PCT = 0.7
# SECURITY: the access token must never be committed to source control. It is
# read from the environment instead; an empty string means "no token set".
# Any token that was previously hard-coded here must be treated as leaked and
# revoked in the Hugging Face account settings.
HF_TOKEN = (
    os.environ.get("HF_TOKEN")
    or os.environ.get("HUGGING_FACE_HUB_TOKEN")
    or ""
)
# Output paths
SCRIPT_DIR = "/root/nvidia-meeting/modelopt-repo/examples/llm_ptq"  # needed for example_utils imports
EXPORT_DIR = "/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4"
CALIB_SAVE_PATH = "/root/nvidia-meeting/v4_nvfp4_calibrated_state.pt"
def apply_patches():
    """Apply runtime monkey-patches to ModelOpt for V4 compatibility.

    Three class attributes are replaced, and a confirmation line is printed
    after each one:

    1. ``_QuantFusedExperts.iter_weights_for_calibration`` -- yields every
       enabled ``TensorQuantizer`` found via ``named_modules()``.
    2. ``NVFP4QTensor.get_activation_scaling_factor`` -- computes the factor
       on CPU and clamps non-positive values instead of failing.
    3. ``TensorQuantizer.export_amax`` -- moves ``_amax`` to CPU before
       delegating to the original implementation.

    Patch 3 wraps whatever ``export_amax`` is installed at call time, so
    calling this function repeatedly stacks wrappers (harmless, but wasteful).
    """
    # 1. Patch quant_module.py for V4's ModuleList expert quantizers
    from modelopt.torch.quantization.nn import quant_module

    def patched_iter_weights_for_calibration(self, **kwargs):
        """Handle V4's nn.ModuleList expert quantizers (vs singular TensorQuantizer).

        ``kwargs`` are accepted for signature compatibility and ignored.
        """
        for name, quantizer in self.named_modules():
            if not isinstance(quantizer, quant_module.TensorQuantizer):
                continue
            if quantizer.is_enabled:
                yield name, quantizer

    quant_module._QuantFusedExperts.iter_weights_for_calibration = patched_iter_weights_for_calibration
    print("✓ Patched _QuantFusedExperts.iter_weights_for_calibration for V4 ModuleList")

    # 2. Patch nvfp4_tensor.get_activation_scaling_factor to move amax to CPU first
    from modelopt.torch.quantization.qtensor import nvfp4_tensor

    @classmethod
    def patched_get_activation_scaling_factor(cls, quantizer):
        """Move amax to CPU before export to avoid stale GPU tensor reads.

        Returns ``None`` for a disabled quantizer or when no amax is
        available, otherwise a float32 CPU tensor of scaling factors.
        """
        if not quantizer.is_enabled:
            return None
        try:
            amax = quantizer.export_amax()
        except (torch.cuda.CudaError, RuntimeError) as e:
            # GPU tensor is corrupted — try moving _amax to CPU first then retry
            print(f" WARNING: export_amax() failed ({e}), attempting CPU recovery...")
            stale_amax = getattr(quantizer, '_amax', None)
            if stale_amax is None:
                # Nothing to recover from. Re-raise the real error rather than
                # falling through with `amax` unbound, which would surface as
                # an unrelated UnboundLocalError.
                raise
            quantizer._amax = stale_amax.cpu()
            amax = quantizer.export_amax()
        if amax is None:
            return None
        # Move to CPU for safety
        amax = amax.cpu()
        activation_scaling_factor = amax.float() / (quantizer.maxbound * 448.0)
        # Replace hard assert with warning + clamp (invalid values from GPU corruption)
        if not torch.all(activation_scaling_factor > 0):
            n_bad = (activation_scaling_factor <= 0).sum().item()
            n_total = activation_scaling_factor.numel()
            print(f" WARNING: {n_bad}/{n_total} activation scaling factors <= 0, clamping to tiny")
            activation_scaling_factor = activation_scaling_factor.clamp(min=torch.finfo(torch.float32).tiny)
        return activation_scaling_factor

    nvfp4_tensor.NVFP4QTensor.get_activation_scaling_factor = patched_get_activation_scaling_factor
    print("✓ Patched NVFP4QTensor.get_activation_scaling_factor (CPU safety + graceful degradation)")

    # 3. Patch tensor_quantizer.export_amax to move _amax to CPU before reading
    from modelopt.torch.quantization.nn.modules import tensor_quantizer as tq_module

    # Captured so the wrapper can delegate to the implementation it replaces.
    orig_export_amax = tq_module.TensorQuantizer.export_amax

    def patched_export_amax(self):
        """Move _amax to CPU before export to prevent CUDA illegal memory access."""
        if self.amax is not None and self.amax.is_cuda:
            self._amax = self._amax.cpu()
        return orig_export_amax(self)

    tq_module.TensorQuantizer.export_amax = patched_export_amax
    print("✓ Patched TensorQuantizer.export_amax (CPU safety)")
def move_quantizers_to_cpu(model):
    """Relocate CUDA-resident ``_amax`` tensors on every submodule to CPU.

    Walks ``model.named_modules()``; any module whose ``_amax`` attribute is
    a non-None CUDA tensor gets it replaced by a CPU copy. Prints how many
    tensors were moved. Returns nothing.
    """
    moved = 0
    for _, submodule in model.named_modules():
        amax = getattr(submodule, '_amax', None)
        # Skip modules without a calibration maximum, or ones already on CPU.
        if amax is None or not amax.is_cuda:
            continue
        submodule._amax = amax.cpu()
        moved += 1
    print(f"✓ Moved {moved} quantizer _amax tensors to CPU")
def save_calibrated_state(model, path):
    """Save model state dict + quantizer metadata after calibration.

    This is the insurance policy: if export crashes, we can reload
    and retry export without re-running 6 hours of calibration.

    Two files are written:

    * ``path`` -- a dict with ``'model_state_dict'`` and ``'timestamp'``.
      It is written to ``path + ".tmp"`` and then renamed, so an interrupted
      write can never leave a truncated file at ``path``.
    * ``path + ".modelopt_state"`` -- the ModelOpt state needed to rebuild the
      quantizer modules on a freshly loaded model. Written best-effort: a
      failure here is reported but does not abort the run, because the
      weights are already safely on disk.

    Args:
        model: The calibrated model.
        path: Destination of the weights checkpoint.

    Raises:
        Whatever ``torch.save`` / ``os.replace`` raise for the weights file.
    """
    print(f"\n{'='*60}")
    print(f"SAVING CALIBRATED STATE → {path}")
    print(f"{'='*60}")
    start = time.time()
    # Move quantizers to CPU first
    move_quantizers_to_cpu(model)
    state = {
        'model_state_dict': model.state_dict(),
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
    }
    # Write-then-rename. While an older checkpoint exists at `path`, this
    # temporarily needs disk space for both files.
    tmp_path = f"{path}.tmp"
    try:
        torch.save(state, tmp_path)
        os.replace(tmp_path, path)
    finally:
        # Only still present if the save or the rename failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    # A plain state_dict cannot be loaded into an unquantized model, because
    # the quantizer modules and their buffers do not exist there yet. Persist
    # the ModelOpt state so the export-only path can recreate them first.
    sidecar_path = f"{path}.modelopt_state"
    try:
        import modelopt.torch.opt as mto
        torch.save(mto.modelopt_state(model), sidecar_path)
        print(f"✓ Saved ModelOpt state: {sidecar_path}")
    except Exception as e:
        print(f" WARNING: could not save ModelOpt state ({e}); "
              f"--export-only may be unable to restore quantizers")

    size_gb = os.path.getsize(path) / (1024**3)
    print(f"✓ Saved calibrated state: {size_gb:.1f} GB ({time.time()-start:.0f}s)")
    print(f" Path: {path}")
    print(f" This allows re-running export without re-calibrating.\n")


def load_calibrated_state(model, path):
    """Load previously saved calibrated state into model.

    If ``path + ".modelopt_state"`` exists, the quantizer modules are
    recreated on ``model`` from it before the weights are loaded; otherwise
    the weights are loaded directly, as before.

    Args:
        model: Model to load into; modified in place.
        path: Weights checkpoint written by ``save_calibrated_state``.

    Raises:
        RuntimeError: If ModelOpt returns a different module object instead
            of converting ``model`` in place, since callers keep using the
            object they passed in.
    """
    print(f"Loading calibrated state from {path}...")
    sidecar_path = f"{path}.modelopt_state"
    if os.path.exists(sidecar_path):
        import modelopt.torch.opt as mto
        # NOTE(security): weights_only=False unpickles arbitrary objects.
        # Acceptable only because this file is produced locally by
        # save_calibrated_state; never point it at an untrusted file.
        modelopt_state = torch.load(sidecar_path, map_location='cpu', weights_only=False)
        restored = mto.restore_from_modelopt_state(model, modelopt_state)
        if restored is not model:
            raise RuntimeError(
                "restore_from_modelopt_state returned a new module; "
                "cannot restore quantizers in place"
            )
        print(f"✓ Restored ModelOpt state from {sidecar_path}")
    state = torch.load(path, map_location='cpu')
    model.load_state_dict(state['model_state_dict'])
    print(f"✓ Loaded calibrated state (saved at {state['timestamp']})")
def run_calibration(model_path, export_dir, calib_save_path):
    """Full pipeline: load → quantize → calibrate → save → export.

    Args:
        model_path: Location of the BF16 model.
        export_dir: Directory the quantized checkpoint is exported to.
        calib_save_path: Where the calibrated state is saved before export.

    Side effects: changes the working directory to ``SCRIPT_DIR``, prepends
    it to ``sys.path``, and exports ``HF_TOKEN`` into the environment when one
    is configured. ``CALIB_SIZE`` and ``CALIB_SEQ`` are read from the module
    globals at call time.
    """
    # Resolve paths BEFORE chdir, otherwise relative paths given on the
    # command line would silently be re-based onto SCRIPT_DIR.
    if os.path.exists(model_path):
        # Only local paths are made absolute; anything else is passed through.
        model_path = os.path.abspath(model_path)
    export_dir = os.path.abspath(export_dir)
    calib_save_path = os.path.abspath(calib_save_path)

    # Must be in the example dir for the relative imports (example_utils, etc.)
    os.chdir(SCRIPT_DIR)
    if SCRIPT_DIR not in sys.path:
        sys.path.insert(0, SCRIPT_DIR)
    from hf_ptq import build_quant_cfg, make_calib_dataloader
    from modelopt.torch import quantization as mtq
    # NOTE(review): the helper names and call signatures used below are taken
    # from this checkout of the example scripts — confirm after an upgrade.
    from modelopt.torch.quantization.config import QUANT_CFG_CHOICES
    from modelopt.torch.utils.dataset_utils import create_forward_loop, get_max_batch_size

    # Apply patches before loading model
    apply_patches()

    # ── Load model ──
    print(f"\nLoading model from {model_path}...")
    t0 = time.time()
    # Set HF token for gated datasets; skipped when none is configured so an
    # empty value is never written into the environment.
    if HF_TOKEN:
        os.environ["HF_TOKEN"] = HF_TOKEN
        os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
    from transformers import AutoModelForCausalLM, AutoTokenizer
    # Load with sequential device map (model doesn't fit in GPU VRAM alone)
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="sequential",
        offload_folder="offload",  # relative: resolves inside SCRIPT_DIR
    )
    print(f"✓ Model loaded in {time.time()-t0:.0f}s")
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    # ── Setup quantization config ──
    # Deep copy so the shared table entry is never mutated.
    quant_cfg = copy.deepcopy(QUANT_CFG_CHOICES[QUANT])
    quant_cfg = build_quant_cfg(QUANT, quant_cfg, None, None, None)
    # KV cache quantization
    if KV_CACHE_QUANT != "none":
        quant_cfg = mtq.update_quant_cfg_with_kv_cache_quant(
            quant_cfg,
            getattr(mtq, mtq.KV_QUANT_CFG_CHOICES[KV_CACHE_QUANT])["quant_cfg"],
        )
        print(f"✓ KV cache quantization: {KV_CACHE_QUANT}")

    # ── Detect batch size ──
    print("\nDetecting max calibration batch size...")
    batch_size = get_max_batch_size(
        model,
        max_sample_length=CALIB_SEQ,
        sample_memory_usage_ratio=1.1,
    )
    # Never use a batch larger than the number of calibration samples.
    batch_size = min(batch_size, CALIB_SIZE)
    print(f"✓ Using calibration batch_size={batch_size}")

    # ── Prepare dataloader ──
    calib_dataloader, _ = make_calib_dataloader(
        argparse.Namespace(
            calib_size=[CALIB_SIZE],
            calib_seq=CALIB_SEQ,
            calib_dataset="",
            batch_size=batch_size,
            calib_batch_size=0,
        ),
        model, None, tokenizer, torch.device("cuda"), None,
    )

    # ── Quantize + Calibrate ──
    print(f"\n{'='*60}")
    print(f"QUANTIZING: {QUANT} with {CALIB_SIZE} calibration samples")
    print(f"{'='*60}")
    t0 = time.time()
    # mtq.quantize expects `forward_loop` to be a callable that takes the
    # model, not a DataLoader, so wrap the dataloader first.
    calibrate_loop = create_forward_loop(dataloader=calib_dataloader)
    model = mtq.quantize(model, quant_cfg, forward_loop=calibrate_loop)
    print(f"✓ Quantization + calibration complete in {time.time()-t0:.0f}s")

    # ── SAVE STATE (the whole point of this script) ──
    save_calibrated_state(model, calib_save_path)

    # ── Export ──
    run_export(model, tokenizer, model_path, export_dir)
def run_export(model, tokenizer, model_path, export_dir):
    """Write the quantized model out as an HF safetensors checkpoint.

    Steps, in order: move quantizer ``_amax`` tensors to CPU, attach MTP
    weights when ``load_mtp_weights`` reports any, export the checkpoint,
    save the tokenizer, then copy the custom model files. Any exception is
    reported together with a recovery hint and then re-raised.
    """
    from modelopt.torch.export import export_hf_checkpoint
    from hf_ptq import load_mtp_weights

    banner = '=' * 60
    print("\n" + banner)
    print(f"EXPORTING → {export_dir}")
    print(banner)

    # Quantizer tensors must be on CPU before the exporter reads them.
    move_quantizers_to_cpu(model)
    started = time.time()
    try:
        # Load MTP weights if present
        prefixes, extra_weights = load_mtp_weights(model, model_path)
        if prefixes:
            model._mtp_layer_prefixes = prefixes
        export_hf_checkpoint(model, export_dir=export_dir, extra_state_dict=extra_weights)
        # Save tokenizer
        tokenizer.save_pretrained(export_dir)
        # Copy custom model files. Imported here, inside the try, so an import
        # failure is reported like any other export failure.
        from hf_ptq import copy_custom_model_files
        copy_custom_model_files(model_path, export_dir, True)
        print(f"\n✓ Export complete in {time.time() - started:.0f}s → {export_dir}")
    except Exception as e:
        # Reports the module-level default path, which may differ from a
        # path supplied via --calib-save.
        print(f"\n✗ EXPORT FAILED: {e}")
        print(f" Calibrated state is saved at: {CALIB_SAVE_PATH}")
        print(f" Re-run with --export-only to retry export")
        raise
def run_export_only(calib_save_path, model_path, export_dir):
    """Load previously saved calibration state and run export only.

    Args:
        calib_save_path: Checkpoint written by ``save_calibrated_state``.
        model_path: Location of the BF16 model used as the skeleton.
        export_dir: Directory the quantized checkpoint is exported to.

    Side effects: changes the working directory to ``SCRIPT_DIR``, prepends
    it to ``sys.path``, and exports ``HF_TOKEN`` into the environment when one
    is configured.
    """
    # Resolve paths BEFORE chdir. A relative --calib-save that exists from
    # the caller's directory would otherwise stop resolving after the chdir.
    calib_save_path = os.path.abspath(calib_save_path)
    export_dir = os.path.abspath(export_dir)
    if os.path.exists(model_path):
        # Only local paths are made absolute; anything else is passed through.
        model_path = os.path.abspath(model_path)

    os.chdir(SCRIPT_DIR)
    if SCRIPT_DIR not in sys.path:
        sys.path.insert(0, SCRIPT_DIR)
    apply_patches()
    from transformers import AutoModelForCausalLM, AutoTokenizer
    # Skipped when no token is configured so an empty value is never written
    # into the environment.
    if HF_TOKEN:
        os.environ["HF_TOKEN"] = HF_TOKEN
        os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN

    # Load a fresh model (we just need the architecture, then overlay the state)
    print(f"Loading model skeleton from {model_path}...")
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="cpu",  # Don't load onto GPU yet
    )
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    # Load the calibrated state
    load_calibrated_state(model, calib_save_path)

    # Export
    run_export(model, tokenizer, model_path, export_dir)
def main():
    """Parse command-line options and run the requested pipeline.

    With ``--export-only`` the saved calibration state must already exist,
    otherwise the process exits with status 1. Without it, the full
    calibrate-and-export pipeline runs.
    """
    # run_calibration reads these module globals at call time, so the
    # command-line values are written back here to take effect.
    global CALIB_SIZE, CALIB_SEQ

    parser = argparse.ArgumentParser(description="DeepSeek V4 Pro NVFP4 Quantization")
    parser.add_argument("--export-only", action="store_true",
                        help="Skip calibration, load saved state and run export only")
    parser.add_argument("--model", default=MODEL, help="Path to BF16 model")
    parser.add_argument("--export-dir", default=EXPORT_DIR, help="Export output directory")
    parser.add_argument("--calib-save", default=CALIB_SAVE_PATH, help="Calibration state save path")
    parser.add_argument("--calib-size", type=int, default=CALIB_SIZE, help="Calibration samples")
    parser.add_argument("--calib-seq", type=int, default=CALIB_SEQ, help="Calibration sequence length")
    args = parser.parse_args()

    if args.export_only:
        if not os.path.exists(args.calib_save):
            print(f"ERROR: No calibration state found at {args.calib_save}")
            print("Run without --export-only first to calibrate.")
            sys.exit(1)
        run_export_only(args.calib_save, args.model, args.export_dir)
    else:
        # Previously these two options were parsed and then ignored.
        CALIB_SIZE = args.calib_size
        CALIB_SEQ = args.calib_seq
        run_calibration(args.model, args.export_dir, args.calib_save)


if __name__ == "__main__":
    main()