1. grouped_linear.py: Remove a conditional host read of a GPU tensor — `if group_offsets[0] != 0` reads a GPU value on the host, which forces a sync. Fix: update the offsets unconditionally on every call (a GPU-only multiply). 2. test_cuda_graph_readiness.py: Use pinned CPU buffers for token transfer — `dec_tid_buf[0] = python_int` forces a CPU→GPU sync. Fix: write to a pinned CPU buffer, then `copy_` it into the GPU buffer (intended to be async and graph-capturable). 3. Add dsv4/decode/cuda_graph_decoder.py (skeleton).
542 lines
24 KiB
Python
542 lines
24 KiB
Python
#!/usr/bin/env python3
"""CUDA Graph Readiness Detector — Section A of GETTING_CUDAGRAPH_READY.md

First greps the hot-path source files for known sync patterns, then runs the
prefill and one decode step of single_shot_inference.py with:

1. torch.cuda.set_sync_debug_mode("error") — raises on the first synchronizing
   CUDA call (e.g. an implicit device→host sync) in the decode forward
2. a torch.cuda.graph capture attempt of layer 0 — fails on .item(), sync,
   alloc, dynamic shape

Because "error" mode aborts at the first sync, method 1 reports at most one
violation per run; the grep pass provides the broader (heuristic) hunt-list
upfront.
"""
|
|
import os, sys, time, json, math, traceback
|
|
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
|
|
import torch
|
|
import torch.nn.functional as F
|
|
|
|
# ==== CONFIG ====
# Run parameters for the single decode-step readiness probe.
CHECKPOINT_DIR: str = "/root/nvidia-meeting/DeepSeek-V4-Pro-NVFP4"  # holds config.json + weights
NUM_GPUS: int = 8          # layer li is placed on cuda:(li % NUM_GPUS)
PROMPT: str = "The capital of France is"
MAX_CONTEXT: int = 8_192   # sizes each layer's compressed-KV capacity
SEED: int = 42

# ==== Sync inventory ====
# NOTE(review): not written to anywhere in the visible code; SyncDetector keeps
# its own `violations` list.
sync_violations: list = []
|
|
|
|
class SyncDetector:
    """Collect device→host sync findings observed while running the forward.

    Every finding is stored as a dict tagged with whatever ``phase`` label is
    current at record time, and is echoed to stdout immediately.
    """

    def __init__(self):
        self.phase = "unknown"   # label stamped onto each subsequent record
        self.violations = []     # recorded findings, in order of recording

    def record(self, category, location, detail):
        """Append one finding (tagged with the current phase) and print it."""
        entry = dict(
            phase=self.phase,
            category=category,
            location=location,
            detail=detail,
        )
        self.violations.append(entry)
        print(f" [SYNC] {category}: {location} — {detail}", flush=True)


detector = SyncDetector()
|
|
|
|
# ==== Import single_shot components ====
|
|
# We need to import the functions/classes without running main()
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from single_shot_inference import (
|
|
load_all_weights, build_rope_cache, rmsnorm, unweighted_rmsnorm,
|
|
FP4_LUT, KVCache, Compressor, Indexer, HcHead,
|
|
make_nvfp4_linear, get_nvfp4_weight, dequant_nvfp4,
|
|
forward_layer, forward_attention, _run_production_fmha_mixed,
|
|
moe_forward, _apply_rope,
|
|
_load_moe_weights_stacked, _load_shared_expert_weights, _cache_layer_weights_no_experts,
|
|
)
|
|
from encoding.deepseek_v4_encoding import (
|
|
thinking_start_token, thinking_end_token,
|
|
USER_SP_TOKEN, ASSISTANT_SP_TOKEN,
|
|
)
|
|
|
|
|
|
def grep_sync_patterns(source_dir):
    """Grep the hot path for known sync patterns (Section B checklist).

    Scans a fixed list of hot-path source files under ``source_dir`` line by
    line and prints every line matching one of the regex patterns below. The
    patterns are deliberately rough: a hit is a candidate to inspect, not a
    confirmed sync.

    A line is skipped when it is a ``#`` comment, when it lies inside a
    triple-quoted block that opens at the start of a line, or (per pattern)
    when the match starts after a ``#`` on that line (a trailing comment).

    Args:
        source_dir: Repository root; each hot-file path is resolved relative
            to it. Files that do not exist are skipped silently.

    Returns:
        A list of ``(pattern_name, relative_file, line_number, stripped_line)``
        tuples, one per (line, pattern) hit, in print order. Callers that
        ignore the return value are unaffected.
    """
    import re

    patterns = {
        'item()': r'\.item\(\)',
        '.cpu()': r'\.cpu\(\)',
        '.tolist()': r'\.tolist\(\)',
        '.numpy()': r'\.numpy\(\)',
        # Group the alternation so the word boundary guards both names;
        # without the group, `\b` applied only to `int` and `to_float(` matched.
        'int(t)/float(t)': r'\b(?:int|float)\([^)]*\)',  # rough
        'cuda.synchronize()': r'torch\.cuda\.synchronize\(\)',
        'isnan().any()': r'\.isnan\([^)]*\)\.any\(\)',
        'isinf().any()': r'\.isinf\([^)]*\)\.any\(\)',
        'if t:': r'if\s+\w+\.item\(\)',
        # Any call form, including `.nonzero(as_tuple=True)`.
        'nonzero': r'\.nonzero\(',
        'masked_select': r'\.masked_select\(',
        'torch.where(one-arg)': r'torch\.where\([^,]+\)',
    }
    # Compile once rather than re-parsing every pattern on every line.
    compiled = [(pname, re.compile(pat)) for pname, pat in patterns.items()]

    hot_files = [
        'single_shot_inference.py',
        'dsv4/layers/mhc.py',
        'dsv4/layers/router.py',
        'dsv4/layers/moe.py',
        'dsv4/layers/shared_expert.py',
        'dsv4/layers/linear.py',
        'dsv4/layers/grouped_linear.py',
        'dsv4/ops/quantize.py',
        'dsv4/kernels/attention/production.py',
        'dsv4/kernels/compressor/production_compress.py',
    ]

    hits = []
    print("\n=== SECTION B: Grep Results (hot path sync patterns) ===", flush=True)
    for fname in hot_files:
        fpath = os.path.join(source_dir, fname)
        if not os.path.exists(fpath):
            continue
        # Best-effort scan: undecodable bytes must not abort the whole report.
        with open(fpath, encoding="utf-8", errors="replace") as f:
            lines = f.readlines()

        doc_delim = None  # closing delimiter of the triple-quoted block we are in
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if doc_delim is not None:
                # Inside a multi-line docstring: skip through its closing line.
                if doc_delim in stripped:
                    doc_delim = None
                continue
            if stripped.startswith('#'):
                continue
            if stripped.startswith(('"""', "'''")):
                delim = stripped[:3]
                # A lone delimiter opens a multi-line block; '"""One-liner."""'
                # opens and closes on the same line.
                if stripped.count(delim) == 1:
                    doc_delim = delim
                continue
            hash_pos = stripped.find('#')
            for pname, regex in compiled:
                match = regex.search(stripped)
                if match is None:
                    continue
                # Skip hits that sit inside a trailing comment.
                if 0 <= hash_pos < match.start():
                    continue
                print(f" [{pname}] {fname}:{i}: {stripped[:120]}", flush=True)
                hits.append((pname, fname, i, stripped))
    return hits
|
|
|
|
|
|
def run_sync_debug_mode():
    """Method 1 + Method 2: build the model, prefill, then probe one decode step.

    Steps, in order:
      1. Build every per-layer component from CHECKPOINT_DIR; layer ``li`` is
         placed on ``cuda:{li % NUM_GPUS}``.
      2. Prefill the encoded PROMPT in chunks of 128 tokens (not under debug mode).
      3. Run one warmup decode step so CuTeDSL kernels are compiled first.
      4. METHOD 1: rerun the decode forward (layers + hc_head + norm + lm_head)
         under ``torch.cuda.set_sync_debug_mode("error")``. The first
         synchronizing call raises a RuntimeError, which is caught and
         recorded, so at most one violation is reported per run.
      5. METHOD 2: attempt a CUDA graph capture of layer 0's decode forward.
      6. Print a summary and write /tmp/cuda_graph_readiness_results.json.
    """
    print("\n=== METHOD 1: torch.cuda.set_sync_debug_mode('error') ===", flush=True)

    # Apply the module-level SEED (previously defined but never used).
    torch.manual_seed(SEED)

    # Build model components (same as single_shot main, but abbreviated)
    with open(os.path.join(CHECKPOINT_DIR, "config.json")) as f:
        cfg = json.load(f)
    n_layers = cfg["num_hidden_layers"]
    H = cfg["hidden_size"]
    hd = cfg["head_dim"]
    n_h = cfg["num_attention_heads"]
    rd = cfg.get("qk_rope_head_dim", 64)
    cr = cfg.get("compress_ratios", [128] * n_layers)

    print(f"Model: {n_layers} layers, {n_h} heads, hd={hd}", flush=True)

    # Load weights
    print("Loading weights...", flush=True)
    all_w = load_all_weights(CHECKPOINT_DIR)

    # Build components
    from dsv4.layers.mhc import mHCLayer
    from dsv4.layers.router import Router
    from dsv4.layers.moe import Nvfp4MoE
    from dsv4.layers.shared_expert import Nvfp4SharedExpert
    from dsv4.layers.grouped_linear import Nvfp4GroupedLinear

    for g in range(NUM_GPUS):
        torch.cuda.set_device(g)
        torch.cuda.empty_cache()
    torch.cuda.set_device(0)

    # Build mHC + norms
    attn_mhcs, ffn_mhcs, attn_norms, ffn_norms = {}, {}, {}, {}
    for li in range(n_layers):
        dev = f"cuda:{li % NUM_GPUS}"
        for _tag, blocks, fn_s, base_s, scale_s in [
            ("attn", attn_mhcs, f"model.layers.{li}.attn_hc.fn", f"model.layers.{li}.attn_hc.base", f"model.layers.{li}.attn_hc.scale"),
            ("ffn", ffn_mhcs, f"model.layers.{li}.ffn_hc.fn", f"model.layers.{li}.ffn_hc.base", f"model.layers.{li}.ffn_hc.scale"),
        ]:
            fn, base, scale = all_w.get(fn_s), all_w.get(base_s), all_w.get(scale_s)
            if fn is not None and base is not None and scale is not None:
                m = mHCLayer(hidden_dim=H, n_hc=4, t_max_sinkhorn=20, device=dev)
                n = 4
                # fn/base are split into consecutive pre / post / comb slices of width n.
                m.load_weights(
                    W_pre=fn[0:n].to(dev, torch.float32), W_post=fn[n:2*n].to(dev, torch.float32),
                    W_comb=fn[2*n:].to(dev, torch.float32),
                    S_pre=base[0:n].reshape(1, n).to(dev, torch.float32),
                    S_post=base[n:2*n].reshape(n, 1).to(dev, torch.float32),
                    S_comb=base[2*n:].reshape(n, n).to(dev, torch.float32),
                    alpha_pre=scale[0].item(), alpha_post=scale[1].item(), alpha_comb=scale[2].item(),
                )
                blocks[li] = m
        an_k = f"model.layers.{li}.input_layernorm.weight"
        if an_k in all_w:
            attn_norms[li] = all_w[an_k].to(dev, torch.float32)
        fn_k = f"model.layers.{li}.post_attention_layernorm.weight"
        if fn_k in all_w:
            ffn_norms[li] = all_w[fn_k].to(dev, torch.float32)

    # Build attention projections
    prod_lins = {}
    for li in range(n_layers):
        dev = f"cuda:{li % NUM_GPUS}"
        pfx = f"model.layers.{li}.self_attn"
        torch.cuda.set_device(li % NUM_GPUS)
        pl = {}
        pl['q_a'] = make_nvfp4_linear(7168, 1536, dev, all_w, pfx, 'q_a_proj')
        pl['q_b'] = make_nvfp4_linear(1536, 65536, dev, all_w, pfx, 'q_b_proj')
        pl['kv'] = make_nvfp4_linear(7168, 512, dev, all_w, pfx, 'kv_proj')
        n_local_groups = cfg.get('o_groups', 16)
        heads_per_group = n_h // n_local_groups
        o_rank_val = cfg.get('o_lora_rank', 1024)
        wo_a = Nvfp4GroupedLinear(
            n_local_groups=n_local_groups,
            heads_per_group=heads_per_group,
            head_dim=hd,
            o_lora_rank=o_rank_val,
            max_num_tokens=8192,
            device=dev,
        )
        oa_w_nvfp4, oa_ws, oa_ws2, oa_isc = get_nvfp4_weight(all_w, pfx, 'o_a_proj')
        if oa_w_nvfp4 is not None and oa_ws is not None:
            wo_a.load_nvfp4_weight(oa_w_nvfp4.to(dev), oa_ws.to(dev),
                                   oa_ws2.to(dev) if oa_ws2 is not None else None,
                                   oa_isc.to(dev) if oa_isc is not None else None)
        else:
            # Fall back to a BF16 o_a_proj weight when no NVFP4 weight is present.
            oa_bf = all_w.get(f"{pfx}.o_a_proj.weight")
            if oa_bf is not None:
                wo_a.set_bf16_weight(oa_bf.bfloat16().to(dev))
        pl['o_a'] = wo_a
        wo_a._use_runtime_gsa = True
        pl['o_b'] = make_nvfp4_linear(16384, 7168, dev, all_w, pfx, 'o_b_proj')
        prod_lins[li] = pl
        if (li+1) % 10 == 0:
            print(f" {li+1}/{n_layers} attn projections", flush=True)

    # Routers, MoE, shared experts
    routers, moe_runners, se_runners = {}, {}, {}
    for li in range(n_layers):
        dev = f"cuda:{li % NUM_GPUS}"
        pfx = f"model.layers.{li}.mlp"
        torch.cuda.set_device(li % NUM_GPUS)
        torch.cuda.synchronize()
        # The first num_hash_layers layers use a token-id → expert lookup table when present.
        is_hash = (li < cfg.get("num_hash_layers", 3)) and (f"{pfx}.gate.tid2eid" in all_w)
        router = Router(hidden_size=H, num_experts=cfg["n_routed_experts"],
                        top_k=cfg.get("num_experts_per_tok", 6),
                        routed_scaling_factor=cfg.get("routed_scaling_factor", 2.5),
                        mode="hash" if is_hash else "dense",
                        vocab_size=cfg.get("vocab_size", 128000) if is_hash else None, device=dev)
        if is_hash:
            router.load_weights(hash_lut=all_w[f"{pfx}.gate.tid2eid"].to(dev, torch.int32))
        else:
            eb = all_w.get(f"{pfx}.gate.e_score_correction_bias")
            gate_w, gate_ws, gate_ws2, gate_isc = get_nvfp4_weight(all_w, pfx, 'gate')
            if gate_w is not None and gate_ws is not None:
                gate_bf16 = dequant_nvfp4(gate_w.to(dev), gate_ws.to(dev), gate_ws2, gate_isc)
                router.W_gate = gate_bf16.T.contiguous().to(dev)
            else:
                gw = all_w.get(f"{pfx}.gate.weight")
                gate_bf16 = gw.bfloat16().to(dev)
                # Store W_gate with H as its leading dimension.
                if gate_bf16.shape[0] != H:
                    gate_bf16 = gate_bf16.T.contiguous()
                router.W_gate = gate_bf16.contiguous()
            router.gate_lin = None
            router.load_weights(e_bias=eb.to(dev, torch.float32))
        router.finalize_weights()
        routers[li] = router

        moe = Nvfp4MoE(num_experts=cfg["n_routed_experts"], hidden_size=H,
                       intermediate_size=cfg.get("moe_intermediate_size", 3072),
                       top_k=cfg.get("num_experts_per_tok", 6), device=dev)
        moe.set_swiglu_limit(cfg.get("swiglu_limit", 10.0))
        moe.set_fused_swiglu(True)
        _load_moe_weights_stacked(all_w, li, pfx, dev, moe, cfg)
        moe._ensure_stacked()
        moe._use_runtime_gsa = True
        moe_runners[li] = moe

        se = Nvfp4SharedExpert(hidden_size=H, intermediate_size=cfg.get("moe_intermediate_size", 3072),
                               device=dev, swiglu_limit=cfg.get("swiglu_limit", 10.0))
        se.set_fused_swiglu(True)
        _load_shared_expert_weights(all_w, li, pfx, dev, se, cfg)
        se._ensure_initialized()
        if se._fused_swiglu:
            from dsv4.ops.gemm_runner import warmup_fused_swiglu_compilation
            K_packed = H // 2
            N_packed_l1 = (2 * cfg.get("moe_intermediate_size", 3072)) // 2
            warmup_fused_swiglu_compilation(1, K_packed, N_packed_l1, dev,
                                            swiglu_limit=cfg.get("swiglu_limit", 10.0))
        se._use_runtime_gsa = True
        se_runners[li] = se
        if (li+1) % 10 == 0:
            print(f" {li+1}/{n_layers} MoE layers", flush=True)
        torch.cuda.empty_cache()

    # Global weights
    torch.cuda.set_device(0)
    embed_w = all_w.get("model.embed_tokens.weight")
    embed = torch.nn.Embedding.from_pretrained(embed_w.bfloat16().to('cuda:0'))
    lm_w = all_w.get("lm_head.weight", embed_w).bfloat16().to('cuda:0')
    final_norm_w = all_w.get("model.norm.weight")
    if final_norm_w is not None:
        final_norm_w = final_norm_w.to('cuda:0', torch.float32)

    hc_head = HcHead(H, 4, 'cuda:0')
    hc_fn = all_w.get("model.hc_head.hc_fn")
    hc_base = all_w.get("model.hc_head.hc_base")
    hc_scale = all_w.get("model.hc_head.hc_scale")
    if hc_fn is not None and hc_base is not None:
        hc_head.load(hc_fn, hc_base, hc_scale)

    # RoPE
    rp = cfg.get("rope_scaling", cfg.get("rope_parameters", {}))
    rt = rp.get("type", rp.get("rope_type", "yarn"))
    rf = rp.get("factor", 16.0)
    rtheta = cfg.get("rope_theta", 10000.)
    romax = rp.get("original_max_position_embeddings", 65536)
    rbfast, rbslow = rp.get("beta_fast", 32), rp.get("beta_slow", 1)
    rope_caches = {g: build_rope_cache(romax, rd, f"cuda:{g}", rtheta, rt, rf, romax, rbfast, rbslow) for g in range(NUM_GPUS)}
    comp_rtheta = cfg.get("compress_rope_theta", rtheta)
    if comp_rtheta != rtheta:
        comp_rope_caches = {g: build_rope_cache(romax, rd, f"cuda:{g}", comp_rtheta, rt, rf, romax, rbfast, rbslow) for g in range(NUM_GPUS)}
    else:
        comp_rope_caches = rope_caches

    # KV caches, compressors, indexers
    kv_caches, compressors, indexers = {}, {}, {}
    n_ih = cfg.get("index_n_heads", 64)
    ihd = cfg.get("index_head_dim", 128)
    itk = cfg.get("index_topk", 1024)
    for li in range(n_layers):
        dev = f"cuda:{li % NUM_GPUS}"
        ratio = cr[li] if li < len(cr) else 128
        # Number of compressed entries needed to cover MAX_CONTEXT tokens (ceil division).
        max_comp = (MAX_CONTEXT + ratio - 1) // ratio if ratio > 0 else 0
        kv_caches[li] = KVCache(hd, cfg.get("sliding_window", 128), max_comp=max_comp, device=dev,
                                indexer_key_dim=ihd, compress_ratio=ratio, indexer_top_k=itk, rope_dim=rd)
        if ratio > 0:
            compressors[li] = Compressor(ratio, hd, H, dev)
        if ratio == 4:
            indexers[li] = Indexer(n_ih, ihd, itk, dev)

    # Cache layer weights
    devs = [f"cuda:{g}" for g in range(NUM_GPUS)]
    layer_w = _cache_layer_weights_no_experts(all_w, n_layers, devs)

    # Load compressor/indexer weights
    for li in range(n_layers):
        pfx = f"model.layers.{li}.self_attn.compressor"
        if li in compressors:
            compressors[li].load(layer_w[li], pfx, dev=f"cuda:{li % NUM_GPUS}")
        if li in indexers:
            indexers[li].load(layer_w[li], f"{pfx}.indexer", dev=f"cuda:{li % NUM_GPUS}")

    del all_w
    import gc
    gc.collect()
    for g in range(NUM_GPUS):
        torch.cuda.set_device(g)
        torch.cuda.empty_cache()
    torch.cuda.set_device(0)

    print("\nAll components built. Running prefill...", flush=True)

    def _forward_one_layer(X, li, gpu, positions, ids32):
        """Apply decoder layer ``li`` (resident on cuda:{gpu}) to X via forward_layer."""
        return forward_layer(X, layer_w[li], li, cfg, *rope_caches[gpu],
                             attn_mhcs.get(li), ffn_mhcs.get(li),
                             attn_norms.get(li), ffn_norms.get(li),
                             kv_caches[li], positions, ids32,
                             compressors.get(li), indexers.get(li),
                             moe_runners.get(li), se_runners.get(li), routers.get(li),
                             prod_lin=prod_lins.get(li),
                             comp_rope_cos=comp_rope_caches[gpu][0],
                             comp_rope_sin=comp_rope_caches[gpu][1],
                             )

    def _forward_all_layers(X, positions, ids32):
        """Run X through every layer, hopping devices round-robin; return X on cuda:0."""
        for li in range(n_layers):
            gpu = li % NUM_GPUS
            if X.device != torch.device(f"cuda:{gpu}"):
                X = X.to(f"cuda:{gpu}")
            torch.cuda.set_device(gpu)
            X = _forward_one_layer(X, li, gpu, positions, ids32)
        return X.to('cuda:0')

    # ---- Prefill (run normally, not under sync debug) ----
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_DIR)
    from encoding.deepseek_v4_encoding import encode_messages
    messages = [{"role": "user", "content": PROMPT}]
    encoded_str = encode_messages(messages, thinking_mode='thinking')
    generated = tokenizer.encode(encoded_str, add_special_tokens=False)
    bos = tokenizer.bos_token_id or 0
    if generated[0] != bos:
        generated = [bos] + generated

    PREFILL_CHUNK = 128
    n_prefill = len(generated)
    prefill_ids = torch.tensor(generated, dtype=torch.long, device='cuda:0')
    prefill_ids32 = prefill_ids.to(torch.int32)
    all_positions = torch.arange(n_prefill, dtype=torch.long, device='cuda:0')

    chunk_starts = list(range(0, n_prefill, PREFILL_CHUNK))
    for ci, cs in enumerate(chunk_starts):
        ce = min(cs + PREFILL_CHUNK, n_prefill)
        chunk_ids = prefill_ids[cs:ce]
        chunk_ids32 = prefill_ids32[cs:ce]
        chunk_positions = all_positions[cs:ce]
        chunk_embed = embed(chunk_ids)
        X = mHCLayer.init_state(chunk_embed)
        X = _forward_all_layers(X, chunk_positions, chunk_ids32)
        print(f" Prefill chunk {ci+1}/{len(chunk_starts)}", flush=True)

    print("Prefill complete. Starting sync detection...", flush=True)

    # ---- NOW: Run one decode step under sync debug mode ----
    all_tokens = generated.copy()
    dec_tid_buf = torch.zeros(1, dtype=torch.long, device='cuda:0')
    dec_pos_buf = torch.zeros(1, dtype=torch.long, device='cuda:0')
    dec_tid32_buf = torch.zeros(1, dtype=torch.int32, device='cuda:0')
    # Pinned CPU buffers for graph-capturable token/position transfer
    dec_tid_pinned = torch.zeros(1, dtype=torch.long, device='cpu').pin_memory()
    dec_tid32_pinned = torch.zeros(1, dtype=torch.int32, device='cpu').pin_memory()
    dec_pos_pinned = torch.zeros(1, dtype=torch.long, device='cpu').pin_memory()

    def write_token_to_gpu(token_id, position):
        """Write token/position to GPU buffers via pinned CPU (no CPU→GPU sync).

        The host writes go into the pinned staging tensors, and the H2D copies
        are issued with ``non_blocking=True``. A default (blocking) ``copy_``
        from host memory finishes with a stream synchronize, which
        ``set_sync_debug_mode("error")`` reports, so METHOD 1 would otherwise
        trip on this helper instead of on the model.

        The pinned buffers are overwritten in place, so earlier copies out of
        them must have completed before this is called again.
        """
        dec_tid_pinned[0] = token_id
        dec_tid_buf.copy_(dec_tid_pinned, non_blocking=True)
        dec_tid32_pinned[0] = token_id
        dec_tid32_buf.copy_(dec_tid32_pinned, non_blocking=True)
        dec_pos_pinned[0] = position
        dec_pos_buf.copy_(dec_pos_pinned, non_blocking=True)

    # Warmup step first (so CuTeDSL kernels are compiled)
    # NOTE(review): this decodes all_tokens[-1] at position len(all_tokens) - 1,
    # which prefill already processed, and METHOD 1 repeats it. That is fine for
    # sync probing, but cache state may no longer match a real decode.
    print(" Warmup decode step (compiling CuTeDSL kernels)...", flush=True)
    write_token_to_gpu(all_tokens[-1], len(all_tokens) - 1)
    X = mHCLayer.init_state(embed(dec_tid_buf))
    X = _forward_all_layers(X, dec_pos_buf, dec_tid32_buf)
    torch.cuda.set_device(0)
    # Also guarantees the warmup's async pinned copies are done before METHOD 1 rewrites them.
    torch.cuda.synchronize()
    print(" Warmup done.", flush=True)

    # ==== METHOD 1: sync debug mode ====
    print("\n [METHOD 1] Enabling sync debug mode...", flush=True)
    torch.cuda.set_sync_debug_mode("error")

    sync_errors = []
    try:
        detector.phase = "decode_forward"
        write_token_to_gpu(all_tokens[-1], len(all_tokens) - 1)

        X = mHCLayer.init_state(embed(dec_tid_buf))
        X = _forward_all_layers(X, dec_pos_buf, dec_tid32_buf)
        torch.cuda.set_device(0)

        # hc_head + norm + lm_head
        x_out = hc_head.forward(X) if hc_head is not None else X[:, 0, :]
        if final_norm_w is not None:
            x_out = rmsnorm(x_out, final_norm_w)
        # Result unused: lm_head is run only so it is probed under debug mode.
        logits = torch.nn.functional.linear(x_out, lm_w)

        # Sampling (argmax — this WILL sync, but it's outside the graph)
        # We test the FORWARD only, not the sampling loop
        print(" Forward completed under sync debug mode!", flush=True)
    except RuntimeError as e:
        # "error" mode raises at the first synchronizing call; later syncs are not reached.
        err_str = str(e)
        sync_errors.append(err_str)
        print(f"\n [SYNC VIOLATION CAUGHT] {err_str[:300]}", flush=True)
        traceback.print_exc()
    finally:
        torch.cuda.set_sync_debug_mode("default")

    if not sync_errors:
        print(" METHOD 1: No sync violations in forward (or they're hidden behind conditional branches)", flush=True)
    else:
        print(f" METHOD 1: {len(sync_errors)} sync violation(s) found", flush=True)

    # ==== METHOD 2: CUDA graph capture attempt ====
    # NOTE(review): CUDA_LAUNCH_BLOCKING=1 is set at import time; confirm stream
    # capture is supported under it before reading a failure as a model defect.
    print("\n [METHOD 2] Attempting CUDA graph capture of decode forward...", flush=True)

    # Pre-allocate static I/O buffers (static_x_in receives layer 0's output)
    static_x_in = torch.zeros(1, 4, H, dtype=torch.bfloat16, device='cuda:0')
    static_token = torch.zeros(1, dtype=torch.long, device='cuda:0')
    static_token32 = torch.zeros(1, dtype=torch.int32, device='cuda:0')
    static_pos = torch.zeros(1, dtype=torch.long, device='cuda:0')

    # Try to capture a single layer first (layer 0 on cuda:0)
    print(" Attempting capture of L0 (cuda:0)...", flush=True)
    li = 0
    gpu = 0
    capture_errors = []

    try:
        g = torch.cuda.CUDAGraph()
        torch.cuda.set_device(0)
        # METHOD 1's non_blocking copies read the pinned buffers; let them finish
        # before the host overwrites those buffers below.
        torch.cuda.synchronize()

        # Fill static buffers with the current decode state. These copies run
        # outside debug mode and before capture, so blocking copy_ is fine here.
        dec_tid_pinned[0] = all_tokens[-1]
        static_token.copy_(dec_tid_pinned)
        dec_tid32_pinned[0] = all_tokens[-1]
        static_token32.copy_(dec_tid32_pinned)
        dec_pos_pinned[0] = len(all_tokens) - 1
        static_pos.copy_(dec_pos_pinned)

        with torch.cuda.graph(g):
            X = mHCLayer.init_state(embed(static_token))
            X = _forward_one_layer(X, li, gpu, static_pos, static_token32)
            static_x_in.copy_(X.to('cuda:0'))

        print(" L0 CAPTURED SUCCESSFULLY!", flush=True)
    except Exception as e:
        err_str = str(e)
        capture_errors.append(err_str)
        print(f"\n [CAPTURE FAILURE] L0: {err_str[:500]}", flush=True)
        traceback.print_exc()

    # ==== Summary ====
    print("\n" + "=" * 70, flush=True)
    print("SYNC INVENTORY SUMMARY", flush=True)
    print("=" * 70, flush=True)
    print(f" Method 1 (sync debug): {len(sync_errors)} violations", flush=True)
    print(f" Method 2 (graph capture L0): {'PASS' if not capture_errors else 'FAIL'}", flush=True)
    print(" Grep patterns: see above", flush=True)
    print("=" * 70, flush=True)

    # Save results
    results = {
        "sync_debug_violations": sync_errors,
        "graph_capture_errors": capture_errors,
        "grep_results": "see stdout",
    }
    with open("/tmp/cuda_graph_readiness_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("Results saved to /tmp/cuda_graph_readiness_results.json", flush=True)
|
|
|
|
|
|
def main():
    """Script entry point: grep the hot path, then run the live sync/capture probes.

    The repository root is taken to be the parent of this file's directory.
    """
    repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    # First: grep for sync patterns
    grep_sync_patterns(repo_root)

    # Then: run the forward under sync debug + capture attempt
    run_sync_debug_mode()


if __name__ == "__main__":
    main()
|