Time Series Analysis
24 topics • Click any card to expand
Pandas DatetimeIndex enables powerful time-based selection, alignment, and operations. Use pd.to_datetime() to parse dates and set_index() to create a time series.
import pandas as pd
import numpy as np

# Build a 10-day daily-frequency Series on a DatetimeIndex.
rng = np.random.default_rng(42)
daily_idx = pd.date_range(start='2024-01-01', periods=10, freq='D')
s = pd.Series(data=rng.integers(100, 500, 10), index=daily_idx)
print(s)
print('\nIndex type:', type(s.index))
print('Freq:', s.index.freq)
print('First date:', s.index[0])
print('Last date:', s.index[-1])import pandas as pd
import numpy as np
rng = np.random.default_rng(7)
df = pd.DataFrame({
'date': ['2024-01-15', '2024-02-20', '2024-03-05', '2024-04-10'],
'revenue': rng.integers(1000, 9999, 4),
'units': rng.integers(10, 200, 4),
})
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
print(df)
print('\n.dt accessor on a column:')
df2 = df.reset_index()
df2['year'] = df2['date'].dt.year
df2['month'] = df2['date'].dt.month
df2['dow'] = df2['date'].dt.day_name()
print(df2[['date','year','month','dow']])import pandas as pd
import numpy as np

# Partial-string datetime selection, written with the explicit .loc accessor.
rng = np.random.default_rng(0)
idx = pd.date_range('2023-01-01', '2024-12-31', freq='D')
ts = pd.Series(rng.normal(100, 15, len(idx)), index=idx)
# Slice by string
print('Jan 2024:', ts.loc['2024-01'].shape)
print('Q1 2024:', ts.loc['2024-01':'2024-03'].shape)
print('A specific week:', ts.loc['2024-06-10':'2024-06-16'].values.round(2))
# Boolean mask
print('\nDays with value > 130:', (ts > 130).sum())
print('Weekend values mean:', ts[ts.index.dayofweek >= 5].mean().round(2))import pandas as pd
import numpy as np
# Create UTC series and convert timezone
# 6-hourly timestamps, timezone-aware from construction (tz='UTC').
idx = pd.date_range('2024-01-01 00:00', periods=6, freq='6h', tz='UTC')
ts = pd.Series(range(6), index=idx)
print('UTC:')
print(ts)
# Convert to US/Eastern
# tz_convert relabels the wall-clock display; the underlying instants are unchanged.
ts_eastern = ts.tz_convert('US/Eastern')
print('\nUS/Eastern:')
print(ts_eastern)
# Date arithmetic
today = pd.Timestamp('2024-06-15')
print('\n30 days later:', today + pd.Timedelta(days=30))
# Week(weekday=0) is an anchored offset: adding it rolls forward to the next Monday.
print('Next Monday:', today + pd.offsets.Week(weekday=0))
# MonthEnd(0) snaps to the end of the current month (no-op if already month end).
print('End of month:', today + pd.offsets.MonthEnd(0))
# Period vs Timestamp
# A Period represents the whole Q2 interval, not a single instant.
period = pd.Period('2024-Q2', freq='Q')
print('\nQ2 2024 start:', period.start_time)
print('Q2 2024 end: ', period.end_time)import pandas as pd
import numpy as np

# Parse heterogeneous timestamp strings, localize to US/Eastern, store as UTC.
rng = np.random.default_rng(42)
raw = pd.DataFrame({
    'timestamp': ['2024-01-15 09:32', '2024/02/20 14:05', 'March 5, 2024 11:00', '2024-04-10T08:45:00'],
    'store': ['NYC', 'LA', 'CHI', 'HOU'],
    'amount': rng.uniform(50, 500, 4).round(2),
})
# Parse mixed formats.
# BUG FIX: infer_datetime_format is deprecated (a no-op since pandas 2.0),
# and with truly mixed formats to_datetime raises unless format='mixed',
# which parses each element individually.
raw['ts_parsed'] = pd.to_datetime(raw['timestamp'], format='mixed')
raw = raw.set_index('ts_parsed').sort_index()
# Stamps were recorded in Eastern local time; canonicalize to UTC.
raw.index = raw.index.tz_localize('US/Eastern').tz_convert('UTC')
print('Cleaned time series:')
print(raw[['store', 'amount']])
print('\nIndex timezone:', raw.index.tz)
print('Date range:', raw.index[0], 'to', raw.index[-1])import pandas as pd
import numpy as np
rng = np.random.default_rng(0)
# TODO: create weekly DatetimeIndex (Mondays only) for 2024
# Hint: pd.date_range(..., freq='W-MON')
# TODO: create Series with random sales
# TODO: resample to quarterly and print
Resampling changes the time frequency of a series — downsampling aggregates to lower frequency, upsampling interpolates to higher frequency.
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
daily = pd.Series(rng.uniform(100, 500, 365), index=idx, name='sales')
# Downsample to different frequencies
# 'W' bins are weekly anchored on Sunday (alias for 'W-SUN').
print('Weekly sum (first 4):')
print(daily.resample('W').sum().head(4).round(2))
print('\nMonthly stats:')
# 'ME' = month-end frequency alias (pandas >= 2.2 naming; replaces the old 'M').
monthly = daily.resample('ME').agg(['sum','mean','max','min'])
print(monthly.round(2))
print('\nQuarterly mean:')
print(daily.resample('QE').mean().round(2))import pandas as pd
import numpy as np
rng = np.random.default_rng(0)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
# Random-walk price path: cumulative sum of N(0, 1.5) daily moves from 100.
price = 100 + np.cumsum(rng.normal(0, 1.5, 365))
ts = pd.Series(price, index=idx, name='price')
# OHLC aggregation
# .ohlc() produces open/high/low/close columns for each weekly bin.
weekly_ohlc = ts.resample('W').ohlc()
print('Weekly OHLC (first 5):')
print(weekly_ohlc.head().round(2))
# Custom aggregation
# Named aggregation: each keyword becomes an output column; values are an
# aggregation name or callable. The parentheses around 'first' etc. are
# redundant, and `range` here is only a column label (not the builtin).
monthly_custom = ts.resample('ME').agg(
open = ('first'),
close = ('last'),
high = ('max'),
low = ('min'),
range = (lambda x: x.max() - x.min()),
)
print('\nMonthly custom agg:')
print(monthly_custom.round(2))import pandas as pd
import numpy as np
rng = np.random.default_rng(7)
# Monthly data β daily via upsampling
monthly_idx = pd.date_range('2024-01-01', periods=6, freq='ME')
monthly = pd.Series([120, 135, 128, 142, 156, 149], index=monthly_idx, name='revenue')
# Forward fill
daily_ffill = monthly.resample('D').ffill()
# Linear interpolation
daily_interp = monthly.resample('D').interpolate('linear')
# Cubic interpolation
daily_cubic = monthly.resample('D').interpolate('cubic')
print('Monthly original:')
print(monthly)
print('\nDaily (first 10, forward fill):')
print(daily_ffill.head(10).round(2))
print('\nDaily (first 10, linear interp):')
print(daily_interp.head(10).round(2))import pandas as pd
import numpy as np

# asfreq vs. groupby-by-hour on a week of hourly, tz-aware data.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=24*7, freq='h', tz='UTC')
hourly = pd.Series(rng.uniform(10, 100, len(idx)), index=idx)
# asfreq — select existing values at the new frequency (no aggregation)
every_6h = hourly.asfreq('6h')
print('asfreq every 6h (first 8):')
print(every_6h.head(8).round(2))
# Resample by time-of-day
hourly_mean = hourly.groupby(hourly.index.hour).mean()
print('\nMean by hour of day:')
for h, v in hourly_mean.items():
    # BUG FIX: the scraped original printed a mojibake'd glyph; restore the
    # intended block character for the text bar chart.
    bar = '█' * int(v / 10)
    print(f' {h:02d}h: {v:5.1f} {bar}')
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2022-01-01', '2024-12-31', freq='D')
noise = rng.normal(0, 20, len(idx))
trend = np.linspace(500, 800, len(idx))
seasonal = 80 * np.sin(2 * np.pi * np.arange(len(idx)) / 365)
daily = pd.Series(trend + seasonal + noise, index=idx, name='revenue').clip(0)
# Aggregate
weekly = daily.resample('W').sum()
monthly = daily.resample('ME').sum()
quarterly = daily.resample('QE').sum()
# Growth rates
monthly_growth = monthly.pct_change() * 100
quarterly_growth = quarterly.pct_change() * 100
print('Monthly revenue with MoM growth (last 6):')
report = pd.DataFrame({'revenue': monthly, 'mom_growth_%': monthly_growth}).tail(6)
print(report.round(2))
print('\nQuarterly totals with QoQ growth:')
print(pd.DataFrame({'revenue': quarterly, 'qoq_%': quarterly_growth}).round(2))import pandas as pd
import numpy as np

# Practice solution: hourly January temperatures with a daily sine cycle.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', '2024-01-31 23:00', freq='h')
# Daily cycle: coldest at midnight, warmest mid-afternoon, plus noise.
hours = np.arange(len(idx))
temps = pd.Series(5 + 8 * np.sin(2 * np.pi * (hours % 24) / 24) + rng.normal(0, 1.5, len(idx)),
                  index=idx, name='temp_c')
# Downsample to one row per day with min/mean/max.
daily = temps.resample('D').agg(['min', 'mean', 'max'])
print('Daily temperature summary (first 5):')
print(daily.head().round(2))
print('\nColdest day:', daily['mean'].idxmin().date(), f"({daily['mean'].min():.1f}C)")
print('Warmest day:', daily['mean'].idxmax().date(), f"({daily['mean'].max():.1f}C)")
Rolling windows compute statistics over a sliding fixed-length window. Expanding windows grow from the start of the series. Both are essential for smoothing and feature engineering.
import pandas as pd
import numpy as np

# Rolling-window smoothing: 7- and 14-day moving averages plus a 7-day std.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=30, freq='D')
ts = pd.Series(rng.normal(100, 15, 30), index=idx, name='value')
win7 = ts.rolling(7)
win14 = ts.rolling(14)
rm7 = win7.mean()
rm14 = win14.mean()
std7 = win7.std()
result = pd.DataFrame({'raw': ts, 'ma7': rm7, 'ma14': rm14, 'std7': std7})
print(result.tail(10).round(2))
print('\nNaN count from rolling(7):', rm7.isna().sum())import pandas as pd
import numpy as np
rng = np.random.default_rng(0)
ts = pd.Series(rng.normal(50, 10, 20), name='signal')
# min_periods: require at least 3 valid values
r_minp = ts.rolling(7, min_periods=3).mean()
# center=True: window centered on current observation
r_center = ts.rolling(7, center=True).mean()
print('Standard rolling(7): first 7 values')
print(ts.rolling(7).mean().head(7).round(2).tolist())
print('\nWith min_periods=3: first 7 values')
print(r_minp.head(7).round(2).tolist())
print('\nCentered rolling(7): first 7 values')
print(r_center.head(7).round(2).tolist())
# Rolling correlation between two series
s2 = pd.Series(rng.normal(50, 10, 20))
print('\nRolling correlation (last 5):')
print(ts.rolling(5).corr(s2).tail().round(3).tolist())import pandas as pd
import numpy as np

# Expanding-window (cumulative) statistics: each row summarizes all data so far.
rng = np.random.default_rng(7)
ts = pd.Series(rng.exponential(100, 20), name='revenue')
expanding = ts.expanding()
exp_mean = expanding.mean()
exp_std = expanding.std()
exp_max = expanding.max()
cumsum = ts.cumsum()
result = pd.DataFrame({
    'revenue': ts.round(2),
    'cum_mean': exp_mean.round(2),
    'cum_std': exp_std.round(2),
    'cum_max': exp_max.round(2),
    'cumsum': cumsum.round(2),
})
print(result)import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=30, freq='D')
# Noisy series with a linear upward drift added via linspace.
ts = pd.Series(rng.normal(100, 20, 30) + np.linspace(0, 30, 30), index=idx)
# EWM with different spans
# span relates to the decay factor by alpha = 2/(span+1); adjust=False uses
# the recursive form y_t = (1-alpha)*y_{t-1} + alpha*x_t.
ewm5 = ts.ewm(span=5, adjust=False).mean()
ewm14 = ts.ewm(span=14, adjust=False).mean()
# EWM with alpha (decay factor directly)
ewm_a = ts.ewm(alpha=0.3, adjust=False).mean()
result = pd.DataFrame({'raw': ts, 'ewm5': ewm5, 'ewm14': ewm14, 'ewm_a0.3': ewm_a})
print(result.round(2))
print('\nEWM vs SMA final values:')
print(f' EWM(span=5): {ewm5.iloc[-1]:.2f}')
print(f' SMA(window=5):{ts.rolling(5).mean().iloc[-1]:.2f}')import pandas as pd
import numpy as np

# Flag days falling outside a 14-day rolling mean +/- 2-sigma band.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=90, freq='D')
orders = pd.Series(rng.normal(500, 60, 90), index=idx, name='orders')
# Inject anomalies
for pos, spike in [(15, 950), (45, 120), (72, 880)]:
    orders.iloc[pos] = spike
roll = orders.rolling(14)
rm = roll.mean()
rs = roll.std()
upper = rm + 2 * rs
lower = rm - 2 * rs
outside_band = (orders > upper) | (orders < lower)
anomalies = orders[outside_band]
print(f'Total days: {len(orders)}, Anomalies detected: {len(anomalies)}')
print('\nAnomalous days:')
for date, val in anomalies.items():
    print(f' {date.date()}: {val:.0f} orders (mean={rm.loc[date]:.0f}, '
          f'bounds=[{lower.loc[date]:.0f}, {upper.loc[date]:.0f}])')
import pandas as pd
import numpy as np

# Practice solution: business-day price path with 20-day Bollinger Bands.
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=60, freq='B')
# Price from cumulative daily returns (~1.5% daily vol).
price = pd.Series(100 * np.cumprod(1 + rng.normal(0, 0.015, 60)), index=idx, name='price')
sma20 = price.rolling(20).mean()
std20 = price.rolling(20).std()
upper_band = sma20 + 2 * std20
lower_band = sma20 - 2 * std20
# Days closing outside the band (NaN comparisons are False, so warm-up is excluded).
outside = (price > upper_band) | (price < lower_band)
bands = pd.DataFrame({'price': price, 'sma20': sma20,
                      'upper': upper_band, 'lower': lower_band, 'outside': outside})
print(bands.tail(10).round(2))
print(f'\nDays outside bands: {int(outside.sum())}')
Visualizing time series reveals trends, seasonality, and anomalies. Key plots include line charts, seasonal decomposition, ACF/PACF correlograms, and lag plots.
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=365, freq='D')
# Synthetic daily sales: linear trend + one annual sine cycle + N(0,5) noise.
ts = pd.Series(
100 + np.linspace(0, 50, 365) + 20 * np.sin(2*np.pi*np.arange(365)/365) + rng.normal(0, 5, 365),
index=idx, name='sales'
)
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(ts.index, ts.values, lw=1, color='steelblue', label='Daily')
# 30-day moving average overlaid on the raw series.
ax.plot(ts.rolling(30).mean().index, ts.rolling(30).mean(), lw=2, color='tomato', label='30-day MA')
ax.axvline(pd.Timestamp('2023-06-21'), color='green', ls='--', lw=1.5, label='Summer Solstice')
# Shade a +/- 1 rolling-std band around the 30-day moving average.
ax.fill_between(ts.index, ts.rolling(30).mean() - ts.rolling(30).std(),
ts.rolling(30).mean() + ts.rolling(30).std(), alpha=0.2, color='tomato')
ax.set(title='Sales Time Series with 30-Day Moving Average', xlabel='Date', ylabel='Sales')
ax.legend(); ax.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('ts_basic.png', dpi=100)
plt.close()
print('Saved ts_basic.png')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
rng = np.random.default_rng(0)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
monthly = pd.Series(
200 + 50*np.sin(2*np.pi*np.arange(36)/12) + rng.normal(0, 10, 36),
index=idx, name='revenue'
)
fig, axes = plt.subplots(3, 4, figsize=(14, 8), sharey=True)
month_names = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for m in range(1, 13):
ax = axes[(m-1)//4][(m-1)%4]
data = monthly[monthly.index.month == m]
ax.bar(range(len(data)), data.values, color='steelblue', alpha=0.7)
ax.axhline(data.mean(), color='tomato', lw=2)
ax.set_title(month_names[m-1], fontsize=10)
ax.set_xticks([])
plt.suptitle('Seasonal Subseries Plot β Monthly Revenue', fontsize=13)
plt.tight_layout()
plt.savefig('ts_seasonal_sub.png', dpi=100)
plt.close()
print('Saved ts_seasonal_sub.png')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
try:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
rng = np.random.default_rng(42)
# AR(2) process: y_t = 0.7*y_{t-1} - 0.3*y_{t-2} + noise
n = 200
y = np.zeros(n)
eps = rng.normal(0, 1, n)
for t in range(2, n):
y[t] = 0.7*y[t-1] - 0.3*y[t-2] + eps[t]
ts = pd.Series(y)
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
plot_acf(ts, lags=20, ax=axes[0], title='ACF β AR(2) Process')
plot_pacf(ts, lags=20, ax=axes[1], title='PACF β AR(2) Process', method='ywm')
plt.tight_layout()
plt.savefig('ts_acf.png', dpi=100)
plt.close()
print('Saved ts_acf.png')
print('ACF at lag 1:', round(ts.autocorr(1), 3))
print('ACF at lag 2:', round(ts.autocorr(2), 3))
except ImportError:
print('statsmodels not installed β pip install statsmodels')
import pandas as pd, numpy as np
ts = pd.Series(np.random.randn(100))
print('Manual ACF at lags 1-5:')
for lag in range(1, 6):
print(f' lag {lag}: {ts.autocorr(lag):.3f}')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=100, freq='B')
price = 100 + np.cumsum(rng.normal(0, 1.5, 100))
ts = pd.Series(price, index=idx)
returns = ts.pct_change().dropna() * 100
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
# Lag plot
axes[0].scatter(ts.values[:-1], ts.values[1:], alpha=0.5, s=20, color='steelblue')
axes[0].set(title='Lag Plot (lag=1)', xlabel='Price(t)', ylabel='Price(t+1)')
# Return distribution
axes[1].hist(returns, bins=20, color='coral', edgecolor='white', alpha=0.8)
axes[1].set(title='Daily Return Distribution', xlabel='Return (%)', ylabel='Count')
# Rolling volatility
vol = returns.rolling(10).std()
axes[2].plot(vol.index, vol, color='purple', lw=1.5)
axes[2].set(title='10-Day Rolling Volatility', xlabel='Date', ylabel='Std Dev (%)')
axes[2].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.savefig('ts_lag_dist.png', dpi=100)
plt.close()
print('Saved ts_lag_dist.png')
print(f'Return stats: mean={returns.mean():.2f}%, std={returns.std():.2f}%, skew={returns.skew():.2f}')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-02', periods=104, freq='W-MON')
trend = np.linspace(5000, 8000, 104)
seasonal = 1500 * np.sin(2*np.pi*np.arange(104)/52)
sales = pd.Series(trend + seasonal + rng.normal(0, 200, 104), index=idx, name='sales')
y2023 = sales['2023'].values
y2024 = sales['2024'].values[:len(y2023)]
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
axes[0].plot(sales.index, sales, alpha=0.4, lw=1, color='steelblue', label='Weekly')
axes[0].plot(sales.rolling(8).mean().index, sales.rolling(8).mean(), lw=2, color='tomato', label='8-wk MA')
axes[0].set(title='Weekly E-Commerce Sales (2023-2024)', ylabel='Sales ($)')
axes[0].legend(); axes[0].grid(alpha=0.3)
weeks = range(1, min(len(y2023), len(y2024))+1)
axes[1].plot(weeks, y2023[:len(weeks)], label='2023', color='steelblue', lw=2)
axes[1].plot(weeks, y2024[:len(weeks)], label='2024', color='tomato', lw=2)
axes[1].set(title='Year-over-Year Comparison', xlabel='Week', ylabel='Sales ($)')
axes[1].legend(); axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.savefig('ts_dashboard.png', dpi=100)
plt.close()
print('Saved ts_dashboard.png')
yoy_growth = (y2024.mean() / y2023.mean() - 1) * 100
print(f'YoY growth: {yoy_growth:+.1f}%')import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=52, freq='W')
# TODO: create 3 product series with different patterns
# TODO: plot all 3 with rolling averages
plt.savefig('practice_ts.png', dpi=100)
plt.close()
print('Saved practice_ts.png')Decomposition separates a time series into trend, seasonal, and residual components. Additive decomposition (Y = T + S + R) works for constant amplitude seasonality; multiplicative (Y = T × S × R) for growing amplitude.
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.seasonal import seasonal_decompose
# NOTE(review): the page scrape stripped indentation — everything down to
# the except clause belongs one level inside the try block.
rng = np.random.default_rng(42)
idx = pd.date_range('2021-01-01', periods=48, freq='ME')
# Known generative components: linear trend + 12-month sine + N(0,5) noise.
trend_comp = np.linspace(100, 180, 48)
seasonal_comp = 20 * np.sin(2*np.pi*np.arange(48)/12)
noise = rng.normal(0, 5, 48)
ts = pd.Series(trend_comp + seasonal_comp + noise, index=idx)
# Additive model: observed = trend + seasonal + resid; period=12 months.
result = seasonal_decompose(ts, model='additive', period=12)
print('Decomposition components (first 6 months):')
df = pd.DataFrame({
'observed': result.observed,
'trend': result.trend,
'seasonal': result.seasonal,
'resid': result.resid,
}).dropna()
print(df.head(6).round(2))
print(f'\nTrend range: {result.trend.dropna().min():.1f} - {result.trend.dropna().max():.1f}')
print(f'Seasonal amplitude: {result.seasonal.max():.1f}')
print(f'Residual std: {result.resid.dropna().std():.2f}')
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.seasonal import seasonal_decompose
rng = np.random.default_rng(0)
idx = pd.date_range('2020-01-01', periods=60, freq='ME')
t = np.arange(60)
trend = 100 + 2*t
seasonal = 0.3 * np.sin(2*np.pi*t/12) # proportional to trend
additive_ts = pd.Series(trend + 20*np.sin(2*np.pi*t/12) + rng.normal(0,3,60), index=idx)
multiplicative_ts = pd.Series(trend * (1 + seasonal) + rng.normal(0,3,60), index=idx)
for name, ts, model in [('Additive TS', additive_ts, 'additive'),
('Multiplicative TS', multiplicative_ts, 'multiplicative')]:
dec = seasonal_decompose(ts, model=model, period=12)
resid_std = dec.resid.dropna().std()
print(f'{name} ({model}): residual std = {resid_std:.3f}')
print(f' Seasonal max = {dec.seasonal.max():.3f}')
except ImportError:
print('pip install statsmodels')
import numpy as np
t = np.arange(60)
ts = 100 + 2*t + 20*np.sin(2*np.pi*t/12) + np.random.normal(0,3,60)
print('Manual trend (linear fit):')
coeffs = np.polyfit(t, ts, 1)
print(f' slope={coeffs[0]:.2f}, intercept={coeffs[1]:.2f}')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.seasonal import STL
rng = np.random.default_rng(42)
idx = pd.date_range('2020-01-01', periods=60, freq='ME')
ts = pd.Series(
100 + np.linspace(0, 40, 60) + 15*np.sin(2*np.pi*np.arange(60)/12) + rng.normal(0, 4, 60),
index=idx
)
# Add outlier
ts.iloc[24] = 250
stl = STL(ts, period=12, robust=True)
result = stl.fit()
print('STL Decomposition (robust=True):')
df = pd.DataFrame({'trend': result.trend, 'seasonal': result.seasonal, 'resid': result.resid})
print(df.tail(6).round(2))
print(f'\nMax residual (outlier detected at): {result.resid.abs().idxmax().date()}')
print(f'Residual value: {result.resid.abs().max():.1f}')
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
rng = np.random.default_rng(7)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
t = np.arange(36)
ts = pd.Series(100 + 3*t + 15*np.sin(2*np.pi*t/12) + rng.normal(0, 5, 36), index=idx)
# Fit linear trend
coeffs = np.polyfit(t, ts, 1)
trend = np.polyval(coeffs, t)
detrended = ts.values - trend
# Seasonal component (average over each month)
seasonal = np.zeros(12)
for m in range(12):
seasonal[m] = detrended[m::12].mean()
# Reconstruct and forecast next 6 months
t_fut = np.arange(36, 42)
trend_fut = np.polyval(coeffs, t_fut)
seasonal_fut = seasonal[t_fut % 12]
forecast = trend_fut + seasonal_fut
print(f'Trend: slope={coeffs[0]:.2f}/month, intercept={coeffs[1]:.2f}')
print('\nSeasonal pattern (monthly avg deviation):')
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for m, (name, val) in enumerate(zip(months, seasonal)):
print(f' {name}: {val:+.1f}')
print('\nForecast (next 6 months):')
fut_idx = pd.date_range('2025-01-01', periods=6, freq='ME')
for d, v in zip(fut_idx, forecast):
print(f' {d.strftime("%b %Y")}: {v:.1f}')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.seasonal import STL
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=52*7, freq='D')
t = np.arange(len(idx))
trend_comp = 500 + 0.3 * t
weekly_seas = 80 * np.sin(2*np.pi*t/7)
annual_seas = 120 * np.sin(2*np.pi*t/365)
consumption = pd.Series(trend_comp + weekly_seas + annual_seas + rng.normal(0, 15, len(t)), index=idx)
stl = STL(consumption, period=7, robust=True)
res = stl.fit()
weekly_pattern = pd.Series(res.seasonal).groupby(pd.date_range('2024-01-01', periods=len(idx), freq='D').dayofweek).mean()
days = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
print('Weekly consumption pattern (avg deviation from trend):')
for d, v in zip(days, weekly_pattern.reindex(range(7))):
bar = 'β' * int(abs(v) / 10)
print(f' {d}: {v:+.1f} kWh {bar}')
print(f'\nOverall trend: +{(res.trend.iloc[-1] - res.trend.iloc[0]):.1f} kWh over the period')
print(f'Residual std (noise): {res.resid.std():.1f} kWh')
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2022-01-01', periods=36, freq='ME')
# TODO: create monthly sales with trend + December spike
# TODO: decompose
# TODO: print seasonal indices by month
Stationarity means constant mean, variance, and autocorrelation over time. Most statistical forecasting models require stationary input. Use the ADF test to check, and differencing to achieve stationarity.
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.stattools import adfuller
rng = np.random.default_rng(42)
# Non-stationary: random walk
rw = pd.Series(np.cumsum(rng.normal(0, 1, 100)))
# Stationary: white noise
wn = pd.Series(rng.normal(0, 1, 100))
for name, ts in [('Random Walk', rw), ('White Noise', wn)]:
result = adfuller(ts, autolag='AIC')
print(f'{name}:')
print(f' ADF Statistic: {result[0]:.4f}')
print(f' p-value: {result[1]:.4f}')
print(f' Critical 5%: {result[4]["5%"]:.4f}')
conclusion = 'STATIONARY' if result[1] < 0.05 else 'NON-STATIONARY'
print(f' Conclusion: {conclusion}')
print()
except ImportError:
print('pip install statsmodels')
import numpy as np
rng = np.random.default_rng(42)
ts = np.cumsum(rng.normal(0, 1, 100))
# Simple manual check: variance of first vs second half
print('Variance first half:', np.var(ts[:50]).round(2))
print('Variance second half:', np.var(ts[50:]).round(2))import pandas as pd
import numpy as np
try:
from statsmodels.tsa.stattools import adfuller
rng = np.random.default_rng(0)
idx = pd.date_range('2020-01-01', periods=60, freq='ME')
# Trend + noise (non-stationary)
ts = pd.Series(100 + 2*np.arange(60) + rng.normal(0, 5, 60), index=idx)
diff1 = ts.diff().dropna()
diff2 = ts.diff().diff().dropna()
for name, series in [('Original', ts), ('1st Diff', diff1), ('2nd Diff', diff2)]:
adf = adfuller(series, autolag='AIC')
stat = 'STATIONARY' if adf[1] < 0.05 else 'NON-STATIONARY'
print(f'{name:10s}: ADF={adf[0]:7.3f}, p={adf[1]:.4f} β {stat}')
print('\nDescriptive stats after 1st differencing:')
print(diff1.describe().round(3))
except ImportError:
print('pip install statsmodels')
import numpy as np
ts = np.cumsum(np.random.randn(60)) + np.arange(60)
diff1 = np.diff(ts)
print(f'Original mean: {ts.mean():.2f}, Diff mean: {diff1.mean():.2f}')
print(f'Original std: {ts.std():.2f}, Diff std: {diff1.std():.2f}')import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2020-01-01', periods=48, freq='ME')
# Growing variance (multiplicative process)
t = np.arange(48)
ts = pd.Series(100 * np.exp(0.05*t + rng.normal(0, 0.1, 48)), index=idx)
log_ts = np.log(ts)
diff_ts = ts.diff().dropna()
log_diff = log_ts.diff().dropna() # log returns
print('Original series stats:')
print(f' Std first half: {ts[:24].std():.2f}')
print(f' Std second half: {ts[24:].std():.2f}')
print('\nLog-transformed stats:')
print(f' Std first half: {log_ts[:24].std():.4f}')
print(f' Std second half: {log_ts[24:].std():.4f}')
print('\nLog-differenced (log returns) first 5:')
print(log_diff.head().round(4))
print(f'Annualized log return: {log_diff.mean() * 12:.3f}')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.stattools import adfuller, kpss
rng = np.random.default_rng(7)
idx = pd.date_range('2020-01-01', periods=60, freq='ME')
ts = pd.Series(
100 + np.linspace(0, 20, 60) + 30*np.sin(2*np.pi*np.arange(60)/12) + rng.normal(0, 3, 60),
index=idx
)
# Seasonal differencing (lag=12)
seasonal_diff = ts.diff(12).dropna()
# Then first difference to remove trend
both_diff = seasonal_diff.diff().dropna()
for name, s in [('Original', ts), ('Seasonal diff(12)', seasonal_diff), ('Both diffs', both_diff)]:
adf_p = adfuller(s)[1]
print(f'{name:20s}: ADF p={adf_p:.4f} β {"stationary" if adf_p < 0.05 else "non-stationary"}')
except ImportError:
print('pip install statsmodels')
import numpy as np
ts = np.array([100 + 30*np.sin(2*np.pi*i/12) for i in range(60)])
sdiff = ts[12:] - ts[:-12]
print('Seasonal diff mean:', sdiff.mean().round(2))
print('Seasonal diff std:', sdiff.std().round(2))import pandas as pd
import numpy as np
try:
from statsmodels.tsa.stattools import adfuller
rng = np.random.default_rng(42)
idx = pd.date_range('2021-01-01', periods=48, freq='ME')
traffic = pd.Series(
5000 * np.exp(0.02 * np.arange(48)) +
2000 * np.sin(2*np.pi*np.arange(48)/12) +
rng.normal(0, 300, 48),
index=idx, name='visitors'
)
def check_stationarity(series, name):
adf = adfuller(series.dropna())
stationary = adf[1] < 0.05
print(f'{name}: p={adf[1]:.4f} | {"β stationary" if stationary else "β non-stationary"}')
return stationary
# Step 1: check original
check_stationarity(traffic, 'Original')
# Step 2: log transform
log_traffic = np.log(traffic)
check_stationarity(log_traffic, 'Log-transformed')
# Step 3: seasonal diff
log_sdiff = log_traffic.diff(12)
check_stationarity(log_sdiff, 'Log + seasonal diff(12)')
# Step 4: regular diff
log_sdiff_diff = log_sdiff.diff()
check_stationarity(log_sdiff_diff, 'Log + sdiff + diff')
print('\nFinal series ready for ARIMA:')
print(log_sdiff_diff.dropna().describe().round(4))
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.stattools import adfuller
rng = np.random.default_rng(0)
idx = pd.date_range('2020-01-01', periods=60, freq='ME')
# TODO: create series with trend + seasonality
# TODO: apply transformations until stationary
# TODO: print ADF result at each step
except ImportError:
print('pip install statsmodels')ARIMA(p,d,q) combines AutoRegression (p lags), Integration (d differences for stationarity), and Moving Average (q lagged forecast errors). SARIMA extends this with seasonal components.
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.arima.model import ARIMA
rng = np.random.default_rng(42)
# AR(1) process
n = 100
y = np.zeros(n)
eps = rng.normal(0, 1, n)
for t in range(1, n):
y[t] = 0.7 * y[t-1] + eps[t]
ts = pd.Series(y)
model = ARIMA(ts, order=(1, 0, 0))
result = model.fit()
print(result.summary().tables[1])
print(f'\nAIC: {result.aic:.2f}')
print(f'BIC: {result.bic:.2f}')
forecast = result.forecast(steps=5)
print('\nForecast (next 5 steps):')
print(forecast.round(3).tolist())
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np

# Grid-search small ARIMA(p,1,q) orders by AIC on a simulated random walk.
try:
    from statsmodels.tsa.arima.model import ARIMA
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(0)
    ts = pd.Series(np.cumsum(rng.normal(0, 1, 80)))  # random walk -> d=1
    best_aic = float('inf')
    best_order = None
    results = []
    for p in range(3):
        for q in range(3):
            try:
                m = ARIMA(ts, order=(p, 1, q)).fit()
                results.append((p, 1, q, m.aic))
                if m.aic < best_aic:
                    best_aic, best_order = m.aic, (p, 1, q)
            except Exception:
                # BUG FIX: was a bare `except:`, which would also swallow
                # KeyboardInterrupt/SystemExit. Exception still covers
                # fit/convergence failures, which is all we want to skip.
                pass
    results.sort(key=lambda x: x[3])
    print('Top 5 ARIMA orders by AIC:')
    for p, d, q, aic in results[:5]:
        mark = '<-- best' if (p, d, q) == best_order else ''
        print(f' ARIMA({p},{d},{q}): AIC={aic:.2f} {mark}')
except ImportError:
    print('pip install statsmodels')
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.arima.model import ARIMA
import warnings; warnings.filterwarnings('ignore')
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=60, freq='ME')
ts = pd.Series(
50 + np.linspace(0, 20, 60) + rng.normal(0, 4, 60),
index=idx
)
train, test = ts[:48], ts[48:]
model = ARIMA(train, order=(1, 1, 1)).fit()
fc = model.get_forecast(steps=12)
fc_df = fc.summary_frame(alpha=0.05)
print('Forecast vs Actual (last 6 months):')
comparison = pd.DataFrame({
'actual': test.values,
'forecast': fc_df['mean'].values,
'lower_95': fc_df['mean_ci_lower'].values,
'upper_95': fc_df['mean_ci_upper'].values,
}, index=test.index)
print(comparison.round(2))
from sklearn.metrics import mean_absolute_error
mae = mean_absolute_error(test, fc_df['mean'])
print(f'\nMAE: {mae:.2f}')
except ImportError:
print('pip install statsmodels scikit-learn')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
rng = np.random.default_rng(7)
idx = pd.date_range('2020-01-01', periods=48, freq='ME')
ts = pd.Series(
100 + np.linspace(0, 20, 48) + 25*np.sin(2*np.pi*np.arange(48)/12) + rng.normal(0, 4, 48),
index=idx
)
train = ts[:36]
# SARIMA(1,1,1)(1,1,0,12)
model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,0,12)).fit(disp=False)
fc = model.forecast(steps=12)
actual = ts[36:]
print('SARIMA Forecast vs Actual:')
for d, f, a in zip(fc.index, fc.values, actual.values):
err = f - a
print(f' {d.strftime("%b %Y")}: forecast={f:.1f}, actual={a:.1f}, error={err:+.1f}')
rmse = np.sqrt(np.mean((fc.values - actual.values)**2))
print(f'\nRMSE: {rmse:.2f}')
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
rng = np.random.default_rng(42)
idx = pd.date_range('2021-01-01', periods=36, freq='ME')
demand = pd.Series(
500 + np.linspace(0, 100, 36) +
150 * np.sin(2*np.pi*np.arange(36)/12) +
rng.normal(0, 20, 36),
index=idx
).clip(0)
train, test = demand[:30], demand[30:]
model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,0,12)).fit(disp=False)
fc = model.forecast(steps=6)
mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
rmse = np.sqrt(np.mean((test.values - fc.values)**2))
print('3-Month Demand Forecast:')
for d, f, a in zip(fc.index, fc.values, test.values):
print(f' {d.strftime("%b %Y")}: forecast={f:.0f}, actual={a:.0f}')
print(f'\nMAPE: {mape:.1f}%')
print(f'RMSE: {rmse:.1f} units')
except ImportError:
print('pip install statsmodels')import pandas as pd
import numpy as np
try:
from statsmodels.tsa.arima.model import ARIMA
import warnings; warnings.filterwarnings('ignore')
rng = np.random.default_rng(42)
# TODO: generate AR(2) process
# TODO: split train/test, fit ARIMA(2,0,0)
# TODO: forecast and compute MAE
except ImportError:
print('pip install statsmodels')Exponential smoothing methods weight recent observations more heavily. Simple ES handles level; Holt's adds trend; Holt-Winters adds seasonality. These are fast, interpretable, and competitive with ARIMA.
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
rng = np.random.default_rng(42)
ts = pd.Series(rng.normal(100, 10, 30))
for alpha in [0.1, 0.3, 0.7]:
model = SimpleExpSmoothing(ts, initialization_method='estimated').fit(smoothing_level=alpha, optimized=False)
fc = model.forecast(3)
print(f'alpha={alpha}: last fitted={model.fittedvalues.iloc[-1]:.2f}, forecast={fc.values.round(2).tolist()}')
# Optimal alpha
optimal = SimpleExpSmoothing(ts, initialization_method='estimated').fit(optimized=True)
print(f'\nOptimal alpha: {optimal.params["smoothing_level"]:.4f}')
print(f'Optimal forecast: {optimal.forecast(3).values.round(2).tolist()}')
except ImportError:
print('pip install statsmodels')import pandas as pd
# Holt's trend smoothing (additive vs exponential trend) on a trending monthly series.
# (Indentation restored; bare `except:` narrowed to `except Exception`.)
import pandas as pd
import numpy as np
try:
    from statsmodels.tsa.holtwinters import Holt
    rng = np.random.default_rng(0)
    idx = pd.date_range('2024-01-01', periods=24, freq='ME')
    ts = pd.Series(50 + 2*np.arange(24) + rng.normal(0, 3, 24), index=idx)
    # Holt's with additive trend
    for trend in ['add', 'mul']:
        try:
            model = Holt(ts, exponential=(trend=='mul'), initialization_method='estimated').fit(optimized=True)
            fc = model.forecast(6)
            print(f'Holt ({trend} trend):')
            print(f' alpha={model.params["smoothing_level"]:.3f}, beta={model.params["smoothing_trend"]:.3f}')
            print(f' Forecast: {fc.values.round(1).tolist()}')
        except Exception:
            # Skip fit failures only; a bare `except:` would also swallow
            # KeyboardInterrupt/SystemExit.
            pass
    print('\nActual last 3:', ts.tail(3).values.round(1).tolist())
except ImportError:
    print('pip install statsmodels')
import pandas as pd
# Holt-Winters (trend + 12-month seasonality) evaluated on a 12-month holdout.
# (Indentation restored -- the scrape had flattened the try-block.)
import pandas as pd
import numpy as np
try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2020-01-01', periods=48, freq='ME')
    ts = pd.Series(
        100 + np.linspace(0, 30, 48) + 25*np.sin(2*np.pi*np.arange(48)/12) + rng.normal(0, 4, 48),
        index=idx
    )
    train, test = ts[:36], ts[36:]
    model = ExponentialSmoothing(
        train, trend='add', seasonal='add', seasonal_periods=12,
        initialization_method='estimated'
    ).fit(optimized=True)
    fc = model.forecast(12)
    mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
    rmse = np.sqrt(np.mean((test.values - fc.values)**2))
    print('Holt-Winters forecast:')
    print(f' Alpha (level): {model.params["smoothing_level"]:.3f}')
    print(f' Beta (trend): {model.params["smoothing_trend"]:.3f}')
    print(f' Gamma (season):{model.params["smoothing_seasonal"]:.3f}')
    print(f'\nMAPE: {mape:.2f}%')
    print(f'RMSE: {rmse:.2f}')
except ImportError:
    print('pip install statsmodels')
import pandas as pd
# Compare SES / Holt / Holt-Winters on the same 6-month holdout.
# (Indentation restored -- the scrape had flattened the try-block and loop.)
import pandas as pd
import numpy as np
try:
    from statsmodels.tsa.holtwinters import SimpleExpSmoothing, Holt, ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(7)
    idx = pd.date_range('2021-01-01', periods=36, freq='ME')
    ts = pd.Series(
        200 + np.linspace(0, 50, 36) + 30*np.sin(2*np.pi*np.arange(36)/12) + rng.normal(0, 8, 36),
        index=idx
    )
    train, test = ts[:30], ts[30:]
    n_fc = 6
    # Only Holt-Winters models the 12-month seasonality explicitly.
    models = {
        'SES': SimpleExpSmoothing(train, initialization_method='estimated').fit(optimized=True),
        'Holt': Holt(train, initialization_method='estimated').fit(optimized=True),
        'HW': ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12,
                                   initialization_method='estimated').fit(optimized=True),
    }
    print(f'{"Method":<8} {"RMSE":>8} {"MAPE%":>8}')
    print('-' * 28)
    for name, m in models.items():
        fc = m.forecast(n_fc)
        rmse = np.sqrt(np.mean((test.values - fc.values)**2))
        mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
        print(f'{name:<8} {rmse:>8.2f} {mape:>8.2f}')
except ImportError:
    print('pip install statsmodels')
import pandas as pd
# Holt-Winters with weekly (period-7) seasonality on daily traffic.
# (Indentation restored; removed the unused `days` list -- strftime('%a')
# already produces weekday names.)
import pandas as pd
import numpy as np
try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    idx = pd.date_range('2024-01-01', periods=90, freq='D')
    traffic = pd.Series(
        500 + np.linspace(0, 50, 90) +
        100 * np.sin(2*np.pi*np.arange(90)/7) +
        rng.normal(0, 20, 90),
        index=idx
    ).clip(0).round()
    train, test = traffic[:75], traffic[75:]
    model = ExponentialSmoothing(
        train, trend='add', seasonal='add', seasonal_periods=7,
        initialization_method='estimated'
    ).fit(optimized=True)
    fc = model.forecast(15)
    mae = np.mean(np.abs(test.values - fc.values))
    mape = np.mean(np.abs((test.values - fc.values) / test.values)) * 100
    print('Holt-Winters Daily Traffic Forecast:')
    print(f' Params: alpha={model.params["smoothing_level"]:.3f}, '
          f'beta={model.params["smoothing_trend"]:.3f}, '
          f'gamma={model.params["smoothing_seasonal"]:.3f}')
    print(f'\nForecast (next 15 days):')
    for d, f, a in zip(fc.index, fc.values, test.values):
        print(f' {d.strftime("%a %b %d")}: forecast={f:.0f}, actual={a:.0f}')
    print(f'\nMAE: {mae:.1f} | MAPE: {mape:.1f}%')
except ImportError:
    print('pip install statsmodels')
import pandas as pd
import numpy as np
try:
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
rng = np.random.default_rng(42)
idx = pd.date_range('2021-01-01', periods=36, freq='ME')
# TODO: create monthly sales with additive or multiplicative seasonality
# TODO: split and fit both models
# TODO: compare RMSE
except ImportError:
print('pip install statsmodels')Transforming raw time series into supervised learning features enables ML models (XGBoost, LightGBM) to forecast. Key features: lag values, rolling stats, calendar attributes, and cyclical encoding.
# Lag features turn a series into a supervised-learning feature matrix.
# (Indentation restored -- the scrape had flattened the for-loop.)
import pandas as pd
import numpy as np
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=30, freq='D')
ts = pd.Series(rng.normal(100, 10, 30), index=idx, name='demand')
df = ts.to_frame()
# Create lag features
for lag in [1, 2, 3, 7, 14]:
    df[f'lag_{lag}'] = df['demand'].shift(lag)
df.dropna(inplace=True)
print(f'Feature matrix shape: {df.shape}')
print(df.head(5).round(2))
# Correlation with target
corr = df.corr()['demand'].drop('demand')
print('\nLag correlations with demand:')
print(corr.round(3))
import pandas as pd
# Rolling-window and expanding statistics as model features.
# (Indentation restored -- the scrape had flattened the for-loop.)
import pandas as pd
import numpy as np
rng = np.random.default_rng(0)
idx = pd.date_range('2024-01-01', periods=60, freq='D')
ts = pd.Series(rng.normal(500, 80, 60) + np.linspace(0, 50, 60), index=idx, name='sales')
df = ts.to_frame()
# Rolling features
for w in [7, 14]:
    roll = ts.rolling(w)
    df[f'roll_{w}_mean'] = roll.mean()
    df[f'roll_{w}_std'] = roll.std()
    df[f'roll_{w}_min'] = roll.min()
    df[f'roll_{w}_max'] = roll.max()
# Expanding mean
df['expanding_mean'] = ts.expanding().mean()
df.dropna(inplace=True)
print(f'Feature shape: {df.shape}')
print(df.tail(5).round(1))
import pandas as pd
# Calendar attributes plus sin/cos cyclical encodings for a daily series.
# The angles are computed once and reused for both sin and cos columns.
import numpy as np
rng = np.random.default_rng(7)
idx = pd.date_range('2024-01-01', periods=365, freq='D')
df = pd.DataFrame({'sales': rng.normal(100, 15, 365)}, index=idx)
# Calendar features
df['day_of_week'] = df.index.dayofweek
df['day_of_month'] = df.index.day
df['month'] = df.index.month
df['quarter'] = df.index.quarter
df['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
df['is_month_end'] = df.index.is_month_end.astype(int)
# Cyclical encoding (preserves periodicity)
dow_angle = 2 * np.pi * df['day_of_week'] / 7
month_angle = 2 * np.pi * (df['month'] - 1) / 12
df['dow_sin'] = np.sin(dow_angle)
df['dow_cos'] = np.cos(dow_angle)
df['month_sin'] = np.sin(month_angle)
df['month_cos'] = np.cos(month_angle)
print('Feature matrix (first 5 rows):')
print(df.head(5).round(3))
print(f'\nTotal features: {df.shape[1]-1}')
print('Weekend mean:', df[df['is_weekend']==1]['sales'].mean().round(2))
print('Weekday mean:', df[df['is_weekend']==0]['sales'].mean().round(2))
import pandas as pd
# Gradient boosting on lag/rolling/calendar features for 1-step forecasting.
# (Indentation restored -- the scrape had flattened the final for-loop.)
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=200, freq='D')
ts = pd.Series(
    100 + np.linspace(0, 30, 200) + 20*np.sin(2*np.pi*np.arange(200)/7) + rng.normal(0, 5, 200),
    index=idx, name='demand'
)
df = ts.to_frame()
# Lag + rolling features
for lag in [1, 2, 3, 7]: df[f'lag_{lag}'] = ts.shift(lag)
df['roll7_mean'] = ts.rolling(7).mean()
df['roll7_std'] = ts.rolling(7).std()
df['dow'] = ts.index.dayofweek
df['dow_sin'] = np.sin(2*np.pi*df['dow']/7)
df['dow_cos'] = np.cos(2*np.pi*df['dow']/7)
df.dropna(inplace=True)
X = df.drop(columns='demand')
y = df['demand']
# Time-ordered 80/20 split keeps the test set strictly in the future.
split = int(len(df) * 0.8)
X_train, X_test = X.iloc[:split], X.iloc[split:]
y_train, y_test = y.iloc[:split], y.iloc[split:]
model = GradientBoostingRegressor(n_estimators=100, max_depth=4, random_state=42)
model.fit(X_train, y_train)
preds = model.predict(X_test)
mae = mean_absolute_error(y_test, preds)
rmse = np.sqrt(mean_squared_error(y_test, preds))
print(f'GBM Forecast: MAE={mae:.2f}, RMSE={rmse:.2f}')
print('\nTop feature importances:')
fi = pd.Series(model.feature_importances_, index=X.columns).sort_values(ascending=False)
for feat, imp in fi.head(5).items():
    print(f' {feat:15s}: {imp:.4f}')
import pandas as pd
# Daily package-volume forecasting with lag/rolling/calendar features.
# (Indentation restored -- the scrape had flattened the final for-loop.)
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=365, freq='D')
volume = pd.Series(
    3000 + np.linspace(0, 500, 365) +
    800 * np.sin(2*np.pi*np.arange(365)/7) +
    500 * (idx.month == 12).astype(float) + # December peak
    rng.normal(0, 100, 365),
    index=idx
).clip(0)
df = volume.to_frame(name='volume')
for lag in [1, 2, 3, 7, 14]: df[f'lag_{lag}'] = volume.shift(lag)
for w in [7, 14]: df[f'roll_{w}_mean'] = volume.rolling(w).mean()
df['dow'] = volume.index.dayofweek
df['month'] = volume.index.month
df['is_weekend'] = (df['dow'] >= 5).astype(int)
df['dow_sin'] = np.sin(2*np.pi*df['dow']/7)
df['dow_cos'] = np.cos(2*np.pi*df['dow']/7)
df.dropna(inplace=True)
X, y = df.drop('volume', axis=1), df['volume']
split = 300
model = GradientBoostingRegressor(n_estimators=200, max_depth=4, random_state=42)
model.fit(X.iloc[:split], y.iloc[:split])
preds = model.predict(X.iloc[split:])
mae = mean_absolute_error(y.iloc[split:], preds)
print(f'MAE: {mae:.0f} packages/day')
fi = pd.Series(model.feature_importances_, index=X.columns).nlargest(5)
print('\nTop features:')
for f, v in fi.items():
    print(f' {f}: {v:.4f}')
import pandas as pd
# Exercise skeleton: forecast hourly energy demand with a linear model.
# (`pd` is imported earlier in this file.)
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
rng = np.random.default_rng(42)
# 168 hours = one week of hourly observations
idx = pd.date_range('2024-01-01', periods=168, freq='h')
# TODO: create hourly energy with daily cycle
# TODO: build feature matrix
# TODO: train LinearRegression and print RMSE
Proper evaluation of time series forecasts requires time-ordered splits (no leakage), multiple metrics (MAE, RMSE, MAPE), and ideally walk-forward validation to simulate real deployment.
# Forecast-accuracy metrics on a fixed example: MAE, RMSE, MAPE, sMAPE, R^2, bias.
import numpy as np
actual = np.array([100, 120, 130, 115, 140, 125, 160, 155, 170, 180])
forecast = np.array([105, 115, 135, 110, 145, 130, 150, 160, 165, 185])
mae = np.mean(np.abs(actual - forecast))
mse = np.mean((actual - forecast)**2)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((actual - forecast) / actual)) * 100
# sMAPE is symmetric in actual/forecast and bounded in [0, 200].
smape = np.mean(2 * np.abs(actual - forecast) / (np.abs(actual) + np.abs(forecast))) * 100
# R-squared
ss_res = np.sum((actual - forecast)**2)
ss_tot = np.sum((actual - actual.mean())**2)
r2 = 1 - ss_res / ss_tot
print(f'MAE: {mae:.2f}')
print(f'RMSE: {rmse:.2f}')
print(f'MAPE: {mape:.2f}%')
print(f'sMAPE: {smape:.2f}%')
# ENCODING FIX: the superscript two was mojibake ("RΒ²") in the original.
print(f'R²: {r2:.4f}')
print(f'\nBias (mean error): {(forecast - actual).mean():.2f}')
import pandas as pd
# Time-ordered hold-out evaluation of a lag-feature Ridge forecaster.
# (`pd` is imported earlier in this file.)
import numpy as np
from sklearn.linear_model import Ridge
rng = np.random.default_rng(42)
idx = pd.date_range('2023-01-01', periods=200, freq='D')
ts = pd.Series(100 + np.linspace(0,30,200) + rng.normal(0, 8, 200), index=idx)
df = ts.to_frame(name='y')
for lag in [1, 7, 14]: df[f'lag_{lag}'] = ts.shift(lag)
df.dropna(inplace=True)
X = df.drop('y', axis=1).values
y = df['y'].values
# Time-ordered split (NEVER random split for time series!)
n = len(X)
split = int(n * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
model = Ridge().fit(X_train, y_train)
preds = model.predict(X_test)
mae = np.mean(np.abs(y_test - preds))
rmse = np.sqrt(np.mean((y_test - preds)**2))
print(f'Train size: {split}, Test size: {n - split}')
print(f'MAE: {mae:.2f}')
print(f'RMSE: {rmse:.2f}')
print('\nNEVER use random train_test_split for time series!')
print('This leaks future information into training data.')import pandas as pd
# Walk-forward validation: refit on an expanding window, score the next `step` points.
# (Indentation restored -- the scrape had flattened the loop body.)
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
rng = np.random.default_rng(42)
n = 120
ts = pd.Series(100 + np.linspace(0, 20, n) + rng.normal(0, 6, n))
df = ts.to_frame(name='y')
for lag in [1, 2, 3, 7]: df[f'lag_{lag}'] = ts.shift(lag)
df.dropna(inplace=True)
X = df.drop('y', axis=1).values
y = df['y'].values
# Walk-forward validation
min_train = 30
step = 5
errors = []
for test_start in range(min_train, len(X) - step + 1, step):
    X_train, y_train = X[:test_start], y[:test_start]
    X_test, y_test = X[test_start:test_start+step], y[test_start:test_start+step]
    model = Ridge().fit(X_train, y_train)
    preds = model.predict(X_test)
    errors.extend(np.abs(y_test - preds).tolist())
print(f'Walk-forward CV ({len(errors)} predictions):')
print(f' MAE: {np.mean(errors):.2f}')
print(f' RMSE: {np.sqrt(np.mean(np.array(errors)**2)):.2f}')
print(f' Std: {np.std(errors):.2f}')
import pandas as pd
# sklearn TimeSeriesSplit: each fold trains on the past, tests on the future.
# (Indentation restored; mojibake plus-minus sign repaired.)
import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.ensemble import GradientBoostingRegressor
rng = np.random.default_rng(42)
n = 200
ts = pd.Series(100 + np.linspace(0,40,n) + rng.normal(0,8,n))
df = ts.to_frame(name='y')
for lag in [1,2,3,7]: df[f'lag_{lag}'] = ts.shift(lag)
df['roll7'] = ts.rolling(7).mean()
df.dropna(inplace=True)
X = df.drop('y', axis=1)
y = df['y']
tscv = TimeSeriesSplit(n_splits=5, gap=0)
model = GradientBoostingRegressor(n_estimators=50, max_depth=3, random_state=42)
scores = cross_val_score(model, X, y, cv=tscv, scoring='neg_mean_absolute_error')
maes = -scores
print('TimeSeriesSplit CV results (5 folds):')
for i, (mae, (tr, te)) in enumerate(zip(maes, tscv.split(X)), 1):
    print(f' Fold {i}: train={len(tr)}, test={len(te)}, MAE={mae:.2f}')
# ENCODING FIX: plus-minus was mojibake ("Β±") in the original.
print(f'\nMean MAE: {maes.mean():.2f} ± {maes.std():.2f}')
import pandas as pd
# Head-to-head: Holt-Winters vs gradient boosting on a 40-day holdout.
# (Indentation restored -- the scrape had flattened the try-block and loops.)
import pandas as pd
import numpy as np
try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    from statsmodels.tsa.arima.model import ARIMA
    from sklearn.ensemble import GradientBoostingRegressor
    import warnings; warnings.filterwarnings('ignore')
    rng = np.random.default_rng(42)
    n = 200
    idx = pd.date_range('2024-01-01', periods=n, freq='D')
    ts = pd.Series(
        300 + np.linspace(0,50,n) + 60*np.sin(2*np.pi*np.arange(n)/7) + rng.normal(0,15,n),
        index=idx
    )
    train, test = ts[:160], ts[160:]
    results = {}
    # Holt-Winters
    hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=7, initialization_method='estimated').fit(optimized=True)
    results['Holt-Winters'] = hw.forecast(40)
    # GBM with lag features
    df = ts.to_frame(name='y')
    for lag in [1,2,7]: df[f'lag_{lag}'] = ts.shift(lag)
    df.dropna(inplace=True)
    X, y = df.drop('y',axis=1), df['y']
    split = len(train) - 7 # adjust for lag
    gbm = GradientBoostingRegressor(n_estimators=100, random_state=42)
    gbm.fit(X.iloc[:split], y.iloc[:split])
    results['GBM'] = pd.Series(gbm.predict(X.iloc[split:split+40]), index=test.index[:40])
    actual = test.values[:40]
    print('Model Comparison (40-day holdout):')
    print(f'{"Model":<15} {"MAE":>7} {"RMSE":>7} {"MAPE%":>7}')
    print('-' * 38)
    for name, fc in results.items():
        fc_v = fc.values[:40]
        mae = np.mean(np.abs(actual - fc_v))
        rmse = np.sqrt(np.mean((actual - fc_v)**2))
        mape = np.mean(np.abs((actual - fc_v)/actual))*100
        print(f'{name:<15} {mae:>7.1f} {rmse:>7.1f} {mape:>7.1f}')
except ImportError:
    print('pip install statsmodels scikit-learn')
import pandas as pd
# Exercise skeleton: cross-validate Ridge and GBM forecasters with TimeSeriesSplit.
# (`pd` is imported earlier in this file.)
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import Ridge
from sklearn.ensemble import GradientBoostingRegressor
rng = np.random.default_rng(42)
idx = pd.date_range('2024-01-01', periods=180, freq='D')
# TODO: create demand series
# TODO: create feature matrix
# TODO: cross-validate all three models
Use Facebook Prophet for additive time series forecasting with automatic trend, seasonality, and holiday effects. Handle changepoints and produce uncertainty intervals.
# Build a Prophet-ready frame (columns 'ds' and 'y') and show the intended workflow.
import pandas as pd
import numpy as np
# Simulate daily data with trend + weekly seasonality (Prophet-ready format)
np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)
weekly = 10 * np.sin(2 * np.pi * np.arange(365) / 7)
noise = np.random.normal(0, 5, 365)
y = trend + weekly + noise
df = pd.DataFrame({'ds': dates, 'y': y})
print('Prophet input format:')
print(df.head())
print(f'\nShape: {df.shape}')
print(f'Date range: {df.ds.min().date()} to {df.ds.max().date()}')
print(f'y stats: mean={df.y.mean():.1f}, std={df.y.std():.1f}')
# Without Prophet installed, demonstrate the workflow:
print('\nProphet workflow:')
print(' from prophet import Prophet')
print(' m = Prophet(yearly_seasonality=False, weekly_seasonality=True)')
print(' m.fit(df)')
print(' future = m.make_future_dataframe(periods=30)')
print(' forecast = m.predict(future)')
print(' m.plot(forecast)')import pandas as pd
# Prophet-style additive decomposition by hand: linear trend + day-of-week effects.
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)
weekly = 8 * np.sin(2 * np.pi * np.arange(365) / 7)
noise = np.random.normal(0, 5, 365)
y = trend + weekly + noise
df = pd.DataFrame({'ds': dates, 'y': y})
df['dow'] = df['ds'].dt.dayofweek
# Fit trend with linear regression
from sklearn.linear_model import LinearRegression
t = np.arange(len(df)).reshape(-1, 1)
lr = LinearRegression().fit(t, df['y'])
df['trend_fit'] = lr.predict(t)
# Fit weekly seasonality as day-of-week means on residuals
df['resid'] = df['y'] - df['trend_fit']
dow_effect = df.groupby('dow')['resid'].mean()
df['seasonal'] = df['dow'].map(dow_effect)
df['yhat'] = df['trend_fit'] + df['seasonal']
# NAMING FIX: this value is an RMSE (note the **0.5) -- it was misleadingly
# bound to a variable called `mse` in the original.
rmse = ((df['y'] - df['yhat'])**2).mean()**0.5
print(f'RMSE (train): {rmse:.2f}')
fig, axes = plt.subplots(3, 1, figsize=(10, 8))
axes[0].plot(df['ds'], df['y'], alpha=0.5, label='actual'); axes[0].plot(df['ds'], df['yhat'], label='fit'); axes[0].legend(); axes[0].set_title('Additive Model Fit')
axes[1].plot(df['ds'], df['trend_fit']); axes[1].set_title('Trend Component')
axes[2].bar(range(7), dow_effect.values); axes[2].set_xticks(range(7)); axes[2].set_xticklabels(['Mon','Tue','Wed','Thu','Fri','Sat','Sun']); axes[2].set_title('Weekly Seasonality')
plt.tight_layout(); plt.savefig('prophet_decomp.png', dpi=80); plt.close(); print('Saved prophet_decomp.png')
import numpy as np
# Simulate a series with abrupt level shifts for changepoint detection.
# (`np` is imported earlier in this file.)
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# Simulate series with structural breaks
np.random.seed(42)
n = 300
t = np.arange(n)
# Three regimes: level shifts 10 -> 25 -> 15 (constant mean within each regime)
y = np.concatenate([
np.random.normal(10, 2, 100),
np.random.normal(25, 2, 100),
np.random.normal(15, 2, 100)
])
# Simple CUSUM-based changepoint detection
def cusum_changepoints(series, threshold=10):
    """Detect level shifts with a two-sided CUSUM statistic.

    Accumulates positive and negative deviations from the global mean
    (with a 0.5 slack term); when either cumulative sum exceeds
    `threshold` the index is recorded as a changepoint and both sums
    reset. Uses the mean of the whole series, so this is an offline
    detector; `threshold` is in the same units as the data.
    """
    mu = series.mean()
    cusum_pos = np.zeros(len(series))
    cusum_neg = np.zeros(len(series))
    changepoints = []
    for i in range(1, len(series)):
        cusum_pos[i] = max(0, cusum_pos[i-1] + series[i] - mu - 0.5)
        cusum_neg[i] = max(0, cusum_neg[i-1] - series[i] + mu - 0.5)
        if cusum_pos[i] > threshold or cusum_neg[i] > threshold:
            changepoints.append(i)
            cusum_pos[i] = cusum_neg[i] = 0
    return changepoints
# Run detection and plot the series with detected changepoints marked.
# (Indentation restored -- the scrape had flattened the for-loop.)
cps = cusum_changepoints(y, threshold=15)
print(f'Detected changepoints at indices: {cps}')
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(t, y, alpha=0.7)
for cp in cps:
    # Label only the first line so the legend has a single entry.
    ax.axvline(cp, color='red', linestyle='--', linewidth=2, label=f'CP at {cp}' if cp == cps[0] else '')
ax.set_title('CUSUM Changepoint Detection'); ax.legend()
plt.tight_layout(); plt.savefig('changepoints.png', dpi=80); plt.close(); print('Saved changepoints.png')
import pandas as pd
# Walk-forward validation of a trend + day-of-week-dummy Ridge regression.
# (Indentation restored -- the scrape had flattened both loops.)
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
np.random.seed(42)
dates = pd.date_range('2022-01-01', periods=200, freq='D')
trend = np.linspace(0, 50, 200)
weekly = 5 * np.sin(2 * np.pi * np.arange(200) / 7)
noise = np.random.normal(0, 3, 200)
y = trend + weekly + noise
df = pd.DataFrame({'ds': dates, 'y': y})
df['t'] = np.arange(len(df))
df['dow'] = df['ds'].dt.dayofweek
# Encode day-of-week as dummies
for d in range(7):
    df[f'dow_{d}'] = (df['dow'] == d).astype(int)
feat_cols = ['t'] + [f'dow_{d}' for d in range(7)]
X = df[feat_cols].values
# Walk-forward validation: train on first 60%, then expand
initial = 120; horizon = 7; errors = []
for start in range(initial, len(df) - horizon, horizon):
    X_tr, y_tr = X[:start], y[:start]
    X_te, y_te = X[start:start+horizon], y[start:start+horizon]
    m = Ridge().fit(X_tr, y_tr)
    preds = m.predict(X_te)
    errors.append(np.sqrt(((y_te - preds)**2).mean()))
print(f'Walk-forward validation ({len(errors)} folds):')
print(f' Mean RMSE: {np.mean(errors):.2f}')
print(f' Std RMSE: {np.std(errors):.2f}')
print(f' Min RMSE: {min(errors):.2f}, Max: {max(errors):.2f}')
import pandas as pd
# Two-year daily series: Ridge on trend/dow/dom features + an 80% prediction interval.
# (Removed the unused `train`/`test` frames; mojibake plus-minus repaired.)
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
np.random.seed(0)
dates = pd.date_range('2022-01-01', periods=730, freq='D')
trend = np.linspace(500, 700, 730)
weekly = 40 * np.sin(2 * np.pi * np.arange(730) / 7 - np.pi)
monthly = 30 * np.sin(2 * np.pi * np.arange(730) / 30)
noise = np.random.normal(0, 15, 730)
y = trend + weekly + monthly + noise
df = pd.DataFrame({'ds': dates, 'y': y})
df['t'] = np.arange(len(df))
df['dow'] = df['ds'].dt.dayofweek
df['dom'] = df['ds'].dt.day
for d in range(7): df[f'dow_{d}'] = (df['dow'] == d).astype(int)
feat_cols = ['t'] + [f'dow_{d}' for d in range(7)] + ['dom']
X = df[feat_cols].values
# Time-ordered boolean split: fit on everything before 2023-07-01.
idx_tr = df['ds'] < '2023-07-01'
sc = StandardScaler().fit(X[idx_tr])
m = Ridge(alpha=1.0).fit(sc.transform(X[idx_tr]), y[idx_tr])
resid = y[idx_tr] - m.predict(sc.transform(X[idx_tr]))
pred = m.predict(sc.transform(X[~idx_tr]))
print(f'Forecast RMSE: {np.sqrt(((y[~idx_tr]-pred)**2).mean()):.2f}')
# ENCODING FIX: plus-minus was mojibake ("Β±"); 1.28 is the z-score of the 90th pct.
print(f'80% PI: ±{1.28*resid.std():.2f}')
import pandas as pd
# Exercise skeleton: weekly revenue forecasting with walk-forward CV.
# (`pd` is imported earlier in this file.)
import numpy as np
from sklearn.linear_model import Ridge
np.random.seed(7)
weeks = pd.date_range('2022-01-03', periods=104, freq='W')
trend = np.linspace(10000, 15000, 104)
seasonality = 1500 * np.sin(2 * np.pi * np.arange(104) / 52)
noise = np.random.normal(0, 300, 104)
y = trend + seasonality + noise
df = pd.DataFrame({'ds': weeks, 'revenue': y})
# TODO: Create features (t index, month dummies, lag-1)
# TODO: Walk-forward cross-validation with 4-week horizon
# TODO: Report average RMSE and 80% PI width
Detect outliers and anomalies in sequential data using statistical thresholds, isolation forests, and autoencoder-style reconstruction errors.
# Rolling z-score: flag points > 2.5 sigma from their trailing 14-day window.
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='D')
y = np.sin(np.arange(100) * 0.3) * 10 + np.random.normal(0, 1, 100) + 50
# Inject anomalies
y[[20, 45, 70, 85]] += [25, -20, 30, -15]
df = pd.DataFrame({'ds': dates, 'y': y})
# Rolling z-score
win = 14
df['roll_mean'] = df['y'].rolling(win, min_periods=1).mean()
# std of a single point is NaN -- fill with 1 to avoid divide-by-NaN at the start
df['roll_std'] = df['y'].rolling(win, min_periods=1).std().fillna(1)
df['zscore'] = (df['y'] - df['roll_mean']) / df['roll_std']
df['anomaly_z'] = df['zscore'].abs() > 2.5
print(f'Z-score anomalies: {df["anomaly_z"].sum()} detected')
print(df[df['anomaly_z']][['ds','y','zscore']].to_string())
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(df['ds'], df['y'], label='series')
ax.scatter(df[df['anomaly_z']]['ds'], df[df['anomaly_z']]['y'],
color='red', s=80, zorder=5, label='anomaly')
ax.set_title('Rolling Z-score Anomaly Detection'); ax.legend()
plt.tight_layout(); plt.savefig('anomaly_zscore.png', dpi=80); plt.close(); print('Saved anomaly_zscore.png')import pandas as pd
# Multivariate anomaly detection on correlated cpu/memory metrics.
# (Indentation restored; deprecated uppercase 'H' frequency replaced.)
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
n = 200
# FIX: lowercase 'h' -- uppercase 'H' is deprecated since pandas 2.2.
dates = pd.date_range('2024-01-01', periods=n, freq='h')
cpu = np.random.normal(40, 10, n)
memory = cpu * 1.5 + np.random.normal(0, 5, n)
# Inject anomalies
cpu[[50, 100, 150]] = [95, 5, 98]
memory[[50, 100, 150]] = [30, 120, 25]
df = pd.DataFrame({'ds': dates, 'cpu': cpu, 'memory': memory})
X = df[['cpu', 'memory']].values
iso = IsolationForest(contamination=0.05, random_state=42)
df['anomaly'] = iso.fit_predict(X) == -1
df['score'] = iso.score_samples(X)
print(f'Anomalies detected: {df["anomaly"].sum()}')
print(df[df['anomaly']][['ds','cpu','memory','score']].to_string())
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
for i, col in enumerate(['cpu', 'memory']):
    axes[i].plot(df['ds'], df[col], alpha=0.7)
    axes[i].scatter(df[df['anomaly']]['ds'], df[df['anomaly']][col],
                    color='red', s=60, zorder=5)
    axes[i].set_ylabel(col)
plt.suptitle('Isolation Forest Anomaly Detection'); plt.tight_layout()
plt.savefig('anomaly_iforest.png', dpi=80); plt.close(); print('Saved anomaly_iforest.png')
import numpy as np
# PCA reconstruction error as a lightweight autoencoder substitute.
np.random.seed(42)
# Simulate normal signal
t = np.linspace(0, 20*np.pi, 500)
normal = np.sin(t) + 0.1 * np.random.randn(500)
# Inject anomaly window
anom = normal.copy()
anom[200:220] += 5 # spike
# Simple threshold-based reconstruction error (PCA substitute)
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# Create sliding windows
def make_windows(x, w=20):
    """Return a 2-D array of overlapping length-`w` windows of 1-D array `x`."""
    return np.array([x[i:i+w] for i in range(len(x)-w)])
# Fit PCA on normal windows; anomalous windows reconstruct poorly.
X = make_windows(normal, 20)
X_anom = make_windows(anom, 20)
sc = StandardScaler().fit(X)
X_sc = sc.transform(X)
X_anom_sc = sc.transform(X_anom)
pca = PCA(n_components=5).fit(X_sc)
rec = pca.inverse_transform(pca.transform(X_anom_sc))
# Mean squared reconstruction error per window
errors = ((X_anom_sc - rec)**2).mean(axis=1)
threshold = np.percentile(errors, 95)
print(f'Reconstruction error threshold (95th pct): {threshold:.4f}')
print(f'Anomaly windows detected: {(errors > threshold).sum()}')
print(f'Max error at index: {errors.argmax()} (injected at 200:220)')import pandas as pd
# STL decomposition: residuals beyond 3 sigma flag point anomalies.
# (Indentation restored; detected-index print made comparable to `injected`.)
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import STL
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=365, freq='D')
trend = np.linspace(0, 30, 365)
seasonal = 10 * np.sin(2 * np.pi * np.arange(365) / 7)
noise = np.random.normal(0, 2, 365)
y = trend + seasonal + noise
# Inject point anomalies
injected = [80, 160, 250, 320]
y_anom = y.copy()
for idx in injected:
    y_anom[idx] += np.random.choice([-25, 25])
series = pd.Series(y_anom, index=dates)
stl = STL(series, period=7, robust=True)
result = stl.fit()
residual = result.resid
thresh = 3 * residual.std()
anomalies = series[residual.abs() > thresh]
print(f'STL anomalies detected: {len(anomalies)}')
print(f'Injected at: {injected}')
# BUG FIX: dayofyear is 1-based while `injected` holds 0-based offsets from
# Jan 1 -- subtract 1 so the two lists are directly comparable.
print(f'Detected at: {list(anomalies.index.dayofyear - 1)}')
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
axes[0].plot(series.index, series, alpha=0.6); axes[0].scatter(anomalies.index, anomalies, color='red', s=60, zorder=5, label='anomaly'); axes[0].legend(); axes[0].set_title('STL Anomaly Detection')
axes[1].plot(residual.index, residual); axes[1].axhline(thresh, color='r', linestyle='--'); axes[1].axhline(-thresh, color='r', linestyle='--'); axes[1].set_title('STL Residual')
plt.tight_layout(); plt.savefig('stl_anomaly.png', dpi=80); plt.close(); print('Saved stl_anomaly.png')
import pandas as pd
# Isolation Forest over 5-minute CPU samples with rolling-stat features.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
np.random.seed(1)
# 2 weeks of 5-min samples = 4032 points
t = np.arange(4032)
daily = 20 * np.sin(2*np.pi*t / 288 - np.pi/2) + 50 # 288 samples/day
noise = np.random.normal(0, 3, 4032)
cpu = np.clip(daily + noise, 5, 100)
# Inject anomalies
cpu[500:510] += 40 # spike
cpu[1200:1230] += 30 # sustained high
times = pd.date_range('2024-01-01', periods=4032, freq='5min')
df = pd.DataFrame({'time': times, 'cpu': cpu})
df['hour'] = df['time'].dt.hour
# FIX: fillna(method='bfill') is deprecated (removed in pandas 3.0) -- use .bfill()
df['roll_mean'] = df['cpu'].rolling(12).mean().bfill()
df['roll_std'] = df['cpu'].rolling(12).std().fillna(1)
# Isolation Forest on features
X = df[['cpu', 'hour', 'roll_mean', 'roll_std']].values
iso = IsolationForest(contamination=0.01, random_state=42)
df['anomaly'] = iso.fit_predict(X) == -1
print(f'Anomalies detected: {df["anomaly"].sum()} of {len(df)} points ({df["anomaly"].mean():.1%})')
print(df[df['anomaly']].groupby(df[df['anomaly']]['time'].dt.date)['anomaly'].count())
import pandas as pd
# Exercise skeleton: hourly order anomalies via z-score vs Isolation Forest.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
np.random.seed(99)
# FIX: lowercase 'h' -- uppercase 'H' is deprecated since pandas 2.2.
hours = pd.date_range('2024-01-01', periods=336, freq='h') # 2 weeks
base = 100 + 50 * np.sin(2*np.pi*np.arange(336)/24 - np.pi)
noise = np.random.normal(0, 8, 336)
orders = np.clip(base + noise, 0, None)
orders[[100, 200, 280]] += [150, -80, 200] # inject anomalies
df = pd.DataFrame({'time': hours, 'orders': orders})
# TODO: Rolling z-score anomaly detection (window=24, thresh=3)
# TODO: Isolation Forest on (orders, hour, dayofweek)
# TODO: Compare detected anomaly counts and false positive rates
Apply LSTM, Temporal Convolutional Networks (TCN), and N-BEATS-style architectures using PyTorch or Keras for sequence forecasting tasks.
# Lag-window linear model: the feature-engineering analogue of an RNN forecaster.
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
# Demonstrate the lag-feature approach that underlies RNN forecasting
np.random.seed(42)
n = 300
t = np.arange(n)
y = np.sin(0.2*t) * 10 + 0.1*t + np.random.normal(0, 1, n)
def make_supervised(series, lags=10):
    """Turn a 1-D series into (X, Y): each row of X holds the `lags`
    values preceding the scalar target in Y at the same position."""
    X, Y = [], []
    for i in range(lags, len(series)):
        X.append(series[i-lags:i])
        Y.append(series[i])
    return np.array(X), np.array(Y)
# Standardize, fit Ridge on lag-20 windows, and compare against a mean baseline.
X, Y = make_supervised(y, lags=20)
split = int(0.8 * len(X))
X_tr, X_te = X[:split], X[split:]
Y_tr, Y_te = Y[:split], Y[split:]
sc_x = StandardScaler().fit(X_tr)
sc_y = StandardScaler().fit(Y_tr.reshape(-1,1))
model = Ridge(alpha=0.1)
model.fit(sc_x.transform(X_tr), sc_y.transform(Y_tr.reshape(-1,1)).ravel())
pred_sc = model.predict(sc_x.transform(X_te))
# Undo the target scaling before computing errors in original units
pred = sc_y.inverse_transform(pred_sc.reshape(-1,1)).ravel()
rmse = np.sqrt(((Y_te - pred)**2).mean())
print(f'Lag-20 linear model RMSE: {rmse:.3f}')
print(f'Baseline (mean) RMSE: {np.sqrt(((Y_te - Y_tr.mean())**2).mean()):.3f}')
print('This is the feature-engineering equivalent of an LSTM cell state.')import numpy as np
# Multi-step (seq2seq-style) forecasting: one regressor per horizon step.
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor
np.random.seed(42)
n = 400
t = np.arange(n)
y = 20 * np.sin(0.1*t) + 0.05*t + np.random.normal(0, 2, n)
def make_seq2seq(series, lookback=30, horizon=7):
    """Build (X, Y) pairs: rows of X are `lookback`-length history windows
    and rows of Y are the next `horizon` values (multi-step targets)."""
    X, Y = [], []
    for i in range(lookback, len(series) - horizon + 1):
        X.append(series[i-lookback:i])
        Y.append(series[i:i+horizon])
    return np.array(X), np.array(Y)
# Fit one regressor per horizon step and report per-step RMSE.
# (Indentation restored -- the scrape had flattened the for-loop.)
X, Y = make_seq2seq(y, lookback=30, horizon=7)
split = int(0.8 * len(X))
X_tr, X_te = X[:split], X[split:]
Y_tr, Y_te = Y[:split], Y[split:]
# Multi-output regression (equivalent to decoder in seq2seq)
model = MultiOutputRegressor(GradientBoostingRegressor(n_estimators=50, random_state=0))
model.fit(X_tr, Y_tr)
pred = model.predict(X_te)
rmse_by_step = np.sqrt(((Y_te - pred)**2).mean(axis=0))
print('RMSE by forecast horizon step:')
for step, r in enumerate(rmse_by_step, 1):
    print(f' Step {step}: {r:.3f}')
print(f'Overall RMSE: {rmse_by_step.mean():.3f}')
import numpy as np
# "Attention" demo: normalized |Ridge coefficients| as importance weights per lag.
# (Indentation restored; lag-number mapping bug fixed.)
import numpy as np
np.random.seed(42)
n = 500
t = np.arange(n)
y = 15*np.sin(0.15*t) + 5*np.sin(0.05*t) + np.random.normal(0, 1, n)
def make_windows(series, w=20):
    """Stack sliding length-w windows; row i predicts series[w+i]."""
    X = np.array([series[i-w:i] for i in range(w, len(series))])
    Y = series[w:]
    return X, Y
X, Y = make_windows(y, 20)
# Simulate attention: weight lags by their correlation with target
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
sc_x = StandardScaler().fit(X)
sc_y = StandardScaler().fit(Y.reshape(-1,1))
X_sc = sc_x.transform(X)
Y_sc = sc_y.transform(Y.reshape(-1,1)).ravel()
# Attention weights = absolute Ridge coefficients, normalized to sum to 1
# (simple L1 normalization, not an actual softmax)
model = Ridge(alpha=0.01).fit(X_sc, Y_sc)
raw_weights = np.abs(model.coef_)
attention = raw_weights / raw_weights.sum()
print('Attention weights by lag (most recent = lag 1):')
for lag in range(1, 6):
    print(f' Lag {lag:2d}: {attention[-lag]:.4f} ({"high" if attention[-lag] > attention.mean() else "low"})')
# BUG FIX: column j holds lag (20 - j), not lag (j + 1) -- the original
# `argsort()[:3] + 1` reported the wrong lag numbers.
print(f'Top-3 most attended lags: {20 - (-attention).argsort()[:3]}')
import numpy as np
# N-BEATS-style forecasting: project lookback windows onto trend/Fourier bases.
from sklearn.linear_model import Ridge
np.random.seed(0)
n = 300
t = np.arange(n)
y = 10*np.sin(0.2*t) + 0.1*t + np.random.normal(0, 1.5, n)
# N-BEATS idea: decompose into trend + seasonality bases
def trend_basis(T, degree=3):
    """Polynomial trend basis for N-BEATS trend block.

    Columns are t**0 .. t**degree evaluated on an evenly spaced unit
    grid, returned as a (T, degree + 1) matrix.
    """
    grid = np.linspace(0.0, 1.0, T)
    return np.vander(grid, degree + 1, increasing=True)
def fourier_basis(T, harmonics=5):
    """Fourier seasonality basis for N-BEATS seasonality block.

    For each harmonic k = 1..harmonics, contributes a cos(k*t) column
    followed by a sin(k*t) column over one full cycle, producing a
    (T, 2 * harmonics) matrix.
    """
    cycle = np.linspace(0, 2 * np.pi, T)
    columns = [wave(k * cycle)
               for k in range(1, harmonics + 1)
               for wave in (np.cos, np.sin)]
    return np.column_stack(columns)
LB = 30 # lookback
X, Y = [], []
# Project each lookback window onto the trend/Fourier bases; target is the
# mean of the next 7 observations
for i in range(LB, n - 7):
    window = y[i-LB:i]
    trend_feats = trend_basis(LB, degree=2).T @ window / LB
    season_feats = fourier_basis(LB, harmonics=3).T @ window / LB
    X.append(np.concatenate([trend_feats, season_feats]))
    Y.append(y[i:i+7].mean())
X, Y = np.array(X), np.array(Y)
split = int(0.8*len(X))
model = Ridge(alpha=0.1).fit(X[:split], Y[:split])
pred = model.predict(X[split:])
rmse = np.sqrt(((Y[split:] - pred)**2).mean())
print(f'N-BEATS basis expansion RMSE: {rmse:.3f}')
print(f'Trend features: {trend_basis(LB, 2).shape[1]}, Fourier features: {fourier_basis(LB, 3).shape[1]}')

# === Next snippet: 24h-ahead hourly demand forecast with a multi-output GBM ===
import numpy as np
import pandas as pd
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor
np.random.seed(5)
hours = pd.date_range('2023-01-01', periods=8760, freq='H')
daily = 500 + 200*np.sin(2*np.pi*np.arange(8760)/24 - np.pi)
weekly= 50 *np.sin(2*np.pi*np.arange(8760)/(24*7))
noise = np.random.normal(0, 20, 8760)
demand = daily + weekly + noise
df = pd.DataFrame({'time': hours, 'demand': demand, 'hour': hours.hour, 'dow': hours.dayofweek})
LB, H = 24*7, 24 # 7-day lookback, 24h horizon
X, Y = [], []
for i in range(LB, len(df)-H):
    # Raw lag values plus two calendar features at forecast origin
    feats = list(df['demand'].values[i-LB:i])
    feats += [df['hour'].iloc[i], df['dow'].iloc[i]]
    X.append(feats)
    Y.append(df['demand'].values[i:i+H])
X, Y = np.array(X), np.array(Y)
split = int(0.85*len(X))
model = MultiOutputRegressor(GradientBoostingRegressor(n_estimators=30, random_state=0), n_jobs=-1)
model.fit(X[:split], Y[:split])
pred = model.predict(X[split:])
rmse = np.sqrt(((Y[split:]-pred)**2).mean())
baseline = np.sqrt(((Y[split:]-Y[split-1:-1])**2).mean()) # persistence
print(f'GBM RMSE: {rmse:.2f}, Persistence RMSE: {baseline:.2f}')
print(f'Improvement: {(baseline-rmse)/baseline:.1%}')

# === Next snippet (exercise): solar output forecasting — left as TODOs ===
import numpy as np
import pandas as pd
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor
np.random.seed(3)
hours = pd.date_range('2023-01-01', periods=8760, freq='H')
# Half-rectified sinusoid models daylight-only generation
raw = np.maximum(0, np.sin(2*np.pi*np.arange(8760)/24 - np.pi/2))
solar = raw * 1000 + np.random.normal(0, 30, 8760)
df = pd.DataFrame({'time': hours, 'solar': solar})
LB, H = 48, 12
# TODO: Build (X, Y) windows with lookback=48, horizon=12
# TODO: Train GBM MultiOutputRegressor
# TODO: Report RMSE for steps 1-4 vs steps 9-12
# TODO: Compute attention weights via per-lag correlation
# === Snippet: DWT decomposition and soft-threshold denoising ===
import numpy as np
import pywt
np.random.seed(0)
t = np.linspace(0, 8*np.pi, 512)
signal = (np.sin(t) + 0.5*np.sin(5*t) + np.random.normal(0, 0.2, 512))
wavelet = 'db4'
level = 4
# wavedec returns [cA_level, cD_level, ..., cD_1] (coarsest detail first)
coeffs = pywt.wavedec(signal, wavelet, level=level)
print(f"Decomposition levels: {len(coeffs)-1}")
print(f"Approximation coeff shape: {coeffs[0].shape}")
for i, c in enumerate(coeffs[1:], 1):
    print(f" Detail level {i}: shape={c.shape}, energy={np.sum(c**2):.2f}")
# Denoise: zero out small detail coefficients
threshold = 0.3
coeffs_denoised = [coeffs[0]] + [pywt.threshold(c, threshold, mode='soft') for c in coeffs[1:]]
denoised = pywt.waverec(coeffs_denoised, wavelet)[:len(signal)]
mse = np.mean((signal - denoised)**2)
print(f"Denoising MSE: {mse:.4f}")

# === Snippet: CWT scalogram of a chirp — dominant frequency over time ===
import numpy as np
import pywt
np.random.seed(1)
fs = 100 # Hz
t = np.arange(0, 4, 1/fs)
# Chirp: frequency increases from 5 to 20 Hz
freq = 5 + 15*t/4
signal = np.sin(2*np.pi*freq*t) + np.random.normal(0, 0.1, len(t))
widths = np.arange(1, 64)
cwt_matrix, freqs = pywt.cwt(signal, widths, 'morl', sampling_period=1/fs)
power = np.abs(cwt_matrix)**2
# Peak frequency at start vs end
t_start = slice(0, 50); t_end = slice(350, 400)
peak_freq_start = freqs[np.argmax(power[:, t_start].mean(axis=1))]
peak_freq_end = freqs[np.argmax(power[:, t_end].mean(axis=1))]
print(f"Dominant frequency (start): {peak_freq_start:.1f} Hz")
print(f"Dominant frequency (end): {peak_freq_end:.1f} Hz")

# === Snippet: anomaly detection via DWT reconstruction error ===
import numpy as np
import pywt
np.random.seed(42)
n = 1000
ts = np.sin(2*np.pi*np.arange(n)/50) + np.random.normal(0, 0.1, n)
# Inject anomalies
ts[300:310] += 3.0
ts[700] -= 4.0
# DWT anomaly detection via reconstruction error per window
coeffs = pywt.wavedec(ts, 'haar', level=3)
# Zero out detail at level 1 (high-freq noise)
# NOTE(review): coeffs[1] is the *coarsest* detail band in wavedec output
# (cD_3 here), not the highest-frequency one (cD_1 = coeffs[-1]) — confirm
# the intended band before reusing this pattern.
coeffs[1] = np.zeros_like(coeffs[1])
reconstructed = pywt.waverec(coeffs, 'haar')[:n]
residuals = np.abs(ts - reconstructed)
threshold = residuals.mean() + 3*residuals.std()
anomaly_idx = np.where(residuals > threshold)[0]
print(f"Anomalies detected at indices: {anomaly_idx[:10]}")
print(f"True anomaly regions: 300-310, 700")

# === Snippet: bearing-fault detection via per-band wavelet energy ===
import numpy as np
import pywt
np.random.seed(7)
fs = 1000 # 1 kHz sampling
t = np.arange(0, 2, 1/fs) # 2 seconds
# Normal operation: 10 Hz fundamental + harmonics
normal = (np.sin(2*np.pi*10*t) + 0.3*np.sin(2*np.pi*20*t) +
          np.random.normal(0, 0.05, len(t)))
# Fault: adds 150 Hz bearing frequency
fault = normal + 0.5*np.sin(2*np.pi*150*t)
for label, sig in [('Normal', normal), ('Fault', fault)]:
    coeffs = pywt.wavedec(sig, 'db4', level=5)
    # Level 1 detail captures highest frequencies
    # NOTE(review): as above, coeffs[1] is actually the deepest-level
    # (lowest-frequency) detail; the highest-frequency detail is coeffs[-1].
    hf_energy = np.sum(coeffs[1]**2) / np.sum(sig**2)
    print(f"{label}: high-freq energy ratio = {hf_energy:.4f}")
    energies = [np.sum(c**2) / np.sum(sig**2) for c in coeffs]
    print(f" Energy by level: " + ", ".join(f"L{i}={e:.3f}" for i, e in enumerate(energies)))

# === Snippet (exercise): EEG band analysis — left as TODOs ===
import numpy as np
import pywt
fs = 256 # Hz
t = np.arange(0, 1, 1/fs)
# Multi-band EEG signal
delta = 0.5 * np.sin(2*np.pi*2*t)
alpha = 0.3 * np.sin(2*np.pi*10*t)
beta = 0.2 * np.sin(2*np.pi*20*t)
noise = np.random.normal(0, 0.05, len(t))
eeg = delta + alpha + beta + noise
# Inject spike
spike_start = int(0.5 * fs)
eeg[spike_start:spike_start+13] += 3.0
# TODO: DWT decomposition with 'db4', level=5
# TODO: Identify which detail levels correspond to each EEG band
# TODO: Compute relative band powers
# TODO: Detect spike using reconstruction error
# === Snippet: fit a VAR to a simulated bivariate system and forecast ===
import numpy as np
import pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
np.random.seed(42)
n = 200
e1, e2 = np.random.normal(0, 1, n), np.random.normal(0, 1, n)
y1, y2 = np.zeros(n), np.zeros(n)
# True data-generating process: each series depends on both lags
for t in range(1, n):
    y1[t] = 0.6*y1[t-1] + 0.2*y2[t-1] + e1[t]
    y2[t] = 0.1*y1[t-1] + 0.7*y2[t-1] + e2[t]
df = pd.DataFrame({'y1': y1, 'y2': y2})
model = VAR(df)
results = model.fit(maxlags=4, ic='aic')  # AIC picks the lag order
print(f"Selected lag order: {results.k_ar}")
forecast = results.forecast(df.values[-results.k_ar:], steps=5)
print("5-step forecast:"); print(forecast.round(3))

# === Snippet: Granger causality test (x should Granger-cause y by design) ===
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import grangercausalitytests
np.random.seed(1)
n = 300
eps1 = np.random.normal(0, 1, n)
eps2 = np.random.normal(0, 1, n)
x = np.zeros(n); y = np.zeros(n)
for t in range(2, n):
    x[t] = 0.5*x[t-1] + eps1[t]
    y[t] = 0.4*x[t-1] + 0.3*y[t-1] + eps2[t] # x Granger-causes y
df = pd.DataFrame({'y': y, 'x': x})
max_lag = 4
print("Granger causality (x -> y):")
# Column order matters: tests whether the 2nd column causes the 1st
res = grangercausalitytests(df[['y','x']], maxlag=max_lag, verbose=False)
for lag, r in res.items():
    f_stat = r[0]['ssr_ftest'][0]
    p_val = r[0]['ssr_ftest'][1]
    print(f" Lag {lag}: F={f_stat:.3f}, p={p_val:.4f}")

# === Snippet: Engle-Granger cointegration test and spread z-score ===
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
from statsmodels.regression.linear_model import OLS
np.random.seed(5)
n = 500
common_trend = np.cumsum(np.random.normal(0, 1, n))  # shared random walk
y1 = common_trend + np.random.normal(0, 0.5, n)
y2 = 0.8 * common_trend + 1.2 + np.random.normal(0, 0.5, n)
score, pvalue, _ = coint(y1, y2)
print(f"Cointegration test: t={score:.4f}, p={pvalue:.4f}")
# Estimate cointegrating vector
beta = OLS(y1, np.column_stack([np.ones(n), y2])).fit().params
spread = y1 - beta[1]*y2 - beta[0]
z_spread = (spread - spread.mean()) / spread.std()
print(f"Spread mean-reversion: current z-score = {z_spread[-1]:.3f}")

# === Snippet: macro VAR with impulse-response analysis ===
import numpy as np
import pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import grangercausalitytests
np.random.seed(10)
n = 120 # 10 years monthly
# Simulate: interest rate affects inflation with lag, both affect GDP
ir = np.cumsum(np.random.normal(0, 0.1, n)) + 3.0
inf = 0.5*np.roll(ir, 2) + np.cumsum(np.random.normal(0, 0.05, n)) + 2.0
gdp = -0.3*np.roll(ir, 3) + 0.4*np.roll(inf, 1) + np.cumsum(np.random.normal(0, 0.08, n)) + 2.5
df = pd.DataFrame({'gdp': gdp, 'inflation': inf, 'interest_rate': ir})
df = df.diff().dropna() # difference to achieve stationarity
model = VAR(df)
res = model.fit(maxlags=6, ic='aic')
print(f"VAR lag order: {res.k_ar}")
forecast = res.forecast(df.values[-res.k_ar:], steps=12)
print(f"12-month GDP forecast range: {forecast[:,0].min():.3f} to {forecast[:,0].max():.3f}")
# Impulse response
irf = res.irf(10)
# orth_irfs[lag, response_var, shock_var]: GDP (col 0) response to rate shock (col 2)
print(f"GDP response to interest shock at lag 3: {irf.orth_irfs[3, 0, 2]:.4f}")

# === Snippet (exercise): pairs-trading strategy — left as TODOs ===
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint
from statsmodels.regression.linear_model import OLS
np.random.seed(3)
n = 500
common = np.cumsum(np.random.normal(0, 1, n))
price_a = common + np.random.normal(0, 0.5, n) + 50
price_b = 0.9*common + np.random.normal(0, 0.5, n) + 45
# TODO: Cointegration test
# TODO: Compute hedge ratio and spread
# TODO: Z-score of spread
# TODO: Generate trading signals (long A-short B when z<-2, vice versa)
# TODO: Compute daily P&L, Sharpe ratio, max drawdown
# === Snippet: online linear regression with River (predict-then-learn loop) ===
# pip install river
from river import linear_model, preprocessing, metrics, stream
import numpy as np
np.random.seed(0)
n = 1000
X_data = np.column_stack([np.random.randn(n), np.arange(n)/n])
y_data = 3*X_data[:,0] - 2*X_data[:,1] + 0.5 + np.random.normal(0, 0.3, n)
# River pipelines compose with `|`: scale each feature, then regress
model = preprocessing.StandardScaler() | linear_model.LinearRegression()
metric = metrics.RMSE()
for i, (xi, yi) in enumerate(zip(X_data, y_data)):
    x_dict = {'f0': xi[0], 'f1': xi[1]}
    y_pred = model.predict_one(x_dict)  # predict BEFORE learning (prequential eval)
    if y_pred is not None:
        metric.update(yi, y_pred)
    model.learn_one(x_dict, yi)
    if i % 200 == 0 and y_pred is not None:
        print(f" n={i}: running RMSE={metric.get():.4f}")
print(f"Final RMSE: {metric.get():.4f}")

# === Snippet: ADWIN concept-drift detection on a stream with a level shift ===
# pip install river
from river import drift
import numpy as np
np.random.seed(7)
detector = drift.ADWIN()
stream = np.concatenate([
    np.random.normal(0, 1, 500),
    np.random.normal(3, 1, 500) # sudden drift at t=500
])
drifts = []
for i, x in enumerate(stream):
    detector.update(x)
    if detector.drift_detected:
        drifts.append(i)
        detector = drift.ADWIN() # reset
print(f"Drift detected at sample(s): {drifts}")

# === Next snippet: windowed streaming statistics (class defined below) ===
import numpy as np
from collections import deque
class StreamStats:
    """Rolling mean/std over a fixed-size window of streamed values."""

    def __init__(self, window=100):
        self.w = deque(maxlen=window)  # most recent `window` samples
        self.n = 0                     # total samples ever seen

    def update(self, x):
        """Absorb one sample and return the current (mean, std)."""
        self.w.append(x)
        self.n += 1
        return self.mean(), self.std()

    def mean(self):
        return np.mean(self.w)

    def std(self):
        # Sample std (ddof=1); undefined for a single point, so report 0.0
        if len(self.w) > 1:
            return np.std(self.w, ddof=1)
        return 0.0
# --- Demo: StreamStats tracks the level shift injected at t=150 ---
np.random.seed(1)
ss = StreamStats(window=50)
for i in range(300):
    val = np.random.normal(5 if i < 150 else 8, 1)
    mean, std = ss.update(val)
    if i % 50 == 49:
        print(f" t={i+1}: window mean={mean:.3f}, std={std:.3f}")

# === Snippet: per-sensor z-score alerting against a rolling baseline ===
import numpy as np
from collections import deque
np.random.seed(42)
n_sensors, n_steps = 5, 1000
window = 60
baselines = [deque(maxlen=window) for _ in range(n_sensors)]
alerts = {i: [] for i in range(n_sensors)}
# Inject drift at step 600 for sensor 2
for t in range(n_steps):
    readings = np.random.normal(22, 0.5, n_sensors)
    if t >= 600:
        readings[2] += 4.0 # sensor 2 drifts
    for s in range(n_sensors):
        baselines[s].append(readings[s])
        if len(baselines[s]) >= 20:  # warm-up before alerting
            mu = np.mean(baselines[s])
            sigma = np.std(baselines[s], ddof=1)
            z = (readings[s] - mu) / max(sigma, 1e-6)  # guard divide-by-zero
            if abs(z) > 3:
                alerts[s].append(t)
for s in range(n_sensors):
    if alerts[s]:
        print(f"Sensor {s}: {len(alerts[s])} alerts, first at t={alerts[s][0]}")
    else:
        print(f"Sensor {s}: no alerts")

# === Snippet (exercise): online fraud detection with drift — left as TODOs ===
# pip install river
from river import linear_model, preprocessing, metrics, drift, stream as rv_stream
import numpy as np
np.random.seed(99)
n = 2000
# Generate transactions
amounts = np.random.exponential(50, n)
gaps = np.random.exponential(10, n)
is_weekend = np.random.randint(0, 2, n)
# Fraud rule: changes at t=1000
fraud_prob_before = 1 / (1 + np.exp(-(amounts/100 - 0.5)))
fraud_prob_after = 1 / (1 + np.exp(-(gaps/5 - 1.0))) # different pattern
y = np.array([
    np.random.binomial(1, fraud_prob_before[i]) if i < 1000
    else np.random.binomial(1, fraud_prob_after[i])
    for i in range(n)
])
# TODO: Online logistic regression with River
# TODO: ADWIN drift detection on binary prediction errors
# TODO: Report AUC in windows before drift, at drift, after adaptation
Time series decompose into Trend, Seasonality, and Residual components. STL (Seasonal-Trend decomposition using LOESS) is robust to outliers and handles arbitrary seasonality. Use it before forecasting to understand your signal.
# === Snippet: classical additive decomposition with seasonal_decompose ===
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
np.random.seed(42)
periods = 120
dates = pd.date_range('2015-01', periods=periods, freq='MS')
trend = np.linspace(100, 160, periods)
seasonal = 15 * np.sin(2 * np.pi * np.arange(periods) / 12)
noise = np.random.normal(0, 5, periods)
ts = pd.Series(trend + seasonal + noise, index=dates)
result = seasonal_decompose(ts, model='additive', period=12)
print("Decomposition components:")
# Moving-average trend has NaNs at both ends, hence dropna()
print(f" Trend range: [{result.trend.dropna().min():.1f}, {result.trend.dropna().max():.1f}]")
print(f" Seasonal range: [{result.seasonal.min():.2f}, {result.seasonal.max():.2f}]")
print(f" Residual std: {result.resid.dropna().std():.3f}")
# Seasonal indices
seasonal_idx = result.seasonal[:12]
for month, val in zip(range(1, 13), seasonal_idx):
    print(f" Month {month:2d}: {val:+.2f}")

# === Snippet: robust STL with outliers; seasonal/trend strength measures ===
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
np.random.seed(0)
n = 156 # 13 years monthly
dates = pd.date_range('2010-01', periods=n, freq='MS')
trend = np.linspace(50, 120, n) + 0.3 * np.sin(np.linspace(0, 4*np.pi, n)) * 10
seasonal = 20 * np.sin(2 * np.pi * np.arange(n) / 12) + 8 * np.sin(2 * np.pi * np.arange(n) / 6)
# Inject outliers
noise = np.random.normal(0, 3, n)
outliers = np.zeros(n)
outliers[[30, 60, 90]] = 40
ts = pd.Series(trend + seasonal + noise + outliers, index=dates)
stl = STL(ts, period=12, robust=True)  # robust loess downweights the outliers
res = stl.fit()
print(f"STL decomposition of {n}-period series:")
print(f" Trend variance: {res.trend.var():.2f}")
print(f" Seasonal variance: {res.seasonal.var():.2f}")
print(f" Residual variance: {res.resid.var():.2f}")
# Hyndman-style strength: 1 - Var(resid) / Var(component + resid), floored at 0
print(f" Seasonal strength: {max(0, 1 - res.resid.var()/(res.seasonal+res.resid).var()):.3f}")
print(f" Trend strength: {max(0, 1 - res.resid.var()/(res.trend+res.resid).var()):.3f}")

# === Snippet: STL on a series with a trend change point at month 60 ===
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
np.random.seed(7)
# Monthly retail sales with change point at month 60
n = 120
t = np.arange(n)
trend = np.where(t < 60, 100 + t*0.5, 130 + (t-60)*1.2)
seasonal = 25 * np.sin(2*np.pi*t/12)
noise = np.random.normal(0, 4, n)
ts = pd.Series(trend + seasonal + noise,
               index=pd.date_range('2014-01', periods=n, freq='MS'))
stl = STL(ts, period=12, robust=True)
res = stl.fit()
# Detect change point: where residuals spike
resid_rolling_std = pd.Series(res.resid).rolling(6).std()
change_pt = resid_rolling_std.idxmax()
print(f"Largest residual volatility at index: {change_pt}")
# Year-over-year growth from trend
yoy = pd.Series(res.trend).pct_change(12) * 100
print(f"\nYear-over-year trend growth (last 3):")
for i in [-3, -2, -1]:
    print(f" Month {n+i}: {yoy.iloc[i]:.2f}%")
# Identify peak and trough season
monthly_seasonal = [res.seasonal[i::12].mean() for i in range(12)]
peak = np.argmax(monthly_seasonal) + 1
trough = np.argmin(monthly_seasonal) + 1
print(f"\nPeak season: month {peak}, Trough: month {trough}")

# === Snippet: retail sales STL summary and deseasonalization ===
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
np.random.seed(42)
n = 60
dates = pd.date_range('2019-01', periods=n, freq='MS')
trend = 1000 + np.linspace(0, 500, n)
seasonal = 300 * np.sin(2*np.pi*np.arange(n)/12) + 100 * (np.arange(n) % 12 == 11) # Dec spike
noise = np.random.normal(0, 30, n)
sales = pd.Series(trend + seasonal + noise, index=dates)
stl = STL(sales, period=12, robust=True)
res = stl.fit()
print("Retail Sales Decomposition:")
print(f" Avg monthly trend growth: ${(res.trend.diff().dropna().mean()):.1f}")
print(f" Peak seasonal boost: ${res.seasonal.max():.0f}")
print(f" Residual RMSE: ${np.sqrt((res.resid**2).mean()):.1f}")
# Deseasonalized series for cleaner trend analysis
deseas = sales - res.seasonal
print(f" Deseasonalized range: [{deseas.min():.0f}, {deseas.max():.0f}]")

# === Snippet (exercise): weekly STL variance decomposition — left as TODOs ===
import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from scipy import stats as sp_stats
np.random.seed(42)
n = 260 # 5 years weekly
t = np.arange(n)
trend = 500 + t * 0.8
seasonal = 80 * np.sin(2*np.pi*t/52) + 30 * np.sin(2*np.pi*t/13)
noise = np.random.normal(0, 20, n)
ts = pd.Series(trend + seasonal + noise,
               index=pd.date_range('2019-01-07', periods=n, freq='W'))
stl = STL(ts, period=52, robust=True)
res = stl.fit()
# (1) trend slope via linear regression on trend component
# (2) peak week (argmax of first 52 seasonal values)
# (3) variance decomposition
total_var = ts.var()
# TODO: compute each component's share
ARIMA requires a stationary series (constant mean/variance). ADF and KPSS tests check stationarity. Differencing and log transforms achieve it. ACF/PACF plots reveal AR and MA orders.
# === Snippet: ADF + KPSS stationarity checks on three canonical series ===
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss
np.random.seed(42)
n = 200
t = np.arange(n)
series = {
    'Random Walk': np.cumsum(np.random.randn(n)), # non-stationary
    'Stationary AR1': np.zeros(n), # stationary
    'Trend + Noise': 2 + 0.05*t + np.random.randn(n), # non-stationary (trend)
}
for i in range(1, n):
    series['Stationary AR1'][i] = 0.7*series['Stationary AR1'][i-1] + np.random.randn()
print(f"{'Series':16s} {'ADF p':>8s} {'ADF stat':>9s} {'KPSS p':>8s} {'Stationary?'}")
print('-' * 60)
for name, data in series.items():
    adf_stat, adf_p, *_ = adfuller(data, autolag='AIC')
    try:
        kpss_stat, kpss_p, *_ = kpss(data, regression='c', nlags='auto')
    except Exception:
        # kpss can warn/raise when the statistic is outside its p-value table
        kpss_p = 0.01
    # Stationary: ADF rejects H0 (p<0.05) AND KPSS fails to reject H0 (p>0.05)
    is_stat = (adf_p < 0.05) and (kpss_p > 0.05)
    print(f"{name:16s} {adf_p:8.4f} {adf_stat:9.4f} {kpss_p:8.4f} {is_stat}")

# === Snippet: transforms toward stationarity (log, diff, seasonal diff) ===
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller
from scipy.stats import boxcox
np.random.seed(0)
n = 150
t = np.arange(n)
# Non-stationary: trending with heteroskedastic variance
ts = pd.Series(np.exp(0.02*t + np.cumsum(np.random.randn(n)*0.2)) * 100)
def adf_pvalue(x):
    # Convenience wrapper: ADF p-value on the NaN-free series
    return adfuller(x.dropna(), autolag='AIC')[1]
print(f"Original: ADF p={adf_pvalue(ts):.4f} (non-stationary)")
# Log transform (stabilizes variance)
ts_log = np.log(ts)
print(f"Log transform: ADF p={adf_pvalue(ts_log):.4f}")
# First difference (removes linear trend)
ts_diff1 = ts.diff()
print(f"First difference: ADF p={adf_pvalue(ts_diff1):.4f}")
# Log + first difference (common for financial series)
ts_log_diff = ts_log.diff()
print(f"Log + diff: ADF p={adf_pvalue(ts_log_diff):.4f}")
# Seasonal difference (lag=12 for monthly)
ts_sdiff = ts.diff(12)
print(f"Seasonal diff (12): ADF p={adf_pvalue(ts_sdiff):.4f}")
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.tsa.arima_process import ArmaProcess
np.random.seed(42)
n = 300
# Generate AR(2) process: phi1=0.6, phi2=-0.3
ar_params = np.array([1, -0.6, 0.3]) # AR coefficients (sign convention)
ma_params = np.array([1])
ar2_process = ArmaProcess(ar_params, ma_params)
ar2_data = ar2_process.generate_sample(nsample=n)
# Generate MA(1) process
ar_params2 = np.array([1])
ma_params2 = np.array([1, 0.8])
ma1_process = ArmaProcess(ar_params2, ma_params2)
ma1_data = ma1_process.generate_sample(nsample=n)
for name, data in [('AR(2)', ar2_data), ('MA(1)', ma1_data)]:
acf_vals = acf(data, nlags=10, fft=True)[1:6] # skip lag 0
pacf_vals = pacf(data, nlags=10)[1:6]
conf = 1.96 / np.sqrt(n)
print(f"\n{name} β PACF cutoff suggests AR order; ACF cutoff suggests MA order:")
print(f" ACF lags 1-5: {acf_vals.round(3)}")
print(f" PACF lags 1-5: {pacf_vals.round(3)}")
print(f" 95% conf band: Β±{conf:.3f}")
ar_order = sum(abs(pacf_vals) > conf)
ma_order = sum(abs(acf_vals) > conf)
print(f" Suggested: AR order={ar_order}, MA order={ma_order}")import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss, acf, pacf
np.random.seed(10)
# Simulate commodity prices (random walk with drift)
n = 120
prices = pd.Series(
    np.exp(np.cumsum(np.random.normal(0.005, 0.04, n))) * 1800,
    index=pd.date_range('2014-01', periods=n, freq='MS'),
    name='Aluminium_USD_t')
def check_stationarity(series, name):
    # Prints an ADF verdict and returns True when the unit-root null is rejected
    adf_stat, adf_p, *_ = adfuller(series.dropna(), autolag='AIC')
    print(f"{name:25s}: ADF p={adf_p:.4f} {'[stationary]' if adf_p < 0.05 else '[non-stationary]'}")
    return adf_p < 0.05
check_stationarity(prices, 'Level prices')
check_stationarity(np.log(prices), 'Log prices')
log_diff = np.log(prices).diff().dropna()
check_stationarity(log_diff, 'Log-differenced')
acf_vals = acf(log_diff, nlags=8)[1:]
pacf_vals = pacf(log_diff, nlags=8)[1:]
conf = 1.96 / np.sqrt(len(log_diff))
print(f"\nACF lags 1-8: {acf_vals.round(3)}")
print(f"PACF lags 1-8: {pacf_vals.round(3)}")
print(f"Suggested ARIMA(p,1,q): p={sum(abs(pacf_vals[:4])>conf)}, q={sum(abs(acf_vals[:4])>conf)}")
# NOTE(review): the scrape fused the next snippet's import here and it appears
# truncated ("import numpy as pd" — presumably "import numpy as np, pandas as pd").
import numpy as pd
# === Snippet: automatic stationarity pipeline (ADF -> log -> difference) ===
# The preceding fused import was truncated by the scrape; restore both aliases.
import numpy as np, pandas as pd
from statsmodels.tsa.stattools import adfuller
def make_stationary(ts, max_diffs=2, alpha=0.05):
    """Iteratively transform `ts` toward stationarity.

    Implements the outline the original TODOs described:
      1. Return the series unchanged if it already passes the ADF test.
      2. If all values are positive, apply a log transform first
         (stabilizes variance for exponential-growth series).
      3. Difference up to `max_diffs` times, stopping as soon as the
         ADF test rejects the unit-root null at level `alpha`.

    Returns
    -------
    (transformed_series, n_diffs, log_used)
    """
    def _is_stationary(x):
        # ADF p-value below alpha => reject unit root => stationary
        return adfuller(x.dropna(), autolag='AIC')[1] < alpha

    result = ts.copy()
    log_used = False
    n_diffs = 0
    if _is_stationary(result):
        return result, n_diffs, log_used
    if (result > 0).all():
        result = np.log(result)
        log_used = True
        if _is_stationary(result):
            return result, n_diffs, log_used
    while n_diffs < max_diffs:
        result = result.diff()
        n_diffs += 1
        if _is_stationary(result):
            break
    return result, n_diffs, log_used
np.random.seed(42)
ts_rw = pd.Series(np.cumsum(np.random.randn(100)))
ts_exp = pd.Series(np.exp(0.03*np.arange(100) + np.random.randn(100)*0.2)*100)
ts_stat = pd.Series(np.random.randn(100))
for name, ts in [('Random Walk', ts_rw), ('Exponential Trend', ts_exp), ('Stationary', ts_stat)]:
    out, d, log = make_stationary(ts)
    print(f"{name}: diffs={d}, log={log}")
ARIMA(p,d,q) models a stationary process as AutoRegressive + Integrated + Moving Average. SARIMA adds seasonal components (P,D,Q,m). Use AIC/BIC for order selection and residual diagnostics to validate.
# === Snippet: ARIMA(1,1,1) fit, forecast with intervals, in-sample metrics ===
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA
np.random.seed(42)
n = 120
dates = pd.date_range('2014-01', periods=n, freq='MS')
trend = np.linspace(100, 150, n)
noise = np.cumsum(np.random.normal(0, 2, n)) # AR component
ts = pd.Series(trend + noise, index=dates)
# Fit ARIMA(1,1,1)
model = ARIMA(ts, order=(1, 1, 1))
fitted = model.fit()
print(fitted.summary().tables[1])
# Forecast 12 months ahead
forecast = fitted.get_forecast(steps=12)
pred_mean = forecast.predicted_mean
pred_ci = forecast.conf_int(alpha=0.05)
print(f"\n12-month forecast:")
for date, val, lo, hi in zip(pred_mean.index[:3], pred_mean[:3],
                             pred_ci.iloc[:3,0], pred_ci.iloc[:3,1]):
    print(f" {date.strftime('%Y-%m')}: {val:.1f} [{lo:.1f}, {hi:.1f}]")
print(f" ...")
# In-sample fit metrics
from sklearn.metrics import mean_squared_error
residuals = fitted.resid
print(f"\nIn-sample RMSE: {np.sqrt((residuals**2).mean()):.3f}")
print(f"AIC: {fitted.aic:.2f}, BIC: {fitted.bic:.2f}")

# === Snippet: SARIMA with AIC comparison across candidate orders ===
import numpy as np, pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 84 # 7 years monthly
dates = pd.date_range('2017-01', periods=n, freq='MS')
trend = np.linspace(1000, 1600, n)
seasonal = 200 * np.sin(2*np.pi*np.arange(n)/12)
noise = np.cumsum(np.random.normal(0, 20, n))
ts = pd.Series(trend + seasonal + noise, index=dates)
# SARIMA(1,1,1)(1,1,1,12)
model = SARIMAX(ts, order=(1,1,1), seasonal_order=(1,1,1,12),
                trend='n', enforce_stationarity=False,
                enforce_invertibility=False)
fitted = model.fit(disp=False)
print(f"SARIMA(1,1,1)(1,1,1,12): AIC={fitted.aic:.2f}, BIC={fitted.bic:.2f}")
# Compare AIC for different orders
for order, sorder in [((0,1,1),(0,1,1,12)), ((1,1,0),(1,1,0,12))]:
    m = SARIMAX(ts, order=order, seasonal_order=sorder,
                enforce_stationarity=False, enforce_invertibility=False)
    f = m.fit(disp=False)
    print(f"SARIMA{order}{sorder}: AIC={f.aic:.2f}")
forecast = fitted.get_forecast(12)
print(f"\nForecast next month: {forecast.predicted_mean.iloc[0]:.1f}")

# === Next snippet: residual diagnostics for an ARIMA fit ===
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.diagnostic import acorr_ljungbox
from scipy import stats
import warnings; warnings.filterwarnings('ignore')
np.random.seed(0)
n = 150
ts = pd.Series(np.cumsum(np.random.normal(0.5, 2, n)) + np.random.randn(n),
               index=pd.date_range('2012-01', periods=n, freq='MS'))
model = ARIMA(ts, order=(1, 1, 1))
fitted = model.fit()
residuals = fitted.resid.dropna()
# Test 1: Ljung-Box (autocorrelation in residuals)
lb = acorr_ljungbox(residuals, lags=[10], return_df=True)
print(f"Ljung-Box p (lag 10): {lb['lb_pvalue'].iloc[0]:.4f} (>0.05 = no autocorrelation = good)")
# Test 2: Normality of residuals
_, p_sw = stats.shapiro(residuals[:50])
print(f"Shapiro-Wilk p: {p_sw:.4f} (>0.05 = normal residuals = good)")
# Test 3: Heteroskedasticity (should be constant variance)
half = len(residuals) // 2
_, p_lev = stats.levene(residuals[:half], residuals[half:])
print(f"Levene p: {p_lev:.4f} (>0.05 = equal variance = good)")
print(f"\nResidual stats: mean={residuals.mean():.4f}, std={residuals.std():.3f}")
print(f"AIC={fitted.aic:.2f}, model is {'adequate' if lb['lb_pvalue'].iloc[0]>0.05 else 'needs improvement'}")

# === Snippet: holdout evaluation of SARIMA on simulated gas demand ===
import numpy as np, pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
np.random.seed(5)
n = 96
dates = pd.date_range('2016-01', periods=n, freq='MS')
trend = np.linspace(500, 700, n)
seasonal = 150 * np.cos(2*np.pi*np.arange(n)/12) # peak in winter
noise = np.cumsum(np.random.normal(0, 10, n))
demand = pd.Series(trend + seasonal + noise, index=dates, name='Gas_MMcf')
train = demand[:-12]  # hold out the final year for evaluation
test = demand[-12:]
model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12),
                enforce_stationarity=False, enforce_invertibility=False)
fitted = model.fit(disp=False)
forecast = fitted.get_forecast(steps=12)
pred = forecast.predicted_mean
ci = forecast.conf_int()
from sklearn.metrics import mean_absolute_percentage_error
mape = mean_absolute_percentage_error(test, pred) * 100
rmse = np.sqrt(((test - pred)**2).mean())
print(f"SARIMA(1,1,1)(1,1,1,12) forecast metrics:")
print(f" MAPE: {mape:.2f}% RMSE: {rmse:.2f} MMcf")
print(f" AIC: {fitted.aic:.1f}")
print(f"\n6-month capacity forecast (MMcf):")
for i in range(6):
    print(f" {pred.index[i].strftime('%Y-%m')}: {pred.iloc[i]:.0f} [{ci.iloc[i,0]:.0f}-{ci.iloc[i,1]:.0f}]")

# === Next snippet: grid search over ARIMA orders (exercise) ===
import numpy as np, pandas as pd
from statsmodels.tsa.arima.model import ARIMA
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 60
ts = pd.Series(
    100 + np.cumsum(np.random.normal(0, 2, n)) + 20*np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2019-01', periods=n, freq='MS'))
def select_arima_order(ts, max_p=3, max_q=3, d=1):
    """Grid-search (p, d, q) by AIC; returns (best_order, best_aic)."""
    best_aic, best_order = np.inf, None
    for p in range(max_p+1):
        for q in range(max_q+1):
            if p == 0 and q == 0:
                # ARIMA(0,d,0) is pure differencing — skip it
                continue
            try:
                m = ARIMA(ts, order=(p, d, q))
                f = m.fit()
                if f.aic < best_aic:
                    best_aic, best_order = f.aic, (p, d, q)
            except Exception:
                # Some orders fail to converge; ignore and keep searching
                pass
    return best_order, best_aic
# TODO: call select_arima_order on train set (first 48 months)
# TODO: forecast last 12 months with best order
# TODO: compare RMSE to naive persistence (repeat last observed value)
Exponential smoothing methods weight recent observations more heavily. Simple ES handles level; Holt's double ES adds trend; Holt-Winters triple ES adds seasonality. The ETS framework unifies them with Error-Trend-Seasonality states.
# === Snippet: SES vs Holt vs Holt-Winters on a trending seasonal series ===
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, Holt, ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 60
dates = pd.date_range('2019-01', periods=n, freq='MS')
trend = np.linspace(100, 160, n)
seasonal = 20 * np.sin(2*np.pi*np.arange(n)/12)
noise = np.random.normal(0, 5, n)
ts = pd.Series(trend + seasonal + noise, index=dates)
train, test = ts[:-12], ts[-12:]
# Simple Exponential Smoothing (level only)
ses = SimpleExpSmoothing(train, initialization_method='estimated').fit()
ses_fc = ses.forecast(12)
# Holt's (trend)
holt = Holt(train, initialization_method='estimated').fit(
    optimized=True, smoothing_level=0.3, smoothing_trend=0.1)
holt_fc = holt.forecast(12)
# Holt-Winters (trend + seasonality)
hw = ExponentialSmoothing(train, trend='add', seasonal='add',
                          seasonal_periods=12,
                          initialization_method='estimated').fit(optimized=True)
hw_fc = hw.forecast(12)
from sklearn.metrics import mean_squared_error
# FIX: the original getattr(getattr(..., 'model', None), 'params', {}) chain
# looked up `params` on the *model* object, which has no such attribute, so
# alpha always printed as 'N/A'.  The fitted results objects expose the
# optimized parameters directly via their `.params` dict.
for name, fit_res, fc in [('SES', ses, ses_fc), ("Holt's", holt, holt_fc),
                          ('Holt-Winters', hw, hw_fc)]:
    rmse = np.sqrt(mean_squared_error(test, fc))
    alpha = fit_res.params.get('smoothing_level', 'N/A')
    print(f"{name:15s}: RMSE={rmse:.2f}, alpha={alpha}")

# === Next snippet: ETS configuration comparison ===
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(0)
n = 84
dates = pd.date_range('2017-01', periods=n, freq='MS')
ts = pd.Series(
    np.linspace(200, 320, n) + 30*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 8, n), index=dates)
train, test = ts[:-12], ts[-12:]
# (label, trend type, seasonal type, damped flag) for each candidate model
configs = [
    ('Additive trend + Additive seasonal', 'add', 'add', False),
    ('Additive trend + Mult seasonal', 'add', 'mul', False),
    ('Damped trend + Additive seasonal', 'add', 'add', True),
]
print(f"{'Model':40s} {'RMSE':>6s} {'AIC':>8s}")
for name, trend, seasonal, damped in configs:
    m = ExponentialSmoothing(train, trend=trend, seasonal=seasonal,
                             seasonal_periods=12, damped_trend=damped,
                             initialization_method='estimated')
    fitted = m.fit(optimized=True)
    fc = fitted.forecast(12)
    rmse = np.sqrt(((test - fc)**2).mean())
    print(f"{name:40s} {rmse:6.2f} {fitted.aic:8.2f}")

# === Snippet: inspect optimized Holt-Winters parameters and state ===
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(5)
n = 72
ts = pd.Series(
    150 + np.linspace(0, 50, n) + 25*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 6, n),
    index=pd.date_range('2018-01', periods=n, freq='MS'))
hw = ExponentialSmoothing(ts, trend='add', seasonal='add',
                          seasonal_periods=12,
                          initialization_method='estimated').fit(optimized=True)
print("Optimal smoothing parameters:")
print(f" alpha (level): {hw.params['smoothing_level']:.4f}")
print(f" beta (trend): {hw.params['smoothing_trend']:.4f}")
print(f" gamma (seasonal): {hw.params['smoothing_seasonal']:.4f}")
# Extract level, trend, and seasonal states
level = hw.level
trend_s = hw.trend
seasonal = hw.season
print(f"\nFinal state:")
print(f" Level: {level.iloc[-1]:.2f}")
print(f" Trend: {trend_s.iloc[-1]:.4f} per period")
print(f" Seasonal indices (last 12): {seasonal.iloc[-12:].round(2).values}")
fc = hw.forecast(6)
print(f"\n6-month forecast: {fc.round(1).values}")

# === Next snippet: weekly Holt-Winters for inventory planning ===
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(7)
n_weeks = 104 # 2 years weekly
t = np.arange(n_weeks)
trend_w = 1000 + t * 3
seasonal_w = 400 * np.sin(2*np.pi*t/52) + 150 * (t % 52 > 44).astype(float) # summer+holiday
noise_w = np.random.normal(0, 40, n_weeks)
weekly_sales = pd.Series(
    (trend_w + seasonal_w + noise_w).clip(min=100),  # sales cannot go below 100
    index=pd.date_range('2022-01-03', periods=n_weeks, freq='W'))
train_w = weekly_sales[:-8]
test_w = weekly_sales[-8:]
# Try additive and multiplicative seasonal
for seas_type in ['add', 'mul']:
    hw = ExponentialSmoothing(
        train_w, trend='add', seasonal=seas_type,
        seasonal_periods=52, initialization_method='estimated').fit(optimized=True)
    fc = hw.forecast(8)
    rmse = np.sqrt(((test_w - fc)**2).mean())
    mape = (abs(test_w - fc) / test_w).mean() * 100
    print(f"Seasonal={seas_type}: RMSE={rmse:.1f}, MAPE={mape:.2f}%, AIC={hw.aic:.1f}")
# Reorder forecast for next 4 weeks
# Refit on the full history before producing the operational forecast
best_hw = ExponentialSmoothing(weekly_sales, trend='add', seasonal='add',
                               seasonal_periods=52, initialization_method='estimated').fit(optimized=True)
next4 = best_hw.forecast(4)
print(f"\nNext 4 weeks inventory needs: {next4.round(0).values}")

# === Next snippet: ETS grid-search exercise ===
import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 72
ts = pd.Series(100 + np.linspace(0,60,n) + 25*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*5,
index=pd.date_range('2018-01', periods=n, freq='MS'))
train, test = ts[:-12], ts[-12:]
def grid_search_ets(train, test, h=12):
    """Grid-search ExponentialSmoothing configurations.

    Tries every combination of trend (None/'add'), seasonal
    (None/'add'/'mul'), and damped trend, fits on `train`, forecasts
    `h` steps, and scores against `test`.

    Returns a DataFrame with columns trend, seasonal, damped, aic,
    rmse, mape, sorted by RMSE (best first).
    """
    results = []
    for trend in [None, 'add']:
        for seasonal in [None, 'add', 'mul']:
            for damped in [False, True]:
                if trend is None and damped:
                    continue  # damping requires a trend component
                try:
                    hw = ExponentialSmoothing(
                        train, trend=trend, seasonal=seasonal,
                        seasonal_periods=12 if seasonal else None,
                        damped_trend=damped,
                        initialization_method='estimated').fit(optimized=True)
                    fc = hw.forecast(h)
                    rmse = np.sqrt(((test.values - fc.values) ** 2).mean())
                    mape = (np.abs(test.values - fc.values) / test.values).mean() * 100
                    results.append({'trend': trend, 'seasonal': seasonal,
                                    'damped': damped, 'aic': hw.aic,
                                    'rmse': rmse, 'mape': mape})
                except Exception:
                    continue  # skip configurations that fail to converge
    return pd.DataFrame(results).sort_values('rmse').reset_index(drop=True)
print(grid_search_ets(train, test).head(5))
Transform any time series into a supervised ML problem by creating lag features, rolling statistics, and calendar features. This lets you use any sklearn regressor for forecasting.
import numpy as np, pandas as pd
np.random.seed(42)
n = 200
ts = pd.Series(
100 + np.cumsum(np.random.normal(0.3, 2, n)) +
15 * np.sin(2*np.pi*np.arange(n)/12),
index=pd.date_range('2007-01', periods=n, freq='MS'),
name='sales')
def make_features(ts, lags=6, windows=(3, 6, 12)):
    """Build a supervised-learning feature matrix from a time series.

    Parameters
    ----------
    ts : pd.Series with a DatetimeIndex.
    lags : number of lag columns lag_1 .. lag_{lags}.
    windows : rolling-window sizes for mean/std/min statistics.
        (Fixed: was a mutable list default `[3, 6, 12]`; a tuple avoids
        the shared-mutable-default pitfall and is backward compatible.)

    Returns
    -------
    DataFrame with target column 'y' plus lag, rolling, and calendar
    features; warm-up rows containing NaNs are dropped.
    """
    df = pd.DataFrame({'y': ts})
    # Lag features: the value at t-1 .. t-lags
    for lag in range(1, lags + 1):
        df[f'lag_{lag}'] = ts.shift(lag)
    # Rolling statistics on shift(1) so each row only sees the past (no leakage)
    for w in windows:
        df[f'roll_mean_{w}'] = ts.shift(1).rolling(w).mean()
        df[f'roll_std_{w}'] = ts.shift(1).rolling(w).std()
        df[f'roll_min_{w}'] = ts.shift(1).rolling(w).min()
    # Calendar features: raw month plus cyclic sine/cosine encoding
    df['month'] = ts.index.month
    df['month_sin'] = np.sin(2 * np.pi * ts.index.month / 12)
    df['month_cos'] = np.cos(2 * np.pi * ts.index.month / 12)
    df['trend'] = np.arange(len(ts))  # linear time index
    return df.dropna()
features = make_features(ts)
print(f"Feature matrix: {features.shape}")
print(f"Features: {list(features.columns)}")
print(f"\nSample row:\n{features.iloc[0].round(3)}")import numpy as np, pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_percentage_error
import warnings; warnings.filterwarnings('ignore')
np.random.seed(0)
n = 180
ts = pd.Series(
100 + np.cumsum(np.random.normal(0.3, 1.5, n)) +
20 * np.sin(2*np.pi*np.arange(n)/12),
index=pd.date_range('2009-06', periods=n, freq='MS'))
def make_features(ts, lags=12):
    """Assemble target, lag columns, 6-step rolling stats, and a cyclic
    month encoding; rows with warm-up NaNs are dropped."""
    past = ts.shift(1)  # one-step-lagged view used by the rolling stats
    cols = {'y': ts}
    for k in range(1, lags + 1):
        cols[f'lag_{k}'] = ts.shift(k)
    cols['roll_mean_6'] = past.rolling(6).mean()
    cols['roll_std_6'] = past.rolling(6).std()
    angle = 2 * np.pi * ts.index.month / 12
    cols['month_sin'] = np.sin(angle)
    cols['month_cos'] = np.cos(angle)
    return pd.DataFrame(cols).dropna()
data = make_features(ts)
cutoff = len(data) - 24
X, y = data.drop('y', axis=1), data['y']
X_tr, X_te = X.iloc[:cutoff], X.iloc[cutoff:]
y_tr, y_te = y.iloc[:cutoff], y.iloc[cutoff:]
model = GradientBoostingRegressor(n_estimators=200, max_depth=4, random_state=42)
model.fit(X_tr, y_tr)
preds = model.predict(X_te)
rmse = np.sqrt(((y_te.values - preds)**2).mean())
mape = mean_absolute_percentage_error(y_te, preds) * 100
print(f"GBR Forecast: RMSE={rmse:.2f}, MAPE={mape:.2f}%")
# Feature importance
imp = pd.Series(model.feature_importances_, index=X.columns).nlargest(5)
print(f"Top features: {imp.round(3).to_dict()}")import numpy as np, pandas as pd
from sklearn.ensemble import RandomForestRegressor
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 150
# Monthly series: random-walk drift + annual cycle
ts = pd.Series(
    np.cumsum(np.random.normal(0.5, 2, n)) + 100 +
    15 * np.sin(2*np.pi*np.arange(n)/12),
    index=pd.date_range('2012-06', periods=n, freq='MS'))
lags = 6
data = pd.DataFrame({'y': ts})
# Lag features plus cyclic month encoding
for l in range(1, lags+1):
    data[f'lag_{l}'] = ts.shift(l)
data['month_sin'] = np.sin(2*np.pi*ts.index.month/12)
data['month_cos'] = np.cos(2*np.pi*ts.index.month/12)
data = data.dropna()
X, y = data.drop('y', axis=1), data['y']
# Last 12 months held out for testing
X_tr, X_te = X.iloc[:-12], X.iloc[-12:]
y_tr, y_te = y.iloc[:-12], y.iloc[-12:]
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_tr, y_tr)
# Recursive forecast: use predictions as future lags
history = list(y_tr.values[-lags:])
preds = []
future_idx = pd.date_range(X_te.index[0], periods=12, freq='MS')
for i, date in enumerate(future_idx):
    row = [history[-l] for l in range(1, lags+1)]  # lag_1..lag_6 from history
    row += [np.sin(2*np.pi*date.month/12), np.cos(2*np.pi*date.month/12)]
    # NOTE(review): plain list input drops feature names -- sklearn may warn
    pred = model.predict([row])[0]
    preds.append(pred)
    history.append(pred)  # feed the prediction back for the next step's lags
rmse = np.sqrt(((y_te.values - np.array(preds))**2).mean())
print(f"Recursive RF RMSE: {rmse:.2f}")
print(f"True last 3: {y_te.values[-3:].round(1)}")
print(f"Pred last 3: {np.array(preds[-3:]).round(1)}")import numpy as np, pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_percentage_error
np.random.seed(42)
n = 365 * 2 # 2 years daily
t = np.arange(n)
daily = pd.Series(
1000 + 0.5*t +
300 * np.sin(2*np.pi*t/365) +
150 * np.sin(2*np.pi*t/7) + # weekly pattern
np.random.normal(0, 50, n),
index=pd.date_range('2022-01-01', periods=n, freq='D'))
def build_features(ts, lags=14):
    """Lag, rolling-mean, and calendar features for a daily series.

    Rolling means are computed on the one-step-lagged series so no row
    can see its own value; warm-up rows with NaNs are dropped.
    """
    frame = pd.DataFrame({'y': ts})
    past = ts.shift(1)
    for step in range(1, lags + 1):
        frame[f'lag_{step}'] = ts.shift(step)
    for span in (7, 14, 30):
        frame[f'rm_{span}'] = past.rolling(span).mean()
    dow = ts.index.dayofweek
    frame['dow'] = dow
    frame['month'] = ts.index.month
    frame['week_sin'] = np.sin(2 * np.pi * dow / 7)
    frame['week_cos'] = np.cos(2 * np.pi * dow / 7)
    frame['annual_sin'] = np.sin(2 * np.pi * ts.index.dayofyear / 365)
    return frame.dropna()
data = build_features(daily)
X, y = data.drop('y', axis=1), data['y']
split = len(data) - 90
X_tr, X_te, y_tr, y_te = X.iloc[:split], X.iloc[split:], y.iloc[:split], y.iloc[split:]
gbr = GradientBoostingRegressor(n_estimators=300, max_depth=5, learning_rate=0.05, random_state=42)
gbr.fit(X_tr, y_tr)
preds = gbr.predict(X_te)
mape = mean_absolute_percentage_error(y_te, preds) * 100
rmse = np.sqrt(((y_te.values - preds)**2).mean())
print(f"Daily volume forecast: MAPE={mape:.2f}%, RMSE={rmse:.1f}")
top5 = pd.Series(gbr.feature_importances_, index=X.columns).nlargest(5)
print(f"Top features: {top5.round(3).to_dict()}")import numpy as np, pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_percentage_error
np.random.seed(42)
n = 156 # 3 years weekly
t = np.arange(n)
ts = pd.Series(
5000 + 10*t + 800*np.sin(2*np.pi*t/52) + np.random.normal(0,150,n),
index=pd.date_range('2021-01-04', periods=n, freq='W'))
# TODO: create lag + rolling + calendar features
# TODO: train/test split (last 26 weeks = test)
# TODO: train RandomForest
# TODO: report RMSE and MAPE
# TODO: print actual vs predicted for test
Detect anomalous time points using statistical control limits, rolling z-scores, STL residuals, and IsolationForest. Different methods suit different anomaly types: spikes, level shifts, or trend changes.
import numpy as np, pandas as pd
np.random.seed(42)
n = 200
# Random-walk series with slight upward drift
ts = pd.Series(
    50 + np.cumsum(np.random.normal(0.1, 1, n)),
    index=pd.date_range('2020-01-01', periods=n, freq='D'))
# Inject four point anomalies at known positions
anomaly_idx = [40, 90, 140, 170]
ts.iloc[anomaly_idx] += np.array([25, -30, 20, -25])
# Method 1: Rolling z-score -- distance from the local mean in local stds
window = 20
roll_mean = ts.rolling(window, min_periods=1).mean()
roll_std = ts.rolling(window, min_periods=1).std().fillna(1)  # avoid NaN std at t=0
z_scores = (ts - roll_mean) / roll_std
anomalies_z = ts[abs(z_scores) > 3]
print(f"Rolling z-score (|z|>3): {len(anomalies_z)} anomalies detected")
print(f" True: {anomaly_idx}")
print(f" Detected: {list(anomalies_z.index.strftime('%Y-%m-%d')[:5])}")
# Method 2: Tukey IQR fences computed on a rolling 30-day window
q1 = ts.rolling(30).quantile(0.25)
q3 = ts.rolling(30).quantile(0.75)
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
anomalies_iqr = ts[(ts < lower) | (ts > upper)]
print(f"IQR fences: {len(anomalies_iqr)} anomalies detected")import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
np.random.seed(0)
n = 120
# Monthly series: linear trend + annual seasonality + noise
ts = pd.Series(
    100 + np.linspace(0,30,n) + 20*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0, 3, n),
    index=pd.date_range('2014-01', periods=n, freq='MS'))
# Inject anomalies at specific months
anom_idx = [24, 50, 85, 100]
ts.iloc[anom_idx] += [40, -35, 50, -40]
# robust=True downweights outliers when estimating trend/seasonal components
stl = STL(ts, period=12, robust=True)
res = stl.fit()
residuals = pd.Series(res.resid, index=ts.index)
# Flag residuals beyond 3 sigma, with sigma estimated robustly from the MAD
from scipy.stats import median_abs_deviation
mad = median_abs_deviation(residuals.dropna())
robust_std = mad / 0.6745  # MAD -> std conversion for Gaussian data
threshold = 3 * robust_std
flags = abs(residuals) > threshold
detected = ts[flags]
print(f"STL residual anomaly detection (threshold={threshold:.2f}):")
print(f" Flagged {flags.sum()} points")
print(f" True anomaly indices: {anom_idx}")
print(f" Detected at: {list(detected.index.strftime('%Y-%m')[:6])}")
# Precision accounting: true positives vs false positives
tp = sum(1 for i in anom_idx if flags.iloc[i])
fp = flags.sum() - tp
print(f" TP={tp}, FP={fp}")import numpy as np, pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
n = 365
# Daily series: slight trend + annual cycle + noise
ts = pd.Series(
    100 + 0.05*np.arange(n) + 20*np.sin(2*np.pi*np.arange(n)/365) +
    np.random.normal(0, 4, n),
    index=pd.date_range('2022-01-01', periods=n, freq='D'))
true_anomalies = [50, 120, 200, 280, 340]
ts.iloc[true_anomalies] += np.array([35, -40, 30, -45, 38])
# Describe each day with local-context features so the forest can isolate outliers
window = 7
features = pd.DataFrame({
    'value': ts,
    'diff1': ts.diff(),
    'roll_mean': ts.rolling(window).mean(),
    'roll_std': ts.rolling(window).std(),
    'z_score': (ts - ts.rolling(window).mean()) / ts.rolling(window).std(),
}).dropna()
scaler = StandardScaler()
X = scaler.fit_transform(features)
# contamination is set to the (known) injection rate; in practice it must be estimated
iso = IsolationForest(contamination=len(true_anomalies)/n, random_state=42, n_estimators=200)
preds = iso.fit_predict(X) # -1 = anomaly
detected_idx = features.index[preds == -1]
print(f"IsolationForest detected: {len(detected_idx)} anomalies")
true_dates = ts.index[true_anomalies]
# Count a hit when a detection lands within 2 days of an injected anomaly
tp = sum(1 for d in detected_idx if any(abs((d - t).days) <= 2 for t in true_dates))
print(f"True positives (within 2 days): {tp}/{len(true_anomalies)}")import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from scipy.stats import median_abs_deviation
np.random.seed(42)
n = 1440  # 24 hours × 60 min (1 day at minutely resolution)
t = np.arange(n)
# CPU utilisation: daily sinusoidal cycle + noise
cpu = pd.Series(
    40 + 20*np.sin(2*np.pi*t/1440) + np.random.normal(0, 3, n),
    index=pd.date_range('2024-01-01', periods=n, freq='min'))
# Inject anomalies
cpu.iloc[200:210] += 35  # sustained spike
cpu.iloc[800] += 60      # sudden spike
cpu.iloc[1200] -= 25     # drop
# Method 1: Rolling z-score (fast, real-time friendly)
w = 60
z = (cpu - cpu.rolling(w).mean()) / cpu.rolling(w).std()
z_flags = (abs(z) > 3).fillna(False)
# Method 2: Hourly aggregation + STL decomposition.
# Lowercase 'h': the uppercase 'H' alias is deprecated since pandas 2.2
# (consistent with the lowercase 'min' used above).
hourly = cpu.resample('h').mean()
stl = STL(hourly, period=24, robust=True)
resid = pd.Series(stl.fit().resid, index=hourly.index)
mad = median_abs_deviation(resid.dropna())
stl_flags = abs(resid) > 3 * mad / 0.6745  # 3 robust sigmas (MAD/0.6745 ~ std)
print(f"Rolling z-score flagged: {z_flags.sum()} minutes")
print(f"STL (hourly) flagged: {stl_flags.sum()} hours")
print(f"Z-score alerts during spike period: {z_flags.iloc[195:215].sum()}")import numpy as np, pandas as pd
from statsmodels.tsa.seasonal import STL
from sklearn.ensemble import IsolationForest
from scipy.stats import median_abs_deviation
np.random.seed(42)
n = 730
t = np.arange(n)
ts = pd.Series(
15 + 0.01*t + 10*np.sin(2*np.pi*t/365) + np.random.normal(0,1.5,n),
index=pd.date_range('2022-01-01', periods=n, freq='D'),
name='temperature')
anom_days = [60, 150, 250, 380, 500, 650]
deltas = [12, -15, 10, -13, 11, -10]
for day, delta in zip(anom_days, deltas):
ts.iloc[day] += delta
# TODO: implement 3 anomaly detectors
# TODO: print TP matrix for each injected anomaly
RMSE and MAPE are standard forecast metrics but have blind spots. SMAPE handles zeros; MASE benchmarks against naive. Walk-forward backtesting gives realistic performance estimates and detects overfitting to a single test window.
import numpy as np
def rmse(y_true, y_pred):
    """Root mean squared error of a forecast."""
    sq_err = (y_true - y_pred) ** 2
    return np.sqrt(sq_err.mean())
def mape(y_true, y_pred):
    """Mean absolute percentage error (%), skipping zero actuals to avoid /0."""
    nonzero = y_true != 0
    actual = y_true[nonzero]
    rel_err = np.abs(actual - y_pred[nonzero]) / np.abs(actual)
    return 100 * rel_err.mean()
def smape(y_true, y_pred):
    """Symmetric MAPE (%): |error| over the mean of |actual| and |forecast|.

    Points where both actual and forecast are zero are excluded.
    """
    scale = 0.5 * (np.abs(y_true) + np.abs(y_pred))
    valid = scale > 0
    ratios = np.abs(y_true[valid] - y_pred[valid]) / scale[valid]
    return ratios.mean() * 100
def mase(y_true, y_pred, y_train, m=1):
    """Mean Absolute Scaled Error (Hyndman & Koehler, 2006).

    Scales the forecast MAE by the in-sample MAE of the seasonal naive
    forecast y_t = y_{t-m}. Values below 1 mean better than seasonal naive.

    Bug fix: the scale must use the lag-m seasonal difference
    y_train[m:] - y_train[:-m]; the previous np.diff(y_train, n=m)
    computed the m-th ITERATED difference, which is only correct for m=1.
    """
    naive_err = np.mean(np.abs(y_train[m:] - y_train[:-m]))
    return np.mean(np.abs(y_true - y_pred)) / naive_err
np.random.seed(42)
# Toy actual/forecast arrays for the metric demos
y_true = np.array([100, 150, 130, 200, 175, 220, 195, 210])
y_pred = y_true + np.random.randn(len(y_true)) * 15  # forecast = truth + noise
y_train = np.array([80, 90, 110, 95, 105, 120, 115, 130, 140] + list(y_true[:4]))
print(f"RMSE: {rmse(y_true, y_pred):.3f}")
print(f"MAPE: {mape(y_true, y_pred):.2f}%")
print(f"SMAPE: {smape(y_true, y_pred):.2f}%")
print(f"MASE: {mase(y_true, y_pred, np.array(y_train)):.3f} (< 1 = better than naive)")
# Bias (mean signed error) and directional accuracy (sign agreement of successive changes)
bias = np.mean(y_pred - y_true)
da = np.mean(np.sign(np.diff(y_true)) == np.sign(np.diff(y_pred))) * 100
print(f"Bias: {bias:.3f} (positive = over-forecast)")
print(f"Directional accuracy: {da:.1f}%")import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 84
ts = pd.Series(
100 + np.linspace(0,50,n) + 20*np.sin(2*np.pi*np.arange(n)/12) +
np.random.normal(0,5,n),
index=pd.date_range('2017-01', periods=n, freq='MS'))
def walk_forward_cv(ts, min_train=24, step=6, h=6):
    """Expanding-window CV: fit Holt-Winters on ts[:k], score the next h points.

    Returns a DataFrame with one row per fold (fold start, train size,
    RMSE, MAPE). Folds where the model fails to fit are skipped silently.
    """
    records = []
    for cut in range(min_train, len(ts) - h + 1, step):
        fit_data, holdout = ts[:cut], ts[cut:cut + h]
        try:
            model = ExponentialSmoothing(fit_data, trend='add', seasonal='add',
                                         seasonal_periods=12,
                                         initialization_method='estimated').fit(optimized=True)
            pred = model.forecast(h)
            records.append({'fold': cut, 'n_train': len(fit_data),
                            'rmse': np.sqrt(((holdout.values - pred.values)**2).mean()),
                            'mape': (abs(holdout - pred) / holdout).mean() * 100})
        except Exception:
            pass
    return pd.DataFrame(records)
# Run the expanding-window cross-validation and summarize fold errors
cv_results = walk_forward_cv(ts)
print(f"Walk-forward CV ({len(cv_results)} folds):")
print(cv_results.round(2).to_string(index=False))
# Fixed mojibake: 'Β±' was a mis-decoded plus-minus sign
print(f"\nMean RMSE: {cv_results.rmse.mean():.3f} ± {cv_results.rmse.std():.3f}")
print(f"Mean MAPE: {cv_results.mape.mean():.2f}%")import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings; warnings.filterwarnings('ignore')
np.random.seed(5)
n = 96
# Synthetic monthly series: linear trend + annual cycle + noise
ts = pd.Series(
    200 + np.linspace(0,80,n) + 40*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0,8,n),
    index=pd.date_range('2016-01', periods=n, freq='MS'))
train, test = ts[:-12], ts[-12:]
# Naive benchmarks every real model must beat
naive_persistence = pd.Series([train.iloc[-1]] * 12, index=test.index) # repeat last value
naive_seasonal = train.iloc[-12:].values # same month last year
hw = ExponentialSmoothing(train, trend='add', seasonal='add',
                          seasonal_periods=12, initialization_method='estimated').fit(optimized=True)
hw_fc = hw.forecast(12)
forecasts = {
    'Naive Persist': naive_persistence.values,
    'Naive Seasonal': naive_seasonal,
    'Holt-Winters': hw_fc.values,
}
print(f"{'Model':18s} {'RMSE':>7s} {'MAPE':>7s}")
for name, fc in forecasts.items():
    rmse = np.sqrt(((test.values - fc)**2).mean())
    mape = (abs(test.values - fc) / test.values).mean() * 100
    print(f"{name:18s} {rmse:7.2f} {mape:7.2f}%")
# MASE scale: in-sample MAE of the SEASONAL naive, i.e. the lag-12 difference.
# Fixed: np.diff(x, n=12) computes the 12th ITERATED difference, not y_t - y_{t-12}.
naive_err = np.mean(np.abs(train.values[12:] - train.values[:-12]))
hw_mase = np.mean(np.abs(test.values - hw_fc.values)) / naive_err
print(f"\nHolt-Winters MASE vs seasonal naive: {hw_mase:.3f}")import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 72
# Monthly series: trend + annual seasonality + noise
ts = pd.Series(
    500 + np.linspace(0,200,n) + 100*np.sin(2*np.pi*np.arange(n)/12) +
    np.random.normal(0,15,n),
    index=pd.date_range('2018-01', periods=n, freq='MS'))
folds, h = 5, 6
fold_size = (len(ts) - 24) // folds  # expanding folds after a 24-month warm-up
results = {m: [] for m in ['HW','SARIMA']}
for fold in range(folds):
    cutoff = 24 + fold * fold_size
    train = ts[:cutoff]
    test = ts[cutoff:cutoff+h]
    if len(test) < h: break  # not enough data left for a full horizon
    # Fixed: narrowed from bare 'except:' so KeyboardInterrupt/SystemExit propagate
    try:
        hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12,
                                  initialization_method='estimated').fit(optimized=True)
        hw_fc = hw.forecast(h)
        results['HW'].append(np.sqrt(((test.values - hw_fc.values)**2).mean()))
    except Exception: pass
    try:
        sm = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12),
                     enforce_stationarity=False).fit(disp=False)
        sm_fc = sm.forecast(h)
        results['SARIMA'].append(np.sqrt(((test.values - sm_fc.values)**2).mean()))
    except Exception: pass
for model, rmses in results.items():
if rmses:
print(f"{model:8s}: mean RMSE={np.mean(rmses):.2f} Β± {np.std(rmses):.2f} ({len(rmses)} folds)")import numpy as np, pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings; warnings.filterwarnings('ignore')
def forecast_tournament(ts, h=12, n_folds=4):
    """Walk-forward tournament comparing Holt-Winters and SARIMA.

    Each fold trains on an expanding prefix of `ts` (24-point warm-up)
    and scores an h-step forecast on the following h points.

    Returns a DataFrame with columns model, mean_RMSE, std_RMSE,
    mean_MAPE, sorted by mean_RMSE (best first).
    """
    fold_size = (len(ts) - 24) // n_folds
    scores = {'HW': [], 'SARIMA': []}
    for fold in range(n_folds):
        cutoff = 24 + fold * fold_size
        train, test = ts[:cutoff], ts[cutoff:cutoff + h]
        if len(test) < h:
            break  # remaining data cannot cover a full horizon
        fitters = {
            'HW': lambda: ExponentialSmoothing(
                train, trend='add', seasonal='add', seasonal_periods=12,
                initialization_method='estimated').fit(optimized=True),
            'SARIMA': lambda: SARIMAX(
                train, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12),
                enforce_stationarity=False).fit(disp=False),
        }
        for name, fit_fn in fitters.items():
            try:
                fc = fit_fn().forecast(h)
                rmse = np.sqrt(((test.values - fc.values) ** 2).mean())
                mape = (np.abs(test.values - fc.values) / test.values).mean() * 100
                scores[name].append((rmse, mape))
            except Exception:
                continue  # skip folds where a model fails to converge
    rows = []
    for name, vals in scores.items():
        if vals:
            rmses = [v[0] for v in vals]
            mapes = [v[1] for v in vals]
            rows.append({'model': name, 'mean_RMSE': np.mean(rmses),
                         'std_RMSE': np.std(rmses), 'mean_MAPE': np.mean(mapes)})
    return pd.DataFrame(rows).sort_values('mean_RMSE').reset_index(drop=True)
np.random.seed(42)
n = 72
ts = pd.Series(
300 + np.linspace(0,100,n) + 60*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*10,
index=pd.date_range('2018-01', periods=n, freq='MS'))
results = forecast_tournament(ts, h=6, n_folds=4)
print(results)
VAR models multiple related time series jointly. Granger causality tests whether one series predicts another. Facebook Prophet is a user-friendly forecaster for business time series with holidays and change points.
import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import adfuller
np.random.seed(42)
n = 120
# Two co-moving series: temperature and ice cream sales
temp = pd.Series(20 + 10*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*2)
sales = 0.8*temp.shift(1).fillna(0) + np.random.randn(n)*5 + 50
df = pd.DataFrame({'temp': temp, 'sales': sales})
# Difference to achieve stationarity
df_diff = df.diff().dropna()
model = VAR(df_diff)
result = model.fit(maxlags=4, ic='aic')
print(f"VAR({result.k_ar}) fitted. AIC={result.aic:.2f}")
print(f"\nGranger causality test (temp -> sales):")
lag_order = result.k_ar
test = result.test_causality('sales', 'temp', kind='f')
print(f" F-stat={test.test_statistic:.3f}, p={test.pvalue:.4f}")
print(f" Temp Granger-causes sales? {test.pvalue < 0.05}")
fc = result.forecast(df_diff.values[-lag_order:], steps=6)
print(f"\n6-step forecast (differences):\n{pd.DataFrame(fc, columns=['temp_d','sales_d']).round(2)}")import numpy as np, pandas as pd
from statsmodels.tsa.stattools import grangercausalitytests, ccf
np.random.seed(0)
n = 200
# advertising spend leads to sales with 2-period lag
ad_spend = pd.Series(np.random.normal(100, 20, n))
sales = 0.7 * ad_spend.shift(2).fillna(0) + np.random.randn(n) * 15 + 200
web_visits = 0.5 * ad_spend.shift(1).fillna(0) + np.random.randn(n) * 10
df = pd.DataFrame({'sales': sales.diff(), 'ad_spend': ad_spend.diff(),
'web': web_visits.diff()}).dropna()
print("Granger causality: ad_spend -> sales (maxlag=5):")
gc = grangercausalitytests(df[['sales','ad_spend']], maxlag=3, verbose=False)
for lag, res in gc.items():
p = res[0]['ssr_ftest'][1]
print(f" lag {lag}: F-p={p:.4f} {'*' if p<0.05 else ''}")
# Cross-correlation function
cc = ccf(df['ad_spend'], df['sales'], alpha=0.05, unbiased=True)
print(f"\nCross-correlation (ad_spend, sales) at lags 0-5:")
for lag, r in enumerate(cc[0][:6]):
print(f" lag {lag}: r={r:.3f}")import numpy as np, pandas as pd
# Prophet is optional; degrade gracefully when it is not installed
try:
    from prophet import Prophet
    HAS_PROPHET = True
except ImportError:
    HAS_PROPHET = False
    print("Install: pip install prophet")
if HAS_PROPHET:
    np.random.seed(42)
    n = 180
    dates = pd.date_range('2018-01-01', periods=n, freq='MS')
    trend = np.linspace(1000, 2000, n)
    seasonal = 300 * np.sin(2*np.pi*np.arange(n)/12)
    noise = np.random.normal(0, 50, n)
    # Prophet expects the columns 'ds' (dates) and 'y' (values)
    df = pd.DataFrame({'ds': dates, 'y': trend + seasonal + noise})
    m = Prophet(yearly_seasonality=True, weekly_seasonality=False,
                daily_seasonality=False, changepoint_prior_scale=0.05)
    m.fit(df)
    # Extend 24 months past the training data and predict with intervals
    future = m.make_future_dataframe(periods=24, freq='MS')
    forecast = m.predict(future)
    print(forecast[['ds','yhat','yhat_lower','yhat_upper']].tail(6).round(1).to_string())
    print(f"\nTrend at end: {forecast['trend'].iloc[-1]:.1f}")
else:
    print("Prophet demo: requires 'pip install prophet'")
    print("Prophet uses additive model: y = trend + seasonality + holidays + error")
print("Key params: changepoint_prior_scale, seasonality_prior_scale, holidays")import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
import warnings; warnings.filterwarnings('ignore')
np.random.seed(7)
n = 120
t = np.arange(n)
# Simulated macro indicators (stationary after differencing)
gdp_growth = pd.Series(0.5 + 0.3*np.sin(2*np.pi*t/24) + np.random.randn(n)*0.8)
unemployment = pd.Series(5.0 - 0.4*gdp_growth.shift(1).fillna(5) + np.random.randn(n)*0.3)
inflation = pd.Series(2.0 + 0.2*gdp_growth.shift(2).fillna(2) + np.random.randn(n)*0.4)
df = pd.DataFrame({'gdp': gdp_growth, 'unemp': unemployment, 'inflation': inflation})
model = VAR(df)
fitted = model.fit(maxlags=6, ic='aic')
print(f"VAR({fitted.k_ar}) AIC={fitted.aic:.2f}")
for caused in ['unemp','inflation']:
test = fitted.test_causality(caused, 'gdp', kind='f')
print(f"GDP -> {caused}: p={test.pvalue:.4f} {'(significant)' if test.pvalue<0.05 else ''}")
fc = pd.DataFrame(
fitted.forecast(df.values[-fitted.k_ar:], steps=6),
columns=['gdp','unemp','inflation'])
print(f"\n6-month forecast:\n{fc.round(3).to_string()}")import numpy as np, pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import adfuller, grangercausalitytests
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
n = 120
temp = pd.Series(20 + 10*np.sin(2*np.pi*np.arange(n)/12) + np.random.randn(n)*2)
humidity = pd.Series(60 + 15*np.sin(2*np.pi*(np.arange(n)-3)/12) + np.random.randn(n)*4)
energy = pd.Series(0.7*temp.shift(1).fillna(temp.mean()) + 0.4*humidity + np.random.randn(n)*5 + 20)
df = pd.DataFrame({'temp':temp, 'humidity':humidity, 'energy':energy})
# (1) Test stationarity and difference
# (2) Fit VAR with lag selection
# (3) Granger causality
# (4) Forecast and RMSE