Loading Module…

📈 Time Series Analysis

24 topics • Click any card to expand

1. DateTime Indexing

Pandas DatetimeIndex enables powerful time-based selection, alignment, and operations. Use pd.to_datetime() to parse dates and set_index() to create a time series.

Creating a DatetimeIndex
import pandas as pd
import numpy as np

# Ten consecutive days of synthetic counts on a proper DatetimeIndex.
gen = np.random.default_rng(42)
day_index = pd.date_range(start='2024-01-01', periods=10, freq='D')
s = pd.Series(gen.integers(100, 500, 10), index=day_index)

print(s)
print('\nIndex type:', type(s.index))
print('Freq:', s.index.freq)
print('First date:', s.index[0])
print('Last date:', s.index[-1])
Parsing and indexing with pd.to_datetime
import pandas as pd
import numpy as np

# Build a small sales frame, parse the date column, promote it to the index.
gen = np.random.default_rng(7)
records = {
    'date':    ['2024-01-15', '2024-02-20', '2024-03-05', '2024-04-10'],
    'revenue': gen.integers(1000, 9999, 4),
    'units':   gen.integers(10, 200, 4),
}
df = pd.DataFrame(records)
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
print(df)

# The .dt accessor works on datetime *columns*, so move the index back out.
print('\n.dt accessor on a column:')
df2 = df.reset_index()
df2['year']  = df2['date'].dt.year
df2['month'] = df2['date'].dt.month
df2['dow']   = df2['date'].dt.day_name()
print(df2[['date','year','month','dow']])
Time-based slicing and selection
import pandas as pd
import numpy as np

# Two full years of daily observations.
gen = np.random.default_rng(0)
days = pd.date_range('2023-01-01', '2024-12-31', freq='D')
ts = pd.Series(gen.normal(100, 15, len(days)), index=days)

# Partial-string indexing: a month, a quarter, and one specific week.
print('Jan 2024:', ts['2024-01'].shape)
print('Q1 2024:', ts['2024-01':'2024-03'].shape)
print('A specific week:', ts['2024-06-10':'2024-06-16'].values.round(2))

# Boolean masks work on the values and on index attributes alike.
print('\nDays with value > 130:', (ts > 130).sum())
print('Weekend values mean:', ts[ts.index.dayofweek >= 5].mean().round(2))
Time zones and date arithmetic
import pandas as pd
import numpy as np

# Timezone-aware index: start in UTC, then view the same instants in Eastern.
stamps = pd.date_range('2024-01-01 00:00', periods=6, freq='6h', tz='UTC')
ts = pd.Series(range(6), index=stamps)
print('UTC:')
print(ts)

ts_eastern = ts.tz_convert('US/Eastern')
print('\nUS/Eastern:')
print(ts_eastern)

# Timedeltas shift by a fixed duration; offsets understand the calendar.
today = pd.Timestamp('2024-06-15')
print('\n30 days later:', today + pd.Timedelta(days=30))
print('Next Monday:', today + pd.offsets.Week(weekday=0))
print('End of month:', today + pd.offsets.MonthEnd(0))

# A Period models a whole span rather than a single instant.
period = pd.Period('2024-Q2', freq='Q')
print('\nQ2 2024 start:', period.start_time)
print('Q2 2024 end:  ', period.end_time)
💼 Real-World Scenario
An e-commerce analyst needs to parse mixed-format sales timestamps, localize to UTC, and create a clean DatetimeIndex for downstream aggregation.
Real-World Code
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    'timestamp': ['2024-01-15 09:32', '2024/02/20 14:05', 'March 5, 2024 11:00', '2024-04-10T08:45:00'],
    'store':     ['NYC', 'LA', 'CHI', 'HOU'],
    'amount':    rng.uniform(50, 500, 4).round(2),
})

# Parse mixed formats. `infer_datetime_format` was deprecated in pandas 2.0
# and later removed; heterogeneous strings now require format='mixed', which
# parses each element individually.
raw['ts_parsed'] = pd.to_datetime(raw['timestamp'], format='mixed')
raw = raw.set_index('ts_parsed').sort_index()
# Wall-clock times are US/Eastern; attach the zone, then normalize to UTC.
raw.index = raw.index.tz_localize('US/Eastern').tz_convert('UTC')

print('Cleaned time series:')
print(raw[['store', 'amount']])
print('\nIndex timezone:', raw.index.tz)
print('Date range:', raw.index[0], 'to', raw.index[-1])
🏋️ Practice: Custom DatetimeIndex
Create a DatetimeIndex for every Monday in 2024. Build a Series with random weekly sales (uniform 1000-5000). Print the total sales per quarter using .resample('QE').sum().
Starter Code
import pandas as pd
import numpy as np

# Starter scaffold: seeded generator provided; fill in each TODO below.
rng = np.random.default_rng(0)
# TODO: create weekly DatetimeIndex (Mondays only) for 2024
# Hint: pd.date_range(..., freq='W-MON')

# TODO: create Series with random sales

# TODO: resample to quarterly and print
✅ Practice Checklist
2. Resampling & Frequency Conversion

Resampling changes the time frequency of a series — downsampling aggregates to a lower frequency, upsampling interpolates to a higher frequency.

Downsampling with resample()
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
daily = pd.Series(rng.uniform(100, 500, 365), index=idx, name='sales')

# Downsample to different frequencies
print('Weekly sum (first 4):')
print(daily.resample('W').sum().head(4).round(2))

print('\nMonthly stats:')
monthly = daily.resample('ME').agg(['sum','mean','max','min'])
print(monthly.round(2))

print('\nQuarterly mean:')
print(daily.resample('QE').mean().round(2))
OHLC resampling for financial data
import pandas as pd
import numpy as np

rng = np.random.default_rng(0)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
price = 100 + np.cumsum(rng.normal(0, 1.5, 365))
ts = pd.Series(price, index=idx, name='price')

# OHLC aggregation
weekly_ohlc = ts.resample('W').ohlc()
print('Weekly OHLC (first 5):')
print(weekly_ohlc.head().round(2))

# Custom aggregation
monthly_custom = ts.resample('ME').agg(
    open  = ('first'),
    close = ('last'),
    high  = ('max'),
    low   = ('min'),
    range = (lambda x: x.max() - x.min()),
)
print('\nMonthly custom agg:')
print(monthly_custom.round(2))
Upsampling and interpolation
import pandas as pd
import numpy as np

rng = np.random.default_rng(7)
# Monthly data β†’ daily via upsampling
monthly_idx = pd.date_range('2024-01-01', periods=6, freq='ME')
monthly = pd.Series([120, 135, 128, 142, 156, 149], index=monthly_idx, name='revenue')

# Forward fill
daily_ffill = monthly.resample('D').ffill()
# Linear interpolation
daily_interp = monthly.resample('D').interpolate('linear')
# Cubic interpolation
daily_cubic = monthly.resample('D').interpolate('cubic')

print('Monthly original:')
print(monthly)
print('\nDaily (first 10, forward fill):')
print(daily_ffill.head(10).round(2))
print('\nDaily (first 10, linear interp):')
print(daily_interp.head(10).round(2))
asfreq() vs resample() and timezone-aware resampling
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=24*7, freq='h', tz='UTC')
hourly = pd.Series(rng.uniform(10, 100, len(idx)), index=idx)

# asfreq() merely picks the existing observation at each new tick —
# unlike resample(), it never aggregates the values in between.
every_6h = hourly.asfreq('6h')
print('asfreq every 6h (first 8):')
print(every_6h.head(8).round(2))

# Average profile by hour of day, drawn as a text bar chart.
# (The bar glyph was mojibake 'β–ˆ'; restored to the block character '█'.)
hourly_mean = hourly.groupby(hourly.index.hour).mean()
print('\nMean by hour of day:')
for h, v in hourly_mean.items():
    bar = '█' * int(v / 10)
    print(f'  {h:02d}h: {v:5.1f} {bar}')
💼 Real-World Scenario
A retail analyst has daily transaction data for 3 years and needs to compute weekly, monthly, and quarterly revenue summaries with growth rates for an executive report.
Real-World Code
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
idx = pd.date_range('2022-01-01', '2024-12-31', freq='D')
noise = rng.normal(0, 20, len(idx))
trend = np.linspace(500, 800, len(idx))
seasonal = 80 * np.sin(2 * np.pi * np.arange(len(idx)) / 365)
daily = pd.Series(trend + seasonal + noise, index=idx, name='revenue').clip(0)

# Aggregate
weekly  = daily.resample('W').sum()
monthly = daily.resample('ME').sum()
quarterly = daily.resample('QE').sum()

# Growth rates
monthly_growth = monthly.pct_change() * 100
quarterly_growth = quarterly.pct_change() * 100

print('Monthly revenue with MoM growth (last 6):')
report = pd.DataFrame({'revenue': monthly, 'mom_growth_%': monthly_growth}).tail(6)
print(report.round(2))
print('\nQuarterly totals with QoQ growth:')
print(pd.DataFrame({'revenue': quarterly, 'qoq_%': quarterly_growth}).round(2))
🏋️ Practice: Resample and Compare
Generate hourly temperature data for January 2024 (mean=5°C, std=8°C, with a sine wave daily cycle). Resample to daily min/mean/max. Find the coldest and warmest day.
Starter Code
import pandas as pd
import numpy as np

# Starter scaffold: hourly index for January 2024 is provided below.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', '2024-01-31 23:00', freq='h')
# TODO: create hourly temp with sine daily cycle + noise

# TODO: resample to daily min/mean/max

# TODO: print coldest and warmest day
✅ Practice Checklist
3. Rolling & Expanding Windows

Rolling windows compute statistics over a sliding fixed-length window. Expanding windows grow from the start of the series. Both are essential for smoothing and feature engineering.

Rolling mean and standard deviation
import pandas as pd
import numpy as np

# A month of noisy daily values for window statistics.
gen = np.random.default_rng(42)
days = pd.date_range('2024-01-01', periods=30, freq='D')
ts = pd.Series(gen.normal(100, 15, 30), index=days, name='value')

# Sliding-window means at two widths plus a rolling dispersion measure.
rm7  = ts.rolling(7).mean()
rm14 = ts.rolling(14).mean()
std7 = ts.rolling(7).std()

result = pd.DataFrame({'raw': ts, 'ma7': rm7, 'ma14': rm14, 'std7': std7})
print(result.tail(10).round(2))
print('\nNaN count from rolling(7):', rm7.isna().sum())
Rolling min_periods and center
import pandas as pd
import numpy as np

gen = np.random.default_rng(0)
ts = pd.Series(gen.normal(50, 10, 20), name='signal')

# min_periods relaxes the "full window" requirement; center anchors the
# window on the current row instead of its right edge.
r_minp = ts.rolling(7, min_periods=3).mean()
r_center = ts.rolling(7, center=True).mean()

print('Standard rolling(7): first 7 values')
print(ts.rolling(7).mean().head(7).round(2).tolist())
print('\nWith min_periods=3: first 7 values')
print(r_minp.head(7).round(2).tolist())
print('\nCentered rolling(7): first 7 values')
print(r_center.head(7).round(2).tolist())

# Pairwise rolling statistic: correlation against a second noisy signal.
s2 = pd.Series(gen.normal(50, 10, 20))
print('\nRolling correlation (last 5):')
print(ts.rolling(5).corr(s2).tail().round(3).tolist())
Expanding windows and cumulative stats
import pandas as pd
import numpy as np

gen = np.random.default_rng(7)
ts = pd.Series(gen.exponential(100, 20), name='revenue')

# Expanding windows use every observation from the start up to "now".
exp_mean = ts.expanding().mean()
exp_std  = ts.expanding().std()
exp_max  = ts.expanding().max()
cumsum   = ts.cumsum()

columns = {
    'revenue':    ts.round(2),
    'cum_mean':   exp_mean.round(2),
    'cum_std':    exp_std.round(2),
    'cum_max':    exp_max.round(2),
    'cumsum':     cumsum.round(2),
}
result = pd.DataFrame(columns)
print(result)
Exponentially weighted moving average (EWMA)
import pandas as pd
import numpy as np

gen = np.random.default_rng(42)
days = pd.date_range('2024-01-01', periods=30, freq='D')
drift = np.linspace(0, 30, 30)
ts = pd.Series(gen.normal(100, 20, 30) + drift, index=days)

# Exponentially weighted means: span controls the effective memory,
# alpha sets the decay rate directly; adjust=False gives the recursive form.
ewm5  = ts.ewm(span=5,  adjust=False).mean()
ewm14 = ts.ewm(span=14, adjust=False).mean()
ewm_a = ts.ewm(alpha=0.3, adjust=False).mean()

result = pd.DataFrame({'raw': ts, 'ewm5': ewm5, 'ewm14': ewm14, 'ewm_a0.3': ewm_a})
print(result.round(2))
print('\nEWM vs SMA final values:')
print(f'  EWM(span=5):  {ewm5.iloc[-1]:.2f}')
print(f'  SMA(window=5):{ts.rolling(5).mean().iloc[-1]:.2f}')
💼 Real-World Scenario
A supply chain analyst needs to flag anomalous daily order volumes by computing a 14-day rolling mean and standard deviation, then marking any day beyond 2 standard deviations as an outlier.
Real-World Code
import pandas as pd
import numpy as np

# 90 days of order volumes with three injected shocks.
gen = np.random.default_rng(42)
days = pd.date_range('2024-01-01', periods=90, freq='D')
orders = pd.Series(gen.normal(500, 60, 90), index=days, name='orders')
for pos, level in [(15, 950), (45, 120), (72, 880)]:
    orders.iloc[pos] = level

# 2-sigma band around the 14-day rolling mean.
roll = orders.rolling(14)
rm   = roll.mean()
rs   = roll.std()
upper = rm + 2 * rs
lower = rm - 2 * rs

# Any day outside the band is flagged as anomalous.
anomalies = orders[(orders > upper) | (orders < lower)]
print(f'Total days: {len(orders)}, Anomalies detected: {len(anomalies)}')
print('\nAnomalous days:')
for date, val in anomalies.items():
    print(f'  {date.date()}: {val:.0f} orders (mean={rm.loc[date]:.0f}, '
          f'bounds=[{lower.loc[date]:.0f}, {upper.loc[date]:.0f}])')
🏋️ Practice: Bollinger Bands
Generate 60 days of synthetic stock price data starting at $100 with daily returns from N(0.001, 0.02). Compute 20-day SMA and 2-std Bollinger Bands. Print the last 10 rows showing price, upper band, lower band, and whether the price is outside the bands.
Starter Code
import pandas as pd
import numpy as np

# Starter scaffold: 60 business days and a seeded generator are provided.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=60, freq='B')
# TODO: create price series with cumulative returns

# TODO: compute 20-day SMA and Bollinger Bands

# TODO: flag days outside bands and print last 10 rows
✅ Practice Checklist
4. Time Series Visualization

Visualizing time series reveals trends, seasonality, and anomalies. Key plots include line charts, seasonal decomposition, ACF/PACF correlograms, and lag plots.

Basic time series plot with annotations
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=365, freq='D')
ts = pd.Series(
    100 + np.linspace(0, 50, 365) + 20 * np.sin(2*np.pi*np.arange(365)/365) + rng.normal(0, 5, 365),
    index=idx, name='sales'
)

# Compute the 30-day rolling statistics once (previously the mean was
# recomputed three times and the std twice).
ma30 = ts.rolling(30).mean()
sd30 = ts.rolling(30).std()

fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(ts.index, ts.values, lw=1, color='steelblue', label='Daily')
ax.plot(ma30.index, ma30, lw=2, color='tomato', label='30-day MA')
ax.axvline(pd.Timestamp('2023-06-21'), color='green', ls='--', lw=1.5, label='Summer Solstice')
# One-sigma band around the moving average.
ax.fill_between(ts.index, ma30 - sd30, ma30 + sd30, alpha=0.2, color='tomato')
ax.set(title='Sales Time Series with 30-Day Moving Average', xlabel='Date', ylabel='Sales')
ax.legend(); ax.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('ts_basic.png', dpi=100)
plt.close()
print('Saved ts_basic.png')
Seasonal subseries plot
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

rng = np.random.default_rng(0)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
monthly = pd.Series(
    200 + 50*np.sin(2*np.pi*np.arange(36)/12) + rng.normal(0, 10, 36),
    index=idx, name='revenue'
)

# One panel per calendar month: bars are the individual years,
# the horizontal line is that month's mean across years.
fig, axes = plt.subplots(3, 4, figsize=(14, 8), sharey=True)
month_names = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for ax, m in zip(axes.flat, range(1, 13)):
    data = monthly[monthly.index.month == m]
    ax.bar(range(len(data)), data.values, color='steelblue', alpha=0.7)
    ax.axhline(data.mean(), color='tomato', lw=2)
    ax.set_title(month_names[m-1], fontsize=10)
    ax.set_xticks([])
# Suptitle dash restored from mojibake to a proper em dash.
plt.suptitle('Seasonal Subseries Plot — Monthly Revenue', fontsize=13)
plt.tight_layout()
plt.savefig('ts_seasonal_sub.png', dpi=100)
plt.close()
print('Saved ts_seasonal_sub.png')
ACF and PACF correlograms
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

try:
    from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
    rng = np.random.default_rng(42)
    # AR(2) process: y_t = 0.7*y_{t-1} - 0.3*y_{t-2} + noise
    n = 200
    y = np.zeros(n)
    eps = rng.normal(0, 1, n)
    for t in range(2, n):
        y[t] = 0.7*y[t-1] - 0.3*y[t-2] + eps[t]
    ts = pd.Series(y)

    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    # Titles restored from mojibake dashes to proper em dashes.
    plot_acf(ts, lags=20, ax=axes[0], title='ACF — AR(2) Process')
    plot_pacf(ts, lags=20, ax=axes[1], title='PACF — AR(2) Process', method='ywm')
    plt.tight_layout()
    plt.savefig('ts_acf.png', dpi=100)
    plt.close()
    print('Saved ts_acf.png')
    print('ACF at lag 1:', round(ts.autocorr(1), 3))
    print('ACF at lag 2:', round(ts.autocorr(2), 3))
except ImportError:
    # pd/np are already imported above, so the redundant re-imports were
    # dropped; a seeded generator makes the fallback output reproducible.
    print('statsmodels not installed — pip install statsmodels')
    ts = pd.Series(np.random.default_rng(0).normal(size=100))
    print('Manual ACF at lags 1-5:')
    for lag in range(1, 6):
        print(f'  lag {lag}: {ts.autocorr(lag):.3f}')
Lag plot and return distribution
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Business-day price path plus derived daily returns (%).
gen = np.random.default_rng(42)
days = pd.date_range('2024-01-01', periods=100, freq='B')
ts = pd.Series(100 + gen.normal(0, 1.5, 100).cumsum(), index=days)
returns = ts.pct_change().dropna() * 100

fig, axes = plt.subplots(1, 3, figsize=(14, 4))

# Panel 1: price today against price tomorrow.
axes[0].scatter(ts.values[:-1], ts.values[1:], alpha=0.5, s=20, color='steelblue')
axes[0].set(title='Lag Plot (lag=1)', xlabel='Price(t)', ylabel='Price(t+1)')

# Panel 2: histogram of daily returns.
axes[1].hist(returns, bins=20, color='coral', edgecolor='white', alpha=0.8)
axes[1].set(title='Daily Return Distribution', xlabel='Return (%)', ylabel='Count')

# Panel 3: rolling dispersion of those returns.
vol = returns.rolling(10).std()
axes[2].plot(vol.index, vol, color='purple', lw=1.5)
axes[2].set(title='10-Day Rolling Volatility', xlabel='Date', ylabel='Std Dev (%)')
axes[2].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.savefig('ts_lag_dist.png', dpi=100)
plt.close()
print('Saved ts_lag_dist.png')
print(f'Return stats: mean={returns.mean():.2f}%, std={returns.std():.2f}%, skew={returns.skew():.2f}')
💼 Real-World Scenario
A business analyst needs to visualize 2 years of weekly e-commerce sales showing trend, seasonality, rolling average, and year-over-year comparison in a single dashboard.
Real-World Code
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Two years of weekly sales: rising trend + annual cycle + noise.
gen = np.random.default_rng(42)
weeks_idx = pd.date_range('2023-01-02', periods=104, freq='W-MON')
level = np.linspace(5000, 8000, 104)
cycle = 1500 * np.sin(2*np.pi*np.arange(104)/52)
sales = pd.Series(level + cycle + gen.normal(0, 200, 104), index=weeks_idx, name='sales')

# Align both years on week number for the YoY panel.
y2023 = sales['2023'].values
y2024 = sales['2024'].values[:len(y2023)]

fig, axes = plt.subplots(2, 1, figsize=(14, 8))
ma8 = sales.rolling(8).mean()
axes[0].plot(sales.index, sales, alpha=0.4, lw=1, color='steelblue', label='Weekly')
axes[0].plot(ma8.index, ma8, lw=2, color='tomato', label='8-wk MA')
axes[0].set(title='Weekly E-Commerce Sales (2023-2024)', ylabel='Sales ($)')
axes[0].legend(); axes[0].grid(alpha=0.3)

weeks = range(1, min(len(y2023), len(y2024))+1)
axes[1].plot(weeks, y2023[:len(weeks)], label='2023', color='steelblue', lw=2)
axes[1].plot(weeks, y2024[:len(weeks)], label='2024', color='tomato', lw=2)
axes[1].set(title='Year-over-Year Comparison', xlabel='Week', ylabel='Sales ($)')
axes[1].legend(); axes[1].grid(alpha=0.3)

plt.tight_layout()
plt.savefig('ts_dashboard.png', dpi=100)
plt.close()
print('Saved ts_dashboard.png')
yoy_growth = (y2024.mean() / y2023.mean() - 1) * 100
print(f'YoY growth: {yoy_growth:+.1f}%')
🏋️ Practice: Multi-Series Time Plot
Generate 52 weeks of data for three products (A, B, C) with different trends and seasonalities. Plot all three on the same axis with a legend. Add a 4-week rolling average for each. Save as 'practice_ts.png'.
Starter Code
import matplotlib
matplotlib.use('Agg')  # headless backend so savefig works without a display
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Starter scaffold: weekly index for 2024 and a seeded generator are provided.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=52, freq='W')
# TODO: create 3 product series with different patterns

# TODO: plot all 3 with rolling averages

plt.savefig('practice_ts.png', dpi=100)
plt.close()
print('Saved practice_ts.png')
✅ Practice Checklist
5. Trend & Seasonality Decomposition

Decomposition separates a time series into trend, seasonal, and residual components. Additive decomposition (Y = T + S + R) works for constant amplitude seasonality; multiplicative (Y = T × S × R) for growing amplitude.

Classical decomposition with statsmodels
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.seasonal import seasonal_decompose

    # Four years of monthly data with known trend + 12-month cycle + noise.
    gen = np.random.default_rng(42)
    months = pd.date_range('2021-01-01', periods=48, freq='ME')
    level = np.linspace(100, 180, 48)
    cycle = 20 * np.sin(2*np.pi*np.arange(48)/12)
    ts = pd.Series(level + cycle + gen.normal(0, 5, 48), index=months)

    result = seasonal_decompose(ts, model='additive', period=12)
    print('Decomposition components (first 6 months):')
    parts = {
        'observed': result.observed,
        'trend':    result.trend,
        'seasonal': result.seasonal,
        'resid':    result.resid,
    }
    df = pd.DataFrame(parts).dropna()
    print(df.head(6).round(2))
    print(f'\nTrend range: {result.trend.dropna().min():.1f} - {result.trend.dropna().max():.1f}')
    print(f'Seasonal amplitude: {result.seasonal.max():.1f}')
    print(f'Residual std: {result.resid.dropna().std():.2f}')
except ImportError:
    print('pip install statsmodels')
Additive vs multiplicative decomposition
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.seasonal import seasonal_decompose
    rng = np.random.default_rng(0)
    idx = pd.date_range('2020-01-01', periods=60, freq='ME')
    t = np.arange(60)
    trend = 100 + 2*t
    seasonal = 0.3 * np.sin(2*np.pi*t/12)  # relative amplitude, scaled by trend below

    additive_ts = pd.Series(trend + 20*np.sin(2*np.pi*t/12) + rng.normal(0,3,60), index=idx)
    multiplicative_ts = pd.Series(trend * (1 + seasonal) + rng.normal(0,3,60), index=idx)

    # The matching model should leave the smallest, most structureless residual.
    for name, ts, model in [('Additive TS', additive_ts, 'additive'),
                             ('Multiplicative TS', multiplicative_ts, 'multiplicative')]:
        dec = seasonal_decompose(ts, model=model, period=12)
        resid_std = dec.resid.dropna().std()
        print(f'{name} ({model}): residual std = {resid_std:.3f}')
        print(f'  Seasonal max = {dec.seasonal.max():.3f}')
except ImportError:
    # numpy is already imported at the top; the redundant re-import was removed,
    # and a seeded generator makes the fallback output reproducible.
    print('pip install statsmodels')
    t = np.arange(60)
    noise = np.random.default_rng(0).normal(0, 3, 60)
    ts = 100 + 2*t + 20*np.sin(2*np.pi*t/12) + noise
    print('Manual trend (linear fit):')
    coeffs = np.polyfit(t, ts, 1)
    print(f'  slope={coeffs[0]:.2f}, intercept={coeffs[1]:.2f}')
STL decomposition (robust to outliers)
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.seasonal import STL

    # Trending, seasonal series with one huge spike injected at position 24.
    gen = np.random.default_rng(42)
    months = pd.date_range('2020-01-01', periods=60, freq='ME')
    base = 100 + np.linspace(0, 40, 60) + 15*np.sin(2*np.pi*np.arange(60)/12)
    ts = pd.Series(base + gen.normal(0, 4, 60), index=months)
    ts.iloc[24] = 250  # outlier

    # robust=True downweights the spike so trend/seasonal stay clean.
    result = STL(ts, period=12, robust=True).fit()
    print('STL Decomposition (robust=True):')
    df = pd.DataFrame({'trend': result.trend, 'seasonal': result.seasonal, 'resid': result.resid})
    print(df.tail(6).round(2))
    print(f'\nMax residual (outlier detected at): {result.resid.abs().idxmax().date()}')
    print(f'Residual value: {result.resid.abs().max():.1f}')
except ImportError:
    print('pip install statsmodels')
Extracting and using trend for forecasting
import pandas as pd
import numpy as np

rng = np.random.default_rng(7)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
t = np.arange(36)
ts = pd.Series(100 + 3*t + 15*np.sin(2*np.pi*t/12) + rng.normal(0, 5, 36), index=idx)

# Fit linear trend
coeffs = np.polyfit(t, ts, 1)
trend  = np.polyval(coeffs, t)
detrended = ts.values - trend

# Seasonal component (average over each month)
seasonal = np.zeros(12)
for m in range(12):
    seasonal[m] = detrended[m::12].mean()

# Reconstruct and forecast next 6 months
t_fut = np.arange(36, 42)
trend_fut    = np.polyval(coeffs, t_fut)
seasonal_fut = seasonal[t_fut % 12]
forecast     = trend_fut + seasonal_fut

print(f'Trend: slope={coeffs[0]:.2f}/month, intercept={coeffs[1]:.2f}')
print('\nSeasonal pattern (monthly avg deviation):')
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for m, (name, val) in enumerate(zip(months, seasonal)):
    print(f'  {name}: {val:+.1f}')
print('\nForecast (next 6 months):')
fut_idx = pd.date_range('2025-01-01', periods=6, freq='ME')
for d, v in zip(fut_idx, forecast):
    print(f'  {d.strftime("%b %Y")}: {v:.1f}')
💼 Real-World Scenario
An energy analyst decomposes hourly electricity consumption data to isolate the weekly seasonal pattern, long-term trend, and irregular spikes for demand forecasting.
Real-World Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.seasonal import STL
    rng = np.random.default_rng(42)
    idx = pd.date_range('2024-01-01', periods=52*7, freq='D')
    t = np.arange(len(idx))
    trend_comp   = 500 + 0.3 * t
    weekly_seas  = 80 * np.sin(2*np.pi*t/7)
    annual_seas  = 120 * np.sin(2*np.pi*t/365)
    consumption  = pd.Series(trend_comp + weekly_seas + annual_seas + rng.normal(0, 15, len(t)), index=idx)

    stl = STL(consumption, period=7, robust=True)
    res = stl.fit()

    # res.seasonal is already a date-indexed Series — group on its own index
    # instead of rebuilding an identical date_range (original inefficiency).
    weekly_pattern = res.seasonal.groupby(res.seasonal.index.dayofweek).mean()
    days = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
    print('Weekly consumption pattern (avg deviation from trend):')
    for d, v in zip(days, weekly_pattern.reindex(range(7))):
        # Bar glyph restored from mojibake to the block character.
        bar = '█' * int(abs(v) / 10)
        print(f'  {d}: {v:+.1f} kWh {bar}')
    print(f'\nOverall trend: +{(res.trend.iloc[-1] - res.trend.iloc[0]):.1f} kWh over the period')
    print(f'Residual std (noise): {res.resid.std():.1f} kWh')
except ImportError:
    print('pip install statsmodels')
🏋️ Practice: Decompose and Evaluate
Generate 3 years of monthly sales data with an upward trend and strong December seasonality (add 200 to December values). Use seasonal_decompose with period=12. Print the seasonal indices for each month. Which month has the highest seasonal component?
Starter Code
import pandas as pd
import numpy as np

# Starter scaffold: 36 month-end periods starting 2022-01 are provided.
rng = np.random.default_rng(42)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
# TODO: create monthly sales with trend + December spike

# TODO: decompose

# TODO: print seasonal indices by month
✅ Practice Checklist
6. Stationarity & Differencing

Stationarity means constant mean, variance, and autocorrelation over time. Most statistical forecasting models require stationary input. Use the ADF test to check, and differencing to achieve stationarity.

ADF test for stationarity
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.stattools import adfuller

    gen = np.random.default_rng(42)
    # A random walk never mean-reverts; white noise always does.
    rw = pd.Series(gen.normal(0, 1, 100).cumsum())
    wn = pd.Series(gen.normal(0, 1, 100))

    for name, ts in [('Random Walk', rw), ('White Noise', wn)]:
        result = adfuller(ts, autolag='AIC')
        print(f'{name}:')
        print(f'  ADF Statistic: {result[0]:.4f}')
        print(f'  p-value:       {result[1]:.4f}')
        print(f'  Critical 5%:   {result[4]["5%"]:.4f}')
        conclusion = 'STATIONARY' if result[1] < 0.05 else 'NON-STATIONARY'
        print(f'  Conclusion:    {conclusion}')
        print()
except ImportError:
    print('pip install statsmodels')
    rng = np.random.default_rng(42)
    ts = rng.normal(0, 1, 100).cumsum()
    # Crude stationarity check: compare variance across the two halves.
    print('Variance first half:', np.var(ts[:50]).round(2))
    print('Variance second half:', np.var(ts[50:]).round(2))
First and second differencing
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.stattools import adfuller

    gen = np.random.default_rng(0)
    months = pd.date_range('2020-01-01', periods=60, freq='ME')
    # Deterministic trend + noise: non-stationary until differenced.
    ts = pd.Series(100 + 2*np.arange(60) + gen.normal(0, 5, 60), index=months)

    diff1 = ts.diff().dropna()
    diff2 = ts.diff().diff().dropna()

    for name, series in [('Original', ts), ('1st Diff', diff1), ('2nd Diff', diff2)]:
        adf = adfuller(series, autolag='AIC')
        stat = 'STATIONARY' if adf[1] < 0.05 else 'NON-STATIONARY'
        # Arrow restored from mojibake to '→'.
        print(f'{name:10s}: ADF={adf[0]:7.3f}, p={adf[1]:.4f} → {stat}')

    print('\nDescriptive stats after 1st differencing:')
    print(diff1.describe().round(3))
except ImportError:
    # numpy is already imported at the top; a seeded generator makes the
    # fallback output reproducible (was unseeded np.random.randn).
    print('pip install statsmodels')
    ts = np.cumsum(np.random.default_rng(0).normal(size=60)) + np.arange(60)
    diff1 = np.diff(ts)
    print(f'Original mean: {ts.mean():.2f}, Diff mean: {diff1.mean():.2f}')
    print(f'Original std:  {ts.std():.2f}, Diff std:  {diff1.std():.2f}')
Log transform for variance stabilization
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
idx = pd.date_range('2020-01-01', periods=48, freq='ME')
# Growing variance (multiplicative process)
t = np.arange(48)
ts = pd.Series(100 * np.exp(0.05*t + rng.normal(0, 0.1, 48)), index=idx)

log_ts  = np.log(ts)
diff_ts = ts.diff().dropna()
log_diff = log_ts.diff().dropna()  # log returns

print('Original series stats:')
print(f'  Std first half: {ts[:24].std():.2f}')
print(f'  Std second half: {ts[24:].std():.2f}')
print('\nLog-transformed stats:')
print(f'  Std first half: {log_ts[:24].std():.4f}')
print(f'  Std second half: {log_ts[24:].std():.4f}')
print('\nLog-differenced (log returns) first 5:')
print(log_diff.head().round(4))
print(f'Annualized log return: {log_diff.mean() * 12:.3f}')
Seasonal differencing
import pandas as pd
import numpy as np

try:
    # `kpss` was imported but never used; import only what the block calls.
    from statsmodels.tsa.stattools import adfuller
    rng = np.random.default_rng(7)
    idx = pd.date_range('2020-01-01', periods=60, freq='ME')
    ts = pd.Series(
        100 + np.linspace(0, 20, 60) + 30*np.sin(2*np.pi*np.arange(60)/12) + rng.normal(0, 3, 60),
        index=idx
    )
    # Seasonal differencing (lag=12) removes the yearly cycle...
    seasonal_diff = ts.diff(12).dropna()
    # ...then one regular difference removes the remaining trend.
    both_diff = seasonal_diff.diff().dropna()

    for name, s in [('Original', ts), ('Seasonal diff(12)', seasonal_diff), ('Both diffs', both_diff)]:
        adf_p = adfuller(s)[1]
        # Arrow restored from mojibake to '→'.
        print(f'{name:20s}: ADF p={adf_p:.4f} → {"stationary" if adf_p < 0.05 else "non-stationary"}')
except ImportError:
    print('pip install statsmodels')
    # numpy is already imported at the top of the file.
    ts = np.array([100 + 30*np.sin(2*np.pi*i/12) for i in range(60)])
    sdiff = ts[12:] - ts[:-12]
    print('Seasonal diff mean:', sdiff.mean().round(2))
    print('Seasonal diff std:', sdiff.std().round(2))
💼 Real-World Scenario
A data scientist needs to prepare monthly website traffic data for ARIMA modeling β€” check stationarity, apply appropriate transformations, and verify the result.
Real-World Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.stattools import adfuller
    rng = np.random.default_rng(42)
    idx = pd.date_range('2021-01-01', periods=48, freq='ME')
    # Exponential growth + annual seasonality + noise: doubly non-stationary.
    traffic = pd.Series(
        5000 * np.exp(0.02 * np.arange(48)) +
        2000 * np.sin(2*np.pi*np.arange(48)/12) +
        rng.normal(0, 300, 48),
        index=idx, name='visitors'
    )

    def check_stationarity(series, name):
        """Run the ADF test and report whether p < 0.05 (stationary)."""
        adf = adfuller(series.dropna())
        stationary = adf[1] < 0.05
        # Check/cross marks restored from mojibake to '✓'/'✗'.
        print(f'{name}: p={adf[1]:.4f} | {"✓ stationary" if stationary else "✗ non-stationary"}')
        return stationary

    # Transformation ladder: raw -> log -> seasonal diff -> regular diff.
    check_stationarity(traffic, 'Original')
    log_traffic = np.log(traffic)
    check_stationarity(log_traffic, 'Log-transformed')
    log_sdiff = log_traffic.diff(12)
    check_stationarity(log_sdiff, 'Log + seasonal diff(12)')
    log_sdiff_diff = log_sdiff.diff()
    check_stationarity(log_sdiff_diff, 'Log + sdiff + diff')
    print('\nFinal series ready for ARIMA:')
    print(log_sdiff_diff.dropna().describe().round(4))
except ImportError:
    print('pip install statsmodels')
🏋️ Practice: Make It Stationary
Generate 60 months of quarterly sales data: base=500, trend=+5/month, seasonal amplitude=100, noise std=20. Apply the minimum number of transformations to make it stationary (ADF p < 0.05). Print the ADF result at each step.
Starter Code
import pandas as pd
import numpy as np

# Starter scaffold: seeded generator and monthly index are provided.
try:
    from statsmodels.tsa.stattools import adfuller
    rng = np.random.default_rng(0)
    idx = pd.date_range('2020-01-01', periods=60, freq='ME')
    # TODO: create series with trend + seasonality

    # TODO: apply transformations until stationary

    # TODO: print ADF result at each step
except ImportError:
    print('pip install statsmodels')
✅ Practice Checklist
7. ARIMA Modeling

ARIMA(p,d,q) combines AutoRegression (p lags), Integration (d differences for stationarity), and Moving Average (q lagged forecast errors). SARIMA extends this with seasonal components.

Fitting ARIMA with statsmodels
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.arima.model import ARIMA
    rng = np.random.default_rng(42)
    # AR(1) process
    n = 100
    y = np.zeros(n)
    eps = rng.normal(0, 1, n)
    for t in range(1, n):
        y[t] = 0.7 * y[t-1] + eps[t]
    ts = pd.Series(y)

    model = ARIMA(ts, order=(1, 0, 0))
    result = model.fit()
    print(result.summary().tables[1])
    print(f'\nAIC: {result.aic:.2f}')
    print(f'BIC: {result.bic:.2f}')
    forecast = result.forecast(steps=5)
    print('\nForecast (next 5 steps):')
    print(forecast.round(3).tolist())
except ImportError:
    print('pip install statsmodels')
Selecting ARIMA order with AIC
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.arima.model import ARIMA
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(0)
    ts = pd.Series(np.cumsum(rng.normal(0, 1, 80)))  # random walk -> d=1

    # Grid-search small (p, q) orders at d=1 and rank candidates by AIC
    # (lower AIC = better fit penalized for model complexity).
    best_aic = float('inf')
    best_order = None
    results = []
    for p in range(3):
        for q in range(3):
            try:
                m = ARIMA(ts, order=(p, 1, q)).fit()
                results.append((p, 1, q, m.aic))
                if m.aic < best_aic:
                    best_aic, best_order = m.aic, (p, 1, q)
            except Exception:
                # Some orders fail to converge; skip those candidates.
                # Catch Exception (not a bare `except:`) so that
                # KeyboardInterrupt/SystemExit still propagate.
                continue
    results.sort(key=lambda x: x[3])
    print('Top 5 ARIMA orders by AIC:')
    for p, d, q, aic in results[:5]:
        mark = '<-- best' if (p,d,q) == best_order else ''
        print(f'  ARIMA({p},{d},{q}): AIC={aic:.2f} {mark}')
except ImportError:
    print('pip install statsmodels')
ARIMA forecast with confidence intervals
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.arima.model import ARIMA
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2023-01-01', periods=60, freq='ME')
    # Monthly series: level 50 plus a linear trend and Gaussian noise.
    ts = pd.Series(
        50 + np.linspace(0, 20, 60) + rng.normal(0, 4, 60),
        index=idx
    )
    # Hold out the final 12 months for evaluation.
    train, test = ts[:48], ts[48:]
    model  = ARIMA(train, order=(1, 1, 1)).fit()
    # get_forecast exposes the interval machinery; alpha=0.05 -> 95% CI.
    fc     = model.get_forecast(steps=12)
    fc_df  = fc.summary_frame(alpha=0.05)

    # The comparison table covers the full 12-month holdout (the old
    # label claimed "last 6 months", which did not match the output).
    print('Forecast vs Actual (12-month holdout):')
    comparison = pd.DataFrame({
        'actual':   test.values,
        'forecast': fc_df['mean'].values,
        'lower_95': fc_df['mean_ci_lower'].values,
        'upper_95': fc_df['mean_ci_upper'].values,
    }, index=test.index)
    print(comparison.round(2))
    from sklearn.metrics import mean_absolute_error
    mae = mean_absolute_error(test, fc_df['mean'])
    print(f'\nMAE: {mae:.2f}')
except ImportError:
    print('pip install statsmodels scikit-learn')
SARIMA for seasonal data
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(7)
    idx = pd.date_range('2020-01-01', periods=48, freq='ME')
    ts = pd.Series(
        100 + np.linspace(0, 20, 48) + 25*np.sin(2*np.pi*np.arange(48)/12) + rng.normal(0, 4, 48),
        index=idx
    )
    train = ts[:36]
    # SARIMA(1,1,1)(1,1,0,12)
    model  = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,0,12)).fit(disp=False)
    fc     = model.forecast(steps=12)
    actual = ts[36:]

    print('SARIMA Forecast vs Actual:')
    for d, f, a in zip(fc.index, fc.values, actual.values):
        err = f - a
        print(f'  {d.strftime("%b %Y")}: forecast={f:.1f}, actual={a:.1f}, error={err:+.1f}')
    rmse = np.sqrt(np.mean((fc.values - actual.values)**2))
    print(f'\nRMSE: {rmse:.2f}')
except ImportError:
    print('pip install statsmodels')
💼 Real-World Scenario
A demand planner needs to forecast the next 6 months of product demand using historical monthly data. They fit SARIMA, evaluate on a holdout set, and report RMSE and MAPE.
Real-World Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2021-01-01', periods=36, freq='ME')
    # Monthly demand: upward trend + annual seasonality + noise, floored at 0.
    demand = pd.Series(
        500 + np.linspace(0, 100, 36) +
        150 * np.sin(2*np.pi*np.arange(36)/12) +
        rng.normal(0, 20, 36),
        index=idx
    ).clip(0)

    # Hold out the final 6 months for evaluation.
    train, test = demand[:30], demand[30:]
    model  = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,0,12)).fit(disp=False)
    fc     = model.forecast(steps=6)

    mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
    rmse = np.sqrt(np.mean((test.values - fc.values)**2))
    # The holdout spans 6 months, so the heading must say so (the old
    # label "3-Month" did not match the six rows printed below).
    print('6-Month Demand Forecast:')
    for d, f, a in zip(fc.index, fc.values, test.values):
        print(f'  {d.strftime("%b %Y")}: forecast={f:.0f}, actual={a:.0f}')
    print(f'\nMAPE: {mape:.1f}%')
    print(f'RMSE: {rmse:.1f} units')
except ImportError:
    print('pip install statsmodels')
🏋️ Practice: Fit and Forecast
Generate 48 months of AR(2) data: y_t = 0.6*y_{t-1} - 0.2*y_{t-2} + noise. Use ARIMA(2,0,0) on the first 36 months. Forecast the last 12 months and compute MAE. Print forecast vs actual.
Starter Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.arima.model import ARIMA
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    # TODO: generate AR(2) process

    # TODO: split train/test, fit ARIMA(2,0,0)

    # TODO: forecast and compute MAE
except ImportError:
    print('pip install statsmodels')
✅ Practice Checklist
8. Exponential Smoothing

Exponential smoothing methods weight recent observations more heavily. Simple ES handles level; Holt's adds trend; Holt-Winters adds seasonality. These are fast, interpretable, and competitive with ARIMA.

Simple Exponential Smoothing
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import SimpleExpSmoothing
    rng = np.random.default_rng(42)
    ts = pd.Series(rng.normal(100, 10, 30))

    for alpha in [0.1, 0.3, 0.7]:
        model = SimpleExpSmoothing(ts, initialization_method='estimated').fit(smoothing_level=alpha, optimized=False)
        fc = model.forecast(3)
        print(f'alpha={alpha}: last fitted={model.fittedvalues.iloc[-1]:.2f}, forecast={fc.values.round(2).tolist()}')

    # Optimal alpha
    optimal = SimpleExpSmoothing(ts, initialization_method='estimated').fit(optimized=True)
    print(f'\nOptimal alpha: {optimal.params["smoothing_level"]:.4f}')
    print(f'Optimal forecast: {optimal.forecast(3).values.round(2).tolist()}')
except ImportError:
    print('pip install statsmodels')
Holt's Linear Trend Method
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import Holt
    rng = np.random.default_rng(0)
    idx = pd.date_range('2024-01-01', periods=24, freq='ME')
    # Linear trend (+2 per month) with mild noise.
    ts = pd.Series(50 + 2*np.arange(24) + rng.normal(0, 3, 24), index=idx)

    # Fit Holt's method with additive vs multiplicative (exponential) trend.
    for trend in ['add', 'mul']:
        try:
            model = Holt(ts, exponential=(trend=='mul'), initialization_method='estimated').fit(optimized=True)
            fc    = model.forecast(6)
            print(f'Holt ({trend} trend):')
            print(f'  alpha={model.params["smoothing_level"]:.3f}, beta={model.params["smoothing_trend"]:.3f}')
            print(f'  Forecast: {fc.values.round(1).tolist()}')
        except Exception:
            # The multiplicative variant can fail on some data; skip just
            # that variant. Catch Exception, not a bare `except:`, so
            # KeyboardInterrupt/SystemExit still propagate.
            pass
    print('\nActual last 3:', ts.tail(3).values.round(1).tolist())
except ImportError:
    print('pip install statsmodels')
Holt-Winters Triple Exponential Smoothing
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2020-01-01', periods=48, freq='ME')
    ts = pd.Series(
        100 + np.linspace(0, 30, 48) + 25*np.sin(2*np.pi*np.arange(48)/12) + rng.normal(0, 4, 48),
        index=idx
    )
    train, test = ts[:36], ts[36:]

    model = ExponentialSmoothing(
        train, trend='add', seasonal='add', seasonal_periods=12,
        initialization_method='estimated'
    ).fit(optimized=True)

    fc = model.forecast(12)
    mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
    rmse = np.sqrt(np.mean((test.values - fc.values)**2))
    print('Holt-Winters forecast:')
    print(f'  Alpha (level): {model.params["smoothing_level"]:.3f}')
    print(f'  Beta (trend):  {model.params["smoothing_trend"]:.3f}')
    print(f'  Gamma (season):{model.params["smoothing_seasonal"]:.3f}')
    print(f'\nMAPE: {mape:.2f}%')
    print(f'RMSE: {rmse:.2f}')
except ImportError:
    print('pip install statsmodels')
Comparing ES methods on same data
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import SimpleExpSmoothing, Holt, ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(7)
    idx = pd.date_range('2021-01-01', periods=36, freq='ME')
    ts = pd.Series(
        200 + np.linspace(0, 50, 36) + 30*np.sin(2*np.pi*np.arange(36)/12) + rng.normal(0, 8, 36),
        index=idx
    )
    train, test = ts[:30], ts[30:]
    n_fc = 6

    models = {
        'SES':   SimpleExpSmoothing(train, initialization_method='estimated').fit(optimized=True),
        'Holt':  Holt(train, initialization_method='estimated').fit(optimized=True),
        'HW':    ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12,
                                      initialization_method='estimated').fit(optimized=True),
    }
    print(f'{"Method":<8} {"RMSE":>8} {"MAPE%":>8}')
    print('-' * 28)
    for name, m in models.items():
        fc  = m.forecast(n_fc)
        rmse = np.sqrt(np.mean((test.values - fc.values)**2))
        mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
        print(f'{name:<8} {rmse:>8.2f} {mape:>8.2f}')
except ImportError:
    print('pip install statsmodels')
💼 Real-World Scenario
A retail company applies Holt-Winters to forecast daily store traffic for the next 15 days, accounting for weekly seasonality and a gradual upward trend.
Real-World Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2024-01-01', periods=90, freq='D')
    traffic = pd.Series(
        500 + np.linspace(0, 50, 90) +
        100 * np.sin(2*np.pi*np.arange(90)/7) +
        rng.normal(0, 20, 90),
        index=idx
    ).clip(0).round()

    train, test = traffic[:75], traffic[75:]
    model = ExponentialSmoothing(
        train, trend='add', seasonal='add', seasonal_periods=7,
        initialization_method='estimated'
    ).fit(optimized=True)

    fc = model.forecast(15)
    mae  = np.mean(np.abs(test.values - fc.values))
    mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100

    print('Holt-Winters Daily Traffic Forecast:')
    print(f'  Params: alpha={model.params["smoothing_level"]:.3f}, '
          f'beta={model.params["smoothing_trend"]:.3f}, '
          f'gamma={model.params["smoothing_seasonal"]:.3f}')
    print(f'\nForecast (next 15 days):')
    days = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
    for d, f, a in zip(fc.index, fc.values, test.values):
        print(f'  {d.strftime("%a %b %d")}: forecast={f:.0f}, actual={a:.0f}')
    print(f'\nMAE: {mae:.1f} | MAPE: {mape:.1f}%')
except ImportError:
    print('pip install statsmodels')
🏋️ Practice: Holt-Winters Tuning
Generate 3 years of monthly retail sales with trend and seasonality. Compare Holt-Winters additive vs multiplicative seasonal on a 6-month holdout. Print RMSE for both. Which model wins?
Starter Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2021-01-01', periods=36, freq='ME')
    # TODO: create monthly sales with additive or multiplicative seasonality

    # TODO: split and fit both models

    # TODO: compare RMSE
except ImportError:
    print('pip install statsmodels')
✅ Practice Checklist
9. Feature Engineering for Time Series

Transforming raw time series into supervised learning features enables ML models (XGBoost, LightGBM) to forecast. Key features: lag values, rolling stats, calendar attributes, and cyclical encoding.

Lag features
import pandas as pd
import numpy as np

# Daily demand series: 30 observations drawn from N(100, 10).
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=30, freq='D')
ts = pd.Series(rng.normal(100, 10, 30), index=idx, name='demand')

# Supervised-learning frame: each lag_k column holds demand from k days ago.
LAGS = (1, 2, 3, 7, 14)
df = ts.to_frame()
for k in LAGS:
    df[f'lag_{k}'] = df['demand'].shift(k)

# The first max(LAGS) rows are missing at least one lag; drop them.
df = df.dropna()
print(f'Feature matrix shape: {df.shape}')
print(df.head(5).round(2))

# How strongly does each lag track today's demand?
corr = df.corr()['demand'].drop('demand')
print('\nLag correlations with demand:')
print(corr.round(3))
Rolling statistical features
import pandas as pd
import numpy as np

# 60 days of sales: noise around 500 plus a gentle upward drift.
rng = np.random.default_rng(0)
idx = pd.date_range('2024-01-01', periods=60, freq='D')
ts = pd.Series(rng.normal(500, 80, 60) + np.linspace(0, 50, 60), index=idx, name='sales')
df = ts.to_frame()

# Windowed summary statistics over the trailing 7 and 14 days.
for window in (7, 14):
    for stat in ('mean', 'std', 'min', 'max'):
        df[f'roll_{window}_{stat}'] = getattr(ts.rolling(window), stat)()

# Cumulative (all-history) average up to each day.
df['expanding_mean'] = ts.expanding().mean()

# Rows inside the first 14-day window carry NaNs; discard them.
df = df.dropna()
print(f'Feature shape: {df.shape}')
print(df.tail(5).round(1))
Calendar and cyclical features
import pandas as pd
import numpy as np

# One year of daily sales drawn from N(100, 15).
rng = np.random.default_rng(7)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
df = pd.DataFrame({'sales': rng.normal(100, 15, 365)}, index=idx)

# Plain calendar attributes pulled straight off the DatetimeIndex.
calendar_feats = {
    'day_of_week':  df.index.dayofweek,
    'day_of_month': df.index.day,
    'month':        df.index.month,
    'quarter':      df.index.quarter,
    'is_weekend':   (df.index.dayofweek >= 5).astype(int),
    'is_month_end': df.index.is_month_end.astype(int),
}
for col, values in calendar_feats.items():
    df[col] = values

# Sine/cosine pairs map each period onto the unit circle, so models see
# that Sunday is adjacent to Monday and December to January.
dow_angle   = 2 * np.pi * df['day_of_week'] / 7
month_angle = 2 * np.pi * (df['month'] - 1) / 12
df['dow_sin'],   df['dow_cos']   = np.sin(dow_angle),   np.cos(dow_angle)
df['month_sin'], df['month_cos'] = np.sin(month_angle), np.cos(month_angle)

print('Feature matrix (first 5 rows):')
print(df.head(5).round(3))
print(f'\nTotal features: {df.shape[1]-1}')
print('Weekend mean:', df[df['is_weekend']==1]['sales'].mean().round(2))
print('Weekday mean:', df[df['is_weekend']==0]['sales'].mean().round(2))
ML forecasting pipeline with lag features
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error

rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=200, freq='D')
ts = pd.Series(
    100 + np.linspace(0, 30, 200) + 20*np.sin(2*np.pi*np.arange(200)/7) + rng.normal(0, 5, 200),
    index=idx, name='demand'
)

df = ts.to_frame()
# Lag + rolling features
for lag in [1, 2, 3, 7]: df[f'lag_{lag}'] = ts.shift(lag)
df['roll7_mean'] = ts.rolling(7).mean()
df['roll7_std']  = ts.rolling(7).std()
df['dow'] = ts.index.dayofweek
df['dow_sin'] = np.sin(2*np.pi*df['dow']/7)
df['dow_cos'] = np.cos(2*np.pi*df['dow']/7)
df.dropna(inplace=True)

X = df.drop(columns='demand')
y = df['demand']
split = int(len(df) * 0.8)
X_train, X_test = X.iloc[:split], X.iloc[split:]
y_train, y_test = y.iloc[:split], y.iloc[split:]

model = GradientBoostingRegressor(n_estimators=100, max_depth=4, random_state=42)
model.fit(X_train, y_train)
preds = model.predict(X_test)

mae  = mean_absolute_error(y_test, preds)
rmse = np.sqrt(mean_squared_error(y_test, preds))
print(f'GBM Forecast: MAE={mae:.2f}, RMSE={rmse:.2f}')
print('\nTop feature importances:')
fi = pd.Series(model.feature_importances_, index=X.columns).sort_values(ascending=False)
for feat, imp in fi.head(5).items():
    print(f'  {feat:15s}: {imp:.4f}')
💼 Real-World Scenario
A logistics company builds an ML forecasting model for package delivery volumes using lag features, rolling statistics, and calendar effects as inputs to a gradient boosting model.
Real-World Code
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error

rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=365, freq='D')
volume = pd.Series(
    3000 + np.linspace(0, 500, 365) +
    800 * np.sin(2*np.pi*np.arange(365)/7) +
    500 * (idx.month == 12).astype(float) +  # December peak
    rng.normal(0, 100, 365),
    index=idx
).clip(0)

df = volume.to_frame(name='volume')
for lag in [1, 2, 3, 7, 14]: df[f'lag_{lag}'] = volume.shift(lag)
for w in [7, 14]: df[f'roll_{w}_mean'] = volume.rolling(w).mean()
df['dow']       = volume.index.dayofweek
df['month']     = volume.index.month
df['is_weekend']= (df['dow'] >= 5).astype(int)
df['dow_sin']   = np.sin(2*np.pi*df['dow']/7)
df['dow_cos']   = np.cos(2*np.pi*df['dow']/7)
df.dropna(inplace=True)

X, y = df.drop('volume', axis=1), df['volume']
split = 300
model = GradientBoostingRegressor(n_estimators=200, max_depth=4, random_state=42)
model.fit(X.iloc[:split], y.iloc[:split])
preds = model.predict(X.iloc[split:])
mae = mean_absolute_error(y.iloc[split:], preds)
print(f'MAE: {mae:.0f} packages/day')
fi = pd.Series(model.feature_importances_, index=X.columns).nlargest(5)
print('\nTop features:')
for f, v in fi.items():
    print(f'  {f}: {v:.4f}')
🏋️ Practice: Build a Feature Matrix
Use the hourly energy dataset (simulate: 7 days Γ— 24 hours = 168 hourly readings with daily cycle). Create: lag_1, lag_24 (same hour yesterday), roll_24_mean, hour_of_day, is_weekday. Train a LinearRegression model and print RMSE.
Starter Code
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=168, freq='h')
# TODO: create hourly energy with daily cycle

# TODO: build feature matrix

# TODO: train LinearRegression and print RMSE
✅ Practice Checklist
10. Forecasting Evaluation

Proper evaluation of time series forecasts requires time-ordered splits (no leakage), multiple metrics (MAE, RMSE, MAPE), and ideally walk-forward validation to simulate real deployment.

MAE, RMSE, MAPE, and SMAPE
import numpy as np

# Toy actual/forecast pair for demonstrating the standard error metrics.
actual   = np.array([100, 120, 130, 115, 140, 125, 160, 155, 170, 180])
forecast = np.array([105, 115, 135, 110, 145, 130, 150, 160, 165, 185])

# Signed and absolute errors drive every metric below.
err     = actual - forecast
abs_err = np.abs(err)

mae   = abs_err.mean()
mse   = (err ** 2).mean()
rmse  = np.sqrt(mse)
# MAPE scales each error by the actual; sMAPE symmetrizes with the mean
# magnitude of actual and forecast, so it tolerates near-zero actuals better.
mape  = (abs_err / np.abs(actual)).mean() * 100
smape = (2 * abs_err / (np.abs(actual) + np.abs(forecast))).mean() * 100
# R-squared: 1 minus residual sum of squares over total sum of squares.
ss_res = (err ** 2).sum()
ss_tot = ((actual - actual.mean()) ** 2).sum()
r2 = 1 - ss_res / ss_tot

print(f'MAE:   {mae:.2f}')
print(f'RMSE:  {rmse:.2f}')
print(f'MAPE:  {mape:.2f}%')
print(f'sMAPE: {smape:.2f}%')
# Label fixed: the original string contained mojibake ("RΒ²") for "R²".
print(f'R²:    {r2:.4f}')
print(f'\nBias (mean error): {(forecast - actual).mean():.2f}')
Train/test split for time series
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge

# Synthetic daily series: level 100, gentle upward trend, noise (std 8).
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=200, freq='D')
ts  = pd.Series(100 + np.linspace(0,30,200) + rng.normal(0, 8, 200), index=idx)

# Supervised frame: predict y from its own 1-, 7-, and 14-day lags.
df = ts.to_frame(name='y')
for lag in [1, 7, 14]: df[f'lag_{lag}'] = ts.shift(lag)
df.dropna(inplace=True)  # the first 14 rows lack a full set of lags

X = df.drop('y', axis=1).values
y = df['y'].values

# Time-ordered split (NEVER random split for time series!)
# Oldest 80% trains, newest 20% tests — mirroring how the model would
# actually be deployed (fit on the past, predict the future).
n = len(X)
split = int(n * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

model = Ridge().fit(X_train, y_train)
preds = model.predict(X_test)
# Absolute-error metrics on the holdout period.
mae   = np.mean(np.abs(y_test - preds))
rmse  = np.sqrt(np.mean((y_test - preds)**2))
print(f'Train size: {split}, Test size: {n - split}')
print(f'MAE:  {mae:.2f}')
print(f'RMSE: {rmse:.2f}')
print('\nNEVER use random train_test_split for time series!')
print('This leaks future information into training data.')
Walk-forward (expanding window) validation
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge

rng = np.random.default_rng(42)
n   = 120
ts  = pd.Series(100 + np.linspace(0, 20, n) + rng.normal(0, 6, n))

df = ts.to_frame(name='y')
for lag in [1, 2, 3, 7]: df[f'lag_{lag}'] = ts.shift(lag)
df.dropna(inplace=True)

X = df.drop('y', axis=1).values
y = df['y'].values

# Walk-forward validation
min_train = 30
step = 5
errors = []
for test_start in range(min_train, len(X) - step + 1, step):
    X_train, y_train = X[:test_start], y[:test_start]
    X_test,  y_test  = X[test_start:test_start+step], y[test_start:test_start+step]
    model = Ridge().fit(X_train, y_train)
    preds = model.predict(X_test)
    errors.extend(np.abs(y_test - preds).tolist())

print(f'Walk-forward CV ({len(errors)} predictions):')
print(f'  MAE:  {np.mean(errors):.2f}')
print(f'  RMSE: {np.sqrt(np.mean(np.array(errors)**2)):.2f}')
print(f'  Std:  {np.std(errors):.2f}')
TimeSeriesSplit cross-validation with sklearn
import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.ensemble import GradientBoostingRegressor

rng = np.random.default_rng(42)
n   = 200
ts  = pd.Series(100 + np.linspace(0,40,n) + rng.normal(0,8,n))

df = ts.to_frame(name='y')
for lag in [1,2,3,7]: df[f'lag_{lag}'] = ts.shift(lag)
df['roll7'] = ts.rolling(7).mean()
df.dropna(inplace=True)

X = df.drop('y', axis=1)
y = df['y']

tscv = TimeSeriesSplit(n_splits=5, gap=0)
model = GradientBoostingRegressor(n_estimators=50, max_depth=3, random_state=42)
scores = cross_val_score(model, X, y, cv=tscv, scoring='neg_mean_absolute_error')
maes = -scores

print('TimeSeriesSplit CV results (5 folds):')
for i, (mae, (tr, te)) in enumerate(zip(maes, tscv.split(X)), 1):
    print(f'  Fold {i}: train={len(tr)}, test={len(te)}, MAE={mae:.2f}')
print(f'\nMean MAE: {maes.mean():.2f} Β± {maes.std():.2f}')
💼 Real-World Scenario
A data science team evaluates three forecasting models (ARIMA, Holt-Winters, GBM) on 1 year of daily data using walk-forward validation to select the production model.
Real-World Code
import pandas as pd
import numpy as np

try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    from statsmodels.tsa.arima.model import ARIMA
    from sklearn.ensemble import GradientBoostingRegressor
    import warnings; warnings.filterwarnings('ignore')

    rng = np.random.default_rng(42)
    n   = 200
    idx = pd.date_range('2024-01-01', periods=n, freq='D')
    ts  = pd.Series(
        300 + np.linspace(0,50,n) + 60*np.sin(2*np.pi*np.arange(n)/7) + rng.normal(0,15,n),
        index=idx
    )

    train, test = ts[:160], ts[160:]
    results = {}

    # Holt-Winters
    hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=7, initialization_method='estimated').fit(optimized=True)
    results['Holt-Winters'] = hw.forecast(40)

    # GBM with lag features
    df = ts.to_frame(name='y')
    for lag in [1,2,7]: df[f'lag_{lag}'] = ts.shift(lag)
    df.dropna(inplace=True)
    X, y = df.drop('y',axis=1), df['y']
    split = len(train) - 7  # adjust for lag
    gbm = GradientBoostingRegressor(n_estimators=100, random_state=42)
    gbm.fit(X.iloc[:split], y.iloc[:split])
    results['GBM'] = pd.Series(gbm.predict(X.iloc[split:split+40]), index=test.index[:40])

    actual = test.values[:40]
    print('Model Comparison (40-day holdout):')
    print(f'{"Model":<15} {"MAE":>7} {"RMSE":>7} {"MAPE%":>7}')
    print('-' * 38)
    for name, fc in results.items():
        fc_v = fc.values[:40]
        mae  = np.mean(np.abs(actual - fc_v))
        rmse = np.sqrt(np.mean((actual - fc_v)**2))
        mape = np.mean(np.abs((actual - fc_v)/actual))*100
        print(f'{name:<15} {mae:>7.1f} {rmse:>7.1f} {mape:>7.1f}')
except ImportError:
    print('pip install statsmodels scikit-learn')
🏋️ Practice: Cross-Validate a Forecast
Generate 180 days of daily demand data with weekly seasonality. Use TimeSeriesSplit with 5 folds on lag features. Test Ridge, GradientBoosting, and a naive (last value) baseline. Print mean MAE per model.
Starter Code
import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import Ridge
from sklearn.ensemble import GradientBoostingRegressor

rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=180, freq='D')
# TODO: create demand series

# TODO: create feature matrix

# TODO: cross-validate all three models
✅ Practice Checklist
11. Prophet Forecasting

Use Facebook Prophet for additive time series forecasting with automatic trend, seasonality, and holiday effects. Handle changepoints and produce uncertainty intervals.

Basic Prophet fit and forecast
import pandas as pd
import numpy as np

# Build a year of synthetic daily data — linear growth, a 7-day cycle,
# and Gaussian noise. Prophet expects exactly two columns: ds (dates)
# and y (values).
np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=365, freq='D')
day_index = np.arange(365)
growth = np.linspace(100, 200, 365)
cycle  = 10 * np.sin(2 * np.pi * day_index / 7)
y = growth + cycle + np.random.normal(0, 5, 365)

df = pd.DataFrame({'ds': dates, 'y': y})
print('Prophet input format:')
print(df.head())
print(f'\nShape: {df.shape}')
print(f'Date range: {df.ds.min().date()} to {df.ds.max().date()}')
print(f'y stats: mean={df.y.mean():.1f}, std={df.y.std():.1f}')

# Without Prophet installed, demonstrate the workflow:
print('\nProphet workflow:')
print('  from prophet import Prophet')
print('  m = Prophet(yearly_seasonality=False, weekly_seasonality=True)')
print('  m.fit(df)')
print('  future = m.make_future_dataframe(periods=30)')
print('  forecast = m.predict(future)')
print('  m.plot(forecast)')
Manual additive decomposition as Prophet substitute
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# Synthetic daily series: linear trend + weekly cycle + noise.
np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)
weekly = 8 * np.sin(2 * np.pi * np.arange(365) / 7)
noise = np.random.normal(0, 5, 365)
y = trend + weekly + noise

df = pd.DataFrame({'ds': dates, 'y': y})
df['dow'] = df['ds'].dt.dayofweek

# Trend component: straight-line fit against the time index.
from sklearn.linear_model import LinearRegression
t = np.arange(len(df)).reshape(-1, 1)
lr = LinearRegression().fit(t, df['y'])
df['trend_fit'] = lr.predict(t)

# Weekly component: mean detrended residual per day-of-week.
df['resid'] = df['y'] - df['trend_fit']
dow_effect = df.groupby('dow')['resid'].mean()
df['seasonal'] = df['dow'].map(dow_effect)
df['yhat'] = df['trend_fit'] + df['seasonal']

# In-sample root-mean-squared error. (The old local name `mse` was
# misleading: the trailing **0.5 makes this an RMSE, as the print says.)
rmse = ((df['y'] - df['yhat'])**2).mean()**0.5
print(f'RMSE (train): {rmse:.2f}')
fig, axes = plt.subplots(3, 1, figsize=(10, 8))
axes[0].plot(df['ds'], df['y'], alpha=0.5, label='actual'); axes[0].plot(df['ds'], df['yhat'], label='fit'); axes[0].legend(); axes[0].set_title('Additive Model Fit')
axes[1].plot(df['ds'], df['trend_fit']); axes[1].set_title('Trend Component')
axes[2].bar(range(7), dow_effect.values); axes[2].set_xticks(range(7)); axes[2].set_xticklabels(['Mon','Tue','Wed','Thu','Fri','Sat','Sun']); axes[2].set_title('Weekly Seasonality')
plt.tight_layout(); plt.savefig('prophet_decomp.png', dpi=80); plt.close(); print('Saved prophet_decomp.png')
Changepoint detection with PELT algorithm
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# Simulate series with structural breaks
np.random.seed(42)
n = 300
t = np.arange(n)
# Three regimes: flat, upward, downward
y = np.concatenate([
    np.random.normal(10, 2, 100),
    np.random.normal(25, 2, 100),
    np.random.normal(15, 2, 100)
])

# Simple CUSUM-based changepoint detection
def cusum_changepoints(series, threshold=10):
    """Flag indices where a CUSUM statistic drifts past `threshold`.

    Maintains one-sided cumulative sums of deviations from the global
    mean (with a 0.5 slack term per step). Whenever either side exceeds
    the threshold, that index is recorded and both accumulators reset.
    Returns a list of detected changepoint indices.
    """
    baseline = series.mean()
    up = down = 0.0
    hits = []
    for i in range(1, len(series)):
        deviation = series[i] - baseline
        up = max(0.0, up + deviation - 0.5)
        down = max(0.0, down - deviation - 0.5)
        if max(up, down) > threshold:
            hits.append(i)
            up = down = 0.0
    return hits

cps = cusum_changepoints(y, threshold=15)
print(f'Detected changepoints at indices: {cps}')
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(t, y, alpha=0.7)
for cp in cps:
    ax.axvline(cp, color='red', linestyle='--', linewidth=2, label=f'CP at {cp}' if cp == cps[0] else '')
ax.set_title('CUSUM Changepoint Detection'); ax.legend()
plt.tight_layout(); plt.savefig('changepoints.png', dpi=80); plt.close(); print('Saved changepoints.png')
Cross-validation for forecasting (walk-forward)
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge

np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=200, freq='D')
trend = np.linspace(0, 50, 200)
weekly = 5 * np.sin(2 * np.pi * np.arange(200) / 7)
noise = np.random.normal(0, 3, 200)
y = trend + weekly + noise

df = pd.DataFrame({'ds': dates, 'y': y})
df['t'] = np.arange(len(df))
df['dow'] = df['ds'].dt.dayofweek
# Encode day-of-week as dummies
for d in range(7):
    df[f'dow_{d}'] = (df['dow'] == d).astype(int)

feat_cols = ['t'] + [f'dow_{d}' for d in range(7)]
X = df[feat_cols].values

# Walk-forward validation: train on first 60%, then expand
initial = 120; horizon = 7; errors = []
for start in range(initial, len(df) - horizon, horizon):
    X_tr, y_tr = X[:start], y[:start]
    X_te, y_te = X[start:start+horizon], y[start:start+horizon]
    m = Ridge().fit(X_tr, y_tr)
    preds = m.predict(X_te)
    errors.append(np.sqrt(((y_te - preds)**2).mean()))

print(f'Walk-forward validation ({len(errors)} folds):')
print(f'  Mean RMSE: {np.mean(errors):.2f}')
print(f'  Std  RMSE: {np.std(errors):.2f}')
print(f'  Min  RMSE: {min(errors):.2f}, Max: {max(errors):.2f}')
💼 Real-World Scenario
Forecast retail store daily sales for next 30 days using historical 2 years of data. The data has weekly patterns (lower on Mondays, peak on weekends), a monthly pay-cycle boost, and a gradual upward trend. Produce point forecasts and 80% prediction intervals.
Real-World Code
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

np.random.seed(0)
dates = pd.date_range('2022-01-01', periods=730, freq='D')
trend = np.linspace(500, 700, 730)
weekly = 40 * np.sin(2 * np.pi * np.arange(730) / 7 - np.pi)
monthly = 30 * np.sin(2 * np.pi * np.arange(730) / 30)
noise = np.random.normal(0, 15, 730)
y = trend + weekly + monthly + noise

df = pd.DataFrame({'ds': dates, 'y': y})
df['t'] = np.arange(len(df))
df['dow'] = df['ds'].dt.dayofweek
df['dom'] = df['ds'].dt.day
for d in range(7): df[f'dow_{d}'] = (df['dow'] == d).astype(int)

feat_cols = ['t'] + [f'dow_{d}' for d in range(7)] + ['dom']
X = df[feat_cols].values
train = df[df['ds'] < '2023-07-01']; test = df[df['ds'] >= '2023-07-01']
idx_tr = df['ds'] < '2023-07-01'

sc = StandardScaler().fit(X[idx_tr])
m = Ridge(alpha=1.0).fit(sc.transform(X[idx_tr]), y[idx_tr])

resid = y[idx_tr] - m.predict(sc.transform(X[idx_tr]))
pred = m.predict(sc.transform(X[~idx_tr]))
print(f'Forecast RMSE: {np.sqrt(((y[~idx_tr]-pred)**2).mean()):.2f}')
print(f'80% PI: Β±{1.28*resid.std():.2f}')
🏋️ Practice: Forecast Weekly Revenue with Confidence Intervals
Given 2 years of weekly revenue data with trend and seasonality, build a feature-based Ridge model (lag features + time index + month dummies). Perform walk-forward validation with 4-week horizons. Report average RMSE and generate 80% prediction intervals using residual std.
Starter Code
# Practice starter: 104 weeks (2 years) of synthetic revenue with a linear
# trend, an annual (52-week) sinusoid, and Gaussian noise.
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge

np.random.seed(7)
weeks = pd.date_range('2022-01-03', periods=104, freq='W')
trend = np.linspace(10000, 15000, 104)
seasonality = 1500 * np.sin(2 * np.pi * np.arange(104) / 52)
noise = np.random.normal(0, 300, 104)
y = trend + seasonality + noise
df = pd.DataFrame({'ds': weeks, 'revenue': y})

# TODO: Create features (t index, month dummies, lag-1)
# TODO: Walk-forward cross-validation with 4-week horizon
# TODO: Report average RMSE and 80% PI width
✅ Practice Checklist
12. Anomaly Detection in Time Series

Detect outliers and anomalies in sequential data using statistical thresholds, isolation forests, and autoencoder-style reconstruction errors.

Z-score and IQR rolling anomaly detection
# Rolling z-score anomaly detection: flag points more than 2.5 rolling
# standard deviations away from the rolling mean.
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='D')
y = np.sin(np.arange(100) * 0.3) * 10 + np.random.normal(0, 1, 100) + 50
# Inject anomalies
y[[20, 45, 70, 85]] += [25, -20, 30, -15]

df = pd.DataFrame({'ds': dates, 'y': y})

# Rolling z-score
win = 14
df['roll_mean'] = df['y'].rolling(win, min_periods=1).mean()
# First window's std is NaN (single point); fill with 1 to avoid a NaN divide.
df['roll_std']  = df['y'].rolling(win, min_periods=1).std().fillna(1)
df['zscore']    = (df['y'] - df['roll_mean']) / df['roll_std']
df['anomaly_z'] = df['zscore'].abs() > 2.5

print(f'Z-score anomalies: {df["anomaly_z"].sum()} detected')
print(df[df['anomaly_z']][['ds','y','zscore']].to_string())
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(df['ds'], df['y'], label='series')
ax.scatter(df[df['anomaly_z']]['ds'], df[df['anomaly_z']]['y'],
           color='red', s=80, zorder=5, label='anomaly')
ax.set_title('Rolling Z-score Anomaly Detection'); ax.legend()
plt.tight_layout(); plt.savefig('anomaly_zscore.png', dpi=80); plt.close(); print('Saved anomaly_zscore.png')
Isolation Forest for multivariate anomalies
# Multivariate anomaly detection: Isolation Forest over correlated (cpu,
# memory) readings flags points that break the joint relationship.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

np.random.seed(42)
n = 200
# 'h' is the modern hourly alias ('H' is deprecated in pandas >= 2.2).
dates = pd.date_range('2024-01-01', periods=n, freq='h')
cpu    = np.random.normal(40, 10, n)
memory = cpu * 1.5 + np.random.normal(0, 5, n)
# Inject anomalies that violate the cpu/memory correlation
cpu[[50, 100, 150]] = [95, 5, 98]
memory[[50, 100, 150]] = [30, 120, 25]

df = pd.DataFrame({'ds': dates, 'cpu': cpu, 'memory': memory})
X = df[['cpu', 'memory']].values

# contamination = expected anomaly fraction; fit_predict returns -1 for outliers.
iso = IsolationForest(contamination=0.05, random_state=42)
df['anomaly'] = iso.fit_predict(X) == -1
df['score']   = iso.score_samples(X)  # lower score = more anomalous

print(f'Anomalies detected: {df["anomaly"].sum()}')
print(df[df['anomaly']][['ds','cpu','memory','score']].to_string())
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
for i, col in enumerate(['cpu', 'memory']):
    axes[i].plot(df['ds'], df[col], alpha=0.7)
    axes[i].scatter(df[df['anomaly']]['ds'], df[df['anomaly']][col],
                    color='red', s=60, zorder=5)
    axes[i].set_ylabel(col)
plt.suptitle('Isolation Forest Anomaly Detection'); plt.tight_layout()
plt.savefig('anomaly_iforest.png', dpi=80); plt.close(); print('Saved anomaly_iforest.png')
Autoencoder-style reconstruction error (PCA as an LSTM-autoencoder stand-in)
# Autoencoder-style anomaly detection: reconstruct sliding windows with PCA
# and flag windows whose reconstruction error is large (PCA substitutes for
# an LSTM autoencoder here). Imports hoisted to the top per convention.
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

np.random.seed(42)
# Simulate normal signal
t = np.linspace(0, 20*np.pi, 500)
normal = np.sin(t) + 0.1 * np.random.randn(500)
# Inject anomaly window
anom = normal.copy()
anom[200:220] += 5  # spike

# Create sliding windows
def make_windows(x, w=20):
    """Return overlapping length-w windows of x as a 2-D array."""
    return np.array([x[i:i+w] for i in range(len(x)-w)])

X = make_windows(normal, 20)
X_anom = make_windows(anom, 20)

# Scaler and PCA are fit on the *normal* data only.
sc = StandardScaler().fit(X)
X_sc = sc.transform(X)
X_anom_sc = sc.transform(X_anom)

pca = PCA(n_components=5).fit(X_sc)
rec = pca.inverse_transform(pca.transform(X_anom_sc))
# Per-window mean squared reconstruction error.
errors = ((X_anom_sc - rec)**2).mean(axis=1)

threshold = np.percentile(errors, 95)
print(f'Reconstruction error threshold (95th pct): {threshold:.4f}')
print(f'Anomaly windows detected: {(errors > threshold).sum()}')
print(f'Max error at index: {errors.argmax()} (injected at 200:220)')
STL-based anomaly detection
# STL-based anomaly detection: decompose into trend/seasonal/residual, then
# flag points whose residual exceeds 3 residual standard deviations.
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import STL
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=365, freq='D')
trend = np.linspace(0, 30, 365)
seasonal = 10 * np.sin(2 * np.pi * np.arange(365) / 7)
noise = np.random.normal(0, 2, 365)
y = trend + seasonal + noise

# Inject point anomalies
injected = [80, 160, 250, 320]
y_anom = y.copy()
for idx in injected:
    y_anom[idx] += np.random.choice([-25, 25])

series = pd.Series(y_anom, index=dates)
# robust=True downweights outliers so they land in the residual, not the trend.
stl = STL(series, period=7, robust=True)
result = stl.fit()
residual = result.resid

thresh = 3 * residual.std()
anomalies = series[residual.abs() > thresh]
print(f'STL anomalies detected: {len(anomalies)}')
print(f'Injected at: {injected}')
print(f'Detected at: {list(anomalies.index.dayofyear)}')
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
axes[0].plot(series.index, series, alpha=0.6); axes[0].scatter(anomalies.index, anomalies, color='red', s=60, zorder=5, label='anomaly'); axes[0].legend(); axes[0].set_title('STL Anomaly Detection')
axes[1].plot(residual.index, residual); axes[1].axhline(thresh, color='r', linestyle='--'); axes[1].axhline(-thresh, color='r', linestyle='--'); axes[1].set_title('STL Residual')
plt.tight_layout(); plt.savefig('stl_anomaly.png', dpi=80); plt.close(); print('Saved stl_anomaly.png')
💼 Real-World Scenario
Monitor server CPU usage (sampled every 5 minutes) to automatically alert when anomalies occur. The signal has daily patterns (low at night, high during business hours). You must detect both sudden spikes and sustained elevated periods while minimizing false alarms.
Real-World Code
# Server CPU monitoring: Isolation Forest on raw value + calendar + rolling
# stats catches both sudden spikes and sustained elevated periods.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

np.random.seed(1)
# 2 weeks of 5-min samples = 4032 points
t = np.arange(4032)
daily = 20 * np.sin(2*np.pi*t / 288 - np.pi/2) + 50  # 288 samples/day
noise = np.random.normal(0, 3, 4032)
cpu = np.clip(daily + noise, 5, 100)
# Inject anomalies
cpu[500:510]  += 40  # spike
cpu[1200:1230] += 30  # sustained high

times = pd.date_range('2024-01-01', periods=4032, freq='5min')
df = pd.DataFrame({'time': times, 'cpu': cpu})
df['hour'] = df['time'].dt.hour
# .bfill() replaces fillna(method='bfill'), deprecated and removed in pandas 3.0.
df['roll_mean'] = df['cpu'].rolling(12).mean().bfill()
df['roll_std']  = df['cpu'].rolling(12).std().fillna(1)

# Isolation Forest on features
X = df[['cpu', 'hour', 'roll_mean', 'roll_std']].values
iso = IsolationForest(contamination=0.01, random_state=42)
df['anomaly'] = iso.fit_predict(X) == -1
print(f'Anomalies detected: {df["anomaly"].sum()} of {len(df)} points ({df["anomaly"].mean():.1%})')
print(df[df['anomaly']].groupby(df[df['anomaly']]['time'].dt.date)['anomaly'].count())
🏋️ Practice: Detect Order Volume Anomalies
Given hourly e-commerce order counts with weekly seasonality, detect anomalies using rolling z-score (window=24h, threshold=3). Also apply Isolation Forest on (count, hour, day_of_week) features. Compare which method catches more of the injected spikes (3 injected anomalies) with fewer false positives.
Starter Code
# Practice starter: 2 weeks of hourly order counts with a daily cycle and
# three injected anomalies at indices 100, 200, 280.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

np.random.seed(99)
# 'h' is the modern hourly alias ('H' is deprecated in pandas >= 2.2).
hours = pd.date_range('2024-01-01', periods=336, freq='h')  # 2 weeks
base = 100 + 50 * np.sin(2*np.pi*np.arange(336)/24 - np.pi)
noise = np.random.normal(0, 8, 336)
orders = np.clip(base + noise, 0, None)
orders[[100, 200, 280]] += [150, -80, 200]  # inject anomalies
df = pd.DataFrame({'time': hours, 'orders': orders})

# TODO: Rolling z-score anomaly detection (window=24, thresh=3)
# TODO: Isolation Forest on (orders, hour, dayofweek)
# TODO: Compare detected anomaly counts and false positive rates
✅ Practice Checklist
13. Neural Forecasting Fundamentals

Understand LSTM, Temporal Convolutional Network (TCN), and N-BEATS-style forecasting ideas; the examples below emulate these architectures with scikit-learn feature-based equivalents (the same approaches transfer directly to PyTorch or Keras implementations).

Univariate LSTM forecasting (NumPy / PyTorch-free)
# Lag-feature forecasting: the tabular equivalent of what an RNN/LSTM learns
# from a sliding input window.
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

# Demonstrate the lag-feature approach that underlies RNN forecasting
np.random.seed(42)
n = 300
t = np.arange(n)
y = np.sin(0.2*t) * 10 + 0.1*t + np.random.normal(0, 1, n)

def make_supervised(series, lags=10):
    """Turn a 1-D series into (X, Y): each X row holds `lags` consecutive
    values and Y is the value immediately after them."""
    X, Y = [], []
    for i in range(lags, len(series)):
        X.append(series[i-lags:i])
        Y.append(series[i])
    return np.array(X), np.array(Y)

X, Y = make_supervised(y, lags=20)
split = int(0.8 * len(X))  # chronological split: no shuffling
X_tr, X_te = X[:split], X[split:]
Y_tr, Y_te = Y[:split], Y[split:]

sc_x = StandardScaler().fit(X_tr)
sc_y = StandardScaler().fit(Y_tr.reshape(-1,1))

model = Ridge(alpha=0.1)
model.fit(sc_x.transform(X_tr), sc_y.transform(Y_tr.reshape(-1,1)).ravel())

# Predict in scaled space, then invert back to original units.
pred_sc = model.predict(sc_x.transform(X_te))
pred = sc_y.inverse_transform(pred_sc.reshape(-1,1)).ravel()

rmse = np.sqrt(((Y_te - pred)**2).mean())
print(f'Lag-20 linear model RMSE: {rmse:.3f}')
print(f'Baseline (mean) RMSE:     {np.sqrt(((Y_te - Y_tr.mean())**2).mean()):.3f}')
print('This is the feature-engineering equivalent of an LSTM cell state.')
Sequence-to-sequence multi-step forecasting
# Direct multi-step forecasting: one input window predicts all `horizon`
# future steps at once (the tabular analogue of a seq2seq decoder).
import numpy as np
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor

np.random.seed(42)
n = 400
t = np.arange(n)
y = 20 * np.sin(0.1*t) + 0.05*t + np.random.normal(0, 2, n)

def make_seq2seq(series, lookback=30, horizon=7):
    """Build (X, Y): X rows are `lookback` past values, Y rows the next
    `horizon` values."""
    X, Y = [], []
    for i in range(lookback, len(series) - horizon + 1):
        X.append(series[i-lookback:i])
        Y.append(series[i:i+horizon])
    return np.array(X), np.array(Y)

X, Y = make_seq2seq(y, lookback=30, horizon=7)
split = int(0.8 * len(X))  # chronological split
X_tr, X_te = X[:split], X[split:]
Y_tr, Y_te = Y[:split], Y[split:]

# Multi-output regression (equivalent to decoder in seq2seq)
model = MultiOutputRegressor(GradientBoostingRegressor(n_estimators=50, random_state=0))
model.fit(X_tr, Y_tr)
pred = model.predict(X_te)

# Error typically grows with the forecast horizon step.
rmse_by_step = np.sqrt(((Y_te - pred)**2).mean(axis=0))
print('RMSE by forecast horizon step:')
for step, r in enumerate(rmse_by_step, 1):
    print(f'  Step {step}: {r:.3f}')
print(f'Overall RMSE: {rmse_by_step.mean():.3f}')
Attention-weighted feature importance for time series
# "Attention" intuition: which input lags matter most for predicting the
# next value? Approximated here by normalized Ridge coefficient magnitudes.
import numpy as np

np.random.seed(42)
n = 500
t = np.arange(n)
y = 15*np.sin(0.15*t) + 5*np.sin(0.05*t) + np.random.normal(0, 1, n)

def make_windows(series, w=20):
    """Return (X, Y): rows of X are w-length lag windows, Y the next value."""
    X = np.array([series[i-w:i] for i in range(w, len(series))])
    Y = series[w:]
    return X, Y

X, Y = make_windows(y, 20)

# Simulate attention: weight lags by their learned linear importance
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

sc_x = StandardScaler().fit(X)
sc_y = StandardScaler().fit(Y.reshape(-1,1))
X_sc = sc_x.transform(X)
Y_sc = sc_y.transform(Y.reshape(-1,1)).ravel()

# Attention weights = absolute Ridge coefficients, normalized to sum to 1
# (simple L1 normalization rather than a true softmax).
model = Ridge(alpha=0.01).fit(X_sc, Y_sc)
raw_weights = np.abs(model.coef_)
attention = raw_weights / raw_weights.sum()

# Column i holds lag (20 - i): the last column is lag 1 (most recent).
print('Attention weights by lag (most recent = lag 1):')
for lag in range(1, 6):
    print(f'  Lag {lag:2d}: {attention[-lag]:.4f} ({"high" if attention[-lag] > attention.mean() else "low"})')
# Convert top column indices to lag numbers (lag = 20 - column index),
# keeping the lag-1-=-most-recent convention used above.
print(f'Top-3 most attended lags: {20 - (-attention).argsort()[:3]}')
N-BEATS-inspired basis expansion forecasting
# N-BEATS intuition: project each lookback window onto fixed trend
# (polynomial) and seasonality (Fourier) bases, then regress on the
# resulting basis coefficients.
import numpy as np
from sklearn.linear_model import Ridge

np.random.seed(0)
n = 300
t = np.arange(n)
y = 10*np.sin(0.2*t) + 0.1*t + np.random.normal(0, 1.5, n)

# N-BEATS idea: decompose into trend + seasonality bases
def trend_basis(T, degree=3):
    """Polynomial trend basis for N-BEATS trend block."""
    t_norm = np.linspace(0, 1, T)
    return np.column_stack([t_norm**k for k in range(degree+1)])

def fourier_basis(T, harmonics=5):
    """Fourier seasonality basis for N-BEATS seasonality block."""
    t = np.linspace(0, 2*np.pi, T)
    cols = []
    for k in range(1, harmonics+1):
        cols.extend([np.cos(k*t), np.sin(k*t)])
    return np.column_stack(cols)

LB = 30  # lookback
X, Y = [], []
for i in range(LB, n - 7):
    window = y[i-LB:i]
    # Basis projections (inner products) act as the block coefficients.
    trend_feats  = trend_basis(LB, degree=2).T @ window / LB
    season_feats = fourier_basis(LB, harmonics=3).T @ window / LB
    X.append(np.concatenate([trend_feats, season_feats]))
    Y.append(y[i:i+7].mean())  # target: mean of the next 7 steps
X, Y = np.array(X), np.array(Y)

split = int(0.8*len(X))
model = Ridge(alpha=0.1).fit(X[:split], Y[:split])
pred  = model.predict(X[split:])
rmse  = np.sqrt(((Y[split:] - pred)**2).mean())
print(f'N-BEATS basis expansion RMSE: {rmse:.3f}')
print(f'Trend features: {trend_basis(LB, 2).shape[1]}, Fourier features: {fourier_basis(LB, 3).shape[1]}')
💼 Real-World Scenario
Build a multi-step electricity demand forecasting system that predicts next 24 hours (hourly) using the previous 7 days of data. Features include lagged values, hour-of-day, day-of-week, temperature. Compare gradient boosting seq2seq vs naive persistence baseline.
Real-World Code
# Electricity demand: direct multi-step (24h) forecasting from a 7-day
# lookback window plus calendar features, vs a persistence baseline.
import numpy as np
import pandas as pd
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor

np.random.seed(5)
# 'h' is the modern hourly alias ('H' is deprecated in pandas >= 2.2).
hours = pd.date_range('2023-01-01', periods=8760, freq='h')
daily = 500 + 200*np.sin(2*np.pi*np.arange(8760)/24 - np.pi)
weekly= 50 *np.sin(2*np.pi*np.arange(8760)/(24*7))
noise = np.random.normal(0, 20, 8760)
demand = daily + weekly + noise

df = pd.DataFrame({'time': hours, 'demand': demand, 'hour': hours.hour, 'dow': hours.dayofweek})

LB, H = 24*7, 24  # 7-day lookback, 24h horizon
X, Y = [], []
for i in range(LB, len(df)-H):
    feats = list(df['demand'].values[i-LB:i])
    feats += [df['hour'].iloc[i], df['dow'].iloc[i]]  # calendar features
    X.append(feats)
    Y.append(df['demand'].values[i:i+H])
X, Y = np.array(X), np.array(Y)

split = int(0.85*len(X))  # chronological split
model = MultiOutputRegressor(GradientBoostingRegressor(n_estimators=30, random_state=0), n_jobs=-1)
model.fit(X[:split], Y[:split])
pred = model.predict(X[split:])
rmse = np.sqrt(((Y[split:]-pred)**2).mean())
# Persistence baseline: forecast each window with the previous window's actuals.
baseline = np.sqrt(((Y[split:]-Y[split-1:-1])**2).mean())  # persistence
print(f'GBM RMSE: {rmse:.2f}, Persistence RMSE: {baseline:.2f}')
print(f'Improvement: {(baseline-rmse)/baseline:.1%}')
🏋️ Practice: Seq2Seq Solar Power Forecasting
Using 1 year of hourly solar generation data (sinusoidal with daily pattern, zero at night), build a seq2seq GBM model with 48-hour lookback and 12-hour horizon. Compare RMSE for the first 4 steps vs last 4 steps. Also implement a simple attention weighting by computing per-lag correlations with the target.
Starter Code
# Practice starter: 1 year of hourly solar generation (daytime sinusoid,
# zero at night) for seq2seq forecasting with a 48h lookback, 12h horizon.
import numpy as np
import pandas as pd
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor

np.random.seed(3)
# 'h' is the modern hourly alias ('H' is deprecated in pandas >= 2.2).
hours = pd.date_range('2023-01-01', periods=8760, freq='h')
raw = np.maximum(0, np.sin(2*np.pi*np.arange(8760)/24 - np.pi/2))
solar = raw * 1000 + np.random.normal(0, 30, 8760)
df = pd.DataFrame({'time': hours, 'solar': solar})

LB, H = 48, 12
# TODO: Build (X, Y) windows with lookback=48, horizon=12
# TODO: Train GBM MultiOutputRegressor
# TODO: Report RMSE for steps 1-4 vs steps 9-12
# TODO: Compute attention weights via per-lag correlation
✅ Practice Checklist
14. Wavelet Analysis for Time Series

Discrete Wavelet Transform (DWT)
# Discrete Wavelet Transform: multilevel decomposition plus soft-threshold
# denoising of the detail coefficients.
import numpy as np
import pywt
np.random.seed(0)
t = np.linspace(0, 8*np.pi, 512)
signal = (np.sin(t) + 0.5*np.sin(5*t) + np.random.normal(0, 0.2, 512))
wavelet = 'db4'
level = 4
# coeffs = [cA4, cD4, cD3, cD2, cD1]: one approximation + `level` details.
coeffs = pywt.wavedec(signal, wavelet, level=level)
print(f"Decomposition levels: {len(coeffs)-1}")
print(f"Approximation coeff shape: {coeffs[0].shape}")
# NOTE(review): coeffs[1] is the *coarsest* detail (cD4), so the printed
# "level i" labels run coarse -> fine, opposite to pywt's cD numbering.
for i, c in enumerate(coeffs[1:], 1):
    print(f"  Detail level {i}: shape={c.shape}, energy={np.sum(c**2):.2f}")
# Denoise: zero out small detail coefficients
threshold = 0.3
coeffs_denoised = [coeffs[0]] + [pywt.threshold(c, threshold, mode='soft') for c in coeffs[1:]]
# waverec may return one extra sample; trim to the original length.
denoised = pywt.waverec(coeffs_denoised, wavelet)[:len(signal)]
mse = np.mean((signal - denoised)**2)
print(f"Denoising MSE: {mse:.4f}")
Continuous Wavelet Transform (CWT) Scalogram
# Continuous Wavelet Transform: track how a chirp's dominant frequency
# changes over time using scalogram power.
import numpy as np
import pywt
np.random.seed(1)
fs = 100  # Hz
t = np.arange(0, 4, 1/fs)
# Chirp: frequency increases from 5 to 20 Hz
freq = 5 + 15*t/4
signal = np.sin(2*np.pi*freq*t) + np.random.normal(0, 0.1, len(t))
widths = np.arange(1, 64)
# 'morl' = Morlet wavelet; sampling_period lets pywt map scales to Hz.
cwt_matrix, freqs = pywt.cwt(signal, widths, 'morl', sampling_period=1/fs)
power = np.abs(cwt_matrix)**2
# Peak frequency at start vs end of the signal
t_start = slice(0, 50); t_end = slice(350, 400)
peak_freq_start = freqs[np.argmax(power[:, t_start].mean(axis=1))]
peak_freq_end   = freqs[np.argmax(power[:, t_end].mean(axis=1))]
print(f"Dominant frequency (start): {peak_freq_start:.1f} Hz")
print(f"Dominant frequency (end):   {peak_freq_end:.1f} Hz")
Wavelet-Based Anomaly Detection
# Wavelet anomaly detection: reconstruct the series with one detail band
# zeroed out and flag large reconstruction residuals.
import numpy as np
import pywt
np.random.seed(42)
n = 1000
ts = np.sin(2*np.pi*np.arange(n)/50) + np.random.normal(0, 0.1, n)
# Inject anomalies
ts[300:310] += 3.0
ts[700] -= 4.0
# DWT anomaly detection via reconstruction error per window
coeffs = pywt.wavedec(ts, 'haar', level=3)
# NOTE(review): coeffs[1] is the *coarsest* detail band (cD3), not the
# high-frequency one — confirm whether coeffs[-1] (cD1) was intended.
# Spikes still project onto this band, so the residual test below works.
coeffs[1] = np.zeros_like(coeffs[1])
reconstructed = pywt.waverec(coeffs, 'haar')[:n]
# Residual = the zeroed band's content at each sample.
residuals = np.abs(ts - reconstructed)
threshold = residuals.mean() + 3*residuals.std()
anomaly_idx = np.where(residuals > threshold)[0]
print(f"Anomalies detected at indices: {anomaly_idx[:10]}")
print(f"True anomaly regions: 300-310, 700")
💼 Real-World Scenario
Manufacturing: use wavelet decomposition to separate machine vibration signals by frequency band, detect bearing faults (high-frequency detail) while ignoring normal operational frequencies.
Real-World Code
# Bearing-fault detection: compare wavelet-band energies of a healthy vs a
# faulty vibration signal sampled at 1 kHz.
import numpy as np
import pywt
np.random.seed(7)
fs = 1000  # 1 kHz sampling
t = np.arange(0, 2, 1/fs)  # 2 seconds
# Normal operation: 10 Hz fundamental + harmonics
normal = (np.sin(2*np.pi*10*t) + 0.3*np.sin(2*np.pi*20*t) +
          np.random.normal(0, 0.05, len(t)))
# Fault: adds 150 Hz bearing frequency
fault  = normal + 0.5*np.sin(2*np.pi*150*t)
for label, sig in [('Normal', normal), ('Fault', fault)]:
    coeffs = pywt.wavedec(sig, 'db4', level=5)
    # wavedec returns [cA5, cD5, ..., cD1]: the *last* entries are the finest
    # (highest-frequency) details. At fs=1 kHz, cD1 spans ~250-500 Hz and
    # cD2 ~125-250 Hz, so the 150 Hz bearing tone lands in these two bands.
    # (coeffs[1] would be cD5, ~15-31 Hz — the normal harmonics, not the fault.)
    hf_energy = (np.sum(coeffs[-1]**2) + np.sum(coeffs[-2]**2)) / np.sum(sig**2)
    print(f"{label}: high-freq energy ratio = {hf_energy:.4f}")
    energies = [np.sum(c**2) / np.sum(sig**2) for c in coeffs]
    print(f"  Energy by level: " + ", ".join(f"L{i}={e:.3f}" for i, e in enumerate(energies)))
🏋️ Practice: EEG Brainwave Analysis
Given a simulated EEG signal (1-second, 256 Hz) with delta (1-4 Hz), alpha (8-13 Hz), and beta (13-30 Hz) components, use wavelet decomposition to extract each band. Report the relative power of each band. Inject a 50ms epileptic spike at t=0.5s and detect it using wavelet residuals.
Starter Code
# Practice starter: synthetic 1-second EEG at 256 Hz with delta/alpha/beta
# components and an injected ~50 ms spike at t = 0.5 s.
import numpy as np
import pywt
fs = 256  # Hz
t = np.arange(0, 1, 1/fs)
# Multi-band EEG signal
delta = 0.5 * np.sin(2*np.pi*2*t)
alpha = 0.3 * np.sin(2*np.pi*10*t)
beta  = 0.2 * np.sin(2*np.pi*20*t)
noise = np.random.normal(0, 0.05, len(t))
eeg = delta + alpha + beta + noise
# Inject spike
spike_start = int(0.5 * fs)
eeg[spike_start:spike_start+13] += 3.0  # 13 samples ~= 50 ms at 256 Hz
# TODO: DWT decomposition with 'db4', level=5
# TODO: Identify which detail levels correspond to each EEG band
# TODO: Compute relative band powers
# TODO: Detect spike using reconstruction error
✅ Practice Checklist
15. Multivariate Time Series & VAR Models

Vector Autoregression (VAR)
import numpy as np
import pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
np.random.seed(42)
n = 200
e1, e2 = np.random.normal(0, 1, n), np.random.normal(0, 1, n)
y1, y2 = np.zeros(n), np.zeros(n)
for t in range(1, n):
    y1[t] = 0.6*y1[t-1] + 0.2*y2[t-1] + e1[t]
    y2[t] = 0.1*y1[t-1] + 0.7*y2[t-1] + e2[t]
df = pd.DataFrame({'y1': y1, 'y2': y2})
model = VAR(df)
results = model.fit(maxlags=4, ic='aic')
print(f"Selected lag order: {results.k_ar}")
forecast = results.forecast(df.values[-results.k_ar:], steps=5)
print("5-step forecast:"); print(forecast.round(3))
Granger Causality Test
# Granger causality: test whether lagged x significantly improves the
# prediction of y beyond y's own lags.
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import grangercausalitytests
np.random.seed(1)
n = 300
eps1 = np.random.normal(0, 1, n)
eps2 = np.random.normal(0, 1, n)
x = np.zeros(n); y = np.zeros(n)
for t in range(2, n):
    x[t] = 0.5*x[t-1] + eps1[t]
    y[t] = 0.4*x[t-1] + 0.3*y[t-1] + eps2[t]  # x Granger-causes y
df = pd.DataFrame({'y': y, 'x': x})
max_lag = 4
print("Granger causality (x -> y):")
# Column order matters: tests whether the 2nd column Granger-causes the 1st.
# NOTE(review): `verbose` is deprecated in newer statsmodels — confirm version.
res = grangercausalitytests(df[['y','x']], maxlag=max_lag, verbose=False)
for lag, r in res.items():
    # r[0] holds the test statistics dict; 'ssr_ftest' = (F, p, df_denom, df_num).
    f_stat = r[0]['ssr_ftest'][0]
    p_val  = r[0]['ssr_ftest'][1]
    print(f"  Lag {lag}: F={f_stat:.3f}, p={p_val:.4f}")
Cointegration & Error Correction
# Cointegration: two I(1) series sharing a common stochastic trend have a
# stationary linear combination (the spread).
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
from statsmodels.regression.linear_model import OLS
np.random.seed(5)
n = 500
common_trend = np.cumsum(np.random.normal(0, 1, n))  # shared random walk
y1 = common_trend + np.random.normal(0, 0.5, n)
y2 = 0.8 * common_trend + 1.2 + np.random.normal(0, 0.5, n)
# Engle-Granger test: small p-value -> reject "no cointegration".
score, pvalue, _ = coint(y1, y2)
print(f"Cointegration test: t={score:.4f}, p={pvalue:.4f}")
# Estimate cointegrating vector (intercept + slope) via OLS
beta = OLS(y1, np.column_stack([np.ones(n), y2])).fit().params
spread = y1 - beta[1]*y2 - beta[0]
z_spread = (spread - spread.mean()) / spread.std()
print(f"Spread mean-reversion: current z-score = {z_spread[-1]:.3f}")
💼 Real-World Scenario
Macroeconomic forecasting: model GDP, inflation, and interest rates jointly using VAR to capture cross-variable dynamics and produce scenario forecasts for monetary policy analysis.
Real-World Code
# Macro VAR: jointly model (differenced) GDP, inflation, and interest rates;
# forecast 12 months ahead and inspect an impulse response.
import numpy as np
import pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import grangercausalitytests
np.random.seed(10)
n = 120  # 10 years monthly
# Simulate: interest rate affects inflation with lag, both affect GDP
# NOTE(review): np.roll wraps around, so the first few samples mix in
# end-of-series values — acceptable for a synthetic demo.
ir = np.cumsum(np.random.normal(0, 0.1, n)) + 3.0
inf = 0.5*np.roll(ir, 2) + np.cumsum(np.random.normal(0, 0.05, n)) + 2.0
gdp = -0.3*np.roll(ir, 3) + 0.4*np.roll(inf, 1) + np.cumsum(np.random.normal(0, 0.08, n)) + 2.5
df = pd.DataFrame({'gdp': gdp, 'inflation': inf, 'interest_rate': ir})
df = df.diff().dropna()  # difference to achieve stationarity
model = VAR(df)
res = model.fit(maxlags=6, ic='aic')
print(f"VAR lag order: {res.k_ar}")
forecast = res.forecast(df.values[-res.k_ar:], steps=12)
print(f"12-month GDP forecast range: {forecast[:,0].min():.3f} to {forecast[:,0].max():.3f}")
# Impulse response
irf = res.irf(10)
# orth_irfs[step, response_var, shock_var]: GDP (0) response to a rate (2) shock.
print(f"GDP response to interest shock at lag 3: {irf.orth_irfs[3, 0, 2]:.4f}")
🏋️ Practice: Pairs Trading Signal
Using two simulated stock price series that are cointegrated (sharing a common random walk), implement a pairs trading strategy: (1) verify cointegration with Engle-Granger test, (2) estimate the spread, (3) generate long/short signals when z-score > 2 or < -2, (4) backtest and report Sharpe ratio and max drawdown.
Starter Code
# Practice starter: two cointegrated price series built from one shared
# random walk — the basis for a pairs-trading strategy.
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
from statsmodels.regression.linear_model import OLS
np.random.seed(3)
n = 500
common = np.cumsum(np.random.normal(0, 1, n))
price_a = common + np.random.normal(0, 0.5, n) + 50
price_b = 0.9*common + np.random.normal(0, 0.5, n) + 45
# TODO: Cointegration test
# TODO: Compute hedge ratio and spread
# TODO: Z-score of spread
# TODO: Generate trading signals (long A-short B when z<-2, vice versa)
# TODO: Compute daily P&L, Sharpe ratio, max drawdown
✅ Practice Checklist
16. Real-Time Streaming & Online Learning

Online Learning with River
# pip install river
# Online (incremental) linear regression: scale + fit one sample at a time,
# evaluated prequentially (predict first, then learn).
from river import linear_model, preprocessing, metrics
import numpy as np
np.random.seed(0)
n = 1000
X_data = np.column_stack([np.random.randn(n), np.arange(n)/n])
y_data = 3*X_data[:,0] - 2*X_data[:,1] + 0.5 + np.random.normal(0, 0.3, n)
# River pipelines compose with `|`: standardize, then regress.
model = preprocessing.StandardScaler() | linear_model.LinearRegression()
metric = metrics.RMSE()
for i, (xi, yi) in enumerate(zip(X_data, y_data)):
    x_dict = {'f0': xi[0], 'f1': xi[1]}
    # Predict BEFORE learning this sample (prequential evaluation).
    y_pred = model.predict_one(x_dict)
    if y_pred is not None:
        metric.update(yi, y_pred)
    model.learn_one(x_dict, yi)
    if i % 200 == 0 and y_pred is not None:
        print(f"  n={i}: running RMSE={metric.get():.4f}")
print(f"Final RMSE: {metric.get():.4f}")
Concept Drift Detection (ADWIN)
# pip install river
# ADWIN concept-drift detection on a stream whose mean jumps at t=500.
from river import drift
import numpy as np
np.random.seed(7)
detector = drift.ADWIN()
stream = np.concatenate([
    np.random.normal(0, 1, 500),
    np.random.normal(3, 1, 500)  # sudden drift at t=500
])
drifts = []
for i, x in enumerate(stream):
    detector.update(x)
    if detector.drift_detected:
        drifts.append(i)
        # Reset after a detection so subsequent drifts are measured afresh.
        detector = drift.ADWIN()  # reset
print(f"Drift detected at sample(s): {drifts}")
Sliding Window Streaming Statistics
import numpy as np
from collections import deque


class StreamStats:
    """Maintain mean/std over the most recent `window` observations."""

    def __init__(self, window=100):
        self.w = deque(maxlen=window)  # bounded buffer: old samples fall off
        self.n = 0                     # total observations ever seen

    def update(self, x):
        """Ingest one sample and return the current (mean, std)."""
        self.w.append(x)
        self.n += 1
        return self.mean(), self.std()

    def mean(self):
        return np.mean(self.w)

    def std(self):
        # Sample std needs at least two points; report 0.0 before that.
        if len(self.w) <= 1:
            return 0.0
        return np.std(self.w, ddof=1)


np.random.seed(1)
ss = StreamStats(window=50)
for step in range(300):
    level = 5 if step < 150 else 8  # regime shift halfway through
    mean, std = ss.update(np.random.normal(level, 1))
    if step % 50 == 49:
        print(f"  t={step+1}: window mean={mean:.3f}, std={std:.3f}")
💼 Real-World Scenario
IoT sensor network: process a continuous stream of temperature readings at 10 Hz from 50 sensors, detect anomalies in real-time using sliding window statistics, and adapt the baseline when ADWIN detects environmental regime changes.
Real-World Code
import numpy as np
from collections import deque

np.random.seed(42)
n_sensors, n_steps = 5, 1000
window = 60

# One rolling baseline buffer per sensor; alert timestamps keyed by sensor id.
baselines = [deque(maxlen=window) for _ in range(n_sensors)]
alerts = {i: [] for i in range(n_sensors)}

# Sensor 2 drifts upward by +4 degrees starting at step 600.
for step in range(n_steps):
    readings = np.random.normal(22, 0.5, n_sensors)
    if step >= 600:
        readings[2] += 4.0
    for sensor in range(n_sensors):
        baselines[sensor].append(readings[sensor])
        if len(baselines[sensor]) < 20:
            continue  # wait for a minimally stable baseline
        mu = np.mean(baselines[sensor])
        sigma = np.std(baselines[sensor], ddof=1)
        # Guard against a degenerate (near-zero) std.
        z = (readings[sensor] - mu) / max(sigma, 1e-6)
        if abs(z) > 3:
            alerts[sensor].append(step)

for sensor in range(n_sensors):
    if alerts[sensor]:
        print(f"Sensor {sensor}: {len(alerts[sensor])} alerts, first at t={alerts[sensor][0]}")
    else:
        print(f"Sensor {sensor}: no alerts")
🏋️ Practice: Real-Time Fraud Score Streaming
Simulate a stream of 2000 credit card transactions (amount, time_since_last, is_weekend). Use online logistic regression (River) trained incrementally. At t=1000, inject a concept drift (fraudsters change behavior). Detect drift using ADWIN on prediction errors. Report AUC before and after drift, and adaptation speed.
Starter Code
# pip install river
# Practice starter: streaming fraud detection with a concept drift at t=1000
# (the fraud-generating rule switches from amount-based to gap-based).
from river import linear_model, preprocessing, metrics, drift, stream as rv_stream
import numpy as np
np.random.seed(99)
n = 2000
# Generate transactions
amounts = np.random.exponential(50, n)
gaps = np.random.exponential(10, n)
is_weekend = np.random.randint(0, 2, n)
# Fraud rule: changes at t=1000
fraud_prob_before = 1 / (1 + np.exp(-(amounts/100 - 0.5)))
fraud_prob_after  = 1 / (1 + np.exp(-(gaps/5 - 1.0)))  # different pattern
y = np.array([
    np.random.binomial(1, fraud_prob_before[i]) if i < 1000
    else np.random.binomial(1, fraud_prob_after[i])
    for i in range(n)
])
# TODO: Online logistic regression with River
# TODO: ADWIN drift detection on binary prediction errors
# TODO: Report AUC in windows before drift, at drift, after adaptation
✅ Practice Checklist
17. Seasonal Decomposition & STL

Time series decompose into Trend, Seasonality, and Residual components. STL (Seasonal-Trend decomposition using LOESS) is robust to outliers and handles arbitrary seasonality. Use it before forecasting to understand your signal.

Classical decomposition with statsmodels
# Classical additive decomposition: y = trend + seasonal + residual, with a
# fixed repeating 12-month seasonal pattern.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose

np.random.seed(42)
periods = 120
dates = pd.date_range('2015-01', periods=periods, freq='MS')
trend = np.linspace(100, 160, periods)
seasonal = 15 * np.sin(2 * np.pi * np.arange(periods) / 12)
noise = np.random.normal(0, 5, periods)
ts = pd.Series(trend + seasonal + noise, index=dates)

result = seasonal_decompose(ts, model='additive', period=12)

# Moving-average trend is NaN at the series edges, hence dropna().
print("Decomposition components:")
print(f"  Trend range:    [{result.trend.dropna().min():.1f}, {result.trend.dropna().max():.1f}]")
print(f"  Seasonal range: [{result.seasonal.min():.2f}, {result.seasonal.max():.2f}]")
print(f"  Residual std:   {result.resid.dropna().std():.3f}")

# Seasonal indices: one repeating value per calendar month
seasonal_idx = result.seasonal[:12]
for month, val in zip(range(1, 13), seasonal_idx):
    print(f"  Month {month:2d}: {val:+.2f}")
STL decomposition (robust, flexible seasonality)
# STL with robust=True: outliers are downweighted so they land in the
# residual rather than distorting trend/seasonal components.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL

np.random.seed(0)
n = 156  # 13 years monthly
dates = pd.date_range('2010-01', periods=n, freq='MS')
trend = np.linspace(50, 120, n) + 0.3 * np.sin(np.linspace(0, 4*np.pi, n)) * 10
# Annual cycle plus a semi-annual harmonic.
seasonal = (20 * np.sin(2 * np.pi * np.arange(n) / 12)
            + 8 * np.sin(2 * np.pi * np.arange(n) / 6))
# Inject outliers
noise = np.random.normal(0, 3, n)
outliers = np.zeros(n)
outliers[[30, 60, 90]] = 40
ts = pd.Series(trend + seasonal + noise + outliers, index=dates)

stl = STL(ts, period=12, robust=True)
res = stl.fit()

# Strength metrics follow Hyndman's F_T / F_S: 1 - Var(resid)/Var(comp+resid).
print(f"STL decomposition of {n}-period series:")
print(f"  Trend variance:    {res.trend.var():.2f}")
print(f"  Seasonal variance: {res.seasonal.var():.2f}")
print(f"  Residual variance: {res.resid.var():.2f}")
print(f"  Seasonal strength: {max(0, 1 - res.resid.var()/(res.seasonal+res.resid).var()):.3f}")
print(f"  Trend strength:    {max(0, 1 - res.resid.var()/(res.trend+res.resid).var()):.3f}")
Trend strength, seasonality strength, and change points
# STL on a series with a trend break at month 60: locate the break via
# residual volatility, compute YoY trend growth, and find peak/trough months.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL

np.random.seed(7)
# Monthly retail sales with change point at month 60
n = 120
t = np.arange(n)
trend = np.where(t < 60, 100 + t*0.5, 130 + (t-60)*1.2)
seasonal = 25 * np.sin(2*np.pi*t/12)
noise = np.random.normal(0, 4, n)
ts = pd.Series(trend + seasonal + noise,
               index=pd.date_range('2014-01', periods=n, freq='MS'))

stl = STL(ts, period=12, robust=True)
res = stl.fit()

# Detect change point: where residuals spike
resid_rolling_std = pd.Series(res.resid).rolling(6).std()
# idxmax returns a Timestamp here, since res.resid is date-indexed.
change_pt = resid_rolling_std.idxmax()
print(f"Largest residual volatility at index: {change_pt}")

# Year-over-year growth from trend (12-period percent change)
yoy = pd.Series(res.trend).pct_change(12) * 100
print(f"\nYear-over-year trend growth (last 3):")
for i in [-3, -2, -1]:
    print(f"  Month {n+i}: {yoy.iloc[i]:.2f}%")

# Identify peak and trough season by averaging each calendar month's seasonal
monthly_seasonal = [res.seasonal[i::12].mean() for i in range(12)]
peak   = np.argmax(monthly_seasonal) + 1
trough = np.argmin(monthly_seasonal) + 1
print(f"\nPeak season: month {peak}, Trough: month {trough}")
💼 Real-World Scenario
A retail analyst decomposes 5 years of monthly e-commerce sales using STL to isolate the holiday spike (seasonality) from long-term growth (trend) before building a forecast model.
Real-World Code
# Retail sales decomposition: separate holiday seasonality from long-term
# growth with robust STL, then deseasonalize for trend analysis.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL

np.random.seed(42)
n = 60
dates = pd.date_range('2019-01', periods=n, freq='MS')
trend    = 1000 + np.linspace(0, 500, n)
# Annual sinusoid plus an extra December (holiday) spike.
seasonal = (300 * np.sin(2*np.pi*np.arange(n)/12)
            + 100 * (np.arange(n) % 12 == 11))
noise    = np.random.normal(0, 30, n)
sales    = pd.Series(trend + seasonal + noise, index=dates)

stl = STL(sales, period=12, robust=True)
res = stl.fit()

print("Retail Sales Decomposition:")
print(f"  Avg monthly trend growth: ${(res.trend.diff().dropna().mean()):.1f}")
print(f"  Peak seasonal boost: ${res.seasonal.max():.0f}")
print(f"  Residual RMSE: ${np.sqrt((res.resid**2).mean()):.1f}")

# Deseasonalized series for cleaner trend analysis
deseas = sales - res.seasonal
print(f"  Deseasonalized range: [{deseas.min():.0f}, {deseas.max():.0f}]")
🏋️ Practice: Custom STL Pipeline
Generate 5 years of weekly sales data (trend + two seasonalities: annual 52-week and quarterly 13-week + noise). Apply STL with period=52. Extract and print: (1) Overall trend slope (slope of linear fit to trend component), (2) Peak week of the year seasonally, (3) Percentage of variance explained by trend, seasonal, and residual components.
Starter Code
# Starter template: decompose weekly data with STL and summarize components.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from scipy import stats as sp_stats

np.random.seed(42)
n = 260  # 5 years weekly
t = np.arange(n)
trend    = 500 + t * 0.8
# Two seasonalities: annual (52-week) and quarterly (13-week)
seasonal = 80 * np.sin(2*np.pi*t/52) + 30 * np.sin(2*np.pi*t/13)
noise    = np.random.normal(0, 20, n)
ts = pd.Series(trend + seasonal + noise,
               index=pd.date_range('2019-01-07', periods=n, freq='W'))

stl = STL(ts, period=52, robust=True)
res = stl.fit()

# (1) trend slope via linear regression on trend component
# (2) peak week (argmax of first 52 seasonal values)
# (3) variance decomposition
total_var = ts.var()
# TODO: compute each component's share
✅ Practice Checklist
18. Stationarity Testing & ARIMA Preparation

ARIMA requires a stationary series (constant mean/variance). ADF and KPSS tests check stationarity. Differencing and log transforms achieve it. ACF/PACF plots reveal AR and MA orders.

ADF and KPSS stationarity tests
# Compare ADF (H0: unit root) and KPSS (H0: stationary) on three archetypal
# series; a series is labeled stationary only when both tests agree.
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss

np.random.seed(42)
n = 200
t = np.arange(n)

series = {
    'Random Walk':    np.cumsum(np.random.randn(n)),            # non-stationary
    'Stationary AR1': np.zeros(n),                              # stationary
    'Trend + Noise':  2 + 0.05*t + np.random.randn(n),         # non-stationary (trend)
}
# Fill the AR(1) series in place: x[t] = 0.7*x[t-1] + eps
for i in range(1, n):
    series['Stationary AR1'][i] = 0.7*series['Stationary AR1'][i-1] + np.random.randn()

print(f"{'Series':16s}  {'ADF p':>8s}  {'ADF stat':>9s}  {'KPSS p':>8s}  {'Stationary?'}")
print('-' * 60)
for name, data in series.items():
    adf_stat, adf_p, *_ = adfuller(data, autolag='AIC')
    try:
        kpss_stat, kpss_p, *_ = kpss(data, regression='c', nlags='auto')
    except Exception:
        # kpss can fail when the statistic falls outside its lookup table;
        # fall back to a strong-rejection p-value
        kpss_p = 0.01
    # Stationary: ADF rejects H0 (p<0.05) AND KPSS fails to reject H0 (p>0.05)
    is_stat = (adf_p < 0.05) and (kpss_p > 0.05)
    print(f"{name:16s}  {adf_p:8.4f}  {adf_stat:9.4f}  {kpss_p:8.4f}  {is_stat}")
Differencing, log transform, and Box-Cox
# Demonstrate variance- and trend-removing transforms and their effect on
# the ADF test: log, Box-Cox, differencing, and combinations.
# (Fix: the section is titled "...and Box-Cox" and imports boxcox, but the
# original never used it — the Box-Cox step is now demonstrated.)
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller
from scipy.stats import boxcox

np.random.seed(0)
n = 150
t = np.arange(n)

# Non-stationary: trending with heteroskedastic variance
ts = pd.Series(np.exp(0.02*t + np.cumsum(np.random.randn(n)*0.2)) * 100)

def adf_pvalue(x):
    """Return the ADF test p-value for a series (NaNs dropped)."""
    return adfuller(x.dropna(), autolag='AIC')[1]

print(f"Original:            ADF p={adf_pvalue(ts):.4f} (non-stationary)")

# Log transform (stabilizes variance)
ts_log = np.log(ts)
print(f"Log transform:       ADF p={adf_pvalue(ts_log):.4f}")

# Box-Cox transform (generalizes log; lambda estimated by MLE; requires
# strictly positive data — lambda near 0 behaves like a log transform)
bc_values, bc_lambda = boxcox(ts)
ts_bc = pd.Series(bc_values, index=ts.index)
print(f"Box-Cox (lam={bc_lambda:.2f}): ADF p={adf_pvalue(ts_bc):.4f}")

# First difference (removes linear trend)
ts_diff1 = ts.diff()
print(f"First difference:    ADF p={adf_pvalue(ts_diff1):.4f}")

# Log + first difference (common for financial series)
ts_log_diff = ts_log.diff()
print(f"Log + diff:          ADF p={adf_pvalue(ts_log_diff):.4f}")

# Seasonal difference (lag=12 for monthly)
ts_sdiff = ts.diff(12)
print(f"Seasonal diff (12):  ADF p={adf_pvalue(ts_sdiff):.4f}")
ACF, PACF, and identifying ARIMA order
# Generate known ARMA processes and read their ACF/PACF signatures to
# recover the AR and MA orders.
# (Fix: repaired mojibake characters in the printed strings — "β€”" was a
# double-encoded em dash and "Β±" a double-encoded plus-minus sign.)
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.tsa.arima_process import ArmaProcess

np.random.seed(42)
n = 300

# Generate AR(2) process: phi1=0.6, phi2=-0.3
# (ArmaProcess takes lag-polynomial coefficients, hence the negated signs)
ar_params = np.array([1, -0.6, 0.3])   # AR coefficients (sign convention)
ma_params = np.array([1])
ar2_process = ArmaProcess(ar_params, ma_params)
ar2_data = ar2_process.generate_sample(nsample=n)

# Generate MA(1) process
ar_params2 = np.array([1])
ma_params2 = np.array([1, 0.8])
ma1_process = ArmaProcess(ar_params2, ma_params2)
ma1_data = ma1_process.generate_sample(nsample=n)

for name, data in [('AR(2)', ar2_data), ('MA(1)', ma1_data)]:
    acf_vals  = acf(data,  nlags=10, fft=True)[1:6]  # skip lag 0
    pacf_vals = pacf(data, nlags=10)[1:6]
    conf = 1.96 / np.sqrt(n)   # approximate 95% band for white noise
    print(f"\n{name} — PACF cutoff suggests AR order; ACF cutoff suggests MA order:")
    print(f"  ACF  lags 1-5: {acf_vals.round(3)}")
    print(f"  PACF lags 1-5: {pacf_vals.round(3)}")
    print(f"  95% conf band: ±{conf:.3f}")
    ar_order  = sum(abs(pacf_vals) > conf)
    ma_order  = sum(abs(acf_vals)  > conf)
    print(f"  Suggested: AR order={ar_order}, MA order={ma_order}")
💼 Real-World Scenario
A commodity trader checks whether monthly aluminium prices are stationary before applying ARIMA. ADF fails to reject non-stationarity; first-differencing the log prices produces a stationary series with ACF suggesting MA(1).
Real-World Code
# Commodity-price workflow: test stationarity at each transform step, then
# read the ACF/PACF of the stationary series to suggest ARIMA orders.
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss, acf, pacf

np.random.seed(10)
# Simulate commodity prices (random walk with drift)
n = 120
prices = pd.Series(
    np.exp(np.cumsum(np.random.normal(0.005, 0.04, n))) * 1800,
    index=pd.date_range('2014-01', periods=n, freq='MS'),
    name='Aluminium_USD_t')

def check_stationarity(series, name):
    """Print and return the ADF verdict (True = stationary at 5%)."""
    adf_stat, adf_p, *_ = adfuller(series.dropna(), autolag='AIC')
    print(f"{name:25s}: ADF p={adf_p:.4f} {'[stationary]' if adf_p < 0.05 else '[non-stationary]'}")
    return adf_p < 0.05

check_stationarity(prices, 'Level prices')
check_stationarity(np.log(prices), 'Log prices')
log_diff = np.log(prices).diff().dropna()
check_stationarity(log_diff, 'Log-differenced')

# Count significant lags within the white-noise confidence band
acf_vals  = acf(log_diff,  nlags=8)[1:]
pacf_vals = pacf(log_diff, nlags=8)[1:]
conf = 1.96 / np.sqrt(len(log_diff))
print(f"\nACF  lags 1-8: {acf_vals.round(3)}")
print(f"PACF lags 1-8: {pacf_vals.round(3)}")
print(f"Suggested ARIMA(p,1,q): p={sum(abs(pacf_vals[:4])>conf)}, q={sum(abs(acf_vals[:4])>conf)}")
🏋️ Practice: Automated Stationarity Pipeline
Write a function make_stationary(ts, max_diffs=2) that: (1) Tests ADF on the original series, (2) If non-stationary, applies log transform (if all positive) and re-tests, (3) If still non-stationary, applies first differencing, (4) Repeats up to max_diffs times, (5) Returns the stationary series, number of differences applied, and whether log was used.
Starter Code
# Starter template: iterative transform-and-test pipeline that reaches
# stationarity via log transform and repeated differencing (ADF-based).
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller

def make_stationary(ts, max_diffs=2, alpha=0.05):
    """Return (stationary_series, n_diffs_applied, log_was_used)."""
    result = ts.copy()
    log_used = False
    n_diffs  = 0
    # (1) check original
    # (2) try log if all positive
    # (3) difference up to max_diffs
    return result, n_diffs, log_used

np.random.seed(42)
ts_rw     = pd.Series(np.cumsum(np.random.randn(100)))
ts_exp    = pd.Series(np.exp(0.03*np.arange(100) + np.random.randn(100)*0.2)*100)
ts_stat   = pd.Series(np.random.randn(100))

for name, ts in [('Random Walk', ts_rw), ('Exponential Trend', ts_exp), ('Stationary', ts_stat)]:
    out, d, log = make_stationary(ts)
    print(f"{name}: diffs={d}, log={log}")
✅ Practice Checklist
19. ARIMA & SARIMA Modeling

ARIMA(p,d,q) models a stationary process as AutoRegressive + Integrated + Moving Average. SARIMA adds seasonal components (P,D,Q,m). Use AIC/BIC for order selection and residual diagnostics to validate.

Fitting ARIMA and forecasting
# Fit ARIMA(1,1,1) on a trending series, forecast 12 months with 95%
# intervals, and report in-sample fit metrics.
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA

np.random.seed(42)
n = 120
dates = pd.date_range('2014-01', periods=n, freq='MS')
trend = np.linspace(100, 150, n)
noise = np.cumsum(np.random.normal(0, 2, n))  # AR component
ts = pd.Series(trend + noise, index=dates)

# Fit ARIMA(1,1,1)
model = ARIMA(ts, order=(1, 1, 1))
fitted = model.fit()
print(fitted.summary().tables[1])

# Forecast 12 months ahead
forecast = fitted.get_forecast(steps=12)
pred_mean = forecast.predicted_mean
pred_ci   = forecast.conf_int(alpha=0.05)   # 95% interval bounds

print(f"\n12-month forecast:")
for date, val, lo, hi in zip(pred_mean.index[:3], pred_mean[:3],
                              pred_ci.iloc[:3,0], pred_ci.iloc[:3,1]):
    print(f"  {date.strftime('%Y-%m')}: {val:.1f} [{lo:.1f}, {hi:.1f}]")
print(f"  ...")

# In-sample fit metrics
from sklearn.metrics import mean_squared_error
residuals = fitted.resid
print(f"\nIn-sample RMSE: {np.sqrt((residuals**2).mean()):.3f}")
print(f"AIC: {fitted.aic:.2f}, BIC: {fitted.bic:.2f}")
SARIMA for seasonal data
# Fit SARIMA with a 12-month seasonal component and compare AIC across
# candidate orders before forecasting.
import numpy as np, pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 84  # 7 years monthly
dates = pd.date_range('2017-01', periods=n, freq='MS')
trend    = np.linspace(1000, 1600, n)
seasonal = 200 * np.sin(2*np.pi*np.arange(n)/12)
noise    = np.cumsum(np.random.normal(0, 20, n))
ts = pd.Series(trend + seasonal + noise, index=dates)

# SARIMA(1,1,1)(1,1,1,12)
# enforce_* relaxed so optimization does not fail on boundary estimates
model = SARIMAX(ts, order=(1,1,1), seasonal_order=(1,1,1,12),
                trend='n', enforce_stationarity=False,
                enforce_invertibility=False)
fitted = model.fit(disp=False)
print(f"SARIMA(1,1,1)(1,1,1,12): AIC={fitted.aic:.2f}, BIC={fitted.bic:.2f}")

# Compare AIC for different orders
for order, sorder in [((0,1,1),(0,1,1,12)), ((1,1,0),(1,1,0,12))]:
    m = SARIMAX(ts, order=order, seasonal_order=sorder,
                enforce_stationarity=False, enforce_invertibility=False)
    f = m.fit(disp=False)
    print(f"SARIMA{order}{sorder}: AIC={f.aic:.2f}")

forecast = fitted.get_forecast(12)
print(f"\nForecast next month: {forecast.predicted_mean.iloc[0]:.1f}")
Residual diagnostics and model validation
# Validate an ARIMA fit with three residual diagnostics: Ljung-Box
# (autocorrelation), Shapiro-Wilk (normality), and Levene (constant variance).
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.diagnostic import acorr_ljungbox
from scipy import stats
import warnings; warnings.filterwarnings('ignore')

np.random.seed(0)
n = 150
ts = pd.Series(np.cumsum(np.random.normal(0.5, 2, n)) + np.random.randn(n),
               index=pd.date_range('2012-01', periods=n, freq='MS'))

model = ARIMA(ts, order=(1, 1, 1))
fitted = model.fit()
residuals = fitted.resid.dropna()

# Test 1: Ljung-Box (autocorrelation in residuals)
lb = acorr_ljungbox(residuals, lags=[10], return_df=True)
print(f"Ljung-Box p (lag 10): {lb['lb_pvalue'].iloc[0]:.4f}  (>0.05 = no autocorrelation = good)")

# Test 2: Normality of residuals
# (first 50 points only — Shapiro-Wilk is most meaningful on small samples)
_, p_sw = stats.shapiro(residuals[:50])
print(f"Shapiro-Wilk p:       {p_sw:.4f}  (>0.05 = normal residuals = good)")

# Test 3: Heteroskedasticity (should be constant variance)
# Compare the variance of the first and second halves of the residuals
half = len(residuals) // 2
_, p_lev = stats.levene(residuals[:half], residuals[half:])
print(f"Levene p:             {p_lev:.4f}  (>0.05 = equal variance = good)")

print(f"\nResidual stats: mean={residuals.mean():.4f}, std={residuals.std():.3f}")
print(f"AIC={fitted.aic:.2f}, model is {'adequate' if lb['lb_pvalue'].iloc[0]>0.05 else 'needs improvement'}")
💼 Real-World Scenario
An energy company forecasts monthly natural gas demand for the next 6 months using SARIMA to capture both the yearly seasonal cycle and trend growth, with 95% prediction intervals for capacity planning.
Real-World Code
# Train/test SARIMA on monthly gas demand: hold out the last 12 months,
# report MAPE/RMSE, and print a 6-month forecast with 95% intervals.
import numpy as np, pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')

np.random.seed(5)
n = 96
dates = pd.date_range('2016-01', periods=n, freq='MS')
trend    = np.linspace(500, 700, n)
seasonal = 150 * np.cos(2*np.pi*np.arange(n)/12)  # peak in winter
noise    = np.cumsum(np.random.normal(0, 10, n))
demand   = pd.Series(trend + seasonal + noise, index=dates, name='Gas_MMcf')

train = demand[:-12]
test  = demand[-12:]

model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12),
                enforce_stationarity=False, enforce_invertibility=False)
fitted = model.fit(disp=False)

forecast = fitted.get_forecast(steps=12)
pred   = forecast.predicted_mean
ci     = forecast.conf_int()   # default 95% interval

from sklearn.metrics import mean_absolute_percentage_error
mape = mean_absolute_percentage_error(test, pred) * 100
rmse = np.sqrt(((test - pred)**2).mean())

print(f"SARIMA(1,1,1)(1,1,1,12) forecast metrics:")
print(f"  MAPE: {mape:.2f}%  RMSE: {rmse:.2f} MMcf")
print(f"  AIC: {fitted.aic:.1f}")
print(f"\n6-month capacity forecast (MMcf):")
for i in range(6):
    print(f"  {pred.index[i].strftime('%Y-%m')}: {pred.iloc[i]:.0f} [{ci.iloc[i,0]:.0f}-{ci.iloc[i,1]:.0f}]")
🏋️ Practice: Auto-ARIMA Order Selection
Generate monthly sales data for 4 years with AR(2) dynamics and monthly seasonality. Write a function select_arima_order(ts, max_p=3, max_q=3, d=1) that tries all ARIMA(p,d,q) combinations and returns the order with the lowest AIC. Compare its forecast (12 months) RMSE to naive persistence forecast.
Starter Code
# Starter template: grid-search ARIMA(p,d,q) by AIC, then compare the
# winner's 12-month forecast to a naive persistence baseline.
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 60
ts = pd.Series(
    100 + np.cumsum(np.random.normal(0, 2, n)) + 20*np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2019-01', periods=n, freq='MS'))

def select_arima_order(ts, max_p=3, max_q=3, d=1):
    """Return (best_order, best_aic) over all ARIMA(p,d,q) candidates."""
    best_aic, best_order = np.inf, None
    for p in range(max_p+1):
        for q in range(max_q+1):
            if p == 0 and q == 0:
                continue
            try:
                m = ARIMA(ts, order=(p, d, q))
                f = m.fit()
                if f.aic < best_aic:
                    best_aic, best_order = f.aic, (p, d, q)
            except Exception:
                # Some orders fail to converge; skip them
                pass
    return best_order, best_aic

# TODO: call select_arima_order on train set (first 48 months)
# TODO: forecast last 12 months with best order
# TODO: compare RMSE to naive persistence (repeat last observed value)
✅ Practice Checklist
20. Exponential Smoothing (ETS / Holt-Winters)

Exponential smoothing methods weight recent observations more heavily. Simple ES handles level; Holt's double ES adds trend; Holt-Winters triple ES adds seasonality. The ETS framework unifies them with Error-Trend-Seasonality states.

Simple, Holt's double, and Holt-Winters
# Compare Simple ES (level), Holt's (level+trend), and Holt-Winters
# (level+trend+seasonality) on a 12-month holdout.
# (Fix: the original alpha report used getattr(result, 'model').params,
# which is not where statsmodels stores fitted parameters, so it always
# printed 'N/A'; the fitted-results object exposes them via .params.)
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, Holt, ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 60
dates = pd.date_range('2019-01', periods=n, freq='MS')
trend    = np.linspace(100, 160, n)
seasonal = 20 * np.sin(2*np.pi*np.arange(n)/12)
noise    = np.random.normal(0, 5, n)
ts = pd.Series(trend + seasonal + noise, index=dates)

train, test = ts[:-12], ts[-12:]

# Simple Exponential Smoothing (level only)
ses = SimpleExpSmoothing(train, initialization_method='estimated').fit()
ses_fc = ses.forecast(12)

# Holt's (trend)
holt = Holt(train, initialization_method='estimated').fit(
    optimized=True, smoothing_level=0.3, smoothing_trend=0.1)
holt_fc = holt.forecast(12)

# Holt-Winters (trend + seasonality)
hw = ExponentialSmoothing(train, trend='add', seasonal='add',
                           seasonal_periods=12,
                           initialization_method='estimated').fit(optimized=True)
hw_fc = hw.forecast(12)

from sklearn.metrics import mean_squared_error
# Pair each model's fitted-results object with its forecast so the
# optimized alpha can be read directly from fit.params.
for name, fit, fc in [('SES', ses, ses_fc), ("Holt's", holt, holt_fc),
                      ('Holt-Winters', hw, hw_fc)]:
    rmse = np.sqrt(mean_squared_error(test, fc))
    alpha = fit.params.get('smoothing_level', 'N/A')
    print(f"{name:15s}: RMSE={rmse:.2f},  alpha={alpha}")
ETS auto-selection and damped trend
# Compare ETS configurations (additive vs multiplicative seasonality,
# damped vs plain trend) on a 12-month holdout by RMSE and AIC.
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

np.random.seed(0)
n = 84
dates = pd.date_range('2017-01', periods=n, freq='MS')
ts = pd.Series(
    np.linspace(200, 320, n) + 30*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 8, n), index=dates)

train, test = ts[:-12], ts[-12:]

# (label, trend type, seasonal type, damped trend?)
configs = [
    ('Additive trend + Additive seasonal',   'add', 'add',  False),
    ('Additive trend + Mult seasonal',        'add', 'mul',  False),
    ('Damped trend + Additive seasonal',      'add', 'add',  True),
]

print(f"{'Model':40s}  {'RMSE':>6s}  {'AIC':>8s}")
for name, trend, seasonal, damped in configs:
    m = ExponentialSmoothing(train, trend=trend, seasonal=seasonal,
                              seasonal_periods=12, damped_trend=damped,
                              initialization_method='estimated')
    fitted = m.fit(optimized=True)
    fc = fitted.forecast(12)
    rmse = np.sqrt(((test - fc)**2).mean())
    print(f"{name:40s}  {rmse:6.2f}  {fitted.aic:8.2f}")
Smoothing parameters and state extraction
# Fit Holt-Winters and inspect the optimized smoothing parameters and the
# final level/trend/seasonal states that drive the forecast.
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

np.random.seed(5)
n = 72
ts = pd.Series(
    150 + np.linspace(0, 50, n) + 25*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 6, n),
    index=pd.date_range('2018-01', periods=n, freq='MS'))

hw = ExponentialSmoothing(ts, trend='add', seasonal='add',
                           seasonal_periods=12,
                           initialization_method='estimated').fit(optimized=True)

print("Optimal smoothing parameters:")
print(f"  alpha (level):    {hw.params['smoothing_level']:.4f}")
print(f"  beta  (trend):    {hw.params['smoothing_trend']:.4f}")
print(f"  gamma (seasonal): {hw.params['smoothing_seasonal']:.4f}")

# Extract level, trend, and seasonal states
level    = hw.level
trend_s  = hw.trend
seasonal = hw.season

# Forecasts extrapolate from the last state values below
print(f"\nFinal state:")
print(f"  Level:    {level.iloc[-1]:.2f}")
print(f"  Trend:    {trend_s.iloc[-1]:.4f} per period")
print(f"  Seasonal indices (last 12): {seasonal.iloc[-12:].round(2).values}")

fc = hw.forecast(6)
print(f"\n6-month forecast: {fc.round(1).values}")
💼 Real-World Scenario
A retailer uses Holt-Winters with multiplicative seasonality to forecast weekly ice cream sales, capturing both trend growth and summer/winter seasonality for inventory planning.
Real-World Code
# Weekly ice-cream sales: compare additive vs multiplicative seasonality
# on an 8-week holdout, then refit on all data to plan the next 4 weeks.
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

np.random.seed(7)
n_weeks = 104  # 2 years weekly
t = np.arange(n_weeks)
trend_w    = 1000 + t * 3
seasonal_w = 400 * np.sin(2*np.pi*t/52) + 150 * (t % 52 > 44).astype(float)  # summer+holiday
noise_w    = np.random.normal(0, 40, n_weeks)
weekly_sales = pd.Series(
    (trend_w + seasonal_w + noise_w).clip(min=100),
    index=pd.date_range('2022-01-03', periods=n_weeks, freq='W'))

train_w = weekly_sales[:-8]
test_w  = weekly_sales[-8:]

# Try additive and multiplicative seasonal
for seas_type in ['add', 'mul']:
    hw = ExponentialSmoothing(
        train_w, trend='add', seasonal=seas_type,
        seasonal_periods=52, initialization_method='estimated').fit(optimized=True)
    fc = hw.forecast(8)
    rmse = np.sqrt(((test_w - fc)**2).mean())
    mape = (abs(test_w - fc) / test_w).mean() * 100
    print(f"Seasonal={seas_type}: RMSE={rmse:.1f}, MAPE={mape:.2f}%, AIC={hw.aic:.1f}")

# Refit on the full series and forecast the next 4 weeks
best_hw = ExponentialSmoothing(weekly_sales, trend='add', seasonal='add',
                                seasonal_periods=52, initialization_method='estimated').fit(optimized=True)
next4 = best_hw.forecast(4)
print(f"\nNext 4 weeks inventory needs: {next4.round(0).values}")
🏋️ Practice: ETS Grid Search
Generate 5 years of monthly data with trend + seasonality. Write a grid_search_ets(train, test, h) function that tries all combinations of trend in [None, 'add'], seasonal in [None, 'add', 'mul'], damped in [True, False] (only when trend is not None). Return a DataFrame with columns: model_name, AIC, test_RMSE, test_MAPE. Sort by test_RMSE.
Starter Code
# Starter template: grid search over ETS configurations ranked by
# out-of-sample RMSE on a 12-month holdout.
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 72
ts = pd.Series(100 + np.linspace(0,60,n) + 25*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*5,
               index=pd.date_range('2018-01', periods=n, freq='MS'))
train, test = ts[:-12], ts[-12:]

def grid_search_ets(train, test, h=12):
    """Return a DataFrame (model_name, AIC, test_RMSE, test_MAPE) sorted by RMSE."""
    results = []
    # TODO: iterate over trend, seasonal, damped combinations
    # TODO: fit ExponentialSmoothing, forecast h steps
    # TODO: compute AIC, RMSE, MAPE
    # TODO: append to results, return sorted DataFrame
    return pd.DataFrame(results)

print(grid_search_ets(train, test).head(5))
✅ Practice Checklist
21. ML Approach: Lag Features & Sklearn

Transform any time series into a supervised ML problem by creating lag features, rolling statistics, and calendar features. This lets you use any sklearn regressor for forecasting.

Creating lag and rolling features
# Turn a time series into a supervised feature matrix: lags, leakage-free
# rolling statistics, and calendar features.
# (Fix: the windows default was a mutable list — replaced with a tuple to
# avoid the mutable-default-argument pitfall; behavior is unchanged.)
import numpy as np, pandas as pd

np.random.seed(42)
n = 200
ts = pd.Series(
    100 + np.cumsum(np.random.normal(0.3, 2, n)) +
    15 * np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2007-01', periods=n, freq='MS'),
    name='sales')

def make_features(ts, lags=6, windows=(3, 6, 12)):
    """Build a supervised feature frame from a datetime-indexed series.

    Parameters
    ----------
    ts : pd.Series with a DatetimeIndex.
    lags : int, number of lag columns (lag_1 .. lag_{lags}).
    windows : iterable of int, rolling-window sizes for mean/std/min.

    Returns
    -------
    pd.DataFrame with target column 'y' plus lag, rolling, and calendar
    features; leading warm-up rows containing NaNs are dropped.
    """
    df = pd.DataFrame({'y': ts})
    # Lag features
    for lag in range(1, lags+1):
        df[f'lag_{lag}'] = ts.shift(lag)
    # Rolling statistics (shift(1) keeps windows strictly in the past — no leakage)
    for w in windows:
        df[f'roll_mean_{w}']  = ts.shift(1).rolling(w).mean()
        df[f'roll_std_{w}']   = ts.shift(1).rolling(w).std()
        df[f'roll_min_{w}']   = ts.shift(1).rolling(w).min()
    # Calendar features (cyclical month encoding plus a linear time index)
    df['month']     = ts.index.month
    df['month_sin'] = np.sin(2*np.pi*ts.index.month/12)
    df['month_cos'] = np.cos(2*np.pi*ts.index.month/12)
    df['trend']     = np.arange(len(ts))
    return df.dropna()

features = make_features(ts)
print(f"Feature matrix: {features.shape}")
print(f"Features: {list(features.columns)}")
print(f"\nSample row:\n{features.iloc[0].round(3)}")
Sklearn regressor for forecasting (walk-forward)
# Forecast with a gradient-boosting regressor on lag/rolling/calendar
# features; the last 24 months are held out for evaluation.
import numpy as np, pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_percentage_error
import warnings; warnings.filterwarnings('ignore')

np.random.seed(0)
n = 180
ts = pd.Series(
    100 + np.cumsum(np.random.normal(0.3, 1.5, n)) +
    20 * np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2009-06', periods=n, freq='MS'))

def make_features(ts, lags=12):
    """Return a supervised frame: target 'y' plus lag/rolling/cyclical features."""
    df = pd.DataFrame({'y': ts})
    for lag in range(1, lags+1):
        df[f'lag_{lag}'] = ts.shift(lag)
    # shift(1) keeps rolling windows strictly in the past (no leakage)
    df['roll_mean_6'] = ts.shift(1).rolling(6).mean()
    df['roll_std_6']  = ts.shift(1).rolling(6).std()
    df['month_sin']   = np.sin(2*np.pi*ts.index.month/12)
    df['month_cos']   = np.cos(2*np.pi*ts.index.month/12)
    return df.dropna()

data  = make_features(ts)
cutoff = len(data) - 24
X, y  = data.drop('y', axis=1), data['y']

X_tr, X_te = X.iloc[:cutoff], X.iloc[cutoff:]
y_tr, y_te = y.iloc[:cutoff], y.iloc[cutoff:]

model = GradientBoostingRegressor(n_estimators=200, max_depth=4, random_state=42)
model.fit(X_tr, y_tr)
preds = model.predict(X_te)

rmse = np.sqrt(((y_te.values - preds)**2).mean())
mape = mean_absolute_percentage_error(y_te, preds) * 100
print(f"GBR Forecast: RMSE={rmse:.2f}, MAPE={mape:.2f}%")

# Feature importance
imp = pd.Series(model.feature_importances_, index=X.columns).nlargest(5)
print(f"Top features: {imp.round(3).to_dict()}")
Recursive multi-step forecasting
# Multi-step forecasting by recursion: a RandomForest one-step model whose
# predictions are fed back in as future lag values.
# (Fix: model.predict([row]) passed a bare nested list, which drops feature
# names — sklearn warns and column alignment relies on list order only;
# the row is now wrapped in a DataFrame with the training columns.)
import numpy as np, pandas as pd
from sklearn.ensemble import RandomForestRegressor
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 150
ts = pd.Series(
    np.cumsum(np.random.normal(0.5, 2, n)) + 100 +
    15 * np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2012-06', periods=n, freq='MS'))

lags = 6
data = pd.DataFrame({'y': ts})
for l in range(1, lags+1):
    data[f'lag_{l}'] = ts.shift(l)
data['month_sin'] = np.sin(2*np.pi*ts.index.month/12)
data['month_cos'] = np.cos(2*np.pi*ts.index.month/12)
data = data.dropna()

X, y = data.drop('y', axis=1), data['y']
X_tr, X_te = X.iloc[:-12], X.iloc[-12:]
y_tr, y_te = y.iloc[:-12], y.iloc[-12:]

model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_tr, y_tr)

# Recursive forecast: use predictions as future lags
history = list(y_tr.values[-lags:])
preds = []
future_idx = pd.date_range(X_te.index[0], periods=12, freq='MS')
for i, date in enumerate(future_idx):
    row = [history[-l] for l in range(1, lags+1)]
    row += [np.sin(2*np.pi*date.month/12), np.cos(2*np.pi*date.month/12)]
    # DataFrame with training columns keeps feature names aligned with fit()
    pred = model.predict(pd.DataFrame([row], columns=X.columns))[0]
    preds.append(pred)
    history.append(pred)

rmse = np.sqrt(((y_te.values - np.array(preds))**2).mean())
print(f"Recursive RF RMSE: {rmse:.2f}")
print(f"True last 3:  {y_te.values[-3:].round(1)}")
print(f"Pred last 3:  {np.array(preds[-3:]).round(1)}")
💼 Real-World Scenario
A logistics company uses a LightGBM model with 24 lag features, rolling means, and one-hot encoded weekday/month to forecast daily parcel volume 7 days ahead, outperforming SARIMA by 30% MAPE.
Real-World Code
# Daily parcel-volume forecasting with gradient boosting over lag,
# rolling-mean, and calendar features; the last 90 days are held out.
import numpy as np, pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_percentage_error

np.random.seed(42)
n = 365 * 2  # 2 years daily
t = np.arange(n)
daily = pd.Series(
    1000 + 0.5*t +
    300 * np.sin(2*np.pi*t/365) +
    150 * np.sin(2*np.pi*t/7) +   # weekly pattern
    np.random.normal(0, 50, n),
    index=pd.date_range('2022-01-01', periods=n, freq='D'))

def build_features(ts, lags=14):
    """Return target 'y' plus lag, rolling-mean, and calendar features."""
    df = pd.DataFrame({'y': ts})
    for l in range(1, lags+1):
        df[f'lag_{l}'] = ts.shift(l)
    # shift(1) keeps rolling windows strictly in the past (no leakage)
    for w in [7, 14, 30]:
        df[f'rm_{w}'] = ts.shift(1).rolling(w).mean()
    df['dow']        = ts.index.dayofweek
    df['month']      = ts.index.month
    df['week_sin']   = np.sin(2*np.pi*ts.index.dayofweek/7)
    df['week_cos']   = np.cos(2*np.pi*ts.index.dayofweek/7)
    df['annual_sin'] = np.sin(2*np.pi*ts.index.dayofyear/365)
    return df.dropna()

data = build_features(daily)
X, y = data.drop('y', axis=1), data['y']
split = len(data) - 90
X_tr, X_te, y_tr, y_te = X.iloc[:split], X.iloc[split:], y.iloc[:split], y.iloc[split:]

gbr = GradientBoostingRegressor(n_estimators=300, max_depth=5, learning_rate=0.05, random_state=42)
gbr.fit(X_tr, y_tr)
preds = gbr.predict(X_te)

mape = mean_absolute_percentage_error(y_te, preds) * 100
rmse = np.sqrt(((y_te.values - preds)**2).mean())
print(f"Daily volume forecast: MAPE={mape:.2f}%, RMSE={rmse:.1f}")
top5 = pd.Series(gbr.feature_importances_, index=X.columns).nlargest(5)
print(f"Top features: {top5.round(3).to_dict()}")
🏋️ Practice: Feature Engineering for Weekly Data
Generate 3 years of weekly store sales (trend + annual seasonality + weekly dip on off-season weeks). Create features: lags 1-8, rolling mean/std at 4 and 8 weeks, week-of-year sine/cosine, year indicator. Train RandomForest on first 2.5 years, test on last 6 months. Report RMSE and MAPE. Plot (print) actual vs predicted for the test period.
Starter Code
# Starter template: weekly-sales feature engineering + RandomForest forecast.
import numpy as np, pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_percentage_error

np.random.seed(42)
n = 156  # 3 years weekly
t = np.arange(n)
ts = pd.Series(
    5000 + 10*t + 800*np.sin(2*np.pi*t/52) + np.random.normal(0,150,n),
    index=pd.date_range('2021-01-04', periods=n, freq='W'))

# TODO: create lag + rolling + calendar features
# TODO: train/test split (last 26 weeks = test)
# TODO: train RandomForest
# TODO: report RMSE and MAPE
# TODO: print actual vs predicted for test
✅ Practice Checklist
22. Time Series Anomaly Detection

Detect anomalous time points using statistical control limits, rolling z-scores, STL residuals, and IsolationForest. Different methods suit different anomaly types: spikes, level shifts, or trend changes.

Rolling z-score and IQR fences
# Two classical anomaly detectors on a random-walk series with injected
# spikes/drops: a rolling z-score and rolling IQR fences.
import numpy as np, pandas as pd

np.random.seed(42)
n = 200
daily_index = pd.date_range('2020-01-01', periods=n, freq='D')
ts = pd.Series(50 + np.cumsum(np.random.normal(0.1, 1, n)), index=daily_index)

# Inject known spikes/drops so detector recall can be inspected
anomaly_idx = [40, 90, 140, 170]
ts.iloc[anomaly_idx] += np.array([25, -30, 20, -25])

# Method 1: Rolling z-score
window = 20
center = ts.rolling(window, min_periods=1).mean()
spread = ts.rolling(window, min_periods=1).std().fillna(1)
anomalies_z = ts[((ts - center) / spread).abs() > 3]
print(f"Rolling z-score (|z|>3): {len(anomalies_z)} anomalies detected")
print(f"  True: {anomaly_idx}")
print(f"  Detected: {list(anomalies_z.index.strftime('%Y-%m-%d')[:5])}")

# Method 2: IQR fences on rolling window
q_low  = ts.rolling(30).quantile(0.25)
q_high = ts.rolling(30).quantile(0.75)
fence  = 1.5 * (q_high - q_low)
anomalies_iqr = ts[(ts < q_low - fence) | (ts > q_high + fence)]
print(f"IQR fences: {len(anomalies_iqr)} anomalies detected")
STL residuals for anomaly detection
# Anomaly detection on STL residuals with a robust (MAD-based) 3-sigma
# threshold; detections are scored against the injected ground truth.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL

np.random.seed(0)
n = 120
ts = pd.Series(
    100 + np.linspace(0,30,n) + 20*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 3, n),
    index=pd.date_range('2014-01', periods=n, freq='MS'))

# Inject anomalies at specific months
anom_idx = [24, 50, 85, 100]
ts.iloc[anom_idx] += [40, -35, 50, -40]

stl = STL(ts, period=12, robust=True)
res = stl.fit()
residuals = pd.Series(res.resid, index=ts.index)

# Flag residuals beyond 3 sigma (estimated robustly)
from scipy.stats import median_abs_deviation
mad  = median_abs_deviation(residuals.dropna())
# MAD / 0.6745 approximates the standard deviation for normal data
robust_std = mad / 0.6745
threshold  = 3 * robust_std

flags = abs(residuals) > threshold
detected = ts[flags]
print(f"STL residual anomaly detection (threshold={threshold:.2f}):")
print(f"  Flagged {flags.sum()} points")
print(f"  True anomaly indices: {anom_idx}")
print(f"  Detected at: {list(detected.index.strftime('%Y-%m')[:6])}")

# Score detections against the injected anomalies
tp = sum(1 for i in anom_idx if flags.iloc[i])
fp = flags.sum() - tp
print(f"  TP={tp}, FP={fp}")
IsolationForest on windowed features
# IsolationForest over per-point windowed features (value, first diff,
# rolling stats, z-score); contamination set from the known anomaly rate.
import numpy as np, pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

np.random.seed(42)
n = 365
ts = pd.Series(
    100 + 0.05*np.arange(n) + 20*np.sin(2*np.pi*np.arange(n)/365) +
    np.random.normal(0, 4, n),
    index=pd.date_range('2022-01-01', periods=n, freq='D'))

true_anomalies = [50, 120, 200, 280, 340]
ts.iloc[true_anomalies] += np.array([35, -40, 30, -45, 38])

# Build windowed features for each point
window = 7
features = pd.DataFrame({
    'value':      ts,
    'diff1':      ts.diff(),
    'roll_mean':  ts.rolling(window).mean(),
    'roll_std':   ts.rolling(window).std(),
    'z_score':    (ts - ts.rolling(window).mean()) / ts.rolling(window).std(),
}).dropna()

scaler = StandardScaler()
X = scaler.fit_transform(features)

iso = IsolationForest(contamination=len(true_anomalies)/n, random_state=42, n_estimators=200)
preds = iso.fit_predict(X)   # -1 = anomaly

detected_idx = features.index[preds == -1]
print(f"IsolationForest detected: {len(detected_idx)} anomalies")
true_dates = ts.index[true_anomalies]
# Count a hit if a detection lands within 2 days of a true anomaly
tp = sum(1 for d in detected_idx if any(abs((d - t).days) <= 2 for t in true_dates))
print(f"True positives (within 2 days): {tp}/{len(true_anomalies)}")
💼 Real-World Scenario
A server monitoring system flags CPU usage anomalies in real-time: rolling z-score catches sudden spikes, while STL residuals detect subtle level shifts that z-score misses after trend changes.
Real-World Code
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from scipy.stats import median_abs_deviation

# Simulated day of minutely CPU usage: sinusoidal daily cycle + noise.
np.random.seed(42)
n = 1440  # 24 hours Γ— 60 min (1 day, minutely)
t = np.arange(n)
# CPU: daily cycle + trend
cpu = pd.Series(
    40 + 20*np.sin(2*np.pi*t/1440) + np.random.normal(0, 3, n),
    index=pd.date_range('2024-01-01', periods=n, freq='min'))

# Inject anomalies
cpu.iloc[200:210] += 35    # sustained spike
cpu.iloc[800]     += 60    # sudden spike
cpu.iloc[1200]    -= 25    # drop

# Method 1: Rolling z-score (fast, real-time friendly)
w = 60
z = (cpu - cpu.rolling(w).mean()) / cpu.rolling(w).std()
z_flags = (abs(z) > 3).fillna(False)

# Method 2: Hourly aggregation + STL
# Use the lowercase 'h' alias: uppercase 'H' is deprecated since pandas 2.2
# and was inconsistent with the 'min' alias used above.
hourly = cpu.resample('h').mean()
stl = STL(hourly, period=24, robust=True)
resid = pd.Series(stl.fit().resid, index=hourly.index)
# Robust 3-sigma threshold: MAD/0.6745 estimates sigma for normal residuals.
mad = median_abs_deviation(resid.dropna())
stl_flags = abs(resid) > 3 * mad / 0.6745

print(f"Rolling z-score flagged: {z_flags.sum()} minutes")
print(f"STL (hourly) flagged:    {stl_flags.sum()} hours")
print(f"Z-score alerts during spike period: {z_flags.iloc[195:215].sum()}")
🏋️ Practice: Multi-Method Anomaly Ensemble
Generate 2 years of daily temperature data (seasonal + trend + noise). Inject 6 anomalies (3 spikes, 3 drops). Implement 3 detectors: (1) rolling 30-day z-score threshold 3, (2) IQR fence on 30-day window, (3) STL residuals with 3-sigma MAD threshold. Print a table showing which detectors caught each injected anomaly (TP matrix).
Starter Code
# Practice starter: 2 years of daily synthetic temperatures with 6 injected
# anomalies (3 positive spikes, 3 negative drops) at known positions.
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from sklearn.ensemble import IsolationForest
from scipy.stats import median_abs_deviation

np.random.seed(42)
n = 730
t = np.arange(n)
# Base signal: level 15 + slow warming trend + annual cycle + noise.
ts = pd.Series(
    15 + 0.01*t + 10*np.sin(2*np.pi*t/365) + np.random.normal(0,1.5,n),
    index=pd.date_range('2022-01-01', periods=n, freq='D'),
    name='temperature')

# Ground truth for the TP matrix: positional day offsets and shift magnitudes.
anom_days = [60, 150, 250, 380, 500, 650]
deltas    = [12, -15, 10, -13, 11, -10]
for day, delta in zip(anom_days, deltas):
    ts.iloc[day] += delta

# TODO: implement 3 anomaly detectors
# TODO: print TP matrix for each injected anomaly
✅ Practice Checklist
23. Forecast Evaluation & Backtesting

RMSE and MAPE are standard forecast metrics but have blind spots. SMAPE handles zeros; MASE benchmarks against naive. Walk-forward backtesting gives realistic performance estimates and detects overfitting to a single test window.

RMSE, MAPE, MASE, SMAPE
import numpy as np

def rmse(y_true, y_pred):
    """Root mean squared error of predictions against actuals."""
    err = y_true - y_pred
    return np.sqrt(np.mean(err * err))

def mape(y_true, y_pred):
    """Mean absolute percentage error (%), skipping points where actual == 0."""
    nonzero = y_true != 0
    rel_err = (y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero]
    return 100 * np.mean(np.abs(rel_err))

def smape(y_true, y_pred):
    """Symmetric MAPE (%); points where both series are zero are excluded."""
    scale = (np.abs(y_true) + np.abs(y_pred)) / 2
    valid = scale > 0
    return 100 * np.mean(np.abs(y_true[valid] - y_pred[valid]) / scale[valid])

def mase(y_true, y_pred, y_train, m=1):
    """Mean Absolute Scaled Error: MAE scaled by the in-sample MAE of the
    seasonal-naive forecast with period m. Values < 1 mean the forecast
    beats the naive benchmark.

    Fix: the scale uses lag-m differences y[t] - y[t-m]. The previous
    np.diff(y_train, n=m) applied FIRST differencing m times, which is
    only equivalent for m=1 and wrong for any seasonal period m > 1.
    """
    y_train = np.asarray(y_train)
    naive_err = np.mean(np.abs(y_train[m:] - y_train[:-m]))
    return np.mean(np.abs(y_true - y_pred)) / naive_err

# Demo: score a noisy forecast against actuals with each metric defined above.
np.random.seed(42)
y_true  = np.array([100, 150, 130, 200, 175, 220, 195, 210])
y_pred  = y_true + np.random.randn(len(y_true)) * 15  # forecast = truth + noise
# Training history is used only for MASE's naive-error scaling.
y_train = np.array([80, 90, 110, 95, 105, 120, 115, 130, 140] + list(y_true[:4]))

print(f"RMSE:  {rmse(y_true, y_pred):.3f}")
print(f"MAPE:  {mape(y_true, y_pred):.2f}%")
print(f"SMAPE: {smape(y_true, y_pred):.2f}%")
print(f"MASE:  {mase(y_true, y_pred, np.array(y_train)):.3f}  (< 1 = better than naive)")

# Bias and directional accuracy
bias = np.mean(y_pred - y_true)  # mean signed error: positive = systematic over-forecast
# Fraction of steps where forecast and actual move in the same direction.
da   = np.mean(np.sign(np.diff(y_true)) == np.sign(np.diff(y_pred))) * 100
print(f"Bias:  {bias:.3f} (positive = over-forecast)")
print(f"Directional accuracy: {da:.1f}%")
Walk-forward cross-validation
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

# 7 years of monthly data: trend + annual cycle + noise.
np.random.seed(42)
n = 84
ts = pd.Series(
    100 + np.linspace(0,50,n) + 20*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0,5,n),
    index=pd.date_range('2017-01', periods=n, freq='MS'))

def walk_forward_cv(ts, min_train=24, step=6, h=6):
    """Expanding-window walk-forward cross-validation for Holt-Winters.

    Starting at min_train observations, fit on ts[:start], forecast h
    steps, score RMSE/MAPE on the held-out window, then advance start
    by step. Folds where the fit raises are skipped (best-effort).

    Returns a DataFrame with one row per successful fold.
    """
    errors = []
    starts = range(min_train, len(ts) - h + 1, step)
    for start in starts:
        train = ts[:start]
        test  = ts[start:start+h]
        try:
            hw = ExponentialSmoothing(train, trend='add', seasonal='add',
                                       seasonal_periods=12,
                                       initialization_method='estimated').fit(optimized=True)
            fc = hw.forecast(h)
            rmse = np.sqrt(((test.values - fc.values)**2).mean())
            errors.append({'fold': start, 'n_train': len(train),
                           'rmse': rmse, 'mape': (abs(test - fc)/test).mean()*100})
        except Exception:
            pass
    return pd.DataFrame(errors)

cv_results = walk_forward_cv(ts)
print(f"Walk-forward CV ({len(cv_results)} folds):")
print(cv_results.round(2).to_string(index=False))
print(f"\nMean RMSE: {cv_results.rmse.mean():.3f} Β± {cv_results.rmse.std():.3f}")
print(f"Mean MAPE: {cv_results.mape.mean():.2f}%")
Naive benchmarks and MASE comparison
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')

# Monthly series: trend + annual seasonality + noise; last 12 months held out.
np.random.seed(5)
n = 96
ts = pd.Series(
    200 + np.linspace(0,80,n) + 40*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0,8,n),
    index=pd.date_range('2016-01', periods=n, freq='MS'))

train, test = ts[:-12], ts[-12:]

# Naive benchmarks
naive_persistence = pd.Series([train.iloc[-1]] * 12, index=test.index)  # last value
naive_seasonal    = train.iloc[-12:].values  # same season last year

hw = ExponentialSmoothing(train, trend='add', seasonal='add',
                           seasonal_periods=12, initialization_method='estimated').fit(optimized=True)
hw_fc = hw.forecast(12)

forecasts = {
    'Naive Persist': naive_persistence.values,
    'Naive Seasonal': naive_seasonal,
    'Holt-Winters':  hw_fc.values,
}

print(f"{'Model':18s}  {'RMSE':>7s}  {'MAPE':>7s}")
for name, fc in forecasts.items():
    rmse = np.sqrt(((test.values - fc)**2).mean())
    mape = (abs(test.values - fc) / test.values).mean() * 100
    print(f"{name:18s}  {rmse:7.2f}  {mape:7.2f}%")

# MASE relative to seasonal naive: scale by lag-12 differences y[t] - y[t-12].
# Fix: np.diff(..., n=12) applies FIRST differencing 12 times, which is a
# different (and incorrect) quantity for the seasonal-naive in-sample error.
naive_err = np.mean(np.abs(train.values[12:] - train.values[:-12]))
hw_mase = np.mean(np.abs(test.values - hw_fc.values)) / naive_err
print(f"\nHolt-Winters MASE vs seasonal naive: {hw_mase:.3f}")
💼 Real-World Scenario
A demand planner evaluates forecasting models (SARIMA, Holt-Winters) using 5-fold walk-forward CV on 6 years of monthly data. MASE relative to seasonal naive reveals which model truly adds value.
Real-World Code
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')

# 6 years of monthly demand: trend + annual seasonality + noise.
np.random.seed(42)
n = 72
ts = pd.Series(
    500 + np.linspace(0,200,n) + 100*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0,15,n),
    index=pd.date_range('2018-01', periods=n, freq='MS'))

# Walk-forward CV: each fold extends the training window by fold_size months
# and scores an h-step-ahead RMSE on the following window.
folds, h = 5, 6
fold_size = (len(ts) - 24) // folds
results = {m: [] for m in ['HW','SARIMA']}

for fold in range(folds):
    cutoff = 24 + fold * fold_size
    train = ts[:cutoff]
    test  = ts[cutoff:cutoff+h]
    if len(test) < h: break
    # Narrowed from bare 'except:' which would also swallow
    # KeyboardInterrupt/SystemExit; fit failures are still skipped best-effort.
    try:
        hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12,
                                   initialization_method='estimated').fit(optimized=True)
        hw_fc = hw.forecast(h)
        results['HW'].append(np.sqrt(((test.values - hw_fc.values)**2).mean()))
    except Exception: pass
    try:
        sm = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12),
                     enforce_stationarity=False).fit(disp=False)
        sm_fc = sm.forecast(h)
        results['SARIMA'].append(np.sqrt(((test.values - sm_fc.values)**2).mean()))
    except Exception: pass

for model, rmses in results.items():
    if rmses:
        print(f"{model:8s}: mean RMSE={np.mean(rmses):.2f} Β± {np.std(rmses):.2f} ({len(rmses)} folds)")
🏋️ Practice: Comprehensive Forecast Tournament
Generate 6 years of monthly data. Implement a forecast_tournament(ts, h=12) function that: (1) Runs walk-forward CV (4 folds) on SARIMA, Holt-Winters, and a naive seasonal benchmark, (2) Returns a summary DataFrame with model, mean_RMSE, std_RMSE, mean_MAPE, (3) Declares a winner. Test it on your generated data.
Starter Code
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')

def forecast_tournament(ts, h=12, n_folds=4):
    """Compare SARIMA, Holt-Winters and a seasonal-naive benchmark with
    walk-forward CV; return a summary DataFrame (model, mean_RMSE,
    std_RMSE, mean_MAPE) and declare a winner.
    """
    # TODO: implement walk-forward CV for each model
    # TODO: return DataFrame: model, mean_RMSE, std_RMSE, mean_MAPE
    pass

# 6 years of monthly data to exercise the tournament on.
np.random.seed(42)
n = 72
ts = pd.Series(
    300 + np.linspace(0,100,n) + 60*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*10,
    index=pd.date_range('2018-01', periods=n, freq='MS'))

results = forecast_tournament(ts, h=6, n_folds=4)
print(results)
✅ Practice Checklist
24. Multivariate Time Series & Prophet

VAR models multiple related time series jointly. Granger causality tests whether one series predicts another. Facebook Prophet is a user-friendly forecaster for business time series with holidays and change points.

Vector Autoregression (VAR)
import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import adfuller
# NOTE(review): adfuller is imported but never used below - presumably intended
# for a stationarity check before differencing; confirm or remove.

np.random.seed(42)
n = 120
# Two co-moving series: temperature and ice cream sales
temp  = pd.Series(20 + 10*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*2)
sales = 0.8*temp.shift(1).fillna(0) + np.random.randn(n)*5 + 50  # sales follow temp at lag 1

df = pd.DataFrame({'temp': temp, 'sales': sales})
# Difference to achieve stationarity
df_diff = df.diff().dropna()

# Lag order selected automatically by AIC (up to 4 lags).
model = VAR(df_diff)
result = model.fit(maxlags=4, ic='aic')
print(f"VAR({result.k_ar}) fitted. AIC={result.aic:.2f}")
print(f"\nGranger causality test (temp -> sales):")

lag_order = result.k_ar
# H0: lagged temp does NOT help predict sales (F-test within the fitted VAR).
test = result.test_causality('sales', 'temp', kind='f')
print(f"  F-stat={test.test_statistic:.3f}, p={test.pvalue:.4f}")
print(f"  Temp Granger-causes sales? {test.pvalue < 0.05}")

# Forecast needs the last k_ar rows as initial conditions; the output is in
# differences (the modeled scale), not levels.
fc = result.forecast(df_diff.values[-lag_order:], steps=6)
print(f"\n6-step forecast (differences):\n{pd.DataFrame(fc, columns=['temp_d','sales_d']).round(2)}")
Granger causality and cross-correlation
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import grangercausalitytests, ccf

# Synthetic marketing data: advertising spend leads sales by 2 periods,
# web visits by 1 period.
np.random.seed(0)
n = 200
# advertising spend leads to sales with 2-period lag
ad_spend  = pd.Series(np.random.normal(100, 20, n))
sales     = 0.7 * ad_spend.shift(2).fillna(0) + np.random.randn(n) * 15 + 200
web_visits = 0.5 * ad_spend.shift(1).fillna(0) + np.random.randn(n) * 10

# Difference for (approximate) stationarity before the causality test.
df = pd.DataFrame({'sales': sales.diff(), 'ad_spend': ad_spend.diff(),
                   'web': web_visits.diff()}).dropna()

# Fix: the message previously claimed maxlag=5 while the test runs maxlag=3.
print("Granger causality: ad_spend -> sales (maxlag=3):")
gc = grangercausalitytests(df[['sales','ad_spend']], maxlag=3, verbose=False)
for lag, res in gc.items():
    p = res[0]['ssr_ftest'][1]
    print(f"  lag {lag}: F-p={p:.4f}  {'*' if p<0.05 else ''}")

# Cross-correlation function.
# Fix: the 'unbiased' keyword was renamed 'adjusted' (removed in statsmodels 0.13+).
# With alpha set, ccf returns a (correlations, confidence-intervals) tuple.
cc = ccf(df['ad_spend'], df['sales'], alpha=0.05, adjusted=True)
print(f"\nCross-correlation (ad_spend, sales) at lags 0-5:")
for lag, r in enumerate(cc[0][:6]):
    print(f"  lag {lag}: r={r:.3f}")
Facebook Prophet forecasting
import numpy as np, pandas as pd
# Prophet is an optional third-party dependency; degrade gracefully if missing.
try:
    from prophet import Prophet
    HAS_PROPHET = True
except ImportError:
    HAS_PROPHET = False
    print("Install: pip install prophet")

if HAS_PROPHET:
    # 180 months of synthetic data: linear trend + annual cycle + noise.
    np.random.seed(42)
    n = 180
    dates = pd.date_range('2018-01-01', periods=n, freq='MS')
    trend    = np.linspace(1000, 2000, n)
    seasonal = 300 * np.sin(2*np.pi*np.arange(n)/12)
    noise    = np.random.normal(0, 50, n)
    # Prophet requires exactly these column names: 'ds' (dates) and 'y' (values).
    df = pd.DataFrame({'ds': dates, 'y': trend + seasonal + noise})

    # changepoint_prior_scale controls trend flexibility (larger = more changepoints).
    m = Prophet(yearly_seasonality=True, weekly_seasonality=False,
                daily_seasonality=False, changepoint_prior_scale=0.05)
    m.fit(df)

    # Extend the frame 24 months past the training data and predict with intervals.
    future = m.make_future_dataframe(periods=24, freq='MS')
    forecast = m.predict(future)

    print(forecast[['ds','yhat','yhat_lower','yhat_upper']].tail(6).round(1).to_string())
    print(f"\nTrend at end: {forecast['trend'].iloc[-1]:.1f}")
else:
    print("Prophet demo: requires 'pip install prophet'")
    print("Prophet uses additive model: y = trend + seasonality + holidays + error")
    print("Key params: changepoint_prior_scale, seasonality_prior_scale, holidays")
💼 Real-World Scenario
A macroeconomist builds a VAR model on monthly GDP, unemployment, and inflation data to forecast 6 months ahead, using Granger causality to confirm which indicators lead others.
Real-World Code
import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
import warnings; warnings.filterwarnings('ignore')

np.random.seed(7)
n = 120
t = np.arange(n)

# Simulated macro indicators (stationary after differencing)
# GDP growth leads the others: unemployment reacts inversely at lag 1,
# inflation positively at lag 2.
# NOTE(review): despite the comment above, the series are modeled in levels
# here (no differencing is applied before VAR) - confirm this is intended.
gdp_growth    = pd.Series(0.5 + 0.3*np.sin(2*np.pi*t/24) + np.random.randn(n)*0.8)
unemployment  = pd.Series(5.0 - 0.4*gdp_growth.shift(1).fillna(5) + np.random.randn(n)*0.3)
inflation     = pd.Series(2.0 + 0.2*gdp_growth.shift(2).fillna(2) + np.random.randn(n)*0.4)

df = pd.DataFrame({'gdp': gdp_growth, 'unemp': unemployment, 'inflation': inflation})

# Lag order selected by AIC (up to 6 lags).
model = VAR(df)
fitted = model.fit(maxlags=6, ic='aic')
print(f"VAR({fitted.k_ar}) AIC={fitted.aic:.2f}")

# Does GDP growth Granger-cause each of the other indicators? (F-test)
for caused in ['unemp','inflation']:
    test = fitted.test_causality(caused, 'gdp', kind='f')
    print(f"GDP -> {caused}: p={test.pvalue:.4f} {'(significant)' if test.pvalue<0.05 else ''}")

# Forecast 6 months ahead from the last k_ar observations.
fc = pd.DataFrame(
    fitted.forecast(df.values[-fitted.k_ar:], steps=6),
    columns=['gdp','unemp','inflation'])
print(f"\n6-month forecast:\n{fc.round(3).to_string()}")
🏋️ Practice: Multivariate Forecasting Pipeline
Generate 3 correlated time series (temp, humidity, energy_demand) where energy = 0.7*temp_lag1 + 0.4*humidity + noise. (1) Test stationarity, difference if needed. (2) Fit VAR, select lag order by AIC. (3) Test Granger causality of temp -> energy and humidity -> energy. (4) Forecast 6 steps and compare to actual using RMSE.
Starter Code
# Practice starter: three correlated series where energy demand depends on
# temperature at lag 1 and on current humidity.
import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import adfuller, grangercausalitytests
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)
n = 120
# Humidity's seasonal cycle is shifted 3 periods relative to temperature.
temp     = pd.Series(20 + 10*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*2)
humidity = pd.Series(60 + 15*np.sin(2*np.pi*(np.arange(n)-3)/12) + np.random.randn(n)*4)
energy   = pd.Series(0.7*temp.shift(1).fillna(temp.mean()) + 0.4*humidity + np.random.randn(n)*5 + 20)

df = pd.DataFrame({'temp':temp, 'humidity':humidity, 'energy':energy})

# (1) Test stationarity and difference
# (2) Fit VAR with lag selection
# (3) Granger causality
# (4) Forecast and RMSE
✅ Practice Checklist