Polars Study Guide
Blazing-fast DataFrame library — Rust-powered, lazy execution, built for big data.
10 Topics • High-Performance DataFrames

Install Polars and create DataFrames from dicts, lists, NumPy arrays, and ranges.
Creating DataFrames
# pip install polars
import polars as pl
import numpy as np

# Build a DataFrame straight from a dict of equal-length columns;
# dtypes are inferred per column.
people = {
    'name': ['Alice', 'Bob', 'Carol', 'Dave'],
    'age': [25, 30, 35, 28],
    'score': [92.5, 88.0, 95.1, 76.3],
    'active': [True, False, True, True],
}
df = pl.DataFrame(people)
print(df)

# Inspect the inferred column names and types.
print(df.schema)
print(df.dtypes)
print(df.shape)

Series & Data Types
import polars as pl
# A Series is a single named, typed column.
s = pl.Series('values', [10, 20, 30, 40, 50])
print(s)
print('dtype:', s.dtype)
print('mean:', s.mean())
# Explicit dtypes: pass dtype= per Series to override the inferred 64-bit
# defaults; Categorical dictionary-encodes repeated strings.
df = pl.DataFrame({
'id': pl.Series([1, 2, 3], dtype=pl.Int32),
'price': pl.Series([9.99, 14.99, 4.99], dtype=pl.Float32),
'label': pl.Series(['a', 'b', 'c'], dtype=pl.Categorical),
})
print(df)
print(df.schema)
# Quick per-column summary statistics (count, mean, std, min/max, quantiles).
df2 = pl.DataFrame({'x': range(100), 'y': [i**2 for i in range(100)]})
print(df2.describe())

From CSV String & List of Dicts
import polars as pl
import io

# Parse CSV text held in memory instead of a file on disk.
raw = 'name,age,score\nAlice,25,92.5\nBob,30,88.0\nCarol,35,95.1'
df_csv = pl.read_csv(io.StringIO(raw))
print('From CSV string:')
print(df_csv)

# Row-oriented construction: one dict per record; keys become columns.
rows = [
    {'product': 'Widget', 'price': 9.99, 'qty': 100},
    {'product': 'Gadget', 'price': 24.99, 'qty': 50},
    {'product': 'Doohickey', 'price': 4.99, 'qty': 200},
]
df_dicts = pl.DataFrame(rows)
print('From list of dicts:')
print(df_dicts)
print('Schema:', df_dicts.schema)

pl.from_records, pl.from_arrow & Schema Enforcement
import polars as pl
import numpy as np

# pl.from_records — build from a list of row tuples with an explicit schema.
records = [(1, 'Alice', 92.5), (2, 'Bob', 88.0), (3, 'Carol', 95.1)]
schema = {'id': pl.Int32, 'name': pl.Utf8, 'score': pl.Float32}
df_rec = pl.from_records(records, schema=schema)
print('from_records:')
print(df_rec)
print('dtypes:', df_rec.dtypes)

# pl.from_arrow — zero-copy conversion from a PyArrow table.
try:
    import pyarrow as pa
    arrow_table = pa.table({'x': [10, 20, 30], 'y': [1.1, 2.2, 3.3]})
    df_arrow = pl.from_arrow(arrow_table)
    print('from_arrow:', df_arrow)
except ImportError:
    # FIX: repaired mojibake ('β') in the user-facing message.
    print('pyarrow not installed — skipping from_arrow demo')

# Schema enforcement at construction time via the schema= argument.
np.random.seed(42)
df_typed = pl.DataFrame(
    {
        'user_id': np.random.randint(1, 1000, 5).tolist(),
        'amount': np.random.uniform(10, 500, 5).tolist(),
        'category': ['food', 'tech', 'food', 'travel', 'tech'],
    },
    schema={'user_id': pl.Int32, 'amount': pl.Float64, 'category': pl.Categorical}
)
print('Schema-enforced DataFrame:')
print(df_typed)
print('Schema:', df_typed.schema)

import polars as pl
import numpy as np
# Simulate a large (500k-row) transaction dataset from NumPy arrays.
np.random.seed(42)
n = 500_000
df = pl.DataFrame({
'transaction_id': range(n),
'user_id': np.random.randint(1000, 9999, n),
'amount': np.random.exponential(50, n).round(2),
'category': np.random.choice(['food','tech','travel','health'], n),
# p= skews status: 80% completed, 15% pending, 5% failed.
'status': np.random.choice(['completed','pending','failed'], n, p=[0.8,0.15,0.05])
})
print(f'Loaded {df.shape[0]:,} rows x {df.shape[1]} cols')
print(df.schema)
print(df.head())

import polars as pl
import io
# 1. From dict
# TODO: df = pl.DataFrame({
# 'city': [...],
# 'population': [...],
# 'area_km2': [...]
# })
# 2. From list of dicts
# TODO: records = [{'city': ..., 'population': ..., 'area_km2': ...}, ...]
# TODO: df2 = pl.DataFrame(records)
# 3. Parse CSV string
csv_str = 'city,pop,area\nTokyo,13960000,2194.0\nParis,2161000,105.4\nLondon,8982000,1572.0'
# TODO: df3 = pl.read_csv(io.StringIO(csv_str))
# TODO: print('Schema:', df3.schema)
# 4. Add density column
# TODO: df4 = df.with_columns(
# (pl.col('population') / pl.col('area_km2')).round(1).alias('density')
# )
# TODO: print(df4)

Polars uses an expression API for column operations — composable, lazy-friendly, and fast.
select() and with_columns()
import polars as pl

# Five employees across three departments.
df = pl.DataFrame({
    'name': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve'],
    'salary': [60000, 75000, 90000, 55000, 110000],
    'dept': ['Eng', 'Sales', 'Eng', 'HR', 'Eng'],
    'years': [3, 5, 8, 2, 12]
})

# select: project a subset of columns.
print(df.select(['name', 'salary']))

# with_columns: derive new columns; expressions passed positionally
# (equivalent to passing a single list).
df2 = df.with_columns(
    (pl.col('salary') * 1.1).alias('new_salary'),
    (pl.col('salary') / pl.col('years')).alias('salary_per_year'),
    pl.col('name').str.to_uppercase().alias('name_upper'),
)
print(df2)Expressions: Arithmetic, Aliases & Chaining
import polars as pl
df = pl.DataFrame({
'x': [1, 2, 3, 4, 5],
'y': [10, 20, 30, 40, 50],
'label': ['a', 'b', 'a', 'b', 'a']
})
# Expression chaining
result = df.select([
pl.col('x'),
pl.col('y'),
(pl.col('x') + pl.col('y')).alias('sum'),
(pl.col('y') / pl.col('x')).alias('ratio').round(2),
pl.col('x').pow(2).alias('x_squared'),
pl.col('label').is_in(['a']).alias('is_a')
])
print(result)
# Horizontal operations
df2 = df.with_columns(
pl.max_horizontal('x', 'y').alias('row_max'),
pl.sum_horizontal('x', 'y').alias('row_sum')
)
print(df2)pl.when / then / otherwise (Conditional Columns)
import polars as pl
df = pl.DataFrame({
'name': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve'],
'score': [92, 45, 78, 55, 88],
'attempts': [1, 3, 2, 4, 1]
})
# pl.when().then().otherwise() — a composable, vectorized if/elif/else
# (like np.where). Chained .when() branches evaluate top-down: the first
# matching condition wins.
result = df.with_columns([
pl.when(pl.col('score') >= 90)
.then(pl.lit('A'))
.when(pl.col('score') >= 70)
.then(pl.lit('B'))
.when(pl.col('score') >= 55)
.then(pl.lit('C'))
.otherwise(pl.lit('F'))
.alias('grade'),
# Second derived column: single-attempt vs retried rows.
pl.when(pl.col('attempts') == 1)
.then(pl.lit('First try'))
.otherwise(pl.lit('Retried'))
.alias('attempt_label')
])
print(result)pl.struct, pl.concat_list, unnest & pl.col('*') Patterns
import polars as pl
df = pl.DataFrame({
'first': ['Alice', 'Bob', 'Carol'],
'last': ['Smith', 'Jones', 'White'],
'score_a': [85, 90, 78],
'score_b': [92, 88, 95],
})
# pl.struct β pack multiple cols into a struct column
df2 = df.with_columns(
pl.struct(['first', 'last']).alias('full_name_struct'),
pl.struct(['score_a', 'score_b']).alias('scores_struct')
)
print('With struct columns:')
print(df2.select(['full_name_struct', 'scores_struct']))
# unnest β expand a struct back to individual columns
df3 = df2.select('scores_struct').unnest('scores_struct')
print('Unnested scores:')
print(df3)
# pl.concat_list β combine columns into a list column
df4 = df.with_columns(
pl.concat_list(['score_a', 'score_b']).alias('all_scores')
)
print('concat_list result:')
print(df4.select(['first', 'all_scores']))
# pl.col('*') β select all columns at once
df5 = df.select(pl.col('*').exclude(['first', 'last']))
print('All numeric cols via col(*).exclude:')
print(df5)import polars as pl
import numpy as np
np.random.seed(42)
n = 1000
products = pl.DataFrame({
'product_id': range(n),
'cost': np.random.uniform(5, 200, n).round(2),
'price': np.random.uniform(10, 400, n).round(2),
'units_sold': np.random.randint(0, 500, n),
'category': np.random.choice(['Electronics','Clothing','Food','Books'], n)
})
enriched = products.with_columns([
((pl.col('price') - pl.col('cost')) / pl.col('price') * 100)
.round(1).alias('margin_pct'),
(pl.col('price') * pl.col('units_sold')).alias('revenue'),
(pl.col('cost') > pl.col('price') * 0.8).alias('low_margin_flag')
])
print(enriched.filter(pl.col('low_margin_flag')).head(5))
print('Low margin products:', enriched['low_margin_flag'].sum())import polars as pl
df = pl.DataFrame({
'employee': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve'],
'base_salary': [55000, 72000, 95000, 48000, 110000],
'bonus_pct': [5.0, 8.0, 12.0, 3.0, 15.0],
'years_exp': [2, 5, 9, 1, 13]
})
# 1. Add total_comp
# TODO: df = df.with_columns(
# (pl.col('base_salary') * (1 + pl.col('bonus_pct') / 100))
# .round(2).alias('total_comp')
# )
# 2. Add level with pl.when
# TODO: df = df.with_columns(
# pl.when(pl.col('years_exp') < 3).then(pl.lit('Junior'))
# .when(pl.col('years_exp') < 7).then(pl.lit('Mid'))
# .otherwise(pl.lit('Senior'))
# .alias('level')
# )
# 3. Sum base_salary + 10000 raise using pl.sum_horizontal
# TODO: df = df.with_columns(
# pl.sum_horizontal('base_salary', pl.lit(10000)).alias('salary_with_raise')
# )
# 4. Filter to Senior only
# TODO: seniors = df.filter(pl.col('level') == 'Senior')
# TODO: print(seniors)Filter rows with Boolean expressions and sort by one or multiple columns.
filter() with Boolean Expressions
import polars as pl
df = pl.DataFrame({
'name': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve', 'Frank'],
'age': [25, 42, 35, 19, 55, 30],
'city': ['NY', 'LA', 'NY', 'Chicago', 'LA', 'NY'],
'salary': [70000, 95000, 80000, 45000, 120000, 65000]
})
# Single condition
print(df.filter(pl.col('age') > 30))
# Multiple conditions (AND)
print(df.filter(
(pl.col('city') == 'NY') & (pl.col('salary') > 65000)
))
# OR condition
print(df.filter(
(pl.col('city') == 'LA') | (pl.col('age') < 25)
))
# is_in
print(df.filter(pl.col('city').is_in(['NY', 'LA'])))sort() and top_k()
import polars as pl
df = pl.DataFrame({
'product': ['A', 'B', 'C', 'D', 'E'],
'sales': [300, 150, 450, 280, 100],
'region': ['East', 'West', 'East', 'West', 'East'],
'profit': [45.0, 30.0, 90.0, 55.0, 10.0]
})
# Sort single column
print(df.sort('sales', descending=True))
# Sort multiple columns
print(df.sort(['region', 'sales'], descending=[False, True]))
# top_k β faster than sort for large data
print(df.top_k(3, by='profit'))
# Null handling in sort
df2 = pl.DataFrame({'val': [3, None, 1, None, 2]})
print(df2.sort('val', nulls_last=True))Chained Filtering with is_between & str.contains
import polars as pl

# Small product catalog used to demonstrate compound filters.
df = pl.DataFrame({
    'product': ['Widget Pro', 'Gadget Mini', 'Widget Lite', 'Super Gadget', 'Widget Max'],
    'price': [29.99, 9.99, 14.99, 49.99, 39.99],
    'rating': [4.5, 3.8, 4.1, 4.9, 4.3],
    'in_stock': [True, False, True, True, False]
})

# Multiple predicates passed to filter() are combined with AND —
# equivalent to chaining them with the & operator.
result = df.filter(
    pl.col('product').str.contains('Widget'),
    pl.col('price').is_between(10.0, 45.0),
    pl.col('in_stock'),
    pl.col('rating') >= 4.0,
)
print('Filtered:', result)

# Negate a predicate with the ~ operator.
not_widget = df.filter(~pl.col('product').str.contains('Widget'))
print('Not Widget:', not_widget)is_between, top_k & Multi-Column Sort with nulls_last
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
'name': [f'Item_{i}' for i in range(10)],
'price': [12.5, None, 45.0, 8.99, None, 99.0, 34.5, 22.0, None, 67.0],
'rating': np.round(np.random.uniform(1.0, 5.0, 10), 1).tolist(),
'region': ['East','West','East','North','South','West','East','North','South','West'],
})
# is_between on a column with nulls (nulls excluded automatically)
mid_range = df.filter(pl.col('price').is_between(10.0, 60.0))
print('Price between 10 and 60:')
print(mid_range)
# top_k β fast partial sort (no need to sort the whole frame)
print('Top 3 by rating (top_k):')
print(df.top_k(3, by='rating'))
# Sort by multiple cols with nulls_last=True
print('Sort by region asc, price desc (nulls last):')
print(df.sort(
['region', 'price'],
descending=[False, True],
nulls_last=True
))import polars as pl
import numpy as np
np.random.seed(42)
n = 100_000
txns = pl.DataFrame({
'txn_id': range(n),
'amount': np.random.exponential(500, n).round(2),
'country': np.random.choice(['US','UK','NG','VN','BR','RU'], n,
p=[0.5,0.2,0.1,0.05,0.1,0.05]),
'card_type': np.random.choice(['credit','debit'], n),
'hour': np.random.randint(0, 24, n)
})
HIGH_RISK = ['NG', 'VN', 'RU']
suspects = txns.filter(
(pl.col('amount') > 10000) &
(pl.col('country').is_in(HIGH_RISK)) &
(pl.col('hour').is_between(0, 5)) # odd hours
).sort('amount', descending=True)
print(f'Suspicious transactions: {len(suspects):,}')
print(suspects.head(10))import polars as pl
import numpy as np
np.random.seed(42)
n = 20
df = pl.DataFrame({
'employee': [f'Emp_{i}' for i in range(n)],
'dept': np.random.choice(['Engineering','Sales','HR','Marketing'], n),
'salary': np.random.randint(45000, 130000, n),
'rating': np.round(np.random.uniform(2.5, 5.0, n), 1),
'remote': np.random.choice([True, False], n)
})
print(df)
# 1. Engineering, salary > 70000, rating >= 4.0
# TODO: result1 = df.filter(
# (pl.col('dept') == 'Engineering') &
# (pl.col('salary') > 70000) &
# (pl.col('rating') >= 4.0)
# )
# print('Filter 1:', result1)
# 2. NOT remote AND salary between 50000 and 90000
# TODO: result2 = df.filter(
# ~pl.col('remote') & pl.col('salary').is_between(50000, 90000)
# )
# print('Filter 2:', result2)
# 3. Sort dept asc, salary desc
# TODO: sorted_df = df.sort(['dept', 'salary'], descending=[False, True])
# print('Sorted:', sorted_df)
# 4. Top-3 rated remote workers
# TODO: top_remote = df.filter(pl.col('remote')).top_k(3, by='rating')
# print('Top remote:', top_remote)Aggregate data by groups using Polars' expressive and parallel groupby engine.
group_by().agg()
import polars as pl
df = pl.DataFrame({
'dept': ['Eng','Eng','Sales','Sales','HR','HR','Eng'],
'name': ['Alice','Bob','Carol','Dave','Eve','Frank','Grace'],
'salary': [90000, 85000, 70000, 65000, 55000, 58000, 95000],
'years': [5, 3, 7, 2, 8, 4, 6]
})
# Basic aggregation
print(df.group_by('dept').agg([
pl.len().alias('headcount'),
pl.col('salary').mean().alias('avg_salary'),
pl.col('salary').max().alias('max_salary'),
pl.col('years').sum().alias('total_years')
]))
# Multiple groups
print(df.group_by(['dept']).agg(
pl.col('salary').median().alias('median_salary')
).sort('median_salary', descending=True))Advanced Aggregations
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
'category': np.random.choice(['A','B','C'], 1000),
'value': np.random.normal(100, 20, 1000),
'qty': np.random.randint(1, 50, 1000)
})
stats = df.group_by('category').agg([
pl.len().alias('count'),
pl.col('value').mean().round(2).alias('mean'),
pl.col('value').std().round(2).alias('std'),
pl.col('value').quantile(0.25).round(2).alias('q25'),
pl.col('value').quantile(0.75).round(2).alias('q75'),
pl.col('qty').sum().alias('total_qty'),
(pl.col('value') * pl.col('qty')).sum().round(2).alias('weighted_sum')
]).sort('category')
print(stats)GroupBy with Expressions & Filtering Groups
import polars as pl
import numpy as np
np.random.seed(0)
df = pl.DataFrame({
'region': np.random.choice(['North','South','East','West'], 200),
'product': np.random.choice(['A','B','C'], 200),
'revenue': np.random.exponential(1000, 200).round(2),
'units': np.random.randint(1, 50, 200)
})
# Multi-column groupby with many aggregations
summary = df.group_by(['region', 'product']).agg([
pl.len().alias('count'),
pl.col('revenue').sum().round(2).alias('total_revenue'),
pl.col('revenue').mean().round(2).alias('avg_revenue'),
pl.col('units').sum().alias('total_units'),
(pl.col('revenue').sum() / pl.col('units').sum()).round(2).alias('rev_per_unit')
]).sort(['region', 'product'])
print(summary)
# Filter groups: only regions where total revenue > 20000
high_rev = df.group_by('region').agg(
pl.col('revenue').sum().alias('total')
).filter(pl.col('total') > 20000).sort('total', descending=True)
print('High-revenue regions:', high_rev)group_by_dynamic for Time-Based Grouping & Rolling Aggregations
import polars as pl
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
n = 500
base = datetime(2024, 1, 1)
# Build a time-series DataFrame with a proper Datetime column
df = pl.DataFrame({
'ts': [base + timedelta(hours=int(h)) for h in np.random.uniform(0, 24*30, n)],
'sales': np.random.exponential(200, n).round(2),
'units': np.random.randint(1, 20, n),
}).sort('ts')
# group_by_dynamic β aggregate by calendar window (daily buckets)
daily = df.group_by_dynamic('ts', every='1d').agg([
pl.col('sales').sum().round(2).alias('daily_revenue'),
pl.col('units').sum().alias('daily_units'),
pl.len().alias('transactions'),
])
print('Daily aggregation (first 7 days):')
print(daily.head(7))
# Rolling aggregation β 7-day rolling sum over the sorted frame
rolling = df.with_columns(
pl.col('sales').rolling_sum(window_size=7, min_periods=1).alias('rolling_7d_sales')
)
print('Rolling 7-day sales sum (last 5 rows):')
print(rolling.select(['ts', 'sales', 'rolling_7d_sales']).tail(5))import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 50_000
start = date(2024, 1, 1)
orders = pl.DataFrame({
'order_id': range(n),
'store_id': np.random.choice(['S001','S002','S003','S004'], n),
'order_date': [str(start + timedelta(days=int(d)))
for d in np.random.randint(0, 90, n)],
'amount': np.random.exponential(80, n).round(2),
'items': np.random.randint(1, 10, n)
})
daily = orders.group_by(['store_id', 'order_date']).agg([
pl.len().alias('order_count'),
pl.col('amount').sum().round(2).alias('revenue'),
pl.col('amount').mean().round(2).alias('avg_basket'),
pl.col('items').mean().round(1).alias('avg_items')
]).sort(['store_id', 'order_date'])
print(f'Daily summaries: {daily.shape}')
print(daily.head(8))import polars as pl
import numpy as np
np.random.seed(7)
n = 150
df = pl.DataFrame({
'salesperson': [f'Rep_{i%10}' for i in range(n)],
'region': np.random.choice(['North','South','East','West'], n),
'product': np.random.choice(['Widget','Gadget','Doohickey'], n),
'amount': np.random.exponential(800, n).round(2),
'qty': np.random.randint(1, 20, n)
})
# 1. GroupBy region + product
# TODO: summary = df.group_by(['region', 'product']).agg([
# pl.len().alias('count'),
# pl.col('amount').sum().round(2).alias('total_amount'),
# pl.col('amount').mean().round(2).alias('avg_amount'),
# pl.col('qty').sum().alias('total_qty')
# ]).sort(['region', 'product'])
# print(summary)
# 2. Top salesperson per region
# TODO: top_reps = df.group_by(['region', 'salesperson']).agg(
# pl.col('amount').sum().round(2).alias('total_sales')
# ).sort('total_sales', descending=True)
# TODO: print(top_reps.group_by('region').first())
# 3. Filter groups with count >= 5 and avg > 500
# TODO: filtered = summary.filter(
# (pl.col('count') >= 5) & (pl.col('avg_amount') > 500)
# )
# print('Qualifying groups:', filtered)Combine DataFrames with inner, left, outer, cross, and anti joins β all in parallel.
inner, left & outer joins
import polars as pl
customers = pl.DataFrame({
'id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve']
})
orders = pl.DataFrame({
'order_id': [101, 102, 103, 104],
'cust_id': [1, 2, 2, 6], # 6 has no customer
'amount': [250, 80, 120, 400]
})
# Inner join
print('INNER:')
print(customers.join(orders, left_on='id', right_on='cust_id', how='inner'))
# Left join
print('LEFT (all customers):')
print(customers.join(orders, left_on='id', right_on='cust_id', how='left'))
# Anti join β customers with NO orders
print('ANTI (no orders):')
print(customers.join(orders, left_on='id', right_on='cust_id', how='anti'))Semi Join & Concat
import polars as pl
products = pl.DataFrame({
'sku': ['A', 'B', 'C', 'D', 'E'],
'price': [10.0, 25.0, 15.0, 8.0, 50.0]
})
sold = pl.DataFrame({'sku': ['A', 'C', 'E']})
# Semi join β only rows with a match (no extra columns)
print('SEMI (sold products):')
print(products.join(sold, on='sku', how='semi'))
# Vertical concat (stack rows)
df1 = pl.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
df2 = pl.DataFrame({'a': [3, 4], 'b': ['z', 'w']})
print('CONCAT vertical:')
print(pl.concat([df1, df2]))
# Horizontal concat (side by side)
df3 = pl.DataFrame({'c': [10, 20], 'd': [30, 40]})
print('CONCAT horizontal:')
print(pl.concat([df1, df3], how='horizontal'))Cross Join & Suffix Disambiguation
import polars as pl
# Cross join β all combinations
sizes = pl.DataFrame({'size': ['S', 'M', 'L', 'XL']})
colors = pl.DataFrame({'color': ['Red', 'Blue', 'Green']})
variants = sizes.join(colors, how='cross')
print(f'Product variants ({len(variants)} rows):')
print(variants)
# Join with overlapping column names β use suffix
orders = pl.DataFrame({
'order_id': [1, 2, 3],
'amount': [100, 200, 300],
'status': ['paid', 'pending', 'paid']
})
returns = pl.DataFrame({
'order_id': [2, 3],
'amount': [200, 150], # refund amount
'status': ['approved', 'partial']
})
joined = orders.join(returns, on='order_id', how='left', suffix='_refund')
print('With suffix disambiguation:')
print(joined)Semi Join, join with on=, and Asof Join Concept
import polars as pl
# --- Semi join: keep only left rows that have a match ---
inventory = pl.DataFrame({
'sku': ['A001', 'B002', 'C003', 'D004', 'E005'],
'stock': [100, 0, 50, 200, 0]
})
ordered_skus = pl.DataFrame({'sku': ['A001', 'C003', 'E005']})
in_stock_orders = inventory.join(ordered_skus, on='sku', how='semi')
print('Semi join β ordered items in inventory:')
print(in_stock_orders)
# --- join with on= (same column name on both sides) ---
employees = pl.DataFrame({
'dept_id': [1, 2, 3, 1],
'name': ['Alice', 'Bob', 'Carol', 'Dave']
})
departments = pl.DataFrame({
'dept_id': [1, 2, 3],
'dept_name': ['Engineering', 'Sales', 'HR']
})
joined = employees.join(departments, on='dept_id', how='inner')
print('join with on= (shared key name):')
print(joined)
# --- Asof join concept: join on nearest key ---
# Match each trade to the most recent quote before it
quotes = pl.DataFrame({
'time': [1, 3, 5, 8, 12],
'bid': [100.0, 100.5, 101.0, 101.5, 102.0]
})
trades = pl.DataFrame({
'time': [2, 4, 9, 11],
'qty': [10, 5, 20, 8]
})
asof = trades.join_asof(quotes, on='time', strategy='backward')
print('Asof join (each trade gets most recent bid):')
print(asof)import polars as pl
import numpy as np
np.random.seed(42)
customers = pl.DataFrame({
'cust_id': range(1000),
'name': [f'Customer_{i}' for i in range(1000)],
'signup_year': np.random.randint(2018, 2024, 1000)
})
tiers = pl.DataFrame({
'cust_id': range(1000),
'tier': np.random.choice(['Bronze','Silver','Gold','Platinum'], 1000)
})
last_orders = pl.DataFrame({
'cust_id': np.random.choice(range(900), 800, replace=False), # 100 never ordered
'last_order_amount': np.random.exponential(200, 800).round(2)
})
enriched = (
customers
.join(tiers, on='cust_id', how='left')
.join(last_orders, on='cust_id', how='left')
)
print(f'Enriched: {enriched.shape}')
print(enriched.head(5))
print('Never ordered:', enriched['last_order_amount'].is_null().sum())import polars as pl
students = pl.DataFrame({
'id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Carol', 'Dave', 'Eve'],
'grade': ['A', 'B', 'A', 'C', 'B']
})
enrollments = pl.DataFrame({
'student_id': [1, 1, 2, 3, 3, 3],
'course': ['Math', 'Science', 'Math', 'English', 'Math', 'Art'],
'score': [92, 87, 75, 88, 95, 91]
})
# 1. Left join β all students with courses
# TODO: left = students.join(enrollments, left_on='id', right_on='student_id', how='left')
# TODO: print('Left join:', left)
# 2. Anti join β students with no enrollments
# TODO: no_courses = students.join(enrollments, left_on='id', right_on='student_id', how='anti')
# TODO: print('No courses:', no_courses)
# 3. Inner join + avg score per student
# TODO: inner = students.join(enrollments, left_on='id', right_on='student_id', how='inner')
# TODO: avg_scores = inner.group_by('name').agg(pl.col('score').mean().round(1).alias('avg_score'))
# TODO: print('Avg scores:', avg_scores)
# 4. Cross join: each student x each available course
courses = pl.DataFrame({'available_course': ['Math', 'Science', 'Art']})
# TODO: assignments = students.select('name').join(courses, how='cross')
# TODO: print(f'All assignments ({len(assignments)} rows):', assignments)Polars has a rich .str and .dt namespace for vectorized string and datetime manipulations.
String Operations (.str)
import polars as pl
df = pl.DataFrame({
'email': ['alice@example.com', 'bob@test.org', 'carol@example.com'],
'full_name': ['Alice Smith', 'Bob Jones', 'Carol White'],
'code': [' ABC-123 ', 'DEF-456', ' GHI-789 ']
})
result = df.with_columns([
pl.col('email').str.split('@').list.get(1).alias('domain'),
pl.col('email').str.contains('example').alias('is_example'),
pl.col('full_name').str.split(' ').list.get(0).alias('first_name'),
pl.col('full_name').str.to_lowercase().alias('name_lower'),
pl.col('code').str.strip_chars().alias('code_clean'),
pl.col('code').str.strip_chars().str.replace('-', '_').alias('code_underscore')
])
print(result)Date/Time Operations (.dt)
import polars as pl
from datetime import date
df = pl.DataFrame({
'date_str': ['2024-01-15', '2024-03-22', '2024-07-04', '2024-12-31'],
'event': ['Q1 Review', 'Sprint End', 'Holiday', 'Year End']
})
df2 = df.with_columns(
pl.col('date_str').str.to_date('%Y-%m-%d').alias('date')
)
result = df2.with_columns([
pl.col('date').dt.year().alias('year'),
pl.col('date').dt.month().alias('month'),
pl.col('date').dt.day().alias('day'),
pl.col('date').dt.weekday().alias('weekday'), # Mon=1
pl.col('date').dt.quarter().alias('quarter'),
pl.col('date').dt.strftime('%B %d, %Y').alias('formatted')
])
print(result)str.contains, str.split, str.replace & Date Arithmetic
import polars as pl
from datetime import date
# String operations
df = pl.DataFrame({
'url': ['https://example.com/product/123', 'https://example.com/cart', 'https://shop.io/item/456'],
'tags': ['python,data,science', 'ml,ai,deep-learning', 'stats,python,r'],
})
result = df.with_columns([
pl.col('url').str.contains('product').alias('is_product_page'),
pl.col('url').str.split('/').list.last().alias('path_end'),
pl.col('url').str.replace('https://', '').alias('url_no_scheme'),
pl.col('tags').str.split(',').list.len().alias('tag_count'),
pl.col('tags').str.contains('python').alias('has_python'),
])
print(result)
# Date arithmetic
df2 = pl.DataFrame({
'start': [date(2024, 1, 1), date(2024, 3, 15), date(2024, 6, 30)],
'end': [date(2024, 4, 1), date(2024, 5, 20), date(2024, 12, 31)]
})
df3 = df2.with_columns([
pl.col('start').dt.strftime('%B %Y').alias('start_label'),
(pl.col('end') - pl.col('start')).dt.total_days().alias('duration_days'),
pl.col('start').dt.month().alias('start_month'),
pl.col('end').dt.year().alias('end_year'),
])
print(df3)str.extract with Regex, str.split_exact & dt.truncate
import polars as pl
from datetime import date
# String regex extraction
df = pl.DataFrame({'log': [
'2024-01-15 ERROR db_pool Connection timeout after 30s',
'2024-01-16 INFO auth_svc User alice logged in',
'2024-01-17 WARN api_gw Rate limit at 95%',
]})
result = df.with_columns([
pl.col('log').str.extract(r'(\d{4}-\d{2}-\d{2})', 1).alias('date'),
pl.col('log').str.extract(r'\d{4}-\d{2}-\d{2} (\w+)', 1).alias('level'),
pl.col('log').str.extract(r'\w+ \w+\s+(\w+)\s', 1).alias('service'),
])
print('Regex extraction:')
print(result)
# str.split_exact β fixed number of parts
df2 = pl.DataFrame({'ts': ['2024-01-15', '2024-06-30', '2024-12-01']})
split = df2.with_columns(
pl.col('ts').str.split_exact('-', 2).alias('parts')
).unnest('parts').rename({'field_0':'year','field_1':'month','field_2':'day'})
print('\nSplit date parts:')
print(split)
# dt.truncate β round dates to month/week
df3 = pl.DataFrame({'dt': pl.date_range(
pl.date(2024, 1, 1), pl.date(2024, 3, 31), interval='11d', eager=True
)})
df3 = df3.with_columns([
pl.col('dt').dt.truncate('1mo').alias('month_start'),
pl.col('dt').dt.truncate('1w').alias('week_start'),
])
print('\nDate truncation:')
print(df3.head(6))import polars as pl
import numpy as np
from datetime import datetime, timedelta

# Simulated web-access log: timestamps arrive as strings, as they would
# from a raw log file.
np.random.seed(42)
n = 10_000
base = datetime(2024, 1, 1)
logs = pl.DataFrame({
    'timestamp': [(base + timedelta(hours=int(h))).strftime('%Y-%m-%d %H:%M:%S')
                  for h in np.random.uniform(0, 24*90, n)],
    'user_id': np.random.randint(1, 1000, n),
    'page': np.random.choice(['/home','/product','/cart','/checkout'], n),
    'response_ms': np.random.exponential(200, n).round(1)
})

# Parse the string timestamps once, then derive calendar features.
enriched = logs.with_columns(
    pl.col('timestamp').str.to_datetime('%Y-%m-%d %H:%M:%S').alias('dt')
).with_columns([
    pl.col('dt').dt.hour().alias('hour'),
    pl.col('dt').dt.weekday().alias('weekday'),   # Mon=1 .. Sun=7
    (pl.col('dt').dt.weekday() >= 6).alias('is_weekend')
])

# Requests and mean latency per hour of day.
hourly = enriched.group_by('hour').agg([
    pl.len().alias('requests'),
    pl.col('response_ms').mean().round(1).alias('avg_ms')
]).sort('hour')
print('Peak hour:', hourly.top_k(1, by='requests')['hour'][0], 'h')
# FIX: call the round() builtin instead of invoking the __round__ dunder directly.
print('Weekend traffic:', round(enriched['is_weekend'].mean(), 1))
import polars as pl
from datetime import date
df = pl.DataFrame({
'username': ['alice_99', 'bob.smith', 'carol123', 'dave_x', 'eve.m'],
'email': ['alice@gmail.com', 'bob@company.org', 'carol@yahoo.com', 'dave@outlook.com', 'eve@gmail.com'],
'signup_date': ['2023-03-15', '2022-11-01', '2024-01-20', '2021-07-04', '2023-09-30']
})
# 1. Extract domain
# TODO: df = df.with_columns(
# pl.col('email').str.split('@').list.get(1).alias('domain')
# )
# 2. Flag gmail/yahoo
# TODO: df = df.with_columns(
# pl.col('email').str.contains('gmail|yahoo').alias('is_free_email')
# )
# 3. Parse date and extract features
# TODO: df = df.with_columns(
# pl.col('signup_date').str.to_date('%Y-%m-%d').alias('date')
# ).with_columns([
# pl.col('date').dt.year().alias('year'),
# pl.col('date').dt.month().alias('month'),
# pl.col('date').dt.weekday().alias('weekday'),
# ])
# 4. Formatted label
# TODO: df = df.with_columns(
# pl.col('date').dt.strftime('%b %Y').alias('signup_label')
# )
# 5. Days since signup
today = date(2025, 1, 1)
# TODO: df = df.with_columns(
# (pl.lit(today) - pl.col('date')).dt.total_days().alias('days_since_signup')
# )
# TODO: print(df)Use LazyFrame to build a query plan that Polars optimizes and executes in parallel β essential for large data.
LazyFrame Basics
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
'id': range(1_000_000),
'value': np.random.randn(1_000_000),
'group': np.random.choice(['A','B','C','D'], 1_000_000),
'score': np.random.randint(0, 100, 1_000_000)
})
# Convert to LazyFrame
result = (
df.lazy() # build query plan
.filter(pl.col('score') > 50) # predicate pushdown
.with_columns(
(pl.col('value') * 2).alias('value2')
)
.group_by('group').agg(
pl.col('value2').mean().round(4).alias('mean_v2'),
pl.len().alias('count')
)
.sort('group')
)
# See the query plan
print(result.explain())
# Execute
print(result.collect())scan_csv & Streaming
import polars as pl
import tempfile, os, numpy as np
# Write a sample CSV to scan
np.random.seed(42)
n = 100_000
tmp = pl.DataFrame({
'a': np.random.randint(0, 100, n),
'b': np.random.randn(n),
'c': np.random.choice(['X','Y','Z'], n)
})
path = os.path.join(tempfile.gettempdir(), 'polars_demo.csv')
tmp.write_csv(path)
# scan_csv: reads lazily β only loads needed data
result = (
pl.scan_csv(path)
.filter(pl.col('c') == 'X')
.filter(pl.col('a') > 90)
.select(['a', 'b'])
.collect()
)
print('Filtered result:', result.shape)
print(result.head())Lazy Pipeline: filter, select, groupby, collect
import polars as pl
import numpy as np
np.random.seed(42)
# Build a large DataFrame then use LazyFrame
df = pl.DataFrame({
'user_id': np.random.randint(1, 500, 50_000),
'product': np.random.choice(['A','B','C','D','E'], 50_000),
'amount': np.random.exponential(100, 50_000).round(2),
'returned': np.random.choice([True, False], 50_000, p=[0.05, 0.95])
})
# Convert to lazy and build a multi-step pipeline
lazy_result = (
df.lazy()
.filter(~pl.col('returned')) # exclude returns
.filter(pl.col('amount') > 10) # min order
.with_columns(
(pl.col('amount') * 1.1).alias('amount_with_tax')
)
.group_by(['user_id', 'product']).agg([
pl.len().alias('orders'),
pl.col('amount_with_tax').sum().round(2).alias('total_spent')
])
.filter(pl.col('orders') >= 3) # frequent buyers
.sort('total_spent', descending=True)
)
# Execute
result = lazy_result.collect()
print(f'Frequent buyers: {len(result):,}')
print(result.head(8))Streaming Mode & LazyFrame Schema Inspection
import polars as pl
import numpy as np
import tempfile, os

# Demo data: 500k rows written to Parquet so it can be scanned lazily.
np.random.seed(42)
n = 500_000
df = pl.DataFrame({
    'id': range(n),
    'amount': np.random.exponential(100, n).round(2),
    'category': np.random.choice(['A','B','C','D'], n),
    'flag': np.random.choice([True, False], n),
})
pq_path = os.path.join(tempfile.gettempdir(), 'streaming_demo.parquet')
df.write_parquet(pq_path)

# LazyFrame schema inspection — metadata only, no data loaded yet.
# NOTE(review): on Polars >= 1.0 prefer lf.collect_schema(); .schema and
# .columns on a LazyFrame warn there — confirm the targeted version.
lf = pl.scan_parquet(pq_path)
print('Schema (no data loaded):')
print(lf.schema)
print('Columns:', lf.columns)

# Build a lazy query: filter -> group -> aggregate -> sort.
query = (
    lf.filter(pl.col('flag') & (pl.col('amount') > 50))
    .group_by('category')
    .agg([
        pl.col('amount').mean().round(2).alias('avg'),
        pl.len().alias('count')
    ])
    .sort('avg', descending=True)
)

# Show the optimized query plan.
# BUG FIX: explain() returns the plan as a string; the original discarded
# the return value, so the announced plan was never printed.
print('\nOptimized plan:')
print(query.explain(optimized=True))

# Execute
result = query.collect()
print('\nResult:')
print(result)import polars as pl
import numpy as np
import tempfile, os

# Simulate a large event log written out as CSV.
np.random.seed(42)
n = 200_000
events = pl.DataFrame({
    'ts': np.arange(n),
    'user': np.random.randint(1, 10000, n),
    'event': np.random.choice(['click','view','buy','exit'], n),
    'duration_s': np.random.exponential(30, n).round(1),
    'country': np.random.choice(['US','UK','DE','JP','BR'], n)
})
csv_path = os.path.join(tempfile.gettempdir(), 'events.csv')
events.write_csv(csv_path)

# Lazy pipeline — the scan only materialises the columns and rows it needs.
purchases = pl.scan_csv(csv_path).filter(
    (pl.col('event') == 'buy') & (pl.col('duration_s') > 5)
)
summary = (
    purchases
    .group_by('country').agg([
        pl.len().alias('purchases'),
        pl.col('duration_s').mean().round(2).alias('avg_duration'),
        pl.col('user').n_unique().alias('unique_buyers')
    ])
    .sort('purchases', descending=True)
    .collect()
)
print('Purchase summary by country:')
print(summary)

import polars as pl
import numpy as np
np.random.seed(0)
n = 100_000
df = pl.DataFrame({
'customer_id': np.random.randint(1, 5000, n),
'category': np.random.choice(['Food','Tech','Fashion','Home'], n),
'spend': np.random.exponential(60, n).round(2),
'is_member': np.random.choice([True, False], n, p=[0.4, 0.6])
})
# Build the lazy pipeline
# TODO: lazy = df.lazy()
# TODO: lazy = lazy.filter(pl.col('is_member') & (pl.col('spend') > 20))
# TODO: lazy = lazy.with_columns(
# (pl.col('spend') * 0.9).round(2).alias('spend_after_discount')
# )
# TODO: lazy = lazy.group_by('category').agg([
# pl.len().alias('count'),
# pl.col('spend').sum().round(2).alias('total_spend'),
# pl.col('spend_after_discount').mean().round(2).alias('avg_discounted')
# ])
# TODO: lazy = lazy.sort('total_spend', descending=True)
# Print query plan and execute
# TODO: print(lazy.explain())
# TODO: result = lazy.collect()
# TODO: print(result)Polars natively reads/writes CSV, JSON, Parquet, and Arrow β Parquet is the recommended format.
CSV & JSON
import polars as pl
import numpy as np
import tempfile, os
np.random.seed(42)
df = pl.DataFrame({
'id': range(100),
'name': [f'Item_{i}' for i in range(100)],
'value': np.random.randn(100).round(4),
'tag': np.random.choice(['A','B','C'], 100)
})
tmp = tempfile.gettempdir()
# CSV
csv_path = os.path.join(tmp, 'demo.csv')
df.write_csv(csv_path)
df_csv = pl.read_csv(csv_path)
print('CSV roundtrip:', df_csv.shape)
# JSON (newline-delimited)
json_path = os.path.join(tmp, 'demo.ndjson')
df.write_ndjson(json_path)
df_json = pl.read_ndjson(json_path)
print('NDJSON roundtrip:', df_json.shape)Parquet (Recommended Format)
import polars as pl
import numpy as np
import tempfile, os
np.random.seed(42)
n = 500_000
df = pl.DataFrame({
'id': range(n),
'amount': np.random.exponential(200, n).round(2),
'category': np.random.choice(['A','B','C','D'], n),
'flag': np.random.choice([True, False], n)
})
tmp = tempfile.gettempdir()
pq_path = os.path.join(tmp, 'demo.parquet')
# Write Parquet (compressed, columnar)
# zstd trades a little write speed for a noticeably smaller file on disk.
df.write_parquet(pq_path, compression='zstd')
size_mb = os.path.getsize(pq_path) / 1024 / 1024
print(f'Parquet size: {size_mb:.2f} MB (for {n:,} rows)')
# Read full
df2 = pl.read_parquet(pq_path)
print('Read back:', df2.shape)
# Scan (lazy) — column pruning + predicate pushdown
# Only 'id' and 'amount' are read, and the category filter runs in the scan.
result = pl.scan_parquet(pq_path).filter(
pl.col('category') == 'A'
).select(['id','amount']).collect()
print('Filtered parquet:', result.shape)IPC/Arrow Format and In-Memory Read
import polars as pl
import numpy as np
import tempfile, os, io
np.random.seed(42)
df = pl.DataFrame({
'id': range(1000),
'value': np.random.randn(1000).round(4),
'category': np.random.choice(['A','B','C'], 1000)
})
tmp = tempfile.gettempdir()
# IPC (Arrow) β fastest local I/O format
ipc_path = os.path.join(tmp, 'demo.arrow')
df.write_ipc(ipc_path)
df_ipc = pl.read_ipc(ipc_path)
size_ipc = os.path.getsize(ipc_path) / 1024
print(f'IPC size: {size_ipc:.1f} KB, shape: {df_ipc.shape}')
# In-memory CSV read from string
csv_str = df.write_csv()
df_str = pl.read_csv(io.StringIO(csv_str))
print('From string CSV:', df_str.shape)
# Column pruning on Parquet read
pq_path = os.path.join(tmp, 'pruned_demo.parquet')
df.write_parquet(pq_path)
df_cols = pl.read_parquet(pq_path, columns=['id', 'category'])
print('Column-pruned parquet:', df_cols.shape, df_cols.columns)scan_csv with Schema Override & In-Memory Parquet
import polars as pl
import io
# scan_csv with explicit schema override
# schema_overrides lets the CSV reader skip dtype inference for these columns.
csv_data = 'id,score,grade,active\n1,92.5,A,true\n2,78.0,B,false\n3,85.5,A,true\n'
schema_override = {
'id': pl.Int32,
'score': pl.Float32,
'grade': pl.Categorical,
'active': pl.Boolean,
}
df = pl.read_csv(io.StringIO(csv_data), schema_overrides=schema_override)
print('With schema override:')
print(df)
print('Dtypes:', df.dtypes)
# In-memory Parquet (BytesIO — no file system needed)
import numpy as np
np.random.seed(0)
df2 = pl.DataFrame({
'x': np.random.randn(1000).astype('float32'),
'y': np.random.randint(0, 10, 1000),
})
buf = io.BytesIO()
df2.write_parquet(buf, compression='zstd')
# Rewind before reading — write_parquet leaves the cursor at end-of-buffer.
buf.seek(0)
size_kb = buf.getbuffer().nbytes / 1024
df3 = pl.read_parquet(buf)
print(f'\nIn-memory Parquet: {df2.shape} β {size_kb:.1f} KB')
print('Read back:', df3.shape)import polars as pl
import numpy as np
import tempfile, os
from datetime import date, timedelta
np.random.seed(42)
n = 100_000
start = date(2024, 1, 1)
# 100k transactions with ISO-format string dates spread across 2024.
df = pl.DataFrame({
'txn_id': range(n),
'date': [str(start + timedelta(days=int(d))) for d in np.random.randint(0, 365, n)],
'amount': np.random.exponential(150, n).round(2),
'merchant': np.random.choice(['Amazon','Netflix','Uber','Spotify'], n)
})
# Add month column for partitioning
df2 = df.with_columns(
pl.col('date').str.to_date('%Y-%m-%d').dt.month().alias('month')
)
tmp = tempfile.gettempdir()
pq_path = os.path.join(tmp, 'transactions.parquet')
df2.write_parquet(pq_path, compression='snappy')
# Query Jan only — fast thanks to *predicate* pushdown: the month == 1 filter
# is applied while scanning the parquet file. (It is not column pruning —
# no select() restricts the columns here, so every column is still read.)
jan = pl.scan_parquet(pq_path).filter(
pl.col('month') == 1
).collect()
print(f'January transactions: {len(jan):,}')
print(f'Jan revenue: ${jan["amount"].sum():,.2f}')import polars as pl
import numpy as np
import tempfile, os
np.random.seed(42)
n = 10_000
df = pl.DataFrame({
'order_id': range(n),
'customer': [f'cust_{i % 500}' for i in range(n)],
'product': np.random.choice(['Widget','Gadget','Doohickey'], n),
'amount': np.random.exponential(100, n).round(2),
'region': np.random.choice(['North','South','East','West'], n),
})
tmp = tempfile.gettempdir()
# TODO: Write df to CSV and read it back
# csv_path = os.path.join(tmp, 'orders.csv')
# df.write_csv(???)
# df_csv = pl.read_csv(???)
# print('CSV roundtrip:', df_csv.shape)
# TODO: Write df to Parquet with 'zstd' compression
# pq_path = os.path.join(tmp, 'orders.parquet')
# df.write_parquet(???, compression='zstd')
# TODO: Use scan_parquet (lazy) to filter region == 'North'
# and select only ['order_id', 'customer', 'amount']
# result = pl.scan_parquet(pq_path).filter(???).select(???).collect()
# print('North orders:', result.shape)
# print('North revenue: ${:.2f}'.format(result['amount'].sum()))Compute rolling statistics, cumulative sums, and rank within groups using Polars window expressions.
Rolling & Cumulative
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
'day': range(1, 31),
'sales': np.random.randint(50, 200, 30)
})
result = df.with_columns([
pl.col('sales').cum_sum().alias('cumulative_sales'),
pl.col('sales').rolling_mean(window_size=7).round(1).alias('7d_avg'),
pl.col('sales').rolling_max(window_size=7).alias('7d_max'),
pl.col('sales').rolling_std(window_size=7).round(2).alias('7d_std'),
pl.col('sales').pct_change().round(4).alias('pct_change')
])
print(result.tail(10))Group Window Expressions
import polars as pl
df = pl.DataFrame({
'dept': ['Eng','Eng','Eng','Sales','Sales','HR','HR'],
'name': ['Alice','Bob','Carol','Dave','Eve','Frank','Grace'],
'salary': [90000, 85000, 95000, 70000, 65000, 55000, 58000]
})
result = df.with_columns([
pl.col('salary').rank('dense').over('dept').alias('rank_in_dept'),
pl.col('salary').mean().over('dept').round(0).alias('dept_avg'),
(pl.col('salary') - pl.col('salary').mean().over('dept'))
.round(0).alias('vs_dept_avg'),
pl.col('salary').max().over('dept').alias('dept_max')
])
print(result)Lead/Lag and Running Max
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
'day': range(1, 21),
'sales': np.random.randint(100, 500, 20)
})
result = df.with_columns([
pl.col('sales').shift(1).alias('prev_day'),
pl.col('sales').shift(-1).alias('next_day'),
(pl.col('sales') - pl.col('sales').shift(1)).alias('day_delta'),
pl.col('sales').rolling_mean(window_size=3, min_periods=1).round(1).alias('3d_avg'),
pl.col('sales').cum_max().alias('running_max'),
(pl.col('sales') / pl.col('sales').max() * 100).round(1).alias('pct_of_max')
])
print(result)ewm_mean (Exponential Weighted), rolling_quantile & map_elements
import polars as pl
import numpy as np
np.random.seed(42)
# Thirty days of synthetic price and volume data.
df = pl.DataFrame({
'day': range(1, 31),
'price': np.random.uniform(95, 105, 30).round(2),
'vol': np.random.randint(1000, 5000, 30),
})
result = df.with_columns([
# Exponential weighted mean (recent values get more weight)
pl.col('price').ewm_mean(span=5).round(3).alias('ewm_5'),
# Rolling 7-day median (robust to outliers)
pl.col('price').rolling_median(window_size=7).round(3).alias('rolling_med_7'),
# Rolling 75th percentile
pl.col('vol').rolling_quantile(quantile=0.75, window_size=7).alias('vol_q75'),
])
print(result.head(10))
# map_elements for custom per-element logic
# NOTE: map_elements calls a Python lambda once per row, so it gives up
# Polars' native speed; for this specific case pl.col('scores').list.mean()
# would stay inside the native engine.
df2 = pl.DataFrame({'scores': [[85, 90, 78], [60, 70], [95, 88, 92, 80]]})
result2 = df2.with_columns(
pl.col('scores').map_elements(
lambda lst: round(sum(lst) / len(lst), 2),
return_dtype=pl.Float64
).alias('avg_score')
)
print('\nCustom avg via map_elements:')
print(result2)import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 500
start = date(2024, 1, 1)
df = pl.DataFrame({
'date': [str(start + timedelta(days=i)) for i in range(n)],
'region': np.random.choice(['North','South','East','West'], n),
'rep': [f'Rep_{np.random.randint(1,6)}' for _ in range(n)],
'revenue': np.random.exponential(5000, n).round(2)
})
result = df.sort('date').with_columns([
pl.col('revenue').rolling_mean(window_size=30).round(2).alias('30d_avg_rev'),
pl.col('revenue').cum_sum().alias('ytd_revenue'),
pl.col('revenue').rank('dense', descending=True).over('region').alias('region_rank')
])
# Top performer per region
top = result.filter(pl.col('region_rank') == 1).group_by('region').agg(
pl.col('rep').first(),
pl.col('revenue').sum().round(2).alias('total_rev')
).sort('total_rev', descending=True)
print('Top reps by region:')
print(top)import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 90
df = pl.DataFrame({
'date': [str(date(2024, 1, 1) + timedelta(days=i)) for i in range(n)],
'product': np.random.choice(['Widget', 'Gadget', 'Doohickey'], n),
'units_sold': np.random.randint(10, 200, n),
'revenue': np.random.exponential(500, n).round(2),
})
# TODO: Sort by date and add window columns:
# 1. rolling_mean of revenue (window=7, min_periods=1) -> 'rolling_7d'
# 2. cum_sum of units_sold -> 'cumulative_units'
# 3. revenue / max(revenue) * 100 -> 'pct_of_peak'
# 4. rank of revenue within 'product' group (descending) -> 'product_rank'
result = df # TODO: replace
# TODO: Filter product_rank <= 3 and print top days per product
# top3 = result.filter(pl.col('product_rank') <= 3).sort(['product', 'product_rank'])
# print(top3)Key differences and equivalents between Polars and Pandas β migrate your workflows efficiently.
Side-by-Side Comparison
import polars as pl
import pandas as pd
# Creating a DataFrame
# Pandas:
pd_df = pd.DataFrame({'a': [1,2,3], 'b': [4,5,6]})
# Polars:
pl_df = pl.DataFrame({'a': [1,2,3], 'b': [4,5,6]})
# Filtering
# Pandas: df[df['a'] > 1]
# Polars:
print(pl_df.filter(pl.col('a') > 1))
# Adding column
# Pandas: df['c'] = df['a'] + df['b']
# Polars:
pl_df2 = pl_df.with_columns((pl.col('a') + pl.col('b')).alias('c'))
print(pl_df2)
# GroupBy
# Pandas: df.groupby('a')['b'].sum()
# Polars:
pl_df3 = pl.DataFrame({'a': ['x','x','y','y'], 'b': [1,2,3,4]})
print(pl_df3.group_by('a').agg(pl.col('b').sum()))Converting Between Polars & Pandas
import polars as pl
import pandas as pd
import numpy as np
# Start with Pandas
pd_df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Carol'],
'score': [92.5, 88.0, 95.1]
})
# Pandas -> Polars
pl_df = pl.from_pandas(pd_df)
print('From pandas:', pl_df)
# Polars -> Pandas
back_to_pd = pl_df.to_pandas()
print('Back to pandas:', back_to_pd.dtypes)
# Polars -> NumPy
arr = pl_df['score'].to_numpy()
print('NumPy array:', arr)
# Apply pandas where Polars lacks support
# e.g., complex visualization or sklearn pipelines
from sklearn.preprocessing import StandardScaler
X = StandardScaler().fit_transform(pl_df.select('score').to_pandas())
print('Scaled scores:', X.flatten().round(2))Method Chaining Comparison
import polars as pl
import pandas as pd
import numpy as np
np.random.seed(42)
n = 1000
data = {
'dept': np.random.choice(['Eng', 'Sales', 'HR'], n),
'score': np.random.randint(0, 100, n),
'active': np.random.choice([True, False], n)
}
# Pandas: filter + conditional column + groupby
pd_df = pd.DataFrame(data)
pd_active = pd_df[pd_df['active']].copy()
pd_active['grade'] = pd.cut(pd_active['score'], bins=[0,60,80,100], labels=['C','B','A'])
pd_result = pd_active.groupby(['dept','grade'])['score'].mean().round(2).reset_index()
print('Pandas:')
print(pd_result.sort_values(['dept','grade']).head(6))
# Polars equivalent β more explicit, no chained assignment
pl_df = pl.DataFrame(data)
pl_result = (
pl_df.filter(pl.col('active'))
.with_columns(
pl.when(pl.col('score') >= 80).then(pl.lit('A'))
.when(pl.col('score') >= 60).then(pl.lit('B'))
.otherwise(pl.lit('C')).alias('grade')
)
.group_by(['dept', 'grade'])
.agg(pl.col('score').mean().round(2).alias('avg_score'))
.sort(['dept', 'grade'])
)
print('\nPolars:')
print(pl_result.head(6))Arrow Zero-Copy Interchange & Categorical dtype
import polars as pl
import pandas as pd
import numpy as np
# Categorical dtype in Polars (much more efficient than object)
np.random.seed(42)
n = 100_000
df_pl = pl.DataFrame({
'product': pl.Series(np.random.choice(['Widget','Gadget','Doohickey'], n)).cast(pl.Categorical),
'region': pl.Series(np.random.choice(['North','South','East','West'], n)).cast(pl.Categorical),
'revenue': np.random.exponential(200, n).round(2),
})
print('Categorical memory usage:')
print(f' estimated size: {df_pl.estimated_size("mb"):.2f} MB')
print(f' product unique values: {df_pl["product"].n_unique()}')
# Zero-copy to pandas via Arrow
df_pd = df_pl.to_pandas(use_pyarrow_extension_array=True)
print('\nArrow-backed pandas dtypes:')
print(df_pd.dtypes)
# from_pandas preserving categorical
df_pd2 = pd.DataFrame({
'color': pd.Categorical(['red','blue','red','green','blue']),
'val': [1, 2, 3, 4, 5]
})
df_pl2 = pl.from_pandas(df_pd2)
print('\nFrom pandas Categorical:')
print(df_pl2)
print('dtype:', df_pl2['color'].dtype)import polars as pl
import pandas as pd
import numpy as np
import time
np.random.seed(42)
n = 500_000
data = {
'user_id': np.random.randint(1, 10000, n),
'amount': np.random.exponential(100, n),
'category': np.random.choice(['A','B','C'], n)
}
# PANDAS approach
t0 = time.time()
pd_df = pd.DataFrame(data)
pd_result = (pd_df[pd_df['amount'] > 50]
.groupby('category')['amount']
.agg(['mean','sum','count'])
.reset_index())
t_pd = time.time() - t0
# POLARS approach
t0 = time.time()
pl_df = pl.DataFrame(data)
pl_result = (pl_df.filter(pl.col('amount') > 50)
.group_by('category').agg([
pl.col('amount').mean().round(4).alias('mean'),
pl.col('amount').sum().round(2).alias('sum'),
pl.len().alias('count')
])
.sort('category'))
t_pl = time.time() - t0
print(f'Pandas: {t_pd:.3f}s')
print(f'Polars: {t_pl:.3f}s')
print(f'Speedup: {t_pd/t_pl:.1f}x')
print(pl_result)import pandas as pd
import polars as pl
import numpy as np
np.random.seed(42)
n = 50_000
data = {
'user_id': np.random.randint(1, 5000, n),
'product': np.random.choice(['A', 'B', 'C', 'D'], n),
'spend': np.random.exponential(75, n).round(2),
'is_premium': np.random.choice([True, False], n)
}
# Existing Pandas pipeline (working β do not modify)
pd_df = pd.DataFrame(data)
pd_result = (
pd_df[pd_df['is_premium'] & (pd_df['spend'] > 50)]
.groupby('product')['spend']
.agg(['mean', 'sum', 'count'])
.reset_index()
.sort_values('sum', ascending=False)
)
print('Pandas result:')
print(pd_result)
# TODO: Rewrite in Polars
pl_df = pl.DataFrame(data)
pl_result = (
pl_df
# TODO: .filter(is_premium AND spend > 50)
# TODO: .group_by('product').agg(mean, sum, count of 'spend')
# TODO: .sort('sum', descending=True)
)
print('\nPolars result:')
# print(pl_result)Use Polars LazyFrame to build query plans that are optimized before execution. Process datasets larger than RAM with streaming mode.
LazyFrame vs DataFrame
import polars as pl
import numpy as np
np.random.seed(42)
data = {
'id': range(100_000),
'category': np.random.choice(['A','B','C','D'], 100_000),
'value': np.random.randn(100_000) * 100 + 500,
'qty': np.random.randint(1, 100, 100_000),
}
# Eager DataFrame
df = pl.DataFrame(data)
# Lazy: nothing executes yet
lf = pl.LazyFrame(data)
plan = (
lf
.filter(pl.col('value') > 500)
.group_by('category')
.agg(
pl.col('value').mean().alias('avg_val'),
pl.col('qty').sum().alias('total_qty'),
)
.sort('avg_val', descending=True)
)
print('Lazy plan (before execution):')
print(plan.explain())
result = plan.collect() # executes query
print('\nResult after .collect():')
print(result)Query Optimization with LazyFrame
import polars as pl
import numpy as np
import time
np.random.seed(0)
n = 500_000
df = pl.DataFrame({
'user_id': np.random.randint(1, 10000, n),
'product': np.random.choice(['laptop','phone','tablet','watch'], n),
'revenue': np.random.exponential(100, n),
'country': np.random.choice(['US','UK','DE','FR','JP'], n),
})
# Eager: no optimization
t0 = time.perf_counter()
eager = (
df
.filter(pl.col('country') == 'US')
.filter(pl.col('revenue') > 50)
.group_by('product')
.agg(pl.col('revenue').sum())
)
t_eager = time.perf_counter() - t0
# Lazy: Polars merges filters and optimizes scan
t0 = time.perf_counter()
lazy = (
df.lazy()
.filter(pl.col('country') == 'US')
.filter(pl.col('revenue') > 50)
.group_by('product')
.agg(pl.col('revenue').sum())
.collect()
)
t_lazy = time.perf_counter() - t0
print(f'Eager: {t_eager*1000:.1f}ms | Lazy: {t_lazy*1000:.1f}ms')
print(lazy.sort('revenue', descending=True))scan_csv for Larger-than-RAM Processing
import polars as pl
import tempfile, os, numpy as np
# Create a sample CSV file
np.random.seed(42)
rows = 200_000
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
fname = f.name
f.write('id,category,value,qty\n')
cats = np.random.choice(['A','B','C'], rows)
vals = np.random.randn(rows) * 50 + 200
qtys = np.random.randint(1, 50, rows)
for i in range(rows):
f.write(f'{i},{cats[i]},{vals[i]:.2f},{qtys[i]}\n')
# scan_csv = lazy: doesn't load file yet
lf = pl.scan_csv(fname)
print('Schema:', lf.schema)
result = (
lf
.filter(pl.col('value') > 200)
.group_by('category')
.agg(
pl.col('value').mean().round(2).alias('avg_value'),
pl.col('qty').sum().alias('total_qty'),
pl.len().alias('count'),
)
.sort('avg_value', descending=True)
.collect(streaming=True) # streaming=True for large files
)
print(result)
os.unlink(fname)Sink to Parquet (streaming write)
import polars as pl
import numpy as np
import tempfile, os, shutil

np.random.seed(42)
n = 100_000
# BUG FIX: an hourly range over 2024 has only ~8.8k entries, while the
# original built 100_000-element sensor/reading arrays — Polars raises a
# ShapeError when DataFrame columns have different lengths. Build the time
# axis first and truncate n to its length so all columns line up.
ts = pl.date_range(
    pl.date(2024, 1, 1), pl.date(2024, 12, 31),
    interval='1h', eager=True
)
n = min(n, len(ts))
df = pl.DataFrame({
    'ts': ts[:n],
    'sensor': np.random.choice(['s1','s2','s3'], n),
    'reading': np.random.normal(25, 5, n),
})

tmpdir = tempfile.mkdtemp()
out_path = os.path.join(tmpdir, 'out.parquet')
# Write to parquet
df.write_parquet(out_path)

# Read back with scan_parquet (lazy): the filter is pushed into the scan.
result = (
    pl.scan_parquet(out_path)
    .filter(pl.col('reading') > 25)
    .group_by('sensor')
    .agg(pl.col('reading').mean().alias('avg_reading'), pl.len().alias('n'))
    .collect()
)
print(result)
fsize = os.path.getsize(out_path)
print(f'Parquet file size: {fsize/1024:.1f} KB for {n:,} rows')
shutil.rmtree(tmpdir)

import polars as pl
import numpy as np
import tempfile, os
# Simulate a medium CSV log file
np.random.seed(7)
n = 500_000
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
fname = f.name
f.write('user_id,region,product,event,ts\n')
regions = np.random.choice(['NA','EU','APAC'], n)
products = np.random.choice([f'p{i}' for i in range(20)], n)
events = np.random.choice(['click','view','purchase'], n, p=[0.6,0.3,0.1])
users = np.random.randint(1, 10000, n)
for i in range(n):
f.write(f'{users[i]},{regions[i]},{products[i]},{events[i]},2024-01-{(i%28)+1:02d}\n')
result = (
pl.scan_csv(fname)
.filter(pl.col('event') == 'purchase')
.group_by(['region', 'product'])
.agg(pl.len().alias('purchases'))
.sort('purchases', descending=True)
.collect(streaming=True)
)
print('Top purchases by region:')
print(result.head(10))
os.unlink(fname)import polars as pl
import numpy as np
import time
np.random.seed(42)
n = 1_000_000
df = pl.DataFrame({
'user_id': np.random.randint(1, 100000, n),
'country': np.random.choice(['US','UK','DE','FR','JP','AU'], n),
'product_category': np.random.choice(['electronics','clothing','food','books'], n),
'revenue': np.random.exponential(50, n),
})
# TODO: (1) Total revenue per country using LazyFrame
# TODO: (2) Top 3 categories per country by revenue
# TODO: Compare eager vs lazy execution time
# TODO: print the query plan with .explain()
Compute rolling statistics, cumulative aggregates, rank-based features, and temporal calculations within groups using Polars window expressions.
Rolling & Expanding Windows
import polars as pl
import numpy as np
np.random.seed(42)
dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 12, 31), interval='1d', eager=True)
df = pl.DataFrame({
'date': dates,
'sales': np.random.normal(1000, 200, len(dates)).round(2),
})
result = df.with_columns([
pl.col('sales').rolling_mean(window_size=7).alias('rolling_7d'),
pl.col('sales').rolling_mean(window_size=30).alias('rolling_30d'),
pl.col('sales').rolling_std(window_size=7).alias('rolling_std_7d'),
pl.col('sales').cum_sum().alias('cumulative_sales'),
pl.col('sales').rolling_max(window_size=7).alias('rolling_max_7d'),
])
print(result.head(35).tail(5))
print(f'\nTotal annual sales: {result["cumulative_sales"][-1]:,.0f}')
print(f'Peak 7-day avg: {result["rolling_7d"].max():,.1f}')Rank and Dense Rank
import polars as pl
import numpy as np
np.random.seed(0)
df = pl.DataFrame({
'product': np.random.choice(['A','B','C','D','E'], 50),
'region': np.random.choice(['North','South','East'], 50),
'revenue': np.random.exponential(1000, 50).round(2),
})
# Global rank
ranked = df.with_columns([
pl.col('revenue').rank(method='dense', descending=True).alias('global_rank'),
])
# Rank within region
regional = df.with_columns([
pl.col('revenue').rank(method='dense', descending=True)
.over('region').alias('region_rank'),
pl.col('revenue').rank(method='ordinal', descending=True)
.over('region').alias('region_ordinal_rank'),
])
print('Top 5 by revenue with global rank:')
print(ranked.sort('global_rank').head())
print('\nTop 3 per region:')
print(regional.filter(pl.col('region_rank') <= 3).sort(['region','region_rank']))Group-wise Aggregations with over()
import polars as pl
import numpy as np
np.random.seed(42)
n = 200
df = pl.DataFrame({
'employee': [f'emp_{i}' for i in range(n)],
'dept': np.random.choice(['Engineering','Marketing','Sales','HR'], n),
'salary': np.random.normal(80000, 20000, n).round(0),
'yrs_exp': np.random.randint(1, 20, n),
})
result = df.with_columns([
pl.col('salary').mean().over('dept').alias('dept_avg_salary'),
pl.col('salary').rank(method='dense', descending=True).over('dept').alias('dept_rank'),
(pl.col('salary') - pl.col('salary').mean().over('dept')).alias('salary_vs_dept_avg'),
pl.len().over('dept').alias('dept_size'),
])
print('Employees earning above dept average:')
above_avg = result.filter(pl.col('salary_vs_dept_avg') > 0)
print(above_avg.sort('dept').select(['employee','dept','salary','dept_avg_salary','dept_rank']).head(8))Shift & Lag Features for Time Series
import polars as pl
import numpy as np
np.random.seed(7)
dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 6, 30), interval='1d', eager=True)
stocks = ['AAPL', 'GOOG']
all_dfs = []
for stock in stocks:
prices = 150 + np.cumsum(np.random.randn(len(dates)) * 2)
all_dfs.append(pl.DataFrame({'date': dates, 'stock': stock, 'close': prices.round(2)}))
df = pl.concat(all_dfs).sort(['stock', 'date'])
result = df.with_columns([
pl.col('close').shift(1).over('stock').alias('prev_close'),
pl.col('close').shift(7).over('stock').alias('close_7d_ago'),
pl.col('close').pct_change().over('stock').alias('daily_return'),
pl.col('close').rolling_mean(window_size=5).over('stock').alias('ma5'),
pl.col('close').rolling_mean(window_size=20).over('stock').alias('ma20'),
])
print(result.filter(pl.col('stock') == 'AAPL').tail(5))import polars as pl
import numpy as np
np.random.seed(42)
dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 12, 31), interval='1d', eager=True)
cats = ['Electronics', 'Clothing', 'Food']
frames = []
for cat in cats:
revenue = 5000 + np.cumsum(np.random.randn(len(dates)) * 200)
frames.append(pl.DataFrame({'date': dates, 'category': cat, 'revenue': revenue.round(2)}))
df = pl.concat(frames).sort(['category','date'])
result = df.with_columns([
pl.col('revenue').rolling_mean(window_size=30).over('category').alias('ma30'),
pl.col('revenue').pct_change().over('category').alias('dod_growth'),
pl.col('date').dt.month().alias('month'),
])
monthly = (
result
.group_by(['month','category'])
.agg(pl.col('revenue').sum().alias('monthly_rev'))
.with_columns(
pl.col('monthly_rev').rank(method='dense', descending=True).over('month').alias('rank')
)
.sort(['month','rank'])
)
print(monthly.head(9))import polars as pl
import numpy as np
np.random.seed(42)
dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 6, 30), interval='1d', eager=True)
prods = ['alpha','beta','gamma','delta','epsilon']
frames = []
for p in prods:
sales = np.abs(np.random.normal(500, 150, len(dates)))
frames.append(pl.DataFrame({'date': dates, 'product': p, 'sales': sales.round(2)}))
df = pl.concat(frames).sort(['product','date'])
# TODO: Add 7d and 30d rolling means per product
# TODO: Add wow_growth = (sales - shift(7)) / shift(7) per product
# TODO: Add cumulative sales per product
# TODO: Add weekly rank within each week per product
# TODO: Print peak day and best rank for each product
Maximize Polars performance through parallelism settings, predicate pushdown, schema optimization, and benchmarking against pandas.
Polars vs Pandas Benchmark
import polars as pl
import pandas as pd
import numpy as np
import time
np.random.seed(42)
n = 1_000_000
data = {
'id': np.random.randint(1, 10000, n),
'category': np.random.choice(['A','B','C','D','E'], n),
'value': np.random.randn(n) * 100,
'qty': np.random.randint(1, 50, n),
}
df_pd = pd.DataFrame(data)
df_pl = pl.DataFrame(data)
def bench(fn, name, reps=3):
times = []
for _ in range(reps):
t0 = time.perf_counter()
fn()
times.append(time.perf_counter() - t0)
print(f'{name:<30} {min(times)*1000:.1f}ms (best of {reps})')
bench(lambda: df_pd.groupby('category')['value'].agg(['mean','sum','std']), 'Pandas groupby')
bench(lambda: df_pl.group_by('category').agg(pl.col('value').mean(), pl.col('value').sum(), pl.col('value').std()), 'Polars groupby')
bench(lambda: df_pd.sort_values('value'), 'Pandas sort')
bench(lambda: df_pl.sort('value'), 'Polars sort')Schema Optimization & Categorical Encoding
import polars as pl
import numpy as np
np.random.seed(42)
n = 500_000
# Default dtype inference
df_default = pl.DataFrame({
'user_id': np.random.randint(1, 10000, n),
'country': np.random.choice(['US','UK','DE','FR','JP'], n),
'product': np.random.choice([f'prod_{i}' for i in range(100)], n),
'revenue': np.random.exponential(50, n),
'quantity': np.random.randint(1, 20, n),
})
# Optimized dtypes
df_opt = df_default.with_columns([
pl.col('user_id').cast(pl.UInt32),
pl.col('country').cast(pl.Categorical),
pl.col('product').cast(pl.Categorical),
pl.col('revenue').cast(pl.Float32),
pl.col('quantity').cast(pl.UInt8),
])
def mem_mb(df): return df.estimated_size('mb')
print(f'Default schema: {mem_mb(df_default):.1f} MB')
print(f'Optimized schema: {mem_mb(df_opt):.1f} MB')
print(f'Reduction: {(1 - mem_mb(df_opt)/mem_mb(df_default))*100:.0f}%')
print('\nOptimized schema:')
print(df_opt.schema)Parallelism & Thread Control
import polars as pl
import numpy as np
import time
print(f'Polars version: {pl.__version__}')
print(f'Available threads: {pl.thread_pool_size()}')
np.random.seed(0)
n = 2_000_000
df = pl.DataFrame({
'g': np.random.randint(0, 100, n),
'v': np.random.randn(n),
})
# Polars uses multiple threads automatically
# Demonstrate with a complex query
t0 = time.perf_counter()
result = (
df.lazy()
.group_by('g')
.agg([
pl.col('v').mean().alias('mean'),
pl.col('v').std().alias('std'),
pl.col('v').quantile(0.25).alias('q25'),
pl.col('v').quantile(0.75).alias('q75'),
pl.col('v').max() - pl.col('v').min(),
])
.collect()
)
t = time.perf_counter() - t0
print(f'\n{n:,} rows, 100 groups, 5 aggs: {t*1000:.1f}ms')
print(result.head())Predicate Pushdown & Column Pruning
import polars as pl
import numpy as np
np.random.seed(42)
n = 1_000_000
df = pl.DataFrame({
'id': np.arange(n),
'region': np.random.choice(['NA','EU','APAC'], n),
'product': np.random.choice(['A','B','C','D'], n),
'revenue': np.random.exponential(200, n),
'cost': np.random.exponential(80, n),
'quantity': np.random.randint(1, 100, n),
'discount': np.random.uniform(0, 0.3, n),
})
# Without optimization hints (eager)
# Polars lazy plan shows predicate/column pushdown
lf = df.lazy()
plan = (
lf
.filter(pl.col('region') == 'EU')
.select(['product', 'revenue', 'quantity'])
.group_by('product')
.agg(
pl.col('revenue').sum().alias('total_rev'),
pl.col('quantity').mean().alias('avg_qty'),
)
)
print('Optimized query plan:')
print(plan.explain(optimized=True))
result = plan.collect()
print('\nResult:')
print(result.sort('total_rev', descending=True))import polars as pl
import numpy as np
import time
np.random.seed(0)
n = 2_000_000
df = pl.DataFrame({
'region': np.random.choice(['NA','EU','APAC','LATAM'], n),
'product': np.random.choice([f'p{i}' for i in range(50)], n),
'revenue': np.random.exponential(100, n),
'cost': np.random.exponential(40, n),
'quantity': np.random.randint(1, 100, n),
'returned': np.random.choice([True, False], n, p=[0.05, 0.95]),
}).with_columns([
pl.col('region').cast(pl.Categorical),
pl.col('product').cast(pl.Categorical),
pl.col('revenue').cast(pl.Float32),
pl.col('cost').cast(pl.Float32),
pl.col('quantity').cast(pl.UInt16),
])
t0 = time.perf_counter()
result = (
df.lazy()
.filter(~pl.col('returned'))
.group_by(['region', 'product'])
.agg([
pl.col('revenue').sum().alias('total_revenue'),
pl.col('cost').sum().alias('total_cost'),
pl.col('quantity').sum().alias('total_qty'),
(pl.col('revenue') - pl.col('cost')).mean().alias('avg_margin'),
pl.len().alias('transactions'),
])
.with_columns((pl.col('total_revenue') - pl.col('total_cost')).alias('profit'))
.sort('profit', descending=True)
.collect()
)
t = time.perf_counter() - t0
print(f'{n:,} rows processed in {t:.2f}s')
print(result.head(5))import polars as pl
import numpy as np
import time
np.random.seed(42)
n = 1_000_000
raw = pl.DataFrame({
'user_id': np.random.randint(1, 10000, n),
'country': np.random.choice(['US','UK','DE','FR'], n),
'product': np.random.choice([f'p{i}' for i in range(20)], n),
'revenue': np.random.exponential(100, n),
'cost': np.random.exponential(60, n),
})
# TODO: (1) Eager version - filter, group_by, agg
# TODO: (2) Lazy version with schema optimization
# TODO: Benchmark both with time.perf_counter()
# TODO: Print speedup ratio
LazyFrame enables query optimization through predicate pushdown, projection pushdown, and streaming execution. Build full pipelines before calling .collect() to let Polars optimize the execution plan.
LazyFrame with Query Plan Inspection
# Demo: build a full LazyFrame pipeline, inspect the optimized plan with
# .explain(), then execute it with .collect().
import polars as pl
import numpy as np
np.random.seed(0)
n = 1_000_000
# Build large lazy dataset
lf = (
pl.LazyFrame({
"id": np.arange(n),
"category": np.random.choice(["A","B","C","D"], n),
"value": np.random.normal(100, 20, n),
"weight": np.random.uniform(0.5, 2.0, n),
})
.filter(pl.col("value") > 60)
.with_columns([
(pl.col("value") * pl.col("weight")).alias("weighted_value"),
pl.col("category").cast(pl.Categorical),
])
.group_by("category")
.agg([
pl.col("weighted_value").mean().alias("mean_wv"),
pl.col("value").std().alias("std_val"),
pl.col("id").count().alias("n"),
])
.sort("mean_wv", descending=True)
)
# Show the query plan
print("Query plan:")
print(lf.explain())
result = lf.collect()
print(result)
Streaming Parquet Processing
# Demo: persist a DataFrame to Parquet, then run a streaming aggregation
# over it with scan_parquet + collect(streaming=True).
import polars as pl
import numpy as np
np.random.seed(1)
n = 500_000
# Streaming scan simulation: process in chunks
df = pl.DataFrame({
    "user_id": np.random.randint(1, 1000, n),
    "revenue": np.random.lognormal(3, 1, n),
    "product": np.random.choice(["X","Y","Z"], n),
    "is_premium": np.random.randint(0, 2, n).astype(bool),
})
# Save and reload to test scan_parquet streaming
import tempfile, os
# tempfile.mktemp() is deprecated and race-prone (the name can be claimed
# between creation and use); mkstemp() creates the file atomically.
fd, tmp = tempfile.mkstemp(suffix=".parquet")
os.close(fd)  # polars reopens the path itself; release the descriptor
df.write_parquet(tmp)
# Streaming aggregation from parquet
result = (
    pl.scan_parquet(tmp)
    .filter(pl.col("is_premium"))
    .group_by("product")
    .agg([
        pl.col("revenue").sum().alias("total_rev"),
        pl.col("user_id").n_unique().alias("unique_users"),
        pl.col("revenue").mean().alias("avg_rev"),
    ])
    .sort("total_rev", descending=True)
    .collect(streaming=True)
)
os.unlink(tmp)
print(result)
Lazy vs Eager Benchmark
# Micro-benchmark: three eager passes vs three lazy .collect() passes over
# the same filter/group_by/agg pipeline. (Indentation of the loop bodies is
# restored here; it was stripped in the source.)
import polars as pl
import numpy as np
import time
np.random.seed(42)
n = 200_000
df = pl.DataFrame({
"id": np.arange(n),
"x": np.random.randn(n),
"group": np.random.choice([f"G{i}" for i in range(50)], n),
})
# Pattern 1: inefficient (multiple passes)
t0 = time.perf_counter()
for _ in range(3):
    _ = df.filter(pl.col("x") > 0).group_by("group").agg(pl.col("x").mean())
t1 = time.perf_counter()
# Pattern 2: single lazy pass with predicate pushdown
t2 = time.perf_counter()
for _ in range(3):
    _ = (pl.LazyFrame(df)
         .filter(pl.col("x") > 0)
         .group_by("group")
         .agg(pl.col("x").mean())
         .collect())
t3 = time.perf_counter()
print(f"Eager repeated passes: {(t1-t0)*1000:.1f} ms")
print(f"LazyFrame with pushdown: {(t3-t2)*1000:.1f} ms")
# Demo: write order data to Parquet, then stream-aggregate non-returned
# orders per category/tier with scan_parquet + collect(streaming=True).
import polars as pl
import numpy as np
import tempfile, os
np.random.seed(0)
n = 500_000
df = pl.DataFrame({
    "order_id": np.arange(n),
    "category": np.random.choice(["Electronics","Clothing","Food","Sports","Books"], n),
    "revenue": np.random.lognormal(4, 1, n),
    "is_returned": np.random.binomial(1, 0.05, n).astype(bool),
    "customer_tier": np.random.choice(["gold","silver","bronze"], n),
})
# tempfile.mktemp() is deprecated and race-prone; mkstemp() creates the
# file atomically and hands back an open descriptor we close immediately.
fd, tmp = tempfile.mkstemp(suffix=".parquet")
os.close(fd)
df.write_parquet(tmp)
result = (
    pl.scan_parquet(tmp)
    .filter(~pl.col("is_returned"))
    .group_by(["category","customer_tier"])
    .agg([
        pl.col("revenue").sum().alias("total_rev"),
        pl.col("revenue").mean().alias("avg_rev"),
        pl.col("order_id").count().alias("n_orders"),
    ])
    .sort("total_rev", descending=True)
    .collect(streaming=True)
)
os.unlink(tmp)
print(result.head(8))
# Exercise scaffold: lazy fraud analysis over 2M synthetic transactions.
import polars as pl
import numpy as np
import time
np.random.seed(7)
n = 2_000_000
df = pl.DataFrame({
"transaction_id": np.arange(n),
"customer_id": np.random.randint(1, 10000, n),
"amount": np.random.lognormal(4, 1, n),
"category": np.random.choice(["food","travel","retail","tech","health"], n),
"is_fraud": np.random.binomial(1, 0.02, n).astype(bool),
"day_of_week": np.random.randint(0, 7, n),
})
# TODO: Use LazyFrame to filter fraud transactions, group by category+day_of_week
# TODO: Compute: sum(amount), count, fraud_rate per group
# TODO: Show query plan with .explain()
# TODO: Compare timing: eager vs lazy with collect(streaming=True)
# TODO: Save result to parquet, reload with scan_parquet and verify row count
Polars provides high-performance time series operations: rolling aggregations, resampling with group_by_dynamic, and lag/shift features. Temporal operations run significantly faster than pandas for large datasets.
Rolling Statistics & Time Features
# Demo: calendar features (.dt), rolling means, and daily returns on a
# random-walk series; leading rolling-window nulls are filtered out.
import polars as pl
import numpy as np
np.random.seed(0)
n = 1000
dates = pl.date_range(
pl.date(2022, 1, 1), pl.date(2024, 9, 26), interval="1d", eager=True
)[:n]
df = pl.DataFrame({
"date": dates,
"value": 100 + np.cumsum(np.random.normal(0, 1, n)),
"volume":np.random.poisson(1000, n).astype(float),
})
result = (
df.with_columns([
pl.col("date").dt.year().alias("year"),
pl.col("date").dt.month().alias("month"),
pl.col("date").dt.weekday().alias("dow"),
pl.col("value").rolling_mean(window_size=7).alias("ma7"),
pl.col("value").rolling_mean(window_size=30).alias("ma30"),
pl.col("value").pct_change().alias("return_1d"),
])
.filter(pl.col("ma7").is_not_null())
)
print(result.select(["date","value","ma7","ma30","return_1d"]).head(5))
# .item() extracts the single scalar (std of daily returns)
print(result.select(pl.col("return_1d").std()).item())
Resampling: Weekly & Monthly OHLC
# Demo: resample a daily price series to weekly OHLC and monthly
# close/volatility using group_by_dynamic (dates are already sorted).
import polars as pl
import numpy as np
np.random.seed(1)
n = 500
dates = pl.date_range(pl.date(2023,1,1), pl.date(2024,5,14), interval="1d", eager=True)[:n]
df = pl.DataFrame({
"date": dates,
"price": 50 + np.cumsum(np.random.normal(0, 0.5, n)),
})
# Resample to weekly and monthly
weekly = (
df.group_by_dynamic("date", every="1w")
.agg([
pl.col("price").first().alias("open"),
pl.col("price").max().alias("high"),
pl.col("price").min().alias("low"),
pl.col("price").last().alias("close"),
pl.col("price").mean().alias("avg"),
])
)
monthly = (
df.group_by_dynamic("date", every="1mo")
.agg([
pl.col("price").last().alias("month_close"),
pl.col("price").std().alias("monthly_vol"),
])
)
print("Weekly OHLC:"); print(weekly.head(4))
print("Monthly volatility:"); print(monthly.head(4))
Lag Features & Rolling Metrics
# Demo: lag features via shift(), rolling volatility/extremes, and a
# 52-period percent-rank; drop_nulls removes rows lacking full windows.
import polars as pl
import numpy as np
np.random.seed(2)
n = 300
dates = pl.date_range(pl.date(2023,1,1), pl.date(2023,10,28), interval="1d", eager=True)[:n]
df = pl.DataFrame({
"date": dates,
"value": 100 + np.cumsum(np.random.normal(0, 1.5, n)),
})
result = (
df.with_columns([
pl.col("value").shift(1).alias("lag_1"),
pl.col("value").shift(7).alias("lag_7"),
pl.col("value").shift(30).alias("lag_30"),
pl.col("value").rolling_std(window_size=14).alias("vol_14"),
(pl.col("value") / pl.col("value").shift(1) - 1).alias("return_1d"),
pl.col("value").rolling_max(window_size=52).alias("rolling_high_52"),
pl.col("value").rolling_min(window_size=52).alias("rolling_low_52"),
])
.with_columns([
# Position of current value inside its 52-period rolling range (0..1)
((pl.col("value") - pl.col("rolling_low_52")) /
(pl.col("rolling_high_52") - pl.col("rolling_low_52"))).alias("pct_rank_52w"),
])
.drop_nulls()
)
print(result.select(["date","value","lag_1","vol_14","pct_rank_52w"]).tail(5))
print(f"Rows after dropna: {len(result)}")
# Demo: rolling-mean/std z-score anomaly detection on seasonal sales
# (an 800-unit spike is injected at day 100), plus weekly totals.
import polars as pl
import numpy as np
np.random.seed(1)
n = 365
dates = pl.date_range(pl.date(2023,1,1), pl.date(2023,12,31), interval="1d", eager=True)[:n]
sales = 1000 + 200*np.sin(2*np.pi*np.arange(n)/7) + np.random.normal(0, 80, n)
sales[100] += 800 # spike anomaly
df = pl.DataFrame({"date": dates, "sales": sales})
result = (
df.with_columns([
pl.col("sales").rolling_mean(window_size=7).alias("ma7"),
pl.col("sales").rolling_mean(window_size=30).alias("ma30"),
pl.col("sales").rolling_std(window_size=30).alias("std30"),
])
.with_columns([
((pl.col("sales") - pl.col("ma30")) / pl.col("std30")).alias("z_score"),
])
.with_columns([
# Flag |z| > 2 (roughly the 95% band) as anomalous
(pl.col("z_score").abs() > 2).alias("anomaly"),
])
)
anomalies = result.filter(pl.col("anomaly"))
print(f"Anomalies detected: {len(anomalies)}")
print(anomalies.select(["date","sales","ma30","z_score"]))
weekly = df.group_by_dynamic("date", every="1w").agg(pl.col("sales").sum().alias("weekly_total"))
print(weekly.head(5))
# Exercise scaffold: feature engineering on a 2-year multi-asset series.
import polars as pl
import numpy as np
np.random.seed(99)
n = 730 # 2 years daily
dates = pl.date_range(pl.date(2022,1,1), pl.date(2023,12,31), interval="1d", eager=True)[:n]
# Multi-asset time series
df = pl.DataFrame({
"date": dates,
"AAPL": 100 + np.cumsum(np.random.normal(0.05, 1.2, n)),
"MSFT": 200 + np.cumsum(np.random.normal(0.08, 1.5, n)),
"GOOG": 90 + np.cumsum(np.random.normal(0.03, 1.0, n)),
})
# TODO: Add rolling 20d and 50d MA for each stock
# TODO: Add daily returns for each stock
# TODO: Resample to weekly OHLC for AAPL
# TODO: Add 52-week high/low and % distance from 52w high
# TODO: Monthly summary: avg return and volatility per stock
Polars is built on Apache Arrow, enabling zero-copy interop with PyArrow, DuckDB, and other Arrow-native tools. Write partitioned Parquet files for efficient partial reads in production data pipelines.
Polars <-> PyArrow Interoperability
# Demo: round-trip Polars -> Arrow -> Polars and compute with
# pyarrow.compute on the Arrow table (zero-copy where possible).
import polars as pl
import pyarrow as pa
import numpy as np
np.random.seed(0)
n = 100_000
# Create Polars DataFrame and convert to Arrow
df = pl.DataFrame({
"id": np.arange(n),
"score": np.random.randn(n),
"category":np.random.choice(["A","B","C"], n),
"amount": np.random.lognormal(4, 1, n),
})
# Polars -> Arrow -> Polars (zero-copy where possible)
arrow_table = df.to_arrow()
print(f"Arrow schema: {arrow_table.schema}")
print(f"Num chunks: {arrow_table.column('score').num_chunks}")
# Compute with pyarrow
import pyarrow.compute as pc
mean_score = pc.mean(arrow_table.column("score"))
# .as_py() converts the Arrow scalar back to a Python float
print(f"Mean score via pyarrow: {mean_score.as_py():.6f}")
# Back to Polars
df2 = pl.from_arrow(arrow_table)
print(f"Roundtrip OK: {df.shape == df2.shape}")
Parquet vs CSV: Speed & Size Benchmark
# Benchmark: write/read speed and on-disk size of Parquet (snappy) vs CSV
# for the same 500k-row DataFrame.
import polars as pl
import numpy as np
import tempfile, os, time
np.random.seed(1)
n = 500_000
df = pl.DataFrame({
    "id": np.arange(n),
    "group": np.random.choice(["X","Y","Z","W"], n),
    "value": np.random.randn(n),
    "amount": np.random.lognormal(3, 1.5, n),
    "flag": np.random.randint(0, 2, n).astype(bool),
})
# tempfile.mktemp() is deprecated and race-prone; mkstemp() creates the
# files atomically. Close the descriptors — polars opens the paths itself.
fd_pq, tmp_parquet = tempfile.mkstemp(suffix=".parquet")
fd_csv, tmp_csv = tempfile.mkstemp(suffix=".csv")
os.close(fd_pq)
os.close(fd_csv)
# Write and read Parquet vs CSV
t0 = time.perf_counter()
df.write_parquet(tmp_parquet, compression="snappy")
t1 = time.perf_counter()
df.write_csv(tmp_csv)
t2 = time.perf_counter()
df_pq = pl.read_parquet(tmp_parquet)
t3 = time.perf_counter()
df_csv = pl.read_csv(tmp_csv)
t4 = time.perf_counter()
pq_mb = os.path.getsize(tmp_parquet) / 1e6
csv_mb = os.path.getsize(tmp_csv) / 1e6
print(f"Parquet: write={t1-t0:.3f}s read={t3-t2:.3f}s size={pq_mb:.1f}MB")
print(f"CSV: write={t2-t1:.3f}s read={t4-t3:.3f}s size={csv_mb:.1f}MB")
for f in [tmp_parquet, tmp_csv]: os.unlink(f)
Partitioned Parquet with PyArrow + Polars scan
# Demo: write hive-partitioned Parquet (year/region) with pyarrow, then
# read just the 2023/NA partition back via pl.scan_parquet globbing.
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import tempfile, os
np.random.seed(2)
n = 200_000
df = pl.DataFrame({
"year": np.random.choice([2021,2022,2023], n),
"region": np.random.choice(["NA","EU","APAC"], n),
"product":np.random.choice(["A","B","C","D"], n),
"revenue":np.random.lognormal(5, 1, n),
"units": np.random.poisson(50, n),
})
# Write partitioned parquet (by year+region) via pyarrow
arrow_table = df.to_arrow()
tmpdir = tempfile.mkdtemp()
pq.write_to_dataset(arrow_table, root_path=tmpdir,
partition_cols=["year","region"])
# Read only 2023 NA partition with Polars scan
result = (
pl.scan_parquet(f"{tmpdir}/year=2023/region=NA/**/*.parquet")
.group_by("product")
.agg(pl.col("revenue").sum(), pl.col("units").sum())
.collect()
)
print(f"2023 NA partition result:"); print(result)
import shutil; shutil.rmtree(tmpdir)
# Demo: hive-partitioned IoT data (year/device_type); only the 2023
# sensor partition is scanned and aggregated per hour.
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import tempfile, os, shutil
np.random.seed(5)
n = 200_000
df = pl.DataFrame({
"year": np.random.choice([2021,2022,2023], n),
"device_type": np.random.choice(["sensor","camera","gateway"], n),
"timestamp_h": np.random.randint(0, 24, n),
"temperature": np.random.normal(22, 5, n),
"humidity": np.random.normal(60, 10, n),
"power_kw": np.random.lognormal(1, 0.5, n),
})
tmpdir = tempfile.mkdtemp()
pq.write_to_dataset(df.to_arrow(), root_path=tmpdir,
partition_cols=["year","device_type"])
# Read only 2023 sensors
result = (
pl.scan_parquet(f"{tmpdir}/year=2023/device_type=sensor/**/*.parquet")
.group_by("timestamp_h")
.agg([
pl.col("temperature").mean().alias("avg_temp"),
pl.col("power_kw").sum().alias("total_power"),
pl.col("humidity").std().alias("hum_std"),
])
.sort("timestamp_h")
.collect()
)
print(f"2023 sensor hourly aggregates ({len(result)} hours):")
print(result.head(5))
shutil.rmtree(tmpdir)
# Exercise scaffold: partitioned Parquet + Arrow compute over 1M rows.
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import tempfile, os, time
np.random.seed(33)
n = 1_000_000
df = pl.DataFrame({
"date": np.random.choice(["2022","2023","2024"], n),
"country": np.random.choice(["US","UK","DE","FR","JP"], n),
"category": np.random.choice(["A","B","C","D","E"], n),
"sales": np.random.lognormal(4, 1.5, n),
"cost": np.random.lognormal(3.5, 1.2, n),
"units": np.random.poisson(100, n),
})
# TODO: Write partitioned parquet by date+country using pyarrow
# TODO: Read only 2024 US partition using pl.scan_parquet
# TODO: Compute: total sales, total cost, margin, units per category for 2024 US
# TODO: Benchmark: read whole parquet vs partitioned scan for single partition
# TODO: Convert to Arrow table and compute gross_margin with pyarrow.compute
Polars LazyFrames build a query plan without executing it. Calling .collect() triggers optimized execution with predicate pushdown, projection pushdown, and parallel execution.
LazyFrame Basics
# Demo: identical pipeline run eagerly and lazily; .equals() confirms both
# produce the same result.
import polars as pl
import numpy as np
np.random.seed(42)
n = 10_000
# Create a DataFrame
df = pl.DataFrame({
"id": range(n),
"category": np.random.choice(["A", "B", "C", "D"], n),
"value": np.random.randn(n) * 100,
"quantity": np.random.randint(1, 100, n),
"active": np.random.choice([True, False], n),
})
# Eager (immediate execution)
eager_result = (df
.filter(pl.col("active"))
.filter(pl.col("value") > 0)
.select(["category", "value", "quantity"])
.group_by("category")
.agg(pl.col("value").mean().alias("avg_value"),
pl.col("quantity").sum().alias("total_qty"))
)
print("Eager result:")
print(eager_result.sort("category"))
# Lazy (deferred execution)
lazy_result = (df.lazy()
.filter(pl.col("active"))
.filter(pl.col("value") > 0)
.select(["category", "value", "quantity"])
.group_by("category")
.agg(pl.col("value").mean().alias("avg_value"),
pl.col("quantity").sum().alias("total_qty"))
.collect() # execute here
)
print("\nLazy result (same output):")
print(lazy_result.sort("category"))
print(f"\nResults match: {eager_result.sort('category').equals(lazy_result.sort('category'))}")
Query Plan & Optimization
# Demo: build a lazy query, print the optimized plan via
# .explain(optimized=True), then execute with .collect().
import polars as pl
import numpy as np
np.random.seed(42)
n = 50_000
df = pl.DataFrame({
"user_id": range(n),
"age": np.random.randint(18, 70, n),
"country": np.random.choice(["US", "UK", "DE", "FR", "JP"], n),
"purchase_amount": np.random.exponential(50, n),
"is_premium": np.random.choice([True, False], n),
})
# Build lazy query
q = (df.lazy()
.filter(pl.col("is_premium") & (pl.col("age") >= 25))
.filter(pl.col("purchase_amount") > 20)
.select(["user_id", "country", "purchase_amount", "age"])
.group_by("country")
.agg([
pl.col("purchase_amount").mean().alias("avg_purchase"),
pl.col("purchase_amount").max().alias("max_purchase"),
pl.len().alias("n_users"),
])
.sort("avg_purchase", descending=True)
)
# Inspect the query plan
print("Optimized query plan:")
print(q.explain(optimized=True))
# Execute
result = q.collect()
print("\nResult:")
print(result)
Streaming Large Datasets
# Demo: generate a 100k-row CSV on disk, then aggregate it via scan_csv in
# streaming mode without loading the whole file. (Indentation of the
# with/for bodies is restored here; it was stripped in the source.)
import polars as pl
import numpy as np, tempfile, os
np.random.seed(42)
# Create a large CSV file for streaming demo
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
    csv_path = f.name
    f.write("id,category,value,quantity\n")
    for i in range(100_000):
        cat = np.random.choice(["X","Y","Z"])
        val = round(np.random.randn()*100, 2)
        qty = np.random.randint(1, 50)
        f.write(f"{i},{cat},{val},{qty}\n")
# Stream processing: process without loading all data into memory
result = (
pl.scan_csv(csv_path) # scan_csv returns LazyFrame
.filter(pl.col("value") > 0)
.group_by("category")
.agg([
pl.col("value").sum().alias("total_value"),
pl.col("quantity").mean().alias("avg_qty"),
pl.len().alias("count"),
])
.sort("total_value", descending=True)
.collect(streaming=True) # streaming=True for memory-efficient processing
)
print(f"File size: {os.path.getsize(csv_path)/1024:.0f} KB")
print("Streaming result:")
print(result)
os.unlink(csv_path)
# Demo: lazy analysis of synthetic API logs — per endpoint/hour latency
# (hour parsed from the timestamp string) and overall endpoint stats.
import polars as pl
import numpy as np, tempfile, os
np.random.seed(42)
n = 50_000
# Simulate log data
endpoints = ["/api/users", "/api/orders", "/api/products", "/api/search", "/api/auth"]
logs = pl.DataFrame({
"timestamp": pl.Series([f"2024-01-15 {h:02d}:{m:02d}:{s:02d}"
for h, m, s in zip(
np.random.randint(0, 24, n),
np.random.randint(0, 60, n),
np.random.randint(0, 60, n))]),
"endpoint": np.random.choice(endpoints, n),
"status_code": np.random.choice([200, 200, 200, 400, 404, 500], n),
"response_ms": np.random.exponential(100, n),
"user_id": np.random.randint(1, 10000, n),
})
# Process with LazyFrames
result = (logs.lazy()
.filter(pl.col("status_code") < 400) # only successful requests
.with_columns(
# characters 11-12 of "YYYY-MM-DD HH:MM:SS" are the hour
pl.col("timestamp").str.slice(11, 2).cast(pl.Int32).alias("hour")
)
.group_by(["endpoint", "hour"])
.agg([
pl.len().alias("request_count"),
pl.col("response_ms").mean().round(2).alias("avg_ms"),
pl.col("response_ms").quantile(0.95).round(2).alias("p95_ms"),
])
.sort("avg_ms", descending=True)
.collect()
)
print("Top slowest endpoint/hour combos:")
print(result.head(10))
# Top slowest endpoints overall
slowest = (logs.lazy()
.group_by("endpoint")
.agg(pl.col("response_ms").mean().round(2).alias("avg_ms"),
pl.col("response_ms").quantile(0.99).round(2).alias("p99_ms"),
pl.len().alias("total_requests"))
.sort("avg_ms", descending=True)
.collect())
print("\nEndpoint Performance:")
print(slowest)
# Exercise scaffold: build the lazy pipeline outlined in the comments.
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
"category": np.random.choice(["A","B","C"], 1000),
"x": np.random.randn(1000),
"y": np.random.randint(1, 100, 1000),
})
# 1. df.lazy()
# 2. .filter(pl.col("x") > 0)
# 3. .with_columns((pl.col("x") * pl.col("y")).alias("xy"))
# 4. .group_by("category").agg(...)
# 5. print .explain() before .collect()
Polars GroupBy supports parallel multi-aggregation, dynamic grouping, rolling windows, and complex expressions that outperform pandas groupby significantly.
Multi-Column & Complex Aggregations
# Demo: multi-column group_by with several aggregations per group.
import polars as pl
import numpy as np
np.random.seed(42)
n = 5000
# BUG FIX: the original stitched two partial date_range slices together,
# producing only 608 dates for n=5000 rows — a shape mismatch that fails
# at DataFrame construction. Tile a full year of dates to exactly n rows.
year_dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 12, 31), "1d", eager=True).to_list()
df = pl.DataFrame({
    "date": (year_dates * (n // len(year_dates) + 1))[:n],
    "product": np.random.choice(["Widget", "Gadget", "Doohickey", "Thingamajig"], n),
    "region": np.random.choice(["North", "South", "East", "West"], n),
    "sales": np.random.exponential(100, n).round(2),
    "units": np.random.randint(1, 50, n),
    "returned": np.random.choice([True, False], n, p=[0.1, 0.9]),
})
# Multi-column groupby with complex aggregations
result = df.group_by(["product", "region"]).agg([
    pl.col("sales").sum().alias("total_sales"),
    pl.col("sales").mean().round(2).alias("avg_sale"),
    pl.col("sales").std().round(2).alias("std_sale"),
    pl.col("units").sum().alias("total_units"),
    pl.col("returned").mean().round(4).alias("return_rate"),
    pl.len().alias("n_transactions"),
    pl.col("sales").quantile(0.9).round(2).alias("p90_sale"),
]).sort(["product", "total_sales"], descending=[False, True])
print("Product x Region aggregation:")
print(result.head(8))
print(f"\nTotal rows: {result.shape[0]}")
Rolling & Dynamic GroupBy
# Demo: rolling-window columns plus a month-level aggregation.
import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 200
# BUG FIX: pl.date_range has no `period` keyword (the original passed
# period=f"{n}d" and omitted the end date, which raises). Compute the end
# date explicitly so the range holds exactly n daily values.
start = date(2024, 1, 1)
dates = pl.date_range(start, start + timedelta(days=n - 1), interval="1d", eager=True)
df = pl.DataFrame({
    "date": dates,
    "value": np.cumsum(np.random.randn(n)) + 50,
    "volume": np.random.randint(100, 1000, n),
})
# Rolling aggregations (7-day moving average)
df_rolling = df.with_columns([
    pl.col("value").rolling_mean(window_size=7).alias("ma7"),
    pl.col("value").rolling_mean(window_size=30).alias("ma30"),
    pl.col("value").rolling_std(window_size=7).alias("std7"),
    pl.col("volume").rolling_sum(window_size=7).alias("vol7"),
])
print("Rolling aggregations:")
print(df_rolling.tail(5))
# Dynamic groupby (group by month)
df_monthly = (df
    .with_columns(pl.col("date").dt.month().alias("month"))
    .group_by("month")
    .agg([
        pl.col("value").mean().round(2).alias("avg_val"),
        pl.col("value").max().alias("max_val"),
        pl.col("volume").sum().alias("total_vol"),
    ])
    .sort("month")
)
print("\nMonthly aggregation:")
print(df_monthly)
Conditional Aggregations & Pivot
# Demo: boolean-expression aggregations (hit-rate vs target) and a pivot
# reshaping quarter x product sales into wide form.
import polars as pl
import numpy as np
np.random.seed(42)
n = 1000
df = pl.DataFrame({
"quarter": np.random.choice(["Q1","Q2","Q3","Q4"], n),
"product": np.random.choice(["A","B","C"], n),
"sales": np.random.exponential(100, n).round(2),
"target": np.random.uniform(80, 120, n).round(2),
})
# Conditional aggregation: count by condition
result = df.group_by(["quarter", "product"]).agg([
pl.col("sales").sum().alias("total_sales"),
(pl.col("sales") > pl.col("target")).sum().alias("above_target"),
pl.len().alias("n"),
((pl.col("sales") > pl.col("target")).sum() / pl.len()).round(3).alias("hit_rate"),
])
print("Conditional aggregation:")
print(result.sort(["quarter", "product"]).head(8))
# Pivot: reshape data (like pandas pivot_table)
pivot = (df.group_by(["quarter", "product"])
.agg(pl.col("sales").sum().alias("total"))
.pivot(index="quarter", on="product", values="total", aggregate_function="sum")
.sort("quarter"))
print("\nPivot table (quarter x product sales):")
print(pivot)
# Demo: lazy weekly revenue by category (returns excluded) and a top-10
# customer lifetime-value table.
import polars as pl
import numpy as np
np.random.seed(42)
n = 10_000
dates = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 12, 31), "1d", eager=True)
df = pl.DataFrame({
"order_date": np.random.choice(dates.to_list(), n),
"customer_id": np.random.randint(1, 1000, n),
"category": np.random.choice(["Electronics","Clothing","Books","Sports","Home"], n),
"amount": np.random.exponential(80, n).round(2),
"returned": np.random.choice([False, True], n, p=[0.88, 0.12]),
})
# Weekly revenue by category
weekly = (df.lazy()
.with_columns(pl.col("order_date").dt.week().alias("week"))
.filter(~pl.col("returned"))
.group_by(["week", "category"])
.agg(pl.col("amount").sum().round(2).alias("revenue"), pl.len().alias("orders"))
.sort(["week", "category"])
.collect()
)
print("Weekly revenue (first 10 rows):")
print(weekly.head(10))
# Customer lifetime value
clv = (df.lazy()
.group_by("customer_id")
.agg([
pl.col("amount").sum().round(2).alias("ltv"),
pl.len().alias("total_orders"),
pl.col("returned").mean().round(3).alias("return_rate"),
pl.col("amount").mean().round(2).alias("avg_order"),
])
.sort("ltv", descending=True)
.head(10)
.collect()
)
print("\nTop 10 Customers by LTV:")
print(clv)
# Exercise scaffold: group, aggregate, and pivot the sales data below.
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
"region": np.random.choice(["North","South","East","West"], 500),
"product": np.random.choice(["A","B","C"], 500),
"sales": np.random.exponential(100, 500).round(2),
"returned": np.random.choice([True, False], 500),
})
# 1. group_by(["region", "product"])
# 2. agg sum, mean, std of sales + count of returned
# 3. Pivot to show product as columns
Polars supports all join types (inner, left, outer, cross, semi, anti) with parallel execution. Join strategies (hash, sort-merge) are auto-selected based on data characteristics.
Inner, Left & Outer Joins
# Demo: inner/left/full joins between customers and orders, plus the
# aggregate-then-join pattern for enrichment.
import polars as pl
import numpy as np
np.random.seed(42)
# Customers table
customers = pl.DataFrame({
"customer_id": range(1, 11),
"name": [f"Customer_{i}" for i in range(1, 11)],
"tier": np.random.choice(["Gold", "Silver", "Bronze"], 10),
})
# Orders table
orders = pl.DataFrame({
"order_id": range(1, 21),
"customer_id": np.random.choice(range(1, 14), 20), # some customers missing
"amount": np.random.exponential(100, 20).round(2),
"status": np.random.choice(["completed", "pending", "cancelled"], 20),
})
# Inner join: only matching customers
inner = customers.join(orders, on="customer_id", how="inner")
print(f"Inner join: {len(inner)} rows (customers with orders)")
# Left join: all customers, NaN for missing orders
left = customers.join(orders, on="customer_id", how="left")
print(f"Left join: {len(left)} rows (all customers)")
print(f" Customers with no orders: {left.filter(pl.col('order_id').is_null()).shape[0]}")
# Full/outer join
outer = customers.join(orders, on="customer_id", how="full")
print(f"Full join: {len(outer)} rows")
# Aggregation join: aggregate before joining
order_summary = (orders
.group_by("customer_id")
.agg([pl.col("amount").sum().alias("total_spend"),
pl.len().alias("order_count")])
)
enriched = customers.join(order_summary, on="customer_id", how="left")
print("\nCustomer enriched data:")
print(enriched.head(5))
Semi, Anti & Cross Joins
# Demo: semi-join (filter to matches), anti-join (filter to non-matches),
# and a cross join producing all size/color combinations.
import polars as pl
import numpy as np
np.random.seed(42)
# Product table
products = pl.DataFrame({
"product_id": range(1, 21),
"name": [f"Product_{i}" for i in range(1, 21)],
"price": np.random.uniform(10, 200, 20).round(2),
"category": np.random.choice(["Electronics","Clothing","Books"], 20),
})
# Sold products (subset)
sold = pl.DataFrame({
"product_id": np.random.choice(range(1, 21), 12, replace=False),
"units_sold": np.random.randint(5, 100, 12),
})
# Semi-join: products that have been sold (filtering join)
has_sold = products.join(sold, on="product_id", how="semi")
print(f"Products sold: {len(has_sold)}")
# Anti-join: products that have NOT been sold
never_sold = products.join(sold, on="product_id", how="anti")
print(f"Products never sold: {len(never_sold)}")
print(never_sold.select(["product_id", "name", "price"]).head(5))
# Cross join (cartesian product)
sizes = pl.DataFrame({"size": ["S", "M", "L", "XL"]})
colors = pl.DataFrame({"color": ["Red", "Blue", "Green"]})
variants = sizes.join(colors, how="cross")
print(f"\nProduct variants (cross join): {len(variants)} combinations")
print(variants.head(6))
Join Performance Tips
# Demo: timing a plain inner join vs a pre-filtered join, plus a
# multi-key join example.
import polars as pl
import numpy as np, time
np.random.seed(42)
n = 100_000
# Large DataFrames for join performance demo
df_left = pl.DataFrame({
"id": np.arange(n),
"value_a": np.random.randn(n),
"group": np.random.choice(list("ABCDE"), n),
})
df_right = pl.DataFrame({
"id": np.random.randint(0, n, n),
"value_b": np.random.randn(n),
"score": np.random.uniform(0, 1, n),
})
# Standard join
t0 = time.time()
result = df_left.join(df_right, on="id", how="inner")
t1 = time.time()
print(f"Inner join ({len(result):,} rows): {(t1-t0)*1000:.1f}ms")
# Join with pre-filtering (reduce data first)
t0 = time.time()
left_filtered = df_left.filter(pl.col("group").is_in(["A", "B"]))
right_filtered = df_right.filter(pl.col("score") > 0.5)
result_filtered = left_filtered.join(right_filtered, on="id", how="inner")
t1 = time.time()
print(f"Pre-filtered join ({len(result_filtered):,} rows): {(t1-t0)*1000:.1f}ms")
# Multiple join keys
df_multi_left = pl.DataFrame({"key1": list("AABBC"), "key2": [1,2,1,2,1], "val": range(5)})
df_multi_right = pl.DataFrame({"key1": list("AABC"), "key2": [1,2,2,1], "label": list("WXYZ")})
multi_join = df_multi_left.join(df_multi_right, on=["key1", "key2"], how="left")
print("\nMulti-key join:")
print(multi_join)
# Demo: lazily join four tables (customers, orders, products, reviews)
# into a single customer-360 summary sorted by spend.
import polars as pl
import numpy as np
np.random.seed(42)
n_cust = 500
customers = pl.DataFrame({
"customer_id": range(n_cust),
"name": [f"Cust_{i}" for i in range(n_cust)],
"join_date": pl.date_range(pl.date(2022,1,1), pl.date(2023,12,31), "1d", eager=True)[:n_cust],
})
orders = pl.DataFrame({
"order_id": range(2000),
"customer_id": np.random.randint(0, n_cust, 2000),
"product_id": np.random.randint(0, 50, 2000),
"amount": np.random.exponential(80, 2000).round(2),
})
products = pl.DataFrame({
"product_id": range(50),
"category": np.random.choice(["Electronics","Clothing","Books","Sports"], 50),
"price": np.random.uniform(10, 500, 50).round(2),
})
reviews = pl.DataFrame({
"customer_id": np.random.randint(0, n_cust, 1000),
"rating": np.random.randint(1, 6, 1000),
})
# Build customer 360 view
order_stats = (orders.lazy()
.join(products.lazy(), on="product_id")
.group_by("customer_id")
.agg([
pl.col("amount").sum().round(2).alias("total_spend"),
pl.len().alias("n_orders"),
# mode() may return several categories; .first() picks one
pl.col("category").mode().first().alias("fav_category"),
])
)
review_stats = (reviews.lazy()
.group_by("customer_id")
.agg(pl.col("rating").mean().round(2).alias("avg_rating"))
)
customer360 = (customers.lazy()
.join(order_stats, on="customer_id", how="left")
.join(review_stats, on="customer_id", how="left")
.sort("total_spend", descending=True)
.collect()
)
print("Customer 360 View (top 10):")
print(customer360.head(10))
# Exercise scaffold: practice inner/left/anti joins on the tables below.
import polars as pl
import numpy as np
np.random.seed(42)
users = pl.DataFrame({"user_id": range(10), "name": [f"U{i}" for i in range(10)]})
orders = pl.DataFrame({"order_id": range(15), "user_id": np.random.randint(0, 12, 15), "amount": np.random.randn(15)})
products = pl.DataFrame({"product_id": range(5), "category": list("AABBC")})
# 1. Inner join users and orders
# 2. Left join result with products (on a fake key)
# 3. Anti-join: find users with no orders
Polars string operations are vectorized and run in parallel. The .str namespace provides split, replace, extract, slice, strip, and regex operations optimized for large text columns.
String Manipulation
# Demo: vectorized .str operations — cleaning, splitting, digit
# extraction, and simple email checks.
import polars as pl
import numpy as np
# Sample text data
df = pl.DataFrame({
    "email": ["alice@example.com", "BOB.SMITH@Gmail.COM", " charlie@test.org ",
              "invalid-email", "diana.prince@hero.net"],
    "full_name": ["Alice Johnson", "Bob Smith", "Charlie Brown",
                  "Diana Prince", "Eve Williams"],
    "phone": ["(555) 123-4567", "555.234.5678", "1-555-345-6789",
              "5554567890", "555 456 7890"],
    "description": [" Product is great! ", "NEEDS improvement",
                    "works as expected...", " EXCELLENT quality ", "good value"],
})
# String operations
result = df.with_columns([
    pl.col("email").str.to_lowercase().str.strip_chars().alias("email_clean"),
    pl.col("full_name").str.split(" ").list.first().alias("first_name"),
    pl.col("full_name").str.split(" ").list.last().alias("last_name"),
    pl.col("description").str.strip_chars().str.to_lowercase().alias("desc_clean"),
    pl.col("phone").str.replace_all(r"[^\d]", "").alias("phone_digits"),
])
print("String operations result:")
print(result.select(["email_clean", "first_name", "last_name", "phone_digits", "desc_clean"]))
# String contains/starts_with/ends_with
email_check = df.with_columns([
    pl.col("email").str.contains("@").alias("has_at"),
    pl.col("email").str.ends_with(".com").alias("is_dotcom"),
    # BUG FIX: `is_valid` duplicated the `has_at` check, so the validation
    # column carried no new information. Strip whitespace first, then
    # require local@domain.tld with no spaces or extra '@'.
    pl.col("email").str.strip_chars()
        .str.contains(r"^[^@\s]+@[^@\s]+\.[^@\s]+$").alias("is_valid"),
])
print("\nEmail validation:")
print(email_check.select(["email", "has_at", "is_dotcom", "is_valid"]))
Regex Extraction & Pattern Matching
# Demo: str.extract with capture groups to parse structured log lines,
# plus str.count_matches for pattern frequency.
import polars as pl
# Text data with structured patterns
df = pl.DataFrame({
"log_entry": [
"2024-01-15 10:23:45 ERROR user_id=123 msg=Login failed",
"2024-01-15 10:24:01 INFO user_id=456 msg=Login success",
"2024-01-15 10:25:12 WARN user_id=789 msg=Rate limit exceeded",
"2024-01-15 10:26:33 ERROR user_id=101 msg=DB connection timeout",
"2024-01-15 10:27:55 INFO user_id=202 msg=Profile updated",
]
})
# Extract structured data from log entries
result = df.with_columns([
# Extract date and time
pl.col("log_entry").str.extract(r"(\d{4}-\d{2}-\d{2})", 1).alias("date"),
pl.col("log_entry").str.extract(r"(\d{2}:\d{2}:\d{2})", 1).alias("time"),
# Extract log level
pl.col("log_entry").str.extract(r"(ERROR|WARN|INFO|DEBUG)", 1).alias("level"),
# Extract user_id
pl.col("log_entry").str.extract(r"user_id=(\d+)", 1).cast(pl.Int32).alias("user_id"),
# Extract message
pl.col("log_entry").str.extract(r"msg=(.+)$", 1).alias("message"),
])
print("Extracted log fields:")
print(result.drop("log_entry"))
# Count pattern matches
df2 = pl.DataFrame({
"text": ["apple pie and apple juice", "banana split", "apple apple apple", "cherry"]
})
df2 = df2.with_columns(
pl.col("text").str.count_matches("apple").alias("apple_count")
)
print("\nPattern count:")
print(df2)
String Splitting & Parsing
# Demo: split strings into List columns, index into them, and explode
# list elements into rows for frequency counting.
import polars as pl
df = pl.DataFrame({
"csv_row": ["Alice,30,Engineer,New York",
"Bob,25,Designer,Los Angeles",
"Charlie,35,Manager,Chicago"],
"tags": ["python|pandas|polars", "javascript|react|node", "python|sklearn|pytorch"],
"path": ["/home/user/data/2024/jan/sales.csv",
"/home/user/data/2024/feb/orders.parquet",
"/home/user/data/2024/mar/logs.json"],
})
# Split CSV string into columns: split yields a List column, list.get(i)
# picks the i-th element of each list.
parsed = (df
.with_columns(pl.col("csv_row").str.split(",").alias("parts"))
.with_columns([
pl.col("parts").list.get(0).alias("name"),
pl.col("parts").list.get(1).cast(pl.Int32).alias("age"),
pl.col("parts").list.get(2).alias("role"),
pl.col("parts").list.get(3).alias("city"),
])
.drop(["parts", "csv_row"])
)
print("Parsed CSV columns:")
print(parsed)
# Tags: split and explode (one row per tag), then count occurrences
tags_exploded = (df
.with_columns(pl.col("tags").str.split("|"))
.explode("tags")
.rename({"tags": "tag"})
.select(["tag"])
.group_by("tag")
.agg(pl.len().alias("count"))
.sort("count", descending=True)
)
print("\nTag frequency:")
print(tags_exploded)
# Extract filename from path (last path component)
df_paths = df.with_columns(
pl.col("path").str.split("/").list.last().alias("filename")
)
print("\nFilenames:", df_paths["filename"].to_list())
# Demo: extract brand/price/category from messy product descriptions.
import polars as pl
import numpy as np
np.random.seed(42)
# Raw product descriptions
descriptions = [
"Apple iPhone 15 Pro 256GB - Price: $999.99 | Category: Smartphones",
"Samsung Galaxy S24 Ultra 512GB for $1199 in Electronics category",
"Sony WH-1000XM5 Headphones - Electronics - USD 349.99",
"Nike Air Max 2024 Running Shoes - Sports - Price $129.95",
"Bosch Professional Drill Set - Tools - $89.99 - Model: GSR18V",
"Canon EOS R50 Camera Body - Photography - Price: $679.00",
]
df = pl.DataFrame({"raw": descriptions * 100}) # simulate 600 products
result = df.with_columns([
# Extract brand (first word, capitalized)
pl.col("raw").str.extract(r"^([A-Z][a-zA-Z]+)", 1).alias("brand"),
# Extract price. The pattern accepts both "$" and "USD " prefixes and
# makes the cents optional, so "$1199" and "USD 349.99" are captured
# too (the old pattern required "$...\.dd" and left those rows null).
# replace_all strips every thousands separator, not just the first.
pl.col("raw").str.extract(r"(?:\$|USD )([\d,]+(?:\.\d{2})?)", 1)
.str.replace_all(",", "").cast(pl.Float64).alias("price"),
# Extract category
pl.col("raw").str.extract(
r"(Smartphones|Electronics|Sports|Tools|Photography|Clothing)", 1
).alias("category"),
])
print("Extracted product data:")
print(result.drop("raw").head(6))
print("\nPrice statistics by category:")
print(result.group_by("category").agg([
pl.col("price").mean().round(2).alias("avg_price"),
pl.col("price").min().alias("min_price"),
pl.col("price").max().alias("max_price"),
pl.len().alias("count"),
]).sort("avg_price", descending=True))
# Exercise: parse the log lines below with str.extract (capture group 1).
import polars as pl
logs = pl.DataFrame({"line": [
"2024-01-15 ERROR Something went wrong",
"2024-01-16 INFO Process completed",
"2024-01-17 WARN Memory usage high",
]})
# 1. Extract date with r"(\d{4}-\d{2}-\d{2})"
# 2. Extract level with r"(ERROR|WARN|INFO)"
# 3. Extract message (everything after level)
# 4. Print result without original column
Polars .dt namespace provides fast datetime operations: extraction, arithmetic, truncation, and timezone handling. All operations are vectorized and work on Series natively.
Date Extraction & Arithmetic
# Demo: extract date components, do date arithmetic, and aggregate by month.
import polars as pl
import numpy as np
from datetime import date, timedelta
# Create datetime data
np.random.seed(42)
n = 200
start = date(2024, 1, 1)
dates = [start + timedelta(days=int(d)) for d in np.random.randint(0, 365, n)]
df = pl.DataFrame({
"event_date": pl.Series(dates),
"sales": np.random.exponential(100, n).round(2),
"returns": np.random.randint(0, 5, n),
})
# Date component extraction
df_with_parts = df.with_columns([
pl.col("event_date").dt.year().alias("year"),
pl.col("event_date").dt.month().alias("month"),
pl.col("event_date").dt.day().alias("day"),
pl.col("event_date").dt.weekday().alias("weekday"), # ISO: 1=Monday ... 7=Sunday
pl.col("event_date").dt.week().alias("week_num"),
pl.col("event_date").dt.quarter().alias("quarter"),
])
print("Date components:")
print(df_with_parts.head(3))
# Date arithmetic: pl.duration adds an offset; subtracting dates yields a
# Duration column, which total_days() converts to an integer day count.
df_with_calc = df.with_columns([
(pl.col("event_date") + pl.duration(days=30)).alias("due_date"),
(pl.lit(date(2024, 12, 31)) - pl.col("event_date")).dt.total_days().alias("days_until_eoy"),
])
print("\nDate arithmetic:")
print(df_with_calc.select(["event_date", "due_date", "days_until_eoy"]).head(3))
# Groupby month
monthly = df.group_by(pl.col("event_date").dt.month().alias("month")).agg(
pl.col("sales").sum().round(2).alias("total_sales"),
pl.len().alias("n_transactions")
).sort("month")
print("\nMonthly summary:")
print(monthly)
Time Series Resampling & Windows
# Demo: resample minute-level readings to hourly/daily buckets by
# truncating timestamps and grouping on the truncated column.
import polars as pl
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
n = 1000
# Minute-level timestamp data (roughly every 5 minutes with jitter)
start = datetime(2024, 1, 1, 0, 0, 0)
timestamps = [start + timedelta(minutes=i*5 + np.random.randint(0, 5)) for i in range(n)]
df = pl.DataFrame({
"ts": pl.Series(timestamps).cast(pl.Datetime),
"temp": 20 + np.cumsum(np.random.randn(n) * 0.5),
"pressure": 1013 + np.cumsum(np.random.randn(n) * 0.2),
})
# Truncate to hourly resolution: dt.truncate floors each timestamp to the
# start of its hour, so grouping on it buckets readings per hour.
df_hourly = df.with_columns(
pl.col("ts").dt.truncate("1h").alias("hour")
).group_by("hour").agg([
pl.col("temp").mean().round(2).alias("avg_temp"),
pl.col("temp").min().alias("min_temp"),
pl.col("temp").max().alias("max_temp"),
pl.col("pressure").mean().round(2).alias("avg_pressure"),
pl.len().alias("n_readings"),
]).sort("hour")
print("Hourly resampled data (first 5 hours):")
print(df_hourly.head(5))
# Daily summary: dt.date() drops the time component entirely
df_daily = df.with_columns(
pl.col("ts").dt.date().alias("date")
).group_by("date").agg([
pl.col("temp").mean().round(2).alias("daily_avg_temp"),
pl.col("temp").std().round(3).alias("temp_std"),
]).sort("date")
print("\nDaily summary (first 3 days):")
print(df_daily.head(3))
Timezone Handling & Duration
# Demo: attach/convert timezones and do duration arithmetic.
import polars as pl
from datetime import datetime, timezone, timedelta
# Timezone-aware datetime operations: replace_time_zone attaches a zone to
# naive timestamps; convert_time_zone re-expresses the same instant.
timestamps_utc = pl.Series([
datetime(2024, 3, 15, 12, 0, 0),
datetime(2024, 6, 15, 18, 30, 0),
datetime(2024, 12, 1, 8, 45, 0),
]).cast(pl.Datetime).dt.replace_time_zone("UTC")
df = pl.DataFrame({
"event_utc": timestamps_utc,
"duration_sec": pl.Series([3600, 7200, 1800]), # seconds
})
# Convert to different timezones
df_tz = df.with_columns([
pl.col("event_utc").dt.convert_time_zone("America/New_York").alias("event_eastern"),
pl.col("event_utc").dt.convert_time_zone("Europe/London").alias("event_london"),
pl.col("event_utc").dt.convert_time_zone("Asia/Tokyo").alias("event_tokyo"),
])
print("Multi-timezone timestamps:")
print(df_tz.select(["event_utc", "event_eastern", "event_tokyo"]))
# Duration calculations
df_duration = df.with_columns([
pl.duration(seconds=pl.col("duration_sec")).alias("duration"),
(pl.col("event_utc") + pl.duration(hours=2)).alias("event_plus_2h"),
])
print("\nDuration operations:")
print(df_duration.select(["event_utc", "duration", "event_plus_2h"]))
# Business day calculation. Build the January range ONCE and filter it
# vectorized (the old version materialized the same date_range twice and
# round-tripped through a Python list comprehension).
# polars dt.weekday() is ISO: 1=Monday ... 7=Sunday, so <= 5 keeps Mon-Fri.
jan_days = pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 1, 31), "1d", eager=True)
n_workdays = jan_days.filter(jan_days.dt.weekday() <= 5).len()
print(f"\nWorkdays in January 2024: {n_workdays}")
# Demo: inject anomalies into sensor data via when/then on a row index,
# then compute hourly per-sensor statistics.
import polars as pl
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
n = 2000
start = datetime(2024, 1, 1)
ts = [start + timedelta(minutes=i) for i in range(n)]
df = pl.DataFrame({
"timestamp": pl.Series(ts).cast(pl.Datetime),
"temperature": 25 + np.cumsum(np.random.randn(n)*0.3),
"humidity": 60 + np.cumsum(np.random.randn(n)*0.2),
"sensor_id": np.random.choice(["S1","S2","S3"], n),
})
# Add some anomalies: with_row_index adds an "index" column used to mark
# 20 random rows, whose temperature gets a +15 spike.
anomaly_idx = np.random.choice(n, 20, replace=False)
df = df.with_row_index().with_columns([
pl.when(pl.col("index").is_in(anomaly_idx.tolist()))
.then(pl.col("temperature") + 15)
.otherwise(pl.col("temperature"))
.alias("temperature")
]).drop("index")
# Hourly statistics per sensor (truncate floors timestamps to the hour)
hourly = (df.with_columns(pl.col("timestamp").dt.truncate("1h").alias("hour"))
.group_by(["hour", "sensor_id"])
.agg([
pl.col("temperature").mean().round(2).alias("avg_temp"),
pl.col("temperature").std().round(3).alias("std_temp"),
pl.col("humidity").mean().round(2).alias("avg_humidity"),
pl.len().alias("n_readings"),
])
.sort(["hour", "sensor_id"])
)
print("Hourly sensor statistics (first 6 rows):")
print(hourly.head(6))
# Exercise: daily resampling and date-component extraction on the data below.
import polars as pl
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
n = 500
ts = [datetime(2024,1,1) + timedelta(minutes=i*10) for i in range(n)]
df = pl.DataFrame({
"ts": pl.Series(ts).cast(pl.Datetime),
"value": np.random.randn(n) * 10 + 50,
})
# 1. Extract day: pl.col("ts").dt.date()
# 2. Group by day, compute mean/min/max
# 3. Add year, month, weekday columns
Polars window functions apply expressions over groups without collapsing rows. They enable rankings, cumulative sums, lag/lead features, and partition-based calculations in a single pass.
Ranking & Cumulative Window Functions
# Demo: window functions — rankings, cumulative stats, and group-level
# aggregates that do not collapse rows (via .over()).
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
# pl.date_range takes (start, end, interval) — it has no pandas-style
# `period` argument, so the 30 days are spelled out as an explicit end.
"date": pl.date_range(pl.date(2024, 1, 1), pl.date(2024, 1, 30), "1d", eager=True),
"product": np.tile(["A", "B", "C"], 10),
"region": np.random.choice(["East", "West"], 30),
"sales": np.random.exponential(100, 30).round(2),
})
# Ranking within groups (dense_rank)
df_ranked = df.with_columns([
pl.col("sales").rank("dense", descending=True).over("product").alias("rank_in_product"),
pl.col("sales").rank("ordinal", descending=True).over("region").alias("rank_in_region"),
])
print("Rankings:")
print(df_ranked.select(["product", "region", "sales", "rank_in_product", "rank_in_region"]).head(6))
# Cumulative sum and running stats (sort first so the running order is by date)
df_cumulative = df.sort("date").with_columns([
pl.col("sales").cum_sum().over("product").alias("cumsum_by_product"),
pl.col("sales").cum_mean().over("product").alias("cum_mean_by_product"),
])
print("\nCumulative stats by product:")
print(df_cumulative.select(["date", "product", "sales",
"cumsum_by_product", "cum_mean_by_product"]).head(6))
# Group-level stats without collapsing: .over() broadcasts the aggregate
# back to every row of its group.
df_group_stats = df.with_columns([
pl.col("sales").mean().over("product").alias("product_avg"),
pl.col("sales").sum().over("region").alias("region_total"),
(pl.col("sales") / pl.col("sales").sum().over("product")).alias("pct_of_product"),
])
print("\nGroup stats (no collapse):")
print(df_group_stats.head(6))
Lag, Lead & Shift Features
# Demo: lag/lead/shift features and rolling means, partitioned by product.
import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 60
df = pl.DataFrame({
# pl.date_range takes (start, end, interval) — no pandas-style `period`
# argument — so the n-day span is built from an explicit end date.
"date": pl.date_range(date(2024, 1, 1), date(2024, 1, 1) + timedelta(days=n - 1), "1d", eager=True),
"product": ["A"]*(n//2) + ["B"]*(n//2),
"sales": np.random.exponential(100, n).round(2),
})
# Sort before lag/lead so shift() moves along the time axis within each product
df = df.sort(["product", "date"])
# Lag/lead features for time series
df_features = df.with_columns([
pl.col("sales").shift(1).over("product").alias("lag_1"),
pl.col("sales").shift(7).over("product").alias("lag_7"),
pl.col("sales").shift(-1).over("product").alias("lead_1"),
# Difference from previous period
(pl.col("sales") - pl.col("sales").shift(1).over("product")).alias("delta"),
# Percent change
((pl.col("sales") - pl.col("sales").shift(1).over("product")) /
pl.col("sales").shift(1).over("product")).round(4).alias("pct_change"),
# Rolling mean (window function style; first 6 rows per group are null)
pl.col("sales").rolling_mean(window_size=7).over("product").alias("rolling_7"),
])
print("Lag/lead features:")
print(df_features.filter(pl.col("product")=="A").head(8))
Conditional Expressions & When/Then/Otherwise
# Demo: when/then/otherwise chains for bucketing and a multi-condition
# approval rule, then an approval-rate summary per credit tier.
import polars as pl
import numpy as np
np.random.seed(42)
n = 500
df = pl.DataFrame({
"customer_id": range(n),
"age": np.random.randint(18, 75, n),
"income": np.random.exponential(50000, n).round(0),
"credit_score": np.random.randint(300, 850, n),
"loan_amount": np.random.exponential(15000, n).round(0),
"on_time_payments": np.random.randint(0, 60, n),
"total_payments": np.random.randint(12, 60, n),
})
# Complex conditional expressions (when-chains evaluate top-down; the first
# matching branch wins, otherwise() is the fallback)
df_scored = df.with_columns([
# Age category
pl.when(pl.col("age") < 25).then(pl.lit("Young"))
.when(pl.col("age") < 45).then(pl.lit("Middle"))
.otherwise(pl.lit("Senior")).alias("age_group"),
# Credit tier
pl.when(pl.col("credit_score") >= 750).then(pl.lit("Excellent"))
.when(pl.col("credit_score") >= 650).then(pl.lit("Good"))
.when(pl.col("credit_score") >= 550).then(pl.lit("Fair"))
.otherwise(pl.lit("Poor")).alias("credit_tier"),
# Payment ratio
(pl.col("on_time_payments") / pl.col("total_payments")).round(3).alias("payment_ratio"),
# Approval decision: all three conditions must hold
pl.when(
(pl.col("credit_score") >= 650) &
(pl.col("income") >= 30000) &
(pl.col("loan_amount") <= pl.col("income") * 5)
).then(pl.lit("Approved")).otherwise(pl.lit("Denied")).alias("decision"),
])
print("Loan applications with features:")
print(df_scored.head(8))
# Summary by tier: mean of a boolean column gives the approval rate
print("\nApproval by credit tier:")
print(df_scored.group_by("credit_tier").agg([
pl.len().alias("n"),
(pl.col("decision")=="Approved").mean().round(3).alias("approval_rate"),
]).sort("credit_tier"))
# Demo: per-customer feature engineering with window functions, then a
# customer-level roll-up.
import polars as pl
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n = 5000
customers = 500
purchase_dates = [date(2024,1,1) + timedelta(days=int(d))
for d in np.random.randint(0, 180, n)]
df = pl.DataFrame({
"purchase_date": pl.Series(purchase_dates),
"customer_id": np.random.randint(0, customers, n),
"amount": np.random.exponential(60, n).round(2),
"category": np.random.choice(["A","B","C"], n),
})
# Sort so cumulative/shift windows run in chronological order per customer
df = df.sort(["customer_id", "purchase_date"])
# Feature engineering with window functions
features = df.with_columns([
# Cumulative spend per customer
pl.col("amount").cum_sum().over("customer_id").alias("cumulative_spend"),
# Rank by amount within customer
pl.col("amount").rank("ordinal", descending=True).over("customer_id").alias("amount_rank"),
# Rolling 30-day purchase count (using row-based rolling as proxy)
pl.col("amount").rolling_mean(window_size=5).over("customer_id").alias("rolling_mean_5"),
# Delta from previous purchase
(pl.col("amount") - pl.col("amount").shift(1).over("customer_id")).alias("amount_delta"),
])
print("Feature matrix (first 10 rows):")
print(features.head(10))
# Customer-level summary (last cumulative_spend == total spend, thanks to the sort)
summary = features.group_by("customer_id").agg([
pl.col("cumulative_spend").last().round(2).alias("total_spend"),
pl.len().alias("n_purchases"),
pl.col("purchase_date").max().alias("last_purchase"),
pl.col("amount").std().round(2).alias("spend_volatility"),
]).sort("total_spend", descending=True)
print("\nCustomer summary (top 5):")
print(summary.head(5))
# Exercise: add rank, cumulative-sum, and lag columns per group.
import polars as pl
import numpy as np
np.random.seed(42)
df = pl.DataFrame({
"group": np.random.choice(["A","B"], 20),
"value": np.random.randn(20).round(2),
# pl.date_range takes (start, end, interval) — no pandas-style `period`
# argument — so the 20 days end at Jan 20.
"date": pl.date_range(pl.date(2024,1,1), pl.date(2024,1,20), "1d", eager=True),
})
# 1. .sort(["group", "date"])
# 2. Add rank column: pl.col("value").rank("dense", descending=True).over("group")
# 3. Add cumsum: pl.col("value").cum_sum().over("group")
# 4. Add lag1: pl.col("value").shift(1).over("group")
# 5. Assert len(result) == len(df)
Polars consistently outperforms pandas 5-100x due to Rust implementation, SIMD operations, parallel query execution, and memory-efficient Apache Arrow format.
Benchmark: GroupBy & Aggregation
# Benchmark: identical multi-aggregation groupby in pandas vs polars.
import polars as pl
import pandas as pd
import numpy as np
import time
np.random.seed(42)
n = 500_000
# Create test data (same dict feeds both libraries)
data = {
"id": range(n),
"category": np.random.choice(list("ABCDEFGHIJ"), n),
"sub_cat": np.random.choice(list("XYZ"), n),
"value1": np.random.randn(n) * 100,
"value2": np.random.exponential(50, n),
"count": np.random.randint(1, 100, n),
}
df_pd = pd.DataFrame(data)
df_pl = pl.DataFrame(data)
# Benchmark: complex groupby + aggregation
def pandas_groupby():
# Eager pandas groupby with a nested agg spec.
return (df_pd
.groupby(["category", "sub_cat"])
.agg({"value1": ["mean", "std", "sum"], "value2": ["mean", "max"], "count": "sum"})
)
def polars_groupby():
# Lazy polars query: .lazy() lets the optimizer plan before .collect()
# executes the whole pipeline.
return (df_pl.lazy()
.group_by(["category", "sub_cat"])
.agg([
pl.col("value1").mean().alias("v1_mean"),
pl.col("value1").std().alias("v1_std"),
pl.col("value1").sum().alias("v1_sum"),
pl.col("value2").mean().alias("v2_mean"),
pl.col("value2").max().alias("v2_max"),
pl.col("count").sum().alias("count_sum"),
])
.collect()
)
# Time both (3 runs each; report average and best)
for name, fn in [("Pandas", pandas_groupby), ("Polars", polars_groupby)]:
times = []
for _ in range(3):
t0 = time.time()
result = fn()
times.append((time.time()-t0)*1000)
print(f"{name}: avg={sum(times)/len(times):.1f}ms, min={min(times):.1f}ms")
Filter & Join Benchmark
# Benchmark: join + filter + groupby pipeline in pandas vs polars.
import polars as pl
import pandas as pd
import numpy as np
import time
np.random.seed(42)
n_main = 200_000
n_ref = 1_000
main_data = {
"user_id": np.random.randint(0, n_ref, n_main),
"product": np.random.choice(list("ABCDE"), n_main),
"amount": np.random.exponential(50, n_main),
"date": np.random.randint(0, 365, n_main),
}
ref_data = {
"user_id": range(n_ref),
"tier": np.random.choice(["Gold", "Silver", "Bronze"], n_ref),
"discount": np.random.uniform(0, 0.3, n_ref).round(3),
}
df_pd_main = pd.DataFrame(main_data)
df_pd_ref = pd.DataFrame(ref_data)
df_pl_main = pl.DataFrame(main_data)
df_pl_ref = pl.DataFrame(ref_data)
def pandas_join_filter():
# Eager: merge materializes the full joined frame before filtering.
merged = df_pd_main.merge(df_pd_ref, on="user_id", how="left")
filtered = merged[(merged["tier"]=="Gold") & (merged["amount"] > 50)]
return filtered.groupby("product")["amount"].agg(["sum","mean","count"])
def polars_join_filter():
# Lazy: the optimizer can push the filter around the join before collect().
return (df_pl_main.lazy()
.join(df_pl_ref.lazy(), on="user_id", how="left")
.filter((pl.col("tier")=="Gold") & (pl.col("amount") > 50))
.group_by("product")
.agg([pl.col("amount").sum(), pl.col("amount").mean(), pl.len()])
.collect())
for name, fn in [("Pandas", pandas_join_filter), ("Polars", polars_join_filter)]:
t0 = time.time()
result = fn()
t1 = time.time()
print(f"{name}: {(t1-t0)*1000:.1f}ms, rows={len(result)}")
print("\nSpeedups are typically 5-50x for large datasets on multi-core machines.")
Memory Efficiency & Migration Tips
# Memory comparison plus a pandas -> polars API migration cheatsheet.
import polars as pl
import pandas as pd
import numpy as np
import sys
np.random.seed(42)
n = 100_000
# Memory usage comparison
data = {
"id": range(n),
"name": [f"item_{i}" for i in range(n)],
"value": np.random.randn(n),
"category": np.random.choice(["A","B","C","D","E"], n),
"flag": np.random.choice([True, False], n),
}
df_pd = pd.DataFrame(data)
df_pl = pl.DataFrame(data)
# deep=True includes Python string object overhead in the pandas figure
pd_memory = df_pd.memory_usage(deep=True).sum() / 1024**2
# Polars uses Arrow format - typically more efficient
# (NOTE(review): only pandas memory is printed here; polars has
# df.estimated_size() for the comparable figure — worth adding.)
print(f"Pandas memory: {pd_memory:.2f} MB")
print(f"Polars dtypes: {dict(zip(df_pl.columns, df_pl.dtypes))}")
# Pandas -> Polars migration cheatsheet
print("\nPandas -> Polars Migration:")
migrations = [
("df.groupby('col').mean()", "df.group_by('col').agg(pl.all().mean())"),
("df[df['x'] > 0]", "df.filter(pl.col('x') > 0)"),
("df['col'].apply(fn)", "df.with_columns(pl.col('col').map_elements(fn))"),
("pd.merge(df1, df2, on='k')","df1.join(df2, on='k', how='inner')"),
("df.rename({'a':'b'},...)", "df.rename({'a': 'b'})"),
("df.drop('col', axis=1)", "df.drop('col')"),
("df.sort_values('col')", "df.sort('col')"),
("df.head(10)", "df.head(10)"),
]
for pandas_code, polars_code in migrations:
print(f" Pandas: {pandas_code}")
print(f" Polars: {polars_code}\n")
# Benchmark: full read-filter-aggregate pipeline, eager pandas vs lazy
# polars (scan_csv streams the file instead of loading it all).
import polars as pl
import pandas as pd
import numpy as np
import time, tempfile, os
np.random.seed(42)
n = 200_000
# Simulate writing a CSV (in-memory for demo)
data = {
"transaction_id": range(n),
"customer_id": np.random.randint(0, 10000, n),
"product_id": np.random.randint(0, 500, n),
"amount": np.random.exponential(80, n).round(2),
"status": np.random.choice(["completed","pending","failed","cancelled"], n),
"date": np.random.randint(0, 365, n),
}
# Row-by-row DictWriter is intentionally simple (setup cost is not timed)
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as f:
csv_path = f.name
import csv as csv_lib
writer = csv_lib.DictWriter(f, fieldnames=data.keys())
writer.writeheader()
for i in range(n):
writer.writerow({k: data[k][i] for k in data})
# Pandas pipeline: read everything, then filter and aggregate
t0 = time.time()
df_pd = pd.read_csv(csv_path)
df_pd = df_pd[df_pd["status"] == "completed"]
result_pd = df_pd.groupby("product_id")["amount"].agg(["sum","mean","count"])
pd_time = (time.time()-t0)*1000
# Polars pipeline: scan_csv builds a lazy plan; collect() runs it with
# predicate/projection pushdown.
t0 = time.time()
result_pl = (
pl.scan_csv(csv_path)
.filter(pl.col("status") == "completed")
.group_by("product_id")
.agg([pl.col("amount").sum(), pl.col("amount").mean(), pl.len()])
.collect()
)
pl_time = (time.time()-t0)*1000
os.unlink(csv_path)
print(f"Pandas: {pd_time:.0f}ms ({len(result_pd)} rows)")
print(f"Polars: {pl_time:.0f}ms ({len(result_pl)} rows)")
print(f"Speedup: {pd_time/pl_time:.1f}x")
# Exercise: translate the given pandas pipeline to polars and time both.
import polars as pl
import pandas as pd
import numpy as np, time
np.random.seed(42)
n = 50_000
data = {"group": np.random.choice(list("ABCD"), n),
"value": np.random.randn(n), "flag": np.random.choice([True, False], n)}
df_pd = pd.DataFrame(data)
df_pl = pl.DataFrame(data)
# Pandas version (given):
# result = df_pd[df_pd["flag"]].groupby("group")["value"].agg(["sum","mean","std"])
# TODO: Write the equivalent Polars version using .filter() and .group_by()
# Time both and print speedup
Production Polars patterns include reading/writing Parquet, Arrow IPC, and cloud storage, integrating with ML pipelines, and building robust ETL pipelines with error handling.
Reading & Writing Parquet/Arrow
# Demo: write the same frame as CSV/Parquet/Arrow IPC, compare file sizes
# and read speeds.
import polars as pl
import numpy as np
import tempfile, os, time
from datetime import date, timedelta
np.random.seed(42)
n = 100_000
start = date(2024, 1, 1)
df = pl.DataFrame({
"id": range(n),
"category": np.random.choice(list("ABCDE"), n),
"value": np.random.randn(n).round(4),
# pl.date_range takes (start, end, interval) — no pandas-style `period`
# argument — so the end date is computed to yield exactly n daily values.
"date": pl.date_range(start, start + timedelta(days=n - 1), "1d", eager=True),
"active": np.random.choice([True, False], n),
})
with tempfile.TemporaryDirectory() as tmpdir:
    # Write formats
    csv_path = os.path.join(tmpdir, "data.csv")
    parquet_path = os.path.join(tmpdir, "data.parquet")
    ipc_path = os.path.join(tmpdir, "data.arrow")
    df.write_csv(csv_path)
    df.write_parquet(parquet_path)
    df.write_ipc(ipc_path)
    sizes = {
        "CSV": os.path.getsize(csv_path),
        "Parquet": os.path.getsize(parquet_path),
        "Arrow IPC": os.path.getsize(ipc_path),
    }
    print("File sizes:")
    for fmt, size in sizes.items():
        print(f" {fmt}: {size/1024:.1f} KB ({size/sizes['CSV']*100:.0f}% of CSV)")
    # Read speeds
    for fmt, read_fn, path in [
        ("CSV", lambda: pl.read_csv(csv_path), csv_path),
        ("Parquet", lambda: pl.read_parquet(parquet_path), parquet_path),
        ("Arrow", lambda: pl.read_ipc(ipc_path), ipc_path),
    ]:
        t0 = time.time()
        result = read_fn()
        t_ms = (time.time()-t0)*1000
        print(f" {fmt} read: {t_ms:.1f}ms, shape={result.shape}")
print("\nRecommendation: Use Parquet for storage (5-10x smaller + faster reads)")
Polars in ML Pipelines
# Demo: feature engineering in polars, hand-off to sklearn via to_numpy().
import polars as pl
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
np.random.seed(42)
n = 2000
# Create Polars DataFrame
df = pl.DataFrame({
"age": np.random.randint(18, 70, n),
"income": np.random.exponential(50000, n).round(0),
"credit_score": np.random.randint(300, 850, n),
"debt_ratio": np.random.uniform(0, 1, n).round(3),
"employment_years": np.random.randint(0, 40, n),
"category": np.random.choice(["A","B","C"], n),
})
# Feature engineering in Polars
df_features = df.with_columns([
(pl.col("income") / (pl.col("age") + 1)).alias("income_per_age"),
(pl.col("credit_score") / 850).alias("credit_norm"),
pl.col("category").cast(pl.Categorical).cast(pl.Int8).alias("category_code"),
]).drop("category")
# Target
# NOTE(review): the target is derived directly from credit_score, which is
# also kept as a feature (and as credit_norm) — the model will score near
# perfectly. Fine for an API demo, but this is target leakage in real use.
target = (df["credit_score"] > 650).cast(pl.Int32)
# Convert to numpy for sklearn (polars frames hand off via to_numpy())
X = df_features.to_numpy()
y = target.to_numpy()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_train_sc = scaler.fit_transform(X_train)
X_test_sc = scaler.transform(X_test)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_sc, y_train)
print(classification_report(y_test, model.predict(X_test_sc)))
# Feature importance back to Polars
importances = pl.DataFrame({
"feature": df_features.columns,
"importance": model.feature_importances_,
}).sort("importance", descending=True)
print("Feature importances:")
print(importances)
ETL Pipeline with Error Handling
# Demo: a small ETL with null handling, type validation and outlier removal.
import polars as pl
import numpy as np
import tempfile, os
np.random.seed(42)
n = 1000
# Simulate messy input data: ~5% "NULL" and ~2% "N/A" value cells
data_rows = []
for i in range(n):
row = f"{i},{np.random.choice(['A','B','C'])},"
val = np.random.randn()
if np.random.random() < 0.05: row += "NULL" # missing value
elif np.random.random() < 0.02: row += "N/A" # invalid
else: row += f"{val:.4f}"
row += f",{np.random.randint(1, 100)}"
data_rows.append(row)
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
f.write("id,category,value,count\n")
f.write("\n".join(data_rows))
csv_path = f.name
def run_etl(path):
"""Read the CSV at `path`, drop null/outlier rows, cast dtypes, and
return (clean DataFrame, per-category summary DataFrame)."""
# null_values maps the listed sentinels to null at parse time
df = pl.read_csv(path, null_values=["NULL", "N/A", ""])
# Validate schema
n_before = len(df)
df = df.filter(pl.col("value").is_not_null())
n_after = len(df)
print(f"Removed {n_before - n_after} null rows ({(n_before-n_after)/n_before*100:.1f}%)")
# Type casting and validation
df = df.with_columns([
pl.col("value").cast(pl.Float64),
pl.col("count").cast(pl.Int32),
pl.col("category").cast(pl.Categorical),
])
# Remove outliers (|z-score| > 3)
mean_val = df["value"].mean()
std_val = df["value"].std()
df = df.filter(((pl.col("value") - mean_val) / std_val).abs() <= 3)
# Aggregate
result = df.group_by("category").agg([
pl.col("value").mean().round(4).alias("avg_value"),
pl.col("count").sum().alias("total_count"),
pl.len().alias("n_rows"),
]).sort("category")
return df, result
clean_df, summary = run_etl(csv_path)
print("\nETL Summary:")
print(summary)
os.unlink(csv_path)
# Demo: a fuller ETL — clean/validate, join a reference table, derive date
# features, and aggregate, with simple quality metrics.
import polars as pl
import numpy as np
import tempfile, os, json
np.random.seed(42)
n = 5000
# Messy raw data
raw = pl.DataFrame({
"customer_id": np.random.randint(0, 1000, n),
"amount": np.where(np.random.random(n) < 0.03, None, np.random.exponential(80, n).round(2)),
"category": np.random.choice(["Electronics","Clothing","Books",None,"Sports"], n),
"date_str": [f"2024-{np.random.randint(1,13):02d}-{np.random.randint(1,29):02d}" for _ in range(n)],
"status": np.random.choice(["COMPLETED","completed","Complete","FAILED","Cancelled"], n),
})
# Reference table
ref = pl.DataFrame({
"customer_id": range(1000),
"segment": np.random.choice(["Premium","Standard","Basic"], 1000),
})
def etl_pipeline(df, ref_df):
    """Clean `df`, join `ref_df` on customer_id, add date features, and
    return (clean frame, revenue summary, quality-metrics dict)."""
    metrics = {}
    n_input = len(df)
    # 1. Clean and validate
    df = (df
        .filter(pl.col("amount").is_not_null())
        .filter(pl.col("category").is_not_null())
        .with_columns([
            pl.col("status").str.to_uppercase().alias("status"),
            pl.col("date_str").str.to_date().alias("date"),
            pl.col("amount").clip(0, 10000),
        ])
        # "Complete" uppercases to "COMPLETE", which the old exact match
        # against "COMPLETED" silently dropped; starts_with keeps both
        # spellings of a completed order while still excluding FAILED and
        # CANCELLED.
        .filter(pl.col("status").str.starts_with("COMPLETE"))
    )
    metrics["n_after_clean"] = len(df)
    metrics["pct_kept"] = round(len(df)/n_input*100, 1)
    # 2. Join with reference
    df = df.join(ref_df, on="customer_id", how="left")
    # 3. Feature engineering
    df = df.with_columns([
        pl.col("date").dt.month().alias("month"),
        pl.col("date").dt.quarter().alias("quarter"),
    ])
    # 4. Aggregation
    summary = df.group_by(["segment", "quarter", "category"]).agg([
        pl.col("amount").sum().round(2).alias("total_revenue"),
        pl.len().alias("n_orders"),
        pl.col("amount").mean().round(2).alias("avg_order"),
    ]).sort(["segment", "quarter"])
    return df, summary, metrics
clean_df, summary, metrics = etl_pipeline(raw, ref)
print("ETL Metrics:", json.dumps(metrics, indent=2))
print("\nRevenue Summary (top 10):")
print(summary.sort("total_revenue", descending=True).head(10))
# Exercise: implement etl() per the numbered steps.
import polars as pl
import numpy as np
np.random.seed(42)
raw = pl.DataFrame({
"id": range(100),
"group": np.random.choice(["A","B",None,"C"], 100),
"value": np.where(np.random.random(100)<0.1, None, np.random.randn(100).round(2)),
"status": np.random.choice(["active","ACTIVE","Inactive"], 100),
})
lookup = pl.DataFrame({"group": ["A","B","C"], "label": ["Alpha","Beta","Gamma"]})
def etl(df, lookup_df):
    """Stub to complete: return a per-label summary of `df` joined with
    `lookup_df` (prints None until implemented)."""
    # 1. Drop nulls
    # 2. Normalize status to uppercase
    # 3. Join with lookup on "group"
    # 4. Group by label, compute mean+count of value
    pass
print(etl(raw, lookup))