Loading Module…

🚀 MLOps & Model Deployment

24 topics • Click any card to expand

1. Saving & Loading Models

Persist trained models to disk for reuse, versioning, and deployment. Covers pickle, joblib, and framework-native formats.

Save/load sklearn model with joblib
# Train a classifier, persist it with joblib, then restore it and reuse it.
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

features, labels = load_iris(return_X_y=True)

clf = RandomForestClassifier(n_estimators=50, random_state=42)
clf.fit(features, labels)

# Persist the fitted estimator to disk.
joblib.dump(clf, 'iris_rf.joblib')
print('Model saved.')

# Restore from disk and sanity-check a few predictions against the labels.
restored = joblib.load('iris_rf.joblib')
print('Predictions:', restored.predict(features[:5]))
print('Expected:   ', labels[:5])
Save full sklearn pipeline
# Persist a whole preprocessing + model pipeline as a single artifact.
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)

# Scaling and classification travel together as one estimator.
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000)),
])
pipe.fit(X, y)

# Dumping the pipeline captures the preprocessor and the model together,
# so inference code cannot forget the scaling step.
joblib.dump(pipe, 'cancer_pipeline.joblib')

restored = joblib.load('cancer_pipeline.joblib')
print('Pipeline accuracy:', restored.score(X, y))
print('Steps:', [step_name for step_name, _ in restored.steps])
Save/load PyTorch model
try:
    import torch
    import torch.nn as nn

    class SimpleNet(nn.Module):
        """Tiny 4-in / 3-out linear model used to demonstrate checkpointing."""
        def __init__(self):
            super().__init__()
            self.fc = nn.Linear(4, 3)

        def forward(self, x):
            return self.fc(x)

    net = SimpleNet()

    # Recommended approach: persist only the parameter tensors (state_dict),
    # not the whole pickled module object.
    torch.save(net.state_dict(), 'simple_net.pt')

    # Rebuild the architecture, then restore the weights into it.
    restored = SimpleNet()
    restored.load_state_dict(torch.load('simple_net.pt', weights_only=True))
    restored.eval()  # switch to inference mode

    print('Output:', restored(torch.randn(2, 4)))
except ImportError:
    print('pip install torch')
Model versioning with metadata
# Save a model together with a JSON metadata sidecar (params, accuracy,
# timestamp, checksum) so the artifact can be audited later.
import joblib, json, hashlib, time
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
model = GradientBoostingClassifier(n_estimators=50, random_state=42)
model.fit(X, y)

model_path = 'model_v1.joblib'
joblib.dump(model, model_path)

# Fingerprint the artifact so corruption or tampering is detectable.
with open(model_path, 'rb') as f:
    checksum = hashlib.md5(f.read()).hexdigest()

metadata = {
    'model_class': type(model).__name__,
    'params': model.get_params(),
    'train_accuracy': model.score(X, y),
    'trained_at': time.strftime('%Y-%m-%dT%H:%M:%S'),
    'checksum': checksum,
}
with open('model_v1_meta.json', 'w') as f:
    json.dump(metadata, f, indent=2)

print(json.dumps(metadata, indent=2))
💼 Real-World Scenario
A data science team trains a fraud detection model weekly. They need to save each version with metadata so they can roll back if a new model underperforms in production.
Real-World Code
import joblib, json, time, pathlib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# All versions live under one registry directory.
MODEL_DIR = pathlib.Path('model_registry')
MODEL_DIR.mkdir(exist_ok=True)

def save_model_version(model, version: str, metrics: dict):
    """Write *model* plus a JSON metadata sidecar into the registry dir.

    Returns the metadata dict that was written.
    """
    model_path = MODEL_DIR / f'model_{version}.joblib'
    meta_path = MODEL_DIR / f'model_{version}_meta.json'
    joblib.dump(model, model_path)
    meta = {
        'version': version,
        'trained_at': time.strftime('%Y-%m-%dT%H:%M:%S'),
        'model_class': type(model).__name__,
        **metrics,
    }
    meta_path.write_text(json.dumps(meta, indent=2))
    print(f'Saved {model_path}')
    return meta

X, y = make_classification(n_samples=500, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)
meta = save_model_version(model, 'v1.0', {'train_acc': model.score(X, y)})
print(json.dumps(meta, indent=2))
🏋️ Practice: Model Registry
Create a ModelRegistry class that tracks saved model versions, can list all versions, load a specific version, and return the best-performing one by accuracy.
Starter Code
import joblib, json, pathlib

class ModelRegistry:
    """Practice scaffold: a tiny file-based registry of model versions.

    Intended layout: ``<dir>/model_<version>.joblib`` per model, plus a JSON
    metadata file per version. Fill in the TODOs to complete the exercise.
    """

    def __init__(self, registry_dir='registry'):
        # Registry root directory; created on first use if missing.
        self.dir = pathlib.Path(registry_dir)
        self.dir.mkdir(exist_ok=True)

    def save(self, model, version: str, accuracy: float):
        """Persist *model* under *version* and record its accuracy (TODO)."""
        # TODO: save model and metadata json
        pass

    def list_versions(self) -> list:
        """Return metadata dicts for every saved version (TODO)."""
        # TODO: return list of dicts with version + accuracy
        pass

    def load(self, version: str):
        """Load and return the model saved under *version* (TODO)."""
        # TODO: load and return model by version string
        pass

    def best(self):
        """Return the saved model with the highest recorded accuracy (TODO)."""
        # TODO: return model with highest accuracy
        pass

# Test it
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
reg = ModelRegistry()
m = LogisticRegression(max_iter=200).fit(X, y)
reg.save(m, 'v1', m.score(X, y))
print(reg.list_versions())  # NOTE: prints None until list_versions is implemented
✅ Practice Checklist
2. FastAPI Model Serving

Wrap ML models in REST APIs using FastAPI. Serve predictions, handle requests, add input validation with Pydantic, and test endpoints.

Minimal FastAPI prediction endpoint
# Save this as app.py and run: uvicorn app:app --reload
# pip install fastapi uvicorn scikit-learn joblib

# The app code is held in a string so this tutorial snippet can print it.
# It loads the RandomForest saved earlier and exposes a Pydantic-validated
# /predict route returning the class and the top class probability.
APP_CODE = '''
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, numpy as np

app = FastAPI(title='Iris Classifier')
model = joblib.load('iris_rf.joblib')  # load saved model

class IrisFeatures(BaseModel):
    sepal_length: float
    sepal_width:  float
    petal_length: float
    petal_width:  float

@app.post('/predict')
def predict(features: IrisFeatures):
    X = np.array([[features.sepal_length, features.sepal_width,
                   features.petal_length, features.petal_width]])
    pred = model.predict(X)[0]
    proba = model.predict_proba(X).max()
    return {'prediction': int(pred), 'confidence': float(proba)}
'''
print(APP_CODE)
Batch prediction endpoint
# Batch endpoint example, held in a string for display. Accepting many
# samples per request lets predict/predict_proba run vectorized over the
# whole matrix in one call.
BATCH_API = '''
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import numpy as np, joblib

app = FastAPI()
model = joblib.load('iris_rf.joblib')

class SingleSample(BaseModel):
    features: List[float]

class BatchRequest(BaseModel):
    samples: List[SingleSample]

@app.post('/predict/batch')
def batch_predict(batch: BatchRequest):
    X = np.array([s.features for s in batch.samples])
    preds = model.predict(X).tolist()
    probas = model.predict_proba(X).max(axis=1).tolist()
    return [
        {'prediction': p, 'confidence': round(c, 4)}
        for p, c in zip(preds, probas)
    ]
'''
print(BATCH_API)
Testing API with requests
# Run your FastAPI server first, then test it:
try:
    import requests

    BASE_URL = 'http://localhost:8000'

    # One iris sample, keyed exactly as the IrisFeatures schema expects.
    sample = {
        'sepal_length': 5.1, 'sepal_width': 3.5,
        'petal_length': 1.4, 'petal_width': 0.2
    }
    resp = requests.post(f'{BASE_URL}/predict', json=sample, timeout=5)
    print('Status:', resp.status_code)
    print('Response:', resp.json())

    # Liveness probe.
    health = requests.get(f'{BASE_URL}/health', timeout=5)
    print('Health:', health.json())
except Exception as e:
    # Connection refused / timeout: the server is simply not up yet.
    print(f'Server not running: {e}')
    print('Start with: uvicorn app:app --reload')
FastAPI with model hot-reload and health check
# API with a /reload endpoint (hot-swap the model file without restarting),
# a /health probe, and a guarded /predict. Held in a string for display.
# NOTE(review): @app.on_event is the older startup hook; newer FastAPI
# versions prefer lifespan handlers - confirm against the target version.
ADVANCED_API = '''
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib, numpy as np, time, pathlib

app = FastAPI()
MODEL_PATH = 'iris_rf.joblib'
state = {'model': None, 'loaded_at': None}

def load_model():
    state['model'] = joblib.load(MODEL_PATH)
    state['loaded_at'] = time.strftime("%Y-%m-%dT%H:%M:%S")

@app.on_event('startup')
def startup():
    load_model()

@app.post('/reload')
def reload_model():
    load_model()
    return {'status': 'reloaded', 'at': state['loaded_at']}

@app.get('/health')
def health():
    return {'status': 'ok', 'model_loaded': state['model'] is not None,
            'loaded_at': state['loaded_at']}

@app.post('/predict')
def predict(x: list):
    if state['model'] is None:
        raise HTTPException(503, 'Model not loaded')
    return {'prediction': int(state['model'].predict([x])[0])}
'''
print(ADVANCED_API)
💼 Real-World Scenario
A retail company wants to serve their churn prediction model as a REST API so their CRM system can call it in real time to flag at-risk customers.
Real-World Code
# Churn prediction API skeleton
# Held in a string for display. Field(...) constraints give request
# validation for free: out-of-range inputs are rejected with a 422.
CHURN_API = '''
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import Optional
import joblib, numpy as np

app = FastAPI(title='Churn Prediction API', version='1.0')
model = joblib.load('churn_model.joblib')

class CustomerFeatures(BaseModel):
    tenure_months:     int   = Field(..., ge=0, le=120)
    monthly_charges:   float = Field(..., ge=0)
    total_charges:     float = Field(..., ge=0)
    num_products:      int   = Field(..., ge=1, le=10)
    has_support_calls: bool

@app.post('/predict/churn')
def predict_churn(customer: CustomerFeatures):
    X = np.array([[
        customer.tenure_months, customer.monthly_charges,
        customer.total_charges, customer.num_products,
        int(customer.has_support_calls)
    ]])
    churn_prob = float(model.predict_proba(X)[0, 1])
    return {
        'churn_probability': round(churn_prob, 4),
        'risk_level': 'HIGH' if churn_prob > 0.7 else 'MEDIUM' if churn_prob > 0.4 else 'LOW'
    }
'''
print(CHURN_API)
🏋️ Practice: Sentiment API
Write a complete FastAPI app that accepts a text string and returns its VADER sentiment label and compound score. Include a /health endpoint.
Starter Code
# Save as sentiment_api.py
# Run: uvicorn sentiment_api:app --reload

# Practice starter, held in a string for display: complete the two TODO
# endpoints using the VADER analyzer.
SENTIMENT_API = '''
from fastapi import FastAPI
from pydantic import BaseModel
import nltk
nltk.download('vader_lexicon', quiet=True)
from nltk.sentiment.vader import SentimentIntensityAnalyzer

app = FastAPI()
# TODO: create SentimentIntensityAnalyzer instance

class TextRequest(BaseModel):
    text: str

# TODO: POST /analyze -> return label + compound score
# TODO: GET /health -> return {'status': 'ok'}
'''
print(SENTIMENT_API)
✅ Practice Checklist
3. Experiment Tracking with MLflow

Track experiments, log parameters and metrics, compare runs, and register models for deployment using MLflow.

Log a training run with MLflow
try:
    import mlflow
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score

    # Hold out 20% of iris for the evaluation metrics we log.
    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    mlflow.set_experiment('iris-classification')

    # Everything logged inside this context attaches to one MLflow run.
    with mlflow.start_run(run_name='random-forest-v1'):
        hyperparams = {'n_estimators': 100, 'max_depth': 5, 'random_state': 42}
        mlflow.log_params(hyperparams)

        clf = RandomForestClassifier(**hyperparams).fit(X_train, y_train)
        y_pred = clf.predict(X_test)

        mlflow.log_metric('accuracy', accuracy_score(y_test, y_pred))
        mlflow.log_metric('f1', f1_score(y_test, y_pred, average='weighted'))
        mlflow.sklearn.log_model(clf, 'model')

    print('Run logged. View: mlflow ui')
except ImportError:
    print('pip install mlflow')
Comparing multiple runs
try:
    import mlflow
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    mlflow.set_experiment('iris-comparison')

    # One MLflow run per candidate makes them directly comparable in the UI.
    candidates = [
        ('LogisticRegression', LogisticRegression(max_iter=200)),
        ('RandomForest', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('GradientBoosting', GradientBoostingClassifier(n_estimators=50, random_state=42)),
    ]

    for name, candidate in candidates:
        with mlflow.start_run(run_name=name):
            candidate.fit(X_train, y_train)
            acc = accuracy_score(y_test, candidate.predict(X_test))
            mlflow.log_metric('accuracy', acc)
            mlflow.log_param('model_type', name)
            print(f'{name}: {acc:.4f}')
except ImportError:
    print('pip install mlflow')
Autolog with sklearn
try:
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.datasets import load_breast_cancer
    from sklearn.model_selection import train_test_split

    # One call instruments sklearn: params, metrics and the model artifact
    # are captured without explicit log_* calls.
    mlflow.sklearn.autolog()  # auto-logs params, metrics, model

    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    with mlflow.start_run(run_name='autolog-demo'):
        model = GradientBoostingClassifier(n_estimators=100)
        model.fit(X_train, y_train)
        # Fixed mojibake ('β€”') in the user-facing message.
        print('Autolog complete — everything captured automatically!')
        print('Accuracy:', model.score(X_test, y_test))
except ImportError:
    print('pip install mlflow')
Load a logged model from MLflow registry
try:
    import mlflow

    # After running an experiment, there are three ways to get a model back:
    # Option 1: by run_id
    # model = mlflow.sklearn.load_model('runs:/<run_id>/model')

    # Option 2: from Model Registry (after registering)
    # model = mlflow.sklearn.load_model('models:/IrisClassifier/Production')

    # Option 3: query the best run programmatically via the tracking client.
    client = mlflow.tracking.MlflowClient()
    experiment = client.get_experiment_by_name('iris-comparison')
    if experiment:
        # Highest-accuracy run first; fetch only the top one.
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            order_by=['metrics.accuracy DESC'],
            max_results=1
        )
        if runs:
            best = runs[0]
            print(f'Best run: {best.info.run_id}')
            print(f'Accuracy: {best.data.metrics["accuracy"]:.4f}')
            # model = mlflow.sklearn.load_model(f'runs:/{best.info.run_id}/model')
        else:
            # Fixed mojibake ('β€”') in both fallback messages.
            print('No runs found — run the comparison example first.')
    else:
        print('Experiment not found — run the comparison example first.')
except ImportError:
    print('pip install mlflow')
💼 Real-World Scenario
A team of data scientists is running hyperparameter sweeps for a credit scoring model. They need to track every experiment and promote the best model to production.
Real-World Code
try:
    import mlflow
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score
    import itertools

    X, y = make_classification(n_samples=1000, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    mlflow.set_experiment('credit-scoring-sweep')

    # Cartesian product of the three hyperparameters under test.
    param_grid = list(itertools.product([50, 100], [3, 5], [0.05, 0.1]))
    best_auc, best_run_id = 0, None

    for n_est, depth, lr in param_grid:
        with mlflow.start_run():
            clf = GradientBoostingClassifier(n_estimators=n_est, max_depth=depth, learning_rate=lr)
            clf.fit(X_train, y_train)
            auc = roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])
            mlflow.log_params({'n_estimators': n_est, 'max_depth': depth, 'learning_rate': lr})
            mlflow.log_metric('roc_auc', auc)
            mlflow.sklearn.log_model(clf, 'model')
            # Remember the current leader so it can be promoted afterwards.
            if auc > best_auc:
                best_auc = auc
                best_run_id = mlflow.active_run().info.run_id

    print(f'Best AUC: {best_auc:.4f} | Run: {best_run_id}')
except ImportError:
    print('pip install mlflow scikit-learn')
🏋️ Practice: Hyperparameter Sweep Logger
Run a grid search over n_estimators=[10,50,100] and max_depth=[3,5] for a RandomForest on the iris dataset, log each run to MLflow, then print the best run_id and accuracy.
Starter Code
try:
    import mlflow
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    mlflow.set_experiment('iris-sweep')

    # Practice: grid over n_estimators=[10,50,100] x max_depth=[3,5],
    # logging one MLflow run per combination.
    # TODO: loop over n_estimators and max_depth
    # TODO: log params and accuracy in each run
    # TODO: track best_accuracy and best_run_id
    # TODO: print winner

except ImportError:
    print('pip install mlflow')
✅ Practice Checklist
4. Docker for ML Models

Package ML models and APIs in Docker containers for reproducible, portable deployment. Learn Dockerfiles, image building, and running containers.

Minimal Dockerfile for a FastAPI ML app
# A minimal production Dockerfile plus a pinned requirements.txt, held in
# strings so this snippet can print them for the reader.
DOCKERFILE = '''
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app.py .
COPY iris_rf.joblib .

# Expose port
EXPOSE 8000

# Start server
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

# Pinned versions keep image builds reproducible.
REQUIREMENTS = '''
fastapi==0.111.0
uvicorn==0.30.0
scikit-learn==1.5.0
joblib==1.4.2
numpy==1.26.4
'''

print('Dockerfile:')
print(DOCKERFILE)
print('requirements.txt:')
print(REQUIREMENTS)
Docker build and run commands
# Build and run Docker commands (run these in your terminal)

# Shell commands held in a string for display; they cover the full image
# lifecycle: build, run, test, inspect logs, tear down, publish.
COMMANDS = '''
# Build image
docker build -t iris-api:v1 .

# Run container
docker run -d -p 8000:8000 --name iris-api iris-api:v1

# Test it
curl -X POST http://localhost:8000/predict \
  -H 'Content-Type: application/json' \
  -d '{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}'

# View logs
docker logs iris-api

# Stop and remove
docker stop iris-api && docker rm iris-api

# Push to Docker Hub
docker tag iris-api:v1 yourusername/iris-api:v1
docker push yourusername/iris-api:v1
'''
print(COMMANDS)
Multi-stage Docker build (smaller image)
# Two-stage build held in a string for display: dependencies are compiled in
# a builder stage, and only the installed packages are copied into the final
# image, keeping it small and free of build tooling.
MULTISTAGE_DOCKERFILE = '''
# Stage 1: build dependencies
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: lean runtime image
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY app.py iris_rf.joblib ./

# Non-root user for security
RUN useradd -m appuser
USER appuser

EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print(MULTISTAGE_DOCKERFILE)
docker-compose for API + model server
# docker-compose stack held in a string for display: the prediction API
# (with a healthcheck and a read-only model volume) plus an MLflow server.
COMPOSE_YAML = '''
# docker-compose.yml
version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/models/iris_rf.joblib
    volumes:
      - ./models:/models:ro
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0
    volumes:
      - mlflow_data:/mlflow

volumes:
  mlflow_data:
'''
print(COMPOSE_YAML)
💼 Real-World Scenario
A startup wants to deploy their recommendation model to AWS. They containerize it with Docker so it runs identically in dev, staging, and production.
Real-World Code
import pathlib

# Materialize a minimal dockerized-ML-API project on disk so the files can
# be inspected, built, and deployed as-is. Filename -> file contents.
files = {
    'Dockerfile': '''
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
''',
    'requirements.txt': 'fastapi\nuvicorn\nscikit-learn\njoblib\nnumpy\n',
    '.dockerignore': '__pycache__\n*.pyc\n.git\n*.ipynb\n',
    'app.py': '''
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, numpy as np, os

app = FastAPI()
model = joblib.load(os.getenv('MODEL_PATH', 'model.joblib'))

class Request(BaseModel):
    features: list

@app.post('/predict')
def predict(req: Request):
    return {'prediction': int(model.predict([req.features])[0])}

@app.get('/health')
def health():
    return {'status': 'ok'}
''',
}

# Write each file (stripped of the leading/trailing newline from the
# triple-quoted literals) into the project directory.
proj = pathlib.Path('ml_docker_project')
proj.mkdir(exist_ok=True)
for fname, content in files.items():
    target = proj / fname
    target.write_text(content.strip())
    print(f'Created: {target}')
🏋️ Practice: Dockerfile Generator
Write a Python function that generates a Dockerfile string given a list of pip packages, a Python version, and an entrypoint command.
Starter Code
def generate_dockerfile(packages: list, python_version: str = '3.11', entrypoint: str = 'python app.py') -> str:
    """Return a Dockerfile string for the given packages, Python version, and entrypoint (TODO)."""
    # TODO: generate FROM, WORKDIR, COPY requirements.txt, RUN pip install, COPY ., CMD
    pass

# NOTE: prints None until generate_dockerfile is implemented.
dockerfile = generate_dockerfile(
    packages=['fastapi', 'uvicorn', 'scikit-learn', 'joblib'],
    python_version='3.11',
    entrypoint='uvicorn app:app --host 0.0.0.0 --port 8000'
)
print(dockerfile)
✅ Practice Checklist
5. Model Monitoring & Data Drift

Monitor deployed models for performance degradation and data drift. Detect distribution shifts between training and production data.

Monitoring prediction distribution over time
import numpy as np

# Simulate eight weeks of production prediction probabilities.
# Weeks 1-4 draw from Beta(2, 5) (mean ~0.29, stable); weeks 5-8 draw from
# Beta(5, 2) (mean ~0.71), emulating drift in the score distribution.
# (Removed matplotlib import/backend setup: nothing here plots.)
np.random.seed(42)
weeks = 8
pred_probs = [
    np.random.beta(2, 5, 1000) for _ in range(4)  # stable
] + [
    np.random.beta(5, 2, 1000) for _ in range(4)  # drifted
]

# Flag any week whose mean predicted probability crosses 0.5.
mean_preds = [p.mean() for p in pred_probs]
print('Weekly mean prediction probabilities:')
for w, m in enumerate(mean_preds, 1):
    drift_flag = ' *** DRIFT DETECTED ***' if m > 0.5 else ''
    print(f'  Week {w}: {m:.3f}{drift_flag}')
KS test for feature distribution drift
from scipy import stats
import numpy as np

# Reference (training-time) feature distributions.
np.random.seed(42)
train_data = {
    'age':    np.random.normal(35, 10, 1000),
    'income': np.random.lognormal(10, 0.5, 1000),
}

# Live (production) samples: 'age' has shifted upward, 'income' is unchanged.
prod_data = {
    'age':    np.random.normal(45, 12, 500),  # shifted mean
    'income': np.random.lognormal(10, 0.5, 500),  # same
}

# Two-sample KS test per feature; a small p-value means the production
# distribution differs significantly from training.
print('Kolmogorov-Smirnov Drift Test:')
for name in train_data:
    statistic, pval = stats.ks_2samp(train_data[name], prod_data[name])
    verdict = 'DRIFT' if pval < 0.05 else 'OK'
    print(f'  {name}: KS={statistic:.3f}, p={pval:.4f} -> {verdict}')
Population Stability Index (PSI)
import numpy as np

def psi(expected, actual, bins=10):
    """Population Stability Index. PSI < 0.1 = stable, 0.1-0.25 = slight shift, > 0.25 = major shift."""
    # Decile edges from the reference sample (deduplicated for ties).
    edges = np.unique(np.percentile(expected, np.linspace(0, 100, bins + 1)))
    exp_counts = np.histogram(expected, bins=edges)[0]
    act_counts = np.histogram(actual, bins=edges)[0]
    # Floor empty buckets so the log ratio stays finite.
    exp_pct = np.where(exp_counts == 0, 0.0001, exp_counts / len(expected))
    act_pct = np.where(act_counts == 0, 0.0001, act_counts / len(actual))
    return np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))

np.random.seed(42)
train_scores = np.random.beta(2, 5, 1000)
stable_prod  = np.random.beta(2, 5, 500)     # same distribution
drifted_prod = np.random.beta(5, 2, 500)     # different distribution

print(f'PSI stable:  {psi(train_scores, stable_prod):.4f}')   # expect < 0.1
print(f'PSI drifted: {psi(train_scores, drifted_prod):.4f}')  # expect > 0.25
Alerting on accuracy degradation
import numpy as np
from collections import deque

class ModelMonitor:
    """Rolling-window accuracy monitor that raises alerts on degradation."""

    def __init__(self, window=100, threshold=0.05):
        self.window = window          # number of recent outcomes to average
        self.threshold = threshold    # max tolerated drop vs. the baseline
        self.baseline_acc = None
        self.recent = deque(maxlen=window)
        self.alerts = []

    def set_baseline(self, accuracy):
        """Record the accuracy measured at deployment time."""
        self.baseline_acc = accuracy

    def log_prediction(self, pred, true_label):
        """Record one outcome; alert once the full window sags below baseline."""
        self.recent.append(int(pred == true_label))
        if len(self.recent) < self.window:
            return  # not enough history yet
        current_acc = sum(self.recent) / self.window
        drop = self.baseline_acc - current_acc
        if drop > self.threshold:
            alert = f'ALERT: accuracy dropped {drop:.1%} (current={current_acc:.3f})'
            self.alerts.append(alert)
            print(alert)

np.random.seed(42)
monitor = ModelMonitor(window=50, threshold=0.05)
monitor.set_baseline(0.95)

# Healthy phase (~5% error rate), then a degraded phase (~20% error rate).
for _ in range(100):
    monitor.log_prediction(1, 1 if np.random.rand() > 0.05 else 0)
for _ in range(100):
    monitor.log_prediction(1, 1 if np.random.rand() > 0.20 else 0)  # degraded
💼 Real-World Scenario
A bank's credit risk model needs continuous monitoring after deployment. Any significant accuracy drop or feature distribution shift should trigger a retraining alert.
Real-World Code
import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class DriftReport:
    """KS-test result for a single monitored feature."""
    feature: str
    ks_stat: float
    p_value: float
    drifted: bool

class MLMonitor:
    """Compares live feature distributions against a fixed reference set.

    (Removed the unused `field` import from dataclasses.)
    """

    def __init__(self, reference_data: Dict[str, np.ndarray], alpha: float = 0.05):
        self.reference = reference_data  # training-time distributions
        self.alpha = alpha               # significance level for the KS test

    def check_drift(self, current_data: Dict[str, np.ndarray]) -> List[DriftReport]:
        """Run a two-sample KS test per shared feature; p < alpha flags drift.

        Features missing from *current_data* are silently skipped.
        """
        reports: List[DriftReport] = []
        for feature, ref_vals in self.reference.items():
            if feature not in current_data:
                continue
            ks, p = stats.ks_2samp(ref_vals, current_data[feature])
            reports.append(DriftReport(feature, round(ks, 4), round(p, 4), p < self.alpha))
        return reports

np.random.seed(42)
reference = {'credit_score': np.random.normal(680, 80, 1000),
             'income':       np.random.lognormal(11, 0.4, 1000)}
current   = {'credit_score': np.random.normal(630, 90, 300),   # drifted
             'income':       np.random.lognormal(11, 0.4, 300)} # stable

monitor = MLMonitor(reference)
for report in monitor.check_drift(current):
    status = 'DRIFT' if report.drifted else 'OK'
    print(f'{report.feature}: KS={report.ks_stat}, p={report.p_value} -> {status}')
🏋️ Practice: Drift Dashboard
Compute PSI for 3 features between training and production data, print a table with PSI values and stability labels (stable/warning/critical).
Starter Code
import numpy as np

def psi(expected, actual, bins=10):
    """Population Stability Index between two samples (<0.1 stable, 0.1-0.25 warning, >0.25 critical)."""
    breakpoints = np.unique(np.percentile(expected, np.linspace(0, 100, bins + 1)))
    exp_c = np.histogram(expected, bins=breakpoints)[0]
    act_c = np.histogram(actual,   bins=breakpoints)[0]
    # Floor empty buckets so the log ratio below stays finite.
    e = np.where(exp_c == 0, 1e-4, exp_c / len(expected))
    a = np.where(act_c == 0, 1e-4, act_c / len(actual))
    return float(np.sum((a - e) * np.log(a / e)))

np.random.seed(42)
# Reference (training) distributions for three features.
train = {'age': np.random.normal(35, 10, 1000),
         'income': np.random.lognormal(10, 0.5, 1000),
         'score': np.random.beta(2, 5, 1000)}
# Production samples: 'age' and 'score' have shifted, 'income' has not.
prod  = {'age': np.random.normal(45, 12, 500),   # drifted
         'income': np.random.lognormal(10, 0.5, 500),
         'score': np.random.beta(5, 2, 500)}      # drifted

# TODO: compute PSI for each feature
# TODO: print table: feature | PSI | stability label
✅ Practice Checklist
6. CI/CD for ML Pipelines

Automate model training, testing, and deployment with CI/CD. Learn to write GitHub Actions workflows and automated model validation.

Automated model validation script
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
import sys

# Quality gates a candidate model must clear for CI to pass.
# (Removed the unused numpy import; fixed mojibake 'Β±' in the printed output.)
ACCURACY_THRESHOLD = 0.95
F1_THRESHOLD = 0.94

def validate_model(model, X, y):
    """Return True if model passes all quality gates.

    Runs 5-fold cross-validation for accuracy and weighted F1 and compares
    the mean of each against the module-level thresholds.
    """
    acc_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
    f1_scores  = cross_val_score(model, X, y, cv=5, scoring='f1_weighted')

    print(f'Accuracy: {acc_scores.mean():.4f} ± {acc_scores.std():.4f}')
    print(f'F1:       {f1_scores.mean():.4f} ± {f1_scores.std():.4f}')

    passed = acc_scores.mean() >= ACCURACY_THRESHOLD and f1_scores.mean() >= F1_THRESHOLD
    print('PASSED' if passed else 'FAILED')
    return passed

X, y = load_iris(return_X_y=True)
model = RandomForestClassifier(n_estimators=100, random_state=42)
if not validate_model(model, X, y):
    sys.exit(1)  # Fail CI pipeline
GitHub Actions workflow for ML
# CI workflow held in a string for display: on every push/PR to main it
# installs dependencies, trains, runs the validation gate, and uploads the
# model as a build artifact.
GITHUB_ACTIONS_YAML = '''
# .github/workflows/ml_pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  train-and-validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        run: python train.py

      - name: Validate model
        run: python validate.py

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: model.joblib
'''
print(GITHUB_ACTIONS_YAML)
Automated test suite for ML code
# tests/test_model.py (run with: pytest tests/)
# The pytest suite lives in a string so this snippet can display it; save
# its contents under tests/ and run `pytest tests/` to execute it.
# (Removed the unused module-level pytest/numpy/sklearn imports — the real
# code is entirely inside the string.)
TEST_CODE = '''
import pytest, numpy as np, joblib
from sklearn.datasets import load_iris

@pytest.fixture
def model():
    return joblib.load('iris_rf.joblib')

@pytest.fixture
def iris_data():
    X, y = load_iris(return_X_y=True)
    return X, y

def test_model_accuracy(model, iris_data):
    X, y = iris_data
    assert model.score(X, y) >= 0.95

def test_model_output_shape(model, iris_data):
    X, _ = iris_data
    preds = model.predict(X[:10])
    assert preds.shape == (10,)

def test_model_classes(model):
    assert len(model.classes_) == 3

def test_prediction_valid_class(model, iris_data):
    X, _ = iris_data
    preds = model.predict(X)
    assert set(preds).issubset({0, 1, 2})
'''
print(TEST_CODE)
Pre-deployment model comparison
# Champion/challenger comparison: promote the new candidate only if it beats
# the currently deployed model on weighted F1.
# (Removed the unused joblib import.)
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

def evaluate(model, X_test, y_test):
    """Score a fitted model on the held-out split."""
    preds = model.predict(X_test)
    return {'accuracy': accuracy_score(y_test, preds),
            'f1': f1_score(y_test, preds, average='weighted')}

# Champion: currently deployed model
champion = RandomForestClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)
# Challenger: new candidate
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)

champ_metrics = evaluate(champion, X_test, y_test)
chall_metrics = evaluate(challenger, X_test, y_test)

print(f'Champion:   {champ_metrics}')
print(f'Challenger: {chall_metrics}')
promote = chall_metrics['f1'] > champ_metrics['f1']
print(f'Promote challenger: {promote}')
💼 Real-World Scenario
A team wants every pull request to automatically train and validate a model. If the model's F1 drops below 0.90, the PR is blocked from merging.
Real-World Code
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report
import sys, joblib

# Minimum weighted F1 a candidate model must reach to be deployable.
F1_GATE = 0.90

def train(X_train, y_train):
    """Fit the candidate model on the training split."""
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    return model

def gate_check(model, X_test, y_test):
    """Exit with code 1 (blocking the CI pipeline) if the F1 gate fails."""
    preds = model.predict(X_test)
    f1 = f1_score(y_test, preds, average='weighted')
    print(classification_report(y_test, preds))
    if f1 < F1_GATE:
        print(f'FAILED: F1={f1:.4f} < threshold {F1_GATE}')
        sys.exit(1)
    print(f'PASSED: F1={f1:.4f}')
    return model

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = train(X_train, y_train)
validated_model = gate_check(model, X_test, y_test)
joblib.dump(validated_model, 'validated_model.joblib')
# Fixed mojibake ('β€”') in the final status message.
print('Model saved — ready for deployment.')
🏋️ Practice: Quality Gate Script
Write a validate.py script that loads a saved model, evaluates it against a test set, and exits with code 1 if accuracy < 0.92 or precision < 0.90.
Starter Code
import joblib, sys
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score

# Deployment quality gates: BOTH must pass or the script exits non-zero.
ACCURACY_GATE  = 0.92
PRECISION_GATE = 0.90

def validate(model_path: str):
    """Load a saved model, score it on a fresh iris split, and enforce the gates."""
    # TODO: load model from model_path
    # TODO: load iris, split 80/20
    # TODO: compute accuracy and weighted precision
    # TODO: print results
    # TODO: sys.exit(1) if below thresholds
    pass

validate('iris_rf.joblib')
✅ Practice Checklist
7. Feature Stores & Pipelines

Build reproducible feature engineering pipelines. Use sklearn Pipeline, ColumnTransformer, and custom transformers for production-ready preprocessing.

ColumnTransformer for mixed data
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Toy frame with both numeric and categorical columns plus missing values.
df = pd.DataFrame({
    'age':    [25, np.nan, 35, 42, 28],
    'income': [50000, 75000, np.nan, 90000, 62000],
    'city':   ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'gender': ['M', 'F', 'F', 'M', np.nan],
})

num_features = ['age', 'income']
cat_features = ['city', 'gender']

# Numeric path: fill missing with the median, then standardize.
num_pipe = Pipeline([('impute', SimpleImputer(strategy='median')),
                     ('scale',  StandardScaler())])
# Categorical path: fill missing with the mode, then one-hot encode;
# handle_unknown='ignore' keeps inference safe on unseen categories.
cat_pipe = Pipeline([('impute', SimpleImputer(strategy='most_frequent')),
                     ('encode', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])

# Route each sub-pipeline to its own column subset in a single transformer.
preprocessor = ColumnTransformer([
    ('num', num_pipe, num_features),
    ('cat', cat_pipe, cat_features),
])

X = preprocessor.fit_transform(df)
print('Transformed shape:', X.shape)
print('First row:', X[0].round(3))
Custom transformer with BaseEstimator
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class LogTransformer(BaseEstimator, TransformerMixin):
    """Stateless log(X + offset) transform; the offset keeps zeros finite."""
    def __init__(self, offset=1):
        self.offset = offset

    def fit(self, X, y=None):
        # Nothing to learn; returning self keeps the sklearn estimator contract.
        return self

    def transform(self, X):
        return np.log(X + self.offset)

class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip each column to the [1st, 99th] percentile range learned during fit."""
    def fit(self, X, y=None):
        # Trailing-underscore names mark fitted attributes (sklearn convention).
        self.lower_ = np.percentile(X, 1, axis=0)
        self.upper_ = np.percentile(X, 99, axis=0)
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)

# Clip outliers first, then compress the dynamic range with a log transform.
X = np.array([[1, 1000], [5, 5000], [100, 100000], [1, 50]])
pipe = Pipeline([('clip', OutlierClipper()), ('log', LogTransformer())])
print('Original:\n', X)
print('Transformed:\n', pipe.fit_transform(X).round(3))
Feature selection in pipeline
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import cross_val_score

X, y = load_breast_cancer(return_X_y=True)
print(f'Original features: {X.shape[1]}')

# Selection happens INSIDE the pipeline, so each CV fold selects features
# from its own training split only (no selection leakage into validation).
pipe = Pipeline([
    ('scaler',  StandardScaler()),
    ('select',  SelectKBest(f_classif, k=10)),
    ('clf',     RandomForestClassifier(n_estimators=100, random_state=42)),
])

scores = cross_val_score(pipe, X, y, cv=5, scoring='accuracy')
print(f'CV accuracy (10 features): {scores.mean():.4f} Β± {scores.std():.4f}')

pipe.fit(X, y)
# get_support() is a boolean mask over the original columns; nonzero() yields
# the indices of the k features that were kept.
selected = pipe['select'].get_support()
print(f'Selected feature indices: {selected.nonzero()[0]}')
Persisting and reusing a fitted pipeline
import joblib
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

# Training data
train = pd.DataFrame({'age': [25,35,45,55], 'city': ['NYC','LA','NYC','LA'], 'target': [0,1,0,1]})
X_train = train.drop('target', axis=1)
y_train = train['target']

preprocessor = ColumnTransformer([
    ('num', StandardScaler(), ['age']),
    ('cat', OneHotEncoder(sparse_output=False), ['city']),
])
pipe = Pipeline([('prep', preprocessor), ('clf', LogisticRegression())])
pipe.fit(X_train, y_train)

# Save entire pipeline (includes fitted scaler + encoder)
joblib.dump(pipe, 'full_pipeline.joblib')

# Load and predict new data: raw columns go in, preprocessing happens inside
# the loaded pipeline, so training and serving cannot drift apart.
loaded = joblib.load('full_pipeline.joblib')
new_data = pd.DataFrame({'age': [30, 50], 'city': ['LA', 'NYC']})
print('Predictions:', loaded.predict(new_data))
💼 Real-World Scenario
An insurance company needs to preprocess complex customer data (mixed types, missing values, categorical variables) reproducibly in both training and production serving.
Real-World Code
import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

# Synthetic insurance portfolio: mixed numeric/categorical features, binary claim label.
np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age':        np.random.randint(18, 80, n).astype(float),
    'income':     np.random.lognormal(10, 0.5, n),
    'region':     np.random.choice(['North', 'South', 'East', 'West'], n),
    'vehicle_age':np.random.randint(0, 20, n).astype(float),
    'claim':      np.random.binomial(1, 0.15, n),
})
# Inject missing values. replace=False guarantees exactly 30/20 distinct rows;
# the default samples WITH replacement, which can hit the same row twice and
# silently inject fewer missing values than intended.
df.loc[np.random.choice(n, 30, replace=False), 'age'] = np.nan
df.loc[np.random.choice(n, 20, replace=False), 'region'] = np.nan

X = df.drop('claim', axis=1)
y = df['claim']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Numeric: median impute + scale. Categorical: mode impute + one-hot
# (handle_unknown='ignore' keeps serving robust to unseen regions).
num_pipe = Pipeline([('imp', SimpleImputer(strategy='median')), ('scl', StandardScaler())])
cat_pipe = Pipeline([('imp', SimpleImputer(strategy='most_frequent')),
                     ('enc', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])

prep = ColumnTransformer([
    ('num', num_pipe, ['age', 'income', 'vehicle_age']),
    ('cat', cat_pipe, ['region']),
])
# One artifact holds preprocessing AND model, so the exact same transforms run
# at serving time as during training.
full_pipe = Pipeline([('prep', prep), ('clf', GradientBoostingClassifier(random_state=42))])
full_pipe.fit(X_train, y_train)

print(f'Test accuracy: {full_pipe.score(X_test, y_test):.4f}')
joblib.dump(full_pipe, 'insurance_pipeline.joblib')
print('Pipeline saved.')
🏋️ Practice: Reusable Preprocessing Pipeline
Build a ColumnTransformer pipeline for a dataset with numeric columns (impute median + scale) and categorical columns (impute mode + one-hot encode), then save and reload it.
Starter Code
import pandas as pd, numpy as np, joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Practice dataset: two numeric and two categorical columns, with missing
# values in three of the four columns.
data = pd.DataFrame({
    'tenure':   [12, np.nan, 36, 24, 6],
    'charges':  [50.0, 75.0, np.nan, 90.0, 45.0],
    'plan':     ['basic', 'premium', 'basic', np.nan, 'premium'],
    'country':  ['US', 'UK', 'US', 'CA', 'UK'],
})

# TODO: define num_pipe and cat_pipe
# TODO: combine with ColumnTransformer
# TODO: fit_transform data
# TODO: save and reload with joblib
# TODO: print transformed shape
✅ Practice Checklist
8. A/B Testing for ML Models

Safely roll out new models by running controlled experiments. Compare champion vs challenger using statistical tests to decide which to promote.

Traffic splitting for A/B model serving
import numpy as np
from dataclasses import dataclass
from typing import Any

@dataclass
class ABRouter:
    """Routes each prediction request to champion or challenger at random."""
    champion: Any
    challenger: Any
    challenger_pct: float = 0.1  # 10% traffic to challenger

    def predict(self, X):
        """Score one sample; return the prediction plus which model served it."""
        # Per-request coin flip on the global NumPy RNG (deliberately unseeded
        # here so routing varies between requests).
        route = 'challenger' if np.random.rand() < self.challenger_pct else 'champion'
        model = self.challenger if route == 'challenger' else self.champion
        # X is a single 1-D sample; wrap it into the (1, n_features) shape
        # sklearn expects and unwrap the single prediction.
        pred = model.predict([X])[0]
        return {'prediction': pred, 'model': route}

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
champion   = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X, y)

# 20% of requests hit the challenger; repeated calls show the random routing.
router = ABRouter(champion, challenger, challenger_pct=0.2)
for _ in range(5):
    result = router.predict(X[0])
    print(f'Model: {result["model"]:12} | Prediction: {result["prediction"]}')
Collecting and comparing A/B results
import numpy as np
from collections import defaultdict
from scipy import stats

# Simulate A/B experiment: collect correct/incorrect per model
np.random.seed(42)
n_requests = 1000

results = defaultdict(list)
for _ in range(n_requests):
    group = 'challenger' if np.random.rand() < 0.2 else 'champion'
    # Simulated accuracy: champion=0.92, challenger=0.95
    accuracy = 0.95 if group == 'challenger' else 0.92
    correct = int(np.random.rand() < accuracy)
    results[group].append(correct)

for group, outcomes in results.items():
    print(f'{group}: n={len(outcomes)}, accuracy={np.mean(outcomes):.4f}')

# Two-proportion z-test. The outcomes are Bernoulli, so the pooled z-test is
# the matching procedure (the previous code labeled itself a z-test but ran
# scipy's independent-samples t-test instead).
champ = results['champion']
chall = results['challenger']
n1, n2 = len(champ), len(chall)
p1, p2 = np.mean(champ), np.mean(chall)
pooled = (sum(champ) + sum(chall)) / (n1 + n2)       # pooled success rate
se = np.sqrt(pooled * (1 - pooled) * (1 / n1 + 1 / n2))
z = (p2 - p1) / se
p = 2 * stats.norm.sf(abs(z))                        # two-sided p-value
print(f'p-value: {p:.4f} -> {"significant" if p < 0.05 else "not significant"}')
Statistical significance with chi-squared test
import numpy as np
from scipy import stats

# Confusion matrix style: correct vs incorrect per model
champion_results   = {'correct': 460, 'incorrect': 40}   # n=500, 92%
challenger_results = {'correct': 190, 'incorrect': 10}   # n=200, 95%

# Build the 2x2 contingency table (rows = models, cols = correct/incorrect)
# and run the chi-squared independence test on it.
observed = np.array([[m['correct'], m['incorrect']]
                     for m in (champion_results, challenger_results)])

chi2, p, dof, expected = stats.chi2_contingency(observed)
print(f'Chi2: {chi2:.4f}')
print(f'p-value: {p:.4f}')
print(f'Significant at alpha=0.05: {p < 0.05}')
if p < 0.05:
    # Promote whichever model has the higher observed accuracy; the dict is
    # ordered champion-first, so a tie keeps the incumbent.
    accuracies = {name: counts['correct'] / sum(counts.values())
                  for name, counts in (('champion', champion_results),
                                       ('challenger', challenger_results))}
    winner = max(accuracies, key=accuracies.get)
    print(f'Promote: {winner}')
Shadow mode deployment (no live impact)
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score

X, y = load_iris(return_X_y=True)
champion   = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X, y)

# Every request is recorded with BOTH models' outputs for offline comparison.
shadow_log = []

def serve_with_shadow(X_sample, true_label=None):
    """Serve champion, silently log challenger β€” no impact on users."""
    champ_pred = champion.predict([X_sample])[0]
    chall_pred = challenger.predict([X_sample])[0]  # shadow
    shadow_log.append({'champion': champ_pred, 'challenger': chall_pred,
                       'true': true_label})
    return champ_pred  # only champion's prediction served

for xi, yi in zip(X[:20], y[:20]):
    serve_with_shadow(xi, yi)

# Offline accuracy of each model over the shadow log (labels were supplied above).
champ_acc = np.mean([l['champion'] == l['true'] for l in shadow_log])
chall_acc = np.mean([l['challenger'] == l['true'] for l in shadow_log])
print(f'Shadow test: champion={champ_acc:.2f}, challenger={chall_acc:.2f}')
💼 Real-World Scenario
An e-commerce company wants to test a new recommendation model against the current one. 10% of traffic goes to the new model; after 2 weeks they compare click-through rates statistically.
Real-World Code
import numpy as np
from scipy import stats
from dataclasses import dataclass, field
from typing import List

@dataclass
class ABExperiment:
    """Online A/B experiment: each logged reward is randomly assigned to the
    champion or the challenger according to `challenger_pct`."""
    name: str
    challenger_pct: float = 0.1
    champion_outcomes: List[float] = field(default_factory=list)
    challenger_outcomes: List[float] = field(default_factory=list)

    def log(self, reward: float):
        """Assign this observation to a group at random and record its reward."""
        group = 'challenger' if np.random.rand() < self.challenger_pct else 'champion'
        (self.challenger_outcomes if group == 'challenger' else self.champion_outcomes).append(reward)

    def report(self):
        """Summarize the experiment with plain Python types.

        Casting to float/bool matters: `stats.ttest_ind` returns NumPy scalars,
        and `np.bool_` is not JSON serializable — the original `significant`
        field made the json.dumps call below raise TypeError.
        """
        champ = np.array(self.champion_outcomes)
        chall = np.array(self.challenger_outcomes)
        _, p = stats.ttest_ind(chall, champ)
        p = float(p)
        return {
            'champion_mean':   round(float(champ.mean()), 4),
            'challenger_mean': round(float(chall.mean()), 4),
            'p_value':         round(p, 4),
            'significant':     bool(p < 0.05),
            'promote':         bool(p < 0.05 and chall.mean() > champ.mean()),
        }

np.random.seed(42)
exp = ABExperiment('recommendation-v2', challenger_pct=0.1)
for _ in range(10000):
    # Simulate CTR: champion=5%, challenger=6%
    reward = np.random.binomial(1, 0.06) if np.random.rand() < 0.1 else np.random.binomial(1, 0.05)
    exp.log(reward)

import json
print(json.dumps(exp.report(), indent=2))
🏋️ Practice: A/B Test Analyzer
Given two lists of binary outcomes (0=failure, 1=success) for model A and model B, compute conversion rates, run a chi-squared test, and print whether to promote model B.
Starter Code
import numpy as np
from scipy import stats

# Simulated binary outcomes; note the deliberately unequal sample sizes
# (a 90/10-style traffic split between the two arms).
np.random.seed(42)
model_a = np.random.binomial(1, 0.08, 5000)  # 8% conversion
model_b = np.random.binomial(1, 0.10, 1000)  # 10% conversion

def analyze_ab_test(a_outcomes, b_outcomes, alpha=0.05):
    """Compare two binary-outcome arms and print a promote/keep decision."""
    # TODO: compute conversion rates
    # TODO: chi-squared test
    # TODO: print results and promote/keep decision
    pass

analyze_ab_test(model_a, model_b)
✅ Practice Checklist
9. Model Explainability

Make black-box models interpretable. Use SHAP values, permutation importance, and LIME to explain individual predictions and global feature importance.

Permutation feature importance
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

data = load_breast_cancer()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Permutation importance: shuffle one feature at a time on the TEST set and
# measure the score drop; n_repeats shuffles give a mean and std per feature.
result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42)

# Rank features by how much shuffling them hurts the held-out score.
importance_df = pd.DataFrame({
    'feature':   data.feature_names,
    'importance': result.importances_mean,
    'std':        result.importances_std,
}).sort_values('importance', ascending=False)

print(importance_df.head(10).to_string(index=False))
SHAP values for tree models
try:
    import shap
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.datasets import load_breast_cancer
    from sklearn.model_selection import train_test_split

    data = load_breast_cancer()
    X_train, X_test, y_train, y_test = train_test_split(
        data.data, data.target, random_state=42)

    model = GradientBoostingClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # TreeExplainer computes SHAP values efficiently for tree ensembles.
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test[:5])

    # Rank the first sample's features by absolute contribution to its score.
    print('SHAP values for first test sample:')
    for feat, val in sorted(zip(data.feature_names, shap_values[0]),
                            key=lambda x: abs(x[1]), reverse=True)[:5]:
        print(f'  {feat:<35} {val:+.4f}')
except ImportError:
    # shap is an optional third-party dependency of this demo.
    print('pip install shap')
LIME for individual prediction explanation
try:
    import lime
    from lime.lime_tabular import LimeTabularExplainer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_breast_cancer
    from sklearn.model_selection import train_test_split

    data = load_breast_cancer()
    X_train, X_test, y_train, y_test = train_test_split(
        data.data, data.target, random_state=42)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # LIME explains one prediction at a time by fitting a simple local
    # surrogate around the instance; it needs the training data to sample
    # realistic perturbations.
    explainer = LimeTabularExplainer(
        X_train, feature_names=data.feature_names,
        class_names=data.target_names, mode='classification')

    exp = explainer.explain_instance(X_test[0], model.predict_proba, num_features=5)
    print('Prediction:', model.predict([X_test[0]])[0], '->', data.target_names[model.predict([X_test[0]])[0]])
    print('\nTop contributing features:')
    for feat, weight in exp.as_list():
        print(f'  {feat[:45]:<45} {weight:+.4f}')
except ImportError:
    # lime is an optional third-party dependency of this demo.
    print('pip install lime')
Partial dependence plots (sklearn)
import matplotlib
matplotlib.use('Agg')  # headless backend: render straight to file, no display
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import PartialDependenceDisplay
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

data = load_breast_cancer()
X_train, X_test, y_train, _ = train_test_split(data.data, data.target, random_state=42)

model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Plot partial dependence for top 2 features
fig, ax = plt.subplots(figsize=(10, 4))
PartialDependenceDisplay.from_estimator(
    model, X_train, features=[0, 1],
    feature_names=data.feature_names, ax=ax)
plt.tight_layout()
plt.savefig('pdp.png', dpi=80)
plt.close()
print('PDP saved to pdp.png')
print(f'Feature 0: {data.feature_names[0]}')
print(f'Feature 1: {data.feature_names[1]}')
💼 Real-World Scenario
A mortgage lender must explain why a loan was denied. They use SHAP to generate a plain-English explanation of the top factors for each decision.
Real-World Code
try:
    import shap
    import numpy as np
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    # Synthetic loan applications: five interpretable features, XOR-style label.
    np.random.seed(42)
    n = 500
    feature_names = ['credit_score', 'income', 'debt_ratio', 'loan_amount', 'employment_years']
    X = np.column_stack([
        np.random.normal(680, 80, n),
        np.random.lognormal(11, 0.5, n),
        np.random.beta(2, 5, n),
        np.random.lognormal(12, 0.3, n),
        np.random.uniform(0, 30, n),
    ])
    y = (X[:, 0] > 700).astype(int) ^ (X[:, 2] > 0.4).astype(int)

    # Split X and y together in ONE call so rows stay aligned with labels.
    # The original split X and y in two separate train_test_split calls; that
    # only kept rows and labels paired by the accident of a shared
    # random_state, which is fragile and easy to break.
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    model = GradientBoostingClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Per-application SHAP values, translated into a plain-English summary of
    # the top three factors behind each decision.
    explainer = shap.TreeExplainer(model)
    shap_vals = explainer.shap_values(X_test[:3])

    for i, (sv, pred) in enumerate(zip(shap_vals, model.predict(X_test[:3]))):
        decision = 'APPROVED' if pred == 1 else 'DENIED'
        print(f'Application {i+1}: {decision}')
        factors = sorted(zip(feature_names, sv), key=lambda x: abs(x[1]), reverse=True)[:3]
        for feat, val in factors:
            direction = 'increased' if val > 0 else 'decreased'
            print(f'  {feat} {direction} approval chance by {abs(val):.4f}')
        print()
except ImportError:
    print('pip install shap')
🏋️ Practice: Feature Importance Report
Train a GradientBoostingClassifier on the iris dataset, compute permutation importance, and print a ranked table of features with their mean importance Β± std.
Starter Code
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

# Practice exercise: fill in the TODOs to produce a ranked feature report.
data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    data.data, data.target, test_size=0.2, random_state=42)

# TODO: train GradientBoostingClassifier
# TODO: compute permutation_importance on X_test
# TODO: create DataFrame with feature, importance, std
# TODO: sort by importance descending and print
✅ Practice Checklist
10. End-to-End ML Project Structure

Organize a full ML project with proper structure, configuration management, reproducible training scripts, and a deployment-ready layout.

Standard ML project layout
# Conventional ML project tree, printed as onboarding reference: immutable
# raw data separated from processed data, library code in src/, serving in
# api/, and one-command workflows via the Makefile.
PROJECT_STRUCTURE = '''
ml_project/
β”œβ”€β”€ data/
β”‚   β”œβ”€β”€ raw/          # Original, immutable data
β”‚   β”œβ”€β”€ processed/    # Cleaned and feature-engineered data
β”‚   └── external/     # Third-party data
β”œβ”€β”€ notebooks/        # Jupyter notebooks for exploration
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ __init__.py
β”‚   β”œβ”€β”€ data.py       # Data loading and preprocessing
β”‚   β”œβ”€β”€ features.py   # Feature engineering
β”‚   β”œβ”€β”€ train.py      # Model training
β”‚   β”œβ”€β”€ evaluate.py   # Model evaluation
β”‚   └── predict.py    # Inference functions
β”œβ”€β”€ models/           # Saved model artifacts
β”œβ”€β”€ tests/            # Unit and integration tests
β”œβ”€β”€ api/
β”‚   └── app.py        # FastAPI serving
β”œβ”€β”€ config.yaml       # Configuration file
β”œβ”€β”€ requirements.txt
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ Makefile
└── README.md
'''
print(PROJECT_STRUCTURE)
Configuration management with YAML
try:
    import yaml
    from dataclasses import dataclass

    # Example config kept inline as a string; in a real project this lives in
    # config.yaml at the repo root.
    CONFIG_YAML = '''
model:
  type: GradientBoostingClassifier
  params:
    n_estimators: 100
    max_depth: 5
    learning_rate: 0.1
    random_state: 42

data:
  train_path: data/processed/train.csv
  test_path:  data/processed/test.csv
  target_col: label
  test_size:  0.2

training:
  experiment_name: fraud-detection-v2
  cross_val_folds: 5
  metrics: [accuracy, f1_weighted, roc_auc]

serving:
  host: 0.0.0.0
  port: 8000
  model_path: models/best_model.joblib
'''

    # safe_load parses plain data only; never use yaml.load on untrusted input.
    config = yaml.safe_load(CONFIG_YAML)
    print('Model type:', config['model']['type'])
    print('n_estimators:', config['model']['params']['n_estimators'])
    print('Experiment:', config['training']['experiment_name'])
except ImportError:
    print('pip install pyyaml')
Reproducible training script
# src/train.py pattern, stored as a string so the lesson can display it.
# Every hyperparameter is an argparse flag, which makes training runs
# reproducible and scriptable from the Makefile / CI.
TRAIN_SCRIPT = '''
import argparse, joblib, json, time, pathlib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report

def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument('--n-estimators', type=int, default=100)
    p.add_argument('--max-depth',    type=int, default=5)
    p.add_argument('--lr',           type=float, default=0.1)
    p.add_argument('--output-dir',   default='models')
    return p.parse_args()

def main():
    args = parse_args()
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
    model = GradientBoostingClassifier(
        n_estimators=args.n_estimators, max_depth=args.max_depth,
        learning_rate=args.lr, random_state=42)
    model.fit(X_train, y_train)
    preds = model.predict(X_test)
    print(classification_report(y_test, preds))
    out = pathlib.Path(args.output_dir)
    out.mkdir(exist_ok=True)
    joblib.dump(model, out / 'model.joblib')
    print(f'Model saved to {out}/model.joblib')

if __name__ == "__main__":
    main()
'''
print(TRAIN_SCRIPT)
Makefile for ML workflows
# One-command workflows for the project. NOTE: the recipe lines embed explicit
# \t characters because make requires a hard tab (not spaces) before commands.
MAKEFILE = '''
# Makefile
.PHONY: install train test serve docker-build docker-run

install:
\tpip install -r requirements.txt

train:
\tpython src/train.py --n-estimators 100 --max-depth 5

validate:
\tpython src/validate.py --model-path models/model.joblib

test:
\tpytest tests/ -v --cov=src --cov-report=term-missing

serve:
\tuvicorn api.app:app --host 0.0.0.0 --port 8000 --reload

docker-build:
\tdocker build -t ml-api:latest .

docker-run:
\tdocker run -p 8000:8000 ml-api:latest

clean:
\tfind . -name __pycache__ -exec rm -rf {} + 2>/dev/null; true
\trm -f models/*.joblib
'''
print(MAKEFILE)
💼 Real-World Scenario
A team onboarding a new ML engineer needs a fully scaffolded project with training scripts, config files, tests, and a Makefile so they can reproduce results with one command.
Real-World Code
import pathlib, json

def scaffold_ml_project(name: str):
    """Generate a reproducible ML project skeleton."""
    root = pathlib.Path(name)

    # Directory skeleton: data layers, code, artifacts, tests, serving layer.
    for d in ('data/raw', 'data/processed', 'notebooks', 'src', 'models', 'tests', 'api'):
        (root / d).mkdir(parents=True, exist_ok=True)

    # Minimal starter files so the project runs out of the box.
    starter_files = (
        ('src/__init__.py', ''),
        ('tests/__init__.py', ''),
        ('requirements.txt', 'scikit-learn\njoblib\nfastapi\nuvicorn\nmlflow\npyyaml\npytest\n'),
        ('config.yaml', 'model:\n  type: RandomForestClassifier\n  params:\n    n_estimators: 100\n'),
        ('src/train.py', '# Training script\nif __name__ == "__main__":\n    print("Train model here")\n'),
        ('api/app.py', 'from fastapi import FastAPI\napp = FastAPI()\n@app.get("/health")\ndef health():\n    return {"status": "ok"}\n'),
        ('.gitignore', '__pycache__/\n*.joblib\n.env\nmlruns/\n'),
    )
    for rel_path, body in starter_files:
        (root / rel_path).write_text(body)

    # Pretty-print the resulting tree, indenting by directory depth.
    print(f'Project {name!r} scaffolded:')
    for entry in sorted(root.rglob('*')):
        depth = len(entry.relative_to(root).parts) - 1
        suffix = '' if entry.is_file() else '/'
        print(f'{"  " * depth}{entry.name}{suffix}')

scaffold_ml_project('my_ml_project')
🏋️ Practice: Project Scaffolder
Extend the scaffold_ml_project function to also generate a Dockerfile, a Makefile with install/train/test/serve targets, and a src/evaluate.py stub.
Starter Code
import pathlib

def scaffold_ml_project(name: str):
    """Create the project skeleton plus Dockerfile, Makefile, and evaluate stub."""
    root = pathlib.Path(name)
    dirs = ['data/raw', 'data/processed', 'src', 'tests', 'models', 'api']
    for d in dirs:
        (root / d).mkdir(parents=True, exist_ok=True)

    # TODO: add Dockerfile content
    # TODO: add Makefile with install/train/test/serve targets
    # TODO: add src/evaluate.py with a validate() stub
    # TODO: write all files and print the directory tree
    pass

scaffold_ml_project('demo_project')
✅ Practice Checklist
11. Model Compression & Optimization

Reduce model size and latency through quantization, pruning, knowledge distillation, and ONNX export for production deployment.

Post-training quantization concept
import numpy as np

# Simulate 32-bit float weights -> 8-bit integer quantization
np.random.seed(42)
weights_f32 = np.random.randn(64, 64).astype(np.float32)

def quantize_int8(tensor):
    """Affine-quantize a float tensor to uint8.

    Returns (q, scale, zero_point) such that
    dequantize(q, scale, zero_point) approximates `tensor`.
    """
    value_range = tensor.max() - tensor.min()
    # Guard: a constant tensor has zero range; the original divided by zero
    # here. Any positive scale reproduces a constant, so fall back to 1.0.
    scale = value_range / 255 if value_range > 0 else 1.0
    zero_point = int(-tensor.min() / scale)
    q = np.clip(np.round(tensor / scale + zero_point), 0, 255).astype(np.uint8)
    return q, scale, zero_point

def dequantize(q_tensor, scale, zero_point):
    """Invert quantize_int8: map uint8 codes back to approximate float32."""
    return scale * (q_tensor.astype(np.float32) - zero_point)

q, scale, zp = quantize_int8(weights_f32)
recovered = dequantize(q, scale, zp)

# Compare storage cost and reconstruction error of the two representations.
f32_bytes = weights_f32.nbytes
int8_bytes = q.nbytes
err = np.abs(weights_f32 - recovered).mean()

print(f'Original (float32): {f32_bytes:,} bytes')
print(f'Quantized (int8):   {int8_bytes:,} bytes')
print(f'Compression ratio:  {f32_bytes/int8_bytes:.1f}x')
print(f'Mean absolute error: {err:.6f}')
print(f'Relative error:      {err/np.abs(weights_f32).mean():.2%}')
Weight pruning with magnitude threshold
import numpy as np

np.random.seed(0)
weight_matrix = np.random.randn(128, 64)

def prune_weights(weights, sparsity=0.5):
    """Zero out smallest abs-value weights to achieve target sparsity."""
    magnitudes = np.abs(weights)
    # The sparsity-quantile of |w| is the cutoff; weights at or below it drop.
    cutoff = np.percentile(magnitudes.flatten(), sparsity * 100)
    keep = magnitudes > cutoff
    achieved_sparsity = 1 - keep.mean()
    return weights * keep, keep, achieved_sparsity

# Sweep several sparsity targets and report how lossy each one is.
for target in [0.3, 0.5, 0.7, 0.9]:
    pruned, mask, actual = prune_weights(weight_matrix, sparsity=target)
    nnz      = mask.sum()
    orig_err = np.linalg.norm(weight_matrix - pruned, 'fro')
    print(f'Sparsity {target:.0%}: {nnz:,} non-zero, '
          f'Frobenius error={orig_err:.3f}, '
          f'Memory reduction={actual:.1%}')

print('\nNote: sparse storage (CSR/COO) needed to realize memory savings.')
print('Effective with magnitude-based unstructured pruning + sparse matmul.')
Knowledge distillation: teacher -> student
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
split = 1600
X_tr, X_te = X[:split], X[split:]
y_tr, y_te = y[:split], y[split:]

# Teacher: large GBM
teacher = GradientBoostingClassifier(n_estimators=100, max_depth=5, random_state=42)
teacher.fit(X_tr, y_tr)
teacher_acc = accuracy_score(y_te, teacher.predict(X_te))

# Soft labels (temperature scaling)
soft_labels = teacher.predict_proba(X_tr)[:, 1]
# Binarize with threshold 0.5 (soft to hard for sklearn student)
# NOTE(review): binarizing discards the probability information that true
# distillation exploits; sklearn classifiers only accept hard labels.
soft_hard = (soft_labels > 0.5).astype(int)

# Student: shallow decision tree trained on soft labels
student_distill = DecisionTreeClassifier(max_depth=4, random_state=42)
student_distill.fit(X_tr, soft_hard)

# Baseline student trained directly on ground-truth labels for comparison.
student_direct = DecisionTreeClassifier(max_depth=4, random_state=42)
student_direct.fit(X_tr, y_tr)

print(f'Teacher (GBM 100 trees): {teacher_acc:.3f}')
print(f'Student (distilled):     {accuracy_score(y_te, student_distill.predict(X_te)):.3f}')
print(f'Student (direct):        {accuracy_score(y_te, student_direct.predict(X_te)):.3f}')
# Rough size comparison only; actual node counts depend on the fitted trees.
print(f'\nTeacher params: ~{100*5**5:,} nodes est.')
print(f'Student params: ~{2**4:,} leaves')
ONNX export and inference simulation
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
import json

np.random.seed(0)
X, y = make_classification(n_samples=500, n_features=10, random_state=0)
model = LogisticRegression(max_iter=200).fit(X, y)

# Simulate ONNX export by serializing model parameters
def export_to_dict(model, feature_names=None):
    """Serialize the fitted logistic-regression parameters to plain lists."""
    return {
        'type': 'LogisticRegression',
        'coef': model.coef_.tolist(),
        'intercept': model.intercept_.tolist(),
        'classes': model.classes_.tolist(),
        'n_features': model.n_features_in_,
    }

def infer_from_dict(model_dict, X):
    """Re-run inference from exported parameters, framework-free.

    NOTE(review): the sigmoid + 0.5 threshold assumes a binary model with a
    single coefficient row (as in this demo); a multiclass export would need
    softmax/argmax over all rows instead.
    """
    coef      = np.array(model_dict['coef'])
    intercept = np.array(model_dict['intercept'])
    logit = X @ coef.T + intercept
    proba = 1 / (1 + np.exp(-logit))
    return (proba > 0.5).astype(int).flatten()

# Round-trip check: exported inference must match the sklearn predictions.
exported = export_to_dict(model)
print('Exported model (JSON):') ; print(json.dumps({k: str(v)[:40] for k, v in exported.items()}, indent=2))
pred_orig   = model.predict(X[:5])
pred_onnx   = infer_from_dict(exported, X[:5])
print(f'\nOriginal predictions: {pred_orig}')
print(f'Simulated inference:  {pred_onnx}')
print(f'Match: {np.all(pred_orig == pred_onnx)}')
💼 Real-World Scenario
Deploy a fraud detection model to edge devices with 512MB RAM. The original XGBoost model is 80MB and takes 50ms per prediction. Apply pruning and quantization to reduce size to under 10MB with <10ms latency while maintaining >95% of original AUC.
Real-World Code
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
from sklearn.metrics import roc_auc_score
import time

np.random.seed(1)
X, y = make_classification(n_samples=5000, n_features=30, random_state=1)
split = 4000
X_tr, X_te = X[:split], X[split:]
y_tr, y_te = y[:split], y[split:]

# Teacher: the large, accurate model we want to shrink for the edge device.
teacher = GradientBoostingClassifier(n_estimators=100, max_depth=4, random_state=1)
teacher.fit(X_tr, y_tr)
t0 = time.perf_counter()
teacher_proba = teacher.predict_proba(X_te)[:,1]
t_teacher = (time.perf_counter() - t0) * 1000  # whole-batch latency in ms

# Student (distilled): shallow tree fit to the teacher's hard labels.
soft = (teacher.predict_proba(X_tr)[:,1] > 0.5).astype(int)
student = DecisionTreeClassifier(max_depth=6, random_state=1).fit(X_tr, soft)
t0 = time.perf_counter()
student_proba = student.predict_proba(X_te)[:,1]
t_student = (time.perf_counter() - t0) * 1000

print(f'Teacher AUC: {roc_auc_score(y_te, teacher_proba):.4f} | Time: {t_teacher:.1f}ms')
print(f'Student AUC: {roc_auc_score(y_te, student_proba):.4f} | Time: {t_student:.1f}ms')
# max(...) guards against division by a ~zero student timing.
print(f'Speed-up: {t_teacher/max(t_student,0.001):.1f}x')
🏋️ Practice: Benchmark Quantization Trade-offs
Implement int4 quantization (4-bit) and int8 quantization for a random 256x256 weight matrix. For each, compute: compression ratio, mean absolute reconstruction error, and max error. Plot a bar chart comparing compression ratios. Discuss which is better for accuracy-sensitive vs memory-sensitive deployments.
Starter Code
import numpy as np

np.random.seed(42)
W = np.random.randn(256, 256).astype(np.float32)

def quantize(tensor, bits):
    """Quantize `tensor` to `bits`-bit unsigned ints, then dequantize.

    Expected to return (recovered, compression): the caller loop below
    unpacks two values — the dequantized float array and the compression
    ratio (32 / bits for float32 input).
    """
    # TODO: quantize to `bits`-bit unsigned integer
    # levels = 2**bits
    # Compute scale and zero_point, quantize, dequantize
    pass

for bits in [4, 8, 16]:
    recovered, compression = quantize(W, bits)
    # TODO: print bits, compression ratio, MAE, max error
    pass
✅ Practice Checklist
12. Feature Stores & Data Pipelines

Design and implement feature stores, data versioning, and reproducible pipelines for consistent feature serving between training and inference.

In-memory feature store implementation
from datetime import datetime, timezone
from typing import List

import numpy as np
import pandas as pd

class FeatureStore:
    """Minimal in-memory online feature store.

    Keeps, per entity, the latest value and write-timestamp of each feature:
    entity_id -> {feature_name: (value, timestamp)}.
    """

    def __init__(self):
        self._store = {}  # entity_id -> {feature_name: (value, timestamp)}

    def write(self, entity_id: str, features: dict, timestamp=None):
        """Upsert feature values for one entity.

        Args:
            entity_id: entity key.
            features: mapping feature_name -> value.
            timestamp: write time; defaults to the current UTC time.
        """
        if timestamp is None:
            # datetime.utcnow() is deprecated and returns a naive datetime;
            # use an explicitly aware UTC timestamp instead.
            timestamp = datetime.now(timezone.utc)
        row = self._store.setdefault(entity_id, {})
        for name, value in features.items():
            row[name] = (value, timestamp)

    def read(self, entity_id: str, feature_names: List[str]) -> dict:
        """Return {feature_name: latest value}; None for unknown entity/feature."""
        if entity_id not in self._store:
            return {f: None for f in feature_names}
        entity = self._store[entity_id]
        return {f: entity.get(f, (None, None))[0] for f in feature_names}

    def get_training_dataset(self, entities: List[str], features: List[str]) -> pd.DataFrame:
        """Build a training frame with one row per entity (latest values only)."""
        rows = [{'entity_id': eid, **self.read(eid, features)} for eid in entities]
        return pd.DataFrame(rows)

# Usage
fs = FeatureStore()
for uid, feats in [
    ('user_1', {'age': 28, 'spend_30d': 150.0, 'logins_7d': 5}),
    ('user_2', {'age': 35, 'spend_30d': 400.0, 'logins_7d': 12}),
    ('user_3', {'age': 22, 'spend_30d': 50.0,  'logins_7d': 1}),
]:
    fs.write(uid, feats)

df = fs.get_training_dataset(['user_1', 'user_2', 'user_3'], ['age', 'spend_30d', 'logins_7d'])
print(df.to_string(index=False))
print('\nPoint lookup:', fs.read('user_2', ['spend_30d', 'logins_7d']))
Reproducible pipeline with pandas and hashing
import pandas as pd
import numpy as np
import hashlib, json

class ReproduciblePipeline:
    """Ordered DataFrame transforms with a per-run audit trail and a config hash."""

    def __init__(self, steps: list, params: dict):
        self.steps  = steps      # list of (name, callable) pairs
        self.params = params     # shared params handed to every step
        self._history = []       # step-by-step row/column log of the last run

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply each step in order; log rows/columns after every step."""
        self._history = []  # reset so a re-run starts a fresh trail
        current = df
        for step_name, transform in self.steps:
            current = transform(current, self.params)
            self._history.append({'step': step_name,
                                  'rows': len(current),
                                  'cols': list(current.columns)})
        return current

    def fingerprint(self) -> str:
        """12-hex-char MD5 of the step names and params (config only, not data)."""
        config = {'params': self.params, 'steps': [name for name, _ in self.steps]}
        return hashlib.md5(json.dumps(config, sort_keys=True).encode()).hexdigest()[:12]

# Define pipeline steps
def drop_nulls(df, params):
    """Remove every row containing a null value."""
    return df.dropna()

def filter_age(df, params):
    """Keep rows whose 'age' is at least params['min_age']."""
    min_age = params['min_age']
    return df.loc[df['age'] >= min_age]

def add_feature(df, params):
    """Return a copy of `df` with an 'age_squared' column added."""
    out = df.copy()
    out['age_squared'] = out['age'] ** 2
    return out

np.random.seed(42)
df_raw = pd.DataFrame({'age': np.random.randint(16, 70, 100), 'spend': np.random.rand(100)*1000})
df_raw.loc[[5, 23, 67], 'age'] = np.nan

pipeline = ReproduciblePipeline(
    steps=[('drop_nulls', drop_nulls), ('filter_age', filter_age), ('add_feature', add_feature)],
    params={'min_age': 21}
)
result = pipeline.run(df_raw)
print(f'Pipeline fingerprint: {pipeline.fingerprint()}')
print(f'Input rows: {len(df_raw)}, Output rows: {len(result)}')
for step in pipeline._history:
    print(f'  After {step["step"]}: {step["rows"]} rows, {len(step["cols"])} cols')
Data versioning with checksums
import pandas as pd
import numpy as np
import hashlib, json
from datetime import datetime

class DataVersion:
    """Process-wide registry of DataFrame snapshots (content checksum + metadata)."""

    registry = []  # class-level: shared by every snapshot taken in this process

    @classmethod
    def snapshot(cls, df: pd.DataFrame, name: str, metadata: dict = None) -> str:
        """Record checksum/shape/columns for `df` under `name`; return the checksum."""
        digest = hashlib.md5(pd.util.hash_pandas_object(df).values.tobytes()).hexdigest()
        checksum = digest[:12]
        cls.registry.append({
            'name': name,
            'checksum': checksum,
            'shape': df.shape,
            'timestamp': str(datetime.utcnow())[:19],
            'columns': list(df.columns),
            'metadata': metadata or {},
        })
        return checksum

    @classmethod
    def show_lineage(cls):
        """Print one line per recorded snapshot, oldest first."""
        for v in cls.registry:
            print(f"{v['timestamp']} | {v['name']:<20} | {str(v['shape']):<12} | cksum={v['checksum']}")

np.random.seed(7)
df_v1 = pd.DataFrame({'x': np.random.randn(100), 'y': np.random.randint(0,2,100)})
DataVersion.snapshot(df_v1, 'raw_data', {'source': 'sensor_A'})

df_v2 = df_v1.dropna()
DataVersion.snapshot(df_v2, 'cleaned', {'action': 'dropna'})

df_v3 = df_v2.copy(); df_v3['x_scaled'] = (df_v3['x'] - df_v3['x'].mean()) / df_v3['x'].std()
DataVersion.snapshot(df_v3, 'features_v1', {'scaler': 'standard'})

DataVersion.show_lineage()
Train/serve skew detection
import numpy as np
from scipy import stats

# Detect distribution shift between training features and production features
np.random.seed(42)

# Training distribution (reference sample per feature)
train_features = {
    'age':     np.random.normal(35, 10, 1000),
    'income':  np.random.lognormal(10, 0.5, 1000),
    'score':   np.random.beta(2, 5, 1000),
}

# Production (simulated drift)
prod_features = {
    'age':    np.random.normal(35, 10, 500),    # same
    'income': np.random.lognormal(10.3, 0.5, 500),  # shifted
    'score':  np.random.beta(3, 3, 500),          # different shape
}

print('Train/Serve Skew Detection (KS Test):')
print(f'{"Feature":<12} {"KS stat":>10} {"p-value":>10} {"Drifted?":>10}')
print('-' * 48)
# Two-sample Kolmogorov-Smirnov test per feature: a small p-value means the
# production sample is unlikely to come from the training distribution.
for feat in train_features:
    ks_stat, p_val = stats.ks_2samp(train_features[feat], prod_features[feat])
    drifted = 'YES' if p_val < 0.05 else 'no'
    print(f'{feat:<12} {ks_stat:>10.4f} {p_val:>10.4f} {drifted:>10}')
💼 Real-World Scenario
An ML team at a fintech company needs to ensure that the 30+ features used at training time are identical to those served at inference time. Build a feature registry that validates feature schemas, detects train/serve skew, and logs feature versions.
Real-World Code
import pandas as pd
import numpy as np
from scipy import stats

class FeatureRegistry:
    """Schema + distribution registry for train/serve feature validation."""

    def __init__(self):
        self._schemas  = {}   # name -> {'dtype','mean','std','min','max','sample'}
        self._versions = []   # append-only log of registrations

    def register(self, name: str, series: pd.Series):
        """Record dtype, summary stats and a reference sample for `name`."""
        self._schemas[name] = {
            'dtype': str(series.dtype),
            'mean':  float(series.mean()),
            'std':   float(series.std()),
            'min':   float(series.min()),
            'max':   float(series.max()),
            # Reference sample (capped) so validate() can run a two-sample KS test.
            'sample': series.to_numpy()[:5000],
        }
        self._versions.append({'name': name, 'n': len(series)})

    def validate(self, name: str, series: pd.Series, alpha: float = 0.05):
        """Check a serve-time series against the registered schema.

        Args:
            name: registered feature name.
            series: serve-time values.
            alpha: significance level for the KS drift test.

        Returns:
            list of issue strings, or ['OK'] when none were found.

        Raises:
            ValueError: if `name` was never registered.
        """
        if name not in self._schemas:
            raise ValueError(f'Feature {name!r} not registered.')
        ref = self._schemas[name]
        issues = []
        if str(series.dtype) != ref['dtype']:
            issues.append(f'dtype mismatch: {series.dtype} vs {ref["dtype"]}')
        z = abs(series.mean() - ref['mean']) / max(ref['std'], 1e-6)
        if z > 3:
            issues.append(f'mean shift: z={z:.2f}')
        # Bug fix: `alpha` was previously accepted but never used — run the
        # two-sample KS drift test it was meant to gate.
        ks_stat, p_val = stats.ks_2samp(ref['sample'], series.to_numpy())
        if p_val < alpha:
            issues.append(f'distribution drift: KS={ks_stat:.3f}, p={p_val:.4f}')
        return issues or ['OK']

np.random.seed(5)
reg = FeatureRegistry()
for feat, vals in [('age', pd.Series(np.random.normal(35,10,1000))),
                   ('income', pd.Series(np.random.lognormal(10,0.5,1000)))]:
    reg.register(feat, vals)

# Serve time validation
prod_age    = pd.Series(np.random.normal(35, 10, 200))
prod_income = pd.Series(np.random.lognormal(10.8, 0.5, 200))  # drifted
print('age validation:   ', reg.validate('age', prod_age))
print('income validation:', reg.validate('income', prod_income))
🏋️ Practice: Build a Feature Pipeline with Drift Monitoring
Create a FeatureStore class that: (1) stores training feature distributions (mean, std, min, max), (2) at serve time detects distribution drift using Z-score on mean (threshold: |z| > 2), (3) logs a warning for drifted features. Test with 3 features where one has significant drift.
Starter Code
import numpy as np
import pandas as pd

class MonitoredFeatureStore:
    """Practice skeleton: store training stats, then flag drift at serve time."""

    def __init__(self):
        self.train_stats = {}  # feature -> {mean, std, min, max}

    def fit(self, df: pd.DataFrame):
        """Record per-column training statistics used later by validate()."""
        # TODO: compute and store stats for each column
        pass

    def validate(self, df: pd.DataFrame, z_threshold: float = 2.0) -> dict:
        """Z-score each serve-time column mean against the training stats."""
        # TODO: compare serve-time stats to training stats
        # Return dict: {feature: 'OK' or 'DRIFT (z=X.X)'}
        pass

np.random.seed(0)
train = pd.DataFrame({'age': np.random.normal(30,8,1000), 'salary': np.random.normal(60000,10000,1000), 'score': np.random.beta(2,5,1000)})
serve = pd.DataFrame({'age': np.random.normal(30,8,200),  'salary': np.random.normal(75000,10000,200),  'score': np.random.beta(2,5,200)})
store = MonitoredFeatureStore()
store.fit(train)
print(store.validate(serve))
✅ Practice Checklist
13. AutoML & Hyperparameter Optimization

Automate model selection and hyperparameter tuning using grid search, random search, Bayesian optimization (Optuna), and automated feature engineering.

Grid search vs random search comparison
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import f1_score
import time

np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth':    [3, 5, 7, None],
    'min_samples_split': [2, 5, 10],
    'max_features': ['sqrt', 'log2'],
}

# Grid search: exhaustive
t0 = time.perf_counter()
gs = GridSearchCV(RandomForestClassifier(random_state=0), param_grid, cv=3, scoring='f1', n_jobs=-1)
gs.fit(X, y)
grid_time = time.perf_counter() - t0

# Random search: n_iter combinations
t0 = time.perf_counter()
rs = RandomizedSearchCV(RandomForestClassifier(random_state=0), param_grid, n_iter=10, cv=3, scoring='f1', n_jobs=-1, random_state=42)
rs.fit(X, y)
rand_time = time.perf_counter() - t0

print(f'Grid Search:   {gs.best_score_:.4f} | {3*4*3*2} combos | {grid_time:.1f}s')
print(f'Random Search: {rs.best_score_:.4f} | 10 combos  | {rand_time:.1f}s')
print(f'Speed-up: {grid_time/rand_time:.1f}x')
print(f'Best (random): {rs.best_params_}')
Bayesian optimization with Optuna
try:
    import optuna
    import numpy as np
    from sklearn.datasets import make_classification
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import cross_val_score

    optuna.logging.set_verbosity(optuna.logging.WARNING)
    np.random.seed(42)
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

    def objective(trial):
        n_est   = trial.suggest_int('n_estimators', 50, 300)
        depth   = trial.suggest_int('max_depth', 2, 8)
        lr      = trial.suggest_float('learning_rate', 0.01, 0.3, log=True)
        subsamp = trial.suggest_float('subsample', 0.5, 1.0)
        model   = GradientBoostingClassifier(n_estimators=n_est, max_depth=depth, learning_rate=lr, subsample=subsamp, random_state=0)
        scores  = cross_val_score(model, X, y, cv=3, scoring='f1')
        return scores.mean()

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=20)
    print(f'Best F1:     {study.best_value:.4f}')
    print(f'Best params: {study.best_params}')
    print(f'Total trials: {len(study.trials)}')
except ImportError:
    print('pip install optuna')
    print('Optuna uses Tree-structured Parzen Estimator (TPE) to model')
    print('P(x|good) and P(x|bad) and samples from P(x|good) region.')
Automated feature engineering with polynomial features
import numpy as np
from sklearn.datasets import make_regression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

np.random.seed(42)
X, y = make_regression(n_samples=500, n_features=5, noise=10, random_state=42)

best_score, best_degree = -np.inf, 1
results = []
for degree in [1, 2, 3]:
    pipe = Pipeline([
        ('poly', PolynomialFeatures(degree=degree, include_bias=False)),
        ('ridge', Ridge(alpha=1.0))
    ])
    scores = cross_val_score(pipe, X, y, cv=5, scoring='r2')
    results.append((degree, scores.mean(), pipe.named_steps['poly'].fit_transform(X).shape[1]))
    if scores.mean() > best_score:
        best_score, best_degree = scores.mean(), degree

print('Polynomial Feature Engineering Results:')
print(f'{"Degree":<10} {"R2 (CV)":<12} {"Features":<10}')
for deg, r2, n_feat in results:
    star = ' <-- best' if deg == best_degree else ''
    print(f'{deg:<10} {r2:<12.4f} {n_feat:<10}{star}')
print(f'\nOriginal features: {X.shape[1]} -> Best: {results[best_degree-1][2]}')
Early stopping and learning curve analysis
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Train with staged prediction to simulate early stopping
gbm = GradientBoostingClassifier(n_estimators=200, max_depth=3, learning_rate=0.05, random_state=0)
gbm.fit(X_tr, y_tr)

train_scores, val_scores = [], []
for y_pred_tr, y_pred_val in zip(
    gbm.staged_predict(X_tr), gbm.staged_predict(X_val)
):
    train_scores.append(f1_score(y_tr, y_pred_tr))
    val_scores.append(f1_score(y_val, y_pred_val))

best_iter = np.argmax(val_scores)
print(f'Best iteration: {best_iter+1} (val F1={val_scores[best_iter]:.4f})')
print(f'Final iteration val F1: {val_scores[-1]:.4f}')
fig, ax = plt.subplots(figsize=(9, 4))
ax.plot(train_scores, label='Train')
ax.plot(val_scores, label='Validation')
ax.axvline(best_iter, color='red', linestyle='--', label=f'Best iter={best_iter+1}')
ax.set_xlabel('Boosting rounds'); ax.set_ylabel('F1 Score')
ax.set_title('Learning Curves + Early Stopping')
ax.legend(); plt.tight_layout()
plt.savefig('learning_curves.png', dpi=80); plt.close(); print('Saved learning_curves.png')
💼 Real-World Scenario
An ML team wants to find the best model for a churn prediction task without manually tuning. Run Bayesian HPO over 3 model types (Logistic Regression, Random Forest, GBM) with 30 total Optuna trials. Return the best model, params, and cross-val AUC.
Real-World Code
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

np.random.seed(0)
X, y = make_classification(n_samples=2000, n_features=25, random_state=0)

# Manual multi-model search (Optuna-style pseudocode)
best_score, best_config = -np.inf, None
np.random.seed(99)
for trial in range(30):
    model_type = np.random.choice(['lr', 'rf', 'gbm'])
    if model_type == 'lr':
        C   = np.random.choice([0.01, 0.1, 1.0, 10.0])
        clf = LogisticRegression(C=C, max_iter=200)
        cfg = {'type': 'lr', 'C': C}
    elif model_type == 'rf':
        n   = np.random.choice([50, 100, 200])
        d   = np.random.choice([3, 5, None])
        clf = RandomForestClassifier(n_estimators=n, max_depth=d, random_state=0)
        cfg = {'type': 'rf', 'n_estimators': n, 'max_depth': d}
    else:
        lr  = np.random.choice([0.05, 0.1, 0.2])
        clf = GradientBoostingClassifier(learning_rate=lr, random_state=0)
        cfg = {'type': 'gbm', 'lr': lr}
    score = cross_val_score(clf, X, y, cv=3, scoring='roc_auc').mean()
    if score > best_score:
        best_score, best_config = score, cfg

print(f'Best AUC: {best_score:.4f}')
print(f'Best config: {best_config}')
🏋️ Practice: AutoML Model Selection with Optuna
Using make_classification (1000 samples, 15 features), implement a 20-trial Optuna study that searches over: (1) model type (RF or GBM), (2) n_estimators (50-200), (3) max_depth (2-8), (4) learning_rate for GBM (0.01-0.3 log scale). Maximize 3-fold CV ROC-AUC. Report best trial and params.
Starter Code
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

try:
    import optuna; optuna.logging.set_verbosity(optuna.logging.WARNING)
except ImportError:
    print('pip install optuna'); exit()

np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=15, random_state=42)

def objective(trial):
    # TODO: suggest model_type (categorical: 'rf' or 'gbm')
    # TODO: suggest n_estimators, max_depth
    # TODO: if gbm, suggest learning_rate (log scale)
    # TODO: return 3-fold CV ROC-AUC
    pass

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)
print('Best AUC:', study.best_value)
print('Best params:', study.best_params)
✅ Practice Checklist
14. Model Monitoring & Data Drift Detection

Data Drift Detection with Population Stability Index
import numpy as np
def psi(expected, actual, n_bins=10):
    """Population Stability Index between a reference and a current sample.

    Bins are the percentiles of `expected`. Rule of thumb: < 0.1 stable,
    0.1-0.25 moderate shift, > 0.25 severe drift.
    """
    edges = np.percentile(expected, np.linspace(0, 100, n_bins + 1))
    # Widen the outer edges slightly so the min/max of `expected` land in a bin.
    edges[0] -= 1e-6
    edges[-1] += 1e-6
    expected_frac = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_frac = np.histogram(actual, bins=edges)[0] / len(actual)
    # Floor empty bins to avoid log(0) and division by zero.
    expected_frac = np.where(expected_frac == 0, 1e-6, expected_frac)
    actual_frac = np.where(actual_frac == 0, 1e-6, actual_frac)
    return np.sum((actual_frac - expected_frac) * np.log(actual_frac / expected_frac))
np.random.seed(42)
reference = np.random.normal(0, 1, 1000)
stable    = np.random.normal(0.1, 1.0, 500)   # slight shift
drifted   = np.random.normal(1.5, 1.5, 500)   # significant drift
print(f"PSI (stable):  {psi(reference, stable):.4f}  (<0.1: no drift)")
print(f"PSI (drifted): {psi(reference, drifted):.4f}  (>0.25: severe drift)")
Model Performance Degradation Monitoring
import numpy as np
from sklearn.metrics import accuracy_score
np.random.seed(1)
def simulate_batch(shift=0.0, n=200):
    """Accuracy of a frozen linear rule on a batch whose features drift by `shift`.

    The "model" was effectively trained at shift=0 (decision threshold 0),
    while the ground-truth threshold moves with the drift, so accuracy
    decays as `shift` grows.
    """
    features = np.random.randn(n, 5) + shift
    signal = features[:, 0] + features[:, 1]
    y_true = (signal > shift).astype(int)  # labels follow the drifted world
    y_pred = (signal > 0).astype(int)      # model still assumes no drift
    # Fraction of matching labels == accuracy_score for binary arrays.
    return np.mean(y_true == y_pred)
print("Model Monitoring Dashboard")
print(f"{'Week':<6} {'Accuracy':<10} {'Status'}")
print("-" * 30)
baseline_acc = simulate_batch(shift=0.0)
for week in range(1, 9):
    shift = (week - 1) * 0.15
    acc = simulate_batch(shift=shift)
    delta = acc - baseline_acc
    status = "OK" if abs(delta) < 0.05 else ("WARN" if abs(delta) < 0.10 else "ALERT")
    print(f"  {week:<4} {acc:.4f}    {status}")
Kolmogorov-Smirnov Drift Test
import numpy as np
from scipy import stats
np.random.seed(5)
# Simulate feature distributions: train vs production batches
train_dist  = np.random.normal(50, 10, 2000)
prod_no_drift = np.random.normal(50.5, 10.2, 500)
prod_drift    = np.random.normal(58.0, 12.0, 500)
features = {
    "no_drift_batch": prod_no_drift,
    "drifted_batch":  prod_drift,
}
for name, prod in features.items():
    ks_stat, p_val = stats.ks_2samp(train_dist, prod)
    drift = "DRIFT DETECTED" if p_val < 0.05 else "stable"
    print(f"{name:<20}: KS={ks_stat:.4f}, p={p_val:.6f} -> {drift}")
💼 Real-World Scenario
Deployed ML model for loan approval: monitor weekly for data drift in applicant features (income, credit score, age) using PSI, and alert the ML team when any feature shows PSI > 0.25.
Real-World Code
import numpy as np
def psi(ref, curr, n_bins=10):
    """Population Stability Index of `curr` against reference `ref`.

    Bin edges come from the percentiles of `ref`; empty bins are floored
    to 1e-6 so the log term stays finite.
    """
    edges = np.percentile(ref, np.linspace(0, 100, n_bins + 1))
    edges[0] -= 1e-6
    edges[-1] += 1e-6
    ref_frac = np.histogram(ref, bins=edges)[0] / len(ref)
    cur_frac = np.histogram(curr, bins=edges)[0] / len(curr)
    ref_frac = np.where(ref_frac == 0, 1e-6, ref_frac)
    cur_frac = np.where(cur_frac == 0, 1e-6, cur_frac)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))
np.random.seed(99)
# Training distribution
train_income = np.random.lognormal(10.8, 0.4, 5000)
train_credit = np.random.normal(680, 50, 5000)
train_age    = np.random.normal(38, 12, 5000)
print("Loan Model Feature Drift Report")
print(f"{'Feature':<15} {'PSI':<8} {'Status'}")
print("-" * 35)
weeks_drift = [0, 0.05, 0.12, 0.22, 0.35]  # progressive drift scenario
for week, drift in enumerate(weeks_drift, 1):
    prod_income = np.random.lognormal(10.8 + drift, 0.4 + drift*0.1, 500)
    prod_credit = np.random.normal(680 - drift*40, 50, 500)
    prod_age    = np.random.normal(38 + drift*2, 12, 500)
    results = {
        "income": psi(train_income, prod_income),
        "credit_score": psi(train_credit, prod_credit),
        "age": psi(train_age, prod_age),
    }
    max_psi_feat = max(results, key=results.get)
    max_psi = results[max_psi_feat]
    status = "ALERT" if max_psi > 0.25 else ("WARN" if max_psi > 0.10 else "OK")
    print(f"Week {week}: {max_psi_feat:<12} PSI={max_psi:.3f}  [{status}]")
🏋️ Practice: Multi-Feature Drift Dashboard
For a churn prediction model with 6 features (age, tenure, monthly_spend, num_products, is_active, region_encoded), simulate 8 weekly production batches where drift gradually increases in tenure and monthly_spend starting at week 4. Compute PSI and KS-test for each feature each week. Build a summary table and flag weeks where total PSI > 0.5.
Starter Code
import numpy as np
from scipy import stats
def psi(ref, curr, n_bins=10):
    bins = np.percentile(ref, np.linspace(0, 100, n_bins+1))
    bins[0] -= 1e-6; bins[-1] += 1e-6
    e = np.histogram(ref,  bins=bins)[0] / len(ref)
    a = np.histogram(curr, bins=bins)[0] / len(curr)
    e = np.where(e == 0, 1e-6, e)
    a = np.where(a == 0, 1e-6, a)
    return float(np.sum((a - e) * np.log(a / e)))
np.random.seed(7)
n_train = 3000
features = {
    "age":           np.random.normal(35, 10, n_train),
    "tenure":        np.random.exponential(24, n_train),
    "monthly_spend": np.random.lognormal(4.5, 0.5, n_train),
    "num_products":  np.random.poisson(2.5, n_train).astype(float),
    "is_active":     np.random.binomial(1, 0.7, n_train).astype(float),
    "region":        np.random.randint(0, 5, n_train).astype(float),
}
# TODO: Generate 8 weekly batches of 300 samples (drift in tenure+spend from week 4)
# TODO: Compute PSI and KS-test p-value for each feature each week
# TODO: Flag weeks where total PSI > 0.5
# TODO: Print formatted weekly report table
✅ Practice Checklist
15. Feature Stores & Data Engineering Pipelines

Feature Store Simulation with Feast-like API
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# Simulate a simple feature store
class SimpleFeatureStore:
    """Tiny offline feature store keyed by (entity_id, timestamp)."""

    def __init__(self):
        self._store = {}  # (entity_id, timestamp) -> feature dict

    def ingest(self, entity_id, features, timestamp):
        """Store a feature dict for an entity at a given timestamp."""
        self._store[(entity_id, timestamp)] = features

    def get_historical(self, entity_ids, feature_names, as_of=None):
        """Latest feature row per entity, optionally as of a point in time.

        Bug fix: `as_of` was previously accepted but ignored; when given,
        only rows with timestamp <= as_of are considered (point-in-time
        correctness). Entities with no qualifying rows are skipped.
        """
        results = []
        for eid in entity_ids:
            matching = {ts: feats for (e, ts), feats in self._store.items()
                        if e == eid and (as_of is None or ts <= as_of)}
            if not matching:
                continue
            latest_ts = max(matching.keys())
            feats = matching[latest_ts]
            row = {"entity_id": eid, "timestamp": latest_ts}
            row.update({k: feats.get(k) for k in feature_names})
            results.append(row)
        return pd.DataFrame(results)
fs = SimpleFeatureStore()
# Ingest user features
for user_id in range(1, 6):
    fs.ingest(user_id, {"age": np.random.randint(20, 60),
                        "spend_30d": round(np.random.uniform(50, 500), 2),
                        "logins_7d": np.random.randint(1, 30)},
              datetime.now() - timedelta(hours=np.random.randint(1, 48)))
result = fs.get_historical([1, 2, 3], ["age", "spend_30d", "logins_7d"])
print(result.to_string(index=False))
Data Pipeline with pandas + Validation
import pandas as pd
import numpy as np
np.random.seed(0)
# Raw data ingestion with quality checks
def build_training_pipeline(df_raw):
    """Clean raw customer data and return (clean_df, quality_report).

    Checks: columns with >10% nulls, out-of-range ages; then drops null rows
    and filters to plausible age/income values.

    Args:
        df_raw: raw input DataFrame (expected columns: 'age', 'income').

    Returns:
        (df, report): cleaned DataFrame plus a dict with input/output row
        counts, a list of issue strings, and the overall drop rate.
    """
    report = {"input_rows": len(df_raw), "issues": []}
    # Null check: flag any column with more than 10% missing values.
    null_pct = df_raw.isnull().mean()
    for col, pct in null_pct.items():
        if pct > 0.1:
            report["issues"].append(f"{col}: {pct:.1%} nulls")
    # Range validation on age (NaN comparisons are False, so nulls don't count).
    if "age" in df_raw:
        invalid = ((df_raw["age"] < 0) | (df_raw["age"] > 120)).sum()
        if invalid:
            report["issues"].append(f"age: {invalid} out-of-range values")
    df = df_raw.dropna().copy()
    # Bug fix: the original indexed 'age'/'income' unconditionally even though
    # presence of 'age' was checked above — guard both columns.
    if "age" in df:
        df = df[df["age"].between(0, 120)]
    if "income" in df:
        df = df[df["income"] > 0]
    report["output_rows"] = len(df)
    # Bug fix: avoid ZeroDivisionError on an empty input frame.
    report["drop_rate"] = 1 - len(df) / len(df_raw) if len(df_raw) else 0.0
    return df, report
raw = pd.DataFrame({
    "age": np.random.choice([*np.random.randint(18,80,90), *[-1]*5, *[np.nan]*5], 100),
    "income": np.random.choice([*np.random.lognormal(10,0.5,85), *[np.nan]*15], 100),
})
clean, report = build_training_pipeline(raw)
print("Pipeline Report:", report)
Training Data Versioning (DVC-style)
import hashlib, json, os
from datetime import datetime
class DataVersioner:
    """Simple in-memory data versioning inspired by DVC."""

    def __init__(self, registry_path="data_registry.json"):
        # NOTE(review): registry_path is stored but never read or written in
        # this class — presumably a hook for on-disk persistence; confirm.
        self.registry_path = registry_path
        self.registry = {}  # version_id -> entry dict

    def hash_data(self, data_str):
        """12-hex-char content hash of the raw data string."""
        return hashlib.md5(data_str.encode()).hexdigest()[:12]

    def register(self, name, data_str, metadata=None):
        """Register a dataset version; return its content-derived version id."""
        version_id = self.hash_data(data_str)
        self.registry[version_id] = {
            "name": name, "version": version_id,
            "timestamp": datetime.now().isoformat()[:19],
            "size_bytes": len(data_str),
            "metadata": metadata or {}
        }
        print(f"Registered: {name} v{version_id}")
        return version_id

    def lineage(self, version_id):
        """Print the stored entry for a version id (empty dict if unknown)."""
        entry = self.registry.get(version_id, {})
        print(f"Lineage for {version_id}: {json.dumps(entry, indent=2)}")
dv = DataVersioner()
v1 = dv.register("train_features", "col1,col2\n1,2\n3,4\n5,6", {"n_rows": 3, "split": "train"})
v2 = dv.register("train_features", "col1,col2\n1,2\n3,4\n5,6\n7,8", {"n_rows": 4, "split": "train"})
dv.lineage(v1)
💼 Real-World Scenario
Recommendation system: maintain a feature store with user engagement features (clicks_7d, purchase_30d, category_affinity) updated hourly, served to the model at inference time with <10ms latency.
Real-World Code
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# Simulate feature store for recommendation system
class RecoFeatureStore:
    """In-memory user/item feature store for low-latency recommendation serving."""

    def __init__(self):
        self.user_features = {}  # user_id -> feature dict (+ updated_at)
        self.item_features = {}  # item_id -> feature dict (+ updated_at)

    def update_user(self, user_id, features):
        """Overwrite a user's features, stamping the update time."""
        self.user_features[user_id] = {**features, "updated_at": datetime.now().isoformat()}

    def update_item(self, item_id, features):
        """Overwrite an item's features, stamping the update time."""
        self.item_features[item_id] = {**features, "updated_at": datetime.now().isoformat()}

    def get_feature_vector(self, user_id, item_id):
        """Assemble the serving-time feature dict; unknown ids fall back to defaults."""
        user = self.user_features.get(user_id, {})
        item = self.item_features.get(item_id, {})
        affinities = user.get("category_affinity", {})
        return {
            "user_clicks_7d": user.get("clicks_7d", 0),
            "user_purchase_30d": user.get("purchase_30d", 0),
            "item_avg_rating": item.get("avg_rating", 3.0),
            "item_purchase_count": item.get("purchase_count", 0),
            "affinity": affinities.get(item.get("category"), 0.0)
        }
fs = RecoFeatureStore()
# Populate store
for uid in range(1, 6):
    fs.update_user(uid, {
        "clicks_7d": np.random.randint(5, 100),
        "purchase_30d": np.random.randint(0, 10),
        "category_affinity": {"electronics": np.random.uniform(0,1),
                               "books": np.random.uniform(0,1)}
    })
for iid in range(1, 4):
    fs.update_item(iid, {"avg_rating": np.random.uniform(3,5),
                          "purchase_count": np.random.randint(10,1000),
                          "category": np.random.choice(["electronics","books"])})
print("Feature vectors for serving:")
for uid in [1, 2]:
    for iid in [1, 2, 3]:
        fv = fs.get_feature_vector(uid, iid)
        print(f"  user={uid}, item={iid}: {fv}")
🏋️ Practice: ETL Pipeline with Schema Validation
Build a data pipeline that: (1) generates synthetic raw customer data with intentional quality issues (nulls, wrong types, out-of-range values), (2) validates schema using pandas dtype checks and range assertions, (3) cleans and transforms, (4) logs each step with row counts, (5) outputs a data quality report with pass/fail for each check.
Starter Code
import pandas as pd
import numpy as np
np.random.seed(33)
# Generate raw data with issues
n = 500
raw = pd.DataFrame({
    "customer_id": range(n),
    "age": np.random.choice([*np.random.randint(18,80,460), *[-5]*20, *[np.nan]*20], n),
    "revenue": np.random.choice([*np.random.lognormal(6,1,450), *[np.nan]*50], n),
    "segment": np.random.choice(["A","B","C","INVALID",None], n),
    "signup_date": pd.date_range("2020-01-01", periods=n, freq="D")
})
# TODO: Schema validation (dtypes, ranges, allowed values)
# TODO: Cleaning steps (impute/drop nulls, fix dtypes, filter invalid segments)
# TODO: Log each step with before/after row counts
# TODO: Output data quality report DataFrame
✅ Practice Checklist
16. A/B Testing & Canary Deployments for ML

A/B Test Significance Calculator
import numpy as np
from scipy import stats
def ab_test(control_conv, control_n, treat_conv, treat_n, alpha=0.05):
    """Two-proportion z-test for an A/B experiment.

    Args:
        control_conv, control_n: conversions and sample size in control.
        treat_conv, treat_n: conversions and sample size in treatment.
        alpha: significance level (also sets the CI width).

    Returns:
        dict with z statistic, two-sided p-value, relative lift, a
        (1 - alpha) CI on the absolute difference, and a significance flag.
    """
    rate_c = control_conv / control_n
    rate_t = treat_conv / treat_n
    pooled = (control_conv + treat_conv) / (control_n + treat_n)
    se = np.sqrt(pooled * (1 - pooled) * (1 / control_n + 1 / treat_n))
    diff = rate_t - rate_c
    z = diff / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))
    half_width = stats.norm.ppf(1 - alpha / 2) * se
    ci = diff + np.array([-1, 1]) * half_width
    return {"z": z, "p_value": p_value, "lift": diff / rate_c, "ci_95": ci,
            "significant": p_value < alpha}
result = ab_test(control_conv=120, control_n=2000, treat_conv=155, treat_n=2000)
print(f"Conversion: control={120/2000:.3f}, treat={155/2000:.3f}")
print(f"Lift: {result['lift']:+.2%}")
print(f"p-value: {result['p_value']:.4f}")
print(f"95% CI: [{result['ci_95'][0]:.4f}, {result['ci_95'][1]:.4f}]")
print(f"Significant: {result['significant']}")
Canary Deployment Traffic Routing
import numpy as np
np.random.seed(42)

class CanaryRouter:
    """Randomly split traffic between a control model and a canary.

    A fraction `canary_pct` of route() calls is sent to the canary arm;
    per-request latency/error observations are kept in memory and
    summarized by report().
    """
    def __init__(self, canary_pct=0.10):
        self.canary_pct = canary_pct
        self.metrics = {"control": [], "canary": []}

    def route(self):
        """Pick the serving arm for one request: "canary" or "control"."""
        return "canary" if np.random.random() < self.canary_pct else "control"

    def record(self, model, latency_ms, error=False):
        """Store one observation for `model` ("control" or "canary")."""
        self.metrics[model].append({"latency": latency_ms, "error": error})

    def report(self):
        """Print per-arm traffic count, latency stats and error rate.

        Fix: the original raised (np.percentile of an empty list) and
        emitted NaN warnings when an arm had received no traffic yet.
        """
        for model, records in self.metrics.items():
            if not records:  # no traffic routed to this arm yet
                print(f"{model:<8}: n=   0, no traffic recorded")
                continue
            latencies = [r["latency"] for r in records]
            errors = [r["error"] for r in records]
            print(f"{model:<8}: n={len(records):4d}, avg_lat={np.mean(latencies):.1f}ms, "
                  f"p99={np.percentile(latencies,99):.1f}ms, err_rate={np.mean(errors):.3f}")
# Simulate 2,000 requests through a 10% canary split, then summarize.
router = CanaryRouter(canary_pct=0.10)
LAT_MU = {"control": 3.5, "canary": 3.3}
ERR_RATE = {"control": 0.02, "canary": 0.025}
for _ in range(2000):
    arm = router.route()
    latency = np.random.lognormal(LAT_MU[arm], 0.4)
    failed = np.random.random() < ERR_RATE[arm]
    router.record(arm, latency, failed)
router.report()
Multi-Armed Bandit for Continuous Optimization
import numpy as np
np.random.seed(7)
# Thompson Sampling for online model selection
class ThompsonBandit:
    """Thompson Sampling over Bernoulli arms with Beta(1, 1) priors."""
    def __init__(self, n_arms):
        # Posterior Beta parameters, one (alpha, beta) pair per arm.
        self.alpha = np.ones(n_arms)
        self.beta  = np.ones(n_arms)
    def select(self):
        """Sample each arm's posterior once and play the highest draw."""
        draws = np.random.beta(self.alpha, self.beta)
        return np.argmax(draws)
    def update(self, arm, reward):
        """Fold a 0/1 reward into the chosen arm's posterior."""
        self.alpha[arm], self.beta[arm] = (
            self.alpha[arm] + reward,
            self.beta[arm] + (1 - reward),
        )
# Online selection among 3 model versions with hidden conversion rates.
true_rates = [0.05, 0.08, 0.06]
bandit = ThompsonBandit(n_arms=3)
counts = np.zeros(3, dtype=int)
rewards_total = 0
for _ in range(5000):
    chosen = bandit.select()
    hit = int(np.random.random() < true_rates[chosen])
    bandit.update(chosen, hit)
    counts[chosen] += 1
    rewards_total += hit
print(f"Total conversions: {rewards_total} / 5000 = {rewards_total/5000:.4f}")
for i in range(3):
    print(f"  Model {i+1} (true={true_rates[i]:.2f}): selected {counts[i]:4d} times ({counts[i]/5000:.1%})")
💼 Real-World Scenario
ML-powered pricing engine: run A/B tests comparing new pricing model (15% canary) vs current model, with Thompson Sampling auto-routing more traffic to the better performer after statistical significance is reached.
Real-World Code
import numpy as np
from scipy import stats
np.random.seed(21)
# Combined: Canary + statistical testing + bandit
true_revenue_control = 45.0   # $45 avg order value
true_revenue_canary  = 47.5   # $47.5 with new pricing
class PricingExperiment:
    """Canary pricing rollout: pure A/B split first, then bandit routing.

    Revenues are accumulated per arm; alpha/beta hold Beta pseudo-counts
    fed by revenue scaled into [0, 1] via rev/100.
    """
    def __init__(self, canary_pct=0.15):
        self.canary_pct = canary_pct
        self.control_revenues = []
        self.canary_revenues  = []
        self.alpha = np.array([1.0, 1.0])  # Thompson beta params
        self.beta  = np.array([1.0, 1.0])
    def route(self):
        # Start as pure A/B, shift to bandit after significance
        # NOTE(review): the switch actually triggers on a sample count
        # (200 control observations), not on a significance test — confirm intent.
        if len(self.control_revenues) < 200:
            return "canary" if np.random.random() < self.canary_pct else "control"
        return "canary" if np.argmax(np.random.beta(self.alpha, self.beta)) == 1 else "control"
    def observe(self, model):
        """Draw one simulated order value for `model` and update that arm."""
        if model == "control":
            rev = np.random.normal(true_revenue_control, 12)
            self.control_revenues.append(rev)
            # Revenue mapped to a pseudo-Bernoulli reward via rev/100.
            self.alpha[0] += max(rev/100, 0); self.beta[0] += max(1-rev/100, 0)
        else:
            rev = np.random.normal(true_revenue_canary, 12)
            self.canary_revenues.append(rev)
            self.alpha[1] += max(rev/100, 0); self.beta[1] += max(1-rev/100, 0)
    def check_significance(self):
        """Two-sample t-test p-value, or None until both arms have n >= 30."""
        if len(self.control_revenues) < 30 or len(self.canary_revenues) < 30:
            return None
        t, p = stats.ttest_ind(self.canary_revenues, self.control_revenues)
        return p
# Run 3,000 sessions, reporting arm means and the t-test p every 500 steps.
exp = PricingExperiment()
for step in range(1, 3001):
    arm = exp.route()
    exp.observe(arm)
    if step % 500 == 0:
        p = exp.check_significance()
        print(f"t={step}: control=${np.mean(exp.control_revenues):.2f}, "
              f"canary=${np.mean(exp.canary_revenues):.2f}, p={p:.4f}")
🏋️ Practice: Online Experiment Platform
Build a complete A/B testing framework that: (1) assigns users to control/treatment with hash-based deterministic routing, (2) collects metrics (conversion, revenue) per variant, (3) computes p-values and confidence intervals daily, (4) auto-declares a winner when p<0.05 and min_sample=500/arm, (5) logs results to a DataFrame. Simulate 10,000 user sessions over 14 days.
Starter Code
import hashlib

import numpy as np
import pandas as pd
from scipy import stats

np.random.seed(55)
# Experiment settings
TRUE_CONV_CONTROL = 0.04
TRUE_CONV_TREAT   = 0.048
TRUE_REV_CONTROL  = 30.0
TRUE_REV_TREAT    = 31.5
MIN_SAMPLE = 500

def assign(user_id):
    """Deterministic hash-based assignment to "treatment" or "control".

    Bug fix: the original used the builtin hash(), which is salted per
    process for strings (PYTHONHASHSEED), so the "deterministic" split
    changed between runs and machines. md5 gives the same bucket everywhere.
    """
    digest = hashlib.md5(f"exp1_{user_id}".encode("utf-8")).hexdigest()
    return "treatment" if int(digest, 16) % 2 == 0 else "control"

# TODO: Simulate 10,000 users over 14 days (approx 714/day)
# TODO: Track conversion and revenue per variant per day
# TODO: Daily significance check with two-proportion z-test
# TODO: Auto-declare winner when p<0.05 and n>=500/arm
# TODO: Log results to DataFrame with columns: day, n_control, n_treat, p_value, winner
✅ Practice Checklist
17. MLflow Experiment Tracking

MLflow provides a unified platform to log parameters, metrics, and artifacts across ML experiments for reproducibility and comparison.

Basic MLflow Logging
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# Runs are grouped under a named experiment in the tracking store.
mlflow.set_experiment("iris_classification")

with mlflow.start_run(run_name="rf_baseline"):
    n_estimators = 100
    max_depth = 5
    # Params logged up front so a failed run still records its config.
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)

    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)

    acc = accuracy_score(y_test, model.predict(X_test))
    mlflow.log_metric("accuracy", acc)
    # Persist the fitted model as a run artifact under artifact path "model".
    mlflow.sklearn.log_model(model, "model")
    print(f"Run logged. Accuracy: {acc:.4f}")
MLflow Autolog & Compare Runs
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

mlflow.set_experiment("breast_cancer_comparison")

# autolog captures all params/metrics automatically
mlflow.sklearn.autolog()

# Two candidates trained under the same experiment for side-by-side comparison.
models = {
    "random_forest": RandomForestClassifier(n_estimators=50, random_state=42),
    "gradient_boost": GradientBoostingClassifier(n_estimators=50, random_state=42)
}

for name, model in models.items():
    with mlflow.start_run(run_name=name):
        # fit() is intercepted by autolog, which records params and metrics.
        model.fit(X_train, y_train)
        print(f"{name}: {model.score(X_test, y_test):.4f}")

# Query runs
# search_runs returns one pandas row per run.
runs = mlflow.search_runs(experiment_names=["breast_cancer_comparison"])
print(runs[["run_id", "metrics.training_accuracy_score"]].head())
💼 Real-World Scenario
Your team is training multiple ML models with different hyperparameters daily. You need to track experiments, compare results, and reproduce the best model.
Real-World Code
import mlflow, mlflow.sklearn
from sklearn.svm import SVC
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split, cross_val_score
import numpy as np

X, y = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

mlflow.set_experiment("wine_svm_tuning")
mlflow.sklearn.autolog()

# Grid over C and kernel; each combination becomes its own tracked run.
for C in [0.1, 1.0, 10.0]:
    for kernel in ["rbf", "linear"]:
        with mlflow.start_run(run_name=f"svm_C{C}_{kernel}"):
            model = SVC(C=C, kernel=kernel)
            # 5-fold CV on the training split; autolog records the final fit.
            cv_scores = cross_val_score(model, X_train, y_train, cv=5)
            model.fit(X_train, y_train)
            mlflow.log_param("C", C)
            mlflow.log_param("kernel", kernel)
            mlflow.log_metric("cv_mean", np.mean(cv_scores))
            mlflow.log_metric("cv_std", np.std(cv_scores))
            print(f"C={C}, kernel={kernel}: CV={np.mean(cv_scores):.4f}Β±{np.std(cv_scores):.4f}")

# Highest cv_mean across all runs wins.
best = mlflow.search_runs(order_by=["metrics.cv_mean DESC"]).iloc[0]
print(f"Best run: {best['run_id']}, CV={best['metrics.cv_mean']:.4f}")
🏋️ Practice: Log experiment parameters and metrics
Use mlflow.start_run() to log params, metrics, and models for multiple hyperparameter combinations.
Starter Code
import mlflow
# One experiment groups any number of runs.
mlflow.set_experiment("practice")
with mlflow.start_run():
    # Params are inputs (hyperparameters); metrics are outcomes (results).
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_metric("loss", 0.25)
    print("Logged!")
✅ Practice Checklist
18. Data Versioning with DVC

DVC (Data Version Control) tracks datasets and model files alongside Git, enabling reproducible ML pipelines and dataset sharing.

DVC Pipeline Setup
# DVC workflow (run in terminal, shown as subprocess here)
import subprocess, os

# Initialize DVC in a git repo
# subprocess.run(["dvc", "init"])

# Track a large dataset file
# subprocess.run(["dvc", "add", "data/train.csv"])
# This creates data/train.csv.dvc and adds data/train.csv to .gitignore

# Configure remote storage
# subprocess.run(["dvc", "remote", "add", "-d", "myremote", "s3://my-bucket/dvc"])

# Push data to remote
# subprocess.run(["dvc", "push"])

# Pull data on another machine
# subprocess.run(["dvc", "pull"])

# DVC pipeline in dvc.yaml.
# Bug fix: the original built this string from fragments whose "\n" escapes
# had been turned into literal newlines inside the quotes — a SyntaxError.
# Reconstructed with explicit escaped newlines.
pipeline_yaml = (
    "stages:\n"
    "  prepare:\n"
    "    cmd: python prepare.py\n"
    "    deps:\n"
    "      - data/raw.csv\n"
    "    outs:\n"
    "      - data/processed.csv\n"
    "  train:\n"
    "    cmd: python train.py\n"
    "    deps:\n"
    "      - data/processed.csv\n"
    "    outs:\n"
    "      - models/model.pkl\n"
    "    metrics:\n"
    "      - metrics.json\n"
)
print("DVC pipeline structure:")
print(pipeline_yaml)
Python DVC API
# Using DVC Python API for programmatic data management
# pip install dvc dvc-s3

import json, hashlib
from pathlib import Path

# Simulate DVC-style hash tracking
def compute_md5(filepath, chunk_size=1 << 20):
    """Hex md5 of a file, read in chunks so large datasets need not fit in RAM.

    Improvement: the original did f.read() of the whole file at once, which
    defeats the point of tracking large datasets. `chunk_size` defaults to
    1 MiB and is backward-compatible.
    """
    h = hashlib.md5()
    with open(filepath, "rb") as f:
        # iter(callable, sentinel) yields chunks until read() returns b"".
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

# Example: track dataset versions manually
import pandas as pd
import numpy as np

# Create sample datasets: v2 extends v1 with 200 extra rows.
df_v1 = pd.DataFrame(np.random.randn(1000, 5), columns=[f"f{i}" for i in range(5)])
df_v2 = pd.concat([df_v1, pd.DataFrame(np.random.randn(200, 5), columns=df_v1.columns)])

# Save versions
df_v1.to_csv("/tmp/data_v1.csv", index=False)
df_v2.to_csv("/tmp/data_v2.csv", index=False)

# Track metadata: row counts plus content hashes identify each version.
meta = {
    "v1": {"rows": len(df_v1), "md5": compute_md5("/tmp/data_v1.csv")},
    "v2": {"rows": len(df_v2), "md5": compute_md5("/tmp/data_v2.csv")},
}
print(json.dumps(meta, indent=2))
print(f"Row delta v1β†’v2: +{meta['v2']['rows'] - meta['v1']['rows']}")
💼 Real-World Scenario
Your ML team uses the same datasets but different preprocessing steps. You need reproducible pipelines where changing the data version automatically reruns downstream stages.
Real-World Code
# Simulate DVC-style pipeline tracking
import hashlib, json
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def md5(df):
    """Content hash of a DataFrame via pandas' stable per-row hashing."""
    return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()

# Stage 1: Raw data
np.random.seed(42)
raw = pd.DataFrame({"x1": np.random.randn(500), "x2": np.random.randn(500),
                     "y": np.random.randint(0, 2, 500)})

# Stage 2: Preprocessing
scaler = StandardScaler()
X = scaler.fit_transform(raw[["x1", "x2"]])
y = raw["y"]

# Stage 3: Train
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = LogisticRegression()
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))

# Log pipeline metadata
# Hash + row count + metric together pin down one pipeline execution,
# so a changed input hash signals that downstream stages must rerun.
pipeline_state = {
    "raw_hash": md5(raw),
    "n_samples": len(raw),
    "accuracy": round(acc, 4)
}
print(json.dumps(pipeline_state, indent=2))
🏋️ Practice: Create reproducible data pipeline
Track dataset hash, preprocessing steps, and metrics together to ensure pipeline reproducibility.
Starter Code
# Fingerprint a DataFrame: stable per-row hashes folded through md5.
import hashlib, pandas as pd, numpy as np
df = pd.DataFrame({"x": np.random.randn(100), "y": np.random.randint(0,2,100)})
row_hashes = pd.util.hash_pandas_object(df).values
h = hashlib.md5(row_hashes).hexdigest()
print(f"Dataset hash: {h}, rows: {len(df)}")
✅ Practice Checklist
19. Model Registry & Lifecycle Management

A model registry tracks model versions, manages stage transitions (Staging β†’ Production), and provides a central hub for model governance.

MLflow Model Registry
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

mlflow.set_experiment("registry_demo")
model_name = "IrisClassifier"

with mlflow.start_run() as run:
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    acc = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", acc)
    # registered_model_name both logs the artifact and creates/increments
    # a version under that name in the model registry.
    mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)
    run_id = run.info.run_id

client = MlflowClient()
# Get latest version
versions = client.get_latest_versions(model_name)
for v in versions:
    print(f"Version {v.version}: stage={v.current_stage}, acc={acc:.4f}")
    # Transition to staging
    client.transition_model_version_stage(model_name, v.version, "Staging")
    print(f"Transitioned to Staging")
Model Comparison & Promotion
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd

client = MlflowClient()

# Search for best model runs across experiments
def get_best_model(experiment_name, metric="accuracy", higher_is_better=True):
    """Return the top run (as a pandas row) of `experiment_name` by `metric`.

    Prints a short summary of the winner; returns None when the experiment
    has no runs.
    """
    order = f"metrics.{metric} {'DESC' if higher_is_better else 'ASC'}"
    runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=[order],
        max_results=5
    )
    if runs.empty:
        print(f"No runs found for {experiment_name}")
        return None

    best = runs.iloc[0]
    print(f"Best run: {best['run_id'][:8]}...")
    print(f"  {metric}: {best.get(f'metrics.{metric}', 'N/A')}")
    print(f"  params: {dict((k.replace('params.',''), v) for k,v in best.items() if k.startswith('params.'))}")
    return best

# Simulate comparing runs
print("Searching for best model...")
# In practice: get_best_model("iris_classification", "accuracy")

# Mock comparison table
# Illustrates the registry view: one row per candidate with its stage.
comparison = pd.DataFrame({
    "model": ["RandomForest", "GradientBoost", "SVM"],
    "accuracy": [0.9737, 0.9605, 0.9737],
    "train_time_s": [2.1, 4.5, 0.3],
    "stage": ["Production", "Staging", "Archived"]
})
print(comparison.to_string(index=False))
💼 Real-World Scenario
Your company has 5 versions of a fraud detection model. You need to track which version is in production, roll back when performance degrades, and promote new versions through staging.
Real-World Code
import mlflow, mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import numpy as np

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

model_name = "FraudDetector"
mlflow.set_experiment("fraud_detection_registry")

# Train multiple versions
# Each seed picks a different C; registering all three under the same name
# creates successive registry versions (1, 2, 3).
for version_seed in [42, 123, 999]:
    with mlflow.start_run(run_name=f"v_seed{version_seed}"):
        np.random.seed(version_seed)
        model = LogisticRegression(C=np.random.choice([0.1, 1.0, 10.0]), max_iter=1000)
        model.fit(X_train, y_train)
        acc = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", acc)
        mlflow.log_param("seed", version_seed)
        mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)
        print(f"seed={version_seed}: accuracy={acc:.4f}")

# List all versions
client = MlflowClient()
versions = client.search_model_versions(f"name='{model_name}'")
for v in versions:
    print(f"Version {v.version}: {v.current_stage}")
🏋️ Practice: Register and stage a model version
Log a model with mlflow.sklearn.log_model using registered_model_name, then use MlflowClient to transition its stage.
Starter Code
import mlflow, mlflow.sklearn
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
with mlflow.start_run():
    m = DummyClassifier().fit(X, y)
    # registered_model_name auto-creates the next registry version.
    mlflow.sklearn.log_model(m, "model", registered_model_name="DummyModel")
print("Registered!")
✅ Practice Checklist
20. Containerizing ML Models with Docker

Docker packages ML models with all dependencies into portable containers, ensuring consistent environments from development to production.

Dockerfile for ML Model
# Example Dockerfile for a scikit-learn model API
dockerfile_lines = [
    "FROM python:3.10-slim",
    "WORKDIR /app",
    "COPY requirements.txt .",
    "RUN pip install --no-cache-dir -r requirements.txt",
    "COPY model.pkl .",
    "COPY app.py .",
    "EXPOSE 8000",
    "HEALTHCHECK --interval=30s --timeout=10s CMD curl -f http://localhost:8000/health || exit 1",
    'CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]',
]
print("Dockerfile:")
print("\n".join("  " + line for line in dockerfile_lines))

# FastAPI app.py content (as list to avoid nested triple-quotes)
app_lines = [
    "import pickle, numpy as np",
    "from fastapi import FastAPI",
    "from pydantic import BaseModel",
    "app = FastAPI()",
    'model = pickle.load(open("model.pkl", "rb"))',
    "class Features(BaseModel):",
    "    features: list[float]",
    "@app.get('/health')",
    "def health(): return {'status': 'ok'}",
    "@app.post('/predict')",
    "def predict(data: Features):",
    "    pred = model.predict([data.features])",
    "    return {'prediction': int(pred[0])}",
]
print("\napp.py:")
print("\n".join("  " + line for line in app_lines))
Docker Multi-Stage Build & MLflow Docker
# Multi-stage Dockerfile for smaller images
multistage_lines = [
    "FROM python:3.10 AS builder",
    "WORKDIR /build",
    "COPY requirements.txt .",
    "RUN pip install --user --no-cache-dir -r requirements.txt",
    "FROM python:3.10-slim AS production",
    "WORKDIR /app",
    "COPY --from=builder /root/.local /root/.local",
    "COPY . .",
    "ENV PATH=/root/.local/bin:$PATH",
    'CMD ["python", "serve.py"]',
]
print("Multi-stage Dockerfile:")
for line in multistage_lines:
    print("  " + line)

# MLflow built-in Docker packaging
# Bug fix: the curl "-d" line originally placed unescaped double quotes
# inside a double-quoted string literal, which is a SyntaxError.
mlflow_cmds = [
    "# Build Docker image from MLflow model (no Dockerfile needed!)",
    "mlflow models build-docker \\",
    '    --model-uri "models:/MyModel/Production" \\',
    '    --name "my-ml-model:latest"',
    "docker run -p 8080:8080 my-ml-model:latest",
    "curl -X POST http://localhost:8080/invocations \\",
    '    -H "Content-Type: application/json" \\',
    "    -d '{\"instances\": [[5.1, 3.5, 1.4, 0.2]]}'",
]
print("\nMLflow Docker commands:")
for cmd in mlflow_cmds:
    print("  " + cmd)
💼 Real-World Scenario
Your data science team develops models locally but deployment always breaks due to package version mismatches. You need to containerize the model so it runs identically everywhere.
Real-World Code
import pickle, json, os
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Train and save a model
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))

# Save model artifact
model_path = "/tmp/iris_model.pkl"
with open(model_path, "wb") as f:
    pickle.dump(model, f)

# Generate deployment manifest
# Everything a deployer needs in one document: artifact location, feature
# order, class labels, image tag and serving port.
manifest = {
    "model": "RandomForestClassifier",
    "version": "1.0.0",
    "accuracy": round(acc, 4),
    "features": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
    "classes": ["setosa", "versicolor", "virginica"],
    "model_path": model_path,
    "docker_image": "iris-classifier:1.0.0",
    "port": 8000
}
print(json.dumps(manifest, indent=2))

# Simulate container health check
def health_check(model_pkl_path):
    """Reload the pickled model and score one known-good input."""
    with open(model_pkl_path, "rb") as f:
        m = pickle.load(f)
    test_input = np.array([[5.1, 3.5, 1.4, 0.2]])
    pred = m.predict(test_input)
    return {"status": "healthy", "test_prediction": int(pred[0])}

print("Health check:", health_check(model_path))
🏋️ Practice: Create a deployment manifest for a model
Save a trained model and generate a JSON manifest with model metadata and deployment configuration.
Starter Code
import pickle, json
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
m = DummyClassifier().fit(X, y)
# Persist the fitted model, then emit a minimal deployment manifest.
with open("/tmp/model.pkl", "wb") as f: pickle.dump(m, f)
print(json.dumps({"model": "DummyClassifier", "port": 8000}, indent=2))
✅ Practice Checklist
21. REST API Serving with FastAPI

FastAPI provides a high-performance, async-ready framework for building ML model serving APIs with automatic validation and documentation.

Basic FastAPI Model Server
# FastAPI ML serving (run with: uvicorn app:app --reload)
# app.py structure:
fastapi_lines = [
    "from fastapi import FastAPI",
    "from pydantic import BaseModel, validator",
    "from typing import List, Optional",
    "import time, pickle, numpy as np",
    "",
    "app = FastAPI(title='ML Model API', version='1.0.0')",
    "model = None",
    "",
    "@app.on_event('startup')",
    "async def load_model():",
    "    global model",
    "    model = pickle.load(open('model.pkl', 'rb'))",
    "",
    "class PredictionRequest(BaseModel):",
    "    features: List[float]",
    "    model_version: Optional[str] = 'latest'",
    "",
    "@app.post('/predict')",
    "async def predict(request: PredictionRequest):",
    "    start = time.time()",
    "    pred = model.predict([request.features])",
    "    prob = model.predict_proba([request.features])",
    "    latency = (time.time() - start) * 1000",
    "    return {'prediction': int(pred[0]), 'probability': prob[0].tolist(), 'latency_ms': round(latency, 2)}",
    "",
    "@app.get('/health')",
    "async def health(): return {'status': 'ok'}",
]
print("FastAPI app.py:")
print("\n".join("  " + line for line in fastapi_lines[:15]))
print("  ...")
Batch Prediction & Request Validation
# Batch prediction endpoint + middleware
import numpy as np, time

# Simulate batch endpoint logic
def batch_predict(instances):
    """Mock batch scorer: one dummy prediction (0) per input row."""
    X = np.array(instances)
    mock_preds = [0 for _ in range(len(X))]
    return {"predictions": mock_preds, "count": len(mock_preds)}

# Test batch
result = batch_predict([[5.1,3.5,1.4,0.2],[6.7,3.0,5.2,2.3],[4.9,3.0,1.4,0.2]])
print("Batch result:", result)

# Middleware pattern (request logging)
import functools, logging

def log_middleware(func):
    """Decorator: log the wall-clock duration of each wrapped call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.time()
        out = func(*args, **kwargs)
        duration = (time.time() - t0) * 1000
        logging.info(f"{func.__name__} completed in {duration:.2f}ms")
        return out
    return wrapper

@log_middleware
def predict_single(features):
    """Mock single-instance endpoint with a fixed response."""
    return {"prediction": 0, "latency_ms": 0.1}

res = predict_single([5.1, 3.5, 1.4, 0.2])
print("Single prediction:", res)

# FastAPI batch endpoint structure (key lines)
batch_endpoint_code = [
    "class BatchRequest(BaseModel):",
    "    instances: List[List[float]] = Field(..., min_items=1, max_items=1000)",
    "@app.post('/predict/batch')",
    "async def batch_predict(request: BatchRequest):",
    "    X = np.array(request.instances)",
    "    predictions = model.predict(X)",
    "    return {'predictions': predictions.tolist(), 'count': len(predictions)}",
]
print("\nBatch endpoint structure:")
print("\n".join("  " + line for line in batch_endpoint_code))
💼 Real-World Scenario
Your ML model needs to serve predictions via an API that can handle 1000 requests/second with <50ms latency, validate inputs, and return probabilities with predictions.
Real-World Code
# Simulate FastAPI prediction pipeline
import numpy as np
import pickle
import time
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Train model
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42)
model.fit(X_train, y_train)

# Simulate API prediction function
def predict_api(features: list) -> dict:
    """Validate a 4-feature request; return prediction, class probabilities
    and measured latency, or an error dict on bad input length."""
    if len(features) != 4:
        return {"error": f"Expected 4 features, got {len(features)}"}

    start = time.time()
    X = np.array([features])
    pred = model.predict(X)
    prob = model.predict_proba(X)
    latency_ms = (time.time() - start) * 1000

    return {
        "prediction": int(pred[0]),
        "probability": [round(p, 4) for p in prob[0]],
        "latency_ms": round(latency_ms, 3)
    }

# Test single prediction
result = predict_api([5.1, 3.5, 1.4, 0.2])
print("Single prediction:", result)

# Simulate batch prediction
batch = [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3], [4.9, 3.0, 1.4, 0.2]]
start = time.time()
batch_results = [predict_api(f) for f in batch]
total_ms = (time.time() - start) * 1000
print(f"Batch of {len(batch)}: {total_ms:.2f}ms total")
for i, r in enumerate(batch_results):
    print(f"  [{i}] pred={r['prediction']}, prob_max={max(r['probability']):.4f}")
🏋️ Practice: Build a prediction function that validates input
Create a function that checks input length, runs prediction, and returns prediction + probabilities.
Starter Code
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
model = DummyClassifier().fit(X, y)
def predict(features):
    """Length-validate the request, then return the predicted class as int."""
    if len(features) != 4: return {"error": "need 4 features"}
    pred = model.predict([features])
    return {"prediction": int(pred[0])}
print(predict([5.1, 3.5, 1.4, 0.2]))
✅ Practice Checklist
22. Model Monitoring & Drift Detection

Production ML models degrade over time due to data drift and concept drift. Monitoring detects these shifts before they cause silent failures.

Statistical Drift Detection
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.datasets import load_iris

# Load reference (training) data
X, y = load_iris(return_X_y=True)
ref_data = pd.DataFrame(X, columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])

# Simulate production data with drift
np.random.seed(42)
# Normal production (no drift)
prod_normal = ref_data + np.random.normal(0, 0.05, ref_data.shape)

# Drifted production (mean shift)
prod_drift = ref_data + np.random.normal(0.5, 0.2, ref_data.shape)

def detect_drift_ks(reference, current, alpha=0.05):
    """Per-column two-sample KS test; flags drift where p < alpha."""
    # Kolmogorov-Smirnov test for distribution drift per feature
    results = {}
    for col in reference.columns:
        stat, p_val = stats.ks_2samp(reference[col], current[col])
        results[col] = {
            "ks_stat": round(stat, 4),
            "p_value": round(p_val, 4),
            "drift_detected": p_val < alpha
        }
    return results

print("=== No Drift Scenario ===")
for feat, res in detect_drift_ks(ref_data, prod_normal).items():
    flag = "DRIFT" if res["drift_detected"] else "OK"
    print(f"  {feat}: {flag} (p={res['p_value']:.4f})")

print("\n=== Drift Scenario ===")
for feat, res in detect_drift_ks(ref_data, prod_drift).items():
    flag = "DRIFT" if res["drift_detected"] else "OK"
    print(f"  {feat}: {flag} (p={res['p_value']:.4f})")
PSI & Model Performance Monitoring
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

def psi(expected, actual, buckets=10):
    """Population Stability Index between two score samples.

    Rule of thumb: < 0.1 stable, 0.1-0.2 slight change, > 0.2 significant
    drift. (The original's unused `scale_range` helper was dead code and
    has been removed.)
    """
    # Shared binning over the combined range of both samples.
    min_val = min(expected.min(), actual.min())
    max_val = max(expected.max(), actual.max())

    breakpoints = np.linspace(min_val, max_val, buckets + 1)
    # +1e-10 avoids log(0)/division-by-zero for empty buckets.
    expected_perc = np.histogram(expected, bins=breakpoints)[0] / len(expected) + 1e-10
    actual_perc = np.histogram(actual, bins=breakpoints)[0] / len(actual) + 1e-10

    psi_val = np.sum((actual_perc - expected_perc) * np.log(actual_perc / expected_perc))
    return round(psi_val, 4)

# Train model
X, y = load_iris(return_X_y=True)
# Bug fix: the original called train_test_split(X, random_state=42) without
# y, which returns only two arrays and crashes the 4-way unpack below.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)

# Reference predictions (training)
ref_probs = model.predict_proba(X_train)[:, 1]

# Simulated production prediction scores
prod_stable = model.predict_proba(X_test)[:, 1]
prod_drifted = np.clip(prod_stable + np.random.normal(0.3, 0.1, len(prod_stable)), 0, 1)

print(f"PSI (stable):  {psi(ref_probs, prod_stable):.4f}  {'OK' if psi(ref_probs, prod_stable) < 0.1 else 'DRIFT'}")
print(f"PSI (drifted): {psi(ref_probs, prod_drifted):.4f}  {'OK' if psi(ref_probs, prod_drifted) < 0.1 else 'DRIFT'}")
print("\nPSI thresholds: <0.1 = stable, 0.1-0.2 = slight change, >0.2 = significant drift")
💼 Real-World Scenario
Your credit scoring model was accurate at launch but F1 dropped from 0.92 to 0.71 over 6 months. You need to detect data drift early and trigger retraining alerts.
Real-World Code
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

X, y = load_breast_cancer(return_X_y=True)
feat_names = load_breast_cancer().feature_names
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
baseline_f1 = f1_score(y_test, model.predict(X_test))
print(f"Baseline F1: {baseline_f1:.4f}")

# Simulate monthly production windows with increasing drift
months = 6
results = []
for month in range(1, months + 1):
    drift_level = month * 0.1
    X_prod = X_test + np.random.normal(drift_level, 0.05, X_test.shape)

    # KS test on top 3 features
    drift_counts = sum(
        stats.ks_2samp(X_train[:, i], X_prod[:, i])[1] < 0.05
        for i in range(3)
    )

    # If we had labels, compute F1.
    # Bug fix: the original built simulated_errors as
    # (drift_level * randn(n)).astype(bool), which is True for virtually
    # every sample (any nonzero float is truthy), so nearly ALL labels were
    # flipped regardless of drift level. Flip a drift_level-proportional
    # fraction instead, so degradation actually tracks the drift.
    simulated_errors = np.random.random(len(y_test)) < drift_level
    y_labels = np.where(simulated_errors, 1 - y_test, y_test)
    f1 = f1_score(y_labels, model.predict(X_prod))

    results.append({"month": month, "f1": round(f1, 4), "drifted_features": drift_counts})
    # Alert on a >10% F1 drop or when 2+ of the checked features drifted.
    alert = "ALERT" if f1 < baseline_f1 * 0.9 or drift_counts >= 2 else "OK"
    print(f"Month {month}: F1={f1:.4f}, drift_features={drift_counts} [{alert}]")
🏋️ Practice: Implement KS drift detection
Compare training and production data distributions using KS test and flag features with p-value < 0.05.
Starter Code
# Two-sample KS check: reference N(0, 1) vs a mean-shifted production sample.
import numpy as np
from scipy import stats
ref = np.random.normal(0, 1, 1000)
prod = np.random.normal(0.5, 1, 1000)  # drifted
ks_result = stats.ks_2samp(ref, prod)
stat, p = ks_result.statistic, ks_result.pvalue
print(f"KS stat={stat:.4f}, p={p:.4f}, drift={'YES' if p<0.05 else 'NO'}")
✅ Practice Checklist
23. CI/CD for ML Pipelines

CI/CD for ML automates testing, validation, and deployment of models, ensuring quality gates before any model reaches production.

GitHub Actions ML Pipeline
# .github/workflows/ml_pipeline.yml structure
yaml_lines = [
    "name: ML Pipeline CI/CD",
    "on:",
    "  push:",
    "    branches: [main]",
    "jobs:",
    "  test-and-validate:",
    "    runs-on: ubuntu-latest",
    "    steps:",
    "    - uses: actions/checkout@v3",
    "    - name: Set up Python",
    "      uses: actions/setup-python@v4",
    "      with:",
    "        python-version: '3.10'",
    "    - name: Install dependencies",
    "      run: pip install -r requirements.txt",
    "    - name: Run unit tests",
    "      run: pytest tests/ --cov=src",
    "    - name: Train model",
    "      run: python train.py",
    "    - name: Evaluate model (quality gate)",
    "      run: python evaluate.py --threshold 0.90",
    "    - name: Build Docker image",
    "      if: github.ref == 'refs/heads/main'",
    "      run: docker build -t my-model:latest .",
]
# Render the workflow with a two-space indent under a header line.
print(".github/workflows/ml_pipeline.yml:")
print("\n".join("  " + entry for entry in yaml_lines))
Model Quality Gates
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import json, sys

def evaluate_model(model, X_train, X_test, y_train, y_test, thresholds):
    """Fit the model, score it, and check every metric against its CI gate.

    Returns a (metrics, passed) pair; a CI job fails the build when
    `passed` is False.
    """
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    fold_scores = cross_val_score(model, X_train, y_train, cv=5)

    metrics = {}
    metrics["accuracy"] = round(accuracy_score(y_test, predictions), 4)
    metrics["f1_macro"] = round(f1_score(y_test, predictions, average="macro"), 4)
    metrics["cv_mean"] = round(fold_scores.mean(), 4)
    metrics["cv_std"] = round(fold_scores.std(), 4)

    # The gate passes only if every thresholded metric meets its minimum.
    passed = True
    for name, minimum in thresholds.items():
        if metrics.get(name, 0) < minimum:
            passed = False
    return metrics, passed

# Define quality gates
THRESHOLDS = {"accuracy": 0.90, "f1_macro": 0.88, "cv_mean": 0.89}

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)

metrics, passed = evaluate_model(model, X_train, X_test, y_train, y_test, THRESHOLDS)
print(json.dumps(metrics, indent=2))
print(f"\nQuality Gate: {'PASSED βœ“' if passed else 'FAILED βœ—'}")
if not passed:
    failed = {name: minimum for name, minimum in THRESHOLDS.items() if metrics.get(name, 0) < minimum}
    print("Failed thresholds:", failed)
    # sys.exit(1)  # Uncomment in real CI to fail the pipeline
💼 Real-World Scenario
Your team deploys new models manually, causing production incidents when untested code reaches users. You need automated testing, model quality gates, and staged deployment.
Real-World Code
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import f1_score
import json

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

THRESHOLDS = {"accuracy": 0.95, "f1": 0.94, "cv_mean": 0.94}

def ci_pipeline(model, model_name):
    """Train, score, and gate one candidate model; return its metric report."""
    model.fit(X_train, y_train)
    predicted = model.predict(X_test)
    fold_scores = cross_val_score(model, X_train, y_train, cv=5)

    metrics = {
        "model": model_name,
        "accuracy": round(model.score(X_test, y_test), 4),
        "f1": round(f1_score(y_test, predicted), 4),
        "cv_mean": round(fold_scores.mean(), 4),
        "cv_std": round(fold_scores.std(), 4),
    }

    # Collect every gated metric that falls below its threshold.
    failures = []
    for gate in ("accuracy", "f1", "cv_mean"):
        if metrics[gate] < THRESHOLDS[gate]:
            failures.append(gate)
    metrics["status"] = "PASSED" if not failures else f"FAILED: {failures}"
    return metrics

candidates = [
    (RandomForestClassifier(n_estimators=100, random_state=42), "RandomForest"),
    (GradientBoostingClassifier(n_estimators=50, random_state=42), "GradientBoost"),
]
for candidate, label in candidates:
    print(json.dumps(ci_pipeline(candidate, label), indent=2))
🏋️ Practice: Implement a model quality gate
Write a function that evaluates a model against metric thresholds and returns pass/fail.
Starter Code
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# A baseline model plus a single accuracy gate: the CI pattern in miniature.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)
m = DummyClassifier().fit(X_train, y_train)
acc = m.score(X_test, y_test)
gate = "PASSED" if acc >= 0.30 else "FAILED"
print(f"Accuracy={acc:.4f}, gate={gate}")
✅ Practice Checklist
24. Feature Stores & ML Platforms

Feature stores centralize, version, and serve ML features consistently across training and serving, eliminating training-serving skew.

Feature Store with Feast
# Feature store concepts using pandas (Feast-like workflow)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Simulate a feature store
class SimpleFeatureStore:
    """Minimal in-memory feature store: ingest views, batch lookup for
    training, and single-entity lookup for online serving."""

    def __init__(self):
        self._features = {}
        self._entity_df = None

    def ingest(self, feature_view_name, df, entity_col="entity_id"):
        """Register a feature view, indexed by its entity column."""
        indexed = df.set_index(entity_col)
        self._features[feature_view_name] = indexed
        n_rows, n_feats = len(df), len(df.columns) - 1
        print(f"Ingested '{feature_view_name}': {n_rows} rows, {n_feats} features")

    def get_historical_features(self, entity_ids, feature_views):
        """Join the requested views for a batch of entities (training path)."""
        frames = []
        for view in feature_views:
            if view in self._features:
                frames.append(self._features[view].loc[entity_ids])
        return pd.concat(frames, axis=1).reset_index()

    def get_online_features(self, entity_id, feature_views):
        """Low-latency single-entity lookup for serving."""
        row = {"entity_id": entity_id}
        for view in feature_views:
            table = self._features.get(view)
            if table is not None and entity_id in table.index:
                row.update(table.loc[entity_id].to_dict())
        return row

# Setup
store = SimpleFeatureStore()
np.random.seed(42)
n = 1000

# User behavioral features
user_features = pd.DataFrame({
    "entity_id": range(n),
    "avg_spend": np.random.exponential(50, n),
    "days_active": np.random.randint(1, 365, n),
    "n_transactions": np.random.poisson(10, n),
})
store.ingest("user_activity", user_features)

# User demographic features
user_demo = pd.DataFrame({
    "entity_id": range(n),
    "age": np.random.randint(18, 70, n),
    "credit_score": np.random.randint(300, 850, n),
})
store.ingest("user_demographics", user_demo)

# Historical features for training
train_features = store.get_historical_features(
    entity_ids=list(range(100)),
    feature_views=["user_activity", "user_demographics"],
)
print("Training features shape:", train_features.shape)
print(train_features.head(3).to_string())

# Online features for serving
online = store.get_online_features(42, ["user_activity", "user_demographics"])
print("\nOnline features for entity 42:", online)
Training-Serving Skew Prevention
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle

# Anti-pattern: inline preprocessing (causes training-serving skew)
def bad_preprocess(df):
    # Hand-rolled transforms like these get re-implemented (differently!)
    # in the serving stack — the classic source of skew.
    df["feature_ratio"] = df["f1"] / (df["f2"] + 1e-8)
    df["log_f3"] = np.log1p(df["f3"])
    return df[["feature_ratio", "log_f3", "f4"]].values

# Good pattern: encapsulate ALL preprocessing in a sklearn Pipeline
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless feature-engineering step that lives INSIDE the pipeline."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        frame = pd.DataFrame(X, columns=["f1", "f2", "f3", "f4"])
        ratio = frame["f1"] / (frame["f2"] + 1e-8)  # ratio feature
        logged = np.log1p(frame["f3"])              # log transform
        return np.column_stack([ratio, logged, frame["f4"]])

# Generate data
np.random.seed(42)
n = 1000
X = pd.DataFrame(np.abs(np.random.randn(n, 4)), columns=["f1", "f2", "f3", "f4"])
y = (X["f1"] > X["f2"]).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X.values, y, random_state=42)

# Full pipeline: preprocessing + model
full_pipeline = Pipeline([
    ("engineer", FeatureEngineer()),
    ("scaler", StandardScaler()),
    ("model", RandomForestClassifier(n_estimators=50, random_state=42)),
])

full_pipeline.fit(X_train, y_train)
acc = full_pipeline.score(X_test, y_test)
print(f"Pipeline accuracy: {acc:.4f}")

# Save entire pipeline (same logic used in training AND serving)
with open("/tmp/full_pipeline.pkl", "wb") as f:
    pickle.dump(full_pipeline, f)

# Serving: load and predict (ZERO skew risk)
with open("/tmp/full_pipeline.pkl", "rb") as f:
    serving_pipeline = pickle.load(f)

pred = serving_pipeline.predict(X_test[:5])
print("Serving predictions:", pred)
print("Training-serving skew: ELIMINATED (same pipeline object)")
💼 Real-World Scenario
Your churn model uses 50 features computed differently during training (pandas) and serving (SQL). This training-serving skew causes 8% accuracy loss in production.
Real-World Code
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pickle

np.random.seed(42)
n = 2000

# Simulate customer churn features
df = pd.DataFrame({
    "tenure_months": np.random.randint(1, 72, n),
    "monthly_charges": np.random.uniform(20, 120, n),
    "total_charges": np.random.uniform(20, 8000, n),
    "n_support_calls": np.random.poisson(2, n),
    "last_login_days": np.random.exponential(30, n),
})

# Target: churn if (high charges AND short tenure) OR many support calls.
# Note & binds tighter than |, so this evaluates as (A & B) | C.
df["churn"] = (
    (df["monthly_charges"] > 80) &
    (df["tenure_months"] < 12) |
    (df["n_support_calls"] > 4)
).astype(int)

X = df.drop("churn", axis=1).values
y = df["churn"].values

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# Feature engineering + model in one pipeline
def add_features(X):
    """Derive engineered columns; runs identically at train and serve time."""
    base = np.copy(X)
    charge_ratio = base[:, 2] / (base[:, 0] + 1)       # charge_per_month = total / tenure
    recency_support = base[:, 3] * np.log1p(base[:, 4])  # recency_support_interaction
    return np.column_stack([base, charge_ratio, recency_support])

pipeline = Pipeline([
    ("engineer", FunctionTransformer(add_features)),
    ("scaler", StandardScaler()),
    ("model", GradientBoostingClassifier(n_estimators=100, random_state=42)),
])

pipeline.fit(X_train, y_train)
print("Churn model pipeline accuracy:", round(pipeline.score(X_test, y_test), 4))
print(classification_report(y_test, pipeline.predict(X_test)))

# Single-entity serving prediction
customer = np.array([[24, 95.0, 2500.0, 3, 45.0]])
pred = pipeline.predict(customer)
prob = pipeline.predict_proba(customer)
verdict = "CHURN" if pred[0] else "RETAIN"
print(f"Customer churn prediction: {verdict} (prob={prob[0][1]:.4f})")
🏋️ Practice: Build a feature engineering pipeline
Use sklearn Pipeline with FunctionTransformer for feature engineering to prevent training-serving skew.
Starter Code
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.dummy import DummyClassifier

def add_feat(X):
    # Append a ratio column so the same transform runs at train and serve time.
    return np.column_stack([X, X[:, 0] / (X[:, 1] + 1)])

pipe = Pipeline([
    ("eng", FunctionTransformer(add_feat)),
    ("sc", StandardScaler()),
    ("m", DummyClassifier()),
])
X = np.random.randn(100, 3)
y = np.random.randint(0, 2, 100)
pipe.fit(X, y)
print("Pipeline works:", pipe.predict(X[:3]))
✅ Practice Checklist