diff --git a/tests/unit/test_kv_merge_debug.py b/tests/unit/test_kv_merge_debug.py new file mode 100644 index 00000000..71f977d0 --- /dev/null +++ b/tests/unit/test_kv_merge_debug.py @@ -0,0 +1,49 @@ +"""Quick test: KV merge only.""" +import torch +import math +import cutlass.cute as cute +import cutlass.torch as ct +import cuda.bindings.driver as cuda +from dsv4.kernels.attention.fmha import FmhaKernel + +def reference_attention_with_lse(q, k, v, scale): + scores = torch.matmul(q.float(), k.float().T) * scale + max_s = scores.max(dim=-1, keepdim=True).values + exp_s = (scores - max_s).exp() + sum_s = exp_s.sum(dim=-1, keepdim=True) + p = exp_s / sum_s + o = torch.matmul(p, v.float()) + lse = (scores - max_s).exp().sum(dim=-1).log() + max_s.squeeze(-1) + return o.to(torch.bfloat16), lse + +torch.manual_seed(42) +m, s_k, hd = 128, 256, 64 +scale = 1.0 / math.sqrt(hd) + +q = torch.randn(m, hd, 1, dtype=torch.bfloat16, device='cuda') +k = torch.randn(s_k, hd, 1, dtype=torch.bfloat16, device='cuda') +v = torch.randn(s_k, hd, dtype=torch.bfloat16, device='cuda') + +ref_o, _ = reference_attention_with_lse(q[:, :, 0], k[:, :, 0], v, scale) + +# Single-segment kernel test first +kernel = FmhaKernel(head_dim=hd, s_k=128, normalize=False) +pv_n_tile = kernel.pv_n_tile +stream = cuda.CUstream(torch.cuda.current_stream().cuda_stream) + +v_tile = v[:, 0:pv_n_tile].contiguous().unsqueeze(-1) +c_tile = torch.zeros(m, pv_n_tile, 1, dtype=torch.bfloat16, device='cuda') +lse_tensor = torch.zeros(m, 1, 1, dtype=torch.float32, device='cuda') + +k_seg = k[:128, :, :].contiguous() +mQ = ct.from_dlpack(q).mark_layout_dynamic(leading_dim=ct.get_leading_dim(q)) +mK = ct.from_dlpack(k_seg).mark_layout_dynamic(leading_dim=ct.get_leading_dim(k_seg)) +mV = ct.from_dlpack(v_tile).mark_layout_dynamic(leading_dim=ct.get_leading_dim(v_tile)) +mC = ct.from_dlpack(c_tile).mark_layout_dynamic(leading_dim=ct.get_leading_dim(c_tile)) +mLSE = ct.from_dlpack(lse_tensor).mark_layout_dynamic(leading_dim=ct.get_leading_dim(lse_tensor)) + +print(f"q shape: {q.shape}, k_seg shape: {k_seg.shape}, v_tile shape: {v_tile.shape}") +compiled = cute.compile(kernel, mQ, mK, mV, mC, stream, mLSE) +print("Compile succeeded!") +compiled(mQ, mK, mV, mC, stream, mLSE) +print("Run succeeded!")