NumPy
32 topics • Click any card to expand
NumPy arrays are the foundation of scientific computing. Create them from lists, ranges, or built-in factory functions.
import numpy as np

# Array creation: from Python lists and from the built-in factory functions.
a = np.array([1, 2, 3, 4, 5])
b = np.array([[1, 2, 3], [4, 5, 6]])  # 2D
zeros = np.zeros((3, 4))              # 3x4 of 0.0
ones = np.ones((2, 3))                # 2x3 of 1.0
eye = np.eye(3)                       # 3x3 identity
full = np.full((2, 2), 7.0)           # filled with 7.0
print("1D:", a)
print("2D shape:", b.shape)
print("Identity:\n", eye)
# Evenly spaced / log-spaced ranges, plus the modern Generator random API.
r = np.arange(0, 20, 2)       # [0, 2, 4, ..., 18]
pts = np.linspace(0, 1, 5)    # 5 evenly-spaced in [0,1]
log = np.logspace(0, 3, 4)    # [1, 10, 100, 1000]
print("arange:", r)
print("linspace:", pts)
print("logspace:", log)

# Random arrays
rng = np.random.default_rng(42)   # recommended new API
u = rng.random((3, 3))            # uniform [0,1)
n = rng.standard_normal((3, 3))   # standard normal
print("Random normal:\n", n.round(2))
# Checkerboard pattern using tile
unit = np.array([[0, 1], [1, 0]])
board = np.tile(unit, (4, 4))
print("Checkerboard (8x8):\n", board)

# meshgrid -> create coordinate grids
x = np.linspace(-2, 2, 5)
y = np.linspace(-1, 1, 3)
XX, YY = np.meshgrid(x, y)
print("XX shape:", XX.shape)   # (3, 5)
Z = np.sqrt(XX**2 + YY**2)     # distance from origin
print("Z:\n", Z.round(2))

# np.empty -> uninitialized (fast allocation); contents are garbage, not zeros
buf = np.empty((3, 3), dtype=np.float64)
print("Empty (garbage values):", buf.shape, buf.dtype)
# np.fromfunction -> build an array from an index formula
# Creates a 5x5 multiplication table
mul_table = np.fromfunction(lambda i, j: (i + 1) * (j + 1), (5, 5), dtype=int)
print("Multiplication table:\n", mul_table)

# Distance matrix from origin using fromfunction
dist = np.fromfunction(lambda i, j: np.sqrt(i**2 + j**2), (4, 4))
print("\nDistance from (0,0):\n", dist.round(2))

# Structured arrays -> heterogeneous data like a database row
dt = np.dtype([('name', 'U10'), ('age', np.int32), ('score', np.float64)])
students = np.array([
    ('Alice', 23, 92.5),
    ('Bob', 25, 87.3),
    ('Carol', 22, 95.1),
], dtype=dt)
print("\nStructured array:", students)
print("Names:", students['name'])
print("Top scorer:", students[np.argmax(students['score'])]['name'])
# Build a design matrix with a leading bias (intercept) column of ones.
np.random.seed(42)
n_samples, n_features = 5000, 12
X = np.random.randn(n_samples, n_features)
# Add bias column (ones) for intercept term
bias = np.ones((n_samples, 1))
X_b = np.hstack([bias, X])
print(f"Feature matrix shape: {X_b.shape}")
print(f"Memory: {X_b.nbytes / 1024:.1f} KB")
print(f"dtype: {X_b.dtype}")
print(f"Sample row[:5]: {X_b[0, :5].round(3)}")
# --- Exercises: array creation ---
# 1. 6x6 checkerboard using np.tile
unit = np.array([[0, 1], [1, 0]])
# TODO: board = np.tile(unit, ???)
# print("Checkerboard:\n", board)

# 2. 5x5 identity * diagonal scale
eye = np.eye(5)
scale = np.array([1, 2, 3, 4, 5])
# TODO: scaled = eye * ??? (use broadcasting/diag)
# print("Scaled identity:\n", scaled)

# 3. Log-spaced points, find closest to 500
pts = np.logspace(1, 4, 20)   # 10^1 to 10^4
# TODO: idx = np.argmin(np.abs(pts - 500))
# print(f"Closest to 500: {pts[idx]:.2f} at index {idx}")

# NOTE: Every NumPy array carries metadata: shape, dtype, number of
# dimensions, and memory size. Understanding these prevents bugs.
import numpy as np

# Core array metadata: shape, rank, dtype, element size, and total bytes.
a = np.array([[1.5, 2.0, 3.1],
              [4.0, 5.5, 6.2]], dtype=np.float32)
print("shape: ", a.shape)       # (2, 3)
print("ndim: ", a.ndim)         # 2
print("dtype: ", a.dtype)       # float32
print("size: ", a.size)         # 6 (total elements)
print("itemsize:", a.itemsize)  # 4 (bytes per element)
print("nbytes: ", a.nbytes)     # 24 (total bytes)
print("Transpose shape:", a.T.shape)
# Reshaping (with -1 auto-inference), dtype conversion, and flattening.
a = np.arange(12)   # [0..11]
r = a.reshape(3, 4)
print("Reshaped (3,4):\n", r)
# -1 auto-infers the dimension
c = a.reshape(2, -1)   # (2, 6)
print("Auto reshape:", c.shape)
# Change dtype
b = a.astype(np.float32)
print("float32 dtype:", b.dtype)
# Flatten back to 1D
print("Flattened:", r.flatten())
# Slices are VIEWS (shared memory); .copy() makes an independent array.
a = np.arange(10)
# Slice = VIEW (shares memory)
v = a[2:6]
v[0] = 99
print("After modifying view:", a)   # a is changed!

# .copy() = independent copy
c = a[2:6].copy()
c[0] = 0
print("After modifying copy:", a)   # a unchanged
print("View shares memory:", np.shares_memory(a, v))
print("Copy shares memory:", np.shares_memory(a, c))

# reshape also returns a view when possible
r = a.reshape(2, 5)
print("Reshape shares memory:", np.shares_memory(a, r))
# Strides and memory layout: C vs Fortran order; transpose is a view.
a = np.arange(12, dtype=np.float64).reshape(3, 4)
# Strides: bytes to step in each dimension
print("Shape: ", a.shape)
print("Strides: ", a.strides)    # (32, 8) -> 4 float64s per row
print("Itemsize: ", a.itemsize)  # 8 bytes per float64

# C order (row-major, default): last index changes fastest.
# BUG FIX: np.ascontiguousarray/np.asfortranarray take no `order` kwarg;
# passing one raises TypeError. The order is implied by the function.
c_arr = np.ascontiguousarray(a)
print("\nC-order strides:", c_arr.strides)  # (32, 8)
print("C contiguous: ", c_arr.flags['C_CONTIGUOUS'])

# Fortran order (column-major): first index changes fastest
f_arr = np.asfortranarray(a)
print("\nF-order strides:", f_arr.strides)  # (8, 24)
print("F contiguous: ", f_arr.flags['F_CONTIGUOUS'])

# Transpose flips strides -> still a view, not a copy
t = a.T
print("\nTranspose strides:", t.strides)    # (8, 32)
print("Shares memory with original:", np.shares_memory(a, t))
# Compare memory footprint of float64 vs float32 for large sensor arrays.
sensors = {
    "temperature": np.random.uniform(15, 40, (10000, 24)),
    "pressure": np.random.uniform(900, 1100, (10000, 24)),
    "humidity": np.random.uniform(20, 90, (10000, 24)),
}
print(f"{'Sensor':12s} {'float64 MB':>12} {'float32 MB':>12} {'Saved MB':>10}")
print("-" * 50)
for name, arr in sensors.items():
    mb64 = arr.nbytes / 1e6                       # current footprint
    mb32 = arr.astype(np.float32).nbytes / 1e6    # footprint after downcast
    print(f"{name:12s} {mb64:>12.3f} {mb32:>12.3f} {mb64-mb32:>10.3f}")
# --- Exercises: dtypes, memory, and views ---
dtypes = [np.int8, np.float32, np.complex128]
base_bytes = np.zeros((4, 5), dtype=np.float64).nbytes
for dt in dtypes:
    arr = np.zeros((4, 5), dtype=dt)
    # TODO: print shape, ndim, dtype, itemsize, nbytes
    # TODO: compute saving vs float64 base_bytes
    print(f"dtype={arr.dtype}: shape={arr.shape}, "
          f"ndim=???, itemsize=???B, nbytes=???B, "
          f"vs float64: ???B saving")

# Reshape and verify view
a = np.arange(20, dtype=np.float32).reshape(4, 5)
# TODO: b = a.reshape(2, 2, 5)
# TODO: print("shares_memory:", np.shares_memory(a, b))

# NOTE: NumPy supports element access, slices, fancy (list-based) indexing,
# and boolean masks -- all without Python loops.
import numpy as np

# Basic indexing and slicing on 1D and 2D arrays.
a = np.arange(10)
print(a[2])     # 2
print(a[-1])    # 9
print(a[2:7])   # [2 3 4 5 6]
print(a[::2])   # every other -> [0 2 4 6 8]
print(a[::-1])  # reversed

m = np.arange(12).reshape(3, 4)
print("m[1,2]:", m[1, 2])   # 6
print("col 1:", m[:, 1])    # [1 5 9]
print("sub-matrix:\n", m[0:2, 1:3])
# Fancy (integer-list) indexing and np.ix_ cross-product selection.
a = np.array([10, 20, 30, 40, 50])
idx = [0, 2, 4]
print("Fancy:", a[idx])   # [10 30 50]

m = np.arange(16).reshape(4, 4)
rows = [0, 1, 2]
cols = [1, 2, 3]
# Paired lists select element-wise: (0,1), (1,2), (2,3)
print("Diagonal slice:", m[rows, cols])   # [1 6 11]
# np.ix_ -> all row/col combinations
print("Sub-matrix:\n", m[np.ix_(rows, cols)])
m = np.arange(25).reshape(5, 5)
print("Every 2nd row, every 2nd col:\n", m[::2, ::2])
print("Last 2 rows reversed:\n", m[-1::-2])
# np.take β fancy indexing along axis
data = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
order = [2, 0, 3, 1] # reorder rows
print("Reordered rows:\n", np.take(data, order, axis=0))
# Diagonal extraction
print("Main diagonal:", np.diag(m))
print("Offset diagonal:", np.diag(m, k=1)) # one above main
# Set values at fancy indices
arr = np.zeros(10, dtype=int)
arr[[1, 3, 5, 7, 9]] = 1
print("Odd positions set:", arr)import numpy as np
m = np.arange(25).reshape(5, 5)
print("Matrix:\n", m)
# np.ix_ β create open mesh for cross-product selection
rows = [0, 2, 4]
cols = [1, 3]
sub = m[np.ix_(rows, cols)]
print("\nnp.ix_ sub-matrix (rows 0,2,4 x cols 1,3):\n", sub)
# Scatter: write to all combinations of row/col indices
target = np.zeros((5, 5), dtype=int)
target[np.ix_(rows, cols)] = 9
print("\nAfter scatter write:\n", target)
# Gather: collect multiple blocks from a 3D array
batch = np.arange(60).reshape(3, 4, 5) # 3 matrices of (4,5)
sample_rows = [0, 2]
sample_cols = [1, 3, 4]
# Extract a sub-block from each matrix in the batch
gathered = batch[:, np.ix_(sample_rows, sample_cols)[0],
np.ix_(sample_rows, sample_cols)[1]]
print("\nBatch gather shape:", gathered.shape) # (3, 2, 3)
print("First matrix block:\n", gathered[0])import numpy as np
# 30 days of hourly readings
time_series = np.random.randn(24 * 30) + 20.0 # ~20Β°C
windows = np.lib.stride_tricks.sliding_window_view(time_series, 24)
print(f"Series: {time_series.shape} β Windows: {windows.shape}")
# Find 24-hour window with highest standard deviation
stds = windows.std(axis=1)
peak_idx = np.argmax(stds)
print(f"Most variable window starts at hour {peak_idx}")
print(f" std = {stds[peak_idx]:.3f}")
print(f" mean = {windows[peak_idx].mean():.2f}Β°C")
print(f" range [{windows[peak_idx].min():.1f}, {windows[peak_idx].max():.1f}]")import numpy as np
m = np.arange(36).reshape(6, 6)
print("Original:\n", m)
# 1. Extract 2x2 center block (rows 2-3, cols 2-3)
# TODO: center = m[???, ???]
# print("Center:\n", center)
# 2. Main diagonal
# TODO: diag = np.diag(m)
# print("Diagonal:", diag)
# 3. Set all values > 25 to -1 (in-place boolean indexing)
m_copy = m.copy()
# TODO: m_copy[???] = -1
# print("After masking >25:\n", m_copy)
# 4. Every other element from last row
# TODO: every_other = m[-1, ???]
# print("Every other (last row):", every_other)NumPy ufuncs (universal functions) operate element-wise on entire arrays β far faster than Python loops.
import numpy as np
a = np.array([1.0, 4.0, 9.0, 16.0])
b = np.array([2.0, 2.0, 3.0, 4.0])
print("a + b:", a + b)
print("a * b:", a * b)
print("a ** 2:", a ** 2)
print("sqrt:", np.sqrt(a))
print("log:", np.log(a).round(3))
print("exp:", np.exp([0, 1, 2]).round(3))
print("clip:", np.clip(a, 2, 10))import numpy as np
arr = np.array([1, 2, 3, 4, 5])
print("cumsum: ", np.cumsum(arr))
print("cumprod:", np.cumprod(arr))
print("diff: ", np.diff(arr))
x = np.linspace(-np.pi, np.pi, 5)
print("sin:", np.sin(x).round(2))
print("cos:", np.cos(x).round(2))
a = np.array([-3, -1, 0, 1, 3])
print("abs: ", np.abs(a))
print("sign:", np.sign(a))import numpy as np
# einsum β Einstein summation notation
A = np.random.randn(3, 4)
B = np.random.randn(4, 5)
# Matrix multiply: 'ij,jk->ik'
C = np.einsum('ij,jk->ik', A, B)
print("einsum matmul:", C.shape) # (3, 5)
# Trace (sum of diagonal): 'ii->'
sq = np.random.randn(4, 4)
print("trace:", np.einsum('ii->', sq), "==", np.trace(sq))
# Pairwise squared distances between points
points = np.array([[0,0],[1,0],[0,1],[1,1]], dtype=float)
diff = points[:, None, :] - points[None, :, :] # (4,4,2)
dist = np.sqrt((diff**2).sum(axis=-1))
print("Distance matrix:\n", dist.round(3))import numpy as np
# np.vectorize -> wrap a scalar Python function for array inputs
def classify(x):
    """Return the sign category ('negative' / 'zero' / 'positive') of x."""
    if x < 0:
        return 'negative'
    if x == 0:
        return 'zero'
    return 'positive'

v_classify = np.vectorize(classify)
data = np.array([-3, 0, 1, -1, 5])
print("vectorize:", v_classify(data))

# np.frompyfunc -> similar but returns object arrays; useful for ufunc chaining
safe_log = np.frompyfunc(lambda x: np.log(x) if x > 0 else np.nan, 1, 1)
vals = np.array([-1.0, 0.0, 1.0, np.e, 10.0])
print("frompyfunc safe_log:", safe_log(vals).astype(float).round(3))

# np.piecewise -> vectorized multi-branch function
x = np.linspace(-3, 3, 7)
y = np.piecewise(x,
                 [x < -1, (x >= -1) & (x <= 1), x > 1],
                 [lambda t: -t,    # x < -1:    y = -x
                  lambda t: t**2,  # -1<=x<=1:  y = x^2
                  lambda t: t])    # x > 1:     y = x
print("\npiecewise input: ", x.round(2))
print("piecewise output:", y.round(2))
# Daily percentage returns, log returns, annualized volatility, Sharpe.
prices = np.array([100, 102, 98, 105, 103, 110, 108, 115, 112, 118],
                  dtype=np.float64)
# Percentage daily returns
pct_ret = np.diff(prices) / prices[:-1] * 100
# Log returns (additive, good for compounding)
log_ret = np.diff(np.log(prices))
# Annualized volatility (assuming 252 trading days)
ann_vol = log_ret.std() * np.sqrt(252)
# Sharpe-like ratio (mean/std of daily log returns, annualized)
sharpe = log_ret.mean() / log_ret.std() * np.sqrt(252)
print(f"Daily returns %: {pct_ret.round(2)}")
print(f"Annualized vol: {ann_vol:.4f}")
print(f"Sharpe ratio: {sharpe:.3f}")
# 1. Trig on degree array
degrees = np.array([0, 30, 45, 60, 90, 120, 180], dtype=float)
# TODO: radians = degrees * np.pi / 180
# TODO: s, c = np.sin(radians), np.cos(radians)
# TODO: verify = np.allclose(s**2 + c**2, 1)
# print("sinΒ²+cosΒ²==1:", verify)
# 2. Price array stats
np.random.seed(42)
prices = np.cumprod(1 + np.random.normal(0.001, 0.02, 30)) * 100
# TODO: pct_ret = np.diff(prices) / prices[:-1] * 100
# TODO: ma5 = np.convolve(prices, np.ones(5)/5, mode='valid')
# TODO: cum_ret = prices[-1] / prices[0] - 1
# print(f"Cumulative return: {cum_ret:.2%}")
# 3. Winsorize to [p10, p90]
data = np.random.randn(100) * 10
# TODO: p10, p90 = np.percentile(data, [10, 90])
# TODO: clipped = np.clip(data, p10, p90)
# print(f"Original range: [{data.min():.1f}, {data.max():.1f}]")
# print(f"Clipped range: [{clipped.min():.1f}, {clipped.max():.1f}]")Broadcasting lets NumPy perform operations on arrays of different shapes without copying data β memory-efficient and fast.
import numpy as np
a = np.array([[1, 2, 3],
[4, 5, 6]])
print("a + 10:\n", a + 10) # scalar
print("a * [1,2,3]:\n", a * np.array([1, 2, 3])) # row
print("a + [[10],[20]]:\n",
a + np.array([[10], [20]])) # columnimport numpy as np
X = np.random.randn(100, 5) # 100 samples, 5 features
# Z-score normalization
mean = X.mean(axis=0) # shape (5,) β per feature
std = X.std(axis=0)
X_z = (X - mean) / std # (100,5) op (5,) β broadcasts
print("After normalization:")
print(" mean:", X_z.mean(axis=0).round(8))
print(" std: ", X_z.std(axis=0).round(8))
# Min-max normalization
X_mm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
print("Min-max range:", X_mm.min(axis=0).round(2), "to", X_mm.max(axis=0).round(2))import numpy as np
# Outer product without np.outer
a = np.array([1, 2, 3, 4])
b = np.array([10, 20, 30])
outer = a[:, None] * b[None, :] # (4,1) * (1,3) β (4,3)
print("Outer product:\n", outer)
# All pairwise L2 distances between N points
rng = np.random.default_rng(0)
pts = rng.random((5, 2)) # 5 points in 2D
diff = pts[:, None, :] - pts[None, :, :] # (5,5,2)
dist = np.sqrt((diff**2).sum(axis=-1)) # (5,5)
print("Distance matrix:\n", dist.round(3))
print("Nearest neighbor of pt0:", np.argsort(dist[0])[1])import numpy as np
rng = np.random.default_rng(7)
# Batch matrix multiply: (B, M, K) @ (B, K, N) -> (B, M, N)
B, M, K, N = 4, 3, 5, 2
A_batch = rng.random((B, M, K))
W_batch = rng.random((B, K, N))
C_batch = np.matmul(A_batch, W_batch) # or A_batch @ W_batch
print("Batch matmul:", A_batch.shape, "@", W_batch.shape, "->", C_batch.shape)
# Broadcasting rule visualization: align shapes from the right
# (4, 1, 3) + ( 3, 3) -> (4, 3, 3) β rule: prepend 1s then stretch
x = rng.random((4, 1, 3))
y = rng.random((3, 3))
result = x + y
print("\nBroadcast (4,1,3) + (3,3) ->", result.shape)
# Shared weight matrix across all batch items: (B,M,K) @ (K,N)
W_shared = rng.random((K, N)) # single weight matrix
C_shared = A_batch @ W_shared # (4,3,5) @ (5,2) -> (4,3,2)
print("Shared-weight batch:", A_batch.shape, "@", W_shared.shape, "->", C_shared.shape)
print("All batch outputs equal?", False, "(each row of A_batch differs)")import numpy as np
# Simulate batch: (N=32, H=64, W=64, C=3)
images = np.random.randint(0, 256, (32, 64, 64, 3), dtype=np.uint8)
# ImageNet channel means and stds (R, G, B)
mean_rgb = np.array([0.485, 0.456, 0.406])
std_rgb = np.array([0.229, 0.224, 0.225])
# Scale to [0,1] then normalize β broadcasting: (32,64,64,3) op (3,)
normalized = (images.astype(np.float32) / 255.0 - mean_rgb) / std_rgb
print(f"Input: {images.shape} dtype={images.dtype}")
print(f"Output: {normalized.shape} dtype={normalized.dtype}")
print(f"Range: [{normalized.min():.3f}, {normalized.max():.3f}]")
print(f"Per-channel mean: {normalized.mean(axis=(0,1,2)).round(3)}")import numpy as np
np.random.seed(42)
# 1. Add bias column (column of 1s) to feature matrix
X = np.random.randn(50, 4)
# TODO: bias = np.ones((50, 1))
# TODO: X_b = np.hstack([bias, X])
# print("With bias:", X_b.shape) # (50, 5)
# 2. Row-wise softmax
logits = np.random.randn(3, 5)
# TODO: exp_l = np.exp(logits)
# TODO: softmax = exp_l / exp_l.sum(axis=1, keepdims=True)
# print("Row sums:", softmax.sum(axis=1)) # should be [1. 1. 1.]
# 3. Zero-center each row
M = np.random.randn(4, 6)
# TODO: row_means = M.mean(axis=1, keepdims=True)
# TODO: M_centered = M - row_means
# print("Row means after centering:", M_centered.mean(axis=1).round(10))Filter arrays with boolean conditions. np.where and np.select let you apply transformations without explicit loops.
import numpy as np
a = np.array([10, 25, 3, 47, 18, 32, 5])
mask = a > 20
print("Mask: ", mask)
print("Values>20:", a[mask])
# Modify in-place
a[a < 10] = 0
print("After zero small:", a)
# Multiple conditions
b = np.arange(20)
print("5 to 15:", b[(b >= 5) & (b <= 15)])import numpy as np
scores = np.array([45, 78, 55, 90, 33, 68, 82])
# np.where β binary choice
grade = np.where(scores >= 60, "Pass", "Fail")
print("Pass/Fail:", grade)
# np.select β multiple conditions
cond = [scores >= 90, scores >= 70, scores >= 50]
choices = ["A", "B", "C"]
letter = np.select(cond, choices, default="D")
print("Letter grades:", letter)
# np.where returns indices when given 1 arg
idx_fail = np.where(scores < 60)[0]
print("Failing indices:", idx_fail)import numpy as np
rng = np.random.default_rng(7)
data = rng.normal(50, 15, 20).round(1)
print("Data:", data)
# argwhere β indices where condition is True
low = np.argwhere(data < 35)
print("Indices where < 35:", low.flatten())
# Replace outliers (>2 std) with median
mean, std = data.mean(), data.std()
median = np.median(data)
outlier_mask = np.abs(data - mean) > 2 * std
cleaned = np.where(outlier_mask, median, data)
print(f"Replaced {outlier_mask.sum()} outliers with median {median:.1f}")
print("Cleaned:", cleaned)import numpy as np
rng = np.random.default_rng(3)
temps = rng.normal(20, 8, 12).round(1) # 12 hourly readings
print("Temperatures:", temps)
# np.where with compound condition (AND)
comfortable = np.where((temps >= 18) & (temps <= 26), temps, np.nan)
print("Comfortable temps only:", comfortable)
# np.select β clean multi-case replacement for nested np.where
conditions = [
temps < 5,
(temps >= 5) & (temps < 15),
(temps >= 15) & (temps < 25),
temps >= 25,
]
labels = ['freezing', 'cold', 'comfortable', 'hot']
category = np.select(conditions, labels, default='unknown')
print("Categories:", category)
# np.where returning indices (1-arg form) then using them
hot_idx = np.where(temps >= 25)[0]
print(f"Hot hours (index): {hot_idx} values: {temps[hot_idx]}")
# Compound OR mask
extreme = np.where((temps < 5) | (temps >= 30), 'extreme', 'normal')
print("Extreme flag:", extreme)import numpy as np
np.random.seed(1)
# 1 year of hourly outdoor temperatures (Β°C)
temps = np.random.normal(22, 7, 8760)
# Degree-hours for heating (< 16Β°C) and cooling (> 24Β°C)
heating = np.where(temps < 16, 16 - temps, 0)
cooling = np.where(temps > 24, temps - 24, 0)
extreme = (temps < 0) | (temps > 38)
print(f"Heating degree-hours: {heating.sum():.0f}")
print(f"Cooling degree-hours: {cooling.sum():.0f}")
print(f"Extreme hours (<0/>38): {extreme.sum()}")
print(f"Coldest: {temps.min():.1f}Β°C at hour {temps.argmin()}")
print(f"Hottest: {temps.max():.1f}Β°C at hour {temps.argmax()}")import numpy as np
np.random.seed(0)
data = np.random.normal(100, 20, 200)
# 1. Count per band
bands = [(None, 70), (70, 90), (90, 110), (110, 130), (130, None)]
labels = ['very_low', 'low', 'normal', 'high', 'very_high']
for label, (lo, hi) in zip(labels, bands):
if lo is None:
count = (data < hi).sum()
elif hi is None:
count = (data >= lo).sum()
else:
count = ((data >= lo) & (data < hi)).sum()
print(f" {label:10s}: {count}")
# 2. Label each value with np.select
conds = [
data < 70,
(data >= 70) & (data < 90),
(data >= 90) & (data < 110),
(data >= 110) & (data < 130),
]
# TODO: labeled = np.select(conds, labels[:4], default=labels[4])
# 3. Clip and verify
# TODO: clipped = np.clip(data, 60, 140)
# TODO: print("All in [60,140]:", np.all((clipped >= 60) & (clipped <= 140)))NumPy provides fast descriptive statistics along any axis, plus correlation, covariance, and percentile functions.
import numpy as np
data = np.array([4, 7, 13, 16, 21, 23, 24, 28, 30])
print("mean: ", np.mean(data))
print("median: ", np.median(data))
print("std: ", np.std(data).round(2))
print("25th: ", np.percentile(data, 25))
print("75th: ", np.percentile(data, 75))
# Axis-wise on 2D
m = np.random.randint(1, 10, (4, 5))
print("Row means:", m.mean(axis=1).round(2))
print("Col sums: ", m.sum(axis=0))import numpy as np
np.random.seed(42)
x = np.random.randn(100)
y = 0.8 * x + np.random.randn(100) * 0.5 # correlated
z = np.random.randn(100) # independent
# Pearson correlation matrix
data = np.vstack([x, y, z])
cov = np.cov(data)
corr = np.corrcoef(data)
print("Covariance matrix:\n", cov.round(2))
print("Correlation matrix:\n", corr.round(2))import numpy as np
rng = np.random.default_rng(42)
data = rng.normal(50, 10, 1000)
# Histogram
counts, edges = np.histogram(data, bins=10)
print("Bin edges:", edges.round(1))
print("Counts: ", counts)
# Manual histogram display
for i, (lo, hi, c) in enumerate(zip(edges, edges[1:], counts)):
bar = '#' * (c // 10)
print(f"[{lo:5.1f}-{hi:5.1f}] {bar} ({c})")
# Quantile / percentile
q = np.quantile(data, [0.25, 0.5, 0.75])
print(f"Q1={q[0]:.1f} Median={q[1]:.1f} Q3={q[2]:.1f} IQR={q[2]-q[0]:.1f}")
# nanmean β ignores NaN
a = np.array([1.0, 2.0, np.nan, 4.0, np.nan, 6.0])
print("nanmean:", np.nanmean(a), " nanstd:", np.nanstd(a).round(3))import numpy as np
rng = np.random.default_rng(99)
x = rng.normal(0, 1, 500)
y = 0.7 * x + rng.normal(0, 0.7, 500) # correlated with x
# percentile (takes 0-100) vs quantile (takes 0.0-1.0) β same result
p = np.percentile(x, [25, 50, 75])
q = np.quantile(x, [0.25, 0.50, 0.75])
print("percentile:", p.round(3))
print("quantile: ", q.round(3))
print("identical: ", np.allclose(p, q))
# corrcoef β Pearson correlation matrix
corr = np.corrcoef(x, y)
print(f"\nCorrelation x<->y: {corr[0,1]:.3f}")
# histogram2d β joint distribution of two variables
H, xedges, yedges = np.histogram2d(x, y, bins=5)
print("\n2D histogram (counts):\n", H.astype(int))
print("x edges:", xedges.round(2))
print("y edges:", yedges.round(2))
print("Most occupied bin count:", int(H.max()))import numpy as np
np.random.seed(42)
# 10,000 unit measurements (target: 50mm Β± 5mm)
measurements = np.random.normal(50, 5, 10000)
# Inject artificial defects
measurements[[100, 500, 2000, 7500]] = [85, 4, 92, 7]
mean = measurements.mean()
std = measurements.std()
z_score = np.abs((measurements - mean) / std)
outlier_mask = z_score > 3
outlier_vals = measurements[outlier_mask]
outlier_idx = np.where(outlier_mask)[0]
print(f"Process mean: {mean:.2f}mm, std: {std:.2f}mm")
print(f"Total units: {len(measurements):,}")
print(f"Defects found: {outlier_mask.sum()} ({outlier_mask.mean():.2%})")
print(f"Defect values: {outlier_vals.round(1)}")
print(f"At indices: {outlier_idx}")import numpy as np
np.random.seed(1)
scores = np.random.randint(0, 101, (10, 5))
subjects = ['Math', 'Science', 'English', 'History', 'Art']
print("Scores:\n", scores)
# 1. Per-column statistics
for j, subj in enumerate(subjects):
col = scores[:, j]
# TODO: print(f"{subj:8s}: mean={col.mean():.1f}, std={col.std():.1f}, min={col.min()}, max={col.max()}")
pass
# 2. Subject with highest variance
# TODO: variances = scores.var(axis=0)
# TODO: hardest = subjects[np.argmax(variances)]
# print(f"Highest variance subject: {hardest}")
# 3. Students with average below 50
# TODO: row_means = scores.mean(axis=1)
# TODO: struggling = np.where(row_means < 50)[0]
# print(f"Students below avg 50: rows {struggling}")
# 4. Correlation matrix
# TODO: corr = np.corrcoef(scores.T)
# print("Correlation:\n", corr.round(2))numpy.linalg provides matrix operations essential for machine learning, portfolio optimization, and scientific computing.
import numpy as np
A = np.array([[2, 1], [1, 3]], dtype=float)
B = np.array([[1, 2], [3, 4]], dtype=float)
print("A @ B:\n", A @ B) # matmul (preferred)
print("inv(A):\n", np.linalg.inv(A).round(3))
print("det(A):", np.linalg.det(A))
print("A @ inv(A):\n",
(A @ np.linalg.inv(A)).round(10)) # should be identityimport numpy as np
A = np.array([[4, 2], [1, 3]], dtype=float)
vals, vecs = np.linalg.eig(A)
print("Eigenvalues: ", vals)
print("Eigenvectors:\n", vecs.round(3))
# Solve the linear system Ax = b
b = np.array([10.0, 8.0])
x = np.linalg.solve(A, b)
print("Solution x:", x)
print("Verify Ax==b:", np.allclose(A @ x, b))import numpy as np
A = np.array([[1, 2, 3],
[4, 5, 6],
[7, 8, 9]], dtype=float)
# SVD decomposition: A = U @ diag(S) @ Vt
U, S, Vt = np.linalg.svd(A)
print("Singular values:", S.round(4))
print("Rank (non-zero SVs):", np.sum(S > 1e-10))
# QR decomposition
Q, R = np.linalg.qr(A)
print("Q shape:", Q.shape, " R shape:", R.shape)
print("Reconstruct:", np.allclose(Q @ R, A))
# Norm and condition number
print("Frobenius norm:", np.linalg.norm(A, 'fro').round(3))
print("Condition number:", np.linalg.cond(A).round(1))import numpy as np
rng = np.random.default_rng(42)
# Least-squares: fit a line y = m*x + b to noisy data
x = np.linspace(0, 10, 20)
y_true = 2.5 * x + 1.0
y = y_true + rng.normal(0, 1.5, 20) # add noise
# Build design matrix [x, 1] for [m, b]
A = np.column_stack([x, np.ones(len(x))])
coeffs, residuals, rank, sv = np.linalg.lstsq(A, y, rcond=None)
print(f"lstsq fit: m={coeffs[0]:.3f} b={coeffs[1]:.3f} (true: m=2.5, b=1.0)")
# Eigenvalues and eigenvectors
M = np.array([[3, 1], [0, 2]], dtype=float)
eigenvalues, eigenvectors = np.linalg.eig(M)
print(f"\nEigenvalues: {eigenvalues}")
print(f"Eigenvectors:\n{eigenvectors.round(3)}")
# Verify: M @ v = lambda * v for each eigenpair
for i in range(len(eigenvalues)):
lhs = M @ eigenvectors[:, i]
rhs = eigenvalues[i] * eigenvectors[:, i]
print(f" Mv == lv (col {i}): {np.allclose(lhs, rhs)}")
# Condition number β high means near-singular, numerically unstable
well = np.array([[2., 1.], [1., 3.]])
ill = np.array([[1., 1.], [1., 1.0001]])
print(f"\nCondition (well-conditioned): {np.linalg.cond(well):.2f}")
print(f"Condition (ill-conditioned): {np.linalg.cond(ill):.0f}")import numpy as np
np.random.seed(0)
# 252 trading days, 5 assets
returns = np.random.randn(252, 5) * 0.01
cov = np.cov(returns.T) # 5Γ5 covariance matrix
# Closed-form minimum-variance weights: w = inv(Cov)Β·1 / (1'Β·inv(Cov)Β·1)
inv_cov = np.linalg.inv(cov)
ones = np.ones(5)
w_raw = inv_cov @ ones
w_minvar = w_raw / w_raw.sum() # normalize to sum = 1
port_var = float(w_minvar @ cov @ w_minvar)
port_std = np.sqrt(port_var * 252) # annualized
print("Min-variance weights:", w_minvar.round(4))
print(f"Sum of weights: {w_minvar.sum():.6f}")
print(f"Annualized portfolio vol: {port_std:.4f}")import numpy as np
# 2x + y - z = 8
# -3x - y + 2z = -11
# -2x + y + 2z = -3
A = np.array([
# TODO: fill in the coefficient matrix
], dtype=float)
b = np.array([8.0, -11.0, -3.0])
# TODO: x = np.linalg.solve(A, b)
# print("Solution: x={:.1f}, y={:.1f}, z={:.1f}".format(*x))
# print("Verify:", np.allclose(A @ x, b))
# TODO: det_A = np.linalg.det(A)
# print(f"det(A) = {det_A:.2f}")
# TODO: inv_A = np.linalg.inv(A)
# print("A @ inv(A) is identity:", np.allclose(A @ inv_A, np.eye(3)))Reshape, stack, split, and transpose arrays to match the formats expected by algorithms and frameworks.
import numpy as np
a = np.arange(24)
r = a.reshape(4, 6)
print("(4,6):\n", r)
b = a.reshape(2, 3, 4)
print("3D:", b.shape)
# -1 auto-infers
c = a.reshape(6, -1)
print("Auto (6,4):", c.shape)
print("Transpose:", r.T.shape)
print("Flatten:", r.flatten()[:8], "...")import numpy as np
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
print("hstack:", np.hstack([a, b]))
print("vstack:\n", np.vstack([a, b]))
print("stack(axis=1):\n", np.stack([a, b], axis=1))
# Split
arr = np.arange(12)
p1, p2, p3 = np.split(arr, 3)
print("Split:", p1, p2, p3)
# Concatenate 2D
m1 = np.ones((2, 3))
m2 = np.zeros((2, 3))
print("concat row:\n", np.concatenate([m1, m2], axis=0))import numpy as np
a = np.array([1, 2, 3])
# repeat β repeat each element
print("repeat each 3x:", np.repeat(a, 3)) # [1 1 1 2 2 2 3 3 3]
# tile β repeat the whole array
print("tile 3x:", np.tile(a, 3)) # [1 2 3 1 2 3 1 2 3]
# pad β add border around array
m = np.ones((3, 3), dtype=int)
padded = np.pad(m, pad_width=1, mode='constant', constant_values=0)
print("Padded (zeros border):\n", padded)
# roll β shift elements cyclically
arr = np.array([1, 2, 3, 4, 5])
print("roll right 2:", np.roll(arr, 2)) # [4 5 1 2 3]
print("roll left 2: ", np.roll(arr, -2)) # [3 4 5 1 2]import numpy as np
data = np.arange(24).reshape(4, 6)
print("Original (4x6):\n", data)
# np.split β equal-size splits (raises error if not divisible)
cols = np.split(data, 3, axis=1) # 3 chunks of (4, 2)
print("\nSplit into 3 col-chunks, each shape:", cols[0].shape)
# np.array_split β unequal splits are allowed
rows = np.array_split(data, 3, axis=0) # chunks of sizes 2, 1, 1
print("array_split row shapes:", [c.shape for c in rows])
# np.concatenate along different axes
a = np.ones((3, 4))
b = np.zeros((3, 4))
row_join = np.concatenate([a, b], axis=0) # (6, 4)
col_join = np.concatenate([a, b], axis=1) # (3, 8)
print("\nconcat axis=0 (row join):", row_join.shape)
print("concat axis=1 (col join):", col_join.shape)
# Round-trip: split then concatenate recovers original
chunks = np.array_split(data, [2, 3], axis=0) # at rows 2 and 3
restored = np.concatenate(chunks, axis=0)
print("\nRound-trip split->concat matches original:", np.array_equal(data, restored))import numpy as np
# Simulate a batch of images: (N, H, W, C) β TensorFlow/HWC format
batch_size, H, W, C = 32, 64, 64, 3
images = np.random.rand(batch_size, H, W, C).astype(np.float32)
# Flatten spatial dims for dense layer input
flat = images.reshape(batch_size, -1)
print(f"HWC flat: {images.shape} β {flat.shape}")
# Convert to CHW format for PyTorch: (N, C, H, W)
chw = images.transpose(0, 3, 1, 2)
print(f"CHW format: {chw.shape}")
# Stack two batches along batch axis
batch2 = np.random.rand(batch_size, H, W, C).astype(np.float32)
combined = np.concatenate([images, batch2], axis=0)
print(f"Combined: {combined.shape}")import numpy as np
# 1. Reshape, transpose, flatten
a = np.arange(60)
# TODO: r = a.reshape(3, 4, 5)
# TODO: t = r.transpose(2, 1, 0) # axes order (5, 4, 3)
# TODO: flat = t.flatten()
# print("Original shape:", r.shape)
# print("Transposed:", t.shape)
# print("Flattened:", flat.shape)
# 2. Stack three matrices along new axis
m1 = np.ones((4, 3))
m2 = np.zeros((4, 3))
m3 = np.full((4, 3), 2.0)
# TODO: stacked = np.stack([m1, m2, m3], axis=0)
# print("Stacked shape:", stacked.shape) # (3, 4, 3)
# 3. Split and re-stack
big = np.arange(96).reshape(12, 8)
# TODO: chunks = np.split(big, 4, axis=0) # 4 chunks of (3, 8)
# TODO: result = np.stack(chunks, axis=2) # (3, 8, 4)
# print("Result shape:", result.shape)NumPy's random module (use default_rng for new code) generates arrays from many distributions β key for simulations and augmentation.
import numpy as np
# --- Card: Generator basics — uniform, normal, integers, choice ---
rng = np.random.default_rng(seed=42) # recommended API
u = rng.random((3, 3)) # uniform [0,1)
n = rng.standard_normal((3, 3)) # N(0,1)
i = rng.integers(0, 100, size=5) # integers
c = rng.choice([10,20,30,40], size=3, replace=False)
print("Uniform:\n", u.round(2))
print("Normal:\n", n.round(2))
print("Integers:", i)
print("Choice: ", c)import numpy as np
# --- Card: sampling from named distributions and summarizing them ---
rng = np.random.default_rng(0)
normal = rng.normal(loc=50, scale=10, size=1000)
uniform = rng.uniform(low=0, high=100, size=1000)
binom = rng.binomial(n=10, p=0.3, size=1000)
poisson = rng.poisson(lam=5, size=1000)
for name, arr in [("normal",normal),("uniform",uniform),
("binom",binom),("poisson",poisson)]:
    print(f"{name:8s}: mean={arr.mean():.2f} std={arr.std():.2f} "
f"range=[{arr.min():.0f},{arr.max():.0f}]")import numpy as np
# --- Card: permutation vs in-place shuffle, plus bootstrap CI ---
rng = np.random.default_rng(42)
arr = np.arange(10)
# permutation β returns NEW shuffled array
shuffled = rng.permutation(arr)
print("Original: ", arr)
print("Permuted: ", shuffled)
# shuffle β in-place
arr2 = arr.copy()
rng.shuffle(arr2)
print("In-place: ", arr2)
# Bootstrap resampling
data = np.array([2.3, 4.1, 3.8, 5.0, 2.9, 4.4, 3.2, 4.7])
n_boot = 10000
boot_means = np.array([
    rng.choice(data, size=len(data), replace=True).mean()
    for _ in range(n_boot)
])
print(f"Bootstrap mean: {boot_means.mean():.3f}")
print(f"95% CI: [{np.percentile(boot_means,2.5):.3f}, "
f"{np.percentile(boot_means,97.5):.3f}]")import numpy as np
# --- Card: reproducible seeding, spawning child generators, normal-vs-uniform coverage ---
# New-style Generator β preferred over np.random.seed / np.random.randn
rng1 = np.random.default_rng(seed=2024)
rng2 = np.random.default_rng(seed=2024) # same seed β identical output
a = rng1.standard_normal(5)
b = rng2.standard_normal(5)
print("Seeded reproducibility (same seed):", np.allclose(a, b))
# Spawning independent sub-generators (safe for parallel work)
child_rngs = rng1.spawn(3)
print("Spawned", len(child_rngs), "independent generators")
print("Child 0 sample:", child_rngs[0].random(3).round(3))
# Compare normal vs uniform for same N
N = 100_000
rng = np.random.default_rng(0)
norm = rng.normal(loc=0.5, scale=0.15, size=N) # bell-shaped
unif = rng.uniform(low=0.0, high=1.0, size=N) # flat
for name, arr in [("normal", norm), ("uniform", unif)]:
    print(f"{name:8s}: mean={arr.mean():.4f} std={arr.std():.4f} "
f"min={arr.min():.3f} max={arr.max():.3f}")
# Clamp normal to [0,1] range and compare coverage
norm_clipped = np.clip(norm, 0, 1)
print(f"\nNormal values outside [0,1]: {((norm<0)|(norm>1)).sum()} / {N}")
print(f"Uniform values outside [0,1]: {((unif<0)|(unif>1)).sum()} / {N}")import numpy as np
# --- Card: Monte-Carlo VaR via Geometric Brownian Motion ---
rng = np.random.default_rng(99)
n_sims = 100_000
n_days = 252
S0 = 100.0
mu_day = 0.0003 # daily drift
sig_day = 0.015 # daily volatility
# Geometric Brownian Motion
daily_ret = rng.normal(mu_day, sig_day, (n_sims, n_days))
paths = S0 * np.cumprod(1 + daily_ret, axis=1)
final = paths[:, -1]
var_95 = np.percentile(final, 5)  # 5th percentile of final price = 95% VaR threshold
cvar_95 = final[final <= var_95].mean()  # expected value in the worst 5% tail
prob_loss = (final < S0).mean()
print(f"Expected year-end price: ${final.mean():.2f}")
print(f"95% Value-at-Risk (loss): ${S0 - var_95:.2f}")
print(f"95% CVaR (expected loss): ${S0 - cvar_95:.2f}")
print(f"Probability of loss: {prob_loss:.1%}")import numpy as np
# --- Card: dice-sum exercise (TODOs intentional) ---
rng = np.random.default_rng(42)
N = 1_000_000
# Roll two dice, compute sums
die1 = rng.integers(1, 7, size=N)
die2 = rng.integers(1, 7, size=N)
# TODO: sums = die1 + die2
# 1. Frequency of each outcome (2-12)
print("Sum | Empirical | Theoretical")
for s in range(2, 13):
    # TODO: empirical = (sums == s).mean()
    # min(s-1, 13-s) == 6 - |s-7|: correct count of (d1, d2) pairs summing to s
    ways = min(s-1, 13-s) # number of ways to roll sum s
    theoretical = ways / 36
    # TODO: print(f" {s:2d} | {empirical:.4f} | {theoretical:.4f}")
    pass
# 2. Probability of sum == 7
# TODO: p7 = (sums == 7).mean()
# print(f"P(sum=7): empirical={p7:.4f}, theoretical={1/6:.4f}")
# 3. Craps win (7 or 11 on first roll)
# TODO: craps_win = ((sums == 7) | (sums == 11)).mean()
# print(f"Craps first-roll win: {craps_win:.4f} (expected 0.2222)")Understand how NumPy stores data in memory β C vs Fortran order, strides, views vs copies, and how layout affects performance.
import numpy as np
# --- Card: C (row-major) vs Fortran (column-major) memory order and strides ---
# C-order: row-major (default)
A = np.array([[1,2,3],[4,5,6]], order='C')
B = np.array([[1,2,3],[4,5,6]], order='F') # Fortran-order: column-major
print('C-order strides:', A.strides) # (bytes per row, bytes per element)
print('F-order strides:', B.strides) # (bytes per element, bytes per col)
print('C contiguous:', A.flags['C_CONTIGUOUS'])
print('F contiguous:', B.flags['F_CONTIGUOUS'])
# Convert between orders
A_f = np.asfortranarray(A)
print('After asfortranarray:', A_f.strides)import numpy as np
# --- Card: slicing returns views (shared memory); fancy indexing returns copies ---
arr = np.arange(12).reshape(3, 4)
# Slicing creates a VIEW (no copy)
view = arr[::2, ::2]
print('Is view:', np.shares_memory(arr, view)) # True
view[0, 0] = 999
print('Original changed:', arr[0, 0]) # 999 β shared memory!
# Fancy indexing creates a COPY
copy = arr[[0, 2], :][:, [0, 2]]
print('Is copy:', not np.shares_memory(arr, copy)) # True
copy[0, 0] = -1
print('Original unchanged:', arr[0, 0]) # still 999
# Force a copy explicitly
explicit_copy = arr.copy()
print('Explicit copy shares memory:', np.shares_memory(arr, explicit_copy))import numpy as np
# --- Card: zero-copy rolling windows via sliding_window_view ---
# Zero-copy rolling window using stride tricks
arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
window_size = 4
windows = np.lib.stride_tricks.sliding_window_view(arr, window_size)
print('Windows shape:', windows.shape) # (7, 4)
print('Windows:\n', windows)
# Rolling statistics without loops
print('Rolling mean:', windows.mean(axis=1).round(2))
print('Rolling max: ', windows.max(axis=1))
print('Rolling std: ', windows.std(axis=1).round(3))
# 2D sliding window (e.g., image patches)
img = np.arange(25).reshape(5, 5)
patches = np.lib.stride_tricks.sliding_window_view(img, (3, 3))
print('Image patches shape:', patches.shape) # (3, 3, 3, 3)import numpy as np
# --- Card: out= parameter avoids temporary-array allocation ---
a = np.random.rand(1_000_000)
b = np.random.rand(1_000_000)
result = np.empty_like(a)
# Avoid allocating a new array β write directly into result
np.add(a, b, out=result) # result = a + b (no temp array)
np.sqrt(a, out=a) # in-place square root
np.multiply(result, 2.0, out=result)
print('result[:5]:', result[:5].round(4))
print('a[:5] (sqrt in-place):', a[:5].round(4))
# Chain multiple ufunc operations with out
x = np.linspace(0, np.pi, 6)
tmp = np.empty_like(x)
np.sin(x, out=tmp) # tmp = sin(x)
np.square(tmp, out=tmp) # tmp = sin(x)^2 β still same memory
print('sin^2:', tmp.round(4))import numpy as np
# --- Card: rolling-feature exercise (TODOs intentional; uses legacy np.random.seed) ---
np.random.seed(0)
ts = np.cumsum(np.random.randn(50)) # random walk
# TODO: create sliding window view with window_size=4
# TODO: compute mean and std of each window row
# TODO: stack into feature matrix (n_samples x 3): [window_mean, window_std, last_value]
# TODO: print shape and first 5 rowsStore heterogeneous data (mixed types) in a single NumPy array using structured dtypes β combine ints, floats, and strings like a lightweight in-memory table.
import numpy as np
# --- Card: defining a structured dtype and field access ---
# Define dtype with field names and types
dtype = np.dtype([
    ('name', 'U20'), # Unicode string, max 20 chars
    ('age', 'i4'), # 32-bit integer
    ('score', 'f8'), # 64-bit float
    ('grade', 'U2'), # short string
])
data = np.array([
    ('Alice', 25, 92.5, 'A'),
    ('Bob', 30, 78.3, 'B'),
    ('Carol', 22, 95.1, 'A'),
    ('Dave', 28, 65.0, 'C'),
], dtype=dtype)
print('Array dtype:', data.dtype)
print('Names:', data['name'])
print('Scores:', data['score'])
print('Alice row:', data[0])import numpy as np
# --- Card: filtering, sorting, and masking structured arrays like a tiny table ---
dtype = np.dtype([('name','U20'),('dept','U10'),('salary','f8'),('years','i4')])
employees = np.array([
    ('Alice', 'Eng', 95000, 5),
    ('Bob', 'Sales', 72000, 3),
    ('Carol', 'Eng', 102000, 8),
    ('Dave', 'HR', 68000, 2),
    ('Eve', 'Eng', 88000, 4),
    ('Frank', 'Sales', 76000, 6),
], dtype=dtype)
# Filter: engineers only
eng = employees[employees['dept'] == 'Eng']
print('Engineers:', eng['name'])
# Sort by salary descending
sorted_emp = np.sort(employees, order='salary')[::-1]
print('By salary:', sorted_emp[['name', 'salary']])
# Boolean mask: high earners with 4+ years
high = employees[(employees['salary'] > 80000) & (employees['years'] >= 4)]
print('High earners:', high['name'])import numpy as np
# --- Card: recarray gives attribute-style field access ---
# recarray lets you access fields as attributes (rec.name instead of rec['name'])
students = np.rec.array([
    ('Alice', 3.9, 'CS'),
    ('Bob', 3.5, 'Math'),
    ('Carol', 3.7, 'CS'),
    ('Dave', 3.2, 'Physics'),
], dtype=[('name','U20'),('gpa','f8'),('major','U10')])
print('Names (attr):', students.name)
print('GPAs (attr):', students.gpa)
print('CS students:', students.name[students.major == 'CS'])
print('Top student:', students.name[students.gpa.argmax()])
# recarray supports all standard numpy operations
print('Avg GPA:', students.gpa.mean().round(3))import numpy as np
# --- Card: round-tripping between structured arrays and pandas DataFrames ---
import pandas as pd
dtype = np.dtype([('product','U30'),('price','f8'),('qty','i4'),('in_stock','?')])
products = np.array([
    ('Laptop', 999.99, 15, True),
    ('Mouse', 29.99, 80, True),
    ('Monitor', 399.99, 5, False),
    ('Keyboard', 79.99, 30, True),
], dtype=dtype)
# Structured array -> DataFrame
df = pd.DataFrame(products)
print('DataFrame:')
print(df)
# DataFrame -> structured array
df2 = pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0], 'label': ['a', 'b']})
rec = df2.to_records(index=False)
print('\nRecord array:', rec)
print('x field:', rec['x'])import numpy as np
# --- Card: structured-array exercise (TODOs intentional) ---
dtype = np.dtype([('name','U20'),('grade','i4'),('gpa','f8')])
students = np.array([
    ('Alice', 11, 3.9), ('Bob', 12, 3.4), ('Carol', 11, 3.7),
    ('Dave', 12, 3.1), ('Eve', 10, 3.8), ('Frank', 10, 3.5),
], dtype=dtype)
# TODO: sort by gpa descending, print top 3
# TODO: for each unique grade, compute and print average gpaNumPy's linear algebra routines and polynomial tools β solve systems of equations, compute eigendecompositions, SVD, and fit curves to data.
import numpy as np
# --- Card: solving linear systems, conditioning, and least squares ---
# Solve Ax = b
# 3x + y = 9
# x + 2y = 8
A = np.array([[3, 1], [1, 2]], dtype=float)
b = np.array([9, 8], dtype=float)
x = np.linalg.solve(A, b)
print('Solution x:', x) # [2. 3.]
print('Verify Ax == b:', np.allclose(A @ x, b))
# Matrix inverse (prefer linalg.solve for stability)
A_inv = np.linalg.inv(A)
print('Via inverse:', A_inv @ b)
# Check if system is well-conditioned
print('Condition number:', np.linalg.cond(A))
# Least-squares for overdetermined systems
A_over = np.vstack([A, [2, 3]])
b_over = np.append(b, 13)
x_ls, _, _, _ = np.linalg.lstsq(A_over, b_over, rcond=None)
print('Least-squares solution:', x_ls.round(4))import numpy as np
# --- Card: eigendecomposition (eig for general, eigh for symmetric) ---
A = np.array([[4, 2], [1, 3]], dtype=float)
vals, vecs = np.linalg.eig(A)
print('Eigenvalues: ', vals) # [5. 2.]
print('Eigenvectors:\n', vecs) # columns are eigenvectors
# Verify: A @ v = lambda * v
for lam, v in zip(vals, vecs.T):
    print(f'lambda={lam:.1f}: A@v={A@v.round(4)}, lam*v={lam*v.round(4)}, match={np.allclose(A@v, lam*v)}')
# Symmetric matrix has real eigenvalues β use eigh for efficiency
S = np.array([[6, 2, 1], [2, 3, 1], [1, 1, 1]], dtype=float)
vals_s, vecs_s = np.linalg.eigh(S) # eigh for symmetric/Hermitian
print('Symmetric eigenvalues (sorted):', vals_s.round(3))import numpy as np
# --- Card: SVD and low-rank approximation ---
# SVD: M = U @ diag(s) @ Vt
M = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10,11,12]], dtype=float)
U, s, Vt = np.linalg.svd(M, full_matrices=False)
print('U shape:', U.shape, '| s:', s.round(3), '| Vt shape:', Vt.shape)
print('Reconstruct error:', np.allclose(M, U @ np.diag(s) @ Vt))
# Low-rank approximation: keep top-k singular values
k = 1
M_approx = U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]
print(f'Rank-{k} approx:\n', M_approx.round(2))
print(f'Frobenius error: {np.linalg.norm(M - M_approx):.4f}')
print(f'Variance explained: {(s[:k]**2).sum()/(s**2).sum():.1%}')import numpy as np
# --- Card: polynomial fitting with polyfit/polyval ---
# Generate noisy quadratic data
np.random.seed(42)
x = np.linspace(-3, 3, 50)
y_true = 2*x**2 - 3*x + 1
y_noisy = y_true + np.random.randn(50) * 2
# Fit polynomial of degree 2
coeffs = np.polyfit(x, y_noisy, deg=2)
print('Fitted coefficients:', coeffs.round(4)) # should be ~[2, -3, 1]
# Evaluate the fitted polynomial
y_fit = np.polyval(coeffs, x)
rmse = np.sqrt(np.mean((y_fit - y_true)**2))
print(f'RMSE vs true: {rmse:.4f}')
# Compare degrees
for deg in [1, 2, 3, 5]:
    c = np.polyfit(x, y_noisy, deg)
    y_hat = np.polyval(c, x)
    print(f'Degree {deg}: train RMSE = {np.sqrt(np.mean((y_hat - y_noisy)**2)):.4f}')import numpy as np
# --- Card: polynomial-fitting exercise (TODOs intentional) ---
np.random.seed(42)
x = np.linspace(0, 2*np.pi, 60)
y = np.sin(x) + np.random.randn(60) * 0.2
# TODO: for degrees [1, 3, 5, 7]:
# - fit np.polyfit
# - compute y_hat with np.polyval
# - compute RMSE
# - print results
# TODO: print which degree has lowest RMSEUse integer arrays, boolean masks, np.where, np.take, and advanced multi-dimensional indexing to select and modify array elements efficiently.
import numpy as np
# --- Card: fancy (integer-array) indexing ---
np.random.seed(42)
arr = np.random.randint(0, 100, (5, 6))
print('Array:\n', arr)
# Select specific rows
row_idx = [0, 2, 4]
print('\nRows 0, 2, 4:\n', arr[row_idx])
# Select specific (row, col) pairs
row_idx = [0, 1, 2, 3]
col_idx = [5, 4, 3, 2]
# NOTE(review): these picks run along the anti-diagonal (0,5),(1,4),(2,3),(3,2), not the main diagonal.
print('\nDiagonal elements (row, col pairs):', arr[row_idx, col_idx])
# Outer indexing: all combinations
rows = np.array([0, 2])
cols = np.array([1, 3, 5])
print('\nOuter indexing (2 rows x 3 cols):\n', arr[np.ix_(rows, cols)])
# Assign values via fancy index
arr2 = arr.copy()
arr2[[1, 3], :] = -1
# NOTE(review): assigns -1 (the print label says "zeroing") — label and code disagree.
print('\nAfter zeroing rows 1, 3:\n', arr2)import numpy as np
# --- Card: boolean masks and np.where ---
np.random.seed(0)
data = np.random.randn(20)
print('Data:', data.round(2))
# Boolean mask
mask = data > 0.5
print(f'\nValues > 0.5: {data[mask].round(2)}')
print(f'Count: {mask.sum()}')
# np.where: conditional selection
clipped = np.where(data > 0, data, 0) # ReLU-style
print('\nReLU output:', clipped.round(2))
# Nested np.where: multi-condition
labels = np.where(data > 1, 'high', np.where(data < -1, 'low', 'mid'))
print('\nLabels:', labels)
for cat in ['low', 'mid', 'high']:
    print(f' {cat}: {(labels == cat).sum()}')
# np.where for index retrieval
idx_high = np.where(data > 1)[0]
print('\nIndices of high values:', idx_high)import numpy as np
# --- Card: ranking with argsort, take, partition, searchsorted ---
np.random.seed(42)
scores = np.array([82, 45, 91, 67, 88, 55, 73, 96, 61, 79])
names = np.array(['Alice','Bob','Carol','Dave','Eve','Frank','Grace','Hank','Iris','Jake'])
# argsort: get ranking indices
ranking = np.argsort(scores)[::-1] # descending
print('Top 5 students:')
for rank, idx in enumerate(ranking[:5], 1):
    print(f' {rank}. {names[idx]}: {scores[idx]}')
# np.take: equivalent to fancy indexing but works on flattened axis
top3_scores = np.take(scores, ranking[:3])
print('\nTop 3 scores:', top3_scores)
# np.partition: O(n) top-k (faster than full sort)
top_k = 3
partitioned = np.partition(scores, -top_k)[-top_k:]
print(f'Top {top_k} (unordered):', partitioned)
# np.searchsorted: binary search on sorted array
sorted_scores = np.sort(scores)
threshold = 80
insert_pos = np.searchsorted(sorted_scores, threshold)
print(f'\nScores >= {threshold}: {sorted_scores[insert_pos:]}')import numpy as np
# --- Card: multi-dimensional indexing on image batches ---
np.random.seed(42)
# 3D array: (batch, height, width)
img_batch = np.random.randint(0, 256, (4, 8, 8), dtype=np.uint8)
print('Batch shape:', img_batch.shape)
# Select specific pixel from each image in batch
rows = [1, 2, 3, 4] # row per image
cols = [1, 2, 3, 4] # col per image
batch_idx = [0, 1, 2, 3]
selected_pixels = img_batch[batch_idx, rows, cols]
print('One pixel per image:', selected_pixels)
# Vectorized update: zero out a 3x3 patch in all images
img_batch[:, 2:5, 2:5] = 0
print('After zeroing patch:', img_batch[0, 1:6, 1:6])
# Use np.indices for meshgrid-style indexing
H, W = 4, 4
ri, ci = np.indices((H, W))
dist_from_center = np.sqrt((ri - H//2)**2 + (ci - W//2)**2)
print('\nDistance from center:\n', dist_from_center.round(2))import numpy as np
# --- Card: indexing exercise (TODOs intentional) ---
np.random.seed(42)
scores = np.random.randint(40, 100, (30, 5))
subjects = ['Math', 'Science', 'English', 'History', 'Art']
print('Score matrix shape:', scores.shape)
# TODO: (1) Find best and worst subject per student (argmax/argmin)
# TODO: (2) Top 5 students by total score (np.argpartition)
# TODO: (3) Assign letter grades A/B/C/D/F using np.where
# TODO: (4) Pass/fail mask (avg >= 70 in at least 3 subjects)
Profile NumPy code, replace Python loops with vectorized operations, use einsum, out= parameters, and Numba JIT compilation for maximum speed.
import numpy as np
import time
# --- Card: Python-loop dot product vs np.dot benchmark ---
np.random.seed(42)
n = 1_000_000
x = np.random.randn(n)
y = np.random.randn(n)
# Python loop (slow)
def dot_loop(a, b):
    # O(n) interpreted-Python iteration; each step boxes floats
    result = 0.0
    for i in range(len(a)):
        result += a[i] * b[i]
    return result
# NumPy vectorized (fast)
def dot_numpy(a, b):
    # Delegates to the BLAS-backed C implementation
    return np.dot(a, b)
# Benchmark (small n for loop)
# NOTE: the loop is timed on 10k elements, NumPy on 1M — the final print extrapolates.
small_n = 10_000
x_s, y_s = x[:small_n], y[:small_n]
t0 = time.perf_counter()
for _ in range(5): dot_loop(x_s, y_s)
loop_time = (time.perf_counter() - t0) / 5
t0 = time.perf_counter()
for _ in range(100): dot_numpy(x, y)
np_time = (time.perf_counter() - t0) / 100
print(f'Loop (n={small_n:,}): {loop_time*1000:.2f}ms')
print(f'NumPy (n={n:,}): {np_time*1000:.2f}ms')
print(f'NumPy processes {n/small_n:.0f}x more data in {loop_time/np_time:.0f}x less time')
print(f'Speedup: ~{loop_time*n/(np_time*small_n):.0f}x')import numpy as np
print(f'Speedup: ~{loop_time*n/(np_time*small_n):.0f}x')import numpy as np
import time
np.random.seed(0)
A = np.random.randn(50, 30)
B = np.random.randn(30, 40)
C = np.random.randn(50, 40)
# Matrix multiplication
result1 = np.einsum('ij,jk->ik', A, B) # same as A @ B
result2 = np.einsum('ij,ij->', A, A[:,:30]) # element-wise sum of squares (trace of A.T@A)
# Batch matrix multiply: (batch, M, K) x (batch, K, N) -> (batch, M, N)
batch = 16
X = np.random.randn(batch, 10, 8)
W = np.random.randn(batch, 8, 6)
bmm_einsum = np.einsum('bij,bjk->bik', X, W)
bmm_loop = np.array([x @ w for x, w in zip(X, W)])
print('einsum batch matmul shape:', bmm_einsum.shape)
print('Matches loop result:', np.allclose(bmm_einsum, bmm_loop))
# Outer product
v1, v2 = np.array([1,2,3]), np.array([4,5,6])
outer = np.einsum('i,j->ij', v1, v2)
print('Outer product:\n', outer)
# Trace
M = np.random.randn(4, 4)
print(f'Trace: einsum={np.einsum("ii", M):.4f}, np.trace={np.trace(M):.4f}')import numpy as np
import time
# --- Card: pre-allocated out= buffers vs fresh temporaries benchmark ---
np.random.seed(42)
n = 5_000_000
x = np.random.rand(n).astype(np.float32)
y = np.random.rand(n).astype(np.float32)
# Without out= (allocates new array each time)
def compute_no_out(x, y, n_iters=20):
    # Each iteration allocates fresh temporaries for sin, cos, and the sum
    result = np.empty_like(x)
    for _ in range(n_iters):
        result = np.sin(x) + np.cos(y) # creates temporary arrays
    return result
# With out= (reuses pre-allocated buffers)
def compute_with_out(x, y, n_iters=20):
    buf1 = np.empty_like(x)
    buf2 = np.empty_like(x)
    result = np.empty_like(x)
    for _ in range(n_iters):
        np.sin(x, out=buf1) # no allocation
        np.cos(y, out=buf2)
        np.add(buf1, buf2, out=result)
    return result
t0 = time.perf_counter()
r1 = compute_no_out(x, y)
t1 = time.perf_counter() - t0
t0 = time.perf_counter()
r2 = compute_with_out(x, y)
t2 = time.perf_counter() - t0
print(f'Without out=: {t1:.3f}s')
print(f'With out=: {t2:.3f}s')
print(f'Speedup: {t1/t2:.2f}x')
print(f'Results match: {np.allclose(r1, r2)}')try:
# NOTE(review): this card's `try:` header is fused onto the previous line by the page
# extraction; the block below originally sat inside try/except ImportError.
import numba
from numba import njit
import numpy as np
import time
@njit
def pairwise_dist_numba(X):
    # Euclidean distance matrix via explicit loops — fast once JIT-compiled
    n = X.shape[0]
    D = np.empty((n, n), dtype=np.float64)
    for i in range(n):
        for j in range(i, n):  # upper triangle only; mirrored below
            d = 0.0
            for k in range(X.shape[1]):
                diff = X[i, k] - X[j, k]
                d += diff * diff
            D[i, j] = D[j, i] = d**0.5
    return D
np.random.seed(42)
X = np.random.randn(500, 10)
# Warm-up JIT
pairwise_dist_numba(X[:5])
t0 = time.perf_counter()
D_numba = pairwise_dist_numba(X)
t_numba = time.perf_counter() - t0
# NumPy equivalent
# NOTE(review): despite the comment, the baseline used here is sklearn, not pure NumPy.
t0 = time.perf_counter()
from sklearn.metrics import pairwise_distances
D_np = pairwise_distances(X)
t_np = time.perf_counter() - t0
print(f'Numba: {t_numba*1000:.1f}ms')
print(f'sklearn pairwise: {t_np*1000:.1f}ms')
print(f'Match: {np.allclose(D_numba, D_np)}')
except ImportError:
    print('pip install numba')
    print('Numba @njit: compiles Python+NumPy loops to LLVM machine code.')
    print('Typical speedup: 10-100x for loop-heavy numeric code.')import numpy as np
# --- Card: pairwise-distance exercise (TODOs intentional) ---
import time
np.random.seed(42)
X = np.random.randn(1000, 5)
# TODO: (1) Python nested loop (test on X[:50] for speed)
# TODO: (2) NumPy broadcasting: (n,1,d) - (1,n,d), then norm
# TODO: (3) scipy or einsum approach
# TODO: Benchmark all three and print speedups
Apply Fourier transforms, filtering, convolution, and spectral analysis to 1D and 2D signals using NumPy's FFT routines.
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# --- Card: power spectrum of a composite signal via rfft ---
# Composite signal: 5Hz + 20Hz + noise
fs = 500 # sampling rate
t = np.linspace(0, 1, fs, endpoint=False)
signal = (np.sin(2*np.pi*5*t) +
0.5*np.sin(2*np.pi*20*t) +
np.random.randn(fs)*0.3)
# FFT
fft_vals = np.fft.rfft(signal)
freqs = np.fft.rfftfreq(fs, d=1/fs)
power = np.abs(fft_vals)**2
# Find dominant frequencies
top_freq_idx = np.argsort(power)[::-1][:3]
print('Top frequencies (Hz):', freqs[top_freq_idx].round(1))
print('Corresponding power:', power[top_freq_idx].round(1))
fig, axes = plt.subplots(2, 1, figsize=(10, 6))
axes[0].plot(t[:100], signal[:100])
axes[0].set_title('Signal (first 100 samples)'); axes[0].set_xlabel('Time (s)')
axes[1].plot(freqs[:50], power[:50])
axes[1].set_title('Power Spectrum'); axes[1].set_xlabel('Frequency (Hz)')
plt.tight_layout(); plt.savefig('fft.png', dpi=80); plt.close(); print('Saved fft.png')import numpy as np
# --- Card: FFT-based low-pass filtering and SNR improvement ---
# Create noisy signal
fs = 1000
t = np.linspace(0, 1, fs)
clean = np.sin(2*np.pi*10*t) # 10 Hz signal
noisy = clean + 0.5*np.random.randn(fs) + 0.3*np.sin(2*np.pi*200*t)
# FFT-based low-pass filter
def lowpass_fft(signal, fs, cutoff_hz):
    """Zero all frequency bins above cutoff_hz and reconstruct the signal.

    Fixed: irfft is given n=len(signal) so the output always matches the
    input length. Without it, irfft returns 2*(m-1) samples and silently
    drops one sample for odd-length inputs (harmless here since fs=1000
    is even, but wrong for general use).
    """
    fft_sig = np.fft.rfft(signal)
    freqs = np.fft.rfftfreq(len(signal), d=1/fs)
    fft_sig[freqs > cutoff_hz] = 0 # zero out high frequencies
    return np.fft.irfft(fft_sig, n=len(signal))
filtered = lowpass_fft(noisy, fs, cutoff_hz=30)
# SNR in dB: variance of the clean signal over variance of the residual noise
snr_noisy = 10*np.log10(np.var(clean) / np.var(noisy - clean))
snr_filtered = 10*np.log10(np.var(clean) / np.var(filtered[:len(clean)] - clean))
print(f'SNR (noisy): {snr_noisy:.2f} dB')
print(f'SNR (filtered): {snr_filtered:.2f} dB')
print(f'Improvement: {snr_filtered - snr_noisy:.2f} dB')
print(f'Correlation (filtered vs clean): {np.corrcoef(clean, filtered[:len(clean)])[0,1]:.4f}')import numpy as np
# --- Card: smoothing and differentiation via 1-D convolution ---
np.random.seed(42)
n = 200
t = np.arange(n)
signal = np.sin(0.1*t) + np.random.randn(n)*0.5
# Manual convolution = moving average
def moving_average(x, w):
    # Uniform kernel of width w; mode='same' keeps the output length equal to len(x)
    kernel = np.ones(w) / w
    return np.convolve(x, kernel, mode='same')
# Gaussian smoothing kernel
def gaussian_kernel(size, sigma):
    # Symmetric kernel of `size` points, normalized to sum to 1
    x = np.arange(-(size//2), size//2+1)
    k = np.exp(-x**2 / (2*sigma**2))
    return k / k.sum()
ma5 = moving_average(signal, 5)
ma20 = moving_average(signal, 20)
gk = gaussian_kernel(21, sigma=3)
gauss_smooth = np.convolve(signal, gk, mode='same')
# Derivative via convolution (finite difference kernel)
diff_kernel = np.array([1, 0, -1]) / 2
derivative = np.convolve(signal, diff_kernel, mode='same')
print(f'Signal length: {len(signal)}')
print(f'MA-5 smoothed std: {ma5.std():.3f} (original: {signal.std():.3f})')
print(f'MA-20 smoothed std: {ma20.std():.3f}')
print(f'Gaussian smooth std: {gauss_smooth.std():.3f}')
print(f'Derivative range: [{derivative.min():.3f}, {derivative.max():.3f}]')import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# --- Card: 2-D FFT, spectrum visualization, circular low-pass mask ---
# Create a synthetic 2D image with patterns
np.random.seed(42)
H, W = 64, 64
x, y = np.meshgrid(np.arange(W), np.arange(H))
# Low-freq + high-freq patterns + noise
image = (np.sin(2*np.pi*2*x/W) + np.sin(2*np.pi*5*y/H) +
0.3*np.random.randn(H, W))
# 2D FFT
fft2d = np.fft.fft2(image)
fft_mag = np.abs(np.fft.fftshift(fft2d)) # shift zero-freq to center
log_mag = np.log1p(fft_mag) # log scale for visualization
# Low-pass filter: zero out high-freq components
center = np.array([H//2, W//2])  # NOTE(review): unused — the distance grid below recomputes H//2, W//2 directly
Y, X = np.ogrid[:H, :W]
dist_from_center = np.sqrt((Y - H//2)**2 + (X - W//2)**2)
mask = dist_from_center <= 10
fft_shift = np.fft.fftshift(fft2d)
fft_shift *= mask
filtered = np.real(np.fft.ifft2(np.fft.ifftshift(fft_shift)))
print(f'Original image PSNR vs filtered: {10*np.log10(image.max()**2 / ((image-filtered)**2).mean()):.1f} dB')
print(f'Dominant frequency components in top 5% power: {(log_mag > np.percentile(log_mag, 95)).sum()}')
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
axes[0].imshow(image, cmap='gray'); axes[0].set_title('Original')
axes[1].imshow(log_mag, cmap='hot'); axes[1].set_title('2D Spectrum (log)')
axes[2].imshow(filtered, cmap='gray'); axes[2].set_title('Low-pass Filtered')
plt.tight_layout(); plt.savefig('fft2d.png', dpi=80); plt.close(); print('Saved fft2d.png')import numpy as np
# --- Card: audio-filtering exercise (TODOs intentional) ---
np.random.seed(42)
fs = 8000
t = np.linspace(0, 1, fs)
clean = np.sin(2*np.pi*440*t) + 0.5*np.sin(2*np.pi*880*t)
noisy = clean + np.random.randn(fs) * 0.8
# TODO: (1) FFT + power spectrum, find top frequency peaks
# TODO: (2) Bandpass FFT filter (keep 400-950Hz)
# TODO: (3) Reconstruct with ifft
# TODO: (4) SNR before and after
# TODO: (5) Compare with 21-pt Gaussian convolution
NumPy's sort, argsort, searchsorted, and partition give O(n log n) and O(n) ordering operations that operate on arrays without Python loops.
import numpy as np
# --- Card: sort, argsort, axis sorts, multi-key stable sort ---
arr = np.array([3, 1, 4, 1, 5, 9, 2, 6])
# sort: returns sorted copy
sorted_arr = np.sort(arr)
print("sorted:", sorted_arr)
# argsort: indices that would sort the array
idx = np.argsort(arr)
print("argsort:", idx)
print("verify:", arr[idx]) # same as sorted_arr
# Sort along an axis (2D)
m = np.array([[3, 1, 4], [1, 5, 9], [2, 6, 5]])
print("row-wise sort:\n", np.sort(m, axis=1))
print("col-wise sort:\n", np.sort(m, axis=0))
# Stable sort for multi-key sorting
records = np.array([(2, 'b'), (1, 'a'), (2, 'a'), (1, 'b')],
dtype=[('key', int), ('val', 'U1')])
records.sort(order=['key', 'val'])
print("multi-key sort:", records)import numpy as np
# --- Card: argmin/argmax and binary search with searchsorted ---
data = np.array([10, 5, 8, 3, 15, 7, 2, 12])
print("min:", data.min(), "at index:", data.argmin())
print("max:", data.max(), "at index:", data.argmax())
# 2D: axis parameter
m = np.array([[3, 1, 4], [1, 5, 9], [2, 6, 5]])
print("argmin per row:", m.argmin(axis=1)) # [1, 0, 0]
print("argmax per col:", m.argmax(axis=0)) # [0, 2, 1]
# searchsorted: binary search in sorted array (O(log n))
sorted_data = np.array([1, 3, 5, 7, 9, 11, 13])
vals = np.array([0, 3, 6, 14])
# 'left': index where val would be inserted to keep sorted
left_idx = np.searchsorted(sorted_data, vals, side='left')
right_idx = np.searchsorted(sorted_data, vals, side='right')
print("left insert positions:", left_idx)
print("right insert positions:", right_idx)
# Use case: check if value exists
def is_in_sorted(arr, vals):
    # Membership test on a sorted array: a value exists iff it equals the
    # element at its insertion point. np.minimum guards against idx == len(arr)
    # (value larger than everything) before indexing.
    idx = np.searchsorted(arr, vals)
    return (idx < len(arr)) & (arr[np.minimum(idx, len(arr)-1)] == vals)
print("vals in array:", is_in_sorted(sorted_data, vals))import numpy as np
# --- Card: O(n) top-k selection with partition/argpartition ---
np.random.seed(42)
scores = np.random.randint(0, 100, size=20)
print("Scores:", scores)
# partition: rearrange so k-th element is in sorted position
# Elements left of k are <= arr[k], right are >= arr[k]
k = 5
partitioned = np.partition(scores, k)
print(f"After partition(k={k}): {partitioned}")
print(f"Element at k={k}: {partitioned[k]} (this is the {k+1}th smallest)")
# Get top-5 scores (not sorted β just the 5 largest)
top5_idx = np.argpartition(scores, -5)[-5:]
top5_vals = scores[top5_idx]
print(f"Top-5 values (unsorted): {top5_vals}")
print(f"Top-5 values (sorted): {np.sort(top5_vals)[::-1]}")
# Bottom-5 (smallest)
bot5_idx = np.argpartition(scores, 5)[:5]
bot5_vals = scores[bot5_idx]
print(f"Bottom-5 values: {np.sort(bot5_vals)}")
# Speed comparison concept: partition is O(n), sort is O(n log n)
# For finding top-k from 1M elements, argpartition is much fasterimport numpy as np
# --- Card: leaderboard — argpartition for top-K, searchsorted for percentile ranks ---
np.random.seed(42)
n_reps = 10_000
rep_ids = np.arange(n_reps)
revenues = np.random.lognormal(mean=10, sigma=1.5, size=n_reps)
K = 100 # top 100 reps
# Efficient: O(n) to get top-K indices, O(K log K) to sort only those
top_k_idx = np.argpartition(revenues, -K)[-K:]
top_k_sorted = top_k_idx[np.argsort(revenues[top_k_idx])[::-1]]
print("Top 10 reps:")
for rank, idx in enumerate(top_k_sorted[:10], 1):
    print(f" Rank {rank:3d}: Rep #{rep_ids[idx]:5d} | Revenue: ${revenues[idx]:>12,.0f}")
# Percentile ranks using searchsorted
# (insertion position / n gives the fraction of values below each sample)
sorted_rev = np.sort(revenues)
sample_rev = np.array([50_000, 100_000, 500_000])
pct_ranks = np.searchsorted(sorted_rev, sample_rev) / n_reps * 100
for rev, pct in zip(sample_rev, pct_ranks):
    print(f" ${rev:>8,} is at the {pct:.1f}th percentile")import numpy as np
def top_k_products(sales, k):
    """Indices of the k products with the largest total sales, best first."""
    # Total per product across all days, then O(n) selection of the k largest.
    grand_totals = sales.sum(axis=1)
    candidates = np.argpartition(grand_totals, -k)[-k:]
    descending = np.argsort(grand_totals[candidates])[::-1]
    return candidates[descending]
def bottom_k_days(sales, k):
    """Indices of the k days with the lowest average sales, worst first."""
    # Average per day across all products, then O(n) selection of the k smallest.
    per_day_avg = sales.mean(axis=0)
    candidates = np.argpartition(per_day_avg, k)[:k]
    return candidates[np.argsort(per_day_avg[candidates])]
np.random.seed(42)
sales = np.random.poisson(100, (50, 30)) # 50 products, 30 days
top3 = top_k_products(sales, 3)
bot3 = bottom_k_days(sales, 3)
print("Top 3 products (by total):", top3, "->", sales[top3].sum(axis=1))
print("Worst 3 days (by avg): ", bot3, "->", sales[:, bot3].mean(axis=0).round(1))
NumPy provides unique(), union1d(), intersect1d(), setdiff1d(), and in1d()/isin() for fast set-like operations on 1D arrays using sorted-array algorithms.
import numpy as np
# --- Card: np.unique and its return_counts/index/inverse options ---
arr = np.array([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5])
# unique: sorted unique values
u = np.unique(arr)
print("unique:", u)
# return_counts: how many times each value appears
vals, counts = np.unique(arr, return_counts=True)
print("value counts:")
for v, c in zip(vals, counts):
    print(f" {v}: {c}")
# return_index: first occurrence index
vals, idx = np.unique(arr, return_index=True)
print("first occurrence indices:", idx)
# return_inverse: reconstruct original from unique
vals, inv = np.unique(arr, return_inverse=True)
print("unique:", vals)
print("inverse:", inv)
print("reconstructed:", vals[inv]) # should match arr
# Most common value (mode-like)
mode_val = vals[np.argmax(counts)]
print(f"Mode: {mode_val} (appears {counts.max()} times)")import numpy as np
# --- Card: union/intersect/setdiff/setxor and a customer-churn example ---
a = np.array([1, 2, 3, 4, 5])
b = np.array([3, 4, 5, 6, 7])
print("union (a | b): ", np.union1d(a, b))
print("intersect (a & b): ", np.intersect1d(a, b))
print("a - b (in a not b): ", np.setdiff1d(a, b))
print("b - a (in b not a): ", np.setdiff1d(b, a))
print("symmetric diff: ", np.setxor1d(a, b))
# With return_indices for intersect
common, a_idx, b_idx = np.intersect1d(a, b, return_indices=True)
print("Common:", common, "at a:", a_idx, "at b:", b_idx)
# Practical: find new customers
old_customers = np.array([101, 102, 103, 104, 105])
new_customers = np.array([103, 104, 106, 107, 108])
returning = np.intersect1d(old_customers, new_customers)
brand_new = np.setdiff1d(new_customers, old_customers)
churned = np.setdiff1d(old_customers, new_customers)
print(f"Returning: {returning}")
print(f"Brand new: {brand_new}")
print(f"Churned: {churned}")import numpy as np
# --- Card: vectorized membership tests with np.isin / np.in1d ---
products = np.array(['apple', 'banana', 'cherry', 'date', 'elderberry'])
blacklist = np.array(['banana', 'date', 'fig'])
# isin: element-wise membership test
mask = np.isin(products, blacklist)
print("Is in blacklist:", mask)
print("Allowed products:", products[~mask])
print("Blocked products:", products[mask])
# Large-scale membership test (much faster than Python loops)
np.random.seed(42)
user_ids = np.arange(1_000_000)
premium_users = np.random.choice(user_ids, size=50_000, replace=False)
# Check a sample of 1000 users
sample = np.random.choice(user_ids, size=1000, replace=False)
is_premium = np.isin(sample, premium_users)
print(f"Sample size: {len(sample)}, Premium users in sample: {is_premium.sum()}")
# in1d is equivalent (older API)
same_result = np.in1d(sample, premium_users)
print("isin == in1d:", np.all(is_premium == same_result))
# Invert: users NOT in premium
non_premium = sample[~is_premium]
print(f"Non-premium in sample: {len(non_premium)}")import numpy as np
# --- Card: warehouse-scan reconciliation with set operations ---
np.random.seed(42)
manifest = np.random.choice(np.arange(200_000), size=100_000, replace=False)
# Simulate scanning: miss 500, add 300 unexpected
missed_idx = np.random.choice(len(manifest), size=500, replace=False)
extra_skus = np.random.randint(200_000, 201_000, size=300)
scanned = np.concatenate([np.delete(manifest, missed_idx), extra_skus])
manifest_u = np.unique(manifest)
scanned_u = np.unique(scanned)
missing = np.setdiff1d(manifest_u, scanned_u)
unexpected = np.setdiff1d(scanned_u, manifest_u)
confirmed = np.intersect1d(manifest_u, scanned_u)
# Duplicates in scan
vals, counts = np.unique(scanned, return_counts=True)
duplicates = vals[counts > 1]
print(f"Manifest: {len(manifest_u):,} SKUs")
print(f"Scanned: {len(scanned_u):,} unique SKUs")
print(f"Confirmed: {len(confirmed):,} ({len(confirmed)/len(manifest_u):.1%})")
print(f"Missing: {len(missing):,}")
print(f"Unexpected: {len(unexpected):,}")
print(f"Duplicates: {len(duplicates):,}")
print(f"Accuracy: {len(confirmed)/len(manifest_u):.2%}")import numpy as np
def cohort_analysis(cohorts):
    """Week-over-week user breakdown: new, returning, and churned counts.

    `cohorts` maps a week label to an array-like of user ids; weeks are
    processed in sorted label order.  Returns a list of per-week dicts.
    """
    summary = []
    seen = np.array([], dtype=int)       # everyone observed in any prior week
    previous = np.array([], dtype=int)   # users active in the immediately prior week
    for week_label, raw_users in sorted(cohorts.items()):
        week_users = np.unique(raw_users)
        summary.append({
            "week": week_label,
            "total": len(week_users),
            "new": len(np.setdiff1d(week_users, seen)),
            "returning": len(np.intersect1d(week_users, seen)),
            "churned_from_prev": len(np.setdiff1d(previous, week_users)),
        })
        seen = np.union1d(seen, week_users)
        previous = week_users
    return summary
np.random.seed(42)
cohorts = {
"W1": np.random.randint(0, 100, 50),
"W2": np.random.randint(20, 120, 60),
"W3": np.random.randint(40, 140, 55),
"W4": np.random.randint(60, 160, 70),
}
for r in cohort_analysis(cohorts):
print(r)
Floating-point arithmetic has precision limits, NaN propagation, and overflow. NumPy provides tools to detect, mask, and safely handle these edge cases.
import numpy as np
data = np.array([1.0, 2.0, np.nan, 4.0, np.nan, 6.0])
# Detection
print("isnan:", np.isnan(data))
print("Count NaNs:", np.isnan(data).sum())
print("Any NaN:", np.any(np.isnan(data)))
# NaN-safe aggregations
print("mean with NaN:", np.mean(data)) # nan
print("nanmean:", np.nanmean(data)) # 3.25
print("nansum:", np.nansum(data)) # 13.0
print("nanstd:", np.nanstd(data).round(4))
print("nanmin/nanmax:", np.nanmin(data), np.nanmax(data))
# Replace NaN
filled_mean = np.where(np.isnan(data), np.nanmean(data), data)
print("NaN -> mean:", filled_mean)
filled_zero = np.nan_to_num(data, nan=0.0)
print("NaN -> 0: ", filled_zero)
# Forward fill (pandas-style, pure numpy)
def ffill(arr):
    """Forward-fill NaNs with the most recent non-NaN value.

    Leading NaNs take arr[0], since there is nothing earlier to carry.
    """
    positions = np.arange(len(arr))
    valid = ~np.isnan(arr)
    # Keep each valid index, zero out invalid ones, then carry the running max
    # so every slot points at the last valid position seen so far.
    carry = np.maximum.accumulate(np.where(valid, positions, 0))
    return arr[carry]
print("Forward fill:", ffill(data))import numpy as np
# Infinity
x = np.array([1.0, -1.0, 0.0, np.inf, -np.inf, np.nan])
print("isinf:", np.isinf(x))
print("isfinite:", np.isfinite(x))
print("isnan:", np.isnan(x))
# All-in-one
print("Any problem:", ~np.isfinite(x).all())
# Replace inf with large values
clean = np.nan_to_num(x, nan=0.0, posinf=1e9, neginf=-1e9)
print("Cleaned:", clean)
# Overflow
float32 = np.float32(1e38)
print("float32 * 100:", float32 * 100) # inf
print("float32 dtype max:", np.finfo(np.float32).max)
# Machine epsilon
for dtype in [np.float16, np.float32, np.float64]:
fi = np.finfo(dtype)
print(f"{dtype.__name__:10s}: eps={fi.eps:.2e}, max={fi.max:.2e}")
# Division edge cases
print("0/0:", np.float64(0) / np.float64(0)) # nan
print("1/0:", np.float64(1) / np.float64(0)) # infimport numpy as np
# Floating-point is NOT exact
a = 0.1 + 0.2
print("0.1 + 0.2 == 0.3:", a == 0.3) # False!
print("value:", repr(a))
print("allclose:", np.isclose(a, 0.3)) # True (with tolerance)
# np.isclose with tolerances
x = np.array([1.0, 1.0 + 1e-8, 1.0 + 1e-5, 1.1])
print("isclose to 1.0:", np.isclose(x, 1.0, rtol=1e-5, atol=1e-8))
# Unstable: sum of large+small+large
big = np.float32(1e8)
tiny = np.float32(1.0)
print("float32 (1e8 + 1 - 1e8):", big + tiny - big) # may be 0!
# Kahan compensated summation is more stable
# np.sum uses pairwise summation which is better than naive
x = np.random.randn(1_000_000).astype(np.float32)
naive_sum = float(x[0])
for v in x[1:]:
naive_sum += float(v)
np_sum = float(np.sum(x, dtype=np.float64)) # promote to float64
print(f"Naive float32 sum: {naive_sum:.4f}")
print(f"NumPy float64 sum: {np_sum:.4f}")
# Log-sum-exp trick for numerical stability (important in ML)
logits = np.array([1000.0, 1001.0, 999.0]) # would overflow exp()
lse = logits.max() + np.log(np.sum(np.exp(logits - logits.max())))
print(f"Log-sum-exp (stable): {lse:.6f}")import numpy as np
np.random.seed(42)
n_days = 252
prices = 100 * np.exp(np.cumsum(np.random.randn(n_days) * 0.01))
# Inject data quality issues
prices[10] = np.nan # missing quote
prices[50] = 0.0 # trading halt
prices[100] = -5.0 # data error
def clean_prices(prices):
    """Return a cleaned copy: non-positive prices become NaN, then NaNs
    are forward-filled with the last valid price."""
    # NaN comparisons are False, so existing NaNs also fall into the NaN branch.
    cleaned = np.where(prices > 0, prices, np.nan)
    positions = np.arange(len(cleaned))
    # Carry forward the index of the most recent valid price.
    last_valid = np.maximum.accumulate(
        np.where(np.isnan(cleaned), 0, positions))
    return cleaned[last_valid]
def daily_returns(prices):
    """Simple daily returns from a raw price series.

    The series is cleaned first (non-positive values -> NaN, forward-fill),
    then returns are computed as diff(p) / p[:-1].  Non-finite returns
    (e.g. from a leading NaN) are dropped, so the result may be shorter
    than len(prices) - 1.
    """
    p = clean_prices(prices)
    rets = np.diff(p) / p[:-1]
    # Drop NaN/inf left over from any remaining bad values.
    rets = rets[np.isfinite(rets)]
    return rets
rets = daily_returns(prices)
print(f"Days cleaned: {np.isnan(prices).sum() + (prices <= 0).sum()}")
print(f"Valid returns: {len(rets)}")
print(f"Mean return: {np.nanmean(rets):.4%}")
print(f"Volatility: {np.nanstd(rets):.4%}")
print(f"Sharpe (approx): {np.nanmean(rets)/np.nanstd(rets)*np.sqrt(252):.2f}")import numpy as np
def naive_softmax(x):
    """Softmax without stabilization; exp() overflows for large inputs."""
    exps = np.exp(x)
    return exps / np.sum(exps)
def stable_softmax(x):
    """Numerically stable softmax.

    Shifting by max(x) before exponentiating cannot change the result
    (exp(x - c) / sum(exp(x - c)) == softmax(x) for any constant c) but
    guarantees the largest exponent is 0, so exp() never overflows.
    """
    shifted = np.exp(x - np.max(x))
    return shifted / shifted.sum()
def log_softmax(x):
    """Numerically stable log-softmax: x - max(x) - log(sum(exp(x - max(x)))).

    More stable than log(softmax(x)) because it never produces log(0)
    and the exp() argument is always <= 0.
    """
    shifted = x - np.max(x)
    return shifted - np.log(np.sum(np.exp(shifted)))
for x in [np.array([1.0, 2.0, 3.0]),
np.array([1000.0, 1001.0, 999.0]),
np.array([-1000.0, -999.0, -1001.0])]:
print(f"x = {x}")
print(f" naive: {naive_softmax(x)}")
print(f" stable: {stable_softmax(x)}")
print(f" log: {log_softmax(x)}")
print(f" exp(log) == stable: {np.allclose(np.exp(log_softmax(x)), stable_softmax(x))}")
NumPy's random module (Generator API) provides reproducible random sampling from dozens of distributions. Essential for simulations, bootstrapping, and synthetic data.
import numpy as np
# Modern API: use default_rng (preferred over np.random.seed)
rng = np.random.default_rng(seed=42)
# Uniform
u = rng.uniform(low=0, high=10, size=5)
print("uniform:", u.round(2))
# Integers
dice = rng.integers(1, 7, size=10)
print("dice rolls:", dice)
# Shuffle and choice
items = np.arange(10)
rng.shuffle(items)
print("shuffled:", items)
sample = rng.choice(items, size=5, replace=False)
print("sample without replacement:", sample)
# Weighted choice
weights = np.array([0.5, 0.3, 0.2])
outcomes = np.array(['A', 'B', 'C'])
draws = rng.choice(outcomes, size=20, p=weights)
vals, counts = np.unique(draws, return_counts=True)
print("Weighted draws:", dict(zip(vals, counts)))
# Verify reproducibility
rng2 = np.random.default_rng(seed=42)
print("Same sequence:", np.all(rng2.uniform(size=5) == np.random.default_rng(42).uniform(size=5)))import numpy as np
rng = np.random.default_rng(42)
# Normal distribution
heights = rng.normal(loc=170, scale=10, size=10_000)
print(f"Normal(170, 10): mean={heights.mean():.2f}, std={heights.std():.2f}")
# Lognormal (for prices, incomes)
prices = rng.lognormal(mean=3.0, sigma=0.5, size=5000)
print(f"Lognormal: median={np.median(prices):.2f}, mean={prices.mean():.2f}")
# Exponential (wait times, inter-arrivals)
wait_times = rng.exponential(scale=5.0, size=1000) # mean = 5 min
print(f"Exponential(lambda=0.2): mean={wait_times.mean():.2f}")
# Poisson (event counts)
requests_per_sec = rng.poisson(lam=30, size=60) # 30 req/s for 1 min
print(f"Poisson(30): mean={requests_per_sec.mean():.1f}, std={requests_per_sec.std():.1f}")
# Binomial (successes in n trials)
successes = rng.binomial(n=100, p=0.35, size=1000) # conversion rate
print(f"Binomial(100, 0.35): mean={successes.mean():.1f} (expected 35)")
# Beta (probabilities, proportions)
click_rates = rng.beta(a=2, b=5, size=1000)
print(f"Beta(2, 5): mean={click_rates.mean():.3f} (expected {2/(2+5):.3f})")import numpy as np
rng = np.random.default_rng(42)
# Bootstrap confidence interval
data = np.array([2.3, 4.1, 3.8, 5.2, 2.9, 4.7, 3.5, 6.1, 2.8, 4.4])
n_boot = 10_000
boot_means = np.array([
rng.choice(data, size=len(data), replace=True).mean()
for _ in range(n_boot)
])
ci_low, ci_high = np.percentile(boot_means, [2.5, 97.5])
print(f"Sample mean: {data.mean():.3f}")
print(f"95% bootstrap CI: [{ci_low:.3f}, {ci_high:.3f}]")
# Monte Carlo Pi estimation
N = 1_000_000
x = rng.uniform(-1, 1, N)
y = rng.uniform(-1, 1, N)
inside = (x**2 + y**2) <= 1.0
pi_est = 4 * inside.mean()
print(f"Pi estimate (N={N:,}): {pi_est:.5f} (true: {np.pi:.5f})")
# Simulate portfolio returns (Monte Carlo)
n_assets, n_sims, n_days = 5, 10_000, 252
mu = rng.uniform(0.0001, 0.001, n_assets) # daily expected return
sigma = rng.uniform(0.01, 0.03, n_assets) # daily vol
daily_rets = rng.normal(mu, sigma, (n_sims, n_days, n_assets))
port_rets = daily_rets.mean(axis=2).sum(axis=1) # equal weight
var_95 = np.percentile(port_rets, 5)
print(f"Portfolio annual return mean: {port_rets.mean():.2%}")
print(f"VaR 95%: {var_95:.2%}")import numpy as np
rng = np.random.default_rng(42)
# Observed data
n_control = 5000
n_treatment = 5000
conv_control = 0.10 # 10% baseline
conv_treat = 0.12 # 12% treatment (absolute +2pp)
# Sample from observed proportions
control = rng.binomial(1, conv_control, n_control)
treatment = rng.binomial(1, conv_treat, n_treatment)
obs_diff = treatment.mean() - control.mean()
print(f"Observed difference: {obs_diff:.4f} ({obs_diff:.2%})")
# Permutation test
N_PERM = 10_000
combined = np.concatenate([control, treatment])
perm_diffs = np.empty(N_PERM)
for i in range(N_PERM):
rng.shuffle(combined)
perm_diffs[i] = combined[:n_treatment].mean() - combined[n_treatment:].mean()
p_value = (np.abs(perm_diffs) >= np.abs(obs_diff)).mean()
print(f"Permutation test p-value: {p_value:.4f}")
print(f"Significant at alpha=0.05: {p_value < 0.05}")
ci = np.percentile(perm_diffs, [2.5, 97.5])
print(f"Null distribution 95% CI: [{ci[0]:.4f}, {ci[1]:.4f}]")import numpy as np
def fit_normal(data, n_samples=1000, seed=42):
    """Fit a normal distribution to `data` and draw synthetic samples from it.

    Returns a dict with the fitted `mu` and `std` (sample std, ddof=1),
    a KS-style statistic `ks` (max gap between the empirical CDF and the
    fitted normal CDF evaluated at the data points), and `n_samples`
    synthetic draws under key 'synthetic'.
    """
    rng = np.random.default_rng(seed)
    mu = np.mean(data)
    std = np.std(data, ddof=1)  # Bessel-corrected sample std
    # Generate from fitted distribution
    synthetic = rng.normal(mu, std, n_samples)
    # Empirical CDF: i/n at the i-th sorted data point.
    emp = np.sort(data)
    emp_cdf = np.arange(1, len(emp)+1) / len(emp)
    # Normal CDF via the error function: Phi(z) = 0.5 * (1 + erf(z / sqrt(2)))
    from math import erf
    norm_cdf = np.array([0.5 * (1 + erf((x-mu)/(std*2**0.5))) for x in emp])
    # KS statistic: largest vertical distance between the two CDFs.
    ks_stat = np.max(np.abs(emp_cdf - norm_cdf))
    return {"mu": mu, "std": std, "ks": ks_stat, "synthetic": synthetic}
def simulate_gbm(S0, mu, sigma, T, n_steps, seed=42):
    """Simulate one geometric Brownian motion price path.

    Uses the exact log-normal solution: log-increments are
    (mu - sigma^2/2) * dt + sigma * sqrt(dt) * Z with Z ~ N(0, 1).
    Returns an array of n_steps prices (S0 itself is not included).
    """
    rng = np.random.default_rng(seed)
    dt = T / n_steps
    Z = rng.standard_normal(n_steps)
    log_rets = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * Z
    return S0 * np.exp(np.cumsum(log_rets))
rng = np.random.default_rng(42)
data = rng.normal(50, 10, 200)
result = fit_normal(data)
print(f"Fitted: mu={result['mu']:.2f}, std={result['std']:.2f}, KS={result['ks']:.4f}")
Images are 2D (grayscale) or 3D (H x W x C) NumPy arrays. Understanding array operations on images builds intuition for convolutions, pooling, and data augmentation.
import numpy as np
# Images are (H, W) for grayscale, (H, W, C) for color
np.random.seed(42)
H, W = 64, 64
# Synthetic grayscale (0-255 uint8)
gray = np.random.randint(0, 256, (H, W), dtype=np.uint8)
print(f"Grayscale shape: {gray.shape}, dtype: {gray.dtype}")
print(f"Pixel range: [{gray.min()}, {gray.max()}]")
# Synthetic RGB image
rgb = np.random.randint(0, 256, (H, W, 3), dtype=np.uint8)
print(f"RGB shape: {rgb.shape}")
# Extract channels
R, G, B = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]
print(f"R channel mean: {R.mean():.1f}")
# RGB to grayscale (luminosity formula)
gray_from_rgb = (0.2989 * R + 0.5870 * G + 0.1140 * B).astype(np.uint8)
print(f"Converted gray shape: {gray_from_rgb.shape}")
# Normalize to [0, 1] float
img_float = rgb.astype(np.float32) / 255.0
print(f"Normalized range: [{img_float.min():.3f}, {img_float.max():.3f}]")
# Crop a region
crop = rgb[10:40, 15:50, :]
print(f"Crop shape: {crop.shape}")
# Horizontal flip
flipped = rgb[:, ::-1, :]
print(f"Flipped shape: {flipped.shape}")import numpy as np
def convolve2d(img, kernel):
    """2D cross-correlation with zero padding, 'same' output size (loop version)."""
    img_h, img_w = img.shape
    ker_h, ker_w = kernel.shape
    top, left = ker_h // 2, ker_w // 2
    # Zero-pad so every output pixel has a full kernel-sized window.
    padded = np.pad(img, ((top, top), (left, left)), mode='constant')
    result = np.zeros_like(img, dtype=float)
    for row in range(img_h):
        for col in range(img_w):
            window = padded[row:row + ker_h, col:col + ker_w]
            result[row, col] = np.sum(window * kernel)
    return result
# Faster with stride_tricks (no Python loop)
def convolve2d_fast(img, kernel):
    """Vectorized 'same'-size 2D cross-correlation via sliding_window_view."""
    from numpy.lib.stride_tricks import sliding_window_view
    ker_h, ker_w = kernel.shape
    top, left = ker_h // 2, ker_w // 2
    padded = np.pad(img, ((top, top), (left, left)), mode='constant')
    # One (kH, kW) window per output pixel; multiply-and-sum replaces the loop.
    windows = sliding_window_view(padded, (ker_h, ker_w))
    return (windows * kernel).sum(axis=(-2, -1))
img = np.random.rand(32, 32)
# Edge detection kernel (Sobel-like)
sobel_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=float)
edges = convolve2d_fast(img, sobel_x)
print(f"Edge response range: [{edges.min():.3f}, {edges.max():.3f}]")
# Max pooling (2x2)
def max_pool2d(img, size=2):
    """Non-overlapping size x size max pooling; trailing edge rows/cols
    that don't fill a complete window are dropped."""
    rows, cols = img.shape
    out_r, out_c = rows // size, cols // size
    trimmed = img[:out_r * size, :out_c * size]
    # Split into (out_r, size, out_c, size) blocks, then take each block's max.
    blocks = trimmed.reshape(out_r, size, out_c, size)
    return blocks.max(axis=(1, 3))
pooled = max_pool2d(img, 2)
print(f"After 2x2 max pool: {img.shape} -> {pooled.shape}")import numpy as np
rng = np.random.default_rng(42)
def augment_batch(images, rng):
    """Randomly augment a batch of uint8 images shaped (N, H, W, C).

    Applies, in order: per-image horizontal flip (p=0.5), per-image
    brightness scaling in [0.8, 1.2], a random 80% crop pasted into the
    top-left corner (no resize), and Gaussian noise (sigma=5).
    Returns a new uint8 batch; `images` is not modified.
    """
    batch = images.copy().astype(np.float32)
    # Random horizontal flip (50% chance per image)
    flip_mask = rng.random(len(batch)) > 0.5
    batch[flip_mask] = batch[flip_mask, :, ::-1, :]
    # Random brightness adjustment, broadcast over H, W, C per image
    brightness = rng.uniform(0.8, 1.2, (len(batch), 1, 1, 1))
    batch = np.clip(batch * brightness, 0, 255)
    # Random crop (simplified: crop to 80% size, no resize)
    H, W = batch.shape[1:3]
    ch, cw = int(H * 0.8), int(W * 0.8)
    for i in range(len(batch)):
        y0 = rng.integers(0, H - ch)
        x0 = rng.integers(0, W - cw)
        # In a real pipeline you would resize; here the crop overwrites
        # the top-left region, leaving the right/bottom margins untouched.
        batch[i, :ch, :cw, :] = batch[i, y0:y0+ch, x0:x0+cw, :]
    # Gaussian noise
    noise = rng.normal(0, 5, batch.shape)
    batch = np.clip(batch + noise, 0, 255)
    return batch.astype(np.uint8)
# Demo
N, H, W, C = 8, 64, 64, 3
batch = rng.integers(50, 200, (N, H, W, C), dtype=np.uint8)
augmented = augment_batch(batch, rng)
print(f"Batch: {batch.shape} | Augmented: {augmented.shape}")
print(f"Orig mean: {batch.mean():.1f} | Aug mean: {augmented.mean():.1f}")import numpy as np
rng = np.random.default_rng(42)
N, H, W, C = 32, 64, 64, 3
# Simulate a batch
batch = rng.integers(0, 256, (N, H, W, C), dtype=np.uint8)
# ImageNet-style normalization (per-channel mean/std)
MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)
def preprocess(batch):
    """Scale a uint8 NHWC batch to [0, 1], normalize per channel, emit NCHW.

    Relies on module-level MEAN/STD (ImageNet channel statistics),
    which broadcast over the trailing channel axis before the transpose.
    """
    x = batch.astype(np.float32) / 255.0  # [0,1]
    x = (x - MEAN) / STD  # normalize
    x = x.transpose(0, 3, 1, 2)  # NHWC -> NCHW for PyTorch
    return x
# Random horizontal flip augmentation
def random_hflip(batch, rng, p=0.5):
    """Flip each image of an NHWC batch horizontally with probability p.

    Returns a new array; `batch` is left untouched.
    """
    chosen = rng.random(len(batch)) < p
    flipped = batch.copy()
    # Reverse the width axis only for the selected images.
    flipped[chosen] = batch[chosen, :, ::-1, :]
    return flipped
processed = preprocess(random_hflip(batch, rng))
print(f"Input: {batch.shape} uint8")
print(f"Output: {processed.shape} float32")
print(f"Channel 0: mean={processed[:,0,:,:].mean():.3f}, std={processed[:,0,:,:].std():.3f}")import numpy as np
def image_stats(img):
    """Per-channel stats and near-white/near-black pixel percentages.

    `img` is an (H, W, 3) uint8 image; returns a dict keyed by
    'R'/'G'/'B' plus 'near_white_pct' and 'near_black_pct'.
    """
    report = {}
    for idx, channel_name in enumerate(('R', 'G', 'B')):
        channel = img[:, :, idx].astype(float)
        report[channel_name] = {
            "mean": channel.mean(),
            "std": channel.std(),
            "min": int(channel.min()),
            "max": int(channel.max()),
        }
    # A pixel is "near white" when every channel exceeds 200,
    # "near black" when every channel is below 55.
    report["near_white_pct"] = (img > 200).all(axis=2).mean() * 100
    report["near_black_pct"] = (img < 55).all(axis=2).mean() * 100
    return report
def normalize_contrast(img):
    """Per-channel min-max contrast stretch of an (H, W, 3) image to 0-255.

    A constant channel (max == min) maps to all zeros to avoid dividing
    by zero.
    """
    stretched = np.empty_like(img, dtype=np.uint8)
    for idx in range(3):
        channel = img[:, :, idx].astype(float)
        lo, hi = channel.min(), channel.max()
        if hi == lo:
            stretched[:, :, idx] = 0
        else:
            scaled = (channel - lo) / (hi - lo) * 255
            stretched[:, :, idx] = scaled.astype(np.uint8)
    return stretched
rng = np.random.default_rng(42)
img = rng.integers(0, 256, (128, 128, 3), dtype=np.uint8)
stats = image_stats(img)
for k, v in stats.items():
print(f"{k}: {v}")
normed = normalize_contrast(img)
print("Normalized range:", normed.min(), normed.max())
NumPy provides percentiles, histograms, correlation, and covariance for descriptive statistics. These underpin nearly all data analysis workflows.
import numpy as np
rng = np.random.default_rng(42)
data = rng.lognormal(mean=4.5, sigma=0.8, size=10_000)
# Percentiles and quantiles
p25, p50, p75 = np.percentile(data, [25, 50, 75])
print(f"Q1={p25:.1f}, Median={p50:.1f}, Q3={p75:.1f}")
print(f"IQR={p75-p25:.1f}")
# Quantile (equivalent but takes fractions 0-1)
q = np.quantile(data, [0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0])
labels = ["0%", "25%", "50%", "75%", "90%", "95%", "99%", "100%"]
for l, v in zip(labels, q):
print(f" {l:>4s}: {v:>8.1f}")
# Outlier detection via IQR
iqr_mult = 1.5
lower = p25 - iqr_mult * (p75 - p25)
upper = p75 + iqr_mult * (p75 - p25)
outliers = data[(data < lower) | (data > upper)]
print(f"Outliers: {len(outliers)} ({len(outliers)/len(data):.1%})")
# 2D: percentile per column
m = rng.normal(0, 1, (100, 5))
col_medians = np.median(m, axis=0)
print("Col medians:", col_medians.round(2))import numpy as np
rng = np.random.default_rng(42)
data = np.concatenate([rng.normal(30, 5, 3000), # young cohort
rng.normal(55, 8, 2000)]) # senior cohort
# Basic histogram
counts, edges = np.histogram(data, bins=20)
centers = (edges[:-1] + edges[1:]) / 2
print("Histogram (first 5 bins):")
for c, n in zip(centers[:5], counts[:5]):
print(f" {c:.1f}: {'#' * (n//50)} ({n})")
# Normalized (density)
density, _ = np.histogram(data, bins=20, density=True)
print(f"Density sums to: {density.sum() * np.diff(edges).mean():.4f}") # ~1.0
# 2D histogram
x = rng.normal(0, 1, 5000)
y = 0.7 * x + rng.normal(0, 0.5, 5000) # correlated
counts_2d, xedges, yedges = np.histogram2d(x, y, bins=10)
print(f"2D histogram shape: {counts_2d.shape}")
print(f"Peak bin count: {counts_2d.max():.0f}")
# Digitize: assign each value to a bin
salary_bins = np.array([0, 30_000, 60_000, 100_000, 200_000, np.inf])
labels = ['entry', 'junior', 'mid', 'senior', 'exec']
salaries = np.array([25000, 45000, 75000, 120000, 350000, 58000])
bin_idx = np.digitize(salaries, salary_bins) - 1
print("Salary bands:", [labels[min(i, len(labels)-1)] for i in bin_idx])import numpy as np
rng = np.random.default_rng(42)
n = 200
# Generate correlated variables
x1 = rng.normal(0, 1, n)
x2 = 0.8 * x1 + rng.normal(0, 0.6, n) # strong positive correlation
x3 = rng.normal(0, 1, n) # uncorrelated
data = np.column_stack([x1, x2, x3])
# Pearson correlation matrix
corr = np.corrcoef(data.T) # data.T because corrcoef expects (n_vars, n_obs)
print("Correlation matrix:")
for row in corr:
print(" ", " ".join(f"{v:+.3f}" for v in row))
# Covariance matrix
cov = np.cov(data.T)
print("Covariance matrix diagonal (variances):", np.diag(cov).round(3))
# Manual Pearson r for two variables
def pearson_r(a, b):
    """Pearson correlation coefficient of two equal-length 1D arrays."""
    dev_a = a - a.mean()
    dev_b = b - b.mean()
    # covariance-like numerator over the product of the two norms
    return np.sum(dev_a * dev_b) / (np.sqrt(np.sum(dev_a ** 2)) * np.sqrt(np.sum(dev_b ** 2)))
r12 = pearson_r(x1, x2)
r13 = pearson_r(x1, x3)
print(f"Pearson r(x1,x2)={r12:.3f}, r(x1,x3)={r13:.3f}")
# Rolling correlation
window = 30
roll_corr = np.array([
pearson_r(x1[i:i+window], x2[i:i+window])
for i in range(n - window)
])
print(f"Rolling({window}) correlation: mean={roll_corr.mean():.3f}")import numpy as np
rng = np.random.default_rng(42)
n_assets = 10
n_days = 1260 # 5 trading years
# Simulate correlated returns
true_corr = 0.3 * np.ones((n_assets, n_assets)) + 0.7 * np.eye(n_assets)
L = np.linalg.cholesky(true_corr)
Z = rng.standard_normal((n_days, n_assets))
returns = (Z @ L.T) * 0.01 # ~1% daily vol
# Equal-weight portfolio returns
weights = np.ones(n_assets) / n_assets
port_ret = returns @ weights
# Risk metrics
var_99 = np.percentile(port_ret, 1)
es_99 = port_ret[port_ret <= var_99].mean()
ann_vol = port_ret.std() * np.sqrt(252)
print(f"Portfolio daily returns: mean={port_ret.mean():.4%}, vol={port_ret.std():.4%}")
print(f"Annualized vol: {ann_vol:.2%}")
print(f"1-day VaR (99%): {var_99:.4%}")
print(f"Expected Shortfall (CVaR 99%): {es_99:.4%}")
# Correlation matrix
corr = np.corrcoef(returns.T)
print(f"Avg pairwise correlation: {corr[np.triu_indices(n_assets, k=1)].mean():.3f}")import numpy as np
def weighted_mean(data, weights):
    """Weighted arithmetic mean: sum(w * x) / sum(w)."""
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    return np.sum(data * weights) / np.sum(weights)

def weighted_std(data, weights, ddof=0):
    """Weighted standard deviation around the weighted mean.

    Treats weights as frequency counts: for ddof=0 the variance is
    sum(w * (x - mean)^2) / sum(w); for ddof > 0 the denominator is
    reduced by ddof (the effective "observation count" is sum(w)).
    """
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    mean = weighted_mean(data, weights)
    variance = np.sum(weights * (data - mean) ** 2) / (np.sum(weights) - ddof)
    return np.sqrt(variance)

def weighted_percentile(data, weights, q):
    """Weighted percentile by interpolating on the cumulative weight fraction.

    `q` is on the 0-100 scale, matching np.percentile.  Values below the
    first cumulative fraction clamp to the smallest data value.
    """
    # Sort data by value, then compute cumulative weight fraction
    data, weights = np.asarray(data, float), np.asarray(weights, float)
    idx = np.argsort(data)
    sdata = data[idx]
    sweights = weights[idx]
    cumw = np.cumsum(sweights) / sweights.sum()
    # np.interp clamps at the edges, which gives the min/max for q=0/100.
    return np.interp(q / 100.0, cumw, sdata)
data = np.array([1, 2, 3, 4, 5], float)
weights = np.array([5, 1, 1, 1, 1], float)
print("Weighted mean:", weighted_mean(data, weights)) # (5*1 + 2 + 3 + 4 + 5) / 9 = 19/9 ~= 2.11
print("Weighted std: ", weighted_std(data, weights))
print("Weighted 50th:", weighted_percentile(data, weights, 50))
NumPy supports binary (.npy, .npz), text (savetxt/genfromtxt), and memory-mapped file formats for efficient large array storage and loading.
import numpy as np
import tempfile, pathlib
tmp = pathlib.Path(tempfile.mkdtemp())
# Save/load single array (.npy)
arr = np.arange(12).reshape(3, 4)
np.save(tmp / 'array.npy', arr)
loaded = np.load(tmp / 'array.npy')
print("Loaded:", loaded)
print("Same data:", np.array_equal(arr, loaded))
print("File size:", (tmp / 'array.npy').stat().st_size, "bytes")
# Save multiple arrays in one file (.npz = numpy zip)
x = np.linspace(0, 10, 100)
y = np.sin(x)
dy = np.cos(x)
np.savez(tmp / 'signals.npz', x=x, y=y, dy=dy)
data = np.load(tmp / 'signals.npz')
print("Keys in npz:", list(data.keys()))
print("x[0:3]:", data['x'][:3].round(3))
# savez_compressed: gzip compression
np.savez_compressed(tmp / 'signals_compressed.npz', **dict(data))
orig_size = (tmp / 'signals.npz').stat().st_size
comp_size = (tmp / 'signals_compressed.npz').stat().st_size
print(f"Original: {orig_size} bytes, Compressed: {comp_size} bytes ({comp_size/orig_size:.0%})")
import shutil; shutil.rmtree(tmp)import numpy as np
import tempfile, pathlib
tmp = pathlib.Path(tempfile.mkdtemp())
# savetxt: write to CSV/TSV
data = np.array([[1, 2.5, 3.7], [4, 5.1, 6.8], [7, 8.3, 9.0]])
np.savetxt(tmp / 'data.csv', data, delimiter=',', fmt='%.2f',
header='col1,col2,col3', comments='')
# Read it back
loaded = np.genfromtxt(tmp / 'data.csv', delimiter=',', skip_header=1)
print("Loaded from CSV:\n", loaded)
# genfromtxt with mixed data and missing values
csv_content = '''id,temp,humidity,status
1,22.5,65.0,OK
2,nan,72.0,OK
3,19.8,,WARN
4,25.1,80.5,OK'''
(tmp / 'sensors.csv').write_text(csv_content)
sensors = np.genfromtxt(
tmp / 'sensors.csv',
delimiter=',',
names=True, # use header row as field names
dtype=None, # auto-detect dtypes
encoding='utf-8',
filling_values=np.nan # fill missing with nan
)
print("dtype:", sensors.dtype)
print("temps:", sensors['temp'])
print("humidity:", sensors['humidity'])
import shutil; shutil.rmtree(tmp)import numpy as np
import tempfile, pathlib
tmp = pathlib.Path(tempfile.mkdtemp())
fpath = tmp / 'large_array.dat'
# Create a memory-mapped array (simulating large data)
shape = (1000, 500)
# 'w+' = write+read, 'r+' = read+write existing, 'r' = read-only
mm_write = np.memmap(fpath, dtype='float32', mode='w+', shape=shape)
# Fill with data (only the modified pages are loaded into RAM)
mm_write[:100, :] = np.random.randn(100, 500).astype('float32')
mm_write.flush() # ensure data is written to disk
del mm_write
# Read back without loading everything into memory
mm_read = np.memmap(fpath, dtype='float32', mode='r', shape=shape)
print(f"Array shape: {mm_read.shape}, dtype: {mm_read.dtype}")
print(f"First row (first 5): {mm_read[0, :5]}")
print(f"File size: {fpath.stat().st_size:,} bytes")
print(f"Expected: {shape[0]*shape[1]*4:,} bytes (float32 = 4 bytes)")
# Slice a subarray (still memory-mapped, not loaded)
sub = mm_read[:50, :100]
print(f"Subarray shape: {sub.shape}, mean={sub.mean():.4f}")
del mm_read
import shutil; shutil.rmtree(tmp)import numpy as np, tempfile, pathlib, shutil
tmp = pathlib.Path(tempfile.mkdtemp())
# Simulate model weights for a small neural network
rng = np.random.default_rng(42)
weights = {
"W1": rng.standard_normal((784, 256)).astype(np.float32),
"b1": np.zeros(256, dtype=np.float32),
"W2": rng.standard_normal((256, 128)).astype(np.float32),
"b2": np.zeros(128, dtype=np.float32),
"W3": rng.standard_normal((128, 10)).astype(np.float32),
"b3": np.zeros(10, dtype=np.float32),
}
history = {
"train_loss": rng.uniform(0.5, 2.0, 50).cumsum() ** 0 * np.linspace(2, 0.2, 50),
"val_loss": rng.uniform(0.5, 2.0, 50).cumsum() ** 0 * np.linspace(2.1, 0.25, 50),
"epoch": np.arange(50),
}
def save_checkpoint(path, weights, history, epoch):
    """Write weights + training history to one compressed .npz checkpoint.

    Returns the checkpoint file size in bytes.

    Bug fix: `history` also contains an 'epoch' key, so the original call
    `np.savez_compressed(path, epoch=epoch, **weights, **history)` raised
    TypeError ("multiple values for keyword argument 'epoch'").  Merging
    into a dict first resolves the collision; the explicit scalar `epoch`
    deliberately wins over the history array.
    """
    payload = {**history, **weights, "epoch": epoch}
    np.savez_compressed(path, **payload)
    return path.stat().st_size
def load_checkpoint(path):
    """Read a checkpoint .npz back into (epoch, weights dict, history dict).

    NOTE(review): assumes the archive contains every W1..b3 weight key and
    all three history keys; a missing key raises KeyError from NpzFile.
    """
    data = np.load(path)
    epoch = int(data['epoch'])
    # Split the flat archive back into weight arrays vs. training history.
    W = {k: data[k] for k in ['W1','b1','W2','b2','W3','b3']}
    H = {k: data[k] for k in ['train_loss','val_loss','epoch']}
    return epoch, W, H
ckpt_path = tmp / 'checkpoint_ep50.npz'
size = save_checkpoint(ckpt_path, weights, history, epoch=50)
print(f"Checkpoint saved: {size:,} bytes ({size/1024:.1f} KB)")
ep, W, H = load_checkpoint(ckpt_path)
print(f"Loaded epoch {ep}, W1 shape: {W['W1'].shape}")
print(f"Final train loss: {H['train_loss'][-1]:.4f}")
shutil.rmtree(tmp)import numpy as np, pathlib
class DataCache:
    """Tiny on-disk cache of named NumPy arrays, one compressed .npz per key."""

    def __init__(self, cache_dir='./cache'):
        self.dir = pathlib.Path(cache_dir)
        self.dir.mkdir(parents=True, exist_ok=True)

    def _path(self, key):
        # One file per key; key must be a valid filename stem.
        return self.dir / f"{key}.npz"

    def exists(self, key):
        """Return True if an entry for `key` is on disk."""
        return self._path(key).exists()

    def save(self, key, **arrays):
        """Store the given named arrays under `key` (overwrites any entry)."""
        np.savez_compressed(self._path(key), **arrays)

    def load(self, key):
        """Load all arrays stored under `key` as a {name: array} dict.

        Raises FileNotFoundError if the key is missing.  Fix: the original
        `dict(np.load(...))` left the NpzFile (and its zip handle) open;
        materializing the arrays inside a `with` block closes the file.
        """
        if not self.exists(key):
            raise FileNotFoundError(f"Cache key {key!r} not found")
        with np.load(self._path(key)) as data:
            return {k: data[k] for k in data.files}

    def delete(self, key):
        """Remove the entry for `key`, if present."""
        p = self._path(key)
        if p.exists():
            p.unlink()
import tempfile, shutil
tmp = tempfile.mkdtemp()
cache = DataCache(tmp)
rng = np.random.default_rng(42)
X = rng.standard_normal((1000, 20))
y = rng.integers(0, 5, 1000)
cache.save('features', X=X, y=y)
print("Exists:", cache.exists('features'))
data = cache.load('features')
print("Loaded X:", data['X'].shape, "y:", data['y'].shape)
print("Data matches:", np.allclose(X, data['X']))
cache.delete('features')
print("After delete:", cache.exists('features'))
shutil.rmtree(tmp)
NumPy's datetime64 and timedelta64 types enable vectorized date/time arithmetic without Python loops. Essential for time series alignment and resampling.
import numpy as np
# Create datetime64 scalars and arrays
d1 = np.datetime64('2024-01-15')
d2 = np.datetime64('2024-03-20')
print("d1:", d1, "d2:", d2)
# Arithmetic
delta = d2 - d1
print(f"Delta: {delta}") # timedelta64
print(f"Days between: {delta.astype('timedelta64[D]').astype(int)}")
# Add days
d3 = d1 + np.timedelta64(30, 'D')
print(f"30 days after d1: {d3}")
# Date range
dates = np.arange('2024-01', '2025-01', dtype='datetime64[M]') # monthly
print("Monthly dates:", dates)
print("Count:", len(dates))
# Different units
dt_ns = np.datetime64('2024-01-15T09:30:00.000000000', 'ns')
dt_s = np.datetime64('2024-01-15T09:30:00', 's')
print("Nanosecond precision:", dt_ns)
print("Second precision:", dt_s)
# Year, month, day extraction via astype
day_arr = np.arange('2024-01-01', '2024-02-01', dtype='datetime64[D]')
months = day_arr.astype('datetime64[M]').astype(int) % 12 + 1
dom = (day_arr - day_arr.astype('datetime64[M]')).astype(int) + 1
print("Days of month:", dom[:7])import numpy as np
# busdaycalendar: define which days are business days
# Default: Mon-Fri, no holidays
bdc = np.busdaycalendar()
# busday_count: count business days between dates
start = np.datetime64('2024-01-01')
end = np.datetime64('2024-12-31')
n_bdays = np.busday_count(start, end)
print(f"Business days in 2024: {n_bdays}")
# is_busday: check if date is a business day
dates = np.arange('2024-01-01', '2024-01-08', dtype='datetime64[D]')
for d in dates:
bd = np.is_busday(d)
print(f" {d} ({['Mon','Tue','Wed','Thu','Fri','Sat','Sun'][(d.astype('datetime64[D]').astype(int))%7]}): {'Business' if bd else 'Weekend'}")
# offset_busdays: advance by N business days
t_plus_5bd = np.busday_offset('2024-01-15', 5, roll='forward')
print(f"5 business days after Jan 15: {t_plus_5bd}")
# Custom calendar with holidays
holidays = np.array(['2024-01-01', '2024-12-25'], dtype='datetime64[D]')
bdc_h = np.busdaycalendar(holidays=holidays)
# Settlement: T+2 business days (like equity markets)
trade_dates = np.array(['2024-01-03', '2024-12-24', '2024-12-26'], dtype='datetime64')
settlement = np.busday_offset(trade_dates, 2, busdaycal=bdc_h)
for t, s in zip(trade_dates, settlement):
print(f" Trade {t} -> Settle {s}")import numpy as np
rng = np.random.default_rng(42)
# Generate irregular timestamps (like real tick data)
n = 200
base = np.datetime64('2024-01-15T09:30:00', 's')
gaps = rng.integers(1, 60, n).astype('timedelta64[s]') # 1-60 sec gaps
times = base + np.cumsum(gaps)
prices = 100 + np.cumsum(rng.normal(0, 0.05, n))
print(f"First timestamp: {times[0]}")
print(f"Last timestamp: {times[-1]}")
print(f"Total duration: {(times[-1] - times[0]).astype(int)} seconds")
# Bin into 1-minute buckets (OHLCV)
minute_start = times.astype('datetime64[m]')
unique_mins = np.unique(minute_start)
print(f"Number of 1-minute bars: {len(unique_mins)}")
# OHLCV for each minute
for m in unique_mins[:3]:
mask = (minute_start == m)
bar_prices = prices[mask]
print(f" {m}: O={bar_prices[0]:.2f} H={bar_prices.max():.2f} "
f"L={bar_prices.min():.2f} C={bar_prices[-1]:.2f} V={mask.sum()}")
# Time-based filtering
morning = (times >= np.datetime64('2024-01-15T09:30:00', 's')) & \
(times < np.datetime64('2024-01-15T10:00:00', 's'))
print(f"Ticks in first 30 min: {morning.sum()}")import numpy as np
rng = np.random.default_rng(42)
# Simulate a week of irregular tick data
days = np.arange('2024-01-08', '2024-01-13', dtype='datetime64[D]') # Mon-Fri
all_times, all_prices = [], []
price = 100.0
for day in days:
open_ns = day.astype('datetime64[s]') + np.timedelta64(9*3600+30*60, 's')
close_ns = day.astype('datetime64[s]') + np.timedelta64(16*3600, 's')
n_ticks = rng.integers(50, 150)
t_offsets = rng.integers(0, int((close_ns - open_ns).astype(int)), n_ticks)
t_offsets.sort()
tick_times = open_ns + t_offsets.astype('timedelta64[s]')
rets = rng.normal(0, 0.01, n_ticks)
tick_prices = price * np.exp(np.cumsum(rets))
price = tick_prices[-1]
all_times.extend(tick_times.tolist())
all_prices.extend(tick_prices.tolist())
times = np.array(all_times, dtype='datetime64[s]')
prices = np.array(all_prices)
# OHLCV per day
print("Daily OHLCV:")
day_buckets = times.astype('datetime64[D]')
for day in np.unique(day_buckets):
m = (day_buckets == day)
p = prices[m]
print(f" {day}: O={p[0]:.2f} H={p.max():.2f} L={p.min():.2f} C={p[-1]:.2f} V={m.sum()}")import numpy as np
def get_trading_days(start, end, holidays=None):
    """All business days in [start, end), excluding the given holidays.

    `start`/`end` accept anything np.datetime64 does (e.g. ISO strings);
    `end` is exclusive.  Returns a datetime64[D] array.
    """
    first = np.datetime64(start, 'D')
    last = np.datetime64(end, 'D')
    calendar = np.busdaycalendar(
        holidays=np.array(holidays or [], dtype='datetime64[D]'))
    # Enumerate every calendar day, then keep only business days.
    candidates = np.arange(first, last, dtype='datetime64[D]')
    return candidates[np.is_busday(candidates, busdaycal=calendar)]
def next_trading_day(date, holidays=None):
    """Roll `date` forward to the nearest business day (itself if already one)."""
    day = np.datetime64(date, 'D')
    calendar = np.busdaycalendar(
        holidays=np.array(holidays or [], dtype='datetime64[D]'))
    # Offset of 0 with roll='forward' snaps weekends/holidays to the next busday.
    return np.busday_offset(day, 0, roll='forward', busdaycal=calendar)
us_holidays_2024 = [
'2024-01-01', '2024-01-15', '2024-02-19',
'2024-05-27', '2024-07-04', '2024-09-02',
'2024-11-28', '2024-12-25'
]
trading_days = get_trading_days('2024-01-01', '2024-04-01', us_holidays_2024)
print(f"Trading days in Q1 2024: {len(trading_days)}")
print("First 5:", trading_days[:5])
print("Last 3: ", trading_days[-3:])
print("Next trading day after 2024-12-24:", next_trading_day('2024-12-24', us_holidays_2024))
np.meshgrid, np.ogrid, and np.mgrid create coordinate grids for evaluating functions over 2D/3D spaces — essential for plotting, distance fields, and convolutions.
import numpy as np
# Demo: np.meshgrid basics — build coordinate matrices, evaluate a 2D
# function on them, and locate the grid point nearest a target.
# meshgrid creates coordinate matrices
x = np.linspace(-2, 2, 5)
y = np.linspace(-1, 1, 4)
# Default indexing='xy': output shapes are (len(y), len(x)).
X, Y = np.meshgrid(x, y)
print("x:", x)
print("y:", y)
print("X shape:", X.shape, " Y shape:", Y.shape)
print("X:\n", X)
print("Y:\n", Y)
# Evaluate a 2D function over the grid
Z = np.sin(X) * np.exp(-Y**2)
print("Z shape:", Z.shape)
print("Z max:", Z.max().round(4))
# Find point closest to (1.0, 0.5)
target_x, target_y = 1.0, 0.5
dist = np.sqrt((X - target_x)**2 + (Y - target_y)**2)
# unravel_index converts the flat argmin back into (row, col) indices
iy, ix = np.unravel_index(dist.argmin(), dist.shape)
print(f"Closest grid point to ({target_x}, {target_y}): ({X[iy,ix]:.1f}, {Y[iy,ix]:.1f})")
# 'ij' indexing (matrix-style, transposed from default)
X_ij, Y_ij = np.meshgrid(x, y, indexing='ij')
print("With indexing='ij': X shape:", X_ij.shape)import numpy as np
# Demo: ogrid (broadcastable 1-D axes) vs mgrid (dense grids).
# ogrid: open grid — returns 1D arrays that broadcast
oy, ox = np.ogrid[-3:3:7j, -3:3:7j] # 7 points from -3 to 3
print("ogrid ox shape:", ox.shape, " oy shape:", oy.shape) # (1,7) and (7,1)
# Broadcasting creates the 2D result without full grid in memory
Z = ox**2 + oy**2 # broadcasts to (7,7)
print("Z shape:", Z.shape)
print("Radial distance from origin:\n", np.sqrt(Z).round(2))
# mgrid: closed grid — returns full arrays
y_grid, x_grid = np.mgrid[0:4, 0:5] # like meshgrid but shorter syntax
print("mgrid shapes:", x_grid.shape, y_grid.shape)
print("x_grid:\n", x_grid)
# Useful: create coordinate pairs for all pixels
H, W = 8, 8
rows, cols = np.mgrid[0:H, 0:W]
center = np.array([H/2, W/2])
dist_from_center = np.sqrt((rows - center[0])**2 + (cols - center[1])**2)
circle_mask = dist_from_center <= 3
print(f"Pixels within radius 3 of center: {circle_mask.sum()}")
# All coordinate pairs as (N, 2) array
coords = np.column_stack([rows.ravel(), cols.ravel()])
print(f"All {H}x{W} pixel coordinates shape: {coords.shape}")import numpy as np
# Demo: distance field, Voronoi labelling, and an RBF sum over a pixel grid.
# Create a distance field from a set of source points
H, W = 20, 20
rows, cols = np.mgrid[0:H, 0:W]
source_points = np.array([[3, 3], [10, 15], [17, 5]])
# Distance to nearest source point
dist_field = np.full((H, W), np.inf)
for pt in source_points:
    d = np.sqrt((rows - pt[0])**2 + (cols - pt[1])**2)
    dist_field = np.minimum(dist_field, d)
print(f"Distance field shape: {dist_field.shape}")
print(f"Max distance: {dist_field.max():.2f}")
print(f"Pixels within 5 units of any source: {(dist_field <= 5).sum()}")
# Voronoi diagram: which source is closest to each pixel?
# BUG FIX: the original compared each d against dist_field, which already
# holds the final minimum over all sources, so "d < dist_field" was never
# true and every pixel kept label 0. Track a fresh running minimum instead.
running_min = np.full((H, W), np.inf)
labels = np.zeros((H, W), dtype=int)
for k, pt in enumerate(source_points):
    d = np.sqrt((rows - pt[0])**2 + (cols - pt[1])**2)
    mask = d < running_min  # strictly closer than current min
    labels[mask] = k
    running_min = np.minimum(running_min, d)
for k in range(len(source_points)):
    print(f"Region {k}: {(labels == k).sum()} pixels")
# Gaussian RBF kernel centered at each source
sigma = 3.0
rbf = sum(np.exp(-(((rows-pt[0])**2+(cols-pt[1])**2)/(2*sigma**2)))
          for pt in source_points)
print(f"RBF max: {rbf.max():.3f}, sum: {rbf.sum():.1f}")
# Demo setup: synthetic GPS point cloud plus a lat/lon evaluation grid
# (50 x 60 cells via mgrid's complex-step syntax).
rng = np.random.default_rng(42)
# Simulate GPS points (lat/lon in some area)
n_points = 500
lat = rng.normal(40.7, 0.05, n_points) # NYC-like
lon = rng.normal(-74.0, 0.08, n_points)
# Create grid
lat_grid, lon_grid = np.mgrid[
lat.min()-0.02 : lat.max()+0.02 : 50j,
lon.min()-0.02 : lon.max()+0.02 : 60j
]
def kde_density(lat_g, lon_g, lat_pts, lon_pts, bw=0.01):
    """Gaussian kernel density estimate of a point set on a coordinate grid.

    lat_g/lon_g are broadcast-compatible coordinate grids, lat_pts/lon_pts
    the sample coordinates, bw the (isotropic) kernel bandwidth. Returns an
    array shaped like lat_g, normalized so each kernel integrates to 1.
    """
    acc = np.zeros_like(lat_g)
    for pt_lat, pt_lon in zip(lat_pts, lon_pts):
        sq_dist = (lat_g - pt_lat)**2 + (lon_g - pt_lon)**2
        acc += np.exp(-sq_dist / (2 * bw**2))
    # n points, each unnormalized Gaussian integrates to 2*pi*bw^2
    norm = len(lat_pts) * 2 * np.pi * bw**2
    return acc / norm
# Demo: evaluate the KDE on the grid and report peak / hot-spot statistics.
density = kde_density(lat_grid, lon_grid, lat, lon)
print(f"Grid shape: {density.shape}")
print(f"Density range: [{density.min():.4f}, {density.max():.4f}]")
# argmax on the flattened grid pairs with ravel() to recover coordinates
print(f"Peak density at: lat={lat_grid.ravel()[density.argmax()]:.4f}, "
f"lon={lon_grid.ravel()[density.argmax()]:.4f}")
# Hot spots: top 10% density cells
threshold = np.percentile(density, 90)
hot_spots = density > threshold
print(f"Hot spot cells: {hot_spots.sum()} ({hot_spots.mean():.1%} of grid)")import numpy as np
def bivariate_gaussian(X, Y, mu_x=0, mu_y=0, sigma_x=1, sigma_y=1, rho=0):
    """Evaluate the bivariate normal PDF on coordinate grids X and Y.

    mu_* / sigma_* are per-axis mean and standard deviation; rho is the
    correlation coefficient (|rho| < 1). Works on scalars or arrays.
    """
    u = (X - mu_x) / sigma_x
    v = (Y - mu_y) / sigma_y
    quad = u * u - 2 * rho * u * v + v * v
    one_minus_r2 = 1 - rho**2
    norm = 2 * np.pi * sigma_x * sigma_y * np.sqrt(one_minus_r2)
    return np.exp(-quad / (2 * one_minus_r2)) / norm
# Demo: the Riemann sum Z.sum()*dx*dy approximates the PDF's integral (~1),
# a quick normalization sanity check.
x = np.linspace(-3, 3, 50)
y = np.linspace(-3, 3, 50)
X, Y = np.meshgrid(x, y)
dx = x[1] - x[0]
dy = y[1] - y[0]
# Standard normal (rho=0)
Z1 = bivariate_gaussian(X, Y)
print(f"Standard: max={Z1.max():.4f}, integral={Z1.sum()*dx*dy:.4f}")
# Correlated (rho=0.7)
Z2 = bivariate_gaussian(X, Y, mu_x=0.5, mu_y=-0.5, sigma_x=1.2, sigma_y=0.8, rho=0.7)
print(f"Correlated: max={Z2.max():.4f}, integral={Z2.sum()*dx*dy:.4f}")
print(f"Peak at: ({X.ravel()[Z2.argmax()]:.2f}, {Y.ravel()[Z2.argmax()]:.2f})")
Broadcasting rules let NumPy operate on arrays of different shapes without copying data. Master broadcasting for pairwise distances, outer products, and vectorized scoring.
import numpy as np
# Demo: the broadcasting rules on rows, columns, outer products, and a
# table of result shapes for (3,4,5) against smaller operands.
# Broadcasting rule: align shapes from the right,
# size-1 dimensions are stretched to match
# Example 1: add row vector to each row of a matrix
m = np.ones((4, 3))
v = np.array([10, 20, 30]) # shape (3,)
print("m + v:", m + v) # (4,3) + (3,) -> (4,3)
# Example 2: add column vector to each column
col = np.array([[1], [2], [3], [4]]) # shape (4, 1)
print("m + col:\n", m + col) # (4,3) + (4,1) -> (4,3)
# Example 3: outer product via broadcasting
a = np.array([1, 2, 3]) # shape (3,)
b = np.array([10, 20]) # shape (2,)
outer = a[:, np.newaxis] * b[np.newaxis, :] # (3,1) * (1,2) -> (3,2)
print("Outer product:\n", outer)
print("Same as np.outer:", np.array_equal(outer, np.outer(a, b)))
# Shape check
shapes = [(3, 4, 5), (4, 5), (5,), (1,)]
base = np.zeros((3, 4, 5))
for s in shapes:
arr = np.ones(s)
print(f"(3,4,5) + {s} = {(base + arr).shape}")import numpy as np
rng = np.random.default_rng(42)
# Euclidean distance matrix: D[i,j] = dist(A[i], B[j])
def pairwise_euclidean(A, B):
    """All-pairs Euclidean distances between rows of A (m, d) and B (n, d).

    Uses the expansion ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, clamping the
    squared distances at zero before the sqrt to absorb round-off.
    Returns an (m, n) matrix.
    """
    cross = A @ B.T                                  # (m, n) dot products
    row_sq = (A**2).sum(axis=1, keepdims=True)       # (m, 1)
    col_sq = (B**2).sum(axis=1, keepdims=True).T     # (1, n)
    sq_dist = np.maximum(row_sq + col_sq - 2*cross, 0)
    return np.sqrt(sq_dist)
# Demo: nearest-neighbor lookups on the distance matrix.
A = rng.standard_normal((50, 3)) # 50 query points
B = rng.standard_normal((100, 3)) # 100 database points
D = pairwise_euclidean(A, B)
print(f"Distance matrix: {D.shape}")
print(f"Min dist: {D.min():.3f}, Max dist: {D.max():.3f}")
# Nearest neighbor for each query
nn_idx = D.argmin(axis=1)
nn_dist = D.min(axis=1)
print(f"Query 0: nearest is B[{nn_idx[0]}] at distance {nn_dist[0]:.3f}")
# k-NN: top-k closest
k = 3
# argpartition returns the k smallest in the first k slots, NOT sorted
knn_idx = np.argpartition(D, k, axis=1)[:, :k]
print(f"3-NN indices for query 0: {knn_idx[0]}")
# Cosine similarity matrix
def cosine_sim(A, B):
    """Cosine similarity between rows of A (m, d) and B (n, d) -> (m, n).

    Row norms are padded with 1e-9 so zero rows do not divide by zero.
    """
    norms_a = np.linalg.norm(A, axis=1, keepdims=True) + 1e-9   # (m, 1)
    norms_b = np.linalg.norm(B, axis=1, keepdims=True) + 1e-9   # (n, 1)
    # Dividing the raw dot products by the outer product of row norms is
    # the same as normalizing each matrix first.
    return (A @ B.T) / (norms_a * norms_b.T)
C = cosine_sim(A, B)
print(f"Cosine sim range: [{C.min():.3f}, {C.max():.3f}]")import numpy as np
# Demo: broadcast-based multi-criteria scoring and a user/item threshold grid.
rng = np.random.default_rng(42)
# Multi-criteria scoring: score[i,j] = dot(weights, features[i,j])
# items: (N, F) features, weights: (F,) -> scores: (N,)
n_items, n_features = 1000, 10
features = rng.uniform(0, 1, (n_items, n_features))
weights = np.array([0.3, 0.2, 0.15, 0.1, 0.1, 0.05, 0.04, 0.03, 0.02, 0.01])
scores = features @ weights # (N,F) @ (F,) -> (N,)
print(f"Scores shape: {scores.shape}, range: [{scores.min():.3f}, {scores.max():.3f}]")
# Rank items (descending score)
ranks = np.argsort(scores)[::-1]
print("Top 5 items:", ranks[:5], "scores:", scores[ranks[:5]].round(3))
# Broadcasting for threshold matrix: flag (user, item) pairs
n_users = 20
user_thresholds = rng.uniform(0.4, 0.7, n_users) # (20,)
# recommended[i,j] = True if scores[j] >= thresholds[i]
recommended = scores[np.newaxis, :] >= user_thresholds[:, np.newaxis] # (20, 1000)
print(f"Recommendation matrix: {recommended.shape}")
print(f"Avg recommendations per user: {recommended.sum(axis=1).mean():.1f}")
# Pairwise similarity between items (feature dot products)
item_sim = features @ features.T # (N, N)
print(f"Item similarity matrix: {item_sim.shape}")
print(f"Self-similarity range: [{np.diag(item_sim).min():.3f}, {np.diag(item_sim).max():.3f}]")import numpy as np
# Demo: dense retrieval — L2-normalized embeddings make cosine similarity a
# plain dot product; argpartition gives top-k without a full sort.
rng = np.random.default_rng(42)
# Simulate document embeddings and a query
n_docs, dim = 50_000, 128
docs = rng.standard_normal((n_docs, dim))
docs /= np.linalg.norm(docs, axis=1, keepdims=True) # L2 normalize
query = rng.standard_normal(dim)
query /= np.linalg.norm(query)
# Cosine similarity: since both normalized, just dot product
sims = docs @ query # (50000,)
# Top-5 results
top5_idx = np.argpartition(sims, -5)[-5:]
top5_idx = top5_idx[np.argsort(sims[top5_idx])[::-1]]
print("Top 5 similar documents:")
for rank, idx in enumerate(top5_idx, 1):
print(f" Rank {rank}: doc_id={idx:5d}, similarity={sims[idx]:.4f}")
# Retrieval metrics: how many docs above threshold?
threshold = 0.1
n_relevant = (sims >= threshold).sum()
print(f"Docs with similarity >= {threshold}: {n_relevant} ({n_relevant/n_docs:.2%})")
# Batch queries
n_queries = 100
queries = rng.standard_normal((n_queries, dim))
queries /= np.linalg.norm(queries, axis=1, keepdims=True)
batch_sims = queries @ docs.T # (100, 50000)
batch_top1 = batch_sims.argmax(axis=1)
print(f"Batch top-1 for {n_queries} queries: mean sim={batch_sims.max(axis=1).mean():.4f}")import numpy as np
def softmax(x, axis=-1):
    """Numerically stable softmax along *axis* (shifts by the per-axis max)."""
    e = np.exp(x - x.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)
def scaled_dot_product_attention(Q, K, V):
    """Single-head attention: softmax(Q K^T / sqrt(d_k)) @ V.

    Q, K: (seq, d_k); V: (seq, d_v). Returns (seq, d_v).
    Implements the two TODOs the original left as a bare `pass` (which
    returned None and crashed the demo below).
    """
    d_k = Q.shape[-1]
    # Scale by sqrt(d_k) so the logits' variance stays O(1)
    scores = softmax(Q @ K.T / np.sqrt(d_k))
    return scores @ V
def multi_head_attention(Q, K, V, n_heads):
    """Multi-head attention: attend per d/n_heads slice, then concatenate.

    Q, K, V: (seq, d) with d divisible by n_heads. Returns (seq, d).
    """
    seq, d = Q.shape
    d_k = d // n_heads
    heads = []
    for h in range(n_heads):
        cols = slice(h * d_k, (h + 1) * d_k)
        heads.append(scaled_dot_product_attention(Q[:, cols], K[:, cols], V[:, cols]))
    return np.concatenate(heads, axis=-1)
# Demo driver: both attention variants should preserve the (seq, d) shape.
rng = np.random.default_rng(42)
seq, d = 10, 64
Q = rng.standard_normal((seq, d))
K = rng.standard_normal((seq, d))
V = rng.standard_normal((seq, d))
out1 = scaled_dot_product_attention(Q, K, V)
print("Single-head output shape:", out1.shape) # (10, 64)
out2 = multi_head_attention(Q, K, V, n_heads=8)
print("Multi-head output shape:", out2.shape) # (10, 64)
Ufuncs are vectorized functions that operate element-wise on arrays. They support reduce, accumulate, outer, and reduceat — and you can create custom ufuncs with np.frompyfunc or Numba.
import numpy as np
# Demo: the four ufunc methods — reduce, accumulate, outer, reduceat.
arr = np.array([1, 2, 3, 4, 5])
# reduce: apply ufunc repeatedly to reduce array
print("np.add.reduce:", np.add.reduce(arr)) # 15 (sum)
print("np.multiply.reduce:", np.multiply.reduce(arr)) # 120 (product)
print("np.maximum.reduce:", np.maximum.reduce(arr)) # 5
# accumulate: like reduce but keeps intermediate results
print("np.add.accumulate:", np.add.accumulate(arr)) # cumsum
print("np.multiply.accumulate:", np.multiply.accumulate(arr)) # cumprod
print("np.maximum.accumulate:", np.maximum.accumulate(arr))
# outer: compute all pairs
a = np.array([1, 2, 3])
b = np.array([10, 20, 30])
print("np.add.outer:\n", np.add.outer(a, b))
print("np.multiply.outer:\n", np.multiply.outer(a, b))
# 2D reduce along axis
m = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print("Row sums:", np.add.reduce(m, axis=1)) # [6, 15, 24]
print("Col products:", np.multiply.reduce(m, axis=0)) # [28, 80, 162]
# reduceat: reduce in segments; each index starts a new segment that runs
# to the next index (the last segment runs to the end of the array)
data = np.arange(10)
# Reduce segments: [0:3], [3:6], [6:10]
result = np.add.reduceat(data, [0, 3, 6])
print("Segment sums:", result) # [0+1+2, 3+4+5, 6+7+8+9]import numpy as np
# Demo: wrapping Python functions as ufuncs, the where=/out= ufunc arguments,
# and np.add.at for duplicate-index scatter updates.
# np.frompyfunc: wrap a Python function as a ufunc
def clip_and_scale(x, lo=0, hi=1):
return min(max(x, lo), hi) * 100
# frompyfunc returns object dtype by default
# NOTE(review): the lambda wrapper is redundant — np.frompyfunc(clip_and_scale, 1, 1)
# would behave the same for single-argument calls.
ufunc_cas = np.frompyfunc(lambda x: clip_and_scale(x), 1, 1)
data = np.array([-0.5, 0.0, 0.3, 0.7, 1.0, 1.5])
print("clip_and_scale:", ufunc_cas(data).astype(float))
# vectorize: more user-friendly, supports output dtype
clip_pct = np.vectorize(clip_and_scale, otypes=[float])
print("vectorize:", clip_pct(data))
# Common ufuncs catalogue
x = np.linspace(0.1, 2.0, 5)
print("exp:", np.exp(x).round(3))
print("log:", np.log(x).round(3))
print("log2:", np.log2(x).round(3))
print("log10:", np.log10(x).round(3))
print("sqrt:", np.sqrt(x).round(3))
print("cbrt:", np.cbrt(x).round(3))
# Trigonometric
angles = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
print("sin:", np.sin(angles).round(3))
print("arcsin:", np.arcsin(np.sin(angles)).round(3))import numpy as np
import timeit
# where argument: apply ufunc only to selected elements
rng = np.random.default_rng(42)
data = rng.uniform(-5, 5, 1_000_000)
# np.sqrt only where data > 0, else keep 0
result = np.zeros_like(data)
np.sqrt(data, out=result, where=data > 0)
print(f"sqrt(data) where > 0: {result[:5].round(3)}")
# np.log1p (log(1+x)) is more numerically stable for small x
small = np.array([1e-10, 1e-5, 0.001, 0.01, 0.1])
print("log(1+x): ", np.log(1 + small))
print("log1p(x): ", np.log1p(small))
# Ufunc with out argument (in-place, avoids allocation)
# NOTE(review): np.random.rand uses the legacy global RNG; the rest of the
# file uses np.random.default_rng.
a = np.random.rand(1_000_000)
b = np.empty_like(a)
t1 = timeit.timeit(lambda: np.exp(a), number=10)
t2 = timeit.timeit(lambda: np.exp(a, out=b), number=10)
print(f"exp(a) without out: {t1:.3f}s")
print(f"exp(a, out=b): {t2:.3f}s")
# at: unbuffered in-place operation (useful for scatter operations)
arr = np.zeros(5)
indices = np.array([0, 1, 0, 2, 1])
values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
np.add.at(arr, indices, values)
print("add.at result:", arr) # handles duplicate indices correctlyimport numpy as np
# Pre-allocated buffers
rng = np.random.default_rng(42)
N = 1_000_000
x = rng.standard_normal(N).astype(np.float32)
buf = np.empty_like(x)
def relu(x, out=None):
    """Elementwise max(x, 0); writes into *out* when one is supplied."""
    return np.maximum(x, 0, out=out)
def sigmoid(x, out=None):
    """Logistic function 1 / (1 + exp(-x)), computed in the output buffer."""
    if out is None:
        out = np.empty_like(x)
    np.negative(x, out=out)      # out = -x
    np.exp(out, out=out)         # out = exp(-x)
    out += 1                     # out = 1 + exp(-x)
    np.reciprocal(out, out=out)  # out = 1 / (1 + exp(-x))
    return out
def gelu(x, out=None):
    """Sigmoid approximation of GELU: x * sigmoid(1.702 * x)."""
    gate = sigmoid(1.702 * x)
    return np.multiply(x, gate, out=out)
def swish(x, out=None):
    """Swish / SiLU activation: x * sigmoid(x)."""
    return np.multiply(x, sigmoid(x), out=out)
# Demo: run each activation on the same input, reusing copies of the
# preallocated buffer, and compare their output statistics.
# Compute and compare
activations = {
"ReLU": relu(x, buf.copy()),
"Sigmoid": sigmoid(x, buf.copy()),
"GELU": gelu(x, buf.copy()),
"Swish": swish(x, buf.copy()),
}
for name, act in activations.items():
print(f"{name:8s}: mean={act.mean():.4f}, std={act.std():.4f}, "
f"range=[{act.min():.3f}, {act.max():.3f}]")
def haversine_scalar(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points in degrees.

    Uses the haversine formula with a mean Earth radius of 6371 km.
    """
    earth_radius_km = 6371
    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    half_dphi = np.radians(lat2 - lat1) / 2
    half_dlam = np.radians(lon2 - lon1) / 2
    h = np.sin(half_dphi)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(half_dlam)**2
    return 2 * earth_radius_km * np.arcsin(np.sqrt(h))
# Demo: apply the scalar haversine across arrays of city coordinates.
# Vectorize to work on arrays
# NOTE: np.vectorize is a convenience loop, not a performance optimization —
# the arithmetic here is already NumPy ufuncs and would broadcast directly.
haversine = np.vectorize(haversine_scalar, otypes=[float])
# New York City
ny_lat, ny_lon = 40.7128, -74.0060
rng = np.random.default_rng(42)
n_cities = 100
city_lats = rng.uniform(-60, 70, n_cities)
city_lons = rng.uniform(-180, 180, n_cities)
distances = haversine(ny_lat, ny_lon, city_lats, city_lons)
print(f"Distances computed: {len(distances)}")
print(f"Nearest city: {distances.min():.0f} km")
print(f"Farthest city: {distances.max():.0f} km")
print(f"Average distance from NYC: {distances.mean():.0f} km")
np.ma.MaskedArray transparently handles missing data by maintaining a boolean mask alongside values. Useful for sensor data with gaps or any dataset with invalid entries.
import numpy as np
# Demo: masked-array construction, mask-aware statistics, and conversions
# back to plain ndarrays.
# Create masked array
data = np.array([1.0, 2.0, -999.0, 4.0, -999.0, 6.0])
mask = data == -999.0
ma = np.ma.array(data, mask=mask)
print("Data: ", ma)
print("Mask: ", ma.mask)
print("Filled:", ma.filled(fill_value=0))
# Operations automatically skip masked values
print("Mean (excludes masked):", ma.mean()) # (1+2+4+6)/4 = 3.25
print("Sum:", ma.sum()) # 13
print("Std:", ma.std().round(4))
# np.ma.masked_where
arr = np.array([10, 25, -5, 0, 8, -1, 15])
ma2 = np.ma.masked_where(arr <= 0, arr)
print("Masked non-positive:", ma2)
print("Log (safe):", np.log(ma2)) # no error for masked values
# np.ma.masked_invalid: auto-mask NaN and Inf
dirty = np.array([1.0, np.nan, 3.0, np.inf, 5.0, -np.inf])
clean = np.ma.masked_invalid(dirty)
print("Masked invalid:", clean)
print("Safe mean:", clean.mean())import numpy as np
# 2D masked array
temps = np.array([[15.2, 16.1, -999, 18.5],
[-999, 14.8, 15.9, 16.7],
[13.1, -999, -999, 15.3]])
m = np.ma.masked_equal(temps, -999)
print("Masked temps:\n", m)
print("Row means (excl. missing):", m.mean(axis=1))
print("Col means (excl. missing):", m.mean(axis=0))
# Fill with column means
col_means = m.mean(axis=0)
filled = m.filled(fill_value=col_means) # broadcasts col_means
print("Filled temps:\n", filled.round(1))
# Stacking masked arrays
a = np.ma.array([1, 2, 3, 4], mask=[0, 1, 0, 0])
b = np.ma.array([5, 6, 7, 8], mask=[0, 0, 1, 0])
stacked = np.ma.vstack([a, b])
print("Stacked:\n", stacked)
print("Column min (ignoring masks):", stacked.min(axis=0))
# Convert to/from regular arrays
arr = m.compressed() # 1D array of valid values only
print("Compressed:", arr)
arr2 = np.ma.filled(m, -1) # replace mask with -1
print("Filled(-1):\n", arr2)import numpy as np
# Demo: sensor stream with random dropouts — mask-aware statistics plus a
# 3-sigma anomaly pass layered on top of the dropout mask.
rng = np.random.default_rng(42)
N = 1000
# Simulate sensor readings with random dropouts
raw = rng.normal(20, 3, N) # temperature readings
dropout_mask = rng.random(N) < 0.15 # 15% missing
sensor = np.ma.array(raw, mask=dropout_mask)
print(f"Total readings: {N}")
print(f"Valid readings: {sensor.count()} ({sensor.count()/N:.1%})")
print(f"Masked readings: {sensor.mask.sum()}")
# Robust statistics (automatically use only valid data)
print(f"Mean: {sensor.mean():.3f}")
print(f"Median: {np.ma.median(sensor):.3f}")
print(f"Std: {sensor.std():.3f}")
print(f"Min: {sensor.min():.3f}")
print(f"Max: {sensor.max():.3f}")
# Percentiles (need filled for np.percentile)
filled_for_pct = sensor.compressed()
print(f"P25/P75: {np.percentile(filled_for_pct, [25,75]).round(2)}")
# Anomaly detection: flag values > 3 sigma (on top of existing mask)
mean, std = sensor.mean(), sensor.std()
anomaly_mask = (sensor < mean - 3*std) | (sensor > mean + 3*std)
# The comparison result is itself masked where sensor is masked; filled(False)
# keeps those slots governed by the original dropout mask only.
sensor_clean = np.ma.array(sensor.data, mask=sensor.mask | anomaly_mask.filled(False))
print(f"After anomaly removal: {sensor_clean.count()} valid readings")import numpy as np
# Demo: multi-year ocean temperature series with injected bad data, cleaned
# with a masked array and summarized per year.
rng = np.random.default_rng(42)
n_years = 5
n_days = n_years * 365
# Simulate ocean temps with seasonal pattern
t = np.linspace(0, n_years * 2 * np.pi, n_days)
base_temp = 15 + 10 * np.sin(t - np.pi/2)  # seasonal cycle
temps = base_temp + rng.normal(0, 1.5, n_days)
# Inject data quality issues
sensor_fail = rng.random(n_days) < 0.08    # 8% sensor failures
physical_anom = rng.random(n_days) < 0.02  # 2% physical anomalies
temps[sensor_fail] = -9999.0
temps[physical_anom] = rng.choice([-5.0, 40.0], physical_anom.sum())
# Build mask: flag everything invalid (sentinel plus physically impossible)
mask = ((temps == -9999.0) |
        (temps < -2.0) |
        (temps > 35.0))
clean = np.ma.array(temps, mask=mask)
print(f"Data coverage: {clean.count()/n_days:.1%} valid ({clean.count()} of {n_days} days)")
# FIX: degree signs below were mojibake ("Β°C") in the original output strings
print(f"Mean temperature: {clean.mean():.2f}°C")
print(f"Seasonal range: [{clean.min():.2f}, {clean.max():.2f}]°C")
# Annual stats (reshape to years x 365)
annual = clean[:n_years*365].reshape(n_years, 365)
for y in range(n_years):
    row = annual[y]
    print(f" Year {y+1}: mean={row.mean():.2f}°C, valid={row.count()} days")
def masked_corrcoef(X):
    """Pairwise Pearson correlations of the columns of a 2-D masked array.

    For each column pair only the rows valid in BOTH columns are used
    (pairwise deletion). Pairs sharing fewer than two observations get NaN.
    Returns a plain (n_vars, n_vars) ndarray with ones on the diagonal.
    """
    n_vars = X.shape[1]
    out = np.eye(n_vars)
    for a in range(n_vars):
        for b in range(a + 1, n_vars):
            # Rows valid for BOTH columns a and b (De Morgan of the original)
            shared = ~(X[:, a].mask | X[:, b].mask)
            if shared.sum() < 2:
                out[a, b] = out[b, a] = np.nan
                continue
            col_a = X[shared, a].data
            col_b = X[shared, b].data
            out[a, b] = out[b, a] = np.corrcoef(col_a, col_b)[0, 1]
    return out
# Demo: correlated columns with 20% missingness per column.
rng = np.random.default_rng(42)
n, p = 200, 4
data = rng.standard_normal((n, p))
# Create correlated structure
data[:, 1] = 0.7 * data[:, 0] + 0.3 * rng.standard_normal(n)
masks = rng.random((n, p)) < 0.2 # 20% missing per column
X = np.ma.array(data, mask=masks)
corr = masked_corrcoef(X)
print("Pairwise correlations (with per-pair valid observations):")
print(corr.round(3))
# NOTE(review): for y = 0.7 x + 0.3 e the true correlation is
# 0.7 / sqrt(0.49 + 0.09) ~ 0.92, not 0.7 — the printed label understates it.
print("Expected corr(0,1) ~ 0.7:", corr[0, 1].round(2))
NumPy arrays store data in contiguous memory blocks. Understanding C/F order, strides, and views vs copies is critical for performance and interoperability with C/Fortran libraries.
import numpy as np
# Demo: C vs F memory order, stride inspection, and views vs copies.
# C order (row-major): last axis varies fastest (default)
arr_c = np.array([[1, 2, 3], [4, 5, 6]], order='C')
print("C order strides:", arr_c.strides) # e.g. (24, 8) for the default int64 dtype
# F order (column-major): first axis varies fastest
arr_f = np.array([[1, 2, 3], [4, 5, 6]], order='F')
print("F order strides:", arr_f.strides) # e.g. (8, 16) for int64
# Check flags
print("C-contiguous:", arr_c.flags['C_CONTIGUOUS']) # True
print("F-contiguous:", arr_c.flags['F_CONTIGUOUS']) # False
# Transpose only changes strides, no data copy
T = arr_c.T
print("Transposed strides:", T.strides)
print("T is view:", T.base is arr_c) # True
# ascontiguousarray: ensure C-contiguous copy
T_c = np.ascontiguousarray(T)
print("After ascontiguousarray:", T_c.flags['C_CONTIGUOUS'])
# Strides as bytes
arr = np.arange(24, dtype=np.float64).reshape(2, 3, 4)
print(f"Shape: {arr.shape}, Strides (bytes): {arr.strides}")
print(f"Itemsize: {arr.itemsize} bytes")
# stride[k] = itemsize * product(shape[k+1:])import numpy as np
arr = np.arange(20, dtype=float)
# Slice creates a VIEW (shares memory)
view = arr[2:10:2] # every 2nd element from 2 to 10
print("view:", view)
print("view.base is arr:", view.base is arr)
view[0] = 999 # modifies arr! (specifically arr[2], the slice's first element)
print("arr after view[0]=999:", arr[:12])
# copy() creates an independent copy
arr2 = arr.copy()
arr2[0] = -1
print("Original arr[0] unchanged:", arr[0]) # still 0.0 — the earlier write hit arr[2], not arr[0]
# Fancy indexing always copies
fancy = arr[[0, 3, 7]]
print("fancy.base:", fancy.base) # None = not a view
# reshape: usually a view if data is contiguous
arr3 = np.arange(12)
reshaped = arr3.reshape(3, 4)
print("reshape is view:", reshaped.base is arr3)
reshaped[0, 0] = -1
print("arr3[0] changed:", arr3[0]) # -1
# np.shares_memory: robust check
a = np.arange(10)
b = a[::2]
c = a.copy()
print("a and b share memory:", np.shares_memory(a, b)) # True
print("a and c share memory:", np.shares_memory(a, c)) # Falseimport numpy as np
from numpy.lib.stride_tricks import sliding_window_view, as_strided
# Demo: zero-copy sliding windows over 1-D data and 2-D image patches.
arr = np.arange(10, dtype=float)
# sliding_window_view: safe way to get overlapping windows
# BUG FIX: the view is read-only by default, so the in-place write below
# raised "assignment destination is read-only"; request writeable=True.
windows = sliding_window_view(arr, window_shape=4, writeable=True)
print("Windows shape:", windows.shape) # (7, 4)
print("First 3 windows:\n", windows[:3])
# Each window is a VIEW — no data copy!
windows[0, 0] = 999
print("arr[0] changed:", arr[0]) # yes, 999
arr = np.arange(10, dtype=float) # reset
# 2D rolling windows (for image patches)
img = np.arange(16, dtype=float).reshape(4, 4)
patches = sliding_window_view(img, window_shape=(2, 2))
print("2D patches shape:", patches.shape) # (3, 3, 2, 2)
print("Patch at [0,0]:\n", patches[0, 0])
# as_strided: manual stride tricks (use with care!)
# Rolling mean via strides
def rolling_mean_strided(arr, window):
    """Rolling mean of a 1-D array using zero-copy sliding windows.

    Returns len(arr) - window + 1 means, one per overlapping window.
    """
    views = sliding_window_view(arr, window)
    return views.mean(axis=1)
rolling = rolling_mean_strided(np.arange(10, dtype=float), 3)
print("Rolling mean (window=3):", rolling)import numpy as np, timeit
# Demo: timing matmul against differently-ordered operands; results must
# agree regardless of memory layout.
rng = np.random.default_rng(42)
A = rng.standard_normal((500, 300))
B = rng.standard_normal((300, 400))
# C-contiguous vs non-contiguous performance
B_T_F = np.asfortranarray(B.T) # F-order, non-contiguous for @
B_T_C = np.ascontiguousarray(B.T) # C-contiguous copy
print("B.T C-contiguous:", B.T.flags['C_CONTIGUOUS']) # False
print("B_T_C contiguous:", B_T_C.flags['C_CONTIGUOUS']) # True
# GEMM: A @ B (B is 300x400 C-contiguous)
t1 = timeit.timeit(lambda: A @ B, number=100)
t2 = timeit.timeit(lambda: A @ B_T_F.T, number=100)
t3 = timeit.timeit(lambda: A @ B_T_C.T, number=100)
print(f"A @ B (contiguous): {t1:.4f}s")
print(f"A @ B_T_F.T (F-order): {t2:.4f}s")
print(f"A @ B_T_C.T (C-order): {t3:.4f}s")
# Verify results are the same
C1 = A @ B
C2 = A @ B_T_F.T
print("Results match:", np.allclose(C1, C2))
# Memory usage check — getsizeof is shallow: a view reports header size only
import sys
print(f"B.T.copy() size: {sys.getsizeof(B.T.copy()):,} bytes")
print(f"B.T view size: {sys.getsizeof(B.T):,} bytes (metadata only)")import numpy as np
def inspect_array(arr):
    """Print a one-screen summary of an array's shape, layout, and flags."""
    flags = arr.flags
    summary = [
        f"Shape: {arr.shape}",
        f"dtype: {arr.dtype}",
        f"strides (bytes): {arr.strides}",
        f"itemsize: {arr.itemsize}",
        f"nbytes: {arr.nbytes:,}",
        f"C-contiguous: {flags['C_CONTIGUOUS']}",
        f"F-contiguous: {flags['F_CONTIGUOUS']}",
        f"Is view: {arr.base is not None}",
        f"N elements: {arr.size}",
    ]
    # One print of the joined lines produces the same output as nine prints.
    print("\n".join(summary))
def reshape_safe(arr, shape):
    """Reshape *arr* to *shape*, forcing a C-contiguous copy first if needed.

    reshape on a non-contiguous array would copy anyway; doing it explicitly
    keeps the intent visible.
    """
    source = arr if arr.flags['C_CONTIGUOUS'] else np.ascontiguousarray(arr)
    return source.reshape(shape)
# Demo: inspect a base array, its transpose, a strided slice, and a
# reshape_safe result.
base = np.arange(24, dtype=np.float32).reshape(4, 6)
print("=== Base array ===")
inspect_array(base)
print("\n=== Transposed ===")
T = base.T
inspect_array(T)
print("\n=== Slice (non-contiguous) ===")
s = base[::2, ::2]
inspect_array(s)
print("\n=== Reshape safe (from transposed) ===")
r = reshape_safe(T, (24,))
inspect_array(r)
Beyond basic dot products: matrix decompositions (SVD, QR, Cholesky), determinants, null spaces, pseudo-inverse, and Kronecker products for systems of equations and dimensionality reduction.
import numpy as np
# Demo: QR factorization, least squares, Cholesky sampling, log-determinant,
# and the Moore-Penrose pseudo-inverse.
rng = np.random.default_rng(42)
# QR decomposition: A = Q @ R
A = rng.standard_normal((6, 4))
Q, R = np.linalg.qr(A)
print(f"A: {A.shape}, Q: {Q.shape}, R: {R.shape}")
print("Q orthonormal:", np.allclose(Q.T @ Q, np.eye(4)))
print("A = Q@R:", np.allclose(A, Q @ R))
# Least squares: solve Ax = b for overdetermined system
m, n = 100, 5
A = rng.standard_normal((m, n))
true_x = rng.standard_normal(n)
b = A @ true_x + rng.normal(0, 0.1, m) # noisy measurements
# lstsq: minimize ||Ax - b||^2
x, residuals, rank, sv = np.linalg.lstsq(A, b, rcond=None)
print(f"True x: {true_x[:3].round(3)}")
print(f"Solved x: {x[:3].round(3)}")
print(f"Matrix rank: {rank}, condition number: {sv[0]/sv[-1]:.1f}")
# Via normal equations (less stable but educational)
x_normal = np.linalg.solve(A.T @ A, A.T @ b)
print("Normal eq error:", np.abs(x - x_normal).max())import numpy as np
rng = np.random.default_rng(42)
# Cholesky decomposition: A = L @ L.T (for positive definite A)
# Used for sampling multivariate normal and inverting covariance matrices
n = 5
A = rng.standard_normal((n, n))
S = A.T @ A + n * np.eye(n) # make it positive definite
L = np.linalg.cholesky(S)
print("L shape:", L.shape)
print("L @ L.T == S:", np.allclose(L @ L.T, S))
print("L is lower triangular:", np.allclose(L, np.tril(L)))
# Sample from multivariate normal using Cholesky
mu = np.zeros(n)
cov = S / S.max() # scale to reasonable range
L = np.linalg.cholesky(cov)
samples = mu + rng.standard_normal((1000, n)) @ L.T
print(f"Sample covariance matches target: {np.allclose(np.cov(samples.T), cov, atol=0.1)}")
# Determinant (log for numerical stability); slogdet returns (sign, log|det|)
logdet_sign, logdet = np.linalg.slogdet(S)
print(f"logdet(S): {logdet:.3f}, sign: {logdet_sign}")
print(f"det(S) = exp({logdet:.1f}) ~ {np.exp(logdet):.2e}")
# Pseudo-inverse (Moore-Penrose)
m = np.array([[1, 2], [3, 4], [5, 6]])
pinv = np.linalg.pinv(m)
print("Pseudo-inverse shape:", pinv.shape)
print("m @ pinv @ m == m:", np.allclose(m @ pinv @ m, m))import numpy as np
# Demo: eigendecomposition, SVD-based PCA, and a truncated-SVD rating
# reconstruction toy recommender.
rng = np.random.default_rng(42)
# Eigendecomposition: A v = lambda v
A = np.array([[4, -2], [1, 1]])
eigenvalues, eigenvectors = np.linalg.eig(A)
print("Eigenvalues:", eigenvalues)
print("Eigenvectors:\n", eigenvectors)
# Verify: A @ v = lambda * v
for i in range(len(eigenvalues)):
v = eigenvectors[:, i]
print(f" A @ v{i} == {eigenvalues[i]:.1f} * v{i}: {np.allclose(A @ v, eigenvalues[i] * v)}")
# Symmetric matrix: use eigh (more stable, guaranteed real eigenvalues)
S = np.array([[3, 1], [1, 3]])
w, v = np.linalg.eigh(S)
print("Symmetric eigs:", w)
# PCA from scratch using SVD
n, d = 200, 5
X = rng.standard_normal((n, d))
X[:, 1] = 0.8 * X[:, 0] + 0.6 * X[:, 1] # create correlation
# Center data
X_c = X - X.mean(axis=0)
# SVD of centered data
U, s, Vt = np.linalg.svd(X_c, full_matrices=False)
explained = s**2 / (s**2).sum()
print("Explained variance ratio:", explained.round(3))
print("Cumulative:", explained.cumsum().round(3))
# Project to 2 components
X_2d = X_c @ Vt[:2].T
print(f"PCA 2D shape: {X_2d.shape}")import numpy as np
rng = np.random.default_rng(42)
n_users, n_items, n_factors = 100, 200, 10
# Simulate a low-rank rating matrix (most entries unknown)
U_true = rng.standard_normal((n_users, n_factors))
V_true = rng.standard_normal((n_items, n_factors))
R_true = U_true @ V_true.T + rng.normal(0, 0.5, (n_users, n_items))
R_true = np.clip(R_true, 1, 5)
# Only observe 20% of ratings
observed_mask = rng.random((n_users, n_items)) < 0.2
R_obs = np.where(observed_mask, R_true, 0.0)
# Truncated SVD on observed (imperfect but illustrative)
U, s, Vt = np.linalg.svd(R_obs, full_matrices=False)
k = n_factors
R_hat = U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]
R_hat = np.clip(R_hat, 1, 5)
# RMSE on observed entries
observed_pred = R_hat[observed_mask]
observed_true = R_true[observed_mask]
rmse = np.sqrt(np.mean((observed_pred - observed_true)**2))
print(f"RMSE on observed ratings: {rmse:.3f}")
# Top-5 recommendations for user 0
user0_scores = R_hat[0]
user0_unseen = ~observed_mask[0]
# multiplying by the boolean zeroes out already-seen items before ranking
top5 = np.argsort(user0_scores * user0_unseen)[::-1][:5]
print(f"Top-5 recommendations for user 0: items {top5} with scores {user0_scores[top5].round(2)}")import numpy as np
def solve_system(A, b):
    """Solve A x = b, choosing a strategy from the system's shape and rank.

    Returns (x, label): an exact solve for square full-rank systems, a
    least-squares fit for overdetermined ones, and the minimum-norm
    pseudo-inverse solution otherwise (underdetermined or rank-deficient).
    """
    m, n = A.shape
    if m == n and np.linalg.matrix_rank(A) == n:
        return np.linalg.solve(A, b), "well-determined"
    if m > n:
        x, *_ = np.linalg.lstsq(A, b, rcond=None)
        return x, "overdetermined (least squares)"
    return np.linalg.pinv(A) @ b, "underdetermined (min-norm)"
def condition_number(A):
    """2-norm condition number from singular values; inf for singular A."""
    singular_values = np.linalg.svd(A, compute_uv=False)
    smallest = singular_values[-1]
    # Singular values are non-negative, so <= 0 means exactly singular.
    return np.inf if smallest <= 0 else singular_values[0] / smallest
# Demo: exercise solve_system on each regime and report conditioning.
# Test cases
rng = np.random.default_rng(42)
# Well-determined 3x3
A1 = rng.standard_normal((3, 3))
b1 = rng.standard_normal(3)
x1, lbl1 = solve_system(A1, b1)
print(f"{lbl1}: cond={condition_number(A1):.1f}, error={np.linalg.norm(A1@x1-b1):.2e}")
# Overdetermined 10x3
A2 = rng.standard_normal((10, 3))
b2 = rng.standard_normal(10)
x2, lbl2 = solve_system(A2, b2)
print(f"{lbl2}: cond={condition_number(A2):.1f}, residual={np.linalg.norm(A2@x2-b2):.4f}")
# Ill-conditioned: rows differ by 1e-10, so the condition number explodes
A3 = np.array([[1, 1], [1, 1+1e-10]])
b3 = np.array([2.0, 2.0])
x3, lbl3 = solve_system(A3, b3)
print(f"{lbl3}: cond={condition_number(A3):.2e}")
Finite differences approximate gradients of black-box functions. Combined with np.gradient, they enable sensitivity analysis, gradient checking, and simple optimization without symbolic math.
import numpy as np
# First derivative via finite differences
def forward_diff(f, x, h=1e-5):
return (f(x + h) - f(x)) / h
def backward_diff(f, x, h=1e-5):
return (f(x) - f(x - h)) / h
def central_diff(f, x, h=1e-5):
return (f(x + h) - f(x - h)) / (2 * h)
def second_diff(f, x, h=1e-5):
    """Second derivative via the three-point stencil; O(h^2) accurate."""
    stencil = f(x + h) - 2*f(x) + f(x - h)
    return stencil / h**2
# Test on f(x) = sin(x), f'(x) = cos(x)
f = np.sin
x = np.pi / 4
true_deriv = np.cos(x)
print(f"True f'(pi/4) = {true_deriv:.10f}")
# One-sided schemes carry O(h) truncation error; central is O(h^2),
# which the printed errors demonstrate.
print(f"Forward: {forward_diff(f, x):.10f} err={abs(forward_diff(f,x)-true_deriv):.2e}")
print(f"Backward: {backward_diff(f, x):.10f} err={abs(backward_diff(f,x)-true_deriv):.2e}")
print(f"Central: {central_diff(f, x):.10f} err={abs(central_diff(f,x)-true_deriv):.2e}")
print(f"Second: {second_diff(f, x):.10f} true=-sin(pi/4)={-np.sin(x):.10f}")
# Vectorized on array
# The difference quotients broadcast, so x may be an array as well as a scalar.
x_arr = np.linspace(0, 2*np.pi, 100)
deriv_arr = central_diff(np.sin, x_arr)
error = np.abs(deriv_arr - np.cos(x_arr))
print(f"Max error over array: {error.max():.2e}")import numpy as np
# np.gradient: uses central differences internally, handles edges with one-sided
x = np.linspace(0, 2*np.pi, 100)
y = np.sin(x)
dydx = np.gradient(y, x) # pass x for proper spacing
true = np.cos(x)
print(f"Max error vs cos(x): {np.abs(dydx - true).max():.4e}")
# Partial derivatives of 2D function
nx, ny = 50, 60
x_1d = np.linspace(0, 2, nx)
y_1d = np.linspace(0, 3, ny)
# With meshgrid's default 'xy' indexing, X and Y have shape (ny, nx).
X, Y = np.meshgrid(x_1d, y_1d)
Z = np.sin(X) * np.cos(Y)
# gradient returns [dZ/dy, dZ/dx] for 2D (row, col ordering)
dZ_dy, dZ_dx = np.gradient(Z, y_1d, x_1d)
true_dZ_dx = np.cos(X) * np.cos(Y)
true_dZ_dy = -np.sin(X) * np.sin(Y)
print(f"dZ/dx error: {np.abs(dZ_dx - true_dZ_dx).max():.4e}")
print(f"dZ/dy error: {np.abs(dZ_dy - true_dZ_dy).max():.4e}")
# Gradient magnitude (for edge detection in images)
mag = np.sqrt(dZ_dx**2 + dZ_dy**2)
print(f"Gradient magnitude: min={mag.min():.3f}, max={mag.max():.3f}")import numpy as np
# Seeded generator; used further below to pick a random gradient-check point.
rng = np.random.default_rng(42)
# Optimize f(x, y) = (x-3)^2 + 2*(y+1)^2 (minimum at (3, -1))
def f(params):
    """Quadratic objective with unique minimum 0 at (3, -1)."""
    px, py = params
    return (px - 3)**2 + 2*(py + 1)**2
def grad_f(params):
    """Analytic gradient of f: [d/dx, d/dy] = [2(x-3), 4(y+1)]."""
    px, py = params
    return np.array([2*(px - 3), 4*(py + 1)])
# Gradient descent
params = np.array([0.0, 0.0])
lr = 0.1
# Trajectory snapshots; collected but not otherwise used in this snippet.
history = [params.copy()]
for i in range(50):
    g = grad_f(params)
    params = params - lr * g
    history.append(params.copy())
    if i % 10 == 9:
        print(f" iter {i+1:3d}: f={f(params):.6f}, params=({params[0]:.3f}, {params[1]:.3f})")
print(f"True minimum: (3, -1), found: ({params[0]:.4f}, {params[1]:.4f})")
# Numerical gradient check (compare analytic vs finite diff)
test_pt = rng.standard_normal(2)
analytic = grad_f(test_pt)
h = 1e-6
# Central difference along each standard basis vector e (rows of the identity).
numeric = np.array([(f(test_pt + h*e) - f(test_pt - h*e)) / (2*h)
                    for e in np.eye(2)])
print(f"Gradient check: analytic={analytic.round(4)}, numeric={numeric.round(4)}")
print(f"Max error: {np.abs(analytic - numeric).max():.2e}")import numpy as np
rng = np.random.default_rng(42)
# Simulate: find mu, sigma that best fit observed log returns
n_obs = 500
true_mu, true_sigma = 0.001, 0.02
# Synthetic sample drawn from the "true" Gaussian we then try to recover.
observed = rng.normal(true_mu, true_sigma, n_obs)
def neg_log_likelihood(params, data):
    """Negative Gaussian log-likelihood of `data` under (mu, exp(log_sigma)).

    The scale lives on the log axis so that any real-valued optimizer step
    keeps sigma strictly positive.
    """
    mu, log_sigma = params
    sigma = np.exp(log_sigma)
    standardized = (data - mu)/sigma
    return 0.5 * np.sum(np.log(2*np.pi*sigma**2) + standardized**2)
def numerical_gradient(f, params, h=1e-6):
    """Central-difference estimate of the gradient of scalar-valued `f`."""
    grad = np.zeros_like(params)
    for i in range(len(params)):
        basis_step = np.zeros_like(params)
        basis_step[i] = h
        grad[i] = (f(params + basis_step) - f(params - basis_step)) / (2 * h)
    return grad
params = np.array([0.0, np.log(0.01)]) # initial guess
lr = 1e-4
for epoch in range(500):
    # NOTE(review): `loss` is computed but never printed or used below.
    loss = neg_log_likelihood(params, observed)
    grad = numerical_gradient(lambda p: neg_log_likelihood(p, observed), params)
    params -= lr * grad
mu_hat = params[0]
# Undo the log parameterization to report sigma on its natural scale.
sigma_hat = np.exp(params[1])
print(f"True: mu={true_mu:.4f}, sigma={true_sigma:.4f}")
print(f"Estimated: mu={mu_hat:.4f}, sigma={sigma_hat:.4f}")
print(f"Error: mu={abs(mu_hat-true_mu):.2e}, sigma={abs(sigma_hat-true_sigma):.2e}")import numpy as np
def jacobian(f, x, h=1e-6):
    """Numerical Jacobian of vector-valued `f` at `x` via central differences.

    Returns an (m, n) array where m = f(x).size and n = x.size.
    """
    x = np.asarray(x, float)
    f0 = np.asarray(f(x), float)
    J = np.zeros((f0.size, x.size))
    for col in range(x.size):
        step = np.zeros(x.size)
        step[col] = h
        # One Jacobian column per perturbed input coordinate.
        J[:, col] = (np.asarray(f(x + step)) - np.asarray(f(x - step))) / (2 * h)
    return J
def gradient_check(f, grad_f, x, h=1e-6):
    """Compare the analytic Jacobian grad_f(x) against a finite-difference one.

    Returns max absolute/relative errors and an `ok` flag (rel. error < 1e-4).
    """
    numeric = jacobian(f, x, h)
    analytic = np.asarray(grad_f(x))
    abs_err = np.abs(numeric - analytic)
    # Small epsilon in the denominator guards against zero analytic entries.
    rel_err = abs_err / (np.abs(analytic) + 1e-8)
    return {
        "max_abs_err": abs_err.max(),
        "max_rel_err": rel_err.max(),
        "ok": rel_err.max() < 1e-4,
    }
# Test function: f(x) = [sin(x0)*x1, x0^2 + exp(x1)]
def f_vec(x):
    """Vector-valued test function: [sin(x0) * x1, x0^2 + exp(x1)]."""
    first = np.sin(x[0])*x[1]
    second = x[0]**2 + np.exp(x[1])
    return [first, second]
def df_vec(x):
    """Analytic Jacobian of f_vec, row i = gradient of component i."""
    row0 = [np.cos(x[0])*x[1], np.sin(x[0])]
    row1 = [2*x[0], np.exp(x[1])]
    return np.array([row0, row1])
# Verify the analytic Jacobian against finite differences at a sample point.
x0 = np.array([0.5, 1.2])
result = gradient_check(f_vec, df_vec, x0)
print("Jacobian check:", result)
print("Numerical J:\n", jacobian(f_vec, x0).round(6))
print("Analytic J:\n", df_vec(x0).round(6))
NumPy integrates tightly with pandas, scikit-learn, PyTorch, and SciPy. Understanding these bridges — arrays, dtypes, memory sharing — makes you faster at the whole pipeline.
import numpy as np
# Simulate what pandas does under the hood
# A DataFrame is essentially a dict of 1D NumPy arrays
# From numpy to structured array (pandas-like)
n = 5
ids = np.arange(1, n+1)
names = np.array(['Alice', 'Bob', 'Carol', 'Dave', 'Eve'])
scores = np.array([92.5, 78.3, 88.1, 95.0, 70.2])
# Nested where() implements a 3-way bucketing: A (>=90), B (>=80), else C.
grades = np.where(scores >= 90, 'A', np.where(scores >= 80, 'B', 'C'))
# Operations you'd do in pandas, using only NumPy
mean_score = scores.mean()
# Boolean-mask indexing plays the role of DataFrame filtering.
top_scorers = names[scores >= 90]
passing = names[scores >= 75]
print("Mean score:", mean_score.round(2))
print("Top scorers (A grade):", top_scorers)
print("Passing (>=75):", passing)
# Group by grade
for grade in np.unique(grades):
    mask = grades == grade
    print(f"Grade {grade}: {names[mask].tolist()}, avg={scores[mask].mean():.1f}")
# NumPy array -> pandas DataFrame conversion info
try:
    import pandas as pd
    df = pd.DataFrame({'id': ids, 'name': names, 'score': scores, 'grade': grades})
    # df.values returns numpy array (may copy depending on dtypes)
    arr = df[['id', 'score']].to_numpy()
    print("DataFrame to numpy:", arr.shape, arr.dtype)
except ImportError:
    print("(pandas not available, but the pattern works)")import numpy as np
# Feature-engineering demos on a synthetic (n, d) design matrix.
rng = np.random.default_rng(42)
n, d = 1000, 5
X = rng.standard_normal((n, d))
# 1. Normalization
def z_score(X):
    """Standardize each column to zero mean, unit variance (eps-guarded)."""
    col_mean = X.mean(axis=0)
    col_std = X.std(axis=0) + 1e-8
    return (X - col_mean) / col_std
def min_max(X):
    """Rescale each column into [0, 1]; eps guards against zero range."""
    col_lo = X.min(axis=0)
    col_hi = X.max(axis=0)
    return (X - col_lo) / (col_hi - col_lo + 1e-8)
Xz = z_score(X)
Xm = min_max(X)
print(f"Z-score: mean={Xz.mean(axis=0).round(2)}, std={Xz.std(axis=0).round(2)}")
print(f"MinMax: [{Xm.min():.3f}, {Xm.max():.3f}]")
# 2. Polynomial features (degree 2, first 3 columns)
# Slices like X[:, 0:1] keep a trailing axis so the products stay 2-D columns.
Xp = np.column_stack([X[:, :3], X[:, :3]**2,
                      X[:, 0:1]*X[:, 1:2], X[:, 1:2]*X[:, 2:3]])
print(f"Polynomial features: {X.shape} -> {Xp.shape}")
# 3. One-hot encoding
cats = rng.integers(0, 4, n) # 4 categories
# Broadcasting (n,1) against (1,4) gives an (n,4) boolean indicator matrix.
ohe = (cats[:, np.newaxis] == np.arange(4)[np.newaxis, :]).astype(float)
print(f"OHE: shape={ohe.shape}, sum per row all 1: {np.all(ohe.sum(axis=1)==1)}")
# 4. Interaction features (outer product per row)
a = X[:, :3] # 3 features
# Row-wise outer: shape (n, 3, 3) -> upper triangle -> (n, 6) interaction terms
combos = np.einsum('ni,nj->nij', a, a)
triu_idx = np.triu_indices(3, k=0)
interactions = combos[:, triu_idx[0], triu_idx[1]]
print(f"Interaction features shape: {interactions.shape}")import numpy as np
# Seeded generator so the simulated "weights" below are reproducible.
rng = np.random.default_rng(42)
# Manually implement a 2-layer neural network forward pass
def relu(x):
    """ReLU activation: clamp negatives to zero, elementwise."""
    return np.maximum(x, 0)
def softmax(x):
    """Row-wise softmax with max-subtraction for numerical stability."""
    shifted = x - x.max(axis=1, keepdims=True)
    exp_shifted = np.exp(shifted)
    return exp_shifted / exp_shifted.sum(axis=1, keepdims=True)
# Simulated trained weights (normally loaded from file)
n_in, n_hidden, n_out = 784, 256, 10
# Small 0.01 scale keeps pre-activations near zero for the random weights.
W1 = rng.standard_normal((n_in, n_hidden)) * 0.01
b1 = np.zeros(n_hidden)
W2 = rng.standard_normal((n_hidden, n_out)) * 0.01
b2 = np.zeros(n_out)
def forward(X):
    """Two-layer MLP forward pass: ReLU hidden layer, then softmax output.

    Fix: the original computed h1 = relu(X @ W1 + b1) but then fed
    `X @ W1 @ W2 + b2` to softmax, leaving h1 as dead code and skipping
    both b1 and the ReLU nonlinearity — contradicting the snippet's
    stated "2-layer neural network forward pass". The hidden activation
    is now actually used.
    """
    h1 = relu(X @ W1 + b1)        # (batch, n_hidden) hidden activations
    return softmax(h1 @ W2 + b2)  # (batch, n_out) class probabilities
# Batch inference
batch_size = 64
X_batch = rng.standard_normal((batch_size, n_in)).astype(np.float32)
probs = forward(X_batch)
preds = probs.argmax(axis=1)
conf = probs.max(axis=1)
print(f"Batch: {X_batch.shape} -> probs: {probs.shape}")
print(f"Predictions: {preds[:10]}")
print(f"Confidence: min={conf.min():.3f}, max={conf.max():.3f}, mean={conf.mean():.3f}")
# Top-3 predictions per sample
# argsort ascending -> keep last 3 columns -> reverse to descending order.
top3 = np.argsort(probs, axis=1)[:, -3:][:, ::-1]
print(f"Top-3 class indices for sample 0: {top3[0]}")import numpy as np
rng = np.random.default_rng(42)
N, D = 1000, 10
# Synthetic binary classification data
X = rng.standard_normal((N, D))
true_w = rng.standard_normal(D)
# Labels: noisy linear score thresholded at zero.
y = (X @ true_w + rng.normal(0, 0.5, N) > 0).astype(float)
# Train/val/test split (70/15/15)
idx = rng.permutation(N)
n_train = int(0.7*N); n_val = int(0.15*N)
tr, va, te = idx[:n_train], idx[n_train:n_train+n_val], idx[n_train+n_val:]
def normalize(X_tr, X_te):
    """Z-score both splits using statistics from the training split only."""
    train_mean = X_tr.mean(0)
    train_std = X_tr.std(0) + 1e-8
    return (X_tr - train_mean) / train_std, (X_te - train_mean) / train_std
# Scale every split with statistics computed on the training split only.
X_tr, X_te = normalize(X[tr], X[te])
# NOTE(review): X_tr is recomputed here with identical training statistics;
# redundant but harmless.
X_tr, X_va = normalize(X[tr], X[va])
# Logistic regression with SGD
def sigmoid(z):
    """Logistic function; input clipped to [-100, 100] to avoid exp overflow."""
    safe_z = np.clip(z, -100, 100)
    return 1 / (1 + np.exp(-safe_z))
w = np.zeros(D)
lr, n_epochs, bs = 0.1, 20, 32
for epoch in range(n_epochs):
    # Fresh shuffle each epoch so mini-batches differ between epochs.
    perm = rng.permutation(len(tr))
    for i in range(0, len(tr), bs):
        xi = X_tr[perm[i:i+bs]]
        yi = y[tr][perm[i:i+bs]]
        pred = sigmoid(xi @ w)
        # Gradient of the mean logistic loss: X^T (p - y) / batch_size.
        grad = xi.T @ (pred - yi) / len(yi)
        w -= lr * grad
def accuracy(X, y_true, w):
    """Fraction of rows where thresholded sigmoid(X @ w) matches y_true."""
    predictions = sigmoid(X @ w) >= 0.5
    return (predictions == y_true.astype(bool)).mean()
# Report accuracy on each split with the final weights.
print(f"Train acc: {accuracy(X_tr, y[tr], w):.3f}")
print(f"Val acc: {accuracy(X_va, y[va], w):.3f}")
print(f"Test acc: {accuracy(X_te, y[te], w):.3f}")import numpy as np
def k_fold_cv(X, y, model_fn, k=5, seed=42):
    """k-fold cross-validation of `model_fn`; returns accuracy statistics.

    model_fn(X_tr, y_tr, X_va) must return predicted labels for X_va.
    Note: samples beyond k * (len(y) // k) never land in a validation fold.
    """
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(y))
    fold_size = len(y) // k
    scores = []
    for fold in range(k):
        lo, hi = fold * fold_size, (fold + 1) * fold_size
        val_idx = order[lo:hi]
        train_idx = np.concatenate([order[:lo], order[hi:]])
        preds = model_fn(X[train_idx], y[train_idx], X[val_idx])
        scores.append((preds == y[val_idx]).mean())
    return {"mean": np.mean(scores), "std": np.std(scores), "folds": scores}
# Threshold classifier: predict 1 if feature 0 > median
def threshold_model(X_tr, y_tr, X_va):
    """Predict 1 when feature 0 exceeds the training median of feature 0.

    y_tr is accepted for interface compatibility but unused by this rule.
    """
    cutoff = np.median(X_tr[:, 0])
    return (X_va[:, 0] > cutoff).astype(int)
# Synthetic data: labels depend only on features 0 and 1, so the
# feature-0 threshold rule is genuinely (if weakly) informative.
rng = np.random.default_rng(42)
N, D = 500, 5
X = rng.standard_normal((N, D))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)
result = k_fold_cv(X, y, threshold_model, k=5)
# Fix: the output previously printed mojibake ("Β±" — UTF-8 "±" decoded
# as Latin-1); restored to the intended plus-minus sign.
print(f"5-fold CV: {result['mean']:.3f} ± {result['std']:.3f}")
print(f"Per-fold: {[round(s,3) for s in result['folds']]}")