Files
deepseek-v4-quant/scripts/quantize_nvfp4.py

466 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
DeepSeek V4 Pro NVFP4 quantization defensive edition.
Runs the full ModelOpt PTQ pipeline with maximum protection against GPU tensor
corruption that crashes the export after 6 hours of calibration.
Key defense: immediately after calibration, every quantizer _amax tensor is
snapshotted to CPU. Then the model state is saved to disk. If export crashes,
the state can be reloaded and export retried without re-calibrating.
The _amax tensors are tiny (scalars and small vectors). Snapshotting ~49K of them
to CPU costs almost nothing in memory and guarantees we have valid calibration
data regardless of what CUDA does to the GPU copies afterward.
Must be run from the modelopt example directory for imports:
cd /root/nvidia-meeting/modelopt-repo/examples/llm_ptq
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py
Usage:
# Full run (calibrate + export):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py
# Re-run export only (after a calibration save exists):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py --export-only
# Validate saved calibration state (check amax values):
python3 /root/nvidia-meeting/deepseek-v4-quant/scripts/quantize_nvfp4.py --validate-only
"""
import argparse
import copy
import gc
import os
import sys
import time
import warnings
import torch
# ── Config ──────────────────────────────────────────────────────────────────
MODEL = "/root/nvidia-meeting/DeepSeek-V4-Pro-BF16"
QUANT = "nvfp4"
TP = 8
CALIB_SIZE = 128
CALIB_SEQ = 512
KV_CACHE_QUANT = "fp8_cast"
GPU_MEM_PCT = 0.7
HF_TOKEN = "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"
# Paths
EXAMPLE_DIR = "/root/nvidia-meeting/modelopt-repo/examples/llm_ptq"
EXPORT_DIR = "/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4"
CALIB_SAVE_PATH = "/root/nvidia-meeting/v4_nvfp4_calibrated_state.pt"
AMAX_SNAPSHOT_PATH = "/root/nvidia-meeting/v4_nvfp4_amax_snapshots.pt"
def apply_patches():
"""Apply runtime patches for V4 compatibility and GPU tensor safety."""
from modelopt.torch.quantization.nn.modules import tensor_quantizer as tq_module
from modelopt.torch.quantization.qtensor import nvfp4_tensor
# ── Patch 1: load_calib_amax — force _amax to CPU after calibration ──
#
# load_calib_amax() is called by max_calibrate() after the forward loop
# finishes. It writes _amax to GPU by default. We patch it so _amax
# goes to CPU immediately, preventing GPU corruption during the long
# wait before export.
orig_load_calib_amax = tq_module.TensorQuantizer.load_calib_amax
def patched_load_calib_amax(self, *args, **kwargs):
orig_load_calib_amax(self, *args, **kwargs)
if hasattr(self, '_amax') and self._amax is not None:
self._amax = self._amax.cpu()
tq_module.TensorQuantizer.load_calib_amax = patched_load_calib_amax
print("✓ Patched TensorQuantizer.load_calib_amax (force _amax to CPU)")
# ── Patch 2: export_amax — CPU safety ──
# If any _amax is still on GPU at export time, move it before reading.
orig_export_amax = tq_module.TensorQuantizer.export_amax
def patched_export_amax(self):
if self.amax is not None and self.amax.is_cuda:
self._amax = self._amax.cpu()
return orig_export_amax(self)
tq_module.TensorQuantizer.export_amax = patched_export_amax
print("✓ Patched TensorQuantizer.export_amax (CPU fallback)")
# ── Patch 3: NVFP4QTensor.get_activation_scaling_factor — graceful degradation ──
@classmethod
def patched_get_activation_scaling_factor(cls, quantizer):
if not quantizer.is_enabled:
return None
try:
amax = quantizer.export_amax()
except (torch.cuda.CudaError, RuntimeError) as e:
print(f" WARNING: export_amax() failed ({e}), attempting CPU recovery...")
if hasattr(quantizer, '_amax') and quantizer._amax is not None:
quantizer._amax = quantizer._amax.cpu()
amax = quantizer.export_amax()
if amax is None:
return None
amax = amax.cpu()
activation_scaling_factor = amax.float() / (quantizer.maxbound * 448.0)
if not torch.all(activation_scaling_factor > 0):
n_bad = (activation_scaling_factor <= 0).sum().item()
n_total = activation_scaling_factor.numel()
print(f" WARNING: {n_bad}/{n_total} activation scaling factors <= 0, clamping")
activation_scaling_factor = activation_scaling_factor.clamp(min=torch.finfo(torch.float32).tiny)
return activation_scaling_factor
nvfp4_tensor.NVFP4QTensor.get_activation_scaling_factor = patched_get_activation_scaling_factor
print("✓ Patched NVFP4QTensor.get_activation_scaling_factor (CPU + clamp)")
def snapshot_amax_to_cpu(model, snapshot_path):
"""Walk all quantizers, copy their _amax to CPU, save to disk.
After calibration completes, the _amax tensors are fresh and valid on GPU.
We copy them to CPU immediately and save to disk. This costs almost nothing
(~50MB for ~49K quantizers) but guarantees we have valid calibration data
even if CUDA corrupts the GPU copies later.
"""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
print(f"\nSnapshotting quantizer _amax to CPU...")
t0 = time.time()
snapshots = {}
n_moved = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
if hasattr(module, '_amax') and module._amax is not None:
amax_cpu = module._amax.detach().cpu().clone()
snapshots[name] = amax_cpu
module._amax.data.copy_(amax_cpu)
n_moved += 1
torch.save(snapshots, snapshot_path)
size_mb = os.path.getsize(snapshot_path) / (1024**2)
print(f"✓ Snapshotted {n_moved} quantizer _amax tensors to CPU ({time.time()-t0:.1f}s)")
print(f" Saved to: {snapshot_path} ({size_mb:.1f} MB)")
return snapshots
def restore_amax_from_snapshot(model, snapshot_path):
"""Restore _amax from a previously saved CPU snapshot."""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
print(f"Restoring _amax from snapshot: {snapshot_path}")
snapshots = torch.load(snapshot_path, map_location='cpu')
n_restored = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
if name in snapshots and hasattr(module, '_amax'):
module._amax.data.copy_(snapshots[name].to(module._amax.device))
n_restored += 1
print(f"✓ Restored {n_restored} _amax tensors from snapshot")
def force_all_amax_to_cpu(model):
"""Force ALL quantizer tensors to CPU. Nuclear option after calibration."""
from modelopt.torch.quantization.nn.modules.tensor_quantizer import TensorQuantizer
count = 0
for name, module in model.named_modules():
if not isinstance(module, TensorQuantizer):
continue
for attr in ['_amax', '_pre_quant_scale', '_global_amax']:
if hasattr(module, attr):
val = getattr(module, attr)
if val is not None and isinstance(val, torch.Tensor) and val.is_cuda:
setattr(module, attr, val.cpu())
count += 1
print(f"✓ Forced {count} quantizer tensors to CPU")
def save_calibrated_state(model, path):
"""Save model state dict after calibration."""
print(f"\n{'='*60}")
print(f"SAVING CALIBRATED STATE → {path}")
print(f"{'='*60}")
start = time.time()
state = {
'model_state_dict': model.state_dict(),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
}
torch.save(state, path)
size_gb = os.path.getsize(path) / (1024**3)
print(f"✓ Saved calibrated state: {size_gb:.1f} GB ({time.time()-start:.0f}s)")
print(f" Path: {path}")
print(f" Re-run with --export-only to retry export.\n")
def run_calibration(model_path, export_dir, calib_save_path, amax_snapshot_path, calib_size, calib_seq):
"""Full pipeline: load → quantize → calibrate → snapshot → save → export."""
os.chdir(EXAMPLE_DIR)
sys.path.insert(0, EXAMPLE_DIR)
os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
# Import from hf_ptq and modelopt — all verified against the example script
from example_utils import get_model, get_tokenizer
from hf_ptq import (
make_calib_dataloader,
build_quant_cfg,
load_mtp_weights,
copy_custom_model_files,
QUANT_CFG_CHOICES,
KV_QUANT_CFG_CHOICES,
)
from modelopt.torch import quantization as mtq
from modelopt.torch.quantization.config import need_calibration
from modelopt.torch.utils.dataset_utils import get_max_batch_size
from modelopt.torch.export import export_hf_checkpoint
apply_patches()
# ── Load model ──
# Use modelopt's get_model() — handles max_memory properly for 3TB model.
# Raw AutoModelForCausalLM.from_pretrained OOMs during expert weight conversion.
print(f"\nLoading model from {model_path}...")
t0 = time.time()
model = get_model(
model_path,
gpu_mem_percentage=GPU_MEM_PCT,
trust_remote_code=True,
use_seq_device_map=True,
)
tokenizer = get_tokenizer(model_path, trust_remote_code=True)
print(f"✓ Model loaded in {time.time()-t0:.0f}s")
# ── Setup quantization config ──
# Same flow as hf_ptq's quantize_main()
quant_cfg = copy.deepcopy(QUANT_CFG_CHOICES[QUANT])
quant_cfg = build_quant_cfg(QUANT, quant_cfg, None, None, None)
if KV_CACHE_QUANT != "none":
enable_quant_kv_cache = True
print(f"✓ KV cache quantization: {KV_CACHE_QUANT}")
quant_cfg = mtq.update_quant_cfg_with_kv_cache_quant(
quant_cfg,
getattr(mtq, KV_QUANT_CFG_CHOICES[KV_CACHE_QUANT])["quant_cfg"],
)
else:
enable_quant_kv_cache = False
# ── Detect batch size ──
# Same as hf_ptq's quantize_main()
print("\nDetecting max calibration batch size...")
batch_size = get_max_batch_size(
model,
max_sample_length=calib_seq,
sample_memory_usage_ratio=1.1,
)
batch_size = min(batch_size, calib_size)
print(f"✓ Using calibration batch_size={batch_size}")
# ── Prepare dataloader ──
# Same args structure as hf_ptq
args = argparse.Namespace(
calib_size=[calib_size],
calib_seq=calib_seq,
calib_dataset="",
dataset=None, # None triggers default: ["cnn_dailymail", "nemotron-post-training-dataset-v2"]
batch_size=batch_size,
calib_batch_size=0,
calib_with_images=False,
auto_quantize_bits=None,
auto_quantize_method=None,
specdec_offline_dataset=None,
inference_pipeline_parallel=1,
)
calib_dataloader, _ = make_calib_dataloader(
args, model, None, tokenizer, torch.device("cuda"), None,
)
# ── Quantize + Calibrate ──
print(f"\n{'='*60}")
print(f"QUANTIZING: {QUANT} with {calib_size} calibration samples")
print(f"{'='*60}")
t0 = time.time()
model = mtq.quantize(model, quant_cfg, forward_loop=calib_dataloader)
print(f"✓ Quantization + calibration complete in {time.time()-t0:.0f}s")
# ── IMMEDIATELY snapshot all _amax to CPU ──
snapshots = snapshot_amax_to_cpu(model, amax_snapshot_path)
# ── Force ALL quantizer state to CPU ──
force_all_amax_to_cpu(model)
# ── Free GPU memory ──
torch.cuda.empty_cache()
gc.collect()
# ── SAVE STATE ──
save_calibrated_state(model, calib_save_path)
# ── Export ──
run_export(model, tokenizer, model_path, export_dir, amax_snapshot_path)
def run_export(model, tokenizer, model_path, export_dir, amax_snapshot_path=None):
"""Export the quantized model to HF safetensors format."""
from modelopt.torch.export import export_hf_checkpoint
from hf_ptq import load_mtp_weights, copy_custom_model_files
print(f"\n{'='*60}")
print(f"EXPORTING → {export_dir}")
print(f"{'='*60}")
force_all_amax_to_cpu(model)
if amax_snapshot_path and os.path.exists(amax_snapshot_path):
restore_amax_from_snapshot(model, amax_snapshot_path)
torch.cuda.empty_cache()
gc.collect()
t0 = time.time()
try:
mtp_layer_prefixes, mtp_state_dict = load_mtp_weights(model, model_path)
if mtp_layer_prefixes:
model._mtp_layer_prefixes = mtp_layer_prefixes
export_hf_checkpoint(
model,
export_dir=export_dir,
extra_state_dict=mtp_state_dict,
)
tokenizer.save_pretrained(export_dir)
copy_custom_model_files(model_path, export_dir, True)
print(f"\n✓ Export complete in {time.time()-t0:.0f}s → {export_dir}")
except Exception as e:
print(f"\n✗ EXPORT FAILED: {e}")
print(f" Calibrated state: {CALIB_SAVE_PATH}")
print(f" Amax snapshots: {AMAX_SNAPSHOT_PATH}")
print(f" Re-run with --export-only to retry")
raise
def run_export_only(calib_save_path, amax_snapshot_path, model_path, export_dir):
"""Load saved calibration state and run export only."""
os.chdir(EXAMPLE_DIR)
sys.path.insert(0, EXAMPLE_DIR)
os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
apply_patches()
from example_utils import get_model, get_tokenizer
print(f"Loading model from {model_path}...")
model = get_model(
model_path,
device="cpu",
trust_remote_code=True,
)
tokenizer = get_tokenizer(model_path, trust_remote_code=True)
print(f"Loading calibrated state from {calib_save_path}...")
state = torch.load(calib_save_path, map_location='cpu')
model.load_state_dict(state['model_state_dict'])
print(f"✓ Loaded calibrated state (saved at {state['timestamp']})")
run_export(model, tokenizer, model_path, export_dir, amax_snapshot_path)
def run_validate(calib_save_path, amax_snapshot_path):
"""Validate saved calibration state — check amax values are valid."""
print(f"\nValidating calibration state...")
if os.path.exists(amax_snapshot_path):
snapshots = torch.load(amax_snapshot_path, map_location='cpu')
n_total = len(snapshots)
n_valid = 0
n_zero = 0
n_nan = 0
n_neg = 0
for name, amax in snapshots.items():
if torch.any(torch.isnan(amax)):
n_nan += 1
elif torch.any(amax < 0):
n_neg += 1
elif torch.all(amax == 0):
n_zero += 1
else:
n_valid += 1
print(f"\nAmax snapshot validation:")
print(f" Total quantizers: {n_total}")
print(f" Valid: {n_valid}")
print(f" All zeros: {n_zero}")
print(f" Negative: {n_neg}")
print(f" NaN: {n_nan}")
if n_valid == n_total:
print(f"\n✓ All {n_total} amax snapshots are valid!")
else:
print(f"\n{n_total - n_valid} quantizers have invalid amax!")
else:
print(f"✗ No amax snapshot found at {amax_snapshot_path}")
if os.path.exists(calib_save_path):
size_gb = os.path.getsize(calib_save_path) / (1024**3)
print(f"\nCalibrated state: {calib_save_path} ({size_gb:.1f} GB)")
else:
print(f"\n✗ No calibrated state found at {calib_save_path}")
def main():
parser = argparse.ArgumentParser(description="DeepSeek V4 Pro NVFP4 Quantization")
parser.add_argument("--export-only", action="store_true",
help="Skip calibration, load saved state and run export only")
2026-05-09 08:02:09 +00:00
parser.add_argument("--validate-only", action="store_true",
help="Validate saved calibration state without running anything")
parser.add_argument("--model", default=MODEL, help="Path to BF16 model")
parser.add_argument("--export-dir", default=EXPORT_DIR, help="Export output directory")
parser.add_argument("--calib-save", default=CALIB_SAVE_PATH, help="Calibration state save path")
parser.add_argument("--amax-snapshot", default=AMAX_SNAPSHOT_PATH, help="Amax snapshot path")
parser.add_argument("--calib-size", type=int, default=CALIB_SIZE, help="Calibration samples")
parser.add_argument("--calib-seq", type=int, default=CALIB_SEQ, help="Calibration sequence length")
args = parser.parse_args()
if args.validate_only:
run_validate(args.calib_save, args.amax_snapshot)
elif args.export_only:
if not os.path.exists(args.calib_save):
print(f"ERROR: No calibration state found at {args.calib_save}")
sys.exit(1)
run_export_only(args.calib_save, args.amax_snapshot, args.model, args.export_dir)
else:
run_calibration(args.model, args.export_dir, args.calib_save,
args.amax_snapshot, args.calib_size, args.calib_seq)
if __name__ == "__main__":
main()