Deep Learning Study Guide
Neural networks with PyTorch — from tensors to production-ready models.
10 Topics • PyTorch • CNNs / LSTMs / Transfer Learning

PyTorch tensors are the foundation — multi-dimensional arrays with GPU support and autograd.
Creating & Manipulating Tensors
# pip install torch torchvision
import torch

# --- Creating tensors ---
vec = torch.tensor([1.0, 2.0, 3.0])                       # from a Python list
zero_mat = torch.zeros(3, 4)                              # all zeros
one_mat = torch.ones(2, 3)                                # all ones
noise = torch.rand(3, 3)                                  # uniform samples in [0, 1)
evens = torch.arange(0, 10, step=2, dtype=torch.float32)  # 0, 2, 4, 6, 8
print('t1:', vec)
print('t2 shape:', zero_mat.shape)
print('t4:', noise)

# --- Elementwise and matrix operations ---
a = torch.tensor([[1., 2.], [3., 4.]])
b = torch.tensor([[5., 6.], [7., 8.]])
print('Add:', a + b)
print('Matmul:', a @ b)
print('Mean:', a.mean())

# --- Reshaping: 12 elements viewed as a 3x4 matrix ---
x = torch.arange(12, dtype=torch.float32)
print('Reshape:', x.reshape(3, 4))

Tensor Indexing, Device & NumPy Bridge
import torch
import numpy as np

# Random 4x5 matrix to demonstrate indexing.
t = torch.rand(4, 5)

# Indexing (same as NumPy)
print('Row 0:', t[0])           # first row -> shape (5,)
print('Col 1:', t[:, 1])        # second column -> shape (4,)
print('Slice:', t[1:3, 2:4])    # 2x2 sub-block

# Boolean mask: selection flattens to a 1-D tensor
print('> 0.5:', t[t > 0.5][:5])

# NumPy <-> Tensor (shared memory on CPU)
arr = np.array([1.0, 2.0, 3.0])
tensor_from_np = torch.from_numpy(arr)  # shares arr's buffer; mutating one mutates the other
np_from_tensor = t.numpy() # only works on CPU tensors; GPU tensors need .cpu() first
print('From numpy:', tensor_from_np)

# Device selection: prefer CUDA when available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using device:', device)
t_device = t.to(device)  # .to() copies only when the target device differs
print('Tensor device:', t_device.device)

Broadcasting, einsum & GPU Fallback
import torch
torch.manual_seed(0)  # reproducible randn draws
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

# Broadcasting: (3,1) + (1,4) -> (3,4)
a = torch.randn(3, 1, device=device)
b = torch.randn(1, 4, device=device)
c = a + b  # each operand is expanded along its size-1 dimension
print('Broadcast shape:', c.shape) # (3, 4)

# torch.einsum: batch matrix multiply
# 'bij,bjk->bik' = batched matmul (sum over the shared index j)
X = torch.randn(8, 3, 4, device=device) # batch of 8, (3x4)
Y = torch.randn(8, 4, 5, device=device) # batch of 8, (4x5)
Z = torch.einsum('bij,bjk->bik', X, Y)
print('einsum batch matmul:', Z.shape) # (8, 3, 5)

# L2 norm per row (reduce over dim=1, the columns)
M = torch.randn(3, 4, device=device)
row_norms = torch.norm(M, dim=1)
print('Row norms:', row_norms)

# Move to CPU and convert to numpy (.numpy() requires a CPU tensor)
M_np = M.cpu().numpy()
print('As numpy shape:', M_np.shape)

Sparse Tensors & Boolean Mask Indexing
import torch
torch.manual_seed(0)

# --- Sparse Tensors ---
# Define a sparse COO tensor: indices (2, nnz) and values (nnz,)
indices = torch.tensor([[0, 1, 2], [1, 0, 3]]) # row, col positions
values = torch.tensor([3.0, 5.0, 7.0])
sparse = torch.sparse_coo_tensor(indices, values, size=(4, 5))
print('Sparse tensor:')
print(sparse.to_dense())                 # materialize to an ordinary dense tensor
print('nnz (non-zero):', sparse._nnz())  # NOTE(review): _nnz() is a private API

# --- Boolean Mask Indexing ---
x = torch.randn(4, 5)
print('\nOriginal tensor:\n', x.round(decimals=2))
# Select elements where value > 0 (returns a flat 1-D tensor)
mask = x > 0
print('Positive values:', x[mask][:6].round(decimals=3))
# Zero-out negatives in-place (on a clone, so x itself is untouched)
x_clipped = x.clone()
x_clipped[x_clipped < 0] = 0.0
print('ReLU via mask (min):', x_clipped.min().item())
# Row-wise mask: keep rows where row mean > 0
row_mask = x.mean(dim=1) > 0
print('Rows with positive mean:', x[row_mask].shape)

import torch
import numpy as np

# Simulate a batch of 8 RGB images (224x224) as raw uint8 pixels.
np.random.seed(42)
raw = np.random.randint(0, 256, (8, 3, 224, 224), dtype=np.uint8)

# uint8 [0, 255] -> float32 [0, 1]
batch = torch.from_numpy(raw).float() / 255.0

# Per-channel ImageNet statistics, shaped (1, 3, 1, 1) so they
# broadcast across batch and spatial dimensions of (N, C, H, W).
mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
normalized = (batch - mean) / std

print('Batch shape:', normalized.shape) # [8, 3, 224, 224]
print('Channel means:', normalized.mean(dim=[0, 2, 3]).round(decimals=3))
print('Channel stds: ', normalized.std(dim=[0, 2, 3]).round(decimals=3))
print('dtype:', normalized.dtype)

import torch
import numpy as np
torch.manual_seed(42)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using: {device}')
# TODO: Create tensor A (3x4) and B (4x5) with random values
# A = torch.randn(???)
# B = torch.randn(???)
# TODO: Matrix multiply A @ B
# C = ???
# print('C shape:', C.shape)
# TODO: Compute L2 norm of each row of A
# norms = torch.norm(A, dim=???)
# print('Row norms:', norms)
# TODO: Move A to device and back to CPU numpy
# A_dev = A.to(device)
# A_np = A_dev.cpu().numpy()
# print('Numpy shape:', A_np.shape)

PyTorch's autograd engine automatically computes gradients for backpropagation.
requires_grad & backward()
import torch

# Autograd basics: requires_grad=True makes PyTorch record every op on x.
x = torch.tensor(3.0, requires_grad=True)
y = x * x + 2 * x + 1 # y = x^2 + 2x + 1
print('y:', y.item())
# Backprop populates x.grad with dy/dx
y.backward()
print('dy/dx at x=3:', x.grad.item()) # 2x + 2 = 8

# A graph with two leaves: c depends on both a and b
a = torch.tensor(2.0, requires_grad=True)
b = torch.tensor(3.0, requires_grad=True)
c = a * b + b * b # c = ab + b^2
c.backward()
print('dc/da:', a.grad.item()) # b = 3
print('dc/db:', b.grad.item()) # a + 2b = 8

# no_grad(): ops inside this context are not tracked
with torch.no_grad():
    z = a * b
print('z requires_grad:', z.requires_grad)

Manual Gradient Descent
import torch

# Fit y = 2x + 1 by hand: compute MSE, backprop, apply the update ourselves.
torch.manual_seed(42)
X = torch.linspace(0, 1, 100).unsqueeze(1)       # (100, 1) inputs
y_true = 2 * X + 1 + 0.1 * torch.randn_like(X)   # noisy targets

# Parameters to learn (leaf tensors tracked by autograd)
w = torch.randn(1, requires_grad=True)
b = torch.randn(1, requires_grad=True)
lr = 0.1
for epoch in range(200):
    pred = X * w + b
    loss = ((pred - y_true) ** 2).mean()
    loss.backward()
    # Step outside the graph, then clear the accumulated gradients
    with torch.no_grad():
        w -= lr * w.grad
        b -= lr * b.grad
        w.grad.zero_()
        b.grad.zero_()
if (epoch + 1) % 50 == 0:
print(f'Epoch {epoch+1}: loss={loss.item():.4f} w={w.item():.3f} b={b.item():.3f}')

Jacobian & Hessian with autograd.functional
import torch
from torch.autograd.functional import jacobian, hessian

# f(x) = x1^2 + x1*x2 + x2^2, wrapped to return a 1-element vector.
def f(x):
    return (x[0] ** 2 + x[0] * x[1] + x[1] ** 2).unsqueeze(0)

x0 = torch.tensor([1.0, 2.0])

# Jacobian: df/dx — shape (1, 2)
J = jacobian(f, x0)
print('Jacobian:', J) # [2x0+x1, x0+2x1] = [4, 5]

# hessian() needs a scalar-valued function — result shape (2, 2)
def f_scalar(x):
    return x[0] ** 2 + x[0] * x[1] + x[1] ** 2

H = hessian(f_scalar, x0)
print('Hessian:')
print(H) # [[2, 1], [1, 2]]

# Conditioning: ratio of extreme eigenvalues of the symmetric Hessian
eigvals = torch.linalg.eigvalsh(H)
print('Eigenvalues:', eigvals)
print('Condition number:', (eigvals.max() / eigvals.min()).item())

torch.no_grad, Gradient Clipping & register_hook
import torch
import torch.nn as nn
torch.manual_seed(42)

# --- torch.no_grad: disable gradient tracking ---
x = torch.randn(4, 8, requires_grad=True)
with torch.no_grad():
    y = x * 2 # no graph built inside this context
print('y requires_grad:', y.requires_grad) # False

# --- Gradient Clipping ---
model = nn.Linear(8, 4)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
# Simulate large gradients
out = model(x)
loss = (out * 1000).mean() # artificially huge loss
loss.backward()
before = max(p.grad.abs().max().item() for p in model.parameters())
# Rescales all grads in-place so their global L2 norm is <= max_norm
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
after = max(p.grad.abs().max().item() for p in model.parameters())
print(f'Max grad before clip: {before:.2f} | after clip: {after:.4f}')

# --- register_hook: inspect gradients during backprop ---
grad_log = []
h = torch.randn(3, requires_grad=True)
h.register_hook(lambda g: grad_log.append(g.clone()))  # hook receives dL/dh
loss2 = (h ** 2).sum()
loss2.backward()
print('Gradient via hook:', grad_log[0]) # should be 2*h
print('Gradient via .grad:', h.grad)

import torch
torch.manual_seed(42)
# Synthetic linear regression data: y = X @ true_w + noise
n = 200
X = torch.randn(n, 3)
true_w = torch.tensor([1.5, -2.0, 0.8])
y_true = X @ true_w + 0.5 * torch.randn(n)

# Parameters to learn
w = torch.zeros(3, requires_grad=True)
b = torch.zeros(1, requires_grad=True)

def weighted_loss(pred, target):
    """Asymmetric MSE: under-predictions (residual < 0) are penalized 3x."""
    residuals = pred - target
    weights = torch.where(residuals < 0, torch.tensor(3.0), torch.tensor(1.0))
    return (weights * residuals ** 2).mean()

optimizer = torch.optim.Adam([w, b], lr=0.05)
for epoch in range(300):
    pred = X @ w + b
    loss = weighted_loss(pred, y_true)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

print('True weights:', true_w.tolist())
print('Learned: ', [round(v, 3) for v in w.tolist()])
# round(x, n) is the idiomatic spelling; never call the __round__ dunder directly
print('Final loss:', round(loss.item(), 4))

import torch
torch.manual_seed(42)
# TODO: Initialize x and y at (0, 0) with requires_grad=True
# x = torch.tensor(???, requires_grad=True)
# y = torch.tensor(???, requires_grad=True)
lr = 0.1
for step in range(50):
# TODO: Define f = (x-3)^2 + (y+2)^2
# f = ???
# TODO: Backpropagate
# f.backward()
# TODO: Update x and y manually (use torch.no_grad)
# with torch.no_grad():
# x -= lr * x.grad
# y -= lr * y.grad
# x.grad.zero_()
# y.grad.zero_()
if step % 10 == 0:
pass # TODO: print step, f.item(), x.item(), y.item()
# TODO: Verify convergence β x should be ~3, y should be ~-2
# print(f'Converged to x={x.item():.4f}, y={y.item():.4f}')

Define models by subclassing nn.Module. Stack layers, define forward(), and let PyTorch handle the rest.
Fully Connected Network
import torch
import torch.nn as nn
class MLP(nn.Module):
def __init__(self, in_features, hidden, out_features):
super().__init__()
self.net = nn.Sequential(
nn.Linear(in_features, hidden),
nn.ReLU(),
nn.Linear(hidden, hidden),
nn.ReLU(),
nn.Linear(hidden, out_features)
)
def forward(self, x):
return self.net(x)
model = MLP(in_features=10, hidden=64, out_features=1)
print(model)
# Count parameters
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Total params: {total:,} | Trainable: {trainable:,}')
# Test forward pass
x = torch.randn(32, 10) # batch of 32
out = model(x)
print('Output shape:', out.shape)

Using nn.Sequential & Common Layers
import torch
import torch.nn as nn

# Quick 3-class classifier assembled from a flat layer list.
layers = [
    nn.Linear(20, 128), nn.BatchNorm1d(128), nn.ReLU(), nn.Dropout(0.3),
    nn.Linear(128, 64), nn.ReLU(),
    nn.Linear(64, 3),  # 3-class output
]
model = nn.Sequential(*layers)

x = torch.randn(16, 20)
logits = model(x)
print('Logits shape:', logits.shape)

# Softmax turns logits into per-sample probability distributions
probs = torch.softmax(logits, dim=1)
print('Probs sum per sample:', probs.sum(dim=1)[:3].round(decimals=4))

# CrossEntropyLoss takes raw logits plus integer class targets
targets = torch.randint(0, 3, (16,))
ce_loss = nn.CrossEntropyLoss()(logits, targets)
print('CrossEntropy loss:', ce_loss.item().__round__(4))

nn.ModuleList & ModuleDict for Dynamic Architectures
import torch
import torch.nn as nn

class FlexMLP(nn.Module):
    """MLP whose depth and widths are driven by a list of layer sizes."""
    def __init__(self, layer_sizes):
        super().__init__()
        pairs = list(zip(layer_sizes[:-1], layer_sizes[1:]))
        self.layers = nn.ModuleList([nn.Linear(a, b) for a, b in pairs])
        # One activation per hidden layer; none after the output layer
        self.acts = nn.ModuleList([nn.ReLU() for _ in pairs[:-1]])

    def forward(self, x):
        for act, layer in zip(self.acts, self.layers[:-1]):
            x = act(layer(x))
        return self.layers[-1](x)

model = FlexMLP([10, 64, 32, 1])
print(model)

# nn.ModuleDict: named sub-modules, e.g. multiple task heads
heads = nn.ModuleDict({
    'classifier': nn.Linear(64, 5),
    'regressor': nn.Linear(64, 1),
})
feat = torch.randn(8, 64)
print('Classifier out:', heads['classifier'](feat).shape)
print('Regressor out: ', heads['regressor'](feat).shape)
x = torch.randn(16, 10)
print('FlexMLP output:', model(x).shape)

Parameter Sharing Between Layers
import torch
import torch.nn as nn
torch.manual_seed(42)

# --- Parameter Sharing: two layers share the SAME weight tensor ---
class TiedAutoEncoder(nn.Module):
    '''Encoder and decoder share the same weight matrix (tied weights).'''

    def __init__(self, in_dim, hid_dim):
        super().__init__()
        self.encoder = nn.Linear(in_dim, hid_dim, bias=True)
        # Decoder reuses encoder weight (transposed), has its own bias
        self.dec_bias = nn.Parameter(torch.zeros(in_dim))

    def encode(self, x):
        # (N, in_dim) -> (N, hid_dim)
        return torch.relu(self.encoder(x))

    def decode(self, h):
        # W^T h + bias (tied weights): encoder.weight is (hid_dim, in_dim),
        # so h @ W maps (N, hid_dim) back to (N, in_dim)
        return h @ self.encoder.weight + self.dec_bias

    def forward(self, x):
        return self.decode(self.encode(x))

model = TiedAutoEncoder(in_dim=16, hid_dim=8)
# Verify encoder & decoder share the same weight object
enc_weight = model.encoder.weight
print('Encoder weight shape:', enc_weight.shape) # (8, 16)
x = torch.randn(4, 16)
recon = model(x)
print('Reconstruction shape:', recon.shape) # (4, 16)
# Count parameters (shared weight counted once)
total = sum(p.numel() for p in model.parameters())
print(f'Total params: {total}') # 8*16 + 8 (enc bias) + 16 (dec bias) = 152
# Gradient flows through the shared weight from both encoder & decoder
loss = nn.MSELoss()(recon, x)
loss.backward()
print('Encoder weight grad shape:', model.encoder.weight.grad.shape)

import torch
import torch.nn as nn
class CLVNet(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(15, 256), nn.BatchNorm1d(256), nn.ReLU(), nn.Dropout(0.2),
nn.Linear(256, 128), nn.BatchNorm1d(128), nn.ReLU(), nn.Dropout(0.2),
nn.Linear(128, 64), nn.ReLU(),
nn.Linear(64, 1) # predict LTV in dollars
)
def forward(self, x):
return self.net(x).squeeze(1)
model = CLVNet()
print(model)
# Simulate a batch
torch.manual_seed(42)
batch_features = torch.randn(64, 15) # 64 customers, 15 features
batch_ltv = torch.rand(64) * 5000 # true LTV labels
output = model(batch_features)
loss = nn.MSELoss()(output, batch_ltv)
print('Output shape:', output.shape)
print('Initial MSE loss:', loss.item().__round__(2))

import torch
import torch.nn as nn
torch.manual_seed(42)
class ConfigMLP(nn.Module):
def __init__(self, layer_sizes, dropout=0.3):
super().__init__()
layers = []
for i in range(len(layer_sizes) - 1):
# TODO: Add Linear layer from layer_sizes[i] to layer_sizes[i+1]
# layers.append(nn.Linear(???, ???))
if i < len(layer_sizes) - 2: # hidden layers only
# TODO: Add BatchNorm1d for layer_sizes[i+1]
# layers.append(???)
# TODO: Add ReLU activation
# layers.append(???)
# TODO: Add Dropout
# layers.append(???)
# TODO: Wrap in nn.Sequential
# self.net = nn.Sequential(*layers)
def forward(self, x):
# TODO: return self.net(x)
pass
# TODO: Instantiate with layer_sizes=[128, 64, 32, 1]
# model = ConfigMLP([128, 64, 32, 1])
# print(model)
# TODO: Count total and trainable parameters
# total = sum(p.numel() for p in model.parameters())
# trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
# print(f'Total: {total:,} | Trainable: {trainable:,}')
# TODO: Test forward pass with input (16, 128)
# x = torch.randn(16, 128)
# out = model(x)
# print('Output shape:', out.shape)

The complete training loop: forward pass, loss, backward, optimizer step, and validation.
Full Training Loop
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Data: 2k samples, 10 features, binary labels
X, y = make_classification(n_samples=2000, n_features=10, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_tr = scaler.fit_transform(X_tr)   # fit stats on train only
X_val = scaler.transform(X_val)     # reuse train stats -> no leakage
to_tensor = lambda a, t: torch.tensor(a, dtype=t)
train_ds = TensorDataset(to_tensor(X_tr, torch.float32), to_tensor(y_tr, torch.long))
val_ds = TensorDataset(to_tensor(X_val, torch.float32), to_tensor(y_val, torch.long))
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=64)

model = nn.Sequential(nn.Linear(10,64), nn.ReLU(), nn.Linear(64,32), nn.ReLU(), nn.Linear(32,2))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()  # expects raw logits + class indices

for epoch in range(1, 6):
    # --- training phase ---
    model.train()
    train_loss = 0
    for xb, yb in train_dl:
        optimizer.zero_grad()   # clear grads accumulated from the last step
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # --- validation phase ---
    model.eval()
    correct = 0
    with torch.no_grad():       # no graph needed for evaluation
        for xb, yb in val_dl:
            preds = model(xb).argmax(dim=1)
            correct += (preds == yb).sum().item()
    acc = correct / len(val_ds)
print(f'Epoch {epoch}: train_loss={train_loss/len(train_dl):.4f} val_acc={acc:.4f}')

Learning Rate Scheduler
import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 1))
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# ReduceLROnPlateau — halve lr whenever the monitored metric stops improving
scheduler_plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)
# CosineAnnealingLR — smooth decay from 0.1 toward eta_min over T_max steps
optimizer2 = torch.optim.SGD(model.parameters(), lr=0.1)
scheduler_cos = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer2, T_max=50, eta_min=1e-4
)
# Drive both schedulers with a fake, steadily improving validation loss
for epoch in range(1, 11):
    val_loss = 1.0 / (epoch + 1) + 0.01
    scheduler_plateau.step(val_loss)  # plateau scheduler needs the metric
    scheduler_cos.step()              # cosine scheduler just counts steps
    lr1 = optimizer.param_groups[0]['lr']
    lr2 = optimizer2.param_groups[0]['lr']
print(f'Epoch {epoch:2d}: plateau_lr={lr1:.6f} cosine_lr={lr2:.5f}')

Gradient Clipping, CosineAnnealingLR & Early Stopping
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(42)
X = torch.randn(500, 10)
y = torch.randint(0, 2, (500,))
dl = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)

model = nn.Sequential(
    nn.Linear(10, 64), nn.ReLU(),
    nn.Linear(64, 32), nn.ReLU(),
    nn.Linear(32, 2)
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=20, eta_min=1e-5
)
criterion = nn.CrossEntropyLoss()

# Early-stopping state: best loss seen, allowed patience, epochs waited
best_loss, patience, wait = float('inf'), 5, 0
for epoch in range(1, 21):
    model.train()
    epoch_loss = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        # Clip the global grad norm at 1.0 to guard against exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        epoch_loss += loss.item()
    scheduler.step()
    avg = epoch_loss / len(dl)
    lr = optimizer.param_groups[0]['lr']
    if avg < best_loss:
        best_loss, wait = avg, 0
    else:
        wait += 1
        if wait >= patience:
            print(f'Early stop at epoch {epoch}')
            break
if epoch <= 3 or epoch % 5 == 0:
print(f'Epoch {epoch:2d}: loss={avg:.4f} lr={lr:.6f} wait={wait}')

Early Stopping & Linear LR Warmup Scheduler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
X = torch.randn(600, 10)
y = torch.randint(0, 2, (600,))
dl = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)
model = nn.Sequential(
nn.Linear(10, 32), nn.ReLU(), nn.Linear(32, 2)
)
base_lr = 1e-3
warmup_epochs = 5
total_epochs = 25
optimizer = torch.optim.Adam(model.parameters(), lr=base_lr)
criterion = nn.CrossEntropyLoss()
# Linear warmup: lr ramps from 0 -> base_lr over warmup_epochs
def warmup_lambda(epoch):
if epoch < warmup_epochs:
return (epoch + 1) / warmup_epochs # ramp up
# Cosine decay after warmup
progress = (epoch - warmup_epochs) / max(total_epochs - warmup_epochs, 1)
return 0.5 * (1 + torch.cos(torch.tensor(3.14159 * progress)).item())
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=warmup_lambda)
# Early stopping state
best_loss, patience, wait = float('inf'), 6, 0
for epoch in range(1, total_epochs + 1):
model.train()
epoch_loss = 0
for xb, yb in dl:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
scheduler.step()
avg = epoch_loss / len(dl)
lr = optimizer.param_groups[0]['lr']
# Early stopping check
if avg < best_loss - 1e-4:
best_loss, wait = avg, 0
else:
wait += 1
if wait >= patience:
print(f'Early stop at epoch {epoch}')
break
if epoch <= warmup_epochs or epoch % 5 == 0:
print(f'Epoch {epoch:2d}: loss={avg:.4f} lr={lr:.6f} wait={wait}')

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Imbalanced: 5% fraud
X, y = make_classification(
    n_samples=5000, weights=[0.95, 0.05],
    n_features=12, n_informative=8, random_state=42
)
# stratify=y keeps the 95/5 class ratio in both splits
X_tr, X_val, y_tr, y_val = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)
X_tr = StandardScaler().fit_transform(X_tr)
X_t = torch.tensor(X_tr, dtype=torch.float32)
y_t = torch.tensor(y_tr, dtype=torch.long)
dl = DataLoader(TensorDataset(X_t, y_t), batch_size=128, shuffle=True)

# Weighted CrossEntropy: penalize fraud misses 19x more (~ inverse class ratio)
class_weights = torch.tensor([1.0, 19.0])
criterion = nn.CrossEntropyLoss(weight=class_weights)

model = nn.Sequential(
    nn.Linear(12, 64), nn.ReLU(), nn.Dropout(0.3),
    nn.Linear(64, 32), nn.ReLU(),
    nn.Linear(32, 2)
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(1, 6):
    model.train()
    total_loss = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
print(f'Epoch {epoch}: loss={total_loss/len(dl):.4f}')

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
torch.manual_seed(42)
# TODO: Generate data with make_classification(n_samples=1000, n_features=10, random_state=42)
# X, y = make_classification(???)
# TODO: Split 80/20, convert to tensors, create DataLoaders
# X_tr, X_val, y_tr, y_val = train_test_split(???)
# train_dl = DataLoader(TensorDataset(???), batch_size=32, shuffle=True)
# val_dl = DataLoader(TensorDataset(???), batch_size=32)
# TODO: Define a 2-layer MLP (10->64->32->2)
# model = nn.Sequential(???)
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# criterion = nn.CrossEntropyLoss()
best_loss, patience, wait = float('inf'), 5, 0
for epoch in range(1, 51):
# TODO: Training phase β model.train(), loop over train_dl
# model.train()
# train_loss = 0
# for xb, yb in train_dl: ...
# TODO: Validation phase β model.eval(), loop over val_dl
# model.eval()
# val_loss = 0
# with torch.no_grad(): ...
# TODO: Early stopping check
# if val_loss < best_loss: best_loss, wait = val_loss, 0
# else: wait += 1
# if wait >= patience: print('Early stop'); break
pass # remove once implemented

CNNs learn spatial features from images using convolutional filters, pooling, and fully connected heads.
Building a CNN
import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """Two conv/pool stages followed by a dropout-regularized MLP head."""
    def __init__(self, num_classes=10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1), # grayscale in
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # 28 -> 14
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2) # 14 -> 7
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        feats = self.features(x)
        return self.classifier(feats)

model = SimpleCNN(num_classes=10)
x = torch.randn(8, 1, 28, 28) # batch of 8 MNIST-like images
print('Output:', model(x).shape)
total = sum(p.numel() for p in model.parameters())
print(f'Parameters: {total:,}')

CNN Feature Map Visualization
import torch
import torch.nn as nn

# Hand-set filters that mimic what early CNN layers learn to detect.
conv = nn.Conv2d(1, 4, kernel_size=3, padding=1, bias=False)
with torch.no_grad():
    # Sobel-style horizontal edge detector
    conv.weight[0, 0] = torch.tensor([[-1.,-2.,-1.],[0.,0.,0.],[1.,2.,1.]])
    # Sobel-style vertical edge detector
    conv.weight[1, 0] = torch.tensor([[-1.,0.,1.],[-2.,0.,2.],[-1.,0.,1.]])
    # 3x3 box blur
    conv.weight[2, 0] = torch.ones(3, 3) / 9
    # Sharpen kernel
    conv.weight[3, 0] = torch.tensor([[0.,-1.,0.],[-1.,5.,-1.],[0.,-1.,0.]])

# Apply to a synthetic 28x28 intensity ramp
img = torch.linspace(0, 1, 28*28).reshape(1, 1, 28, 28)
fmaps = conv(img)
print('Input:', img.shape)
print('Feature maps:', fmaps.shape) # 4 channels
for i, name in enumerate(['Horiz','Vert','Blur','Sharpen']):
print(f' {name}: min={fmaps[0,i].min():.3f} max={fmaps[0,i].max():.3f}')

Residual Block (Mini ResNet Block)
import torch
import torch.nn as nn

class ResBlock(nn.Module):
    '''Basic residual block: F(x) + x with optional projection.'''
    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU(inplace=True)
        # Identity skip when shapes match; 1x1 projection otherwise
        if in_ch != out_ch or stride != 1:
            self.skip = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_ch)
            )
        else:
            self.skip = nn.Identity()

    def forward(self, x):
        branch = self.relu(self.bn1(self.conv1(x)))
        branch = self.bn2(self.conv2(branch))
        # Add the (possibly projected) input back in, then activate
        return self.relu(branch + self.skip(x))

# Stack residual blocks into a tiny CNN
model = nn.Sequential(
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    ResBlock(16, 16),
    ResBlock(16, 32, stride=2), # downsample
    ResBlock(32, 32),
    nn.AdaptiveAvgPool2d(1),
    nn.Flatten(),
    nn.Linear(32, 10)
)
x = torch.randn(4, 1, 28, 28)
print('Output:', model(x).shape)
print('Params:', sum(p.numel() for p in model.parameters()))

Depthwise Separable Convolution & Global Average Pooling
import torch
import torch.nn as nn
torch.manual_seed(42)

# --- Depthwise Separable Convolution ---
# Standard Conv2d: C_in * C_out * K * K params
# DW-Sep: C_in * K * K (depthwise) + C_in * C_out (pointwise) — much fewer params
class DepthwiseSeparableConv(nn.Module):
    # Per-channel spatial filter (depthwise) followed by a 1x1 pointwise conv
    # that mixes channels, then BatchNorm + ReLU.
    def __init__(self, in_ch, out_ch, kernel_size=3, padding=1):
        super().__init__()
        # Depthwise: one filter per channel (groups=in_ch)
        self.dw = nn.Conv2d(in_ch, in_ch, kernel_size,
                            padding=padding, groups=in_ch, bias=False)
        # Pointwise: 1x1 conv to mix channels
        self.pw = nn.Conv2d(in_ch, out_ch, 1, bias=False)
        self.bn = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.relu(self.bn(self.pw(self.dw(x))))

# Compare parameter counts against a standard convolution
standard = nn.Conv2d(32, 64, 3, padding=1, bias=False)
dw_sep = DepthwiseSeparableConv(32, 64)
std_params = sum(p.numel() for p in standard.parameters())
dws_params = sum(p.numel() for p in dw_sep.parameters())
print(f'Standard Conv params: {std_params:,}')
print(f'DW-Sep Conv params: {dws_params:,} ({dws_params/std_params:.1%} of standard)')

# --- Global Average Pooling vs Flatten ---
x = torch.randn(4, 64, 7, 7) # feature maps after conv layers
gap = nn.AdaptiveAvgPool2d(1) # collapses spatial dims to 1x1
out_gap = gap(x).squeeze(-1).squeeze(-1) # (4, 64)
out_flat = x.flatten(1) # (4, 64*7*7 = 3136)
print(f'GAP output shape: {out_gap.shape} (no spatial params!)')
print(f'Flatten output shape: {out_flat.shape}')

# Full MobileNet-style block: DWS + GAP + Linear
backbone = nn.Sequential(
    DepthwiseSeparableConv(3, 32),
    DepthwiseSeparableConv(32, 64),
    nn.AdaptiveAvgPool2d(1),
    nn.Flatten(),
    nn.Linear(64, 5)
)
x_img = torch.randn(8, 3, 28, 28)
print('Backbone output:', backbone(x_img).shape) # (8, 5)
print('Backbone params:', sum(p.numel() for p in backbone.parameters()))

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)

# Simulate factory image dataset: 3-channel 64x64 images
n_train, n_val = 800, 200
X_train = torch.randn(n_train, 3, 64, 64)
y_train = torch.randint(0, 2, (n_train,))  # binary labels: defect / no defect
X_val = torch.randn(n_val, 3, 64, 64)
y_val = torch.randint(0, 2, (n_val,))
train_dl = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)

class DefectCNN(nn.Module):
    # Two conv/pool stages (64 -> 32 -> 16 spatial), then a small MLP head.
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(32 * 16 * 16, 128), nn.ReLU(),  # 32 channels at 16x16
            nn.Linear(128, 2)
        )

    def forward(self, x): return self.net(x)

model = DefectCNN()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(1, 4):
    model.train()
    total = 0
    for xb, yb in train_dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total += loss.item()
print(f'Epoch {epoch}: loss={total/len(train_dl):.4f}')

import torch
import torch.nn as nn
torch.manual_seed(42)
# TODO: Define a CNN with:
# Conv2d(1, 16, 3, padding=1) -> ReLU -> MaxPool2d(2)
# Conv2d(16, 32, 3, padding=1) -> ReLU -> MaxPool2d(2)
# Flatten -> Linear(32*7*7, 64) -> ReLU -> Linear(64, 10)
class MnistCNN(nn.Module):
def __init__(self):
super().__init__()
# TODO: define self.features and self.classifier
pass
def forward(self, x):
# TODO: run through features and classifier
# print shape at each step
pass
# TODO: Create batch of 8 random (1x28x28) images
# x = torch.randn(8, 1, 28, 28)
# TODO: Instantiate and run forward pass
# model = MnistCNN()
# out = model(x)
# print('Final output shape:', out.shape)
# TODO: Compute cross-entropy loss and backpropagate
# labels = torch.randint(0, 10, (8,))
# loss = nn.CrossEntropyLoss()(out, labels)
# loss.backward()
# print('Loss:', loss.item())

Reuse pretrained models (ResNet, ViT, BERT) — fine-tune on your small dataset for state-of-the-art results.
Fine-tuning ResNet18
import torch
import torch.nn as nn
import torchvision.models as models

# Load pretrained ResNet18 (ImageNet weights)
model = models.resnet18(weights='IMAGENET1K_V1')
# Freeze all layers — the backbone becomes a fixed feature extractor
for param in model.parameters():
    param.requires_grad = False
# Replace the final FC layer for our task (e.g., 5 classes);
# newly constructed modules default to requires_grad=True
num_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 5) # 5 output classes
)
# Only the new head is trainable
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f'Trainable: {trainable:,} / {total:,} ({trainable/total:.1%})')
# Test forward pass (ResNet expects 3x224x224 inputs)
x = torch.randn(4, 3, 224, 224)
out = model(x)
print('Output shape:', out.shape)

Progressive Unfreezing
import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights='IMAGENET1K_V1')
# Replace head for 3-class task
model.fc = nn.Linear(model.fc.in_features, 3)

# Phase 1: only train head (freeze everything, then re-enable fc)
for param in model.parameters():
    param.requires_grad = False
for param in model.fc.parameters():
    param.requires_grad = True
print('Phase 1 trainable params:',
      sum(p.numel() for p in model.parameters() if p.requires_grad))

# Phase 2: unfreeze layer4 + head (deepest conv block adapts first)
for param in model.layer4.parameters():
    param.requires_grad = True
print('Phase 2 trainable params:',
      sum(p.numel() for p in model.parameters() if p.requires_grad))

# Phase 3: unfreeze all
for param in model.parameters():
    param.requires_grad = True
print('Phase 3 (all) trainable params:',
sum(p.numel() for p in model.parameters() if p.requires_grad))

Feature Extraction vs Fine-Tuning Last N Layers
import torch
import torch.nn as nn
import torchvision.models as models

def count_params(m):
    # Return (total, trainable) parameter counts for module m.
    total = sum(p.numel() for p in m.parameters())
    trainable = sum(p.numel() for p in m.parameters() if p.requires_grad)
    return total, trainable

# --- Mode 1: Feature Extraction (freeze all, new head only) ---
model_fe = models.resnet18(weights='IMAGENET1K_V1')
for p in model_fe.parameters():
    p.requires_grad = False
model_fe.fc = nn.Linear(model_fe.fc.in_features, 4)  # fresh head is trainable
t, tr = count_params(model_fe)
print(f'Feature Extraction: {tr:,}/{t:,} trainable ({tr/t:.2%})')

# --- Mode 2: Fine-tune last N=2 blocks (layer3, layer4, fc) ---
model_ft = models.resnet18(weights='IMAGENET1K_V1')
for p in model_ft.parameters():
    p.requires_grad = False
for p in model_ft.layer3.parameters():
    p.requires_grad = True
for p in model_ft.layer4.parameters():
    p.requires_grad = True
model_ft.fc = nn.Linear(model_ft.fc.in_features, 4)
t, tr = count_params(model_ft)
print(f'Fine-tune last 2 blocks: {tr:,}/{t:,} trainable ({tr/t:.2%})')

# Both accept same input
x = torch.randn(2, 3, 224, 224)
print('FE output:', model_fe(x).shape)
print('FT output:', model_ft(x).shape)

Freezing/Unfreezing Layers with Differential Learning Rates
import torch
import torch.nn as nn
import torchvision.models as models
torch.manual_seed(42)

# Load pretrained ResNet18
model = models.resnet18(weights='IMAGENET1K_V1')
model.fc = nn.Linear(model.fc.in_features, 5)

# --- Strategy: differential learning rates ---
# Early layers: very low lr (already well-trained)
# Later layers: medium lr
# New head: full lr
param_groups = [
    {'params': list(model.layer1.parameters()) +
               list(model.layer2.parameters()), 'lr': 1e-5},
    {'params': list(model.layer3.parameters()) +
               list(model.layer4.parameters()), 'lr': 1e-4},
    {'params': model.fc.parameters(), 'lr': 1e-3},
]
optimizer = torch.optim.Adam(param_groups)
print('Param groups:')
for g in optimizer.param_groups:
    n = sum(p.numel() for p in g['params'])
    print(f' lr={g["lr"]} params={n:,}')

# --- Freeze then unfreeze on schedule ---
def set_requires_grad(module, value):
    # Toggle gradient tracking for every parameter inside module.
    for p in module.parameters():
        p.requires_grad = value

# Phase 1: freeze backbone, train only head
set_requires_grad(model, False)
set_requires_grad(model.fc, True)
phase1_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Phase 1 trainable: {phase1_trainable:,}')
# Phase 2: unfreeze layer4 for fine-tuning
set_requires_grad(model.layer4, True)
phase2_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Phase 2 trainable: {phase2_trainable:,}')
# Verify forward pass works in both phases
x = torch.randn(2, 3, 224, 224)
print('Output shape:', model(x).shape)import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(42)
# Simulate small medical image dataset
n = 500
X = torch.randn(n, 3, 224, 224)
y = torch.randint(0, 2, (n,))  # 0=normal, 1=pneumonia
dl = DataLoader(TensorDataset(X, y), batch_size=16, shuffle=True)
# Load ResNet, freeze all, replace head
model = models.resnet18(weights='IMAGENET1K_V1')
for p in model.parameters():
    p.requires_grad = False
# Two-layer classification head; only these weights receive gradients.
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 64),
    nn.ReLU(),
    nn.Linear(64, 2)
)
# Optimizer is given ONLY the head's parameters.
optimizer = torch.optim.Adam(model.fc.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
model.train()
for epoch in range(1, 4):
    total = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total += loss.item()
    print(f'Epoch {epoch}: loss={total/len(dl):.4f}')
import torch
import torch.nn as nn
import torchvision.models as models

torch.manual_seed(42)
# Exercise: feature extraction with a frozen ResNet18 (fill in the TODOs).
# TODO: Load ResNet18 with pretrained weights
# model = models.resnet18(weights='IMAGENET1K_V1')
# TODO: Freeze ALL parameters
# for p in model.parameters():
#     p.requires_grad = False
# TODO: Replace model.fc with a new Linear layer for 5 classes
# model.fc = nn.Linear(???, 5)
# TODO: Count trainable vs frozen parameters
# trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
# frozen = sum(p.numel() for p in model.parameters() if not p.requires_grad)
# print(f'Trainable: {trainable:,} | Frozen: {frozen:,}')
# TODO: Run one forward pass with 4 random (3x224x224) images
# x = torch.randn(4, 3, 224, 224)
# out = model(x)
# print('Output shape:', out.shape)  # should be (4, 5)
# --- Build custom Dataset classes to load any data efficiently with
# batching, shuffling, and augmentation. ---
# === Custom Dataset Class ===
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np


class TabularDataset(Dataset):
    """Map-style dataset over in-memory feature/label arrays.

    Arrays are converted to tensors once up front so that indexing
    in __getitem__ is a cheap slice.
    """

    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return self.y.shape[0]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


# Synthetic binary-classification data: label depends on the first
# two feature columns only.
np.random.seed(42)
X = np.random.randn(1000, 8)
y = (X[:, 0] + X[:, 1] > 0).astype(int)
dataset = TabularDataset(X, y)
print(f'Dataset size: {len(dataset)}')
print(f'Sample: {dataset[0]}')
# DataLoader layers batching + per-epoch shuffling on top of the dataset.
loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=0)
xb, yb = next(iter(loader))
print(f'Batch X: {xb.shape}, y: {yb.shape}')
# --- Data Augmentation with Transforms ---
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T

# Standard ImageNet transforms: the train pipeline adds random
# augmentation; the val pipeline is deterministic (resize + normalize).
train_transforms = T.Compose([
    T.ToPILImage(),
    T.Resize((224, 224)),
    T.RandomHorizontalFlip(p=0.5),
    T.RandomRotation(15),
    T.ColorJitter(brightness=0.2, contrast=0.2),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],   # ImageNet channel statistics
                std=[0.229, 0.224, 0.225])
])
val_transforms = T.Compose([
    T.ToPILImage(),
    T.Resize((224, 224)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225])
])
# Apply to a random image
import numpy as np

img_np = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)
tensor_img = train_transforms(img_np)
print('Augmented shape:', tensor_img.shape)
print('Value range: [{:.3f}, {:.3f}]'.format(tensor_img.min().item(), tensor_img.max().item()))
# --- Iterable Dataset & collate_fn for Variable-Length Sequences ---
import torch
from torch.utils.data import IterableDataset, DataLoader
import numpy as np


# IterableDataset: useful for streaming / large on-disk data
class StreamDataset(IterableDataset):
    """Yield (features, label) pairs lazily instead of by index."""

    def __init__(self, n_samples, n_features, seed=0):
        self.n = n_samples
        self.d = n_features
        self.seed = seed

    def __iter__(self):
        # Fresh seeded generator per iteration -> identical stream each epoch.
        rng = np.random.default_rng(self.seed)
        for _ in range(self.n):
            x = rng.standard_normal(self.d).astype(np.float32)
            y = int(x[0] > 0)
            yield torch.tensor(x), torch.tensor(y)


stream_ds = StreamDataset(200, 8)
dl = DataLoader(stream_ds, batch_size=16)
xb, yb = next(iter(dl))
print('Streamed batch:', xb.shape, yb.shape)


# collate_fn: handle variable-length sequences (pad to max)
def pad_collate(batch):
    """Zero-pad 1-D sequences to the batch's max length.

    Returns (padded [B, max_len], labels [B], lengths [B]).
    """
    seqs, labels = zip(*batch)
    lengths = [s.size(0) for s in seqs]
    padded = torch.zeros(len(seqs), max(lengths))
    for i, s in enumerate(seqs):
        padded[i, :lengths[i]] = s
    return padded, torch.tensor(labels), torch.tensor(lengths)


var_data = [(torch.randn(np.random.randint(3, 10)), torch.tensor(i % 2))
            for i in range(8)]
padded, lbls, lens = pad_collate(var_data)
print('Padded shape:', padded.shape)
print('Lengths:', lens.tolist())
# --- Weighted Random Sampler for Imbalanced Data ---
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import numpy as np

# Simulate imbalanced dataset: 90% class 0, 10% class 1
np.random.seed(42)
n = 1000
X = torch.randn(n, 8)
y = torch.tensor(np.random.choice([0, 1], n, p=[0.9, 0.1]), dtype=torch.long)
print(f'Class distribution: 0={( y==0).sum()}, 1={(y==1).sum()}')


class TabularDataset(Dataset):
    """Minimal map-style dataset over in-memory tensors."""

    def __init__(self, X, y): self.X, self.y = X, y
    def __len__(self): return len(self.y)
    def __getitem__(self, i): return self.X[i], self.y[i]


dataset = TabularDataset(X, y)
# WeightedRandomSampler: oversample minority class.
# Per-sample weight = 1 / count(its class), so each class is drawn
# with roughly equal probability.
class_counts = torch.bincount(y)
class_weights = 1.0 / class_counts.float()
sample_weights = class_weights[y]
sampler = WeightedRandomSampler(sample_weights, num_samples=500, replacement=True)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)
# Verify balance in sampled batches
all_labels = []
for _, labels in loader:
    all_labels.extend(labels.tolist())
sampled_0 = all_labels.count(0)
sampled_1 = all_labels.count(1)
print(f'Sampled class distribution: 0={sampled_0}, 1={sampled_1}')
print(f'Balance ratio: {sampled_1/max(sampled_0,1):.2f}')
import torch
from torch.utils.data import Dataset, DataLoader


class SentimentDataset(Dataset):
    """Tokenize raw text on the fly; pads/truncates to max_len."""

    def __init__(self, texts, labels, vocab, max_len=20):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def tokenize(self, text):
        # Unknown words map to index 1; index 0 is reserved for padding.
        tokens = [self.vocab.get(w, 1) for w in text.lower().split()]
        tokens = tokens[:self.max_len]
        tokens += [0] * (self.max_len - len(tokens))  # pad
        return torch.tensor(tokens, dtype=torch.long)

    def __len__(self): return len(self.labels)
    def __getitem__(self, i): return self.tokenize(self.texts[i]), torch.tensor(self.labels[i])


texts = ['great product love it', 'terrible waste of money',
         'amazing highly recommend', 'broken arrived damaged',
         'best purchase ever', 'awful customer service']
labels = [1, 0, 1, 0, 1, 0]
words = set(w for t in texts for w in t.split())
vocab = {w: i+2 for i, w in enumerate(words)}  # 0=pad, 1=unk
ds = SentimentDataset(texts, labels, vocab)
dl = DataLoader(ds, batch_size=2, shuffle=True)
for xb, yb in dl:
    print('Token batch:', xb.shape, '| Labels:', yb)
    break  # show one batch only
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(42)
np.random.seed(42)
# Exercise: build a normalized tabular dataset + loaders (fill in TODOs).
# Generate synthetic data
X = np.random.randn(500, 10).astype(np.float32)
y = (X[:, 0] + X[:, 1] > 0).astype(np.int64)
# TODO: Define normalization transform
# mean, std = X.mean(axis=0), X.std(axis=0)
# def normalize(x): return (x - mean) / (std + 1e-8)


class TabularDataset(Dataset):
    """Exercise skeleton — implement storage, length, item access."""

    def __init__(self, X, y, transform=None):
        # TODO: store X and y as tensors, store transform
        pass

    def __len__(self):
        # TODO: return number of samples
        pass

    def __getitem__(self, idx):
        # TODO: return (optionally transformed) X[idx], y[idx]
        pass


# TODO: 80/20 split
# n_train = int(0.8 * len(X))
# train_ds = TabularDataset(X[:n_train], y[:n_train], transform=normalize)
# val_ds = TabularDataset(X[n_train:], y[n_train:], transform=normalize)
# TODO: Create DataLoaders
# train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
# val_dl = DataLoader(val_ds, batch_size=32)
# TODO: Print one batch shapes
# xb, yb = next(iter(train_dl))
# print('X batch:', xb.shape, '| y batch:', yb.shape)
# --- Prevent overfitting with Dropout, Batch Normalization,
# Weight Decay, and Early Stopping. ---
# === Dropout & Batch Normalization ===
import torch
import torch.nn as nn


class RegularizedNet(nn.Module):
    """MLP regularized with BatchNorm + Dropout.

    BatchNorm normalizes activations per batch; Dropout randomly zeroes
    activations during training only (model.eval() disables it).

    Args:
        dropout_rate: probability of zeroing each activation in training.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(20, 256),
            nn.BatchNorm1d(256),       # normalize activations
            nn.ReLU(),
            nn.Dropout(dropout_rate),  # randomly zero activations
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        return self.net(x)


model = RegularizedNet(dropout_rate=0.4)
x = torch.randn(16, 20)
model.train()   # Dropout ACTIVE
out_train = model(x)
model.eval()    # Dropout INACTIVE
with torch.no_grad():
    out_eval = model(x)
# Fixed: round(value, 4) instead of the dunder call value.__round__(4) —
# dunder methods are implementation hooks, not public calling API.
print('Train std:', round(out_train.std().item(), 4))
print('Eval std:', round(out_eval.std().item(), 4))
# --- Weight Decay & Early Stopping ---
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(42)
X = torch.randn(500, 10)
y = (X[:, 0] > 0).float()
dl = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)
model = nn.Sequential(nn.Linear(10,64), nn.ReLU(), nn.Linear(64,1))
# weight_decay = L2 regularization
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.BCEWithLogitsLoss()
# Early stopping: quit when loss hasn't improved for `patience`
# consecutive epochs.
best_loss, patience, wait = float('inf'), 5, 0
for epoch in range(1, 30):
    total = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        loss = criterion(model(xb).squeeze(), yb)
        loss.backward()
        optimizer.step()
        total += loss.item()
    avg = total / len(dl)
    if avg < best_loss:
        best_loss, wait = avg, 0   # improvement: reset the counter
    else:
        wait += 1
        if wait >= patience:
            print(f'Early stop at epoch {epoch}')
            break
    if epoch <= 3 or epoch % 5 == 0:
        print(f'Epoch {epoch:2d}: loss={avg:.4f} (wait={wait})')
# --- Mixup Augmentation & Label Smoothing from Scratch ---
import torch
import torch.nn as nn
import numpy as np

torch.manual_seed(42)


# --- Mixup: blend two samples and their labels ---
def mixup_batch(x, y, alpha=0.4, num_classes=3):
    """Mix each sample with a random partner from the same batch.

    Returns (mixed inputs, soft one-hot targets). Each target row still
    sums to 1 because lam + (1 - lam) == 1.
    """
    lam = np.random.beta(alpha, alpha)
    idx = torch.randperm(x.size(0))
    x_mix = lam * x + (1 - lam) * x[idx]
    # One-hot then mix
    y_oh = torch.zeros(x.size(0), num_classes).scatter_(1, y.unsqueeze(1), 1)
    y_mix = lam * y_oh + (1 - lam) * y_oh[idx]
    return x_mix, y_mix


# --- Label Smoothing: soften hard targets ---
class LabelSmoothingLoss(nn.Module):
    """Cross-entropy against (1-eps)*one_hot + eps/K soft targets."""

    def __init__(self, num_classes, smoothing=0.1):
        super().__init__()
        self.eps = smoothing
        self.K = num_classes

    def forward(self, logits, targets):
        log_probs = torch.log_softmax(logits, dim=1)
        # Smooth targets: (1-eps)*one_hot + eps/K
        with torch.no_grad():
            smooth = torch.zeros_like(log_probs).fill_(self.eps / self.K)
            smooth.scatter_(1, targets.unsqueeze(1), 1 - self.eps + self.eps / self.K)
        return -(smooth * log_probs).sum(dim=1).mean()


x = torch.randn(16, 10)
y = torch.randint(0, 3, (16,))
x_mix, y_mix = mixup_batch(x, y, num_classes=3)
print('Mixup x shape:', x_mix.shape)
print('Mixup y (soft):', y_mix[:2].round(decimals=3))
model = nn.Linear(10, 3)
ls_loss = LabelSmoothingLoss(3, smoothing=0.1)
ce_loss = nn.CrossEntropyLoss()
logits = model(x)
# Fixed: round(value, 4) instead of the dunder call value.__round__(4).
print('Label Smoothing loss:', round(ls_loss(logits, y).item(), 4))
print('Plain CE loss: ', round(ce_loss(logits, y).item(), 4))
# --- Focal Loss, Label Smoothing & Weighted Cross-Entropy ---
import torch
import torch.nn as nn
import torch.nn.functional as F


# Focal Loss — down-weights easy examples, focuses on hard ones
class FocalLoss(nn.Module):
    """Mean of alpha * (1 - p_t)^gamma * CE over the batch."""

    def __init__(self, alpha=1.0, gamma=2.0):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma

    def forward(self, logits, targets):
        per_sample_ce = F.cross_entropy(logits, targets, reduction='none')
        p_correct = torch.exp(-per_sample_ce)   # probability of the true class
        focal_weight = (1 - p_correct) ** self.gamma
        return (self.alpha * focal_weight * per_sample_ce).mean()


# Label smoothing — prevents overconfident predictions
class LabelSmoothingLoss(nn.Module):
    """CE against targets smoothed uniformly over the wrong classes."""

    def __init__(self, num_classes, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing
        self.cls = num_classes

    def forward(self, logits, targets):
        log_probs = F.log_softmax(logits, dim=-1)
        off_value = self.smoothing / (self.cls - 1)
        soft_targets = torch.full_like(log_probs, off_value)
        soft_targets.scatter_(1, targets.unsqueeze(1), 1 - self.smoothing)
        return -(soft_targets * log_probs).sum(dim=-1).mean()


# Compare losses on synthetic predictions
torch.manual_seed(42)
logits = torch.randn(8, 3)  # 8 samples, 3 classes
targets = torch.randint(0, 3, (8,))
ce_loss = nn.CrossEntropyLoss()(logits, targets)
fl_loss = FocalLoss(gamma=2.0)(logits, targets)
ls_loss = LabelSmoothingLoss(3, 0.1)(logits, targets)
print(f'CrossEntropy: {ce_loss.item():.4f}')
print(f'FocalLoss: {fl_loss.item():.4f}')
print(f'LabelSmoothing: {ls_loss.item():.4f}')
# Weighted CrossEntropy for class imbalance
weights = torch.tensor([1.0, 5.0, 3.0])  # class 1 is rare, upweighted
wce = nn.CrossEntropyLoss(weight=weights)(logits, targets)
print(f'WeightedCE: {wce.item():.4f}')
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Binary classification with a heavily regularized MLP:
# BatchNorm + Dropout + weight decay (L2).
X, y = make_classification(n_samples=2000, n_features=15, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
sc = StandardScaler()
X_tr = sc.fit_transform(X_tr)
X_val = sc.transform(X_val)   # reuse train statistics (no leakage)
train_dl = DataLoader(TensorDataset(
    torch.tensor(X_tr, dtype=torch.float32),
    torch.tensor(y_tr, dtype=torch.long)
), batch_size=32, shuffle=True)
model = nn.Sequential(
    nn.Linear(15, 64), nn.BatchNorm1d(64), nn.ReLU(), nn.Dropout(0.5),
    nn.Linear(64, 32), nn.BatchNorm1d(32), nn.ReLU(), nn.Dropout(0.4),
    nn.Linear(32, 2)
)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(1, 6):
    model.train()
    total = 0
    for xb, yb in train_dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total += loss.item()
    print(f'Epoch {epoch}: loss={total/len(train_dl):.4f}')
# eval() disables Dropout and uses BatchNorm's running statistics.
model.eval()
with torch.no_grad():
    X_v = torch.tensor(X_val, dtype=torch.float32)
    y_v = torch.tensor(y_val, dtype=torch.long)
    acc = (model(X_v).argmax(1) == y_v).float().mean()
print(f'Val accuracy: {acc.item():.4f}')
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

torch.manual_seed(42)
# Exercise: compare a dropout-regularized MLP against an identical
# unregularized one on noisy regression data (fill in the TODOs).
# TODO: Generate data
# X, y = make_regression(n_samples=500, n_features=10, noise=30, random_state=42)
# X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
# scaler = StandardScaler()
# X_tr = scaler.fit_transform(X_tr)
# X_val = scaler.transform(X_val)
# TODO: Build two identical networks — one regularized, one not
# def make_model(regularized=True):
#     if regularized:
#         return nn.Sequential(
#             nn.Linear(10, 64), nn.ReLU(), nn.Dropout(0.3),
#             nn.Linear(64, 32), nn.ReLU(), nn.Dropout(0.3),
#             nn.Linear(32, 1)
#         )
#     else:
#         return nn.Sequential(
#             nn.Linear(10, 64), nn.ReLU(),
#             nn.Linear(64, 32), nn.ReLU(),
#             nn.Linear(32, 1)
#         )
# TODO: Train both for 30 epochs and compare val MSE
# For each model, track val_mse_list and print final val MSE
pass
# --- Persist trained models with state_dict, checkpoint training
# progress, and export for deployment. ---
# === Save & Load state_dict ===
import torch
import torch.nn as nn
import os, tempfile

# Build and 'train' a model
model = nn.Sequential(
    nn.Linear(10, 64), nn.ReLU(),
    nn.Linear(64, 32), nn.ReLU(),
    nn.Linear(32, 3)
)
tmp = tempfile.gettempdir()
path = os.path.join(tmp, 'model.pth')
# Save weights only (recommended)
torch.save(model.state_dict(), path)
size_kb = os.path.getsize(path) / 1024
print(f'Saved model: {size_kb:.1f} KB')
# Load into same architecture.
# A state_dict stores only tensors, so the receiving model must be
# constructed with an identical architecture before loading.
loaded = nn.Sequential(
    nn.Linear(10, 64), nn.ReLU(),
    nn.Linear(64, 32), nn.ReLU(),
    nn.Linear(32, 3)
)
loaded.load_state_dict(torch.load(path, weights_only=True))
loaded.eval()
x = torch.randn(4, 10)
print('Output:', loaded(x).shape)
print('Weights match:', torch.allclose(
    list(model.parameters())[0],
    list(loaded.parameters())[0]
))
# --- Full Checkpoint (Resume Training) ---
import torch
import torch.nn as nn
import os, tempfile

model = nn.Linear(10, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
epoch = 5
best_val_loss = 0.042
tmp = tempfile.gettempdir()
ckpt_path = os.path.join(tmp, 'checkpoint.pth')
# Save full checkpoint: includes optimizer state (e.g. Adam moments)
# so training can resume exactly where it left off.
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'best_val_loss': best_val_loss
}, ckpt_path)
print(f'Checkpoint saved ({os.path.getsize(ckpt_path)/1024:.1f} KB)')
# Resume training.
# weights_only=False because the checkpoint dict holds non-tensor values.
checkpoint = torch.load(ckpt_path, weights_only=False)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch'] + 1
print(f'Resuming from epoch {start_epoch}')
print(f'Best val loss was: {checkpoint["best_val_loss"]}')
# --- state_dict vs Full Model Save & TorchScript ---
import torch
import torch.nn as nn
import os, tempfile

torch.manual_seed(42)


class SmallNet(nn.Module):
    """Tiny 2-layer MLP used to compare serialization methods."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(8, 16)
        self.fc2 = nn.Linear(16, 1)

    def forward(self, x):
        return self.fc2(torch.relu(self.fc1(x)))


model = SmallNet()
tmp = tempfile.gettempdir()
# Method 1: state_dict (recommended, portable)
sd_path = os.path.join(tmp, 'net_sd.pth')
torch.save(model.state_dict(), sd_path)
m2 = SmallNet()
m2.load_state_dict(torch.load(sd_path, weights_only=True))
print('state_dict load OK')
# Method 2: full model save (pickle-based, less portable)
# Pickling ties the file to this exact class/module layout.
full_path = os.path.join(tmp, 'net_full.pth')
torch.save(model, full_path)
m3 = torch.load(full_path, weights_only=False)
print('Full model load OK')
# Method 3: TorchScript (portable, no Python needed at inference)
scripted = torch.jit.script(model)
ts_path = os.path.join(tmp, 'net_scripted.pt')
scripted.save(ts_path)
loaded_ts = torch.jit.load(ts_path)
x = torch.randn(4, 8)
print('TorchScript output:', loaded_ts(x).shape)
print('Outputs match:', torch.allclose(model(x), loaded_ts(x), atol=1e-5))
# --- Model Versioning with State Dict & ONNX-like Summary ---
import torch
import torch.nn as nn
import io


class MLP(nn.Module):
    """Configurable one-hidden-layer perceptron."""

    def __init__(self, in_f, hidden, out_f):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_f, hidden), nn.ReLU(),
            nn.Linear(hidden, out_f)
        )

    def forward(self, x): return self.net(x)


model = MLP(16, 64, 4)
# Save/load state dict to buffer (no file system needed)
buf = io.BytesIO()
torch.save(model.state_dict(), buf)
buf.seek(0)   # rewind before reading back
size_kb = buf.getbuffer().nbytes / 1024
print(f'State dict size: {size_kb:.2f} KB')
# Load into new model
model2 = MLP(16, 64, 4)
model2.load_state_dict(torch.load(buf, weights_only=True))
model2.eval()
# Verify identical outputs
torch.manual_seed(0)
x = torch.randn(4, 16)
with torch.no_grad():
    out1 = model(x)
    out2 = model2(x)
print(f'Outputs identical: {torch.allclose(out1, out2)}')
# Parameter count per layer
print('\nModel parameter summary:')
total = 0
for name, p in model.named_parameters():
    n = p.numel()
    total += n
    print(f' {name:25s}: {list(p.shape)} = {n:,} params')
print(f' Total: {total:,} parameters')
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import os, tempfile

torch.manual_seed(42)
X = torch.randn(1000, 10)
y = torch.randint(0, 3, (1000,))
dl = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)
model = nn.Sequential(nn.Linear(10,64), nn.ReLU(), nn.Linear(64,3))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
tmp = tempfile.gettempdir()
best_loss = float('inf')
for epoch in range(1, 16):
    model.train()
    total = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total += loss.item()
    avg_loss = total / len(dl)
    # Save best model ('best' = lowest average training loss so far).
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), os.path.join(tmp, 'best_model.pth'))
    # Periodic checkpoint every 5 epochs
    if epoch % 5 == 0:
        ckpt = os.path.join(tmp, f'ckpt_epoch{epoch}.pth')
        torch.save({'epoch': epoch, 'model': model.state_dict(), 'loss': avg_loss}, ckpt)
        print(f'Epoch {epoch:2d}: loss={avg_loss:.4f} [checkpoint saved]')
import torch
import torch.nn as nn
import os, tempfile

torch.manual_seed(42)
# Exercise: train, save, reload and script a small MLP (fill in TODOs).
# TODO: Build a small MLP (input=8, hidden=16, output=1)
# model = nn.Sequential(???)
# TODO: Generate synthetic data and train for 5 epochs
# X = torch.randn(200, 8)
# y = torch.randn(200, 1)
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# for epoch in range(5):
#     loss = nn.MSELoss()(model(X), y)
#     optimizer.zero_grad(); loss.backward(); optimizer.step()
#     print(f'Epoch {epoch+1}: loss={loss.item():.4f}')
# TODO: Save state_dict
# path = os.path.join(tempfile.gettempdir(), 'model.pth')
# torch.save(model.state_dict(), path)
# TODO: Create new model, load weights, verify outputs match
# model2 = nn.Sequential(???)
# model2.load_state_dict(torch.load(path, weights_only=True))
# model.eval(); model2.eval()
# with torch.no_grad():
#     out1 = model(X[:4])
#     out2 = model2(X[:4])
# print('Match:', torch.allclose(out1, out2))
# TODO: Export as TorchScript
# scripted = torch.jit.script(model)
# scripted.save(os.path.join(tempfile.gettempdir(), 'model_scripted.pt'))
# print('TorchScript saved')
# --- Process sequential data — time series, text, signals — with
# RNNs, LSTMs, and GRUs. ---
# === LSTM for Sequence Classification ===
import torch
import torch.nn as nn


class LSTMClassifier(nn.Module):
    """Stacked LSTM that classifies an entire sequence.

    The hidden output at the final timestep feeds a linear head, so
    one logit vector summarizes the whole sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        # Inter-layer dropout is only meaningful for stacked LSTMs.
        inter_layer_dropout = 0.2 if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """x: [batch, seq_len, input_size] -> logits [batch, num_classes]."""
        sequence_out, _state = self.lstm(x)
        final_step = sequence_out[:, -1, :]
        return self.fc(final_step)


model = LSTMClassifier(
    input_size=10, hidden_size=64,
    num_layers=2, num_classes=3
)
print(model)
# Batch of 16 sequences, each 30 timesteps, 10 features
x = torch.randn(16, 30, 10)
out = model(x)
print('Output shape:', out.shape)  # [16, 3]
print('Params:', sum(p.numel() for p in model.parameters()))
# --- Time Series Forecasting with GRU ---
import torch
import torch.nn as nn
import numpy as np


class GRUForecaster(nn.Module):
    """One-step-ahead forecaster: GRU encoder + linear head.

    Args:
        input_size: features per timestep.
        hidden: GRU hidden units.
        horizon: number of future values predicted per window.
    """

    def __init__(self, input_size=1, hidden=64, horizon=1):
        super().__init__()
        self.gru = nn.GRU(input_size, hidden, batch_first=True)
        self.fc = nn.Linear(hidden, horizon)

    def forward(self, x):
        # x: [batch, window, input_size]; forecast from the final state.
        out, _ = self.gru(x)
        return self.fc(out[:, -1, :])


model = GRUForecaster(input_size=1, hidden=64, horizon=1)
# Generate noisy sine-wave data
np.random.seed(42)
t = np.linspace(0, 20 * np.pi, 1000)
signal = np.sin(t) + 0.1 * np.random.randn(1000)
# Create sliding window sequences (window=30)
W = 30
X = np.array([signal[i:i+W] for i in range(len(signal)-W)])
y = signal[W:]
X_t = torch.tensor(X, dtype=torch.float32).unsqueeze(2)  # [n, 30, 1]
y_t = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
out = model(X_t[:8])
print('Forecast shape:', out.shape)  # [8, 1]
loss = nn.MSELoss()(out, y_t[:8])
# Fixed: round(value, 4) instead of the dunder call value.__round__(4) —
# dunder methods are implementation hooks, not public calling API.
print('Initial MSE:', round(loss.item(), 4))
# --- Bidirectional LSTM & PackedSequence ---
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


# Bidirectional LSTM: processes sequence forward AND backward
class BiLSTM(nn.Module):
    """Single-layer bidirectional LSTM classifier over padded batches."""

    def __init__(self, input_size, hidden, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size, hidden,
            batch_first=True,
            bidirectional=True  # 2x hidden size in output
        )
        self.fc = nn.Linear(hidden * 2, num_classes)

    def forward(self, x, lengths):
        # Pack variable-length sequences for efficiency
        packed = pack_padded_sequence(x, lengths, batch_first=True,
                                      enforce_sorted=False)
        out_packed, (h_n, _) = self.lstm(packed)
        out, _ = pad_packed_sequence(out_packed, batch_first=True)
        # For one layer, h_n is (num_directions, batch, hidden):
        # h_n[0] = forward direction's final state, h_n[1] = backward's.
        last = torch.cat([h_n[0], h_n[1]], dim=1)
        return self.fc(last)


model = BiLSTM(input_size=8, hidden=32, num_classes=4)
# Variable-length batch (padded)
batch_size = 6
max_len = 15
lengths = torch.tensor([15, 12, 10, 8, 7, 5])
x = torch.zeros(batch_size, max_len, 8)
for i, l in enumerate(lengths):
    x[i, :l] = torch.randn(l, 8)
out = model(x, lengths)
print('BiLSTM output:', out.shape)  # [6, 4]
print('Params:', sum(p.numel() for p in model.parameters()))
# --- Bidirectional LSTM & Packed Sequences ---
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence

torch.manual_seed(42)
# Bidirectional LSTM
bi_lstm = nn.LSTM(input_size=16, hidden_size=32, num_layers=2,
                  batch_first=True, bidirectional=True, dropout=0.2)
x = torch.randn(8, 20, 16)  # batch=8, seq_len=20, features=16
out, (h_n, c_n) = bi_lstm(x)
print('Bidirectional LSTM:')
print(f' Input: {list(x.shape)}')
print(f' Output: {list(out.shape)} (hidden*2={32*2} for bidirectional)')
print(f' h_n: {list(h_n.shape)} (layers*2, batch, hidden)')
# Packed sequences — handle variable-length inputs efficiently.
# With enforce_sorted=True, sequences must be ordered longest-first.
sequences = [torch.randn(length, 8) for length in [10, 7, 5, 3]]
lengths = torch.tensor([10, 7, 5, 3])
padded = pad_sequence(sequences, batch_first=True)  # (4, 10, 8)
lstm = nn.LSTM(input_size=8, hidden_size=16, batch_first=True)
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=True)
out_packed, _ = lstm(packed)
out_padded, out_lengths = pad_packed_sequence(out_packed, batch_first=True)
print('\nPacked sequences:')
print(f' Padded input: {list(padded.shape)}')
print(f' Output: {list(out_padded.shape)}')
print(f' Out lengths: {out_lengths.tolist()}')
import torch
import torch.nn as nn
import numpy as np

torch.manual_seed(42)
np.random.seed(42)
# Simulate sensor readings (mostly normal, some anomalies)
n = 2000
sensor = np.sin(np.linspace(0, 10*np.pi, n)) + 0.05*np.random.randn(n)
# Inject anomalies at random positions
anomaly_idx = np.random.choice(n, 20, replace=False)
sensor[anomaly_idx] += np.random.uniform(2, 4, 20)
W = 20  # lookback window
X = torch.tensor([sensor[i:i+W] for i in range(n-W)], dtype=torch.float32).unsqueeze(2)
y = torch.tensor(sensor[W:], dtype=torch.float32).unsqueeze(1)


class AnomalyLSTM(nn.Module):
    """Predict the next sensor value from the previous W readings."""

    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(1, 32, batch_first=True)
        self.fc = nn.Linear(32, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])


net = AnomalyLSTM()
optim = torch.optim.Adam(net.parameters(), lr=1e-3)
for epoch in range(5):
    pred = net(X)
    loss = nn.MSELoss()(pred, y)
    optim.zero_grad(); loss.backward(); optim.step()
    if epoch % 2 == 0: print(f'Epoch {epoch}: MSE={loss.item():.5f}')
# Detect anomalies: reconstruction error > threshold
# (flag points whose absolute prediction error exceeds mean + 3*std).
net.eval()
with torch.no_grad():
    errors = (net(X).squeeze() - y.squeeze()).abs()
threshold = errors.mean() + 3 * errors.std()
detected = (errors > threshold).sum()
print(f'Anomalies detected: {detected.item()} (threshold={threshold.item():.4f})')
import torch
import torch.nn as nn

torch.manual_seed(42)
# Exercise: classify random sequences with an LSTM (fill in the TODOs).
# Hyperparameters
batch_size = 8
seq_len = 20
input_size = 10
hidden_size = 32
num_classes = 2
# TODO: Create batch of random sequences
# x = torch.randn(batch_size, seq_len, input_size)
# print('Input shape:', x.shape)
# TODO: Define LSTM module and FC head
# lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
# fc = nn.Linear(hidden_size, num_classes)
# TODO: Forward pass through LSTM
# out, (h_n, c_n) = lstm(x)
# print('LSTM output shape:', out.shape)  # (batch, seq_len, hidden)
# print('Final hidden shape:', h_n.shape)  # (1, batch, hidden)
# TODO: Take last timestep and pass to FC
# last = out[:, -1, :]  # (batch, hidden)
# logits = fc(last)  # (batch, num_classes)
# print('Logits shape:', logits.shape)
# TODO: Compute cross-entropy loss and backprop
# labels = torch.randint(0, num_classes, (batch_size,))
# loss = nn.CrossEntropyLoss()(logits, labels)
# loss.backward()
# print('Loss:', loss.item())
# --- Implement scaled dot-product attention, multi-head attention, and
# self-attention from scratch; transformers use attention to model
# long-range dependencies. ---
# === Scaled Dot-Product Attention ===
import numpy as np


def softmax(x, axis=-1):
    """Numerically stable softmax along *axis* (input left unmodified)."""
    shifted = x - x.max(axis=axis, keepdims=True)
    exps = np.exp(shifted)
    return exps / exps.sum(axis=axis, keepdims=True)


def scaled_dot_product_attention(Q, K, V, mask=None):
    """Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V.

    mask: boolean array, True where attention IS allowed; disallowed
    positions receive a large negative score before the softmax.
    Returns (context, attention_weights).
    """
    d_k = Q.shape[-1]
    similarity = np.matmul(Q, K.swapaxes(-2, -1)) / np.sqrt(d_k)  # (…, seq, seq)
    if mask is not None:
        similarity = np.where(mask, similarity, -1e9)
    attn = softmax(similarity, axis=-1)
    return np.matmul(attn, V), attn


np.random.seed(42)
batch, seq_len, d_model = 2, 5, 8
Q = np.random.randn(batch, seq_len, d_model)
K = np.random.randn(batch, seq_len, d_model)
V = np.random.randn(batch, seq_len, d_model)
output, weights = scaled_dot_product_attention(Q, K, V)
print('Q shape:', Q.shape)
print('Attention output shape:', output.shape)
print('Attention weights shape:', weights.shape)
print('Attention weights (sample 0, row 0):', weights[0, 0].round(3))
print('Row sum (should be 1.0):', weights[0, 0].sum(axis=-1).round(4))
# --- Multi-Head Attention from Scratch ---
import numpy as np


def softmax(x, axis=-1):
    """Numerically stable softmax that does NOT mutate its input.

    Fixes the original, which used `x -= x.max(...)` — an in-place
    subtraction that silently modified the caller's array — and which
    evaluated np.exp(x) twice.
    """
    shifted = x - x.max(axis=axis, keepdims=True)
    e = np.exp(shifted)
    return e / e.sum(axis=axis, keepdims=True)


class MultiHeadAttention:
    """Multi-head self-attention with plain NumPy weight matrices.

    d_model must be divisible by n_heads; each head attends over a
    d_k = d_model // n_heads slice of the embedding.
    """

    def __init__(self, d_model=64, n_heads=4, seed=42):
        self.h = n_heads
        self.d_k = d_model // n_heads
        rng = np.random.default_rng(seed)
        # Small (0.1-scale) random projections keep scores well-behaved.
        self.W_Q = rng.standard_normal((d_model, d_model)) * 0.1
        self.W_K = rng.standard_normal((d_model, d_model)) * 0.1
        self.W_V = rng.standard_normal((d_model, d_model)) * 0.1
        self.W_O = rng.standard_normal((d_model, d_model)) * 0.1

    def split_heads(self, x):
        """(B, L, D) -> (B, h, L, d_k)."""
        B, L, D = x.shape
        return x.reshape(B, L, self.h, self.d_k).transpose(0, 2, 1, 3)

    def forward(self, x):
        """Self-attention over x of shape (B, L, d_model)."""
        Q = self.split_heads(x @ self.W_Q)
        K = self.split_heads(x @ self.W_K)
        V = self.split_heads(x @ self.W_V)
        scores = Q @ K.swapaxes(-2, -1) / self.d_k**0.5
        attn = softmax(scores) @ V
        # Merge heads back: (B, h, L, d_k) -> (B, L, D)
        concat = attn.transpose(0, 2, 1, 3).reshape(x.shape[0], x.shape[1], -1)
        return concat @ self.W_O


np.random.seed(0)
B, L, D = 2, 10, 64
x = np.random.randn(B, L, D)
mha = MultiHeadAttention(d_model=D, n_heads=4)
out = mha.forward(x)
print(f'Input: {x.shape}')
print(f'Output: {out.shape}')
print(f'Mean abs output: {np.abs(out).mean():.4f}')
# --- Causal (Masked) Self-Attention ---
import numpy as np
def softmax(x, axis=-1):
x = x - x.max(axis=axis, keepdims=True)
return np.exp(x) / np.exp(x).sum(axis=axis, keepdims=True)
def causal_attention(Q, K, V):
    """Scaled dot-product attention with a causal (lower-triangular) mask.

    Position i may only attend to positions <= i. Returns
    (output, weights) with weights rows summing to 1.
    """
    d_k = Q.shape[-1]
    seq = Q.shape[-2]
    raw = Q @ K.swapaxes(-2, -1) / np.sqrt(d_k)
    # Allowed pairs: lower triangle incl. diagonal. Blocked entries get
    # a large negative logit so their softmax weight is ~0.
    allowed = np.tril(np.ones((seq, seq))).astype(bool)
    masked = np.where(allowed, raw, -1e9)
    weights = softmax(masked)  # module-level softmax
    return weights @ V, weights
# Demo: single-batch causal attention; verify rows are distributions and
# that no position attends to the future.
np.random.seed(42)
L, D = 6, 16
Q = np.random.randn(1, L, D)
K = np.random.randn(1, L, D)
V = np.random.randn(1, L, D)
out, attn = causal_attention(Q, K, V)
print('Causal attention weights (6x6):')
print(attn[0].round(3))
print('\nRow sums:', attn[0].sum(axis=-1).round(4))
# Strict upper triangle = future positions; must carry ~zero weight.
print('Upper triangle is zero (causal mask verified):', (attn[0] * np.triu(np.ones((L,L)),1) < 1e-6).all())Attention in PyTorch nn.MultiheadAttention
try:
import torch
import torch.nn as nn
torch.manual_seed(42)
d_model, n_heads = 64, 4
mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=n_heads, batch_first=True)
B, L = 2, 10
x = torch.randn(B, L, d_model)
# Self-attention (Q=K=V=x)
out, weights = mha(x, x, x)
print(f'Input: {tuple(x.shape)}')
print(f'Output: {tuple(out.shape)}')
print(f'Weights: {tuple(weights.shape)}')
# Causal (autoregressive) mask
mask = torch.triu(torch.ones(L, L), diagonal=1).bool()
out_causal, w_causal = mha(x, x, x, attn_mask=mask)
print(f'Causal output: {tuple(out_causal.shape)}')
print(f'Params: {sum(p.numel() for p in mha.parameters()):,}')
except ImportError:
print('pip install torch')
print('nn.MultiheadAttention: built-in PyTorch multi-head attention')
print('batch_first=True: input shape (batch, seq, features)')import numpy as np
def softmax(x, axis=-1):
    """Numerically stable softmax along `axis`.

    Fix: the previous version used `x -= ...`, mutating the caller's
    array in place; this computes on a shifted copy instead.
    """
    shifted = x - x.max(axis=axis, keepdims=True)
    e = np.exp(shifted)
    return e / e.sum(axis=axis, keepdims=True)
# Simulate token embeddings for a support ticket
np.random.seed(42)
tokens = ['my', 'account', 'is', 'locked', 'I', 'cannot', 'login', '[PAD]']
d_model = 16
x = np.random.randn(1, len(tokens), d_model) # (batch=1, seq=8, d=16)
# Simple 1-head attention
W_Q = np.random.randn(d_model, d_model) * 0.1
W_K = np.random.randn(d_model, d_model) * 0.1
W_V = np.random.randn(d_model, d_model) * 0.1
Q = x @ W_Q
K = x @ W_K
V = x @ W_V
scores = Q @ K.swapaxes(-2,-1) / d_model**0.5
weights = softmax(scores, axis=-1)
output = weights @ V
print('Token attention weights from [CLS]-like position (row 0):')
for tok, w in zip(tokens, weights[0, 0]):
bar = '|' * int(w * 100)
print(f'{tok:12s}: {w:.4f} {bar}')import numpy as np
def softmax(x, axis=-1):
    """Stable softmax along `axis`; the input array is left untouched."""
    z = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return z / np.sum(z, axis=axis, keepdims=True)
def attention(Q, K, V, mask=None, dropout=0.0, rng=None):
    """Scaled dot-product attention (self-contained NumPy reference).

    Previously a `pass` stub with TODOs; the harness below unpacks its
    return value, so the stub crashed with a TypeError.

    Args:
        Q, K, V: arrays of shape (..., L, d).
        mask: optional boolean array broadcastable to the score shape;
            True = keep, False = block (e.g. np.tril for a causal mask).
        dropout: probability of zeroing an attention weight (inverted
            dropout: survivors are rescaled by 1/(1-dropout)).
        rng: optional np.random.Generator for dropout; a fresh
            default_rng() is used when dropout > 0 and rng is None.

    Returns:
        (output, weights) where output = weights @ V.
    """
    d_k = Q.shape[-1]
    scores = Q @ K.swapaxes(-2, -1) / np.sqrt(d_k)
    if mask is not None:
        # Blocked positions get a large negative logit -> ~0 weight.
        scores = np.where(mask, scores, -1e9)
    # Stable softmax computed inline so the function is self-contained.
    shifted = scores - scores.max(axis=-1, keepdims=True)
    e = np.exp(shifted)
    weights = e / e.sum(axis=-1, keepdims=True)
    if dropout > 0.0:
        if rng is None:
            rng = np.random.default_rng()
        keep = rng.random(weights.shape) >= dropout
        weights = weights * keep / (1.0 - dropout)
    return weights @ V, weights
# Exercise harness: random Q/K/V plus a causal mask for the attention()
# implementation above. Un-comment the prints once attention() is filled in.
np.random.seed(42)
B, L, D = 2, 8, 16
Q = np.random.randn(B, L, D)
K = np.random.randn(B, L, D)
V = np.random.randn(B, L, D)
# Test 1: no mask
out, w = attention(Q, K, V)
# print('Output shape:', out.shape, '| Weights shape:', w.shape)
# print('Row sums (should be 1):', w[0,0].sum(axis=-1))
# Test 2: causal mask (True = position may be attended to)
mask = np.tril(np.ones((L, L))).astype(bool)
out_c, w_c = attention(Q, K, V, mask=mask)
# print('Causal upper triangle is 0:', (w_c[0] * np.triu(np.ones((L,L)),1) < 1e-9).all())
Implement Variational Autoencoders (VAE) and Generative Adversarial Networks (GAN) concepts. Understand reparameterization trick, ELBO loss, and adversarial training.
Autoencoder from Scratch (NumPy)
import numpy as np
np.random.seed(42)
def relu(x):
    """Elementwise max(0, x)."""
    return np.maximum(0, x)


def sigmoid(x):
    """Logistic function; inputs clipped to [-100, 100] to avoid overflow."""
    return 1 / (1 + np.exp(-np.clip(x, -100, 100)))
class Autoencoder:
    """Tiny fully-connected autoencoder (NumPy, forward pass only).

    Architecture: in_dim -> 8 -> latent -> 8 -> in_dim with ReLU hidden
    layers and a sigmoid output. Weights are drawn with np.random.randn
    in a fixed order so seeded runs are reproducible.
    """

    def __init__(self, in_dim=28, latent=4, lr=0.01):
        self.lr = lr  # kept for API compatibility; no update step here
        # Encoder weights
        self.W_enc = np.random.randn(in_dim, 8) * 0.1
        self.W_lat = np.random.randn(8, latent) * 0.1
        # Decoder weights
        self.W_dec = np.random.randn(latent, 8) * 0.1
        self.W_out = np.random.randn(8, in_dim) * 0.1

    def encode(self, x):
        """Project x to the latent space; caches the hidden activation."""
        self.h1 = relu(x @ self.W_enc)
        return self.h1 @ self.W_lat

    def decode(self, z):
        """Reconstruct from latent code z; output in (0, 1) via sigmoid."""
        self.h2 = relu(z @ self.W_dec)
        return sigmoid(self.h2 @ self.W_out)

    def forward(self, x):
        """Encode then decode; caches z and the reconstruction."""
        self.z = self.encode(x)
        self.rec = self.decode(self.z)
        return self.rec

    def loss(self, x, rec):
        """Mean binary cross-entropy between input and reconstruction."""
        return -np.mean(x * np.log(rec + 1e-8) + (1 - x) * np.log(1 - rec + 1e-8))
# Run the autoencoder on random binary data.
# NOTE(review): no gradient step is taken anywhere in this loop, so the
# loss cannot decrease — this only exercises the forward pass.
X = (np.random.rand(500, 28) > 0.5).astype(float)
ae = Autoencoder(in_dim=28, latent=4)
for epoch in range(50):
    rec = ae.forward(X)
    if epoch % 10 == 0:
        print(f'Epoch {epoch}: loss={ae.loss(X, rec):.4f}')VAE Reparameterization Trick
import numpy as np
np.random.seed(42)
# Demonstrate the reparameterization trick
# Instead of z ~ N(mu, sigma^2), sample:
# epsilon ~ N(0, 1), then z = mu + sigma * epsilon
def sample_vae(mu, log_var):
    """Reparameterization trick: z = mu + sigma * eps with eps ~ N(0, I).

    Sampling noise from a fixed N(0, 1) keeps the path from
    (mu, log_var) to z differentiable in a real VAE.
    """
    noise = np.random.standard_normal(mu.shape)
    return mu + np.exp(0.5 * log_var) * noise
def kl_divergence(mu, log_var):
    """Closed-form KL(N(mu, sigma^2) || N(0, 1)), averaged over elements."""
    term = 1.0 + log_var - np.square(mu) - np.exp(log_var)
    return -0.5 * np.mean(term)
def vae_loss(x, x_rec, mu, log_var):
    """ELBO-style VAE loss: MSE reconstruction plus KL regularizer.

    Returns (total, reconstruction, kl) so callers can log each part.
    """
    reconstruction = np.mean(np.square(x - x_rec))
    kl = kl_divergence(mu, log_var)
    return reconstruction + kl, reconstruction, kl
# Demo: fake encoder outputs -> sample z via the trick -> compute losses.
batch, latent = 32, 8
mu = np.random.randn(batch, latent) * 0.5
log_var = np.random.randn(batch, latent) * 0.5 - 1  # log_var < 0 -> small sigma
z = sample_vae(mu, log_var)
x = np.random.randn(batch, 16)
x_rec = x + np.random.randn(*x.shape) * 0.3  # simulated reconstruction
total, recon, kl = vae_loss(x, x_rec, mu, log_var)
print(f'Total VAE loss: {total:.4f}')
print(f'Reconstruction: {recon:.4f}')
print(f'KL divergence: {kl:.4f}')
print(f'Latent z shape: {z.shape}')GAN Adversarial Training Concept
import numpy as np
np.random.seed(42)
def sigmoid(x):
    """Logistic function; pre-activations clipped to [-10, 10]."""
    return 1 / (1 + np.exp(-np.clip(x, -10, 10)))


def bce(y_true, y_pred):
    """Mean binary cross-entropy with a 1e-8 floor inside the logs."""
    return -np.mean(y_true * np.log(y_pred + 1e-8) + (1 - y_true) * np.log(1 - y_pred + 1e-8))
# Minimal GAN training loop concept
# Real distribution: N(4, 0.5)
real_data = np.random.normal(4, 0.5, (1000, 1))
# Generator: noise -> fake samples
class Generator:
    """Noise -> fake-sample network: 1 -> 8 (tanh) -> 1 (NumPy)."""

    def __init__(self):
        # W1 then W2, matching the original draw order for seeded runs.
        self.W1 = np.random.randn(1, 8) * 0.1
        self.W2 = np.random.randn(8, 1) * 0.1

    def generate(self, z):
        """Map latent noise z of shape (N, 1) to fake samples (N, 1)."""
        hidden = np.tanh(z @ self.W1)
        return hidden @ self.W2
# Discriminator: sample -> P(real)
class Discriminator:
    """Sample -> P(real) network: 1 -> 8 (ReLU) -> 1 (sigmoid) (NumPy)."""

    def __init__(self):
        # W1 then W2, matching the original draw order for seeded runs.
        self.W1 = np.random.randn(1, 8) * 0.1
        self.W2 = np.random.randn(8, 1) * 0.1

    def discriminate(self, x):
        """Return P(real) in (0, 1) for samples x of shape (N, 1)."""
        hidden = np.maximum(0, x @ self.W1)
        return sigmoid(hidden @ self.W2)  # module-level sigmoid
G = Generator(); D = Discriminator()
batch = 32
print('GAN Training Loop (concepts):')
# NOTE(review): no parameters are updated below — the losses are computed
# only to illustrate the two adversarial objectives.
for step in range(5):
    # Discriminator objective: maximize log D(real) + log(1 - D(fake)).
    real = real_data[np.random.randint(0, len(real_data), batch)]
    z = np.random.randn(batch, 1)
    fake = G.generate(z)
    d_real = D.discriminate(real)
    d_fake = D.discriminate(fake)
    d_loss = bce(np.ones((batch,1)), d_real) + bce(np.zeros((batch,1)), d_fake)
    # Generator objective: make D label fakes as real (non-saturating form).
    g_loss = bce(np.ones((batch,1)), d_fake)
    print(f'Step {step+1}: D_loss={d_loss:.4f} G_loss={g_loss:.4f} | '
          f'fake_mean={fake.mean():.2f} (target~4.0)')VAE in PyTorch
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
class VAE(nn.Module):
def __init__(self, input_dim=784, latent_dim=20, hidden=256):
super().__init__()
self.encoder = nn.Sequential(nn.Linear(input_dim, hidden), nn.ReLU())
self.mu_layer = nn.Linear(hidden, latent_dim)
self.logvar_layer = nn.Linear(hidden, latent_dim)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, hidden), nn.ReLU(),
nn.Linear(hidden, input_dim), nn.Sigmoid()
)
def reparameterize(self, mu, logvar):
std = torch.exp(0.5 * logvar)
return mu + std * torch.randn_like(std)
def forward(self, x):
h = self.encoder(x)
mu = self.mu_layer(h)
lv = self.logvar_layer(h)
z = self.reparameterize(mu, lv)
rec = self.decoder(z)
kl = -0.5 * (1 + lv - mu**2 - lv.exp()).mean()
return rec, kl
torch.manual_seed(42)
vae = VAE(input_dim=784, latent_dim=16)
x = torch.rand(32, 784)
rec, kl = vae(x)
recon_loss = F.binary_cross_entropy(rec, x, reduction='mean')
loss = recon_loss + kl
print(f'VAE total loss: {loss.item():.4f} (recon={recon_loss.item():.4f}, kl={kl.item():.4f})')
print(f'Params: {sum(p.numel() for p in vae.parameters()):,}')
except ImportError:
print('pip install torch')import numpy as np
np.random.seed(42)
# Simulate customer transaction data
n = 1000
real_data = np.column_stack([
np.random.exponential(100, n), # transaction_amount
np.random.randint(0, 5, n).astype(float), # category (0-4)
np.random.normal(25, 8, n), # customer_age_proxy
np.random.choice([0, 1], n, p=[0.97, 0.03]).astype(float), # fraud
])
# Normalize
mu = real_data.mean(axis=0)
sigma = real_data.std(axis=0) + 1e-8
X = (real_data - mu) / sigma
# Simulate VAE encoder output (in practice: train the VAE)
latent_dim = 4
Z_mu = X @ np.random.randn(4, latent_dim) * 0.3
Z_logvar = np.random.randn(*Z_mu.shape) * 0.5 - 1
Z_samples = Z_mu + np.exp(0.5 * Z_logvar) * np.random.randn(*Z_mu.shape)
# Simulate decoder
X_fake = Z_samples @ np.random.randn(latent_dim, 4) * 0.5
synth = X_fake * sigma + mu # de-normalize
print('Real data stats:'); print(f' Amount: mean={real_data[:,0].mean():.1f}, std={real_data[:,0].std():.1f}')
print('Synthetic stats:'); print(f' Amount: mean={synth[:,0].mean():.1f}, std={synth[:,0].std():.1f}')
print(f'KL divergence (latent): {(-0.5*(1+Z_logvar-Z_mu**2-np.exp(Z_logvar)).mean()):.4f}')import numpy as np
# Reference solution for the VAE forward-pass exercise (the TODOs below
# were unimplemented, so the snippet printed nothing).
np.random.seed(42)
batch, input_dim, latent_dim, hidden = 16, 32, 4, 16
# Network weights (random init)
W_enc = np.random.randn(input_dim, hidden) * 0.1
W_mu = np.random.randn(hidden, latent_dim) * 0.1
W_lv = np.random.randn(hidden, latent_dim) * 0.1
W_dec1 = np.random.randn(latent_dim, hidden) * 0.1
W_dec2 = np.random.randn(hidden, input_dim) * 0.1
X = np.random.rand(batch, input_dim)  # input data in [0,1]
# (1) Encode: shared ReLU hidden layer, then separate mu / log-var heads.
h = np.maximum(0, X @ W_enc)
mu = h @ W_mu
lv = h @ W_lv
# (2) Reparameterize: z = mu + sigma * eps with eps ~ N(0, I).
eps = np.random.standard_normal(mu.shape)
z = mu + np.exp(0.5 * lv) * eps
# (3) Decode back to input space; sigmoid keeps outputs in (0, 1).
h2 = np.maximum(0, z @ W_dec1)
x_rec = 1.0 / (1.0 + np.exp(-(h2 @ W_dec2)))
# (4) ELBO components: MSE reconstruction + beta-weighted KL (beta=0.5).
recon_loss = np.mean((X - x_rec) ** 2)
kl_loss = -0.5 * np.mean(1 + lv - mu**2 - np.exp(lv))
beta = 0.5
elbo_loss = recon_loss + beta * kl_loss
print(f'Reconstruction (MSE): {recon_loss:.4f}')
print(f'KL divergence:        {kl_loss:.4f}')
print(f'Total (beta=0.5):     {elbo_loss:.4f}')
Explain black-box model predictions using SHAP values, integrated gradients, LIME, and attention visualization. Make models auditable and trustworthy.
SHAP Values with TreeExplainer
try:
import shap
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=8, n_informative=5, random_state=42)
feat_names = [f'feature_{i}' for i in range(8)]
model = GradientBoostingClassifier(n_estimators=50, random_state=0).fit(X, y)
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X[:100])
print('SHAP values shape:', shap_values.shape)
print('\nMean |SHAP| per feature (global importance):')
importances = np.abs(shap_values).mean(axis=0)
for name, imp in sorted(zip(feat_names, importances), key=lambda x: -x[1]):
bar = '|' * int(imp * 20)
print(f'{name}: {imp:.4f} {bar}')
print('\nSHAP values for sample 0:')
for name, sv in zip(feat_names, shap_values[0]):
print(f' {name}: {sv:+.4f}')
except ImportError:
print('pip install shap')Permutation Feature Importance
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=10, n_informative=5, random_state=42)
feat_names = [f'f{i}' for i in range(10)]
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
model = GradientBoostingClassifier(n_estimators=50, random_state=0).fit(X_tr, y_tr)
result = permutation_importance(
model, X_te, y_te, n_repeats=10, random_state=42, scoring='roc_auc'
)
print('Permutation Feature Importance (ROC-AUC drop):')
print(f'{"Feature":<12} {"Mean Drop":>12} {"Std":>8}')
print('-' * 35)
order = np.argsort(-result.importances_mean)
for i in order:
print(f'{feat_names[i]:<12} {result.importances_mean[i]:>12.4f} {result.importances_std[i]:>8.4f}')LIME-style Local Explanations
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=8, n_informative=5, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
model = GradientBoostingClassifier(n_estimators=50, random_state=0).fit(X_tr, y_tr)
def lime_explain(model, x_instance, X_train, n_samples=500, sigma=0.1):
    """LIME-style local explanation: fit a distance-weighted linear
    surrogate around x_instance and return its coefficients.

    NOTE(review): X_train is accepted for API symmetry but is not used;
    the neighborhood is sampled directly around x_instance.
    """
    rng = np.random.default_rng(42)
    # Gaussian neighborhood centered on the instance being explained.
    neighborhood = x_instance + rng.normal(0, sigma, (n_samples, len(x_instance)))
    # Query the black-box model on the perturbed points.
    probs = model.predict_proba(neighborhood)[:, 1]
    # Closer points get exponentially more weight in the local fit.
    dists = np.linalg.norm(neighborhood - x_instance, axis=1)
    sample_w = np.exp(-dists / dists.mean())
    surrogate = Ridge(alpha=0.1)
    surrogate.fit(neighborhood, probs, sample_weight=sample_w)
    return surrogate.coef_
x0 = X_te[0]
coef = lime_explain(model, x0, X_tr)
feat_names = [f'f{i}' for i in range(8)]
print(f'Sample 0 prediction: {model.predict_proba(x0.reshape(1,-1))[0,1]:.3f}')
print('LIME explanation (local coefficients):')
for name, c in sorted(zip(feat_names, coef), key=lambda x: -abs(x[1])):
print(f' {name}: {c:+.4f}')Saliency Maps & Integrated Gradients (NumPy)
import numpy as np
np.random.seed(42)
# Simulate a simple neural network with gradient computation
def relu(x):
    """Elementwise max(0, x)."""
    return np.maximum(0, x)


def relu_grad(x):
    """Derivative of ReLU: 1 where x > 0, else 0."""
    return (x > 0).astype(float)


def sigmoid(x):
    """Logistic function; inputs clipped to [-10, 10] to avoid overflow."""
    return 1 / (1 + np.exp(-np.clip(x, -10, 10)))
class SimpleNet:
    """Two-layer MLP (d -> h -> 1, sigmoid output) with input-attribution
    helpers: vanilla saliency and integrated gradients."""

    def __init__(self, d=8, h=16):
        # W1 then W2, matching the original draw order for seeded runs.
        self.W1 = np.random.randn(d, h) * 0.1
        self.W2 = np.random.randn(h, 1) * 0.1

    def forward(self, x):
        """Forward pass; caches the input and the hidden activation."""
        self.x = x
        self.h = relu(x @ self.W1)
        return sigmoid(self.h @ self.W2)

    def saliency(self, x):
        """|d output / d input| via a manual backward pass."""
        out = self.forward(x)
        grad_out = out * (1 - out)             # sigmoid derivative
        grad_hidden = grad_out @ self.W2.T
        grad_hidden *= relu_grad(x @ self.W1)  # gate by the ReLU mask
        grad_input = grad_hidden @ self.W1.T
        return np.abs(grad_input)

    def integrated_gradients(self, x, n_steps=50):
        """Integrated gradients w.r.t. a zero baseline.

        NOTE(review): averages |grad| (saliency) along the path rather
        than signed gradients — kept to match the original behavior.
        """
        baseline = np.zeros_like(x)
        path = np.linspace(0, 1, n_steps)
        grads = [self.saliency(baseline + a * x) for a in path]
        return (x - baseline) * np.mean(grads, axis=0)
net = SimpleNet(d=8)
x = np.random.randn(1, 8)
sal = net.saliency(x)[0]
ig = net.integrated_gradients(x)[0]
print('Saliency scores:', sal.round(4))
print('Integrated Gradients:', ig.round(4))
print('Top feature (saliency):', np.argmax(sal))
print('Top feature (IG):', np.argmax(np.abs(ig)))import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=10, n_informative=7, random_state=42)
feat_names = ['income','debt_ratio','credit_score','employment_yrs','loan_amount',
'assets','late_payments','dependents','savings','age']
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
model = GradientBoostingClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)
# Compute permutation importance as SHAP proxy
from sklearn.inspection import permutation_importance
imp = permutation_importance(model, X_te, y_te, n_repeats=5, random_state=42)
global_imp = dict(zip(feat_names, imp.importances_mean))
# Explain rejections (predicted 0)
pred_proba = model.predict_proba(X_te)[:,1]
rejected = np.where(pred_proba < 0.3)[0][:3]
print(f'Explaining {len(rejected)} rejections:')
for idx in rejected:
pred = pred_proba[idx]
top3 = sorted(global_imp.items(), key=lambda x: -x[1])[:3]
print(f' Sample {idx}: P(approve)={pred:.2%}')
for f, imp_val in top3:
print(f' {f}: importance={imp_val:.4f}')import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
np.random.seed(42)
X, y = make_classification(n_samples=1500, n_features=15, n_informative=8, random_state=42)
feat_names = [f'f{i}' for i in range(15)]
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42)
model = GradientBoostingClassifier(n_estimators=50, random_state=0).fit(X_tr, y_tr)
# TODO: (1) Extract and rank built-in feature_importances_
# TODO: (2) Compute permutation importance on test set
# TODO: (3) Find 5 samples with lowest predict_proba, explain with simple local Ridge
# TODO: Compare top-5 features across all methods
CNNs use learnable filters to detect spatial patterns in images. Conv layers extract local features, pooling reduces dimensionality, and residual connections enable training very deep networks by solving the vanishing gradient problem.
Simple CNN Classifier
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
np.random.seed(42); torch.manual_seed(42)
# Simulate image-like data
X = torch.randn(1000, 1, 28, 28)
y = torch.randint(0, 10, (1000,))
class SimpleCNN(nn.Module):
    """Two conv blocks (conv-ReLU-maxpool) plus a dropout MLP head.

    Input (N, 1, 28, 28) -> logits (N, 10).
    """

    def __init__(self):
        super().__init__()
        # Layer creation order matches the original so seeded
        # initialization is identical.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(32 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # Each conv block halves spatial dims: 28 -> 14 -> 7.
        for conv in (self.conv1, self.conv2):
            x = self.pool(self.relu(conv(x)))
        flat = x.view(x.size(0), -1)
        hidden = self.dropout(self.relu(self.fc1(flat)))
        return self.fc2(hidden)
model = SimpleCNN()
print(f"Parameters: {sum(p.numel() for p in model.parameters()):,}")
loader = DataLoader(TensorDataset(X, y), batch_size=32)
x_batch, _ = next(iter(loader))
out = model(x_batch)
print(f"Output shape: {out.shape}")
Residual Block & Skip Connections
import torch
import torch.nn as nn
import numpy as np
class ResidualBlock(nn.Module):
    """Two-conv residual block with BatchNorm; channel count is
    preserved so the identity shortcut needs no projection."""

    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        shortcut = x
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        # Skip connection: add the input back before the final ReLU.
        return self.relu(y + shortcut)
class MiniResNet(nn.Module):
    """Stem conv -> one residual block -> adaptive pooling -> linear head."""

    def __init__(self, n_classes=10):
        super().__init__()
        self.stem = nn.Conv2d(1, 32, 3, padding=1)
        self.res1 = ResidualBlock(32)
        self.pool = nn.AdaptiveAvgPool2d(4)
        self.fc = nn.Linear(32 * 4 * 4, n_classes)

    def forward(self, x):
        feats = self.res1(torch.relu(self.stem(x)))
        pooled = self.pool(feats)
        return self.fc(pooled.flatten(1))
model = MiniResNet()
x = torch.randn(8, 1, 28, 28)
print(f"Output shape: {model(x).shape}")
print(f"Params: {sum(p.numel() for p in model.parameters()):,}")
3-Channel CNN with BatchNorm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
torch.manual_seed(0); np.random.seed(0)
# Multi-class classification
X = torch.randn(2000, 3, 32, 32)
y = torch.randint(0, 5, (2000,))
class CNN3(nn.Module):
    """3-channel CNN: two BN conv blocks, a third conv, global pooling,
    then a dropout-regularized MLP head. (N, 3, H, W) -> (N, n_classes)."""

    def __init__(self, n_classes=5):
        super().__init__()
        # Exact layer sequence preserved: both the initialization RNG
        # consumption and the forward trace match the original.
        self.net = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool2d(2),
            nn.Flatten(),
            nn.Linear(128 * 4, 256), nn.ReLU(), nn.Dropout(0.4),
            nn.Linear(256, n_classes),
        )

    def forward(self, x):
        return self.net(x)
model = CNN3()
loader = DataLoader(TensorDataset(X[:200], y[:200]), batch_size=32, shuffle=True)
opt = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()
for epoch in range(3):
total_loss = 0
for xb, yb in loader:
opt.zero_grad()
loss = loss_fn(model(xb), yb)
loss.backward(); opt.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}: loss={total_loss/len(loader):.4f}")
Transfer Learning: Frozen Backbone
import torch
import torch.nn as nn
import numpy as np
# Transfer learning simulation with pretrained-like frozen backbone
class FrozenBackbone(nn.Module):
    """Transfer-learning skeleton: a frozen conv feature extractor plus a
    trainable 3-class classification head."""

    def __init__(self):
        super().__init__()
        # Stand-in for a pretrained backbone; weights are frozen below.
        self.backbone = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool2d(4),
        )
        for param in self.backbone.parameters():
            param.requires_grad = False
        # Only the head receives gradient updates.
        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 16, 256), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(256, 3),  # 3 classes
        )

    def forward(self, x):
        # no_grad avoids building an autograd graph through frozen layers.
        with torch.no_grad():
            feats = self.backbone(x)
        return self.head(feats)
model = FrozenBackbone()
frozen = sum(p.numel() for p in model.parameters() if not p.requires_grad)
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Frozen params: {frozen:,}")
print(f"Trainable params: {trainable:,}")
x = torch.randn(4, 3, 64, 64)
print(f"Output: {model(x).shape}")
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
torch.manual_seed(7); np.random.seed(7)
# Simulate chest X-ray binary classification (pneumonia vs normal)
n_train, n_val = 800, 200
X_tr = torch.randn(n_train, 1, 64, 64)
y_tr = torch.randint(0, 2, (n_train,))
X_va = torch.randn(n_val, 1, 64, 64)
y_va = torch.randint(0, 2, (n_val,))
class ChestCNN(nn.Module):
    """Binary classifier for 1-channel 64x64 images (pneumonia vs normal)."""

    def __init__(self):
        super().__init__()
        # Two BN conv blocks with pooling, a third conv, global pooling,
        # then a heavily dropout-regularized linear head (2 logits).
        self.net = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d(4),
            nn.Flatten(), nn.Linear(128 * 16, 256), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(256, 2),
        )

    def forward(self, x):
        return self.net(x)
model = ChestCNN()
opt = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
loss_fn = nn.CrossEntropyLoss()
tr_loader = DataLoader(TensorDataset(X_tr, y_tr), batch_size=32, shuffle=True)
for epoch in range(3):
model.train(); tr_loss = 0
for xb, yb in tr_loader:
opt.zero_grad(); loss = loss_fn(model(xb), yb)
loss.backward(); opt.step(); tr_loss += loss.item()
model.eval()
with torch.no_grad():
val_pred = model(X_va).argmax(1)
val_acc = (val_pred == y_va).float().mean()
print(f"Epoch {epoch+1}: loss={tr_loss/len(tr_loader):.4f}, val_acc={val_acc:.4f}")
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
torch.manual_seed(0); np.random.seed(0)
# CIFAR-10 like: 3-channel 32x32, 10 classes
X = torch.randn(1000, 3, 32, 32)
y = torch.randint(0, 10, (1000,))
X_val = torch.randn(200, 3, 32, 32)
y_val = torch.randint(0, 10, (200,))
# TODO: Build CNN with at least 3 conv layers + batch norm + dropout
# TODO: Add a residual skip connection in one of the layers
# TODO: Train for 5 epochs, log train loss + val accuracy
# TODO: Report param count (frozen vs trainable)
# TODO: Try learning rate 1e-3 vs 1e-4 and compare convergence
Transformers use self-attention to relate every position to every other position in a sequence. Multi-head attention, positional encoding, and residual connections make them the foundation of modern NLP and vision models.
Transformer Block with Multi-Head Attention
import torch
import torch.nn as nn
import numpy as np
class TransformerBlock(nn.Module):
    """Pre-norm transformer encoder block: self-attention + feed-forward,
    each wrapped as LayerNorm -> sublayer -> dropout -> residual add.

    Fixes vs. the previous version:
      * `mask` is now actually forwarded to the attention layer — it was
        accepted by forward() but silently ignored;
      * the normalized input is computed once instead of three times.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True, dropout=dropout)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff), nn.GELU(), nn.Linear(d_ff, d_model)
        )
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """x: (B, L, d_model). mask: optional attn_mask for
        nn.MultiheadAttention, e.g. a bool (L, L) causal mask where True
        blocks attention. Returns (output, attention_weights)."""
        normed = self.ln1(x)  # hoisted: was computed three times
        attn_out, weights = self.attn(normed, normed, normed, attn_mask=mask)
        x = x + self.drop(attn_out)
        x = x + self.drop(self.ff(self.ln2(x)))
        return x, weights
d_model, n_heads, d_ff = 64, 4, 256
block = TransformerBlock(d_model, n_heads, d_ff)
x = torch.randn(8, 20, d_model) # batch=8, seq_len=20
out, weights = block(x)
print(f"Output shape: {out.shape}")
print(f"Attn weights: {weights.shape}")
Text Classification Transformer
import torch
import torch.nn as nn
import numpy as np
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding (added to embeddings, then dropout).

    Even channels get sin, odd channels get cos, with frequencies
    decaying geometrically across the model dimension."""

    def __init__(self, d_model, max_len=512, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        positions = torch.arange(max_len).unsqueeze(1)
        freqs = torch.exp(
            torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freqs)
        table[:, 1::2] = torch.cos(positions * freqs)
        # Buffer (not a Parameter): moves with .to(device), never trained.
        self.register_buffer("pe", table.unsqueeze(0))

    def forward(self, x):
        """x: (B, L, d_model) -> x plus the first L positional vectors."""
        return self.dropout(x + self.pe[:, :x.size(1)])
class TextTransformer(nn.Module):
    """Text classifier: embedding + sinusoidal positions, a pre-norm
    TransformerEncoder stack, and mean pooling into a linear head."""

    def __init__(self, vocab_size=1000, d_model=64, n_heads=4, n_layers=2, n_classes=3):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pos_enc = PositionalEncoding(d_model)
        layer = nn.TransformerEncoderLayer(d_model, n_heads, dim_feedforward=256,
                                           batch_first=True, norm_first=True)
        self.encoder = nn.TransformerEncoder(layer, num_layers=n_layers)
        self.classifier = nn.Linear(d_model, n_classes)

    def forward(self, x, src_key_padding_mask=None):
        embedded = self.pos_enc(self.embed(x))
        encoded = self.encoder(embedded, src_key_padding_mask=src_key_padding_mask)
        # Mean pooling over the sequence dimension before classifying.
        return self.classifier(encoded.mean(dim=1))
model = TextTransformer()
tokens = torch.randint(1, 1000, (4, 30)) # batch=4, seq_len=30
out = model(tokens)
print(f"Output: {out.shape}, params: {sum(p.numel() for p in model.parameters()):,}")
Training a Mini-Transformer on Text
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
torch.manual_seed(0)
# Text classification with Transformer
vocab_size, d_model, n_classes = 500, 32, 4
class MiniTransformer(nn.Module):
    """2-layer pre-norm TransformerEncoder classifier with mean pooling.

    Reads module-level vocab_size / d_model / n_classes at build time."""

    def __init__(self):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model, padding_idx=0)
        layer = nn.TransformerEncoderLayer(d_model, 4, 128, batch_first=True, norm_first=True)
        self.encoder = nn.TransformerEncoder(layer, num_layers=2)
        self.head = nn.Linear(d_model, n_classes)

    def forward(self, x):
        # Token id 0 is padding; mask it out of attention.
        pad_mask = x == 0
        encoded = self.encoder(self.embed(x), src_key_padding_mask=pad_mask)
        # NOTE(review): mean pooling still averages over padded positions.
        return self.head(encoded.mean(1))
model = MiniTransformer()
X = torch.randint(0, vocab_size, (200, 20))
X[:, 15:] = 0 # padding
y = torch.randint(0, n_classes, (200,))
loader = torch.utils.data.DataLoader(
torch.utils.data.TensorDataset(X, y), batch_size=32, shuffle=True)
opt = optim.Adam(model.parameters(), lr=3e-4)
for epoch in range(5):
total = 0
for xb, yb in loader:
opt.zero_grad(); loss = nn.CrossEntropyLoss()(model(xb), yb)
loss.backward(); opt.step(); total += loss.item()
print(f"Epoch {epoch+1}: loss={total/len(loader):.4f}")
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
torch.manual_seed(42)
# Sentiment analysis with Transformer on simulated review data
vocab_size = 2000
class SentimentTransformer(nn.Module):
    """Binary sentiment classifier: embedding -> pre-norm encoder stack ->
    mean pooling -> small MLP head. Reads module-level vocab_size."""

    def __init__(self, d=64, h=4, layers=2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d, padding_idx=0)
        enc_layer = nn.TransformerEncoderLayer(d, h, d * 4, batch_first=True,
                                               norm_first=True, dropout=0.1)
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=layers)
        self.head = nn.Sequential(nn.Linear(d, 32), nn.ReLU(), nn.Linear(32, 2))

    def forward(self, x):
        # Token id 0 = padding; exclude it from attention.
        padding = x == 0
        hidden = self.encoder(self.embed(x), src_key_padding_mask=padding)
        # NOTE(review): mean pooling still averages over padded positions.
        return self.head(hidden.mean(dim=1))
model = SentimentTransformer()
n = 500
X = torch.randint(1, vocab_size, (n, 40))
X[:, 35:] = 0 # simulate padding
y = torch.randint(0, 2, (n,))
X_val, y_val = X[:100], y[:100]
X_tr, y_tr = X[100:], y[100:]
loader = torch.utils.data.DataLoader(
torch.utils.data.TensorDataset(X_tr, y_tr), batch_size=32, shuffle=True)
opt = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
for epoch in range(5):
model.train(); tr_loss = 0
for xb, yb in loader:
opt.zero_grad(); loss = nn.CrossEntropyLoss()(model(xb), yb)
loss.backward(); opt.step(); tr_loss += loss.item()
model.eval()
with torch.no_grad():
val_acc = (model(X_val).argmax(1) == y_val).float().mean()
print(f"Epoch {epoch+1}: loss={tr_loss/len(loader):.4f}, val_acc={val_acc:.4f}")
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
torch.manual_seed(3)
vocab_size = 1000
# Simulated multi-class text classification (4 categories)
X = torch.randint(1, vocab_size, (600, 50))
X[:, 45:] = 0 # padding
y = torch.randint(0, 4, (600,))
X_val, y_val = X[:100], y[:100]
X_tr, y_tr = X[100:], y[100:]
# TODO: Build TransformerEncoder with 3 layers, d_model=64, n_heads=4
# TODO: Add positional encoding (sinusoidal)
# TODO: Use mean pooling over sequence before classification head
# TODO: Train with AdamW + cosine LR schedule for 8 epochs
# TODO: Report train loss and val accuracy each epoch
# TODO: Print total parameter count
Variational Autoencoders learn a compressed latent distribution and can generate new samples. GANs pit a generator against a discriminator in an adversarial game. Both are used for data augmentation, anomaly detection, and synthetic data generation.
Variational Autoencoder (VAE)
import torch
import torch.nn as nn
import numpy as np
# Variational Autoencoder (VAE)
class VAE(nn.Module):
    """MLP variational autoencoder: x -> (mu, log_var) -> z -> x_hat in (0, 1)."""

    def __init__(self, input_dim=784, hidden=256, latent=16):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(input_dim, hidden), nn.ReLU())
        self.mu_layer = nn.Linear(hidden, latent)
        self.log_var_layer = nn.Linear(hidden, latent)
        self.decoder = nn.Sequential(
            nn.Linear(latent, hidden), nn.ReLU(),
            nn.Linear(hidden, input_dim), nn.Sigmoid()
        )

    def encode(self, x):
        """Return the posterior parameters (mu, log_var) for x."""
        shared = self.encoder(x)
        return self.mu_layer(shared), self.log_var_layer(shared)

    def reparameterize(self, mu, log_var):
        """Differentiable sample: z = mu + sigma * eps, eps ~ N(0, I)."""
        sigma = torch.exp(0.5 * log_var)
        return mu + torch.randn_like(sigma) * sigma

    def forward(self, x):
        """Return (reconstruction, mu, log_var)."""
        mu, log_var = self.encode(x)
        sample = self.reparameterize(mu, log_var)
        return self.decoder(sample), mu, log_var
vae = VAE()
x = torch.randn(32, 784).clamp(0, 1)
recon, mu, log_var = vae(x)
recon_loss = nn.functional.binary_cross_entropy(recon, x, reduction="sum")
kl_loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
loss = recon_loss + kl_loss
print(f"Recon loss: {recon_loss.item():.2f}, KL loss: {kl_loss.item():.2f}")
print(f"Latent shape: {mu.shape}")
Generative Adversarial Network (GAN)
import torch
import torch.nn as nn
import numpy as np
# Simple GAN for 1D distribution
class Generator(nn.Module):
    """Maps latent noise of dim z_dim to out_dim-dimensional fake samples."""

    def __init__(self, z_dim=8, out_dim=1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(z_dim, 32), nn.LeakyReLU(0.2),
            nn.Linear(32, 64), nn.LeakyReLU(0.2),
            nn.Linear(64, out_dim),
        )

    def forward(self, z):
        return self.net(z)
class Discriminator(nn.Module):
    """Scores samples with P(real) in (0, 1) via a final sigmoid."""

    def __init__(self, in_dim=1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 64), nn.LeakyReLU(0.2),
            nn.Linear(64, 32), nn.LeakyReLU(0.2),
            nn.Linear(32, 1), nn.Sigmoid(),
        )

    def forward(self, x):
        return self.net(x)
torch.manual_seed(0)
G, D = Generator(), Discriminator()
G_opt = torch.optim.Adam(G.parameters(), lr=2e-4, betas=(0.5, 0.999))
D_opt = torch.optim.Adam(D.parameters(), lr=2e-4, betas=(0.5, 0.999))
bce = nn.BCELoss()
# Target: N(3, 0.5) distribution
for step in range(300):
real = torch.randn(64, 1) * 0.5 + 3.0
z = torch.randn(64, 8)
fake = G(z)
d_loss = bce(D(real), torch.ones(64,1)) + bce(D(fake.detach()), torch.zeros(64,1))
D_opt.zero_grad(); d_loss.backward(); D_opt.step()
g_loss = bce(D(G(torch.randn(64,8))), torch.ones(64,1))
G_opt.zero_grad(); g_loss.backward(); G_opt.step()
with torch.no_grad():
samples = G(torch.randn(1000, 8)).squeeze()
print(f"Generated: mean={samples.mean():.3f}, std={samples.std():.3f}")
print(f"Target: mean=3.000, std=0.500")
Autoencoder for Anomaly Detection
import torch
import torch.nn as nn
import numpy as np
# Autoencoder for anomaly detection
class Autoencoder(nn.Module):
    """Symmetric dense autoencoder: input_dim -> 12 -> bottleneck -> 12 -> input_dim."""

    def __init__(self, input_dim=20, bottleneck=4):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 12),
            nn.ReLU(),
            nn.Linear(12, bottleneck),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Linear(bottleneck, 12),
            nn.ReLU(),
            nn.Linear(12, input_dim),
        )

    def forward(self, x):
        """Compress x through the bottleneck and reconstruct it."""
        code = self.encoder(x)
        return self.decoder(code)
# Train the autoencoder on normal data only, then flag test points whose
# reconstruction error is unusually large as anomalies.
torch.manual_seed(5)
ae = Autoencoder()
# Train on normal data
X_normal = torch.randn(500, 20)
opt = torch.optim.Adam(ae.parameters(), lr=1e-3)
loader = torch.utils.data.DataLoader(X_normal, batch_size=32, shuffle=True)
# Fix: build the loss module once instead of re-instantiating it every batch.
criterion = nn.MSELoss()
for epoch in range(20):
    for xb in loader:
        recon = ae(xb)
        loss = criterion(recon, xb)
        opt.zero_grad(); loss.backward(); opt.step()
# Anomaly detection: anomalies reconstruct poorly because the AE never saw them.
X_test_normal = torch.randn(50, 20)
X_test_anomaly = torch.randn(10, 20) * 3  # out-of-distribution
X_test = torch.cat([X_test_normal, X_test_anomaly])
labels = torch.cat([torch.zeros(50), torch.ones(10)])
with torch.no_grad():
    errors = ((ae(X_test) - X_test)**2).mean(dim=1)
# Threshold: mean + 2*std of the errors on the NORMAL test points only.
threshold = errors[:50].mean() + 2*errors[:50].std()
preds = (errors > threshold).float()
accuracy = (preds == labels).float().mean()
print(f"Anomaly threshold: {threshold:.4f}")
print(f"Detection accuracy: {accuracy:.4f}")
import torch
import torch.nn as nn
import numpy as np
torch.manual_seed(42)
# VAE for anomaly detection in manufacturing sensor data
# 15 sensor channels compressed into a 4-dimensional latent space.
input_dim, latent_dim = 15, 4
class SensorVAE(nn.Module):
    """Small VAE for dense sensor vectors.

    Generalized: the input/latent sizes are constructor arguments whose
    defaults match the module-level ``input_dim``/``latent_dim`` (15/4),
    so the class no longer depends on module globals and existing
    ``SensorVAE()`` callers are unaffected.
    """

    def __init__(self, input_dim=15, latent_dim=4):
        super().__init__()
        self.enc = nn.Sequential(nn.Linear(input_dim, 32), nn.ELU())
        self.mu = nn.Linear(32, latent_dim)  # mean head
        self.lv = nn.Linear(32, latent_dim)  # log-variance head
        self.dec = nn.Sequential(
            nn.Linear(latent_dim, 32), nn.ELU(),
            nn.Linear(32, input_dim)
        )

    def forward(self, x):
        """Return (reconstruction, mu, log_var) for a batch x of shape (N, input_dim)."""
        h = self.enc(x)
        mu, lv = self.mu(h), self.lv(h)
        # Reparameterization trick: z = mu + sigma * eps keeps sampling differentiable.
        z = mu + torch.exp(0.5 * lv) * torch.randn_like(mu)
        return self.dec(z), mu, lv
# Train on normal sensor readings only; the VAE learns to reconstruct them.
vae = SensorVAE()
X_normal = torch.randn(1000, input_dim)
loader = torch.utils.data.DataLoader(X_normal, batch_size=64, shuffle=True)
opt = torch.optim.Adam(vae.parameters(), lr=1e-3)
for epoch in range(15):
    total = 0
    for xb in loader:
        recon, mu, lv = vae(xb)
        # Summed MSE reconstruction plus a down-weighted KL term (beta = 0.1).
        recon_loss = nn.MSELoss(reduction="sum")(recon, xb)
        kl = -0.5 * torch.sum(1 + lv - mu.pow(2) - lv.exp())
        loss = recon_loss + 0.1 * kl
        opt.zero_grad(); loss.backward(); opt.step()
        total += loss.item()
    if epoch % 5 == 4:
        print(f"Epoch {epoch+1}: loss={total/len(loader):.2f}")
vae.eval()
# Score a mix of in-distribution and high-variance (anomalous) readings.
X_test_norm = torch.randn(100, input_dim)
X_test_anom = torch.randn(20, input_dim) * 4
X_all = torch.cat([X_test_norm, X_test_anom])
true_labels = torch.cat([torch.zeros(100), torch.ones(20)])
with torch.no_grad():
    recon_all, _, _ = vae(X_all)
    errors = ((recon_all - X_all)**2).mean(dim=1)
# Threshold from the normal test points: mean + 3 sigma.
threshold = errors[:100].mean() + 3*errors[:100].std()
pred = (errors > threshold).float()
# NOTE(review): pred.sum() could be 0, making precision NaN — acceptable for a demo.
precision = (pred * true_labels).sum() / pred.sum()
recall = (pred * true_labels).sum() / true_labels.sum()
print(f"Precision: {precision:.3f}, Recall: {recall:.3f}")
import torch
import torch.nn as nn
import numpy as np
# Exercise scaffold: data generation is provided; the model is left as TODOs.
torch.manual_seed(7)
input_dim = 10
# Generate: 80% normal data, 20% anomalies (higher variance)
X_normal = torch.randn(800, input_dim)
X_anomaly = torch.randn(200, input_dim) * 3
labels = torch.cat([torch.zeros(800), torch.ones(200)])
# Shuffle
perm = torch.randperm(1000)
X_all = torch.cat([X_normal, X_anomaly])[perm]
y_all = labels[perm]
X_tr = X_all[:700]  # train on mostly normal (won't know ground truth)
# TODO: Build VAE (encoder->mu/logvar, decoder) with bottleneck=3
# TODO: Train for 20 epochs on X_tr
# TODO: Compute reconstruction error on full dataset
# TODO: Choose threshold as mean + 2*std of training errors
# TODO: Report precision, recall, F1 for anomaly detection
LSTMs and GRUs solve the vanishing gradient problem for long sequences using gating mechanisms. LSTMs have separate cell/hidden states; GRUs merge them into a single hidden state for fewer parameters.
LSTM Time Series Forecasting
import torch
import torch.nn as nn
import numpy as np
torch.manual_seed(42); np.random.seed(42)
# Noisy sine wave: 500 points spanning four full periods plus Gaussian noise.
t = np.linspace(0, 8*np.pi, 500)
signal = np.sin(t) + 0.1*np.random.randn(len(t))
def make_sequences(data, seq_len=20):
    """Slice a 1-D series into overlapping windows for next-step prediction.

    Returns (X, y) as float32 arrays where X[i] holds seq_len consecutive
    values and y[i] is the value immediately following that window.
    """
    n_windows = len(data) - seq_len
    windows = [data[start:start + seq_len] for start in range(n_windows)]
    targets = [data[start + seq_len] for start in range(n_windows)]
    return np.array(windows, dtype=np.float32), np.array(targets, dtype=np.float32)
X, y = make_sequences(signal)
# 80/20 chronological split; unsqueeze adds the feature dim -> (N, seq_len, 1).
split = int(len(X)*0.8)
X_tr = torch.tensor(X[:split]).unsqueeze(-1)
X_te = torch.tensor(X[split:]).unsqueeze(-1)
y_tr = torch.tensor(y[:split]).unsqueeze(-1)
y_te = torch.tensor(y[split:]).unsqueeze(-1)
class LSTMForecaster(nn.Module):
    """Two-layer LSTM that predicts the next value of a univariate series."""

    def __init__(self):
        super().__init__()
        # input_size=1, hidden=32, 2 stacked layers with inter-layer dropout.
        self.lstm = nn.LSTM(1, 32, 2, batch_first=True, dropout=0.2)
        self.fc = nn.Linear(32, 1)

    def forward(self, x):
        """x: (N, seq_len, 1) -> prediction of shape (N, 1) from the last step."""
        sequence_out, _ = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)
model = LSTMForecaster()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
crit = nn.MSELoss()
# Full-batch training; report validation MSE every 10 epochs.
for epoch in range(50):
    model.train()
    loss = crit(model(X_tr), y_tr)
    opt.zero_grad(); loss.backward(); opt.step()
    if (epoch+1) % 10 == 0:
        model.eval()  # disables LSTM dropout for the validation pass
        with torch.no_grad():
            vl = crit(model(X_te), y_te).item()
        print(f"Epoch {epoch+1}: train={loss.item():.4f}, val={vl:.4f}")
GRU Sentiment Classifier
import torch
import torch.nn as nn
torch.manual_seed(42)
# Character-level vocabulary: a-z plus space; index 0 is reserved for padding.
VOCAB = list("abcdefghijklmnopqrstuvwxyz ")
char2idx = {c: i+1 for i, c in enumerate(VOCAB)}
MAX = 40  # fixed sequence length (truncate longer texts, pad shorter ones)
def encode(text):
    """Map text to a fixed-length list of MAX char indices (0 = pad/unknown)."""
    indices = [char2idx.get(ch, 0) for ch in text.lower()[:MAX]]
    padding = [0] * (MAX - len(indices))
    return indices + padding
# Tiny toy sentiment dataset: label 1 = positive, 0 = negative.
texts = ["great product love it", "terrible quality broke",
         "amazing fast shipping", "waste of money poor",
         "highly recommend excellent", "disappointed does not work"]
labels = [1, 0, 1, 0, 1, 0]
X = torch.tensor([encode(t) for t in texts], dtype=torch.long)
y = torch.tensor(labels, dtype=torch.float)
class GRUClassifier(nn.Module):
    """Bidirectional GRU over char embeddings -> sigmoid sentiment score."""

    def __init__(self):
        super().__init__()
        # +2 leaves room for the padding index 0 alongside the 1-based chars.
        self.embed = nn.Embedding(len(VOCAB)+2, 16, padding_idx=0)
        self.gru = nn.GRU(16, 32, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        embedded = self.embed(x)
        _, hidden = self.gru(embedded)
        # Concatenate the final forward (h[-2]) and backward (h[-1]) states.
        combined = torch.cat([hidden[-2], hidden[-1]], dim=-1)
        return torch.sigmoid(self.fc(combined)).squeeze()
model = GRUClassifier()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
crit = nn.BCELoss()
for _ in range(100):
    loss = crit(model(X), y)
    opt.zero_grad(); loss.backward(); opt.step()
model.eval()
with torch.no_grad():
    preds = (model(X) > 0.5).int().tolist()
# Training-set accuracy (toy demo: same data used for train and eval).
acc = sum(p==l for p,l in zip(preds, labels))/len(labels)
print(f"GRU Accuracy: {acc:.2f}")
for t, p, l in zip(texts, preds, labels):
    print(f" [{'OK' if p==l else 'X'}] {t}: pred={p}")
LSTM vs GRU Performance Comparison
import torch
import torch.nn as nn
import time
import numpy as np
torch.manual_seed(42)
# Compare LSTM vs GRU on same task
X = torch.randn(64, 50, 10)  # batch=64, seq=50, features=10
y = torch.randint(0, 2, (64,)).float()
class RNNModel(nn.Module):
    """Binary classifier with a swappable recurrent core ('lstm' or 'gru')."""

    def __init__(self, cell='lstm', hidden=64):
        super().__init__()
        rnn_cls = nn.LSTM if cell == 'lstm' else nn.GRU
        # 2 stacked layers, 10 input features per time step.
        self.rnn = rnn_cls(10, hidden, 2, batch_first=True)
        self.fc = nn.Linear(hidden, 1)

    def forward(self, x):
        """x: (N, T, 10) -> per-sample probability, shape (N,)."""
        states, _ = self.rnn(x)
        logits = self.fc(states[:, -1])
        return torch.sigmoid(logits).squeeze()
results = {}
for cell in ['lstm', 'gru']:
    model = RNNModel(cell=cell)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    crit = nn.BCELoss()
    n_params = sum(p.numel() for p in model.parameters())
    # Wall-clock time for 100 full-batch training steps.
    start = time.time()
    for _ in range(100):
        loss = crit(model(X), y)
        opt.zero_grad(); loss.backward(); opt.step()
    elapsed = time.time() - start
    # Training-set accuracy (no held-out split in this micro-benchmark).
    acc = ((model(X) > 0.5).float() == y).float().mean().item()
    results[cell] = {'params': n_params, 'time': elapsed, 'acc': acc, 'loss': loss.item()}
print("LSTM vs GRU Comparison:")
for cell, r in results.items():
    print(f" {cell.upper()}: params={r['params']:,}, time={r['time']:.2f}s, "
          f"acc={r['acc']:.4f}, loss={r['loss']:.4f}")
print("\nConclusion: GRU has fewer params and is faster; LSTM often better for long deps")
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42); np.random.seed(42)
# Synthetic price series: a random walk, standardized to zero mean / unit std.
n = 500
prices = np.cumsum(np.random.randn(n)*0.5) + 100
prices = (prices - prices.mean()) / prices.std()
def make_data(prices, seq=60):
    """Build (window, direction) pairs from a 1-D price series.

    X[i] is a window of `seq` prices with shape (seq, 1); y[i] is 1 when
    the price following the window is above the window's last price, else 0.
    """
    windows, directions = [], []
    for start in range(len(prices) - seq - 1):
        end = start + seq
        windows.append(prices[start:end])
        directions.append(1 if prices[end] > prices[end - 1] else 0)
    X = torch.tensor(np.array(windows), dtype=torch.float32).unsqueeze(-1)
    y = torch.tensor(directions, dtype=torch.float32)
    return X, y
X, y = make_data(prices)
# 80/20 chronological split into train/test windows.
sp = int(len(X)*0.8)
X_tr, X_te, y_tr, y_te = X[:sp], X[sp:], y[:sp], y[sp:]
# Train an identically-shaped model with each recurrent cell type.
# NOTE: the class body closes over the loop variable RNN, so the class is
# redefined per iteration with a different recurrent core.
for cell, RNN in [('LSTM', nn.LSTM), ('GRU', nn.GRU)]:
    class Model(nn.Module):
        def __init__(self):
            super().__init__()
            self.rnn = RNN(1, 64, 2, batch_first=True, dropout=0.2)
            self.fc = nn.Sequential(nn.Linear(64, 16), nn.ReLU(), nn.Linear(16, 1))
        def forward(self, x):
            # Classify from the last time step's hidden output.
            out, _ = self.rnn(x)
            return torch.sigmoid(self.fc(out[:, -1])).squeeze()
    model = Model()
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    crit = nn.BCELoss()
    for epoch in range(60):
        model.train()
        loss = crit(model(X_tr), y_tr)
        opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        acc = ((model(X_te)>0.5).float()==y_te).float().mean().item()
    print(f"{cell}: val_acc={acc:.4f}")
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42); np.random.seed(42)
# Exercise: LSTM-vs-GRU on synthetic prices (steps left as TODOs for the reader).
# 1. Generate 400-day synthetic price series (random walk)
# 2. Create 30-day windows with direction labels (up=1, down=0)
# 3. Build bidirectional GRU classifier
# 4. Build standard GRU classifier
# 5. Train both 50 epochs, compare val_acc
Transfer learning reuses pretrained model weights, dramatically reducing training time and data requirements. Strategies include feature extraction (frozen backbone), gradual unfreezing, and layer-wise learning rates.
Feature Extraction with Frozen Backbone
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
torch.manual_seed(42)
class PretrainedBackbone(nn.Module):
    """Stand-in for a pretrained feature extractor; all weights are frozen."""

    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, 32), nn.ReLU(),
        )
        # Freeze every parameter so only a downstream head can train.
        for param in self.parameters():
            param.requires_grad = False

    def forward(self, x):
        return self.features(x)
class TransferModel(nn.Module):
    """Frozen backbone + small trainable classification head."""

    def __init__(self, n_classes=4):
        super().__init__()
        self.backbone = PretrainedBackbone()
        self.head = nn.Sequential(
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(16, n_classes),
        )

    def forward(self, x):
        features = self.backbone(x)
        return self.head(features)
model = TransferModel()
# Only the head's parameters remain trainable after freezing the backbone.
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f"Trainable: {trainable}/{total} ({trainable/total*100:.1f}%)")
X = torch.randn(200, 128); y = torch.randint(0, 4, (200,))
loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)
# Pass only trainable params to the optimizer.
opt = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
crit = nn.CrossEntropyLoss()
for epoch in range(15):
    for xb, yb in loader:
        loss = crit(model(xb), yb)
        opt.zero_grad(); loss.backward(); opt.step()
    if (epoch+1) % 5 == 0:
        model.eval()  # disable dropout for the accuracy check
        with torch.no_grad():
            acc = (model(X).argmax(1)==y).float().mean().item()
        model.train()
        print(f"Epoch {epoch+1}: loss={loss.item():.4f}, acc={acc:.4f}")
Gradual Unfreezing with Layer-wise LRs
import torch
import torch.nn as nn
torch.manual_seed(42)
class PretrainedModel(nn.Module):
    """Two frozen 'pretrained' layers followed by a trainable head."""

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(64, 32)
        self.layer2 = nn.Linear(32, 16)
        self.head = nn.Linear(16, 3)
        self.relu = nn.ReLU()
        # Freeze the backbone layers; the head stays trainable.
        for frozen_layer in (self.layer1, self.layer2):
            for param in frozen_layer.parameters():
                param.requires_grad = False

    def forward(self, x):
        h = self.relu(self.layer1(x))
        h = self.relu(self.layer2(h))
        return self.head(h)
X = torch.randn(200, 64); y = torch.randint(0, 3, (200,))
crit = nn.CrossEntropyLoss()
model = PretrainedModel()
# Phase 1: head only
opt = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
for _ in range(20):
    loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
print(f"Phase 1 (head only): {loss.item():.4f}")
# Phase 2: unfreeze layer2 with smaller LR
for p in model.layer2.parameters(): p.requires_grad = True
# Layer-wise learning rates: earlier layers get smaller LRs than the head.
opt = torch.optim.Adam([
    {"params": model.layer2.parameters(), "lr": 1e-4},
    {"params": model.head.parameters(), "lr": 1e-3},
])
for _ in range(20):
    loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
print(f"Phase 2 (+layer2): {loss.item():.4f}")
# Phase 3: unfreeze all
for p in model.layer1.parameters(): p.requires_grad = True
opt = torch.optim.Adam([
    {"params": model.layer1.parameters(), "lr": 1e-5},
    {"params": model.layer2.parameters(), "lr": 1e-4},
    {"params": model.head.parameters(), "lr": 1e-3},
])
for _ in range(30):
    loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
acc = (model(X).argmax(1)==y).float().mean().item()
print(f"Phase 3 (all): loss={loss.item():.4f}, acc={acc:.4f}")
Transfer Learning Diagnostics
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)
class Encoder(nn.Module):
    """Two-layer MLP feature extractor: 50 -> 32 -> 16 with ReLU activations."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(50, 32)
        self.fc2 = nn.Linear(32, 16)
        self.relu = nn.ReLU()

    def forward(self, x):
        hidden = self.relu(self.fc1(x))
        return self.relu(self.fc2(hidden))
class TargetModel(nn.Module):
    """Encoder + linear head; optionally freezes the encoder for transfer."""

    def __init__(self, freeze=True):
        super().__init__()
        self.encoder = Encoder()
        self.head = nn.Linear(16, 2)
        if freeze:
            for param in self.encoder.parameters():
                param.requires_grad = False

    def forward(self, x):
        return self.head(self.encoder(x))
# Large source-like set vs tiny target set (typical transfer scenario).
X_src = torch.randn(500, 50); y_src = torch.randint(0, 2, (500,))
X_tgt = torch.randn(50, 50); y_tgt = torch.randint(0, 2, (50,))  # small target
crit = nn.CrossEntropyLoss()
# Compare a frozen backbone against full fine-tuning on the tiny target set.
for mode in ["frozen", "full_finetune"]:
    model = TargetModel(freeze=(mode=="frozen"))
    opt = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
    for _ in range(50):
        loss = crit(model(X_tgt), y_tgt); opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        src_acc = (model(X_src).argmax(1)==y_src).float().mean().item()
        tgt_acc = (model(X_tgt).argmax(1)==y_tgt).float().mean().item()
    print(f"{mode:>16}: target_acc={tgt_acc:.4f}, source_acc={src_acc:.4f}")
print("\nTip: Frozen backbone works best with limited target data (< 500 samples)")
print("Tip: Full fine-tuning risks catastrophic forgetting on small datasets")
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42); np.random.seed(42)
class Encoder(nn.Module):
    """MLP feature extractor: 100 -> 64 -> 32 with ReLU activations."""

    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(100, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
        )

    def forward(self, x):
        return self.layers(x)
class MedModel(nn.Module):
    """Frozen encoder + small sigmoid head for binary classification."""

    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.head = nn.Sequential(nn.Linear(32, 8), nn.ReLU(), nn.Linear(8, 1))
        # Start with a frozen backbone; the training script unfreezes it later.
        for param in self.encoder.parameters():
            param.requires_grad = False

    def forward(self, x):
        logits = self.head(self.encoder(x))
        return torch.sigmoid(logits).squeeze()
# Synthetic data: 300 train / 100 validation samples with binary labels.
X = torch.randn(300, 100); y = torch.randint(0, 2, (300,)).float()
Xv = torch.randn(100, 100); yv = torch.randint(0, 2, (100,)).float()
crit = nn.BCELoss()
model = MedModel()
# Two-phase fine-tuning: head-only first, then full tuning with a tiny encoder LR.
for phase, lr_enc, lr_head, unfreeze in [
    ("Phase 1: head only", None, 1e-3, False),
    ("Phase 2: full tune", 1e-5, 1e-4, True),
]:
    if unfreeze:
        for p in model.encoder.parameters(): p.requires_grad = True
        opt = torch.optim.Adam([
            {"params": model.encoder.parameters(), "lr": lr_enc},
            {"params": model.head.parameters(), "lr": lr_head},
        ])
    else:
        opt = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=lr_head)
    for _ in range(40):
        loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        va = ((model(Xv)>0.5).float()==yv).float().mean().item()
    model.train()
    print(f"{phase}: val_acc={va:.4f}")
import torch, torch.nn as nn
torch.manual_seed(42)
# Exercise: two-phase transfer learning (steps left as TODOs for the reader).
# 1. Build PretrainedEncoder (freeze all layers)
# 2. Add new classification head
# 3. Phase 1: train head only for 20 epochs
# 4. Phase 2: unfreeze encoder, use lr=1e-5 for encoder, lr=1e-3 for head
# 5. Report val_acc after each phase
X = torch.randn(100, 32); y = torch.randint(0, 3, (100,))
Attention allows models to focus on relevant parts of input. Scaled dot-product attention computes query-key similarity, softmax-normalizes scores, then aggregates values. Multi-head attention runs this in parallel across multiple representation subspaces.
Scaled Dot-Product Attention
import torch
import torch.nn as nn
torch.manual_seed(42)
def sdp_attention(Q, K, V, mask=None):
    """Scaled dot-product attention.

    Returns (output, weights) where weights = softmax(Q K^T / sqrt(d_k))
    and output = weights @ V. Positions where mask == 0 receive -inf
    scores and therefore zero attention weight.
    """
    scale = Q.size(-1) ** 0.5
    scores = (Q @ K.transpose(-2, -1)) / scale
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))
    weights = torch.softmax(scores, dim=-1)
    return weights @ V, weights
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on the sdp_attention helper."""

    def __init__(self, d_model=64, n_heads=4):
        super().__init__()
        self.d_k = d_model // n_heads
        self.n_heads = n_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def split(self, x):
        """Reshape (B, T, D) -> (B, n_heads, T, d_k)."""
        batch, seq_len, _ = x.size()
        return x.view(batch, seq_len, self.n_heads, self.d_k).transpose(1, 2)

    def forward(self, Q, K, V):
        batch = Q.size(0)
        q_heads = self.split(self.W_q(Q))
        k_heads = self.split(self.W_k(K))
        v_heads = self.split(self.W_v(V))
        context, attn = sdp_attention(q_heads, k_heads, v_heads)
        # Merge heads back: (B, H, T, d_k) -> (B, T, H*d_k).
        merged = context.transpose(1, 2).contiguous().view(batch, -1, self.n_heads * self.d_k)
        return self.W_o(merged), attn
# Smoke test: self-attention on a random batch.
B, T, D = 2, 10, 64
x = torch.randn(B, T, D)
mha = MultiHeadAttention(d_model=64, n_heads=4)
out, attn = mha(x, x, x)
print(f"Input: {x.shape} -> Output: {out.shape}")
print(f"Attention weights: {attn.shape}")
# Each attention row is a probability distribution over the keys.
print(f"Attention sums to 1: {attn[0,0,0].sum().item():.4f}")
Transformer Encoder Block
import torch, torch.nn as nn, math
class PositionalEncoding(nn.Module):
    """Adds the fixed sinusoidal position embeddings of Vaswani et al. (2017)."""

    def __init__(self, d_model, max_len=1000):
        super().__init__()
        positions = torch.arange(0, max_len).unsqueeze(1).float()
        freq = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freq)  # even dims: sine
        table[:, 1::2] = torch.cos(positions * freq)  # odd dims: cosine
        # Registered as a buffer: saved with the model but never trained.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        """x: (B, T, d_model) -> x plus the first T positional vectors."""
        return x + self.pe[:, :x.size(1)]
class TransformerBlock(nn.Module):
    """Post-norm Transformer encoder block: self-attention + feed-forward sublayers."""

    def __init__(self, d=64, n_heads=4, ff=256, drop=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(d, n_heads, dropout=drop, batch_first=True)
        self.ff = nn.Sequential(
            nn.Linear(d, ff),
            nn.GELU(),
            nn.Dropout(drop),
            nn.Linear(ff, d),
        )
        self.n1, self.n2 = nn.LayerNorm(d), nn.LayerNorm(d)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.n1(x + self.drop(attn_out))       # residual + norm (attention)
        return self.n2(x + self.drop(self.ff(x)))  # residual + norm (feed-forward)
class TransformerClassifier(nn.Module):
    """Token embedding + positional encoding + encoder stack + mean-pool head."""

    def __init__(self, vocab=100, d=64, n_heads=4, n_layers=2, n_classes=3):
        super().__init__()
        self.embed = nn.Embedding(vocab, d, padding_idx=0)
        self.pos = PositionalEncoding(d)
        blocks = [TransformerBlock(d, n_heads) for _ in range(n_layers)]
        self.layers = nn.Sequential(*blocks)
        self.head = nn.Linear(d, n_classes)

    def forward(self, x):
        hidden = self.layers(self.pos(self.embed(x)))
        pooled = hidden.mean(1)  # average over the sequence dimension
        return self.head(pooled)
torch.manual_seed(42)
model = TransformerClassifier()
# Tiny random "dataset": 4 sequences of 20 token ids, 3 classes.
x = torch.randint(1, 100, (4, 20)); y = torch.randint(0, 3, (4,))
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
for i in range(30):
    loss = nn.CrossEntropyLoss()(model(x), y)
    opt.zero_grad(); loss.backward(); opt.step()
print(f"Transformer: {sum(p.numel() for p in model.parameters()):,} params")
print(f"After 30 steps: loss={loss.item():.4f}, preds={model(x).argmax(1).tolist()}")
Attention Visualization
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)
def sdp_attention(Q, K, V):
    """Scaled dot-product attention; returns (output, softmax weights)."""
    scale = Q.size(-1) ** 0.5
    weights = torch.softmax(Q @ K.transpose(-2, -1) / scale, dim=-1)
    return weights @ V, weights
# Simple sequence: "the cat sat on the mat"
tokens = ["the", "cat", "sat", "on", "the", "mat"]
d_model = 8
# Random embeddings stand in for learned token vectors.
x = torch.randn(1, len(tokens), d_model)
# Single-head attention
W_q = nn.Linear(d_model, d_model, bias=False)
W_k = nn.Linear(d_model, d_model, bias=False)
W_v = nn.Linear(d_model, d_model, bias=False)
Q, K, V = W_q(x), W_k(x), W_v(x)
out, attn = sdp_attention(Q, K, V)
attn_matrix = attn[0].detach().numpy()
# Pretty-print the (T, T) weight matrix with token labels.
print("Attention weights matrix (rows=query, cols=key):")
print("Tokens:", tokens)
header = " " + "".join(f"{t:>6}" for t in tokens)
print(header)
for i, row in enumerate(attn_matrix):
    row_str = f"{tokens[i]:>6}" + "".join(f"{v:>6.3f}" for v in row)
    print(row_str)
# Check: each row sums to 1
print(f"\nRow sums: {attn_matrix.sum(axis=1).round(4)}")
print(f"Diagonal (self-attention strength): {attn_matrix.diagonal().round(4)}")
import torch, torch.nn as nn, numpy as np, math
torch.manual_seed(42)
class TFEncoder(nn.Module):
    """Embedding + stacked nn.TransformerEncoder + mean-pooled linear head."""

    def __init__(self, vocab=50, d=32, heads=4, layers=2, classes=4):
        super().__init__()
        self.embed = nn.Embedding(vocab, d, padding_idx=0)
        encoder_layer = nn.TransformerEncoderLayer(
            d, heads, dim_feedforward=64, dropout=0.1, batch_first=True)
        self.enc = nn.TransformerEncoder(encoder_layer, num_layers=layers)
        self.head = nn.Linear(d, classes)

    def forward(self, x):
        encoded = self.enc(self.embed(x))
        return self.head(encoded.mean(1))
# Random token classification task just to exercise the encoder.
X = torch.randint(1, 50, (64, 20))
y = torch.randint(0, 4, (64,))
model = TFEncoder()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
crit = nn.CrossEntropyLoss()
for epoch in range(50):
    loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
    if (epoch+1) % 10 == 0:
        # Training-set accuracy (demo only; labels are random).
        acc = (model(X).argmax(1)==y).float().mean().item()
        print(f"Epoch {epoch+1}: loss={loss.item():.4f}, acc={acc:.4f}")
import torch, torch.nn as nn
def sdp_attention(Q, K, V, mask=None):
    """Scaled dot-product attention (exercise solution).

    The original body was `pass`, so the unpacking call below it would
    raise TypeError; this implements the four documented steps.

    Args:
        Q, K, V: tensors of shape (..., T, d_k) / (..., T, d_v).
        mask: optional tensor broadcastable to the score shape; positions
            where mask == 0 are excluded from attention.

    Returns:
        (output, weights): the attended values and the softmax weights.
    """
    # 1. Compute attention scores: Q @ K.T / sqrt(d_k)
    d_k = Q.size(-1)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / (d_k ** 0.5)
    # 2. Apply mask if provided
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))
    # 3. Softmax over last dimension
    weights = torch.softmax(scores, dim=-1)
    # 4. Return weighted sum of V, and the attention weights
    return torch.matmul(weights, V), weights
B, T, D = 4, 8, 16
x = torch.randn(B, T, D)
# Self-attention: the same tensor serves as queries, keys, and values.
out, weights = sdp_attention(x, x, x)
# Assert: weights.sum(dim=-1).allclose(torch.ones(B, T))
BatchNorm stabilizes training by normalizing layer inputs per batch. Dropout randomly zeroes activations during training. Weight decay (L2) penalizes large weights. LayerNorm is preferred in Transformers.
BatchNorm, Dropout & Weight Decay Comparison
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42); np.random.seed(42)
class PlainNet(nn.Module):
    """Baseline MLP with no regularization (no BN, dropout, or weight decay)."""

    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(40, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, x):
        return self.net(x).squeeze()
class RegNet(nn.Module):
    """Same MLP as PlainNet, plus BatchNorm and Dropout after each hidden layer."""

    def __init__(self, dropout=0.3):
        super().__init__()
        def block(fan_in, fan_out):
            # Linear -> BN -> ReLU -> Dropout, the regularized hidden block.
            return [nn.Linear(fan_in, fan_out), nn.BatchNorm1d(fan_out),
                    nn.ReLU(), nn.Dropout(dropout)]
        self.net = nn.Sequential(*block(40, 128), *block(128, 64), nn.Linear(64, 1))

    def forward(self, x):
        return self.net(x).squeeze()
# Small train set (100) vs larger validation set (500) to expose overfitting.
X_tr = torch.randn(100, 40); y_tr = (X_tr[:, 0] > 0).float()
X_va = torch.randn(500, 40); y_va = (X_va[:, 0] > 0).float()
crit = nn.BCEWithLogitsLoss()
def train(model, n=100, wd=0.0):
    """Train `model` for n full-batch steps with Adam weight decay `wd`.

    Uses the module-level X_tr/y_tr/X_va/y_va and `crit`; returns
    (train_accuracy, validation_accuracy).
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=wd)
    for _ in range(n):
        model.train()
        step_loss = crit(model(X_tr), y_tr)
        optimizer.zero_grad()
        step_loss.backward()
        optimizer.step()
    model.eval()
    with torch.no_grad():
        train_acc = ((model(X_tr) > 0).float() == y_tr).float().mean().item()
        valid_acc = ((model(X_va) > 0).float() == y_va).float().mean().item()
    return train_acc, valid_acc
# Compare regularization settings; "overfit" = train accuracy minus val accuracy.
for name, model, wd in [
    ("No regularization", PlainNet(), 0),
    ("BN + Dropout(0.3)", RegNet(0.3), 0),
    ("BN + Drop + L2", RegNet(0.3), 1e-4),
]:
    tr, va = train(model, n=100, wd=wd)
    print(f"{name:<22}: train={tr:.4f}, val={va:.4f}, overfit={tr-va:.4f}")
LayerNorm vs BatchNorm
import torch, torch.nn as nn
torch.manual_seed(42)
B, T, D = 4, 10, 32
x = torch.randn(B, T, D)
# BatchNorm (normalizes over N,L per channel D)
bn = nn.BatchNorm1d(D)
x2d = x.view(-1, D)  # flatten (B, T) into one batch axis for BatchNorm1d
out_bn = bn(x2d).view(B, T, D)
# LayerNorm (normalizes over D per token position)
ln = nn.LayerNorm(D)
out_ln = ln(x)
print(f"Input: mean={x.mean():.4f}, std={x.std():.4f}")
print(f"BatchNorm: mean={out_bn.mean():.6f}, std={out_bn.std():.4f}")
print(f"LayerNorm: mean={out_ln.mean():.6f}, std={out_ln.std():.4f}")
# Verify LayerNorm normalizes per token
for pos in [0, 3, 7]:
    m = out_ln[0, pos].mean().item()
    s = out_ln[0, pos].std().item()
    print(f" LayerNorm pos {pos}: mean={m:.6f}, std={s:.4f}")
# Rule-of-thumb table for choosing a normalization layer.
guide = {
    "BatchNorm": "CNNs, fixed-length, large batches (N >= 16)",
    "LayerNorm": "Transformers, NLP, variable-length sequences",
    "GroupNorm": "Small batches (N < 8), object detection",
    "InstanceNorm": "Style transfer, per-sample normalization",
}
print("\nNormalization Guide:")
for k, v in guide.items():
    print(f" {k:<14}: {v}")
Dropout Modes & Inference
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)
# Critical: model.eval() disables dropout and uses running BN stats
class NetWithDropout(nn.Module):
    """MLP with BatchNorm and Dropout to demonstrate train/eval mode differences."""

    def __init__(self, dropout=0.5):
        super().__init__()
        self.fc1 = nn.Linear(20, 64)
        self.bn = nn.BatchNorm1d(64)
        self.drop = nn.Dropout(dropout)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        hidden = torch.relu(self.bn(self.fc1(x)))
        return self.fc2(self.drop(hidden)).squeeze()
model = NetWithDropout(dropout=0.5)
x = torch.randn(10, 20)
# Demonstrate train vs eval mode difference
# train(): dropout samples a fresh mask each call; BN uses batch statistics.
model.train()
out_train1 = model(x).detach()
out_train2 = model(x).detach()
# eval(): dropout is disabled; BN uses its accumulated running statistics.
model.eval()
out_eval1 = model(x).detach()
out_eval2 = model(x).detach()
print("Train mode outputs (different each call due to dropout):")
print(f" Call 1: {out_train1[:5].numpy().round(3)}")
print(f" Call 2: {out_train2[:5].numpy().round(3)}")
print(f" Same: {torch.allclose(out_train1, out_train2)}")
print("\nEval mode outputs (deterministic):")
print(f" Call 1: {out_eval1[:5].numpy().round(3)}")
print(f" Call 2: {out_eval2[:5].numpy().round(3)}")
print(f" Same: {torch.allclose(out_eval1, out_eval2)}")
print("\nKey rule: ALWAYS call model.eval() before inference!")
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42); np.random.seed(42)
class UnstableNet(nn.Module):
    """Plain deep MLP with no normalization — the 'unstable' baseline."""

    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(50, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
        )

    def forward(self, x):
        return self.net(x).squeeze()
class StableNet(nn.Module):
    """Same architecture as UnstableNet plus BatchNorm and Dropout per layer."""

    def __init__(self, dropout=0.4):
        super().__init__()
        def block(fan_in, fan_out):
            # Regularized hidden block: Linear -> BN -> ReLU -> Dropout.
            return [nn.Linear(fan_in, fan_out), nn.BatchNorm1d(fan_out),
                    nn.ReLU(), nn.Dropout(dropout)]
        self.net = nn.Sequential(*block(50, 256), *block(256, 128), nn.Linear(128, 1))

    def forward(self, x):
        return self.net(x).squeeze()
# Deliberately large-scale inputs (x5) to stress training stability.
X = torch.randn(200, 50)*5; y = (X[:,0] > 0).float()
crit = nn.BCEWithLogitsLoss()
for name, model, wd in [("Unstable", UnstableNet(), 0), ("Stable", StableNet(0.4), 1e-4)]:
    opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=wd)
    losses = []
    for epoch in range(50):
        model.train()
        pred = model(X)
        loss = crit(pred, y)
        # Stop early if the loss diverges to NaN.
        if torch.isnan(loss): losses.append(float('nan')); break
        opt.zero_grad(); loss.backward(); opt.step()
        losses.append(round(loss.item(), 4))
    valid = [l for l in losses if l==l]  # NaN != NaN filters NaNs out
    print(f"{name}: final={valid[-1]:.4f}, epochs={len(valid)}, diverged={len(valid)<50}")
import torch, torch.nn as nn
torch.manual_seed(42)
# Exercise scaffold: data is provided; the models are left as TODOs.
X_tr = torch.randn(100, 20); y_tr = (X_tr[:,0]>0).float()
X_va = torch.randn(300, 20); y_va = (X_va[:,0]>0).float()
# 1. Build UnregularizedNet (3 layers, no BN/Dropout)
# 2. Build RegularizedNet (3 layers, BN + Dropout p=0.3)
# 3. Train both 100 epochs with Adam, BCEWithLogitsLoss
# 4. Print train_acc and val_acc for both
Learning rate schedules adapt LR during training. Step decay, cosine annealing, and warmup+cosine are common strategies. ReduceLROnPlateau automatically reduces LR when validation metrics stagnate.
Common LR Schedulers
import torch, torch.nn as nn
torch.manual_seed(42)
def run_scheduler(sched_name, n_epochs=60, base_lr=0.1):
    """Record the LR trajectory produced by a named scheduler.

    Returns a list of n_epochs + 1 learning rates (initial value included).
    Unrecognized names fall through to OneCycleLR.
    """
    model = nn.Linear(10, 1)
    opt = torch.optim.SGD(model.parameters(), lr=base_lr)
    # Dispatch table of lazily-constructed schedulers keyed by name.
    factories = {
        "StepLR": lambda: torch.optim.lr_scheduler.StepLR(opt, step_size=15, gamma=0.5),
        "CosineAnnealing": lambda: torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=n_epochs),
        "ExponentialLR": lambda: torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.95),
    }
    default = lambda: torch.optim.lr_scheduler.OneCycleLR(
        opt, max_lr=base_lr, total_steps=n_epochs)
    sched = factories.get(sched_name, default)()
    lrs = [opt.param_groups[0]['lr']]
    for _ in range(n_epochs):
        sched.step()
        lrs.append(opt.param_groups[0]['lr'])
    return lrs
# Print each schedule's LR at a few fixed epochs for side-by-side comparison.
schedules = ["StepLR", "CosineAnnealing", "ExponentialLR", "OneCycleLR"]
checkpoints = [0, 15, 30, 45, 60]
for name in schedules:
    lrs = run_scheduler(name)
    vals = [f"{lrs[i]:.5f}" for i in checkpoints]
    print(f"{name:<20}: {' -> '.join(vals)}")
Warmup + Cosine Decay
import torch, torch.nn as nn, math
class WarmupCosine:
    """Hand-rolled scheduler: linear warmup then cosine decay to min_lr.

    Call step() once per training step; it writes the new LR into every
    param group of the wrapped optimizer and returns it.
    """

    def __init__(self, opt, warmup, total, min_lr=1e-6):
        self.opt = opt
        self.warmup = warmup
        self.total = total
        self.min_lr = min_lr
        self.base_lr = opt.param_groups[0]['lr']
        self.step_n = 0

    def step(self):
        """Advance one step and apply the resulting learning rate."""
        self.step_n += 1
        if self.step_n <= self.warmup:
            # Linear ramp from base_lr/warmup up to base_lr.
            lr = self.base_lr * self.step_n / self.warmup
        else:
            progress = (self.step_n - self.warmup) / (self.total - self.warmup)
            cosine = 1 + math.cos(math.pi * progress)
            lr = self.min_lr + 0.5 * (self.base_lr - self.min_lr) * cosine
        for group in self.opt.param_groups:
            group['lr'] = lr
        return lr
torch.manual_seed(42)
model = nn.Linear(10, 1)
opt = torch.optim.AdamW(model.parameters(), lr=1e-3)
sched = WarmupCosine(opt, warmup=10, total=100)
# Trace the schedule shape without training.
lrs = [sched.step() for _ in range(100)]
print("Warmup + Cosine LR at key steps:")
for s in [1, 5, 10, 25, 50, 75, 100]:
    print(f" Step {s:>3}: {lrs[s-1]:.6f}")
# Simulate training
X = torch.randn(100, 10); y = torch.randn(100, 1)
opt2 = torch.optim.AdamW(model.parameters(), lr=1e-3)
sched2 = WarmupCosine(opt2, warmup=10, total=100)
for step in range(100):
    loss = nn.MSELoss()(model(X), y)
    opt2.zero_grad(); loss.backward(); opt2.step()
    sched2.step()  # scheduler advances once per optimizer step
    if (step+1) % 25 == 0:
        print(f" Step {step+1}: loss={loss.item():.4f}, lr={opt2.param_groups[0]['lr']:.6f}")
ReduceLROnPlateau & LR Finder
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)
# ReduceLROnPlateau: auto-reduces LR when metric stagnates
model = nn.Sequential(nn.Linear(20, 64), nn.ReLU(), nn.Linear(64, 1))
opt = torch.optim.Adam(model.parameters(), lr=1e-2)
# NOTE(review): `verbose` is deprecated in recent torch releases — harmless here.
plateau_sched = torch.optim.lr_scheduler.ReduceLROnPlateau(
    opt, mode='min', factor=0.5, patience=5, min_lr=1e-6, verbose=False)
# Regression target is simply the first input feature.
X = torch.randn(200, 20); y = X[:, :1]
X_val = torch.randn(50, 20); y_val = X_val[:, :1]
crit = nn.MSELoss()
lr_history = []
for epoch in range(60):
    model.train()
    loss = crit(model(X), y)
    opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        val_loss = crit(model(X_val), y_val).item()
    # The plateau scheduler watches validation loss, not training loss.
    plateau_sched.step(val_loss)
    lr_history.append(opt.param_groups[0]['lr'])
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}: val_loss={val_loss:.4f}, lr={opt.param_groups[0]['lr']:.6f}")
# Simple LR range test (find optimal LR)
print("\nLR Range Test (exponential sweep):")
model2 = nn.Linear(20, 1)
opt2 = torch.optim.SGD(model2.parameters(), lr=1e-6)
min_lr, max_lr, n_steps = 1e-6, 1e-1, 20
lr_mult = (max_lr/min_lr)**(1/n_steps)
for step in range(n_steps):
    # Exponentially increase the LR each step and watch the loss response.
    lr = min_lr * (lr_mult**step)
    for g in opt2.param_groups: g['lr'] = lr
    loss = crit(model2(X[:32]), y[:32])
    opt2.zero_grad(); loss.backward(); opt2.step()
    if step % 5 == 0:
        print(f" lr={lr:.2e}: loss={loss.item():.4f}")
import torch, torch.nn as nn, math, numpy as np
torch.manual_seed(42)
def lr_fn(step, warmup=500, total=5000, base=1e-3, min_lr=1e-6):
    """Learning rate at `step`: linear warmup, then cosine decay to min_lr."""
    if step < warmup:
        return base * step / max(1, warmup)
    progress = (step - warmup) / (total - warmup)
    return min_lr + 0.5 * (base - min_lr) * (1 + math.cos(math.pi * progress))
# Compare a fixed LR against the warmup+cosine schedule on identical models.
model1 = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 1))
model2 = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 1))
opt1 = torch.optim.AdamW(model1.parameters(), lr=1e-3)  # fixed
opt2 = torch.optim.AdamW(model2.parameters(), lr=1e-3)  # warmup+cosine
X = torch.randn(200, 32); y = torch.randn(200, 1)
crit = nn.MSELoss()
for step in range(5001):
    # Manually apply the scheduled LR to the second optimizer every step.
    for g in opt2.param_groups: g['lr'] = lr_fn(step)
    l1 = crit(model1(X), y)
    l2 = crit(model2(X), y)
    for m, l, o in [(model1, l1, opt1), (model2, l2, opt2)]:
        o.zero_grad(); l.backward(); o.step()
    if step % 1000 == 0:
        print(f"Step {step}: fixed_lr={l1.item():.4f}, warmup_cosine={l2.item():.4f}, lr={lr_fn(step):.6f}")
import torch, torch.nn as nn, math

# Exercise: implement a warmup + cosine-decay schedule by hand
# (intentionally left unimplemented for the reader).
def get_lr(step, warmup=100, total=500, base_lr=1e-3):
    # Phase 1: linear warmup
    # Phase 2: cosine decay
    pass
model = nn.Linear(10, 1)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
X = torch.randn(100, 10); y = torch.randn(100, 1)
# Apply scheduler over 500 steps, print LR at key checkpoints
Checkpointing saves model and optimizer state during training so an interrupted run can recover from crashes and resume where it left off. Early stopping halts training when validation loss stops improving, preventing overfitting automatically.
Model Checkpointing
import torch, torch.nn as nn, os
torch.manual_seed(42)

class Net(nn.Module):
    """Small MLP regressor: 20 features -> 64 hidden (ReLU) -> 1 output."""

    def __init__(self):
        super().__init__()
        layers = [nn.Linear(20, 64), nn.ReLU(), nn.Linear(64, 1)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Drop the trailing size-1 dim so predictions align with 1-D targets.
        return self.net(x).squeeze()
def save_ckpt(model, opt, epoch, val_loss, path):
    """Write a resumable training checkpoint to `path`.

    Bundles the epoch counter, model weights, optimizer state, and the
    validation loss into a single torch.save file.
    """
    state = {
        'epoch': epoch,
        'model': model.state_dict(),
        'optimizer': opt.state_dict(),
        'val_loss': val_loss,
    }
    torch.save(state, path)
    print(f" Saved: epoch={epoch}, val_loss={val_loss:.4f}")
def load_ckpt(model, opt, path):
    """Restore model/optimizer state from `path`.

    Returns (epoch, val_loss) from the checkpoint, or (0, inf) when no
    checkpoint file exists yet (fresh start).
    """
    if not os.path.exists(path):
        # Nothing saved yet: signal a fresh start.
        return 0, float('inf')
    state = torch.load(path, weights_only=True)
    model.load_state_dict(state['model'])
    opt.load_state_dict(state['optimizer'])
    epoch, best = state['epoch'], state['val_loss']
    print(f" Resumed from epoch {epoch}")
    return epoch, best
# Demo: checkpoint whenever validation loss improves, then reload the best.
# Uses the Net / save_ckpt / load_ckpt definitions above.
model = Net()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
crit = nn.MSELoss()
# Synthetic regression data: target is the first feature column.
X_tr = torch.randn(200,20); y_tr = X_tr[:,0]
X_va = torch.randn(50,20); y_va = X_va[:,0]
best_val, ckpt_path = float('inf'), "/tmp/best.pt"
for epoch in range(30):
    model.train()
    loss = crit(model(X_tr), y_tr); opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        vl = crit(model(X_va), y_va).item()
    if vl < best_val:
        # New best validation loss: persist model + optimizer state.
        best_val = vl; save_ckpt(model, opt, epoch+1, vl, ckpt_path)
# Reload best
# NOTE(review): opt2 uses Adam's default lr, not the training lr — harmless
# here since the reloaded model is only evaluated, never trained further.
model2 = Net(); opt2 = torch.optim.Adam(model2.parameters())
load_ckpt(model2, opt2, ckpt_path)
model2.eval()
with torch.no_grad():
    print(f"Loaded model val_loss: {crit(model2(X_va), y_va).item():.4f}")
Early Stopping with Patience
import torch, torch.nn as nn, copy

class EarlyStopping:
    """Stop training once validation loss has not improved for `patience` calls.

    Keeps a deep copy of the best weights and restores them into the model
    when the stop condition fires.
    """

    def __init__(self, patience=10, min_delta=1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float('inf')   # lowest validation loss seen so far
        self.best_weights = None        # weight snapshot at best_loss
        self.counter = 0                # calls since last improvement

    def __call__(self, model, val_loss):
        """Record one validation result; return True when training should stop."""
        improved = val_loss < self.best_loss - self.min_delta
        if not improved:
            self.counter += 1
            if self.counter >= self.patience:
                # Patience exhausted: roll the model back to its best weights.
                model.load_state_dict(self.best_weights)
                return True
            return False
        self.best_loss = val_loss
        self.best_weights = copy.deepcopy(model.state_dict())
        self.counter = 0
        return False
# Demo: early stopping on a learnable target (val loss falls, then plateaus).
torch.manual_seed(42)
model = nn.Sequential(nn.Linear(10,32), nn.ReLU(), nn.Linear(32,1))
opt = torch.optim.Adam(model.parameters(), lr=1e-2)
crit = nn.MSELoss()
es = EarlyStopping(patience=8, min_delta=1e-3)
X = torch.randn(100,10); y = X[:,0:1]       # target = first feature
Xv = torch.randn(50,10); yv = Xv[:,0:1]
for epoch in range(200):
    model.train()
    loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        vl = crit(model(Xv), yv).item()
    if (epoch+1)%20==0:
        print(f"Epoch {epoch+1}: val={vl:.4f}, patience_ctr={es.counter}")
    if es(model, vl):
        # EarlyStopping has already restored the best weights at this point.
        print(f"Early stop at epoch {epoch+1}, best_val={es.best_loss:.4f}")
        break
Training Manager (Checkpoint + Early Stop)
import torch, torch.nn as nn, copy, os

class TrainingManager:
    """Combine periodic checkpointing with patience-based early stopping."""

    def __init__(self, model, optimizer, patience=10, ckpt_every=5, save_dir="/tmp"):
        self.model = model
        self.opt = optimizer
        self.patience = patience
        self.ckpt_every = ckpt_every
        self.save_dir = save_dir
        self.best_loss = float('inf')   # best validation loss so far
        self.best_state = None          # deep copy of weights at best_loss
        self.counter = 0                # epochs without improvement
        self.history = []               # per-epoch loss records

    def update(self, epoch, train_loss, val_loss):
        """Log one epoch; checkpoint on schedule; return True to stop early."""
        self.history.append({'epoch': epoch, 'train': train_loss, 'val': val_loss})
        epoch_no = epoch + 1
        if epoch_no % self.ckpt_every == 0:
            # Periodic safety checkpoint, independent of improvement.
            path = os.path.join(self.save_dir, f"ckpt_ep{epoch_no}.pt")
            torch.save({'epoch': epoch_no, 'model': self.model.state_dict(), 'val': val_loss}, path)
            print(f" [ckpt] ep{epoch_no} saved")
        if val_loss < self.best_loss - 1e-4:
            # Meaningful improvement: remember these weights.
            self.best_loss = val_loss
            self.best_state = copy.deepcopy(self.model.state_dict())
            self.counter = 0
            return False
        self.counter += 1
        if self.counter >= self.patience:
            # Patience exhausted: restore best weights and signal stop.
            self.model.load_state_dict(self.best_state)
            print(f" Early stop ep {epoch_no}, best={self.best_loss:.4f}")
            return True
        return False
# Demo: drive TrainingManager from a standard train/eval loop.
torch.manual_seed(42)
model = nn.Sequential(nn.Linear(20,64), nn.ReLU(), nn.Linear(64,1))
opt = torch.optim.Adam(model.parameters(), lr=5e-3)
mgr = TrainingManager(model, opt, patience=10, ckpt_every=5)
# Learnable target (first feature) so validation loss actually improves.
X = torch.randn(200,20); y = X[:,:1]; Xv = torch.randn(50,20); yv = Xv[:,:1]
crit = nn.MSELoss()
for epoch in range(80):
    model.train()
    l = crit(model(X), y); opt.zero_grad(); l.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        vl = crit(model(Xv), yv).item()
    if (epoch+1)%20==0:
        print(f"Epoch {epoch+1}: train={l.item():.4f}, val={vl:.4f}")
    # update() returns True once patience is exhausted -> stop training.
    if mgr.update(epoch, l.item(), vl): break
import torch, torch.nn as nn, copy, os
torch.manual_seed(42)

class TrainingSystem:
    """Training harness: periodic checkpoints plus early stopping with
    best-weight restore (best weights also persisted to disk)."""

    def __init__(self, model, opt, patience=15, ckpt_every=5, save_dir="/tmp"):
        self.model = model
        self.opt = opt
        self.patience = patience
        self.ckpt_every = ckpt_every
        self.save_dir = save_dir
        self.best_loss = float('inf')   # lowest validation loss observed
        self.best_state = None          # weight snapshot at best_loss
        self.counter = 0                # epochs since last improvement

    def step(self, epoch, train_loss, val_loss):
        """Process one epoch's results; return True when training should stop."""
        label = epoch + 1
        if label % self.ckpt_every == 0:
            # Unconditional periodic checkpoint for crash recovery.
            torch.save({'epoch': label, 'val': val_loss, 'model': self.model.state_dict()},
                       f"{self.save_dir}/ckpt_ep{label}.pt")
            print(f" Checkpoint saved: epoch {label}")
        if val_loss < self.best_loss - 1e-4:
            # Improvement: snapshot weights in memory and on disk.
            self.best_loss = val_loss
            self.counter = 0
            self.best_state = copy.deepcopy(self.model.state_dict())
            torch.save(self.best_state, f"{self.save_dir}/best.pt")
            return False
        self.counter += 1
        if self.counter >= self.patience:
            self.model.load_state_dict(self.best_state)
            print(f" Early stop at {label}, best_val={self.best_loss:.4f}")
            return True
        return False
# Demo: full loop driven by TrainingSystem.
model = nn.Sequential(nn.Linear(20,64), nn.ReLU(), nn.Linear(64,1))
opt = torch.optim.Adam(model.parameters(), lr=5e-3)
# NOTE(review): `sys` shadows the stdlib module name — harmless here since
# the `sys` module is never imported in this snippet, but worth renaming.
sys = TrainingSystem(model, opt, patience=15, ckpt_every=5)
X = torch.randn(200,20); y = X[:,:1]; Xv = torch.randn(50,20); yv = Xv[:,:1]
crit = nn.MSELoss()
for epoch in range(100):
    model.train()
    l = crit(model(X), y); opt.zero_grad(); l.backward(); opt.step()
    model.eval()
    with torch.no_grad(): vl = crit(model(Xv), yv).item()
    if (epoch+1)%20==0: print(f"Epoch {epoch+1}: train={l.item():.4f}, val={vl:.4f}")
    if sys.step(epoch, l.item(), vl): break
import torch, torch.nn as nn, copy

# Exercise: implement EarlyStopping yourself (stubs intentionally empty).
class EarlyStopping:
    def __init__(self, patience=10, min_delta=1e-3):
        self.patience = patience
        self.min_delta = min_delta
        # TODO: add best_loss, best_weights, counter
        pass
    def __call__(self, model, val_loss):
        # TODO: update counter, save best weights, return True to stop
        pass
model = nn.Sequential(nn.Linear(10,32), nn.ReLU(), nn.Linear(32,1))
opt = torch.optim.Adam(model.parameters(), lr=1e-2)
X = torch.randn(100,10); y = X[:,0:1]
Xv = torch.randn(50,10); yv = Xv[:,0:1]
Gradient clipping prevents exploding gradients in deep RNNs and Transformers. Mixed precision (FP16/BF16) training roughly halves memory usage and speeds up training on modern GPUs, with GradScaler guarding against FP16 underflow.
Gradient Clipping
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)

class DeepRNN(nn.Module):
    """4-layer vanilla RNN over 10-dim sequences; regresses a scalar from
    the hidden state at the final time step."""

    def __init__(self):
        super().__init__()
        self.rnn = nn.RNN(10, 64, 4, batch_first=True)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        hidden_seq, _ = self.rnn(x)
        last_step = hidden_seq[:, -1]        # (batch, 64) at final t
        return self.fc(last_step).squeeze()  # -> (batch,)
def grad_norm(model):
    """Return the global L2 norm over all parameter gradients.

    Computes sqrt(sum of each per-parameter gradient's L2 norm squared) —
    the same "total norm" that torch.nn.utils.clip_grad_norm_ measures.
    Parameters whose .grad is still None are skipped; returns 0.0 when no
    parameter has a gradient yet.
    """
    # Fix: use .detach() instead of the legacy .data attribute — identical
    # values, but avoids the unsafe autograd bypass .data represents.
    total = sum(p.grad.detach().norm(2).item() ** 2
                for p in model.parameters() if p.grad is not None)
    return total ** 0.5
# Demo: gradient norms before/after clip_grad_norm_ on a deep RNN, which
# tends to produce large gradients early in training.
X = torch.randn(16, 50, 10); y = torch.randn(16)
model = DeepRNN()
opt = torch.optim.SGD(model.parameters(), lr=0.1)
crit = nn.MSELoss()
print("Gradient norms (before/after clip):")
for step in range(10):
    loss = crit(model(X), y)
    opt.zero_grad(); loss.backward()
    gn_before = grad_norm(model)
    # Rescales all grads in place so their global L2 norm is <= 1.0.
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    gn_after = grad_norm(model)
    opt.step()
    if step < 5:
        print(f" Step {step+1}: before={gn_before:.2f}, after={gn_after:.2f} (clipped={gn_before>1.0})")
Mixed Precision Training Pattern
import torch, torch.nn as nn, time
torch.manual_seed(42)

class Model(nn.Module):
    """MLP classifier: 128 features -> 256 -> 128 (ReLU) -> 10 logits."""

    def __init__(self):
        super().__init__()
        stack = []
        for d_in, d_out in [(128, 256), (256, 128)]:
            stack += [nn.Linear(d_in, d_out), nn.ReLU()]
        stack.append(nn.Linear(128, 10))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        return self.net(x)
X = torch.randn(256, 128); y = torch.randint(0, 10, (256,))
crit = nn.CrossEntropyLoss()
# Standard FP32
model1 = Model()
opt1 = torch.optim.AdamW(model1.parameters(), lr=1e-3)
t0 = time.time()
for _ in range(200):
    l = crit(model1(X), y); opt1.zero_grad(); l.backward(); opt1.step()
fp32_t = time.time()-t0
# AMP pattern (works on CPU too, GPU gets real speedup)
model2 = Model()
opt2 = torch.optim.AdamW(model2.parameters(), lr=1e-3)
# Scaler disabled on CPU: demonstrates the call sequence without FP16 math.
scaler = torch.amp.GradScaler('cpu', enabled=False)
t0 = time.time()
for _ in range(200):
    # autocast wraps the forward pass only; backward runs outside it.
    with torch.amp.autocast('cpu', dtype=torch.float32):
        l = crit(model2(X), y)
    scaler.scale(l).backward()
    # Unscale before clipping so max_norm applies to true gradient values.
    scaler.unscale_(opt2)
    torch.nn.utils.clip_grad_norm_(model2.parameters(), 1.0)
    scaler.step(opt2); scaler.update(); opt2.zero_grad()
amp_t = time.time()-t0
print(f"FP32 time: {fp32_t:.2f}s, AMP pattern time: {amp_t:.2f}s")
print(f"FP32 loss: {crit(model1(X),y).item():.4f}")
print(f"AMP loss: {crit(model2(X),y).item():.4f}")
print("\nAMP Best Practices:")
for tip in ["Use autocast for forward pass only",
            "Use GradScaler to prevent FP16 underflow",
            "Clip gradients AFTER scaler.unscale_()",
            "BF16 more stable than FP16 (Ampere+ GPUs only)"]:
    print(f" - {tip}")
Gradient Monitoring & Debugging
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)

class MultiLayerNet(nn.Module):
    """Four-layer MLP (20 -> 64 -> 32 -> 16 -> 1) with ReLU between the
    hidden layers and a plain linear output head."""

    def __init__(self):
        super().__init__()
        sizes = [20, 64, 32, 16, 1]
        self.layers = nn.ModuleList(
            nn.Linear(n_in, n_out) for n_in, n_out in zip(sizes, sizes[1:]))
        self.relu = nn.ReLU()

    def forward(self, x):
        *hidden, head = self.layers
        for layer in hidden:
            x = self.relu(layer(x))
        # Final layer has no activation; squeeze to (batch,).
        return head(x).squeeze()
# Demo: per-layer gradient statistics and a vanishing/exploding health check.
model = MultiLayerNet()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
crit = nn.MSELoss()
X = torch.randn(64, 20); y = torch.randn(64)
print("Gradient statistics per layer:")
for epoch in [1, 10, 50]:
    # NOTE(review): runs 1 step first, then 9 more per label — so the
    # "Epoch 50" snapshot is after 19 total steps, not 50. Labels overstate.
    for _ in range(epoch if epoch==1 else 9):
        loss = crit(model(X), y); opt.zero_grad(); loss.backward(); opt.step()
    print(f"\nEpoch {epoch}:")
    for name, p in model.named_parameters():
        if p.grad is not None:
            gn = p.grad.norm().item()
            wn = p.data.norm().item()
            # grad/weight ratio ~ relative update size per step.
            print(f" {name:<25}: grad_norm={gn:.4f}, weight_norm={wn:.4f}, ratio={gn/wn:.4f}")
# Detect vanishing/exploding gradients
print("\nGradient health check:")
for name, p in model.named_parameters():
    if p.grad is not None:
        gn = p.grad.norm().item()
        # Heuristic thresholds: >10 flagged exploding, <1e-5 vanishing.
        status = "EXPLODING" if gn > 10 else ("VANISHING" if gn < 1e-5 else "OK")
        print(f" {name:<25}: {status} (norm={gn:.6f})")
import torch, torch.nn as nn, numpy as np
torch.manual_seed(42)

class StackedLSTM(nn.Module):
    """4-layer LSTM (hidden size 128, inter-layer dropout 0.2) with a scalar
    regression head on the final time step."""

    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(10, 128, 4, batch_first=True, dropout=0.2)
        self.fc = nn.Linear(128, 1)

    def forward(self, x):
        seq_out, _ = self.lstm(x)
        final = seq_out[:, -1]           # hidden state at the last time step
        return self.fc(final).squeeze()  # -> (batch,)
# Demo: AMP call pattern combined with gradient clipping on a deep LSTM.
model = StackedLSTM()
opt = torch.optim.Adam(model.parameters(), lr=5e-3)
crit = nn.MSELoss()
X = torch.randn(32, 40, 10); y = torch.randn(32)
# Scaler disabled on CPU: demonstrates the API sequence without FP16 math.
scaler = torch.amp.GradScaler('cpu', enabled=False)
print("Training with gradient clipping:")
for epoch in range(30):
    with torch.amp.autocast('cpu', dtype=torch.float32):
        pred = model(X); loss = crit(pred, y)
    scaler.scale(loss).backward()
    # Unscale before clipping so max_norm applies to true gradient values.
    scaler.unscale_(opt)
    gn_before = sum(p.grad.norm()**2 for p in model.parameters() if p.grad is not None)**0.5
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
    gn_after = sum(p.grad.norm()**2 for p in model.parameters() if p.grad is not None)**0.5
    scaler.step(opt); scaler.update(); opt.zero_grad()
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}: loss={loss.item():.4f}, grad={gn_before.item():.3f}->{gn_after.item():.3f}")
import torch, torch.nn as nn

# Exercise: wire up forward/backward with gradient clipping on an RNN,
# following the numbered steps below.
model = nn.RNN(10, 64, 3, batch_first=True)
fc = nn.Linear(64, 1)
params = list(model.parameters()) + list(fc.parameters())
opt = torch.optim.Adam(params, lr=1e-2)
X = torch.randn(16, 30, 10); y = torch.randn(16)
# 1. Forward: out, _ = model(X); pred = fc(out[:,-1]).squeeze()
# 2. MSELoss backward
# 3. Print grad norm BEFORE clip_grad_norm_
# 4. Apply clip_grad_norm_ max_norm=1.0
# 5. Print grad norm AFTER
# 6. optimizer.step()
Export PyTorch models via TorchScript for language-agnostic C++/mobile deployment, ONNX for cross-framework serving, or pickle for sklearn models. Production deployment requires consistent preprocessing and health checks.
TorchScript Export
import torch, torch.nn as nn, os
torch.manual_seed(42)

class Classifier(nn.Module):
    """Softmax classifier: n_in features -> 64 -> 32 -> n_classes probs."""

    def __init__(self, n_in=20, n_classes=3):
        super().__init__()
        stack = [
            nn.Linear(n_in, 64), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(64, 32), nn.ReLU(),
            nn.Linear(32, n_classes),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logits = self.net(x)
        # Probabilities over the class dim; each row sums to 1.
        return torch.softmax(logits, dim=-1)
# Demo: export via trace and script, reload, and verify outputs round-trip.
model = Classifier(); model.eval()
x = torch.randn(5, 20)
original_out = model(x)
# trace: for fixed control flow
traced = torch.jit.trace(model, x)
# script: for dynamic control flow
scripted = torch.jit.script(model)
scripted.save("/tmp/classifier.pt")
loaded = torch.jit.load("/tmp/classifier.pt"); loaded.eval()
loaded_out = loaded(x)
print(f"TorchScript size: {os.path.getsize('/tmp/classifier.pt')/1024:.1f} KB")
# Round-trip sanity check: scripted model must reproduce eager outputs.
print(f"Outputs match: {torch.allclose(original_out, loaded_out, atol=1e-5)}")
print(f"Batch preds: {loaded(torch.randn(8,20)).argmax(1).tolist()}")
ONNX Export
import torch, torch.nn as nn, os
torch.manual_seed(42)

class Regressor(nn.Module):
    """Scalar regression MLP: 10 inputs -> 32 hidden (ReLU) -> 1 output."""

    def __init__(self):
        super().__init__()
        hidden = nn.Linear(10, 32)
        head = nn.Linear(32, 1)
        self.net = nn.Sequential(hidden, nn.ReLU(), head)

    def forward(self, x):
        return self.net(x)
model = Regressor(); model.eval()
dummy = torch.randn(1, 10)
# dynamic_axes lets the exported graph accept any batch size at serve time.
torch.onnx.export(
    model, dummy, "/tmp/regressor.onnx",
    input_names=["features"], output_names=["prediction"],
    dynamic_axes={"features": {0: "batch"}, "prediction": {0: "batch"}},
    opset_version=17)
print(f"ONNX file: {os.path.getsize('/tmp/regressor.onnx')/1024:.1f} KB")
try:
    import onnx
    m = onnx.load("/tmp/regressor.onnx")
    # Structural validation of the exported graph.
    onnx.checker.check_model(m)
    print("ONNX check: PASSED")
    print(f" Inputs: {[i.name for i in m.graph.input]}")
    print(f" Outputs: {[o.name for o in m.graph.output]}")
except ImportError:
    print("Install onnx: pip install onnx")
# Deployment comparison
print("\nDeployment Format Comparison:")
for fmt, use_case in [
    ("TorchScript", "C++ microservice, mobile (TorchMobile)"),
    ("ONNX", "Cross-framework, ONNX Runtime, mobile"),
    ("Pickle", "Python-only, sklearn, quick prototyping"),
    ("TF SavedModel","TensorFlow Serving, TFLite mobile"),
]:
    print(f" {fmt:<15}: {use_case}")
Production Deployment Checklist
import torch, torch.nn as nn, pickle, json, time, os
torch.manual_seed(42)
# Full deployment pipeline: train -> validate -> export -> health check
class ProductionModel(nn.Module):
    """3-class softmax classifier over 10 features (10 -> 32 -> 3)."""

    def __init__(self):
        super().__init__()
        parts = [nn.Linear(10, 32), nn.ReLU(), nn.Linear(32, 3)]
        self.net = nn.Sequential(*parts)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logits = self.net(x)
        # Serve probabilities rather than raw logits.
        return torch.softmax(logits, dim=-1)
# Train
model = ProductionModel()
X = torch.randn(200, 10); y = torch.randint(0, 3, (200,))
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
for _ in range(100):
    l = nn.CrossEntropyLoss()(model(X), y); opt.zero_grad(); l.backward(); opt.step()
model.eval()
# NOTE(review): accuracy is measured on the training set — no held-out split.
test_acc = (model(X).argmax(1)==y).float().mean().item()
print(f"Model accuracy: {test_acc:.4f}")
# Export
scripted = torch.jit.script(model)
scripted.save("/tmp/prod_model.pt")
# On-disk size of the exported artifact, in KB.
scripted_size = os.path.getsize("/tmp/prod_model.pt")/1024
# Health check function
def health_check(model_path, test_input_shape=(1, 10)):
    """Smoke-test a TorchScript model file before serving.

    Loads the model, runs one dummy inference, and reports latency, output
    shape, a probability sanity check, and the artifact's on-disk size.

    Args:
        model_path: path to a saved TorchScript module.
        test_input_shape: shape of the random probe input.

    Returns:
        dict with keys: status, output_shape, output_sum_to_1, latency_ms,
        model_size_kb.
    """
    loaded = torch.jit.load(model_path)
    loaded.eval()
    x_test = torch.randn(*test_input_shape)
    t0 = time.time()
    with torch.no_grad():
        out = loaded(x_test)
    latency_ms = (time.time()-t0)*1000
    # Fix: measure size from the file itself. The original read the
    # module-level `scripted_size` global, which silently reported the
    # wrong size for any model file other than /tmp/prod_model.pt.
    size_kb = os.path.getsize(model_path) / 1024
    return {
        "status": "healthy",
        "output_shape": list(out.shape),
        # Softmax outputs should sum to 1 for the single-row probe input.
        "output_sum_to_1": bool(abs(out.sum().item()-1) < 1e-4),
        "latency_ms": round(latency_ms, 3),
        "model_size_kb": round(size_kb, 1),
    }
health = health_check("/tmp/prod_model.pt")
print("\nHealth Check:", json.dumps(health, indent=2))
# Deployment manifest
# Machine-readable description of the artifact for the serving layer.
manifest = {
    "model_path": "/tmp/prod_model.pt",
    "format": "TorchScript",
    "input": {"name": "features", "shape": [-1, 10], "dtype": "float32"},
    "output": {"name": "probabilities", "shape": [-1, 3]},
    "accuracy": round(test_acc, 4),
}
print("\nDeployment Manifest:", json.dumps(manifest, indent=2))
import torch, torch.nn as nn, os
torch.manual_seed(42)

class MultiOutputModel(nn.Module):
    """4-class softmax classifier: 15 -> 64 -> 32 -> 4."""

    def __init__(self):
        super().__init__()
        blocks = [
            nn.Linear(15, 64), nn.ReLU(),
            nn.Linear(64, 32), nn.ReLU(),
            nn.Linear(32, 4),
        ]
        self.net = nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return torch.softmax(self.net(x), dim=-1)
# Train
model = MultiOutputModel()
X = torch.randn(200, 15); y = torch.randint(0, 4, (200,))
# Fix: the original constructed a fresh torch.optim.Adam every iteration
# (discarding Adam's moment estimates each step) and never called
# zero_grad(), so gradients accumulated across all 100 steps. Create the
# optimizer once and zero gradients each iteration.
opt = torch.optim.Adam(model.parameters())
for _ in range(100):
    l = nn.CrossEntropyLoss()(model(X), y)
    opt.zero_grad()
    l.backward()
    opt.step()
model.eval()
test = torch.randn(5, 15)
orig_out = model(test).detach()
# Export TorchScript
scripted = torch.jit.script(model)
scripted.save("/tmp/multi.pt")
ts_out = torch.jit.load("/tmp/multi.pt")(test).detach()
# Export ONNX (dynamic batch axis so any batch size is accepted)
torch.onnx.export(model, torch.randn(1,15), "/tmp/multi.onnx",
    input_names=["x"], output_names=["probs"],
    dynamic_axes={"x":{0:"batch"},"probs":{0:"batch"}}, opset_version=17)
print(f"TorchScript match: {torch.allclose(orig_out, ts_out, atol=1e-5)}")
print(f"ONNX size: {os.path.getsize('/tmp/multi.onnx')/1024:.1f} KB")
print(f"Predictions: {orig_out[:3].numpy().round(4)}")
import torch, torch.nn as nn

# Exercise: complete the model, then round-trip it through TorchScript
# following the numbered steps below.
class Net(nn.Module):
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # TODO: add layers
        pass
model = Net(); model.eval()
# 1. torch.jit.script(model) -> scripted
# 2. scripted.save("/tmp/net.pt")
# 3. loaded = torch.jit.load("/tmp/net.pt")
# 4. x = torch.randn(4, 10)
# 5. Assert torch.allclose(model(x), loaded(x))