138 lines
5.5 KiB
Python
138 lines
5.5 KiB
Python
"""
|
|
SMEM-P Coordinate Verification Test.
|
|
|
|
Writes a known pattern to sP using the coordinate-indexed approach
|
|
(identical to FmhaKernel's SMEM-P path), then reads sP back
|
|
and verifies on the host.
|
|
|
|
This test uses the FmhaKernel class to set up all layouts (MMA, SMEM, TMEM)
|
|
inside the JIT context, then writes a test pattern and reads it back.
|
|
"""
|
|
import torch, math
|
|
import cutlass, cutlass.cute as cute
|
|
import cutlass.utils as utils
|
|
from cutlass.cute.nvgpu import tcgen05
|
|
from cutlass import Float32, BFloat16, Int32, const_expr
|
|
import cutlass.torch as ct
|
|
import cuda.bindings.driver as cuda
|
|
from dsv4.kernels.attention.fmha import FmhaKernel
|
|
|
|
|
|
def test_smem_p_coords():
|
|
head_dim = 256
|
|
s_k = 128
|
|
m = 128
|
|
pv_n_tile = min(head_dim, 256)
|
|
|
|
# Use FmhaKernel to do the actual test
|
|
# We modify the kernel to write a test pattern instead of P values
|
|
kernel = FmhaKernel(head_dim=head_dim, s_k=s_k, use_smem_p=True, normalize=False)
|
|
|
|
q = torch.randn(m, head_dim, 1, dtype=torch.bfloat16, device='cuda')
|
|
k = torch.randn(s_k, head_dim, 1, dtype=torch.bfloat16, device='cuda')
|
|
v = torch.randn(s_k, head_dim, dtype=torch.bfloat16, device='cuda')
|
|
c = torch.zeros(m, pv_n_tile, 1, dtype=torch.bfloat16, device='cuda')
|
|
lse = torch.zeros(m, 1, 1, dtype=torch.float32, device='cuda')
|
|
|
|
stream = cuda.CUstream(torch.cuda.current_stream().cuda_stream)
|
|
|
|
v_tile = v[:, 0:pv_n_tile].contiguous().unsqueeze(-1)
|
|
mQ = ct.from_dlpack(q).mark_layout_dynamic(leading_dim=ct.get_leading_dim(q))
|
|
mK = ct.from_dlpack(k).mark_layout_dynamic(leading_dim=ct.get_leading_dim(k))
|
|
mV = ct.from_dlpack(v_tile).mark_layout_dynamic(leading_dim=ct.get_leading_dim(v_tile))
|
|
mC = ct.from_dlpack(c).mark_layout_dynamic(leading_dim=ct.get_leading_dim(c))
|
|
mLSE = ct.from_dlpack(lse).mark_layout_dynamic(leading_dim=ct.get_leading_dim(lse))
|
|
|
|
print("Compiling FmhaKernel (hd=256, SMEM-P, normalize=False)...")
|
|
try:
|
|
compiled = cute.compile(kernel, mQ, mK, mV, mC, stream, mLSE)
|
|
except Exception as e:
|
|
print(f"COMPILE FAILED: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return
|
|
print("Running...")
|
|
try:
|
|
compiled(mQ, mK, mV, mC, stream, mLSE)
|
|
except Exception as e:
|
|
print(f"RUN FAILED: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return
|
|
torch.cuda.synchronize()
|
|
|
|
# The kernel writes P to sP using the coordinate-indexed approach
|
|
# then reads it back via PV MMA. The output should be close to
|
|
# the reference attention output.
|
|
out = c[:, :, 0].float()
|
|
|
|
# FP32 reference
|
|
qf = q[:, :, 0].float()
|
|
kf = k[:, :, 0].float()
|
|
scale = 1.0 / math.sqrt(head_dim)
|
|
attn = qf @ kf.T * scale
|
|
attn = torch.softmax(attn, dim=-1)
|
|
ref = attn @ v[:, 0:pv_n_tile].float()
|
|
|
|
cos = torch.nn.functional.cosine_similarity(
|
|
out.flatten().unsqueeze(0), ref.flatten().unsqueeze(0)
|
|
).item()
|
|
print(f"hd=256, n=128: cos {cos:.6f} {'PASS' if cos >= 0.97 else 'FAIL'}")
|
|
|
|
if cos < 0.97:
|
|
# Print first few output vs reference values
|
|
print(f" out[0,:4]={out[0,:4].tolist()}")
|
|
print(f" ref[0,:4]={ref[0,:4].tolist()}")
|
|
print(f" out[1,:4]={out[1,:4].tolist()}")
|
|
print(f" ref[1,:4]={ref[1,:4].tolist()}")
|
|
|
|
# Check if output is zero (sP not written) or non-zero but wrong
|
|
out_norm = out.norm().item()
|
|
ref_norm = ref.norm().item()
|
|
print(f" out norm: {out_norm:.4f}, ref norm: {ref_norm:.4f}")
|
|
|
|
# Check if output is proportional to ref (scaling issue)
|
|
if out_norm > 0 and ref_norm > 0:
|
|
scale_ratio = out_norm / ref_norm
|
|
scaled_out = out / scale_ratio
|
|
scaled_cos = torch.nn.functional.cosine_similarity(
|
|
scaled_out.flatten().unsqueeze(0), ref.flatten().unsqueeze(0)
|
|
).item()
|
|
print(f" Scaled cos (out/scale_ratio): {scaled_cos:.6f}")
|
|
|
|
# Also test hd=64 TMEM-P as regression
|
|
print("\n--- Regression: hd=64 TMEM-P ---")
|
|
kernel64 = FmhaKernel(head_dim=64, s_k=s_k, use_smem_p=False, normalize=False)
|
|
q64 = torch.randn(m, 64, 1, dtype=torch.bfloat16, device='cuda')
|
|
k64 = torch.randn(s_k, 64, 1, dtype=torch.bfloat16, device='cuda')
|
|
v64 = torch.randn(s_k, 64, dtype=torch.bfloat16, device='cuda')
|
|
c64 = torch.zeros(m, 64, 1, dtype=torch.bfloat16, device='cuda')
|
|
lse64 = torch.zeros(m, 1, 1, dtype=torch.float32, device='cuda')
|
|
|
|
mQ64 = ct.from_dlpack(q64).mark_layout_dynamic(leading_dim=ct.get_leading_dim(q64))
|
|
mK64 = ct.from_dlpack(k64).mark_layout_dynamic(leading_dim=ct.get_leading_dim(k64))
|
|
v64_tile = v64.unsqueeze(-1)
|
|
mV64 = ct.from_dlpack(v64_tile).mark_layout_dynamic(leading_dim=ct.get_leading_dim(v64_tile))
|
|
mC64 = ct.from_dlpack(c64).mark_layout_dynamic(leading_dim=ct.get_leading_dim(c64))
|
|
mLSE64 = ct.from_dlpack(lse64).mark_layout_dynamic(leading_dim=ct.get_leading_dim(lse64))
|
|
|
|
compiled64 = cute.compile(kernel64, mQ64, mK64, mV64, mC64, stream, mLSE64)
|
|
compiled64(mQ64, mK64, mV64, mC64, stream, mLSE64)
|
|
torch.cuda.synchronize()
|
|
|
|
out64 = c64[:, :, 0].float()
|
|
qf64 = q64[:, :, 0].float()
|
|
kf64 = k64[:, :, 0].float()
|
|
scale64 = 1.0 / math.sqrt(64)
|
|
attn64 = qf64 @ kf64.T * scale64
|
|
attn64 = torch.softmax(attn64, dim=-1)
|
|
ref64 = attn64 @ v64.float()
|
|
cos64 = torch.nn.functional.cosine_similarity(
|
|
out64.flatten().unsqueeze(0), ref64.flatten().unsqueeze(0)
|
|
).item()
|
|
print(f"hd=64, n=128: cos {cos64:.6f} {'PASS' if cos64 >= 0.97 else 'FAIL'}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
test_smem_p_coords()
|