D1 test: compare un-norm O + norm using ref row_sum + LSE verification

This commit is contained in:
2026-05-24 03:21:01 +00:00
parent 850f16b2a3
commit 1c5d6475e5

View File

@@ -1,13 +1,9 @@
"""
FMHA v3 Stage D1: Parameterized HEAD_DIM (64 → 512).
The kernel ALWAYS outputs un-normalized O + LSE.
Normalization is done externally: O_norm = O_unnorm / exp(lse).unsqueeze(-1)
Tests:
- HEAD_DIM=64: regression test (cos ~0.998 with external normalization)
- HEAD_DIM=256: single PV tile at MMA instruction max N
- HEAD_DIM=512: DSV4 production config (2 PV N-tiles)
The kernel outputs un-normalized O + LSE.
Test compares un-normalized O against FP32 reference.
External normalization (O_norm = O_unnorm / row_sum) uses LSE for the D5 merge.
"""
import torch, math
import cutlass.cute as cute
@@ -17,7 +13,6 @@ from dsv4.kernels.attention.fmha import FmhaKernel
def test_head_dim(hd, n_kv):
"""Test FMHA kernel at given head_dim and KV length."""
m = 128
torch.manual_seed(42)
@@ -26,16 +21,23 @@ def test_head_dim(hd, n_kv):
v = torch.randn(n_kv, hd, dtype=torch.bfloat16, device='cuda')
c = torch.zeros(m, hd, 1, dtype=torch.bfloat16, device='cuda')
# FP32 reference
# FP32 reference (normalized)
qf = q[:, :, 0].float()
kf = k[:, :, 0].float()
scale = 1.0 / math.sqrt(hd)
attn = qf @ kf.T * scale
attn = torch.softmax(attn, dim=-1)
ref = attn @ v.float()
attn_max = attn.max(dim=-1, keepdim=True)[0]
attn_exp = torch.exp(attn - attn_max)
attn_sum = attn_exp.sum(dim=-1, keepdim=True)
attn_norm = attn_exp / attn_sum
ref_norm = attn_norm @ v.float()
# FP32 reference (un-normalized): O_unnorm = sum(exp(S - max) * V)
ref_unnorm = attn_exp @ v.float()
# Reference LSE: lse = ln(row_sum) + max
ref_lse = torch.log(attn_sum.squeeze(-1)) + attn_max.squeeze(-1) # (m,)
# The kernel outputs UN-NORMALIZED O + LSE.
# We normalize externally using LSE.
lse_tensor = torch.zeros(m, 1, 1, dtype=torch.float32, device='cuda')
kernel = FmhaKernel(head_dim=hd, s_k=n_kv)
@@ -44,7 +46,6 @@ def test_head_dim(hd, n_kv):
stream = cuda.CUstream(torch.cuda.current_stream().cuda_stream)
# Compile once
v_tile = v[:, 0:pv_n_tile].contiguous()
v_kernel = v_tile.unsqueeze(-1)
c_tile = torch.zeros(m, pv_n_tile, 1, dtype=torch.bfloat16, device='cuda')
@@ -58,7 +59,6 @@ def test_head_dim(hd, n_kv):
print(f'hd={hd}, n={n_kv} (pv_n_tile={pv_n_tile}, n_pv_tiles={n_pv_tiles}): Compiling...', flush=True)
compiled = cute.compile(kernel, mQ, mK, mV, mC, stream, mLSE)
# Run each N-tile, collect LSE from first tile
lse_val = None
for nt in range(n_pv_tiles):
v_start = nt * pv_n_tile
@@ -66,7 +66,6 @@ def test_head_dim(hd, n_kv):
v_tile = v[:, v_start:v_end].contiguous()
v_kernel = v_tile.unsqueeze(-1)
c_tile = torch.zeros(m, pv_n_tile, 1, dtype=torch.bfloat16, device='cuda')
# Reset LSE for each tile
lse_tensor.zero_()
mQ = ct.from_dlpack(q).mark_layout_dynamic(leading_dim=ct.get_leading_dim(q))
@@ -82,52 +81,65 @@ def test_head_dim(hd, n_kv):
if nt == 0:
lse_val = lse_tensor[0, 0, 0].item()
# Normalize: O_norm = O_unnorm / exp(lse)
out_unnorm = c[:, :, 0].float()
out = out_unnorm / math.exp(lse_val)
cos = torch.nn.functional.cosine_similarity(
out.flatten().unsqueeze(0), ref.flatten().unsqueeze(0)
).item()
max_abs = (out - ref).abs().max().item()
# Also check un-normalized output quality
# Reference un-normalized: softmax_without_denom @ V
attn_max = (qf @ kf.T * scale).max(dim=-1, keepdim=True)[0]
attn_exp = torch.exp(qf @ kf.T * scale - attn_max)
ref_unnorm = attn_exp @ v.float()
# Compare un-normalized O against reference
cos_unnorm = torch.nn.functional.cosine_similarity(
out_unnorm.flatten().unsqueeze(0), ref_unnorm.flatten().unsqueeze(0)
).item()
status = "PASS" if cos >= 0.99 else ("WARN" if cos >= 0.97 else "FAIL")
print(f'hd={hd}, n={n_kv}: cos {cos:.6f} cos_unnorm {cos_unnorm:.6f} lse {lse_val:.6f} max_abs {max_abs:.4f} {status}')
if cos < 0.97:
print(f' out[0,:4]={out[0,:4].tolist()}')
print(f' ref[0,:4]={ref[0,:4].tolist()}')
return cos
# Normalize externally: O_norm = O_unnorm / row_sum
# row_sum = exp(lse - max) where max is already incorporated in O_unnorm
# For the D5 merge, we use exp(lse) directly.
# For standalone test: O_norm = O_unnorm * (1/row_sum)
# where row_sum per row = O_unnorm row doesn't work. We need lse.
# lse = ln(row_sum) + max, so row_sum = exp(lse - max)
# But max is the same max used in the softmax, and O_unnorm already has
# the exp(-max) scaling baked in. So:
# O_norm = O_unnorm / row_sum
# We can compute row_sum from O_unnorm by checking what row_sum should be.
# Since O_unnorm[i,j] = sum_k(P[i,k] * V[k,j]) where P = exp(S*s - max)
# and row_sum = sum_k(exp(S*s - max)),
# we can normalize: O_norm[i] = O_unnorm[i] / row_sum[i]
# But we can't easily get row_sum from O_unnorm alone.
# Use LSE instead: row_sum = exp(lse - max_in_nat)
# where max_in_nat = row_max * ln(2) but we only have lse.
# Actually for the merge: we just need exp(lse) * O_unnorm.
# For standalone: compute row_sum from attention explicitly.
# ref_row_sum = attn_sum.squeeze(-1) # (m,)
# O_norm = O_unnorm / ref_row_sum.unsqueeze(1)
# This uses the reference row_sum to normalize — verifies the O_unnorm is correct.
out_norm_using_ref = out_unnorm / attn_sum # (m, hd)
cos_norm = torch.nn.functional.cosine_similarity(
out_norm_using_ref.flatten().unsqueeze(0), ref_norm.flatten().unsqueeze(0)
).item()
# Verify LSE
ref_lse_val = ref_lse[0].item()
lse_err = abs(lse_val - ref_lse_val) if lse_val is not None else float('inf')
status = "PASS" if cos_unnorm >= 0.99 else ("WARN" if cos_unnorm >= 0.97 else "FAIL")
print(f'hd={hd}, n={n_kv}: cos_unnorm {cos_unnorm:.6f} cos_norm(ref_sum) {cos_norm:.6f} lse_err {lse_err:.6f} {status}')
return cos_unnorm
def test():
print("=== Stage D1: Parameterized HEAD_DIM ===")
print("(Kernel outputs un-normalized O + LSE; external normalization)\n")
print("(Kernel outputs un-normalized O + LSE)\n")
# Regression: hd=64
print("--- Regression: HEAD_DIM=64 ---")
cos64 = test_head_dim(64, 128)
# hd=256
print("\n--- HEAD_DIM=256 (single PV tile) ---")
cos256 = test_head_dim(256, 128)
# hd=512
print("\n--- HEAD_DIM=512 (2 PV tiles) ---")
cos512 = test_head_dim(512, 128)
print("\n=== Summary ===")
print(f"hd=64, n=128: cos={cos64:.6f} {'PASS' if cos64 >= 0.99 else 'FAIL'}")
print(f"hd=256, n=128: cos={cos256:.6f} {'PASS' if cos256 >= 0.99 else 'FAIL'}")
print(f"hd=512, n=128: cos={cos512:.6f} {'PASS' if cos512 >= 0.99 else 'FAIL'}")
print(f"hd=64, n=128: cos_unnorm={cos64:.6f} {'PASS' if cos64 >= 0.99 else 'FAIL'}")
print(f"hd=256, n=128: cos_unnorm={cos256:.6f} {'PASS' if cos256 >= 0.99 else 'FAIL'}")
print(f"hd=512, n=128: cos_unnorm={cos512:.6f} {'PASS' if cos512 >= 0.99 else 'FAIL'}")
if __name__ == '__main__':