Data Cleaning & Feature Engineering
12 topics • Click any card to expand
Before cleaning, you need to understand your data. Profiling reveals data types, missing values, distributions, and anomalies — the roadmap for your entire cleaning pipeline.
import pandas as pd
import numpy as np

# Build a deliberately messy sample frame so the profiling step has work to do.
np.random.seed(42)
n = 500
df = pd.DataFrame({
    "age": np.random.normal(35, 12, n).astype(int),
    "income": np.random.lognormal(10.5, 0.8, n),
    "education": np.random.choice(["High School", "Bachelor", "Master", "PhD", None], n, p=[0.3, 0.35, 0.2, 0.1, 0.05]),
    "city": np.random.choice(["NYC", "LA", "Chicago", "Houston", " NYC ", "nyc"], n),
    "signup_date": pd.date_range("2020-01-01", periods=n, freq="D").astype(str),
    "purchase_amount": np.where(np.random.random(n) > 0.9, np.nan, np.random.exponential(50, n)),
    "is_active": np.random.choice([True, False, "yes", "no", 1, 0], n),
})

# Plant a few impossible values (negative age, 150 years, negative income).
bad_cells = {(10, "age"): -5, (20, "age"): 150, (30, "income"): -1000}
for (row, col), value in bad_cells.items():
    df.loc[row, col] = value

print(df.shape)
print(df.dtypes)
print(df.head())
import pandas as pd
import pandas as pd
import numpy as np

def missing_report(df):
    """Summarize columns with missing data, worst (highest %) first."""
    n_missing = df.isnull().sum()
    report = pd.DataFrame({
        "missing": n_missing,
        "pct_missing": (n_missing / len(df) * 100).round(2),
        "dtype": df.dtypes,
        "nunique": df.nunique(),
    })
    has_gaps = report["missing"] > 0
    return report[has_gaps].sort_values("pct_missing", ascending=False)

# Create sample data
df = pd.DataFrame({
    "A": [1, 2, np.nan, 4, 5],
    "B": [np.nan, np.nan, 3, 4, 5],
    "C": ["x", None, "z", "x", None],
    "D": [1.0, 2.0, 3.0, 4.0, 5.0],
})
print(missing_report(df))

# Visualize missing patterns
print("\nMissing pattern (True = missing):")
print(df.isnull().astype(int))
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "salary": np.concatenate([np.random.normal(60000, 15000, 95), [250000, 300000, -5000, 0, 500000]]),
    "age": np.concatenate([np.random.normal(35, 10, 97), [200, -3, 0]]),
})

# Basic statistics
print(df.describe().round(2))

def detect_outliers_iqr(series, factor=1.5):
    """Return (outliers, lower fence, upper fence) using Tukey's IQR rule."""
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    spread = q3 - q1
    lo = q1 - factor * spread
    hi = q3 + factor * spread
    flagged = series[(series < lo) | (series > hi)]
    return flagged, lo, hi

for col in df.columns:
    outliers, lo, hi = detect_outliers_iqr(df[col])
    print(f"\n{col}: {len(outliers)} outliers (range: {lo:.0f} to {hi:.0f})")
    if len(outliers) > 0:
        print(f" Values: {outliers.values}")
import pandas as pd
import numpy as np
def data_quality_report(df, name="dataset"):
print(f"{'='*60}")
print(f"DATA QUALITY REPORT: {name}")
print(f"{'='*60}")
print(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns")
print(f"Memory: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
# Missing values
missing = df.isnull().sum()
if missing.any():
print(f"\nMissing Values:")
for col in missing[missing > 0].index:
pct = missing[col] / len(df) * 100
print(f" {col}: {missing[col]} ({pct:.1f}%)")
# Duplicates
dupes = df.duplicated().sum()
print(f"\nDuplicate rows: {dupes} ({dupes/len(df)*100:.1f}%)")
# Numeric outliers (Z-score > 3)
numeric = df.select_dtypes(include="number")
if not numeric.empty:
print(f"\nNumeric outliers (|z| > 3):")
for col in numeric.columns:
z = (numeric[col] - numeric[col].mean()) / numeric[col].std()
n_outliers = (z.abs() > 3).sum()
if n_outliers > 0:
print(f" {col}: {n_outliers} outliers")
# Cardinality check
print(f"\nColumn cardinality:")
for col in df.columns:
nuniq = df[col].nunique()
ratio = nuniq / len(df)
flag = " β potential ID" if ratio > 0.95 else " β low cardinality" if nuniq < 5 else ""
print(f" {col}: {nuniq} unique ({ratio:.1%}){flag}")
# Test
np.random.seed(42)
df = pd.DataFrame({
"user_id": range(100),
"age": np.concatenate([np.random.normal(35, 10, 97), [200, -3, 0]]),
"category": np.random.choice(["A", "B", "C"], 100),
"value": np.where(np.random.random(100) > 0.85, np.nan, np.random.exponential(50, 100)),
})
data_quality_report(df, "sample_data")Missing data is the most common data quality issue. The right strategy depends on WHY data is missing (MCAR, MAR, MNAR) and how much is missing.
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "age": [25, np.nan, 35, 40, np.nan, 55, 30, np.nan, 45, 50],
    "income": [50000, 60000, np.nan, 80000, 45000, np.nan, 55000, 70000, np.nan, 90000],
    "education": ["BS", "MS", "BS", None, "PhD", "BS", None, "MS", "BS", "PhD"],
    "purchased": [1, 0, 1, 1, 0, 1, 0, 1, np.nan, 1],
})

# How much is missing, in absolute and relative terms?
print("Missing per column:")
print(df.isnull().sum())
print("\nMissing percentage:")
print((df.isnull().mean() * 100).round(1))

# Correlated missingness between columns hints the data is NOT missing at random.
print("\nMissing correlation matrix:")
print(df.isnull().corr().round(2))

# Types of missingness:
# MCAR: Missing Completely At Random (safe to drop or impute)
# MAR: Missing At Random (conditional on observed data)
# MNAR: Missing Not At Random (the value itself determines missingness)
import pandas as pd
import numpy as np
np.random.seed(42)
df = pd.DataFrame({
"age": [25, np.nan, 35, 40, np.nan, 55, 30, np.nan, 45, 50],
"salary": [50000, 60000, np.nan, 80000, np.nan, 120000, 55000, 70000, np.nan, 90000],
"dept": ["Sales", "Eng", "Sales", None, "Eng", "Eng", None, "Sales", "Eng", "Sales"],
})
# Strategy 1: Drop rows with any missing values
df_dropped = df.dropna()
print(f"After dropna: {len(df_dropped)} rows (lost {len(df) - len(df_dropped)})")
# Strategy 2: Fill with constants
df["dept_filled"] = df["dept"].fillna("Unknown")
# Strategy 3: Mean/median/mode imputation
df["age_mean"] = df["age"].fillna(df["age"].mean())
df["age_median"] = df["age"].fillna(df["age"].median())
df["salary_median"] = df["salary"].fillna(df["salary"].median())
# Strategy 4: Group-based imputation (smarter!)
df["salary_by_dept"] = df.groupby("dept")["salary"].transform(
lambda x: x.fillna(x.median())
)
# Strategy 5: Forward/backward fill (for time series)
df["age_ffill"] = df["age"].ffill()
df["age_bfill"] = df["age"].bfill()
print(df[["age", "age_mean", "age_median", "age_ffill"]].to_string())import pandas as pd
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

np.random.seed(42)
X = np.array([
    [25, 50000], [30, 60000], [35, np.nan], [40, 80000],
    [np.nan, 45000], [55, 120000], [30, 55000], [np.nan, 70000],
])

# KNN: estimate each gap from the k most similar complete rows.
X_knn = KNNImputer(n_neighbors=3).fit_transform(X)
print("KNN Imputed:")
print(X_knn.round(0))

# MICE: iteratively model each feature as a function of the others.
X_iter = IterativeImputer(max_iter=10, random_state=42).fit_transform(X)
print("\nIterative (MICE) Imputed:")
print(X_iter.round(0))

# A "was missing" boolean matrix is often a useful model feature in itself.
from sklearn.impute import MissingIndicator
indicator = MissingIndicator()
missing_flags = indicator.fit_transform(X)
print(f"\nMissing indicator columns: {indicator.features_}")
print(missing_flags.astype(int))
import pandas as pd
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer

np.random.seed(42)
n = 200
df = pd.DataFrame({
    "age": np.where(np.random.random(n) > 0.9, np.nan, np.random.normal(45, 15, n)),
    "income": np.where(np.random.random(n) > 0.8, np.nan, np.random.lognormal(10.5, 0.7, n)),
    "employed": np.random.choice([1, 0], n, p=[0.7, 0.3]),
    "bmi": np.where(np.random.random(n) > 0.85, np.nan, np.random.normal(26, 5, n)),
})
print(f"Before: {df.isnull().sum().to_dict()}")

# FIX: record missing-value indicators BEFORE any imputation. The original
# computed them after the fills, so both flags were always 0.
df["income_was_missing"] = df["income"].isnull().astype(int)
df["bmi_was_missing"] = df["bmi"].isnull().astype(int)

# 1. Age (MCAR): a simple median fill is adequate.
df["age"] = df["age"].fillna(df["age"].median())

# 2. Income (MAR, depends on employment): impute within employment groups.
df["income"] = df.groupby("employed")["income"].transform(
    lambda x: x.fillna(x.median())
)
# Fall back to the global median if an entire group was NaN.
df["income"] = df["income"].fillna(df["income"].median())

# 3. BMI: KNN imputation, borrowing information from correlated features.
knn = KNNImputer(n_neighbors=5)
df[["age", "income", "bmi"]] = knn.fit_transform(df[["age", "income", "bmi"]])

print(f"After: {df.isnull().sum().to_dict()}")
print(df.describe().round(1))
import pandas as pd
import pandas as pd
import numpy as np

class SmartImputer:
    """Column-wise imputer.

    Columns whose missing fraction exceeds `high_missing_threshold` are
    dropped; numeric columns are filled with their mean/median; all other
    columns are filled with their mode (or "Unknown" as a last resort).
    Implements the fit/transform TODOs from the original skeleton.
    """

    def __init__(self, numeric_strategy="median", categorical_strategy="mode",
                 high_missing_threshold=0.5):
        self.numeric_strategy = numeric_strategy
        self.categorical_strategy = categorical_strategy
        self.threshold = high_missing_threshold
        self.fill_values = {}

    def fit(self, df):
        """Learn a fill value (or the "DROP" marker) for every column."""
        for col in df.columns:
            pct_missing = df[col].isnull().mean()
            if pct_missing > self.threshold:
                self.fill_values[col] = "DROP"
            elif df[col].dtype in ["float64", "int64"]:
                if self.numeric_strategy == "mean":
                    self.fill_values[col] = df[col].mean()
                else:
                    # Default: median, robust to outliers.
                    self.fill_values[col] = df[col].median()
            else:
                modes = df[col].mode()
                if self.categorical_strategy == "mode" and not modes.empty:
                    self.fill_values[col] = modes.iloc[0]
                else:
                    self.fill_values[col] = "Unknown"
        return self

    def transform(self, df):
        """Apply learned fills; drop columns that were mostly missing."""
        df = df.copy()
        for col, fill in self.fill_values.items():
            if col not in df.columns:
                continue
            if isinstance(fill, str) and fill == "DROP":
                df = df.drop(columns=[col])
            else:
                df[col] = df[col].fillna(fill)
        return df

# Test
np.random.seed(42)
df = pd.DataFrame({
    "A": [1, np.nan, 3, 4, np.nan],
    "B": [np.nan, np.nan, np.nan, np.nan, 5],  # 80% missing -> should drop
    "C": ["x", None, "y", "x", "y"],
})
imputer = SmartImputer()
imputer.fit(df)
result = imputer.transform(df)
print(result)Duplicate records and inconsistent formatting are silent data quality killers. A single city spelled three different ways will break your groupby analysis.
import pandas as pd

df = pd.DataFrame({
    "name": ["Alice", "Bob", "Alice", "Carol", "Bob", "Alice"],
    "email": ["alice@co.com", "bob@co.com", "alice@co.com", "carol@co.com", "bob@co.com", "alice2@co.com"],
    "purchase": [100, 200, 100, 150, 250, 300],
})

# Exact duplicates: every column identical.
print(f"Duplicate rows: {df.duplicated().sum()}")
print(df[df.duplicated(keep=False)])  # show ALL duplicates

# Duplicates judged on a subset of columns only.
print(f"\nDuplicate names: {df.duplicated(subset=['name']).sum()}")
print(f"Duplicate name+email: {df.duplicated(subset=['name', 'email']).sum()}")

# Drop exact duplicates (first occurrence wins by default).
df_clean = df.drop_duplicates()
print(f"\nAfter dedup (exact): {len(df_clean)} rows")

# Or keep the most recent occurrence per (name, email).
df_clean = df.drop_duplicates(subset=["name", "email"], keep="last")
print(f"After dedup (name+email, keep last): {len(df_clean)} rows")
import pandas as pd
df = pd.DataFrame({
"city": ["New York", " new york ", "NEW YORK", "nyc", "N.Y.C.", "Los Angeles", "LA", "los angeles"],
"state": ["NY", "ny", " NY ", "NY", "NY", "CA", "ca", "CA"],
"phone": ["555-1234", "(555) 123-4567", "5551234", "+1-555-123-4567", "555.123.4567", "N/A", "", None],
})
# Step 1: Strip whitespace and standardize case
df["city_clean"] = df["city"].str.strip().str.title()
df["state_clean"] = df["state"].str.strip().str.upper()
# Step 2: Map common variations
city_mapping = {
"Nyc": "New York",
"N.Y.C.": "New York",
"La": "Los Angeles",
}
df["city_clean"] = df["city_clean"].replace(city_mapping)
# Step 3: Clean phone numbers (keep only digits)
df["phone_clean"] = df["phone"].fillna("").str.replace(r"[^\d]", "", regex=True)
df["phone_clean"] = df["phone_clean"].replace("", None)
print(df[["city", "city_clean", "state", "state_clean"]].to_string())
print()
print(df[["phone", "phone_clean"]].to_string())import pandas as pd
import numpy as np
# Common type issues in real data
df = pd.DataFrame({
"price": ["$10.99", "$24.50", "15.00", "$8.99", "N/A"],
"date": ["2024-01-15", "01/16/2024", "Jan 17, 2024", "2024-01-18", "invalid"],
"is_active": ["yes", "no", "True", "1", "false"],
"rating": ["4.5", "3.8", "five", "4.2", "4.9"],
"quantity": ["10", "20", "30.0", "forty", "50"],
})
# Fix prices: remove $ and convert
df["price_clean"] = df["price"].str.replace("$", "", regex=False)
df["price_clean"] = pd.to_numeric(df["price_clean"], errors="coerce")
# Fix dates: parse with mixed formats
df["date_clean"] = pd.to_datetime(df["date"], format="mixed", errors="coerce")
# Fix booleans
bool_map = {"yes": True, "no": False, "true": True, "false": False, "1": True, "0": False}
df["active_clean"] = df["is_active"].str.lower().map(bool_map)
# Fix numerics with error coercion
df["rating_clean"] = pd.to_numeric(df["rating"], errors="coerce")
df["qty_clean"] = pd.to_numeric(df["quantity"], errors="coerce").astype("Int64")
print(df[["price", "price_clean", "date", "date_clean"]].to_string())
print()
print(df[["is_active", "active_clean", "rating", "rating_clean"]].to_string())import pandas as pd
import numpy as np
# Simulated messy merged data
df = pd.DataFrame({
"name": ["John Smith", "john smith", "JOHN SMITH", "Jane Doe", "jane doe"],
"email": ["john@co.com", "john@co.com", "john@co.com", "jane@co.com", "jane@co.com"],
"signup": ["2023-01-15", "01/15/2023", "Jan 15, 2023", "2023-06-20", "06/20/2023"],
"plan": ["premium", "PREMIUM", "Premium", "basic", "Basic"],
"revenue": ["$1200", "1200.00", "$1,200.00", "$500", "500"],
})
def clean_customer_data(df):
df = df.copy()
# Standardize text fields
df["name"] = df["name"].str.strip().str.title()
df["email"] = df["email"].str.strip().str.lower()
df["plan"] = df["plan"].str.strip().str.lower()
# Parse dates
df["signup"] = pd.to_datetime(df["signup"], format="mixed")
# Clean revenue
df["revenue"] = (df["revenue"].astype(str)
.str.replace("[$,]", "", regex=True))
df["revenue"] = pd.to_numeric(df["revenue"])
# Deduplicate (keep most recent)
df = df.sort_values("signup").drop_duplicates(subset=["email"], keep="last")
return df.reset_index(drop=True)
result = clean_customer_data(df)
print(result)
print(f"\nReduced from {len(df)} to {len(result)} rows")Outliers can be legitimate extreme values or data errors. The right approach depends on context β sometimes you remove them, sometimes you keep them, sometimes you cap them.
import pandas as pd
import numpy as np

np.random.seed(42)
data = np.concatenate([np.random.normal(100, 15, 95), [200, 250, 10, 5, 300]])
df = pd.DataFrame({"value": data})

# Method 1: Z-score; only meaningful when the data is roughly normal.
df["z_score"] = (df["value"] - df["value"].mean()) / df["value"].std()
z_outliers = df[df["z_score"].abs() > 3]
print(f"Z-score outliers (|z| > 3): {len(z_outliers)}")

# Method 2: Tukey fences on the IQR; makes no distribution assumption.
q1, q3 = df["value"].quantile([0.25, 0.75])
spread = q3 - q1
lower = q1 - 1.5 * spread
upper = q3 + 1.5 * spread
iqr_outliers = df[(df["value"] < lower) | (df["value"] > upper)]
print(f"IQR outliers: {len(iqr_outliers)} (range: {lower:.1f} to {upper:.1f})")

# Method 3: modified Z-score built on median/MAD; robust to the outliers themselves.
median = df["value"].median()
mad = (df["value"] - median).abs().median()
df["modified_z"] = 0.6745 * (df["value"] - median) / mad
mod_outliers = df[df["modified_z"].abs() > 3.5]
print(f"Modified Z-score outliers: {len(mod_outliers)}")
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "salary": np.concatenate([np.random.normal(60000, 15000, 95), [250000, 300000, -5000, 0, 500000]]),
})

# Strategy 1: remove rows outside the Tukey fences.
q1 = df["salary"].quantile(0.25)
q3 = df["salary"].quantile(0.75)
spread = q3 - q1
mask = (df["salary"] >= q1 - 1.5 * spread) & (df["salary"] <= q3 + 1.5 * spread)
df_removed = df[mask]
print(f"Removed: {len(df)} β {len(df_removed)} rows")

# Strategy 2: winsorize, i.e. cap values at the 1st/99th percentiles.
lower = df["salary"].quantile(0.01)
upper = df["salary"].quantile(0.99)
df["salary_capped"] = df["salary"].clip(lower, upper)
print(f"\nCapped range: {lower:.0f} to {upper:.0f}")

# Strategy 3: log-transform to compress the right tail.
df["salary_log"] = np.log1p(df["salary"].clip(lower=0))

# Strategy 4: keep the outliers but expose them to the model as a flag.
df["is_outlier"] = (~mask).astype(int)
print(f"\nOriginal stats: mean={df['salary'].mean():.0f}, std={df['salary'].std():.0f}")
print(f"Capped stats: mean={df['salary_capped'].mean():.0f}, std={df['salary_capped'].std():.0f}")
import pandas as pd
import numpy as np
np.random.seed(42)
n = 1000
df = pd.DataFrame({
"amount": np.concatenate([
np.random.lognormal(3, 1, 900), # normal transactions
np.random.lognormal(6, 0.5, 80), # high-value legitimate
np.array([0.01, 0.001, -50, 999999] * 5), # errors/fraud
]),
"is_fraud": np.concatenate([
np.zeros(900), np.zeros(80), np.ones(20)
]),
})
# Strategy: Don't remove β create features from outlier signals
df["log_amount"] = np.log1p(df["amount"].clip(lower=0))
df["amount_zscore"] = (df["amount"] - df["amount"].mean()) / df["amount"].std()
df["is_negative"] = (df["amount"] < 0).astype(int)
df["is_extreme"] = (df["amount_zscore"].abs() > 3).astype(int)
# Percentile rank (robust to outliers)
df["amount_pctile"] = df["amount"].rank(pct=True)
print("Feature correlations with fraud:")
for col in ["log_amount", "amount_zscore", "is_negative", "is_extreme", "amount_pctile"]:
corr = df[col].corr(df["is_fraud"])
print(f" {col:20s}: {corr:.3f}")ML models need numbers, not strings. Encoding transforms categorical data into numerical features. The right encoding depends on the variable type and the model you're using.
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

df = pd.DataFrame({
    "color": ["red", "blue", "green", "red", "blue"],
    "size": ["S", "M", "L", "XL", "M"],
    "quality": ["low", "medium", "high", "medium", "high"],
})

# Label encoding: category -> integer. Fine for tree models and ordinal
# data, but imposes a fake order on nominal variables.
le = LabelEncoder()
df["color_label"] = le.fit_transform(df["color"])
print("Label encoded:")
print(df[["color", "color_label"]])

# One-hot encoding: one binary column per category. The safe default for
# nominal variables with linear models and neural nets.
df_onehot = pd.get_dummies(df[["color"]], prefix="color", drop_first=False)
print("\nOne-hot encoded:")
print(df_onehot)

# Dropping the first level avoids perfect multicollinearity in linear models.
df_onehot_drop = pd.get_dummies(df[["color"]], prefix="color", drop_first=True)
print("\nWith drop_first:")
print(df_onehot_drop)
import pandas as pd
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

df = pd.DataFrame({
    "education": ["High School", "Bachelor", "Master", "PhD", "Bachelor"],
    "satisfaction": ["low", "medium", "high", "very high", "medium"],
})

# Ordinal encoding with an explicit category order per column.
edu_order = ["High School", "Bachelor", "Master", "PhD"]
sat_order = ["low", "medium", "high", "very high"]
oe = OrdinalEncoder(categories=[edu_order, sat_order])
df[["edu_encoded", "sat_encoded"]] = oe.fit_transform(df[["education", "satisfaction"]])
print(df)
# Resulting codes: High School=0, Bachelor=1, Master=2, PhD=3
# low=0, medium=1, high=2, very high=3
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "city": np.random.choice(["NYC", "LA", "Chicago", "Houston", "Phoenix"], 100),
    "price": np.random.normal(50, 15, 100),
})

# Frequency encoding: each category becomes its occurrence count.
freq = df["city"].value_counts()
df["city_freq"] = df["city"].map(freq)

# Target encoding: each category becomes the mean of the target.
# IMPORTANT: compute the means on training data only to avoid leakage!
target_means = df.groupby("city")["price"].mean()
df["city_target"] = df["city"].map(target_means)

print("Encoding comparison:")
print(df.groupby("city").agg(
    count=("city_freq", "first"),
    target_mean=("city_target", "first"),
).round(2))

# For production, sklearn's TargetEncoder handles the leakage internally.
from sklearn.preprocessing import TargetEncoder
te = TargetEncoder(smooth="auto")
df["city_target_sklearn"] = te.fit_transform(
    df[["city"]], df["price"]
)
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder, TargetEncoder
from sklearn.compose import ColumnTransformer
np.random.seed(42)
n = 500
df = pd.DataFrame({
"color": np.random.choice(["red", "blue", "green"], n), # nominal, low cardinality
"size": np.random.choice(["XS", "S", "M", "L", "XL"], n), # ordinal
"zip_code": np.random.choice([f"{z:05d}" for z in range(10000, 10200)], n), # high cardinality
"price": np.random.normal(100, 30, n), # target
})
# Strategy per column type:
# 1. Nominal + low cardinality β One-hot
# 2. Ordinal β Ordinal encoding
# 3. High cardinality β Target encoding
# One-hot for color
color_dummies = pd.get_dummies(df["color"], prefix="color", drop_first=True)
# Ordinal for size
size_order = [["XS", "S", "M", "L", "XL"]]
oe = OrdinalEncoder(categories=size_order)
df["size_ord"] = oe.fit_transform(df[["size"]])
# Target encoding for zip_code
te = TargetEncoder(smooth="auto")
df["zip_target"] = te.fit_transform(df[["zip_code"]], df["price"])
# Combine
result = pd.concat([color_dummies, df[["size_ord", "zip_target", "price"]]], axis=1)
print(result.head())
print(f"\nFinal shape: {result.shape}")Many ML algorithms (linear regression, SVM, KNN, neural nets) are sensitive to feature scale. Scaling puts features on comparable ranges so no single feature dominates.
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

np.random.seed(42)
df = pd.DataFrame({
    "age": np.random.normal(35, 10, 100),
    "income": np.random.lognormal(10.5, 0.8, 100),  # skewed
    "score": np.concatenate([np.random.normal(50, 10, 95), [200, 250, 0, 5, 300]]),  # with outliers
})

def _scaled(scaler, frame):
    # Fit/transform while keeping the original column labels.
    return pd.DataFrame(scaler.fit_transform(frame), columns=frame.columns)

# StandardScaler: mean=0, std=1 (assumes roughly normal data).
df_std = _scaled(StandardScaler(), df)
# MinMaxScaler: squeezes into [0, 1]; outliers crush the useful range.
df_mm = _scaled(MinMaxScaler(), df)
# RobustScaler: centers on the median, scales by IQR; outlier-resistant.
df_rob = _scaled(RobustScaler(), df)

print("Original stats:")
print(df.describe().round(1).loc[["mean", "std", "min", "max"]])
print("\nStandardScaler:")
print(df_std.describe().round(2).loc[["mean", "std", "min", "max"]])
print("\nMinMaxScaler:")
print(df_mm.describe().round(2).loc[["mean", "std"]])
print("\nRobustScaler:")
print(df_rob.describe().round(2).loc[["mean", "std"]])
# Decision guide:
# Choosing a scaler:
#
# StandardScaler (Z-score): (x - mean) / std
#   Use when data is roughly normal with no extreme outliers.
#   Typical models: Linear/Logistic Regression, SVM, PCA, Neural Nets.
#
# MinMaxScaler: (x - min) / (max - min), bounded to [0, 1]
#   Use when a bounded range is required; sensitive to outliers.
#   Typical models: neural networks (especially sigmoid), KNN.
#
# RobustScaler: (x - median) / IQR
#   Use when the data contains outliers; any scale-sensitive model.
#
# No scaling needed for tree-based models (Random Forest, XGBoost,
# LightGBM) - they are scale-invariant, so scaling won't help or hurt.
import pandas as pd
guide = pd.DataFrame({
    "Scaler": ["StandardScaler", "MinMaxScaler", "RobustScaler", "None"],
    "Best For": ["Normal data", "Bounded range", "Data with outliers", "Tree models"],
    "Outlier Robust": ["No", "No", "Yes", "N/A"],
    "Range": ["~(-3, 3)", "[0, 1]", "Centered at 0", "Original"],
})
print(guide.to_string(index=False))
import numpy as np
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

np.random.seed(42)
X = np.random.normal(50, 15, (200, 3))
y = np.random.choice([0, 1], 200)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# CORRECT: learn the scaling parameters on the training split only,
# then apply the exact same transform to both splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # fit + transform
X_test_scaled = scaler.transform(X_test)        # transform only!
print(f"Train mean: {X_train_scaled.mean(axis=0).round(6)}")  # ~0
print(f"Test mean: {X_test_scaled.mean(axis=0).round(2)}")    # close to 0, not exact
# WRONG: fitting on test data leaks its statistics into the model.
# scaler.fit_transform(X_test)  # NEVER DO THIS!
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, RobustScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
np.random.seed(42)
df = pd.DataFrame({
"age": np.random.normal(35, 10, 100),
"income": np.concatenate([np.random.lognormal(10.5, 0.8, 95), [1e7]*5]), # outliers
"score": np.random.normal(500, 100, 100),
"education": np.random.choice(["HS", "BS", "MS", "PhD"], 100),
"target": np.random.choice([0, 1], 100),
})
# Define column groups
normal_cols = ["age", "score"] # roughly normal β StandardScaler
skewed_cols = ["income"] # skewed + outliers β RobustScaler
cat_cols = ["education"] # categorical β OneHotEncoder
preprocessor = ColumnTransformer([
("normal", Pipeline([
("impute", SimpleImputer(strategy="median")),
("scale", StandardScaler()),
]), normal_cols),
("skewed", Pipeline([
("impute", SimpleImputer(strategy="median")),
("scale", RobustScaler()),
]), skewed_cols),
("cat", Pipeline([
("impute", SimpleImputer(strategy="most_frequent")),
("encode", OneHotEncoder(drop="first", sparse_output=False)),
]), cat_cols),
])
X = preprocessor.fit_transform(df.drop("target", axis=1))
print(f"Transformed shape: {X.shape}")
print(f"Feature names: {preprocessor.get_feature_names_out()}")Feature engineering is the art of creating new informative features from existing data. Good features can improve model performance more than better algorithms.
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "length": np.random.uniform(1, 10, 100),
    "width": np.random.uniform(1, 5, 100),
    "height": np.random.uniform(1, 3, 100),
    "price": np.random.lognormal(5, 1, 100),
    "quantity": np.random.randint(1, 100, 100),
})

# Ratios often carry more signal than the raw columns.
df["aspect_ratio"] = df["length"] / df["width"]
df["price_per_unit"] = df["price"] / df["quantity"]

# Products capture interactions between dimensions.
df["volume"] = df["length"] * df["width"] * df["height"]
df["surface_area"] = 2 * (df["length"]*df["width"] + df["width"]*df["height"] + df["length"]*df["height"])

# Log transform tames right-skewed variables such as price.
df["log_price"] = np.log1p(df["price"])

# Simple power transforms.
df["length_squared"] = df["length"] ** 2
df["sqrt_quantity"] = np.sqrt(df["quantity"])

# Binning: equal-width intervals (cut) vs equal-frequency quartiles (qcut).
df["price_bin"] = pd.cut(df["price"], bins=5, labels=["very_low", "low", "medium", "high", "very_high"])
df["qty_bucket"] = pd.qcut(df["quantity"], q=4, labels=["Q1", "Q2", "Q3", "Q4"])

print(df[["price", "log_price", "price_bin", "quantity", "qty_bucket"]].head(10))
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
dates = pd.date_range("2023-01-01", periods=365, freq="D")
df = pd.DataFrame({
    "date": dates,
    "sales": np.random.poisson(100, 365) + np.sin(np.arange(365) / 365 * 2 * np.pi) * 30,
})

# Calendar components straight off the datetime accessor.
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["day"] = df["date"].dt.day
df["day_of_week"] = df["date"].dt.dayofweek  # 0=Mon, 6=Sun
df["day_name"] = df["date"].dt.day_name()
df["quarter"] = df["date"].dt.quarter
df["is_weekend"] = df["day_of_week"].isin([5, 6]).astype(int)
df["is_month_start"] = df["date"].dt.is_month_start.astype(int)
df["is_month_end"] = df["date"].dt.is_month_end.astype(int)
df["week_of_year"] = df["date"].dt.isocalendar().week.astype(int)

# Cyclical sin/cos encoding lets models see that December is next to
# January and Sunday is next to Monday.
df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)
df["dow_sin"] = np.sin(2 * np.pi * df["day_of_week"] / 7)
df["dow_cos"] = np.cos(2 * np.pi * df["day_of_week"] / 7)
print(df[["date", "month", "day_of_week", "is_weekend", "month_sin", "month_cos"]].head(10))
import pandas as pd
import pandas as pd

df = pd.DataFrame({
    "review": [
        "Great product! Highly recommend!!",
        "Terrible quality. Broke after one day.",
        "It's okay. Nothing special.",
        "ABSOLUTELY AMAZING MUST BUY!!!",
        "Worst purchase ever. Total waste of money.",
    ],
})

# Length-based features.
df["char_count"] = df["review"].str.len()
df["word_count"] = df["review"].str.split().str.len()
df["avg_word_len"] = df["char_count"] / df["word_count"]

# Punctuation intensity.
df["exclamation_count"] = df["review"].str.count("!")
df["question_count"] = df["review"].str.count("\?")

# Shouting ratio: share of upper-case characters.
df["upper_ratio"] = df["review"].apply(lambda x: sum(c.isupper() for c in x) / len(x))

# Cheap sentiment proxy: count lexicon hits, no NLP model required.
positive_words = {"great", "amazing", "recommend", "love", "best", "excellent"}
negative_words = {"terrible", "worst", "broke", "waste", "bad", "awful"}

def _lexicon_hits(text, lexicon):
    # Strip edge punctuation so "recommend!!" still matches "recommend".
    return sum(1 for w in text.split() if w.strip(".,!?") in lexicon)

df["positive_count"] = df["review"].str.lower().apply(
    lambda x: _lexicon_hits(x, positive_words)
)
df["negative_count"] = df["review"].str.lower().apply(
    lambda x: _lexicon_hits(x, negative_words)
)
print(df[["review", "word_count", "exclamation_count", "upper_ratio", "positive_count", "negative_count"]].to_string())
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
n_transactions = 5000
n_customers = 200
transactions = pd.DataFrame({
    "customer_id": np.random.randint(1, n_customers + 1, n_transactions),
    "date": pd.date_range("2023-01-01", periods=n_transactions, freq="2h"),
    "amount": np.random.lognormal(3, 1, n_transactions),
    "category": np.random.choice(["electronics", "clothing", "food", "books"], n_transactions),
})

def build_customer_features(txns):
    """Roll a transaction log up into one feature row per customer:
    RFM metrics plus spend and category-diversity statistics."""
    today = txns["date"].max()
    features = txns.groupby("customer_id").agg(
        total_transactions=("amount", "count"),
        total_spend=("amount", "sum"),
        avg_spend=("amount", "mean"),
        std_spend=("amount", "std"),
        max_spend=("amount", "max"),
        first_purchase=("date", "min"),
        last_purchase=("date", "max"),
        unique_categories=("category", "nunique"),
    )
    # Recency/tenure in days; frequency as purchases per 30-day window.
    features["recency_days"] = (today - features["last_purchase"]).dt.days
    features["tenure_days"] = (today - features["first_purchase"]).dt.days
    features["purchase_frequency"] = features["total_transactions"] / (features["tenure_days"] + 1) * 30
    # Coefficient of variation measures how consistent spending is.
    features["spend_cv"] = features["std_spend"] / features["avg_spend"]
    # Share of the four categories the customer has touched.
    features["category_diversity"] = features["unique_categories"] / 4
    # The raw timestamps were only needed to derive recency/tenure.
    return features.drop(columns=["first_purchase", "last_purchase"])

customer_features = build_customer_features(transactions)
print(customer_features.describe().round(2))
print(f"\nFeature count: {customer_features.shape[1]}")
import pandas as pd
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    "date": pd.date_range("2023-01-01", periods=1000, freq="D"),
    "store_id": np.random.choice(["A", "B", "C"], 1000),
    "product": np.random.choice(["widget", "gadget", "tool"], 1000),
    "quantity": np.random.poisson(10, 1000),
    "unit_price": np.random.uniform(5, 100, 1000).round(2),
    "discount_pct": np.random.choice([0, 5, 10, 15, 20], 1000),
})

# Implementation of the ten requested features (replaces the TODO list).
# 1. Net revenue after discount
df["revenue"] = df["quantity"] * df["unit_price"] * (1 - df["discount_pct"] / 100)
# 2. Weekend flag
df["is_weekend"] = df["date"].dt.dayofweek.isin([5, 6]).astype(int)
# 3. Calendar features
df["month"] = df["date"].dt.month
df["quarter"] = df["date"].dt.quarter
# 4. Was any discount applied at all?
df["has_discount"] = (df["discount_pct"] > 0).astype(int)
# 5. Realized revenue per unit sold (guard against quantity == 0)
df["revenue_per_unit"] = df["revenue"] / df["quantity"].replace(0, np.nan)
# 6. Rolling 7-day average revenue per store (requires date order)
df = df.sort_values("date").reset_index(drop=True)
df["store_rev_7d"] = (
    df.groupby("store_id")["revenue"]
    .transform(lambda s: s.rolling(7, min_periods=1).mean())
)
# 7. Days since the previous sale of the same product
df["days_since_prev"] = df.groupby("product")["date"].diff().dt.days
# 8. Store-level average unit price
df["store_avg_price"] = df.groupby("store_id")["unit_price"].transform("mean")
# 9. Product popularity rank by total quantity sold (1 = most popular)
product_totals = df.groupby("product")["quantity"].transform("sum")
df["product_rank"] = product_totals.rank(method="dense", ascending=False).astype(int)
# 10. Creative feature: discount value in currency (revenue given away)
df["discount_value"] = df["quantity"] * df["unit_price"] * df["discount_pct"] / 100

# Show the final DataFrame with all features
print(df.head())
print(f"Feature count: {df.shape[1]}")
Polynomial features capture non-linear relationships, while interaction features capture how two variables together affect the target differently than each alone.
import numpy as np
import pandas as pd
from sklearn.preprocessing import PolynomialFeatures

# Tiny matrix so the expansion is easy to eyeball.
X = np.array([[2, 3], [4, 5], [6, 7]])

# Degree-2 expansion adds a^2, ab and b^2 to the raw columns.
poly = PolynomialFeatures(degree=2, include_bias=False)
X_poly = poly.fit_transform(X)
print("Original features: [a, b]")
print("Polynomial features:", poly.get_feature_names_out())
print(pd.DataFrame(X_poly, columns=poly.get_feature_names_out()))

# interaction_only drops the pure powers and keeps only cross-terms.
inter = PolynomialFeatures(degree=2, interaction_only=True, include_bias=False)
X_inter = inter.fit_transform(X)
print("\nInteraction only:", inter.get_feature_names_out())
print(pd.DataFrame(X_inter, columns=inter.get_feature_names_out()))
import numpy as np
# Demo: polynomial features + LinearRegression = polynomial regression,
# fit to data generated from a known quadratic (3x^2 - 2x + 5 + noise).
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.metrics import r2_score
# Data with non-linear relationship
np.random.seed(42)
X = np.random.uniform(0, 10, 100).reshape(-1, 1)
y = 3 * X.ravel()**2 - 2 * X.ravel() + 5 + np.random.normal(0, 10, 100)
# Linear model (poor fit for quadratic data)
lr = LinearRegression()
lr.fit(X, y)
# NOTE(review): all R2 scores below are computed on the training data —
# fine for this illustration, but use a held-out split for real evaluation.
print(f"Linear R2: {r2_score(y, lr.predict(X)):.4f}")
# Polynomial features + linear model = polynomial regression!
poly = PolynomialFeatures(degree=2, include_bias=False)
X_poly = poly.fit_transform(X)
lr_poly = LinearRegression()
lr_poly.fit(X_poly, y)
print(f"Poly(2) R2: {r2_score(y, lr_poly.predict(X_poly)):.4f}")
# Degree 3
poly3 = PolynomialFeatures(degree=3, include_bias=False)
X_poly3 = poly3.fit_transform(X)
lr_poly3 = LinearRegression()
lr_poly3.fit(X_poly3, y)
print(f"Poly(3) R2: {r2_score(y, lr_poly3.predict(X_poly3)):.4f}")
# WARNING: too many features with high degree
# Feature count grows combinatorially with degree — watch for overfitting.
for d in [2, 3, 4, 5]:
poly = PolynomialFeatures(degree=d, include_bias=False)
print(f"Degree {d}: {poly.fit_transform(np.zeros((1, 5))).shape[1]} features from 5 inputs")When one class dominates (e.g., 99% non-fraud, 1% fraud), models tend to predict the majority class. Resampling and weighting techniques fix this.
# Demo: why accuracy is misleading on imbalanced classes — a constant
# majority-class predictor scores ~95% accuracy while catching no positives.
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
# Create imbalanced dataset (95% class 0, 5% class 1)
X, y = make_classification(n_samples=2000, n_features=10,
weights=[0.95, 0.05], random_state=42)
print(f"Class distribution:")
print(pd.Series(y).value_counts().sort_index())
print(f"\nImbalance ratio: {sum(y==0)/sum(y==1):.0f}:1")
# Naive model always predicts majority class
naive_accuracy = sum(y==0) / len(y)
print(f"Naive accuracy (always predict 0): {naive_accuracy:.1%}")
# NOTE(review): the "β" below looks like a mojibake'd warning symbol from
# the original page encoding — confirm against the source material.
print("β This looks great but catches 0% of the minority class!")import numpy as np
# Demo: four imbalanced-learn resampling strategies, compared via class counts.
from sklearn.datasets import make_classification
from collections import Counter
# pip install imbalanced-learn
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from imblearn.combine import SMOTETomek
X, y = make_classification(n_samples=2000, n_features=10,
weights=[0.95, 0.05], random_state=42)
print(f"Original: {Counter(y)}")
# NOTE(review): in a real workflow, resample only the TRAINING split —
# resampling before splitting leaks synthetic/duplicated rows into the test set.
# Random oversampling (duplicate minority samples)
ros = RandomOverSampler(random_state=42)
X_ros, y_ros = ros.fit_resample(X, y)
print(f"Random oversample: {Counter(y_ros)}")
# SMOTE (create synthetic minority samples)
smote = SMOTE(random_state=42)
X_smote, y_smote = smote.fit_resample(X, y)
print(f"SMOTE: {Counter(y_smote)}")
# Random undersampling (remove majority samples)
rus = RandomUnderSampler(random_state=42)
X_rus, y_rus = rus.fit_resample(X, y)
print(f"Random undersample: {Counter(y_rus)}")
# SMOTE + Tomek links (combined approach)
smt = SMOTETomek(random_state=42)
X_smt, y_smt = smt.fit_resample(X, y)
print(f"SMOTE+Tomek: {Counter(y_smt)}")import numpy as np
# Demo: class_weight="balanced" as a resampling-free alternative — compares
# F1 with and without weighting for LogisticRegression and RandomForest.
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
X, y = make_classification(n_samples=2000, n_features=10,
weights=[0.95, 0.05], random_state=42)
# Without class weights
lr = LogisticRegression(max_iter=1000)
scores = cross_val_score(lr, X, y, cv=5, scoring="f1")
print(f"LogReg (no weights) F1: {scores.mean():.3f}")
# With class_weight='balanced' (automatically adjusts)
# "balanced" reweights each class inversely proportional to its frequency.
lr_bal = LogisticRegression(class_weight="balanced", max_iter=1000)
scores_bal = cross_val_score(lr_bal, X, y, cv=5, scoring="f1")
print(f"LogReg (balanced) F1: {scores_bal.mean():.3f}")
# Random Forest with class weights
rf = RandomForestClassifier(n_estimators=100, random_state=42)
scores_rf = cross_val_score(rf, X, y, cv=5, scoring="f1")
print(f"RF (no weights) F1: {scores_rf.mean():.3f}")
rf_bal = RandomForestClassifier(n_estimators=100, class_weight="balanced", random_state=42)
scores_rf_bal = cross_val_score(rf_bal, X, y, cv=5, scoring="f1")
print(f"RF (balanced) F1: {scores_rf_bal.mean():.3f}")import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
# Simulated fraud data (0.5% fraud rate)
X, y = make_classification(n_samples=10000, n_features=15,
weights=[0.995, 0.005],
n_informative=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)
# Pipeline: SMOTE + Random Forest
pipeline = ImbPipeline([
("smote", SMOTE(sampling_strategy=0.3, random_state=42)), # 30% minority ratio
("clf", RandomForestClassifier(
n_estimators=200,
class_weight="balanced_subsample",
random_state=42,
)),
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
print("Fraud Detection Results:")
print(classification_report(y_test, y_pred, target_names=["Legitimate", "Fraud"]))Not all features help your model. Feature selection removes irrelevant or redundant features, improving model performance, interpretability, and training speed.
# Demo: cheap filter-style feature selection — drop (near-)constant columns,
# then flag highly correlated pairs and rank features by target correlation.
import pandas as pd
import numpy as np
np.random.seed(42)
n = 500
df = pd.DataFrame({
"useful_1": np.random.normal(0, 1, n),
"useful_2": np.random.normal(0, 1, n),
"correlated": None, # will correlate with useful_1
"constant": 5.0, # zero variance β useless
"near_constant": np.where(np.random.random(n) > 0.99, 1, 0), # almost constant
"random_noise": np.random.normal(0, 1, n),
})
df["correlated"] = df["useful_1"] * 0.9 + np.random.normal(0, 0.1, n)
df["target"] = 2 * df["useful_1"] + 3 * df["useful_2"] + np.random.normal(0, 0.5, n)
# Remove zero/near-zero variance
from sklearn.feature_selection import VarianceThreshold
selector = VarianceThreshold(threshold=0.01)
cols_before = df.drop("target", axis=1).columns.tolist()
X_var = selector.fit_transform(df.drop("target", axis=1))
# Map the boolean support mask back to the surviving column names.
cols_after = [c for c, keep in zip(cols_before, selector.get_support()) if keep]
print(f"Variance filter: {len(cols_before)} β {len(cols_after)} features")
print(f"Removed: {set(cols_before) - set(cols_after)}")
# Remove highly correlated features
# Upper triangle only (k=1), so each pair is inspected once and no column
# is compared with itself.
corr = df[cols_after].corr().abs()
upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
to_drop = [c for c in upper.columns if any(upper[c] > 0.85)]
print(f"\nHighly correlated (>0.85): {to_drop}")
# Correlation with target
target_corr = df[cols_after].corrwith(df["target"]).abs().sort_values(ascending=False)
print(f"\nCorrelation with target:")
print(target_corr)import numpy as np
# Demo: model-based (wrapper/embedded) feature selection — RFE,
# SelectFromModel with tree importances, and L1/Lasso shrinkage.
from sklearn.datasets import make_classification
from sklearn.feature_selection import RFE, SelectFromModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LassoCV
X, y = make_classification(n_samples=500, n_features=20,
n_informative=5, n_redundant=5,
random_state=42)
# Recursive Feature Elimination (RFE)
# Repeatedly fits the estimator and drops the weakest feature(s) until
# only n_features_to_select remain.
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rfe = RFE(rf, n_features_to_select=5)
rfe.fit(X, y)
print(f"RFE selected features: {np.where(rfe.support_)[0]}")
print(f"Feature ranking: {rfe.ranking_}")
# SelectFromModel with Random Forest importance
sfm = SelectFromModel(rf, max_features=5)
sfm.fit(X, y)
print(f"\nRF importance selected: {np.where(sfm.get_support())[0]}")
# Lasso regularization (automatic feature selection)
# NOTE(review): LassoCV is a regressor being fit on 0/1 class labels here —
# it works as a selection demo, but LogisticRegression(penalty="l1") is the
# classification analogue.
from sklearn.preprocessing import StandardScaler
X_scaled = StandardScaler().fit_transform(X)
lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X_scaled, y)
important = np.where(np.abs(lasso.coef_) > 0.01)[0]
print(f"\nLasso selected ({len(important)} features): {important}")import numpy as np
# Demo: compare mutual-information and RF-importance rankings, take their
# consensus, and benchmark each feature subset with cross-validation.
from sklearn.datasets import make_classification
from sklearn.feature_selection import mutual_info_classif, SelectKBest
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
X, y = make_classification(n_samples=1000, n_features=50,
n_informative=10, n_redundant=15,
random_state=42)
# Method 1: Mutual Information
mi = mutual_info_classif(X, y, random_state=42)
top_mi = np.argsort(mi)[-10:]
print(f"Top 10 by mutual info: {sorted(top_mi)}")
# Method 2: Random Forest importance
rf = RandomForestClassifier(n_estimators=200, random_state=42)
rf.fit(X, y)
top_rf = np.argsort(rf.feature_importances_)[-10:]
print(f"Top 10 by RF importance: {sorted(top_rf)}")
# Method 3: Consensus β features selected by both methods
consensus = set(top_mi) & set(top_rf)
print(f"\nConsensus features: {sorted(consensus)}")
# Compare performance
# NOTE(review): features were selected using ALL labels, then evaluated by CV
# on the same data — a mild selection bias; for honest numbers, do selection
# inside each CV fold (e.g. via a Pipeline).
for name, features in [("All 50", list(range(50))), ("MI top 10", top_mi),
("RF top 10", top_rf), ("Consensus", list(consensus))]:
scores = cross_val_score(rf, X[:, features], y, cv=5, scoring="accuracy")
print(f" {name:15s}: {scores.mean():.4f} (+/- {scores.std():.4f})")Data leakage is when information from the test set or the future leaks into training, giving unrealistically good results that fail in production. It's the #1 mistake in ML projects.
# Demo: three DELIBERATELY WRONG preprocessing patterns that leak test-set
# information into training. Each is followed by a "Fix:" comment — do not
# copy the code above those comments into real projects.
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
X = np.random.normal(0, 1, (1000, 5))
y = (X[:, 0] + X[:, 1] > 0).astype(int)
# LEAK 1: Scaling before splitting
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # fit on ALL data including test!
X_train, X_test = train_test_split(X_scaled, test_size=0.2)
# Fix: split first, then fit_transform on train only
# LEAK 2: Feature selection on all data
from sklearn.feature_selection import SelectKBest, f_classif
selector = SelectKBest(f_classif, k=3)
X_selected = selector.fit_transform(X, y) # uses ALL labels!
# Fix: select features using only training data
# LEAK 3: Target encoding on all data
df = pd.DataFrame({"cat": np.random.choice(["A", "B"], 1000), "target": y})
df["cat_encoded"] = df.groupby("cat")["target"].transform("mean")
# This uses the test set's target values in the encoding!
# Fix: encode using only training fold values
print("Common leakage sources:")
print("1. Scaling/normalizing before train-test split")
print("2. Feature selection using full dataset")
print("3. Target encoding using full dataset")
print("4. Using future data to predict the past (time series)")
print("5. Including features derived from the target")import numpy as np
# Demo: the leak-free counterpart — wrapping scaler/selector/model in a
# Pipeline so cross_val_score refits every step per fold on training data only.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
np.random.seed(42)
X = np.random.normal(0, 1, (500, 20))
y = (X[:, 0] + X[:, 1] + X[:, 2] > 0).astype(int)
# Pipeline ensures each step fits ONLY on training data
pipe = Pipeline([
("scaler", StandardScaler()),
("selector", SelectKBest(f_classif, k=5)),
("model", LogisticRegression()),
])
# cross_val_score handles the split correctly:
# For each fold: fit scaler β fit selector β fit model on TRAIN
# Then: transform test β select features β predict on TEST
scores = cross_val_score(pipe, X, y, cv=5, scoring="accuracy")
print(f"Pipeline CV accuracy: {scores.mean():.4f} (+/- {scores.std():.4f})")
print("Each fold's scaler and selector were fit ONLY on training data!")import pandas as pd
# Demo: avoiding temporal leakage — TimeSeriesSplit for evaluation and
# shift-before-rolling for features so each row only sees its own past.
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
np.random.seed(42)
dates = pd.date_range("2023-01-01", periods=365, freq="D")
df = pd.DataFrame({
"date": dates,
"value": np.cumsum(np.random.normal(0, 1, 365)) + 100,
})
# WRONG: random split shuffles time
# train_test_split would mix future data into training!
# CORRECT: TimeSeriesSplit preserves temporal order
# Each fold trains on an expanding prefix and tests on the block right after it.
tss = TimeSeriesSplit(n_splits=5)
for i, (train_idx, test_idx) in enumerate(tss.split(df)):
train_end = df.iloc[train_idx[-1]]["date"].date()
test_start = df.iloc[test_idx[0]]["date"].date()
print(f"Fold {i+1}: Train ends {train_end}, Test starts {test_start} ({len(test_idx)} days)")
# WRONG: using future-derived features
# df["next_day_value"] = df["value"].shift(-1) # LEAK!
# df["rolling_mean"] = df["value"].rolling(7).mean() # OK if no shift
# CORRECT: only use past data
df["lag_1"] = df["value"].shift(1) # yesterday's value
df["lag_7"] = df["value"].shift(7) # last week's value
df["rolling_7_mean"] = df["value"].shift(1).rolling(7).mean() # shift first!
print(df[["date", "value", "lag_1", "rolling_7_mean"]].head(10))def leakage_audit(pipeline_description):
"""Print a data-leakage audit checklist and an overall verdict.

Note: ``pipeline_description`` is currently unused and every check is
hard-coded to ``True``, so this demo always reports ALL CLEAR; wire in
real check results to make the FAIL path reachable.
"""
checks = [
("Train-test split happens BEFORE any preprocessing", True),
("Scaler fit only on training data", True),
("Feature selection uses only training data", True),
("Target encoding uses only training fold", True),
("No future data used in features (time series)", True),
("No features derived directly from target", True),
("Cross-validation uses Pipeline (not manual steps)", True),
("Time series uses TimeSeriesSplit (not random)", True),
("Imputation statistics from training data only", True),
("Outlier thresholds computed on training data only", True),
]
print("DATA LEAKAGE AUDIT")
print("=" * 50)
all_pass = True
# A single failed check flips the overall verdict to LEAKAGE DETECTED.
for check, expected in checks:
status = "PASS" if expected else "FAIL"
if not expected:
all_pass = False
print(f" [{status}] {check}")
print(f"\nResult: {'ALL CLEAR' if all_pass else 'LEAKAGE DETECTED'}")
leakage_audit("example pipeline")
# Key rule of thumb:
# If your model performs MUCH better in dev than production,
# you probably have data leakage somewhere.
print("\nRule: If dev performance >> production performance = LEAKAGE")
# Demo: one reproducible pipeline — ColumnTransformer routes numeric, ordinal,
# and nominal columns through their own impute/encode/scale sub-pipelines,
# then SelectKBest and a RandomForest run on the combined output.
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
np.random.seed(42)
n = 500
df = pd.DataFrame({
"age": np.where(np.random.random(n) > 0.9, np.nan, np.random.normal(35, 10, n)),
"income": np.random.lognormal(10.5, 0.8, n),
"score": np.random.normal(500, 100, n),
"education": np.random.choice(["HS", "BS", "MS", "PhD", None], n),
"city": np.random.choice(["NYC", "LA", "Chicago", "Houston"], n),
"target": np.random.choice([0, 1], n),
})
num_features = ["age", "income", "score"]
ord_features = ["education"]
cat_features = ["city"]
preprocessor = ColumnTransformer([
("num", Pipeline([
("impute", KNNImputer(n_neighbors=5)),
("scale", StandardScaler()),
]), num_features),
("ord", Pipeline([
("impute", SimpleImputer(strategy="most_frequent")),
# Explicit category order encodes education as 0 < 1 < 2 < 3.
("encode", OrdinalEncoder(categories=[["HS", "BS", "MS", "PhD"]])),
]), ord_features),
("cat", Pipeline([
("impute", SimpleImputer(strategy="most_frequent")),
("encode", OneHotEncoder(drop="first", sparse_output=False)),
]), cat_features),
])
# Full pipeline: preprocess β select β model
full_pipeline = Pipeline([
("preprocess", preprocessor),
("select", SelectKBest(f_classif, k=5)),
("model", RandomForestClassifier(n_estimators=100, random_state=42)),
])
X = df.drop("target", axis=1)
y = df["target"]
# Target is random here, so accuracy ~0.5 is expected; the point is the plumbing.
scores = cross_val_score(full_pipeline, X, y, cv=5, scoring="accuracy")
print(f"Pipeline CV accuracy: {scores.mean():.4f} (+/- {scores.std():.4f})")import numpy as np
# Demo: persisting a fitted Pipeline with joblib so preprocessing and model
# travel together into production as a single artifact.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import joblib
# Build and train pipeline
np.random.seed(42)
X_train = np.random.normal(0, 1, (200, 5))
y_train = (X_train[:, 0] > 0).astype(int)
pipe = Pipeline([
("scaler", StandardScaler()),
("model", LogisticRegression()),
])
pipe.fit(X_train, y_train)
# Save entire pipeline (preprocessing + model)
joblib.dump(pipe, "model_pipeline.pkl")
print("Pipeline saved!")
# Load in production
# SECURITY: joblib.load unpickles arbitrary code — only load files you trust.
loaded_pipe = joblib.load("model_pipeline.pkl")
X_new = np.random.normal(0, 1, (5, 5))
predictions = loaded_pipe.predict(X_new)
print(f"Predictions: {predictions}")
# The loaded pipeline includes the fitted scaler!
# No need to separately save/load preprocessing steps
import os; os.remove("model_pipeline.pkl")import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, RobustScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
# Custom transformer for outlier capping
class OutlierCapper(BaseEstimator, TransformerMixin):
    """Cap (winsorize) each column at percentiles learned during ``fit``.

    Bounds are computed NaN-aware on the training data only, so test data
    is clipped with training statistics — no leakage. Parameters are
    fractions in [0, 1] (e.g. 0.01 = 1st percentile).
    """

    def __init__(self, lower_pct=0.01, upper_pct=0.99):
        # Stored unmodified so sklearn's get_params()/clone() work correctly.
        self.lower_pct = lower_pct
        self.upper_pct = upper_pct

    def fit(self, X, y=None):
        """Learn per-column lower/upper percentile bounds from X."""
        # Fix: coerce to a float array, matching transform(); otherwise an
        # object-dtype input (e.g. a DataFrame column containing None) would
        # break nanpercentile here even though transform() would accept it.
        X = np.asarray(X, dtype=float)
        self.lower_ = np.nanpercentile(X, self.lower_pct * 100, axis=0)
        self.upper_ = np.nanpercentile(X, self.upper_pct * 100, axis=0)
        return self

    def transform(self, X):
        """Clip every column of X into its fitted [lower_, upper_] band."""
        X = np.array(X, dtype=float)
        # NaNs propagate through np.clip unchanged, leaving imputation to
        # other pipeline steps.
        return np.clip(X, self.lower_, self.upper_)
# Demo: churn model wiring the custom OutlierCapper between imputation and
# robust scaling; income column contains injected 1e8 outliers to cap.
# Sample data
np.random.seed(42)
n = 1000
df = pd.DataFrame({
"age": np.where(np.random.random(n) > 0.9, np.nan, np.random.normal(35, 10, n)),
"income": np.concatenate([np.random.lognormal(10.5, 0.8, 980), [1e8]*20]),
"tenure": np.random.exponential(3, n),
"plan": np.random.choice(["basic", "premium", "enterprise", None], n),
"churned": np.random.choice([0, 1], n, p=[0.85, 0.15]),
})
num_cols = ["age", "income", "tenure"]
cat_cols = ["plan"]
pipeline = Pipeline([
("preprocess", ColumnTransformer([
("num", Pipeline([
("impute", SimpleImputer(strategy="median")),
("cap", OutlierCapper()),
("scale", RobustScaler()),
]), num_cols),
("cat", Pipeline([
("impute", SimpleImputer(strategy="constant", fill_value="unknown")),
# NOTE(review): drop="first" combined with handle_unknown="ignore" is
# rejected by some scikit-learn versions — confirm against the pinned
# sklearn release.
("encode", OneHotEncoder(drop="first", sparse_output=False, handle_unknown="ignore")),
]), cat_cols),
])),
("model", GradientBoostingClassifier(n_estimators=100, random_state=42)),
])
X = df.drop("churned", axis=1)
y = df["churned"]
scores = cross_val_score(pipeline, X, y, cv=5, scoring="f1")
print(f"Full pipeline F1: {scores.mean():.4f} (+/- {scores.std():.4f})")import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
np.random.seed(42)
df = pd.DataFrame({
    "feature_1": np.where(np.random.random(300) > 0.85, np.nan, np.random.normal(0, 1, 300)),
    "feature_2": np.random.lognormal(2, 1, 300),
    "feature_3": np.random.choice(["A", "B", "C", None], 300),
    "feature_4": np.random.choice(["low", "medium", "high"], 300),
    "target": np.random.choice([0, 1], 300),
})
# Column groups: numeric, unordered categorical, and ordered categorical.
num_cols = ["feature_1", "feature_2"]
cat_cols = ["feature_3"]
ord_cols = ["feature_4"]
# Preprocessor: impute + scale numerics; impute + one-hot the nominal column
# (missing values become their own "missing" category); impute + integer-encode
# the ordered column so low < medium < high is preserved.
preprocessor = ColumnTransformer([
    ("num", Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
    ]), num_cols),
    ("cat", Pipeline([
        ("impute", SimpleImputer(strategy="constant", fill_value="missing")),
        ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]), cat_cols),
    ("ord", Pipeline([
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("encode", OrdinalEncoder(categories=[["low", "medium", "high"]])),
    ]), ord_cols),
])
# Full pipeline: all preprocessing is refit inside each CV training fold,
# so there is no train/test leakage.
model_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", RandomForestClassifier(n_estimators=100, random_state=42)),
])
X = df.drop("target", axis=1)
y = df["target"]
# Target is random noise, so ~0.5 accuracy is expected; the exercise is the wiring.
scores = cross_val_score(model_pipeline, X, y, cv=5, scoring="accuracy")
print(f"Pipeline CV accuracy: {scores.mean():.4f} (+/- {scores.std():.4f})")