Loading Module…

🔢 NumPy

32 topics • Click any card to expand

1. Array Creation

NumPy arrays are the foundation of scientific computing. Create them from lists, ranges, or built-in factory functions.

From lists and factory functions
import numpy as np

# Arrays built directly from Python lists.
a = np.array([1, 2, 3, 4, 5])          # one-dimensional
b = np.array([[1, 2, 3], [4, 5, 6]])   # two-dimensional: 2 rows x 3 cols

# Factory functions allocate arrays of a requested shape.
eye   = np.eye(3)                      # 3x3 identity matrix
zeros = np.zeros((3, 4))               # 3x4, every entry 0.0
ones  = np.ones((2, 3))                # 2x3, every entry 1.0
full  = np.full((2, 2), 7.0)           # 2x2, every entry 7.0

print("1D:", a)
print("2D shape:", b.shape)
print("Identity:\n", eye)
arange, linspace, logspace
import numpy as np

r   = np.arange(0, 20, 2)          # [0, 2, 4, ..., 18]
pts = np.linspace(0, 1, 5)         # 5 evenly-spaced in [0,1]
log = np.logspace(0, 3, 4)         # [1, 10, 100, 1000]

print("arange:", r)
print("linspace:", pts)
print("logspace:", log)

# Random arrays
rng = np.random.default_rng(42)    # recommended new API
u   = rng.random((3, 3))           # uniform [0,1)
n   = rng.standard_normal((3, 3))  # standard normal
print("Random normal:\n", n.round(2))
np.tile, np.meshgrid, and np.empty
import numpy as np

# np.tile repeats a small motif: a 2x2 unit tiled 4x4 times -> 8x8 board.
unit = np.array([[0, 1], [1, 0]])
board = np.tile(unit, (4, 4))
print("Checkerboard (8x8):\n", board)

# np.meshgrid expands two 1-D axes into full 2-D coordinate grids.
x = np.linspace(-2, 2, 5)
y = np.linspace(-1, 1, 3)
XX, YY = np.meshgrid(x, y)
print("XX shape:", XX.shape)   # (3, 5) -- rows follow y, columns follow x
Z = np.sqrt(XX**2 + YY**2)     # Euclidean distance of each grid point from origin
print("Z:\n", Z.round(2))

# np.empty allocates WITHOUT initializing -- contents are whatever bytes
# happened to be in memory, so only the shape/dtype are meaningful here.
buf = np.empty((3, 3), dtype=np.float64)
print("Empty (garbage values):", buf.shape, buf.dtype)
np.fromfunction and structured arrays
import numpy as np

# np.fromfunction calls the lambda with full index grids i, j and
# evaluates the formula element-wise -- here a 5x5 times table.
mul_table = np.fromfunction(lambda i, j: (i + 1) * (j + 1), (5, 5), dtype=int)
print("Multiplication table:\n", mul_table)

# Same mechanism: each cell holds its Euclidean distance from cell (0, 0).
dist = np.fromfunction(lambda i, j: np.sqrt(i**2 + j**2), (4, 4))
print("\nDistance from (0,0):\n", dist.round(2))

# A structured dtype packs named, typed fields into every element,
# much like a single row of a database table.
dt = np.dtype([('name', 'U10'), ('age', np.int32), ('score', np.float64)])
students = np.array([
    ('Alice', 23, 92.5),
    ('Bob',   25, 87.3),
    ('Carol', 22, 95.1),
], dtype=dt)
print("\nStructured array:", students)
print("Names:", students['name'])
print("Top scorer:", students['name'][students['score'].argmax()])
💼 Real-World: Feature Matrix for Machine Learning
A data scientist prepares a feature matrix with a bias column before training a linear model.
import numpy as np

# Reproducible synthetic dataset: 5000 samples, 12 features each.
np.random.seed(42)
n_samples, n_features = 5000, 12
X = np.random.randn(n_samples, n_features)

# Prepend a column of ones so a linear model can learn an intercept term.
bias = np.ones((n_samples, 1))
X_b  = np.concatenate([bias, X], axis=1)

print(f"Feature matrix shape: {X_b.shape}")
print(f"Memory: {X_b.nbytes / 1024:.1f} KB")
print(f"dtype: {X_b.dtype}")
print(f"Sample row[:5]: {X_b[0, :5].round(3)}")
🏋️ Practice: Array Construction Challenge
1) Create a 6x6 checkerboard of 0s and 1s using np.tile. 2) Build a 5x5 identity matrix, then multiply its diagonal by [1,2,3,4,5]. 3) Generate 20 log-spaced points from 10 to 10000 and find the one closest to 500.
Starter Code
import numpy as np

# 1. 6x6 checkerboard using np.tile
unit = np.array([[0, 1], [1, 0]])
# TODO: board = np.tile(unit, ???)
# print("Checkerboard:\n", board)

# 2. 5x5 identity * diagonal scale
eye = np.eye(5)
scale = np.array([1, 2, 3, 4, 5])
# TODO: scaled = eye * ???   (use broadcasting/diag)
# print("Scaled identity:\n", scaled)

# 3. Log-spaced points, find closest to 500
pts = np.logspace(1, 4, 20)   # 10^1 to 10^4
# TODO: idx = np.argmin(np.abs(pts - 500))
# print(f"Closest to 500: {pts[idx]:.2f} at index {idx}")
✅ Practice Checklist
2. Array Attributes & Inspection

Every NumPy array carries metadata: shape, dtype, number of dimensions, and memory size. Understanding these prevents bugs.

shape, ndim, dtype, size, nbytes
import numpy as np

a = np.array([[1.5, 2.0, 3.1],
              [4.0, 5.5, 6.2]], dtype=np.float32)

print("shape:   ", a.shape)     # (2, 3)
print("ndim:    ", a.ndim)      # 2
print("dtype:   ", a.dtype)     # float32
print("size:    ", a.size)      # 6  (total elements)
print("itemsize:", a.itemsize)  # 4  (bytes per element)
print("nbytes:  ", a.nbytes)    # 24 (total bytes)
print("Transpose shape:", a.T.shape)
Type casting and auto reshape
import numpy as np

a = np.arange(12)           # [0..11]
r = a.reshape(3, 4)
print("Reshaped (3,4):\n", r)

# -1 auto-infers the dimension
c = a.reshape(2, -1)        # (2, 6)
print("Auto reshape:", c.shape)

# Change dtype
b = a.astype(np.float32)
print("float32 dtype:", b.dtype)

# Flatten back to 1D
print("Flattened:", r.flatten())
View vs copy and np.shares_memory
import numpy as np

a = np.arange(10)

# A plain slice is a VIEW: it borrows the parent's buffer, so writing
# through the view mutates the original array too.
v = a[2:6]
v[0] = 99
print("After modifying view:", a)   # a is changed!

# .copy() detaches the data; edits to the copy stay local.
c = a[2:6].copy()
c[0] = 0
print("After modifying copy:", a)   # a unchanged

print("View shares memory:", np.shares_memory(a, v))
print("Copy shares memory:", np.shares_memory(a, c))

# reshape also hands back a view whenever the layout allows it
r = a.reshape(2, 5)
print("Reshape shares memory:", np.shares_memory(a, r))
Memory layout: strides, C vs F order, itemsize
import numpy as np

a = np.arange(12, dtype=np.float64).reshape(3, 4)

# Strides: bytes to step in each dimension
print("Shape:    ", a.shape)
print("Strides:  ", a.strides)   # (32, 8) β€” 4 float64s per row
print("Itemsize: ", a.itemsize)  # 8 bytes per float64

# C order (row-major, default): last index changes fastest
c_arr = np.ascontiguousarray(a, order='C')
print("\nC-order strides:", c_arr.strides)   # (32, 8)
print("C contiguous:   ", c_arr.flags['C_CONTIGUOUS'])

# Fortran order (column-major): first index changes fastest
f_arr = np.asfortranarray(a, order='F')
print("\nF-order strides:", f_arr.strides)   # (8, 24)
print("F contiguous:   ", f_arr.flags['F_CONTIGUOUS'])

# Transpose flips strides β€” still a view, not a copy
t = a.T
print("\nTranspose strides:", t.strides)     # (8, 32)
print("Shares memory with original:", np.shares_memory(a, t))
💼 Real-World: Memory Audit of Sensor Arrays
An IoT engineer compares float64 vs float32 memory for large time-series sensor arrays before storing.
import numpy as np

sensors = {
    "temperature": np.random.uniform(15, 40, (10000, 24)),
    "pressure":    np.random.uniform(900, 1100, (10000, 24)),
    "humidity":    np.random.uniform(20, 90, (10000, 24)),
}

print(f"{'Sensor':12s} {'float64 MB':>12} {'float32 MB':>12} {'Saved MB':>10}")
print("-" * 50)
for name, arr in sensors.items():
    mb64  = arr.nbytes / 1e6
    mb32  = arr.astype(np.float32).nbytes / 1e6
    print(f"{name:12s} {mb64:>12.3f} {mb32:>12.3f} {mb64-mb32:>10.3f}")
🏋️ Practice: Array Attribute Inspector
Create three arrays of different dtypes (int8, float32, complex128), each shaped (4, 5). For each: print shape, ndim, dtype, itemsize, nbytes, and compute the memory saving vs float64. Then reshape one to (2, 2, 5) and verify shares_memory returns True.
Starter Code
import numpy as np

dtypes = [np.int8, np.float32, np.complex128]
base_bytes = np.zeros((4, 5), dtype=np.float64).nbytes

for dt in dtypes:
    arr = np.zeros((4, 5), dtype=dt)
    # TODO: print shape, ndim, dtype, itemsize, nbytes
    # TODO: compute saving vs float64 base_bytes
    print(f"dtype={arr.dtype}: shape={arr.shape}, "
          f"ndim=???, itemsize=???B, nbytes=???B, "
          f"vs float64: ???B saving")

# Reshape and verify view
a = np.arange(20, dtype=np.float32).reshape(4, 5)
# TODO: b = a.reshape(2, 2, 5)
# TODO: print("shares_memory:", np.shares_memory(a, b))
✅ Practice Checklist
3. Indexing & Slicing

NumPy supports element access, slices, fancy (list-based) indexing, and boolean masks — all without Python loops.

1D and 2D indexing
import numpy as np

a = np.arange(10)
print(a[2])           # 2
print(a[-1])          # 9
print(a[2:7])         # [2 3 4 5 6]
print(a[::2])         # every other β†’ [0 2 4 6 8]
print(a[::-1])        # reversed

m = np.arange(12).reshape(3, 4)
print("m[1,2]:", m[1, 2])       # 6
print("col 1:", m[:, 1])        # [1 5 9]
print("sub-matrix:\n", m[0:2, 1:3])
Fancy indexing and np.ix_
import numpy as np

a = np.array([10, 20, 30, 40, 50])
idx = [0, 2, 4]
print("Fancy:", a[idx])              # [10 30 50]

m = np.arange(16).reshape(4, 4)
rows = [0, 1, 2]
cols = [1, 2, 3]
print("Diagonal slice:", m[rows, cols])   # [1 6 11]

# np.ix_ β€” all row/col combinations
print("Sub-matrix:\n", m[np.ix_(rows, cols)])
Advanced slicing: step, reverse, and np.take
import numpy as np

m = np.arange(25).reshape(5, 5)
print("Every 2nd row, every 2nd col:\n", m[::2, ::2])
# BUG FIX: m[-1::-2] walks rows 4, 2, 0 (every 2nd row from the end) --
# three rows, not two. m[:-3:-1] selects exactly the last two rows,
# reversed (row 4 then row 3), matching the label below.
print("Last 2 rows reversed:\n", m[:-3:-1])

# np.take -- fancy indexing along a chosen axis
data = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
order = [2, 0, 3, 1]   # reorder rows
print("Reordered rows:\n", np.take(data, order, axis=0))

# Diagonal extraction
print("Main diagonal:", np.diag(m))
print("Offset diagonal:", np.diag(m, k=1))   # one above main

# Assign through a list of fancy indices
arr = np.zeros(10, dtype=int)
arr[[1, 3, 5, 7, 9]] = 1
print("Odd positions set:", arr)
np.ix_ for cross-product indexing and scatter/gather
import numpy as np

m = np.arange(25).reshape(5, 5)
print("Matrix:\n", m)

# np.ix_ builds an open mesh so row/col lists select their full cross product.
rows = [0, 2, 4]
cols = [1, 3]
sub = m[np.ix_(rows, cols)]
print("\nnp.ix_ sub-matrix (rows 0,2,4 x cols 1,3):\n", sub)

# Scatter: write one value to every (row, col) combination.
target = np.zeros((5, 5), dtype=int)
target[np.ix_(rows, cols)] = 9
print("\nAfter scatter write:\n", target)

# Gather: pull the same sub-block out of every matrix in a batch.
batch = np.arange(60).reshape(3, 4, 5)  # 3 matrices of (4,5)
sample_rows = [0, 2]
sample_cols = [1, 3, 4]
# Build the open mesh ONCE instead of calling np.ix_ twice (the original
# recomputed the identical mesh for each axis).
row_idx, col_idx = np.ix_(sample_rows, sample_cols)
gathered = batch[:, row_idx, col_idx]
print("\nBatch gather shape:", gathered.shape)  # (3, 2, 3)
print("First matrix block:\n", gathered[0])
💼 Real-World: Sliding Window for Anomaly Detection
An IoT engineer extracts overlapping 24-hour windows from continuous sensor readings to detect anomalies.
import numpy as np

# One month of simulated hourly temperature readings, centered near 20 C.
time_series = np.random.randn(24 * 30) + 20.0

# Every overlapping 24-hour window, as a zero-copy strided view.
windows = np.lib.stride_tricks.sliding_window_view(time_series, 24)
print(f"Series: {time_series.shape}  β†’  Windows: {windows.shape}")

# The most volatile day is the window with the largest standard deviation.
stds = windows.std(axis=1)
peak_idx = np.argmax(stds)
peak = windows[peak_idx]

print(f"Most variable window starts at hour {peak_idx}")
print(f"  std  = {stds[peak_idx]:.3f}")
print(f"  mean = {peak.mean():.2f}Β°C")
print(f"  range [{peak.min():.1f}, {peak.max():.1f}]")
🏋️ Practice: Matrix Surgery
Create a 6x6 matrix from arange(36). Using only indexing (no loops): 1) Extract the 2x2 center block (rows 2-3, cols 2-3). 2) Extract the main diagonal. 3) Set all values > 25 to -1. 4) Extract every other element from the last row.
Starter Code
import numpy as np

m = np.arange(36).reshape(6, 6)
print("Original:\n", m)

# 1. Extract 2x2 center block (rows 2-3, cols 2-3)
# TODO: center = m[???, ???]
# print("Center:\n", center)

# 2. Main diagonal
# TODO: diag = np.diag(m)
# print("Diagonal:", diag)

# 3. Set all values > 25 to -1 (in-place boolean indexing)
m_copy = m.copy()
# TODO: m_copy[???] = -1
# print("After masking >25:\n", m_copy)

# 4. Every other element from last row
# TODO: every_other = m[-1, ???]
# print("Every other (last row):", every_other)
✅ Practice Checklist
4. Mathematical Operations & Ufuncs

NumPy ufuncs (universal functions) operate element-wise on entire arrays — far faster than Python loops.

Arithmetic and ufuncs
import numpy as np

a = np.array([1.0, 4.0, 9.0, 16.0])
b = np.array([2.0, 2.0, 3.0,  4.0])

print("a + b:", a + b)
print("a * b:", a * b)
print("a ** 2:", a ** 2)
print("sqrt:", np.sqrt(a))
print("log:", np.log(a).round(3))
print("exp:", np.exp([0, 1, 2]).round(3))
print("clip:", np.clip(a, 2, 10))
Cumulative and trigonometric ops
import numpy as np

arr = np.array([1, 2, 3, 4, 5])
print("cumsum: ", np.cumsum(arr))
print("cumprod:", np.cumprod(arr))
print("diff:   ", np.diff(arr))

x = np.linspace(-np.pi, np.pi, 5)
print("sin:", np.sin(x).round(2))
print("cos:", np.cos(x).round(2))

a = np.array([-3, -1, 0, 1, 3])
print("abs: ", np.abs(a))
print("sign:", np.sign(a))
np.einsum and vectorized distance
import numpy as np

# Einstein-summation notation: subscript letters name each axis.
A = np.random.randn(3, 4)
B = np.random.randn(4, 5)

# 'ij,jk->ik' sums over the shared j axis, i.e. matrix multiplication.
C = np.einsum('ij,jk->ik', A, B)
print("einsum matmul:", C.shape)   # (3, 5)

# A repeated index with an empty output ('ii->') sums the diagonal.
sq = np.random.randn(4, 4)
print("trace:", np.einsum('ii->', sq), "==", np.trace(sq))

# Broadcasting trick for every pairwise distance at once.
points = np.array([[0,0],[1,0],[0,1],[1,1]], dtype=float)
diff = points[:, None, :] - points[None, :, :]  # (4,4,2)
dist = np.sqrt((diff**2).sum(axis=-1))
print("Distance matrix:\n", dist.round(3))
np.vectorize, np.frompyfunc, and np.piecewise
import numpy as np

# np.vectorize β€” wrap a scalar Python function for array inputs
def classify(x):
    if x < 0:   return 'negative'
    if x == 0:  return 'zero'
    return 'positive'

v_classify = np.vectorize(classify)
data = np.array([-3, 0, 1, -1, 5])
print("vectorize:", v_classify(data))

# np.frompyfunc β€” similar but returns object arrays; useful for ufunc chaining
safe_log = np.frompyfunc(lambda x: np.log(x) if x > 0 else np.nan, 1, 1)
vals = np.array([-1.0, 0.0, 1.0, np.e, 10.0])
print("frompyfunc safe_log:", safe_log(vals).astype(float).round(3))

# np.piecewise β€” vectorized multi-branch function
x = np.linspace(-3, 3, 7)
y = np.piecewise(x,
    [x < -1, (x >= -1) & (x <= 1), x > 1],
    [lambda t: -t,   # x < -1:  y = -x
     lambda t: t**2, # -1<=x<=1: y = x^2
     lambda t: t])   # x > 1:  y = x
print("\npiecewise input: ", x.round(2))
print("piecewise output:", y.round(2))
💼 Real-World: Daily Return & Volatility Calculation
A quant analyst vectorizes stock return and volatility calculations across a price history array.
import numpy as np

prices = np.array([100, 102, 98, 105, 103, 110, 108, 115, 112, 118],
                  dtype=np.float64)

# Percentage daily returns
pct_ret = np.diff(prices) / prices[:-1] * 100

# Log returns (additive, good for compounding)
log_ret = np.diff(np.log(prices))

# Annualized volatility (assuming 252 trading days)
ann_vol = log_ret.std() * np.sqrt(252)

# Sharpe-like ratio
sharpe = log_ret.mean() / log_ret.std() * np.sqrt(252)

print(f"Daily returns %: {pct_ret.round(2)}")
print(f"Annualized vol:  {ann_vol:.4f}")
print(f"Sharpe ratio:    {sharpe:.3f}")
🏋️ Practice: Vectorized Math Challenge
1) Given angles in degrees [0, 30, 45, 60, 90, 120, 180], compute sin and cos without a loop. Verify sin²+cos²=1 for all. 2) Given a 1D price array, compute: % daily return, 5-day moving average (using np.convolve), and cumulative return. 3) Use np.clip to cap values between the 10th and 90th percentile.
Starter Code
import numpy as np

# 1. Trig on degree array
degrees = np.array([0, 30, 45, 60, 90, 120, 180], dtype=float)
# TODO: radians = degrees * np.pi / 180
# TODO: s, c = np.sin(radians), np.cos(radians)
# TODO: verify = np.allclose(s**2 + c**2, 1)
# print("sinΒ²+cosΒ²==1:", verify)

# 2. Price array stats
np.random.seed(42)
prices = np.cumprod(1 + np.random.normal(0.001, 0.02, 30)) * 100
# TODO: pct_ret = np.diff(prices) / prices[:-1] * 100
# TODO: ma5 = np.convolve(prices, np.ones(5)/5, mode='valid')
# TODO: cum_ret = prices[-1] / prices[0] - 1
# print(f"Cumulative return: {cum_ret:.2%}")

# 3. Winsorize to [p10, p90]
data = np.random.randn(100) * 10
# TODO: p10, p90 = np.percentile(data, [10, 90])
# TODO: clipped = np.clip(data, p10, p90)
# print(f"Original range: [{data.min():.1f}, {data.max():.1f}]")
# print(f"Clipped range:  [{clipped.min():.1f}, {clipped.max():.1f}]")
✅ Practice Checklist
5. Broadcasting

Broadcasting lets NumPy perform operations on arrays of different shapes without copying data — memory-efficient and fast.

Scalar and vector broadcasting
import numpy as np

a = np.array([[1, 2, 3],
              [4, 5, 6]])

print("a + 10:\n",     a + 10)                      # scalar
print("a * [1,2,3]:\n", a * np.array([1, 2, 3]))   # row
print("a + [[10],[20]]:\n",
      a + np.array([[10], [20]]))                    # column
Min-max normalization via broadcasting
import numpy as np

X = np.random.randn(100, 5)          # 100 samples, 5 features

# Z-score normalization
mean = X.mean(axis=0)                # shape (5,) β€” per feature
std  = X.std(axis=0)
X_z  = (X - mean) / std             # (100,5) op (5,) β†’ broadcasts

print("After normalization:")
print("  mean:", X_z.mean(axis=0).round(8))
print("  std: ", X_z.std(axis=0).round(8))

# Min-max normalization
X_mm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
print("Min-max range:", X_mm.min(axis=0).round(2), "to", X_mm.max(axis=0).round(2))
Outer product and pairwise distance via broadcasting
import numpy as np

# Outer product without np.outer
a = np.array([1, 2, 3, 4])
b = np.array([10, 20, 30])
outer = a[:, None] * b[None, :]    # (4,1) * (1,3) β†’ (4,3)
print("Outer product:\n", outer)

# All pairwise L2 distances between N points
rng = np.random.default_rng(0)
pts = rng.random((5, 2))           # 5 points in 2D
diff = pts[:, None, :] - pts[None, :, :]  # (5,5,2)
dist = np.sqrt((diff**2).sum(axis=-1))    # (5,5)
print("Distance matrix:\n", dist.round(3))
print("Nearest neighbor of pt0:", np.argsort(dist[0])[1])
Batch matrix multiply with np.matmul and broadcasting rules
import numpy as np

rng = np.random.default_rng(7)

# Batch matrix multiply: (B, M, K) @ (B, K, N) -> (B, M, N)
B, M, K, N = 4, 3, 5, 2
A_batch = rng.random((B, M, K))
W_batch = rng.random((B, K, N))
C_batch = np.matmul(A_batch, W_batch)   # or A_batch @ W_batch
print("Batch matmul:", A_batch.shape, "@", W_batch.shape, "->", C_batch.shape)

# Broadcasting rule: align shapes from the right, prepend 1s, stretch size-1 axes.
# (4, 1, 3) + (3, 3) -> (4, 3, 3)
x = rng.random((4, 1, 3))
y = rng.random((3, 3))
result = x + y
print("\nBroadcast (4,1,3) + (3,3) ->", result.shape)

# One shared weight matrix broadcast across the batch: (B,M,K) @ (K,N)
W_shared = rng.random((K, N))           # single weight matrix
C_shared = A_batch @ W_shared           # (4,3,5) @ (5,2) -> (4,3,2)
print("Shared-weight batch:", A_batch.shape, "@", W_shared.shape, "->", C_shared.shape)
# BUG FIX: actually compute the comparison instead of hard-coding False;
# the outputs differ because each batch item of A_batch differs.
print("All batch outputs equal?", bool(np.allclose(C_shared[0], C_shared[1])),
      "(each row of A_batch differs)")
💼 Real-World: Batch Image Normalization for CNNs
A computer vision engineer normalizes a batch of RGB images using channel-wise ImageNet statistics.
import numpy as np

# Simulate batch: (N=32, H=64, W=64, C=3)
images = np.random.randint(0, 256, (32, 64, 64, 3), dtype=np.uint8)

# ImageNet channel means and stds (R, G, B)
mean_rgb = np.array([0.485, 0.456, 0.406])
std_rgb  = np.array([0.229, 0.224, 0.225])

# Scale to [0,1] then normalize β€” broadcasting: (32,64,64,3) op (3,)
normalized = (images.astype(np.float32) / 255.0 - mean_rgb) / std_rgb

print(f"Input:  {images.shape}  dtype={images.dtype}")
print(f"Output: {normalized.shape}  dtype={normalized.dtype}")
print(f"Range:  [{normalized.min():.3f}, {normalized.max():.3f}]")
print(f"Per-channel mean: {normalized.mean(axis=(0,1,2)).round(3)}")
🏋️ Practice: Broadcasting Workout
1) Add a bias column of 1s to a (50, 4) feature matrix using broadcasting/hstack. 2) Compute the softmax of a (3, 5) logit matrix row-wise: exp(x) / sum(exp(x)) — verify each row sums to 1. 3) Subtract the per-row mean from a (4, 6) matrix so each row has zero mean.
Starter Code
import numpy as np
np.random.seed(42)

# 1. Add bias column (column of 1s) to feature matrix
X = np.random.randn(50, 4)
# TODO: bias = np.ones((50, 1))
# TODO: X_b = np.hstack([bias, X])
# print("With bias:", X_b.shape)  # (50, 5)

# 2. Row-wise softmax
logits = np.random.randn(3, 5)
# TODO: exp_l = np.exp(logits)
# TODO: softmax = exp_l / exp_l.sum(axis=1, keepdims=True)
# print("Row sums:", softmax.sum(axis=1))  # should be [1. 1. 1.]

# 3. Zero-center each row
M = np.random.randn(4, 6)
# TODO: row_means = M.mean(axis=1, keepdims=True)
# TODO: M_centered = M - row_means
# print("Row means after centering:", M_centered.mean(axis=1).round(10))
✅ Practice Checklist
6. Boolean Indexing & np.where

Filter arrays with boolean conditions. np.where and np.select let you apply transformations without explicit loops.

Boolean masks
import numpy as np

a = np.array([10, 25, 3, 47, 18, 32, 5])

mask = a > 20
print("Mask:     ", mask)
print("Values>20:", a[mask])

# Modify in-place
a[a < 10] = 0
print("After zero small:", a)

# Multiple conditions
b = np.arange(20)
print("5 to 15:", b[(b >= 5) & (b <= 15)])
np.where and np.select
import numpy as np

scores = np.array([45, 78, 55, 90, 33, 68, 82])

# np.where β€” binary choice
grade = np.where(scores >= 60, "Pass", "Fail")
print("Pass/Fail:", grade)

# np.select β€” multiple conditions
cond    = [scores >= 90, scores >= 70, scores >= 50]
choices = ["A",          "B",          "C"]
letter  = np.select(cond, choices, default="D")
print("Letter grades:", letter)

# np.where returns indices when given 1 arg
idx_fail = np.where(scores < 60)[0]
print("Failing indices:", idx_fail)
np.argwhere and masked operations
import numpy as np

rng = np.random.default_rng(7)
data = rng.normal(50, 15, 20).round(1)
print("Data:", data)

# argwhere β€” indices where condition is True
low = np.argwhere(data < 35)
print("Indices where < 35:", low.flatten())

# Replace outliers (>2 std) with median
mean, std = data.mean(), data.std()
median = np.median(data)
outlier_mask = np.abs(data - mean) > 2 * std
cleaned = np.where(outlier_mask, median, data)
print(f"Replaced {outlier_mask.sum()} outliers with median {median:.1f}")
print("Cleaned:", cleaned)
np.where with multiple conditions and np.select for multi-case logic
import numpy as np

rng = np.random.default_rng(3)
temps = rng.normal(20, 8, 12).round(1)   # 12 hourly readings
print("Temperatures:", temps)

# np.where with compound condition (AND)
comfortable = np.where((temps >= 18) & (temps <= 26), temps, np.nan)
print("Comfortable temps only:", comfortable)

# np.select β€” clean multi-case replacement for nested np.where
conditions = [
    temps < 5,
    (temps >= 5)  & (temps < 15),
    (temps >= 15) & (temps < 25),
    temps >= 25,
]
labels = ['freezing', 'cold', 'comfortable', 'hot']
category = np.select(conditions, labels, default='unknown')
print("Categories:", category)

# np.where returning indices (1-arg form) then using them
hot_idx = np.where(temps >= 25)[0]
print(f"Hot hours (index): {hot_idx}  values: {temps[hot_idx]}")

# Compound OR mask
extreme = np.where((temps < 5) | (temps >= 30), 'extreme', 'normal')
print("Extreme flag:", extreme)
💼 Real-World: Energy Consumption Flagging
A building engineer computes heating/cooling degree-hours and flags extreme temperature events for HVAC optimization.
import numpy as np

np.random.seed(1)
# 1 year of hourly outdoor temperatures (Β°C)
temps = np.random.normal(22, 7, 8760)

# Degree-hours for heating (< 16Β°C) and cooling (> 24Β°C)
heating = np.where(temps < 16, 16 - temps, 0)
cooling = np.where(temps > 24, temps - 24, 0)
extreme = (temps < 0) | (temps > 38)

print(f"Heating degree-hours:   {heating.sum():.0f}")
print(f"Cooling degree-hours:   {cooling.sum():.0f}")
print(f"Extreme hours (<0/>38): {extreme.sum()}")
print(f"Coldest: {temps.min():.1f}Β°C at hour {temps.argmin()}")
print(f"Hottest: {temps.max():.1f}Β°C at hour {temps.argmax()}")
🏋️ Practice: Boolean Filtering & Replacement
Generate 200 random normal values (mean=100, std=20). 1) Count values in each band: <70, 70-90, 90-110, 110-130, >130. 2) Use np.select to label them 'very_low','low','normal','high','very_high'. 3) Replace all values outside [60, 140] with the band boundary (clip). Verify no values remain outside [60, 140].
Starter Code
import numpy as np
np.random.seed(0)
data = np.random.normal(100, 20, 200)

# 1. Count per band
bands = [(None, 70), (70, 90), (90, 110), (110, 130), (130, None)]
labels = ['very_low', 'low', 'normal', 'high', 'very_high']
for label, (lo, hi) in zip(labels, bands):
    if lo is None:
        count = (data < hi).sum()
    elif hi is None:
        count = (data >= lo).sum()
    else:
        count = ((data >= lo) & (data < hi)).sum()
    print(f"  {label:10s}: {count}")

# 2. Label each value with np.select
conds = [
    data < 70,
    (data >= 70) & (data < 90),
    (data >= 90) & (data < 110),
    (data >= 110) & (data < 130),
]
# TODO: labeled = np.select(conds, labels[:4], default=labels[4])

# 3. Clip and verify
# TODO: clipped = np.clip(data, 60, 140)
# TODO: print("All in [60,140]:", np.all((clipped >= 60) & (clipped <= 140)))
✅ Practice Checklist
7. Statistical Functions

NumPy provides fast descriptive statistics along any axis, plus correlation, covariance, and percentile functions.

Descriptive stats and axis parameter
import numpy as np

data = np.array([4, 7, 13, 16, 21, 23, 24, 28, 30])
print("mean:   ", np.mean(data))
print("median: ", np.median(data))
print("std:    ", np.std(data).round(2))
print("25th:   ", np.percentile(data, 25))
print("75th:   ", np.percentile(data, 75))

# Axis-wise on 2D
m = np.random.randint(1, 10, (4, 5))
print("Row means:", m.mean(axis=1).round(2))
print("Col sums: ", m.sum(axis=0))
Correlation and covariance
import numpy as np

np.random.seed(42)
x = np.random.randn(100)
y = 0.8 * x + np.random.randn(100) * 0.5  # correlated
z = np.random.randn(100)                   # independent

# Pearson correlation matrix
data = np.vstack([x, y, z])
cov  = np.cov(data)
corr = np.corrcoef(data)

print("Covariance matrix:\n", cov.round(2))
print("Correlation matrix:\n", corr.round(2))
np.histogram and quantile functions
import numpy as np

rng = np.random.default_rng(42)
data = rng.normal(50, 10, 1000)

# Histogram
counts, edges = np.histogram(data, bins=10)
print("Bin edges:", edges.round(1))
print("Counts:   ", counts)

# Manual histogram display
for i, (lo, hi, c) in enumerate(zip(edges, edges[1:], counts)):
    bar = '#' * (c // 10)
    print(f"[{lo:5.1f}-{hi:5.1f}] {bar} ({c})")

# Quantile / percentile
q = np.quantile(data, [0.25, 0.5, 0.75])
print(f"Q1={q[0]:.1f}  Median={q[1]:.1f}  Q3={q[2]:.1f}  IQR={q[2]-q[0]:.1f}")

# nanmean β€” ignores NaN
a = np.array([1.0, 2.0, np.nan, 4.0, np.nan, 6.0])
print("nanmean:", np.nanmean(a), "  nanstd:", np.nanstd(a).round(3))
np.percentile vs np.quantile, np.corrcoef, and np.histogram2d
import numpy as np

rng = np.random.default_rng(99)
x = rng.normal(0, 1, 500)
y = 0.7 * x + rng.normal(0, 0.7, 500)   # correlated with x

# percentile (takes 0-100) vs quantile (takes 0.0-1.0) β€” same result
p = np.percentile(x, [25, 50, 75])
q = np.quantile(x,  [0.25, 0.50, 0.75])
print("percentile:", p.round(3))
print("quantile:  ", q.round(3))
print("identical: ", np.allclose(p, q))

# corrcoef β€” Pearson correlation matrix
corr = np.corrcoef(x, y)
print(f"\nCorrelation x<->y: {corr[0,1]:.3f}")

# histogram2d β€” joint distribution of two variables
H, xedges, yedges = np.histogram2d(x, y, bins=5)
print("\n2D histogram (counts):\n", H.astype(int))
print("x edges:", xedges.round(2))
print("y edges:", yedges.round(2))
print("Most occupied bin count:", int(H.max()))
💼 Real-World: Manufacturing QC Outlier Detection
A quality engineer uses Z-scores to identify defective units exceeding 3 standard deviations from the process mean.
import numpy as np

np.random.seed(42)
# 10,000 unit measurements (target: 50mm ± 5mm)
measurements = np.random.normal(50, 5, 10000)

# Plant four known-bad units so the detector has something to find.
measurements[[100, 500, 2000, 7500]] = [85, 4, 92, 7]

mean = measurements.mean()
std = measurements.std()
z_score = np.abs((measurements - mean) / std)

# Anything more than 3 sigma from the process mean counts as defective.
outlier_mask = z_score > 3
outlier_vals = measurements[outlier_mask]
outlier_idx = np.where(outlier_mask)[0]

print(f"Process mean:  {mean:.2f}mm,  std: {std:.2f}mm")
print(f"Total units:   {len(measurements):,}")
print(f"Defects found: {outlier_mask.sum()} ({outlier_mask.mean():.2%})")
print(f"Defect values: {outlier_vals.round(1)}")
print(f"At indices:    {outlier_idx}")
🏋️ Practice: Statistics on a 2D Dataset
Create a (10, 5) matrix of random student scores (int, 0-100). Compute: per-column mean, std, min, max. Find which column (subject) has the highest variance. Identify which rows (students) have an average below 50. Compute the correlation matrix between all 5 subjects.
Starter Code
import numpy as np
np.random.seed(1)

scores = np.random.randint(0, 101, (10, 5))
subjects = ['Math', 'Science', 'English', 'History', 'Art']
print("Scores:\n", scores)

# 1. Per-column statistics
for j, subj in enumerate(subjects):
    col = scores[:, j]
    # TODO: print(f"{subj:8s}: mean={col.mean():.1f}, std={col.std():.1f}, min={col.min()}, max={col.max()}")
    pass

# 2. Subject with highest variance
# TODO: variances = scores.var(axis=0)
# TODO: hardest = subjects[np.argmax(variances)]
# print(f"Highest variance subject: {hardest}")

# 3. Students with average below 50
# TODO: row_means = scores.mean(axis=1)
# TODO: struggling = np.where(row_means < 50)[0]
# print(f"Students below avg 50: rows {struggling}")

# 4. Correlation matrix
# TODO: corr = np.corrcoef(scores.T)
# print("Correlation:\n", corr.round(2))
✅ Practice Checklist
8. Linear Algebra

numpy.linalg provides matrix operations essential for machine learning, portfolio optimization, and scientific computing.

Matrix multiply, inverse, determinant
import numpy as np

A = np.array([[2, 1], [1, 3]], dtype=float)
B = np.array([[1, 2], [3, 4]], dtype=float)

print("A @ B:\n",  A @ B)            # matmul (preferred)
print("inv(A):\n", np.linalg.inv(A).round(3))
print("det(A):", np.linalg.det(A))
print("A @ inv(A):\n",
      (A @ np.linalg.inv(A)).round(10))  # should be identity
Eigenvalues and solving Ax = b
import numpy as np

A = np.array([[4, 2], [1, 3]], dtype=float)
vals, vecs = np.linalg.eig(A)
print("Eigenvalues: ", vals)
print("Eigenvectors:\n", vecs.round(3))

# Solve the linear system Ax = b
b = np.array([10.0, 8.0])
x = np.linalg.solve(A, b)
print("Solution x:", x)
print("Verify Ax==b:", np.allclose(A @ x, b))
SVD, QR decomposition, and matrix rank
import numpy as np

A = np.array([[1, 2, 3],
              [4, 5, 6],
              [7, 8, 9]], dtype=float)

# SVD decomposition: A = U @ diag(S) @ Vt
U, S, Vt = np.linalg.svd(A)
print("Singular values:", S.round(4))
print("Rank (non-zero SVs):", np.sum(S > 1e-10))

# QR decomposition
Q, R = np.linalg.qr(A)
print("Q shape:", Q.shape, "  R shape:", R.shape)
print("Reconstruct:", np.allclose(Q @ R, A))

# Norm and condition number
print("Frobenius norm:", np.linalg.norm(A, 'fro').round(3))
print("Condition number:", np.linalg.cond(A).round(1))
np.linalg.lstsq, np.linalg.eig, and condition number
import numpy as np

rng = np.random.default_rng(42)

# Least-squares: fit a line y = m*x + b to noisy data
x = np.linspace(0, 10, 20)
y_true = 2.5 * x + 1.0
y = y_true + rng.normal(0, 1.5, 20)   # add noise

# Build design matrix [x, 1] for [m, b]
A = np.column_stack([x, np.ones(len(x))])
coeffs, residuals, rank, sv = np.linalg.lstsq(A, y, rcond=None)
print(f"lstsq fit:  m={coeffs[0]:.3f}  b={coeffs[1]:.3f}  (true: m=2.5, b=1.0)")

# Eigenvalues and eigenvectors
M = np.array([[3, 1], [0, 2]], dtype=float)
eigenvalues, eigenvectors = np.linalg.eig(M)
print(f"\nEigenvalues: {eigenvalues}")
print(f"Eigenvectors:\n{eigenvectors.round(3)}")
# Verify: M @ v = lambda * v for each eigenpair
for i in range(len(eigenvalues)):
    lhs = M @ eigenvectors[:, i]
    rhs = eigenvalues[i] * eigenvectors[:, i]
    print(f"  Mv == lv (col {i}): {np.allclose(lhs, rhs)}")

# Condition number β€” high means near-singular, numerically unstable
well = np.array([[2., 1.], [1., 3.]])
ill  = np.array([[1., 1.], [1., 1.0001]])
print(f"\nCondition (well-conditioned): {np.linalg.cond(well):.2f}")
print(f"Condition (ill-conditioned):  {np.linalg.cond(ill):.0f}")
💼 Real-World: Minimum-Variance Portfolio Optimization
A portfolio manager uses covariance matrix inversion to compute optimal minimum-variance asset weights.
# Demo: closed-form minimum-variance portfolio weights from a sample
# covariance matrix of simulated daily returns.
# NOTE(review): uses the legacy np.random.seed API, unlike the
# default_rng-based examples elsewhere in this file.
import numpy as np

np.random.seed(0)
# 252 trading days, 5 assets
returns = np.random.randn(252, 5) * 0.01
cov     = np.cov(returns.T)          # 5×5 covariance matrix

# Closed-form minimum-variance weights: w = inv(Cov)·1 / (1'·inv(Cov)·1)
inv_cov  = np.linalg.inv(cov)
ones     = np.ones(5)
w_raw    = inv_cov @ ones
w_minvar = w_raw / w_raw.sum()       # normalize to sum = 1

# Portfolio variance w'Σw, annualized by the 252-trading-day convention
port_var = float(w_minvar @ cov @ w_minvar)
port_std = np.sqrt(port_var * 252)   # annualized

print("Min-variance weights:", w_minvar.round(4))
print(f"Sum of weights: {w_minvar.sum():.6f}")
print(f"Annualized portfolio vol: {port_std:.4f}")
🏋️ Practice: Solve a 3x3 Linear System
Solve this system: 2x + y - z = 8, -3x - y + 2z = -11, -2x + y + 2z = -3. Set up as Ax=b, solve with np.linalg.solve, verify with np.allclose. Then compute det(A), inv(A), and confirm A @ inv(A) is the identity.
Starter Code
# Practice starter: fill in the TODOs to solve the 3x3 system Ax = b,
# then verify via det, inverse, and A @ inv(A) == I.
import numpy as np

# 2x + y - z  = 8
# -3x - y + 2z = -11
# -2x + y + 2z = -3
A = np.array([
    # TODO: fill in the coefficient matrix
], dtype=float)

b = np.array([8.0, -11.0, -3.0])

# TODO: x = np.linalg.solve(A, b)
# print("Solution: x={:.1f}, y={:.1f}, z={:.1f}".format(*x))
# print("Verify:", np.allclose(A @ x, b))

# TODO: det_A = np.linalg.det(A)
# print(f"det(A) = {det_A:.2f}")

# TODO: inv_A = np.linalg.inv(A)
# print("A @ inv(A) is identity:", np.allclose(A @ inv_A, np.eye(3)))
✅ Practice Checklist
9. Array Manipulation

Reshape, stack, split, and transpose arrays to match the formats expected by algorithms and frameworks.

reshape, flatten, transpose
import numpy as np

# 24 consecutive integers serve as the raw data for every reshape below.
a = np.arange(24)

# Rows x cols must multiply to the element count (4 * 6 == 24).
r = np.reshape(a, (4, 6))
print("(4,6):\n", r)

# The same buffer viewed as a 3-D block.
b = np.reshape(a, (2, 3, 4))
print("3D:", b.shape)

# A -1 dimension is inferred from the remaining sizes.
c = np.reshape(a, (6, -1))
print("Auto (6,4):", c.shape)

# .T reverses the axis order; flatten() copies back to 1-D.
print("Transpose:", r.T.shape)
print("Flatten:", r.flatten()[:8], "...")
stack, hstack, vstack, split
import numpy as np

# Two small 1-D vectors to combine in different ways.
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])

# hstack keeps 1-D; vstack and stack introduce a second dimension.
h = np.hstack([a, b])
v = np.vstack([a, b])
s = np.stack([a, b], axis=1)
print("hstack:", h)
print("vstack:\n", v)
print("stack(axis=1):\n", s)

# Split a 12-element range into three equal chunks.
arr = np.arange(12)
p1, p2, p3 = np.split(arr, 3)
print("Split:", p1, p2, p3)

# Row-wise concatenation of two 2-D blocks with matching column counts.
m1 = np.ones((2, 3))
m2 = np.zeros((2, 3))
joined = np.concatenate([m1, m2], axis=0)
print("concat row:\n", joined)
np.repeat, np.pad, and np.roll
import numpy as np

a = np.array([1, 2, 3])

# repeat duplicates each element in place; tile duplicates the whole array.
each = np.repeat(a, 3)
whole = np.tile(a, 3)
print("repeat each 3x:", each)    # [1 1 1 2 2 2 3 3 3]
print("tile 3x:", whole)             # [1 2 3 1 2 3 1 2 3]

# np.pad draws a constant-valued border around the block.
m = np.ones((3, 3), dtype=int)
padded = np.pad(m, pad_width=1, mode='constant', constant_values=0)
print("Padded (zeros border):\n", padded)

# np.roll shifts elements cyclically; the sign picks the direction.
arr = np.array([1, 2, 3, 4, 5])
right = np.roll(arr, 2)
left = np.roll(arr, -2)
print("roll right 2:", right)      # [4 5 1 2 3]
print("roll left 2: ", left)     # [3 4 5 1 2]
np.split, np.array_split, and np.concatenate along different axes
# Demo: np.split (strict equal parts) vs np.array_split (uneven allowed),
# axis-aware concatenation, and a split->concat round trip.
import numpy as np

data = np.arange(24).reshape(4, 6)
print("Original (4x6):\n", data)

# np.split — equal-size splits (raises error if not divisible)
cols = np.split(data, 3, axis=1)      # 3 chunks of (4, 2)
print("\nSplit into 3 col-chunks, each shape:", cols[0].shape)

# np.array_split — unequal splits are allowed
rows = np.array_split(data, 3, axis=0)  # chunks of sizes 2, 1, 1
print("array_split row shapes:", [c.shape for c in rows])

# np.concatenate along different axes
a = np.ones((3, 4))
b = np.zeros((3, 4))
row_join = np.concatenate([a, b], axis=0)  # (6, 4)
col_join = np.concatenate([a, b], axis=1)  # (3, 8)
print("\nconcat axis=0 (row join):", row_join.shape)
print("concat axis=1 (col join):", col_join.shape)

# Round-trip: split then concatenate recovers original
# (a list of indices splits BEFORE each listed row)
chunks = np.array_split(data, [2, 3], axis=0)  # at rows 2 and 3
restored = np.concatenate(chunks, axis=0)
print("\nRound-trip split->concat matches original:", np.array_equal(data, restored))
💼 Real-World: Image Batch Preparation for Deep Learning
A computer vision engineer reshapes image batches into the correct formats for PyTorch (CHW) and TensorFlow (HWC) models.
import numpy as np

# Simulate a batch of images: (N, H, W, C) β€” TensorFlow/HWC format
batch_size, H, W, C = 32, 64, 64, 3
images = np.random.rand(batch_size, H, W, C).astype(np.float32)

# Flatten spatial dims for dense layer input
flat = images.reshape(batch_size, -1)
print(f"HWC flat:   {images.shape} β†’ {flat.shape}")

# Convert to CHW format for PyTorch: (N, C, H, W)
chw = images.transpose(0, 3, 1, 2)
print(f"CHW format: {chw.shape}")

# Stack two batches along batch axis
batch2   = np.random.rand(batch_size, H, W, C).astype(np.float32)
combined = np.concatenate([images, batch2], axis=0)
print(f"Combined:   {combined.shape}")
🏋️ Practice: Shape Manipulation Challenge
1) Start with np.arange(60), reshape to (3, 4, 5). Transpose axes to (5, 4, 3), then flatten. 2) Stack three (4, 3) matrices along a new axis 0 to get (3, 4, 3). 3) Split a (12, 8) matrix into 4 equal row-chunks and stack them along axis=2.
Starter Code
# Practice starter: reshape/transpose/flatten, stack along a new axis,
# and split-then-restack — fill in the TODOs.
import numpy as np

# 1. Reshape, transpose, flatten
a = np.arange(60)
# TODO: r = a.reshape(3, 4, 5)
# TODO: t = r.transpose(2, 1, 0)  # axes order (5, 4, 3)
# TODO: flat = t.flatten()
# print("Original shape:", r.shape)
# print("Transposed:", t.shape)
# print("Flattened:", flat.shape)

# 2. Stack three matrices along new axis
m1 = np.ones((4, 3))
m2 = np.zeros((4, 3))
m3 = np.full((4, 3), 2.0)
# TODO: stacked = np.stack([m1, m2, m3], axis=0)
# print("Stacked shape:", stacked.shape)  # (3, 4, 3)

# 3. Split and re-stack
big = np.arange(96).reshape(12, 8)
# TODO: chunks = np.split(big, 4, axis=0)  # 4 chunks of (3, 8)
# TODO: result = np.stack(chunks, axis=2)  # (3, 8, 4)
# print("Result shape:", result.shape)
✅ Practice Checklist
10. Random Number Generation

NumPy's random module (use default_rng for new code) generates arrays from many distributions β€” key for simulations and augmentation.

Seeding and common distributions
# Demo: the modern Generator API — uniform, normal, integer, and
# without-replacement sampling from one seeded generator.
# NOTE: draws are order-sensitive; keep statement order intact.
import numpy as np

rng = np.random.default_rng(seed=42)   # recommended API

u   = rng.random((3, 3))               # uniform [0,1)
n   = rng.standard_normal((3, 3))      # N(0,1)
i   = rng.integers(0, 100, size=5)     # integers
c   = rng.choice([10,20,30,40], size=3, replace=False)

print("Uniform:\n",  u.round(2))
print("Normal:\n",   n.round(2))
print("Integers:", i)
print("Choice:  ", c)
Statistical distributions
# Demo: sample four common distributions and compare their summary
# statistics side by side.
import numpy as np

rng = np.random.default_rng(0)

normal  = rng.normal(loc=50, scale=10, size=1000)
uniform = rng.uniform(low=0, high=100, size=1000)
binom   = rng.binomial(n=10, p=0.3, size=1000)
poisson = rng.poisson(lam=5, size=1000)

# One summary row per distribution: mean, std, and observed range.
for name, arr in [("normal",normal),("uniform",uniform),
                   ("binom",binom),("poisson",poisson)]:
    print(f"{name:8s}: mean={arr.mean():.2f}  std={arr.std():.2f}  "
          f"range=[{arr.min():.0f},{arr.max():.0f}]")
shuffle, permutation, and bootstrap sampling
# Demo: permutation (copy) vs shuffle (in-place), then a bootstrap
# confidence interval for the mean via resampling with replacement.
import numpy as np

rng = np.random.default_rng(42)

arr = np.arange(10)

# permutation — returns NEW shuffled array
shuffled = rng.permutation(arr)
print("Original:  ", arr)
print("Permuted:  ", shuffled)

# shuffle — in-place
arr2 = arr.copy()
rng.shuffle(arr2)
print("In-place:  ", arr2)

# Bootstrap resampling: each replicate draws len(data) samples WITH
# replacement and records the mean; the 2.5/97.5 percentiles give a 95% CI.
data = np.array([2.3, 4.1, 3.8, 5.0, 2.9, 4.4, 3.2, 4.7])
n_boot = 10000
boot_means = np.array([
    rng.choice(data, size=len(data), replace=True).mean()
    for _ in range(n_boot)
])
print(f"Bootstrap mean: {boot_means.mean():.3f}")
print(f"95% CI: [{np.percentile(boot_means,2.5):.3f}, "
      f"{np.percentile(boot_means,97.5):.3f}]")
New Generator API, seeded reproducibility, and normal vs uniform comparison
# Demo: Generator reproducibility, spawning independent child generators,
# and a normal-vs-uniform comparison at the same sample size.
import numpy as np

# New-style Generator — preferred over np.random.seed / np.random.randn
rng1 = np.random.default_rng(seed=2024)
rng2 = np.random.default_rng(seed=2024)   # same seed → identical output

a = rng1.standard_normal(5)
b = rng2.standard_normal(5)
print("Seeded reproducibility (same seed):", np.allclose(a, b))

# Spawning independent sub-generators (safe for parallel work)
child_rngs = rng1.spawn(3)
print("Spawned", len(child_rngs), "independent generators")
print("Child 0 sample:", child_rngs[0].random(3).round(3))

# Compare normal vs uniform for same N
N = 100_000
rng = np.random.default_rng(0)
norm = rng.normal(loc=0.5, scale=0.15, size=N)   # bell-shaped
unif = rng.uniform(low=0.0, high=1.0,  size=N)   # flat

for name, arr in [("normal", norm), ("uniform", unif)]:
    print(f"{name:8s}: mean={arr.mean():.4f}  std={arr.std():.4f}  "
          f"min={arr.min():.3f}  max={arr.max():.3f}")

# Clamp normal to [0,1] range and compare coverage
norm_clipped = np.clip(norm, 0, 1)
print(f"\nNormal values outside [0,1]: {((norm<0)|(norm>1)).sum()} / {N}")
print(f"Uniform values outside [0,1]: {((unif<0)|(unif>1)).sum()} / {N}")
💼 Real-World: Monte Carlo Stock Price Simulation
A risk analyst simulates 100,000 price paths to estimate portfolio VaR and probability of loss at year-end.
# Demo: Monte Carlo simulation of daily-compounded price paths to
# estimate year-end VaR, CVaR, and probability of loss.
import numpy as np

rng      = np.random.default_rng(99)
n_sims   = 100_000
n_days   = 252
S0       = 100.0
mu_day   = 0.0003       # daily drift
sig_day  = 0.015        # daily volatility

# Geometric Brownian Motion (discrete-time: compound 1 + daily return)
daily_ret = rng.normal(mu_day, sig_day, (n_sims, n_days))
paths     = S0 * np.cumprod(1 + daily_ret, axis=1)
final     = paths[:, -1]

# VaR = 5th percentile of final prices; CVaR = mean of the tail below it
var_95    = np.percentile(final, 5)
cvar_95   = final[final <= var_95].mean()
prob_loss = (final < S0).mean()

print(f"Expected year-end price:  ${final.mean():.2f}")
print(f"95% Value-at-Risk (loss): ${S0 - var_95:.2f}")
print(f"95% CVaR (expected loss): ${S0 - cvar_95:.2f}")
print(f"Probability of loss:      {prob_loss:.1%}")
🏋️ Practice: Monte Carlo Dice Simulation
Simulate 1,000,000 rolls of two 6-sided dice using NumPy (no Python loops). 1) Compute the frequency of each sum (2-12). 2) Compare to theoretical probability. 3) Find the empirical probability of rolling a sum of 7. 4) Simulate the 'Craps' first-roll win condition (sum 7 or 11).
Starter Code
# Practice starter: vectorized two-dice simulation — fill in the TODOs.
import numpy as np
rng = np.random.default_rng(42)
N = 1_000_000

# Roll two dice, compute sums (integers(1, 7) draws from {1..6})
die1 = rng.integers(1, 7, size=N)
die2 = rng.integers(1, 7, size=N)
# TODO: sums = die1 + die2

# 1. Frequency of each outcome (2-12)
print("Sum | Empirical | Theoretical")
for s in range(2, 13):
    # TODO: empirical = (sums == s).mean()
    ways = min(s-1, 13-s)  # number of ways to roll sum s
    theoretical = ways / 36
    # TODO: print(f"  {s:2d} | {empirical:.4f}    | {theoretical:.4f}")
    pass

# 2. Probability of sum == 7
# TODO: p7 = (sums == 7).mean()
# print(f"P(sum=7): empirical={p7:.4f}, theoretical={1/6:.4f}")

# 3. Craps win (7 or 11 on first roll)
# TODO: craps_win = ((sums == 7) | (sums == 11)).mean()
# print(f"Craps first-roll win: {craps_win:.4f} (expected 0.2222)")
✅ Practice Checklist
11. Memory Layout & Strides

Understand how NumPy stores data in memory β€” C vs Fortran order, strides, views vs copies, and how layout affects performance.

C-order vs Fortran-order arrays
import numpy as np

# The same 2x3 data stored row-major (C order) and column-major (Fortran).
data = [[1, 2, 3], [4, 5, 6]]
A = np.array(data, order='C')
B = np.array(data, order='F')

# Strides = bytes between consecutive elements along each axis.
print('C-order strides:', A.strides)   # (bytes per row, bytes per element)
print('F-order strides:', B.strides)   # (bytes per element, bytes per col)

# Flags record which contiguity contract each layout satisfies.
print('C contiguous:', A.flags['C_CONTIGUOUS'])
print('F contiguous:', B.flags['F_CONTIGUOUS'])

# asfortranarray re-lays-out an existing array in column-major order.
A_f = np.asfortranarray(A)
print('After asfortranarray:', A_f.strides)
Views vs copies β€” when does NumPy copy?
# Demo: slicing returns a view (shared memory) while fancy indexing and
# .copy() return independent copies — mutations prove which is which.
# NOTE: the prints depend on mutation order; do not reorder statements.
import numpy as np

arr = np.arange(12).reshape(3, 4)

# Slicing creates a VIEW (no copy)
view = arr[::2, ::2]
print('Is view:', np.shares_memory(arr, view))  # True
view[0, 0] = 999
print('Original changed:', arr[0, 0])  # 999 — shared memory!

# Fancy indexing creates a COPY
copy = arr[[0, 2], :][:, [0, 2]]
print('Is copy:', not np.shares_memory(arr, copy))  # True
copy[0, 0] = -1
print('Original unchanged:', arr[0, 0])  # still 999

# Force a copy explicitly
explicit_copy = arr.copy()
print('Explicit copy shares memory:', np.shares_memory(arr, explicit_copy))
Sliding window view with stride tricks
import numpy as np

# Rolling windows of length 4 over ten values, built without copying data.
arr = np.arange(1, 11)
window_size = 4

swv = np.lib.stride_tricks.sliding_window_view
windows = swv(arr, window_size)
print('Windows shape:', windows.shape)  # (7, 4)
print('Windows:\n', windows)

# Per-window statistics reduce along axis 1 (the window axis).
print('Rolling mean:', windows.mean(axis=1).round(2))
print('Rolling max: ', windows.max(axis=1))
print('Rolling std: ', windows.std(axis=1).round(3))

# The 2-D variant yields every 3x3 patch of a 5x5 image.
img = np.arange(25).reshape(5, 5)
patches = swv(img, (3, 3))
print('Image patches shape:', patches.shape)  # (3, 3, 3, 3)
In-place operations with out parameter
# Demo: the out= parameter writes ufunc results into pre-allocated
# buffers, avoiding temporary arrays.
# NOTE: statement order matters — `a` is mutated in place mid-script.
import numpy as np

a = np.random.rand(1_000_000)
b = np.random.rand(1_000_000)
result = np.empty_like(a)

# Avoid allocating a new array — write directly into result
np.add(a, b, out=result)          # result = a + b (no temp array)
np.sqrt(a, out=a)                 # in-place square root
np.multiply(result, 2.0, out=result)

print('result[:5]:', result[:5].round(4))
print('a[:5] (sqrt in-place):', a[:5].round(4))

# Chain multiple ufunc operations with out
x = np.linspace(0, np.pi, 6)
tmp = np.empty_like(x)
np.sin(x, out=tmp)        # tmp = sin(x)
np.square(tmp, out=tmp)   # tmp = sin(x)^2 — still same memory
print('sin^2:', tmp.round(4))
🏋️ Practice: Stride-Based Feature Matrix
Given a 1D time series, use sliding_window_view to build a feature matrix where each row contains [t-3, t-2, t-1, t] values. Then compute the mean and std of each row as features.
Starter Code
# Practice starter: build a rolling-window feature matrix from a random
# walk using sliding_window_view — fill in the TODOs.
import numpy as np

np.random.seed(0)
ts = np.cumsum(np.random.randn(50))  # random walk

# TODO: create sliding window view with window_size=4
# TODO: compute mean and std of each window row
# TODO: stack into feature matrix (n_samples x 3): [window_mean, window_std, last_value]
# TODO: print shape and first 5 rows
✅ Practice Checklist
12. Structured Arrays

Store heterogeneous data (mixed types) in a single NumPy array using structured dtypes β€” combine ints, floats, and strings like a lightweight in-memory table.

Creating structured arrays with named fields
import numpy as np

# One record type: a 20-char name, 32-bit age, 64-bit score, 2-char grade.
fields = [
    ('name',  'U20'),
    ('age',   'i4'),
    ('score', 'f8'),
    ('grade', 'U2'),
]
dtype = np.dtype(fields)

# Each tuple supplies one record in field-declaration order.
rows = [
    ('Alice', 25, 92.5, 'A'),
    ('Bob',   30, 78.3, 'B'),
    ('Carol', 22, 95.1, 'A'),
    ('Dave',  28, 65.0, 'C'),
]
data = np.array(rows, dtype=dtype)

# Indexing by field name returns a column; by position, a full record.
print('Array dtype:', data.dtype)
print('Names:', data['name'])
print('Scores:', data['score'])
print('Alice row:', data[0])
Filtering and sorting structured arrays
# Demo: filtering, sorting, and boolean-masking a structured array as a
# lightweight in-memory employee table.
import numpy as np

dtype = np.dtype([('name','U20'),('dept','U10'),('salary','f8'),('years','i4')])
employees = np.array([
    ('Alice',  'Eng',   95000, 5),
    ('Bob',    'Sales', 72000, 3),
    ('Carol',  'Eng',   102000, 8),
    ('Dave',   'HR',    68000, 2),
    ('Eve',    'Eng',   88000, 4),
    ('Frank',  'Sales', 76000, 6),
], dtype=dtype)

# Filter: engineers only
eng = employees[employees['dept'] == 'Eng']
print('Engineers:', eng['name'])

# Sort by salary descending (sort ascending on the field, then reverse)
sorted_emp = np.sort(employees, order='salary')[::-1]
print('By salary:', sorted_emp[['name', 'salary']])

# Boolean mask: high earners with 4+ years
high = employees[(employees['salary'] > 80000) & (employees['years'] >= 4)]
print('High earners:', high['name'])
numpy.recarray for attribute-style access
# Demo: np.rec.array exposes structured fields as attributes
# (rec.name) instead of string keys (rec['name']).
import numpy as np

# recarray lets you access fields as attributes (rec.name instead of rec['name'])
students = np.rec.array([
    ('Alice', 3.9, 'CS'),
    ('Bob',   3.5, 'Math'),
    ('Carol', 3.7, 'CS'),
    ('Dave',  3.2, 'Physics'),
], dtype=[('name','U20'),('gpa','f8'),('major','U10')])

print('Names (attr):', students.name)
print('GPAs  (attr):', students.gpa)
print('CS students:', students.name[students.major == 'CS'])
print('Top student:', students.name[students.gpa.argmax()])

# recarray supports all standard numpy operations
print('Avg GPA:', students.gpa.mean().round(3))
Converting structured arrays to/from pandas
# Demo: round-tripping between NumPy structured arrays and pandas
# DataFrames (DataFrame(structured) and DataFrame.to_records).
import numpy as np
import pandas as pd

# '?' is the boolean dtype code
dtype = np.dtype([('product','U30'),('price','f8'),('qty','i4'),('in_stock','?')])
products = np.array([
    ('Laptop',  999.99, 15, True),
    ('Mouse',    29.99, 80, True),
    ('Monitor', 399.99,  5, False),
    ('Keyboard', 79.99, 30, True),
], dtype=dtype)

# Structured array -> DataFrame
df = pd.DataFrame(products)
print('DataFrame:')
print(df)

# DataFrame -> structured array
df2 = pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0], 'label': ['a', 'b']})
rec = df2.to_records(index=False)
print('\nRecord array:', rec)
print('x field:', rec['x'])
🏋️ Practice: Student Records Sorter
Create a structured array with fields: name (str), grade (int), gpa (float) for 6 students. Sort by gpa descending and print the top 3. Then compute average gpa per grade level.
Starter Code
# Practice starter: sort a structured array by GPA and aggregate per
# grade level — fill in the TODOs.
import numpy as np

dtype = np.dtype([('name','U20'),('grade','i4'),('gpa','f8')])
students = np.array([
    ('Alice', 11, 3.9), ('Bob', 12, 3.4), ('Carol', 11, 3.7),
    ('Dave', 12, 3.1), ('Eve', 10, 3.8), ('Frank', 10, 3.5),
], dtype=dtype)

# TODO: sort by gpa descending, print top 3
# TODO: for each unique grade, compute and print average gpa
✅ Practice Checklist
13. Linear Algebra & Polynomial Fitting

NumPy's linear algebra routines and polynomial tools β€” solve systems of equations, compute eigendecompositions, SVD, and fit curves to data.

Solving linear systems with np.linalg.solve
import numpy as np

# The system  3x + y = 9,  x + 2y = 8  written as Ax = b.
A = np.array([[3.0, 1.0],
              [1.0, 2.0]])
b = np.array([9.0, 8.0])

# linalg.solve factorizes A instead of forming its inverse explicitly.
x = np.linalg.solve(A, b)
print('Solution x:', x)               # [2. 3.]
print('Verify Ax == b:', np.allclose(A @ x, b))

# The explicit inverse gives the same answer but is less stable.
A_inv = np.linalg.inv(A)
print('Via inverse:', A_inv @ b)

# A small condition number means the system is numerically well-behaved.
print('Condition number:', np.linalg.cond(A))

# Appending a consistent third equation (2x + 3y = 13) makes the system
# overdetermined; lstsq minimizes the residual norm.
A_over = np.vstack([A, [2, 3]])
b_over = np.append(b, 13)
x_ls, *_ = np.linalg.lstsq(A_over, b_over, rcond=None)
print('Least-squares solution:', x_ls.round(4))
Eigenvalues and eigenvectors
# Demo: eigen-decomposition with np.linalg.eig, verification of
# A v = lambda v, and eigh for the symmetric case.
import numpy as np

A = np.array([[4, 2], [1, 3]], dtype=float)
vals, vecs = np.linalg.eig(A)

print('Eigenvalues: ', vals)    # [5. 2.]
print('Eigenvectors:\n', vecs)  # columns are eigenvectors

# Verify: A @ v = lambda * v (iterate vecs.T so each row is one column vector)
for lam, v in zip(vals, vecs.T):
    print(f'lambda={lam:.1f}: A@v={A@v.round(4)}, lam*v={lam*v.round(4)}, match={np.allclose(A@v, lam*v)}')

# Symmetric matrix has real eigenvalues — use eigh for efficiency
S = np.array([[6, 2, 1], [2, 3, 1], [1, 1, 1]], dtype=float)
vals_s, vecs_s = np.linalg.eigh(S)  # eigh for symmetric/Hermitian
print('Symmetric eigenvalues (sorted):', vals_s.round(3))
SVD decomposition and low-rank approximation
import numpy as np

# SVD factorization: M = U @ diag(s) @ Vt, with singular values s sorted
# descending. full_matrices=False returns the economy-size factors.
M = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10,11,12]], dtype=float)
U, s, Vt = np.linalg.svd(M, full_matrices=False)

print('U shape:', U.shape, '| s:', s.round(3), '| Vt shape:', Vt.shape)
# FIX: np.allclose returns a boolean, so label this as a reconstruction
# check rather than an "error" value.
print('Reconstructs exactly:', np.allclose(M, U @ np.diag(s) @ Vt))

# Low-rank approximation: keep only the top-k singular triplets.
k = 1
M_approx = U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]
print(f'Rank-{k} approx:\n', M_approx.round(2))
# Frobenius error of the rank-k truncation (optimal by Eckart-Young).
print(f'Frobenius error: {np.linalg.norm(M - M_approx):.4f}')
# Share of total squared singular-value mass captured by the top k.
print(f'Variance explained: {(s[:k]**2).sum()/(s**2).sum():.1%}')
Polynomial fitting with polyfit and polyval
# Demo: fit polynomials to noisy quadratic data with np.polyfit,
# evaluate with np.polyval, and compare training RMSE across degrees.
# NOTE: RNG draws are order-sensitive; keep statement order intact.
import numpy as np

# Generate noisy quadratic data
np.random.seed(42)
x = np.linspace(-3, 3, 50)
y_true = 2*x**2 - 3*x + 1
y_noisy = y_true + np.random.randn(50) * 2

# Fit polynomial of degree 2 (coefficients returned highest power first)
coeffs = np.polyfit(x, y_noisy, deg=2)
print('Fitted coefficients:', coeffs.round(4))  # should be ~[2, -3, 1]

# Evaluate the fitted polynomial
y_fit = np.polyval(coeffs, x)
rmse = np.sqrt(np.mean((y_fit - y_true)**2))
print(f'RMSE vs true: {rmse:.4f}')

# Compare degrees (training RMSE only decreases with degree — overfitting risk)
for deg in [1, 2, 3, 5]:
    c = np.polyfit(x, y_noisy, deg)
    y_hat = np.polyval(c, x)
    print(f'Degree {deg}: train RMSE = {np.sqrt(np.mean((y_hat - y_noisy)**2)):.4f}')
🏋️ Practice: Curve Fitting & Residuals
Generate 60 points of noisy sine data (y = sin(x) + noise). Fit polynomial degrees 1, 3, 5, and 7. For each, compute and print the RMSE of the residuals. Identify the degree with the best bias-variance tradeoff.
Starter Code
# Practice starter: fit polynomials of several degrees to noisy sine
# data and compare RMSE — fill in the TODOs.
import numpy as np

np.random.seed(42)
x = np.linspace(0, 2*np.pi, 60)
y = np.sin(x) + np.random.randn(60) * 0.2

# TODO: for degrees [1, 3, 5, 7]:
#   - fit np.polyfit
#   - compute y_hat with np.polyval
#   - compute RMSE
#   - print results
# TODO: print which degree has lowest RMSE
✅ Practice Checklist
14. Advanced Indexing & Fancy Indexing

Use integer arrays, boolean masks, np.where, np.take, and advanced multi-dimensional indexing to select and modify array elements efficiently.

Integer array indexing
import numpy as np

# Demo: integer-array (fancy) indexing — whole rows, paired (row, col)
# element picks, outer indexing with np.ix_, and fancy assignment.
np.random.seed(42)
arr = np.random.randint(0, 100, (5, 6))
print('Array:\n', arr)

# A list of row indices selects whole rows, in the given order.
row_idx = [0, 2, 4]
print('\nRows 0, 2, 4:\n', arr[row_idx])

# Paired index arrays select one element per (row, col) pair:
# (0,5), (1,4), (2,3), (3,2) — a walk along the anti-diagonal.
row_idx = [0, 1, 2, 3]
col_idx = [5, 4, 3, 2]
# FIX: these are anti-diagonal picks, not the main diagonal.
print('\nAnti-diagonal elements (row, col pairs):', arr[row_idx, col_idx])

# Outer indexing: np.ix_ forms the cross product of rows x cols.
rows = np.array([0, 2])
cols = np.array([1, 3, 5])
print('\nOuter indexing (2 rows x 3 cols):\n', arr[np.ix_(rows, cols)])

# Fancy indexing on the left-hand side assigns to every selected row.
arr2 = arr.copy()
arr2[[1, 3], :] = -1
# FIX: the rows are set to -1, not zeroed — label the output accordingly.
print('\nAfter setting rows 1, 3 to -1:\n', arr2)
Boolean masking & np.where
# Demo: boolean masks, np.where for conditional values, nested np.where
# for multi-way labels, and np.where for index retrieval.
import numpy as np

np.random.seed(0)
data = np.random.randn(20)
print('Data:', data.round(2))

# Boolean mask
mask = data > 0.5
print(f'\nValues > 0.5: {data[mask].round(2)}')
print(f'Count: {mask.sum()}')

# np.where: conditional selection
clipped = np.where(data > 0, data, 0)  # ReLU-style
print('\nReLU output:', clipped.round(2))

# Nested np.where: multi-condition (evaluated outside-in)
labels = np.where(data > 1, 'high', np.where(data < -1, 'low', 'mid'))
print('\nLabels:', labels)
for cat in ['low', 'mid', 'high']:
    print(f'  {cat}: {(labels == cat).sum()}')

# np.where with a single argument returns index arrays (one per axis)
idx_high = np.where(data > 1)[0]
print('\nIndices of high values:', idx_high)
np.take, np.put, and argsort tricks
# Demo: argsort rankings, np.take, O(n) top-k via np.partition, and
# binary search with np.searchsorted.
import numpy as np

np.random.seed(42)
scores = np.array([82, 45, 91, 67, 88, 55, 73, 96, 61, 79])
names  = np.array(['Alice','Bob','Carol','Dave','Eve','Frank','Grace','Hank','Iris','Jake'])

# argsort: get ranking indices
ranking = np.argsort(scores)[::-1]  # descending
print('Top 5 students:')
for rank, idx in enumerate(ranking[:5], 1):
    print(f'  {rank}. {names[idx]}: {scores[idx]}')

# np.take: equivalent to fancy indexing but works on flattened axis
top3_scores = np.take(scores, ranking[:3])
print('\nTop 3 scores:', top3_scores)

# np.partition: O(n) top-k (faster than full sort)
top_k = 3
partitioned = np.partition(scores, -top_k)[-top_k:]
print(f'Top {top_k} (unordered):', partitioned)

# np.searchsorted: binary search on sorted array
sorted_scores = np.sort(scores)
threshold = 80
insert_pos = np.searchsorted(sorted_scores, threshold)
print(f'\nScores >= {threshold}: {sorted_scores[insert_pos:]}')
Multi-dimensional advanced indexing
# Demo: advanced indexing on a 3-D batch — per-image element picks,
# slice assignment across the batch, and np.indices coordinate grids.
import numpy as np

np.random.seed(42)
# 3D array: (batch, height, width)
img_batch = np.random.randint(0, 256, (4, 8, 8), dtype=np.uint8)
print('Batch shape:', img_batch.shape)

# Select specific pixel from each image in batch
rows = [1, 2, 3, 4]  # row per image
cols = [1, 2, 3, 4]  # col per image
batch_idx = [0, 1, 2, 3]
selected_pixels = img_batch[batch_idx, rows, cols]
print('One pixel per image:', selected_pixels)

# Vectorized update: zero out a 3x3 patch in all images
img_batch[:, 2:5, 2:5] = 0
print('After zeroing patch:', img_batch[0, 1:6, 1:6])

# Use np.indices for meshgrid-style indexing
H, W = 4, 4
ri, ci = np.indices((H, W))
dist_from_center = np.sqrt((ri - H//2)**2 + (ci - W//2)**2)
print('\nDistance from center:\n', dist_from_center.round(2))
🏋️ Practice: Student Score Analyzer
Given a 2D array of exam scores (30 students x 5 subjects), use advanced indexing to: (1) find each student's best and worst subject, (2) select the top 5 students by total score using np.argpartition, (3) assign letter grades (A>=90, B>=80, C>=70, D>=60, F<60) using np.where, (4) create a pass/fail mask where students need avg >= 70 in at least 3 subjects.
Starter Code
# Practice starter: advanced-indexing exercises on a 30x5 score matrix
# — fill in the TODOs.
import numpy as np

np.random.seed(42)
scores = np.random.randint(40, 100, (30, 5))
subjects = ['Math', 'Science', 'English', 'History', 'Art']
print('Score matrix shape:', scores.shape)

# TODO: (1) Find best and worst subject per student (argmax/argmin)
# TODO: (2) Top 5 students by total score (np.argpartition)
# TODO: (3) Assign letter grades A/B/C/D/F using np.where
# TODO: (4) Pass/fail mask (avg >= 70 in at least 3 subjects)
✅ Practice Checklist
15. NumPy Performance & Vectorization

Profile NumPy code, replace Python loops with vectorized operations, use einsum, out= parameters, and Numba JIT compilation for maximum speed.

Loop vs vectorized benchmark
# Benchmark: a Python-loop dot product vs np.dot. The loop is timed on a
# smaller array because it would be far too slow at the full size.
import numpy as np
import time

np.random.seed(42)
n = 1_000_000
x = np.random.randn(n)
y = np.random.randn(n)

# Python loop (slow)
def dot_loop(a, b):
    result = 0.0
    for i in range(len(a)):
        result += a[i] * b[i]
    return result

# NumPy vectorized (fast)
def dot_numpy(a, b):
    return np.dot(a, b)

# Benchmark (small n for loop)
small_n = 10_000
x_s, y_s = x[:small_n], y[:small_n]

# Average over repeated runs to smooth out timer noise.
t0 = time.perf_counter()
for _ in range(5): dot_loop(x_s, y_s)
loop_time = (time.perf_counter() - t0) / 5

t0 = time.perf_counter()
for _ in range(100): dot_numpy(x, y)
np_time = (time.perf_counter() - t0) / 100

print(f'Loop (n={small_n:,}):   {loop_time*1000:.2f}ms')
print(f'NumPy (n={n:,}): {np_time*1000:.2f}ms')
print(f'NumPy processes {n/small_n:.0f}x more data in {loop_time/np_time:.0f}x less time')
print(f'Speedup: ~{loop_time*n/(np_time*small_n):.0f}x')
np.einsum for tensor contractions
# Demo: np.einsum subscript notation — matmul, batch matmul, outer
# product, and trace — checked against the equivalent explicit forms.
import numpy as np
import time

np.random.seed(0)
A = np.random.randn(50, 30)
B = np.random.randn(30, 40)
C = np.random.randn(50, 40)

# Matrix multiplication
result1 = np.einsum('ij,jk->ik', A, B)         # same as A @ B
# NOTE(review): A already has 30 columns, so the [:, :30] slice is a no-op.
result2 = np.einsum('ij,ij->', A, A[:,:30])    # element-wise sum of squares (trace of A.T@A)

# Batch matrix multiply: (batch, M, K) x (batch, K, N) -> (batch, M, N)
batch = 16
X = np.random.randn(batch, 10, 8)
W = np.random.randn(batch, 8, 6)
bmm_einsum = np.einsum('bij,bjk->bik', X, W)
bmm_loop   = np.array([x @ w for x, w in zip(X, W)])

print('einsum batch matmul shape:', bmm_einsum.shape)
print('Matches loop result:', np.allclose(bmm_einsum, bmm_loop))

# Outer product
v1, v2 = np.array([1,2,3]), np.array([4,5,6])
outer = np.einsum('i,j->ij', v1, v2)
print('Outer product:\n', outer)

# Trace ('ii' with no output subscript sums the diagonal)
M = np.random.randn(4, 4)
print(f'Trace: einsum={np.einsum("ii", M):.4f}, np.trace={np.trace(M):.4f}')
Memory-efficient operations with out=
# Benchmark: the same ufunc pipeline with and without out= buffers,
# showing the cost of allocating temporaries on every iteration.
import numpy as np
import time

np.random.seed(42)
n = 5_000_000
x = np.random.rand(n).astype(np.float32)
y = np.random.rand(n).astype(np.float32)

# Without out= (allocates new array each time)
def compute_no_out(x, y, n_iters=20):
    result = np.empty_like(x)
    for _ in range(n_iters):
        result = np.sin(x) + np.cos(y)  # creates temporary arrays
    return result

# With out= (reuses pre-allocated buffers)
def compute_with_out(x, y, n_iters=20):
    buf1   = np.empty_like(x)
    buf2   = np.empty_like(x)
    result = np.empty_like(x)
    for _ in range(n_iters):
        np.sin(x, out=buf1)         # no allocation
        np.cos(y, out=buf2)
        np.add(buf1, buf2, out=result)
    return result

t0 = time.perf_counter()
r1 = compute_no_out(x, y)
t1 = time.perf_counter() - t0

t0 = time.perf_counter()
r2 = compute_with_out(x, y)
t2 = time.perf_counter() - t0

print(f'Without out=: {t1:.3f}s')
print(f'With    out=: {t2:.3f}s')
print(f'Speedup: {t1/t2:.2f}x')
print(f'Results match: {np.allclose(r1, r2)}')
Numba JIT acceleration
try:
    import numba
    from numba import njit
    import numpy as np
    import time

    @njit
    def pairwise_dist_numba(X):
        # Explicit O(n^2 * d) loops — @njit compiles them to machine code.
        # Only the upper triangle is computed; symmetry fills the rest.
        n = X.shape[0]
        D = np.empty((n, n), dtype=np.float64)
        for i in range(n):
            for j in range(i, n):
                d = 0.0
                for k in range(X.shape[1]):
                    diff = X[i, k] - X[j, k]
                    d += diff * diff
                D[i, j] = D[j, i] = d**0.5
        return D

    np.random.seed(42)
    X = np.random.randn(500, 10)

    # Warm-up call so JIT compilation cost is excluded from the timing.
    pairwise_dist_numba(X[:5])

    t0 = time.perf_counter()
    D_numba = pairwise_dist_numba(X)
    t_numba = time.perf_counter() - t0

    # NumPy equivalent via broadcasting.
    # FIX: previously this imported sklearn.metrics.pairwise_distances —
    # an undeclared extra dependency whose ImportError was misreported
    # below as a missing numba install.
    t0 = time.perf_counter()
    diff = X[:, None, :] - X[None, :, :]
    D_np = np.sqrt((diff * diff).sum(axis=2))
    t_np = time.perf_counter() - t0

    print(f'Numba: {t_numba*1000:.1f}ms')
    print(f'NumPy broadcast: {t_np*1000:.1f}ms')
    print(f'Match: {np.allclose(D_numba, D_np)}')
except ImportError:
    print('pip install numba')
    print('Numba @njit: compiles Python+NumPy loops to LLVM machine code.')
    print('Typical speedup: 10-100x for loop-heavy numeric code.')
🏋️ Practice: Vectorize a Distance Computation
Given a matrix X of shape (1000, 5), compute the pairwise Euclidean distance matrix D[i,j] = ||x_i - x_j||. Implement three versions: (1) pure Python nested loop, (2) NumPy broadcasting (X[:,None,:] - X[None,:,:]), (3) using np.einsum or scipy.spatial.distance. Benchmark all three and report speedups.
Starter Code
# Practice starter: implement and benchmark three pairwise-distance
# strategies (loop, broadcasting, einsum/scipy) — fill in the TODOs.
import numpy as np
import time

np.random.seed(42)
X = np.random.randn(1000, 5)

# TODO: (1) Python nested loop (test on X[:50] for speed)
# TODO: (2) NumPy broadcasting: (n,1,d) - (1,n,d), then norm
# TODO: (3) scipy or einsum approach
# TODO: Benchmark all three and print speedups
✅ Practice Checklist
16. Signal Processing with NumPy

Apply Fourier transforms, filtering, convolution, and spectral analysis to 1D and 2D signals using NumPy's FFT routines.

FFT and frequency spectrum
# Demo: real FFT of a composite noisy signal, dominant-frequency
# detection from the power spectrum, and a saved two-panel plot.
# NOTE: the noise term is unseeded, so exact printed values vary per run.
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# Composite signal: 5Hz + 20Hz + noise
fs = 500      # sampling rate
t  = np.linspace(0, 1, fs, endpoint=False)
signal = (np.sin(2*np.pi*5*t) +
          0.5*np.sin(2*np.pi*20*t) +
          np.random.randn(fs)*0.3)

# rfft/rfftfreq: real-input FFT returns only non-negative frequencies
fft_vals = np.fft.rfft(signal)
freqs    = np.fft.rfftfreq(fs, d=1/fs)
power    = np.abs(fft_vals)**2

# Find dominant frequencies
top_freq_idx = np.argsort(power)[::-1][:3]
print('Top frequencies (Hz):', freqs[top_freq_idx].round(1))
print('Corresponding power:', power[top_freq_idx].round(1))

fig, axes = plt.subplots(2, 1, figsize=(10, 6))
axes[0].plot(t[:100], signal[:100])
axes[0].set_title('Signal (first 100 samples)'); axes[0].set_xlabel('Time (s)')
axes[1].plot(freqs[:50], power[:50])
axes[1].set_title('Power Spectrum'); axes[1].set_xlabel('Frequency (Hz)')
plt.tight_layout(); plt.savefig('fft.png', dpi=80); plt.close(); print('Saved fft.png')
Low-pass filter with FFT
# Demo: a brick-wall low-pass filter implemented by zeroing FFT bins
# above a cutoff, with before/after SNR comparison.
# NOTE: the noise term is unseeded, so exact printed values vary per run.
import numpy as np

# Create noisy signal
fs = 1000
t  = np.linspace(0, 1, fs)
clean  = np.sin(2*np.pi*10*t)           # 10 Hz signal
noisy  = clean + 0.5*np.random.randn(fs) + 0.3*np.sin(2*np.pi*200*t)

# FFT-based low-pass filter
def lowpass_fft(signal, fs, cutoff_hz):
    fft_sig  = np.fft.rfft(signal)
    freqs    = np.fft.rfftfreq(len(signal), d=1/fs)
    fft_sig[freqs > cutoff_hz] = 0  # zero out high frequencies
    return np.fft.irfft(fft_sig)

filtered = lowpass_fft(noisy, fs, cutoff_hz=30)

# SNR in dB: signal variance over residual-noise variance
# ([:len(clean)] guards against irfft returning one sample fewer for odd input)
snr_noisy    = 10*np.log10(np.var(clean) / np.var(noisy - clean))
snr_filtered = 10*np.log10(np.var(clean) / np.var(filtered[:len(clean)] - clean))
print(f'SNR (noisy):    {snr_noisy:.2f} dB')
print(f'SNR (filtered): {snr_filtered:.2f} dB')
print(f'Improvement:    {snr_filtered - snr_noisy:.2f} dB')
print(f'Correlation (filtered vs clean): {np.corrcoef(clean, filtered[:len(clean)])[0,1]:.4f}')
1D convolution and moving average
import numpy as np

# Reproducible demo data: a slow sine buried in Gaussian noise.
np.random.seed(42)
n = 200
t = np.arange(n)
signal = np.sin(0.1*t) + np.random.randn(n)*0.5

def moving_average(x, w):
    """Smooth `x` with a length-`w` box filter (uniform moving average)."""
    box = np.full(w, 1.0 / w)
    return np.convolve(x, box, mode='same')

def gaussian_kernel(size, sigma):
    """Return a normalized 1D Gaussian with `size` taps (weights sum to 1)."""
    half = size // 2
    offsets = np.arange(-half, half + 1)
    weights = np.exp(-offsets**2 / (2 * sigma**2))
    return weights / weights.sum()

ma5  = moving_average(signal, 5)
ma20 = moving_average(signal, 20)
gk   = gaussian_kernel(21, sigma=3)
gauss_smooth = np.convolve(signal, gk, mode='same')

# Central finite difference via convolution: f'(t) ~ (f(t+1) - f(t-1)) / 2
diff_kernel = np.array([1, 0, -1]) / 2
derivative  = np.convolve(signal, diff_kernel, mode='same')

print(f'Signal length: {len(signal)}')
print(f'MA-5 smoothed std:    {ma5.std():.3f} (original: {signal.std():.3f})')
print(f'MA-20 smoothed std:   {ma20.std():.3f}')
print(f'Gaussian smooth std:  {gauss_smooth.std():.3f}')
print(f'Derivative range: [{derivative.min():.3f}, {derivative.max():.3f}]')
2D FFT for image frequency analysis
# Demo: 2D FFT of a synthetic patterned image, plus a circular low-pass
# filter applied in the (shifted) frequency domain.
import numpy as np
import matplotlib
matplotlib.use('Agg')  # headless backend: render straight to a file
import matplotlib.pyplot as plt

# Create a synthetic 2D image with patterns
np.random.seed(42)
H, W = 64, 64
x, y = np.meshgrid(np.arange(W), np.arange(H))

# Low-freq + high-freq patterns + noise
image = (np.sin(2*np.pi*2*x/W) + np.sin(2*np.pi*5*y/H) +
         0.3*np.random.randn(H, W))

# 2D FFT
fft2d   = np.fft.fft2(image)
fft_mag = np.abs(np.fft.fftshift(fft2d))  # shift zero-freq to center
log_mag = np.log1p(fft_mag)                # log scale for visualization

# Low-pass filter: zero out high-freq components
# NOTE(review): `center` is computed but never used below (the H//2, W//2
# offsets are written inline) — could be removed or used in the mask.
center  = np.array([H//2, W//2])
Y, X    = np.ogrid[:H, :W]
dist_from_center = np.sqrt((Y - H//2)**2 + (X - W//2)**2)
mask    = dist_from_center <= 10   # keep only frequencies within radius 10
fft_shift = np.fft.fftshift(fft2d)
fft_shift *= mask
# ifftshift undoes the centering before inverting; real() drops tiny
# imaginary residue from floating-point round-off
filtered = np.real(np.fft.ifft2(np.fft.ifftshift(fft_shift)))

print(f'Original image PSNR vs filtered: {10*np.log10(image.max()**2 / ((image-filtered)**2).mean()):.1f} dB')
print(f'Dominant frequency components in top 5% power: {(log_mag > np.percentile(log_mag, 95)).sum()}')
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
axes[0].imshow(image, cmap='gray'); axes[0].set_title('Original')
axes[1].imshow(log_mag, cmap='hot'); axes[1].set_title('2D Spectrum (log)')
axes[2].imshow(filtered, cmap='gray'); axes[2].set_title('Low-pass Filtered')
plt.tight_layout(); plt.savefig('fft2d.png', dpi=80); plt.close(); print('Saved fft2d.png')
🏋️ Practice: Audio Denoising Pipeline
Simulate an audio signal: 440Hz tone + 880Hz harmonic + white noise. (1) Compute FFT and plot the power spectrum. (2) Design a bandpass FFT filter that keeps only 400-950Hz. (3) Reconstruct the signal with ifft. (4) Compute SNR before and after filtering. (5) Also try a 21-point Gaussian smoothing kernel via np.convolve as an alternative.
Starter Code
# Practice starter: audio denoising pipeline.
# The TODOs below are the exercise and are intentionally unimplemented.
import numpy as np

np.random.seed(42)
fs = 8000
t  = np.linspace(0, 1, fs)   # 1 second of samples at 8 kHz

# 440 Hz fundamental + 880 Hz harmonic, then heavy white noise on top
clean = np.sin(2*np.pi*440*t) + 0.5*np.sin(2*np.pi*880*t)
noisy = clean + np.random.randn(fs) * 0.8

# TODO: (1) FFT + power spectrum, find top frequency peaks
# TODO: (2) Bandpass FFT filter (keep 400-950Hz)
# TODO: (3) Reconstruct with ifft
# TODO: (4) SNR before and after
# TODO: (5) Compare with 21-pt Gaussian convolution
✅ Practice Checklist
17. Sorting, Searching & Partitioning

NumPy's sort, argsort, searchsorted, and partition provide O(n log n) and O(n) ordering operations on whole arrays without explicit Python loops.

sort and argsort
import numpy as np

arr = np.array([3, 1, 4, 1, 5, 9, 2, 6])

# sort: returns a sorted copy (the input array is left unchanged)
sorted_arr = np.sort(arr)
print("sorted:", sorted_arr)

# argsort: indices that would sort the array
idx = np.argsort(arr)
print("argsort:", idx)
print("verify:", arr[idx])   # same as sorted_arr

# Sort along an axis (2D): each row/column is sorted independently
m = np.array([[3, 1, 4], [1, 5, 9], [2, 6, 5]])
print("row-wise sort:\n", np.sort(m, axis=1))
print("col-wise sort:\n", np.sort(m, axis=0))

# Stable sort for multi-key sorting: 'key' is primary, ties broken by 'val'
records = np.array([(2, 'b'), (1, 'a'), (2, 'a'), (1, 'b')],
                   dtype=[('key', int), ('val', 'U1')])
records.sort(order=['key', 'val'])  # ndarray.sort is in-place, unlike np.sort
print("multi-key sort:", records)
argmin, argmax, searchsorted
import numpy as np

data = np.array([10, 5, 8, 3, 15, 7, 2, 12])

# argmin/argmax return the *position* of the extremum; min/max the value
print("min:", data.min(), "at index:", data.argmin())
print("max:", data.max(), "at index:", data.argmax())

# On a 2D array, axis selects the direction of the reduction
m = np.array([[3, 1, 4], [1, 5, 9], [2, 6, 5]])
print("argmin per row:", m.argmin(axis=1))   # [1, 0, 0]
print("argmax per col:", m.argmax(axis=0))   # [0, 2, 1]

# searchsorted: binary search in a sorted array (O(log n) per query)
sorted_data = np.array([1, 3, 5, 7, 9, 11, 13])
vals = np.array([0, 3, 6, 14])

# side= controls where ties land: 'left' before equal elements, 'right' after
left_idx  = np.searchsorted(sorted_data, vals, side='left')
right_idx = np.searchsorted(sorted_data, vals, side='right')
print("left insert positions:", left_idx)
print("right insert positions:", right_idx)

def is_in_sorted(arr, vals):
    """Vectorized membership test against an already-sorted 1D array."""
    pos = np.searchsorted(arr, vals)
    in_bounds = pos < len(arr)
    safe_pos = np.minimum(pos, len(arr) - 1)  # clamp so the index is always legal
    return in_bounds & (arr[safe_pos] == vals)

print("vals in array:", is_in_sorted(sorted_data, vals))
partition and argpartition (O(n) top-k)
import numpy as np

np.random.seed(42)
scores = np.random.randint(0, 100, size=20)
print("Scores:", scores)

# partition: rearrange so k-th element is in sorted position
# Elements left of k are <= arr[k], right are >= arr[k]
# (only a partial order — O(n) instead of a full O(n log n) sort)
k = 5
partitioned = np.partition(scores, k)
print(f"After partition(k={k}): {partitioned}")
print(f"Element at k={k}: {partitioned[k]} (this is the {k+1}th smallest)")

# Get top-5 scores (not sorted — just the 5 largest)
# Negative kth partitions around the 5th-from-the-end position
top5_idx  = np.argpartition(scores, -5)[-5:]
top5_vals = scores[top5_idx]
print(f"Top-5 values (unsorted): {top5_vals}")
print(f"Top-5 values (sorted):   {np.sort(top5_vals)[::-1]}")

# Bottom-5 (smallest)
bot5_idx  = np.argpartition(scores, 5)[:5]
bot5_vals = scores[bot5_idx]
print(f"Bottom-5 values: {np.sort(bot5_vals)}")

# Speed comparison concept: partition is O(n), sort is O(n log n)
# For finding top-k from 1M elements, argpartition is much faster
💼 Real-World: Ranking Sales Reps
A sales dashboard needs to rank 10,000 reps by revenue but only display the top 100, using argpartition for O(n) efficiency instead of a full sort.
import numpy as np

# Synthetic revenue book: lognormal gives a realistic long right tail.
np.random.seed(42)
n_reps = 10_000
rep_ids = np.arange(n_reps)
revenues = np.random.lognormal(mean=10, sigma=1.5, size=n_reps)

K = 100  # top 100 reps

# Two-stage ranking: argpartition isolates the K largest in O(n),
# then only those K candidates pay for an O(K log K) sort.
candidate_idx = np.argpartition(revenues, -K)[-K:]
order_desc = np.argsort(revenues[candidate_idx])[::-1]
top_k_sorted = candidate_idx[order_desc]

print("Top 10 reps:")
for place, rep in enumerate(top_k_sorted[:10], 1):
    print(f"  Rank {place:3d}: Rep #{rep_ids[rep]:5d} | Revenue: ${revenues[rep]:>12,.0f}")

# Percentile rank = fraction of the sorted population below each sample value
sorted_rev = np.sort(revenues)
sample_rev = np.array([50_000, 100_000, 500_000])
pct_ranks  = np.searchsorted(sorted_rev, sample_rev) / n_reps * 100
for rev, pct in zip(sample_rev, pct_ranks):
    print(f"  ${rev:>8,} is at the {pct:.1f}th percentile")
🏋️ Practice: Top-K Selector
Write a function top_k_products(sales, k) that takes a 2D array (rows=products, cols=days) and returns the indices of the k products with the highest total sales. Use argpartition for efficiency. Also write bottom_k_days(sales, k) that returns the k days (columns) with lowest average sales across all products.
Starter Code
import numpy as np

def top_k_products(sales, k):
    """Indices of the k rows (products) with the largest row totals, best first."""
    totals = sales.sum(axis=1)
    # argpartition isolates the k largest in O(n); only those k get sorted
    best = np.argpartition(totals, -k)[-k:]
    return best[np.argsort(totals[best])[::-1]]

def bottom_k_days(sales, k):
    """Indices of the k columns (days) with the smallest column means, worst first."""
    daily_avg = sales.mean(axis=0)
    worst = np.argpartition(daily_avg, k)[:k]
    return worst[np.argsort(daily_avg[worst])]

np.random.seed(42)
sales = np.random.poisson(100, (50, 30))   # 50 products, 30 days
top3  = top_k_products(sales, 3)
bot3  = bottom_k_days(sales, 3)
print("Top 3 products (by total):", top3, "->", sales[top3].sum(axis=1))
print("Worst 3 days (by avg):   ", bot3, "->", sales[:, bot3].mean(axis=0).round(1))
✅ Practice Checklist
18. Set Operations on Arrays

NumPy provides unique(), union1d(), intersect1d(), setdiff1d(), and in1d()/isin() for fast set-like operations on 1D arrays using sorted-array algorithms.

unique and value counts
import numpy as np

arr = np.array([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5])

# unique: sorted unique values
u = np.unique(arr)
print("unique:", u)

# return_counts: how many times each value appears
vals, counts = np.unique(arr, return_counts=True)
print("value counts:")
for v, c in zip(vals, counts):
    print(f"  {v}: {c}")

# return_index: first occurrence index (positions refer to the original array)
vals, idx = np.unique(arr, return_index=True)
print("first occurrence indices:", idx)

# return_inverse: for each original element, its position in the unique array
vals, inv = np.unique(arr, return_inverse=True)
print("unique:", vals)
print("inverse:", inv)
print("reconstructed:", vals[inv])  # should match arr

# Most common value (mode-like): argmax over the counts computed above
mode_val = vals[np.argmax(counts)]
print(f"Mode: {mode_val} (appears {counts.max()} times)")
union1d, intersect1d, setdiff1d, setxor1d
import numpy as np

a = np.array([1, 2, 3, 4, 5])
b = np.array([3, 4, 5, 6, 7])

# All of these return sorted, de-duplicated 1D arrays
print("union (a | b):      ", np.union1d(a, b))
print("intersect (a & b):  ", np.intersect1d(a, b))
print("a - b (in a not b): ", np.setdiff1d(a, b))
print("b - a (in b not a): ", np.setdiff1d(b, a))
print("symmetric diff:     ", np.setxor1d(a, b))

# With return_indices for intersect: positions of each common value in a and b
common, a_idx, b_idx = np.intersect1d(a, b, return_indices=True)
print("Common:", common, "at a:", a_idx, "at b:", b_idx)

# Practical: find new customers
old_customers = np.array([101, 102, 103, 104, 105])
new_customers = np.array([103, 104, 106, 107, 108])
returning = np.intersect1d(old_customers, new_customers)  # in both periods
brand_new = np.setdiff1d(new_customers, old_customers)    # only in the new period
churned   = np.setdiff1d(old_customers, new_customers)    # only in the old period
print(f"Returning: {returning}")
print(f"Brand new: {brand_new}")
print(f"Churned:   {churned}")
isin / in1d for membership testing
import numpy as np

products = np.array(['apple', 'banana', 'cherry', 'date', 'elderberry'])
blacklist = np.array(['banana', 'date', 'fig'])

# isin: element-wise membership test (one bool per element of `products`)
mask = np.isin(products, blacklist)
print("Is in blacklist:", mask)
print("Allowed products:", products[~mask])
print("Blocked products:", products[mask])

# Large-scale membership test (much faster than Python loops)
np.random.seed(42)
user_ids      = np.arange(1_000_000)
premium_users = np.random.choice(user_ids, size=50_000, replace=False)

# Check a sample of 1000 users
sample = np.random.choice(user_ids, size=1000, replace=False)
is_premium = np.isin(sample, premium_users)
print(f"Sample size: {len(sample)}, Premium users in sample: {is_premium.sum()}")

# in1d is equivalent (older API)
# NOTE(review): np.in1d is deprecated in recent NumPy releases — prefer np.isin.
same_result = np.in1d(sample, premium_users)
print("isin == in1d:", np.all(is_premium == same_result))

# Invert: users NOT in premium
non_premium = sample[~is_premium]
print(f"Non-premium in sample: {len(non_premium)}")
💼 Real-World: Inventory Reconciliation
A warehouse system compares today's scan vs expected manifest using NumPy set operations to find missing, extra, and duplicate items instantly across 100K SKUs.
import numpy as np

np.random.seed(42)
# Expected stock: 100K distinct SKU ids drawn from a 200K id space
manifest = np.random.choice(np.arange(200_000), size=100_000, replace=False)

# Simulate scanning: miss 500, add 300 unexpected
# (extra ids are >= 200_000, so they can never collide with the manifest)
missed_idx   = np.random.choice(len(manifest), size=500, replace=False)
extra_skus   = np.random.randint(200_000, 201_000, size=300)
scanned      = np.concatenate([np.delete(manifest, missed_idx), extra_skus])

manifest_u = np.unique(manifest)
scanned_u  = np.unique(scanned)

missing      = np.setdiff1d(manifest_u, scanned_u)    # expected but not scanned
unexpected   = np.setdiff1d(scanned_u, manifest_u)    # scanned but not expected
confirmed    = np.intersect1d(manifest_u, scanned_u)  # present in both

# Duplicates in scan (randint samples with replacement, so extras may repeat)
vals, counts = np.unique(scanned, return_counts=True)
duplicates   = vals[counts > 1]

print(f"Manifest:    {len(manifest_u):,} SKUs")
print(f"Scanned:     {len(scanned_u):,} unique SKUs")
print(f"Confirmed:   {len(confirmed):,} ({len(confirmed)/len(manifest_u):.1%})")
print(f"Missing:     {len(missing):,}")
print(f"Unexpected:  {len(unexpected):,}")
print(f"Duplicates:  {len(duplicates):,}")
print(f"Accuracy:    {len(confirmed)/len(manifest_u):.2%}")
🏋️ Practice: Cohort Analysis
Write a function cohort_analysis(cohorts: dict) where cohorts maps week labels to user_id arrays. For each week, compute: (1) new users (not seen in any previous week), (2) returning users (seen in at least one previous week), (3) churned from previous week. Return a list of dicts with these counts.
Starter Code
import numpy as np

def cohort_analysis(cohorts):
    """Per-week new/returning/churned counts from {week_label: user_id array}.

    Weeks are processed in sorted label order. "New" = never seen in any
    earlier week; "returning" = seen in some earlier week; "churned" counts
    users from the immediately preceding week who are absent this week.
    """
    summary = []
    all_prior = np.array([], dtype=int)   # everyone seen in earlier weeks
    last_week = np.array([], dtype=int)   # the previous week's user set

    for label, ids in sorted(cohorts.items()):
        ids = np.unique(ids)
        summary.append({
            "week": label, "total": len(ids),
            "new": len(np.setdiff1d(ids, all_prior)),
            "returning": len(np.intersect1d(ids, all_prior)),
            "churned_from_prev": len(np.setdiff1d(last_week, ids)),
        })
        all_prior = np.union1d(all_prior, ids)
        last_week = ids
    return summary

np.random.seed(42)
cohorts = {
    "W1": np.random.randint(0, 100, 50),
    "W2": np.random.randint(20, 120, 60),
    "W3": np.random.randint(40, 140, 55),
    "W4": np.random.randint(60, 160, 70),
}
for r in cohort_analysis(cohorts):
    print(r)
✅ Practice Checklist
19. Numerical Stability, NaN & Inf Handling

Floating-point arithmetic has precision limits, NaN propagation, and overflow. NumPy provides tools to detect, mask, and safely handle these edge cases.

NaN detection and handling
import numpy as np

data = np.array([1.0, 2.0, np.nan, 4.0, np.nan, 6.0])

# Detection: np.isnan gives an element-wise boolean mask
nan_mask = np.isnan(data)
print("isnan:", nan_mask)
print("Count NaNs:", nan_mask.sum())
print("Any NaN:", np.any(nan_mask))

# Plain aggregations propagate NaN; the nan* variants skip missing values
print("mean with NaN:", np.mean(data))          # nan
print("nanmean:", np.nanmean(data))              # 3.25
print("nansum:", np.nansum(data))               # 13.0
print("nanstd:", np.nanstd(data).round(4))
print("nanmin/nanmax:", np.nanmin(data), np.nanmax(data))

# Imputation: fill missing entries with the NaN-aware mean, or with zero
filled_mean = np.where(nan_mask, np.nanmean(data), data)
print("NaN -> mean:", filled_mean)

filled_zero = np.nan_to_num(data, nan=0.0)
print("NaN -> 0:   ", filled_zero)

def ffill(arr):
    """Replace each NaN with the most recent earlier non-NaN value.

    A leading NaN has no predecessor and falls back to arr[0] (stays NaN).
    """
    valid = ~np.isnan(arr)
    # index of the last valid sample at or before each position
    last_valid = np.maximum.accumulate(np.where(valid, np.arange(arr.size), 0))
    return arr[last_valid]

print("Forward fill:", ffill(data))
Inf, overflow, and finfo
import numpy as np

# Infinity
x = np.array([1.0, -1.0, 0.0, np.inf, -np.inf, np.nan])
print("isinf:", np.isinf(x))
print("isfinite:", np.isfinite(x))   # False for inf, -inf AND nan
print("isnan:", np.isnan(x))

# All-in-one
print("Any problem:", ~np.isfinite(x).all())

# Replace inf with large values
clean = np.nan_to_num(x, nan=0.0, posinf=1e9, neginf=-1e9)
print("Cleaned:", clean)

# Overflow
# NOTE(review): the local name `float32` shadows the dtype name np.float32's
# usual alias — consider renaming if this snippet grows.
float32 = np.float32(1e38)
print("float32 * 100:", float32 * 100)     # inf
print("float32 dtype max:", np.finfo(np.float32).max)

# Machine epsilon
for dtype in [np.float16, np.float32, np.float64]:
    fi = np.finfo(dtype)   # finfo: precision/range limits of a float dtype
    print(f"{dtype.__name__:10s}: eps={fi.eps:.2e}, max={fi.max:.2e}")

# Division edge cases
print("0/0:", np.float64(0) / np.float64(0))  # nan
print("1/0:", np.float64(1) / np.float64(0))  # inf
Numerical precision and stable computations
import numpy as np

# Floating-point is NOT exact
a = 0.1 + 0.2
print("0.1 + 0.2 ==  0.3:", a == 0.3)     # False!
print("value:", repr(a))
print("allclose:", np.isclose(a, 0.3))     # True (with tolerance)

# np.isclose with tolerances
x = np.array([1.0, 1.0 + 1e-8, 1.0 + 1e-5, 1.1])
print("isclose to 1.0:", np.isclose(x, 1.0, rtol=1e-5, atol=1e-8))

# Unstable: sum of large+small+large
big  = np.float32(1e8)
tiny = np.float32(1.0)
print("float32 (1e8 + 1 - 1e8):", big + tiny - big)  # may be 0!

# np.sum uses pairwise summation, which is far more accurate than a naive
# left-to-right sum. To demonstrate, accumulate sequentially IN float32:
# np.cumsum(..., dtype=np.float32) performs exactly that accumulation.
# (The previous float(v) loop silently accumulated in Python float64,
# hiding the very effect it was meant to show — and was ~1M iterations slow.)
np.random.seed(0)  # reproducible demo
x = np.random.randn(1_000_000).astype(np.float32)
naive_sum = float(np.cumsum(x, dtype=np.float32)[-1])

np_sum = float(np.sum(x, dtype=np.float64))  # promote to float64
print(f"Naive float32 sum: {naive_sum:.4f}")
print(f"NumPy float64 sum: {np_sum:.4f}")

# Log-sum-exp trick for numerical stability (important in ML)
logits = np.array([1000.0, 1001.0, 999.0])  # would overflow exp()
lse = logits.max() + np.log(np.sum(np.exp(logits - logits.max())))
print(f"Log-sum-exp (stable): {lse:.6f}")
💼 Real-World: Financial Return Calculator
A risk system computes daily returns from price series that contain missing quotes (NaN), halted trading (0), and data errors (negative prices).
import numpy as np

# Synthetic price path: exponentiated random walk starting near 100
np.random.seed(42)
n_days = 252
prices = 100 * np.exp(np.cumsum(np.random.randn(n_days) * 0.01))

# Inject data quality issues
prices[10]  = np.nan    # missing quote
prices[50]  = 0.0       # trading halt
prices[100] = -5.0      # data error

def clean_prices(prices):
    """Mask non-positive quotes as NaN, then forward-fill every NaN."""
    series = prices.copy()
    series[series <= 0] = np.nan   # halts / bad ticks are not real prices
    valid = ~np.isnan(series)
    # index of the last valid quote at or before each day
    carry = np.maximum.accumulate(np.where(valid, np.arange(series.size), 0))
    return series[carry]

def daily_returns(prices):
    """Simple daily returns p[t]/p[t-1] - 1 on the cleaned series, finite values only."""
    series = clean_prices(prices)
    raw = np.diff(series) / series[:-1]
    return raw[np.isfinite(raw)]

rets = daily_returns(prices)
print(f"Days cleaned: {np.isnan(prices).sum() + (prices <= 0).sum()}")
print(f"Valid returns: {len(rets)}")
print(f"Mean return: {np.nanmean(rets):.4%}")
print(f"Volatility:  {np.nanstd(rets):.4%}")
print(f"Sharpe (approx): {np.nanmean(rets)/np.nanstd(rets)*np.sqrt(252):.2f}")
🏋️ Practice: Stable Softmax
Implement stable_softmax(x) that computes softmax using the log-sum-exp trick to avoid overflow. Implement log_softmax(x) using the same trick. Verify both give the same probabilities on x = [1000, 1001, 999] and x = [-1000, -999, -1001]. Compare against naive softmax to show instability.
Starter Code
# Practice starter: stable softmax via the log-sum-exp trick.
# The TODO bodies are the exercise and are intentionally unimplemented.
import numpy as np

def naive_softmax(x):
    """Direct softmax — overflows for large x (exp(1000) -> inf)."""
    e = np.exp(x)
    return e / e.sum()

def stable_softmax(x):
    # TODO: subtract max(x) before exp, then normalize
    pass

def log_softmax(x):
    # TODO: return log of stable softmax (more numerically stable version)
    # hint: x - max(x) - log(sum(exp(x - max(x))))
    pass

# Driver: the second and third inputs overflow/underflow the naive version.
# NOTE(review): the final print calls np.exp on log_softmax's result, so it
# will fail until the TODOs are implemented (np.exp(None) raises).
for x in [np.array([1.0, 2.0, 3.0]),
          np.array([1000.0, 1001.0, 999.0]),
          np.array([-1000.0, -999.0, -1001.0])]:
    print(f"x = {x}")
    print(f"  naive:  {naive_softmax(x)}")
    print(f"  stable: {stable_softmax(x)}")
    print(f"  log:    {log_softmax(x)}")
    print(f"  exp(log) == stable: {np.allclose(np.exp(log_softmax(x)), stable_softmax(x))}")
✅ Practice Checklist
20. Probability Distributions & Random Sampling

NumPy's random module (Generator API) provides reproducible random sampling from dozens of distributions. Essential for simulations, bootstrapping, and synthetic data.

Generator API and reproducibility
import numpy as np

# Modern API: use default_rng (preferred over np.random.seed)
rng = np.random.default_rng(seed=42)

# Uniform
u = rng.uniform(low=0, high=10, size=5)
print("uniform:", u.round(2))

# Integers — high is exclusive, so 1..6 like a real die
dice = rng.integers(1, 7, size=10)
print("dice rolls:", dice)

# Shuffle and choice
items = np.arange(10)
rng.shuffle(items)   # in-place permutation
print("shuffled:", items)

sample = rng.choice(items, size=5, replace=False)
print("sample without replacement:", sample)

# Weighted choice — p gives the probability of each outcome
weights  = np.array([0.5, 0.3, 0.2])
outcomes = np.array(['A', 'B', 'C'])
draws    = rng.choice(outcomes, size=20, p=weights)
vals, counts = np.unique(draws, return_counts=True)
print("Weighted draws:", dict(zip(vals, counts)))

# Verify reproducibility: the same seed yields an identical stream
rng2 = np.random.default_rng(seed=42)
print("Same sequence:", np.all(rng2.uniform(size=5) == np.random.default_rng(42).uniform(size=5)))
Statistical distributions
import numpy as np

rng = np.random.default_rng(42)

# Normal distribution (loc = mean, scale = std)
heights = rng.normal(loc=170, scale=10, size=10_000)
print(f"Normal(170, 10): mean={heights.mean():.2f}, std={heights.std():.2f}")

# Lognormal (for prices, incomes) — mean/sigma parameterize the underlying normal
prices = rng.lognormal(mean=3.0, sigma=0.5, size=5000)
print(f"Lognormal: median={np.median(prices):.2f}, mean={prices.mean():.2f}")

# Exponential (wait times, inter-arrivals); scale = 1/lambda
wait_times = rng.exponential(scale=5.0, size=1000)  # mean = 5 min
print(f"Exponential(lambda=0.2): mean={wait_times.mean():.2f}")

# Poisson (event counts)
requests_per_sec = rng.poisson(lam=30, size=60)  # 30 req/s for 1 min
print(f"Poisson(30): mean={requests_per_sec.mean():.1f}, std={requests_per_sec.std():.1f}")

# Binomial (successes in n trials)
successes = rng.binomial(n=100, p=0.35, size=1000)  # conversion rate
print(f"Binomial(100, 0.35): mean={successes.mean():.1f} (expected 35)")

# Beta (probabilities, proportions); mean = a / (a + b)
click_rates = rng.beta(a=2, b=5, size=1000)
print(f"Beta(2, 5): mean={click_rates.mean():.3f} (expected {2/(2+5):.3f})")
Bootstrap sampling and Monte Carlo
import numpy as np

rng = np.random.default_rng(42)

# Bootstrap confidence interval: resample WITH replacement, collect the
# mean of each resample, and read the CI off the empirical distribution
data = np.array([2.3, 4.1, 3.8, 5.2, 2.9, 4.7, 3.5, 6.1, 2.8, 4.4])

n_boot = 10_000
boot_means = np.array([
    rng.choice(data, size=len(data), replace=True).mean()
    for _ in range(n_boot)
])

ci_low, ci_high = np.percentile(boot_means, [2.5, 97.5])
print(f"Sample mean: {data.mean():.3f}")
print(f"95% bootstrap CI: [{ci_low:.3f}, {ci_high:.3f}]")

# Monte Carlo Pi estimation: fraction of random points in the unit square
# that land inside the unit circle approximates pi/4
N = 1_000_000
x = rng.uniform(-1, 1, N)
y = rng.uniform(-1, 1, N)
inside = (x**2 + y**2) <= 1.0
pi_est = 4 * inside.mean()
print(f"Pi estimate (N={N:,}): {pi_est:.5f} (true: {np.pi:.5f})")

# Simulate portfolio returns (Monte Carlo)
n_assets, n_sims, n_days = 5, 10_000, 252
mu    = rng.uniform(0.0001, 0.001, n_assets)   # daily expected return
sigma = rng.uniform(0.01, 0.03, n_assets)       # daily vol
daily_rets = rng.normal(mu, sigma, (n_sims, n_days, n_assets))
port_rets  = daily_rets.mean(axis=2).sum(axis=1)  # equal weight
var_95 = np.percentile(port_rets, 5)   # 5th percentile = 95% value-at-risk
print(f"Portfolio annual return mean: {port_rets.mean():.2%}")
print(f"VaR 95%: {var_95:.2%}")
💼 Real-World: A/B Test Simulation
A product team uses bootstrap simulation to determine whether a 2-percentage-point conversion rate improvement is statistically significant given their sample sizes.
import numpy as np

rng = np.random.default_rng(42)

# Observed data
n_control    = 5000
n_treatment  = 5000
conv_control = 0.10   # 10% baseline
conv_treat   = 0.12   # 12% treatment (absolute +2pp)

# Sample from observed proportions (one Bernoulli outcome per user)
control   = rng.binomial(1, conv_control, n_control)
treatment = rng.binomial(1, conv_treat,   n_treatment)

obs_diff = treatment.mean() - control.mean()
print(f"Observed difference: {obs_diff:.4f} ({obs_diff:.2%})")

# Permutation test: repeatedly shuffle the pooled outcomes (i.e. relabel
# users at random) and recompute the group difference under the null
# hypothesis that both groups share one conversion rate
N_PERM = 10_000
combined = np.concatenate([control, treatment])
perm_diffs = np.empty(N_PERM)
for i in range(N_PERM):
    rng.shuffle(combined)   # in-place; each pass is a fresh relabeling
    perm_diffs[i] = combined[:n_treatment].mean() - combined[n_treatment:].mean()

# Two-sided p-value: fraction of null diffs at least as extreme as observed
p_value = (np.abs(perm_diffs) >= np.abs(obs_diff)).mean()
print(f"Permutation test p-value: {p_value:.4f}")
print(f"Significant at alpha=0.05: {p_value < 0.05}")

ci = np.percentile(perm_diffs, [2.5, 97.5])
print(f"Null distribution 95% CI: [{ci[0]:.4f}, {ci[1]:.4f}]")
🏋️ Practice: Distribution Fitter
Write fit_normal(data) that uses np.mean and np.std to estimate parameters, then generates n_samples from that distribution and computes the KS-like statistic (max absolute difference between sorted empirical CDF and normal CDF). Also write simulate_geometric_brownian(S0, mu, sigma, T, n_steps) for stock price simulation.
Starter Code
import numpy as np

def fit_normal(data, n_samples=1000, seed=42):
    """Fit a normal distribution to `data`, draw synthetic samples from the
    fit, and report a KS-like goodness-of-fit statistic (max CDF gap)."""
    rng = np.random.default_rng(seed)
    mu  = np.mean(data)
    std = np.std(data, ddof=1)   # ddof=1: sample (unbiased) standard deviation
    # Generate from fitted distribution
    synthetic = rng.normal(mu, std, n_samples)
    # Empirical CDF comparison
    emp  = np.sort(data)
    emp_cdf = np.arange(1, len(emp)+1) / len(emp)
    # Normal CDF at emp points: use error function approximation
    from math import erf
    norm_cdf = np.array([0.5 * (1 + erf((x-mu)/(std*2**0.5))) for x in emp])
    ks_stat  = np.max(np.abs(emp_cdf - norm_cdf))
    return {"mu": mu, "std": std, "ks": ks_stat, "synthetic": synthetic}

def simulate_gbm(S0, mu, sigma, T, n_steps, seed=42):
    # Geometric Brownian motion price path — exercise: intentionally unimplemented.
    rng = np.random.default_rng(seed)
    dt  = T / n_steps
    Z   = rng.standard_normal(n_steps)
    # TODO: compute log returns and cumulative prices
    pass

rng = np.random.default_rng(42)
data = rng.normal(50, 10, 200)
result = fit_normal(data)
print(f"Fitted: mu={result['mu']:.2f}, std={result['std']:.2f}, KS={result['ks']:.4f}")
✅ Practice Checklist
21. Image Arrays & 2D Operations

Images are 2D (grayscale) or 3D (H x W x C) NumPy arrays. Understanding array operations on images builds intuition for convolutions, pooling, and data augmentation.

Image as array: channels and pixel operations
import numpy as np

# Images are (H, W) for grayscale, (H, W, C) for color
np.random.seed(42)
H, W = 64, 64

# Synthetic grayscale (0-255 uint8)
gray = np.random.randint(0, 256, (H, W), dtype=np.uint8)
print(f"Grayscale shape: {gray.shape}, dtype: {gray.dtype}")
print(f"Pixel range: [{gray.min()}, {gray.max()}]")

# Synthetic RGB image
rgb = np.random.randint(0, 256, (H, W, 3), dtype=np.uint8)
print(f"RGB shape: {rgb.shape}")

# Extract channels — each is an (H, W) slice of the last axis
R, G, B = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]
print(f"R channel mean: {R.mean():.1f}")

# RGB to grayscale (luminosity formula — weights reflect perceived brightness)
gray_from_rgb = (0.2989 * R + 0.5870 * G + 0.1140 * B).astype(np.uint8)
print(f"Converted gray shape: {gray_from_rgb.shape}")

# Normalize to [0, 1] float
img_float = rgb.astype(np.float32) / 255.0
print(f"Normalized range: [{img_float.min():.3f}, {img_float.max():.3f}]")

# Crop a region — plain slicing: rows 10-39, cols 15-49
crop = rgb[10:40, 15:50, :]
print(f"Crop shape: {crop.shape}")

# Horizontal flip — a negative step reverses the width axis
flipped = rgb[:, ::-1, :]
print(f"Flipped shape: {flipped.shape}")
Convolution and pooling with strides
import numpy as np

def convolve2d(img, kernel):
    """Reference 2D correlation with zero padding (Python loops, O(H*W*kH*kW))."""
    H, W = img.shape
    kH, kW = kernel.shape
    ph, pw = kH // 2, kW // 2
    padded = np.pad(img, ((ph, ph), (pw, pw)), mode='constant')
    out = np.zeros((H, W), dtype=float)
    for r in range(H):
        for c in range(W):
            patch = padded[r:r+kH, c:c+kW]
            out[r, c] = (patch * kernel).sum()
    return out

def convolve2d_fast(img, kernel):
    """Same result as convolve2d, vectorized with sliding_window_view."""
    from numpy.lib.stride_tricks import sliding_window_view
    kH, kW = kernel.shape
    ph, pw = kH // 2, kW // 2
    padded = np.pad(img, ((ph, ph), (pw, pw)), mode='constant')
    patches = sliding_window_view(padded, (kH, kW))
    # einsum contracts each (kH, kW) window against the kernel in one pass
    return np.einsum('ijkl,kl->ij', patches, kernel)

def max_pool2d(img, size=2):
    """Non-overlapping size x size max pooling (any trailing remainder is dropped)."""
    H, W = img.shape
    rows, cols = H // size, W // size
    trimmed = img[:rows*size, :cols*size]
    return trimmed.reshape(rows, size, cols, size).max(axis=(1, 3))

img = np.random.rand(32, 32)
# Edge detection kernel (Sobel-like)
sobel_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=float)
edges = convolve2d_fast(img, sobel_x)
print(f"Edge response range: [{edges.min():.3f}, {edges.max():.3f}]")

pooled = max_pool2d(img, 2)
print(f"After 2x2 max pool: {img.shape} -> {pooled.shape}")
Data augmentation with NumPy
import numpy as np

rng = np.random.default_rng(42)

def augment_batch(images, rng):
    # Augment a batch of images (N, H, W, C).
    # Works in float32 so the brightness/noise math doesn't wrap around in
    # integer arithmetic; converts back to uint8 at the end.
    batch = images.copy().astype(np.float32)

    # Random horizontal flip (50% chance per image)
    flip_mask = rng.random(len(batch)) > 0.5
    batch[flip_mask] = batch[flip_mask, :, ::-1, :]   # reverse the W axis

    # Random brightness adjustment — one scalar per image, broadcast over H/W/C
    brightness = rng.uniform(0.8, 1.2, (len(batch), 1, 1, 1))
    batch = np.clip(batch * brightness, 0, 255)

    # Random crop and resize (simplified: just crop to 80% size)
    H, W = batch.shape[1:3]
    ch, cw = int(H * 0.8), int(W * 0.8)
    for i in range(len(batch)):
        y0 = rng.integers(0, H - ch)
        x0 = rng.integers(0, W - cw)
        # In real pipeline you would resize; here just show crop
        # (the crop is pasted into the top-left corner; the rest stays as-is)
        batch[i, :ch, :cw, :] = batch[i, y0:y0+ch, x0:x0+cw, :]

    # Gaussian noise
    noise = rng.normal(0, 5, batch.shape)
    batch = np.clip(batch + noise, 0, 255)

    return batch.astype(np.uint8)

# Demo
N, H, W, C = 8, 64, 64, 3
batch = rng.integers(50, 200, (N, H, W, C), dtype=np.uint8)
augmented = augment_batch(batch, rng)
print(f"Batch: {batch.shape} | Augmented: {augmented.shape}")
print(f"Orig mean: {batch.mean():.1f} | Aug mean: {augmented.mean():.1f}")
💼 Real-World: Batch Preprocessing Pipeline
A CNN training loop preprocesses a batch of 64x64 RGB images: normalize per-channel, apply random augmentation, and flatten into model input.
import numpy as np

rng = np.random.default_rng(42)
N, H, W, C = 32, 64, 64, 3

# Simulate a batch of raw uint8 images
batch = rng.integers(0, 256, (N, H, W, C), dtype=np.uint8)

# ImageNet-style normalization (per-channel mean/std)
MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
STD  = np.array([0.229, 0.224, 0.225], dtype=np.float32)

def preprocess(batch):
    """uint8 NHWC batch -> normalized float32 NCHW (PyTorch layout)."""
    scaled = batch.astype(np.float32) / 255.0   # [0, 1]
    normed = (scaled - MEAN) / STD              # broadcast over the channel axis
    return normed.transpose(0, 3, 1, 2)         # NHWC -> NCHW

def random_hflip(batch, rng, p=0.5):
    """Flip each image left-right independently with probability p."""
    chosen = rng.random(len(batch)) < p
    result = batch.copy()
    result[chosen] = batch[chosen, :, ::-1, :]
    return result

processed = preprocess(random_hflip(batch, rng))
print(f"Input: {batch.shape} uint8")
print(f"Output: {processed.shape} float32")
print(f"Channel 0: mean={processed[:,0,:,:].mean():.3f}, std={processed[:,0,:,:].std():.3f}")
🏋️ Practice: Image Statistics
Write a function image_stats(img) that takes a (H,W,3) uint8 image and returns a dict with per-channel mean, std, min, max, and the percentage of 'near-white' pixels (all channels > 200) and 'near-black' pixels (all channels < 55). Also write normalize_contrast(img) that maps pixel values to [0,255] using min-max normalization per channel.
Starter Code
import numpy as np

def image_stats(img):
    """Per-channel stats plus near-white/near-black percentages for an (H, W, 3) uint8 image."""
    report = {}
    for c, channel_name in enumerate(['R', 'G', 'B']):
        plane = img[:, :, c].astype(float)
        report[channel_name] = {"mean": plane.mean(), "std": plane.std(),
                                "min": int(plane.min()), "max": int(plane.max())}
    # A pixel counts as near-white/near-black only if ALL three channels agree
    report["near_white_pct"] = (img > 200).all(axis=2).mean() * 100
    report["near_black_pct"] = (img < 55).all(axis=2).mean() * 100
    return report

def normalize_contrast(img):
    """Min-max stretch each channel independently onto the full [0, 255] range."""
    out = np.empty_like(img, dtype=np.uint8)
    for c in range(3):
        plane = img[:, :, c].astype(float)
        lo, hi = plane.min(), plane.max()
        if hi > lo:
            out[:, :, c] = ((plane - lo) / (hi - lo) * 255).astype(np.uint8)
        else:
            out[:, :, c] = 0  # flat channel: no contrast to stretch
    return out

rng = np.random.default_rng(42)
img = rng.integers(0, 256, (128, 128, 3), dtype=np.uint8)
stats = image_stats(img)
for k, v in stats.items():
    print(f"{k}: {v}")
normed = normalize_contrast(img)
print("Normalized range:", normed.min(), normed.max())
✅ Practice Checklist
22. Statistical Aggregations

NumPy provides percentiles, histograms, correlation, and covariance for descriptive statistics. These underpin nearly all data analysis workflows.

Percentile, quantile, and descriptive stats
import numpy as np

rng = np.random.default_rng(42)
data = rng.lognormal(mean=4.5, sigma=0.8, size=10_000)

# Percentiles and quantiles
p25, p50, p75 = np.percentile(data, [25, 50, 75])
print(f"Q1={p25:.1f}, Median={p50:.1f}, Q3={p75:.1f}")
print(f"IQR={p75-p25:.1f}")

# Quantile (equivalent but takes fractions 0-1)
q  = np.quantile(data, [0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0])
labels = ["0%", "25%", "50%", "75%", "90%", "95%", "99%", "100%"]
for l, v in zip(labels, q):
    print(f"  {l:>4s}: {v:>8.1f}")

# Outlier detection via IQR
iqr_mult = 1.5
lower = p25 - iqr_mult * (p75 - p25)
upper = p75 + iqr_mult * (p75 - p25)
outliers = data[(data < lower) | (data > upper)]
print(f"Outliers: {len(outliers)} ({len(outliers)/len(data):.1%})")

# 2D: percentile per column
m = rng.normal(0, 1, (100, 5))
col_medians = np.median(m, axis=0)
print("Col medians:", col_medians.round(2))
Histogram and frequency analysis
import numpy as np

rng = np.random.default_rng(42)
data = np.concatenate([rng.normal(30, 5, 3000),   # young cohort
                       rng.normal(55, 8, 2000)])   # senior cohort

# Basic histogram
counts, edges = np.histogram(data, bins=20)
centers = (edges[:-1] + edges[1:]) / 2  # bin midpoints (one fewer than edges)
print("Histogram (first 5 bins):")
for c, n in zip(centers[:5], counts[:5]):
    print(f"  {c:.1f}: {'#' * (n//50)} ({n})")

# Normalized (density)
density, _ = np.histogram(data, bins=20, density=True)
print(f"Density sums to: {density.sum() * np.diff(edges).mean():.4f}")  # ~1.0

# 2D histogram
x = rng.normal(0, 1, 5000)
y = 0.7 * x + rng.normal(0, 0.5, 5000)  # correlated
counts_2d, xedges, yedges = np.histogram2d(x, y, bins=10)
print(f"2D histogram shape: {counts_2d.shape}")
print(f"Peak bin count: {counts_2d.max():.0f}")

# Digitize: assign each value to a bin
salary_bins = np.array([0, 30_000, 60_000, 100_000, 200_000, np.inf])
labels = ['entry', 'junior', 'mid', 'senior', 'exec']
salaries = np.array([25000, 45000, 75000, 120000, 350000, 58000])
# digitize returns 1-based bin positions; -1 converts to 0-based label indices
bin_idx  = np.digitize(salaries, salary_bins) - 1
# min() clamps any value beyond the last finite edge into the final label
print("Salary bands:", [labels[min(i, len(labels)-1)] for i in bin_idx])
Correlation and covariance
import numpy as np

rng = np.random.default_rng(42)
n = 200

# Generate correlated variables
x1 = rng.normal(0, 1, n)
x2 = 0.8 * x1 + rng.normal(0, 0.6, n)   # strong positive correlation
x3 = rng.normal(0, 1, n)                  # uncorrelated

data = np.column_stack([x1, x2, x3])

# Pearson correlation matrix
corr = np.corrcoef(data.T)  # data.T because corrcoef expects (n_vars, n_obs)
print("Correlation matrix:")
for row in corr:
    print("  ", " ".join(f"{v:+.3f}" for v in row))

# Covariance matrix
cov = np.cov(data.T)
print("Covariance matrix diagonal (variances):", np.diag(cov).round(3))

# Manual Pearson r for two variables
def pearson_r(a, b):
    """Pearson correlation coefficient of two equal-length 1D arrays."""
    dev_a = a - a.mean()
    dev_b = b - b.mean()
    numerator = (dev_a * dev_b).sum()
    denominator = np.sqrt((dev_a ** 2).sum()) * np.sqrt((dev_b ** 2).sum())
    return numerator / denominator

r12 = pearson_r(x1, x2)
r13 = pearson_r(x1, x3)
print(f"Pearson r(x1,x2)={r12:.3f}, r(x1,x3)={r13:.3f}")

# Rolling correlation
window = 30
roll_corr = np.array([
    pearson_r(x1[i:i+window], x2[i:i+window])
    for i in range(n - window)
])
print(f"Rolling({window}) correlation: mean={roll_corr.mean():.3f}")
💼 Real-World: Multi-Asset Risk Analysis
A risk team computes the 1-day 99% VaR and expected shortfall for a 10-asset portfolio using historical simulation on 5 years of daily returns.
import numpy as np

rng = np.random.default_rng(42)
n_assets = 10
n_days   = 1260   # 5 trading years

# Simulate correlated returns
true_corr = 0.3 * np.ones((n_assets, n_assets)) + 0.7 * np.eye(n_assets)
L = np.linalg.cholesky(true_corr)
Z = rng.standard_normal((n_days, n_assets))
returns = (Z @ L.T) * 0.01  # ~1% daily vol

# Equal-weight portfolio returns
weights  = np.ones(n_assets) / n_assets
port_ret = returns @ weights

# Risk metrics
var_99   = np.percentile(port_ret, 1)
es_99    = port_ret[port_ret <= var_99].mean()
ann_vol  = port_ret.std() * np.sqrt(252)

print(f"Portfolio daily returns: mean={port_ret.mean():.4%}, vol={port_ret.std():.4%}")
print(f"Annualized vol:    {ann_vol:.2%}")
print(f"1-day VaR (99%):   {var_99:.4%}")
print(f"Expected Shortfall (CVaR 99%): {es_99:.4%}")

# Correlation matrix
corr = np.corrcoef(returns.T)
print(f"Avg pairwise correlation: {corr[np.triu_indices(n_assets, k=1)].mean():.3f}")
🏋️ Practice: Weighted Statistics
Implement weighted_mean(data, weights), weighted_std(data, weights, ddof=0), and weighted_percentile(data, weights, q) where weights represent observation frequencies. Verify weighted_mean on data=[1,2,3,4,5] with weights=[5,1,1,1,1] (the weighted mean is (5*1+2+3+4+5)/9 = 19/9 ≈ 2.11, pulled toward 1 because value 1 carries weight 5).
Starter Code
import numpy as np

def weighted_mean(data, weights):
    """Frequency-weighted mean: sum(w * x) / sum(w)."""
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    return (data * weights).sum() / weights.sum()

def weighted_std(data, weights, ddof=0):
    """Frequency-weighted standard deviation.

    variance = sum(w * (x - mean)^2) / (sum(w) - ddof); ddof=0 gives the
    population standard deviation.
    """
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    mean = weighted_mean(data, weights)
    variance = (weights * (data - mean) ** 2).sum() / (weights.sum() - ddof)
    return np.sqrt(variance)

def weighted_percentile(data, weights, q):
    """Weighted percentile (q given in 0-100) via the cumulative-weight curve."""
    # Sort data by value, then compute cumulative weight fraction
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    idx     = np.argsort(data)
    sdata   = data[idx]
    sweights = weights[idx]
    cumw    = np.cumsum(sweights) / sweights.sum()
    # Linear interpolation on the cumulative curve; np.interp clamps at the ends
    return np.interp(q / 100.0, cumw, sdata)

data    = np.array([1, 2, 3, 4, 5], float)
weights = np.array([5, 1, 1, 1, 1], float)
print("Weighted mean:", weighted_mean(data, weights))   # 19/9 ~ 2.11, pulled toward 1
print("Weighted std: ", weighted_std(data, weights))
print("Weighted 50th:", weighted_percentile(data, weights, 50))
✅ Practice Checklist
23. Array I/O (save, load, text, memmap)

NumPy supports binary (.npy, .npz), text (savetxt/genfromtxt), and memory-mapped file formats for efficient large array storage and loading.

np.save, np.load, np.savez
import numpy as np
import tempfile, pathlib

tmp = pathlib.Path(tempfile.mkdtemp())

# Save/load single array (.npy)
arr = np.arange(12).reshape(3, 4)
np.save(tmp / 'array.npy', arr)

loaded = np.load(tmp / 'array.npy')
print("Loaded:", loaded)
print("Same data:", np.array_equal(arr, loaded))
print("File size:", (tmp / 'array.npy').stat().st_size, "bytes")

# Save multiple arrays in one file (.npz = numpy zip)
x  = np.linspace(0, 10, 100)
y  = np.sin(x)
dy = np.cos(x)
np.savez(tmp / 'signals.npz', x=x, y=y, dy=dy)

data = np.load(tmp / 'signals.npz')
print("Keys in npz:", list(data.keys()))
print("x[0:3]:", data['x'][:3].round(3))

# savez_compressed: gzip compression
np.savez_compressed(tmp / 'signals_compressed.npz', **dict(data))
orig_size = (tmp / 'signals.npz').stat().st_size
comp_size = (tmp / 'signals_compressed.npz').stat().st_size
print(f"Original: {orig_size} bytes, Compressed: {comp_size} bytes ({comp_size/orig_size:.0%})")

import shutil; shutil.rmtree(tmp)
savetxt and genfromtxt
import numpy as np
import tempfile, pathlib

tmp = pathlib.Path(tempfile.mkdtemp())

# savetxt: write to CSV/TSV
data = np.array([[1, 2.5, 3.7], [4, 5.1, 6.8], [7, 8.3, 9.0]])
np.savetxt(tmp / 'data.csv', data, delimiter=',', fmt='%.2f',
           header='col1,col2,col3', comments='')

# Read it back
loaded = np.genfromtxt(tmp / 'data.csv', delimiter=',', skip_header=1)
print("Loaded from CSV:\n", loaded)

# genfromtxt with mixed data and missing values
csv_content = '''id,temp,humidity,status
1,22.5,65.0,OK
2,nan,72.0,OK
3,19.8,,WARN
4,25.1,80.5,OK'''

(tmp / 'sensors.csv').write_text(csv_content)
sensors = np.genfromtxt(
    tmp / 'sensors.csv',
    delimiter=',',
    names=True,         # use header row as field names
    dtype=None,         # auto-detect dtypes
    encoding='utf-8',
    filling_values=np.nan  # fill missing with nan
)
print("dtype:", sensors.dtype)
print("temps:", sensors['temp'])
print("humidity:", sensors['humidity'])

import shutil; shutil.rmtree(tmp)
Memory-mapped arrays (np.memmap)
import numpy as np
import tempfile, pathlib

tmp = pathlib.Path(tempfile.mkdtemp())
fpath = tmp / 'large_array.dat'

# Create a memory-mapped array (simulating large data)
shape = (1000, 500)
# 'w+' = write+read, 'r+' = read+write existing, 'r' = read-only
mm_write = np.memmap(fpath, dtype='float32', mode='w+', shape=shape)

# Fill with data (only the modified pages are loaded into RAM)
mm_write[:100, :] = np.random.randn(100, 500).astype('float32')
mm_write.flush()  # ensure data is written to disk
del mm_write  # drop the reference so the mapping can be released

# Read back without loading everything into memory.
# mode='r' requires the file to exist with exactly shape*itemsize bytes.
mm_read = np.memmap(fpath, dtype='float32', mode='r', shape=shape)
print(f"Array shape: {mm_read.shape}, dtype: {mm_read.dtype}")
print(f"First row (first 5): {mm_read[0, :5]}")
print(f"File size: {fpath.stat().st_size:,} bytes")
print(f"Expected: {shape[0]*shape[1]*4:,} bytes (float32 = 4 bytes)")

# Slice a subarray (still memory-mapped, not loaded)
sub = mm_read[:50, :100]
print(f"Subarray shape: {sub.shape}, mean={sub.mean():.4f}")

del mm_read  # release the read-only mapping before deleting the directory
import shutil; shutil.rmtree(tmp)
💼 Real-World: Model Checkpoint System
A deep learning workflow saves model weights and training history to compressed .npz files and reloads them for inference, logging the disk footprint.
import numpy as np, tempfile, pathlib, shutil

tmp = pathlib.Path(tempfile.mkdtemp())

# Simulate model weights for a small neural network
rng = np.random.default_rng(42)
weights = {
    "W1": rng.standard_normal((784, 256)).astype(np.float32),
    "b1": np.zeros(256, dtype=np.float32),
    "W2": rng.standard_normal((256, 128)).astype(np.float32),
    "b2": np.zeros(128, dtype=np.float32),
    "W3": rng.standard_normal((128, 10)).astype(np.float32),
    "b3": np.zeros(10, dtype=np.float32),
}
# Synthetic decaying loss curves over 50 epochs
history = {
    "train_loss": np.linspace(2, 0.2, 50),
    "val_loss":   np.linspace(2.1, 0.25, 50),
    "epoch":      np.arange(50),
}

def save_checkpoint(path, weights, history, epoch):
    """Write the epoch plus all weight/history arrays to one compressed .npz.

    The scalar epoch is stored under 'ckpt_epoch': `history` already contains
    an 'epoch' array, so passing both `epoch=epoch` and `**history` to savez
    would raise "got multiple values for keyword argument 'epoch'".
    Returns the checkpoint size in bytes.
    """
    np.savez_compressed(path, ckpt_epoch=epoch, **weights, **history)
    return path.stat().st_size

def load_checkpoint(path):
    """Return (epoch, weights dict, history dict) from a checkpoint file."""
    # Context manager closes the underlying zip handle after the arrays
    # have been materialized.
    with np.load(path) as data:
        epoch = int(data['ckpt_epoch'])
        W = {k: data[k] for k in ['W1','b1','W2','b2','W3','b3']}
        H = {k: data[k] for k in ['train_loss','val_loss','epoch']}
    return epoch, W, H

ckpt_path = tmp / 'checkpoint_ep50.npz'
size = save_checkpoint(ckpt_path, weights, history, epoch=50)
print(f"Checkpoint saved: {size:,} bytes ({size/1024:.1f} KB)")

ep, W, H = load_checkpoint(ckpt_path)
print(f"Loaded epoch {ep}, W1 shape: {W['W1'].shape}")
print(f"Final train loss: {H['train_loss'][-1]:.4f}")

shutil.rmtree(tmp)
🏋️ Practice: Data Pipeline with npz Cache
Write a DataCache class that saves a numpy array to disk as .npz if it doesn't exist, or loads it if it does. Add methods: exists(key), save(key, **arrays), load(key), and delete(key). The cache directory should be configurable in __init__. Test by saving and loading a feature matrix and label array.
Starter Code
import numpy as np, pathlib

class DataCache:
    """Tiny on-disk cache that stores arrays as compressed .npz files."""

    def __init__(self, cache_dir='./cache'):
        self.dir = pathlib.Path(cache_dir)
        self.dir.mkdir(parents=True, exist_ok=True)

    def _path(self, key):
        # One .npz file per cache key
        return self.dir / f"{key}.npz"

    def exists(self, key):
        """Return True if an entry for `key` is on disk."""
        return self._path(key).exists()

    def save(self, key, **arrays):
        """Store the given keyword arrays under `key` (overwrites)."""
        np.savez_compressed(self._path(key), **arrays)

    def load(self, key):
        """Return {name: array} for `key`; raise FileNotFoundError if absent."""
        if not self.exists(key):
            raise FileNotFoundError(f"Cache key {key!r} not found")
        # np.load on an .npz returns a lazy NpzFile backed by an open zip;
        # materialize the arrays inside a context manager so the file handle
        # is closed (otherwise a later delete() can fail, e.g. on Windows).
        with np.load(self._path(key)) as npz:
            return {k: npz[k] for k in npz.files}

    def delete(self, key):
        """Remove the entry for `key` if it exists (no-op otherwise)."""
        p = self._path(key)
        if p.exists(): p.unlink()

import tempfile, shutil
tmp = tempfile.mkdtemp()
cache = DataCache(tmp)

rng = np.random.default_rng(42)
X = rng.standard_normal((1000, 20))
y = rng.integers(0, 5, 1000)

cache.save('features', X=X, y=y)
print("Exists:", cache.exists('features'))

data = cache.load('features')
print("Loaded X:", data['X'].shape, "y:", data['y'].shape)
print("Data matches:", np.allclose(X, data['X']))

cache.delete('features')
print("After delete:", cache.exists('features'))
shutil.rmtree(tmp)
✅ Practice Checklist
24. Datetime64 & Time Arithmetic

NumPy's datetime64 and timedelta64 types enable vectorized date/time arithmetic without Python loops. Essential for time series alignment and resampling.

datetime64 basics and arithmetic
import numpy as np

# Create datetime64 scalars and arrays
d1 = np.datetime64('2024-01-15')
d2 = np.datetime64('2024-03-20')
print("d1:", d1, "d2:", d2)

# Arithmetic
delta = d2 - d1
print(f"Delta: {delta}")               # timedelta64
print(f"Days between: {delta.astype('timedelta64[D]').astype(int)}")

# Add days
d3 = d1 + np.timedelta64(30, 'D')
print(f"30 days after d1: {d3}")

# Date range
dates = np.arange('2024-01', '2025-01', dtype='datetime64[M]')  # monthly
print("Monthly dates:", dates)
print("Count:", len(dates))

# Different units
dt_ns = np.datetime64('2024-01-15T09:30:00.000000000', 'ns')
dt_s  = np.datetime64('2024-01-15T09:30:00', 's')
print("Nanosecond precision:", dt_ns)
print("Second precision:", dt_s)

# Year, month, day extraction via astype
day_arr  = np.arange('2024-01-01', '2024-02-01', dtype='datetime64[D]')
# datetime64[M] as int counts months since 1970-01, so % 12 maps to 0..11 (Jan..Dec)
months   = day_arr.astype('datetime64[M]').astype(int) % 12 + 1
# Subtracting each date's own month start gives a 0-based day offset; +1 -> day of month
dom      = (day_arr - day_arr.astype('datetime64[M]')).astype(int) + 1
print("Days of month:", dom[:7])
Business day operations and calendar
import numpy as np

# busdaycalendar: define which days are business days
# Default: Mon-Fri, no holidays
bdc = np.busdaycalendar()

# busday_count: count business days in the half-open range [start, end)
start = np.datetime64('2024-01-01')
end   = np.datetime64('2024-12-31')
n_bdays = np.busday_count(start, end)
print(f"Business days in 2024: {n_bdays}")

# is_busday: check if date is a business day
# datetime64[D] counts days since 1970-01-01, which was a THURSDAY, so
# day-number % 7 == 0 means Thursday; add 3 before % 7 to index a
# Monday-first weekday table correctly.
dates = np.arange('2024-01-01', '2024-01-08', dtype='datetime64[D]')
for d in dates:
    bd = np.is_busday(d)
    print(f"  {d} ({['Mon','Tue','Wed','Thu','Fri','Sat','Sun'][(d.astype('datetime64[D]').astype(int) + 3)%7]}): {'Business' if bd else 'Weekend'}")

# offset_busdays: advance by N business days
t_plus_5bd = np.busday_offset('2024-01-15', 5, roll='forward')
print(f"5 business days after Jan 15: {t_plus_5bd}")

# Custom calendar with holidays
holidays  = np.array(['2024-01-01', '2024-12-25'], dtype='datetime64[D]')
bdc_h = np.busdaycalendar(holidays=holidays)
# Settlement: T+2 business days (like equity markets)
trade_dates = np.array(['2024-01-03', '2024-12-24', '2024-12-26'], dtype='datetime64')
settlement  = np.busday_offset(trade_dates, 2, busdaycal=bdc_h)
for t, s in zip(trade_dates, settlement):
    print(f"  Trade {t} -> Settle {s}")
Time series alignment and resampling
import numpy as np

rng = np.random.default_rng(42)

# Generate irregular timestamps (like real tick data)
n = 200
base  = np.datetime64('2024-01-15T09:30:00', 's')
gaps  = rng.integers(1, 60, n).astype('timedelta64[s]')   # 1-60 sec gaps
times = base + np.cumsum(gaps)

prices = 100 + np.cumsum(rng.normal(0, 0.05, n))

print(f"First timestamp: {times[0]}")
print(f"Last timestamp:  {times[-1]}")
print(f"Total duration: {(times[-1] - times[0]).astype(int)} seconds")

# Bin into 1-minute buckets (OHLCV)
minute_start = times.astype('datetime64[m]')
unique_mins  = np.unique(minute_start)

print(f"Number of 1-minute bars: {len(unique_mins)}")

# OHLCV for each minute
for m in unique_mins[:3]:
    mask = (minute_start == m)
    bar_prices = prices[mask]
    print(f"  {m}: O={bar_prices[0]:.2f} H={bar_prices.max():.2f} "
          f"L={bar_prices.min():.2f} C={bar_prices[-1]:.2f} V={mask.sum()}")

# Time-based filtering
morning = (times >= np.datetime64('2024-01-15T09:30:00', 's')) & \
          (times <  np.datetime64('2024-01-15T10:00:00', 's'))
print(f"Ticks in first 30 min: {morning.sum()}")
💼 Real-World: Market Hours Filter
A trading system filters raw tick data to keep only regular market hours (9:30-16:00 ET), exclude weekends and holidays, and compute daily OHLCV bars.
import numpy as np

rng = np.random.default_rng(42)

# Simulate a week of irregular tick data
days = np.arange('2024-01-08', '2024-01-13', dtype='datetime64[D]')  # Mon-Fri
all_times, all_prices = [], []
price = 100.0
for day in days:
    open_ns  = day.astype('datetime64[s]') + np.timedelta64(9*3600+30*60, 's')
    close_ns = day.astype('datetime64[s]') + np.timedelta64(16*3600, 's')
    n_ticks  = rng.integers(50, 150)
    t_offsets = rng.integers(0, int((close_ns - open_ns).astype(int)), n_ticks)
    t_offsets.sort()
    tick_times = open_ns + t_offsets.astype('timedelta64[s]')
    rets = rng.normal(0, 0.01, n_ticks)
    tick_prices = price * np.exp(np.cumsum(rets))
    price = tick_prices[-1]
    all_times.extend(tick_times.tolist())
    all_prices.extend(tick_prices.tolist())

times  = np.array(all_times, dtype='datetime64[s]')
prices = np.array(all_prices)

# OHLCV per day
print("Daily OHLCV:")
day_buckets = times.astype('datetime64[D]')
for day in np.unique(day_buckets):
    m = (day_buckets == day)
    p = prices[m]
    print(f"  {day}: O={p[0]:.2f} H={p.max():.2f} L={p.min():.2f} C={p[-1]:.2f} V={m.sum()}")
🏋️ Practice: Holiday Calendar
Write a function get_trading_days(start, end, holidays) that returns all business days between start and end (as datetime64 strings) excluding the given holidays. Write next_trading_day(date, holidays) that returns the next business day on or after date. Test with US holidays from 2024.
Starter Code
import numpy as np

def get_trading_days(start, end, holidays=None):
    """All business days in the half-open range [start, end), skipping
    weekends and the optional `holidays` list."""
    first = np.datetime64(start, 'D')
    last  = np.datetime64(end, 'D')
    calendar = np.busdaycalendar(
        holidays=np.array(holidays or [], dtype='datetime64[D]'))
    # Generate every calendar day, then keep the business days
    candidates = np.arange(first, last, dtype='datetime64[D]')
    return candidates[np.is_busday(candidates, busdaycal=calendar)]

def next_trading_day(date, holidays=None):
    """First business day on or after `date` (rolls forward past closures)."""
    calendar = np.busdaycalendar(
        holidays=np.array(holidays or [], dtype='datetime64[D]'))
    return np.busday_offset(np.datetime64(date, 'D'), 0,
                            roll='forward', busdaycal=calendar)

us_holidays_2024 = [
    '2024-01-01', '2024-01-15', '2024-02-19',
    '2024-05-27', '2024-07-04', '2024-09-02',
    '2024-11-28', '2024-12-25'
]
trading_days = get_trading_days('2024-01-01', '2024-04-01', us_holidays_2024)
print(f"Trading days in Q1 2024: {len(trading_days)}")
print("First 5:", trading_days[:5])
print("Last 3: ", trading_days[-3:])

print("Next trading day after 2024-12-24:", next_trading_day('2024-12-24', us_holidays_2024))
✅ Practice Checklist
25. Meshgrid & Grid Operations

np.meshgrid, np.ogrid, and np.mgrid create coordinate grids for evaluating functions over 2D/3D spaces β€” essential for plotting, distance fields, and convolutions.

np.meshgrid for 2D function evaluation
import numpy as np

# meshgrid creates coordinate matrices
x = np.linspace(-2, 2, 5)
y = np.linspace(-1, 1, 4)
X, Y = np.meshgrid(x, y)

print("x:", x)
print("y:", y)
print("X shape:", X.shape, "  Y shape:", Y.shape)
print("X:\n", X)
print("Y:\n", Y)

# Evaluate a 2D function over the grid
Z = np.sin(X) * np.exp(-Y**2)
print("Z shape:", Z.shape)
print("Z max:", Z.max().round(4))

# Find point closest to (1.0, 0.5)
target_x, target_y = 1.0, 0.5
dist = np.sqrt((X - target_x)**2 + (Y - target_y)**2)
iy, ix = np.unravel_index(dist.argmin(), dist.shape)
print(f"Closest grid point to ({target_x}, {target_y}): ({X[iy,ix]:.1f}, {Y[iy,ix]:.1f})")

# 'ij' indexing (matrix-style, transposed from default)
X_ij, Y_ij = np.meshgrid(x, y, indexing='ij')
print("With indexing='ij': X shape:", X_ij.shape)
ogrid and mgrid for memory-efficient grids
import numpy as np

# ogrid: open grid β€” returns 1D arrays that broadcast
oy, ox = np.ogrid[-3:3:7j, -3:3:7j]   # 7 points from -3 to 3
print("ogrid ox shape:", ox.shape, " oy shape:", oy.shape)  # (1,7) and (7,1)

# Broadcasting creates the 2D result without full grid in memory
Z = ox**2 + oy**2  # broadcasts to (7,7)
print("Z shape:", Z.shape)
print("Radial distance from origin:\n", np.sqrt(Z).round(2))

# mgrid: closed grid β€” returns full arrays
y_grid, x_grid = np.mgrid[0:4, 0:5]  # like meshgrid but shorter syntax
print("mgrid shapes:", x_grid.shape, y_grid.shape)
print("x_grid:\n", x_grid)

# Useful: create coordinate pairs for all pixels
H, W = 8, 8
rows, cols = np.mgrid[0:H, 0:W]
center = np.array([H/2, W/2])
dist_from_center = np.sqrt((rows - center[0])**2 + (cols - center[1])**2)
circle_mask = dist_from_center <= 3
print(f"Pixels within radius 3 of center: {circle_mask.sum()}")

# All coordinate pairs as (N, 2) array
coords = np.column_stack([rows.ravel(), cols.ravel()])
print(f"All {H}x{W} pixel coordinates shape: {coords.shape}")
Distance fields and spatial operations
import numpy as np

# Create a distance field from a set of source points
H, W = 20, 20
rows, cols = np.mgrid[0:H, 0:W]

source_points = np.array([[3, 3], [10, 15], [17, 5]])

# Per-source distance maps stacked into shape (n_sources, H, W)
per_source = np.stack([
    np.sqrt((rows - pt[0])**2 + (cols - pt[1])**2)
    for pt in source_points
])

# Distance to nearest source point
dist_field = per_source.min(axis=0)

print(f"Distance field shape: {dist_field.shape}")
print(f"Max distance: {dist_field.max():.2f}")
print(f"Pixels within 5 units of any source: {(dist_field <= 5).sum()}")

# Voronoi diagram: which source is closest to each pixel?
# argmin over the stacked per-source distances labels every pixel with the
# index of its nearest source. (Comparing each d against the already-minimized
# dist_field can never be strictly smaller, which would leave all labels 0.)
labels = per_source.argmin(axis=0)

for k in range(len(source_points)):
    print(f"Region {k}: {(labels == k).sum()} pixels")

# Gaussian RBF kernel centered at each source
sigma = 3.0
rbf = sum(np.exp(-(((rows-pt[0])**2+(cols-pt[1])**2)/(2*sigma**2)))
          for pt in source_points)
print(f"RBF max: {rbf.max():.3f}, sum: {rbf.sum():.1f}")
💼 Real-World: Heat Map Generator
A geographic data team generates a density heat map from GPS coordinates by evaluating a Gaussian kernel density estimate over a grid covering the study area.
import numpy as np

rng = np.random.default_rng(42)
# Simulate GPS points (lat/lon in some area)
n_points = 500
lat = rng.normal(40.7, 0.05, n_points)   # NYC-like
lon = rng.normal(-74.0, 0.08, n_points)

# Create grid
lat_grid, lon_grid = np.mgrid[
    lat.min()-0.02 : lat.max()+0.02 : 50j,
    lon.min()-0.02 : lon.max()+0.02 : 60j
]

# Gaussian KDE
def kde_density(lat_g, lon_g, lat_pts, lon_pts, bw=0.01):
    density = np.zeros_like(lat_g)
    for la, lo in zip(lat_pts, lon_pts):
        d2 = (lat_g - la)**2 + (lon_g - lo)**2
        density += np.exp(-d2 / (2 * bw**2))
    return density / (len(lat_pts) * 2 * np.pi * bw**2)

density = kde_density(lat_grid, lon_grid, lat, lon)
print(f"Grid shape: {density.shape}")
print(f"Density range: [{density.min():.4f}, {density.max():.4f}]")
print(f"Peak density at: lat={lat_grid.ravel()[density.argmax()]:.4f}, "
      f"lon={lon_grid.ravel()[density.argmax()]:.4f}")

# Hot spots: top 10% density cells
threshold = np.percentile(density, 90)
hot_spots  = density > threshold
print(f"Hot spot cells: {hot_spots.sum()} ({hot_spots.mean():.1%} of grid)")
🏋️ Practice: Bivariate Gaussian
Write a function bivariate_gaussian(X, Y, mu_x, mu_y, sigma_x, sigma_y, rho) that evaluates the bivariate normal PDF at grid points (X, Y). Parameters: mu = means, sigma = std devs, rho = correlation. Use meshgrid to create a 50x50 grid over [-3, 3] x [-3, 3] and plot (print stats). Verify the PDF integrates to ~1.
Starter Code
import numpy as np

def bivariate_gaussian(X, Y, mu_x=0, mu_y=0, sigma_x=1, sigma_y=1, rho=0):
    """Bivariate normal PDF evaluated elementwise at (X, Y).

    mu_* are the means, sigma_* the standard deviations, and rho the
    correlation coefficient (|rho| < 1).
    """
    # Standardized deviations
    u = (X - mu_x) / sigma_x
    v = (Y - mu_y) / sigma_y
    one_minus_rho2 = 1 - rho**2
    quad = u**2 - 2 * rho * u * v + v**2
    norm_const = 2 * np.pi * sigma_x * sigma_y * np.sqrt(one_minus_rho2)
    return np.exp(-quad / (2 * one_minus_rho2)) / norm_const

x = np.linspace(-3, 3, 50)
y = np.linspace(-3, 3, 50)
X, Y = np.meshgrid(x, y)
dx = x[1] - x[0]
dy = y[1] - y[0]

# Standard normal (rho=0)
Z1 = bivariate_gaussian(X, Y)
print(f"Standard: max={Z1.max():.4f}, integral={Z1.sum()*dx*dy:.4f}")

# Correlated (rho=0.7)
Z2 = bivariate_gaussian(X, Y, mu_x=0.5, mu_y=-0.5, sigma_x=1.2, sigma_y=0.8, rho=0.7)
print(f"Correlated: max={Z2.max():.4f}, integral={Z2.sum()*dx*dy:.4f}")
print(f"Peak at: ({X.ravel()[Z2.argmax()]:.2f}, {Y.ravel()[Z2.argmax()]:.2f})")
✅ Practice Checklist
26. Advanced Broadcasting & Pairwise Operations

Broadcasting rules let NumPy operate on arrays of different shapes without copying data. Master broadcasting for pairwise distances, outer products, and vectorized scoring.

Broadcasting rules and shape alignment
import numpy as np

# Broadcasting rule: align shapes from the right,
# size-1 dimensions are stretched to match

# Example 1: add row vector to each row of a matrix
m = np.ones((4, 3))
v = np.array([10, 20, 30])         # shape (3,)
print("m + v:", m + v)             # (4,3) + (3,) -> (4,3)

# Example 2: add column vector to each column
col = np.array([[1], [2], [3], [4]])   # shape (4, 1)
print("m + col:\n", m + col)          # (4,3) + (4,1) -> (4,3)

# Example 3: outer product via broadcasting
a = np.array([1, 2, 3])     # shape (3,)
b = np.array([10, 20])      # shape (2,)
outer = a[:, np.newaxis] * b[np.newaxis, :]  # (3,1) * (1,2) -> (3,2)
print("Outer product:\n", outer)
print("Same as np.outer:", np.array_equal(outer, np.outer(a, b)))

# Shape check
shapes = [(3, 4, 5), (4, 5), (5,), (1,)]
base = np.zeros((3, 4, 5))
for s in shapes:
    arr = np.ones(s)
    print(f"(3,4,5) + {s} = {(base + arr).shape}")
Pairwise distances (no loops)
import numpy as np

rng = np.random.default_rng(42)

# Euclidean distance matrix: D[i,j] = dist(A[i], B[j])
def pairwise_euclidean(A, B):
    """All-pairs Euclidean distances: D[i, j] = ||A[i] - B[j]||.

    A: (m, d), B: (n, d) -> (m, n). Uses the expansion
    ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, clipped at zero to guard
    against tiny negatives from floating-point cancellation.
    """
    a_sq = np.einsum('ij,ij->i', A, A)[:, np.newaxis]   # (m, 1)
    b_sq = np.einsum('ij,ij->i', B, B)[np.newaxis, :]   # (1, n)
    cross = A @ B.T                                     # (m, n)
    sq_dists = np.maximum(a_sq + b_sq - 2.0 * cross, 0)
    return np.sqrt(sq_dists)

A = rng.standard_normal((50, 3))   # 50 query points
B = rng.standard_normal((100, 3))  # 100 database points

D = pairwise_euclidean(A, B)
print(f"Distance matrix: {D.shape}")
print(f"Min dist: {D.min():.3f}, Max dist: {D.max():.3f}")

# Nearest neighbor for each query
nn_idx = D.argmin(axis=1)
nn_dist = D.min(axis=1)
print(f"Query 0: nearest is B[{nn_idx[0]}] at distance {nn_dist[0]:.3f}")

# k-NN: top-k closest
k = 3
knn_idx = np.argpartition(D, k, axis=1)[:, :k]
print(f"3-NN indices for query 0: {knn_idx[0]}")

# Cosine similarity matrix
def cosine_sim(A, B):
    """Row-wise cosine similarity matrix: S[i, j] = cos(A[i], B[j]).

    The 1e-9 added to each row length guards against division by zero
    for all-zero rows.
    """
    def unit_rows(M):
        lengths = np.linalg.norm(M, axis=1, keepdims=True)
        return M / (lengths + 1e-9)

    return unit_rows(A) @ unit_rows(B).T

C = cosine_sim(A, B)
print(f"Cosine sim range: [{C.min():.3f}, {C.max():.3f}]")
Vectorized scoring and ranking
import numpy as np

rng = np.random.default_rng(42)

# Multi-criteria scoring: score[i,j] = dot(weights, features[i,j])
# items: (N, F) features, weights: (F,) -> scores: (N,)
n_items, n_features = 1000, 10
features = rng.uniform(0, 1, (n_items, n_features))
weights  = np.array([0.3, 0.2, 0.15, 0.1, 0.1, 0.05, 0.04, 0.03, 0.02, 0.01])

scores = features @ weights   # (N,F) @ (F,) -> (N,)
print(f"Scores shape: {scores.shape}, range: [{scores.min():.3f}, {scores.max():.3f}]")

# Rank items (descending score)
ranks = np.argsort(scores)[::-1]
print("Top 5 items:", ranks[:5], "scores:", scores[ranks[:5]].round(3))

# Broadcasting for threshold matrix: flag (user, item) pairs
n_users = 20
user_thresholds = rng.uniform(0.4, 0.7, n_users)     # (20,)
# recommended[i,j] = True if scores[j] >= thresholds[i]
recommended = scores[np.newaxis, :] >= user_thresholds[:, np.newaxis]  # (20, 1000)
print(f"Recommendation matrix: {recommended.shape}")
print(f"Avg recommendations per user: {recommended.sum(axis=1).mean():.1f}")

# Pairwise similarity between items (feature dot products)
item_sim = features @ features.T  # (N, N)
print(f"Item similarity matrix: {item_sim.shape}")
print(f"Self-similarity range: [{np.diag(item_sim).min():.3f}, {np.diag(item_sim).max():.3f}]")
💼 Real-World: Embedding Search Engine
A semantic search system finds the top-5 most similar documents to a query by computing cosine similarity between query embedding and 50,000 document embeddings using broadcasting.
import numpy as np

rng = np.random.default_rng(42)

# Simulate document embeddings and a query
n_docs, dim = 50_000, 128
docs  = rng.standard_normal((n_docs, dim))
docs /= np.linalg.norm(docs, axis=1, keepdims=True)  # L2 normalize

query = rng.standard_normal(dim)
query /= np.linalg.norm(query)

# Cosine similarity: since both normalized, just dot product
sims = docs @ query   # (50000,)

# Top-5 results
top5_idx = np.argpartition(sims, -5)[-5:]
top5_idx = top5_idx[np.argsort(sims[top5_idx])[::-1]]

print("Top 5 similar documents:")
for rank, idx in enumerate(top5_idx, 1):
    print(f"  Rank {rank}: doc_id={idx:5d}, similarity={sims[idx]:.4f}")

# Retrieval metrics: how many docs above threshold?
threshold = 0.1
n_relevant = (sims >= threshold).sum()
print(f"Docs with similarity >= {threshold}: {n_relevant} ({n_relevant/n_docs:.2%})")

# Batch queries
n_queries = 100
queries = rng.standard_normal((n_queries, dim))
queries /= np.linalg.norm(queries, axis=1, keepdims=True)
batch_sims = queries @ docs.T   # (100, 50000)
batch_top1 = batch_sims.argmax(axis=1)
print(f"Batch top-1 for {n_queries} queries: mean sim={batch_sims.max(axis=1).mean():.4f}")
🏋️ Practice: Attention Scores
Implement scaled_dot_product_attention(Q, K, V) where Q, K have shape (seq_len, d_k) and V has shape (seq_len, d_v). Compute scores = softmax(Q @ K.T / sqrt(d_k)) @ V using only NumPy. Also implement multi_head_attention(Q, K, V, n_heads) that splits into n_heads, applies attention, and concatenates.
Starter Code
import numpy as np

def softmax(x, axis=-1):
    """Numerically stable softmax along `axis` (subtracts the per-slice max)."""
    e = np.exp(x - x.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)

def scaled_dot_product_attention(Q, K, V):
    """Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V.

    Q, K: (seq, d_k); V: (seq, d_v) -> output of shape (seq, d_v).
    """
    d_k = Q.shape[-1]
    scores = softmax(Q @ K.T / np.sqrt(d_k))
    return scores @ V

def multi_head_attention(Q, K, V, n_heads):
    """Split the feature dim into n_heads slices, attend per head, concat.

    Requires d % n_heads == 0; output shape matches Q: (seq, d).
    """
    seq, d = Q.shape
    d_k = d // n_heads
    heads = []
    for h in range(n_heads):
        sl = slice(h * d_k, (h + 1) * d_k)
        heads.append(scaled_dot_product_attention(Q[:, sl], K[:, sl], V[:, sl]))
    return np.concatenate(heads, axis=-1)

rng = np.random.default_rng(42)
seq, d = 10, 64
Q = rng.standard_normal((seq, d))
K = rng.standard_normal((seq, d))
V = rng.standard_normal((seq, d))

out1 = scaled_dot_product_attention(Q, K, V)
print("Single-head output shape:", out1.shape)  # (10, 64)

out2 = multi_head_attention(Q, K, V, n_heads=8)
print("Multi-head output shape:", out2.shape)   # (10, 64)
✅ Practice Checklist
27. Universal Functions (ufuncs) in Depth

Ufuncs are vectorized functions that operate element-wise on arrays. They support reduce, accumulate, outer, and reduceat β€” and you can create custom ufuncs with np.frompyfunc or Numba.

Built-in ufunc methods: reduce, accumulate, outer
import numpy as np

arr = np.array([1, 2, 3, 4, 5])

# reduce: apply ufunc repeatedly to reduce array
print("np.add.reduce:", np.add.reduce(arr))         # 15 (sum)
print("np.multiply.reduce:", np.multiply.reduce(arr))  # 120 (product)
print("np.maximum.reduce:", np.maximum.reduce(arr))  # 5

# accumulate: like reduce but keeps intermediate results
print("np.add.accumulate:", np.add.accumulate(arr))        # cumsum
print("np.multiply.accumulate:", np.multiply.accumulate(arr))  # cumprod
print("np.maximum.accumulate:", np.maximum.accumulate(arr))

# outer: compute all pairs
a = np.array([1, 2, 3])
b = np.array([10, 20, 30])
print("np.add.outer:\n", np.add.outer(a, b))
print("np.multiply.outer:\n", np.multiply.outer(a, b))

# 2D reduce along axis
m = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print("Row sums:", np.add.reduce(m, axis=1))       # [6, 15, 24]
print("Col products:", np.multiply.reduce(m, axis=0))  # [28, 80, 162]

# reduceat: reduce in segments
data = np.arange(10)
# Reduce segments: [0:3], [3:6], [6:10]
# Each index starts a segment that ends at the next index; the final
# segment runs to the end of the array.
result = np.add.reduceat(data, [0, 3, 6])
print("Segment sums:", result)   # [0+1+2, 3+4+5, 6+7+8+9]
Creating custom ufuncs
import numpy as np

def clip_and_scale(x, lo=0, hi=1):
    """Clamp x into [lo, hi] and rescale to a 0-100 percentage."""
    return min(max(x, lo), hi) * 100

# np.frompyfunc wraps a Python callable as a ufunc.  FIX: pass the function
# itself instead of wrapping it in a redundant lambda, which only added an
# extra Python-level call per element.  With nin=1/nout=1 the lo/hi defaults
# are always used.  Note frompyfunc returns object dtype, hence astype(float).
ufunc_cas = np.frompyfunc(clip_and_scale, 1, 1)
data = np.array([-0.5, 0.0, 0.3, 0.7, 1.0, 1.5])
print("clip_and_scale:", ufunc_cas(data).astype(float))

# np.vectorize is friendlier: it supports an output dtype directly
clip_pct = np.vectorize(clip_and_scale, otypes=[float])
print("vectorize:", clip_pct(data))

# Catalogue of common math ufuncs
x = np.linspace(0.1, 2.0, 5)
print("exp:", np.exp(x).round(3))
print("log:", np.log(x).round(3))
print("log2:", np.log2(x).round(3))
print("log10:", np.log10(x).round(3))
print("sqrt:", np.sqrt(x).round(3))
print("cbrt:", np.cbrt(x).round(3))

# Trigonometric ufuncs (arcsin inverts sin on this range)
angles = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
print("sin:", np.sin(angles).round(3))
print("arcsin:", np.arcsin(np.sin(angles)).round(3))
Ufunc performance and where argument
import numpy as np
import timeit

# where argument: apply the ufunc only where the condition holds
rng = np.random.default_rng(42)
data = rng.uniform(-5, 5, 1_000_000)

# np.sqrt only where data > 0; untouched slots keep their initial 0
result = np.zeros_like(data)
np.sqrt(data, out=result, where=data > 0)
print(f"sqrt(data) where > 0: {result[:5].round(3)}")

# np.log1p (log(1+x)) is more numerically stable for small x
small = np.array([1e-10, 1e-5, 0.001, 0.01, 0.1])
print("log(1+x):  ", np.log(1 + small))
print("log1p(x):  ", np.log1p(small))

# out argument: write in place and skip the output allocation.
# FIX: use the Generator API (rng.random) instead of the legacy
# np.random.rand, for consistency with the default_rng usage above.
a = rng.random(1_000_000)
b = np.empty_like(a)

t1 = timeit.timeit(lambda: np.exp(a), number=10)
t2 = timeit.timeit(lambda: np.exp(a, out=b), number=10)
print(f"exp(a) without out: {t1:.3f}s")
print(f"exp(a, out=b):      {t2:.3f}s")

# at: unbuffered in-place scatter-add; duplicate indices accumulate
arr = np.zeros(5)
indices = np.array([0, 1, 0, 2, 1])
values  = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
np.add.at(arr, indices, values)
print("add.at result:", arr)  # handles duplicate indices correctly
💼 Real-World: Vectorized Activation Functions
A neural network training loop applies activation functions to millions of neurons β€” using ufuncs with preallocated out= buffers avoids allocating a fresh temporary array on every call.
import numpy as np

# Pre-allocated scratch buffer so each activation can write in place
rng = np.random.default_rng(42)
N   = 1_000_000
x   = rng.standard_normal(N).astype(np.float32)
buf = np.empty_like(x)

def relu(x, out=None):
    """max(x, 0), optionally written into a caller-supplied buffer."""
    return np.maximum(x, 0, out=out)

def sigmoid(x, out=None):
    """1 / (1 + exp(-x)), computed entirely inside the output buffer."""
    if out is None:
        out = np.empty_like(x)
    np.exp(np.negative(x, out=out), out=out)
    np.add(out, 1, out=out)
    return np.reciprocal(out, out=out)

def gelu(x, out=None):
    """Approximate GELU: x * sigmoid(1.702 * x)."""
    return np.multiply(x, sigmoid(1.702 * x), out=out)

def swish(x, out=None):
    """Swish / SiLU: x * sigmoid(x)."""
    return np.multiply(x, sigmoid(x), out=out)

# Compute each activation into its own copy of the scratch buffer
activations = {
    "ReLU":    relu(x, buf.copy()),
    "Sigmoid": sigmoid(x, buf.copy()),
    "GELU":    gelu(x, buf.copy()),
    "Swish":   swish(x, buf.copy()),
}

for name, act in activations.items():
    print(f"{name:8s}: mean={act.mean():.4f}, std={act.std():.4f}, "
          f"range=[{act.min():.3f}, {act.max():.3f}]")
🏋️ Practice: Custom Ufunc
Use np.frompyfunc to create a ufunc haversine_ufunc that computes the great-circle distance in km between a (lat1, lon1) and (lat2, lon2) point pair. Then use vectorize to wrap it into a function that accepts arrays of lat/lon pairs. Compute distances between New York and a grid of 100 cities.
Starter Code
import numpy as np

def haversine_scalar(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    R = 6371  # Earth radius in km
    dlat = np.radians(lat2 - lat1)
    dlon = np.radians(lon2 - lon1)
    a = (np.sin(dlat / 2) ** 2
         + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2)
    return 2 * R * np.arcsin(np.sqrt(a))

# Array-capable wrapper around the scalar kernel
haversine = np.vectorize(haversine_scalar, otypes=[float])

# New York City
ny_lat, ny_lon = 40.7128, -74.0060

# A random grid of 100 city coordinates
rng = np.random.default_rng(42)
n_cities = 100
city_lats = rng.uniform(-60, 70, n_cities)
city_lons = rng.uniform(-180, 180, n_cities)

distances = haversine(ny_lat, ny_lon, city_lats, city_lons)
print(f"Distances computed: {len(distances)}")
print(f"Nearest city: {distances.min():.0f} km")
print(f"Farthest city: {distances.max():.0f} km")
print(f"Average distance from NYC: {distances.mean():.0f} km")
✅ Practice Checklist
28. Masked Arrays (np.ma)

np.ma.MaskedArray transparently handles missing data by maintaining a boolean mask alongside values. Useful for sensor data with gaps or any dataset with invalid entries.

Creating and using masked arrays
import numpy as np

# Build a masked array from a -999 sentinel
data = np.array([1.0, 2.0, -999.0, 4.0, -999.0, 6.0])
sentinel_hits = data == -999.0

ma = np.ma.array(data, mask=sentinel_hits)
print("Data:  ", ma)
print("Mask:  ", ma.mask)
print("Filled:", ma.filled(fill_value=0))

# Statistics silently restrict themselves to the unmasked entries
print("Mean (excludes masked):", ma.mean())   # (1+2+4+6)/4 = 3.25
print("Sum:", ma.sum())                        # 13
print("Std:", ma.std().round(4))

# masked_where: mask by an arbitrary predicate
arr = np.array([10, 25, -5, 0, 8, -1, 15])
ma2 = np.ma.masked_where(arr <= 0, arr)
print("Masked non-positive:", ma2)
print("Log (safe):", np.log(ma2))   # no error for masked values

# masked_invalid: automatically mask NaN and +/-Inf
dirty = np.array([1.0, np.nan, 3.0, np.inf, 5.0, -np.inf])
clean = np.ma.masked_invalid(dirty)
print("Masked invalid:", clean)
print("Safe mean:", clean.mean())
Masked array operations and conversions
import numpy as np

# 2D temperature grid with -999 as the missing-value sentinel
temps = np.array([[15.2, 16.1, -999, 18.5],
                  [-999, 14.8, 15.9, 16.7],
                  [13.1, -999, -999, 15.3]])
m = np.ma.masked_equal(temps, -999)

print("Masked temps:\n", m)
print("Row means (excl. missing):", m.mean(axis=1))
print("Col means (excl. missing):", m.mean(axis=0))

# Impute: fill each hole with its column mean (array fill values broadcast)
col_means = m.mean(axis=0)
filled = m.filled(fill_value=col_means)
print("Filled temps:\n", filled.round(1))

# Stacking keeps data and mask aligned
a = np.ma.array([1, 2, 3, 4], mask=[0, 1, 0, 0])
b = np.ma.array([5, 6, 7, 8], mask=[0, 0, 1, 0])
stacked = np.ma.vstack([a, b])
print("Stacked:\n", stacked)
print("Column min (ignoring masks):", stacked.min(axis=0))

# Back to plain ndarrays
arr = m.compressed()         # 1D array of the valid values only
print("Compressed:", arr)
arr2 = np.ma.filled(m, -1)  # masked slots replaced by -1
print("Filled(-1):\n", arr2)
Masked array with statistics and aggregations
import numpy as np

rng = np.random.default_rng(42)
N = 1000

# Simulated temperature sensor: 15% of readings randomly dropped
raw = rng.normal(20, 3, N)
dropped = rng.random(N) < 0.15

sensor = np.ma.array(raw, mask=dropped)
print(f"Total readings: {N}")
print(f"Valid readings: {sensor.count()} ({sensor.count()/N:.1%})")
print(f"Masked readings: {sensor.mask.sum()}")

# Every statistic below automatically restricts itself to valid data
print(f"Mean:   {sensor.mean():.3f}")
print(f"Median: {np.ma.median(sensor):.3f}")
print(f"Std:    {sensor.std():.3f}")
print(f"Min:    {sensor.min():.3f}")
print(f"Max:    {sensor.max():.3f}")

# np.percentile wants a plain ndarray, so hand it the compressed values
valid_values = sensor.compressed()
print(f"P25/P75: {np.percentile(valid_values, [25,75]).round(2)}")

# Layer a >3-sigma anomaly mask on top of the existing dropout mask
mean, std = sensor.mean(), sensor.std()
anomaly_mask = (sensor < mean - 3*std) | (sensor > mean + 3*std)
sensor_clean = np.ma.array(sensor.data, mask=sensor.mask | anomaly_mask.filled(False))
print(f"After anomaly removal: {sensor_clean.count()} valid readings")
💼 Real-World: Oceanographic Data Cleaner
A research station processes 5-year daily ocean temperature data with sensor failures (flagged as -9999) and physical impossibilities (<-2Β°C or >35Β°C), using masked arrays to compute clean statistics.
import numpy as np

rng = np.random.default_rng(42)
n_years = 5
n_days  = n_years * 365

# Seasonal sinusoid + noise as the underlying ocean-temperature signal
t = np.linspace(0, n_years * 2 * np.pi, n_days)
seasonal = 15 + 10 * np.sin(t - np.pi/2)
temps = seasonal + rng.normal(0, 1.5, n_days)

# Inject the two failure modes: dead sensors and impossible readings
bad_sensor  = rng.random(n_days) < 0.08    # 8% sensor failures
bad_physics = rng.random(n_days) < 0.02    # 2% physical anomalies
temps[bad_sensor]  = -9999.0
temps[bad_physics] = rng.choice([-5.0, 40.0], bad_physics.sum())

# Mask everything flagged or physically impossible (<-2C or >35C)
mask = ((temps == -9999.0) |
        (temps < -2.0) |
        (temps > 35.0))
clean = np.ma.array(temps, mask=mask)

print(f"Data coverage: {clean.count()/n_days:.1%} valid ({clean.count()} of {n_days} days)")
print(f"Mean temperature: {clean.mean():.2f}Β°C")
print(f"Seasonal range: [{clean.min():.2f}, {clean.max():.2f}]Β°C")

# Annual stats (reshape to years x 365)
annual = clean[:n_years*365].reshape(n_years, 365)
for y in range(n_years):
    row = annual[y]
    print(f"  Year {y+1}: mean={row.mean():.2f}Β°C, valid={row.count()} days")
🏋️ Practice: Masked Correlation
Write masked_corrcoef(X) that computes a pairwise correlation matrix for a 2D masked array X (shape n_obs x n_vars), where each pair of variables is correlated only using rows where BOTH are valid. Return a regular (n_vars x n_vars) ndarray. Test on a dataset where each column has independent random missing values.
Starter Code
import numpy as np

def masked_corrcoef(X):
    """Pairwise correlation matrix for a 2D masked array.

    Entry (i, j) is the Pearson correlation of columns i and j computed only
    over the rows where BOTH columns are valid.  Pairs with fewer than two
    shared observations are set to NaN.

    Parameters
    ----------
    X : np.ma.MaskedArray of shape (n_obs, n_vars)

    Returns
    -------
    np.ndarray of shape (n_vars, n_vars)
    """
    n_obs, n_vars = X.shape
    # ROBUSTNESS FIX: when nothing is masked, X.mask is the scalar
    # np.ma.nomask and `~X[:, i].mask` is not a per-row boolean vector;
    # getmaskarray always yields a full (n_obs, n_vars) boolean array.
    mask = np.ma.getmaskarray(X)
    data = np.ma.getdata(X)
    corr = np.eye(n_vars)
    for i in range(n_vars):
        for j in range(i+1, n_vars):
            # Rows valid for BOTH columns i and j
            valid = ~(mask[:, i] | mask[:, j])
            if valid.sum() < 2:
                corr[i, j] = corr[j, i] = np.nan
                continue
            r = np.corrcoef(data[valid, i], data[valid, j])[0, 1]
            corr[i, j] = corr[j, i] = r
    return corr

# Dataset with one known correlated pair and independent per-column dropouts
rng = np.random.default_rng(42)
n_rows, n_cols = 200, 4
data = rng.standard_normal((n_rows, n_cols))
# Column 1 is a noisy copy of column 0, so corr(0, 1) should land near 0.7
data[:, 1] = 0.7 * data[:, 0] + 0.3 * rng.standard_normal(n_rows)

masks = rng.random((n_rows, n_cols)) < 0.2   # 20% missing per column
X = np.ma.array(data, mask=masks)

corr = masked_corrcoef(X)
print("Pairwise correlations (with per-pair valid observations):")
print(corr.round(3))
print("Expected corr(0,1) ~ 0.7:", corr[0, 1].round(2))
✅ Practice Checklist
29. Memory Layout, Strides & Contiguity

NumPy arrays store data in contiguous memory blocks. Understanding C/F order, strides, and views vs copies is critical for performance and interoperability with C/Fortran libraries.

C vs Fortran order, flags, and strides
import numpy as np

# C order (row-major): the last axis varies fastest (NumPy's default).
# FIX: the original omitted dtype=np.float32, so the arrays defaulted to a
# platform integer dtype and the stride comments ((12, 4) / (4, 8)) were wrong.
arr_c = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32, order='C')
print("C order strides:", arr_c.strides)   # (12, 4) for float32

# F order (column-major): first axis varies fastest
arr_f = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32, order='F')
print("F order strides:", arr_f.strides)   # (4, 8) for float32

# Check flags
print("C-contiguous:", arr_c.flags['C_CONTIGUOUS'])   # True
print("F-contiguous:", arr_c.flags['F_CONTIGUOUS'])   # False

# Transpose only changes strides, no data copy
T = arr_c.T
print("Transposed strides:", T.strides)
print("T is view:", T.base is arr_c)   # True

# ascontiguousarray: ensure a C-contiguous copy
T_c = np.ascontiguousarray(T)
print("After ascontiguousarray:", T_c.flags['C_CONTIGUOUS'])

# Strides as bytes for a 3D float64 array
arr = np.arange(24, dtype=np.float64).reshape(2, 3, 4)
print(f"Shape: {arr.shape}, Strides (bytes): {arr.strides}")
print(f"Itemsize: {arr.itemsize} bytes")
# stride[k] = itemsize * product(shape[k+1:])
Views vs copies and memory sharing
import numpy as np

arr = np.arange(20, dtype=float)

# Basic slicing returns a VIEW that shares the original buffer
view = arr[2:10:2]
print("view:", view)
print("view.base is arr:", view.base is arr)

view[0] = 999        # writes straight through to arr
print("arr after view[0]=999:", arr[:12])

# copy() detaches the data completely
arr2 = arr.copy()
arr2[0] = -1
print("Original arr[0] unchanged:", arr[0])  # still 999 (from earlier)

# Fancy (integer-array) indexing always copies
fancy = arr[[0, 3, 7]]
print("fancy.base:", fancy.base)   # None = not a view

# reshape: usually a view if data is contiguous
arr3 = np.arange(12)
reshaped = arr3.reshape(3, 4)
print("reshape is view:", reshaped.base is arr3)
reshaped[0, 0] = -1
print("arr3[0] changed:", arr3[0])   # -1

# np.shares_memory: the robust way to test for aliasing
a = np.arange(10)
b = a[::2]
c = a.copy()
print("a and b share memory:", np.shares_memory(a, b))   # True
print("a and c share memory:", np.shares_memory(a, c))   # False
Stride tricks for rolling windows
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view, as_strided

arr = np.arange(10, dtype=float)

# sliding_window_view: safe way to get overlapping windows
windows = sliding_window_view(arr, window_shape=4)
print("Windows shape:", windows.shape)   # (7, 4)
print("First 3 windows:\n", windows[:3])

# Each window is a VIEW β€” no data copy!
windows[0, 0] = 999
print("arr[0] changed:", arr[0])   # yes, 999

arr = np.arange(10, dtype=float)  # reset

# 2D rolling windows (for image patches)
img = np.arange(16, dtype=float).reshape(4, 4)
patches = sliding_window_view(img, window_shape=(2, 2))
print("2D patches shape:", patches.shape)   # (3, 3, 2, 2)
print("Patch at [0,0]:\n", patches[0, 0])

# as_strided: manual stride tricks (use with care!)
# Rolling mean via strides
def rolling_mean_strided(arr, window):
    w = sliding_window_view(arr, window)
    return w.mean(axis=1)

rolling = rolling_mean_strided(np.arange(10, dtype=float), 3)
print("Rolling mean (window=3):", rolling)
💼 Real-World: Cache-Friendly Matrix Multiply
A performance-critical simulation avoids unnecessary transpositions and ensures arrays are contiguous before passing to BLAS-backed np.dot, cutting computation time significantly.
import numpy as np, timeit

rng = np.random.default_rng(42)

A = rng.standard_normal((500, 300))
B = rng.standard_normal((300, 400))

# Two versions of B.T: one forced to F order, one copied to C order
B_T_F = np.asfortranarray(B.T)       # F-order, non-contiguous for @
B_T_C = np.ascontiguousarray(B.T)    # C-contiguous copy

print("B.T C-contiguous:", B.T.flags['C_CONTIGUOUS'])      # False
print("B_T_C contiguous:", B_T_C.flags['C_CONTIGUOUS'])    # True

# Time the same GEMM against each memory layout
gemm_base = lambda: A @ B
gemm_f    = lambda: A @ B_T_F.T
gemm_c    = lambda: A @ B_T_C.T
t1 = timeit.timeit(gemm_base, number=100)
t2 = timeit.timeit(gemm_f, number=100)
t3 = timeit.timeit(gemm_c, number=100)

print(f"A @ B (contiguous):      {t1:.4f}s")
print(f"A @ B_T_F.T (F-order):   {t2:.4f}s")
print(f"A @ B_T_C.T (C-order):   {t3:.4f}s")

# All layouts must agree numerically
C1 = A @ B
C2 = A @ B_T_F.T
print("Results match:", np.allclose(C1, C2))

# Memory usage check
import sys
print(f"B.T.copy() size: {sys.getsizeof(B.T.copy()):,} bytes")
print(f"B.T view size:   {sys.getsizeof(B.T):,} bytes (metadata only)")
🏋️ Practice: Stride Inspector
Write inspect_array(arr) that prints: shape, dtype, strides, itemsize, nbytes, is_C_contiguous, is_F_contiguous, is_view (arr.base is not None), total elements. Then write reshape_safe(arr, shape) that reshapes arr to shape, ensuring the result is C-contiguous (copy only if needed). Test with various slices and transpositions.
Starter Code
import numpy as np

def inspect_array(arr):
    """Print a one-screen summary of an array's memory layout."""
    rows = [
        ("Shape:", arr.shape),
        ("dtype:", arr.dtype),
        ("strides (bytes):", arr.strides),
        ("itemsize:", arr.itemsize),
        ("nbytes:", format(arr.nbytes, ",")),
        ("C-contiguous:", arr.flags['C_CONTIGUOUS']),
        ("F-contiguous:", arr.flags['F_CONTIGUOUS']),
        ("Is view:", arr.base is not None),
        ("N elements:", arr.size),
    ]
    for label, value in rows:
        print(f"{label:<16} {value}")

def reshape_safe(arr, shape):
    """Reshape to *shape* with a C-contiguous result, copying only if needed."""
    src = arr if arr.flags['C_CONTIGUOUS'] else np.ascontiguousarray(arr)
    return src.reshape(shape)

base = np.arange(24, dtype=np.float32).reshape(4, 6)

# Walk several layouts through the inspector.  reshape_safe is pure, so
# building the case list up front changes nothing observable.
cases = [
    ("=== Base array ===", base),
    ("\n=== Transposed ===", base.T),
    ("\n=== Slice (non-contiguous) ===", base[::2, ::2]),
    ("\n=== Reshape safe (from transposed) ===", reshape_safe(base.T, (24,))),
]
for title, candidate in cases:
    print(title)
    inspect_array(candidate)
✅ Practice Checklist
30. Advanced Linear Algebra Operations

Beyond basic dot products: matrix decompositions (SVD, QR, Cholesky), determinants, null spaces, pseudo-inverse, and Kronecker products for systems of equations and dimensionality reduction.

QR decomposition and least squares
import numpy as np

rng = np.random.default_rng(42)

# QR: factor A into orthonormal Q and upper-triangular R
A = rng.standard_normal((6, 4))
Q, R = np.linalg.qr(A)
print(f"A: {A.shape}, Q: {Q.shape}, R: {R.shape}")
print("Q orthonormal:", np.allclose(Q.T @ Q, np.eye(4)))
print("A = Q@R:", np.allclose(A, Q @ R))

# Overdetermined least squares: recover x from noisy b = Ax + eps
m, n = 100, 5
A = rng.standard_normal((m, n))
true_x = rng.standard_normal(n)
b = A @ true_x + rng.normal(0, 0.1, m)

# lstsq minimizes ||Ax - b||^2
x, residuals, rank, sv = np.linalg.lstsq(A, b, rcond=None)
print(f"True x:   {true_x[:3].round(3)}")
print(f"Solved x: {x[:3].round(3)}")
print(f"Matrix rank: {rank}, condition number: {sv[0]/sv[-1]:.1f}")

# Normal equations give the same answer but are numerically less stable
x_normal = np.linalg.solve(A.T @ A, A.T @ b)
print("Normal eq error:", np.abs(x - x_normal).max())
Cholesky, determinant, and null space
import numpy as np

rng = np.random.default_rng(42)

# Cholesky: factor a positive-definite S as L @ L.T (L lower triangular).
# A.T @ A is positive semi-definite; adding n*I makes it positive definite.
n = 5
A = rng.standard_normal((n, n))
S = A.T @ A + n * np.eye(n)

chol = np.linalg.cholesky(S)
print("L shape:", chol.shape)
print("L @ L.T == S:", np.allclose(chol @ chol.T, S))
print("L is lower triangular:", np.allclose(chol, np.tril(chol)))

# Multivariate-normal sampling: x = mu + z @ L.T with z ~ N(0, I)
mu  = np.zeros(n)
cov = S / S.max()   # scale to a reasonable range
chol = np.linalg.cholesky(cov)
samples = mu + rng.standard_normal((1000, n)) @ chol.T
print(f"Sample covariance matches target: {np.allclose(np.cov(samples.T), cov, atol=0.1)}")

# slogdet avoids overflow when the determinant is huge
logdet_sign, logdet = np.linalg.slogdet(S)
print(f"logdet(S): {logdet:.3f}, sign: {logdet_sign}")
print(f"det(S) = exp({logdet:.1f}) ~ {np.exp(logdet):.2e}")

# Moore-Penrose pseudo-inverse for non-square matrices
m = np.array([[1, 2], [3, 4], [5, 6]])
pinv = np.linalg.pinv(m)
print("Pseudo-inverse shape:", pinv.shape)
print("m @ pinv @ m == m:", np.allclose(m @ pinv @ m, m))
Eigendecomposition and PCA from scratch
import numpy as np

rng = np.random.default_rng(42)

# Eigendecomposition: A v = lambda v
A = np.array([[4, -2], [1,  1]])
eigenvalues, eigenvectors = np.linalg.eig(A)
print("Eigenvalues:", eigenvalues)
print("Eigenvectors:\n", eigenvectors)

# Verify that each pair satisfies the defining equation
for i, lam in enumerate(eigenvalues):
    v = eigenvectors[:, i]
    print(f"  A @ v{i} == {lam:.1f} * v{i}: {np.allclose(A @ v, lam * v)}")

# eigh is the right tool for symmetric input: stable, real eigenvalues
S = np.array([[3, 1], [1, 3]])
w, v = np.linalg.eigh(S)
print("Symmetric eigs:", w)

# PCA from scratch via SVD of the centered data matrix
n, d = 200, 5
X = rng.standard_normal((n, d))
X[:, 1] = 0.8 * X[:, 0] + 0.6 * X[:, 1]  # inject correlation

X_c = X - X.mean(axis=0)

U, s, Vt = np.linalg.svd(X_c, full_matrices=False)
explained = s**2 / (s**2).sum()
print("Explained variance ratio:", explained.round(3))
print("Cumulative:", explained.cumsum().round(3))

# Project onto the top-2 principal directions
X_2d = X_c @ Vt[:2].T
print(f"PCA 2D shape: {X_2d.shape}")
💼 Real-World: Recommender via Matrix Factorization
A collaborative filtering system factorizes a user-item rating matrix using SVD to find latent factors and predict missing ratings.
import numpy as np

rng = np.random.default_rng(42)

n_users, n_items, n_factors = 100, 200, 10

# Ground truth: a noisy low-rank rating matrix clipped to the 1-5 scale
U_true = rng.standard_normal((n_users, n_factors))
V_true = rng.standard_normal((n_items, n_factors))
R_true = U_true @ V_true.T + rng.normal(0, 0.5, (n_users, n_items))
R_true = np.clip(R_true, 1, 5)

# Only 20% of the entries are actually observed
observed_mask = rng.random((n_users, n_items)) < 0.2
R_obs = np.where(observed_mask, R_true, 0.0)

# Rank-k SVD reconstruction of the zero-filled observation matrix
# (imperfect but illustrative)
U, s, Vt = np.linalg.svd(R_obs, full_matrices=False)
k = n_factors
R_hat = np.clip(U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :], 1, 5)

# RMSE over the observed entries only
rmse = np.sqrt(np.mean((R_hat[observed_mask] - R_true[observed_mask])**2))
print(f"RMSE on observed ratings: {rmse:.3f}")

# Rank the unseen items for user 0 by predicted score
user0_scores = R_hat[0]
user0_unseen = ~observed_mask[0]
top5 = np.argsort(user0_scores * user0_unseen)[::-1][:5]
print(f"Top-5 recommendations for user 0: items {top5} with scores {user0_scores[top5].round(2)}")
🏋️ Practice: System of Equations Solver
Write solve_system(A, b) that checks if the system Ax=b is (1) well-determined (use rank), (2) overdetermined (least squares), or (3) underdetermined (minimum-norm solution via pinv), and solves accordingly with a label. Also write condition_number(A) and show how ill-conditioned matrices cause solution instability.
Starter Code
import numpy as np

def solve_system(A, b):
    """Solve Ax = b, dispatching on the system's shape and rank.

    Returns (x, label): square full-rank systems get an exact solve, tall
    systems a least-squares fit, and everything else (wide or rank-deficient)
    the minimum-norm pseudo-inverse solution.
    """
    m, n = A.shape
    rank = np.linalg.matrix_rank(A)
    if m == n and rank == n:
        return np.linalg.solve(A, b), "well-determined"
    if m > n:
        x, *_ = np.linalg.lstsq(A, b, rcond=None)
        return x, "overdetermined (least squares)"
    return np.linalg.pinv(A) @ b, "underdetermined (min-norm)"

def condition_number(A):
    """2-norm condition number sigma_max / sigma_min (inf when singular)."""
    singular_values = np.linalg.svd(A, compute_uv=False)
    smallest = singular_values[-1]
    return np.inf if not smallest > 0 else singular_values[0] / smallest

# Exercise all three regimes, plus an ill-conditioned near-singular case
rng = np.random.default_rng(42)

# Square, full rank
A1 = rng.standard_normal((3, 3))
b1 = rng.standard_normal(3)
x1, lbl1 = solve_system(A1, b1)
print(f"{lbl1}: cond={condition_number(A1):.1f}, error={np.linalg.norm(A1@x1-b1):.2e}")

# Tall: more equations than unknowns
A2 = rng.standard_normal((10, 3))
b2 = rng.standard_normal(10)
x2, lbl2 = solve_system(A2, b2)
print(f"{lbl2}: cond={condition_number(A2):.1f}, residual={np.linalg.norm(A2@x2-b2):.4f}")

# Nearly singular: a tiny perturbation, an enormous condition number
A3 = np.array([[1, 1], [1, 1+1e-10]])
b3 = np.array([2.0, 2.0])
x3, lbl3 = solve_system(A3, b3)
print(f"{lbl3}: cond={condition_number(A3):.2e}")
✅ Practice Checklist
31. Numerical Differentiation & Optimization

Finite differences approximate gradients of black-box functions. Combined with np.gradient, they enable sensitivity analysis, gradient checking, and simple optimization without symbolic math.

Finite differences: forward, backward, central
import numpy as np

# First derivative via finite differences
def forward_diff(f, x, h=1e-5):
    """O(h) forward-difference estimate of f'(x)."""
    return (f(x + h) - f(x)) / h

def backward_diff(f, x, h=1e-5):
    """O(h) backward-difference estimate of f'(x)."""
    return (f(x) - f(x - h)) / h

def central_diff(f, x, h=1e-5):
    """O(h^2) central-difference estimate of f'(x)."""
    return (f(x + h) - f(x - h)) / (2 * h)

def second_diff(f, x, h=1e-5):
    """O(h^2) central-difference estimate of f''(x)."""
    return (f(x + h) - 2*f(x) + f(x - h)) / h**2

# Sanity check against f = sin, whose derivative is cos
f = np.sin
x = np.pi / 4
true_deriv = np.cos(x)

print(f"True f'(pi/4) = {true_deriv:.10f}")
print(f"Forward:   {forward_diff(f, x):.10f}  err={abs(forward_diff(f,x)-true_deriv):.2e}")
print(f"Backward:  {backward_diff(f, x):.10f}  err={abs(backward_diff(f,x)-true_deriv):.2e}")
print(f"Central:   {central_diff(f, x):.10f}  err={abs(central_diff(f,x)-true_deriv):.2e}")
print(f"Second:    {second_diff(f, x):.10f}  true=-sin(pi/4)={-np.sin(x):.10f}")

# The same helpers broadcast over arrays of evaluation points
x_arr = np.linspace(0, 2*np.pi, 100)
error = np.abs(central_diff(np.sin, x_arr) - np.cos(x_arr))
print(f"Max error over array: {error.max():.2e}")
np.gradient for array derivatives
import numpy as np

# np.gradient uses central differences inside, one-sided at the edges
x = np.linspace(0, 2*np.pi, 100)
y = np.sin(x)

dydx = np.gradient(y, x)   # pass x so the spacing is accounted for
true  = np.cos(x)
print(f"Max error vs cos(x): {np.abs(dydx - true).max():.4e}")

# Partial derivatives of a function sampled on a 2D grid
nx, ny = 50, 60
x_1d = np.linspace(0, 2, nx)
y_1d = np.linspace(0, 3, ny)
X, Y = np.meshgrid(x_1d, y_1d)

Z = np.sin(X) * np.cos(Y)

# For a 2D input, gradient returns [d/d(rows), d/d(cols)] = [dZ/dy, dZ/dx]
dZ_dy, dZ_dx = np.gradient(Z, y_1d, x_1d)

true_dZ_dx = np.cos(X) * np.cos(Y)
true_dZ_dy = -np.sin(X) * np.sin(Y)

print(f"dZ/dx error: {np.abs(dZ_dx - true_dZ_dx).max():.4e}")
print(f"dZ/dy error: {np.abs(dZ_dy - true_dZ_dy).max():.4e}")

# Gradient magnitude (the quantity behind image edge detection)
mag = np.sqrt(dZ_dx**2 + dZ_dy**2)
print(f"Gradient magnitude: min={mag.min():.3f}, max={mag.max():.3f}")
Gradient descent with NumPy
import numpy as np

rng = np.random.default_rng(42)

# Minimize f(x, y) = (x-3)^2 + 2*(y+1)^2; the unique minimum is (3, -1)
def f(params):
    x, y = params
    return (x - 3)**2 + 2*(y + 1)**2

def grad_f(params):
    """Analytic gradient of f."""
    x, y = params
    return np.array([2*(x - 3), 4*(y + 1)])

# Plain gradient descent from the origin
params  = np.array([0.0, 0.0])
lr      = 0.1
history = [params.copy()]

for i in range(50):
    params = params - lr * grad_f(params)
    history.append(params.copy())
    if i % 10 == 9:
        print(f"  iter {i+1:3d}: f={f(params):.6f}, params=({params[0]:.3f}, {params[1]:.3f})")

print(f"True minimum: (3, -1), found: ({params[0]:.4f}, {params[1]:.4f})")

# Gradient check: central differences vs the analytic gradient
test_pt = rng.standard_normal(2)
analytic = grad_f(test_pt)
h = 1e-6
numeric  = np.array([(f(test_pt + h*e) - f(test_pt - h*e)) / (2*h)
                      for e in np.eye(2)])
print(f"Gradient check: analytic={analytic.round(4)}, numeric={numeric.round(4)}")
print(f"Max error: {np.abs(analytic - numeric).max():.2e}")
💼 Real-World: Gradient-Based Calibration
A financial model calibrates parameters to match observed option prices by minimizing a loss function using gradient descent with numerical gradients.
import numpy as np

rng = np.random.default_rng(42)

# Fit mu, sigma of a Gaussian to observed log returns by maximum likelihood
n_obs = 500
true_mu, true_sigma = 0.001, 0.02
observed = rng.normal(true_mu, true_sigma, n_obs)

def neg_log_likelihood(params, data):
    """Gaussian negative log-likelihood; sigma = exp(log_sigma) stays > 0."""
    mu, log_sigma = params
    sigma = np.exp(log_sigma)
    return 0.5 * np.sum(np.log(2*np.pi*sigma**2) + ((data - mu)/sigma)**2)

def numerical_gradient(f, params, h=1e-6):
    """Central-difference gradient of a scalar function of a vector."""
    grad = np.zeros_like(params)
    for i in range(len(params)):
        e = np.zeros_like(params)
        e[i] = h
        grad[i] = (f(params + e) - f(params - e)) / (2 * h)
    return grad

# BUG FIX: the original took raw steps `params -= 1e-4 * grad`.  This NLL is
# severely ill-conditioned: the curvature in mu is n/sigma^2 (~5e6 at the
# starting sigma=0.01) versus only ~2n in log_sigma, so a fixed scalar step
# size either diverges in mu or crawls in log_sigma.  Scale each coordinate by
# its (known) inverse curvature instead -- a diagonal Newton-style step --
# which is stable and converges to the MLE in a handful of epochs.
params = np.array([0.0, np.log(0.01)])  # initial guess
lr = 0.5

for epoch in range(500):
    grad = numerical_gradient(lambda p: neg_log_likelihood(p, observed), params)
    sigma2 = np.exp(2 * params[1])
    # inverse curvatures: sigma^2/n for mu, 1/(2n) for log_sigma
    precond = np.array([sigma2 / n_obs, 1.0 / (2 * n_obs)])
    params -= lr * precond * grad

mu_hat    = params[0]
sigma_hat = np.exp(params[1])
print(f"True:      mu={true_mu:.4f}, sigma={true_sigma:.4f}")
print(f"Estimated: mu={mu_hat:.4f}, sigma={sigma_hat:.4f}")
print(f"Error:     mu={abs(mu_hat-true_mu):.2e}, sigma={abs(sigma_hat-true_sigma):.2e}")
🏋️ Practice: Jacobian Checker
Write jacobian(f, x, h=1e-6) that computes the Jacobian matrix of a vector-valued function f: R^n -> R^m using central differences. Then write gradient_check(f, grad_f, x) that compares the numerical Jacobian with the analytic gradient and reports the relative error. Test with f(x) = [sin(x0)*x1, x0^2 + exp(x1)].
Starter Code
import numpy as np

def jacobian(f, x, h=1e-6):
    """Numerical Jacobian of a vector-valued f at x via central differences."""
    x  = np.asarray(x, float)
    f0 = np.asarray(f(x), float)
    J  = np.zeros((f0.size, x.size))
    for j in range(x.size):
        step    = np.zeros(x.size)
        step[j] = h
        hi = np.asarray(f(x + step))
        lo = np.asarray(f(x - step))
        J[:, j] = (hi - lo) / (2 * h)
    return J

def gradient_check(f, grad_f, x, h=1e-6):
    """Compare numerical vs analytic Jacobians; ok when max rel. error < 1e-4."""
    numeric  = jacobian(f, x, h)
    analytic = np.asarray(grad_f(x))
    abs_err  = np.abs(numeric - analytic)
    rel_err  = abs_err / (np.abs(analytic) + 1e-8)
    return {"max_abs_err": abs_err.max(), "max_rel_err": rel_err.max(), "ok": rel_err.max() < 1e-4}

# Test function: f(x) = [sin(x0)*x1, x0^2 + exp(x1)]
def f_vec(x):
    """Vector-valued test function R^2 -> R^2."""
    return [np.sin(x[0]) * x[1], x[0]**2 + np.exp(x[1])]

def df_vec(x):
    """Analytic Jacobian of f_vec."""
    return np.array([
        [np.cos(x[0]) * x[1], np.sin(x[0])],
        [2 * x[0],            np.exp(x[1])],
    ])

x0 = np.array([0.5, 1.2])
result = gradient_check(f_vec, df_vec, x0)
print("Jacobian check:", result)
print("Numerical J:\n", jacobian(f_vec, x0).round(6))
print("Analytic J:\n",  df_vec(x0).round(6))
✅ Practice Checklist
32. NumPy in Data Science Workflows

NumPy integrates tightly with pandas, scikit-learn, PyTorch, and SciPy. Understanding these bridges β€” arrays, dtypes, memory sharing β€” makes you faster at the whole pipeline.

NumPy <-> Pandas integration
import numpy as np

# A DataFrame is, at heart, a dict of aligned 1D NumPy arrays
n = 5
ids    = np.arange(1, n+1)
names  = np.array(['Alice', 'Bob', 'Carol', 'Dave', 'Eve'])
scores = np.array([92.5, 78.3, 88.1, 95.0, 70.2])
grades = np.where(scores >= 90, 'A', np.where(scores >= 80, 'B', 'C'))

# Typical pandas-style queries, expressed with boolean masks
mean_score  = scores.mean()
top_scorers = names[scores >= 90]
passing     = names[scores >= 75]

print("Mean score:", mean_score.round(2))
print("Top scorers (A grade):", top_scorers)
print("Passing (>=75):", passing)

# Group by grade
for grade in np.unique(grades):
    mask = grades == grade
    print(f"Grade {grade}: {names[mask].tolist()}, avg={scores[mask].mean():.1f}")

# Round-trip with a real DataFrame when pandas is installed
try:
    import pandas as pd
    df = pd.DataFrame({'id': ids, 'name': names, 'score': scores, 'grade': grades})
    # to_numpy() hands back a NumPy array (may copy depending on dtypes)
    arr = df[['id', 'score']].to_numpy()
    print("DataFrame to numpy:", arr.shape, arr.dtype)
except ImportError:
    print("(pandas not available, but the pattern works)")
NumPy for feature engineering
import numpy as np

rng = np.random.default_rng(42)
n, d = 1000, 5

X = rng.standard_normal((n, d))

# 1. Normalization
def z_score(X):
    """Per-column standardization to zero mean / unit variance."""
    mu = X.mean(axis=0)
    sd = X.std(axis=0) + 1e-8
    return (X - mu) / sd

def min_max(X):
    """Per-column rescaling into [0, 1]."""
    lo = X.min(axis=0)
    span = X.max(axis=0) - lo + 1e-8
    return (X - lo) / span

Xz = z_score(X)
Xm = min_max(X)
print(f"Z-score: mean={Xz.mean(axis=0).round(2)}, std={Xz.std(axis=0).round(2)}")
print(f"MinMax: [{Xm.min():.3f}, {Xm.max():.3f}]")

# 2. Degree-2 polynomial features from the first 3 columns
Xp = np.column_stack([X[:, :3], X[:, :3]**2,
                       X[:, 0:1]*X[:, 1:2], X[:, 1:2]*X[:, 2:3]])
print(f"Polynomial features: {X.shape} -> {Xp.shape}")

# 3. One-hot encoding via a broadcast comparison against the category range
cats = rng.integers(0, 4, n)   # 4 categories
ohe  = (cats[:, np.newaxis] == np.arange(4)[np.newaxis, :]).astype(float)
print(f"OHE: shape={ohe.shape}, sum per row all 1: {np.all(ohe.sum(axis=1)==1)}")

# 4. Interaction features: per-row outer product, upper triangle kept
a = X[:, :3]   # 3 features
combos = np.einsum('ni,nj->nij', a, a)   # (n, 3, 3)
triu_idx = np.triu_indices(3, k=0)
interactions = combos[:, triu_idx[0], triu_idx[1]]   # (n, 6)
print(f"Interaction features shape: {interactions.shape}")
NumPy in ML inference
import numpy as np

rng = np.random.default_rng(42)

# A 2-layer MLP forward pass written directly in NumPy
def relu(x):
    """Elementwise max(x, 0)."""
    return np.maximum(0, x)

def softmax(x):
    """Row-wise softmax, shifted by the row max for numerical stability."""
    e = np.exp(x - x.max(axis=1, keepdims=True))
    return e / e.sum(axis=1, keepdims=True)

# Simulated trained weights (normally loaded from file)
n_in, n_hidden, n_out = 784, 256, 10
W1 = rng.standard_normal((n_in, n_hidden)) * 0.01
b1 = np.zeros(n_hidden)
W2 = rng.standard_normal((n_hidden, n_out)) * 0.01
b2 = np.zeros(n_out)

def forward(X):
    """Hidden ReLU layer followed by a softmax output layer.

    BUG FIX: the original computed h1 = relu(X @ W1 + b1) but then fed
    `X @ W1 @ W2` to softmax, silently bypassing the ReLU layer.  The hidden
    activation is now actually used.
    """
    h1 = relu(X @ W1 + b1)
    return softmax(h1 @ W2 + b2)

# Batch inference
batch_size = 64
X_batch = rng.standard_normal((batch_size, n_in)).astype(np.float32)
probs = forward(X_batch)
preds = probs.argmax(axis=1)
conf  = probs.max(axis=1)

print(f"Batch: {X_batch.shape} -> probs: {probs.shape}")
print(f"Predictions: {preds[:10]}")
print(f"Confidence: min={conf.min():.3f}, max={conf.max():.3f}, mean={conf.mean():.3f}")

# Top-3 predictions per sample
top3 = np.argsort(probs, axis=1)[:, -3:][:, ::-1]
print(f"Top-3 class indices for sample 0: {top3[0]}")
💼 Real-World: End-to-End ML Pipeline
A data scientist implements a complete train/eval cycle using only NumPy: feature normalization, train/val/test split, logistic regression with gradient descent, and evaluation metrics.
import numpy as np

rng = np.random.default_rng(42)
N, D = 1000, 10

# Synthetic binary classification data: linear signal plus Gaussian noise
X = rng.standard_normal((N, D))
true_w = rng.standard_normal(D)
y = (X @ true_w + rng.normal(0, 0.5, N) > 0).astype(float)

# Train/val/test split (70/15/15)
idx = rng.permutation(N)
n_train = int(0.7*N); n_val = int(0.15*N)
tr, va, te = idx[:n_train], idx[n_train:n_train+n_val], idx[n_train+n_val:]

def normalize(X_tr, *X_others):
    """Standardize arrays with statistics fitted on X_tr only (no leakage).

    Backward compatible: normalize(a, b) still returns a 2-tuple, but any
    number of extra arrays may be passed so the train statistics are
    computed exactly once.
    """
    mu, std = X_tr.mean(0), X_tr.std(0) + 1e-8
    return tuple((A - mu) / std for A in (X_tr, *X_others))

# Fit the stats once and apply to val and test in a single call.
# (The original called normalize twice, recomputing the train statistics
# and overwriting X_tr with an identical copy.)
X_tr, X_va, X_te = normalize(X[tr], X[va], X[te])

# Logistic regression with SGD
def sigmoid(z):
    """Logistic function; clipping avoids overflow in exp for large |z|."""
    return 1 / (1 + np.exp(-np.clip(z, -100, 100)))

w = np.zeros(D)
lr, n_epochs, bs = 0.1, 20, 32
y_tr = y[tr]  # hoist the fancy-index out of the training loop
for epoch in range(n_epochs):
    perm = rng.permutation(len(tr))
    for i in range(0, len(tr), bs):
        batch = perm[i:i+bs]
        xi = X_tr[batch]
        yi = y_tr[batch]
        pred = sigmoid(xi @ w)
        grad = xi.T @ (pred - yi) / len(yi)  # mean log-loss gradient
        w -= lr * grad

def accuracy(X, y_true, w):
    """Fraction of samples where the thresholded sigmoid matches the label."""
    return ((sigmoid(X @ w) >= 0.5) == y_true.astype(bool)).mean()

print(f"Train acc: {accuracy(X_tr, y[tr], w):.3f}")
print(f"Val   acc: {accuracy(X_va, y[va], w):.3f}")
print(f"Test  acc: {accuracy(X_te, y[te], w):.3f}")
🏋️ Practice: Cross-Validation
Implement k_fold_cv(X, y, model_fn, k=5, seed=42) where model_fn(X_train, y_train, X_val) returns predictions. Split data into k folds, train on k-1 folds, predict on the held-out fold, compute accuracy for each fold, and return mean and std. Test with a simple threshold classifier.
Starter Code
import numpy as np

def k_fold_cv(X, y, model_fn, k=5, seed=42):
    """Run k-fold cross-validation and report per-fold accuracies.

    model_fn(X_train, y_train, X_val) must return predictions for X_val.
    Returns a dict with the mean accuracy, its std, and the per-fold list.
    Note: the len(y) % k trailing samples (after shuffling) are never used
    as validation data.
    """
    shuffled = np.random.default_rng(seed).permutation(len(y))
    size = len(y) // k
    accs = []
    for f in range(k):
        lo, hi = f * size, (f + 1) * size
        held_out = shuffled[lo:hi]
        kept = np.concatenate([shuffled[:lo], shuffled[hi:]])
        preds = model_fn(X[kept], y[kept], X[held_out])
        accs.append((preds == y[held_out]).mean())
    return {"mean": np.mean(accs), "std": np.std(accs), "folds": accs}

def threshold_model(X_tr, y_tr, X_va):
    """Threshold classifier: predict 1 when feature 0 exceeds the
    training-set median of feature 0 (y_tr is intentionally unused)."""
    cut = np.median(X_tr[:, 0])
    above = X_va[:, 0] > cut
    return above.astype(int)

rng = np.random.default_rng(42)
N, D = 500, 5
X = rng.standard_normal((N, D))
# Labels depend mostly on feature 0, so a feature-0 threshold has real signal.
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)

result = k_fold_cv(X, y, threshold_model, k=5)
# Fix: the report string contained "Β±" (UTF-8 "±" mis-decoded as Latin-1);
# print the intended plus-minus sign.
print(f"5-fold CV: {result['mean']:.3f} ± {result['std']:.3f}")
print(f"Per-fold: {[round(s,3) for s in result['folds']]}")
✅ Practice Checklist