MLOps & Model Deployment
24 topics • Click any card to expand
Persist trained models to disk for reuse, versioning, and deployment. Covers pickle, joblib, and framework-native formats.
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
model = RandomForestClassifier(n_estimators=50, random_state=42)
model.fit(X, y)
# Save
joblib.dump(model, 'iris_rf.joblib')
print('Model saved.')
# Load and predict
loaded = joblib.load('iris_rf.joblib')
preds = loaded.predict(X[:5])
print('Predictions:', preds)
print('Expected: ', y[:5])

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000))
])
pipe.fit(X, y)
# Save the entire pipeline — preprocessor + model
joblib.dump(pipe, 'cancer_pipeline.joblib')
loaded_pipe = joblib.load('cancer_pipeline.joblib')
print('Pipeline accuracy:', loaded_pipe.score(X, y))
print('Steps:', [name for name, _ in loaded_pipe.steps])

try:
    import torch
    import torch.nn as nn

    class SimpleNet(nn.Module):
        """Minimal 4-in / 3-out linear model to demo state_dict round-tripping."""
        def __init__(self):
            super().__init__()
            self.fc = nn.Linear(4, 3)

        def forward(self, x):
            return self.fc(x)

    model = SimpleNet()
    # Best practice: save state_dict only
    torch.save(model.state_dict(), 'simple_net.pt')
    # Load: rebuild the architecture, then restore the weights into it
    loaded = SimpleNet()
    loaded.load_state_dict(torch.load('simple_net.pt', weights_only=True))
    loaded.eval()
    x = torch.randn(2, 4)
    print('Output:', loaded(x))
except ImportError:
    print('pip install torch')
import joblib, json, hashlib, time
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
model = GradientBoostingClassifier(n_estimators=50, random_state=42)
model.fit(X, y)
# Save model
model_path = 'model_v1.joblib'
joblib.dump(model, model_path)
# Save metadata alongside — the checksum lets consumers detect a corrupted
# or swapped artifact before loading it.
with open(model_path, 'rb') as f:
    checksum = hashlib.md5(f.read()).hexdigest()
metadata = {
    'model_class': type(model).__name__,
    'params': model.get_params(),
    'train_accuracy': model.score(X, y),
    'trained_at': time.strftime('%Y-%m-%dT%H:%M:%S'),
    'checksum': checksum,
}
with open('model_v1_meta.json', 'w') as f:
    json.dump(metadata, f, indent=2)
print(json.dumps(metadata, indent=2))
import joblib, json, time, pathlib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

MODEL_DIR = pathlib.Path('model_registry')
MODEL_DIR.mkdir(exist_ok=True)

def save_model_version(model, version: str, metrics: dict):
    """Persist a model plus a JSON metadata sidecar under MODEL_DIR; return the metadata."""
    model_path = MODEL_DIR / f'model_{version}.joblib'
    meta_path = MODEL_DIR / f'model_{version}_meta.json'
    joblib.dump(model, model_path)
    meta = {'version': version, 'trained_at': time.strftime('%Y-%m-%dT%H:%M:%S'),
            'model_class': type(model).__name__, **metrics}
    meta_path.write_text(json.dumps(meta, indent=2))
    print(f'Saved {model_path}')
    return meta

X, y = make_classification(n_samples=500, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)
meta = save_model_version(model, 'v1.0', {'train_acc': model.score(X, y)})
print(json.dumps(meta, indent=2))
import joblib, json, pathlib
class ModelRegistry:
    """Tiny file-based registry: one .joblib + one _meta.json per version."""

    def __init__(self, registry_dir='registry'):
        self.dir = pathlib.Path(registry_dir)
        self.dir.mkdir(exist_ok=True)

    def save(self, model, version: str, accuracy: float):
        # Save model and metadata json side by side.
        joblib.dump(model, self.dir / f'model_{version}.joblib')
        meta = {'version': version, 'accuracy': accuracy}
        (self.dir / f'model_{version}_meta.json').write_text(json.dumps(meta))
        return meta

    def list_versions(self) -> list:
        # Return list of dicts with version + accuracy, ordered by filename.
        return [json.loads(p.read_text())
                for p in sorted(self.dir.glob('model_*_meta.json'))]

    def load(self, version: str):
        # Load and return model by version string.
        return joblib.load(self.dir / f'model_{version}.joblib')

    def best(self):
        # Return model with highest recorded accuracy (None if registry is empty).
        versions = self.list_versions()
        if not versions:
            return None
        top = max(versions, key=lambda m: m['accuracy'])
        return self.load(top['version'])

# Test it
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
reg = ModelRegistry()
m = LogisticRegression(max_iter=200).fit(X, y)
reg.save(m, 'v1', m.score(X, y))
print(reg.list_versions())
Wrap ML models in REST APIs using FastAPI. Serve predictions, handle requests, add input validation with Pydantic, and test endpoints.
# Save this as app.py and run: uvicorn app:app --reload
# pip install fastapi uvicorn scikit-learn joblib
APP_CODE = '''
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, numpy as np

app = FastAPI(title='Iris Classifier')
model = joblib.load('iris_rf.joblib')  # load saved model

class IrisFeatures(BaseModel):
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float

@app.post('/predict')
def predict(features: IrisFeatures):
    X = np.array([[features.sepal_length, features.sepal_width,
                   features.petal_length, features.petal_width]])
    pred = model.predict(X)[0]
    proba = model.predict_proba(X).max()
    return {'prediction': int(pred), 'confidence': float(proba)}
'''
print(APP_CODE)

BATCH_API = '''
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import numpy as np, joblib

app = FastAPI()
model = joblib.load('iris_rf.joblib')

class SingleSample(BaseModel):
    features: List[float]

class BatchRequest(BaseModel):
    samples: List[SingleSample]

@app.post('/predict/batch')
def batch_predict(batch: BatchRequest):
    X = np.array([s.features for s in batch.samples])
    preds = model.predict(X).tolist()
    probas = model.predict_proba(X).max(axis=1).tolist()
    return [
        {'prediction': p, 'confidence': round(c, 4)}
        for p, c in zip(preds, probas)
    ]
'''
print(BATCH_API)

# Run your FastAPI server first, then test it:
try:
    import requests
    BASE_URL = 'http://localhost:8000'
    # Single prediction
    payload = {
        'sepal_length': 5.1, 'sepal_width': 3.5,
        'petal_length': 1.4, 'petal_width': 0.2
    }
    response = requests.post(f'{BASE_URL}/predict', json=payload, timeout=5)
    print('Status:', response.status_code)
    print('Response:', response.json())
    # Health check
    health = requests.get(f'{BASE_URL}/health', timeout=5)
    print('Health:', health.json())
except Exception as e:
    print(f'Server not running: {e}')
    print('Start with: uvicorn app:app --reload')

ADVANCED_API = '''
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib, numpy as np, time, pathlib

app = FastAPI()
MODEL_PATH = 'iris_rf.joblib'
state = {'model': None, 'loaded_at': None}

def load_model():
    state['model'] = joblib.load(MODEL_PATH)
    state['loaded_at'] = time.strftime("%Y-%m-%dT%H:%M:%S")

@app.on_event('startup')
def startup():
    load_model()

@app.post('/reload')
def reload_model():
    load_model()
    return {'status': 'reloaded', 'at': state['loaded_at']}

@app.get('/health')
def health():
    return {'status': 'ok', 'model_loaded': state['model'] is not None,
            'loaded_at': state['loaded_at']}

@app.post('/predict')
def predict(x: list):
    if state['model'] is None:
        raise HTTPException(503, 'Model not loaded')
    return {'prediction': int(state['model'].predict([x])[0])}
'''
print(ADVANCED_API)

# Churn prediction API skeleton
CHURN_API = '''
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import Optional
import joblib, numpy as np

app = FastAPI(title='Churn Prediction API', version='1.0')
model = joblib.load('churn_model.joblib')

class CustomerFeatures(BaseModel):
    tenure_months: int = Field(..., ge=0, le=120)
    monthly_charges: float = Field(..., ge=0)
    total_charges: float = Field(..., ge=0)
    num_products: int = Field(..., ge=1, le=10)
    has_support_calls: bool

@app.post('/predict/churn')
def predict_churn(customer: CustomerFeatures):
    X = np.array([[
        customer.tenure_months, customer.monthly_charges,
        customer.total_charges, customer.num_products,
        int(customer.has_support_calls)
    ]])
    churn_prob = float(model.predict_proba(X)[0, 1])
    return {
        'churn_probability': round(churn_prob, 4),
        'risk_level': 'HIGH' if churn_prob > 0.7 else 'MEDIUM' if churn_prob > 0.4 else 'LOW'
    }
'''
print(CHURN_API)

# Save as sentiment_api.py
# Run: uvicorn sentiment_api:app --reload
SENTIMENT_API = '''
from fastapi import FastAPI
from pydantic import BaseModel
import nltk
nltk.download('vader_lexicon', quiet=True)
from nltk.sentiment.vader import SentimentIntensityAnalyzer

app = FastAPI()
# TODO: create SentimentIntensityAnalyzer instance

class TextRequest(BaseModel):
    text: str

# TODO: POST /analyze -> return label + compound score
# TODO: GET /health -> return {'status': 'ok'}
'''
print(SENTIMENT_API)
Track experiments, log parameters and metrics, compare runs, and register models for deployment using MLflow.
try:
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
mlflow.set_experiment('iris-classification')
with mlflow.start_run(run_name='random-forest-v1'):
params = {'n_estimators': 100, 'max_depth': 5, 'random_state': 42}
mlflow.log_params(params)
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
preds = model.predict(X_test)
mlflow.log_metric('accuracy', accuracy_score(y_test, preds))
mlflow.log_metric('f1', f1_score(y_test, preds, average='weighted'))
mlflow.sklearn.log_model(model, 'model')
print('Run logged. View: mlflow ui')
except ImportError:
print('pip install mlflow')try:
import mlflow
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
mlflow.set_experiment('iris-comparison')
models = [
('LogisticRegression', LogisticRegression(max_iter=200)),
('RandomForest', RandomForestClassifier(n_estimators=50, random_state=42)),
('GradientBoosting', GradientBoostingClassifier(n_estimators=50, random_state=42)),
]
for name, clf in models:
with mlflow.start_run(run_name=name):
clf.fit(X_train, y_train)
acc = accuracy_score(y_test, clf.predict(X_test))
mlflow.log_metric('accuracy', acc)
mlflow.log_param('model_type', name)
print(f'{name}: {acc:.4f}')
except ImportError:
print('pip install mlflow')try:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
mlflow.sklearn.autolog() # auto-logs params, metrics, model
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
with mlflow.start_run(run_name='autolog-demo'):
model = GradientBoostingClassifier(n_estimators=100)
model.fit(X_train, y_train)
print('Autolog complete β everything captured automatically!')
print('Accuracy:', model.score(X_test, y_test))
except ImportError:
print('pip install mlflow')try:
import mlflow
# After running an experiment, load the best run's model:
# Option 1: by run_id
# model = mlflow.sklearn.load_model('runs:/<run_id>/model')
# Option 2: from Model Registry (after registering)
# model = mlflow.sklearn.load_model('models:/IrisClassifier/Production')
# Option 3: query best run programmatically
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name('iris-comparison')
if experiment:
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=['metrics.accuracy DESC'],
max_results=1
)
if runs:
best = runs[0]
print(f'Best run: {best.info.run_id}')
print(f'Accuracy: {best.data.metrics["accuracy"]:.4f}')
# model = mlflow.sklearn.load_model(f'runs:/{best.info.run_id}/model')
else:
print('No runs found β run the comparison example first.')
else:
print('Experiment not found β run the comparison example first.')
except ImportError:
print('pip install mlflow')try:
import mlflow
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import itertools
X, y = make_classification(n_samples=1000, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment('credit-scoring-sweep')
param_grid = list(itertools.product([50, 100], [3, 5], [0.05, 0.1]))
best_auc, best_run_id = 0, None
for n_est, depth, lr in param_grid:
with mlflow.start_run():
model = GradientBoostingClassifier(n_estimators=n_est, max_depth=depth, learning_rate=lr)
model.fit(X_train, y_train)
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
mlflow.log_params({'n_estimators': n_est, 'max_depth': depth, 'learning_rate': lr})
mlflow.log_metric('roc_auc', auc)
mlflow.sklearn.log_model(model, 'model')
if auc > best_auc:
best_auc = auc
best_run_id = mlflow.active_run().info.run_id
print(f'Best AUC: {best_auc:.4f} | Run: {best_run_id}')
except ImportError:
print('pip install mlflow scikit-learn')try:
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment('iris-sweep')
# TODO: loop over n_estimators and max_depth
# TODO: log params and accuracy in each run
# TODO: track best_accuracy and best_run_id
# TODO: print winner
except ImportError:
    print('pip install mlflow')
Package ML models and APIs in Docker containers for reproducible, portable deployment. Learn Dockerfiles, image building, and running containers.
DOCKERFILE = '''
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app.py .
COPY iris_rf.joblib .
# Expose port
EXPOSE 8000
# Start server
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
REQUIREMENTS = '''
fastapi==0.111.0
uvicorn==0.30.0
scikit-learn==1.5.0
joblib==1.4.2
numpy==1.26.4
'''
print('Dockerfile:')
print(DOCKERFILE)
print('requirements.txt:')
print(REQUIREMENTS)

# Build and run Docker commands (run these in your terminal)
COMMANDS = '''
# Build image
docker build -t iris-api:v1 .
# Run container
docker run -d -p 8000:8000 --name iris-api iris-api:v1
# Test it
curl -X POST http://localhost:8000/predict \
  -H 'Content-Type: application/json' \
  -d '{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}'
# View logs
docker logs iris-api
# Stop and remove
docker stop iris-api && docker rm iris-api
# Push to Docker Hub
docker tag iris-api:v1 yourusername/iris-api:v1
docker push yourusername/iris-api:v1
'''
print(COMMANDS)

MULTISTAGE_DOCKERFILE = '''
# Stage 1: build dependencies
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: lean runtime image
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY app.py iris_rf.joblib ./
# Non-root user for security
RUN useradd -m appuser
USER appuser
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print(MULTISTAGE_DOCKERFILE)

COMPOSE_YAML = '''
# docker-compose.yml
version: "3.9"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/models/iris_rf.joblib
    volumes:
      - ./models:/models:ro
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0
    volumes:
      - mlflow_data:/mlflow
volumes:
  mlflow_data:
'''
print(COMPOSE_YAML)

import pathlib
# Generate project structure for a dockerized ML API
files = {
    'Dockerfile': '''
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
''',
    'requirements.txt': 'fastapi\nuvicorn\nscikit-learn\njoblib\nnumpy\n',
    '.dockerignore': '__pycache__\n*.pyc\n.git\n*.ipynb\n',
    'app.py': '''
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, numpy as np, os

app = FastAPI()
model = joblib.load(os.getenv('MODEL_PATH', 'model.joblib'))

class Request(BaseModel):
    features: list

@app.post('/predict')
def predict(req: Request):
    return {'prediction': int(model.predict([req.features])[0])}

@app.get('/health')
def health():
    return {'status': 'ok'}
''',
}
proj = pathlib.Path('ml_docker_project')
proj.mkdir(exist_ok=True)
for fname, content in files.items():
    (proj / fname).write_text(content.strip())
    print(f'Created: {proj / fname}')

def generate_dockerfile(packages: list, python_version: str = '3.11', entrypoint: str = 'python app.py') -> str:
    """Render a minimal Dockerfile: FROM, WORKDIR, COPY requirements.txt, pip install, COPY ., CMD."""
    install = ('RUN pip install --no-cache-dir ' + ' '.join(packages)) if packages else '# no dependencies'
    cmd = 'CMD ["' + '", "'.join(entrypoint.split()) + '"]'
    return '\n'.join([
        f'FROM python:{python_version}-slim',
        'WORKDIR /app',
        'COPY requirements.txt .',
        install,
        'COPY . .',
        cmd,
    ])

dockerfile = generate_dockerfile(
    packages=['fastapi', 'uvicorn', 'scikit-learn', 'joblib'],
    python_version='3.11',
    entrypoint='uvicorn app:app --host 0.0.0.0 --port 8000'
)
print(dockerfile)
Monitor deployed models for performance degradation and data drift. Detect distribution shifts between training and production data.
import numpy as np
# NOTE(review): matplotlib was imported here but never used — removed.
# Simulate production predictions over time
np.random.seed(42)
weeks = 8
# Week 1-4: stable; Week 5-8: drift
pred_probs = [
    np.random.beta(2, 5, 1000) for _ in range(4)  # stable
] + [
    np.random.beta(5, 2, 1000) for _ in range(4)  # drifted
]
mean_preds = [p.mean() for p in pred_probs]
print('Weekly mean prediction probabilities:')
for w, m in enumerate(mean_preds, 1):
    drift_flag = ' *** DRIFT DETECTED ***' if m > 0.5 else ''
    print(f' Week {w}: {m:.3f}{drift_flag}')
from scipy import stats
import numpy as np
# Training data distribution
np.random.seed(42)
train_data = {
    'age': np.random.normal(35, 10, 1000),
    'income': np.random.lognormal(10, 0.5, 1000),
}
# Production data (age drifted, income stable)
prod_data = {
    'age': np.random.normal(45, 12, 500),  # shifted mean
    'income': np.random.lognormal(10, 0.5, 500),  # same
}
print('Kolmogorov-Smirnov Drift Test:')
# Two-sample KS: small p-value => the two samples come from different distributions.
for feature in train_data:
    ks_stat, p_value = stats.ks_2samp(train_data[feature], prod_data[feature])
    drift = 'DRIFT' if p_value < 0.05 else 'OK'
    print(f' {feature}: KS={ks_stat:.3f}, p={p_value:.4f} -> {drift}')
import numpy as np
def psi(expected, actual, bins=10):
    """Population Stability Index. PSI < 0.1 = stable, 0.1-0.25 = slight shift, > 0.25 = major shift."""
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    breakpoints = np.unique(breakpoints)
    exp_counts = np.histogram(expected, bins=breakpoints)[0]
    act_counts = np.histogram(actual, bins=breakpoints)[0]
    # Avoid division by zero (and log(0)) for empty buckets
    exp_pct = np.where(exp_counts == 0, 0.0001, exp_counts / len(expected))
    act_pct = np.where(act_counts == 0, 0.0001, act_counts / len(actual))
    return np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))

np.random.seed(42)
train_scores = np.random.beta(2, 5, 1000)
stable_prod = np.random.beta(2, 5, 500)   # same distribution
drifted_prod = np.random.beta(5, 2, 500)  # different distribution
print(f'PSI stable: {psi(train_scores, stable_prod):.4f}')    # expect < 0.1
print(f'PSI drifted: {psi(train_scores, drifted_prod):.4f}')  # expect > 0.25
import numpy as np
from collections import deque

class ModelMonitor:
    """Sliding-window accuracy monitor that records alerts on drops vs. a baseline."""

    def __init__(self, window=100, threshold=0.05):
        self.window = window        # number of recent predictions to average over
        self.threshold = threshold  # maximum tolerated accuracy drop
        self.baseline_acc = None    # set via set_baseline() before comparisons fire
        self.recent = deque(maxlen=window)
        self.alerts = []

    def set_baseline(self, accuracy):
        self.baseline_acc = accuracy

    def log_prediction(self, pred, true_label):
        self.recent.append(int(pred == true_label))
        # Only evaluate once the window is full AND a baseline exists
        # (original crashed with TypeError if no baseline had been set).
        if len(self.recent) == self.window and self.baseline_acc is not None:
            current_acc = sum(self.recent) / self.window
            drop = self.baseline_acc - current_acc
            if drop > self.threshold:
                alert = f'ALERT: accuracy dropped {drop:.1%} (current={current_acc:.3f})'
                self.alerts.append(alert)
                print(alert)

np.random.seed(42)
monitor = ModelMonitor(window=50, threshold=0.05)
monitor.set_baseline(0.95)
# Good predictions, then degradation
for _ in range(100):
    monitor.log_prediction(1, 1 if np.random.rand() > 0.05 else 0)
for _ in range(100):
    monitor.log_prediction(1, 1 if np.random.rand() > 0.20 else 0)  # degraded
import numpy as np
from scipy import stats
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class DriftReport:
feature: str
ks_stat: float
p_value: float
drifted: bool
class MLMonitor:
def __init__(self, reference_data: Dict[str, np.ndarray], alpha: float = 0.05):
self.reference = reference_data
self.alpha = alpha
def check_drift(self, current_data: Dict[str, np.ndarray]) -> List[DriftReport]:
reports = []
for feature, ref_vals in self.reference.items():
if feature not in current_data:
continue
ks, p = stats.ks_2samp(ref_vals, current_data[feature])
reports.append(DriftReport(feature, round(ks, 4), round(p, 4), p < self.alpha))
return reports
np.random.seed(42)
reference = {'credit_score': np.random.normal(680, 80, 1000),
'income': np.random.lognormal(11, 0.4, 1000)}
current = {'credit_score': np.random.normal(630, 90, 300), # drifted
'income': np.random.lognormal(11, 0.4, 300)} # stable
monitor = MLMonitor(reference)
for report in monitor.check_drift(current):
status = 'DRIFT' if report.drifted else 'OK'
print(f'{report.feature}: KS={report.ks_stat}, p={report.p_value} -> {status}')import numpy as np
def psi(expected, actual, bins=10):
    """Population Stability Index between an expected and an actual sample."""
    breakpoints = np.unique(np.percentile(expected, np.linspace(0, 100, bins + 1)))
    exp_c = np.histogram(expected, bins=breakpoints)[0]
    act_c = np.histogram(actual, bins=breakpoints)[0]
    # Floor empty buckets so the log-ratio stays finite.
    e = np.where(exp_c == 0, 1e-4, exp_c / len(expected))
    a = np.where(act_c == 0, 1e-4, act_c / len(actual))
    return float(np.sum((a - e) * np.log(a / e)))

np.random.seed(42)
train = {'age': np.random.normal(35, 10, 1000),
         'income': np.random.lognormal(10, 0.5, 1000),
         'score': np.random.beta(2, 5, 1000)}
prod = {'age': np.random.normal(45, 12, 500),   # drifted
        'income': np.random.lognormal(10, 0.5, 500),
        'score': np.random.beta(5, 2, 500)}     # drifted
# TODO: compute PSI for each feature
# TODO: print table: feature | PSI | stability label
Automate model training, testing, and deployment with CI/CD. Learn to write GitHub Actions workflows and automated model validation.
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
import numpy as np, sys

ACCURACY_THRESHOLD = 0.95
F1_THRESHOLD = 0.94

def validate_model(model, X, y):
    """Return True if model passes all quality gates."""
    acc_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
    f1_scores = cross_val_score(model, X, y, cv=5, scoring='f1_weighted')
    print(f'Accuracy: {acc_scores.mean():.4f} ± {acc_scores.std():.4f}')
    print(f'F1: {f1_scores.mean():.4f} ± {f1_scores.std():.4f}')
    passed = acc_scores.mean() >= ACCURACY_THRESHOLD and f1_scores.mean() >= F1_THRESHOLD
    print('PASSED' if passed else 'FAILED')
    return passed

X, y = load_iris(return_X_y=True)
model = RandomForestClassifier(n_estimators=100, random_state=42)
if not validate_model(model, X, y):
    sys.exit(1)  # Fail CI pipeline

GITHUB_ACTIONS_YAML = '''
# .github/workflows/ml_pipeline.yml
name: ML Pipeline
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  train-and-validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        run: python train.py
      - name: Validate model
        run: python validate.py
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: model.joblib
'''
print(GITHUB_ACTIONS_YAML)
# tests/test_model.py (run with: pytest tests/)
import pytest, numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

TEST_CODE = '''
import pytest, numpy as np, joblib
from sklearn.datasets import load_iris

@pytest.fixture
def model():
    return joblib.load('iris_rf.joblib')

@pytest.fixture
def iris_data():
    X, y = load_iris(return_X_y=True)
    return X, y

def test_model_accuracy(model, iris_data):
    X, y = iris_data
    assert model.score(X, y) >= 0.95

def test_model_output_shape(model, iris_data):
    X, _ = iris_data
    preds = model.predict(X[:10])
    assert preds.shape == (10,)

def test_model_classes(model):
    assert len(model.classes_) == 3

def test_prediction_valid_class(model, iris_data):
    X, _ = iris_data
    preds = model.predict(X)
    assert set(preds).issubset({0, 1, 2})
'''
print(TEST_CODE)
import joblib
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

def evaluate(model, X_test, y_test):
    """Score a fitted model on the held-out split."""
    preds = model.predict(X_test)
    return {'accuracy': accuracy_score(y_test, preds),
            'f1': f1_score(y_test, preds, average='weighted')}

# Champion: currently deployed model
champion = RandomForestClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)
# Challenger: new candidate
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)
champ_metrics = evaluate(champion, X_test, y_test)
chall_metrics = evaluate(challenger, X_test, y_test)
print(f'Champion: {champ_metrics}')
print(f'Challenger: {chall_metrics}')
promote = chall_metrics['f1'] > champ_metrics['f1']
print(f'Promote challenger: {promote}')
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report
import sys, joblib

F1_GATE = 0.90

def train(X_train, y_train):
    """Fit the candidate model on the training split."""
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    return model

def gate_check(model, X_test, y_test):
    """Exit non-zero (failing CI) unless weighted F1 clears F1_GATE."""
    preds = model.predict(X_test)
    f1 = f1_score(y_test, preds, average='weighted')
    print(classification_report(y_test, preds))
    if f1 < F1_GATE:
        print(f'FAILED: F1={f1:.4f} < threshold {F1_GATE}')
        sys.exit(1)
    print(f'PASSED: F1={f1:.4f}')
    return model

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = train(X_train, y_train)
validated_model = gate_check(model, X_test, y_test)
joblib.dump(validated_model, 'validated_model.joblib')
print('Model saved — ready for deployment.')
import joblib, sys
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score

ACCURACY_GATE = 0.92
PRECISION_GATE = 0.90

def validate(model_path: str):
    """Load a saved model and hard-fail (exit 1) if it misses the quality gates."""
    model = joblib.load(model_path)
    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    preds = model.predict(X_test)
    acc = accuracy_score(y_test, preds)
    prec = precision_score(y_test, preds, average='weighted')
    print(f'accuracy={acc:.4f} (gate {ACCURACY_GATE})')
    print(f'precision={prec:.4f} (gate {PRECISION_GATE})')
    if acc < ACCURACY_GATE or prec < PRECISION_GATE:
        print('FAILED quality gates')
        sys.exit(1)
    print('PASSED quality gates')
validate('iris_rf.joblib')
Build reproducible feature engineering pipelines. Use sklearn Pipeline, ColumnTransformer, and custom transformers for production-ready preprocessing.
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

df = pd.DataFrame({
    'age': [25, np.nan, 35, 42, 28],
    'income': [50000, 75000, np.nan, 90000, 62000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'gender': ['M', 'F', 'F', 'M', np.nan],
})
num_features = ['age', 'income']
cat_features = ['city', 'gender']
# Numeric: median-impute then standardize.
num_pipe = Pipeline([('impute', SimpleImputer(strategy='median')),
                     ('scale', StandardScaler())])
# Categorical: mode-impute then one-hot encode (unknown categories ignored).
cat_pipe = Pipeline([('impute', SimpleImputer(strategy='most_frequent')),
                     ('encode', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])
preprocessor = ColumnTransformer([
    ('num', num_pipe, num_features),
    ('cat', cat_pipe, cat_features),
])
X = preprocessor.fit_transform(df)
print('Transformed shape:', X.shape)
print('First row:', X[0].round(3))
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class LogTransformer(BaseEstimator, TransformerMixin):
    """log(X + offset) transform; the offset keeps zeros finite."""

    def __init__(self, offset=1):
        self.offset = offset

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return np.log(X + self.offset)

class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip each column to the 1st-99th percentile range learned at fit time."""

    def fit(self, X, y=None):
        self.lower_ = np.percentile(X, 1, axis=0)
        self.upper_ = np.percentile(X, 99, axis=0)
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)

X = np.array([[1, 1000], [5, 5000], [100, 100000], [1, 50]])
pipe = Pipeline([('clip', OutlierClipper()), ('log', LogTransformer())])
print('Original:\n', X)
print('Transformed:\n', pipe.fit_transform(X).round(3))
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import cross_val_score

X, y = load_breast_cancer(return_X_y=True)
print(f'Original features: {X.shape[1]}')
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('select', SelectKBest(f_classif, k=10)),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=42)),
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='accuracy')
print(f'CV accuracy (10 features): {scores.mean():.4f} ± {scores.std():.4f}')
pipe.fit(X, y)
selected = pipe['select'].get_support()
print(f'Selected feature indices: {selected.nonzero()[0]}')
import joblib
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

# Training data
train = pd.DataFrame({'age': [25, 35, 45, 55], 'city': ['NYC', 'LA', 'NYC', 'LA'], 'target': [0, 1, 0, 1]})
X_train = train.drop('target', axis=1)
y_train = train['target']
preprocessor = ColumnTransformer([
    ('num', StandardScaler(), ['age']),
    ('cat', OneHotEncoder(sparse_output=False), ['city']),
])
pipe = Pipeline([('prep', preprocessor), ('clf', LogisticRegression())])
pipe.fit(X_train, y_train)
# Save entire pipeline (includes fitted scaler + encoder)
joblib.dump(pipe, 'full_pipeline.joblib')
# Load and predict new data
loaded = joblib.load('full_pipeline.joblib')
new_data = pd.DataFrame({'age': [30, 50], 'city': ['LA', 'NYC']})
print('Predictions:', loaded.predict(new_data))
import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age': np.random.randint(18, 80, n).astype(float),
    'income': np.random.lognormal(10, 0.5, n),
    'region': np.random.choice(['North', 'South', 'East', 'West'], n),
    'vehicle_age': np.random.randint(0, 20, n).astype(float),
    'claim': np.random.binomial(1, 0.15, n),
})
# Inject missing values
df.loc[np.random.choice(n, 30), 'age'] = np.nan
df.loc[np.random.choice(n, 20), 'region'] = np.nan
X = df.drop('claim', axis=1)
y = df['claim']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
num_pipe = Pipeline([('imp', SimpleImputer(strategy='median')), ('scl', StandardScaler())])
cat_pipe = Pipeline([('imp', SimpleImputer(strategy='most_frequent')),
                     ('enc', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])
prep = ColumnTransformer([
    ('num', num_pipe, ['age', 'income', 'vehicle_age']),
    ('cat', cat_pipe, ['region']),
])
full_pipe = Pipeline([('prep', prep), ('clf', GradientBoostingClassifier(random_state=42))])
full_pipe.fit(X_train, y_train)
print(f'Test accuracy: {full_pipe.score(X_test, y_test):.4f}')
joblib.dump(full_pipe, 'insurance_pipeline.joblib')
print('Pipeline saved.')
import pandas as pd, numpy as np, joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
data = pd.DataFrame({
'tenure': [12, np.nan, 36, 24, 6],
'charges': [50.0, 75.0, np.nan, 90.0, 45.0],
'plan': ['basic', 'premium', 'basic', np.nan, 'premium'],
'country': ['US', 'UK', 'US', 'CA', 'UK'],
})
# TODO: define num_pipe and cat_pipe
# TODO: combine with ColumnTransformer
# TODO: fit_transform data
# TODO: save and reload with joblib
# TODO: print transformed shapeSafely roll out new models by running controlled experiments. Compare champion vs challenger using statistical tests to decide which to promote.
import numpy as np
from dataclasses import dataclass
from typing import Any
@dataclass
class ABRouter:
champion: Any
challenger: Any
challenger_pct: float = 0.1 # 10% traffic to challenger
def predict(self, X):
route = 'challenger' if np.random.rand() < self.challenger_pct else 'champion'
model = self.challenger if route == 'challenger' else self.champion
pred = model.predict([X])[0]
return {'prediction': pred, 'model': route}
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
champion = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X, y)
router = ABRouter(champion, challenger, challenger_pct=0.2)
for _ in range(5):
result = router.predict(X[0])
print(f'Model: {result["model"]:12} | Prediction: {result["prediction"]}')import numpy as np
from collections import defaultdict
from scipy import stats
# Simulate an A/B experiment: collect correct/incorrect outcomes per model.
np.random.seed(42)
n_requests = 1000
results = defaultdict(list)
for _ in range(n_requests):
    # ~20% of traffic is routed to the challenger.
    group = 'challenger' if np.random.rand() < 0.2 else 'champion'
    # Simulated true accuracy: champion=0.92, challenger=0.95
    accuracy = 0.95 if group == 'challenger' else 0.92
    correct = int(np.random.rand() < accuracy)
    results[group].append(correct)
for group, outcomes in results.items():
    print(f'{group}: n={len(outcomes)}, accuracy={np.mean(outcomes):.4f}')
# Two-proportion z-test (pooled). Outcomes are Bernoulli 0/1, so compare
# proportions directly rather than running a t-test on the raw samples
# (the previous ttest_ind call did not match the stated test).
champ = results['champion']
chall = results['challenger']
n1, n2 = len(champ), len(chall)
p1, p2 = np.mean(champ), np.mean(chall)
pooled = (sum(champ) + sum(chall)) / (n1 + n2)
se = np.sqrt(pooled * (1 - pooled) * (1 / n1 + 1 / n2))
z = (p1 - p2) / se
p = 2 * stats.norm.sf(abs(z))  # two-sided p-value, consumed by the print below
print(f'p-value: {p:.4f} -> {"significant" if p < 0.05 else "not significant"}')import numpy as np
from scipy import stats
# Confusion matrix style: correct vs incorrect per model
champion_results = {'correct': 460, 'incorrect': 40} # n=500, 92%
challenger_results = {'correct': 190, 'incorrect': 10} # n=200, 95%
# Chi-squared contingency test
observed = np.array([
[champion_results['correct'], champion_results['incorrect']],
[challenger_results['correct'], challenger_results['incorrect']],
])
chi2, p, dof, expected = stats.chi2_contingency(observed)
print(f'Chi2: {chi2:.4f}')
print(f'p-value: {p:.4f}')
print(f'Significant at alpha=0.05: {p < 0.05}')
if p < 0.05:
champ_acc = champion_results['correct'] / sum(champion_results.values())
chall_acc = challenger_results['correct'] / sum(challenger_results.values())
winner = 'challenger' if chall_acc > champ_acc else 'champion'
print(f'Promote: {winner}')import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
X, y = load_iris(return_X_y=True)
champion = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
challenger = GradientBoostingClassifier(n_estimators=50, random_state=42).fit(X, y)
shadow_log = []
def serve_with_shadow(X_sample, true_label=None):
    """Serve the champion's prediction; run the challenger in shadow mode.

    Both predictions (plus the optional ground-truth label) are appended to
    the module-level ``shadow_log`` list, but only the champion's output is
    returned, so the challenger has no user-facing impact.
    """
    champ_pred = champion.predict([X_sample])[0]
    chall_pred = challenger.predict([X_sample])[0]  # shadow: logged, never served
    shadow_log.append({'champion': champ_pred, 'challenger': chall_pred,
                       'true': true_label})
    return champ_pred  # only champion's prediction served
for xi, yi in zip(X[:20], y[:20]):
serve_with_shadow(xi, yi)
champ_acc = np.mean([l['champion'] == l['true'] for l in shadow_log])
chall_acc = np.mean([l['challenger'] == l['true'] for l in shadow_log])
print(f'Shadow test: champion={champ_acc:.2f}, challenger={chall_acc:.2f}')import numpy as np
from scipy import stats
from dataclasses import dataclass, field
from typing import List
@dataclass
class ABExperiment:
    """Online A/B experiment: randomly assign traffic to two arms,
    accumulate rewards, and test whether the challenger beats the champion.
    """
    name: str
    challenger_pct: float = 0.1  # fraction of traffic assigned to the challenger
    champion_outcomes: List[float] = field(default_factory=list)
    challenger_outcomes: List[float] = field(default_factory=list)

    def log(self, reward: float):
        """Assign this observation to an arm at random and record its reward."""
        group = 'challenger' if np.random.rand() < self.challenger_pct else 'champion'
        (self.challenger_outcomes if group == 'challenger' else self.champion_outcomes).append(reward)

    def report(self):
        """Summarize the experiment.

        Returns a dict with per-arm mean reward, the two-sample t-test
        p-value, and a promote/keep decision. If either arm has fewer than
        two observations the test is undefined (previously this produced
        NaN means and an invalid t-test), so the result is reported as not
        significant with ``p_value`` None.
        """
        champ = np.array(self.champion_outcomes, dtype=float)
        chall = np.array(self.challenger_outcomes, dtype=float)
        if len(champ) < 2 or len(chall) < 2:
            return {
                'champion_mean': round(float(champ.mean()), 4) if len(champ) else None,
                'challenger_mean': round(float(chall.mean()), 4) if len(chall) else None,
                'p_value': None,
                'significant': False,
                'promote': False,
            }
        _, p = stats.ttest_ind(chall, champ)
        return {
            'champion_mean': round(float(champ.mean()), 4),
            'challenger_mean': round(float(chall.mean()), 4),
            'p_value': round(float(p), 4),
            'significant': p < 0.05,
            'promote': p < 0.05 and chall.mean() > champ.mean(),
        }
np.random.seed(42)
exp = ABExperiment('recommendation-v2', challenger_pct=0.1)
for _ in range(10000):
# Simulate CTR: champion=5%, challenger=6%
reward = np.random.binomial(1, 0.06) if np.random.rand() < 0.1 else np.random.binomial(1, 0.05)
exp.log(reward)
import json
print(json.dumps(exp.report(), indent=2))import numpy as np
from scipy import stats
np.random.seed(42)
model_a = np.random.binomial(1, 0.08, 5000) # 8% conversion
model_b = np.random.binomial(1, 0.10, 1000) # 10% conversion
def analyze_ab_test(a_outcomes, b_outcomes, alpha=0.05):
    """Exercise: compare two models' binary conversion outcomes.

    Intended steps (left as TODOs for the reader): compute each arm's
    conversion rate, build a 2x2 correct/incorrect contingency table, run
    scipy's chi-squared test, and print a promote/keep decision at the
    given significance level ``alpha``.
    """
    # TODO: compute conversion rates
    # TODO: chi-squared test
    # TODO: print results and promote/keep decision
    pass
analyze_ab_test(model_a, model_b)Make black-box models interpretable. Use SHAP values, permutation importance, and LIME to explain individual predictions and global feature importance.
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
data = load_breast_cancer()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42)
importance_df = pd.DataFrame({
'feature': data.feature_names,
'importance': result.importances_mean,
'std': result.importances_std,
}).sort_values('importance', ascending=False)
print(importance_df.head(10).to_string(index=False))try:
import shap
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
data = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
data.data, data.target, random_state=42)
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test[:5])
print('SHAP values for first test sample:')
for feat, val in sorted(zip(data.feature_names, shap_values[0]),
key=lambda x: abs(x[1]), reverse=True)[:5]:
print(f' {feat:<35} {val:+.4f}')
except ImportError:
print('pip install shap')try:
import lime
from lime.lime_tabular import LimeTabularExplainer
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
data = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
data.data, data.target, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
explainer = LimeTabularExplainer(
X_train, feature_names=data.feature_names,
class_names=data.target_names, mode='classification')
exp = explainer.explain_instance(X_test[0], model.predict_proba, num_features=5)
print('Prediction:', model.predict([X_test[0]])[0], '->', data.target_names[model.predict([X_test[0]])[0]])
print('\nTop contributing features:')
for feat, weight in exp.as_list():
print(f' {feat[:45]:<45} {weight:+.4f}')
except ImportError:
print('pip install lime')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import PartialDependenceDisplay
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
data = load_breast_cancer()
X_train, X_test, y_train, _ = train_test_split(data.data, data.target, random_state=42)
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Plot partial dependence for top 2 features
fig, ax = plt.subplots(figsize=(10, 4))
PartialDependenceDisplay.from_estimator(
model, X_train, features=[0, 1],
feature_names=data.feature_names, ax=ax)
plt.tight_layout()
plt.savefig('pdp.png', dpi=80)
plt.close()
print('PDP saved to pdp.png')
print(f'Feature 0: {data.feature_names[0]}')
print(f'Feature 1: {data.feature_names[1]}')try:
import shap
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
np.random.seed(42)
n = 500
feature_names = ['credit_score', 'income', 'debt_ratio', 'loan_amount', 'employment_years']
X = np.column_stack([
np.random.normal(680, 80, n),
np.random.lognormal(11, 0.5, n),
np.random.beta(2, 5, n),
np.random.lognormal(12, 0.3, n),
np.random.uniform(0, 30, n),
])
y = (X[:, 0] > 700).astype(int) ^ (X[:, 2] > 0.4).astype(int)
X_train, X_test = train_test_split(X, random_state=42)
y_train, y_test = train_test_split(y, random_state=42)
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
explainer = shap.TreeExplainer(model)
shap_vals = explainer.shap_values(X_test[:3])
for i, (sv, pred) in enumerate(zip(shap_vals, model.predict(X_test[:3]))):
decision = 'APPROVED' if pred == 1 else 'DENIED'
print(f'Application {i+1}: {decision}')
factors = sorted(zip(feature_names, sv), key=lambda x: abs(x[1]), reverse=True)[:3]
for feat, val in factors:
direction = 'increased' if val > 0 else 'decreased'
print(f' {feat} {direction} approval chance by {abs(val):.4f}')
print()
except ImportError:
print('pip install shap')from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
data.data, data.target, test_size=0.2, random_state=42)
# TODO: train GradientBoostingClassifier
# TODO: compute permutation_importance on X_test
# TODO: create DataFrame with feature, importance, std
# TODO: sort by importance descending and printOrganize a full ML project with proper structure, configuration management, reproducible training scripts, and a deployment-ready layout.
PROJECT_STRUCTURE = '''
ml_project/
βββ data/
β βββ raw/ # Original, immutable data
β βββ processed/ # Cleaned and feature-engineered data
β βββ external/ # Third-party data
βββ notebooks/ # Jupyter notebooks for exploration
βββ src/
β βββ __init__.py
β βββ data.py # Data loading and preprocessing
β βββ features.py # Feature engineering
β βββ train.py # Model training
β βββ evaluate.py # Model evaluation
β βββ predict.py # Inference functions
βββ models/ # Saved model artifacts
βββ tests/ # Unit and integration tests
βββ api/
β βββ app.py # FastAPI serving
βββ config.yaml # Configuration file
βββ requirements.txt
βββ Dockerfile
βββ Makefile
βββ README.md
'''
print(PROJECT_STRUCTURE)try:
import yaml
from dataclasses import dataclass
CONFIG_YAML = '''
model:
type: GradientBoostingClassifier
params:
n_estimators: 100
max_depth: 5
learning_rate: 0.1
random_state: 42
data:
train_path: data/processed/train.csv
test_path: data/processed/test.csv
target_col: label
test_size: 0.2
training:
experiment_name: fraud-detection-v2
cross_val_folds: 5
metrics: [accuracy, f1_weighted, roc_auc]
serving:
host: 0.0.0.0
port: 8000
model_path: models/best_model.joblib
'''
config = yaml.safe_load(CONFIG_YAML)
print('Model type:', config['model']['type'])
print('n_estimators:', config['model']['params']['n_estimators'])
print('Experiment:', config['training']['experiment_name'])
except ImportError:
print('pip install pyyaml')# src/train.py pattern
TRAIN_SCRIPT = '''
import argparse, joblib, json, time, pathlib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report
def parse_args():
p = argparse.ArgumentParser()
p.add_argument('--n-estimators', type=int, default=100)
p.add_argument('--max-depth', type=int, default=5)
p.add_argument('--lr', type=float, default=0.1)
p.add_argument('--output-dir', default='models')
return p.parse_args()
def main():
args = parse_args()
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = GradientBoostingClassifier(
n_estimators=args.n_estimators, max_depth=args.max_depth,
learning_rate=args.lr, random_state=42)
model.fit(X_train, y_train)
preds = model.predict(X_test)
print(classification_report(y_test, preds))
out = pathlib.Path(args.output_dir)
out.mkdir(exist_ok=True)
joblib.dump(model, out / 'model.joblib')
print(f'Model saved to {out}/model.joblib')
if __name__ == "__main__":
main()
'''
print(TRAIN_SCRIPT)MAKEFILE = '''
# Makefile
.PHONY: install train test serve docker-build docker-run
install:
\tpip install -r requirements.txt
train:
\tpython src/train.py --n-estimators 100 --max-depth 5
validate:
\tpython src/validate.py --model-path models/model.joblib
test:
\tpytest tests/ -v --cov=src --cov-report=term-missing
serve:
\tuvicorn api.app:app --host 0.0.0.0 --port 8000 --reload
docker-build:
\tdocker build -t ml-api:latest .
docker-run:
\tdocker run -p 8000:8000 ml-api:latest
clean:
\tfind . -name __pycache__ -exec rm -rf {} + 2>/dev/null; true
\trm -f models/*.joblib
'''
print(MAKEFILE)import pathlib, json
def scaffold_ml_project(name: str):
    """Generate a reproducible ML project skeleton."""
    root = pathlib.Path(name)
    # Directory layout first, then the seed files.
    for subdir in ('data/raw', 'data/processed', 'notebooks', 'src', 'models', 'tests', 'api'):
        (root / subdir).mkdir(parents=True, exist_ok=True)
    files = {
        'src/__init__.py': '',
        'tests/__init__.py': '',
        'requirements.txt': 'scikit-learn\njoblib\nfastapi\nuvicorn\nmlflow\npyyaml\npytest\n',
        'config.yaml': 'model:\n type: RandomForestClassifier\n params:\n n_estimators: 100\n',
        'src/train.py': '# Training script\nif __name__ == "__main__":\n print("Train model here")\n',
        'api/app.py': 'from fastapi import FastAPI\napp = FastAPI()\n@app.get("/health")\ndef health():\n return {"status": "ok"}\n',
        '.gitignore': '__pycache__/\n*.joblib\n.env\nmlruns/\n',
    }
    for rel_path, body in files.items():
        (root / rel_path).write_text(body)
    # Pretty-print the resulting tree, one space of indent per directory level.
    print(f'Project {name!r} scaffolded:')
    for entry in sorted(root.rglob('*')):
        depth = len(entry.relative_to(root).parts) - 1
        suffix = '' if entry.is_file() else '/'
        print(f'{" " * depth}{entry.name}{suffix}')
scaffold_ml_project('my_ml_project')import pathlib
def scaffold_ml_project(name: str):
    """Exercise: scaffold a deployment-ready ML project under ``name``.

    The directories are created below; the TODOs ask the reader to add a
    Dockerfile, a Makefile, an evaluate module, write the files, and print
    the resulting directory tree.
    """
    root = pathlib.Path(name)
    dirs = ['data/raw', 'data/processed', 'src', 'tests', 'models', 'api']
    for d in dirs:
        (root / d).mkdir(parents=True, exist_ok=True)
    # TODO: add Dockerfile content
    # TODO: add Makefile with install/train/test/serve targets
    # TODO: add src/evaluate.py with a validate() stub
    # TODO: write all files and print the directory tree
    pass
scaffold_ml_project('demo_project')Reduce model size and latency through quantization, pruning, knowledge distillation, and ONNX export for production deployment.
import numpy as np
# Simulate 32-bit float weights -> 8-bit integer quantization
np.random.seed(42)
weights_f32 = np.random.randn(64, 64).astype(np.float32)
def quantize_int8(tensor):
    """Affine-quantize a float tensor to uint8.

    Returns ``(q, scale, zero_point)`` such that
    ``scale * (q - zero_point)`` approximates the original tensor.
    A constant tensor (max == min) previously raised ZeroDivisionError;
    it now round-trips exactly with a dummy scale of 1.0.
    """
    t_min, t_max = float(tensor.min()), float(tensor.max())
    scale = (t_max - t_min) / 255
    if scale == 0:  # constant tensor: any positive scale round-trips exactly
        scale = 1.0
    # Clip the zero point into the representable uint8 range.
    zero_point = int(np.clip(round(-t_min / scale), 0, 255))
    q = np.clip(np.round(tensor / scale + zero_point), 0, 255).astype(np.uint8)
    return q, scale, zero_point

def dequantize(q_tensor, scale, zero_point):
    """Map uint8 codes back to float32 via the affine parameters."""
    return scale * (q_tensor.astype(np.float32) - zero_point)
q, scale, zp = quantize_int8(weights_f32)
recovered = dequantize(q, scale, zp)
f32_bytes = weights_f32.nbytes
int8_bytes = q.nbytes
err = np.abs(weights_f32 - recovered).mean()
print(f'Original (float32): {f32_bytes:,} bytes')
print(f'Quantized (int8): {int8_bytes:,} bytes')
print(f'Compression ratio: {f32_bytes/int8_bytes:.1f}x')
print(f'Mean absolute error: {err:.6f}')
print(f'Relative error: {err/np.abs(weights_f32).mean():.2%}')import numpy as np
np.random.seed(0)
weight_matrix = np.random.randn(128, 64)
def prune_weights(weights, sparsity=0.5):
    """Magnitude pruning: zero the smallest-|w| entries to reach `sparsity`.

    Returns (pruned_weights, keep_mask, achieved_sparsity).
    """
    magnitudes = np.abs(weights)
    # Entries at or below the sparsity-percentile magnitude are dropped.
    cutoff = np.percentile(magnitudes.flatten(), sparsity * 100)
    keep = magnitudes > cutoff
    return weights * keep, keep, 1 - keep.mean()
for target in [0.3, 0.5, 0.7, 0.9]:
pruned, mask, actual = prune_weights(weight_matrix, sparsity=target)
nnz = mask.sum()
orig_err = np.linalg.norm(weight_matrix - pruned, 'fro')
print(f'Sparsity {target:.0%}: {nnz:,} non-zero, '
f'Frobenius error={orig_err:.3f}, '
f'Memory reduction={actual:.1%}')
print('\nNote: sparse storage (CSR/COO) needed to realize memory savings.')
print('Effective with magnitude-based unstructured pruning + sparse matmul.')import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
split = 1600
X_tr, X_te = X[:split], X[split:]
y_tr, y_te = y[:split], y[split:]
# Teacher: large GBM
teacher = GradientBoostingClassifier(n_estimators=100, max_depth=5, random_state=42)
teacher.fit(X_tr, y_tr)
teacher_acc = accuracy_score(y_te, teacher.predict(X_te))
# Soft labels (temperature scaling)
soft_labels = teacher.predict_proba(X_tr)[:, 1]
# Binarize with threshold 0.5 (soft to hard for sklearn student)
soft_hard = (soft_labels > 0.5).astype(int)
# Student: shallow decision tree trained on soft labels
student_distill = DecisionTreeClassifier(max_depth=4, random_state=42)
student_distill.fit(X_tr, soft_hard)
student_direct = DecisionTreeClassifier(max_depth=4, random_state=42)
student_direct.fit(X_tr, y_tr)
print(f'Teacher (GBM 100 trees): {teacher_acc:.3f}')
print(f'Student (distilled): {accuracy_score(y_te, student_distill.predict(X_te)):.3f}')
print(f'Student (direct): {accuracy_score(y_te, student_direct.predict(X_te)):.3f}')
print(f'\nTeacher params: ~{100*5**5:,} nodes est.')
print(f'Student params: ~{2**4:,} leaves')import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
import json
np.random.seed(0)
X, y = make_classification(n_samples=500, n_features=10, random_state=0)
model = LogisticRegression(max_iter=200).fit(X, y)
# Simulate ONNX export by serializing model parameters
def export_to_dict(model, feature_names=None):
    """Serialize a fitted LogisticRegression's parameters to a plain dict."""
    payload = {'type': 'LogisticRegression'}
    payload['coef'] = model.coef_.tolist()
    payload['intercept'] = model.intercept_.tolist()
    payload['classes'] = model.classes_.tolist()
    payload['n_features'] = model.n_features_in_
    return payload

def infer_from_dict(model_dict, X):
    """Run binary logistic-regression inference from exported parameters."""
    weights = np.array(model_dict['coef'])
    bias = np.array(model_dict['intercept'])
    scores = X @ weights.T + bias
    probabilities = 1 / (1 + np.exp(-scores))
    # Threshold at 0.5 to recover hard class labels.
    return (probabilities > 0.5).astype(int).flatten()
exported = export_to_dict(model)
print('Exported model (JSON):') ; print(json.dumps({k: str(v)[:40] for k, v in exported.items()}, indent=2))
pred_orig = model.predict(X[:5])
pred_onnx = infer_from_dict(exported, X[:5])
print(f'\nOriginal predictions: {pred_orig}')
print(f'Simulated inference: {pred_onnx}')
print(f'Match: {np.all(pred_orig == pred_onnx)}')import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
from sklearn.metrics import roc_auc_score
import time
np.random.seed(1)
X, y = make_classification(n_samples=5000, n_features=30, random_state=1)
split = 4000
X_tr, X_te = X[:split], X[split:]
y_tr, y_te = y[:split], y[split:]
# Teacher
teacher = GradientBoostingClassifier(n_estimators=100, max_depth=4, random_state=1)
teacher.fit(X_tr, y_tr)
t0 = time.perf_counter()
teacher_proba = teacher.predict_proba(X_te)[:,1]
t_teacher = (time.perf_counter() - t0) * 1000
# Student (distilled)
soft = (teacher.predict_proba(X_tr)[:,1] > 0.5).astype(int)
student = DecisionTreeClassifier(max_depth=6, random_state=1).fit(X_tr, soft)
t0 = time.perf_counter()
student_proba = student.predict_proba(X_te)[:,1]
t_student = (time.perf_counter() - t0) * 1000
print(f'Teacher AUC: {roc_auc_score(y_te, teacher_proba):.4f} | Time: {t_teacher:.1f}ms')
print(f'Student AUC: {roc_auc_score(y_te, student_proba):.4f} | Time: {t_student:.1f}ms')
print(f'Speed-up: {t_teacher/max(t_student,0.001):.1f}x')import numpy as np
np.random.seed(42)
W = np.random.randn(256, 256).astype(np.float32)
def quantize(tensor, bits):
    """Exercise: affine-quantize ``tensor`` to ``bits``-bit unsigned integers.

    Should return ``(recovered, compression)`` — the dequantized tensor and
    the float32-to-int compression ratio — since the loop below unpacks two
    values (it will raise TypeError until this stub is implemented).
    """
    # TODO: quantize to `bits`-bit unsigned integer
    # levels = 2**bits
    # Compute scale and zero_point, quantize, dequantize
    pass
for bits in [4, 8, 16]:
recovered, compression = quantize(W, bits)
# TODO: print bits, compression ratio, MAE, max error
pass
Design and implement feature stores, data versioning, and reproducible pipelines for consistent feature serving between training and inference.
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List
class FeatureStore:
    """Minimal in-memory feature store keyed by entity id.

    Values are held per entity as ``feature_name -> (value, timestamp)``,
    so the most recent write for a feature wins.
    """

    def __init__(self):
        # entity_id -> {feature_name: (value, timestamp)}
        self._store = {}

    def write(self, entity_id: str, features: dict, timestamp=None):
        """Upsert feature values for one entity; timestamp defaults to now."""
        if timestamp is None:
            # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
            # consider datetime.now(timezone.utc) when modernizing.
            timestamp = datetime.utcnow()
        entity_row = self._store.setdefault(entity_id, {})
        for feature_name, value in features.items():
            entity_row[feature_name] = (value, timestamp)

    def read(self, entity_id: str, feature_names: List[str]) -> dict:
        """Point lookup: latest value per requested feature (None if absent)."""
        entity_row = self._store.get(entity_id)
        if entity_row is None:
            return {name: None for name in feature_names}
        return {name: entity_row.get(name, (None, None))[0] for name in feature_names}

    def get_training_dataset(self, entities: List[str], features: List[str]) -> pd.DataFrame:
        """Assemble a frame with one row per entity and one column per feature."""
        return pd.DataFrame(
            [{'entity_id': eid, **self.read(eid, features)} for eid in entities]
        )
# Usage
fs = FeatureStore()
for uid, feats in [
('user_1', {'age': 28, 'spend_30d': 150.0, 'logins_7d': 5}),
('user_2', {'age': 35, 'spend_30d': 400.0, 'logins_7d': 12}),
('user_3', {'age': 22, 'spend_30d': 50.0, 'logins_7d': 1}),
]:
fs.write(uid, feats)
df = fs.get_training_dataset(['user_1', 'user_2', 'user_3'], ['age', 'spend_30d', 'logins_7d'])
print(df.to_string(index=False))
print('\nPoint lookup:', fs.read('user_2', ['spend_30d', 'logins_7d']))import pandas as pd
import numpy as np
import hashlib, json
class ReproduciblePipeline:
    """Ordered DataFrame transformation pipeline with an audit trail.

    Each step is a ``(name, fn)`` pair where ``fn(df, params)`` returns a
    DataFrame. ``fingerprint()`` hashes the step names plus parameters so
    identically-configured pipelines are recognizable as equivalent.
    """

    def __init__(self, steps: list, params: dict):
        self.steps = steps
        self.params = params
        self._history = []  # per-step row/column snapshot from the last run()

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply every step in order, recording the shape after each one."""
        self._history = []
        current = df
        for step_name, transform in self.steps:
            current = transform(current, self.params)
            self._history.append({
                'step': step_name,
                'rows': len(current),
                'cols': list(current.columns),
            })
        return current

    def fingerprint(self) -> str:
        """Deterministic 12-hex-char id of the pipeline configuration."""
        config = {'params': self.params, 'steps': [s[0] for s in self.steps]}
        digest = hashlib.md5(json.dumps(config, sort_keys=True).encode())
        return digest.hexdigest()[:12]
# Define pipeline steps
def drop_nulls(df, params): return df.dropna()
def filter_age(df, params): return df[df['age'] >= params['min_age']]
def add_feature(df, params): df = df.copy(); df['age_squared'] = df['age']**2; return df
np.random.seed(42)
df_raw = pd.DataFrame({'age': np.random.randint(16, 70, 100), 'spend': np.random.rand(100)*1000})
df_raw.loc[[5, 23, 67], 'age'] = np.nan
pipeline = ReproduciblePipeline(
steps=[('drop_nulls', drop_nulls), ('filter_age', filter_age), ('add_feature', add_feature)],
params={'min_age': 21}
)
result = pipeline.run(df_raw)
print(f'Pipeline fingerprint: {pipeline.fingerprint()}')
print(f'Input rows: {len(df_raw)}, Output rows: {len(result)}')
for step in pipeline._history:
print(f' After {step["step"]}: {step["rows"]} rows, {len(step["cols"])} cols')import pandas as pd
import numpy as np
import hashlib, json
from datetime import datetime
class DataVersion:
    """Lightweight dataset versioning: content-hash snapshots + lineage log.

    ``registry`` is a class-level list, so every snapshot taken in the
    process shares one lineage history.
    """
    registry = []

    @classmethod
    def snapshot(cls, df: pd.DataFrame, name: str, metadata: dict = None) -> str:
        """Record a content checksum plus shape/columns; return the checksum."""
        row_hashes = pd.util.hash_pandas_object(df).values
        checksum = hashlib.md5(row_hashes.tobytes()).hexdigest()[:12]
        # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+.
        cls.registry.append({
            'name': name,
            'checksum': checksum,
            'shape': df.shape,
            'timestamp': str(datetime.utcnow())[:19],
            'columns': list(df.columns),
            'metadata': metadata or {},
        })
        return checksum

    @classmethod
    def show_lineage(cls):
        """Print one line per recorded snapshot, oldest first."""
        for v in cls.registry:
            print(f"{v['timestamp']} | {v['name']:<20} | {str(v['shape']):<12} | cksum={v['checksum']}")
np.random.seed(7)
df_v1 = pd.DataFrame({'x': np.random.randn(100), 'y': np.random.randint(0,2,100)})
DataVersion.snapshot(df_v1, 'raw_data', {'source': 'sensor_A'})
df_v2 = df_v1.dropna()
DataVersion.snapshot(df_v2, 'cleaned', {'action': 'dropna'})
df_v3 = df_v2.copy(); df_v3['x_scaled'] = (df_v3['x'] - df_v3['x'].mean()) / df_v3['x'].std()
DataVersion.snapshot(df_v3, 'features_v1', {'scaler': 'standard'})
DataVersion.show_lineage()import numpy as np
from scipy import stats
# Detect distribution shift between training features and production features
np.random.seed(42)
# Training distribution
train_features = {
'age': np.random.normal(35, 10, 1000),
'income': np.random.lognormal(10, 0.5, 1000),
'score': np.random.beta(2, 5, 1000),
}
# Production (simulated drift)
prod_features = {
'age': np.random.normal(35, 10, 500), # same
'income': np.random.lognormal(10.3, 0.5, 500), # shifted
'score': np.random.beta(3, 3, 500), # different shape
}
print('Train/Serve Skew Detection (KS Test):')
print(f'{"Feature":<12} {"KS stat":>10} {"p-value":>10} {"Drifted?":>10}')
print('-' * 48)
for feat in train_features:
ks_stat, p_val = stats.ks_2samp(train_features[feat], prod_features[feat])
drifted = 'YES' if p_val < 0.05 else 'no'
print(f'{feat:<12} {ks_stat:>10.4f} {p_val:>10.4f} {drifted:>10}')import pandas as pd
import numpy as np
from scipy import stats
class FeatureRegistry:
def __init__(self):
self._schemas = {} # name -> {'dtype', 'mean', 'std'}
self._versions = []
def register(self, name: str, series: pd.Series):
self._schemas[name] = {
'dtype': str(series.dtype),
'mean': float(series.mean()),
'std': float(series.std()),
'min': float(series.min()),
'max': float(series.max()),
}
self._versions.append({'name': name, 'n': len(series)})
def validate(self, name: str, series: pd.Series, alpha: float = 0.05):
if name not in self._schemas:
raise ValueError(f'Feature {name!r} not registered.')
ref = self._schemas[name]
issues = []
if str(series.dtype) != ref['dtype']:
issues.append(f'dtype mismatch: {series.dtype} vs {ref["dtype"]}')
z = abs(series.mean() - ref['mean']) / max(ref['std'], 1e-6)
if z > 3:
issues.append(f'mean shift: z={z:.2f}')
return issues or ['OK']
np.random.seed(5)
reg = FeatureRegistry()
for feat, vals in [('age', pd.Series(np.random.normal(35,10,1000))),
('income', pd.Series(np.random.lognormal(10,0.5,1000)))]:
reg.register(feat, vals)
# Serve time validation
prod_age = pd.Series(np.random.normal(35, 10, 200))
prod_income = pd.Series(np.random.lognormal(10.8, 0.5, 200)) # drifted
print('age validation: ', reg.validate('age', prod_age))
print('income validation:', reg.validate('income', prod_income))import numpy as np
import pandas as pd
class MonitoredFeatureStore:
    """Exercise scaffold: a feature store that remembers training-time
    statistics and flags serve-time distribution drift."""
    def __init__(self):
        # feature -> {mean, std, min, max}, to be captured by fit()
        self.train_stats = {}
    def fit(self, df: pd.DataFrame):
        """Capture per-column summary statistics of the training frame."""
        # TODO: compute and store stats for each column
        pass
    def validate(self, df: pd.DataFrame, z_threshold: float = 2.0) -> dict:
        """Compare serve-time column statistics to the stored training stats."""
        # TODO: compare serve-time stats to training stats
        # Return dict: {feature: 'OK' or 'DRIFT (z=X.X)'}
        pass
np.random.seed(0)
train = pd.DataFrame({'age': np.random.normal(30,8,1000), 'salary': np.random.normal(60000,10000,1000), 'score': np.random.beta(2,5,1000)})
serve = pd.DataFrame({'age': np.random.normal(30,8,200), 'salary': np.random.normal(75000,10000,200), 'score': np.random.beta(2,5,200)})
store = MonitoredFeatureStore()
store.fit(train)
print(store.validate(serve))
Automate model selection and hyperparameter tuning using grid search, random search, Bayesian optimization (Optuna), and automated feature engineering.
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import f1_score
import time
np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [3, 5, 7, None],
'min_samples_split': [2, 5, 10],
'max_features': ['sqrt', 'log2'],
}
# Grid search: exhaustive
t0 = time.perf_counter()
gs = GridSearchCV(RandomForestClassifier(random_state=0), param_grid, cv=3, scoring='f1', n_jobs=-1)
gs.fit(X, y)
grid_time = time.perf_counter() - t0
# Random search: n_iter combinations
t0 = time.perf_counter()
rs = RandomizedSearchCV(RandomForestClassifier(random_state=0), param_grid, n_iter=10, cv=3, scoring='f1', n_jobs=-1, random_state=42)
rs.fit(X, y)
rand_time = time.perf_counter() - t0
print(f'Grid Search: {gs.best_score_:.4f} | {3*4*3*2} combos | {grid_time:.1f}s')
print(f'Random Search: {rs.best_score_:.4f} | 10 combos | {rand_time:.1f}s')
print(f'Speed-up: {grid_time/rand_time:.1f}x')
print(f'Best (random): {rs.best_params_}')try:
import optuna
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
optuna.logging.set_verbosity(optuna.logging.WARNING)
np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
def objective(trial):
n_est = trial.suggest_int('n_estimators', 50, 300)
depth = trial.suggest_int('max_depth', 2, 8)
lr = trial.suggest_float('learning_rate', 0.01, 0.3, log=True)
subsamp = trial.suggest_float('subsample', 0.5, 1.0)
model = GradientBoostingClassifier(n_estimators=n_est, max_depth=depth, learning_rate=lr, subsample=subsamp, random_state=0)
scores = cross_val_score(model, X, y, cv=3, scoring='f1')
return scores.mean()
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)
print(f'Best F1: {study.best_value:.4f}')
print(f'Best params: {study.best_params}')
print(f'Total trials: {len(study.trials)}')
except ImportError:
print('pip install optuna')
print('Optuna uses Tree-structured Parzen Estimator (TPE) to model')
print('P(x|good) and P(x|bad) and samples from P(x|good) region.')import numpy as np
from sklearn.datasets import make_regression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
np.random.seed(42)
X, y = make_regression(n_samples=500, n_features=5, noise=10, random_state=42)
best_score, best_degree = -np.inf, 1
results = []
for degree in [1, 2, 3]:
pipe = Pipeline([
('poly', PolynomialFeatures(degree=degree, include_bias=False)),
('ridge', Ridge(alpha=1.0))
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='r2')
results.append((degree, scores.mean(), pipe.named_steps['poly'].fit_transform(X).shape[1]))
if scores.mean() > best_score:
best_score, best_degree = scores.mean(), degree
print('Polynomial Feature Engineering Results:')
print(f'{"Degree":<10} {"R2 (CV)":<12} {"Features":<10}')
for deg, r2, n_feat in results:
star = ' <-- best' if deg == best_degree else ''
print(f'{deg:<10} {r2:<12.4f} {n_feat:<10}{star}')
print(f'\nOriginal features: {X.shape[1]} -> Best: {results[best_degree-1][2]}')import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
# Train with staged prediction to simulate early stopping
gbm = GradientBoostingClassifier(n_estimators=200, max_depth=3, learning_rate=0.05, random_state=0)
gbm.fit(X_tr, y_tr)
train_scores, val_scores = [], []
for y_pred_tr, y_pred_val in zip(
gbm.staged_predict(X_tr), gbm.staged_predict(X_val)
):
train_scores.append(f1_score(y_tr, y_pred_tr))
val_scores.append(f1_score(y_val, y_pred_val))
best_iter = np.argmax(val_scores)
print(f'Best iteration: {best_iter+1} (val F1={val_scores[best_iter]:.4f})')
print(f'Final iteration val F1: {val_scores[-1]:.4f}')
fig, ax = plt.subplots(figsize=(9, 4))
ax.plot(train_scores, label='Train')
ax.plot(val_scores, label='Validation')
ax.axvline(best_iter, color='red', linestyle='--', label=f'Best iter={best_iter+1}')
ax.set_xlabel('Boosting rounds'); ax.set_ylabel('F1 Score')
ax.set_title('Learning Curves + Early Stopping')
ax.legend(); plt.tight_layout()
plt.savefig('learning_curves.png', dpi=80); plt.close(); print('Saved learning_curves.png')import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
np.random.seed(0)
X, y = make_classification(n_samples=2000, n_features=25, random_state=0)
# Manual multi-model search (Optuna-style pseudocode)
best_score, best_config = -np.inf, None
np.random.seed(99)
for trial in range(30):
model_type = np.random.choice(['lr', 'rf', 'gbm'])
if model_type == 'lr':
C = np.random.choice([0.01, 0.1, 1.0, 10.0])
clf = LogisticRegression(C=C, max_iter=200)
cfg = {'type': 'lr', 'C': C}
elif model_type == 'rf':
n = np.random.choice([50, 100, 200])
d = np.random.choice([3, 5, None])
clf = RandomForestClassifier(n_estimators=n, max_depth=d, random_state=0)
cfg = {'type': 'rf', 'n_estimators': n, 'max_depth': d}
else:
lr = np.random.choice([0.05, 0.1, 0.2])
clf = GradientBoostingClassifier(learning_rate=lr, random_state=0)
cfg = {'type': 'gbm', 'lr': lr}
score = cross_val_score(clf, X, y, cv=3, scoring='roc_auc').mean()
if score > best_score:
best_score, best_config = score, cfg
print(f'Best AUC: {best_score:.4f}')
print(f'Best config: {best_config}')import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
try:
import optuna; optuna.logging.set_verbosity(optuna.logging.WARNING)
except ImportError:
print('pip install optuna'); exit()
np.random.seed(42)
X, y = make_classification(n_samples=1000, n_features=15, random_state=42)
def objective(trial):
# TODO: suggest model_type (categorical: 'rf' or 'gbm')
# TODO: suggest n_estimators, max_depth
# TODO: if gbm, suggest learning_rate (log scale)
# TODO: return 3-fold CV ROC-AUC
pass
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)
print('Best AUC:', study.best_value)
print('Best params:', study.best_params)
import numpy as np
def psi(expected, actual, n_bins=10):
    """Population Stability Index (PSI) between a reference and a comparison sample.

    Bins are the quantile edges of `expected`; PSI sums
    (a - e) * ln(a / e) over the per-bin proportions.
    """
    edges = np.percentile(expected, np.linspace(0, 100, n_bins + 1))
    # Widen the outer edges slightly so boundary values are always counted.
    edges[0] -= 1e-6
    edges[-1] += 1e-6
    proportions = []
    for sample in (expected, actual):
        fractions = np.histogram(sample, bins=edges)[0] / len(sample)
        # Floor empty bins to avoid log(0) / division by zero.
        proportions.append(np.where(fractions == 0, 1e-6, fractions))
    e_counts, a_counts = proportions
    return np.sum((a_counts - e_counts) * np.log(a_counts / e_counts))
np.random.seed(42)
reference = np.random.normal(0, 1, 1000)
stable = np.random.normal(0.1, 1.0, 500) # slight shift
drifted = np.random.normal(1.5, 1.5, 500) # significant drift
print(f"PSI (stable): {psi(reference, stable):.4f} (<0.1: no drift)")
print(f"PSI (drifted): {psi(reference, drifted):.4f} (>0.25: severe drift)")import numpy as np
from sklearn.metrics import accuracy_score
np.random.seed(1)
def simulate_batch(shift=0.0, n=200):
    """Simulate one scoring batch under covariate shift.

    The "true" labels move with the shift while the frozen model keeps the
    no-shift decision boundary, so accuracy degrades as shift grows.
    """
    X = np.random.randn(n, 5) + shift
    labels = (X[:, 0] + X[:, 1] > shift).astype(int)
    # Frozen model: threshold learned on unshifted data.
    model_preds = (X[:, 0] + X[:, 1] > 0).astype(int)
    return accuracy_score(labels, model_preds)
print("Model Monitoring Dashboard")
print(f"{'Week':<6} {'Accuracy':<10} {'Status'}")
print("-" * 30)
baseline_acc = simulate_batch(shift=0.0)
for week in range(1, 9):
shift = (week - 1) * 0.15
acc = simulate_batch(shift=shift)
delta = acc - baseline_acc
status = "OK" if abs(delta) < 0.05 else ("WARN" if abs(delta) < 0.10 else "ALERT")
print(f" {week:<4} {acc:.4f} {status}")import numpy as np
from scipy import stats
np.random.seed(5)
# Simulate feature distributions: train vs production batches
train_dist = np.random.normal(50, 10, 2000)
prod_no_drift = np.random.normal(50.5, 10.2, 500)
prod_drift = np.random.normal(58.0, 12.0, 500)
features = {
"no_drift_batch": prod_no_drift,
"drifted_batch": prod_drift,
}
for name, prod in features.items():
ks_stat, p_val = stats.ks_2samp(train_dist, prod)
drift = "DRIFT DETECTED" if p_val < 0.05 else "stable"
print(f"{name:<20}: KS={ks_stat:.4f}, p={p_val:.6f} -> {drift}")import numpy as np
def psi(ref, curr, n_bins=10):
    """Population Stability Index of `curr` against reference sample `ref`."""
    # Quantile bin edges from the reference, padded so extremes are included.
    edges = np.percentile(ref, np.linspace(0, 100, n_bins + 1))
    edges[0] -= 1e-6
    edges[-1] += 1e-6

    def _bin_fractions(sample):
        # Per-bin proportions, with zeros floored to avoid log(0).
        fractions = np.histogram(sample, bins=edges)[0] / len(sample)
        return np.where(fractions == 0, 1e-6, fractions)

    expected = _bin_fractions(ref)
    actual = _bin_fractions(curr)
    return float(np.sum((actual - expected) * np.log(actual / expected)))
np.random.seed(99)
# Training distribution
train_income = np.random.lognormal(10.8, 0.4, 5000)
train_credit = np.random.normal(680, 50, 5000)
train_age = np.random.normal(38, 12, 5000)
print("Loan Model Feature Drift Report")
print(f"{'Feature':<15} {'PSI':<8} {'Status'}")
print("-" * 35)
weeks_drift = [0, 0.05, 0.12, 0.22, 0.35] # progressive drift scenario
for week, drift in enumerate(weeks_drift, 1):
prod_income = np.random.lognormal(10.8 + drift, 0.4 + drift*0.1, 500)
prod_credit = np.random.normal(680 - drift*40, 50, 500)
prod_age = np.random.normal(38 + drift*2, 12, 500)
results = {
"income": psi(train_income, prod_income),
"credit_score": psi(train_credit, prod_credit),
"age": psi(train_age, prod_age),
}
max_psi_feat = max(results, key=results.get)
max_psi = results[max_psi_feat]
status = "ALERT" if max_psi > 0.25 else ("WARN" if max_psi > 0.10 else "OK")
print(f"Week {week}: {max_psi_feat:<12} PSI={max_psi:.3f} [{status}]")import numpy as np
from scipy import stats
def psi(ref, curr, n_bins=10):
    """Population Stability Index of `curr` relative to `ref`.

    ref: reference (training) sample; curr: current (production) sample.
    Returns a float. Rule of thumb used elsewhere in this file:
    <0.10 stable, 0.10-0.25 warn, >0.25 severe drift.
    """
    # Quantile edges of the reference define the bins.
    bins = np.percentile(ref, np.linspace(0, 100, n_bins+1))
    # Pad the outer edges so min/max values fall inside the histogram.
    bins[0] -= 1e-6; bins[-1] += 1e-6
    e = np.histogram(ref, bins=bins)[0] / len(ref)
    a = np.histogram(curr, bins=bins)[0] / len(curr)
    # Floor empty bins to avoid log(0) and division by zero.
    e = np.where(e == 0, 1e-6, e)
    a = np.where(a == 0, 1e-6, a)
    return float(np.sum((a - e) * np.log(a / e)))
np.random.seed(7)
n_train = 3000
features = {
"age": np.random.normal(35, 10, n_train),
"tenure": np.random.exponential(24, n_train),
"monthly_spend": np.random.lognormal(4.5, 0.5, n_train),
"num_products": np.random.poisson(2.5, n_train).astype(float),
"is_active": np.random.binomial(1, 0.7, n_train).astype(float),
"region": np.random.randint(0, 5, n_train).astype(float),
}
# TODO: Generate 8 weekly batches of 300 samples (drift in tenure+spend from week 4)
# TODO: Compute PSI and KS-test p-value for each feature each week
# TODO: Flag weeks where total PSI > 0.5
# TODO: Print formatted weekly report table
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# Simulate a simple feature store
class SimpleFeatureStore:
    """In-memory feature store keyed by (entity_id, timestamp).

    `get_historical` returns, per entity, the feature values from that
    entity's most recent ingestion timestamp.
    """

    def __init__(self):
        # (entity_id, timestamp) -> {feature_name: value}
        self._store = {}

    def ingest(self, entity_id, features, timestamp):
        """Record a feature snapshot for one entity at one timestamp."""
        self._store[(entity_id, timestamp)] = features

    def get_historical(self, entity_ids, feature_names, as_of=None):
        """Return a DataFrame with the latest snapshot per requested entity.

        Entities with no ingested data are silently omitted. `as_of` is
        accepted for API compatibility but unused (always "latest").
        """
        rows = []
        for entity in entity_ids:
            snapshots = {
                ts: feats
                for (eid, ts), feats in self._store.items()
                if eid == entity
            }
            if not snapshots:
                continue
            newest = max(snapshots)
            record = {"entity_id": entity, "timestamp": newest}
            for feature in feature_names:
                record[feature] = snapshots[newest].get(feature)
            rows.append(record)
        return pd.DataFrame(rows)
fs = SimpleFeatureStore()
# Ingest user features
for user_id in range(1, 6):
fs.ingest(user_id, {"age": np.random.randint(20, 60),
"spend_30d": round(np.random.uniform(50, 500), 2),
"logins_7d": np.random.randint(1, 30)},
datetime.now() - timedelta(hours=np.random.randint(1, 48)))
result = fs.get_historical([1, 2, 3], ["age", "spend_30d", "logins_7d"])
print(result.to_string(index=False))import pandas as pd
import numpy as np
np.random.seed(0)
# Raw data ingestion with quality checks
def build_training_pipeline(df_raw):
    """Validate and clean raw training data.

    Returns (clean_df, report): report records input/output row counts,
    the drop rate, and any data-quality issues found.
    """
    quality_report = {"input_rows": len(df_raw), "issues": []}
    # Flag columns with more than 10% missing values.
    missing = df_raw.isnull().mean()
    quality_report["issues"].extend(
        f"{column}: {fraction:.1%} nulls"
        for column, fraction in missing.items()
        if fraction > 0.1
    )
    # Range validation: ages outside [0, 120] are considered invalid.
    if "age" in df_raw:
        n_bad_age = ((df_raw["age"] < 0) | (df_raw["age"] > 120)).sum()
        if n_bad_age:
            quality_report["issues"].append(f"age: {n_bad_age} out-of-range values")
    # Cleaning: drop null rows, then filter to valid age and positive income.
    cleaned = df_raw.dropna().copy()
    cleaned = cleaned[cleaned["age"].between(0, 120) & (cleaned["income"] > 0)]
    quality_report["output_rows"] = len(cleaned)
    quality_report["drop_rate"] = 1 - len(cleaned) / len(df_raw)
    return cleaned, quality_report
raw = pd.DataFrame({
"age": np.random.choice([*np.random.randint(18,80,90), *[-1]*5, *[np.nan]*5], 100),
"income": np.random.choice([*np.random.lognormal(10,0.5,85), *[np.nan]*15], 100),
})
clean, report = build_training_pipeline(raw)
print("Pipeline Report:", report)import hashlib, json, os
from datetime import datetime
class DataVersioner:
    """Minimal content-addressed data registry, inspired by DVC.

    Version ids are a truncated MD5 of the payload, so identical content
    always maps to the same version id.
    """

    def __init__(self, registry_path="data_registry.json"):
        self.registry_path = registry_path  # reserved for future persistence
        self.registry = {}

    def hash_data(self, data_str):
        """Return a short (12 hex chars) MD5 digest of the payload."""
        return hashlib.md5(data_str.encode()).hexdigest()[:12]

    def register(self, name, data_str, metadata=None):
        """Register one dataset version; returns its version id."""
        version_id = self.hash_data(data_str)
        self.registry[version_id] = {
            "name": name,
            "version": version_id,
            "timestamp": datetime.now().isoformat()[:19],
            "size_bytes": len(data_str),
            "metadata": metadata or {},
        }
        print(f"Registered: {name} v{version_id}")
        return version_id

    def lineage(self, version_id):
        """Print the registry entry for a version (empty dict if unknown)."""
        entry = self.registry.get(version_id, {})
        print(f"Lineage for {version_id}: {json.dumps(entry, indent=2)}")
dv = DataVersioner()
v1 = dv.register("train_features", "col1,col2\n1,2\n3,4\n5,6", {"n_rows": 3, "split": "train"})
v2 = dv.register("train_features", "col1,col2\n1,2\n3,4\n5,6\n7,8", {"n_rows": 4, "split": "train"})
dv.lineage(v1)import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# Simulate feature store for recommendation system
class RecoFeatureStore:
    """Tiny online feature store for user/item recommendation features."""

    def __init__(self):
        self.user_features = {}
        self.item_features = {}

    def update_user(self, user_id, features):
        """Upsert a user's features, stamping the update time."""
        self.user_features[user_id] = dict(features, updated_at=datetime.now().isoformat())

    def update_item(self, item_id, features):
        """Upsert an item's features, stamping the update time."""
        self.item_features[item_id] = dict(features, updated_at=datetime.now().isoformat())

    def get_feature_vector(self, user_id, item_id):
        """Assemble the serving-time feature vector for a (user, item) pair.

        Unknown users/items fall back to neutral defaults.
        """
        user = self.user_features.get(user_id, {})
        item = self.item_features.get(item_id, {})
        # Affinity: user's score for the item's category, 0.0 when unknown.
        affinity = user.get("category_affinity", {}).get(item.get("category"), 0.0)
        return {
            "user_clicks_7d": user.get("clicks_7d", 0),
            "user_purchase_30d": user.get("purchase_30d", 0),
            "item_avg_rating": item.get("avg_rating", 3.0),
            "item_purchase_count": item.get("purchase_count", 0),
            "affinity": affinity,
        }
fs = RecoFeatureStore()
# Populate store
for uid in range(1, 6):
fs.update_user(uid, {
"clicks_7d": np.random.randint(5, 100),
"purchase_30d": np.random.randint(0, 10),
"category_affinity": {"electronics": np.random.uniform(0,1),
"books": np.random.uniform(0,1)}
})
for iid in range(1, 4):
fs.update_item(iid, {"avg_rating": np.random.uniform(3,5),
"purchase_count": np.random.randint(10,1000),
"category": np.random.choice(["electronics","books"])})
print("Feature vectors for serving:")
for uid in [1, 2]:
for iid in [1, 2, 3]:
fv = fs.get_feature_vector(uid, iid)
print(f" user={uid}, item={iid}: {fv}")import pandas as pd
import numpy as np
np.random.seed(33)
# Generate raw data with issues
n = 500
raw = pd.DataFrame({
"customer_id": range(n),
"age": np.random.choice([*np.random.randint(18,80,460), *[-5]*20, *[np.nan]*20], n),
"revenue": np.random.choice([*np.random.lognormal(6,1,450), *[np.nan]*50], n),
"segment": np.random.choice(["A","B","C","INVALID",None], n),
"signup_date": pd.date_range("2020-01-01", periods=n, freq="D")
})
# TODO: Schema validation (dtypes, ranges, allowed values)
# TODO: Cleaning steps (impute/drop nulls, fix dtypes, filter invalid segments)
# TODO: Log each step with before/after row counts
# TODO: Output data quality report DataFrame
import numpy as np
from scipy import stats
def ab_test(control_conv, control_n, treat_conv, treat_n, alpha=0.05):
    """Two-proportion z-test for an A/B experiment.

    Returns a dict with the z statistic, two-sided p-value, relative lift
    of treatment over control, the CI on the absolute difference at level
    `alpha`, and a boolean significance flag.
    """
    rate_control = control_conv / control_n
    rate_treat = treat_conv / treat_n
    diff = rate_treat - rate_control
    # Pooled rate under H0 (no difference) for the standard error.
    pooled = (control_conv + treat_conv) / (control_n + treat_n)
    se = np.sqrt(pooled * (1 - pooled) * (1 / control_n + 1 / treat_n))
    z = diff / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))
    half_width = stats.norm.ppf(1 - alpha / 2) * se
    return {
        "z": z,
        "p_value": p_value,
        "lift": diff / rate_control,
        "ci_95": diff + np.array([-1, 1]) * half_width,
        "significant": p_value < alpha,
    }
result = ab_test(control_conv=120, control_n=2000, treat_conv=155, treat_n=2000)
print(f"Conversion: control={120/2000:.3f}, treat={155/2000:.3f}")
print(f"Lift: {result['lift']:+.2%}")
print(f"p-value: {result['p_value']:.4f}")
print(f"95% CI: [{result['ci_95'][0]:.4f}, {result['ci_95'][1]:.4f}]")
print(f"Significant: {result['significant']}")import numpy as np
np.random.seed(42)
class CanaryRouter:
    """Randomized traffic splitter with per-model latency/error tracking."""

    def __init__(self, canary_pct=0.10):
        self.canary_pct = canary_pct  # fraction of traffic sent to canary
        self.metrics = {"control": [], "canary": []}

    def route(self):
        """Pick 'canary' with probability canary_pct, otherwise 'control'."""
        if np.random.random() < self.canary_pct:
            return "canary"
        return "control"

    def record(self, model, latency_ms, error=False):
        """Append one observation for the given model."""
        self.metrics[model].append({"latency": latency_ms, "error": error})

    def report(self):
        """Print per-model request count, mean/p99 latency and error rate."""
        for name, observations in self.metrics.items():
            lat = [obs["latency"] for obs in observations]
            errs = [obs["error"] for obs in observations]
            print(f"{name:<8}: n={len(observations):4d}, avg_lat={np.mean(lat):.1f}ms, "
                  f"p99={np.percentile(lat,99):.1f}ms, err_rate={np.mean(errs):.3f}")
router = CanaryRouter(canary_pct=0.10)
for _ in range(2000):
m = router.route()
lat = np.random.lognormal(3.5 if m=="control" else 3.3, 0.4)
err = np.random.random() < (0.02 if m=="control" else 0.025)
router.record(m, lat, err)
router.report()import numpy as np
np.random.seed(7)
# Thompson Sampling for online model selection
class ThompsonBandit:
    """Thompson Sampling over Bernoulli arms with Beta(1, 1) priors."""

    def __init__(self, n_arms):
        # Posterior parameters: arm i ~ Beta(alpha[i], beta[i]).
        self.alpha = np.ones(n_arms)
        self.beta = np.ones(n_arms)

    def select(self):
        """Sample one draw per arm from its posterior; play the argmax."""
        draws = np.random.beta(self.alpha, self.beta)
        return np.argmax(draws)

    def update(self, arm, reward):
        """Bayesian update for a binary reward (1 = success, 0 = failure)."""
        self.alpha[arm] += reward
        self.beta[arm] += 1 - reward
# 3 model versions with different true conversion rates
true_rates = [0.05, 0.08, 0.06]
bandit = ThompsonBandit(n_arms=3)
counts = np.zeros(3, dtype=int)
rewards_total = 0
for t in range(5000):
arm = bandit.select()
reward = int(np.random.random() < true_rates[arm])
bandit.update(arm, reward)
counts[arm] += 1
rewards_total += reward
print(f"Total conversions: {rewards_total} / 5000 = {rewards_total/5000:.4f}")
for i in range(3):
print(f" Model {i+1} (true={true_rates[i]:.2f}): selected {counts[i]:4d} times ({counts[i]/5000:.1%})")import numpy as np
from scipy import stats
np.random.seed(21)
# Combined: Canary + statistical testing + bandit
true_revenue_control = 45.0 # $45 avg order value
true_revenue_canary = 47.5 # $47.5 with new pricing
class PricingExperiment:
    """Canary rollout for a pricing change: a fixed-split A/B warm-up that
    hands traffic allocation over to Thompson sampling, with a two-sample
    t-test on observed revenues for the significance readout.

    NOTE(review): observe() reads the module-level constants
    true_revenue_control / true_revenue_canary to simulate revenues.
    """
    def __init__(self, canary_pct=0.15):
        # Fraction of traffic routed to the canary during the A/B phase.
        self.canary_pct = canary_pct
        self.control_revenues = []
        self.canary_revenues = []
        # Beta posterior parameters per arm [control, canary] for Thompson sampling.
        self.alpha = np.array([1.0, 1.0]) # Thompson beta params
        self.beta = np.array([1.0, 1.0])
    def route(self):
        # Phase 1: fixed-split A/B until 200 control observations are collected.
        # NOTE(review): the switch is count-based, not significance-based,
        # despite the "after significance" intent.
        if len(self.control_revenues) < 200:
            return "canary" if np.random.random() < self.canary_pct else "control"
        # Phase 2 (bandit): sample both posteriors, route to the argmax arm.
        return "canary" if np.argmax(np.random.beta(self.alpha, self.beta)) == 1 else "control"
    def observe(self, model):
        """Draw a simulated revenue for `model`, record it, and update that
        arm's Beta posterior using revenue/100 as a clipped pseudo-reward."""
        if model == "control":
            rev = np.random.normal(true_revenue_control, 12)
            self.control_revenues.append(rev)
            # Pseudo-reward in ~[0, 1]; clipped at 0 so negative draws
            # cannot make the Beta parameters decrease.
            self.alpha[0] += max(rev/100, 0); self.beta[0] += max(1-rev/100, 0)
        else:
            rev = np.random.normal(true_revenue_canary, 12)
            self.canary_revenues.append(rev)
            self.alpha[1] += max(rev/100, 0); self.beta[1] += max(1-rev/100, 0)
    def check_significance(self):
        """Two-sample t-test p-value (canary vs control revenues).

        Returns None until both arms have at least 30 observations.
        """
        if len(self.control_revenues) < 30 or len(self.canary_revenues) < 30:
            return None
        t, p = stats.ttest_ind(self.canary_revenues, self.control_revenues)
        return p
exp = PricingExperiment()
for i in range(3000):
m = exp.route()
exp.observe(m)
if i % 500 == 499:
p = exp.check_significance()
print(f"t={i+1}: control=${np.mean(exp.control_revenues):.2f}, "
f"canary=${np.mean(exp.canary_revenues):.2f}, p={p:.4f}")import numpy as np
import hashlib

import pandas as pd
from scipy import stats
np.random.seed(55)
# Experiment settings
TRUE_CONV_CONTROL = 0.04
TRUE_CONV_TREAT = 0.048
TRUE_REV_CONTROL = 30.0
TRUE_REV_TREAT = 31.5
MIN_SAMPLE = 500
def assign(user_id):
    """Deterministic hash-based variant assignment for experiment 'exp1'.

    Returns 'treatment' or 'control'. Uses MD5 instead of the builtin
    hash(): str hashing is salted per process (PYTHONHASHSEED), so the
    original hash()-based bucketing assigned the same user to different
    arms across runs/workers — not deterministic as the comment claimed.
    """
    digest = hashlib.md5(f"exp1_{user_id}".encode()).hexdigest()
    return "treatment" if int(digest, 16) % 2 == 0 else "control"
# TODO: Simulate 10,000 users over 14 days (approx 714/day)
# TODO: Track conversion and revenue per variant per day
# TODO: Daily significance check with two-proportion z-test
# TODO: Auto-declare winner when p<0.05 and n>=500/arm
# TODO: Log results to DataFrame with columns: day, n_control, n_treat, p_value, winner
MLflow provides a unified platform to log parameters, metrics, and artifacts across ML experiments for reproducibility and comparison.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment("iris_classification")
with mlflow.start_run(run_name="rf_baseline"):
n_estimators = 100
max_depth = 5
mlflow.log_param("n_estimators", n_estimators)
mlflow.log_param("max_depth", max_depth)
model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))
mlflow.log_metric("accuracy", acc)
mlflow.sklearn.log_model(model, "model")
print(f"Run logged. Accuracy: {acc:.4f}")
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment("breast_cancer_comparison")
# autolog captures all params/metrics automatically
mlflow.sklearn.autolog()
models = {
"random_forest": RandomForestClassifier(n_estimators=50, random_state=42),
"gradient_boost": GradientBoostingClassifier(n_estimators=50, random_state=42)
}
for name, model in models.items():
with mlflow.start_run(run_name=name):
model.fit(X_train, y_train)
print(f"{name}: {model.score(X_test, y_test):.4f}")
# Query runs
runs = mlflow.search_runs(experiment_names=["breast_cancer_comparison"])
print(runs[["run_id", "metrics.training_accuracy_score"]].head())
import mlflow, mlflow.sklearn
from sklearn.svm import SVC
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split, cross_val_score
import numpy as np
X, y = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment("wine_svm_tuning")
mlflow.sklearn.autolog()
for C in [0.1, 1.0, 10.0]:
for kernel in ["rbf", "linear"]:
with mlflow.start_run(run_name=f"svm_C{C}_{kernel}"):
model = SVC(C=C, kernel=kernel)
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
model.fit(X_train, y_train)
mlflow.log_param("C", C)
mlflow.log_param("kernel", kernel)
mlflow.log_metric("cv_mean", np.mean(cv_scores))
mlflow.log_metric("cv_std", np.std(cv_scores))
print(f"C={C}, kernel={kernel}: CV={np.mean(cv_scores):.4f}Β±{np.std(cv_scores):.4f}")
best = mlflow.search_runs(order_by=["metrics.cv_mean DESC"]).iloc[0]
print(f"Best run: {best['run_id']}, CV={best['metrics.cv_mean']:.4f}")
import mlflow
mlflow.set_experiment("practice")
with mlflow.start_run():
mlflow.log_param("learning_rate", 0.01)
mlflow.log_metric("loss", 0.25)
print("Logged!")
DVC (Data Version Control) tracks datasets and model files alongside Git, enabling reproducible ML pipelines and dataset sharing.
# DVC workflow (run in terminal, shown as subprocess here)
import subprocess, os
# Initialize DVC in a git repo
# subprocess.run(["dvc", "init"])
# Track a large dataset file
# subprocess.run(["dvc", "add", "data/train.csv"])
# This creates data/train.csv.dvc and adds data/train.csv to .gitignore
# Configure remote storage
# subprocess.run(["dvc", "remote", "add", "-d", "myremote", "s3://my-bucket/dvc"])
# Push data to remote
# subprocess.run(["dvc", "push"])
# Pull data on another machine
# subprocess.run(["dvc", "pull"])
# DVC pipeline in dvc.yaml:
# Two-stage dvc.yaml (prepare -> train); DVC re-runs a stage only when
# one of its declared deps changes. Rebuilt as adjacent-literal
# concatenation: the original split each string across a raw newline
# ("stages: <newline> "), which is a SyntaxError in Python.
pipeline_yaml = (
    "stages:\n"
    "  prepare:\n"
    "    cmd: python prepare.py\n"
    "    deps:\n"
    "      - data/raw.csv\n"
    "    outs:\n"
    "      - data/processed.csv\n"
    "  train:\n"
    "    cmd: python train.py\n"
    "    deps:\n"
    "      - data/processed.csv\n"
    "    outs:\n"
    "      - models/model.pkl\n"
    "    metrics:\n"
    "      - metrics.json\n"
)
print("DVC pipeline structure:")
print(pipeline_yaml)
# Using DVC Python API for programmatic data management
# pip install dvc dvc-s3
import json, hashlib
from pathlib import Path
# Simulate DVC-style hash tracking
def compute_md5(filepath):
    """MD5 hex digest of a file's full contents (DVC-style content hash)."""
    with open(filepath, "rb") as f:
        digest = hashlib.md5(f.read())
    return digest.hexdigest()
# Example: track dataset versions manually
import pandas as pd
import numpy as np
# Create sample datasets
df_v1 = pd.DataFrame(np.random.randn(1000, 5), columns=[f"f{i}" for i in range(5)])
df_v2 = pd.concat([df_v1, pd.DataFrame(np.random.randn(200, 5), columns=df_v1.columns)])
# Save versions
df_v1.to_csv("/tmp/data_v1.csv", index=False)
df_v2.to_csv("/tmp/data_v2.csv", index=False)
# Track metadata
meta = {
"v1": {"rows": len(df_v1), "md5": compute_md5("/tmp/data_v1.csv")},
"v2": {"rows": len(df_v2), "md5": compute_md5("/tmp/data_v2.csv")},
}
print(json.dumps(meta, indent=2))
print(f"Row delta v1βv2: +{meta['v2']['rows'] - meta['v1']['rows']}")
# Simulate DVC-style pipeline tracking
import hashlib, json
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
def md5(df):
    """Content hash of a DataFrame: MD5 over its per-row pandas hashes."""
    row_hashes = pd.util.hash_pandas_object(df).values
    return hashlib.md5(row_hashes).hexdigest()
# Stage 1: Raw data
np.random.seed(42)
raw = pd.DataFrame({"x1": np.random.randn(500), "x2": np.random.randn(500),
"y": np.random.randint(0, 2, 500)})
# Stage 2: Preprocessing
scaler = StandardScaler()
X = scaler.fit_transform(raw[["x1", "x2"]])
y = raw["y"]
# Stage 3: Train
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = LogisticRegression()
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))
# Log pipeline metadata
pipeline_state = {
"raw_hash": md5(raw),
"n_samples": len(raw),
"accuracy": round(acc, 4)
}
print(json.dumps(pipeline_state, indent=2))
import hashlib, pandas as pd, numpy as np
df = pd.DataFrame({"x": np.random.randn(100), "y": np.random.randint(0,2,100)})
h = hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()
print(f"Dataset hash: {h}, rows: {len(df)}")
A model registry tracks model versions, manages stage transitions (Staging → Production), and provides a central hub for model governance.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
mlflow.set_experiment("registry_demo")
model_name = "IrisClassifier"
with mlflow.start_run() as run:
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
acc = model.score(X_test, y_test)
mlflow.log_metric("accuracy", acc)
mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)
run_id = run.info.run_id
client = MlflowClient()
# Get latest version
versions = client.get_latest_versions(model_name)
for v in versions:
print(f"Version {v.version}: stage={v.current_stage}, acc={acc:.4f}")
# Transition to staging
client.transition_model_version_stage(model_name, v.version, "Staging")
print(f"Transitioned to Staging")
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
client = MlflowClient()
# Search for best model runs across experiments
def get_best_model(experiment_name, metric="accuracy", higher_is_better=True):
    """Return the best run of an MLflow experiment as a pandas Series.

    Sorts runs by `metrics.<metric>` (descending when higher_is_better)
    and returns the top row; returns None when the experiment has no runs.
    """
    order = f"metrics.{metric} {'DESC' if higher_is_better else 'ASC'}"
    # max_results=5 keeps the query cheap; only the top row is used below.
    runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=[order],
        max_results=5
    )
    if runs.empty:
        print(f"No runs found for {experiment_name}")
        return None
    best = runs.iloc[0]
    print(f"Best run: {best['run_id'][:8]}...")
    print(f" {metric}: {best.get(f'metrics.{metric}', 'N/A')}")
    # Strip the 'params.' prefix MLflow adds to parameter columns.
    print(f" params: {dict((k.replace('params.',''), v) for k,v in best.items() if k.startswith('params.'))}")
    return best
# Simulate comparing runs
print("Searching for best model...")
# In practice: get_best_model("iris_classification", "accuracy")
# Mock comparison table
comparison = pd.DataFrame({
"model": ["RandomForest", "GradientBoost", "SVM"],
"accuracy": [0.9737, 0.9605, 0.9737],
"train_time_s": [2.1, 4.5, 0.3],
"stage": ["Production", "Staging", "Archived"]
})
print(comparison.to_string(index=False))
import mlflow, mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import numpy as np
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model_name = "FraudDetector"
mlflow.set_experiment("fraud_detection_registry")
# Train multiple versions
for version_seed in [42, 123, 999]:
with mlflow.start_run(run_name=f"v_seed{version_seed}"):
np.random.seed(version_seed)
model = LogisticRegression(C=np.random.choice([0.1, 1.0, 10.0]), max_iter=1000)
model.fit(X_train, y_train)
acc = model.score(X_test, y_test)
mlflow.log_metric("accuracy", acc)
mlflow.log_param("seed", version_seed)
mlflow.sklearn.log_model(model, "model", registered_model_name=model_name)
print(f"seed={version_seed}: accuracy={acc:.4f}")
# List all versions
client = MlflowClient()
versions = client.search_model_versions(f"name='{model_name}'")
for v in versions:
print(f"Version {v.version}: {v.current_stage}")
import mlflow, mlflow.sklearn
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
with mlflow.start_run():
m = DummyClassifier().fit(X, y)
mlflow.sklearn.log_model(m, "model", registered_model_name="DummyModel")
print("Registered!")
Docker packages ML models with all dependencies into portable containers, ensuring consistent environments from development to production.
# Example Dockerfile for a scikit-learn model API
dockerfile_lines = [
"FROM python:3.10-slim",
"WORKDIR /app",
"COPY requirements.txt .",
"RUN pip install --no-cache-dir -r requirements.txt",
"COPY model.pkl .",
"COPY app.py .",
"EXPOSE 8000",
"HEALTHCHECK --interval=30s --timeout=10s CMD curl -f http://localhost:8000/health || exit 1",
'CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]',
]
print("Dockerfile:")
for line in dockerfile_lines:
print(" " + line)
# FastAPI app.py content (as list to avoid nested triple-quotes)
app_lines = [
"import pickle, numpy as np",
"from fastapi import FastAPI",
"from pydantic import BaseModel",
"app = FastAPI()",
'model = pickle.load(open("model.pkl", "rb"))',
"class Features(BaseModel):",
" features: list[float]",
"@app.get('/health')",
"def health(): return {'status': 'ok'}",
"@app.post('/predict')",
"def predict(data: Features):",
" pred = model.predict([data.features])",
" return {'prediction': int(pred[0])}",
]
print("\napp.py:")
for line in app_lines:
print(" " + line)
# Multi-stage Dockerfile for smaller images
multistage_lines = [
"FROM python:3.10 AS builder",
"WORKDIR /build",
"COPY requirements.txt .",
"RUN pip install --user --no-cache-dir -r requirements.txt",
"FROM python:3.10-slim AS production",
"WORKDIR /app",
"COPY --from=builder /root/.local /root/.local",
"COPY . .",
"ENV PATH=/root/.local/bin:$PATH",
'CMD ["python", "serve.py"]',
]
print("Multi-stage Dockerfile:")
for line in multistage_lines:
print(" " + line)
# MLflow built-in Docker packaging.
# Fix: the -d line used unescaped double quotes inside a double-quoted
# literal ("-d '{"instances"...}'"), which is a SyntaxError; it is now a
# single-quoted Python literal so the embedded JSON quotes need no escaping.
mlflow_cmds = [
    "# Build Docker image from MLflow model (no Dockerfile needed!)",
    "mlflow models build-docker \\",
    '  --model-uri "models:/MyModel/Production" \\',
    '  --name "my-ml-model:latest"',
    "docker run -p 8080:8080 my-ml-model:latest",
    "curl -X POST http://localhost:8080/invocations \\",
    '  -H "Content-Type: application/json" \\',
    '  -d \'{"instances": [[5.1, 3.5, 1.4, 0.2]]}\'',
]
print("\nMLflow Docker commands:")
for cmd in mlflow_cmds:
    print(" " + cmd)
import pickle, json, os
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Train and save a model
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
acc = accuracy_score(y_test, model.predict(X_test))
# Save model artifact
model_path = "/tmp/iris_model.pkl"
with open(model_path, "wb") as f:
pickle.dump(model, f)
# Generate deployment manifest
manifest = {
"model": "RandomForestClassifier",
"version": "1.0.0",
"accuracy": round(acc, 4),
"features": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
"classes": ["setosa", "versicolor", "virginica"],
"model_path": model_path,
"docker_image": "iris-classifier:1.0.0",
"port": 8000
}
print(json.dumps(manifest, indent=2))
# Simulate container health check
def health_check(model_pkl_path):
    """Container-style health probe: load the pickled model and score one
    known sample, returning a small status dict.

    NOTE(review): pickle.load executes arbitrary code from the file —
    only use on trusted, self-produced artifacts.
    """
    with open(model_pkl_path, "rb") as f:
        m = pickle.load(f)
    # One iris-shaped sample (4 features) as a smoke-test input.
    test_input = np.array([[5.1, 3.5, 1.4, 0.2]])
    pred = m.predict(test_input)
    return {"status": "healthy", "test_prediction": int(pred[0])}
print("Health check:", health_check(model_path))
import pickle, json
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris

# Minimal end-to-end: fit, persist, and emit a tiny deployment manifest.
X, y = load_iris(return_X_y=True)
baseline = DummyClassifier().fit(X, y)
with open("/tmp/model.pkl", "wb") as fh:
    pickle.dump(baseline, fh)
print(json.dumps({"model": "DummyClassifier", "port": 8000}, indent=2))
FastAPI provides a high-performance, async-ready framework for building ML model serving APIs with automatic validation and documentation.
# FastAPI ML serving (run with: uvicorn app:app --reload)
# app.py structure:
fastapi_lines = [
    "from fastapi import FastAPI",
    "from pydantic import BaseModel, validator",
    "from typing import List, Optional",
    "import time, pickle, numpy as np",
    "",
    "app = FastAPI(title='ML Model API', version='1.0.0')",
    "model = None",
    "",
    "@app.on_event('startup')",
    "async def load_model():",
    " global model",
    " model = pickle.load(open('model.pkl', 'rb'))",
    "",
    "class PredictionRequest(BaseModel):",
    " features: List[float]",
    " model_version: Optional[str] = 'latest'",
    "",
    "@app.post('/predict')",
    "async def predict(request: PredictionRequest):",
    " start = time.time()",
    " pred = model.predict([request.features])",
    " prob = model.predict_proba([request.features])",
    " latency = (time.time() - start) * 1000",
    " return {'prediction': int(pred[0]), 'probability': prob[0].tolist(), 'latency_ms': round(latency, 2)}",
    "",
    "@app.get('/health')",
    "async def health(): return {'status': 'ok'}",
]
# Show only the first 15 source lines of the example app.
print("FastAPI app.py:")
for src_line in fastapi_lines[:15]:
    print(" " + src_line)
print(" ...")
# Batch prediction endpoint + middleware
import numpy as np, time

# Simulate batch endpoint logic
def batch_predict(instances):
    """Mock batch scorer: one zero prediction per input row."""
    rows = np.array(instances)
    mocked = [0] * len(rows)  # stand-in for model.predict(rows)
    return {"predictions": mocked, "count": len(mocked)}

# Exercise the mock with three iris-like samples.
result = batch_predict([[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3], [4.9, 3.0, 1.4, 0.2]])
print("Batch result:", result)
# Middleware pattern (request logging)
import functools, logging

def log_middleware(func):
    """Decorator that logs each call's wall-clock duration in milliseconds."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.time()
        outcome = func(*args, **kwargs)
        duration = (time.time() - started) * 1000
        logging.info(f"{func.__name__} completed in {duration:.2f}ms")
        return outcome
    return wrapper

@log_middleware
def predict_single(features):
    """Mock single-row prediction with a fixed response."""
    return {"prediction": 0, "latency_ms": 0.1}

res = predict_single([5.1, 3.5, 1.4, 0.2])
print("Single prediction:", res)
# FastAPI batch endpoint structure (key lines)
batch_endpoint_code = [
    "class BatchRequest(BaseModel):",
    " instances: List[List[float]] = Field(..., min_items=1, max_items=1000)",
    "@app.post('/predict/batch')",
    "async def batch_predict(request: BatchRequest):",
    " X = np.array(request.instances)",
    " predictions = model.predict(X)",
    " return {'predictions': predictions.tolist(), 'count': len(predictions)}",
]
print("\nBatch endpoint structure:")
for snippet in batch_endpoint_code:
    print(" " + snippet)
# Simulate FastAPI prediction pipeline
import numpy as np
import pickle
import time
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Train the model the "API" will serve.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42)
model.fit(X_train, y_train)

def predict_api(features: list) -> dict:
    """Validate input length, score one sample, and report latency."""
    if len(features) != 4:
        return {"error": f"Expected 4 features, got {len(features)}"}
    t0 = time.time()
    sample = np.array([features])
    label = model.predict(sample)
    scores = model.predict_proba(sample)
    elapsed = (time.time() - t0) * 1000
    return {
        "prediction": int(label[0]),
        "probability": [round(p, 4) for p in scores[0]],
        "latency_ms": round(elapsed, 3),
    }

# Single request
result = predict_api([5.1, 3.5, 1.4, 0.2])
print("Single prediction:", result)

# Batch of requests, timed end-to-end
batch = [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3], [4.9, 3.0, 1.4, 0.2]]
start = time.time()
batch_results = [predict_api(row) for row in batch]
total_ms = (time.time() - start) * 1000
print(f"Batch of {len(batch)}: {total_ms:.2f}ms total")
for i, r in enumerate(batch_results):
    print(f" [{i}] pred={r['prediction']}, prob_max={max(r['probability']):.4f}")
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris

# Tiny serving stub: validate the feature count, then score with a dummy model.
X, y = load_iris(return_X_y=True)
model = DummyClassifier().fit(X, y)

def predict(features):
    """Return a prediction dict, or an error dict on a malformed request."""
    if len(features) != 4:
        return {"error": "need 4 features"}
    return {"prediction": int(model.predict([features])[0])}

print(predict([5.1, 3.5, 1.4, 0.2]))
Production ML models degrade over time due to data drift and concept drift. Monitoring detects these shifts before they cause silent failures.
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.datasets import load_iris

# Reference (training) distribution.
X, y = load_iris(return_X_y=True)
ref_data = pd.DataFrame(X, columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])

# Two simulated production windows (fixed seed; draws happen in this order):
np.random.seed(42)
# 1) tiny zero-mean noise -- distribution essentially unchanged
prod_normal = ref_data + np.random.normal(0, 0.05, ref_data.shape)
# 2) +0.5 mean shift -- a genuine covariate drift
prod_drift = ref_data + np.random.normal(0.5, 0.2, ref_data.shape)
def detect_drift_ks(reference, current, alpha=0.05):
    """Per-column two-sample Kolmogorov-Smirnov drift test.

    Returns {column: {ks_stat, p_value, drift_detected}}, flagging drift
    when the KS p-value falls below `alpha`.
    """
    report = {}
    for col in reference.columns:
        ks_stat, p_val = stats.ks_2samp(reference[col], current[col])
        report[col] = {
            "ks_stat": round(ks_stat, 4),
            "p_value": round(p_val, 4),
            "drift_detected": p_val < alpha,
        }
    return report
# Report per-feature drift status for both production scenarios.
for header, window in [("=== No Drift Scenario ===", prod_normal),
                       ("\n=== Drift Scenario ===", prod_drift)]:
    print(header)
    for feat, res in detect_drift_ks(ref_data, window).items():
        flag = "DRIFT" if res["drift_detected"] else "OK"
        print(f" {feat}: {flag} (p={res['p_value']:.4f})")
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
def psi(expected, actual, buckets=10):
    """Population Stability Index between two score distributions.

    Bins both samples into `buckets` equal-width bins over their joint
    range and sums (actual% - expected%) * ln(actual% / expected%).
    Conventional reading: <0.1 stable, 0.1-0.2 slight change, >0.2
    significant drift. (The original defined an unused inner helper
    `scale_range`; it has been removed.)
    """
    min_val = min(expected.min(), actual.min())
    max_val = max(expected.max(), actual.max())
    breakpoints = np.linspace(min_val, max_val, buckets + 1)
    # 1e-10 guards against log(0) / division by zero for empty buckets.
    expected_perc = np.histogram(expected, bins=breakpoints)[0] / len(expected) + 1e-10
    actual_perc = np.histogram(actual, bins=breakpoints)[0] / len(actual) + 1e-10
    psi_val = np.sum((actual_perc - expected_perc) * np.log(actual_perc / expected_perc))
    return round(psi_val, 4)
# Train model
X, y = load_iris(return_X_y=True)
# BUG FIX: the original called train_test_split(X, random_state=42) without
# `y`, which returns only two arrays -- the 4-way unpack raised ValueError.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42).fit(X_train, y_train)

# Reference prediction scores (training data, class-1 probability)
ref_probs = model.predict_proba(X_train)[:, 1]
# Simulated production prediction scores
prod_stable = model.predict_proba(X_test)[:, 1]
prod_drifted = np.clip(prod_stable + np.random.normal(0.3, 0.1, len(prod_stable)), 0, 1)

# Compute each PSI once instead of twice per report line.
psi_stable = psi(ref_probs, prod_stable)
psi_drifted = psi(ref_probs, prod_drifted)
print(f"PSI (stable): {psi_stable:.4f} {'OK' if psi_stable < 0.1 else 'DRIFT'}")
print(f"PSI (drifted): {psi_drifted:.4f} {'OK' if psi_drifted < 0.1 else 'DRIFT'}")
print("\nPSI thresholds: <0.1 = stable, 0.1-0.2 = slight change, >0.2 = significant drift")
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

X, y = load_breast_cancer(return_X_y=True)
feat_names = load_breast_cancer().feature_names
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
baseline_f1 = f1_score(y_test, model.predict(X_test))
print(f"Baseline F1: {baseline_f1:.4f}")

# Simulate monthly production windows with increasing covariate drift.
months = 6
results = []
for month in range(1, months + 1):
    drift_level = month * 0.1
    X_prod = X_test + np.random.normal(drift_level, 0.05, X_test.shape)
    y_prod = model.predict(X_prod)
    # KS test on the first 3 features: how many drifted significantly?
    drift_counts = sum(
        stats.ks_2samp(X_train[:, i], X_prod[:, i])[1] < 0.05
        for i in range(3)
    )
    # Simulated delayed ground-truth labels. BUG FIX: the original used
    # (drift_level * randn(...)).astype(bool), which is True for nearly every
    # sample (any nonzero float is truthy), flipping ~all labels regardless of
    # drift_level. Flip each label with probability `drift_level` instead.
    simulated_errors = np.random.rand(len(y_test)) < drift_level
    y_labels = np.where(simulated_errors, 1 - y_test, y_test)
    f1 = f1_score(y_labels, y_prod)  # reuse predictions computed above
    results.append({"month": month, "f1": round(f1, 4), "drifted_features": drift_counts})
    alert = "ALERT" if f1 < baseline_f1 * 0.9 or drift_counts >= 2 else "OK"
    print(f"Month {month}: F1={f1:.4f}, drift_features={drift_counts} [{alert}]")
import numpy as np
from scipy import stats

# Quick KS drift check on synthetic scores: production mean shifted by +0.5.
ref = np.random.normal(0, 1, 1000)
prod = np.random.normal(0.5, 1, 1000)  # drifted
stat, p = stats.ks_2samp(ref, prod)
print(f"KS stat={stat:.4f}, p={p:.4f}, drift={'YES' if p<0.05 else 'NO'}")
CI/CD for ML automates testing, validation, and deployment of models, ensuring quality gates before any model reaches production.
# .github/workflows/ml_pipeline.yml structure
yaml_lines = [
    "name: ML Pipeline CI/CD",
    "on:",
    " push:",
    " branches: [main]",
    "jobs:",
    " test-and-validate:",
    " runs-on: ubuntu-latest",
    " steps:",
    " - uses: actions/checkout@v3",
    " - name: Set up Python",
    " uses: actions/setup-python@v4",
    " with:",
    " python-version: '3.10'",
    " - name: Install dependencies",
    " run: pip install -r requirements.txt",
    " - name: Run unit tests",
    " run: pytest tests/ --cov=src",
    " - name: Train model",
    " run: python train.py",
    " - name: Evaluate model (quality gate)",
    " run: python evaluate.py --threshold 0.90",
    " - name: Build Docker image",
    " if: github.ref == 'refs/heads/main'",
    " run: docker build -t my-model:latest .",
]
# Print the workflow skeleton, indented one space per line.
print(".github/workflows/ml_pipeline.yml:")
for workflow_line in yaml_lines:
    print(" " + workflow_line)
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import json, sys

def evaluate_model(model, X_train, X_test, y_train, y_test, thresholds):
    """Fit, score, and gate a model: returns (metrics, passed).

    `passed` is True only when every metric named in `thresholds`
    meets or exceeds its minimum -- the CI quality gate.
    """
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    cv_scores = cross_val_score(model, X_train, y_train, cv=5)
    metrics = {
        "accuracy": round(accuracy_score(y_test, y_pred), 4),
        "f1_macro": round(f1_score(y_test, y_pred, average="macro"), 4),
        "cv_mean": round(cv_scores.mean(), 4),
        "cv_std": round(cv_scores.std(), 4),
    }
    passed = all(metrics.get(k, 0) >= v for k, v in thresholds.items())
    return metrics, passed

# Define quality gates
THRESHOLDS = {"accuracy": 0.90, "f1_macro": 0.88, "cv_mean": 0.89}
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
metrics, passed = evaluate_model(model, X_train, X_test, y_train, y_test, THRESHOLDS)
print(json.dumps(metrics, indent=2))
print(f"\nQuality Gate: {'PASSED β' if passed else 'FAILED β'}")
if not passed:
    print("Failed thresholds:", {k: v for k, v in THRESHOLDS.items() if metrics.get(k, 0) < v})
    # sys.exit(1) # Uncomment in real CI to fail the pipeline
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import f1_score
import json

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
THRESHOLDS = {"accuracy": 0.95, "f1": 0.94, "cv_mean": 0.94}

def ci_pipeline(model, model_name):
    """Train + evaluate one candidate and stamp a PASSED/FAILED status.

    Reads the module-level train/test split and THRESHOLDS.
    """
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    cv = cross_val_score(model, X_train, y_train, cv=5)
    metrics = {
        "model": model_name,
        "accuracy": round(model.score(X_test, y_test), 4),
        "f1": round(f1_score(y_test, y_pred), 4),
        "cv_mean": round(cv.mean(), 4),
        "cv_std": round(cv.std(), 4),
    }
    failures = [k for k in ["accuracy", "f1", "cv_mean"] if metrics[k] < THRESHOLDS[k]]
    metrics["status"] = "PASSED" if not failures else f"FAILED: {failures}"
    return metrics

# Gate two candidate models through the same pipeline.
candidates = [
    (RandomForestClassifier(n_estimators=100, random_state=42), "RandomForest"),
    (GradientBoostingClassifier(n_estimators=50, random_state=42), "GradientBoost"),
]
for clf, label in candidates:
    print(json.dumps(ci_pipeline(clf, label), indent=2))
from sklearn.dummy import DummyClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Minimal quality gate: a dummy baseline must clear 30% accuracy.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)
m = DummyClassifier().fit(X_train, y_train)
acc = m.score(X_test, y_test)
passed = acc >= 0.30
print(f"Accuracy={acc:.4f}, gate={'PASSED' if passed else 'FAILED'}")
Feature stores centralize, version, and serve ML features consistently across training and serving, eliminating training-serving skew.
# Feature store concepts using pandas (Feast-like workflow)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Simulate a feature store
class SimpleFeatureStore:
    """Minimal in-memory feature store with offline and online retrieval."""

    def __init__(self):
        self._features = {}    # feature_view_name -> DataFrame indexed by entity
        self._entity_df = None

    def ingest(self, feature_view_name, df, entity_col="entity_id"):
        # Register feature data, indexed by entity for fast lookups.
        self._features[feature_view_name] = df.set_index(entity_col)
        print(f"Ingested '{feature_view_name}': {len(df)} rows, {len(df.columns)-1} features")

    def get_historical_features(self, entity_ids, feature_views):
        # Offline retrieval: join the requested views into a training frame.
        frames = [self._features[fv].loc[entity_ids]
                  for fv in feature_views if fv in self._features]
        return pd.concat(frames, axis=1).reset_index()

    def get_online_features(self, entity_id, feature_views):
        # Online retrieval: low-latency single-entity lookup for serving.
        row = {"entity_id": entity_id}
        for fv in feature_views:
            view = self._features.get(fv)
            if view is not None and entity_id in view.index:
                row.update(view.loc[entity_id].to_dict())
        return row
# Setup: populate the store with two feature views (fixed seed; the random
# draws below happen in the original order so results are unchanged).
store = SimpleFeatureStore()
np.random.seed(42)
n = 1000

# User behavioral features
user_features = pd.DataFrame({
    "entity_id": range(n),
    "avg_spend": np.random.exponential(50, n),
    "days_active": np.random.randint(1, 365, n),
    "n_transactions": np.random.poisson(10, n),
})
store.ingest("user_activity", user_features)

# User demographic features
user_demo = pd.DataFrame({
    "entity_id": range(n),
    "age": np.random.randint(18, 70, n),
    "credit_score": np.random.randint(300, 850, n),
})
store.ingest("user_demographics", user_demo)

# Offline path: assemble a training frame for the first 100 entities.
train_features = store.get_historical_features(
    entity_ids=list(range(100)),
    feature_views=["user_activity", "user_demographics"],
)
print("Training features shape:", train_features.shape)
print(train_features.head(3).to_string())

# Online path: single-entity lookup as a serving endpoint would do.
online = store.get_online_features(42, ["user_activity", "user_demographics"])
print("\nOnline features for entity 42:", online)
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle
# Anti-pattern: inline preprocessing (causes training-serving skew)
def bad_preprocess(df):
    """Ad-hoc feature engineering that serving code would have to duplicate.

    NOTE: mutates `df` in place by adding two derived columns.
    """
    df["feature_ratio"] = df["f1"] / (df["f2"] + 1e-8)
    df["log_f3"] = np.log1p(df["f3"])
    return df[["feature_ratio", "log_f3", "f4"]].values
# Good pattern: encapsulate ALL preprocessing in a sklearn Pipeline
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless transformer producing [f1/f2 ratio, log1p(f3), f4]."""

    def fit(self, X, y=None):
        # Nothing to learn; fit is required by the sklearn transformer API.
        return self

    def transform(self, X):
        frame = pd.DataFrame(X, columns=["f1", "f2", "f3", "f4"])
        ratio = frame["f1"] / (frame["f2"] + 1e-8)  # ratio feature
        logged = np.log1p(frame["f3"])              # log transform
        return np.column_stack([ratio, logged, frame["f4"]])  # f4 passthrough
# Generate data
np.random.seed(42)
n = 1000
X = pd.DataFrame(np.abs(np.random.randn(n, 4)), columns=["f1", "f2", "f3", "f4"])
y = (X["f1"] > X["f2"]).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X.values, y, random_state=42)

# Full pipeline: preprocessing + model travel together as one object.
full_pipeline = Pipeline([
    ("engineer", FeatureEngineer()),
    ("scaler", StandardScaler()),
    ("model", RandomForestClassifier(n_estimators=50, random_state=42)),
])
full_pipeline.fit(X_train, y_train)
acc = full_pipeline.score(X_test, y_test)
print(f"Pipeline accuracy: {acc:.4f}")

# Save the entire pipeline (same logic used in training AND serving).
with open("/tmp/full_pipeline.pkl", "wb") as fh:
    pickle.dump(full_pipeline, fh)

# Serving: load and predict -- the exact object trained above, so no skew.
with open("/tmp/full_pipeline.pkl", "rb") as fh:
    serving_pipeline = pickle.load(fh)
pred = serving_pipeline.predict(X_test[:5])
print("Serving predictions:", pred)
print("Training-serving skew: ELIMINATED (same pipeline object)")
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pickle
np.random.seed(42)
n = 2000

# Simulate customer churn features (draws kept in the original order so the
# fixed seed reproduces identical data).
df = pd.DataFrame({
    "tenure_months": np.random.randint(1, 72, n),
    "monthly_charges": np.random.uniform(20, 120, n),
    "total_charges": np.random.uniform(20, 8000, n),
    "n_support_calls": np.random.poisson(2, n),
    "last_login_days": np.random.exponential(30, n),
})

# Target: churn when (high charges AND short tenure) OR many support calls.
# Parentheses added for clarity; & already binds tighter than |.
df["churn"] = (
    ((df["monthly_charges"] > 80) & (df["tenure_months"] < 12))
    | (df["n_support_calls"] > 4)
).astype(int)

X = df.drop("churn", axis=1).values
y = df["churn"].values
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
# Feature engineering + model in one pipeline
def add_features(X):
    """Append two derived columns: charge-per-tenure ratio and a
    support-calls x login-recency interaction."""
    X = np.copy(X)  # never mutate the caller's array
    charge_ratio = X[:, 2] / (X[:, 0] + 1)         # total_charges / (tenure + 1)
    recency_support = X[:, 3] * np.log1p(X[:, 4])  # support_calls * log1p(recency)
    return np.column_stack([X, charge_ratio, recency_support])
# Engineering + scaling + model packaged as a single serving artifact.
pipeline = Pipeline([
    ("engineer", FunctionTransformer(add_features)),
    ("scaler", StandardScaler()),
    ("model", GradientBoostingClassifier(n_estimators=100, random_state=42)),
])
pipeline.fit(X_train, y_train)
print("Churn model pipeline accuracy:", round(pipeline.score(X_test, y_test), 4))
print(classification_report(y_test, pipeline.predict(X_test)))

# Single-entity serving prediction
customer = np.array([[24, 95.0, 2500.0, 3, 45.0]])
pred = pipeline.predict(customer)
prob = pipeline.predict_proba(customer)
print(f"Customer churn prediction: {'CHURN' if pred[0] else 'RETAIN'} (prob={prob[0][1]:.4f})")
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.dummy import DummyClassifier

# Smallest possible engineer->scale->model pipeline sanity check.
def add_feat(X):
    """Append a ratio column col0/(col1+1)."""
    return np.column_stack([X, X[:, 0] / (X[:, 1] + 1)])

pipe = Pipeline([
    ("eng", FunctionTransformer(add_feat)),
    ("sc", StandardScaler()),
    ("m", DummyClassifier()),
])
X = np.random.randn(100, 3)
y = np.random.randint(0, 2, 100)
pipe.fit(X, y)
print("Pipeline works:", pipe.predict(X[:3]))