Scikit-learn Study Guide
Machine Learning in Python — from data loading to model deployment.
10 Topics • Real-World ML

Import scikit-learn, load built-in datasets, split data into train/test sets.
Loading Datasets & Train/Test Split
from sklearn.datasets import load_iris, load_boston, make_classification
from sklearn.model_selection import train_test_split
import pandas as pd
# Load Iris dataset
iris = load_iris()
X, y = iris.data, iris.target
print('Features:', iris.feature_names)
print('Classes:', iris.target_names)
print('Shape:', X.shape)
# Split 80/20
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f'Train: {X_train.shape}, Test: {X_test.shape}')Synthetic Dataset with make_classification
from sklearn.datasets import make_classification, make_regression
import numpy as np
# Synthetic classification data
X, y = make_classification(
n_samples=1000, n_features=10,
n_informative=5, n_redundant=2,
random_state=42
)
print('X shape:', X.shape, '| Classes:', np.unique(y))
# Synthetic regression data
X_r, y_r = make_regression(
n_samples=500, n_features=5, noise=0.1, random_state=42
)
print('Regression X:', X_r.shape, '| y range:', y_r.min().round(1), '-', y_r.max().round(1))Stratified K-Fold Cross-Validation
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
import numpy as np
iris = load_iris()
X, y = iris.data, iris.target
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42)
scores = cross_val_score(model, X, y, cv=skf, scoring='accuracy')
print('Per-fold accuracy:', scores.round(4))
print(f'Mean: {scores.mean():.4f} Std: {scores.std():.4f}')
# Check class balance per fold
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
counts = np.bincount(y[val_idx])
print(f'Fold {fold+1} val class counts: {counts}')StratifiedShuffleSplit & make_regression for Continuous Targets
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.datasets import make_classification, make_regression
import numpy as np

# StratifiedShuffleSplit draws several independent random splits, each one
# preserving the global class proportions.
X_c, y_c = make_classification(
    n_samples=500, n_features=6, n_classes=3,
    n_informative=4, random_state=42
)
sss = StratifiedShuffleSplit(n_splits=3, test_size=0.2, random_state=42)
for fold, (train_idx, test_idx) in enumerate(sss.split(X_c, y_c)):
    # Normalized label counts show train/test distributions match.
    train_dist = np.bincount(y_c[train_idx]) / len(train_idx)
    test_dist = np.bincount(y_c[test_idx]) / len(test_idx)
    print(f'Split {fold+1} train dist: {train_dist.round(3)} | test dist: {test_dist.round(3)}')

# make_regression with n_targets=2 produces a 2-column continuous target.
X_r, y_r = make_regression(
    n_samples=400, n_features=8, n_informative=5,
    n_targets=2, noise=5.0, random_state=42
)
print(f'Regression X: {X_r.shape} | y: {y_r.shape}')
print(f'y col0 range: [{y_r[:,0].min():.1f}, {y_r[:,0].max():.1f}]')
print(f'y col1 range: [{y_r[:,1].min():.1f}, {y_r[:,1].max():.1f}]')

import pandas as pd
from sklearn.model_selection import train_test_split
# Simulate patient vitals
import numpy as np

np.random.seed(42)
# NOTE: the draw order below fixes the random stream — keep it stable so the
# seeded results stay reproducible.
df = pd.DataFrame({
    'age': np.random.randint(20, 80, 200),
    'bp': np.random.randint(60, 140, 200),
    'cholesterol': np.random.randint(150, 300, 200),
    'glucose': np.random.randint(70, 200, 200),
    'disease': np.random.choice([0, 1], 200, p=[0.7, 0.3]),
})

X = df.drop('disease', axis=1)
y = df['disease']
# Stratify on the label so the ~30% positive rate survives the split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
print(f'Train positives: {y_train.mean():.1%} | Test positives: {y_test.mean():.1%}')

from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold, cross_val_score, train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
X, y = make_classification(
n_samples=1000, n_features=8, n_classes=3,
n_informative=6, n_redundant=1, random_state=42
)
# TODO: Create StratifiedKFold with 10 splits, shuffle=True
# skf = StratifiedKFold(???)
# TODO: Run cross_val_score with LogisticRegression(max_iter=500)
# scores = cross_val_score(???)
# print('CV scores:', scores.round(4))
# print(f'Mean: {scores.mean():.4f} Std: {scores.std():.4f}')
# TODO: Compare with a single train/test split
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# model = LogisticRegression(max_iter=500)
# model.fit(X_train, y_train)
# print('Single split accuracy:', model.score(X_test, y_test).round(4))LinearRegression for continuous targets; LogisticRegression for binary/multiclass classification.
Linear Regression
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
X, y = make_regression(n_samples=200, n_features=3, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print('Coefficients:', model.coef_.round(2))
print('Intercept:', round(model.intercept_, 2))
print('R2 Score:', round(r2_score(y_test, y_pred), 4))
print('RMSE:', round(np.sqrt(mean_squared_error(y_test, y_pred)), 2))Logistic Regression (Classification)
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42, stratify=iris.target
)
lr = LogisticRegression(max_iter=200)
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print('Accuracy:', accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred, target_names=iris.target_names))Ridge & Lasso Regularization
from sklearn.linear_model import Ridge, Lasso, LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
X, y = make_regression(n_samples=200, n_features=20, noise=15, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
models = [
('LinearRegression', LinearRegression()),
('Ridge(alpha=1)', Ridge(alpha=1.0)),
('Ridge(alpha=10)', Ridge(alpha=10.0)),
('Lasso(alpha=1)', Lasso(alpha=1.0)),
]
for name, model in models:
model.fit(X_train, y_train)
r2 = r2_score(y_test, model.predict(X_test))
n_zero = np.sum(np.abs(model.coef_) < 1e-4)
print(f'{name:25s} R2={r2:.4f} zero_coefs={n_zero}')Alpha Tuning & ElasticNet
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.datasets import make_regression
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np

X, y = make_regression(n_samples=300, n_features=15, n_informative=5,
                       noise=20, random_state=42)

# Alpha sweep for Ridge and Lasso, each inside a scaling pipeline so the
# penalty treats all features on the same scale.
# BUG FIX: the header was f'{'Alpha':>8}...' — reusing single quotes inside a
# single-quoted f-string is a SyntaxError on Python < 3.12. Double quotes on
# the outside make it valid on every supported version.
alphas = [0.01, 0.1, 1, 10, 100]
print(f"{'Alpha':>8} | {'Ridge R2':>10} | {'Lasso R2':>10}")
print('-' * 35)
for a in alphas:
    r_pipe = Pipeline([('sc', StandardScaler()), ('m', Ridge(alpha=a))])
    l_pipe = Pipeline([('sc', StandardScaler()), ('m', Lasso(alpha=a, max_iter=5000))])
    r2_r = cross_val_score(r_pipe, X, y, cv=5, scoring='r2').mean()
    r2_l = cross_val_score(l_pipe, X, y, cv=5, scoring='r2').mean()
    print(f'{a:>8.2f} | {r2_r:>10.4f} | {r2_l:>10.4f}')

# ElasticNet: blends L1 + L2; l1_ratio=1 is pure Lasso, 0 is pure Ridge.
for l1r in [0.1, 0.5, 0.9]:
    en = Pipeline([('sc', StandardScaler()),
                   ('m', ElasticNet(alpha=1.0, l1_ratio=l1r, max_iter=5000))])
    r2 = cross_val_score(en, X, y, cv=5, scoring='r2').mean()
    print(f'ElasticNet l1_ratio={l1r:.1f} CV R2={r2:.4f}')

from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# Simulate loan applicant data
np.random.seed(42)
n = 500
income = np.random.normal(50000, 15000, n)
debt = np.random.normal(20000, 8000, n)
credit_score = np.random.randint(300, 850, n)
X = np.column_stack([income, debt, credit_score])
# Default if debt/income > 0.6 or credit < 550
y = ((debt / income > 0.6) | (credit_score < 550)).astype(int)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# BUG FIX: the raw features span wildly different magnitudes (income ~5e4,
# credit_score ~1e2). On unscaled data the default lbfgs solver hits its
# 100-iteration limit and emits a ConvergenceWarning with a poorly fit model.
# Standardizing the features first fixes convergence.
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)

model = LogisticRegression()
model.fit(X_train_s, y_train)
print('Loan Default Prediction:')
print(classification_report(y_test, model.predict(X_test_s)))
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
X, y, true_coef = make_regression(
n_samples=300, n_features=15, n_informative=5,
noise=10, coef=True, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# TODO: Train LinearRegression and compute R2
# lr = LinearRegression()
# lr.fit(X_train, y_train)
# print('LinearRegression R2:', r2_score(y_test, lr.predict(X_test)).round(4))
# TODO: Train Lasso(alpha=0.5) and compute R2
# lasso = Lasso(alpha=0.5)
# lasso.fit(X_train, y_train)
# print('Lasso R2:', r2_score(y_test, lasso.predict(X_test)).round(4))
# TODO: Count zero coefficients in Lasso
# n_zero = np.sum(np.abs(lasso.coef_) < 1e-4)
# print(f'Lasso zero coefs: {n_zero} / 15 (expect ~10)')
# TODO: Print top-5 features by |coef| for both models
# for name, coef in [('LR', lr.coef_), ('Lasso', lasso.coef_)]:
# top5 = np.argsort(np.abs(coef))[::-1][:5]
# print(f'{name} top features: {top5}')

Tree-based models: interpretable Decision Trees and powerful ensemble Random Forests.
Decision Tree Classifier
from sklearn.tree import DecisionTreeClassifier, export_text
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
dt = DecisionTreeClassifier(max_depth=3, random_state=42)
dt.fit(X_train, y_train)
print('Accuracy:', accuracy_score(y_test, dt.predict(X_test)))
print('Feature importances:')
for name, imp in zip(iris.feature_names, dt.feature_importances_):
print(f' {name}: {imp:.3f}')
print(export_text(dt, feature_names=iris.feature_names))Random Forest & Feature Importance
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
print('RF Accuracy:', accuracy_score(y_test, rf.predict(X_test)))
# Top 5 features
idx = np.argsort(rf.feature_importances_)[::-1][:5]
for i in idx:
print(f' Feature {i}: {rf.feature_importances_[i]:.4f}')Gradient Boosting Classifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
gb = GradientBoostingClassifier(
n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42
)
gb.fit(X_train, y_train)
print('GBM Accuracy:', accuracy_score(y_test, gb.predict(X_test)).round(4))
# Feature importances — top 5
idx = np.argsort(gb.feature_importances_)[::-1][:5]
print('Top 5 features by importance:')
for rank, i in enumerate(idx, 1):
print(f' {rank}. Feature {i}: {gb.feature_importances_[i]:.4f}')ExtraTreesClassifier & Feature Importance Bar Chart (Text)
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
import numpy as np
iris = load_iris()
X, y = iris.data, iris.target
feat_names = iris.feature_names
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# ExtraTrees vs RandomForest comparison
for name, clf in [('RandomForest ', RandomForestClassifier(n_estimators=100, random_state=42)),
('ExtraTrees ', ExtraTreesClassifier(n_estimators=100, random_state=42))]:
clf.fit(X_train, y_train)
cv = cross_val_score(clf, X, y, cv=5).mean()
print(f'{name} test={accuracy_score(y_test, clf.predict(X_test)):.4f} CV={cv:.4f}')
# Text-based feature importance bar chart
et = ExtraTreesClassifier(n_estimators=100, random_state=42).fit(X_train, y_train)
imps = et.feature_importances_
print('\nFeature Importance Bar Chart:')
max_imp = imps.max()
bar_width = 30
for name, imp in sorted(zip(feat_names, imps), key=lambda x: -x[1]):
bar = int(imp / max_imp * bar_width) * '#'
print(f' {name:28s} |{bar:<30}| {imp:.4f}')from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np, pandas as pd
np.random.seed(42)
n = 1000
df = pd.DataFrame({
'recency_days': np.random.randint(1, 365, n),
'frequency': np.random.randint(1, 50, n),
'monetary': np.random.exponential(200, n),
'tenure_months': np.random.randint(1, 60, n),
'support_calls': np.random.poisson(2, n)
})
# Churn if high recency, low frequency
df['churn'] = ((df['recency_days'] > 200) & (df['frequency'] < 5)).astype(int)
X = df.drop('churn', axis=1)
y = df['churn']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
print('Customer Churn Prediction:')
print(classification_report(y_test, rf.predict(X_test)))from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
import numpy as np
iris = load_iris()
X, y = iris.data, iris.target
print(f'{'Depth':>6} | {'Train Acc':>10} | {'CV Acc':>10} | {'Gap':>8}')
print('-' * 44)
best_depth, best_cv = 1, 0.0
for depth in range(1, 11):
# TODO: dt = DecisionTreeClassifier(max_depth=depth, random_state=42)
# TODO: dt.fit(X, y)
# TODO: train_acc = dt.score(X, y)
# TODO: cv_acc = cross_val_score(dt, X, y, cv=5).mean()
# TODO: gap = train_acc - cv_acc
# TODO: print(f'{depth:>6} | {train_acc:>10.4f} | {cv_acc:>10.4f} | {gap:>8.4f}')
# TODO: if cv_acc > best_cv: best_depth, best_cv = depth, cv_acc
pass
# TODO: print(f'Best depth: {best_depth} with CV accuracy {best_cv:.4f}')SVM finds the maximum-margin hyperplane. Works for classification (SVC) and regression (SVR).
SVC with Kernel Trick
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
X, y = make_classification(n_samples=500, n_features=5, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# SVM needs scaled features
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)
for kernel in ['linear', 'rbf', 'poly']:
svm = SVC(kernel=kernel, C=1.0)
svm.fit(X_train_s, y_train)
acc = accuracy_score(y_test, svm.predict(X_test_s))
print(f'{kernel:8s} kernel accuracy: {acc:.4f}')SVR for Regression
from sklearn.svm import SVR
from sklearn.datasets import make_regression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
X, y = make_regression(n_samples=300, n_features=3, noise=15, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)
for kernel in ['linear', 'rbf']:
svr = SVR(kernel=kernel, C=10)
svr.fit(X_train_s, y_train)
r2 = r2_score(y_test, svr.predict(X_test_s))
print(f'SVR ({kernel}) R2: {r2:.4f}')SVM with C-Parameter Tuning
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
import numpy as np
X, y = make_classification(n_samples=500, n_features=5, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)
print('C value | CV Accuracy')
print('-' * 25)
for C in [0.01, 0.1, 1, 10, 100]:
svm = SVC(kernel='rbf', C=C)
cv_scores = cross_val_score(svm, X_train_s, y_train, cv=5)
print(f'C={C:6.2f} | {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}')NuSVC & SVM with class_weight='balanced'
from sklearn.svm import NuSVC, SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, f1_score
import numpy as np
# Imbalanced binary classification
X, y = make_classification(
n_samples=600, n_features=6, weights=[0.85, 0.15], random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, stratify=y, random_state=42
)
scaler = StandardScaler()
X_tr = scaler.fit_transform(X_train)
X_te = scaler.transform(X_test)
# NuSVC: nu controls upper bound on training errors
print('NuSVC nu sweep:')
for nu in [0.1, 0.3, 0.5]:
try:
m = NuSVC(nu=nu, kernel='rbf').fit(X_tr, y_train)
f1 = f1_score(y_test, m.predict(X_te))
print(f' nu={nu} F1={f1:.4f}')
except Exception as e:
print(f' nu={nu} infeasible: {e}')
# SVC with class_weight='balanced' handles imbalance
print('\nSVC class_weight comparison:')
for cw in [None, 'balanced']:
svc = SVC(kernel='rbf', C=10, class_weight=cw).fit(X_tr, y_train)
f1 = f1_score(y_test, svc.predict(X_te))
print(f' class_weight={str(cw):10s} F1={f1:.4f}')
print(classification_report(y_test, svc.predict(X_te)))from sklearn.svm import SVC
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score
# Simulated reviews
reviews = [
'great product love it', 'terrible waste of money',
'amazing quality highly recommend', 'broken arrived damaged',
'best purchase ever', 'awful customer service never again',
'works perfectly fast shipping', 'poor quality disappointed',
]
labels = [1, 0, 1, 0, 1, 0, 1, 0] # 1=positive, 0=negative
pipe = Pipeline([
('tfidf', TfidfVectorizer()),
('svm', SVC(kernel='linear', C=1.0))
])
pipe.fit(reviews[:6], labels[:6])
preds = pipe.predict(reviews[6:])
print('Predictions:', ['Positive' if p else 'Negative' for p in preds])
print('True labels:', ['Positive' if l else 'Negative' for l in labels[6:]])from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
import numpy as np
X, y = make_classification(n_samples=500, n_features=6, random_state=0)
# TODO: Scale X with StandardScaler
# scaler = StandardScaler()
# X_s = scaler.fit_transform(X)
# TODO: Compare kernels
kernels = ['linear', 'rbf', 'poly', 'sigmoid']
print('Kernel | CV Accuracy')
print('-' * 28)
for kernel in kernels:
# TODO: svm = SVC(kernel=kernel, C=1.0)
# TODO: scores = cross_val_score(svm, X_s, y, cv=5)
# TODO: print(f'{kernel:8s} | {scores.mean():.4f} +/- {scores.std():.4f}')
pass
# TODO: Tune C for rbf kernel
print('\nRBF kernel C tuning:')
for C in [0.1, 1, 10, 100]:
# TODO: svm = SVC(kernel='rbf', C=C)
# TODO: scores = cross_val_score(svm, X_s, y, cv=5)
# TODO: print(f'C={C:6.1f} | {scores.mean():.4f}')
passKNN classifies by majority vote of k neighbors. Naive Bayes uses Bayes' theorem with feature independence assumption.
K-Nearest Neighbors
from sklearn.neighbors import KNeighborsClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
import numpy as np
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)
# Find best k
for k in [1, 3, 5, 7, 11]:
knn = KNeighborsClassifier(n_neighbors=k)
scores = cross_val_score(knn, X_train_s, y_train, cv=5)
print(f'k={k:2d}: CV accuracy = {scores.mean():.4f} (+/- {scores.std():.4f})')Naive Bayes for Text Classification
from sklearn.naive_bayes import MultinomialNB, GaussianNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Email spam detection
emails = [
'win money now free prize', 'meeting tomorrow at 10am',
'congratulations you won cash', 'project update attached',
'free viagra cheap pills', 'please review the report',
'claim your reward today', 'lunch at noon works for me',
]
labels = [1, 0, 1, 0, 1, 0, 1, 0] # 1=spam
vec = CountVectorizer()
X = vec.fit_transform(emails)
nb = MultinomialNB()
nb.fit(X, labels)
test = vec.transform(['free money win now', 'schedule a meeting'])
preds = nb.predict(test)
print('Predictions:', ['SPAM' if p else 'HAM' for p in preds])GaussianNB with Prior Probability
from sklearn.naive_bayes import GaussianNB
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
# Imbalanced dataset (80% class 0, 20% class 1)
X, y = make_classification(
n_samples=1000, n_features=6, weights=[0.8, 0.2], random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Default priors (learned from training data)
gnb_default = GaussianNB()
gnb_default.fit(X_train, y_train)
# Custom priors (force equal class probability)
gnb_equal = GaussianNB(priors=[0.5, 0.5])
gnb_equal.fit(X_train, y_train)
for name, m in [('Default priors', gnb_default), ('Equal priors', gnb_equal)]:
acc = accuracy_score(y_test, m.predict(X_test))
print(f'{name}: accuracy={acc:.4f}')
print('Class priors (default):', gnb_default.class_prior_.round(3))RadiusNeighborsClassifier & GaussianNB Calibration
from sklearn.neighbors import RadiusNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, log_loss
import numpy as np
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, stratify=iris.target, random_state=42
)
scaler = StandardScaler()
X_tr = scaler.fit_transform(X_train)
X_te = scaler.transform(X_test)
# RadiusNeighborsClassifier: classify within a fixed radius
print('RadiusNeighborsClassifier radius sweep:')
for r in [0.5, 1.0, 1.5, 2.0]:
rnc = RadiusNeighborsClassifier(radius=r, outlier_label='most_frequent')
rnc.fit(X_tr, y_train)
acc = accuracy_score(y_test, rnc.predict(X_te))
print(f' radius={r:.1f} accuracy={acc:.4f}')
# GaussianNB calibration: improve probability estimates
gnb_raw = GaussianNB()
gnb_cal = CalibratedClassifierCV(GaussianNB(), method='isotonic', cv=5)
gnb_raw.fit(X_tr, y_train)
gnb_cal.fit(X_tr, y_train)
for name, m in [('GaussianNB (raw) ', gnb_raw), ('GaussianNB (cal) ', gnb_cal)]:
proba = m.predict_proba(X_te)
ll = log_loss(y_test, proba)
acc = accuracy_score(y_test, m.predict(X_te))
print(f'{name} accuracy={acc:.4f} log_loss={ll:.4f}')from sklearn.neighbors import NearestNeighbors
import numpy as np, pandas as pd
# Product feature vectors (price, rating, sales, weight)
np.random.seed(42)
products = pd.DataFrame({
'name': [f'Product_{i}' for i in range(20)],
'price': np.random.uniform(10, 200, 20),
'rating': np.random.uniform(1, 5, 20),
'sales': np.random.randint(100, 10000, 20),
'weight_kg': np.random.uniform(0.1, 5, 20)
})
from sklearn.preprocessing import StandardScaler
X = StandardScaler().fit_transform(products[['price','rating','sales','weight_kg']])
nn = NearestNeighbors(n_neighbors=4, metric='euclidean')
nn.fit(X)
# Find similar products to Product_0
distances, indices = nn.kneighbors([X[0]])
print('Products similar to', products.iloc[0]['name'])
for i, d in zip(indices[0][1:], distances[0][1:]):
print(f' {products.iloc[i]["name"]} (dist={d:.2f})')from sklearn.neighbors import KNeighborsClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
import numpy as np
iris = load_iris()
# TODO: Scale features
# scaler = StandardScaler()
# X_s = scaler.fit_transform(iris.data)
y = iris.target
# TODO: Compare distance metrics at k=5
metrics = ['euclidean', 'manhattan', 'chebyshev']
print('Metric | CV Accuracy')
print('-' * 30)
best_metric, best_score = '', 0.0
for metric in metrics:
# TODO: knn = KNeighborsClassifier(n_neighbors=5, metric=metric)
# TODO: scores = cross_val_score(knn, X_s, y, cv=5)
# TODO: print(f'{metric:11s} | {scores.mean():.4f}')
# TODO: if scores.mean() > best_score: best_metric, best_score = metric, scores.mean()
pass
# TODO: Find best k for the winning metric
# print(f'\nTuning k for best metric: {best_metric}')
# for k in range(1, 16):
# knn = KNeighborsClassifier(n_neighbors=k, metric=best_metric)
# s = cross_val_score(knn, X_s, y, cv=5).mean()
# print(f' k={k:2d}: {s:.4f}')Unsupervised learning: group similar data points without labels.
KMeans Clustering
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score
import numpy as np
X, _ = make_blobs(n_samples=300, centers=4, random_state=42)
# Elbow method to find optimal k
inertias = []
for k in range(2, 9):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
km.fit(X)
inertias.append(km.inertia_)
sil = silhouette_score(X, km.labels_)
print(f'k={k}: inertia={km.inertia_:.1f}, silhouette={sil:.3f}')
# Best k=4
best = KMeans(n_clusters=4, random_state=42, n_init=10)
best.fit(X)
print('Cluster sizes:', {i: (best.labels_==i).sum() for i in range(4)})DBSCAN for Density-Based Clustering
from sklearn.cluster import DBSCAN
from sklearn.datasets import make_moons
from sklearn.preprocessing import StandardScaler
import numpy as np
# make_moons: non-convex shapes KMeans can't handle
X, _ = make_moons(n_samples=200, noise=0.1, random_state=42)
X = StandardScaler().fit_transform(X)
db = DBSCAN(eps=0.3, min_samples=5)
db.fit(X)
labels = db.labels_
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = (labels == -1).sum()
print(f'Clusters found: {n_clusters}')
print(f'Noise points: {n_noise}')
print(f'Cluster sizes: {[(labels==i).sum() for i in range(n_clusters)]}')Agglomerative Hierarchical Clustering
from sklearn.cluster import AgglomerativeClustering
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
import numpy as np
X, _ = make_blobs(n_samples=200, centers=3, random_state=42)
X = StandardScaler().fit_transform(X)
for linkage in ['ward', 'complete', 'average', 'single']:
agg = AgglomerativeClustering(n_clusters=3, linkage=linkage)
labels = agg.fit_predict(X)
sil = silhouette_score(X, labels)
print(f'Linkage={linkage:8s} silhouette={sil:.4f}')AgglomerativeClustering & Silhouette Score Comparison
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.metrics import silhouette_score
from sklearn.datasets import make_blobs
import numpy as np
X, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.8, random_state=42)
# Compare silhouette scores for k = 2..6
print('KMeans silhouette scores:')
for k in range(2, 7):
labels = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(X)
score = silhouette_score(X, labels)
print(f' k={k}: {score:.4f}')
# AgglomerativeClustering with different linkages
print('\nAgglomerative linkage comparison (k=4):')
for linkage in ['ward', 'complete', 'average', 'single']:
agg = AgglomerativeClustering(n_clusters=4, linkage=linkage)
labels = agg.fit_predict(X)
score = silhouette_score(X, labels)
print(f' {linkage:8s}: {score:.4f}')from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import numpy as np, pandas as pd
np.random.seed(42)
n = 500
df = pd.DataFrame({
'recency': np.random.randint(1, 365, n),
'frequency': np.random.randint(1, 100, n),
'monetary': np.random.exponential(300, n)
})
X = StandardScaler().fit_transform(df)
km = KMeans(n_clusters=4, random_state=42, n_init=10)
df['segment'] = km.fit_predict(X)
segment_names = {0: 'Champions', 1: 'At Risk', 2: 'New Customers', 3: 'Lost'}
summary = df.groupby('segment').agg({'recency':'mean','frequency':'mean','monetary':'mean','segment':'count'})
summary.columns = ['Avg Recency', 'Avg Frequency', 'Avg Monetary', 'Count']
print('Customer Segments:')
print(summary.round(1))from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
import numpy as np
X, true_labels = make_blobs(n_samples=400, centers=5, random_state=42)
X = StandardScaler().fit_transform(X)
print(f'{'k':>3} | {'Inertia':>12} | {'Silhouette':>12}')
print('-' * 35)
inertias = []
sil_scores = []
for k in range(2, 11):
# TODO: km = KMeans(n_clusters=k, random_state=42, n_init=10)
# TODO: km.fit(X)
# TODO: inertias.append(km.inertia_)
# TODO: sil = silhouette_score(X, km.labels_)
# TODO: sil_scores.append(sil)
# TODO: print(f'{k:>3} | {km.inertia_:>12.2f} | {sil:>12.4f}')
pass
# TODO: best_k = range(2, 11)[np.argmax(sil_scores)]
# print(f'Best k by silhouette: {best_k}')Measure model performance: confusion matrix, ROC-AUC, precision-recall, cross-validation.
Classification Metrics
from sklearn.metrics import (
confusion_matrix, classification_report,
roc_auc_score, roc_curve, accuracy_score
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
X, y = make_classification(n_samples=500, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf = RandomForestClassifier(n_estimators=50, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
y_prob = rf.predict_proba(X_test)[:, 1]
print('Accuracy:', accuracy_score(y_test, y_pred))
print('ROC-AUC:', roc_auc_score(y_test, y_prob).round(4))
print('Confusion Matrix:')
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))Cross-Validation & Regression Metrics
# Demo: regression evaluation -- cross-validated R2 plus held-out MAE/RMSE.
from sklearn.model_selection import cross_val_score, KFold
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.datasets import make_regression
import numpy as np
X, y = make_regression(n_samples=300, n_features=5, noise=20, random_state=42)
rf = RandomForestRegressor(n_estimators=50, random_state=42)
# 5-fold cross-validation
cv = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(rf, X, y, cv=cv, scoring='r2')
print(f'CV R2: {scores.mean():.4f} +/- {scores.std():.4f}')
# Also report MAE, RMSE on test set
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf.fit(X_train, y_train)
yp = rf.predict(X_test)
print(f'MAE: {mean_absolute_error(y_test, yp):.2f}')
# RMSE = sqrt(MSE); penalises large errors more heavily than MAE.
print(f'RMSE: {np.sqrt(mean_squared_error(y_test, yp)):.2f}')
print(f'R2: {r2_score(y_test, yp):.4f}')Precision-Recall Curve & Threshold Tuning
# Demo: tune the decision threshold on an imbalanced (85/15) problem to
# maximise F1 instead of relying on the default 0.5 cut-off.
from sklearn.metrics import precision_recall_curve, average_precision_score
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
import numpy as np
X, y = make_classification(n_samples=1000, weights=[0.85, 0.15], random_state=42)
# stratify=y keeps the 85/15 ratio in both splits.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
rf = RandomForestClassifier(n_estimators=50, random_state=42)
rf.fit(X_train, y_train)
y_prob = rf.predict_proba(X_test)[:, 1]
print(f'Avg Precision Score: {average_precision_score(y_test, y_prob):.4f}')
# Find threshold that maximises F1
thresholds = np.arange(0.1, 0.9, 0.05)  # sweep 0.10 .. 0.85 in 0.05 steps
best_f1, best_thresh = 0, 0.5
for t in thresholds:
    y_pred_t = (y_prob >= t).astype(int)
    # zero_division=0 silences the warning when a threshold yields no positives.
    f1 = f1_score(y_test, y_pred_t, zero_division=0)
    if f1 > best_f1:
        best_f1, best_thresh = f1, t
print(f'Best threshold={best_thresh:.2f} F1={best_f1:.4f}')
y_best = (y_prob >= best_thresh).astype(int)
print(f'Precision={precision_score(y_test, y_best):.4f} Recall={recall_score(y_test, y_best):.4f}')Learning Curve, Validation Curve & Precision-Recall
# Demo: learning curve (score vs training-set size) and validation curve
# (score vs a single hyperparameter) for a random forest.
from sklearn.model_selection import learning_curve, validation_curve
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import numpy as np
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
# Learning curve -- how accuracy changes with more training data
train_sizes, train_scores, val_scores = learning_curve(
    RandomForestClassifier(n_estimators=50, random_state=42),
    X, y, cv=5, train_sizes=np.linspace(0.1, 1.0, 8), scoring='accuracy'
)
print('Learning curve (train size β val accuracy):')
# Average the per-fold validation scores (axis=1) for each training size.
for sz, vs in zip(train_sizes, val_scores.mean(axis=1)):
    print(f' n={int(sz):4d}: {vs:.4f}')
# Validation curve -- how accuracy changes with a hyperparameter
param_range = [10, 50, 100, 200, 300]
train_s, val_s = validation_curve(
    RandomForestClassifier(random_state=42),
    X, y, param_name='n_estimators', param_range=param_range,
    cv=5, scoring='accuracy'
)
print('\nValidation curve (n_estimators β val accuracy):')
for n, vs in zip(param_range, val_s.mean(axis=1)):
print(f' n={n:3d}: {vs:.4f}')from sklearn.metrics import classification_report, roc_auc_score, recall_score
# Real-world flavour: cancer screening, where recall (sensitivity) is the
# metric that matters most -- missing a positive case is the costly error.
# NOTE(review): recall_score / roc_auc_score used below are imported a few
# lines earlier in this file.
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
# Simulate imbalanced cancer screening data (10% positive)
X, y = make_classification(
    n_samples=1000, weights=[0.9, 0.1],
    n_features=8, random_state=42
)
# stratify=y preserves the 90/10 class ratio in both splits.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
model = GradientBoostingClassifier(random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
# Positive-class probability, needed for the ranking metric (ROC-AUC).
y_prob = model.predict_proba(X_test)[:, 1]
print('Cancer Detection Model Evaluation:')
print(f'Recall (sensitivity): {recall_score(y_test, y_pred):.4f}')
print(f'ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}')
print(classification_report(y_test, y_pred, target_names=['Healthy','Cancer']))from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
import numpy as np
X, y = make_classification(
n_samples=800, weights=[0.9, 0.1], n_features=8, random_state=1
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, stratify=y, random_state=1
)
# TODO: Train RandomForestClassifier and get predict_proba
# rf = RandomForestClassifier(n_estimators=100, random_state=1)
# rf.fit(X_train, y_train)
# y_prob = rf.predict_proba(X_test)[:, 1]
# TODO: Default threshold (0.5) metrics
# y_pred = (y_prob >= 0.5).astype(int)
# print(f'Default: P={precision_score(y_test, y_pred):.4f} R={recall_score(y_test, y_pred):.4f} F1={f1_score(y_test, y_pred):.4f}')
# TODO: Threshold sweep
# best_f1, best_t = 0, 0.5
# for t in np.arange(0.1, 0.9, 0.05):
# yp = (y_prob >= t).astype(int)
# f1 = f1_score(y_test, yp, zero_division=0)
# if f1 > best_f1: best_f1, best_t = f1, t
# print(f'Best threshold={best_t:.2f} F1={best_f1:.4f}')
# TODO: ROC-AUC
# print('ROC-AUC:', roc_auc_score(y_test, y_prob).round(4))Chain preprocessing + model into a single Pipeline. Prevent data leakage and simplify deployment.
Building a Pipeline
# Demo: impute -> scale -> classify in a single Pipeline, so imputation and
# scaling statistics are learned from the training folds only (no leakage).
# Fixes: removed unused LabelEncoder import; seeded the NaN injection so the
# demo is reproducible, matching the random_state=42 convention used
# throughout this guide.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
import numpy as np
X, y = make_classification(n_samples=500, n_features=6, random_state=42)
# Inject missing values at 50 random (row, column) positions.
np.random.seed(42)
X_missing = X.copy()
X_missing[np.random.choice(500, 50), np.random.choice(6, 50)] = np.nan
X_train, X_test, y_train, y_test = train_test_split(X_missing, y, test_size=0.2, random_state=42)
pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),  # fill NaNs with column means
    ('scaler', StandardScaler()),
    ('clf', RandomForestClassifier(n_estimators=50, random_state=42))
])
pipe.fit(X_train, y_train)
print('Pipeline accuracy:', pipe.score(X_test, y_test).round(4))ColumnTransformer for Mixed Data
# Demo: ColumnTransformer routes numeric columns to StandardScaler and
# categorical columns to OneHotEncoder inside one Pipeline.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
import pandas as pd, numpy as np
# Mixed data: numeric + categorical
np.random.seed(42)
df = pd.DataFrame({
    'age': np.random.randint(18, 70, 200),
    'income': np.random.normal(50000, 20000, 200),
    'city': np.random.choice(['NYC', 'LA', 'Chicago'], 200),
    'plan': np.random.choice(['basic', 'premium'], 200)
})
# Target derived directly from the income feature, so the task is
# intentionally easy -- this demo is about the plumbing, not the model.
y = (df['income'] > 55000).astype(int)
num_cols = ['age', 'income']
cat_cols = ['city', 'plan']
preprocessor = ColumnTransformer([
    ('num', StandardScaler(), num_cols),
    # drop='first' avoids the dummy-variable trap for linear models.
    ('cat', OneHotEncoder(drop='first'), cat_cols)
])
pipe = Pipeline([('prep', preprocessor), ('clf', LogisticRegression())])
from sklearn.model_selection import cross_val_score
scores = cross_val_score(pipe, df, y, cv=5)
print('CV Accuracy:', scores.mean().round(4))Pipeline with Polynomial Features
# Demo: add degree-2 polynomial/interaction features before a linear model.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
X, y = make_classification(n_samples=500, n_features=4, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    # include_bias=False: LogisticRegression already fits an intercept.
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('clf', LogisticRegression(max_iter=1000))
])
pipe.fit(X_train, y_train)
print('Pipeline steps:', [name for name, _ in pipe.steps])
# Comparing train vs test accuracy hints at overfitting from the extra features.
print(f'Train accuracy: {pipe.score(X_train, y_train):.4f}')
print(f'Test accuracy: {pipe.score(X_test, y_test):.4f}')
print(f'Features after poly: {pipe.named_steps["poly"].n_output_features_}')Pipeline with SelectKBest & FunctionTransformer
# Demo: univariate feature selection (SelectKBest + ANOVA F-test) inside a
# pipeline, compared against a pipeline that keeps all 20 features.
# Fixes: removed the dead `log_transform` variable (it was built but never
# used -- the pipeline actually applies np.abs, not log1p); renamed the
# misleading 'log' step to 'abs'; repaired the mojibake 'Β±' -> '±' in the
# printed summary.
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
import numpy as np
X, y = make_classification(n_samples=500, n_features=20,
                           n_informative=8, random_state=42)
pipe = Pipeline([
    ('abs', FunctionTransformer(np.abs)),  # make all values non-negative first
    ('scaler', StandardScaler()),
    ('select', SelectKBest(f_classif, k=8)),  # keep top 8 features
    ('clf', LogisticRegression(max_iter=500))
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='accuracy')
print('Pipeline with SelectKBest(k=8):')
print(f' CV accuracy: {scores.mean():.4f} ± {scores.std():.4f}')
# Compare: all 20 features vs top 8
pipe_all = Pipeline([('scaler', StandardScaler()), ('clf', LogisticRegression(max_iter=500))])
scores_all = cross_val_score(pipe_all, X, y, cv=5)
print(f' All 20 features: {scores_all.mean():.4f} Β± {scores_all.std():.4f}')from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pandas as pd, numpy as np
np.random.seed(42)
n = 500
df = pd.DataFrame({
'age': np.random.randint(22, 60, n),
'salary': np.random.normal(60000, 20000, n),
'years_at_co': np.random.randint(1, 20, n),
'dept': np.random.choice(['Eng', 'Sales', 'HR', 'Finance'], n),
'satisfaction': np.random.choice(['low', 'med', 'high'], n)
})
df['attrition'] = ((df['salary'] < 45000) | (df['satisfaction'] == 'low')).astype(int)
X = df.drop('attrition', axis=1)
y = df['attrition']
pre = ColumnTransformer([
('num', StandardScaler(), ['age', 'salary', 'years_at_co']),
('cat', OneHotEncoder(drop='first', sparse_output=False), ['dept', 'satisfaction'])
])
pipe = Pipeline([('prep', pre), ('clf', GradientBoostingClassifier(random_state=42))])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
pipe.fit(X_train, y_train)
print('Employee Attrition Pipeline:')
print(classification_report(y_test, pipe.predict(X_test)))from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import pandas as pd, numpy as np
np.random.seed(42)
n = 400
df = pd.DataFrame({
'age': np.random.randint(18, 65, n).astype(float),
'salary': np.random.normal(50000, 15000, n),
'dept': np.random.choice(['Tech', 'HR', 'Sales'], n),
'remote': np.random.choice(['yes', 'no'], n),
})
df.loc[np.random.choice(n, 30), 'age'] = np.nan
y = (df['salary'] > 55000).astype(int)
# TODO: Define numeric_features and categorical_features
numeric_features = [] # age, salary
categorical_features = [] # dept, remote
# TODO: Build numeric_transformer Pipeline: SimpleImputer(strategy='median') -> StandardScaler
numeric_transformer = None
# TODO: Build categorical_transformer Pipeline: SimpleImputer(strategy='most_frequent') -> OneHotEncoder(drop='first')
categorical_transformer = None
# TODO: Build ColumnTransformer with numeric and categorical transformers
preprocessor = None
# TODO: Build full Pipeline: preprocessor -> RandomForestClassifier(n_estimators=50, random_state=42)
pipe = None
# TODO: Run cross_val_score and print mean accuracy
# scores = cross_val_score(pipe, df, y, cv=5)
# print('CV Accuracy:', scores.mean().round(4))Find the best model parameters using GridSearchCV and RandomizedSearchCV.
GridSearchCV
# Demo: exhaustive grid search over a small random-forest parameter grid.
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
X, y = make_classification(n_samples=600, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 3 * 3 * 2 = 18 candidate configurations, each refit cv=5 times.
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 5, 10],
    'min_samples_split': [2, 5]
}
grid = GridSearchCV(
    RandomForestClassifier(random_state=42),
    # n_jobs=-1 parallelises the fits across all CPU cores.
    param_grid, cv=5, scoring='accuracy', n_jobs=-1
)
grid.fit(X_train, y_train)
print('Best params:', grid.best_params_)
print('Best CV score:', round(grid.best_score_, 4))
print('Test score:', round(grid.score(X_test, y_test), 4))RandomizedSearchCV (Faster)
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from scipy.stats import randint, uniform
X, y = make_classification(n_samples=600, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
param_dist = {
'n_estimators': randint(50, 300),
'max_depth': randint(2, 10),
'learning_rate': uniform(0.01, 0.3),
'subsample': uniform(0.6, 0.4)
}
rscv = RandomizedSearchCV(
GradientBoostingClassifier(random_state=42),
param_dist, n_iter=20, cv=5, scoring='accuracy',
random_state=42, n_jobs=-1
)
rscv.fit(X_train, y_train)
print('Best params:', rscv.best_params_)
print('Best CV score:', round(rscv.best_score_, 4))
print('Test score:', round(rscv.score(X_test, y_test), 4))HalvingGridSearchCV (Successive Halving)
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingGridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
X, y = make_classification(n_samples=2000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
param_grid = {
'n_estimators': [50, 100, 200, 300],
'max_depth': [None, 5, 10],
'min_samples_split': [2, 5]
}
search = HalvingGridSearchCV(
RandomForestClassifier(random_state=42),
param_grid, factor=3, cv=3, scoring='accuracy',
random_state=42, n_jobs=-1
)
search.fit(X_train, y_train)
print('Best params:', search.best_params_)
print('Best CV score:', round(search.best_score_, 4))
print('Test accuracy:', round(search.score(X_test, y_test), 4))
print(f'Configs evaluated: {len(search.cv_results_["mean_test_score"])}')cross_validate with Multiple Scoring Metrics
# Demo: cross_validate scores several metrics in one pass and also returns
# train scores, making the train/test gap (overfitting signal) visible.
# Fix: repaired the mojibake 'Β±' -> '±' in the two printed strings.
from sklearn.model_selection import cross_validate
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
import numpy as np
X, y = make_classification(n_samples=800, n_features=10,
                           weights=[0.7, 0.3], random_state=42)
# Evaluate with multiple metrics at once
scoring = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']
results = cross_validate(
    GradientBoostingClassifier(n_estimators=100, random_state=42),
    X, y, cv=5, scoring=scoring, return_train_score=True
)
print('5-fold CV results (mean ± std):')
for metric in scoring:
    test_mean = results[f'test_{metric}'].mean()
    test_std = results[f'test_{metric}'].std()
    train_mean = results[f'train_{metric}'].mean()
    gap = train_mean - test_mean  # large positive gap => overfitting
    print(f' {metric:12s}: {test_mean:.4f} ± {test_std:.4f} (train={train_mean:.4f}, gap={gap:.4f})')
print(f'\nFit time: {results["fit_time"].mean():.3f}s avg')from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from scipy.stats import randint, uniform
# Simulate CTR data (heavily imbalanced: ~2% click rate)
X, y = make_classification(
n_samples=2000, weights=[0.98, 0.02],
n_features=10, n_informative=6, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, stratify=y, test_size=0.2, random_state=42
)
param_dist = {
'n_estimators': randint(100, 400),
'learning_rate': uniform(0.01, 0.2),
'max_depth': randint(2, 6)
}
rscv = RandomizedSearchCV(
GradientBoostingClassifier(random_state=42),
param_dist, n_iter=15, cv=3, scoring='roc_auc',
random_state=42, n_jobs=-1
)
rscv.fit(X_train, y_train)
from sklearn.metrics import roc_auc_score
y_prob = rscv.predict_proba(X_test)[:, 1]
print('CTR Model Tuning:')
print('Best AUC:', round(rscv.best_score_, 4))
print('Test AUC:', round(roc_auc_score(y_test, y_prob), 4))from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np
X, y = make_classification(n_samples=800, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# TODO: Build Pipeline with StandardScaler and SVC
# pipe = Pipeline([('scaler', StandardScaler()), ('svm', SVC())])
# TODO: Define param_grid for SVC:
# 'svm__C': [0.1, 1, 10, 100], 'svm__kernel': ['rbf', 'linear']
param_grid = {}
# TODO: Run GridSearchCV with cv=5, scoring='accuracy'
# grid = GridSearchCV(pipe, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
# grid.fit(X_train, y_train)
# print('Best params:', grid.best_params_)
# print('Test accuracy:', round(grid.score(X_test, y_test), 4))
# BONUS: Try RandomizedSearchCV with n_iter=10
# from scipy.stats import loguniform
# param_dist = {'svm__C': loguniform(0.01, 100), 'svm__kernel': ['rbf', 'linear']}
# rscv = RandomizedSearchCV(pipe, param_dist, n_iter=10, cv=5, random_state=42)
# rscv.fit(X_train, y_train)
# print('RandomizedSearch best:', rscv.best_params_)Reduce high-dimensional data for visualization, noise reduction, and speeding up training.
PCA β Principal Component Analysis
# Demo: PCA on the scaled digits data -- variance-threshold and 2-D projections.
from sklearn.decomposition import PCA
from sklearn.datasets import load_digits
from sklearn.preprocessing import StandardScaler
import numpy as np
digits = load_digits()
# Standardise first: PCA directions follow variance, so features must be
# on a comparable scale.
X = StandardScaler().fit_transform(digits.data)
print('Original shape:', X.shape) # 1797 x 64
# Keep 95% of variance
# A float n_components in (0, 1) keeps the smallest number of components
# whose cumulative explained variance reaches that fraction.
pca = PCA(n_components=0.95)
X_pca = pca.fit_transform(X)
print('Reduced shape:', X_pca.shape)
print(f'Explained variance: {pca.explained_variance_ratio_.sum():.3f}')
# 2D for visualization
pca2 = PCA(n_components=2)
X_2d = pca2.fit_transform(X)
print('2D shape:', X_2d.shape)
print('Variance explained by 2 PCs:', pca2.explained_variance_ratio_.sum().round(3))t-SNE for Visualization
from sklearn.manifold import TSNE
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
import numpy as np
iris = load_iris()
X = StandardScaler().fit_transform(iris.data)
# t-SNE: great for visualization, non-linear
tsne = TSNE(n_components=2, random_state=42, perplexity=30, max_iter=1000)
X_tsne = tsne.fit_transform(X)
print('t-SNE output shape:', X_tsne.shape)
# Confirm clusters align with true labels
for cls in range(3):
mask = iris.target == cls
center = X_tsne[mask].mean(axis=0)
print(f'{iris.target_names[cls]}: center=({center[0]:.1f}, {center[1]:.1f})')Explained Variance Curve with PCA
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_digits
import numpy as np
digits = load_digits()
X = StandardScaler().fit_transform(digits.data)
pca_full = PCA()
pca_full.fit(X)
cumvar = np.cumsum(pca_full.explained_variance_ratio_)
n90 = np.argmax(cumvar >= 0.90) + 1
n95 = np.argmax(cumvar >= 0.95) + 1
n99 = np.argmax(cumvar >= 0.99) + 1
print(f'Total features: {X.shape[1]}')
print(f'Components for 90% variance: {n90} ({X.shape[1]/n90:.1f}x compression)')
print(f'Components for 95% variance: {n95} ({X.shape[1]/n95:.1f}x compression)')
print(f'Components for 99% variance: {n99} ({X.shape[1]/n99:.1f}x compression)')
print('Top 5 PC variance explained:')
for i, (var, cum) in enumerate(zip(pca_full.explained_variance_ratio_[:5], cumvar[:5])):
print(f' PC{i+1}: {var:.4f} (cumulative: {cum:.4f})')NMF & Isomap for Non-Linear Reduction
from sklearn.decomposition import NMF
from sklearn.manifold import Isomap
from sklearn.preprocessing import MinMaxScaler
from sklearn.datasets import load_digits
import numpy as np
digits = load_digits()
X = MinMaxScaler().fit_transform(digits.data) # NMF needs non-negative
y = digits.target
# NMF: learns parts-based representation
nmf = NMF(n_components=20, max_iter=500, random_state=42)
X_nmf = nmf.fit_transform(X)
print(f'NMF: {X.shape} β {X_nmf.shape}')
print(f'Reconstruction error: {nmf.reconstruction_err_:.4f}')
# Isomap: non-linear manifold learning (preserves geodesic distances)
iso = Isomap(n_components=2, n_neighbors=10)
X_iso = iso.fit_transform(X)
print(f'\nIsomap: {X.shape} β {X_iso.shape}')
# Check cluster quality: std of 2D coordinates per digit class
print('Per-digit cluster spread (lower = tighter cluster):')
for cls in range(10):
spread = X_iso[y == cls].std()
print(f' Digit {cls}: {spread:.3f}')from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
# Simulated news headlines
docs = [
'stock market rises sharply', 'fed raises interest rates',
'tech giants report earnings', 'inflation data released',
'new iphone model announced', 'oil prices fall today',
'housing market cools down', 'crypto prices volatile',
'gdp growth beats forecast', 'layoffs hit tech sector'
]
labels = [1, 1, 1, 1, 1, 0, 0, 0, 1, 0] # 1=finance/tech, 0=other
# TF-IDF -> LSA (Truncated SVD) -> Logistic Regression
pipe = Pipeline([
('tfidf', TfidfVectorizer(max_features=50)),
('svd', TruncatedSVD(n_components=5, random_state=42)),
('clf', LogisticRegression())
])
pipe.fit(docs, labels)
print('Test predictions:', pipe.predict(docs[-3:]))
print('True labels: ', labels[-3:])from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import load_digits
import numpy as np
digits = load_digits()
X = StandardScaler().fit_transform(digits.data)
y = digits.target
print('Original shape:', X.shape) # (1797, 64)
# TODO: Apply PCA keeping 90% of variance
# pca = PCA(n_components=???)
# X_pca = pca.fit_transform(X)
# print('Reduced shape:', X_pca.shape)
# TODO: 5-fold CV on ORIGINAL data with KNN(n_neighbors=5)
# scores_orig = cross_val_score(KNeighborsClassifier(n_neighbors=5), X, y, cv=5)
# print(f'KNN on original: {scores_orig.mean():.4f}')
# TODO: 5-fold CV on PCA-reduced data
# scores_pca = cross_val_score(KNeighborsClassifier(n_neighbors=5), X_pca, y, cv=5)
# print(f'KNN on PCA: {scores_pca.mean():.4f}')
# BONUS: Reduce to 2D and print class centers
# pca2 = PCA(n_components=2)
# X_2d = pca2.fit_transform(X)
# for cls in range(10):
# center = X_2d[y == cls].mean(axis=0)
# print(f'Digit {cls} center: ({center[0]:.2f}, {center[1]:.2f})')Handle class imbalance using resampling (oversampling minority, undersampling majority), class weights, and threshold tuning to improve recall on rare classes.
Class Weights & Threshold Tuning
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score
np.random.seed(42)
X, y = make_classification(
n_samples=5000, weights=[0.95, 0.05], random_state=42
)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
# Without class weight
lr_base = LogisticRegression(random_state=0).fit(X_tr, y_tr)
print('Default class_weight:')
print(classification_report(y_te, lr_base.predict(X_te), target_names=['maj','min']))
# With class_weight='balanced'
lr_bal = LogisticRegression(class_weight='balanced', random_state=0).fit(X_tr, y_tr)
proba = lr_bal.predict_proba(X_te)[:, 1]
best_thresh, best_f1 = 0.5, 0.0
for t in np.arange(0.1, 0.9, 0.05):
f1 = f1_score(y_te, (proba > t).astype(int))
if f1 > best_f1: best_f1, best_thresh = f1, t
print(f'Balanced model: best threshold={best_thresh:.2f}, minority F1={best_f1:.3f}')Manual Oversampling (Random + SMOTE-style)
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
np.random.seed(42)
X, y = make_classification(n_samples=2000, weights=[0.9, 0.1], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
def random_oversample(X, y, random_state=42):
    """Randomly duplicate minority-class rows until every class matches the
    largest class count.

    Returns the balanced (X, y). The original rows come first, followed by
    the sampled duplicates, so ordering of the input data is preserved.
    """
    rng = np.random.RandomState(random_state)
    labels, label_counts = np.unique(y, return_counts=True)
    target = label_counts.max()
    feature_parts, label_parts = [X], [y]
    for label, count in zip(labels, label_counts):
        deficit = target - count
        if deficit > 0:
            # Sample with replacement from this class's row indices.
            pool = np.where(y == label)[0]
            picked = rng.choice(pool, deficit, replace=True)
            feature_parts.append(X[picked])
            label_parts.append(y[picked])
    return np.vstack(feature_parts), np.concatenate(label_parts)
X_res, y_res = random_oversample(X_tr, y_tr)
print(f'Before: {np.bincount(y_tr)}')
print(f'After: {np.bincount(y_res)}')
base_f1 = f1_score(y_te, LogisticRegression(random_state=0).fit(X_tr, y_tr).predict(X_te))
over_f1 = f1_score(y_te, LogisticRegression(random_state=0).fit(X_res, y_res).predict(X_te))
print(f'Base minority F1: {base_f1:.3f} | Oversampled F1: {over_f1:.3f}')SMOTE with imbalanced-learn
try:
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
import numpy as np
np.random.seed(42)
X, y = make_classification(n_samples=2000, weights=[0.9, 0.1], random_state=42)
pipe = ImbPipeline([
('smote', SMOTE(random_state=42)),
('rf', RandomForestClassifier(n_estimators=50, random_state=0))
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='f1')
print(f'SMOTE+RF F1: {scores.mean():.3f} +/- {scores.std():.3f}')
base_scores = cross_val_score(
RandomForestClassifier(class_weight='balanced', n_estimators=50, random_state=0),
X, y, cv=5, scoring='f1'
)
print(f'Balanced RF F1: {base_scores.mean():.3f} +/- {base_scores.std():.3f}')
except ImportError:
print('pip install imbalanced-learn')
print('SMOTE: synthetic minority oversampling.')
print('For each minority sample, create synthetic points along lines to k-nearest neighbors.')Precision-Recall Curve & AUC-PR
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_curve, average_precision_score, roc_auc_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
X, y = make_classification(n_samples=3000, weights=[0.9, 0.1], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
lr = LogisticRegression(class_weight='balanced', random_state=0).fit(X_tr, y_tr)
proba = lr.predict_proba(X_te)[:, 1]
prec, rec, thresh = precision_recall_curve(y_te, proba)
ap = average_precision_score(y_te, proba)
auc = roc_auc_score(y_te, proba)
print(f'ROC-AUC: {auc:.4f}')
print(f'Average Precision (AUC-PR): {ap:.4f}')
print('Use AUC-PR for imbalanced data (ROC can be misleadingly high!)')
fig, ax = plt.subplots(figsize=(6, 5))
ax.plot(rec, prec, lw=2, label=f'AP={ap:.3f}')
ax.axhline(y_te.mean(), linestyle='--', color='gray', label=f'Baseline ({y_te.mean():.3f})')
ax.set_xlabel('Recall'); ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curve'); ax.legend()
plt.tight_layout(); plt.savefig('pr_curve.png', dpi=80); plt.close()
print('Saved pr_curve.png')import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, average_precision_score
np.random.seed(7)
X, y = make_classification(n_samples=10000, weights=[0.995, 0.005], random_state=7,
n_features=15, n_informative=8)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
model = LogisticRegression(class_weight='balanced', max_iter=500, random_state=0)
model.fit(X_tr, y_tr)
proba = model.predict_proba(X_te)[:, 1]
# Find threshold where precision >= 0.40
from sklearn.metrics import precision_recall_curve
prec, rec, thresh = precision_recall_curve(y_te, proba)
valid = np.where(prec[:-1] >= 0.40)[0]
if len(valid):
best_t = thresh[valid[np.argmax(rec[valid])]]
pred = (proba >= best_t).astype(int)
print(f'Threshold: {best_t:.3f}')
print(classification_report(y_te, pred, target_names=['legit', 'fraud']))
print(f'AUC-PR: {average_precision_score(y_te, proba):.4f}')import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, average_precision_score
np.random.seed(42)
X, y = make_classification(n_samples=3000, weights=[0.95, 0.05], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=0)
# TODO: (1) Default LR - compute minority F1 and AUC-PR
# TODO: (2) Balanced LR (class_weight='balanced') - same metrics
# TODO: (3) LR + random oversampling - same metrics
# TODO: For balanced LR, find threshold maximizing minority F1
Build sklearn-compatible custom transformers and estimators using BaseEstimator, TransformerMixin, and ClassifierMixin to integrate into pipelines.
Custom Transformer with TransformerMixin
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
class WinsorizeTransformer(BaseEstimator, TransformerMixin):
    '''Clip values to [lower_q, upper_q] quantiles per feature.

    Bounds are learned in fit() and stored as trailing-underscore
    attributes, so the clipping learned on training data is what gets
    applied at transform time -- the sklearn convention that makes this
    transformer pipeline- and cross-validation-safe.
    '''
    def __init__(self, lower=0.01, upper=0.99):
        # Store hyperparameters unmodified so get_params/set_params work.
        self.lower = lower
        self.upper = upper
    def fit(self, X, y=None):
        # Per-column quantile bounds (axis=0).
        self.lower_ = np.quantile(X, self.lower, axis=0)
        self.upper_ = np.quantile(X, self.upper, axis=0)
        return self
    def transform(self, X):
        # np.clip broadcasts the per-column bounds across all rows.
        return np.clip(X, self.lower_, self.upper_)
np.random.seed(42)
X, y = make_regression(n_samples=300, n_features=5, noise=5, random_state=42)
X[:10, 0] = 1000 # outliers
pipe = Pipeline([
('winsor', WinsorizeTransformer(lower=0.05, upper=0.95)),
('lr', LinearRegression())
])
from sklearn.model_selection import cross_val_score
scores = cross_val_score(pipe, X, y, cv=5, scoring='r2')
print(f'Winsorized pipeline R2: {scores.mean():.4f} +/- {scores.std():.4f}')
print(f'Params: {pipe.get_params()}')Custom Classifier with ClassifierMixin
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import accuracy_score
class MajorityVoteClassifier(BaseEstimator, ClassifierMixin):
    '''Predicts the most common class in the neighborhood (baseline).

    Effectively a from-scratch k-nearest-neighbours majority vote with
    k = window, using squared Euclidean distance.
    '''
    def __init__(self, window=10):
        # Hyperparameter: number of nearest training points that vote.
        self.window = window
    def fit(self, X, y):
        # Validate input and memorise the training data (lazy learner).
        X, y = check_X_y(X, y)
        self.classes_ = unique_labels(y)
        self.X_train_ = X
        self.y_train_ = y
        return self
    def predict(self, X):
        check_is_fitted(self)
        X = check_array(X)
        preds = []
        for x in X:
            # Squared Euclidean distance to every training point (no sqrt
            # needed -- ranking is identical).
            dists = np.sum((self.X_train_ - x)**2, axis=1)
            nn_idx = np.argsort(dists)[:self.window]
            vals, cnts = np.unique(self.y_train_[nn_idx], return_counts=True)
            # np.argmax returns the first maximum, so ties break toward the
            # smallest class label.
            preds.append(vals[np.argmax(cnts)])
        return np.array(preds)
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
np.random.seed(42)
X, y = make_classification(n_samples=500, n_features=10, random_state=42)
clf = MajorityVoteClassifier(window=15)
scores = cross_val_score(clf, X, y, cv=5)
print(f'MajorityVote CV accuracy: {scores.mean():.4f}')Custom Selector: SelectByCorrelation
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
class SelectByCorrelation(BaseEstimator, TransformerMixin):
    """Keep features with |Pearson corr| >= threshold with the target.

    Parameters
    ----------
    threshold : float, default=0.1
        Minimum absolute Pearson correlation (inclusive) a feature must
        reach with `y` to be kept.
    """

    def __init__(self, threshold=0.1):
        self.threshold = threshold

    def fit(self, X, y):
        """Score every column against the target and record the keepers."""
        corrs = np.array([
            abs(np.corrcoef(X[:, j], y)[0, 1])
            for j in range(X.shape[1])
        ])
        # A constant column yields NaN correlation (0/0) plus a
        # RuntimeWarning; map NaN to 0 so such columns are dropped
        # explicitly instead of via NaN comparison semantics.
        corrs = np.nan_to_num(corrs, nan=0.0)
        self.selected_ = np.where(corrs >= self.threshold)[0]
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        # Column-subset to the features chosen during fit.
        return X[:, self.selected_]
# Tune the selector's threshold as an ordinary pipeline hyperparameter.
np.random.seed(42)
X, y = make_classification(n_samples=600, n_features=20, n_informative=5, random_state=42)
pipe = Pipeline([
    ('select', SelectByCorrelation(threshold=0.05)),
    ('lr', LogisticRegression(max_iter=200, random_state=0))
])
from sklearn.model_selection import GridSearchCV
# Grid-search the correlation threshold; CV guards against over-pruning.
gs = GridSearchCV(pipe, {'select__threshold': [0.02, 0.05, 0.1, 0.15]}, cv=5)
gs.fit(X, y)
print(f'Best threshold: {gs.best_params_["select__threshold"]}')
print(f'Best CV accuracy: {gs.best_score_:.4f}')
pipe.fit(X, y)
print(f'Features selected: {pipe.named_steps["select"].selected_.tolist()}')

set_output API & check_estimator
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.estimator_checks import parametrize_with_checks
class RobustScaler(BaseEstimator, TransformerMixin):
    """Scale by median and IQR for robustness to outliers.

    NOTE(review): this name shadows sklearn.preprocessing.RobustScaler —
    consider renaming if both are ever imported in the same namespace.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Learn the per-column median and inter-quartile range."""
        X = np.asarray(X)
        self.median_ = np.median(X, axis=0)
        q75, q25 = np.percentile(X, [75, 25], axis=0)
        self.iqr_ = q75 - q25
        # Guard constant columns: an IQR of 0 would divide by zero.
        self.iqr_[self.iqr_ == 0] = 1.0
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        """Center on the median and scale by the IQR."""
        return (X - self.median_) / self.iqr_

    def get_feature_names_out(self, input_features=None):
        """Output column names — required for set_output(transform='pandas').

        Without this method, calling set_output on the instance raises
        because sklearn cannot name the output DataFrame columns.
        """
        if input_features is not None:
            return np.asarray(input_features, dtype=object)
        return np.asarray([f'x{i}' for i in range(self.n_features_in_)], dtype=object)
# Demo: median/IQR scaling shrugs off injected outliers in column 0.
np.random.seed(42)
X = np.random.randn(100, 4)
X[[0,1,2], 0] = 100 # outliers
rs = RobustScaler()
X_scaled = rs.fit_transform(X)
print('Original col0 stats: mean={:.1f}, std={:.1f}'.format(X[:,0].mean(), X[:,0].std()))
print('Scaled col0 stats: mean={:.3f}, std={:.3f}'.format(X_scaled[:,0].mean(), X_scaled[:,0].std()))
print('set_output API (pandas):')
# NOTE(review): set_output requires the transformer to define
# get_feature_names_out; verify the class above provides it.
rs2 = RobustScaler().set_output(transform='pandas')
df = pd.DataFrame(X[:5], columns=['a','b','c','d'])
print(rs2.fit_transform(df))

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.datasets import make_classification
class SafeLogTransformer(BaseEstimator, TransformerMixin):
    """Sign-preserving log compression: sign(x) * log(|x| + offset).

    Fixes the `offset` parameter being stored but never used. With the
    default offset=1.0 the result is exactly sign(x) * log1p(|x|), i.e.
    identical to the previous hard-coded behavior; other offsets
    generalize the compression strength.
    """

    def __init__(self, offset=1.0):
        self.offset = offset

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn.
        return self

    def transform(self, X):
        # log(|x| + offset) written as log1p(|x| + offset - 1) so the
        # default offset reproduces log1p bit-for-bit.
        return np.sign(X) * np.log1p(np.abs(X) + (self.offset - 1.0))
class WinsorizeTransformer(BaseEstimator, TransformerMixin):
    """Cap each feature at its q-th and (1-q)-th training quantiles."""

    def __init__(self, q=0.05):
        self.q = q

    def fit(self, X, y=None):
        # Learn symmetric per-column quantile bounds from the training data.
        bounds = np.quantile(X, [self.q, 1 - self.q], axis=0)
        self.lo_ = bounds[0]
        self.hi_ = bounds[1]
        return self

    def transform(self, X):
        # Winsorize: values outside [lo_, hi_] are pulled to the bounds.
        return np.clip(X, self.lo_, self.hi_)
# Chain winsorize -> signed log -> logistic regression; outliers injected first.
np.random.seed(42)
X, y = make_classification(n_samples=800, n_features=12, n_informative=6, random_state=42)
X[:20, :3] *= 100
pipe = Pipeline([
    ('winsor', WinsorizeTransformer(q=0.05)),
    ('log', SafeLogTransformer()),
    ('lr', LogisticRegression(max_iter=300, random_state=0))
])
scores = cross_val_score(pipe, X, y, cv=5)
print(f'Custom pipeline CV: {scores.mean():.4f} +/- {scores.std():.4f}')

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.datasets import make_classification
class ClipTransformer(BaseEstimator, TransformerMixin):
    """Clip every feature to within `k` standard deviations of its mean.

    Previously a stub whose `transform` returned None (crashing any
    pipeline that used it); implemented per its own TODOs.

    Parameters
    ----------
    k : float, default=3.0
        Half-width of the allowed band in units of per-feature std.
    """

    def __init__(self, k=3.0):
        self.k = k

    def fit(self, X, y=None):
        # Learn per-feature center and spread on the training data only,
        # so CV folds cannot leak statistics from validation rows.
        X = np.asarray(X)
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        return self

    def transform(self, X):
        # Clip to [mean - k*std, mean + k*std] using the fitted statistics.
        lo = self.mean_ - self.k * self.std_
        hi = self.mean_ + self.k * self.std_
        return np.clip(X, lo, hi)
# Exercise setup: finish ClipTransformer above, then tune its k with GridSearchCV.
np.random.seed(42)
X, y = make_classification(n_samples=600, n_features=10, random_state=42)
X[:10, :3] *= 50 # inject outliers
pipe = Pipeline([('clip', ClipTransformer()), ('lr', LogisticRegression(max_iter=200))])
# TODO: GridSearchCV over clip__k in [1.5, 2.0, 2.5, 3.0]
# TODO: Print best k and best CV score
Calibrate classifier probabilities so that a predicted 0.7 means 70% of samples are positive. Use Platt scaling and isotonic regression with reliability diagrams.
Calibration Curve (Reliability Diagram)
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.calibration import calibration_curve
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
X, y = make_classification(n_samples=3000, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
models = {
'Random Forest': RandomForestClassifier(n_estimators=50, random_state=0),
'Logistic Reg': LogisticRegression(max_iter=200, random_state=0),
}
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot([0,1], [0,1], 'k--', label='Perfect')
for name, clf in models.items():
clf.fit(X_tr, y_tr)
prob_pos = clf.predict_proba(X_te)[:, 1]
frac_pos, mean_pred = calibration_curve(y_te, prob_pos, n_bins=10)
ax.plot(mean_pred, frac_pos, 's-', label=name)
ax.set_xlabel('Mean predicted probability')
ax.set_ylabel('Fraction of positives')
ax.set_title('Reliability Diagram'); ax.legend()
plt.tight_layout(); plt.savefig('calibration.png', dpi=80); plt.close()
print('Saved calibration.png')
print('RF is typically overconfident; LR is generally better calibrated.')Platt Scaling (Sigmoid Calibration)
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import train_test_split
from sklearn.metrics import brier_score_loss, log_loss
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
rf = RandomForestClassifier(n_estimators=50, random_state=0)
rf.fit(X_tr, y_tr)
# Platt scaling = sigmoid calibration
platt = CalibratedClassifierCV(RandomForestClassifier(n_estimators=50, random_state=0),
method='sigmoid', cv=5)
platt.fit(X_tr, y_tr)
# Isotonic regression calibration
isoton = CalibratedClassifierCV(RandomForestClassifier(n_estimators=50, random_state=0),
method='isotonic', cv=5)
isoton.fit(X_tr, y_tr)
for name, clf in [('RF (raw)', rf), ('Platt', platt), ('Isotonic', isoton)]:
prob = clf.predict_proba(X_te)[:, 1]
print(f'{name:<15} Brier={brier_score_loss(y_te, prob):.4f} LogLoss={log_loss(y_te, prob):.4f}')Expected Calibration Error (ECE)
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import train_test_split
def expected_calibration_error(y_true, y_prob, n_bins=10):
    """Expected Calibration Error over equal-width probability bins.

    Sums |fraction_positive - mean_confidence| per bin, weighted by the
    share of samples in that bin. Lower is better; 0 = perfectly calibrated.

    Bug fixed: the final bin is now closed on the right, so predictions of
    exactly 1.0 are counted instead of being silently dropped (the original
    `y_prob < hi` test excluded them from every bin).
    """
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    edges = np.linspace(0, 1, n_bins + 1)
    ece = 0.0
    for i, (lo, hi) in enumerate(zip(edges[:-1], edges[1:])):
        if i == n_bins - 1:
            mask = (y_prob >= lo) & (y_prob <= hi)  # include prob == 1.0
        else:
            mask = (y_prob >= lo) & (y_prob < hi)
        if not mask.any():
            continue
        frac_pos = y_true[mask].mean()
        mean_conf = y_prob[mask].mean()
        ece += mask.mean() * abs(frac_pos - mean_conf)
    return ece
# Compare raw vs. isotonic-calibrated GBM by ECE on a held-out split.
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
gbm = GradientBoostingClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)
gbm_cal = CalibratedClassifierCV(
    GradientBoostingClassifier(n_estimators=100, random_state=0), method='isotonic', cv=5
).fit(X_tr, y_tr)
for name, clf in [('GBM raw', gbm), ('GBM calibrated', gbm_cal)]:
    prob = clf.predict_proba(X_te)[:, 1]
    ece = expected_calibration_error(y_te, prob)
print(f'{name:<18} ECE={ece:.4f}')Temperature Scaling (post-hoc calibration)
import numpy as np
from scipy.optimize import minimize_scalar
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import log_loss
def sigmoid(z):
    """Numerically stable logistic function 1 / (1 + exp(-z)).

    The naive form overflows in np.exp(-z) (RuntimeWarning) for large
    negative z; computing with exp(-|z|) keeps every intermediate finite
    while producing identical float64 results in the normal range.
    """
    z = np.asarray(z, dtype=float)
    ez = np.exp(-np.abs(z))
    return np.where(z >= 0, 1.0 / (1.0 + ez), ez / (1.0 + ez))


def temperature_scale(logits, T):
    """Scale log-odds by temperature T (>1 softens, <1 sharpens), map to probability."""
    return sigmoid(logits / T)
# Three-way split: train the model, fit T on validation, evaluate on test.
np.random.seed(42)
X, y = make_classification(n_samples=3000, n_features=20, random_state=42)
X_tr, X_val, X_te = X[:1500], X[1500:2000], X[2000:]
y_tr, y_val, y_te = y[:1500], y[1500:2000], y[2000:]
gbm = GradientBoostingClassifier(n_estimators=50, random_state=0).fit(X_tr, y_tr)
# Get raw log-odds on validation set
proba_val = gbm.predict_proba(X_val)[:, 1]
# The 1e-10 epsilon keeps the logit finite when a predicted probability is 0 or 1.
logits_val = np.log(proba_val + 1e-10) - np.log(1 - proba_val + 1e-10)
# Optimize temperature on validation set
result = minimize_scalar(
    lambda T: log_loss(y_val, temperature_scale(logits_val, T)),
    bounds=(0.1, 10.0), method='bounded'
)
T_opt = result.x
print(f'Optimal temperature: {T_opt:.3f}')
# Apply the fitted temperature to held-out test logits.
proba_te = gbm.predict_proba(X_te)[:, 1]
logits_te = np.log(proba_te + 1e-10) - np.log(1 - proba_te + 1e-10)
cal_proba = temperature_scale(logits_te, T_opt)
print(f'Original log-loss: {log_loss(y_te, proba_te):.4f}')
print(f'Calibrated log-loss: {log_loss(y_te, cal_proba):.4f}')import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.model_selection import train_test_split
from sklearn.metrics import brier_score_loss
np.random.seed(1)
X, y = make_classification(n_samples=5000, n_features=15, weights=[0.8, 0.2], random_state=1)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
raw = GradientBoostingClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)
cal = CalibratedClassifierCV(
GradientBoostingClassifier(n_estimators=100, random_state=0),
method='isotonic', cv=5
).fit(X_tr, y_tr)
for name, clf in [('Raw GBM', raw), ('Calibrated', cal)]:
prob = clf.predict_proba(X_te)[:, 1]
frac, mean_pred = calibration_curve(y_te, prob, n_bins=5)
print(f'{name} Brier: {brier_score_loss(y_te, prob):.4f}')
for mp, fp in zip(mean_pred, frac):
print(f' pred={mp:.2f} -> actual={fp:.2f} (err={abs(fp-mp):.2f})')import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.model_selection import train_test_split
from sklearn.metrics import brier_score_loss
np.random.seed(42)
X, y = make_classification(n_samples=3000, n_features=15, weights=[0.9,0.1], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=0)
# TODO: Train raw RF
# TODO: Calibrate with 'sigmoid' (Platt)
# TODO: Calibrate with 'isotonic'
# TODO: For each, compute Brier score and print calibration curve values
Combine multiple base learners to build a stronger meta-model. Stacking uses out-of-fold predictions as features for a meta-learner; blending uses a single held-out set. Both reduce variance and capture complementary model strengths.
Stacking with OOF Predictions
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict, StratifiedKFold
from sklearn.metrics import roc_auc_score
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Base learners: get out-of-fold predictions
rf = RandomForestClassifier(n_estimators=100, random_state=0)
gbm = GradientBoostingClassifier(n_estimators=100, random_state=1)
rf_oof = cross_val_predict(rf, X, y, cv=cv, method='predict_proba')[:, 1]
gbm_oof = cross_val_predict(gbm, X, y, cv=cv, method='predict_proba')[:, 1]
# Stack: meta-learner on OOF predictions
import numpy as np
X_meta = np.column_stack([rf_oof, gbm_oof])
meta = LogisticRegression()
meta_oof = cross_val_predict(meta, X_meta, y, cv=cv, method='predict_proba')[:, 1]
print(f"RF AUC: {roc_auc_score(y, rf_oof):.4f}")
print(f"GBM AUC: {roc_auc_score(y, gbm_oof):.4f}")
print(f"Stack AUC:{roc_auc_score(y, meta_oof):.4f}")
Blending with Weight Search
import numpy as np
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
# Blending demo: hold out the final 20% of rows as the blend set.
X, y = make_regression(n_samples=800, n_features=15, noise=20, random_state=42)
kf = KFold(n_splits=5, shuffle=True, random_state=0)  # NOTE(review): unused below
# Blending: single hold-out blend set
blend_idx = int(0.8 * len(X))
X_tr, X_bl = X[:blend_idx], X[blend_idx:]
y_tr, y_bl = y[:blend_idx], y[blend_idx:]
rf = RandomForestRegressor(n_estimators=100, random_state=0).fit(X_tr, y_tr)
gbm = GradientBoostingRegressor(n_estimators=100, random_state=1).fit(X_tr, y_tr)
rf_bl = rf.predict(X_bl)
gbm_bl = gbm.predict(X_bl)
# Grid search blending weights
best_w, best_rmse = 0.5, float('inf')
for w in np.arange(0, 1.05, 0.05):
    # Convex combination of the two base-model predictions.
    blend = w * rf_bl + (1-w) * gbm_bl
    rmse = np.sqrt(mean_squared_error(y_bl, blend))
    if rmse < best_rmse:
        best_rmse, best_w = rmse, w
print(f"RF RMSE: {np.sqrt(mean_squared_error(y_bl, rf_bl)):.4f}")
print(f"GBM RMSE: {np.sqrt(mean_squared_error(y_bl, gbm_bl)):.4f}")
print(f"Best blend (w_rf={best_w:.2f}): RMSE={best_rmse:.4f}")
sklearn StackingClassifier
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import (RandomForestClassifier, GradientBoostingClassifier,
ExtraTreesClassifier, StackingClassifier)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
X, y = make_classification(n_samples=1000, n_features=20,
n_informative=12, random_state=42)
estimators = [
('rf', RandomForestClassifier(n_estimators=100, random_state=0)),
('gbm', GradientBoostingClassifier(n_estimators=100, random_state=1)),
('et', ExtraTreesClassifier(n_estimators=100, random_state=2)),
]
stack = StackingClassifier(
estimators=estimators,
final_estimator=LogisticRegression(),
cv=5, passthrough=False
)
scores = cross_val_score(stack, X, y, cv=5, scoring='roc_auc')
print(f"Stacking AUC: {scores.mean():.4f} +/- {scores.std():.4f}")
Ensemble Comparison
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_auc_score
X, y = make_classification(n_samples=1000, n_features=20, random_state=7)
models = {
"Single Tree": DecisionTreeClassifier(max_depth=5),
"Bagging": BaggingClassifier(DecisionTreeClassifier(max_depth=5),
n_estimators=50, random_state=0),
"AdaBoost": AdaBoostClassifier(n_estimators=100, random_state=1),
"Random Forest":RandomForestClassifier(n_estimators=100, random_state=2),
}
print(f"{'Model':<20} {'AUC':>8} {'Std':>6}")
for name, model in models.items():
scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
print(f"{name:<20} {scores.mean():.4f} {scores.std():.4f}")
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.metrics import roc_auc_score
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=25, n_informative=15, random_state=0)
estimators = [
("rf", RandomForestClassifier(n_estimators=100, random_state=0)),
("gbm", GradientBoostingClassifier(n_estimators=100, random_state=1)),
]
stack = StackingClassifier(estimators=estimators, final_estimator=LogisticRegression(), cv=5)
for name, clf in estimators + [("stack", stack)]:
scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
print(f"{name:<8}: AUC={scores.mean():.4f} +/- {scores.std():.4f}")
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import (RandomForestClassifier, GradientBoostingClassifier,
StackingClassifier)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict, StratifiedKFold, cross_val_score
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
X, y = load_breast_cancer(return_X_y=True)
# TODO: Build a 3-model stacking ensemble (RF, GBM, SVM or LR)
# TODO: Use StratifiedKFold(5) for OOF generation
# TODO: Meta-learner: LogisticRegression
# TODO: Compare: base models AUC vs stacking AUC
# TODO: Add feature passthrough=True and compare again
Use TimeSeriesSplit for realistic CV that respects temporal ordering. Expanding-window and sliding-window strategies prevent future data leakage and simulate production deployment conditions.
Expanding Window CV from Scratch
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
np.random.seed(42)
n = 500
dates = pd.date_range("2020-01-01", periods=n, freq="D")
trend = np.arange(n) * 0.05
season = 2 * np.sin(2 * np.pi * np.arange(n) / 7)
noise = np.random.normal(0, 0.5, n)
y = trend + season + noise
# Time-Series Split: expanding window
n_splits = 5
split_size = n // (n_splits + 1)
results = []
for fold in range(n_splits):
train_end = split_size * (fold + 2)
test_end = train_end + split_size
X_tr = np.column_stack([np.arange(train_end), np.sin(2*np.pi*np.arange(train_end)/7)])
X_te = np.column_stack([np.arange(train_end, test_end),
np.sin(2*np.pi*np.arange(train_end, test_end)/7)])
m = Ridge().fit(X_tr, y[:train_end])
pred = m.predict(X_te)
rmse = np.sqrt(mean_squared_error(y[train_end:test_end], pred))
results.append(rmse)
print(f"Fold {fold+1}: train={train_end}, test RMSE={rmse:.4f}")
print(f"Mean RMSE: {np.mean(results):.4f}")
TimeSeriesSplit with sklearn Pipeline
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
np.random.seed(1)
n = 400
t = np.arange(n)
y = 3*np.sin(2*np.pi*t/30) + 0.02*t + np.random.normal(0, 0.3, n)
# Feature engineering: lag features + time features
def make_features(t_arr, y_arr, lag=5):
    """Build a design matrix of time features plus `lag` lagged targets.

    Columns: [t, sin(2*pi*t/7), sin(2*pi*t/30), y[t-1], ..., y[t-lag]].
    The first `lag` rows are dropped because np.roll wraps circularly,
    so those rows would contain future values instead of history.
    """
    time_cols = [
        t_arr,
        np.sin(2 * np.pi * t_arr / 7),
        np.sin(2 * np.pi * t_arr / 30),
    ]
    lag_cols = [np.roll(y_arr, k) for k in range(1, lag + 1)]
    design = np.column_stack(time_cols + lag_cols)
    return design[lag:]
lag = 5
X = make_features(t, y, lag)
y_lagged = y[lag:]
tscv = TimeSeriesSplit(n_splits=5)
pipe = Pipeline([("scaler", StandardScaler()), ("ridge", Ridge(alpha=1.0))])
rmses = []
for tr, te in tscv.split(X):
pipe.fit(X[tr], y_lagged[tr])
pred = pipe.predict(X[te])
rmses.append(np.sqrt(mean_squared_error(y_lagged[te], pred)))
print(f" RMSE: {rmses[-1]:.4f}")
print(f"TimeSeriesSplit CV RMSE: {np.mean(rmses):.4f} +/- {np.std(rmses):.4f}")
Walk-Forward with Gap (prevents leakage)
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
np.random.seed(5)
n = 600
t = np.arange(n)
# Piecewise trend with seasonality
y = np.where(t < 300, 0.03*t, 0.01*t + 6) + 2*np.sin(2*np.pi*t/52) + np.random.normal(0, 0.5, n)
def make_X(t_arr, y_arr, lag=10):
    """Design matrix: calendar features (t mod 7, t mod 52) + `lag` lagged targets.

    The first `lag` rows are discarded since np.roll is circular and their
    lag columns would otherwise wrap in values from the end of the series.
    """
    cols = [t_arr % 7, t_arr % 52]
    cols += [np.roll(y_arr, k) for k in range(1, lag + 1)]
    return np.column_stack(cols)[lag:]
X = make_X(t, y, lag=10)
y_f = y[10:]
tscv = TimeSeriesSplit(n_splits=5, gap=10)
results = []
for fold, (tr, te) in enumerate(tscv.split(X)):
m = GradientBoostingRegressor(n_estimators=100, max_depth=4, random_state=0)
m.fit(X[tr], y_f[tr])
pred = m.predict(X[te])
rmse = np.sqrt(mean_squared_error(y_f[te], pred))
results.append(rmse)
print(f"Fold {fold+1} (gap=10): RMSE={rmse:.4f}")
print(f"Mean RMSE: {np.mean(results):.4f}")
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error
np.random.seed(7)
n = 750
t = np.arange(n)
returns = np.random.normal(0.001, 0.02, n)
# Lag features
X = np.column_stack([np.roll(returns, l) for l in range(1, 11)])[10:]
y = returns[10:]
tscv = TimeSeriesSplit(n_splits=5, gap=5)
results = []
for fold, (tr, te) in enumerate(tscv.split(X)):
m = GradientBoostingRegressor(n_estimators=100, max_depth=3, random_state=0)
m.fit(X[tr], y[tr])
pred = m.predict(X[te])
rmse = np.sqrt(mean_squared_error(y[te], pred))
results.append(rmse)
print(f"Fold {fold+1}: test_size={len(te)}, RMSE={rmse:.6f}")
print(f"Mean RMSE: {np.mean(results):.6f}")
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
np.random.seed(0)
n = 730
t = np.arange(n)
# Daily demand with weekly + annual seasonality + upward trend
demand = (100 + 0.1*t + 20*np.sin(2*np.pi*t/7) +
10*np.sin(2*np.pi*t/365) + np.random.normal(0, 5, n))
# TODO: Create lag features (lag 1..14) + time features (dow, month)
# TODO: TimeSeriesSplit(n_splits=5, gap=7) walk-forward validation
# TODO: Compare Ridge, RF, GBM with CV RMSE
# TODO: Report per-fold RMSE and total mean RMSE for each model
SHAP (SHapley Additive exPlanations) provides consistent, theoretically grounded feature attributions for any model. Use TreeExplainer for tree-based models for fast exact SHAP values.
Global Feature Importance with SHAP
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
import shap
X, y = make_classification(n_samples=500, n_features=10,
n_informative=6, random_state=42)
feature_names = [f"feat_{i}" for i in range(10)]
model = RandomForestClassifier(n_estimators=100, random_state=0).fit(X, y)
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X[:100])
# shap_values[1] = SHAP for class 1
print("Global feature importance (mean |SHAP|):")
mean_shap = np.abs(shap_values[1]).mean(axis=0)
for i in np.argsort(mean_shap)[::-1]:
bar = "#" * int(mean_shap[i] * 100)
print(f" {feature_names[i]:<12} {mean_shap[i]:.4f} {bar}")
Individual Prediction Explanation
import numpy as np
from sklearn.datasets import make_regression
from sklearn.ensemble import GradientBoostingRegressor
import shap
X, y = make_regression(n_samples=400, n_features=8, noise=10, random_state=0)
feature_names = ["age","income","tenure","spend","logins","products","region","segment"]
model = GradientBoostingRegressor(n_estimators=200, random_state=0).fit(X, y)
explainer = shap.TreeExplainer(model)
shap_values = explainer(X[:50])
print("Individual explanation for sample 0:")
print(f" Base value (expected prediction): {shap_values.base_values[0]:.3f}")
print(f" Model output for sample 0: {model.predict(X[:1])[0]:.3f}")
print(" Feature contributions:")
for feat, val in sorted(zip(feature_names, shap_values.values[0]),
key=lambda x: abs(x[1]), reverse=True):
direction = "++" if val > 0 else "--"
print(f" {direction} {feat:<12}: {val:+.4f}")
SHAP on Breast Cancer Dataset
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
import shap
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)
model = GradientBoostingClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_te.values)
# Summary: top 5 most impactful features globally
mean_abs = np.abs(shap_values).mean(axis=0)
top5_idx = np.argsort(mean_abs)[::-1][:5]
print("Top 5 features by mean |SHAP| on test set:")
for i in top5_idx:
feat = X.columns[i]
print(f" {feat:<35} mean|SHAP|={mean_abs[i]:.4f}")
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
import shap
np.random.seed(0)
n = 1000
X = np.random.randn(n, 8)
feature_names = ["credit_score","income","debt_ratio","employment_yrs",
"loan_amount","num_accounts","late_payments","collateral"]
# Simulate default probability
prob = 1 / (1 + np.exp(-(X[:,0]*0.8 - X[:,2]*0.6 + X[:,4]*0.4)))
y = np.random.binomial(1, prob)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)
model = GradientBoostingClassifier(n_estimators=100, random_state=0).fit(X_tr, y_tr)
explainer = shap.TreeExplainer(model)
sv = explainer.shap_values(X_te)
print("Global importance (credit model):")
mean_abs = np.abs(sv).mean(axis=0)
for i in np.argsort(mean_abs)[::-1][:5]:
print(f" {feature_names[i]:<20}: {mean_abs[i]:.4f}")
print("\nSample 0 explanation:")
for feat, val in sorted(zip(feature_names, sv[0]), key=lambda x: abs(x[1]), reverse=True)[:3]:
print(f" {feat}: {val:+.4f}")
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
import shap
housing = fetch_california_housing(as_frame=True)
X, y = housing.data, housing.target
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
model = GradientBoostingRegressor(n_estimators=200, max_depth=4, random_state=0)
model.fit(X_tr, y_tr)
# TODO: Create SHAP TreeExplainer and compute shap_values for X_te[:200]
# TODO: Print global feature importance ranking (mean |SHAP|)
# TODO: Explain the 3 highest and 3 lowest predicted houses individually
# TODO: Check if SHAP values sum to model output - expected value (verify additivity)
Transform raw features into richer representations. PolynomialFeatures adds interactions and powers; FunctionTransformer applies any callable; custom transformers plug into Pipelines.
PolynomialFeatures and interaction terms
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
np.random.seed(42)
X, y = make_regression(n_samples=200, n_features=3, noise=10, random_state=42)
# Add a non-linear relationship
y += X[:, 0] ** 2 * 0.5
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Linear model without features
lr = LinearRegression().fit(X_train, y_train)
print(f"Linear R2: {r2_score(y_test, lr.predict(X_test)):.4f}")
# Polynomial features (degree 2 adds x^2, x1*x2, etc.)
poly = PolynomialFeatures(degree=2, include_bias=False)
X_train_p = poly.fit_transform(X_train)
X_test_p = poly.transform(X_test)
lr_poly = LinearRegression().fit(X_train_p, y_train)
print(f"Poly R2: {r2_score(y_test, lr_poly.predict(X_test_p)):.4f}")
print(f"Features: {X_train.shape[1]} -> {X_train_p.shape[1]}")
print("Feature names:", poly.get_feature_names_out(['a','b','c'])[:6])
FunctionTransformer for custom transformations
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge
import numpy as np, pandas as pd
np.random.seed(0)
n = 300
# Skewed feature β log-transform helps
X = np.column_stack([
np.random.exponential(5, n), # right-skewed
np.random.normal(0, 1, n), # already normal
np.random.uniform(1, 100, n), # uniform
])
y = 3 * np.log1p(X[:, 0]) + 2 * X[:, 1] + 0.1 * X[:, 2] + np.random.randn(n)
# FunctionTransformer: apply log1p to first column only
def log_transform(X):
    """Return a copy of X with column 0 replaced by log1p(|col0|).

    Only the first (right-skewed) feature is transformed; all other
    columns pass through untouched. The input array is not mutated.
    """
    out = np.array(X, copy=True)
    out[:, 0] = np.log1p(np.abs(out[:, 0]))
    return out
log_pipe = Pipeline([
('log', FunctionTransformer(log_transform, validate=True)),
('ridge', Ridge(alpha=1.0)),
])
from sklearn.model_selection import cross_val_score
scores = cross_val_score(log_pipe, X, y, cv=5, scoring='r2')
print(f"Log-transform pipeline R2: {scores.mean():.4f} Β± {scores.std():.4f}")
Custom transformer with BaseEstimator and TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip every feature to within `n_std` standard deviations of its mean."""

    def __init__(self, n_std=3.0):
        self.n_std = n_std

    def fit(self, X, y=None):
        # Per-column statistics learned from the training data only.
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        return self

    def transform(self, X):
        # Cap values at mean +/- n_std * std using the fitted statistics.
        band = self.n_std * self.std_
        return np.clip(X, self.mean_ - band, self.mean_ + band)
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42)
pipe = Pipeline([
('clip', OutlierClipper(n_std=3.0)),
('scale', StandardScaler()),
('clf', LogisticRegression(max_iter=1000)),
])
pipe.fit(X_tr, y_tr)
print(f"Accuracy: {accuracy_score(y_te, pipe.predict(X_te)):.4f}")
print(f"Clipping params: mean={pipe['clip'].mean_[:3].round(2)}")
ColumnTransformer for mixed-type data
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import numpy as np, pandas as pd
np.random.seed(42)
n = 400
df = pd.DataFrame({
'age': np.random.randint(18, 70, n).astype(float),
'income': np.random.exponential(40000, n),
'score': np.random.normal(600, 100, n),
'region': np.random.choice(['North','South','East','West'], n),
'product': np.random.choice(['Basic','Premium','Enterprise'], n),
'target': np.random.choice([0, 1], n, p=[0.6, 0.4]),
})
# Inject some missing values
df.loc[df.sample(30).index, 'age'] = np.nan
df.loc[df.sample(20).index, 'income'] = np.nan
num_features = ['age', 'income', 'score']
cat_features = ['region', 'product']
num_pipe = Pipeline([
('impute', SimpleImputer(strategy='median')),
('scale', StandardScaler()),
])
cat_pipe = Pipeline([
('impute', SimpleImputer(strategy='most_frequent')),
('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])
preprocessor = ColumnTransformer([
('num', num_pipe, num_features),
('cat', cat_pipe, cat_features),
])
X = df.drop('target', axis=1)
y = df['target']
full_pipe = Pipeline([
('prep', preprocessor),
('clf', RandomForestClassifier(n_estimators=100, random_state=42)),
])
scores = cross_val_score(full_pipe, X, y, cv=5, scoring='accuracy')
print(f"Mixed-type pipeline accuracy: {scores.mean():.4f} Β± {scores.std():.4f}")
print(f"Input features: {X.shape[1]} | Encoded features: {preprocessor.fit_transform(X).shape[1]}")
# Demo: log-transform skewed numeric features before scaling, then boost.
# NOTE(review): BaseEstimator/TransformerMixin are imported but never used here —
# presumably left over from a custom-transformer example; confirm before removing.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
import numpy as np, pandas as pd
np.random.seed(42)
n = 500
df = pd.DataFrame({
    'days_since_last': np.random.exponential(30, n),  # skewed
    'total_orders': np.random.exponential(8, n),      # skewed
    'avg_order_val': np.random.normal(85, 30, n).clip(10, 250),
    'support_tickets': np.random.poisson(1.5, n),
    'segment': np.random.choice(['Bronze','Silver','Gold'], n),
    'country': np.random.choice(['US','UK','DE','FR'], n),
    'churned': np.random.choice([0,1], n, p=[0.75, 0.25]),
})
# log1p compresses the long right tails so standardization is meaningful.
log_tf = FunctionTransformer(np.log1p)
num_pipe = Pipeline([
    ('log', log_tf),
    ('scale', StandardScaler()),
])
cat_pipe = Pipeline([
    ('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])
prep = ColumnTransformer([
    ('num', num_pipe, ['days_since_last','total_orders','avg_order_val','support_tickets']),
    ('cat', cat_pipe, ['segment','country']),
])
pipe = Pipeline([
    ('prep', prep),
    ('clf', GradientBoostingClassifier(n_estimators=100, random_state=42)),
])
X, y = df.drop('churned', axis=1), df['churned']
# ROC-AUC is threshold-independent — appropriate for the 75/25 class split.
scores = cross_val_score(pipe, X, y, cv=5, scoring='roc_auc')
print(f"Churn ROC-AUC: {scores.mean():.4f} Β± {scores.std():.4f}")
# Exercise: housing-price regression with a log-transformed target.
# The preprocessing/model TODOs are intentionally left for the reader.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import PolynomialFeatures, StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
import numpy as np, pandas as pd
np.random.seed(42)
n = 400
df = pd.DataFrame({
    'sqft': np.random.exponential(1500, n).clip(500, 5000),
    'bedrooms': np.random.randint(1, 6, n).astype(float),
    'bathrooms': np.random.randint(1, 4, n).astype(float),
    'age': np.random.randint(0, 50, n).astype(float),
    'neighborhood': np.random.choice(['A','B','C','D'], n),
    'price': None,  # placeholder column; filled just below
})
# Synthetic price: log-linear in sqft, linear in the rest, plus Gaussian noise.
df['price'] = (np.log1p(df['sqft']) * 50000 +
               df['bedrooms'] * 15000 +
               df['bathrooms'] * 10000 -
               df['age'] * 500 +
               np.random.randn(n) * 10000).clip(80000, 800000)
X = df.drop('price', axis=1)
y = np.log1p(df['price'])  # model log-price for a more linear target
num_features = ['sqft','bedrooms','bathrooms','age']
cat_features = ['neighborhood']
# TODO: build num_pipe (FunctionTransformer log1p + PolynomialFeatures + StandardScaler)
# TODO: build cat_pipe (OneHotEncoder)
# TODO: ColumnTransformer + Pipeline with Ridge
# TODO: cross_val_score with cv=5, scoring='r2'
ROC-AUC and Precision-Recall curves reveal classifier performance beyond accuracy. Use them to select thresholds, compare models, and diagnose class imbalance issues.
ROC curve and AUC with multiple models
# Fit several classifiers on the breast-cancer data and report test ROC-AUC.
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib
matplotlib.use('Agg')  # headless backend: no display required
import matplotlib.pyplot as plt
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
    cancer.data, cancer.target, test_size=0.25, random_state=42, stratify=cancer.target)
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
}
def _report(name, clf):
    """Fit *clf*, compute its ROC curve and test AUC, and print one line."""
    clf.fit(X_tr, y_tr)
    pos_scores = clf.predict_proba(X_te)[:, 1]
    fpr, tpr, _ = roc_curve(y_te, pos_scores)  # curve points (not plotted here)
    auc = roc_auc_score(y_te, pos_scores)
    print(f"{name:25s} AUC = {auc:.4f}")
for name, model in models.items():
    _report(name, model)
Precision-Recall curve and optimal threshold
# Precision-Recall analysis on an imbalanced (85/15) problem: pick the
# decision threshold that maximizes F1 instead of the 0.5 default.
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import precision_recall_curve, average_precision_score, f1_score
import numpy as np
np.random.seed(42)
X, y = make_classification(n_samples=1000, weights=[0.85, 0.15],
                           n_features=10, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2,
                                          stratify=y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_tr, y_tr)
proba = model.predict_proba(X_te)[:, 1]
# precision/recall have one more entry than thresholds (the final (1, 0) point),
# hence the [:-1] slices below.
precision, recall, thresholds = precision_recall_curve(y_te, proba)
ap = average_precision_score(y_te, proba)
print(f"Average Precision: {ap:.4f}")
# Find threshold that maximizes F1
# (the 1e-9 term guards against division by zero when both terms are 0).
f1s = 2 * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1] + 1e-9)
best_idx = np.argmax(f1s)
best_thresh = thresholds[best_idx]
print(f"Best F1 threshold: {best_thresh:.3f} | F1={f1s[best_idx]:.4f}")
print(f"At threshold {best_thresh:.3f}: precision={precision[best_idx]:.3f}, recall={recall[best_idx]:.3f}")
# Apply custom threshold
y_pred = (proba >= best_thresh).astype(int)
print(f"F1 with custom threshold: {f1_score(y_te, y_pred):.4f}")
Confusion matrix, classification report, calibration
# Multi-class evaluation on digits: per-class report, then locate the single
# largest off-diagonal cell of the confusion matrix.
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (confusion_matrix, classification_report,
                             ConfusionMatrixDisplay)
import numpy as np
digits = load_digits()
X_tr, X_te, y_tr, y_te = train_test_split(
    digits.data, digits.target, test_size=0.2, random_state=42)
forest = RandomForestClassifier(n_estimators=100, random_state=42)
forest.fit(X_tr, y_tr)
predictions = forest.predict(X_te)
print("Classification Report:")
print(classification_report(y_te, predictions, digits=3))
conf = confusion_matrix(y_te, predictions)
# Zero the diagonal so argmax lands on the worst *mis*classification.
np.fill_diagonal(conf, 0)
true_d, pred_d = np.unravel_index(conf.argmax(), conf.shape)
print(f"Most confused: digit {true_d} predicted as {pred_d} ({conf[true_d,pred_d]} times)")
Multi-class ROC with one-vs-rest
# Multi-class ROC via one-vs-rest: one binary classifier per class, then
# per-class and macro/micro-averaged AUCs on binarized labels.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_auc_score
import numpy as np
iris = load_iris()
X_tr, X_te, y_tr, y_te = train_test_split(
    iris.data, iris.target, test_size=0.3, random_state=42, stratify=iris.target)
# FIX: LogisticRegression(multi_class='ovr') is deprecated since sklearn 1.5
# and removed in 1.7 — wrap the estimator in OneVsRestClassifier instead,
# which fits the same per-class binary models.
model = OneVsRestClassifier(LogisticRegression(max_iter=200))
model.fit(X_tr, y_tr)
proba = model.predict_proba(X_te)
# One-vs-Rest AUC for each class
y_bin = label_binarize(y_te, classes=[0, 1, 2])
for i, name in enumerate(iris.target_names):
    auc = roc_auc_score(y_bin[:, i], proba[:, i])
    print(f" {name:12s} AUC = {auc:.4f}")
macro_auc = roc_auc_score(y_bin, proba, average='macro')
micro_auc = roc_auc_score(y_bin, proba, average='micro')
print(f"Macro AUC: {macro_auc:.4f} | Micro AUC: {micro_auc:.4f}")
# Severe imbalance (~3% positives): train a GBM, then pick the highest-precision
# threshold that still achieves at least 90% recall.
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (roc_auc_score, average_precision_score,
                             precision_recall_curve, confusion_matrix)
import numpy as np
np.random.seed(42)
X, y = make_classification(
    n_samples=5000, n_features=15, n_informative=8,
    weights=[0.97, 0.03], flip_y=0.01, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)
gbc = GradientBoostingClassifier(n_estimators=200, max_depth=4,
                                 learning_rate=0.05, random_state=42)
gbc.fit(X_tr, y_tr)
proba = gbc.predict_proba(X_te)[:, 1]
print(f"ROC-AUC: {roc_auc_score(y_te, proba):.4f}")
print(f"Avg Precision: {average_precision_score(y_te, proba):.4f}")
# Find threshold for recall >= 0.90
precision, recall, thresholds = precision_recall_curve(y_te, proba)
high_recall_mask = recall[:-1] >= 0.90
if high_recall_mask.any():
    best_prec = precision[:-1][high_recall_mask].max()
    best_thr = thresholds[high_recall_mask][precision[:-1][high_recall_mask].argmax()]
    print(f"At recall>=90%: threshold={best_thr:.3f}, precision={best_prec:.3f}")
    # FIX: best_thr only exists inside this branch; previously the thresholded
    # predictions ran unconditionally and raised NameError whenever no
    # threshold reached 90% recall.
    y_pred = (proba >= best_thr).astype(int)
    cm = confusion_matrix(y_te, y_pred)
    print(f"Confusion matrix:\n{cm}")
# Exercise: choose an operating threshold from the PR curve on an imbalanced
# problem. TODOs intentionally left for the reader.
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import precision_recall_curve, confusion_matrix
import numpy as np
np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=10, weights=[0.9,0.1],
                           random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25,
                                          stratify=y, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_tr, y_tr)
proba = model.predict_proba(X_te)[:, 1]
# Note: precision/recall are one element longer than thresholds.
precision, recall, thresholds = precision_recall_curve(y_te, proba)
# TODO: print precision/recall at 10 evenly-spaced thresholds
# TODO: find threshold giving recall >= 0.85 with max precision
# TODO: print confusion matrix at that threshold
Gradient Boosting sequentially trains shallow trees, each correcting prior errors. sklearn offers GradientBoostingClassifier and HistGradientBoostingClassifier, a histogram-based variant that trains much faster on large datasets and natively supports categorical features and missing values.
GradientBoostingClassifier with learning rate tuning
# Learning-rate sweep for GradientBoosting, then pick the best tree count
# post-hoc from the staged (per-iteration) test-set predictions.
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score
import numpy as np
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target)
# Compare learning rates (lower rate = more trees needed but often better)
for lr in [0.3, 0.1, 0.05]:
    gbc = GradientBoostingClassifier(
        n_estimators=200, learning_rate=lr,
        max_depth=3, subsample=0.8,  # subsample<1 = stochastic gradient boosting
        random_state=42
    )
    gbc.fit(X_tr, y_tr)
    auc = roc_auc_score(y_te, gbc.predict_proba(X_te)[:, 1])
    print(f"lr={lr:.2f} n_est=200 AUC={auc:.4f}")
# Best model with early stopping via staged_predict
best = GradientBoostingClassifier(n_estimators=300, learning_rate=0.05,
                                  max_depth=3, subsample=0.8, random_state=42)
best.fit(X_tr, y_tr)
# staged_predict_proba yields test probabilities after each boosting round,
# so one fit gives the AUC-vs-n_estimators curve.
staged_aucs = [roc_auc_score(y_te, p[:, 1])
               for p in best.staged_predict_proba(X_te)]
best_n = int(np.argmax(staged_aucs)) + 1
print(f"Best n_estimators via staged: {best_n} AUC={staged_aucs[best_n-1]:.4f}")
HistGradientBoostingClassifier (faster, supports NaN)
# HistGradientBoosting: histogram-binned gradient boosting that handles NaN
# inputs natively and has built-in early stopping.
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score
import numpy as np
np.random.seed(42)
X, y = make_classification(n_samples=10000, n_features=20,
                           n_informative=12, random_state=42)
# Inject missing values (HistGB handles NaN natively!)
mask = np.random.random(X.shape) < 0.05  # ~5% of all entries
X[mask] = np.nan
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
hgb = HistGradientBoostingClassifier(
    max_iter=300,
    learning_rate=0.05,
    max_depth=5,
    min_samples_leaf=20,
    early_stopping=True,  # built-in early stopping
    validation_fraction=0.1,  # internal holdout used for the stopping check
    n_iter_no_change=20,  # stop after 20 rounds without improvement
    random_state=42,
)
hgb.fit(X_tr, y_tr)
auc = roc_auc_score(y_te, hgb.predict_proba(X_te)[:, 1])
print(f"HistGB AUC: {auc:.4f}")
print(f"Iterations used: {hgb.n_iter_} (early stopped from max 300)")
print(f"NaN features handled: {mask.sum()} missing values")
Feature importances and partial dependence
# Inspect a fitted GBM: impurity-based feature importances, plus partial
# dependence for the single most important feature.
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.inspection import partial_dependence
import numpy as np
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42)
gbc = GradientBoostingClassifier(n_estimators=200, max_depth=3,
                                 learning_rate=0.1, random_state=42)
gbc.fit(X_tr, y_tr)
# Feature importances (mean impurity decrease)
importances = gbc.feature_importances_
top5_idx = np.argsort(importances)[-5:][::-1]
print("Top 5 features by importance:")
for idx in top5_idx:
    print(f" [{idx:2d}] {cancer.feature_names[idx]:30s} {importances[idx]:.4f}")
# Partial dependence for top feature
top_feat = top5_idx[0]
pdp = partial_dependence(gbc, X_tr, features=[top_feat], kind='average')
# FIX: the f-string previously contained a raw line break, which is a
# SyntaxError in a single-quoted literal — use an explicit \n escape.
print(f"\nPartial dependence for '{cancer.feature_names[top_feat]}':")
vals = pdp['grid_values'][0]
avgs = pdp['average'][0]
for v, a in zip(vals[::5], avgs[::5]):
    print(f" x={v:.2f} -> mean prediction={a:.4f}")
GBM vs RF vs HistGB comparison
# Head-to-head: bagging (RF) vs boosting (GBM) vs histogram boosting, timed.
from sklearn.ensemble import (GradientBoostingClassifier,
                              RandomForestClassifier,
                              HistGradientBoostingClassifier)
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
import numpy as np, time
np.random.seed(42)
X, y = make_classification(n_samples=5000, n_features=20,
                           n_informative=10, random_state=42)
models = {
    'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42),
    'GradientBoosting': GradientBoostingClassifier(n_estimators=200, random_state=42),
    'HistGradientBoosting': HistGradientBoostingClassifier(max_iter=200, random_state=42),
}
print(f"{'Model':25s} {'ROC-AUC':>8s} {'Time(s)':>8s}")
print('-' * 48)
def _bench(label, estimator):
    """Print 5-fold ROC-AUC and wall-clock time for one estimator."""
    started = time.time()
    cv_auc = cross_val_score(estimator, X, y, cv=5, scoring='roc_auc')
    took = time.time() - started
    print(f"{label:25s} {cv_auc.mean():.4f} {took:.2f}s")
for name, model in models.items():
    _bench(name, model)
# Customer lifetime-value demo: HistGB with early stopping on a synthetic
# table whose target is a deterministic rule over three of the features.
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.metrics import roc_auc_score, classification_report
import numpy as np, pandas as pd
np.random.seed(42)
n = 3000
df = pd.DataFrame({
    'orders_6m': np.random.poisson(3, n),
    'avg_value': np.random.exponential(75, n),
    'days_active': np.random.randint(1, 365, n),
    'support_calls':np.random.poisson(0.8, n),
    'email_opens': np.random.binomial(20, 0.3, n),
    'category_pref':np.random.choice([0, 1, 2, 3], n), # categorical
})
# Target: "high LTV" = frequent AND high-value AND long-tenured customers.
df['high_ltv'] = ((df['orders_6m'] > 4) &
                  (df['avg_value'] > 80) &
                  (df['days_active'] > 180)).astype(int)
X = df.drop('high_ltv', axis=1).values
y = df['high_ltv'].values
X_tr, X_te, y_tr, y_te = train_test_split(X, y, stratify=y,
                                          test_size=0.2, random_state=42)
# HistGB natively handles integers as potential categoricals
model = HistGradientBoostingClassifier(
    max_iter=500, learning_rate=0.05,
    max_depth=5, early_stopping=True,
    n_iter_no_change=25, random_state=42,
)
model.fit(X_tr, y_tr)
proba = model.predict_proba(X_te)[:, 1]
print(f"ROC-AUC: {roc_auc_score(y_te, proba):.4f}")
print(f"Iterations: {model.n_iter_}")
print(classification_report(y_te, model.predict(X_te)))
# Exercise: grid-search GradientBoosting hyperparameters.
# TODOs intentionally left for the reader.
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import numpy as np
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42)
# 2 x 2 x 3 = 12 candidate combinations.
param_grid = {
    'n_estimators': [100, 200],
    'learning_rate': [0.05, 0.1],
    'max_depth': [2, 3, 4],
}
# TODO: create GridSearchCV with GradientBoostingClassifier, 5-fold, roc_auc scoring
# TODO: fit on X_tr, y_tr
# TODO: print best_params_, best_score_
# TODO: print top 5 parameter combinations from cv_results_
Ridge (L2), Lasso (L1), and ElasticNet combine least squares with regularization penalties. Lasso performs feature selection by zeroing coefficients; Ridge shrinks them. Use CV variants to auto-select alpha.
Ridge vs Lasso vs ElasticNet comparison
# Compare L2 (Ridge), L1 (Lasso) and mixed (ElasticNet) penalties on a
# sparse-signal regression problem (50 features, only 10 informative).
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.pipeline import Pipeline
import numpy as np
np.random.seed(42)
X, y, coef = make_regression(n_samples=200, n_features=50, n_informative=10,
                             noise=10, coef=True, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42)
models = {
    'Ridge': Ridge(alpha=1.0),
    'Lasso': Lasso(alpha=1.0, max_iter=5000),
    'ElasticNet':ElasticNet(alpha=1.0, l1_ratio=0.5, max_iter=5000),
}
print(f"{'Model':12s} {'R2':>7s} {'RMSE':>8s} {'Non-zero coefs':>14s}")
print('-' * 50)
for name, model in models.items():
    # Scale inside the pipeline so the penalties act on comparable coefficients.
    pipe = Pipeline([('scale', StandardScaler()), ('reg', model)])
    pipe.fit(X_tr, y_tr)
    y_pred = pipe.predict(X_te)
    r2 = r2_score(y_te, y_pred)
    # FIX: mean_squared_error(..., squared=False) was deprecated in sklearn 1.4
    # and removed in 1.6 — take the square root explicitly instead.
    rmse = np.sqrt(mean_squared_error(y_te, y_pred))
    nz = (pipe['reg'].coef_ != 0).sum()
    print(f"{name:12s} {r2:7.4f} {rmse:8.2f} {nz:>14d}")
RidgeCV and LassoCV for automatic alpha selection
# Automatic alpha selection over a log-spaced grid with the built-in CV
# estimators, compared on held-out R2 and coefficient sparsity.
from sklearn.linear_model import RidgeCV, LassoCV, ElasticNetCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
np.random.seed(42)
X, y = make_regression(n_samples=300, n_features=40, n_informative=12,
                       noise=15, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
alphas = np.logspace(-3, 3, 50)
for ModelCV, name in [(RidgeCV, 'RidgeCV'), (LassoCV, 'LassoCV')]:
    # LassoCV uses coordinate descent and accepts max_iter; RidgeCV does not.
    cv_kwargs = {'alphas': alphas, 'cv': 5}
    if name == 'LassoCV':
        cv_kwargs['max_iter'] = 5000
    pipe = Pipeline([('s', StandardScaler()), ('m', ModelCV(**cv_kwargs))])
    pipe.fit(X_tr, y_tr)
    fitted = pipe['m']
    r2 = r2_score(y_te, pipe.predict(X_te))
    nz = (fitted.coef_ != 0).sum()
    print(f"{name:10s} best alpha={fitted.alpha_:.4f} R2={r2:.4f} non-zero={nz}")
Regularization path β how coefficients shrink
# Trace the Lasso regularization path: which features keep non-zero
# coefficients as alpha moves from strong to weak regularization.
from sklearn.linear_model import lasso_path
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
import numpy as np
diabetes = load_diabetes()
X = StandardScaler().fit_transform(diabetes.data)
y = diabetes.target
# Compute Lasso path
alphas, coefs, _ = lasso_path(X, y, eps=1e-3, n_alphas=100)
# Show which features survive at each regularization level
feature_names = diabetes.feature_names
checkpoints = [0, 25, 50, 75, 99]
print(f"{'Alpha':>10s} {'Active features'}")
print('-' * 60)
for i in checkpoints:
    active = [feature_names[j] for j in range(len(feature_names))
              if abs(coefs[j, i]) > 1e-4]
    print(f"{alphas[i]:10.4f} {active}")
# Feature that persists longest (most important): the one active at the
# largest number of alpha values along the path.
last_active = np.argmax([(coefs[j] != 0).sum() for j in range(len(feature_names))])
# FIX: the f-string previously contained a raw line break, which is a
# SyntaxError in a single-quoted literal — use an explicit \n escape.
print(f"\nMost robust feature: {feature_names[last_active]}")
ElasticNet for correlated features
# ElasticNet on deliberately collinear features: the L2 share of the penalty
# spreads weight across correlated groups instead of arbitrarily picking one.
from sklearn.linear_model import ElasticNetCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
np.random.seed(42)
n, p = 200, 20
# Correlated features: add collinear pairs
X = np.random.randn(n, p // 2)
X = np.hstack([X, X + np.random.randn(n, p // 2) * 0.1]) # pairs of correlated features
true_coef = np.array([3, -2, 1.5, 0, 0] * (p // 10) + [0] * (p - p // 10 * 5))[:p]
y = X @ true_coef + np.random.randn(n) * 2
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
# Elastic net handles correlated features better than Lasso
# (Lasso tends to pick one from each correlated group arbitrarily)
l1_ratios = [0.1, 0.5, 0.9] # 0=Ridge, 1=Lasso
# ElasticNetCV jointly selects alpha and l1_ratio by cross-validation.
enet = ElasticNetCV(l1_ratio=l1_ratios, alphas=np.logspace(-3, 1, 30),
                    cv=5, max_iter=5000)
pipe = Pipeline([('scale', StandardScaler()), ('enet', enet)])
pipe.fit(X_tr, y_tr)
r2 = r2_score(y_te, pipe.predict(X_te))
print(f"ElasticNetCV R2: {r2:.4f}")
print(f"Best alpha: {pipe['enet'].alpha_:.4f}")
print(f"Best l1_ratio: {pipe['enet'].l1_ratio_:.2f}")
print(f"Non-zero coefs: {(pipe['enet'].coef_ != 0).sum()} / {p}")
# Feature-selection demo: Lasso recovering the handful of truly predictive
# SNPs out of 500 simulated genotype columns.
from sklearn.linear_model import LassoCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np, pandas as pd
np.random.seed(42)
n_samples = 300
n_snps = 500 # many features (SNPs), few truly predictive
# Simulate SNP data (binary 0/1/2 alleles)
X = np.random.choice([0, 1, 2], size=(n_samples, n_snps))
# Only 10 SNPs truly predict the outcome
true_snps = np.random.choice(n_snps, 10, replace=False)
true_coef = np.zeros(n_snps)
true_coef[true_snps] = np.random.randn(10) * 2
y = X @ true_coef + np.random.randn(n_samples) * 3
feature_names = [f'SNP_{i:04d}' for i in range(n_snps)]
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
pipe = Pipeline([
    ('scale', StandardScaler()),
    ('lasso', LassoCV(cv=5, max_iter=10000, n_alphas=50)),
])
pipe.fit(X_tr, y_tr)
lasso = pipe['lasso']
selected = np.where(lasso.coef_ != 0)[0]
# Overlap between Lasso's surviving features and the planted signal.
true_found = len(set(selected) & set(true_snps))
print(f"R2: {r2_score(y_te, pipe.predict(X_te)):.4f}")
print(f"Best alpha: {lasso.alpha_:.4f}")
print(f"Selected {len(selected)} SNPs | True SNPs recovered: {true_found}/10")
print("Top selected:", [feature_names[i] for i in selected[:5]])
# Exercise: sweep alpha for Lasso and Ridge on the diabetes data and watch
# how many coefficients survive. TODOs intentionally left for the reader.
from sklearn.datasets import load_diabetes
from sklearn.linear_model import Lasso, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import numpy as np
diabetes = load_diabetes()
X, y = diabetes.data, diabetes.target
names = diabetes.feature_names
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
alphas = [0.01, 0.1, 1, 10, 100]
print("=== Lasso ===")
for alpha in alphas:
    # TODO: Pipeline StandardScaler + Lasso(alpha, max_iter=5000)
    # TODO: fit, predict, r2, non-zero coefs, surviving feature names
    pass
# FIX: the string previously contained a raw line break, which is a
# SyntaxError in a single-quoted literal — use an explicit \n escape.
print("\n=== Ridge ===")
for alpha in alphas:
    # TODO: same for Ridge
    pass
Save trained models with joblib (recommended) or pickle. Version models with metadata, validate loaded models before serving, and use pipelines to ensure preprocessing is saved too.
joblib save/load and pipeline persistence
# Persist a fitted Pipeline with joblib and verify the round trip keeps
# predictions identical (scaler + model saved as one artifact).
import joblib, os, tempfile
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
iris = load_iris()
X_tr, X_te, y_tr, y_te = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=42)),
])
pipe.fit(X_tr, y_tr)
acc_before = accuracy_score(y_te, pipe.predict(X_te))
print(f"Accuracy before save: {acc_before:.4f}")
# Save with joblib
model_path = os.path.join(tempfile.gettempdir(), 'iris_pipeline.joblib')
joblib.dump(pipe, model_path, compress=3) # compress=3 reduces file size
size_kb = os.path.getsize(model_path) / 1024
print(f"Saved to: {model_path} ({size_kb:.1f} KB)")
# Load and verify
loaded = joblib.load(model_path)
acc_after = accuracy_score(y_te, loaded.predict(X_te))
print(f"Accuracy after load: {acc_after:.4f}")
print(f"Models identical: {acc_before == acc_after}")
print(f"Loaded type: {type(loaded)}")
Model versioning with metadata
# Persist a model together with provenance metadata (version, timestamp,
# library versions, metrics) so the serving side can validate what it loads.
import joblib, json, os, tempfile, hashlib
import sys
from datetime import datetime
import sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
cancer = load_breast_cancer()
X_tr, X_te, y_tr, y_te = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42)
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_tr, y_tr)
auc = roc_auc_score(y_te, model.predict_proba(X_te)[:, 1])
# Build a model bundle with metadata
bundle = {
    'model': model,
    'metadata': {
        'version': 'v1.2.0',
        'trained_at': datetime.now().isoformat(),
        # FIX: plain top-level imports instead of the opaque __import__ hack.
        'sklearn_version': sklearn.__version__,
        'python_version': sys.version.split()[0],
        'train_samples': len(X_tr),
        'features': list(cancer.feature_names),
        'target': 'malignant',
        'metrics': {
            'roc_auc_test': round(auc, 4),
        },
    },
}
path = os.path.join(tempfile.gettempdir(), 'model_v120.joblib')
joblib.dump(bundle, path)
# Reload and validate
loaded = joblib.load(path)
meta = loaded['metadata']
print(f"Version: {meta['version']}")
print(f"Trained: {meta['trained_at']}")
print(f"AUC: {meta['metrics']['roc_auc_test']}")
print(f"Features: {len(meta['features'])}")
print(f"Sklearn: {meta['sklearn_version']}")
Pickle protocol and cross-version safety checks
# Pickle a pipeline alongside the sklearn version it was trained with, and
# warn on version mismatch at load time (pickles are not cross-version safe).
import pickle, os, tempfile, warnings
import sklearn
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
wine = load_wine()
X_tr, X_te, y_tr, y_te = train_test_split(
    wine.data, wine.target, test_size=0.2, random_state=42)
pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC(probability=True))])
pipe.fit(X_tr, y_tr)
path = os.path.join(tempfile.gettempdir(), 'wine_svc.pkl')
# Save with highest protocol (fastest, most compact)
with open(path, 'wb') as f:
    pickle.dump({'model': pipe, 'sklearn_version': sklearn.__version__},
                f, protocol=pickle.HIGHEST_PROTOCOL)
size_kb = os.path.getsize(path) / 1024
print(f"Saved ({size_kb:.1f} KB, protocol {pickle.HIGHEST_PROTOCOL})")
# Load with version check
with open(path, 'rb') as f:
    data = pickle.load(f)
saved_ver = data['sklearn_version']
curr_ver = sklearn.__version__
if saved_ver != curr_ver:
    warnings.warn(f"sklearn version mismatch: saved={saved_ver}, current={curr_ver}")
else:
    print(f"Version OK: {curr_ver}")
acc = accuracy_score(y_te, data['model'].predict(X_te))
print(f"Accuracy after reload: {acc:.4f}")
# End-to-end persistence demo: train and save a pipeline bundle, then load
# it back and serve predictions with human-readable class names.
import joblib, os, tempfile
from datetime import datetime
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
# --- Training phase ---
iris = load_iris()
X_tr, X_te, y_tr, y_te = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=42)),
])
pipe.fit(X_tr, y_tr)
model_dir = tempfile.mkdtemp()
path = os.path.join(model_dir, 'model_v1.joblib')
# Bundle the fitted pipeline with enough metadata to sanity-check at load time.
joblib.dump({'model': pipe,
             'version': 'v1.0',
             'features': list(iris.feature_names),
             'classes': list(iris.target_names),
             'trained_at': datetime.now().isoformat(),
             'test_accuracy': accuracy_score(y_te, pipe.predict(X_te))},
            path)
print(f"Model saved: {path}")
# --- Serving phase ---
def load_model(path):
    """Load a model bundle from *path*, log its metadata, and return it."""
    bundle = joblib.load(path)
    print(f"Loaded {bundle['version']} | acc={bundle['test_accuracy']:.4f}")
    print(f"Features: {bundle['features']}")
    return bundle
def predict(bundle, X):
    """Return one {'class', 'confidence'} dict per row of X."""
    model = bundle['model']
    classes = bundle['classes']
    preds = model.predict(X)
    probas = model.predict_proba(X)
    # Confidence = highest class probability for that sample.
    return [{'class': classes[p], 'confidence': float(probas[i].max())}
            for i, p in enumerate(preds)]
bundle = load_model(path)
samples = iris.data[:3]
results = predict(bundle, samples)
for i, r in enumerate(results):
    print(f"Sample {i+1}: {r['class']} (conf={r['confidence']:.3f})")
# Exercise: train several models, persist each as a metadata bundle, then
# load and compare them. TODOs intentionally left for the reader.
import joblib, os, time, tempfile
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from datetime import datetime
wine = load_wine()
X_tr, X_te, y_tr, y_te = train_test_split(
    wine.data, wine.target, test_size=0.2, random_state=42)
models_dir = os.path.join(tempfile.gettempdir(), 'models')
os.makedirs(models_dir, exist_ok=True)
model_configs = {
    'random_forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'gradient_boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
}
for name, model in model_configs.items():
    # TODO: time the training, compute accuracy
    # TODO: save bundle with metadata (name, accuracy, params, trained_at)
    # TODO: save to models_dir/{name}.joblib
    pass
def compare_models(directory):
    """Load every .joblib bundle in *directory* and print a comparison table."""
    # TODO: load all .joblib files, print comparison table
    pass
compare_models(models_dir)
sklearn's TfidfVectorizer and CountVectorizer convert text to numerical features. Combine them in a Pipeline with classifiers to build spam detectors, sentiment analyzers, and topic classifiers.
TF-IDF + Logistic Regression for text classification
# Sentiment classification: TF-IDF uni+bigrams feeding logistic regression,
# then read the strongest positive/negative n-grams off the coefficients.
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report
import numpy as np
# Simulate sentiment dataset (each template repeated 15x)
positive = [
    "I love this product! It works great.",
    "Excellent quality and fast delivery.",
    "Best purchase ever, highly recommend.",
    "Amazing service, very satisfied.",
    "Outstanding performance, exceeded expectations.",
    "Wonderful experience, will buy again.",
    "Perfect product, exactly as described.",
    "Fantastic value for money.",
] * 15
negative = [
    "Terrible product, broke after one day.",
    "Very disappointed, not as advertised.",
    "Waste of money, poor quality.",
    "Awful experience, do not buy.",
    "Horrible customer service.",
    "Complete junk, returned immediately.",
    "Worst purchase ever made.",
    "Very poor quality, falls apart.",
] * 15
texts = positive + negative
labels = [1] * len(positive) + [0] * len(negative)
X_tr, X_te, y_tr, y_te = train_test_split(texts, labels, test_size=0.2,
                                          random_state=42, stratify=labels)
pipe = Pipeline([
    ('tfidf', TfidfVectorizer(ngram_range=(1,2), max_features=5000,
                              sublinear_tf=True, min_df=2)),
    ('clf', LogisticRegression(C=1.0, max_iter=1000)),
])
pipe.fit(X_tr, y_tr)
print(classification_report(y_te, pipe.predict(X_te),
                            target_names=['Negative','Positive']))
# Feature importance: top words per class
# (vocabulary_ maps term -> column index; sort terms by their coefficient).
vocab = pipe['tfidf'].vocabulary_
coef = pipe['clf'].coef_[0]
top_pos = sorted(vocab, key=lambda w: -coef[vocab[w]])[:5]
top_neg = sorted(vocab, key=lambda w: coef[vocab[w]])[:5]
print("Top positive words:", top_pos)
print("Top negative words:", top_neg)
CountVectorizer + Multinomial Naive Bayes (fast baseline)
# Classic text baseline: counts -> TF-IDF -> Multinomial Naive Bayes on a
# 4-category slice of 20 newsgroups.
# NOTE: fetch_20newsgroups downloads the corpus on first use (network access).
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.datasets import fetch_20newsgroups
import numpy as np
# Load a subset of 20 newsgroups (4 categories)
categories = ['sci.space', 'rec.sport.hockey', 'talk.politics.guns', 'comp.graphics']
data = fetch_20newsgroups(subset='all', categories=categories,
                          remove=('headers', 'footers', 'quotes'),
                          random_state=42)
pipe = Pipeline([
    ('cv', CountVectorizer(stop_words='english', max_features=20000, min_df=2)),
    ('tfidf', TfidfTransformer(sublinear_tf=True)),
    ('nb', MultinomialNB(alpha=0.1)),
])
scores = cross_val_score(pipe, data.data, data.target, cv=5, scoring='accuracy')
print(f"Multinomial NB accuracy: {scores.mean():.4f} Β± {scores.std():.4f}")
print(f"Categories: {categories}")
print(f"Dataset: {len(data.data)} documents")
# Quick prediction demo
pipe.fit(data.data, data.target)
test_texts = [
    "NASA launched a new rocket to Mars last week",
    "The hockey team won the championship finals",
]
preds = pipe.predict(test_texts)
for text, pred in zip(test_texts, preds):
    print(f" '{text[:45]}...' -> {data.target_names[pred]}")
TF-IDF with GridSearch for hyperparameter tuning
# Joint tuning of the vectorizer and LinearSVC with GridSearchCV — pipeline
# step names prefix the grid keys ('tfidf__', 'svc__').
# NOTE: fetch_20newsgroups downloads the corpus on first use (network access).
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import fetch_20newsgroups
import numpy as np
categories = ['sci.med', 'sci.space', 'comp.graphics']
data = fetch_20newsgroups(subset='train', categories=categories,
                          remove=('headers','footers','quotes'))
pipe = Pipeline([
    ('tfidf', TfidfVectorizer(stop_words='english')),
    ('svc', LinearSVC(max_iter=2000)),
])
# 2 x 2 x 2 x 3 = 24 candidates, evaluated with 3-fold CV.
param_grid = {
    'tfidf__max_features': [5000, 20000],
    'tfidf__ngram_range': [(1,1), (1,2)],
    'tfidf__sublinear_tf': [True, False],
    'svc__C': [0.1, 1.0, 10.0],
}
gs = GridSearchCV(pipe, param_grid, cv=3, scoring='accuracy', n_jobs=-1, verbose=0)
gs.fit(data.data, data.target)
print(f"Best accuracy: {gs.best_score_:.4f}")
print("Best params:")
for k, v in gs.best_params_.items():
    print(f" {k}: {v}")
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
np.random.seed(42)
# Vocabulary for the synthetic email generator: marketing-speak for spam,
# office-speak for ham.
spam_keywords = ['buy now', 'click here', 'free offer', 'earn money fast',
                 'limited time', 'act now', 'winner', 'prize', 'congratulations',
                 'no cost', 'guaranteed', 'risk free', 'million dollars']
ham_phrases = ['meeting tomorrow', 'project update', 'please review',
               'attached report', 'schedule call', 'thanks for your time',
               'budget review', 'team standup', 'quarterly results']

def gen_email(is_spam, n):
    """Return a list of `n` synthetic email bodies (spam when `is_spam` is truthy).

    Uses the global numpy RNG, so output is reproducible after np.random.seed().
    """
    def _spam_text():
        # Draw 3-6 spam phrases (with replacement), then pad with 10 filler words.
        phrase_count = np.random.randint(3, 7)
        phrases = np.random.choice(spam_keywords, phrase_count, replace=True)
        filler = ['you', 'the', 'for', 'is', 'a', 'to', 'in']
        padding = np.random.choice(filler, 10)
        return ' '.join(list(phrases) + list(padding))

    def _ham_text():
        # Draw 2-4 business phrases, no filler.
        phrase_count = np.random.randint(2, 5)
        return ' '.join(np.random.choice(ham_phrases, phrase_count, replace=True))

    make_text = _spam_text if is_spam else _ham_text
    return [make_text() for _ in range(n)]
# Build a labeled corpus: 500 synthetic spam emails, 1000 ham emails.
spam = gen_email(True, 500)
ham = gen_email(False, 1000)
texts = [*spam, *ham]
labels = [1] * len(spam) + [0] * len(ham)

# Hold out 20% for evaluation, preserving the 1:2 spam/ham ratio in both splits.
X_tr, X_te, y_tr, y_te = train_test_split(
    texts, labels, test_size=0.2, stratify=labels, random_state=42
)

# Word + bigram TF-IDF feeding a logistic regression; class_weight='balanced'
# compensates for the class imbalance.
vectorizer = TfidfVectorizer(ngram_range=(1, 2), sublinear_tf=True,
                             max_features=10000, min_df=1)
classifier = LogisticRegression(C=5.0, max_iter=1000, class_weight='balanced')
pipe = Pipeline([('tfidf', vectorizer), ('clf', classifier)])

pipe.fit(X_tr, y_tr)
y_pred = pipe.predict(X_te)
print(classification_report(y_te, y_pred, target_names=['Ham', 'Spam']))
print("Confusion matrix:")
print(confusion_matrix(y_te, y_pred))
# Exercise: multi-class text classification on five newsgroups.
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import numpy as np
categories = ['sci.space', 'rec.sport.hockey', 'comp.graphics',
'talk.politics.guns', 'sci.med']
data = fetch_20newsgroups(subset='all', categories=categories,
remove=('headers','footers','quotes'), random_state=42)
pipe = Pipeline([
('tfidf', TfidfVectorizer(max_features=15000, ngram_range=(1,2),
sublinear_tf=True, stop_words='english')),
('svc', LinearSVC(C=1.0, max_iter=2000)),
])
# TODO: cross_val_score with 5-fold accuracy
#       hint: cross_val_score(pipe, data.data, data.target, cv=5, scoring='accuracy')
# TODO: fit on all data
#       hint: pipe.fit(data.data, data.target)
# TODO: for each class, find top-5 words (largest coef_ values)
#       hints: pipe['tfidf'].get_feature_names_out() gives the vocabulary;
#       pipe['svc'].coef_ has one row per class (ordered like data.target_names);
#       np.argsort(row)[-5:] picks the highest-weight feature indices
Anomaly detection finds outliers without labeled examples. IsolationForest uses random splits; LocalOutlierFactor compares density to neighbors; OneClassSVM learns a boundary around normal data.
IsolationForest for fraud detection
# --- IsolationForest on simulated transactions with a known fraud fraction ---
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np

np.random.seed(42)
# Simulate normal transactions as a correlated 3-feature Gaussian; fraud is
# rarer, shifted far from the normal mean, and much more dispersed.
n_normal = 1000
n_fraud = 30
normal = np.random.multivariate_normal(
    mean=[100, 50, 10],
    cov=[[400, 50, 5], [50, 100, 2], [5, 2, 4]],
    size=n_normal
)
fraud = np.random.multivariate_normal(
    mean=[500, 200, 100],
    cov=[[10000, 0, 0], [0, 5000, 0], [0, 0, 1000]],
    size=n_fraud
)
X = np.vstack([normal, fraud])
true_labels = np.array([1]*n_normal + [-1]*n_fraud)  # 1=normal, -1=anomaly

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# contamination tells the forest what fraction of points to flag as outliers.
iso = IsolationForest(n_estimators=200,
                      contamination=n_fraud/(n_normal+n_fraud),
                      random_state=42)
preds = iso.fit_predict(X_scaled)  # 1=normal, -1=anomaly
scores = iso.score_samples(X_scaled)

# Confusion counts against the known labels.
flagged = preds == -1
actual = true_labels == -1
tp = (flagged & actual).sum()
fp = (flagged & ~actual).sum()
fn = (~flagged & actual).sum()
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
print(f"IsolationForest: TP={tp}, FP={fp}, FN={fn}")
print(f"Precision={precision:.3f} Recall={recall:.3f}")
print(f"Anomaly score range: [{scores.min():.3f}, {scores.max():.3f}]")
LocalOutlierFactor and OneClassSVM comparison
# --- Compare three unsupervised outlier detectors on a 2-D toy set ---
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np

np.random.seed(0)
# 2D dataset for intuition: a standard-normal core plus uniform scatter.
n_normal, n_anom = 300, 20
X_normal = np.random.randn(n_normal, 2)
X_anom = np.random.uniform(low=-5, high=5, size=(n_anom, 2))
X = np.vstack([X_normal, X_anom])
y_true = np.array([1]*n_normal + [-1]*n_anom)

scaler = StandardScaler()
X_s = scaler.fit_transform(X)
contamination = n_anom / len(X)

detectors = {
    'IsolationForest': IsolationForest(contamination=contamination, random_state=0),
    'LocalOutlierFactor': LocalOutlierFactor(contamination=contamination, n_neighbors=20),
    'OneClassSVM': OneClassSVM(nu=contamination, kernel='rbf', gamma='scale'),
}

print(f"{'Detector':22s} {'TP':>4s} {'FP':>4s} {'FN':>4s} {'Precision':>10s} {'Recall':>7s}")
for name, det in detectors.items():
    # LOF only scores the data it was fit on, so it sees the full set;
    # the other two are trained on known-normal points and then score everything.
    if name == 'LocalOutlierFactor':
        preds = det.fit_predict(X_s)
    else:
        det.fit(X_s[y_true == 1])  # train on normal only for OC-SVM/IF
        preds = det.predict(X_s)
    hit = preds == -1
    anomaly = y_true == -1
    tp = (hit & anomaly).sum()
    fp = (hit & ~anomaly).sum()
    fn = (~hit & anomaly).sum()
    prec = tp/(tp+fp) if (tp+fp) else 0
    rec = tp/(tp+fn) if (tp+fn) else 0
    print(f"{name:22s} {tp:4d} {fp:4d} {fn:4d} {prec:10.3f} {rec:7.3f}")
Anomaly scores and threshold tuning
# --- Score-based thresholding: choose an operating point by precision target ---
from sklearn.ensemble import IsolationForest
from sklearn.metrics import precision_recall_curve, roc_auc_score
import numpy as np

np.random.seed(42)
# Multi-modal normal distribution (two correlated Gaussian blobs)
# plus sparse uniform anomalies.
n_normal = 2000
n_anom = 50
X_normal = np.vstack([
    np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], n_normal//2),
    np.random.multivariate_normal([5, 5], [[1, -0.3], [-0.3, 1]], n_normal//2),
])
X_anom = np.random.uniform(-8, 12, (n_anom, 2))
X = np.vstack([X_normal, X_anom])
y_true = np.array([0]*n_normal + [1]*n_anom)  # 1 = anomaly

iso = IsolationForest(n_estimators=300, contamination='auto', random_state=42)
iso.fit(X_normal)  # fit on normal data only
scores = -iso.score_samples(X)  # negate: higher = more anomalous
auc = roc_auc_score(y_true, scores)
print(f"ROC-AUC: {auc:.4f}")

# Among thresholds whose precision is >= 0.80, pick the one with the best
# recall. precision_recall_curve's final point has no threshold, hence [:-1].
precision, recall, thresholds = precision_recall_curve(y_true, scores)
prec_ok = precision[:-1] >= 0.80
if prec_ok.any():
    recall_ok = recall[:-1][prec_ok]
    pick = recall_ok.argmax()
    best_recall = recall_ok[pick]
    best_thr = thresholds[prec_ok][pick]
    print(f"At precision>=80%: threshold={best_thr:.4f}, recall={best_recall:.3f}")
    flagged = (scores >= best_thr).sum()
    print(f"Flagged {flagged} anomalies ({flagged/len(X):.1%} of data)")
# --- Sensor monitoring: train on clean data, alert with a dynamic threshold ---
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np, pandas as pd

np.random.seed(42)
n = 5000
# Simulate sensor readings (temperature, pressure, vibration, flow rate).
normal = pd.DataFrame({
    'temperature': np.random.normal(70, 5, n),
    'pressure': np.random.normal(100, 3, n),
    'vibration': np.random.normal(0.5, 0.1, n),
    'flow_rate': np.random.normal(50, 4, n),
})

# Inject anomalies: sudden temperature/vibration spikes at random rows.
n_anom = 50
anom_idx = np.random.choice(n, n_anom, replace=False)
anomalies = normal.copy()
anomalies.loc[anom_idx, 'temperature'] += np.random.uniform(30, 60, n_anom)
anomalies.loc[anom_idx, 'vibration'] += np.random.uniform(1, 3, n_anom)

# Scale with statistics from the clean data only, then score everything.
scaler = StandardScaler()
X_normal = scaler.fit_transform(normal.values)
X_all = scaler.transform(anomalies.values)

# Train only on normal data.
iso = IsolationForest(n_estimators=300, contamination=n_anom/n, random_state=42)
iso.fit(X_normal)
all_scores = -iso.score_samples(X_all)

# Dynamic threshold: flag anything above mean + 3*std of the clean-data scores.
baseline_scores = -iso.score_samples(X_normal)
threshold = baseline_scores.mean() + 3 * baseline_scores.std()
alert_rows = np.flatnonzero(all_scores > threshold)
caught = np.intersect1d(alert_rows, anom_idx).size
print(f"Threshold: {threshold:.4f}")
print(f"Alerts: {len(alert_rows)} | True anomalies: {n_anom}")
print(f"True Positives: {caught} | Recall: {caught/n_anom:.2%}")
# Exercise: network-intrusion detection with three outlier detectors.
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
import numpy as np
np.random.seed(42)
n_normal, n_intrusion = 800, 20
# TODO: generate normal connections (packet_size, duration, port)
#       hint: np.random.multivariate_normal, or independent np.random.normal columns
# TODO: generate intrusion attempts
#       hint: shift the means and widen the spread so they fall off-distribution
# TODO: stack into X, create y_true labels
#       hint: np.vstack; label normal=1 and intrusion=-1 to match predict() output
# TODO: for each detector, compute TP, FP, FN, precision, recall
# (fit on normal-only data for IsolationForest and OneClassSVM)
# LocalOutlierFactor.fit_predict on full X
Beyond KMeans: AgglomerativeClustering builds hierarchies (no k needed upfront); GaussianMixture models soft assignments with probabilistic clusters; silhouette and Calinski-Harabasz scores evaluate cluster quality.
AgglomerativeClustering with linkage strategies
# --- AgglomerativeClustering: how the linkage criterion changes clusters ---
from sklearn.cluster import AgglomerativeClustering
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score, silhouette_score
import numpy as np

np.random.seed(42)
X, y_true = make_blobs(n_samples=300, centers=4, cluster_std=0.8, random_state=42)
X = StandardScaler().fit_transform(X)

# ARI measures agreement with the true labels; silhouette measures geometric
# separation without using the labels at all.
linkages = ['ward', 'complete', 'average', 'single']
print(f"{'Linkage':10s} {'ARI':>6s} {'Silhouette':>10s}")
print('-' * 32)
for link in linkages:
    agg = AgglomerativeClustering(n_clusters=4, linkage=link)
    labels = agg.fit_predict(X)
    ari = adjusted_rand_score(y_true, labels)
    sil = silhouette_score(X, labels)
    print(f"{link:10s} {ari:6.4f} {sil:10.4f}")

# Connectivity constraints (useful for spatial data): merges are restricted to
# the k-nearest-neighbor graph, keeping clusters locally connected.
from sklearn.neighbors import kneighbors_graph
connectivity = kneighbors_graph(X, n_neighbors=10, include_self=False)
agg_conn = AgglomerativeClustering(n_clusters=4, linkage='ward',
                                   connectivity=connectivity)
labels_conn = agg_conn.fit_predict(X)
# Fixed: this f-string was split across a physical line break (a syntax
# error); the intended leading newline is restored as an explicit \n escape.
print(f"\nWith connectivity: ARI={adjusted_rand_score(y_true, labels_conn):.4f}")
Gaussian Mixture Models β soft assignments
# --- Gaussian Mixture Models: soft cluster assignments ---
from sklearn.mixture import GaussianMixture
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score
import numpy as np

np.random.seed(42)
X, y_true = make_blobs(n_samples=400, centers=3, cluster_std=[1.0, 0.5, 1.5],
                       random_state=42)
X = StandardScaler().fit_transform(X)

# Compare covariance types: 'full' fits one covariance per component, 'tied'
# shares a single one, 'diag' and 'spherical' restrict its shape.
for cov_type in ['full', 'tied', 'diag', 'spherical']:
    gm = GaussianMixture(n_components=3, covariance_type=cov_type,
                         random_state=42, n_init=5)
    gm.fit(X)
    labels = gm.predict(X)
    ari = adjusted_rand_score(y_true, labels)
    bic = gm.bic(X)
    aic = gm.aic(X)
    print(f"{cov_type:12s} ARI={ari:.4f} BIC={bic:.1f} AIC={aic:.1f}")

# Soft assignments: each row of predict_proba sums to 1 and gives the
# posterior probability of membership in each component.
gm_best = GaussianMixture(n_components=3, covariance_type='full',
                          random_state=42, n_init=5)
gm_best.fit(X)
proba = gm_best.predict_proba(X)
# Fixed: this f-string was split across a physical line break (a syntax
# error); the intended leading newline is restored as an explicit \n escape.
print(f"\nSample soft assignments (first 5):")
for row in proba[:5]:
    print(f" {row.round(3)}")  # probability of each cluster
BIC/AIC model selection and silhouette analysis
# --- Choosing k: information criteria (GMM) vs geometric indices (KMeans) ---
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
import numpy as np

np.random.seed(42)
X, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.9, random_state=42)
X = StandardScaler().fit_transform(X)

# BIC/AIC penalize model complexity; lower is better for both.
print("=== GaussianMixture BIC/AIC ===")
print(f"{'k':>3s} {'BIC':>10s} {'AIC':>10s}")
for k in range(2, 8):
    gm = GaussianMixture(n_components=k, n_init=5, random_state=42)
    gm.fit(X)
    print(f"{k:3d} {gm.bic(X):10.1f} {gm.aic(X):10.1f}")

# Silhouette and Calinski-Harabasz: higher is better; Davies-Bouldin: lower.
# Fixed: this header string was split across a physical line break (a syntax
# error); the intended leading newline is restored as an explicit \n escape.
print("\n=== KMeans Cluster Quality ===")
print(f"{'k':>3s} {'Silhouette':>11s} {'Calinski-H':>12s} {'Davies-B':>10s}")
for k in range(2, 8):
    km = KMeans(n_clusters=k, n_init=10, random_state=42)
    labels = km.fit_predict(X)
    sil = silhouette_score(X, labels)
    ch = calinski_harabasz_score(X, labels)
    db = davies_bouldin_score(X, labels)
    print(f"{k:3d} {sil:11.4f} {ch:12.1f} {db:10.4f}")
Density-Based Clustering with DBSCAN (an HDBSCAN-style approach without the hierarchy)
# --- DBSCAN: density-based clustering and outlier flagging ---
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_moons, make_blobs
from sklearn.metrics import adjusted_rand_score
import numpy as np

np.random.seed(42)
# Non-convex clusters (DBSCAN handles; KMeans fails).
X_moons, y_moons = make_moons(n_samples=300, noise=0.08, random_state=42)
X_moons = StandardScaler().fit_transform(X_moons)

# Sweep eps: too small fragments the moons into noise, too large merges them.
print("=== DBSCAN on Moons (non-convex) ===")
for eps in [0.1, 0.2, 0.3, 0.5]:
    db = DBSCAN(eps=eps, min_samples=5)
    labels = db.fit_predict(X_moons)
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)  # -1 marks noise
    n_noise = (labels == -1).sum()
    # Fixed: `ari` was only assigned inside an `if n_clusters > 0` guard, so an
    # eps yielding zero clusters either skipped its row or risked a NameError
    # (the original indentation was ambiguous). Default to 0.0 and always print.
    ari = adjusted_rand_score(y_moons, labels) if n_clusters > 0 else 0.0
    print(f" eps={eps:.1f} clusters={n_clusters} noise={n_noise} ARI={ari:.4f}")

# DBSCAN is also good for identifying outliers: points in no dense region are
# labeled -1 rather than forced into a cluster.
X_blobs, y_blobs = make_blobs(n_samples=200, centers=3, random_state=42)
X_blobs = StandardScaler().fit_transform(X_blobs)
db_best = DBSCAN(eps=0.5, min_samples=8)
labels = db_best.fit_predict(X_blobs)
# Fixed: this f-string was split across a physical line break (a syntax
# error); the intended leading newline is restored as an explicit \n escape.
print(f"\nBlobs: {(labels==-1).sum()} noise points identified as outliers")
# --- Customer segmentation with a GMM: pick k by BIC, report confidence ---
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
import numpy as np, pandas as pd

np.random.seed(42)
n = 800
df = pd.DataFrame({
    'recency': np.random.exponential(30, n),    # days since last purchase
    'frequency': np.random.poisson(5, n) + 1,   # orders per year
    'monetary': np.random.exponential(150, n),  # avg order value $
    'tenure': np.random.randint(1, 60, n),      # months as customer
})
X = StandardScaler().fit_transform(df.values)

# Select k via BIC (lower is better).
bic_scores = []
for k in range(2, 8):
    gm = GaussianMixture(n_components=k, n_init=5, random_state=42)
    gm.fit(X)
    bic_scores.append((k, gm.bic(X)))
best_k = min(bic_scores, key=lambda t: t[1])[0]
print(f"Best k by BIC: {best_k}")

# Refit at the chosen k; keep each customer's hard assignment plus the
# posterior probability of that assignment as a confidence score.
gm = GaussianMixture(n_components=best_k, covariance_type='full',
                     n_init=10, random_state=42)
gm.fit(X)
df['segment'] = gm.predict(X)
df['confidence'] = gm.predict_proba(X).max(axis=1)

# Fixed: the two strings below were split across physical line breaks (syntax
# errors); the intended leading newlines are restored as explicit \n escapes.
print("\nSegment profiles:")
print(df.groupby('segment')[['recency','frequency','monetary','tenure']].mean().round(1))
print(f"\nAvg confidence: {df.confidence.mean():.3f}")
low_conf = (df.confidence < 0.6).sum()
print(f"Low-confidence assignments (<60%): {low_conf} ({low_conf/len(df):.1%})")
# Exercise: compare three clustering algorithms on blobs of unequal shape/size.
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score, silhouette_score
import numpy as np
np.random.seed(42)
# Three Gaussian clusters: c1 tight and round, c2 broad and round,
# c3 elongated (correlated covariance).
c1 = np.random.multivariate_normal([0, 0], [[0.09,0],[0,0.09]], 100)
c2 = np.random.multivariate_normal([5, 5], [[2.25,0],[0,2.25]], 100)
c3 = np.random.multivariate_normal([2,-3], [[4,2],[2,1]], 100)
X = np.vstack([c1, c2, c3])
y_true = np.array([0]*100 + [1]*100 + [2]*100)
X_s = StandardScaler().fit_transform(X)
models = {
'AgglomerativeClustering': AgglomerativeClustering(n_clusters=3, linkage='ward'),
'GaussianMixture': GaussianMixture(n_components=3, covariance_type='full',
n_init=5, random_state=42),
'KMeans': KMeans(n_clusters=3, n_init=10, random_state=42),
}
# TODO: for each model, fit, predict, print ARI and silhouette
#       hints: AgglomerativeClustering and KMeans support fit_predict(X_s);
#       GaussianMixture needs fit(X_s) then predict(X_s);
#       score with adjusted_rand_score(y_true, labels) and
#       silhouette_score(X_s, labels)