Loading Module…

πŸ—„οΈ SQL with Python

32 topics • Click any card to expand

1. Setup with sqlite3

Python's built-in sqlite3 module connects to SQLite relational databases — no server needed. Use ":memory:" for prototyping.

Connect, create table, insert rows
import sqlite3

# Open a throwaway database that lives only in memory.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Table definition, assembled from adjacent string literals.
create_employees = (
    "CREATE TABLE employees ("
    "  id INTEGER PRIMARY KEY,"
    "  name TEXT NOT NULL,"
    "  dept TEXT,"
    "  salary REAL,"
    "  hire_date TEXT)"
)
cur.execute(create_employees)

# One tuple per row, in column order: id, name, dept, salary, hire_date.
data = [
    (1, "Alice", "Engineering", 95000, "2022-03-15"),
    (2, "Bob", "Marketing", 72000, "2021-08-01"),
    (3, "Carol", "Engineering", 88000, "2023-01-10"),
]
insert_employee = "INSERT INTO employees VALUES (?,?,?,?,?)"
cur.executemany(insert_employee, data)
conn.commit()

# COUNT(*) comes back as a single one-column row; unpack it directly.
(count,) = conn.execute("SELECT COUNT(*) FROM employees").fetchone()
print(f"Inserted {count} rows.")
conn.close()
Row factory for dict-like access
import sqlite3

conn = sqlite3.connect(":memory:")
# With sqlite3.Row as the factory, result rows can be indexed by column name.
conn.row_factory = sqlite3.Row

conn.execute("CREATE TABLE products (id INT, name TEXT, price REAL)")
catalog = [
    (1, "Widget", 9.99),
    (2, "Gadget", 49.99),
    (3, "Doohickey", 19.99),
]
conn.executemany("INSERT INTO products VALUES (?,?,?)", catalog)

# Pull the three fields out by name before formatting them.
product_rows = conn.execute("SELECT * FROM products")
for row in product_rows:
    ident, label, cost = row["id"], row["name"], row["price"]
    print(f"  [{ident}] {label:12s} ${cost:.2f}")
conn.close()
Context manager and executescript
import sqlite3

# A sqlite3 connection used as a context manager only manages the transaction:
# it commits on success and rolls back on exception. It does NOT close the
# connection, so the connection is closed explicitly in the `finally` clause.
conn = sqlite3.connect(":memory:")
try:
    with conn:
        # executescript runs several semicolon-separated statements in one call.
        conn.executescript("""
            CREATE TABLE categories (id INT PRIMARY KEY, name TEXT);
            CREATE TABLE items (id INT, cat_id INT, name TEXT, qty INT);
            INSERT INTO categories VALUES (1,'Electronics'),(2,'Clothing');
            INSERT INTO items VALUES
                (1,1,'Laptop',10),(2,1,'Phone',25),
                (3,2,'T-Shirt',100),(4,2,'Jeans',50);
        """)

        # Per-category item count and total quantity. GROUP BY alone does not
        # guarantee row order, so order by the category id explicitly.
        rows = conn.execute(
            "SELECT c.name, COUNT(i.id) as items, SUM(i.qty) as total_qty "
            "FROM categories c JOIN items i ON c.id=i.cat_id "
            "GROUP BY c.id ORDER BY c.id"
        ).fetchall()
        for r in rows:
            print(f"  {r[0]:14s} items={r[1]}  qty={r[2]}")
finally:
    conn.close()
sqlite_master schema introspection
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employees (id INT PRIMARY KEY, name TEXT, dept TEXT, salary REAL);
    CREATE TABLE departments (id INT PRIMARY KEY, name TEXT, budget REAL);
    CREATE INDEX idx_dept ON employees(dept);
    CREATE INDEX idx_salary ON employees(salary);
""")

# sqlite_master is queried for every schema object; only type and name are shown.
print("Schema objects:")
catalog_query = "SELECT type, name, sql FROM sqlite_master ORDER BY type, name"
for kind, obj_name, _ddl in conn.execute(catalog_query).fetchall():
    print(f"  [{kind:5s}] {obj_name}")

# Same catalog, restricted to tables.
table_query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
tables = conn.execute(table_query).fetchall()
print("Tables:", [name for (name,) in tables])

# PRAGMA table_info yields one row per column; fields 0, 1, 2, 3 and 5 are
# printed as column number, name, type, notnull flag and pk flag.
print("Columns in employees:")
column_info = conn.execute("PRAGMA table_info(employees)").fetchall()
for cid, col_name, col_type, not_null, _default, pk in column_info:
    print(f"  col#{cid} {col_name:10s} type={col_type:8s} notnull={not_null} pk={pk}")

conn.close()
💼 Real-World: Sales Transaction Database Setup
A startup data analyst creates an in-memory SQLite database to prototype reporting queries before moving to PostgreSQL.
import sqlite3, random, datetime

conn = sqlite3.connect(":memory:")
create_sales = (
    "CREATE TABLE sales ("
    "  id INT, customer TEXT, product TEXT,"
    "  amount REAL, sale_date TEXT)"
)
conn.execute(create_sales)

customers = ["Alice", "Bob", "Carol", "Dave", "Eve"]
products = ["Widget", "Gadget", "Doohickey", "Gizmo"]
base = datetime.date(2024, 1, 1)

# Generate 1000 synthetic sales: random customer, product, amount in
# [20, 500] rounded to cents, and a date 0-180 days after the base date.
records = []
for sale_id in range(1, 1001):
    buyer = random.choice(customers)
    item = random.choice(products)
    amount = round(random.uniform(20, 500), 2)
    day_offset = random.randint(0, 180)
    sold_on = (base + datetime.timedelta(days=day_offset)).isoformat()
    records.append((sale_id, buyer, item, amount, sold_on))
conn.executemany("INSERT INTO sales VALUES (?,?,?,?,?)", records)
conn.commit()

# One summary row: row count, total amount, average amount.
summary_query = "SELECT COUNT(*), ROUND(SUM(amount),2), ROUND(AVG(amount),2) FROM sales"
row = conn.execute(summary_query).fetchone()
n_rows, total_amount, avg_amount = row
print(f"Rows: {n_rows}  Total: ${total_amount:,.2f}  Avg: ${avg_amount:.2f}")
conn.close()
🏋️ Practice: Design a Library Table Schema
Create an in-memory SQLite database with two tables: 'books' (id, title, author, year, genre) and 'members' (id, name, email). Insert at least 5 books and 3 members. Then query all books published after 2000, ordered by year descending.
Starter Code
import sqlite3

# Exercise scaffold: fill in each TODO below. As written, this script only
# opens an in-memory database, commits, and closes it.
conn = sqlite3.connect(":memory:")

# TODO: Create the 'books' table
# conn.execute("CREATE TABLE books (...)")

# TODO: Create the 'members' table
# conn.execute("CREATE TABLE members (...)")

# TODO: Insert at least 5 books
# conn.executemany("INSERT INTO books VALUES (?,?,?,?,?)", [...])

# TODO: Insert at least 3 members
# conn.executemany("INSERT INTO members VALUES (?,?,?)", [...])

conn.commit()

# TODO: Query all books published after 2000, ordered by year DESC
# rows = conn.execute("SELECT title, author, year FROM books WHERE ...").fetchall()
# for r in rows:
#     print(r)

conn.close()
2. SELECT & WHERE

SELECT retrieves columns. WHERE filters rows. Use LIKE, IN, BETWEEN, and IS NULL for flexible filtering.

Basic SELECT and WHERE
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, dept TEXT, salary REAL)")
staff = [
    (1, "Alice", "Eng", 95000),
    (2, "Bob", "Marketing", 72000),
    (3, "Carol", "Eng", 88000),
    (4, "Dave", "HR", 65000),
    (5, "Eve", "Marketing", 78000),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", staff)

# Engineering only, highest salary first.
eng_query = "SELECT name, salary FROM emp WHERE dept='Eng' ORDER BY salary DESC"
rows = conn.execute(eng_query).fetchall()
print("Engineering staff:")
for name, salary in rows:
    print(f"  {name:8s} ${salary:,}")
conn.close()
LIKE, IN, BETWEEN, IS NULL
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL, category TEXT)")
groceries = [
    ("Apple", 0.50, "Fruit"),
    ("Avocado", 1.50, "Fruit"),
    ("Milk", 2.99, "Dairy"),
    ("Cheese", 4.50, "Dairy"),
    ("Bread", 3.29, "Bakery"),
    ("Cake", None, "Bakery"),  # price unknown -> stored as NULL
]
conn.executemany("INSERT INTO products VALUES (?,?,?)", groceries)

# Shared query prefix; each predicate below is appended to it in turn.
q = "SELECT name FROM products WHERE"
predicates = (
    " name LIKE 'A%'",
    " category IN ('Dairy','Bakery')",
    " price BETWEEN 1 AND 4",
    " price IS NULL",
)
for predicate in predicates:
    print(conn.execute(q + predicate).fetchall())
conn.close()
ORDER BY, LIMIT, OFFSET and DISTINCT
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL)")
staff = [
    ("Alice", "Eng", 95000),
    ("Bob", "HR", 65000),
    ("Carol", "Eng", 88000),
    ("Dave", "HR", 70000),
    ("Eve", "Marketing", 78000),
    ("Frank", "Eng", 102000),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?)", staff)

# Three highest salaries.
print("Top 3 earners:")
top_query = "SELECT name, salary FROM emp ORDER BY salary DESC LIMIT 3"
for name, salary in conn.execute(top_query).fetchall():
    print(f"  {name:8s} ${salary:,}")

# Second page of two when sorted by name: skip 2 rows, take 2.
print("Page 2 by name:")
page_query = "SELECT name FROM emp ORDER BY name LIMIT 2 OFFSET 2"
for (name,) in conn.execute(page_query).fetchall():
    print(f"  {name}")

# Each department once, alphabetically.
depts = conn.execute("SELECT DISTINCT dept FROM emp ORDER BY dept").fetchall()
print("Departments:", [dept for (dept,) in depts])
conn.close()
CASE WHEN, COALESCE, NULLIF, IS NULL
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL, bonus REAL)")
staff = [
    ("Alice", "Eng", 95000, 5000),
    ("Bob", "HR", 65000, None),
    ("Carol", "Eng", 88000, 3000),
    ("Dave", "Marketing", 72000, None),
    ("Eve", "Eng", 110000, 8000),
    ("Frank", "HR", 60000, 0),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", staff)

# COALESCE replaces a NULL bonus with 0, CASE assigns a salary band, and
# NULLIF turns a 0 bonus into NULL.
band_query = (
    "SELECT name, dept, salary,"
    "       COALESCE(bonus, 0) as safe_bonus,"
    "       CASE "
    "         WHEN salary >= 100000 THEN 'Senior'"
    "         WHEN salary >= 80000  THEN 'Mid'"
    "         ELSE 'Junior'"
    "       END as band,"
    "       NULLIF(bonus, 0) as nonzero_bonus "
    "FROM emp ORDER BY salary DESC"
)
rows = conn.execute(band_query).fetchall()

print(f"{'Name':8s} {'Band':7s} {'Salary':>9s} {'Bonus':>7s} {'NZ Bonus':>10s}")
for name, _dept, salary, safe_bonus, band, nonzero_bonus in rows:
    print(f"{name:8s} {band:7s} ${salary:>8,} ${safe_bonus:>6,.0f} {str(nonzero_bonus):>10s}")

# Employees whose bonus is either missing (NULL) or zero.
missing = conn.execute(
    "SELECT name FROM emp WHERE bonus IS NULL OR bonus = 0"
).fetchall()
print("No effective bonus:", [name for (name,) in missing])
conn.close()
💼 Real-World: Priority Order Fulfillment Filter
An e-commerce backend filters pending high-value orders for the fulfillment team's priority queue.
import sqlite3

conn = sqlite3.connect(":memory:")
create_orders = (
    "CREATE TABLE orders ("
    "  id INT, customer TEXT, status TEXT, amount REAL, order_date TEXT)"
)
conn.execute(create_orders)
order_log = [
    (1, "Alice", "shipped", 120.0, "2024-01-05"),
    (2, "Bob", "pending", 80.0, "2024-01-10"),
    (3, "Alice", "delivered", 250.0, "2024-01-12"),
    (4, "Carol", "pending", 450.0, "2024-01-15"),
    (5, "Bob", "shipped", 310.0, "2024-01-18"),
    (6, "Dave", "cancelled", 90.0, "2024-01-20"),
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", order_log)

# Pending or shipped orders above 100, largest amount first.
priority_query = (
    "SELECT id, customer, amount, order_date "
    "FROM orders "
    "WHERE status IN ('pending','shipped') AND amount > 100 "
    "ORDER BY amount DESC"
)
rows = conn.execute(priority_query).fetchall()

print("Priority orders:")
for order_id, customer, amount, placed_on in rows:
    print(f"  #{order_id} {customer:8s} ${amount:6.2f}  {placed_on}")
conn.close()
🏋️ Practice: SELECT with WHERE, ORDER BY, and LIMIT
Build an employees table with columns: id, name, department, salary, city. Insert 8+ rows. Then write three queries: 1) All employees in 'Engineering' earning over 80000, sorted by salary DESC. 2) Employees from 'NYC' or 'LA' using IN. 3) The top 3 earners company-wide using LIMIT.
Starter Code
import sqlite3

# Exercise scaffold: the table is created but no rows are inserted until the
# TODO list below is filled in; the three queries are left commented out.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INT, name TEXT, department TEXT, salary REAL, city TEXT)")
conn.executemany("INSERT INTO employees VALUES (?,?,?,?,?)", [
    # TODO: add at least 8 rows across multiple departments and cities
    # (1, "Alice", "Engineering", 95000, "NYC"),
    # ...
])
conn.commit()

# TODO: Query 1 — Engineering employees earning > 80000, sorted by salary DESC
# rows = conn.execute("SELECT name, salary FROM employees WHERE ...").fetchall()
# print("Engineering >80k:")
# for r in rows: print(f"  {r[0]:10s} ${r[1]:,}")

# TODO: Query 2 — Employees in NYC or LA
# rows = conn.execute("SELECT name, city FROM employees WHERE city IN (...)").fetchall()
# print("NYC/LA staff:", rows)

# TODO: Query 3 — Top 3 earners company-wide
# rows = conn.execute("SELECT name, salary FROM employees ORDER BY ... LIMIT ...").fetchall()
# print("Top 3:", rows)

conn.close()
3. GROUP BY, Aggregates & HAVING

Aggregate functions (COUNT, SUM, AVG, MAX) summarize groups. HAVING filters groups after aggregation — like WHERE, but for groups.

COUNT, SUM, AVG per group
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, rep TEXT, amount REAL)")
deals = [
    ("North", "Alice", 1000),
    ("North", "Bob", 1500),
    ("South", "Carol", 800),
    ("South", "Dave", 1200),
    ("East", "Eve", 2000),
    ("East", "Alice", 900),
]
conn.executemany("INSERT INTO sales VALUES (?,?,?)", deals)

# One summary row per region, biggest total first.
region_query = (
    "SELECT region,"
    "       COUNT(*) as reps,"
    "       SUM(amount) as total,"
    "       ROUND(AVG(amount),0) as avg_amt,"
    "       MAX(amount) as top "
    "FROM sales GROUP BY region ORDER BY total DESC"
)
rows = conn.execute(region_query).fetchall()
print(f"{'Region':8s} {'Reps':>5} {'Total':>8} {'Avg':>8} {'Top':>8}")
for region, reps, total, avg_amt, top in rows:
    print(f"{region:8s} {reps:>5} {total:>8} {avg_amt:>8} {top:>8}")
conn.close()
HAVING — filter groups by aggregate
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (dept TEXT, salary REAL)")
payroll = [
    ("Eng", 120000),
    ("Eng", 95000),
    ("Eng", 110000),
    ("HR", 65000),
    ("Marketing", 85000),
    ("Marketing", 90000),
    ("Marketing", 72000),
]
conn.executemany("INSERT INTO emp VALUES (?,?)", payroll)

# HAVING drops groups with fewer than two members after aggregation.
headcount_query = (
    "SELECT dept, COUNT(*) as headcount, ROUND(AVG(salary),0) as avg_sal "
    "FROM emp "
    "GROUP BY dept "
    "HAVING COUNT(*) >= 2 "
    "ORDER BY avg_sal DESC"
)
rows = conn.execute(headcount_query).fetchall()
for dept_summary in rows:
    print(dept_summary)
conn.close()
Multi-column GROUP BY and ROLLUP simulation
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (region TEXT, product TEXT, qty INT, revenue REAL)")
order_lines = [
    ("North", "Widget", 10, 500),
    ("North", "Gadget", 5, 250),
    ("South", "Widget", 8, 400),
    ("South", "Widget", 12, 600),
    ("East", "Gadget", 20, 1000),
    ("East", "Widget", 3, 150),
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?)", order_lines)

# Finest grain: one row per (region, product) pair.
breakdown_query = (
    "SELECT region, product, SUM(qty) as units, ROUND(SUM(revenue),2) as rev "
    "FROM orders GROUP BY region, product ORDER BY region, rev DESC"
)
print("Region + Product breakdown:")
for r in conn.execute(breakdown_query):
    region, product, units, rev = r
    print(f"  {region:8s} {product:8s}  units={units}  rev=${rev:.2f}")

# Coarser grain standing in for ROLLUP: one subtotal row per region.
subtotal_query = (
    "SELECT region, SUM(qty) as total_units, ROUND(SUM(revenue),2) as total_rev "
    "FROM orders GROUP BY region ORDER BY total_rev DESC"
)
print("Per-region totals:")
for r in conn.execute(subtotal_query):
    region, total_units, total_rev = r
    print(f"  {region:8s}  units={total_units}  rev=${total_rev:.2f}")
conn.close()
Conditional aggregation with SUM(CASE WHEN)
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, product TEXT, amount REAL, status TEXT)")
pipeline = [
    ("Alice", "Widget", 1200, "won"),
    ("Alice", "Gadget", 800, "lost"),
    ("Alice", "Widget", 500, "won"),
    ("Bob", "Gadget", 1500, "won"),
    ("Bob", "Widget", 300, "lost"),
    ("Bob", "Gadget", 900, "won"),
    ("Carol", "Widget", 2000, "won"),
    ("Carol", "Gadget", 600, "lost"),
]
conn.executemany("INSERT INTO sales VALUES (?,?,?,?)", pipeline)

# SUM(CASE WHEN ...) splits each rep's deals into won/lost columns and
# derives the win percentage, all in one pass.
pivot_query = (
    "SELECT rep,"
    "       COUNT(*) as deals,"
    "       SUM(CASE WHEN status='won'  THEN amount ELSE 0 END) as won_amt,"
    "       SUM(CASE WHEN status='lost' THEN amount ELSE 0 END) as lost_amt,"
    "       ROUND(100.0 * SUM(CASE WHEN status='won' THEN 1 ELSE 0 END) / COUNT(*), 1) as win_pct "
    "FROM sales GROUP BY rep ORDER BY won_amt DESC"
)
rows = conn.execute(pivot_query).fetchall()

print(f"{'Rep':8s} {'Deals':>6} {'Won $':>8} {'Lost $':>8} {'Win%':>6}")
for rep, deals, won_amt, lost_amt, win_pct in rows:
    print(f"{rep:8s} {deals:>6} ${won_amt:>7,.0f} ${lost_amt:>7,.0f} {win_pct:>5.1f}%")
conn.close()
💼 Real-World: Monthly Spend Dashboard
A finance analyst builds a monthly category spending dashboard from raw transaction records.
import sqlite3, random, datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txn (user_id INT, category TEXT, amount REAL, date TEXT)")

cats = ["Food", "Transport", "Shopping", "Entertainment", "Utilities"]

# 2000 synthetic transactions: random user 1-100, random category, amount in
# [5, 300] rounded to cents, and a date 0-89 days after 2024-01-01.
rows = []
for _ in range(2000):
    user_id = random.randint(1, 100)
    category = random.choice(cats)
    amount = round(random.uniform(5, 300), 2)
    day_offset = random.randint(0, 89)
    txn_date = (datetime.date(2024, 1, 1) + datetime.timedelta(days=day_offset)).isoformat()
    rows.append((user_id, category, amount, txn_date))
conn.executemany("INSERT INTO txn VALUES (?,?,?,?)", rows)

# Per-category count, total, average, and share of overall spend; the share
# divides by a scalar subquery over the whole table.
dashboard_query = (
    "SELECT category, COUNT(*) as txns,"
    "       ROUND(SUM(amount),2) as total,"
    "       ROUND(AVG(amount),2) as avg_amt,"
    "       ROUND(SUM(amount)*100.0/(SELECT SUM(amount) FROM txn),1) as pct "
    "FROM txn GROUP BY category ORDER BY total DESC"
)
report = conn.execute(dashboard_query).fetchall()

print(f"{'Category':15s} {'Txns':>5} {'Total':>10} {'Avg':>8} {'%':>6}")
print("-" * 50)
for cat_name, txns, total, avg_amt, pct in report:
    print(f"{cat_name:15s} {txns:>5} ${total:>9,.2f} ${avg_amt:>7.2f} {pct:>5.1f}%")
conn.close()
🏋️ Practice: GROUP BY with HAVING Aggregation
Create a 'sales' table with columns: rep (TEXT), region (TEXT), amount (REAL). Insert 10+ rows. Write queries to: 1) Show total and average sales per region, ordered by total DESC. 2) Use HAVING to list only reps whose total sales exceed 3000. 3) Find the region with the single highest average sale amount.
Starter Code
import sqlite3

# Exercise scaffold: the table is created empty; add rows to the list below
# and uncomment each query once the data is in place.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, region TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
    # TODO: insert 10+ rows, vary reps and regions
    # ("Alice", "North", 1200), ...
])
conn.commit()

# TODO: Query 1 — total and avg per region, ORDER BY total DESC
# rows = conn.execute(
#     "SELECT region, SUM(amount) as total, ROUND(AVG(amount),2) as avg_amt "
#     "FROM sales GROUP BY region ORDER BY total DESC"
# ).fetchall()
# for r in rows: print(r)

# TODO: Query 2 — reps with total sales > 3000
# rows = conn.execute(
#     "SELECT rep, SUM(amount) as total FROM sales "
#     "GROUP BY rep HAVING total > 3000"
# ).fetchall()
# print("High performers:", rows)

# TODO: Query 3 — region with highest average sale
# row = conn.execute(
#     "SELECT region, ROUND(AVG(amount),2) as avg_amt FROM sales "
#     "GROUP BY region ORDER BY avg_amt DESC LIMIT 1"
# ).fetchone()
# print("Best avg region:", row)

conn.close()
4. JOINs

JOINs combine rows from multiple tables. INNER keeps only matches; LEFT keeps all rows from the left table even without a match.

INNER JOIN and LEFT JOIN
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dept (id INT, name TEXT)")
conn.execute("CREATE TABLE emp  (id INT, name TEXT, dept_id INT, salary REAL)")
departments = [(1, "Eng"), (2, "HR"), (3, "Sales")]
employees = [
    (1, "Alice", 1, 100000),
    (2, "Bob", 2, 65000),
    (3, "Carol", 1, 95000),
    (4, "Dave", 3, 80000),
    (5, "Eve", None, 75000),  # no department assigned
]
conn.executemany("INSERT INTO dept VALUES (?,?)", departments)
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", employees)

# INNER JOIN: only employees whose dept_id matches a department row.
inner_query = (
    "SELECT e.name, d.name, e.salary "
    "FROM emp e INNER JOIN dept d ON e.dept_id=d.id"
)
print("INNER JOIN:")
for r in conn.execute(inner_query):
    print(" ", r)

# LEFT JOIN: every employee; a missing department shows as 'Unknown'.
left_query = (
    "SELECT e.name, COALESCE(d.name,'Unknown'), e.salary "
    "FROM emp e LEFT JOIN dept d ON e.dept_id=d.id"
)
print("LEFT JOIN (all employees):")
for r in conn.execute(left_query):
    print(" ", r)
conn.close()
Multi-table JOIN with aggregation
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INT, name TEXT, city TEXT)")
conn.execute("CREATE TABLE orders    (id INT, cust_id INT, amount REAL)")
customer_rows = [
    (1, "Alice", "NYC"),
    (2, "Bob", "LA"),
    (3, "Carol", "NYC"),
    (4, "Dave", "Chicago"),
]
order_rows = [(1, 1, 150), (2, 1, 200), (3, 2, 80), (4, 3, 320), (5, 3, 100)]
conn.executemany("INSERT INTO customers VALUES (?,?,?)", customer_rows)
conn.executemany("INSERT INTO orders VALUES (?,?,?)", order_rows)

# LEFT JOIN keeps customers with no orders; COUNT(o.id) is then 0 and the
# NULL sum is replaced by 0 via COALESCE.
spend_query = (
    "SELECT c.name, c.city, COUNT(o.id) as orders, "
    "       COALESCE(SUM(o.amount),0) as spent "
    "FROM customers c LEFT JOIN orders o ON c.id=o.cust_id "
    "GROUP BY c.id ORDER BY spent DESC"
)
rows = conn.execute(spend_query).fetchall()
for customer_summary in rows:
    print(customer_summary)
conn.close()
Self-JOIN and multiple LEFT JOINs
import sqlite3

conn = sqlite3.connect(":memory:")
# Self-join: employee + their manager
conn.execute("CREATE TABLE emp (id INT, name TEXT, mgr_id INT, dept TEXT)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
    (1,"CEO",None,"Exec"),(2,"CTO",1,"Tech"),(3,"CFO",1,"Finance"),
    (4,"Dev Lead",2,"Tech"),(5,"Alice",4,"Tech"),(6,"Bob",4,"Tech"),
])

print("Employee -> Manager:")
# The placeholder for "no manager" is an em dash. It is written as the escape
# \u2014 so the literal survives any re-encoding of this file (the previous
# text here was a mis-decoded em dash).
reporting_lines = conn.execute(
    "SELECT e.name as employee, COALESCE(m.name,'\u2014') as manager, e.dept "
    "FROM emp e LEFT JOIN emp m ON e.mgr_id=m.id "
    "ORDER BY e.id"
).fetchall()
for r in reporting_lines:
    print(f"  {r[0]:10s}  manager={r[1]:10s}  dept={r[2]}")

# Three-table join
conn.execute("CREATE TABLE projects (id INT, name TEXT, lead_id INT)")
conn.executemany("INSERT INTO projects VALUES (?,?,?)", [
    (1,"Alpha",4),(2,"Beta",2),(3,"Gamma",3),
])

print("Projects with leads and their department:")
# ORDER BY p.id makes the listing order explicit instead of relying on
# whatever order the join happens to produce.
project_leads = conn.execute(
    "SELECT p.name, e.name as lead, e.dept "
    "FROM projects p JOIN emp e ON p.lead_id=e.id "
    "ORDER BY p.id"
).fetchall()
for r in project_leads:
    print(f"  {r[0]:8s}  lead={r[1]:10s}  dept={r[2]}")
conn.close()
CROSS JOIN for cartesian product
import sqlite3

conn = sqlite3.connect(":memory:")

# Two single-column tables whose rows are paired in every combination.
conn.execute("CREATE TABLE sizes  (size TEXT)")
conn.execute("CREATE TABLE colors (color TEXT)")
size_rows = [(label,) for label in ("S", "M", "L", "XL")]
color_rows = [(label,) for label in ("Red", "Blue", "Green")]
conn.executemany("INSERT INTO sizes  VALUES (?)", size_rows)
conn.executemany("INSERT INTO colors VALUES (?)", color_rows)

print("All size/color combinations (CROSS JOIN):")
variant_query = (
    "SELECT s.size, c.color FROM sizes s CROSS JOIN colors c ORDER BY s.size, c.color"
)
variants = conn.execute(variant_query).fetchall()
for size, color in variants:
    print(f"  {size:4s} / {color}")
print(f"Total variants: {len(variants)}")

# The same table cross-joined with itself gives a multiplication table.
print("3x3 multiplication table:")
conn.execute("CREATE TABLE nums (n INT)")
conn.executemany("INSERT INTO nums VALUES (?)", [(1,), (2,), (3,)])
times_query = (
    "SELECT a.n, b.n, a.n * b.n as product "
    "FROM nums a CROSS JOIN nums b ORDER BY a.n, b.n"
)
for left, right, product in conn.execute(times_query).fetchall():
    print(f"  {left} x {right} = {product}")
conn.close()
💼 Real-World: Inventory & Sales Cross-Reference
A warehouse manager joins product, inventory, and sales tables to identify stock risk for top-selling items.
import sqlite3

conn = sqlite3.connect(":memory:")
for ddl in [
    "CREATE TABLE products  (id INT, name TEXT, category TEXT)",
    "CREATE TABLE inventory (product_id INT, warehouse TEXT, qty INT)",
    "CREATE TABLE sales_30d (product_id INT, qty_sold INT)",
]:
    conn.execute(ddl)

conn.executemany("INSERT INTO products VALUES (?,?,?)", [
    (1,"Widget","Hardware"),(2,"Gadget","Electronics"),
    (3,"Doohickey","Hardware"),(4,"Gizmo","Electronics"),
])
conn.executemany("INSERT INTO inventory VALUES (?,?,?)", [
    (1,"East",500),(1,"West",300),(2,"East",80),(3,"West",200),(4,"East",50),
])
conn.executemany("INSERT INTO sales_30d VALUES (?,?)", [
    (1,420),(2,75),(3,60),(4,45),
])

# Weeks of cover = stock / weekly sales rate, with the weekly rate taken as
# qty_sold * 7 / 30. The previous expression divided stock by the 30-day total
# directly, which yields 30-day periods of cover rather than weeks.
# NULLIF turns zero sales into NULL so the division yields NULL instead of a
# divide-by-zero; such rows sort first under ORDER BY ... ASC.
rows = conn.execute(
    "SELECT p.name, p.category, SUM(i.qty) as stock, "
    "       COALESCE(s.qty_sold,0) as sold_30d, "
    "       ROUND(SUM(i.qty)*30.0/(7*NULLIF(COALESCE(s.qty_sold,0),0)),1) as weeks "
    "FROM products p "
    "JOIN inventory i ON p.id=i.product_id "
    "LEFT JOIN sales_30d s ON p.id=s.product_id "
    "GROUP BY p.id ORDER BY weeks ASC"
).fetchall()
print(f"{'Product':12s} {'Category':12s} {'Stock':>6} {'Sold':>6} {'Weeks':>6}")
for r in rows:
    print(f"{r[0]:12s} {r[1]:12s} {r[2]:>6} {r[3]:>6} {str(r[4]):>6}")
conn.close()
🏋️ Practice: INNER and LEFT JOIN Queries
Create two tables: 'students' (id, name, grade) and 'enrollments' (student_id, course, score). Insert 5 students and 8 enrollments (some students have no enrollment). Write: 1) An INNER JOIN to show each student's course and score. 2) A LEFT JOIN to include students with no enrollment (show NULL for course). 3) Add a GROUP BY to show each student's average score.
Starter Code
import sqlite3

# Exercise scaffold: both tables are created empty; fill in the row lists and
# uncomment the three queries below.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INT, name TEXT, grade TEXT)")
conn.execute("CREATE TABLE enrollments (student_id INT, course TEXT, score REAL)")

conn.executemany("INSERT INTO students VALUES (?,?,?)", [
    # TODO: 5 students, at least one with no enrollment
    # (1,"Alice","A"), ...
])
conn.executemany("INSERT INTO enrollments VALUES (?,?,?)", [
    # TODO: 8 enrollments across the enrolled students
    # (1,"Math",88), ...
])
conn.commit()

# TODO: INNER JOIN — student name, course, score
# rows = conn.execute(
#     "SELECT s.name, e.course, e.score "
#     "FROM students s INNER JOIN enrollments e ON s.id=e.student_id"
# ).fetchall()
# print("Enrolled:")
# for r in rows: print(" ", r)

# TODO: LEFT JOIN — all students (NULL course for unenrolled)
# rows = conn.execute(
#     "SELECT s.name, COALESCE(e.course,'—') as course "
#     "FROM students s LEFT JOIN enrollments e ON s.id=e.student_id"
# ).fetchall()
# print("All students:")
# for r in rows: print(" ", r)

# TODO: Average score per student
# rows = conn.execute(
#     "SELECT s.name, ROUND(AVG(e.score),1) as avg_score "
#     "FROM students s JOIN enrollments e ON s.id=e.student_id "
#     "GROUP BY s.id ORDER BY avg_score DESC"
# ).fetchall()
# print("Avg scores:", rows)

conn.close()
5. Subqueries & CTEs

Subqueries run a query inside another. CTEs (the WITH clause) make complex logic readable and reusable — they're like named temporary tables.

Scalar subquery and IN subquery
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL)")
staff = [
    ("Alice", "Eng", 120000),
    ("Bob", "HR", 65000),
    ("Carol", "Eng", 95000),
    ("Dave", "Mkt", 80000),
    ("Eve", "Eng", 110000),
    ("Frank", "HR", 72000),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?)", staff)

# The inner SELECT yields a single value (the company-wide average salary)
# that every row's salary is compared against.
above_avg_query = (
    "SELECT name, dept, salary FROM emp "
    "WHERE salary > (SELECT AVG(salary) FROM emp) "
    "ORDER BY salary DESC"
)
rows = conn.execute(above_avg_query).fetchall()
print("Above-average earners:")
for name, dept, salary in rows:
    print(f"  {name:8s} {dept:5s} ${salary:,}")
conn.close()
CTE (WITH clause)
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, cust TEXT, amount REAL, month TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?)", [
    (1,"Alice",100,"Jan"),(2,"Bob",200,"Jan"),
    (3,"Alice",150,"Feb"),(4,"Carol",300,"Feb"),
    (5,"Bob",250,"Mar"),(6,"Alice",400,"Mar"),
])

# Two chained CTEs: `monthly` totals each month, `avg_m` averages those totals.
# Months are stored as three-letter abbreviations, so a plain ORDER BY month
# would sort alphabetically (Feb, Jan, Mar). instr() returns the position of
# the abbreviation inside the calendar string, which gives chronological order.
rows = conn.execute(
    "WITH monthly AS ("
    "  SELECT month, SUM(amount) as total FROM orders GROUP BY month"
    "),"
    "avg_m AS (SELECT AVG(total) as avg_total FROM monthly) "
    "SELECT m.month, m.total, "
    "       ROUND(m.total - a.avg_total, 2) as vs_avg "
    "FROM monthly m, avg_m a "
    "ORDER BY instr('JanFebMarAprMayJunJulAugSepOctNovDec', m.month)"
).fetchall()
for r in rows: print(r)
conn.close()
Correlated subquery and EXISTS
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, dept TEXT, salary REAL)")
staff = [
    (1, "Alice", "Eng", 120000),
    (2, "Bob", "HR", 65000),
    (3, "Carol", "Eng", 95000),
    (4, "Dave", "Mkt", 80000),
    (5, "Eve", "Eng", 110000),
    (6, "Frank", "HR", 72000),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", staff)

# Correlated subquery: the inner average is computed over the outer row's own
# department (e2.dept = e1.dept), both for display and for the filter.
above_dept_query = (
    "SELECT name, dept, salary, "
    "       ROUND((SELECT AVG(salary) FROM emp e2 WHERE e2.dept=e1.dept),0) as dept_avg "
    "FROM emp e1 "
    "WHERE salary > (SELECT AVG(salary) FROM emp e2 WHERE e2.dept=e1.dept) "
    "ORDER BY dept, salary DESC"
)
print("Above dept average:")
for r in conn.execute(above_dept_query):
    name, dept, salary, dept_avg = r
    print(f"  {name:8s} {dept:5s} ${salary:,}  dept_avg=${dept_avg:,}")

# EXISTS: keep a department when at least one of its rows has salary > 100000.
high_earner_query = (
    "SELECT DISTINCT dept FROM emp e1 "
    "WHERE EXISTS (SELECT 1 FROM emp e2 WHERE e2.dept=e1.dept AND e2.salary>100000)"
)
print("Depts with 100k+ earner:")
for r in conn.execute(high_earner_query):
    print(f"  {r[0]}")
conn.close()
Scalar subquery in SELECT and multi-CTE chain
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, cust TEXT, product TEXT, amount REAL, month INT)")
order_rows = [
    (1, "Alice", "Widget", 200, 1),
    (2, "Bob", "Gadget", 150, 1),
    (3, "Alice", "Gadget", 300, 2),
    (4, "Carol", "Widget", 450, 2),
    (5, "Bob", "Widget", 120, 3),
    (6, "Alice", "Gizmo", 500, 3),
    (7, "Carol", "Gadget", 250, 3),
    (8, "Dave", "Widget", 80, 1),
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", order_rows)

# A scalar subquery in the SELECT list supplies the grand total that each
# order's amount is divided by.
share_query = (
    "SELECT cust, product, amount,"
    "       ROUND(amount * 100.0 / (SELECT SUM(amount) FROM orders), 1) as pct_of_total "
    "FROM orders ORDER BY amount DESC"
)
print("Each order as % of grand total:")
for r in conn.execute(share_query):
    cust, product, amount, pct = r
    print(f"  {cust:8s} {product:8s} ${amount:>6.0f}  ({pct}%)")

# CTE chain: total per month, rank the months by total, keep the top two.
top_months_query = (
    "WITH monthly AS ("
    "  SELECT month, SUM(amount) as total FROM orders GROUP BY month"
    "),"
    "ranked AS ("
    "  SELECT month, total, RANK() OVER (ORDER BY total DESC) as rnk FROM monthly"
    ") "
    "SELECT month, total, rnk FROM ranked WHERE rnk <= 2 ORDER BY rnk"
)
print("Top 2 months by revenue:")
for r in conn.execute(top_months_query):
    month, total, rnk = r
    print(f"  Month {month}  total=${total:.0f}  rank={rnk}")
conn.close()
💼 Real-World: Customer Conversion Funnel Analysis
A product analyst uses CTEs to compute step-by-step conversion rates through the signup funnel.
import sqlite3, random, datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INT, event TEXT, ts TEXT)")

# Fixed seed so the synthetic event log is the same on every run.
random.seed(5)
funnel = ["signup", "view_product", "add_to_cart", "purchase"]
rows = []
for uid in range(1, 501):
    # Each user completes the first 1-4 funnel steps, starting on a random
    # day within 60 days of 2024-01-01.
    depth = random.randint(1, 4)
    start_offset = random.randint(0, 60)
    base = datetime.datetime(2024, 1, 1) + datetime.timedelta(days=start_offset)
    for ev in funnel[:depth]:
        jitter_hours = random.randint(1, 48)
        stamp = (base + datetime.timedelta(hours=jitter_hours)).isoformat()
        rows.append((uid, ev, stamp))
        base += datetime.timedelta(days=random.randint(0, 3))
conn.executemany("INSERT INTO events VALUES (?,?,?)", rows)

# `steps` counts distinct users per event; `top` holds the signup count that
# every step is expressed as a percentage of.
funnel_query = (
    "WITH steps AS ("
    "  SELECT event, COUNT(DISTINCT user_id) as users "
    "  FROM events "
    "  WHERE event IN ('signup','view_product','add_to_cart','purchase') "
    "  GROUP BY event"
    "),"
    "top AS (SELECT users as n FROM steps WHERE event='signup') "
    "SELECT event, users, "
    "       ROUND(users*100.0/(SELECT n FROM top),1) as pct "
    "FROM steps ORDER BY users DESC"
)
report = conn.execute(funnel_query).fetchall()

print("Conversion Funnel:")
for event, users, pct in report:
    print(f"  {event:16s} {users:4d} users ({pct}%)")
conn.close()
🏋️ Practice: Subquery in WHERE and CTE
Using an 'orders' table (id, customer, product, amount, order_date): 1) Write a subquery in WHERE to find orders with amount above the overall average. 2) Write a subquery in FROM (derived table) to get the top customer by total spend. 3) Write a WITH clause CTE that computes monthly totals, then selects only months above the average monthly total.
Starter Code
import sqlite3

# Exercise scaffold: the orders table is created and populated with eight
# rows; the three queries below are left commented out as worked hints.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, customer TEXT, product TEXT, amount REAL, order_date TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", [
    (1,"Alice","Widget",150,"2024-01-05"),
    (2,"Bob","Gadget",80,"2024-01-10"),
    (3,"Alice","Gadget",220,"2024-02-03"),
    (4,"Carol","Widget",310,"2024-02-15"),
    (5,"Bob","Widget",95,"2024-03-01"),
    (6,"Alice","Gizmo",400,"2024-03-20"),
    (7,"Carol","Gadget",175,"2024-04-08"),
    (8,"Dave","Widget",60,"2024-04-12"),
])
conn.commit()

# TODO: 1. Orders above overall average amount
# rows = conn.execute(
#     "SELECT customer, amount FROM orders "
#     "WHERE amount > (SELECT AVG(amount) FROM orders) "
#     "ORDER BY amount DESC"
# ).fetchall()
# print("Above avg:", rows)

# TODO: 2. Top customer by total spend (subquery in FROM)
# row = conn.execute(
#     "SELECT customer, total FROM "
#     "  (SELECT customer, SUM(amount) as total FROM orders GROUP BY customer) "
#     "ORDER BY total DESC LIMIT 1"
# ).fetchone()
# print("Top customer:", row)

# TODO: 3. CTE — months above average monthly total
# rows = conn.execute(
#     "WITH monthly AS ( "
#     "  SELECT strftime('%Y-%m', order_date) as month, SUM(amount) as total "
#     "  FROM orders GROUP BY month "
#     ") "
#     "SELECT month, total FROM monthly "
#     "WHERE total > (SELECT AVG(total) FROM monthly) "
#     "ORDER BY total DESC"
# ).fetchall()
# print("Above-avg months:", rows)

conn.close()
6. Window Functions

Window functions compute values across rows related to the current row — ranking, running totals, lag/lead — without collapsing groups.

RANK, SUM OVER, percent of total
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (month TEXT, rep TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
    ("Jan","Alice",5000),("Jan","Bob",4000),("Jan","Carol",6000),
    ("Feb","Alice",5500),("Feb","Bob",3800),("Feb","Carol",7000),
])

# Per month: rank each rep by amount, show the month total, and each rep's
# share of it. Months are three-letter abbreviations, so ORDER BY month would
# list Feb before Jan; instr() against the calendar string sorts them
# chronologically instead.
rows = conn.execute(
    "SELECT month, rep, amount,"
    "       RANK() OVER (PARTITION BY month ORDER BY amount DESC) as rnk,"
    "       SUM(amount) OVER (PARTITION BY month) as month_total,"
    "       ROUND(amount*100.0/SUM(amount) OVER (PARTITION BY month),1) as pct "
    "FROM sales "
    "ORDER BY instr('JanFebMarAprMayJunJulAugSepOctNovDec', month), rnk"
).fetchall()
for r in rows: print(r)
conn.close()
LAG, LEAD, moving average
import sqlite3

# Five weeks of revenue for two stores, kept in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rev (week INT, store TEXT, rev REAL)")
weekly_revenue = [
    (1,"A",10000),(2,"A",12000),(3,"A",9000),(4,"A",14000),(5,"A",11000),
    (1,"B",8000),(2,"B",9500),(3,"B",11000),(4,"B",10500),(5,"B",12000),
]
conn.executemany("INSERT INTO rev VALUES (?,?,?)", weekly_revenue)

# prev: previous week's revenue for the same store (NULL for week 1)
# wow:  week-over-week change (NULL for week 1, because prev is NULL)
# ma3:  average over the current row and up to two preceding rows
trend_sql = (
    "SELECT week, store, rev,"
    "       LAG(rev) OVER (PARTITION BY store ORDER BY week) as prev,"
    "       rev - LAG(rev) OVER (PARTITION BY store ORDER BY week) as wow,"
    "       AVG(rev) OVER (PARTITION BY store ORDER BY week "
    "                      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as ma3 "
    "FROM rev ORDER BY store, week"
)
rows = conn.execute(trend_sql).fetchall()
for weekly in rows:
    print(weekly)
conn.close()
ROW_NUMBER, RANK, and running total
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (student TEXT, subject TEXT, score INT)")
conn.executemany("INSERT INTO scores VALUES (?,?,?)", [
    ("Alice","Math",92),("Bob","Math",78),("Carol","Math",85),
    ("Dave","Math",91),("Eve","Math",67),
    ("Alice","Science",88),("Bob","Science",95),("Carol","Science",72),
])

# ROW_NUMBER and RANK for a single subject (Math).
# The ORDER BY inside OVER() only decides how rows are numbered; the outer
# ORDER BY is what fixes the order in which the rows are returned.
math_ranking = conn.execute(
    "SELECT student, score,"
    "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num,"
    "       RANK()       OVER (ORDER BY score DESC) as rnk "
    "FROM scores WHERE subject='Math' "
    "ORDER BY row_num"
).fetchall()
print("Math rankings:")
for student, score, row_num, rnk in math_ranking:
    print(f"  {student:8s}  score={score}  row={row_num}  rank={rnk}")

# Running total: the frame runs from the first row of the window up to the
# current row, so each row adds its own score to the sum so far.
running_totals = conn.execute(
    "SELECT student, score,"
    "       SUM(score) OVER (ORDER BY score DESC ROWS UNBOUNDED PRECEDING) as running_total "
    "FROM scores WHERE subject='Math' ORDER BY score DESC"
).fetchall()
print("Running total (Math by score):")
for student, score, running_total in running_totals:
    print(f"  {student:8s}  score={score}  running_total={running_total}")
conn.close()
FIRST_VALUE, LAST_VALUE, and NTILE
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, month TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
    ("Alice","Jan",5000),("Alice","Feb",6200),("Alice","Mar",4800),
    ("Alice","Apr",7100),("Alice","May",5500),("Alice","Jun",6800),
    ("Bob","Jan",4200),("Bob","Feb",3800),("Bob","Mar",5100),
    ("Bob","Apr",4600),("Bob","May",5900),("Bob","Jun",4300),
])

# month holds three-letter names, so a plain "ORDER BY month" would sort them
# alphabetically (Apr, Feb, Jan, ...). Map each name to its calendar position
# so both the window ordering and the output ordering follow Jan..Jun.
MONTH_ORDER = (
    "CASE month "
    "WHEN 'Jan' THEN 1 WHEN 'Feb' THEN 2 WHEN 'Mar' THEN 3 "
    "WHEN 'Apr' THEN 4 WHEN 'May' THEN 5 WHEN 'Jun' THEN 6 END"
)

# FIRST_VALUE / LAST_VALUE return the amount of the earliest / latest calendar
# month in each rep's partition. The explicit UNBOUNDED FOLLOWING frame matters
# for LAST_VALUE: with ORDER BY and no frame clause, the frame stops at the
# current row, so LAST_VALUE would just echo the current row's amount.
first_last_rows = conn.execute(
    "SELECT rep, month, amount,"
    f"       FIRST_VALUE(amount) OVER (PARTITION BY rep ORDER BY {MONTH_ORDER} "
    "                                 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as first_month,"
    f"       LAST_VALUE(amount)  OVER (PARTITION BY rep ORDER BY {MONTH_ORDER} "
    "                                 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_month "
    f"FROM sales ORDER BY rep, {MONTH_ORDER}"
).fetchall()
print("First and last month amount per rep (by month order):")
for r in first_last_rows:
    print(f"  {r[0]:6s} {r[1]:4s} ${r[2]:>6,.0f}  first=${r[3]:>6,.0f}  last=${r[4]:>6,.0f}")

# NTILE(3) splits Alice's six months into three equally sized buckets
# (bucket 1 = highest amounts).
ntile_rows = conn.execute(
    "SELECT month, amount, NTILE(3) OVER (ORDER BY amount DESC) as bucket "
    "FROM sales WHERE rep='Alice' ORDER BY bucket, amount DESC"
).fetchall()
print("NTILE(3) buckets for Alice:")
for r in ntile_rows:
    print(f"  bucket={r[2]}  {r[0]}  ${r[1]:,.0f}")
conn.close()
💼 Real-World: Moving Average Trading Signals
A quant uses window functions to flag days when the price is above its 3-period moving average — a simple trend signal.
import sqlite3, random, datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (id INT, symbol TEXT, price REAL, date TEXT)")

# Seeded random walk: 20 consecutive calendar days per symbol.
# AAPL gets ids 1-20 and GOOG gets ids 21-40.
random.seed(10)
first_day = datetime.date(2024,1,2)
insert_sql = "INSERT INTO prices VALUES (?,?,?,?)"
for id_offset, sym in ((0, "AAPL"), (20, "GOOG")):
    p = random.uniform(150, 400)
    for d in range(20):
        p *= (1 + random.gauss(0.001, 0.015))
        day = (first_day + datetime.timedelta(days=d)).isoformat()
        conn.execute(insert_sql, (d + 1 + id_offset, sym, round(p,2), day))
conn.commit()

# ma3 averages the current row and up to two preceding rows per symbol;
# the signal is BUY when the price is strictly above that average.
signal_sql = (
    "SELECT symbol, date, price,"
    "       ROUND(AVG(price) OVER (PARTITION BY symbol ORDER BY date "
    "                              ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),2) as ma3,"
    "       CASE WHEN price > AVG(price) OVER "
    "            (PARTITION BY symbol ORDER BY date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
    "            THEN 'BUY' ELSE 'HOLD' END as signal "
    "FROM prices WHERE symbol='AAPL' ORDER BY date LIMIT 8"
)
result = conn.execute(signal_sql).fetchall()

print(f"{'Date':12s} {'Price':>8} {'MA3':>8} {'Signal':>7}")
for _symbol, day, price, ma3, signal in result:
    print(f"{day} {price:>8.2f} {ma3:>8.2f} {signal:>7}")
conn.close()
🏋️ Practice: ROW_NUMBER and RANK Window Functions
Create a 'sales' table with columns rep (TEXT), month (TEXT), amount (REAL). Insert data for 3 reps across 4 months. Write a query using: 1) RANK() OVER (PARTITION BY month ORDER BY amount DESC) to rank reps each month. 2) SUM(amount) OVER (PARTITION BY rep ORDER BY month) for a running total per rep. 3) LAG(amount) to compute month-over-month change per rep.
Starter Code
import sqlite3

# Starter data: three reps, four months each.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, month TEXT, amount REAL)")
starter_sales = [
    ("Alice","Jan",5000),("Bob","Jan",4200),("Carol","Jan",6100),
    ("Alice","Feb",5500),("Bob","Feb",3900),("Carol","Feb",7200),
    ("Alice","Mar",4800),("Bob","Mar",5100),("Carol","Mar",6800),
    ("Alice","Apr",6000),("Bob","Apr",4600),("Carol","Apr",5900),
]
conn.executemany("INSERT INTO sales VALUES (?,?,?)", starter_sales)
conn.commit()

# NOTE: month is stored as TEXT, so "ORDER BY month" sorts alphabetically
# (Apr, Feb, Jan, Mar). For true calendar order in tasks 2 and 3, order by a
# CASE expression that maps each month name to its number.

# TODO: 1. Rank reps within each month by amount DESC
# rows = conn.execute(
#     "SELECT month, rep, amount, "
#     "       RANK() OVER (PARTITION BY month ORDER BY amount DESC) as rnk "
#     "FROM sales ORDER BY month, rnk"
# ).fetchall()
# for r in rows: print(r)

# TODO: 2. Running total of amount per rep, ordered by month
# rows = conn.execute(
#     "SELECT rep, month, amount, "
#     "       SUM(amount) OVER (PARTITION BY rep ORDER BY month "
#     "                         ROWS UNBOUNDED PRECEDING) as running_total "
#     "FROM sales ORDER BY rep, month"
# ).fetchall()
# for r in rows: print(r)

# TODO: 3. Month-over-month change per rep using LAG
# rows = conn.execute(
#     "SELECT rep, month, amount, "
#     "       LAG(amount) OVER (PARTITION BY rep ORDER BY month) as prev_month, "
#     "       amount - LAG(amount) OVER (PARTITION BY rep ORDER BY month) as change "
#     "FROM sales ORDER BY rep, month"
# ).fetchall()
# for r in rows: print(r)

conn.close()
7. UPDATE & DELETE

UPDATE modifies existing rows. DELETE removes rows. Always use WHERE — without it you affect the entire table.

UPDATE with conditions
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (id INT, item TEXT, qty INT, price REAL)")
stock = [
    (1,"Widget",100,9.99),(2,"Gadget",50,49.99),(3,"Doohickey",200,4.99),
]
conn.executemany("INSERT INTO inventory VALUES (?,?,?,?)", stock)

# Two data changes, committed together:
#   1. raise the price 10% for low-stock items (qty below 100)
#   2. discontinue very cheap items (price below 5)
raise_low_stock_sql = "UPDATE inventory SET price=ROUND(price*1.1,2) WHERE qty < 100"
drop_cheap_sql = "DELETE FROM inventory WHERE price < 5"
conn.execute(raise_low_stock_sql)
conn.execute(drop_cheap_sql)
conn.commit()

print("After update/delete:")
remaining = conn.execute("SELECT * FROM inventory").fetchall()
for r in remaining:
    print(f"  {r[1]:12s} qty={r[2]:4d} price=${r[3]:.2f}")
conn.close()
UPSERT (INSERT OR REPLACE / ON CONFLICT)
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE config ("
    "  key TEXT PRIMARY KEY, value TEXT, updated_at TEXT)"
)

# `excluded` refers to the row that the INSERT tried to add.
UPSERT_SQL = (
    "INSERT INTO config VALUES (?,?,?) "
    "ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at"
)


def upsert(key, value, ts):
    """Insert a config entry, or overwrite value/updated_at if key exists.

    Uses the module-level `conn` and commits after every call.
    """
    params = (key, value, ts)
    conn.execute(UPSERT_SQL, params)
    conn.commit()


settings = [
    ("theme",     "dark",  "2024-01-01"),
    ("page_size", "25",    "2024-01-01"),
    ("theme",     "light", "2024-03-01"),   # update existing
]
for entry in settings:
    upsert(*entry)

for r in conn.execute("SELECT * FROM config").fetchall():
    print(r)
conn.close()
Bulk UPDATE with JOIN-like subquery
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INT, name TEXT, category TEXT, price REAL)")
conn.execute("CREATE TABLE discounts (category TEXT, pct REAL)")

catalog = [
    (1,"Widget","Hardware",10.00),(2,"Gadget","Electronics",50.00),
    (3,"Cable","Electronics",8.00),(4,"Hammer","Hardware",25.00),
    (5,"Phone","Electronics",600.00),
]
category_discounts = [
    ("Electronics",0.10),("Hardware",0.05),
]
conn.executemany("INSERT INTO products VALUES (?,?,?,?)", catalog)
conn.executemany("INSERT INTO discounts VALUES (?,?)", category_discounts)
conn.commit()

# The correlated subquery in SET looks up the discount for each product's
# category; the WHERE clause limits the update to categories that have a
# discount row, so the subquery never yields NULL for an updated row.
apply_discounts_sql = (
    "UPDATE products SET price = ROUND(price * (1 - "
    "  (SELECT pct FROM discounts d WHERE d.category=products.category)), 2) "
    "WHERE category IN (SELECT category FROM discounts)"
)
conn.execute(apply_discounts_sql)
conn.commit()

print("After category discounts:")
discounted = conn.execute(
    "SELECT name, category, price FROM products ORDER BY category, price"
).fetchall()
for r in discounted:
    print(f"  {r[0]:10s} {r[1]:14s} ${r[2]:.2f}")
conn.close()
INSERT OR IGNORE and INSERT OR REPLACE (UPSERT patterns)
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE page_views ("
    "  url TEXT PRIMARY KEY, views INT, last_seen TEXT)"
)

ignore_sql = "INSERT OR IGNORE INTO page_views VALUES (?,?,?)"
replace_sql = "INSERT OR REPLACE INTO page_views VALUES (?,?,?)"

# INSERT OR IGNORE - skip the row if the primary key already exists
initial = [
    ("https://example.com/home",  100, "2024-01-01"),
    ("https://example.com/about",  40, "2024-01-01"),
    ("https://example.com/blog",   75, "2024-01-01"),
]
conn.executemany(ignore_sql, initial)
# Try to insert a duplicate - it is silently ignored
duplicate_home = ("https://example.com/home", 999, "2024-06-01")
conn.execute(ignore_sql, duplicate_home)

print("After INSERT OR IGNORE:")
after_ignore = conn.execute("SELECT url, views FROM page_views").fetchall()
for r in after_ignore:
    print(f"  {r[0]:40s}  views={r[1]}")

# INSERT OR REPLACE - delete + re-insert on conflict (resets the whole row)
refreshed_home = ("https://example.com/home", 250, "2024-06-01")
conn.execute(replace_sql, refreshed_home)

print("After INSERT OR REPLACE (home views updated to 250):")
after_replace = conn.execute("SELECT url, views, last_seen FROM page_views").fetchall()
for r in after_replace:
    print(f"  {r[0]:40s}  views={r[1]}  last_seen={r[2]}")
conn.close()
💼 Real-World: Subscription Lifecycle Automation
A SaaS platform batch-expires overdue subscriptions and applies loyalty discounts to long-term customers.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE subs ("
    "  id INT, user TEXT, plan TEXT, price REAL,"
    "  start_date TEXT, end_date TEXT, status TEXT)"
)
conn.executemany("INSERT INTO subs VALUES (?,?,?,?,?,?,?)", [
    (1,"Alice","basic",  9.99,"2024-01-01","2024-12-31","active"),
    (2,"Bob",  "pro",   29.99,"2023-01-01","2024-01-15","active"),
    (3,"Carol","basic",  9.99,"2024-01-01","2024-06-30","active"),
    (4,"Dave", "ent",   99.99,"2022-01-01","2024-12-31","active"),
    # Edge case: started in 2022 but only 1.5 years ago -> no discount yet.
    (5,"Erin", "pro",   29.99,"2022-12-31","2024-12-31","active"),
])

today = "2024-07-01"
# Expire overdue subscriptions. ISO-8601 dates (YYYY-MM-DD) compare correctly
# as plain strings.
conn.execute("UPDATE subs SET status='expired' WHERE end_date < ?", (today,))
# 15% discount for customers active >= 2 years.
# Compare against "today minus two years" rather than subtracting calendar
# years: a year subtraction would treat 2022-12-31 -> 2024-07-01 as two years.
conn.execute(
    "UPDATE subs SET price=ROUND(price*0.85,2) "
    "WHERE status='active' "
    "AND start_date <= date(?, '-2 years')",
    (today,)
)
conn.commit()

subs_rows = conn.execute("SELECT user,plan,price,status FROM subs").fetchall()
print(f"{'User':8s} {'Plan':6s} {'Price':>8} {'Status':>8}")
for r in subs_rows:
    print(f"{r[0]:8s} {r[1]:6s} ${r[2]:>7.2f} {r[3]:>8}")
conn.close()
🏋️ Practice: Transaction with Error Handling
Create an 'accounts' table (id, owner, balance). Insert 3 accounts. Write a transfer function that: 1) Checks the sender has sufficient balance. 2) DECREMENTs the sender's balance and INCREMENTs the receiver's balance inside a transaction. 3) Rolls back if any error occurs. Test both a successful transfer and a failing one (insufficient funds).
Starter Code
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INT PRIMARY KEY, owner TEXT, balance REAL)")
opening_balances = [
    (1, "Alice", 1000.0),
    (2, "Bob",    500.0),
    (3, "Carol",  250.0),
]
conn.executemany("INSERT INTO accounts VALUES (?,?,?)", opening_balances)
conn.commit()


def get_balance(conn, account_id):
    """Return the balance of `account_id`, or None if no such account exists."""
    found = conn.execute(
        "SELECT balance FROM accounts WHERE id=?", (account_id,)
    ).fetchone()
    if not found:
        return None
    return found[0]


def transfer(conn, from_id, to_id, amount):
    """Move `amount` from account `from_id` to account `to_id`.

    Exercise stub: it currently does nothing and returns None. Uncomment and
    complete the steps below.
    """
    # TODO: Check sender balance
    # balance = get_balance(conn, from_id)
    # if balance is None or balance < amount:
    #     print(f"Transfer failed: insufficient funds (balance={balance})")
    #     return False

    # TODO: Perform transfer in a transaction
    # try:
    #     conn.execute("UPDATE accounts SET balance=balance-? WHERE id=?", (amount, from_id))
    #     conn.execute("UPDATE accounts SET balance=balance+? WHERE id=?", (amount, to_id))
    #     conn.commit()
    #     print(f"Transferred ${amount} from account {from_id} to {to_id}")
    #     return True
    # except Exception as e:
    #     conn.rollback()
    #     print(f"Transfer error: {e}")
    #     return False
    return None


# TODO: Test successful transfer: Alice -> Bob, $200
# transfer(conn, 1, 2, 200)

# TODO: Test failing transfer: Carol -> Alice, $500 (Carol only has $250)
# transfer(conn, 3, 1, 500)

# TODO: Print final balances
# for r in conn.execute("SELECT owner, balance FROM accounts").fetchall():
#     print(f"  {r[0]:8s} ${r[1]:.2f}")

conn.close()
8. Indexes & EXPLAIN

Indexes dramatically speed up queries on large tables. Use EXPLAIN QUERY PLAN to see whether SQLite uses your indexes.

Measure index speedup
import sqlite3, time, random

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INT, user_id INT, event_type TEXT, ts TEXT)")
# 200,000 random events spread over 10,000 user ids.
event_rows = [
    (
        i,
        random.randint(1,10000),
        random.choice(["click","view","buy"]),
        f"2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}",
    )
    for i in range(200000)
]
conn.executemany("INSERT INTO events VALUES (?,?,?,?)", event_rows)
conn.commit()


def _timed_lookup():
    """Run the user_id=42 count once and return the elapsed seconds."""
    started = time.perf_counter()
    conn.execute("SELECT COUNT(*) FROM events WHERE user_id=42").fetchone()
    return time.perf_counter() - started


# Same query before and after the index is created.
no_idx = _timed_lookup()
conn.execute("CREATE INDEX idx_user ON events(user_id)")
with_idx = _timed_lookup()

print(f"No index:   {no_idx*1000:.2f}ms")
print(f"With index: {with_idx*1000:.2f}ms")
# max() guards against division by zero if the timer reports 0 elapsed.
print(f"Speedup:    {no_idx/max(with_idx,1e-9):.0f}x")
conn.close()
EXPLAIN QUERY PLAN
import sqlite3

conn = sqlite3.connect(":memory:")
for ddl in (
    "CREATE TABLE orders (id INT, cust TEXT, status TEXT, amount REAL)",
    "CREATE INDEX idx_status ON orders(status)",
):
    conn.execute(ddl)

# Prefixing a statement with EXPLAIN QUERY PLAN returns rows that describe
# how SQLite intends to run it, instead of running it.
explain_sql = (
    "EXPLAIN QUERY PLAN "
    "SELECT cust, SUM(amount) FROM orders "
    "WHERE status='pending' GROUP BY cust"
)
plan = conn.execute(explain_sql).fetchall()

print("Query plan:")
for step in plan:
    print(" ", step)
conn.close()
Partial index and UNIQUE index
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "  id INT PRIMARY KEY, email TEXT UNIQUE, role TEXT, active INT)"
)
conn.executemany("INSERT INTO users VALUES (?,?,?,?)", [
    (1,"alice@x.com","admin",1),(2,"bob@x.com","user",1),
    (3,"carol@x.com","user",0),(4,"dave@x.com","mod",1),
])

# The UNIQUE constraint on email is enforced through an implicit index.
# Catch only IntegrityError so unrelated failures (typos in the SQL, a missing
# table, ...) still surface instead of being reported as a UNIQUE violation.
try:
    conn.execute("INSERT INTO users VALUES (5,'alice@x.com','user',1)")
except sqlite3.IntegrityError as e:
    print("UNIQUE violation caught:", e)

# Partial index: only rows with active=1 are stored in the index.
conn.execute("CREATE INDEX idx_active_role ON users(role) WHERE active=1")

# Ask SQLite how it would run a query whose WHERE clause matches the
# partial index's condition.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT id FROM users WHERE role='admin' AND active=1"
).fetchall()
print("Query plan (with partial index):")
for row in plan: print(" ", row)

# Count active users (whether the planner uses the partial index for this
# query is its decision; check with EXPLAIN QUERY PLAN if it matters).
n = conn.execute("SELECT COUNT(*) FROM users WHERE active=1").fetchone()[0]
print(f"Active users: {n}")
conn.close()
Covering index and EXPLAIN QUERY PLAN comparison
import sqlite3, time, random

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "  id INT, cust_id INT, status TEXT, amount REAL, created TEXT)"
)
order_rows = [
    (
        i,
        random.randint(1, 500),
        random.choice(["pending","shipped","done"]),
        round(random.uniform(10, 1000), 2),
        f"2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}",
    )
    for i in range(100000)
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", order_rows)
conn.commit()

# Query that only needs (status, amount) - a covering index can satisfy it entirely
query = (
    "SELECT status, SUM(amount) as total "
    "FROM orders WHERE status='pending' GROUP BY status"
)


def _plan_and_time():
    """Return (query plan rows, seconds taken to run `query` once)."""
    plan_rows = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
    started = time.perf_counter()
    conn.execute(query).fetchall()
    return plan_rows, time.perf_counter() - started


# Without covering index
plan_before, t_before = _plan_and_time()

# Create a covering index - it includes all columns touched by the query
conn.execute("CREATE INDEX idx_cover ON orders(status, amount)")

plan_after, t_after = _plan_and_time()

before_detail = plan_before[0][-1] if plan_before else "N/A"
after_detail = plan_after[0][-1] if plan_after else "N/A"
print("Plan WITHOUT covering index:", before_detail)
print("Plan WITH covering index:   ", after_detail)
print(f"Time before: {t_before*1000:.2f}ms  after: {t_after*1000:.2f}ms")
conn.close()
💼 Real-World: E-Commerce Report with Compound Indexes
A backend engineer adds compound indexes to cut a cross-table analytics report from seconds to milliseconds.
import sqlite3, time, random, datetime

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "  id INT, cust_id INT, product_id INT,"
    "  status TEXT, amount REAL, created TEXT)"
)
conn.execute("CREATE TABLE products (id INT PRIMARY KEY, name TEXT, category TEXT)")

# 200 products in random categories.
product_rows = [
    (i, f"Prod{i}", random.choice(["Electronics","Food","Clothing","Hardware"]))
    for i in range(1, 201)
]
conn.executemany("INSERT INTO products VALUES (?,?,?)", product_rows)

# 50,000 orders created within 91 days starting 2024-01-01.
period_start = datetime.date(2024,1,1)
order_rows = [
    (
        i,
        random.randint(1,5000),
        random.randint(1,200),
        random.choice(["pending","shipped","delivered"]),
        round(random.uniform(10,1000),2),
        (period_start + datetime.timedelta(days=random.randint(0,90))).isoformat(),
    )
    for i in range(1,50001)
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?,?)", order_rows)

conn.execute("CREATE INDEX idx_status ON orders(status, created)")
conn.execute("CREATE INDEX idx_prod   ON orders(product_id)")

# Revenue per (category, status) for every non-pending order, top 8 by revenue.
report_sql = (
    "SELECT p.category, o.status, COUNT(*) as cnt, ROUND(SUM(o.amount),2) as rev "
    "FROM orders o JOIN products p ON o.product_id=p.id "
    "WHERE o.status != 'pending' "
    "GROUP BY p.category, o.status ORDER BY rev DESC LIMIT 8"
)
t0 = time.perf_counter()
result = conn.execute(report_sql).fetchall()
elapsed_ms = (time.perf_counter()-t0)*1000
print(f"Query: {elapsed_ms:.1f}ms  ({len(result)} rows)")
for category, status, cnt, rev in result[:4]:
    print(f"  {category:14s} | {status:10s} | {cnt:>5d} | ${rev:>10,.2f}")
conn.close()
🏋️ Practice: Create Appropriate Indexes
Create a 'log_events' table with 100,000 rows: columns id, user_id (1-1000), event_type ('login','logout','purchase','view'), created_at (date string). 1) Measure query time for SELECT WHERE user_id=42 without an index. 2) Create a single-column index on user_id and re-measure. 3) Create a compound index on (event_type, created_at) and verify with EXPLAIN QUERY PLAN.
Starter Code
import sqlite3, random, time, datetime

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log_events (id INT, user_id INT, event_type TEXT, created_at TEXT)"
)

# 100,000 random events: user ids 1-1000, dates within 181 days of 2024-01-01.
base = datetime.date(2024, 1, 1)
events = ["login", "logout", "purchase", "view"]
log_rows = [
    (
        i,
        random.randint(1, 1000),
        random.choice(events),
        (base + datetime.timedelta(days=random.randint(0, 180))).isoformat(),
    )
    for i in range(1, 100001)
]
conn.executemany("INSERT INTO log_events VALUES (?,?,?,?)", log_rows)
conn.commit()

# TODO: 1. Time query WITHOUT index
# t0 = time.perf_counter()
# conn.execute("SELECT COUNT(*) FROM log_events WHERE user_id=42").fetchone()
# print(f"No index: {(time.perf_counter()-t0)*1000:.2f}ms")

# TODO: 2. Create index on user_id and re-time
# conn.execute("CREATE INDEX idx_user_id ON log_events(user_id)")
# t0 = time.perf_counter()
# conn.execute("SELECT COUNT(*) FROM log_events WHERE user_id=42").fetchone()
# print(f"With index: {(time.perf_counter()-t0)*1000:.2f}ms")

# TODO: 3. Compound index on (event_type, created_at)
# conn.execute("CREATE INDEX idx_event_date ON log_events(event_type, created_at)")
# plan = conn.execute(
#     "EXPLAIN QUERY PLAN SELECT * FROM log_events "
#     "WHERE event_type='purchase' AND created_at >= '2024-06-01'"
# ).fetchall()
# print("Query plan:", plan)

conn.close()
9. SQLite with Pandas

Pandas integrates directly with SQLite: read query results into DataFrames with pd.read_sql(), write DataFrames back with .to_sql().

read_sql and to_sql
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (date TEXT, region TEXT, amount REAL)")
daily_sales = [
    ("2024-01-01","North",1200),("2024-01-01","South",800),
    ("2024-01-02","North",950),("2024-01-02","East",1100),
]
conn.executemany("INSERT INTO sales VALUES (?,?,?)", daily_sales)

# SQL -> DataFrame: the query result becomes a DataFrame with one column
# per selected column.
df = pd.read_sql("SELECT * FROM sales", conn)
print(df)
# Aggregate on the pandas side: total amount per region.
region_totals = df.groupby("region")["amount"].sum()
print(region_totals)
conn.close()
Write DataFrame to SQL and query back
import sqlite3
import pandas as pd
import numpy as np

conn = sqlite3.connect(":memory:")

# 30 days of a random-walk series, each row tagged with a random category.
dates = pd.date_range("2024-01-01", periods=30).astype(str)
values = np.random.randn(30).cumsum() + 100
categories = np.random.choice(["A","B","C"], 30)
df = pd.DataFrame({
    "date":     dates,
    "value":    values,
    "category": categories,
})
# DataFrame -> SQL table "ts" (any existing table of that name is replaced).
df.to_sql("ts", conn, index=False, if_exists="replace")

# Aggregate in SQL and read the result back as a DataFrame.
summary_sql = (
    "SELECT category, COUNT(*) as n, ROUND(AVG(value),2) as avg_val "
    "FROM ts GROUP BY category ORDER BY avg_val DESC"
)
result = pd.read_sql(summary_sql, conn)
print(result)
conn.close()
Parameterized queries and chunksize
import sqlite3
import pandas as pd
import numpy as np

conn = sqlite3.connect(":memory:")
np.random.seed(7)

# Build a large DataFrame and write it in chunks of 1,000 rows.
# (The columns are generated in this order so the seeded values stay the same.)
row_count = 5000
user_ids = np.random.randint(1, 201, row_count)
products = np.random.choice(["A","B","C","D"], row_count)
amounts = np.round(np.random.uniform(5, 500, row_count), 2)
months = np.random.choice(["Jan","Feb","Mar","Apr"], row_count)
df = pd.DataFrame({
    "user_id":  user_ids,
    "product":  products,
    "amount":   amounts,
    "month":    months,
})
df.to_sql("txn", conn, index=False, if_exists="replace", chunksize=1000)

# Parameterized SQL: the ? placeholder is filled from `params`, so the
# filter value is never pasted into the SQL text.
month_filter = "Mar"
month_sql = (
    "SELECT product, COUNT(*) as sales, ROUND(SUM(amount),2) as revenue "
    "FROM txn WHERE month=? GROUP BY product ORDER BY revenue DESC"
)
result = pd.read_sql(month_sql, conn, params=(month_filter,))
print(f"Sales in {month_filter}:")
print(result.to_string(index=False))
conn.close()
to_sql if_exists modes and dtype mapping
import sqlite3
import pandas as pd
import numpy as np

conn = sqlite3.connect(":memory:")
np.random.seed(3)


def _row_count():
    """Return the current number of rows in the products table."""
    return pd.read_sql('SELECT COUNT(*) as n FROM products', conn).iloc[0,0]


# Ten products with seeded random prices and stock levels.
prices = np.round(np.random.uniform(5, 200, 10), 2)
stock_levels = np.random.randint(0, 500, 10)
base_df = pd.DataFrame({
    "id":     range(1, 11),
    "name":   [f"Product_{i}" for i in range(1, 11)],
    "price":  prices,
    "stock":  stock_levels,
})

# First write: create the table
base_df.to_sql("products", conn, index=False, if_exists="replace")
print(f"Initial rows: {_row_count()}")

# if_exists='append' adds rows to the existing table
new_rows = pd.DataFrame({
    "id": [11, 12], "name": ["Product_11","Product_12"],
    "price": [19.99, 34.99], "stock": [100, 50],
})
new_rows.to_sql("products", conn, index=False, if_exists="append")
print(f"After append:  {_row_count()}")

# if_exists='replace' drops the table and writes only the new frame
replacement = base_df.head(3).copy()
replacement.to_sql("products", conn, index=False, if_exists="replace")
print(f"After replace: {_row_count()}")

# Read back and show the dtypes pandas inferred for each column
df_back = pd.read_sql("SELECT * FROM products", conn)
print(df_back.dtypes.to_string())
conn.close()
💼 Real-World: Multi-Source Analytics Report
A data analyst joins two normalized SQL tables via pandas to produce a segmented revenue report for stakeholders.
import sqlite3
import pandas as pd
import numpy as np

conn = sqlite3.connect(":memory:")

# Two normalized tables: 100 customers and 2,000 transactions that refer to
# them by id.
customer_count, txn_count = 100, 2000
customers = pd.DataFrame({
    "id":      range(1, 101),
    "segment": np.random.choice(["retail","wholesale","online"], customer_count),
    "region":  np.random.choice(["North","South","East","West"], customer_count),
})
transactions = pd.DataFrame({
    "customer_id": np.random.randint(1, 101, txn_count),
    "product":     np.random.choice(["A","B","C","D"], txn_count),
    "amount":      np.random.uniform(10, 500, txn_count).round(2),
})

for table_name, frame in (("customers", customers), ("transactions", transactions)):
    frame.to_sql(table_name, conn, index=False)

# Join and aggregate in SQL; pandas only receives the finished report.
report_sql = (
    "SELECT c.segment, c.region, "
    "       COUNT(DISTINCT t.customer_id) as customers, "
    "       ROUND(SUM(t.amount),2) as revenue, "
    "       ROUND(AVG(t.amount),2) as avg_order "
    "FROM transactions t "
    "JOIN customers c ON t.customer_id=c.id "
    "GROUP BY c.segment, c.region "
    "ORDER BY revenue DESC LIMIT 8"
)
report = pd.read_sql(report_sql, conn)
print(report.to_string(index=False))
conn.close()
🏋️ Practice: Create a View for a Report Query
Create a 'transactions' table (id, customer, category, amount, txn_date) and populate it. Then: 1) CREATE VIEW monthly_summary AS a query grouping by month and category with totals. 2) SELECT from the view to display the report. 3) Use pd.read_sql() to load the view result into a DataFrame and show the top 3 rows by revenue.
Starter Code
import sqlite3
import pandas as pd
import numpy as np

conn = sqlite3.connect(":memory:")
np.random.seed(42)

conn.execute(
    "CREATE TABLE transactions "
    "(id INT, customer TEXT, category TEXT, amount REAL, txn_date TEXT)"
)
import datetime
import random

# The sample rows below are drawn from the stdlib `random` module, so that is
# the generator that has to be seeded for the data to be the same on every
# run; seeding NumPy alone does not affect `random`.
random.seed(42)
base = datetime.date(2024, 1, 1)
cats = ["Food", "Electronics", "Clothing", "Sports"]
customers = ["Alice","Bob","Carol","Dave","Eve"]
# 200 transactions dated within the 90 days starting at `base`.
rows = [
    (i, random.choice(customers), random.choice(cats),
     round(random.uniform(10, 300), 2),
     (base + datetime.timedelta(days=random.randint(0, 89))).isoformat())
    for i in range(1, 201)
]
conn.executemany("INSERT INTO transactions VALUES (?,?,?,?,?)", rows)
conn.commit()

# TODO: 1. Create a VIEW named 'monthly_summary'
# conn.execute(
#     "CREATE VIEW monthly_summary AS "
#     "SELECT strftime('%Y-%m', txn_date) as month, "
#     "       category, COUNT(*) as txns, ROUND(SUM(amount),2) as revenue "
#     "FROM transactions GROUP BY month, category"
# )

# TODO: 2. Query the view directly
# for r in conn.execute("SELECT * FROM monthly_summary ORDER BY revenue DESC LIMIT 5").fetchall():
#     print(r)

# TODO: 3. Load into DataFrame and show top 3 by revenue
# df = pd.read_sql("SELECT * FROM monthly_summary ORDER BY revenue DESC", conn)
# print(df.head(3).to_string(index=False))

conn.close()
10. Recursive CTEs & Advanced Patterns

Recursive CTEs traverse hierarchical data (org charts, tree structures). CASE expressions and COALESCE handle conditional logic elegantly.

Recursive CTE — org hierarchy
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, mgr_id INT, salary REAL)")
staff = [
    (1,"CEO",None,300000),(2,"CTO",1,200000),(3,"CFO",1,190000),
    (4,"Dev Lead",2,130000),(5,"Dev1",4,100000),(6,"Dev2",4,95000),
    (7,"Finance1",3,80000),
]
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", staff)

# Anchor member: employees without a manager (depth 0).
# Recursive member: anyone whose manager is already in `org`, one level deeper.
org_chart_sql = (
    "WITH RECURSIVE org AS ("
    "  SELECT id, name, mgr_id, 0 AS depth FROM emp WHERE mgr_id IS NULL "
    "  UNION ALL "
    "  SELECT e.id, e.name, e.mgr_id, o.depth+1 "
    "  FROM emp e JOIN org o ON e.mgr_id=o.id"
    ") "
    "SELECT depth, name FROM org ORDER BY depth, name"
)
rows = conn.execute(org_chart_sql).fetchall()
for depth, title in rows:
    indent = "  " * depth
    print(indent + title)
conn.close()
CASE expression and COALESCE
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, amount REAL, status TEXT)")
sample_orders = [
    (1,500,"delivered"),(2,120,"pending"),(3,None,"cancelled"),
    (4,800,"delivered"),(5,200,"shipped"),(6,None,"pending"),
]
conn.executemany("INSERT INTO orders VALUES (?,?,?)", sample_orders)

# COALESCE substitutes 0 for a NULL amount.
# CASE picks the first matching branch; comparisons against NULL are never
# true, so NULL amounts fall through to the explicit IS NULL branch.
tier_sql = (
    "SELECT id,"
    "       COALESCE(amount, 0) as safe_amount,"
    "       CASE "
    "         WHEN amount > 500 THEN 'high'"
    "         WHEN amount > 100 THEN 'medium'"
    "         WHEN amount IS NULL THEN 'unknown'"
    "         ELSE 'low'"
    "       END as tier,"
    "       status "
    "FROM orders ORDER BY id"
)
rows = conn.execute(tier_sql).fetchall()
for order in rows:
    print(order)
conn.close()
Fibonacci sequence via recursive CTE
import sqlite3

conn = sqlite3.connect(":memory:")

# Generate Fibonacci numbers using a recursive CTE.
# Each step carries the pair (a, b) forward as (b, a+b); `n` counts the rows
# and stops the recursion after 15 of them.
rows = conn.execute(
    "WITH RECURSIVE fib(n, a, b) AS ("
    "  SELECT 1, 0, 1 "
    "  UNION ALL "
    "  SELECT n+1, b, a+b FROM fib WHERE n < 15"
    ") "
    "SELECT n, a as fib_value FROM fib"
).fetchall()
print("Fibonacci sequence:")
for n, v in rows:
    print(f"  F({n:2d}) = {v}")

# Generate a date series using a recursive CTE.
# SQLite's strftime() has no weekday-name format (an unsupported code such as
# '%A' makes the whole result NULL), so take the weekday number from '%w'
# (0 = Sunday ... 6 = Saturday) and translate it with CASE.
rows2 = conn.execute(
    "WITH RECURSIVE dates(d) AS ("
    "  SELECT '2024-01-01' "
    "  UNION ALL "
    "  SELECT date(d, '+1 day') FROM dates WHERE d < '2024-01-07'"
    ") "
    "SELECT d, "
    "       CASE CAST(strftime('%w', d) AS INTEGER) "
    "         WHEN 0 THEN 'Sunday' WHEN 1 THEN 'Monday' WHEN 2 THEN 'Tuesday' "
    "         WHEN 3 THEN 'Wednesday' WHEN 4 THEN 'Thursday' "
    "         WHEN 5 THEN 'Friday' WHEN 6 THEN 'Saturday' "
    "       END as weekday "
    "FROM dates"
).fetchall()
print("Week of 2024-01-01:")
for d, wd in rows2:
    print(f"  {d}  {wd}")
conn.close()
Updatable View & View with Parameters via CTE
import sqlite3

conn = sqlite3.connect(":memory:")
c = conn.cursor()
schema_and_data = (
    "CREATE TABLE employees ("
    "    id INTEGER PRIMARY KEY, name TEXT, dept TEXT, salary REAL, hire_date TEXT"
    ");"
    "INSERT INTO employees VALUES"
    "    (1,'Alice','Engineering',95000,'2020-03-15'),"
    "    (2,'Bob','Marketing',72000,'2019-07-01'),"
    "    (3,'Carol','Engineering',105000,'2018-11-20'),"
    "    (4,'Dave','HR',65000,'2021-02-28'),"
    "    (5,'Eve','Marketing',78000,'2020-09-10'),"
    "    (6,'Frank','Engineering',88000,'2022-01-05');"
)
c.executescript(schema_and_data)

# View 1: per-department headcount and salary statistics.
dept_summary_ddl = (
    "CREATE VIEW dept_summary AS "
    "SELECT dept, COUNT(*) AS headcount, "
    "       ROUND(AVG(salary),2) AS avg_salary, "
    "       MAX(salary) AS max_salary, MIN(salary) AS min_salary "
    "FROM employees GROUP BY dept"
)
# View 2: tenure in years, measured from hire_date to the current date,
# so its values change depending on when the script runs.
emp_tenure_ddl = (
    "CREATE VIEW emp_tenure AS "
    "SELECT name, dept, salary, "
    "       ROUND((julianday('now') - julianday(hire_date)) / 365.25, 1) AS years_tenure "
    "FROM employees"
)
for ddl in (dept_summary_ddl, emp_tenure_ddl):
    c.execute(ddl)

# Views are queried exactly like tables.
print("Department summary:")
summary = c.execute("SELECT * FROM dept_summary ORDER BY avg_salary DESC").fetchall()
for dept, headcount, avg_salary, max_salary, _min_salary in summary:
    print(f"  {dept:12s} headcount={headcount}, avg=${avg_salary:,.0f}, max=${max_salary:,.0f}")

print("\nEmployee tenure:")
veterans = c.execute(
    "SELECT * FROM emp_tenure WHERE years_tenure > 3 ORDER BY years_tenure DESC"
).fetchall()
for name, dept, salary, years in veterans:
    print(f"  {name:6s} ({dept}) - {years} years, ${salary:,.0f}")

# Dropping a view removes only its definition, not the underlying table.
c.execute("DROP VIEW emp_tenure")
remaining = c.execute("SELECT name FROM sqlite_master WHERE type='view'").fetchall()
print("\nViews remaining:", [view_name for (view_name,) in remaining])
conn.close()
💼 Real-World: Product Category Breadcrumb Builder
An e-commerce search indexer uses a recursive CTE to build full breadcrumb paths for every product category.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE categories (id INT PRIMARY KEY, name TEXT, parent_id INT)")

# (id, name, parent_id); the root category has no parent.
category_rows = [
    (1, "All", None),
    (2, "Electronics", 1),
    (3, "Computers", 2),
    (4, "Laptops", 3),
    (5, "Gaming Laptops", 4),
    (6, "Ultrabooks", 4),
    (7, "Phones", 2),
    (8, "Android", 7),
    (9, "Clothing", 1),
    (10, "Mens", 9),
]
conn.executemany("INSERT INTO categories VALUES (?,?,?)", category_rows)

# Anchor member: the root rows. Recursive member: children of rows already in
# the tree, extending the parent's path and depth by one level.
breadcrumb_sql = (
    "WITH RECURSIVE tree AS ("
    "  SELECT id, name, parent_id, CAST(name AS TEXT) as path, 0 as depth "
    "  FROM categories WHERE parent_id IS NULL "
    "  UNION ALL "
    "  SELECT c.id, c.name, c.parent_id, t.path||' > '||c.name, t.depth+1 "
    "  FROM categories c JOIN tree t ON c.parent_id=t.id"
    ") "
    "SELECT id, depth, path FROM tree ORDER BY path"
)
rows = conn.execute(breadcrumb_sql).fetchall()
for category_id, depth, path in rows:
    indent = '  ' * depth
    print(f"  {indent}[{category_id}] {path}")
conn.close()
🏋️ Practice: Write a WITH Clause (CTE) Query
Create a 'sales' table (rep, quarter, amount). Insert data for 3 reps over 4 quarters. Write a CTE query that: 1) Computes total sales per rep (CTE: rep_totals). 2) Computes the company-wide average rep total (CTE: company_avg). 3) Final SELECT shows each rep's total and whether they are 'above_avg', 'average', or 'below_avg' using CASE.
Starter Code
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, quarter TEXT, amount REAL)")

# Quarterly amounts per rep, listed Q1 through Q4.
quarterly_amounts = {
    "Alice": (12000, 15000, 11000, 18000),
    "Bob": (9000, 10500, 12000, 8500),
    "Carol": (20000, 18500, 22000, 19000),
}
sales_rows = []
for rep, amounts in quarterly_amounts.items():
    for quarter_no, amount in enumerate(amounts, start=1):
        sales_rows.append((rep, f"Q{quarter_no}", amount))
conn.executemany("INSERT INTO sales VALUES (?,?,?)", sales_rows)
conn.commit()

# TODO: Write a CTE query
# Reference solution (uncomment to run): rep_totals sums per rep, company_avg
# averages those totals, and CASE labels each rep using a +/-10% band.
# rows = conn.execute(
#     "WITH rep_totals AS ( "
#     "  SELECT rep, SUM(amount) as total FROM sales GROUP BY rep "
#     "), "
#     "company_avg AS ( "
#     "  SELECT AVG(total) as avg_total FROM rep_totals "
#     ") "
#     "SELECT r.rep, r.total, "
#     "       CASE "
#     "         WHEN r.total > c.avg_total * 1.1 THEN 'above_avg' "
#     "         WHEN r.total < c.avg_total * 0.9 THEN 'below_avg' "
#     "         ELSE 'average' "
#     "       END as performance "
#     "FROM rep_totals r, company_avg c "
#     "ORDER BY r.total DESC"
# ).fetchall()
# for r in rows:
#     print(f"  {r[0]:8s}  total=${r[1]:,}  performance={r[2]}")

conn.close()
11. Common Table Expressions (CTEs)

Write readable, modular SQL with WITH clauses. Break complex queries into named steps and use recursive CTEs for hierarchical data.

Basic CTE to simplify a complex query
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Nine orders spread over four customers.
conn.executescript("""
CREATE TABLE orders (order_id INT, customer_id INT, amount REAL, order_date TEXT);
INSERT INTO orders VALUES
  (1,101,250,'2024-01-05'),(2,102,80,'2024-01-10'),(3,101,120,'2024-01-15'),
  (4,103,400,'2024-02-01'),(5,102,60,'2024-02-10'),(6,101,90,'2024-02-20'),
  (7,104,300,'2024-03-01'),(8,103,150,'2024-03-05'),(9,102,200,'2024-03-15');
""")

# customer_totals aggregates per customer, high_value filters those aggregates,
# and the final SELECT only rounds and sorts.
query = """
WITH customer_totals AS (
    SELECT customer_id,
           SUM(amount) AS total_spend,
           COUNT(*) AS order_count,
           AVG(amount) AS avg_order
    FROM orders
    GROUP BY customer_id
),
high_value AS (
    SELECT * FROM customer_totals WHERE total_spend > 300
)
SELECT customer_id,
       ROUND(total_spend, 2) AS total_spend,
       order_count,
       ROUND(avg_order, 2) AS avg_order
FROM high_value
ORDER BY total_spend DESC;
"""
high_value_customers = pd.read_sql(query, conn)
print(high_value_customers.to_string(index=False))
conn.close()
Multiple chained CTEs
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Eight sales across two regions and four products.
conn.executescript("""
CREATE TABLE sales (sale_id INT, product TEXT, region TEXT, amount REAL, sale_date TEXT);
INSERT INTO sales VALUES
  (1,'Laptop','East',1200,'2024-01-10'),(2,'Mouse','West',30,'2024-01-15'),
  (3,'Laptop','West',1100,'2024-01-20'),(4,'Monitor','East',400,'2024-02-05'),
  (5,'Mouse','East',25,'2024-02-10'),(6,'Monitor','West',380,'2024-02-20'),
  (7,'Laptop','East',1300,'2024-03-01'),(8,'Keyboard','East',80,'2024-03-10');
""")

# Two independent aggregates (by region, by product) are joined back onto the
# detail rows in a third CTE, which also computes each sale's regional share.
query = """
WITH regional_totals AS (
    SELECT region, SUM(amount) AS regional_total FROM sales GROUP BY region
),
product_totals AS (
    SELECT product, SUM(amount) AS product_total FROM sales GROUP BY product
),
combined AS (
    SELECT s.product, s.region, s.amount,
           r.regional_total, p.product_total,
           ROUND(s.amount * 100.0 / r.regional_total, 1) AS pct_of_region
    FROM sales s
    JOIN regional_totals r ON s.region = r.region
    JOIN product_totals p ON s.product = p.product
)
SELECT product, region, amount, pct_of_region
FROM combined
ORDER BY region, pct_of_region DESC;
"""
share_by_region = pd.read_sql(query, conn)
print(share_by_region.to_string(index=False))
conn.close()
Recursive CTE for hierarchical org chart
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Alice is the root (manager_id NULL); everyone else reports up to her.
conn.executescript("""
CREATE TABLE employees (id INT, name TEXT, manager_id INT);
INSERT INTO employees VALUES
  (1,'Alice',NULL),(2,'Bob',1),(3,'Carol',1),
  (4,'Dave',2),(5,'Eve',2),(6,'Frank',3),(7,'Grace',3),(8,'Hank',4);
""")

query = """
WITH RECURSIVE org_chart AS (
    -- Base case: root (CEO)
    SELECT id, name, manager_id, 0 AS depth, name AS path
    FROM employees WHERE manager_id IS NULL
    UNION ALL
    -- Recursive step: employees who report to current level
    SELECT e.id, e.name, e.manager_id, oc.depth + 1,
           oc.path || ' -> ' || e.name
    FROM employees e
    JOIN org_chart oc ON e.manager_id = oc.id
)
SELECT id, depth, name, path FROM org_chart ORDER BY path;
"""
df = pd.read_sql(query, conn)

# Sorting by path keeps each subtree together; depth drives the indentation.
for emp_id, depth, name in df[['id', 'depth', 'name']].itertuples(index=False, name=None):
    print('  ' * depth + f"[{emp_id}] {name}")
conn.close()
CTE vs subquery — readability comparison
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE sales (product TEXT, amount REAL);
INSERT INTO sales VALUES
  ('A',100),('A',200),('B',50),('B',300),('C',150),('C',250),('C',100);
""")

# Subquery version (harder to read): the aggregation is buried inside FROM.
# The average is rounded here as well, so both versions return the same rows;
# without it product C would show 166.666... here but 166.67 in the CTE form.
q_sub = """
SELECT product, ROUND(avg_amount,2) AS avg_amount
FROM (
    SELECT product, AVG(amount) AS avg_amount
    FROM sales GROUP BY product
) WHERE avg_amount > 100;
"""

# CTE version (same result, easier to read): the aggregation gets a name first
# and the final SELECT reads top to bottom.
q_cte = """
WITH product_avgs AS (
    SELECT product, AVG(amount) AS avg_amount
    FROM sales GROUP BY product
)
SELECT product, ROUND(avg_amount,2) AS avg_amount
FROM product_avgs
WHERE avg_amount > 100;
"""

print('Subquery result:')
print(pd.read_sql(q_sub, conn).to_string(index=False))
print('CTE result:')
print(pd.read_sql(q_cte, conn).to_string(index=False))
conn.close()
🏋️ Practice: Sales Pipeline CTE
Using CTEs, write a query that: (1) computes monthly revenue, (2) adds a 3-month rolling average using LAG, (3) flags months where revenue fell below the rolling average.
Starter Code
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (order_date TEXT, amount REAL)')

# Seeded so every run produces the same twelve monthly orders (one on the
# 15th of each month of 2023).
import random
random.seed(42)
rows = []
for m in range(1, 13):
    rows.append((f'2023-{m:02d}-15', random.uniform(10000, 50000)))
conn.executemany('INSERT INTO orders VALUES (?,?)', rows)
conn.commit()

query = """
-- TODO: CTE 1: monthly_revenue - SUM(amount) GROUP BY month
-- TODO: CTE 2: with_prev - add prev1, prev2 using LAG()
-- TODO: Final: add rolling_avg = (revenue+prev1+prev2)/3
--              add below_avg = CASE WHEN revenue < rolling_avg THEN 1 ELSE 0 END
"""
# df = pd.read_sql(query, conn)
# print(df.to_string(index=False))
conn.close()
✅ Practice Checklist
12. Window Functions

Perform calculations across related rows without collapsing them — rankings, running totals, lag/lead comparisons, and moving averages.

ROW_NUMBER, RANK, and DENSE_RANK
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Each region contains a tie (or not) so the three ranking functions differ.
conn.executescript("""
CREATE TABLE scores (player TEXT, region TEXT, score INT);
INSERT INTO scores VALUES
  ('Alice','East',95),('Bob','East',88),('Carol','East',95),
  ('Dave','West',78),('Eve','West',91),('Frank','West',85),
  ('Grace','North',74),('Hank','North',80),('Ivy','North',80);
""")

# ROW_NUMBER always counts 1,2,3; RANK leaves a gap after ties; DENSE_RANK
# does not. All three restart for every region.
query = """
SELECT player, region, score,
       ROW_NUMBER() OVER (PARTITION BY region ORDER BY score DESC) AS row_num,
       RANK()       OVER (PARTITION BY region ORDER BY score DESC) AS rank,
       DENSE_RANK() OVER (PARTITION BY region ORDER BY score DESC) AS dense_rank
FROM scores
ORDER BY region, score DESC;
"""
ranked = pd.read_sql(query, conn)
print(ranked.to_string(index=False))
conn.close()
Running totals and cumulative sums
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# One week of daily revenue.
conn.executescript("""
CREATE TABLE daily_sales (sale_date TEXT, revenue REAL);
INSERT INTO daily_sales VALUES
  ('2024-01-01',1200),('2024-01-02',800),('2024-01-03',1500),
  ('2024-01-04',600),('2024-01-05',2000),('2024-01-06',900),
  ('2024-01-07',1100);
""")

# The frame clause decides which rows feed each aggregate: everything so far
# (running total), the current row plus two before it (moving average), or
# the whole result set when OVER () is empty.
query = """
SELECT sale_date, revenue,
       SUM(revenue) OVER (ORDER BY sale_date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total,
       AVG(revenue) OVER (ORDER BY sale_date
                          ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg_3d,
       SUM(revenue) OVER () AS grand_total,
       ROUND(revenue * 100.0 / SUM(revenue) OVER (), 1) AS pct_of_total
FROM daily_sales ORDER BY sale_date;
"""
daily_report = pd.read_sql(query, conn)
print(daily_report.to_string(index=False))
conn.close()
LAG and LEAD for period-over-period comparison
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Four months of revenue for each of two categories.
conn.executescript("""
CREATE TABLE monthly_revenue (month TEXT, category TEXT, revenue REAL);
INSERT INTO monthly_revenue VALUES
  ('2024-01','Electronics',50000),('2024-02','Electronics',55000),
  ('2024-03','Electronics',48000),('2024-04','Electronics',62000),
  ('2024-01','Clothing',30000),('2024-02','Clothing',28000),
  ('2024-03','Clothing',35000),('2024-04','Clothing',32000);
""")

# LAG/LEAD look one row back/ahead inside each category. The first month has
# no predecessor, so prev_month and mom_pct come back NULL there.
query = """
SELECT month, category, revenue,
       LAG(revenue)  OVER (PARTITION BY category ORDER BY month) AS prev_month,
       LEAD(revenue) OVER (PARTITION BY category ORDER BY month) AS next_month,
       ROUND((revenue - LAG(revenue) OVER (PARTITION BY category ORDER BY month))
             * 100.0 / LAG(revenue) OVER (PARTITION BY category ORDER BY month), 1) AS mom_pct
FROM monthly_revenue
ORDER BY category, month;
"""
month_over_month = pd.read_sql(query, conn)
print(month_over_month.to_string(index=False))
conn.close()
NTILE and percentile buckets
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Fifteen customers with distinct lifetime values.
conn.executescript("""
CREATE TABLE customers (name TEXT, lifetime_value REAL);
INSERT INTO customers VALUES
  ('Alice',1200),('Bob',450),('Carol',3400),('Dave',890),('Eve',2100),
  ('Frank',200),('Grace',4500),('Hank',700),('Ivy',1800),('Jake',350),
  ('Kim',2800),('Leo',600),('Mia',950),('Ned',1100),('Ora',3100);
""")

# NTILE(n) deals the ordered rows into n buckets; bucket 1 holds the lowest
# lifetime values because the window is ordered ascending.
query = """
SELECT name, lifetime_value,
       NTILE(4)  OVER (ORDER BY lifetime_value)       AS quartile,
       NTILE(10) OVER (ORDER BY lifetime_value)       AS decile,
       RANK()    OVER (ORDER BY lifetime_value DESC)  AS rank
FROM customers
ORDER BY lifetime_value DESC;
"""
df = pd.read_sql(query, conn)
print(df.to_string(index=False))

print('\nQuartile summary:')
quartile_stats = df.groupby('quartile')['lifetime_value'].agg(['min','max','mean'])
print(quartile_stats.round(0))
conn.close()
🏋️ Practice: Sales Leaderboard
Rank salespeople by total revenue within each region using DENSE_RANK. Show their revenue vs the region average and flag the top performer per region with 'CHAMPION'.
Starter Code
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Eight salespeople across three regions.
seed_sql = """
CREATE TABLE sales (salesperson TEXT, region TEXT, revenue REAL);
INSERT INTO sales VALUES
  ('Alice','East',95000),('Bob','East',88000),('Carol','East',102000),
  ('Dave','West',78000),('Eve','West',91000),('Frank','West',85000),
  ('Grace','North',67000),('Hank','North',74000);
"""
conn.executescript(seed_sql)

# Replace the SQL comments below with the real query, then uncomment the two
# pandas lines that follow it.
query = """
-- TODO: DENSE_RANK within region by revenue desc
-- TODO: AVG(revenue) OVER region
-- TODO: revenue - avg as diff_from_avg
-- TODO: CASE WHEN rank=1 THEN 'CHAMPION' ELSE '' END
"""
# df = pd.read_sql(query, conn)
# print(df.to_string(index=False))
conn.close()
✅ Practice Checklist
13. Query Optimization

Write faster SQL — understand EXPLAIN QUERY PLAN, use indexes effectively, and rewrite slow patterns as efficient alternatives.

Creating and using indexes
import sqlite3, time

conn = sqlite3.connect(':memory:')
import random
random.seed(42)
n = 100000
conn.execute('CREATE TABLE orders (order_id INT, customer_id INT, amount REAL, status TEXT)')

# Build the n synthetic orders up front, then insert them in one batch.
statuses = ['pending','shipped','done']
order_rows = []
for order_id in range(n):
    order_rows.append(
        (order_id, random.randint(1,1000), random.uniform(10,500), random.choice(statuses))
    )
conn.executemany('INSERT INTO orders VALUES (?,?,?,?)', order_rows)
conn.commit()

lookup_sql = 'SELECT * FROM orders WHERE customer_id = 42'

# Query WITHOUT index
started = time.perf_counter()
conn.execute(lookup_sql).fetchall()
elapsed_ms = (time.perf_counter() - started) * 1000
print(f'Without index: {elapsed_ms:.2f} ms')

# Create index
conn.execute('CREATE INDEX idx_customer ON orders(customer_id)')

# Same query again, now that the index exists.
started = time.perf_counter()
conn.execute(lookup_sql).fetchall()
elapsed_ms = (time.perf_counter() - started) * 1000
print(f'With index:    {elapsed_ms:.2f} ms')

# Composite index for common filter+sort
conn.execute('CREATE INDEX idx_status_amount ON orders(status, amount DESC)')
conn.close()
EXPLAIN QUERY PLAN to inspect execution
import sqlite3

conn = sqlite3.connect(':memory:')
import random
random.seed(0)
conn.execute('CREATE TABLE sales (id INT, region TEXT, amount REAL)')
conn.executemany('INSERT INTO sales VALUES (?,?,?)',
    [(i, random.choice(['East','West','North']), random.uniform(10,1000)) for i in range(10000)])
conn.commit()

def explain(conn, query, label):
    """Print SQLite's query plan for ``query`` under a ``--- label ---`` header.

    Args:
        conn: open sqlite3 connection to run the EXPLAIN against.
        query: SQL text of the statement to analyse. It is spliced into the
            EXPLAIN statement as-is (it cannot be bound as a parameter), so
            only pass trusted SQL.
        label: heading printed above the plan rows.

    Returns:
        None; each plan row is printed as a raw tuple.
    """
    plan = conn.execute(f'EXPLAIN QUERY PLAN {query}').fetchall()
    print(f'\n--- {label} ---')
    for row in plan:
        print(' ', row)

# SQL string literals use single quotes. "East" in double quotes is an
# identifier that SQLite only falls back to treating as a string, and builds
# with that legacy behaviour disabled reject it.
east_query = "SELECT * FROM sales WHERE region = 'East'"

explain(conn, east_query, 'No index')
conn.execute('CREATE INDEX idx_region ON sales(region)')
explain(conn, east_query, 'With index')
explain(conn, 'SELECT region, SUM(amount) FROM sales GROUP BY region', 'Aggregation')
conn.close()
EXISTS vs IN vs JOIN performance
import sqlite3, time

conn = sqlite3.connect(':memory:')
import random
random.seed(42)
conn.execute('CREATE TABLE customers (id INT PRIMARY KEY, name TEXT)')
conn.execute('CREATE TABLE orders (id INT, customer_id INT, amount REAL)')
# Customer ids run 1..1000 so that every customer_id generated for an order
# below (randint(1, 1000)) refers to an existing customer.
conn.executemany('INSERT INTO customers VALUES (?,?)', [(i,f'C{i}') for i in range(1, 1001)])
conn.executemany('INSERT INTO orders VALUES (?,?,?)',
    [(i, random.randint(1,1000), random.uniform(10,500)) for i in range(50000)])
conn.commit()
conn.execute('CREATE INDEX idx_oid ON orders(customer_id)')

# Three ways of asking "which customers have at least one order above 400?".
queries = {
    'IN subquery': 'SELECT * FROM customers WHERE id IN (SELECT DISTINCT customer_id FROM orders WHERE amount > 400)',
    'EXISTS':      'SELECT * FROM customers c WHERE EXISTS (SELECT 1 FROM orders o WHERE o.customer_id=c.id AND o.amount>400)',
    'JOIN':        'SELECT DISTINCT c.* FROM customers c JOIN orders o ON c.id=o.customer_id WHERE o.amount>400',
}
# Row count per strategy; all three must agree because they answer the same
# question.
row_counts = {}
for label, q in queries.items():
    t0 = time.perf_counter()
    rows = conn.execute(q).fetchall()
    elapsed_ms = (time.perf_counter() - t0) * 1000
    row_counts[label] = len(rows)
    print(f'{label:15s}: {elapsed_ms:.2f} ms, {len(rows)} rows')
conn.close()
Avoiding N+1 queries with JOIN
import sqlite3, time

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE authors (id INT, name TEXT);
CREATE TABLE books (id INT, title TEXT, author_id INT, pages INT);
""")
import random
random.seed(0)

# 50 authors and 500 books, each book assigned to a random author.
authors = [(i, f'Author {i}') for i in range(50)]
books = []
for i in range(500):
    books.append((i, f'Book {i}', random.randint(0,49), random.randint(100,600)))
conn.executemany('INSERT INTO authors VALUES (?,?)', authors)
conn.executemany('INSERT INTO books VALUES (?,?,?,?)', books)
conn.commit()

# N+1 pattern (BAD): one query for the books, then one more query per book.
t0 = time.perf_counter()
all_books = conn.execute('SELECT * FROM books').fetchall()
results = []
for _book_id, title, author_id, _pages in all_books:
    auth = conn.execute('SELECT name FROM authors WHERE id=?', (author_id,)).fetchone()
    author_name = auth[0] if auth else 'Unknown'
    results.append((title, author_name))
elapsed_ms = (time.perf_counter() - t0) * 1000
print(f'N+1 queries: {elapsed_ms:.2f} ms ({len(results)} results)')

# Single JOIN (GOOD): the database pairs books with authors in one statement.
join_sql = 'SELECT b.title, a.name FROM books b LEFT JOIN authors a ON b.author_id=a.id'
t0 = time.perf_counter()
results2 = conn.execute(join_sql).fetchall()
elapsed_ms = (time.perf_counter() - t0) * 1000
print(f'Single JOIN: {elapsed_ms:.2f} ms ({len(results2)} results)')
conn.close()
🏋️ Practice: Index Advisor
Create a 100K-row orders table. Time a filter query on customer_id WITHOUT an index, add the index, re-run, and print the speedup factor. Also run EXPLAIN QUERY PLAN on both.
Starter Code
import sqlite3, time

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INT, customer_id INT, amount REAL, status TEXT)')
import random
random.seed(42)

# 100,000 synthetic orders spread over customer ids 1..1000.
rows = []
for i in range(100000):
    rows.append((i, random.randint(1,1000), random.uniform(10,500), random.choice(['pending','done'])))
conn.executemany('INSERT INTO orders VALUES (?,?,?,?)', rows)
conn.commit()

# The query to measure before and after adding the index.
q = 'SELECT * FROM orders WHERE customer_id = 42'
# TODO: time query without index
# TODO: EXPLAIN QUERY PLAN before index
# TODO: CREATE INDEX idx_cust ON orders(customer_id)
# TODO: time query with index
# TODO: EXPLAIN QUERY PLAN after index
# TODO: print speedup factor
conn.close()
✅ Practice Checklist
14. Query Optimization & Indexing

EXPLAIN QUERY PLAN
import sqlite3

conn = sqlite3.connect(':memory:')

# The 10,000 ids come from a recursive CTE rather than generate_series():
# that table-valued function is an optional extension and is not part of the
# SQLite core library by default, so it is usually missing from Python.
# RANDOM() returns a signed 64-bit integer; "% 100000", ABS and "/ 100.0"
# turn it into an amount between 0.00 and 999.99.
conn.executescript('''
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER,
                         amount REAL, status TEXT, order_date TEXT);
    WITH RECURSIVE seq(value) AS (
        SELECT 1 UNION ALL SELECT value + 1 FROM seq WHERE value < 10000
    )
    INSERT INTO orders SELECT value, (value%100)+1, ABS(RANDOM() % 100000) / 100.0,
        CASE value%3 WHEN 0 THEN 'pending' WHEN 1 THEN 'completed' ELSE 'cancelled' END,
        date('2024-01-01', '+'||(value%365)||' days')
    FROM seq;
''')

# EXPLAIN QUERY PLAN returns one row per plan step instead of running the query.
plan = conn.execute('''
    EXPLAIN QUERY PLAN
    SELECT customer_id, COUNT(*), AVG(amount)
    FROM orders WHERE status='completed'
    GROUP BY customer_id ORDER BY AVG(amount) DESC
''').fetchall()
print('Query plan (no index):')
for row in plan:
    print(' ', row)
conn.close()
Index speedup measurement
import sqlite3, time

conn = sqlite3.connect(':memory:')

# The 100,000 ids come from a recursive CTE rather than generate_series(),
# which is an optional extension and usually missing from Python's SQLite.
# RANDOM() is a signed 64-bit integer, so it is reduced to 0.00-999.99 here.
conn.executescript('''
    CREATE TABLE sales (id INTEGER PRIMARY KEY, region TEXT, amount REAL, sale_date TEXT);
    WITH RECURSIVE seq(value) AS (
        SELECT 1 UNION ALL SELECT value + 1 FROM seq WHERE value < 100000
    )
    INSERT INTO sales SELECT value, CASE value%4 WHEN 0 THEN 'North' WHEN 1 THEN 'South'
        WHEN 2 THEN 'East' ELSE 'West' END, ABS(RANDOM() % 100000) / 100.0,
        date('2024-01-01', '+'||(value%365)||' days')
    FROM seq;
''')
query = "SELECT region, AVG(amount) FROM sales WHERE sale_date > '2024-06-01' GROUP BY region"

# perf_counter() is the high-resolution clock meant for measuring short
# intervals; time.time() can be too coarse for a query this fast.
t0 = time.perf_counter()
conn.execute(query).fetchall()
t_slow = time.perf_counter() - t0

conn.execute('CREATE INDEX idx_date ON sales(sale_date)')

t0 = time.perf_counter()
conn.execute(query).fetchall()
t_fast = time.perf_counter() - t0

print(f'Without index: {t_slow*1000:.1f}ms')
print(f'With index:    {t_fast*1000:.1f}ms')
# Divide by the measured time itself; clamping it to 1 ms would under-report
# the speedup whenever the indexed query finishes faster than that.
speedup = t_slow / t_fast if t_fast > 0 else float('inf')
print(f'Speedup: {speedup:.1f}x')
conn.close()
Avoiding N+1 with JOINs
import sqlite3

conn = sqlite3.connect(':memory:')

# Thirty employees are generated with a recursive CTE. generate_series() is an
# optional extension that is usually missing from Python's SQLite, and
# "generate_series(1,30) v" would make v a table alias rather than a column.
# RANDOM() is a signed 64-bit integer, so it is reduced to 0..49999 before
# being added to the base salary.
conn.executescript('''
    CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, dept_id INTEGER, salary REAL);
    CREATE TABLE departments (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO departments VALUES (1,'Engineering'),(2,'Sales'),(3,'HR');
    WITH RECURSIVE seq(v) AS (
        SELECT 1 UNION ALL SELECT v + 1 FROM seq WHERE v < 30
    )
    INSERT INTO employees SELECT v, 'Emp '||v, (v%3)+1, 50000 + ABS(RANDOM() % 50000)
    FROM seq;
''')

# N+1: one query for the employees, then one extra query per employee.
print('=== N+1 (avoid) ===')
for eid,name,dept_id in conn.execute('SELECT id,name,dept_id FROM employees ORDER BY id LIMIT 3').fetchall():
    dept = conn.execute('SELECT name FROM departments WHERE id=?',(dept_id,)).fetchone()
    print(f'  {name} -> {dept[0]}')

# JOIN: the same pairing done by the database in a single statement.
print('=== Single JOIN (preferred) ===')
for row in conn.execute('SELECT e.name, d.name, e.salary FROM employees e JOIN departments d ON e.dept_id=d.id ORDER BY e.id LIMIT 3').fetchall():
    print(f'  {row[0]} | {row[1]} | ${row[2]:,.0f}')
conn.close()
CTE vs subquery readability
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# 1,000 transactions are generated with a recursive CTE. generate_series() is
# an optional extension that is usually missing from Python's SQLite, and
# "generate_series(1,1000) v" would make v a table alias rather than a column.
# RANDOM() is a signed 64-bit integer, so it is reduced to 0.00-499.99 here.
conn.executescript('''
    CREATE TABLE transactions (id INTEGER PRIMARY KEY, user_id INTEGER,
                               amount REAL, type TEXT, ts TEXT);
    WITH RECURSIVE seq(v) AS (
        SELECT 1 UNION ALL SELECT v + 1 FROM seq WHERE v < 1000
    )
    INSERT INTO transactions SELECT v, (v%50)+1, ABS(RANDOM() % 50000) / 100.0,
        CASE v%3 WHEN 0 THEN 'purchase' WHEN 1 THEN 'refund' ELSE 'fee' END,
        date('2024-01-01','+'||(v%300)||' days') FROM seq;
''')

# purchase_summary totals purchases per user; avg_thresh is a one-row CTE
# holding five times the average purchase, used as the cut-off.
cte_query = '''
    WITH purchase_summary AS (
        SELECT user_id, SUM(amount) total, COUNT(*) cnt
        FROM transactions WHERE type='purchase' GROUP BY user_id
    ), avg_thresh AS (
        SELECT AVG(amount)*5 threshold FROM transactions WHERE type='purchase'
    )
    SELECT ps.user_id, ROUND(ps.total,2) total, ps.cnt
    FROM purchase_summary ps, avg_thresh at WHERE ps.total > at.threshold
    ORDER BY ps.total DESC LIMIT 5
'''
df = pd.read_sql(cte_query, conn)
print('Top buyers via CTE:')
print(df.to_string(index=False))
conn.close()
🏋️ Practice: Index Advisor
Create a 100K-row orders table. Time a customer_id filter query WITHOUT index, add the index, re-run, print speedup, and show EXPLAIN QUERY PLAN before vs after.
Starter Code
import sqlite3, time

# Exercise scaffold: an empty in-memory database is opened here; the steps
# listed below are left for the reader to implement before the final close.
conn = sqlite3.connect(':memory:')
# TODO: create orders table with 100K rows (id, customer_id, amount, status)
# TODO: time filter query on customer_id=42
# TODO: EXPLAIN QUERY PLAN before index
# TODO: CREATE INDEX idx_cust ON orders(customer_id)
# TODO: time query with index, print speedup
conn.close()
✅ Practice Checklist
15. Pivoting & Unpivoting

Pivot with CASE WHEN
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Long format: one row per (year, quarter, region).
conn.executescript('''
    CREATE TABLE sales (year INTEGER, quarter TEXT, region TEXT, revenue REAL);
    INSERT INTO sales VALUES
        (2024,'Q1','North',120000),(2024,'Q2','North',135000),(2024,'Q3','North',118000),(2024,'Q4','North',145000),
        (2024,'Q1','South',98000),(2024,'Q2','South',105000),(2024,'Q3','South',112000),(2024,'Q4','South',125000);
''')

# Each SUM(CASE ...) keeps only one quarter's revenue, turning the quarter
# values into columns while GROUP BY collapses the rows per region.
pivot_sql = '''
    SELECT region,
        SUM(CASE WHEN quarter='Q1' THEN revenue ELSE 0 END) Q1,
        SUM(CASE WHEN quarter='Q2' THEN revenue ELSE 0 END) Q2,
        SUM(CASE WHEN quarter='Q3' THEN revenue ELSE 0 END) Q3,
        SUM(CASE WHEN quarter='Q4' THEN revenue ELSE 0 END) Q4,
        SUM(revenue) Total
    FROM sales GROUP BY region ORDER BY Total DESC
'''
df = pd.read_sql(pivot_sql, conn)
print(df.to_string(index=False))
conn.close()
Unpivot (wide to long) with UNION ALL
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Wide format: one row per server, one column per metric.
conn.executescript('''
    CREATE TABLE server_metrics (server TEXT, cpu_pct REAL, mem_pct REAL, disk_pct REAL, net_pct REAL);
    INSERT INTO server_metrics VALUES ('web-01',72.5,65.2,45.0,30.1),
        ('web-02',55.0,70.8,52.3,28.7),('db-01',88.2,91.5,78.4,15.2);
''')

# One SELECT per metric column, stacked with UNION ALL; the column names of
# the combined result come from the first SELECT.
unpivot_sql = '''
    SELECT server,'CPU' metric, cpu_pct value FROM server_metrics
    UNION ALL SELECT server,'Memory',mem_pct FROM server_metrics
    UNION ALL SELECT server,'Disk',disk_pct FROM server_metrics
    UNION ALL SELECT server,'Network',net_pct FROM server_metrics
    ORDER BY server, metric
'''
df = pd.read_sql(unpivot_sql, conn)
print(df.to_string(index=False))
conn.close()
Dynamic pivot with Python
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')
conn.executescript('''
    CREATE TABLE survey (respondent INTEGER, question TEXT, score INTEGER);
    INSERT INTO survey VALUES (1,'Q1',4),(1,'Q2',5),(1,'Q3',3),(1,'Q4',4),
        (2,'Q1',3),(2,'Q2',4),(2,'Q3',5),(2,'Q4',2),(3,'Q1',5),(3,'Q2',5),(3,'Q3',4),(3,'Q4',5);
''')

# The pivot columns are discovered from the data, so they are not known when
# the script is written.
questions = [r[0] for r in conn.execute('SELECT DISTINCT question FROM survey ORDER BY question')]

# The question text is data, so it must not be pasted into the SQL as-is:
#   - as a comparison value it is bound through a "?" placeholder;
#   - as a column alias it cannot be bound, so it is wrapped in double quotes
#     with any embedded double quote doubled.
# A question such as "it's ok" would otherwise break (or alter) the query.
cases = ','.join(
    'MAX(CASE WHEN question=? THEN score END) AS "{}"'.format(q.replace('"', '""'))
    for q in questions
)
pivot_sql = (
    f'SELECT respondent, {cases}, ROUND(AVG(score),2) avg_score '
    'FROM survey GROUP BY respondent ORDER BY avg_score DESC'
)
# One bound value per placeholder, in the same order as the generated columns.
df = pd.read_sql(pivot_sql, conn, params=questions)
print(df.to_string(index=False))
conn.close()
Cross-tabulation with percentages
import sqlite3, pandas as pd

conn = sqlite3.connect(':memory:')

# Pre-aggregated feedback counts: one row per (category, sentiment).
conn.executescript('''
    CREATE TABLE feedback (category TEXT, sentiment TEXT, count INTEGER);
    INSERT INTO feedback VALUES
        ('Product','Positive',450),('Product','Neutral',120),('Product','Negative',80),
        ('Service','Positive',380),('Service','Neutral',95),('Service','Negative',125),
        ('Price','Positive',200),('Price','Neutral',180),('Price','Negative',220);
''')

# Sentiments become columns via SUM(CASE ...); pct_pos is the positive share
# of each category's total, and 100.0 forces floating-point division.
crosstab_sql = '''
    SELECT category,
        SUM(CASE WHEN sentiment='Positive' THEN count ELSE 0 END) positive,
        SUM(CASE WHEN sentiment='Neutral' THEN count ELSE 0 END) neutral,
        SUM(CASE WHEN sentiment='Negative' THEN count ELSE 0 END) negative,
        SUM(count) total,
        ROUND(100.0*SUM(CASE WHEN sentiment='Positive' THEN count ELSE 0 END)/SUM(count),1) pct_pos
    FROM feedback GROUP BY category ORDER BY pct_pos DESC
'''
df = pd.read_sql(crosstab_sql, conn)
print(df.to_string(index=False))
conn.close()
🏋️ Practice: Dynamic Sales Pivot
Create a sales table with 5 reps and 4 quarters. Get distinct quarters from DB, build a CASE WHEN pivot query dynamically, and display as a DataFrame.
Starter Code
import sqlite3, pandas as pd, numpy as np

conn = sqlite3.connect(':memory:')
np.random.seed(42)
reps = ['Alice','Bob','Carol','Dave','Eve']
quarters = ['Q1','Q2','Q3','Q4']

# One row per (rep, quarter); revenue is drawn in rep-major order so the
# seeded sequence is reproducible.
rows = []
for r in reps:
    for q in quarters:
        rows.append((r, q, int(np.random.exponential(80000))))

conn.executescript('CREATE TABLE rep_sales (rep TEXT, quarter TEXT, revenue INTEGER);')
conn.executemany('INSERT INTO rep_sales VALUES (?,?,?)', rows)
# TODO: get distinct quarters dynamically
# TODO: build CASE WHEN pivot query
# TODO: read to DataFrame and print
conn.close()
✅ Practice Checklist
16. Triggers & Database Automation

Audit log trigger on price UPDATE
import sqlite3

conn = sqlite3.connect(':memory:')

# The trigger fires after any UPDATE that names the price column and records
# the old and new value; changed_at is filled in by the column default.
conn.executescript('''
    CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL);
    CREATE TABLE price_audit (log_id INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id INTEGER, old_price REAL, new_price REAL,
        changed_at DATETIME DEFAULT CURRENT_TIMESTAMP);
    INSERT INTO products VALUES (1,'Widget',9.99),(2,'Gadget',24.99);
    CREATE TRIGGER log_price AFTER UPDATE OF price ON products
    BEGIN INSERT INTO price_audit(product_id,old_price,new_price)
          VALUES(NEW.id,OLD.price,NEW.price); END;
''')

# Two plain price changes; the audit rows are written by the trigger alone.
price_changes = (
    'UPDATE products SET price=12.99 WHERE id=1',
    'UPDATE products SET price=29.99 WHERE id=2',
)
for statement in price_changes:
    conn.execute(statement)

audit_rows = conn.execute('SELECT * FROM price_audit').fetchall()
for row in audit_rows:
    print(f'Product {row[1]}: ${row[2]} -> ${row[3]}')
conn.close()
Business rule — prevent negative stock
import sqlite3

conn = sqlite3.connect(':memory:')

# Every new order decrements stock through the trigger; the CHECK constraint
# on inventory.quantity rejects any decrement that would go below zero.
conn.executescript('''
    CREATE TABLE inventory (product_id INTEGER PRIMARY KEY, quantity INTEGER CHECK(quantity >= 0));
    CREATE TABLE orders (id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, qty INTEGER);
    INSERT INTO inventory VALUES (1,100),(2,5);
    CREATE TRIGGER decrement_stock AFTER INSERT ON orders
    BEGIN UPDATE inventory SET quantity=quantity-NEW.qty WHERE product_id=NEW.product_id; END;
''')
conn.execute('INSERT INTO orders(product_id,qty) VALUES(1,30)')
print('After ordering 30 of product 1:')
for row in conn.execute('SELECT * FROM inventory').fetchall():
    print(f'  Product {row[0]}: {row[1]} units')

# Product 2 has only 5 units, so ordering 10 must be rejected. Only
# IntegrityError is caught: any other failure (a typo in the SQL, a missing
# table) should surface instead of being reported as a CHECK violation.
try:
    conn.execute('INSERT INTO orders(product_id,qty) VALUES(2,10)')
except sqlite3.IntegrityError as e:
    print(f'Order failed (CHECK constraint): {type(e).__name__}')

# The rejected statement is undone as a whole: no order row was kept and the
# stock is unchanged.
final_stock = conn.execute('SELECT product_id, quantity FROM inventory ORDER BY product_id').fetchall()
order_count = conn.execute('SELECT COUNT(*) FROM orders').fetchone()[0]
print(f'Orders recorded: {order_count}')
for product_id, quantity in final_stock:
    print(f'  Product {product_id}: {quantity} units')
conn.close()
Transactional fund transfer (stored proc pattern)
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript('''
    CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT, balance REAL);
    CREATE TABLE transfers (id INTEGER PRIMARY KEY AUTOINCREMENT,
        from_id INTEGER, to_id INTEGER, amount REAL, ts DATETIME DEFAULT CURRENT_TIMESTAMP);
    INSERT INTO accounts VALUES (1,'Alice',5000),(2,'Bob',1500),(3,'Carol',3200);
''')

def transfer(conn, from_id, to_id, amount):
    """Move ``amount`` between two accounts as one all-or-nothing transaction.

    Args:
        conn: open sqlite3 connection with no transaction already in progress.
        from_id: id of the account to debit.
        to_id: id of the account to credit.
        amount: positive amount to move.

    Returns:
        None. The outcome is printed: an ``OK`` line on success, a ``FAILED``
        line when the amount is not positive, the source balance is missing or
        too low, or the destination account does not exist.

    Raises:
        sqlite3.Error: if a statement fails mid-transfer; the partial changes
            are rolled back before the error propagates.
    """
    # A zero or negative amount would pass the balance check below and move
    # money in the opposite direction.
    if amount <= 0:
        print(f'  FAILED: amount must be positive, got ${amount:.2f}')
        return
    bal = conn.execute('SELECT balance FROM accounts WHERE id=?',(from_id,)).fetchone()
    if not bal or bal[0] < amount:
        print(f'  FAILED: ${bal[0] if bal else 0:.2f} < ${amount:.2f}')
        return
    conn.execute('BEGIN')
    try:
        conn.execute('UPDATE accounts SET balance=balance-? WHERE id=?',(amount,from_id))
        # rowcount is 0 when to_id matches no account; committing then would
        # debit the sender without crediting anyone.
        credited = conn.execute('UPDATE accounts SET balance=balance+? WHERE id=?',(amount,to_id)).rowcount
        if credited:
            conn.execute('INSERT INTO transfers(from_id,to_id,amount) VALUES(?,?,?)',(from_id,to_id,amount))
            conn.execute('COMMIT')
        else:
            conn.execute('ROLLBACK')
    except sqlite3.Error:
        # Undo the debit/credit already applied so a failure never leaves a
        # half-finished transfer (or an open transaction) behind.
        if conn.in_transaction:
            conn.execute('ROLLBACK')
        raise
    if not credited:
        print(f'  FAILED: destination acct {to_id} does not exist')
        return
    print(f'  OK: ${amount:.2f} from acct {from_id} to acct {to_id}')

transfer(conn,1,2,500)
transfer(conn,2,3,3000)  # fails
for row in conn.execute('SELECT id,name,balance FROM accounts').fetchall():
    print(f'{row[1]}: ${row[2]:.2f}')
conn.close()
INSTEAD OF trigger on a view
import sqlite3

conn = sqlite3.connect(':memory:')

# The INSTEAD OF trigger intercepts salary UPDATEs aimed at emp_view and
# applies them to the employees table the view is built on.
conn.executescript('''
    CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, salary REAL, dept TEXT);
    INSERT INTO employees VALUES (1,'Alice',75000,'Eng'),(2,'Bob',68000,'Sales');
    CREATE VIEW emp_view AS SELECT id, name, salary, dept FROM employees;
    CREATE TRIGGER update_via_view INSTEAD OF UPDATE OF salary ON emp_view
    BEGIN UPDATE employees SET salary=NEW.salary WHERE id=OLD.id; END;
''')

# Update through the view, then read the base table to confirm the change.
conn.execute("UPDATE emp_view SET salary=90000 WHERE name='Alice'")
salary_rows = conn.execute('SELECT name, salary FROM employees').fetchall()
for row in salary_rows:
    employee, salary = row
    print(f'{employee}: ${salary:,.0f}')
conn.close()
🏋️ Practice: Banking Triggers
Create accounts and transactions tables. Add triggers: (1) log withdrawals >$1000 to large_withdrawals, (2) prevent negative balances. Test with 5 transactions including one that should fail.
Starter Code
import sqlite3

# Exercise scaffold: an empty in-memory database is opened here; the tables,
# triggers and test transactions listed below are left for the reader.
conn = sqlite3.connect(':memory:')
# TODO: CREATE TABLE accounts (id, name, balance)
# TODO: CREATE TABLE large_withdrawals (id AUTOINCREMENT, account_id, amount, ts)
# TODO: CREATE TRIGGER log_large after INSERT on transactions WHEN amount > 1000
# TODO: enforce non-negative balance via CHECK or trigger
# TODO: insert test transactions, show results
conn.close()
✅ Practice Checklist
17. Subqueries in Depth

Subqueries (nested SELECT) can appear in WHERE, FROM, SELECT, and HAVING clauses. Master scalar, column, table, and correlated subqueries for complex filtering and derived metrics.

Scalar and column subqueries in WHERE
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level ``conn``.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql, params)
        if not cur.description:
            conn.commit()
            return cur.rowcount
        return cur.fetchall()
    finally:
        cur.close()


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    cur = conn.cursor()
    try:
        cur.executemany(sql, rows)
        conn.commit()
    finally:
        cur.close()


run('''CREATE TABLE sales (id INTEGER, rep TEXT, amount REAL, region TEXT)''')
sales_rows = [
    (1,'Alice',1200,'North'),(2,'Bob',800,'South'),(3,'Carol',1500,'North'),
    (4,'Dave',600,'South'),(5,'Eve',2000,'North'),(6,'Frank',950,'East'),
]
runmany('INSERT INTO sales VALUES (?,?,?,?)', sales_rows)

# Scalar subquery: the inner SELECT yields a single value to compare against.
rows = run('''
    SELECT rep, amount FROM sales
    WHERE amount > (SELECT AVG(amount) FROM sales)
    ORDER BY amount DESC
''')
print("Above-average reps:")
for rep, amount in rows:
    print(f"  {rep}: ${amount:,.0f}")

# IN with subquery: the inner SELECT yields a list of regions to match.
rows = run('''
    SELECT rep, amount FROM sales
    WHERE region IN (SELECT DISTINCT region FROM sales WHERE amount > 1000)
    AND amount < 1000
    ORDER BY amount
''')
print("In high-value regions but low individual amount:")
for rep, amount in rows:
    print(f"  {rep}: ${amount:,.0f}")
Correlated subqueries
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level ``conn``.

    A statement with a result set returns its rows; any other statement is
    committed and its rowcount returned.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        has_result_set = bool(cursor.description)
        if has_result_set:
            return cursor.fetchall()
        conn.commit()
        return cursor.rowcount
    finally:
        cursor.close()


def runmany(sql, rows):
    """Run ``sql`` for every parameter tuple in ``rows`` and commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


run('''CREATE TABLE employees (id INTEGER, name TEXT, dept TEXT, salary REAL)''')
employee_rows = [
    (1,'Alice','Eng',90000),(2,'Bob','Eng',85000),(3,'Carol','Sales',70000),
    (4,'Dave','Sales',72000),(5,'Eve','HR',65000),(6,'Frank','Eng',95000),
    (7,'Grace','HR',68000),(8,'Henry','Sales',75000),
]
runmany('INSERT INTO employees VALUES (?,?,?,?)', employee_rows)

# Correlated subquery: the inner SELECT refers to the outer row (e.dept), so
# it is evaluated for each outer row.
# Here: employees earning above their own department's average.
rows = run('''
    SELECT e.name, e.dept, e.salary,
           ROUND((SELECT AVG(salary) FROM employees i WHERE i.dept = e.dept), 0) AS dept_avg
    FROM employees e
    WHERE e.salary > (SELECT AVG(salary) FROM employees i WHERE i.dept = e.dept)
    ORDER BY e.dept, e.salary DESC
''')
print("Above-department-average earners:")
for name, dept, salary, dept_avg in rows:
    print(f"  {name} ({dept}): ${salary:,.0f} (dept avg ${dept_avg:,.0f})")

# EXISTS: true as soon as the inner SELECT finds one matching row.
rows = run('''
    SELECT DISTINCT dept FROM employees e
    WHERE EXISTS (
        SELECT 1 FROM employees i
        WHERE i.dept = e.dept AND i.salary > 80000
    )
''')
high_earner_depts = [dept for (dept,) in rows]
print("Departments with at least one high earner:", high_earner_depts)
Derived tables (subquery in FROM)
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level connection.

    Returns every fetched row when the statement yields a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Run *sql* once for each parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


# Demo data: orders for three customers across 2023 and 2024.
run('''CREATE TABLE orders (id INTEGER, cust_id INTEGER, total REAL, yr INTEGER)''')
order_seed = [
    (1, 1, 500, 2023), (2, 1, 300, 2023), (3, 2, 1200, 2023), (4, 2, 800, 2024),
    (5, 3, 200, 2024), (6, 1, 900, 2024), (7, 3, 600, 2023), (8, 2, 400, 2024),
]
runmany('INSERT INTO orders VALUES (?,?,?,?)', order_seed)

# Derived table: the inner query aggregates per customer; the outer query
# then computes growth from those aggregates and sorts by it.
rows = run('''
    SELECT cust_id, total_2023, total_2024,
           ROUND(total_2024 - total_2023, 0) AS growth
    FROM (
        SELECT cust_id,
               SUM(CASE WHEN yr=2023 THEN total ELSE 0 END) AS total_2023,
               SUM(CASE WHEN yr=2024 THEN total ELSE 0 END) AS total_2024
        FROM orders
        GROUP BY cust_id
    ) yearly
    ORDER BY growth DESC
''')
print("Year-over-year growth by customer:")
for r in rows:
    print(f"  Cust {r[0]}: 2023=${r[1]:,.0f} -> 2024=${r[2]:,.0f} ({r[3]:+,.0f})")

# Derived table filtered by the outer WHERE: compute each customer's average
# order first, then keep only the customers whose average exceeds 500.
rows = run('''
    SELECT cust_id, avg_order
    FROM (SELECT cust_id, AVG(total) AS avg_order FROM orders GROUP BY cust_id)
    WHERE avg_order > 500
''')
print("Customers with avg order > 500:", rows)
💼 Real-World: Sales Ranking with Subqueries
A sales ops team needs to find reps in each region who beat both their regional average AND the company average — combining correlated subqueries with derived tables.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE sales (id INTEGER, rep TEXT, region TEXT, amount REAL, quarter INTEGER)''')
runmany('INSERT INTO sales VALUES (?,?,?,?,?)', [
    (1, 'Alice', 'North', 1200, 1), (2, 'Bob', 'North', 800, 1), (3, 'Carol', 'South', 1500, 1),
    (4, 'Dave', 'South', 600, 1), (5, 'Eve', 'North', 2000, 2), (6, 'Frank', 'South', 950, 2),
    (7, 'Grace', 'East', 1100, 1), (8, 'Henry', 'East', 750, 1), (9, 'Iris', 'East', 1300, 2),
])
company_avg = run('SELECT AVG(amount) FROM sales')[0][0]
print(f"Company avg: ${company_avg:,.0f}")

# Join each sale to its region's average (a derived table), then keep the reps
# who beat both that regional average and the company-wide average.
# The company average is bound as a ? parameter instead of being formatted
# into the SQL text, so no value is ever spliced into the statement.
rows = run('''
    SELECT s.rep, s.region, s.amount,
           ROUND(regional.avg_amount, 0) AS regional_avg
    FROM sales s
    JOIN (
        SELECT region, AVG(amount) AS avg_amount
        FROM sales GROUP BY region
    ) regional ON s.region = regional.region
    WHERE s.amount > regional.avg_amount
      AND s.amount > ?
    ORDER BY s.region, s.amount DESC
''', (company_avg,))
print("Reps beating both regional AND company average:")
for rep, region, amount, regional_avg in rows:
    print(f"  {rep} ({region}): ${amount:,.0f} > regional ${regional_avg:,.0f}")
🏋️ Practice: Self-referencing Subquery
Create a products table (id, name, category, price, cost). Write: (1) A query using a scalar subquery to show each product with its category's avg price. (2) A correlated subquery to find the cheapest product in each category. (3) A derived table query to show categories where avg margin (price-cost)/price > 30%.
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level connection.

    Returns every fetched row when the statement yields a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Run *sql* once for each parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


# Starter data for the exercise: products with a category, price and cost.
run('''CREATE TABLE products (id INT, name TEXT, category TEXT, price REAL, cost REAL)''')
product_seed = [
    (1, 'Apple', 'Fruit', 1.2, 0.4), (2, 'Banana', 'Fruit', 0.5, 0.15), (3, 'Carrot', 'Veg', 0.8, 0.3),
    (4, 'Potato', 'Veg', 0.6, 0.2), (5, 'Milk', 'Dairy', 1.5, 0.8), (6, 'Cheese', 'Dairy', 4.0, 1.5),
    (7, 'Yogurt', 'Dairy', 2.0, 0.9), (8, 'Broccoli', 'Veg', 1.1, 0.4),
]
runmany('INSERT INTO products VALUES (?,?,?,?,?)', product_seed)

# Exercise 1: each product next to its category's average price.
print("=== Product vs Category Avg ===")
# TODO: write query with scalar correlated subquery for category avg

# Exercise 2: the cheapest product within each category.
print("=== Cheapest per Category ===")
# TODO: use correlated subquery to find min price per category

# Exercise 3: categories whose average margin exceeds 30%.
print("=== High-Margin Categories ===")
# TODO: derived table with margin calc, then filter
✅ Practice Checklist
18. String Functions & Pattern Matching

SQL provides rich string functions: UPPER/LOWER, SUBSTR, TRIM, REPLACE, LENGTH, INSTR, and LIKE/GLOB for pattern matching. Essential for cleaning and searching text data.

Core string functions
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Deliberately messy contact data: stray whitespace and inconsistent casing.
run('''CREATE TABLE contacts (id INTEGER, name TEXT, email TEXT, phone TEXT)''')
runmany('INSERT INTO contacts VALUES (?,?,?,?)', [
    (1, '  Alice Smith  ', 'alice@EXAMPLE.com', '(555) 123-4567'),
    (2, 'Bob  Jones', 'bob.jones@mail.com', '555.987.6543'),
    (3, 'carol lee', 'CAROL@COMPANY.ORG', '5551234567'),
    (4, 'DAVE BROWN', 'dave@test.net', '555-111-2222'),
])

# SQL string literals use single quotes; double quotes denote identifiers and
# are only accepted as strings by SQLite as a compatibility quirk.
rows = run("""
    SELECT
        id,
        TRIM(name)                              AS name_clean,
        LOWER(TRIM(name))                       AS name_lower,
        UPPER(SUBSTR(TRIM(name), 1, 1))         AS initial,
        LOWER(email)                            AS email_lower,
        LENGTH(TRIM(name))                      AS name_len,
        INSTR(TRIM(name), ' ')                  AS space_pos,
        -- Extract first name (up to first space); the appended space
        -- guarantees INSTR finds one even for single-word names
        SUBSTR(TRIM(name), 1, INSTR(TRIM(name)||' ', ' ')-1) AS first_name
    FROM contacts
""")

# Unpack by column name so each label is paired with the right column
# (the initial is column 3; column 2 is the lower-cased name).
contact_lines = [
    f"  ID {cid}: '{name_clean}' | initial={initial} | fname={first_name}"
    for cid, name_clean, _name_lower, initial, _email, _name_len, _space_pos, first_name in rows
]
for line in contact_lines:
    print(line)
LIKE and GLOB pattern matching
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level connection.

    Returns every fetched row when the statement yields a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Run *sql* once for each parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


# Demo data: products whose SKUs follow a CATEGORY-CODE-NUMBER layout.
run('''CREATE TABLE products (id INTEGER, sku TEXT, name TEXT, category TEXT)''')
product_seed = [
    (1, 'FRUIT-APL-001', 'Apple (Red)', 'Fruit'),
    (2, 'FRUIT-BAN-002', 'Banana', 'Fruit'),
    (3, 'VEG-CAR-001', 'Carrot', 'Vegetable'),
    (4, 'VEG-POT-002', 'Potato', 'Vegetable'),
    (5, 'DAIRY-MLK-001', 'Whole Milk', 'Dairy'),
    (6, 'FRUIT-APL-002', 'Apple (Green)', 'Fruit'),
    (7, 'DAIRY-CHZ-001', 'Cheddar Cheese', 'Dairy'),
]
runmany('INSERT INTO products VALUES (?,?,?,?)', product_seed)

# LIKE is case-insensitive; % matches any run of characters, _ exactly one.
rows = run("SELECT name FROM products WHERE name LIKE 'Apple%'")
matches = [r[0] for r in rows]
print("Starts with 'Apple':", matches)

rows = run("SELECT name FROM products WHERE name LIKE '%e%'")
matches = [r[0] for r in rows]
print("Contains 'e':", matches)

rows = run("SELECT sku FROM products WHERE sku LIKE 'FRUIT-___-___'")
matches = [r[0] for r in rows]
print("SKUs matching FRUIT-???-???:", matches)

# GLOB is case-sensitive and uses Unix-style * and ? wildcards.
rows = run("SELECT sku FROM products WHERE sku GLOB 'FRUIT-*'")
matches = [r[0] for r in rows]
print("GLOB FRUIT-*:", matches)

rows = run("SELECT sku FROM products WHERE sku GLOB '*-001'")
matches = [r[0] for r in rows]
print("GLOB *-001:", matches)

# NOT LIKE inverts the match.
rows = run("SELECT name FROM products WHERE name NOT LIKE '%Apple%'")
matches = [r[0] for r in rows]
print("Not apple:", matches)
String manipulation and cleaning
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Raw input with inconsistent phone punctuation and email casing/whitespace.
run('''CREATE TABLE raw_data (id INTEGER, raw_phone TEXT, raw_email TEXT)''')
runmany('INSERT INTO raw_data VALUES (?,?,?)', [
    (1, '(555) 123-4567', '  User@Example.COM  '),
    (2, '555.987.6543', 'bob.jones@MAIL.COM'),
    (3, '5551234567', 'CAROL@company.org'),
])

# SQL string literals are single-quoted here, so the statements are wrapped in
# triple double quotes on the Python side.
cleaned = run("""
    SELECT
        id,
        -- Normalize phone: strip every separator used in the data
        -- ( ) - . and space, leaving only the digits
        REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(raw_phone, '(', ''), ')', ''), '-', ''), '.', ''), ' ', '') AS phone_digits,
        -- Trim spaces and lowercase email
        LOWER(TRIM(raw_email)) AS email_clean,
        -- Extract domain from email (everything after the @)
        SUBSTR(
            LOWER(TRIM(raw_email)),
            INSTR(LOWER(TRIM(raw_email)), '@') + 1
        ) AS domain,
        -- Check email format (has @ and .)
        CASE WHEN INSTR(raw_email, '@') > 0 AND INSTR(raw_email, '.') > 0
             THEN 'valid' ELSE 'invalid' END AS email_status
    FROM raw_data
""")
for rec_id, phone, email, domain, status in cleaned:
    print(f"  ID {rec_id}: phone={phone}, email={email}, domain={domain}, {status}")

# Data masking: keep the first 3 digits and everything from the 8th digit on.
# The derived table must strip ALL separators first; otherwise the fixed
# SUBSTR offsets land on punctuation instead of digits.
rows = run("""
    SELECT id,
           SUBSTR(phone_digits, 1, 3) || '****' || SUBSTR(phone_digits, 8) AS masked
    FROM (SELECT id,
                 REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(raw_phone, '(', ''), ')', ''), '-', ''), '.', ''), ' ', '') AS phone_digits
          FROM raw_data)
""")
print("Masked phones:", [(r[0], r[1]) for r in rows])
💼 Real-World: Email Domain Analysis
A marketing team analyzes which email domains their B2B customers use, normalizing inconsistent formatting before aggregating.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Customer emails with inconsistent domain casing.
run('''CREATE TABLE customers (id INTEGER, email TEXT, tier TEXT)''')
runmany('INSERT INTO customers VALUES (?,?,?)', [
    (1, 'alice@ACME.COM', 'gold'), (2, 'bob@tech.org', 'silver'),
    (3, 'carol@acme.com', 'gold'), (4, 'dave@STARTUP.IO', 'bronze'),
    (5, 'eve@TECH.ORG', 'silver'), (6, 'frank@enterprise.net', 'gold'),
    (7, 'grace@startup.io', 'bronze'), (8, 'henry@acme.com', 'silver'),
    (9, 'iris@ENTERPRISE.NET', 'gold'), (10, 'jack@freelance.dev', 'bronze'),
])

# Lower-case the part after '@' so differently-cased domains group together,
# then count customers per domain overall and per tier.
# String literals are single-quoted (standard SQL); double quotes are reserved
# for identifiers.
rows = run("""
    SELECT
        LOWER(SUBSTR(email, INSTR(email, '@') + 1)) AS domain,
        COUNT(*)                                      AS customers,
        SUM(CASE WHEN tier = 'gold'   THEN 1 ELSE 0 END) AS gold,
        SUM(CASE WHEN tier = 'silver' THEN 1 ELSE 0 END) AS silver,
        SUM(CASE WHEN tier = 'bronze' THEN 1 ELSE 0 END) AS bronze
    FROM customers
    GROUP BY LOWER(SUBSTR(email, INSTR(email, '@') + 1))
    ORDER BY customers DESC
""")
print("Domain analysis:")
for domain, total, gold, silver, bronze in rows:
    print(f"  {domain:<20} total={total} gold={gold} silver={silver} bronze={bronze}")
🏋️ Practice: Name Formatter
Create a people table (id, full_name, email). Write a query that produces: first_name (before the first space), last_name (after the last space), initials (first letter of each word), username (lowercase first_name + '.' + the last 4 characters of the email's local part, i.e. the text before '@'), and email_valid (1 if the email contains both '@' and '.', else 0).
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Starter data: names with one, two and three words; one email lacks a dot
# after the @.
run('''CREATE TABLE people (id INT, full_name TEXT, email TEXT)''')
runmany('INSERT INTO people VALUES (?,?,?)', [
    (1, 'Alice Marie Smith', 'alice.smith@company.com'),
    (2, 'Bob Jones', 'bob@mail.net'),
    (3, 'Carol Ann Lee', 'carol.lee@invalid'),
    (4, 'Dave', 'dave@x.io'),
])

# Only first_name is implemented; the remaining columns are the exercise.
# String literals are single-quoted (standard SQL).
rows = run("""
    SELECT
        id,
        -- first_name: text before first space (whole name if there is none)
        SUBSTR(full_name, 1,
               CASE WHEN INSTR(full_name, ' ') > 0
                    THEN INSTR(full_name, ' ')-1
                    ELSE LENGTH(full_name) END) AS first_name,
        -- TODO: last_name (after last space, or full_name if no space)
        -- TODO: initials
        -- TODO: username (lower_first + . + last 4 chars before @)
        -- TODO: email_valid
        email
    FROM people
""")
for r in rows:
    print(r)
✅ Practice Checklist
19. Date & Time Functions

SQL date functions compute differences, extract components, and format timestamps. In SQLite, dates are stored as TEXT (ISO), INTEGER (Unix epoch), or REAL (Julian day).

Date arithmetic and formatting (SQLite)
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# SQLite stores dates as text in ISO format: 'YYYY-MM-DD'.
# Modifiers are applied left to right, so end_of_next_year first snaps to
# January 1 of this year, moves two years ahead, then steps back one day to
# land on December 31 of next year.
rows = run('''
    SELECT
        DATE('now')                                         AS today,
        DATE('now', '+7 days')                              AS next_week,
        DATE('now', '-30 days')                             AS last_month,
        DATE('now', 'start of month')                       AS month_start,
        DATE('now', 'start of year')                        AS year_start,
        DATE('now', 'start of year', '+2 years', '-1 day')  AS end_of_next_year,
        DATETIME('now')                                     AS now_dt,
        STRFTIME('%Y-%m-%d %H:%M', 'now')                   AS formatted
''')
# Short display labels, in the same order as the selected columns.
labels = ['today', 'next_week', 'last_month', 'month_start', 'year_start', 'eony', 'now_dt', 'fmt']
for key, val in zip(labels, rows[0]):
    print(f"  {key}: {val}")
Extracting date parts and calculating age
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE employees (id INTEGER, name TEXT, hire_date TEXT, birth_date TEXT)''')
runmany('INSERT INTO employees VALUES (?,?,?,?)', [
    (1, 'Alice', '2019-03-15', '1990-07-22'),
    (2, 'Bob', '2021-11-01', '1985-02-14'),
    (3, 'Carol', '2022-06-20', '1995-09-08'),
    (4, 'Dave', '2018-01-10', '1988-12-01'),
])

# The query returns 8 columns (indexes 0-7):
#   name, hire_date, hire_year, hire_month, hire_day,
#   days_employed, years_employed, age
rows = run('''
    SELECT
        name,
        hire_date,
        -- Extract parts
        STRFTIME('%Y', hire_date)               AS hire_year,
        STRFTIME('%m', hire_date)               AS hire_month,
        STRFTIME('%d', hire_date)               AS hire_day,
        -- Days since hire
        CAST(JULIANDAY('now') - JULIANDAY(hire_date) AS INTEGER) AS days_employed,
        -- Years employed (365.25-day years, truncated)
        CAST((JULIANDAY('now') - JULIANDAY(hire_date)) / 365.25 AS INTEGER) AS years_employed,
        -- Age from birth_date (365.25-day years, truncated)
        CAST((JULIANDAY('now') - JULIANDAY(birth_date)) / 365.25 AS INTEGER) AS age
    FROM employees
    ORDER BY hire_date
''')
print("Employee tenure and age:")
# Unpack by name: positional indexes past 7 do not exist in these rows.
for name, hire_date, _year, _month, _day, days_employed, years_employed, age in rows:
    print(f"  {name}: hired {hire_date}, {days_employed} days ({years_employed} yrs), age {age}")
Time series date bucketing
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


import datetime
import random

run('''CREATE TABLE events (id INTEGER, ts TEXT, event TEXT, value REAL)''')

# Generate 1-5 pseudo-random events for each of 30 consecutive days.
# A private Random(42) instance yields the same sequence as seeding the
# module-level generator, without changing global RNG state.
rng = random.Random(42)
first_day = datetime.date(2024, 1, 1)
events_data = []
for i in range(30):
    day = (first_day + datetime.timedelta(days=i)).isoformat()
    for _ in range(rng.randint(1, 5)):
        events_data.append((
            i * 10 + rng.randint(1, 9),
            day,
            rng.choice(['view', 'click', 'purchase']),
            round(rng.uniform(1, 100), 2),
        ))
runmany('INSERT INTO events VALUES (?,?,?,?)', events_data)

# Group by week: %W formats the week-of-year number.
# String literals are single-quoted (standard SQL).
weekly = run("""
    SELECT
        STRFTIME('%Y-W%W', ts)         AS week,
        COUNT(*)                        AS events,
        SUM(CASE WHEN event = 'purchase' THEN 1 ELSE 0 END) AS purchases,
        ROUND(SUM(value), 2)            AS total_value
    FROM events
    GROUP BY week
    ORDER BY week
""")
print("Weekly breakdown:")
for week, n_events, n_purchases, total_value in weekly:
    print(f"  {week}: {n_events} events, {n_purchases} purchases, ${total_value}")

# Days with no purchases: exclude every date that has at least one purchase.
rows = run("""
    SELECT ts, COUNT(*) AS events
    FROM events
    WHERE event != 'purchase'
    AND ts NOT IN (SELECT DISTINCT ts FROM events WHERE event = 'purchase')
    GROUP BY ts
    ORDER BY ts
    LIMIT 3
""")
print("Days with no purchases (sample):", [(r[0], r[1]) for r in rows])
💼 Real-World: Subscription Churn Analysis
A SaaS company analyzes monthly churn by comparing subscription start/end dates, computing tenure, and flagging customers who churned within 90 days of signup.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# end_date is NULL (None) for subscriptions that are still active.
run('''CREATE TABLE subscriptions (id INT, cust_id INT, start_date TEXT, end_date TEXT, plan TEXT)''')
runmany('INSERT INTO subscriptions VALUES (?,?,?,?,?)', [
    (1, 1, '2023-01-15', '2024-01-15', 'annual'),
    (2, 2, '2023-03-01', '2023-05-15', 'monthly'),
    (3, 3, '2023-06-01', None, 'annual'),
    (4, 4, '2023-09-10', '2023-10-05', 'monthly'),
    (5, 5, '2024-01-01', None, 'monthly'),
    (6, 6, '2023-02-14', '2023-11-30', 'annual'),
])

# Tenure runs to end_date, or to today for active subscriptions. A churned
# subscription that lasted fewer than 90 days is flagged as early churn.
# String literals (including 'now') are single-quoted, as standard SQL requires.
rows = run("""
    SELECT
        cust_id, plan, start_date,
        COALESCE(end_date, DATE('now')) AS effective_end,
        CAST((JULIANDAY(COALESCE(end_date, DATE('now'))) -
              JULIANDAY(start_date)) AS INTEGER)         AS tenure_days,
        CASE WHEN end_date IS NOT NULL THEN 'churned' ELSE 'active' END AS status,
        CASE WHEN end_date IS NOT NULL
              AND JULIANDAY(end_date) - JULIANDAY(start_date) < 90
             THEN 'early_churn' ELSE 'ok' END AS churn_flag
    FROM subscriptions
    ORDER BY start_date
""")
print("Subscription analysis:")
for cust_id, plan, _start, _effective_end, tenure_days, status, churn_flag in rows:
    print(f"  Cust {cust_id} ({plan}): {tenure_days} days, {status}, {churn_flag}")
🏋️ Practice: Date Range Queries
Create a bookings table (id, room, check_in, check_out, guest). Write: (1) Find bookings overlapping a given date range ('2024-06-01' to '2024-06-10'). (2) Calculate avg stay length by month. (3) Find rooms that were booked more than 80% of days in Q2 2024. (4) Show guests with return visits within 6 months.
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE bookings (id INT, room TEXT, check_in TEXT, check_out TEXT, guest TEXT)''')
runmany('INSERT INTO bookings VALUES (?,?,?,?,?)', [
    (1, '101', '2024-05-28', '2024-06-03', 'Alice'),
    (2, '102', '2024-06-05', '2024-06-08', 'Bob'),
    (3, '101', '2024-06-10', '2024-06-15', 'Carol'),
    (4, '103', '2024-06-01', '2024-06-30', 'Dave'),
    (5, '102', '2024-07-01', '2024-07-05', 'Alice'),
    (6, '101', '2024-08-01', '2024-08-10', 'Bob'),
])

# 1. Overlapping bookings with date range.
# Two ranges overlap when each starts before the other ends. The target dates
# are bound as ? parameters rather than formatted into the SQL text.
target_in, target_out = '2024-06-01', '2024-06-10'
print("=== Bookings overlapping Jun 1-10 ===")
overlap_rows = run('''
    SELECT room, guest, check_in, check_out
    FROM bookings
    WHERE check_in < ? AND check_out > ?
    ORDER BY check_in
''', (target_out, target_in))
for r in overlap_rows:
    print(f"  {r}")

# 2. Avg stay length by month (bucketed by the check-in month).
print("=== Avg stay by month ===")
rows = run("""
    SELECT
        STRFTIME('%Y-%m', check_in) AS month,
        ROUND(AVG(JULIANDAY(check_out) - JULIANDAY(check_in)), 1) AS avg_nights,
        COUNT(*) AS bookings
    FROM bookings GROUP BY month ORDER BY month
""")
for r in rows:
    print(f"  {r}")

# 3. TODO: rooms booked > 80% of Q2 days
# 4. TODO: return visits within 6 months
✅ Practice Checklist
20. CASE WHEN & Conditional Logic

CASE WHEN is SQL's conditional expression. Use it for bucketing, pivoting, null handling, and complex conditional aggregations. Combined with COALESCE and NULLIF for robust null handling.

Simple and searched CASE
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE students (id INTEGER, name TEXT, score INTEGER, grade TEXT)''')
runmany('INSERT INTO students VALUES (?,?,?,?)', [
    (1, 'Alice', 95, None), (2, 'Bob', 72, None), (3, 'Carol', 88, None),
    (4, 'Dave', 61, None), (5, 'Eve', 79, None), (6, 'Frank', 55, None),
])

# String literals are single-quoted (standard SQL); double quotes denote
# identifiers.
rows = run("""
    SELECT
        name, score,
        -- Searched CASE (most flexible): first matching condition wins
        CASE
            WHEN score >= 90 THEN 'A'
            WHEN score >= 80 THEN 'B'
            WHEN score >= 70 THEN 'C'
            WHEN score >= 60 THEN 'D'
            ELSE 'F'
        END AS letter_grade,
        -- Simple CASE (like switch): score is an INTEGER, so score / 10 is
        -- integer division and (score / 10) * 10 truncates to the tens band
        CASE (score / 10) * 10
            WHEN 100 THEN 'Perfect'
            WHEN 90  THEN 'Excellent'
            WHEN 80  THEN 'Good'
            WHEN 70  THEN 'Pass'
            ELSE 'Fail'
        END AS band,
        -- Boolean expression
        CASE WHEN score >= 70 THEN 1 ELSE 0 END AS passed
    FROM students
    ORDER BY score DESC
""")
for name, score, letter_grade, band, passed in rows:
    print(f"  {name}: {score} -> {letter_grade} ({band}) passed={passed}")
CASE in aggregations (conditional counting)
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE orders (id INTEGER, status TEXT, region TEXT, total REAL)''')
runmany('INSERT INTO orders VALUES (?,?,?,?)', [
    (1, 'completed', 'North', 500), (2, 'cancelled', 'South', 300), (3, 'completed', 'North', 800),
    (4, 'pending', 'East', 200), (5, 'completed', 'South', 650), (6, 'cancelled', 'North', 400),
    (7, 'completed', 'East', 1200), (8, 'pending', 'South', 350), (9, 'completed', 'East', 900),
])

# Conditional aggregation: SUM(CASE ... THEN 1 ELSE 0 END) counts matching
# rows. AVG skips NULLs, so ELSE NULL restricts the average to completed orders.
# String literals are single-quoted (standard SQL).
rows = run("""
    SELECT
        region,
        COUNT(*)                                              AS total_orders,
        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed,
        SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) AS cancelled,
        SUM(CASE WHEN status = 'pending'   THEN 1 ELSE 0 END) AS pending,
        ROUND(100.0 * SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS completion_pct,
        ROUND(SUM(CASE WHEN status = 'completed' THEN total ELSE 0 END), 0) AS completed_revenue,
        ROUND(AVG(CASE WHEN status = 'completed' THEN total ELSE NULL END), 0) AS avg_completed
    FROM orders
    GROUP BY region
    ORDER BY completed_revenue DESC
""")
print("Order analysis by region:")
for region, total_orders, completed, _cancelled, _pending, pct, revenue, avg_completed in rows:
    print(f"  {region}: {total_orders} orders, {completed} completed ({pct}%), revenue=${revenue}, avg=${avg_completed}")
COALESCE, NULLIF, and IIF
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Quarterly metrics with gaps: None is stored as SQL NULL.
run('''CREATE TABLE metrics (id INTEGER, name TEXT, q1 REAL, q2 REAL, q3 REAL, q4 REAL)''')
runmany('INSERT INTO metrics VALUES (?,?,?,?,?,?)', [
    (1, 'Revenue', 1000, None, 1200, None),
    (2, 'Costs', 600, 700, None, 650),
    (3, 'Users', 500, 520, None, None),
])

# Result columns (indexes 0-6): name, q1_safe, q2_or_q1, q3_fill,
# q1_null_if_zero, q4_status, q1_q2_growth_pct.
# String literals are single-quoted (standard SQL).
rows = run("""
    SELECT
        name,
        -- COALESCE: return first non-NULL value
        COALESCE(q1, 0)                    AS q1_safe,
        COALESCE(q2, q1, 0)               AS q2_or_q1,  -- fill q2 from q1
        COALESCE(q3, q2, q1, 0)           AS q3_fill,

        -- NULLIF: return NULL if two values are equal
        NULLIF(q1, 0)                      AS q1_null_if_zero,

        -- IIF (SQLite 3.32+): shorthand for simple CASE WHEN
        IIF(q4 IS NULL, 'missing', 'present') AS q4_status,

        -- Compute quarter growth only when both quarters are present
        CASE WHEN q1 IS NOT NULL AND q2 IS NOT NULL
             THEN ROUND((q2 - q1) / q1 * 100, 1)
             ELSE NULL END AS q1_q2_growth_pct
    FROM metrics
""")
print("Metric analysis:")
for name, q1_safe, q2_or_q1, q3_fill, _q1_nullif, q4_status, growth in rows:
    print(f"  {name}: q1_safe={q1_safe}, q2_or_q1={q2_or_q1}, q3_fill={q3_fill}, q4={q4_status}, growth={growth}")
💼 Real-World: Customer Segmentation
A CRM team segments customers by RFM score (Recency, Frequency, Monetary) using CASE WHEN on aggregated purchase data to assign Bronze/Silver/Gold/Platinum tiers.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns all rows when the statement yields a result set; otherwise
    commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE purchases (id INT, cust_id INT, amount REAL, purchase_date TEXT)''')
runmany('INSERT INTO purchases VALUES (?,?,?,?)', [
    (1, 1, 150, '2024-01-10'), (2, 1, 200, '2024-02-15'), (3, 1, 180, '2024-03-01'),
    (4, 2, 500, '2024-03-20'), (5, 2, 300, '2024-03-25'),
    (6, 3, 50, '2023-12-01'), (7, 4, 1000, '2024-03-28'), (8, 4, 800, '2024-03-29'),
    (9, 4, 600, '2024-03-30'),
])

# Reference date that recency is measured against; bound as a ? parameter so
# it can be changed without editing the SQL text.
as_of_date = '2024-04-01'

# The CTE aggregates each customer's recency, frequency and monetary value;
# the outer CASE assigns a tier, checking the strictest rule first.
# String literals are single-quoted (standard SQL).
rows = run("""
    WITH rfm AS (
        SELECT
            cust_id,
            CAST(JULIANDAY(?) - JULIANDAY(MAX(purchase_date)) AS INT) AS recency_days,
            COUNT(*) AS frequency,
            ROUND(SUM(amount), 0) AS monetary
        FROM purchases GROUP BY cust_id
    )
    SELECT
        cust_id, recency_days, frequency, monetary,
        CASE
            WHEN recency_days <= 7  AND frequency >= 3 AND monetary >= 500 THEN 'Platinum'
            WHEN recency_days <= 30 AND frequency >= 2                      THEN 'Gold'
            WHEN recency_days <= 60                                          THEN 'Silver'
            ELSE 'Bronze'
        END AS tier
    FROM rfm ORDER BY monetary DESC
""", (as_of_date,))
print("Customer RFM tiers:")
for cust_id, recency_days, frequency, monetary, tier in rows:
    print(f"  Cust {cust_id}: recency={recency_days}d, freq={frequency}, monetary=${monetary} -> {tier}")
🏋️ Practice: Score Bucketing
Create a test_results table (student_id, subject, score). Write: (1) A pivot showing each subject's count of A/B/C/D/F grades using CASE in SUM. (2) A letter grade column + 'needs_review' flag (grade D or F). (3) Each student's weakest subject (lowest score). (4) Students who passed all subjects (every score >= 70).
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the module-level connection.

    Returns every fetched row when the statement yields a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Run *sql* once for each parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


# Starter data: four students, three subjects each.
run('''CREATE TABLE test_results (student_id INT, subject TEXT, score INT)''')
result_seed = [
    (1, 'Math', 85), (1, 'English', 72), (1, 'Science', 90),
    (2, 'Math', 60), (2, 'English', 88), (2, 'Science', 55),
    (3, 'Math', 95), (3, 'English', 91), (3, 'Science', 87),
    (4, 'Math', 70), (4, 'English', 65), (4, 'Science', 80),
]
runmany('INSERT INTO test_results VALUES (?,?,?)', result_seed)

# 1. Pivot: one row per subject, one counting column per grade band
#    (scores below 70 share a single D-or-F column).
print("=== Grade Distribution by Subject ===")
rows = run('''
    SELECT subject,
           SUM(CASE WHEN score >= 90 THEN 1 ELSE 0 END) AS A_count,
           SUM(CASE WHEN score >= 80 AND score < 90 THEN 1 ELSE 0 END) AS B_count,
           SUM(CASE WHEN score >= 70 AND score < 80 THEN 1 ELSE 0 END) AS C_count,
           SUM(CASE WHEN score < 70 THEN 1 ELSE 0 END) AS D_or_F_count
    FROM test_results GROUP BY subject
''')
for r in rows:
    print(f"  {r}")

# 2. TODO: letter grade + needs_review flag
# 3. TODO: weakest subject per student
# 4. TODO: students passing all subjects
✅ Practice Checklist
21. Set Operations (UNION, INTERSECT, EXCEPT)

SQL set operations combine the results of multiple SELECT statements: UNION (all rows from both, duplicates removed), UNION ALL (all rows from both, duplicates kept), INTERSECT (rows common to both), and EXCEPT/MINUS (rows in the first but not the second).

UNION and UNION ALL
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE customers_2023 (id INTEGER, name TEXT, email TEXT)''')
run('''CREATE TABLE customers_2024 (id INTEGER, name TEXT, email TEXT)''')
runmany('INSERT INTO customers_2023 VALUES (?,?,?)', [
    (1,'Alice','alice@mail.com'),(2,'Bob','bob@mail.com'),
    (3,'Carol','carol@mail.com'),(4,'Dave','dave@mail.com'),
])
runmany('INSERT INTO customers_2024 VALUES (?,?,?)', [
    (3,'Carol','carol@mail.com'),(4,'Dave','dave@mail.com'),
    (5,'Eve','eve@mail.com'),(6,'Frank','frank@mail.com'),
])

# UNION: unique rows only (deduplicates).
# Both sides must produce identical rows for duplicates to collapse, so no
# per-table literal (such as the year) is selected here: Carol and Dave
# appear once even though they are in both tables.
unique_rows = run('''
    SELECT name, email FROM customers_2023
    UNION
    SELECT name, email FROM customers_2024
    ORDER BY name
''')
print(f"UNION (unique names): {len(unique_rows)} rows")
for r in unique_rows:
    print(f"  {r}")

# UNION ALL: keeps all rows including duplicates.
# String literals use single quotes; double quotes denote identifiers in SQL.
all_rows = run('''
    SELECT '2023' AS cohort, name FROM customers_2023
    UNION ALL
    SELECT '2024' AS cohort, name FROM customers_2024
    ORDER BY name
''')
print(f"UNION ALL (all rows): {len(all_rows)} rows")

# Useful: combine logs from two tables
events = run('''
    SELECT 'new_customer' AS event, name, '2023' AS yr FROM customers_2023
    UNION ALL
    SELECT 'new_customer', name, '2024' FROM customers_2024
    ORDER BY yr, name
''')
print("All customer events:", len(events))
INTERSECT and EXCEPT
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE eligible_2023 (cust_id INTEGER)''')
run('''CREATE TABLE eligible_2024 (cust_id INTEGER)''')
run('''CREATE TABLE opted_out (cust_id INTEGER)''')
runmany('INSERT INTO eligible_2023 VALUES (?)', [(i,) for i in [1,2,3,4,5,6]])
runmany('INSERT INTO eligible_2024 VALUES (?)', [(i,) for i in [3,4,5,6,7,8]])
runmany('INSERT INTO opted_out VALUES (?)', [(i,) for i in [2,5,8]])

# INTERSECT: rows that appear in BOTH queries
both_years = [r[0] for r in run('''
    SELECT cust_id FROM eligible_2023
    INTERSECT
    SELECT cust_id FROM eligible_2024
    ORDER BY cust_id
''')]
print("Eligible BOTH years:", both_years)

# EXCEPT: rows in first but NOT in second.
# ORDER BY makes the printed order deterministic; without it the row order
# of a SELECT is undefined.
only_2023 = [r[0] for r in run('''
    SELECT cust_id FROM eligible_2023
    EXCEPT
    SELECT cust_id FROM eligible_2024
    ORDER BY cust_id
''')]
print("Only eligible in 2023:", only_2023)

# Combine: eligible both years, excluding opted-out.
# SQLite groups compound operators left to right: (2023 INTERSECT 2024) EXCEPT opted_out.
both_not_opted_out = [r[0] for r in run('''
    SELECT cust_id FROM eligible_2023
    INTERSECT
    SELECT cust_id FROM eligible_2024
    EXCEPT
    SELECT cust_id FROM opted_out
    ORDER BY cust_id
''')]
print("Eligible both years AND not opted out:", both_not_opted_out)
Set operations for data reconciliation
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE source_records (id INTEGER, value REAL, ts TEXT)''')
run('''CREATE TABLE target_records (id INTEGER, value REAL, ts TEXT)''')
runmany('INSERT INTO source_records VALUES (?,?,?)', [
    (1,100.0,'2024-01-01'),(2,200.0,'2024-01-02'),(3,300.0,'2024-01-03'),
    (4,400.0,'2024-01-04'),(5,500.0,'2024-01-05'),
])
runmany('INSERT INTO target_records VALUES (?,?,?)', [
    (1,100.0,'2024-01-01'),(2,201.0,'2024-01-02'),  # id=2 has wrong value
    (3,300.0,'2024-01-03'),                           # id=4,5 missing
    (6,600.0,'2024-01-06'),                           # id=6 is extra
])

# (id, value) pairs in source with no identical pair in target.
# This catches both absent ids (4, 5) and value mismatches (2).
# The label is a single-quoted string literal; double quotes denote identifiers.
missing = run('''
    SELECT id, value, 'missing_from_target' AS issue FROM source_records
    EXCEPT
    SELECT id, value, 'missing_from_target' FROM target_records
    ORDER BY id
''')
print("Issues in reconciliation:")
for r in missing:
    print(f"  Source id={r[0]}, value={r[1]} not in target")

# (id, value) pairs in target with no identical pair in source (extra or changed)
extra = run('''
    SELECT id, value FROM target_records
    EXCEPT
    SELECT id, value FROM source_records
    ORDER BY id
''')
for r in extra:
    print(f"  Target id={r[0]}, value={r[1]} not in source")

# Exact matches
matched = run('''
    SELECT COUNT(*) FROM (
        SELECT id, value FROM source_records
        INTERSECT
        SELECT id, value FROM target_records
    )
''')
print(f"Exactly matched records: {matched[0][0]}")
💼 Real-World: Data Migration Audit
A data engineering team validates a migration by comparing source and destination tables using INTERSECT and EXCEPT to find missing, extra, and mismatched records.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE source_customers (id INT, name TEXT, email TEXT, tier TEXT)''')
run('''CREATE TABLE migrated_customers (id INT, name TEXT, email TEXT, tier TEXT)''')
runmany('INSERT INTO source_customers VALUES (?,?,?,?)', [
    (1,'Alice','alice@a.com','gold'),(2,'Bob','bob@b.com','silver'),
    (3,'Carol','carol@c.com','bronze'),(4,'Dave','dave@d.com','gold'),
    (5,'Eve','eve@e.com','silver'),
])
runmany('INSERT INTO migrated_customers VALUES (?,?,?,?)', [
    (1,'Alice','alice@a.com','gold'),(2,'Bob','bob_new@b.com','silver'),
    (3,'Carol','carol@c.com','bronze'),(6,'Frank','frank@f.com','bronze'),
])

source_count = run('SELECT COUNT(*) FROM source_customers')[0][0]
migrated_count = run('SELECT COUNT(*) FROM migrated_customers')[0][0]

# Rows identical in every column on both sides
fully_matched = run('''
    SELECT COUNT(*) FROM (
        SELECT id,name,email,tier FROM source_customers
        INTERSECT
        SELECT id,name,email,tier FROM migrated_customers)''')[0][0]

# (id, name) pairs present in source only
missing_in_dest = run('''
    SELECT id, name FROM source_customers
    EXCEPT SELECT id, name FROM migrated_customers
    ORDER BY id''')

# (id, name) pairs present in destination only
extra_in_dest = run('''
    SELECT id, name FROM migrated_customers
    EXCEPT SELECT id, name FROM source_customers
    ORDER BY id''')

# Same id on both sides but at least one column differs.
# IS NOT is used instead of <> so a NULL on one side still counts as a difference.
mismatched = run('''
    SELECT s.id, s.name
    FROM source_customers s
    JOIN migrated_customers m ON m.id = s.id
    WHERE s.name IS NOT m.name
       OR s.email IS NOT m.email
       OR s.tier IS NOT m.tier
    ORDER BY s.id''')

print(f"Source: {source_count} records")
print(f"Migrated: {migrated_count} records")
print(f"Fully matched: {fully_matched}")
print(f"Missing from dest: {[(r[0],r[1]) for r in missing_in_dest]}")
print(f"Extra in dest: {[(r[0],r[1]) for r in extra_in_dest]}")
print(f"Mismatched: {[(r[0],r[1]) for r in mismatched]}")
🏋️ Practice: Set Operation Analytics
Create active_users_jan, active_users_feb, active_users_mar tables (each with user_id). Write queries to find: (1) Users active all 3 months (INTERSECT x3). (2) Users active in Jan but NOT Feb (EXCEPT). (3) All unique users across 3 months (UNION). (4) Users active in exactly 2 of 3 months using INTERSECT/EXCEPT combinations.
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE active_jan (user_id INT)''')
run('''CREATE TABLE active_feb (user_id INT)''')
run('''CREATE TABLE active_mar (user_id INT)''')
runmany('INSERT INTO active_jan VALUES (?)', [(i,) for i in [1,2,3,4,5,6]])
runmany('INSERT INTO active_feb VALUES (?)', [(i,) for i in [2,3,4,5,7,8]])
runmany('INSERT INTO active_mar VALUES (?)', [(i,) for i in [3,4,5,6,8,9]])

# 1. All 3 months (ORDER BY keeps the printed order deterministic)
all_three = [r[0] for r in run('''
    SELECT user_id FROM active_jan
    INTERSECT SELECT user_id FROM active_feb
    INTERSECT SELECT user_id FROM active_mar
    ORDER BY user_id
''')]
print("Active all 3 months:", all_three)

# 2. Jan but not Feb
jan_not_feb = [r[0] for r in run('''
    SELECT user_id FROM active_jan
    EXCEPT SELECT user_id FROM active_feb
    ORDER BY user_id
''')]
print("Jan but not Feb:", jan_not_feb)

# 3. All unique
all_users = [r[0] for r in run('''
    SELECT user_id FROM active_jan UNION
    SELECT user_id FROM active_feb UNION
    SELECT user_id FROM active_mar ORDER BY user_id
''')]
print("All unique users:", all_users)

# 4. TODO: Exactly 2 of 3 months
✅ Practice Checklist
22. Advanced CTEs & Chaining

CTEs (WITH clauses) can be chained, referenced multiple times, and nested to build complex queries step by step. They replace temp tables in most cases and dramatically improve readability.

Multiple CTEs and chaining
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: rows for queries, otherwise commit and report rowcount."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Bulk-execute *sql* for every parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


run('''CREATE TABLE transactions (id INT, cust_id INT, amount REAL, category TEXT, ts TEXT)''')

# (id, cust_id, amount, category, ts)
transactions = [
    (1, 1, 150, 'food', '2024-01-05'),
    (2, 1, 300, 'tech', '2024-01-10'),
    (3, 2, 200, 'food', '2024-01-08'),
    (4, 2, 100, 'food', '2024-01-12'),
    (5, 3, 500, 'tech', '2024-01-03'),
    (6, 3, 250, 'clothing', '2024-01-15'),
    (7, 1, 80, 'food', '2024-02-01'),
    (8, 2, 600, 'tech', '2024-02-05'),
    (9, 3, 120, 'food', '2024-02-10'),
]
runmany('INSERT INTO transactions VALUES (?,?,?,?,?)', transactions)

# Three CTEs feed the final SELECT: per-customer totals, per-customer
# category ranking, and their join restricted to the top-ranked category.
summary_sql = '''
    WITH
    -- CTE 1: customer totals
    cust_totals AS (
        SELECT cust_id, SUM(amount) AS total_spend,
               COUNT(*) AS n_transactions, MAX(ts) AS last_purchase
        FROM transactions GROUP BY cust_id
    ),
    -- CTE 2: top category per customer
    top_cat AS (
        SELECT cust_id,
               category,
               SUM(amount) AS cat_total,
               ROW_NUMBER() OVER (PARTITION BY cust_id ORDER BY SUM(amount) DESC) AS rn
        FROM transactions GROUP BY cust_id, category
    ),
    -- CTE 3: combine using previous CTEs
    summary AS (
        SELECT ct.cust_id, ct.total_spend, ct.n_transactions, ct.last_purchase,
               tc.category AS top_category
        FROM cust_totals ct
        JOIN top_cat tc ON ct.cust_id = tc.cust_id AND tc.rn = 1
    )
    SELECT *, ROUND(total_spend / n_transactions, 2) AS avg_txn
    FROM summary ORDER BY total_spend DESC
'''
rows = run(summary_sql)

print("Customer summary:")
for cust_id, total, n_txns, _last_purchase, top_category, avg_txn in rows:
    print(f"  Cust {cust_id}: ${total} total, {n_txns} txns, top={top_category}, avg=${avg_txn}")
Recursive CTE for hierarchies
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE org (id INT, name TEXT, manager_id INT, level INT)''')
runmany('INSERT INTO org VALUES (?,?,?,?)', [
    (1,'CEO',None,1),(2,'CTO',1,2),(3,'CFO',1,2),(4,'VP Eng',2,3),
    (5,'VP Data',2,3),(6,'Dir Finance',3,3),(7,'Sr Dev',4,4),
    (8,'Data Eng',5,4),(9,'Analyst',5,4),(10,'Accountant',6,4),
])

# Recursive CTE: traverse the org hierarchy.
# The separator is a single-quoted string literal; double quotes denote identifiers.
# Ordering by the accumulated path lists each subtree right under its manager.
tree = run('''
    WITH RECURSIVE hierarchy AS (
        -- Base case: start from CEO (no manager)
        SELECT id, name, manager_id, 0 AS depth, name AS path
        FROM org WHERE manager_id IS NULL

        UNION ALL

        -- Recursive case: join to find reports
        SELECT o.id, o.name, o.manager_id,
               h.depth + 1,
               h.path || ' -> ' || o.name
        FROM org o
        JOIN hierarchy h ON o.manager_id = h.id
    )
    SELECT depth, name, path FROM hierarchy ORDER BY path
''')
for depth, name, _path in tree:
    indent = "  " * depth
    print(f"{indent}{name}")

# Count reports under each manager.
# `reports` pairs every employee with each of their ancestors (direct or
# indirect), so COUNT(*) per manager is the size of that manager's subtree.
# The o.id tie-breaker makes LIMIT 5 deterministic when counts are equal.
report_counts = run('''
    WITH RECURSIVE reports AS (
        SELECT id, manager_id FROM org
        UNION ALL
        SELECT r.id, o.manager_id FROM reports r JOIN org o ON o.id = r.manager_id
        WHERE o.manager_id IS NOT NULL
    )
    SELECT o.name, COUNT(*) AS total_reports
    FROM org o JOIN reports r ON r.manager_id = o.id
    GROUP BY o.id, o.name
    ORDER BY total_reports DESC, o.id
    LIMIT 5
''')
print("Managers by total reports:", [(r[0], r[1]) for r in report_counts])
CTEs for multi-step data transformations
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE raw_sales (rep TEXT, product TEXT, amount REAL, region TEXT, sale_date TEXT)''')

import random

random.seed(42)  # fixed seed so the generated sample is reproducible
reps = ['Alice','Bob','Carol','Dave','Eve']
prods = ['Widget','Gadget','Gizmo']
regions = ['North','South','East']
rows_data = [(random.choice(reps), random.choice(prods),
              round(random.uniform(100,2000), 2),
              random.choice(regions),
              f"2024-0{random.randint(1,3)}-{random.randint(1,28):02d}")
             for _ in range(50)]
runmany('INSERT INTO raw_sales VALUES (?,?,?,?,?)', rows_data)

# The strftime format is a single-quoted string literal; double quotes
# denote identifiers in SQL.
rows = run('''
    WITH
    -- Step 1: normalize and add month
    cleaned AS (
        SELECT rep, product, region, amount,
               STRFTIME('%Y-%m', sale_date) AS month
        FROM raw_sales WHERE amount > 0
    ),
    -- Step 2: rep-level monthly totals
    rep_monthly AS (
        SELECT rep, month, SUM(amount) AS monthly_total,
               COUNT(*) AS n_sales
        FROM cleaned GROUP BY rep, month
    ),
    -- Step 3: rank reps per month
    ranked AS (
        SELECT *, RANK() OVER (PARTITION BY month ORDER BY monthly_total DESC) AS rank
        FROM rep_monthly
    )
    -- Step 4: show top-2 per month
    SELECT month, rank, rep, ROUND(monthly_total, 0) AS total, n_sales
    FROM ranked WHERE rank <= 2
    ORDER BY month, rank
''')
print("Top 2 reps per month:")
for r in rows:
    print(f"  {r[0]} #{r[1]}: {r[2]} ${r[3]:,.0f} ({r[4]} sales)")
💼 Real-World: Funnel Analysis
A product team uses chained CTEs to compute a conversion funnel: impression -> click -> add_to_cart -> purchase, with drop-off rates at each stage.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE funnel_events (user_id INT, event TEXT, ts TEXT)''')

import random

random.seed(42)  # fixed seed so the generated sample is reproducible

# Each stage is only reachable from the previous one, so the data forms a
# real funnel: a user cannot add to cart without clicking, nor purchase
# without adding to cart. The probabilities are per-stage conversion rates.
events_raw = []
for uid in range(1, 201):
    events_raw.append((uid,'impression','2024-01'))
    if random.random() >= 0.6:
        continue
    events_raw.append((uid,'click','2024-01'))
    if random.random() >= 0.4:
        continue
    events_raw.append((uid,'add_to_cart','2024-01'))
    if random.random() >= 0.35:
        continue
    events_raw.append((uid,'purchase','2024-01'))
runmany('INSERT INTO funnel_events VALUES (?,?,?)', events_raw)

# Event names are single-quoted string literals; double quotes denote
# identifiers in SQL.
rows = run('''
    WITH
    stage_counts AS (
        SELECT event,
               COUNT(DISTINCT user_id) AS users
        FROM funnel_events
        GROUP BY event
    ),
    ordered AS (
        SELECT event, users,
               CASE event
                   WHEN 'impression'   THEN 1
                   WHEN 'click'        THEN 2
                   WHEN 'add_to_cart'  THEN 3
                   WHEN 'purchase'     THEN 4
               END AS stage_order
        FROM stage_counts
    ),
    with_prev AS (
        SELECT event, users, stage_order,
               LAG(users) OVER (ORDER BY stage_order) AS prev_users
        FROM ordered
    )
    SELECT event, users,
           CASE WHEN prev_users IS NOT NULL
                THEN ROUND(100.0 * users / prev_users, 1)
                ELSE 100.0 END AS step_pct,
           ROUND(100.0 * users / MAX(users) OVER (), 1) AS overall_pct
    FROM with_prev ORDER BY stage_order
''')
print("Conversion funnel:")
for r in rows:
    bar = "#" * int(r[3] / 5)  # one '#' per 5% of the top-of-funnel audience
    print(f"  {r[0]:15s}: {r[1]:>4d} users | step={r[2]:>5.1f}% | {bar}")
🏋️ Practice: CTE Report Builder
Build a multi-CTE query that: CTE1 computes monthly revenue per product from an orders table, CTE2 computes 3-month moving average per product (using LAG), CTE3 flags months where revenue dropped more than 20% vs the moving average. Return only flagged months with product, month, revenue, avg, and drop_pct.
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: rows for queries, otherwise commit and report rowcount."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Bulk-execute *sql* for every parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


run('''CREATE TABLE monthly_orders (product TEXT, month TEXT, revenue REAL)''')

# (product, month, revenue)
monthly_revenue = [
    ('Widget', '2024-01', 1000),
    ('Widget', '2024-02', 950),
    ('Widget', '2024-03', 700),
    ('Widget', '2024-04', 980),
    ('Gadget', '2024-01', 500),
    ('Gadget', '2024-02', 520),
    ('Gadget', '2024-03', 480),
    ('Gadget', '2024-04', 200),
]
runmany('INSERT INTO monthly_orders VALUES (?,?,?)', monthly_revenue)

# The moving average covers the current month and the two before it; a
# missing earlier month is filled with the current month's revenue.
alerts_sql = '''
    WITH
    -- CTE2: 3-month moving average using LAG
    with_lags AS (
        SELECT product, month, revenue,
               LAG(revenue,1) OVER (PARTITION BY product ORDER BY month) AS prev1,
               LAG(revenue,2) OVER (PARTITION BY product ORDER BY month) AS prev2
        FROM monthly_orders
    ),
    moving_avg AS (
        SELECT product, month, revenue,
               ROUND((revenue + COALESCE(prev1,revenue) + COALESCE(prev2,revenue)) / 3.0, 1) AS ma3
        FROM with_lags
    ),
    -- CTE3: flag months with >20% drop vs moving avg
    flagged AS (
        SELECT product, month, revenue, ma3,
               ROUND((revenue - ma3) / ma3 * 100, 1) AS drop_pct
        FROM moving_avg
        WHERE revenue < ma3 * 0.8   -- more than 20% below MA
    )
    SELECT * FROM flagged ORDER BY drop_pct
'''
rows = run(alerts_sql)

print("Revenue alerts (>20% below 3-month MA):")
for product, month, revenue, ma3, drop_pct in rows:
    print(f"  {product} {month}: ${revenue} vs MA ${ma3} ({drop_pct}%)")
✅ Practice Checklist
23. Views & Virtual Tables

Views are saved SELECT statements that behave like tables. They simplify complex queries, enforce access control, and create stable interfaces over evolving schema.

Creating and using views
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE employees (id INT, name TEXT, dept TEXT, salary REAL, hire_date TEXT)''')
run('''CREATE TABLE departments (id INT, name TEXT, budget REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?,?,?)', [
    (1,'Alice','Eng',90000,'2020-03-15'),(2,'Bob','Eng',85000,'2021-06-01'),
    (3,'Carol','Sales',70000,'2019-11-20'),(4,'Dave','Sales',72000,'2022-01-10'),
    (5,'Eve','HR',65000,'2020-08-05'),(6,'Frank','Eng',95000,'2018-04-22'),
])
runmany('INSERT INTO departments VALUES (?,?,?)', [
    (1,'Eng',500000),(2,'Sales',300000),(3,'HR',150000),
])

# Create a view.
# 'now' must be a single-quoted string literal: double quotes denote
# identifiers, and SQLite only falls back to treating them as strings when
# the double-quoted-string compatibility option is enabled (it can be
# disabled for DDL such as CREATE VIEW).
run('''
    CREATE VIEW emp_summary AS
    SELECT e.id, e.name, e.dept, e.salary,
           ROUND(100.0 * e.salary / d.budget, 2) AS salary_pct_budget,
           CAST((JULIANDAY('now') - JULIANDAY(e.hire_date)) / 365.25 AS INT) AS years
    FROM employees e
    JOIN departments d ON e.dept = d.name
''')

# Query the view like a table
summary_rows = run('SELECT * FROM emp_summary ORDER BY salary DESC')
print("Employee summary (from view):")
for r in summary_rows:
    print(f"  {r[1]} ({r[2]}): ${r[3]:,}, {r[4]:.2f}% of dept budget, {r[5]} yrs")

# View within query
dept_rows = run('''
    SELECT dept, COUNT(*) AS n, ROUND(AVG(salary), 0) AS avg_sal
    FROM emp_summary
    GROUP BY dept ORDER BY avg_sal DESC
''')
for r in dept_rows:
    print(f"  {r[0]}: {r[1]} employees, avg ${r[2]:,}")
View updates and DROP
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE products (id INT, name TEXT, price REAL, active INT DEFAULT 1)''')
runmany('INSERT INTO products VALUES (?,?,?,?)', [
    (1,'Apple',1.2,1),(2,'Banana',0.5,1),(3,'Carrot',0.8,0),(4,'Date',2.0,1),
])

run('''CREATE VIEW active_products AS
       SELECT id, name, price FROM products WHERE active = 1''')

active_rows = run('SELECT * FROM active_products')
print("Active products:", active_rows)

# SQLite views are read-only: INSERT/UPDATE/DELETE directly against a view
# raises "cannot modify ... because it is a view". An INSTEAD OF trigger
# makes the view writable by redirecting the write to the base table.
run('''CREATE TRIGGER active_products_insert
       INSTEAD OF INSERT ON active_products
       BEGIN
           INSERT INTO products (id, name, price)
           VALUES (NEW.id, NEW.name, NEW.price);
       END''')

run("INSERT INTO active_products VALUES (5, 'Elderberry', 3.5)")
inserted = run('SELECT id, name, price, active FROM products WHERE id = 5')
print("Inserted via view:", inserted)  # active defaults to 1

# DROP VIEW
run('DROP VIEW active_products')
print("View dropped. Tables unaffected.")
remaining = run('SELECT COUNT(*) FROM products')[0][0]
print("Products table still has:", remaining, "rows")
Materialized views (via tables) and virtual tables
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Single definition of the "materialized view", so every refresh rebuilds
# the table with exactly the same columns.
DEPT_STATS_MV_SQL = '''
    CREATE TABLE dept_stats_mv AS
    SELECT dept,
           COUNT(*) AS headcount,
           ROUND(AVG(salary), 0) AS avg_salary,
           MAX(salary) AS max_salary
    FROM employees GROUP BY dept
'''


def refresh_dept_stats():
    """Drop and rebuild dept_stats_mv from the current employees table."""
    run('DROP TABLE IF EXISTS dept_stats_mv')
    run(DEPT_STATS_MV_SQL)


# SQLite doesn't have native materialized views
# Simulate with CREATE TABLE AS SELECT
run('''CREATE TABLE employees (id INT, dept TEXT, salary REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?)', [
    (i, ['Eng','Sales','HR'][i%3], 60000 + i*5000) for i in range(12)
])

# Simulated materialized view (refresh manually)
refresh_dept_stats()

rows = run('SELECT * FROM dept_stats_mv ORDER BY avg_salary DESC')
print("Materialized dept stats:")
for r in rows:
    print(f"  {r[0]}: {r[1]} people, avg ${r[2]:,}, max ${r[3]:,}")

# Refresh: insert more data, then rebuild the snapshot.
# The values are bound as parameters rather than written as double-quoted
# strings, which SQL treats as identifiers.
run('INSERT INTO employees VALUES (?,?,?)', (99, 'Eng', 200000))
refresh_dept_stats()
refreshed = run('SELECT * FROM dept_stats_mv ORDER BY dept')
print("Refreshed stats:", refreshed)
💼 Real-World: Reporting View Layer
A BI team creates views for sales, customer, and product reporting so analysts never need to write complex JOINs — they just query well-named views.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: rows for queries, otherwise commit and report rowcount."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Bulk-execute *sql* for every parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


# Base tables
run('''CREATE TABLE orders (id INT, cust_id INT, product_id INT, amount REAL, order_date TEXT)''')
run('''CREATE TABLE customers (id INT, name TEXT, tier TEXT, region TEXT)''')
run('''CREATE TABLE products (id INT, name TEXT, category TEXT, cost REAL)''')

# Sample data
order_rows = [
    (1, 1, 1, 500, '2024-01-10'),
    (2, 1, 2, 300, '2024-01-15'),
    (3, 2, 1, 800, '2024-02-05'),
    (4, 2, 3, 200, '2024-02-20'),
    (5, 3, 2, 600, '2024-03-01'),
    (6, 1, 3, 400, '2024-03-10'),
]
customer_rows = [
    (1, 'Alice', 'gold', 'North'),
    (2, 'Bob', 'silver', 'South'),
    (3, 'Carol', 'bronze', 'East'),
]
product_rows = [
    (1, 'Widget', 'Hardware', 200),
    (2, 'Gadget', 'Electronics', 100),
    (3, 'Gizmo', 'Software', 50),
]
runmany('INSERT INTO orders VALUES (?,?,?,?,?)', order_rows)
runmany('INSERT INTO customers VALUES (?,?,?,?)', customer_rows)
runmany('INSERT INTO products VALUES (?,?,?,?)', product_rows)

# One denormalized view joins orders to customers and products and adds the
# margin columns, so report queries need no JOINs of their own.
order_detail_ddl = '''CREATE VIEW order_detail AS
    SELECT o.id AS order_id, c.name AS customer, c.tier, c.region,
           p.name AS product, p.category,
           o.amount, p.cost,
           ROUND(o.amount - p.cost, 2) AS margin,
           ROUND((o.amount - p.cost) / o.amount * 100, 1) AS margin_pct,
           o.order_date
    FROM orders o
    JOIN customers c ON c.id = o.cust_id
    JOIN products p ON p.id = o.product_id
'''
run(order_detail_ddl)

rows = run('SELECT customer, product, amount, margin_pct FROM order_detail ORDER BY margin_pct DESC')
print("Order detail (from view):")
for customer, product, amount, margin_pct in rows:
    print(f"  {customer} - {product}: ${amount} ({margin_pct}% margin)")

rows = run('SELECT category, ROUND(AVG(margin_pct),1) AS avg_margin FROM order_detail GROUP BY category')
print("Avg margin by category:", rows)
🏋️ Practice: View-Based Dashboard
Create a sales table (id, rep, product, amount, region, sale_date). Create: (1) A view rep_dashboard with each rep's total, count, avg, and rank. (2) A view regional_summary with region totals and % of grand total. (3) A view top_products showing the top 5 products by revenue. Query each view to confirm it works.
Starter Code
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: rows for queries, otherwise commit and report rowcount."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        if not cursor.description:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    finally:
        cursor.close()


def runmany(sql, rows):
    """Bulk-execute *sql* for every parameter tuple in *rows*, then commit."""
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, rows)
        conn.commit()
    finally:
        cursor.close()


run('''CREATE TABLE sales (id INT, rep TEXT, product TEXT, amount REAL, region TEXT, sale_date TEXT)''')

# (id, rep, product, amount, region, sale_date)
sales_rows = [
    (1, 'Alice', 'Widget', 500, 'North', '2024-01-10'),
    (2, 'Alice', 'Gadget', 300, 'North', '2024-01-15'),
    (3, 'Bob', 'Widget', 800, 'South', '2024-02-01'),
    (4, 'Bob', 'Gizmo', 200, 'South', '2024-02-10'),
    (5, 'Carol', 'Gadget', 600, 'East', '2024-02-20'),
    (6, 'Alice', 'Gizmo', 400, 'North', '2024-03-01'),
    (7, 'Carol', 'Widget', 700, 'East', '2024-03-10'),
    (8, 'Bob', 'Gadget', 350, 'South', '2024-03-15'),
]
runmany('INSERT INTO sales VALUES (?,?,?,?,?,?)', sales_rows)

# 1. Rep dashboard view: per-rep total, count, average and rank by total
rep_dashboard_ddl = '''CREATE VIEW rep_dashboard AS
    SELECT rep, SUM(amount) AS total, COUNT(*) AS n_sales,
           ROUND(AVG(amount),0) AS avg_sale,
           RANK() OVER (ORDER BY SUM(amount) DESC) AS rank
    FROM sales GROUP BY rep
'''
run(rep_dashboard_ddl)
print("=== Rep Dashboard ===")
rep_rows = run('SELECT * FROM rep_dashboard ORDER BY rank')
for row in rep_rows:
    print(f"  {row}")

# 2. Regional summary view: region total and its share of the grand total
regional_summary_ddl = '''CREATE VIEW regional_summary AS
    SELECT region, ROUND(SUM(amount),0) AS total,
           ROUND(100.0*SUM(amount)/SUM(SUM(amount)) OVER (), 1) AS pct_of_total
    FROM sales GROUP BY region
'''
run(regional_summary_ddl)
print("=== Regional Summary ===")
region_rows = run('SELECT * FROM regional_summary ORDER BY total DESC')
for row in region_rows:
    print(f"  {row}")

# 3. TODO: top products view
✅ Practice Checklist
24. Data Integrity & Constraints

Constraints (PRIMARY KEY, UNIQUE, NOT NULL, CHECK, FOREIGN KEY) enforce data quality at the database level — the last line of defense against bad data.

PRIMARY KEY, UNIQUE, NOT NULL, CHECK
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


conn.execute('PRAGMA foreign_keys = ON')

# The allowed categories are single-quoted string literals. In double
# quotes they would be identifiers, and the CHECK would only work while
# SQLite's double-quoted-string fallback is enabled.
run('''CREATE TABLE products (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    sku     TEXT    NOT NULL UNIQUE,
    name    TEXT    NOT NULL,
    price   REAL    NOT NULL CHECK (price > 0),
    stock   INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
    category TEXT   CHECK (category IN ('food', 'tech', 'clothing'))
)''')

# Valid insert
run("INSERT INTO products (sku, name, price, stock, category) VALUES ('SKU-001', 'Apple', 1.5, 100, 'food')")
run("INSERT INTO products (sku, name, price, stock, category) VALUES ('SKU-002', 'Phone', 699.0, 50, 'tech')")
print("Inserted valid products:", run('SELECT id, sku, name, price FROM products'))

# Test constraint violations
tests = [
    ("Duplicate SKU",       "INSERT INTO products (sku, name, price) VALUES ('SKU-001', 'Pear', 0.9)"),
    ("Negative price",      "INSERT INTO products (sku, name, price) VALUES ('SKU-003', 'X', -5.0)"),
    ("Negative stock",      "INSERT INTO products (sku, name, price, stock) VALUES ('SKU-004', 'Y', 1.0, -10)"),
    ("Invalid category",    "INSERT INTO products (sku, name, price, category) VALUES ('SKU-005', 'Z', 2.0, 'toys')"),
    ("NULL name",           "INSERT INTO products (sku, price) VALUES ('SKU-006', 3.0)"),
]

# (label, rejected?, error message) for every attempted violation
outcomes = []
for label, sql in tests:
    try:
        run(sql)
    except sqlite3.IntegrityError as e:
        # Only constraint violations count as the expected outcome; any
        # other error (e.g. a typo in the SQL) should surface, not be
        # reported as "rejected".
        conn.rollback()  # close the transaction the failed INSERT opened
        outcomes.append((label, True, str(e)))
        print(f"  OK: {label} rejected -> {str(e)[:50]}")
    else:
        outcomes.append((label, False, ''))
        print(f"  FAIL: {label} should have been rejected!")
FOREIGN KEY constraints
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement on the shared connection.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Foreign keys are enforced only after this pragma is issued on the connection.
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE categories (id INT PRIMARY KEY, name TEXT NOT NULL UNIQUE)''')
run('''CREATE TABLE products (
    id INT PRIMARY KEY,
    name TEXT NOT NULL,
    cat_id INT NOT NULL REFERENCES categories(id) ON DELETE RESTRICT ON UPDATE CASCADE
)''')
run('''CREATE TABLE orders (
    id INT PRIMARY KEY,
    product_id INT NOT NULL REFERENCES products(id) ON DELETE CASCADE
)''')

runmany('INSERT INTO categories VALUES (?,?)', [(1,'Food'),(2,'Tech'),(3,'Clothing')])
runmany('INSERT INTO products VALUES (?,?,?)', [(1,'Apple',1),(2,'Phone',2),(3,'Shirt',3)])
runmany('INSERT INTO orders VALUES (?,?)', [(1,1),(2,1),(3,2)])

# Violate FK: cat_id=99 doesn't exist.
# Values are bound as parameters instead of a double-quoted "Widget",
# which SQL would treat as an identifier.
fk_insert_rejected = False
try:
    run('INSERT INTO products VALUES (?,?,?)', (4, 'Widget', 99))
except sqlite3.IntegrityError as e:
    conn.rollback()  # close the transaction the failed INSERT opened
    fk_insert_rejected = True
    print("OK: FK rejected:", str(e)[:50])
else:
    print("FAIL: should have rejected")

# ON DELETE CASCADE: delete product -> orders cascade deleted
orders_before = run('SELECT COUNT(*) FROM orders')[0][0]
print("Orders before:", orders_before)
run('DELETE FROM products WHERE id = 1')
orders_after = run('SELECT COUNT(*) FROM orders')[0][0]
print("Orders after deleting product 1:", orders_after)

# ON DELETE RESTRICT: cannot delete category if products reference it
restrict_enforced = False
try:
    run('DELETE FROM categories WHERE id = 2')
except sqlite3.IntegrityError as e:
    conn.rollback()
    restrict_enforced = True
    print("OK: RESTRICT works:", str(e)[:50])
else:
    print("FAIL: should have rejected")
ON CONFLICT and UPSERT
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE inventory (
    sku   TEXT PRIMARY KEY,
    name  TEXT NOT NULL,
    stock INT  NOT NULL DEFAULT 0 CHECK (stock >= 0)
)''')
runmany('INSERT INTO inventory VALUES (?,?,?)', [
    ('A001','Apple',100),('B002','Banana',200),
])

# ON CONFLICT IGNORE: skip duplicate inserts silently
run("INSERT OR IGNORE INTO inventory VALUES ('A001', 'Apple Updated', 150)")
print("After OR IGNORE:", run("SELECT stock FROM inventory WHERE sku='A001'")[0][0])  # still 100

# ON CONFLICT REPLACE: delete + re-insert
run("INSERT OR REPLACE INTO inventory VALUES ('A001', 'Apple Premium', 120)")
print("After OR REPLACE:", run("SELECT name, stock FROM inventory WHERE sku='A001'")[0])

# UPSERT (INSERT ... ON CONFLICT DO UPDATE) - SQLite 3.24+
# One parameterized statement is reused for both calls; values are bound with
# placeholders rather than written as double-quoted literals, which SQL treats
# as identifiers.
UPSERT_SQL = '''
    INSERT INTO inventory (sku, name, stock) VALUES (?, ?, ?)
    ON CONFLICT(sku) DO UPDATE SET
        stock = stock + excluded.stock,
        name  = excluded.name
'''
run(UPSERT_SQL, ('C003', 'Cherry', 50))          # no conflict: plain insert
run(UPSERT_SQL, ('C003', 'Cherry Deluxe', 30))   # conflict: adds stock, renames
print("After UPSERT x2:", run("SELECT name, stock FROM inventory WHERE sku='C003'")[0])
💼 Real-World: Inventory Management System
An e-commerce warehouse uses FK constraints, CHECK constraints, and UPSERT to maintain data integrity across products, stock levels, and order fulfillment.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Foreign keys are only enforced once this pragma is enabled on the connection.
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE warehouses (id INT PRIMARY KEY, location TEXT)''')
run('''CREATE TABLE products (id INT PRIMARY KEY, sku TEXT UNIQUE NOT NULL,
    name TEXT NOT NULL, price REAL CHECK (price > 0))''')
run('''CREATE TABLE stock_levels (
    product_id INT REFERENCES products(id) ON DELETE CASCADE,
    warehouse_id INT REFERENCES warehouses(id),
    qty INT NOT NULL DEFAULT 0 CHECK (qty >= 0),
    PRIMARY KEY (product_id, warehouse_id)
)''')

runmany('INSERT INTO warehouses VALUES (?,?)', [(1,'NYC'),(2,'LA'),(3,'Chicago')])
runmany('INSERT INTO products VALUES (?,?,?,?)',
        [(1,'SKU-A','Widget',29.99),(2,'SKU-B','Gadget',49.99)])
runmany('INSERT INTO stock_levels VALUES (?,?,?)',
        [(1,1,500),(1,2,300),(2,1,200),(2,3,150)])


# UPSERT stock: add received quantity
def receive_stock(product_id, warehouse_id, qty):
    """Add ``qty`` units of a product at a warehouse.

    Inserts a new stock_levels row, or adds ``qty`` to the existing row when
    the (product_id, warehouse_id) pair is already present. Values are bound
    through placeholders instead of being formatted into the SQL text, so
    arguments can never be interpreted as SQL.
    """
    run('''
        INSERT INTO stock_levels (product_id, warehouse_id, qty)
        VALUES (?, ?, ?)
        ON CONFLICT (product_id, warehouse_id) DO UPDATE SET qty = qty + excluded.qty
    ''', (product_id, warehouse_id, qty))


receive_stock(1, 1, 100)
receive_stock(2, 2, 50)  # new location

rows = run('''
    SELECT p.name, w.location, sl.qty
    FROM stock_levels sl
    JOIN products p ON p.id = sl.product_id
    JOIN warehouses w ON w.id = sl.warehouse_id
    ORDER BY p.name, sl.qty DESC
''')
print("Current stock:")
for r in rows:
    print(f"  {r[0]} @ {r[1]}: {r[2]} units")
🏋️ Practice: Bank Account Constraints
Create an accounts table (id, holder, balance REAL CHECK >= 0, account_type CHECK IN list, opened_date). Create a transactions table (id, account_id FK, amount, type CHECK IN debit/credit, ts). Write: (1) Insert a debit that would go negative (should fail). (2) A trigger that auto-updates balance on transaction insert. (3) UPSERT to create or update account balance.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Foreign keys are only enforced once this pragma is enabled on the connection.
conn.execute('PRAGMA foreign_keys = ON')
# String literals in the DDL use single quotes; double quotes denote
# identifiers in SQL and are only tolerated as strings by a SQLite quirk.
run('''CREATE TABLE accounts (
    id INT PRIMARY KEY,
    holder TEXT NOT NULL,
    balance REAL NOT NULL DEFAULT 0 CHECK (balance >= 0),
    account_type TEXT CHECK (account_type IN ('checking', 'savings')),
    opened_date TEXT DEFAULT (DATE('now'))
)''')
run('''CREATE TABLE txns (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    account_id INT NOT NULL REFERENCES accounts(id),
    amount REAL NOT NULL CHECK (amount > 0),
    type TEXT NOT NULL CHECK (type IN ('debit', 'credit')),
    ts TEXT DEFAULT (DATETIME('now'))
)''')

run("INSERT INTO accounts VALUES (1,'Alice',1000,'checking','2024-01-01')")

# 1. Try to go negative via debit > balance
try:
    run("INSERT INTO txns (account_id, amount, type) VALUES (1, 5000, 'debit')")
    # Without trigger, balance isn't auto-updated. Add trigger below.
except sqlite3.IntegrityError as e:
    # Once the trigger exists, the balance CHECK should surface here.
    print("Error:", e)

# 2. TODO: CREATE TRIGGER after_txn AFTER INSERT ON txns
#    that updates accounts.balance = balance - amount WHERE type='debit'
#    or balance + amount WHERE type='credit'

# 3. TODO: UPSERT to create or update an account balance
print("Current balance:", run("SELECT balance FROM accounts WHERE id=1")[0][0])
✅ Practice Checklist
25. Advanced Aggregations

Use FILTER clauses for conditional aggregation, emulate ROLLUP with UNION ALL, and combine window functions for multi-dimensional analysis.

FILTER clause – conditional sums per quarter
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS sales (
    id INTEGER PRIMARY KEY,
    region TEXT, product TEXT,
    amount REAL, q INTEGER)''')
runmany('INSERT INTO sales VALUES (?,?,?,?,?)', [
    (1,'North','A',100,1),(2,'North','B',200,1),
    (3,'South','A',150,2),(4,'South','B',250,2),
    (5,'North','A',120,3),(6,'South','B',300,3)])
# FILTER clause – conditional aggregation: each SUM only sees the rows that
# match its WHERE, and yields NULL when no row of the group matches.
rows = run('''
    SELECT
        region,
        SUM(amount) AS total,
        SUM(amount) FILTER(WHERE q=1) AS q1,
        SUM(amount) FILTER(WHERE q=2) AS q2
    FROM sales GROUP BY region''')
print(rows)
Emulate ROLLUP with UNION ALL
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS sales2 (
    region TEXT, product TEXT, amount REAL)''')
runmany('INSERT INTO sales2 VALUES (?,?,?)', [
    ('North','A',100),('North','B',200),
    ('South','A',150),('South','B',250)])
# Emulate ROLLUP with UNION ALL: detail rows, per-region subtotals
# (product = NULL), and a grand total (region = product = NULL).
rows = run('''
    SELECT region, product, SUM(amount)
    FROM sales2 GROUP BY region, product
    UNION ALL
    SELECT region, NULL, SUM(amount)
    FROM sales2 GROUP BY region
    UNION ALL
    SELECT NULL, NULL, SUM(amount) FROM sales2
    ORDER BY 1,2''')
for r in rows:
    print(r)
RANK and percent-rank
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS metrics (
    dept TEXT, month INTEGER, revenue REAL)''')
runmany('INSERT INTO metrics VALUES (?,?,?)', [
    ('Eng',1,5000),('Eng',2,6000),('HR',1,2000),('HR',2,2500)])
# RANK plus a hand-computed percentage: rank divided by the total row count.
# (This query does not call the built-in PERCENT_RANK() or CUME_DIST().)
rows = run('''
    SELECT dept, month, revenue,
        RANK() OVER (ORDER BY revenue) AS rnk,
        ROUND(100.0*RANK() OVER(ORDER BY revenue)/COUNT(*) OVER(),1) AS pct
    FROM metrics ORDER BY revenue''')
for r in rows:
    print(r)
Multi-dimensional conditional aggregation
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS orders (
    id INTEGER PRIMARY KEY, customer TEXT,
    status TEXT, amount REAL)''')
runmany('INSERT INTO orders VALUES (?,?,?,?)', [
    (1,'Alice','paid',300),(2,'Bob','pending',150),
    (3,'Alice','paid',200),(4,'Bob','paid',400),(5,'Carol','refunded',100)])
# Multi-dimensional conditional aggregation: COUNT yields 0 and SUM yields
# NULL for a customer with no rows in the filtered status.
rows = run('''
    SELECT customer,
        COUNT(*) FILTER(WHERE status='paid') AS paid_cnt,
        SUM(amount) FILTER(WHERE status='paid') AS paid_sum,
        COUNT(*) FILTER(WHERE status='pending') AS pend_cnt
    FROM orders GROUP BY customer ORDER BY customer''')
for r in rows:
    print(r)
💼 Real-World: E-commerce Revenue Breakdown
An analytics team needs quarterly revenue by region, total rows, and a grand-total rollup — all in one query.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS rev (
    region TEXT, q INTEGER, amount REAL)''')
runmany('INSERT INTO rev VALUES (?,?,?)', [
    ('North',1,500),('North',2,700),
    ('South',1,400),('South',2,600)])
# Per-(region, quarter) totals, followed by one grand-total row whose
# region and q columns are NULL.
rows = run('''
    SELECT region, q,
        SUM(amount) AS total,
        SUM(amount) FILTER(WHERE q=1) AS q1_total
    FROM rev GROUP BY region, q
    UNION ALL
    SELECT NULL, NULL, SUM(amount), NULL FROM rev''')
for r in rows:
    print(r)
🏋️ Practice: Conditional Aggregation Practice
Create a 'transactions' table with columns (id, type TEXT, amount REAL). Write a query that returns total amount where type='credit', total where type='debit', and the net difference — all in one row.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS transactions (
    id INTEGER PRIMARY KEY, type TEXT, amount REAL)''')
runmany('INSERT INTO transactions VALUES (?,?,?)', [
    (1,'credit',500),(2,'debit',200),(3,'credit',300),(4,'debit',150)])
# Write your FILTER aggregation query here
✅ Practice Checklist
26. Self-Joins & Non-Equi Joins

Use self-joins to query hierarchical data, non-equi joins for range matching, and interval joins to detect overlaps.

Self-join: employee-manager hierarchy
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS employees (
    id INTEGER PRIMARY KEY, name TEXT,
    manager_id INTEGER, salary REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?,?)', [
    (1,'Alice',None,9000),(2,'Bob',1,7000),
    (3,'Carol',1,7500),(4,'Dave',2,5000),(5,'Eve',2,5500)])
# Self-join: each employee with their manager's name. LEFT JOIN keeps the
# employee whose manager_id is NULL, with a NULL manager.
rows = run('''
    SELECT e.name AS employee, m.name AS manager
    FROM employees e
    LEFT JOIN employees m ON e.manager_id = m.id
    ORDER BY e.id''')
for r in rows:
    print(r)
Non-equi join: salary comparisons within dept
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS employees2 (
    id INTEGER PRIMARY KEY, name TEXT, salary REAL, dept TEXT)''')
runmany('INSERT INTO employees2 VALUES (?,?,?,?)', [
    (1,'Alice',9000,'Eng'),(2,'Bob',7000,'Eng'),
    (3,'Carol',7500,'HR'),(4,'Dave',5000,'HR')])
# Non-equi join: employees earning more than others in same dept
rows = run('''
    SELECT a.name AS higher, b.name AS lower,
           a.salary - b.salary AS diff
    FROM employees2 a
    JOIN employees2 b
      ON a.dept = b.dept AND a.salary > b.salary
    ORDER BY diff DESC''')
for r in rows:
    print(r)
Range join: match orders to price bands
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS prices (
    product TEXT, price REAL, valid_from TEXT, valid_to TEXT)''')
runmany('INSERT INTO prices VALUES (?,?,?,?)', [
    ('Widget',10.0,'2024-01-01','2024-03-31'),
    ('Widget',12.0,'2024-04-01','2024-12-31'),
    ('Gadget',25.0,'2024-01-01','2024-12-31')])
run('''CREATE TABLE IF NOT EXISTS orders3 (
    id INTEGER, product TEXT, order_date TEXT, qty INTEGER)''')
runmany('INSERT INTO orders3 VALUES (?,?,?,?)', [
    (1,'Widget','2024-02-15',3),(2,'Widget','2024-05-20',5),(3,'Gadget','2024-03-10',2)])
# Range join: match orders to the correct price band. The dates are ISO-8601
# text, so the string comparison done by BETWEEN is also chronological.
rows = run('''
    SELECT o.id, o.product, o.order_date, p.price, o.qty*p.price AS total
    FROM orders3 o
    JOIN prices p
      ON o.product=p.product
     AND o.order_date BETWEEN p.valid_from AND p.valid_to
    ORDER BY o.id''')
for r in rows:
    print(r)
Interval join: detect overlapping tasks
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS tasks (
    id INTEGER PRIMARY KEY, name TEXT,
    start TEXT, end TEXT)''')
runmany('INSERT INTO tasks VALUES (?,?,?,?)', [
    (1,'Deploy','2024-01-10','2024-01-15'),
    (2,'Test','2024-01-12','2024-01-18'),
    (3,'Review','2024-01-20','2024-01-25'),
    (4,'Release','2024-01-16','2024-01-22')])
# Find overlapping task pairs: two intervals overlap when each one starts
# no later than the other ends. a.id < b.id reports every pair only once.
rows = run('''
    SELECT a.name AS task1, b.name AS task2
    FROM tasks a JOIN tasks b
      ON a.id < b.id
     AND a.start <= b.end
     AND a.end >= b.start
    ORDER BY a.id''')
for r in rows:
    print(r)
💼 Real-World: Org-Chart & Salary Reporting
HR needs a report showing each employee, their direct manager, and whether they earn more than the average of their peer group.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS staff (
    id INTEGER PRIMARY KEY, name TEXT,
    manager_id INTEGER, salary REAL, dept TEXT)''')
runmany('INSERT INTO staff VALUES (?,?,?,?,?)', [
    (1,'CEO',None,15000,'Exec'),(2,'CTO',1,12000,'Eng'),
    (3,'Dev1',2,8000,'Eng'),(4,'Dev2',2,7500,'Eng'),(5,'CHRO',1,11000,'HR')])
# Each employee with their manager and the average salary of their peer group
# (everyone sharing the same manager_id). The window averages e.salary: only
# the aliases e and m exist in this query.
rows = run('''
    SELECT e.name, m.name AS mgr,
        e.salary,
        AVG(e.salary) OVER(PARTITION BY e.manager_id) AS peer_avg
    FROM staff e
    LEFT JOIN staff m ON e.manager_id=m.id
    ORDER BY e.id''')
for r in rows:
    print(r)
🏋️ Practice: Self-Join Practice
Using the 'staff' table above, find all pairs of employees in the same department where one earns at least 20% more than the other.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS staff (
    id INTEGER PRIMARY KEY, name TEXT,
    dept TEXT, salary REAL)''')
runmany('INSERT INTO staff VALUES (?,?,?,?)', [
    (1,'Alice','Eng',9000),(2,'Bob','Eng',7000),
    (3,'Carol','HR',8000),(4,'Dave','HR',6000)])
# Write a non-equi self-join here
✅ Practice Checklist
27. Analytical Functions Advanced

Go beyond basic ranking: use LAG/LEAD for period comparisons, NTILE for bucketing, FIRST_VALUE/LAST_VALUE for partition anchors, and rolling window frames.

LAG: daily price change
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS stock (
    dt TEXT, ticker TEXT, close REAL)''')
runmany('INSERT INTO stock VALUES (?,?,?)', [
    ('2024-01-01','AAPL',185.0),('2024-01-02','AAPL',186.5),
    ('2024-01-03','AAPL',184.0),('2024-01-04','AAPL',188.0),
    ('2024-01-05','AAPL',190.0)])
# Daily change using LAG: the first row has no predecessor, so both
# prev and change are NULL there.
rows = run('''
    SELECT dt, close,
        LAG(close) OVER(ORDER BY dt) AS prev,
        ROUND(close - LAG(close) OVER(ORDER BY dt), 2) AS change
    FROM stock ORDER BY dt''')
for r in rows:
    print(r)
LEAD & rolling 3-period average
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS sales3 (
    month INTEGER, revenue REAL)''')
runmany('INSERT INTO sales3 VALUES (?,?)', [
    (1,5000),(2,5500),(3,4800),(4,6200),(5,6800),(6,7100)])
# Compare to next month with LEAD; rolling average over the current row and
# the two before it. The final ORDER BY makes the output order explicit
# instead of relying on the window's ordering.
rows = run('''
    SELECT month, revenue,
        LEAD(revenue) OVER(ORDER BY month) AS next_month,
        ROUND(AVG(revenue) OVER(
            ORDER BY month ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),2) AS rolling3
    FROM sales3
    ORDER BY month''')
for r in rows:
    print(r)
NTILE: quartile bucketing
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS scores (
    student TEXT, score INTEGER)''')
runmany('INSERT INTO scores VALUES (?,?)', [
    ('Alice',92),('Bob',75),('Carol',88),
    ('Dave',60),('Eve',95),('Frank',70)])
# NTILE: split into quartiles. Six rows do not divide evenly by four, so the
# lower buckets receive the extra rows.
rows = run('''
    SELECT student, score,
        NTILE(4) OVER(ORDER BY score) AS quartile
    FROM scores ORDER BY score''')
for r in rows:
    print(r)
FIRST_VALUE & MAX per partition
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS temps (
    city TEXT, day INTEGER, temp REAL)''')
runmany('INSERT INTO temps VALUES (?,?,?)', [
    ('NYC',1,5.0),('NYC',2,7.0),('NYC',3,4.0),
    ('LA',1,20.0),('LA',2,22.0),('LA',3,19.0)])
# FIRST_VALUE (temperature on the city's earliest day) and MAX (the city's
# peak temperature) per city
rows = run('''
    SELECT city, day, temp,
        FIRST_VALUE(temp) OVER(PARTITION BY city ORDER BY day) AS first_t,
        MAX(temp) OVER(PARTITION BY city) AS peak
    FROM temps ORDER BY city, day''')
for r in rows:
    print(r)
💼 Real-World: Sales Trend Analysis
A business analyst wants to see month-over-month revenue change, the next month forecast gap, and each month's performance bucket (top 25%, etc.).
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS monthly (
    month INTEGER, revenue REAL)''')
runmany('INSERT INTO monthly VALUES (?,?)', [
    (1,4000),(2,4500),(3,4200),(4,5000),(5,5500),(6,6000)])
# Month-over-month change (LAG by month) plus a revenue tier (NTILE by
# revenue). The two windows sort differently, so the final ORDER BY is what
# fixes the order in which rows are returned.
rows = run('''
    SELECT month, revenue,
        LAG(revenue) OVER(ORDER BY month) AS prev,
        ROUND(revenue - LAG(revenue) OVER(ORDER BY month),2) AS mom_change,
        NTILE(3) OVER(ORDER BY revenue) AS tier
    FROM monthly
    ORDER BY month''')
for r in rows:
    print(r)
🏋️ Practice: Window Function Practice
Using a 'visits' table with columns (user_id, visit_date TEXT, pages_viewed INTEGER), compute for each user: their previous visit date, the gap in days between visits, and their max pages_viewed.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS visits (
    user_id INTEGER, visit_date TEXT, pages_viewed INTEGER)''')
runmany('INSERT INTO visits VALUES (?,?,?)', [
    (1,'2024-01-01',5),(1,'2024-01-05',8),(1,'2024-01-12',3),
    (2,'2024-01-02',10),(2,'2024-01-10',7)])
# Write LAG and MAX window functions here
✅ Practice Checklist
28. Database Design & Normalization

Apply 1NF, 2NF, and 3NF to eliminate redundancy, design proper primary and foreign keys, and inspect schemas with SQLite system tables.

1NF: atomic values and separate phone table
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# 1NF: atomic values, no repeating groups. Phone numbers live in their own
# table, one row per number, instead of a multi-valued column on contacts.
run('''CREATE TABLE IF NOT EXISTS contacts (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT NOT NULL)''')
run('''CREATE TABLE IF NOT EXISTS contact_phones (
    contact_id INTEGER,
    phone TEXT NOT NULL,
    FOREIGN KEY(contact_id) REFERENCES contacts(id))''')
runmany('INSERT INTO contacts VALUES (?,?,?)', [
    (1,'Alice','alice@ex.com'),(2,'Bob','bob@ex.com')])
runmany('INSERT INTO contact_phones VALUES (?,?)', [
    (1,'555-0001'),(1,'555-0002'),(2,'555-0003')])
print(run('SELECT * FROM contacts'))
print(run('SELECT * FROM contact_phones'))
2NF: remove partial dependency with products table
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# 2NF: remove partial dependencies (composite key fix). Product name and
# price depend on product_id alone, so they are stored in products rather
# than alongside the (order_id, product_id) key of order_items.
run('''CREATE TABLE IF NOT EXISTS products (
    product_id INTEGER PRIMARY KEY,
    product_name TEXT NOT NULL,
    price REAL NOT NULL)''')
run('''CREATE TABLE IF NOT EXISTS order_items (
    order_id INTEGER,
    product_id INTEGER,
    qty INTEGER NOT NULL,
    PRIMARY KEY(order_id, product_id),
    FOREIGN KEY(product_id) REFERENCES products(product_id))''')
runmany('INSERT INTO products VALUES (?,?,?)', [
    (10,'Widget',9.99),(11,'Gadget',24.99)])
runmany('INSERT INTO order_items VALUES (?,?,?)', [
    (1,10,3),(1,11,1),(2,10,5)])
rows = run('''
    SELECT oi.order_id, p.product_name, oi.qty, p.price*oi.qty AS subtotal
    FROM order_items oi JOIN products p ON oi.product_id=p.product_id''')
for r in rows:
    print(r)
3NF: move location to departments table
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# 3NF: remove transitive dependencies. Location depends on the department,
# not on the staff member, so it is stored once per department.
run('''CREATE TABLE IF NOT EXISTS departments (
    dept_id INTEGER PRIMARY KEY, dept_name TEXT, location TEXT)''')
run('''CREATE TABLE IF NOT EXISTS staff2 (
    id INTEGER PRIMARY KEY, name TEXT,
    dept_id INTEGER,
    FOREIGN KEY(dept_id) REFERENCES departments(dept_id))''')
runmany('INSERT INTO departments VALUES (?,?,?)', [
    (1,'Engineering','Floor 3'),(2,'HR','Floor 1')])
runmany('INSERT INTO staff2 VALUES (?,?,?)', [
    (1,'Alice',1),(2,'Bob',1),(3,'Carol',2)])
rows = run('''
    SELECT s.name, d.dept_name, d.location
    FROM staff2 s JOIN departments d ON s.dept_id=d.dept_id''')
for r in rows:
    print(r)
Schema inspection with PRAGMA and sqlite_master
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for in-memory databases; this pragma is a no-op here.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Run one statement: return rows for a query, else commit and return rowcount."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Schema inspection and FK checks
run('''CREATE TABLE IF NOT EXISTS parent (id INTEGER PRIMARY KEY, val TEXT)''')
run('''CREATE TABLE IF NOT EXISTS child (
    id INTEGER PRIMARY KEY,
    parent_id INTEGER,
    FOREIGN KEY(parent_id) REFERENCES parent(id))''')
# Foreign keys are only enforced once this pragma is enabled on the connection.
run('''PRAGMA foreign_keys=ON''')
runmany('INSERT INTO parent VALUES (?,?)', [(1,'A'),(2,'B')])
runmany('INSERT INTO child VALUES (?,?)', [(10,1),(11,2)])
# List tables and schema: sqlite_master holds one row per schema object,
# including the CREATE statement text in its sql column.
tables = run("SELECT name FROM sqlite_master WHERE type='table'")
print('Tables:', tables)
schema = run("SELECT sql FROM sqlite_master WHERE name='child'")
print(schema[0][0])
💼 Real-World: Customer Orders Schema
Design a normalized schema for customers, orders, and products. Ensure no partial or transitive dependencies, then write a JOIN query to get order summaries.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Normalized schema: customers, products, orders, and one row per order line.
run('''CREATE TABLE IF NOT EXISTS customers (
    id INTEGER PRIMARY KEY, name TEXT, email TEXT)''')
run('''CREATE TABLE IF NOT EXISTS products2 (
    id INTEGER PRIMARY KEY, name TEXT, price REAL)''')
run('''CREATE TABLE IF NOT EXISTS orders2 (
    id INTEGER PRIMARY KEY, customer_id INTEGER, order_date TEXT)''')
run('''CREATE TABLE IF NOT EXISTS order_lines (
    order_id INTEGER, product_id INTEGER, qty INTEGER,
    PRIMARY KEY(order_id, product_id))''')
runmany('INSERT INTO customers VALUES (?,?,?)', [(1,'Alice','a@ex.com')])
runmany('INSERT INTO products2 VALUES (?,?,?)', [(1,'Widget',10.0)])
runmany('INSERT INTO orders2 VALUES (?,?,?)', [(1,1,'2024-01-15')])
runmany('INSERT INTO order_lines VALUES (?,?,?)', [(1,1,3)])
# Order summary: one row per order line with its extended price.
rows = run('''
    SELECT c.name, o.order_date, p.name, ol.qty, p.price*ol.qty AS total
    FROM orders2 o
    JOIN customers c ON o.customer_id=c.id
    JOIN order_lines ol ON ol.order_id=o.id
    JOIN products2 p ON p.id=ol.product_id''')
for r in rows:
    print(r)
🏋️ Practice: Normalization Practice
You have a denormalized table: orders_flat(order_id, customer_name, customer_email, product_name, price, qty). Identify the normal form violations and write CREATE TABLE statements for a 3NF schema.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# Denormalized (bad):
# orders_flat: order_id, customer_name, customer_email, product_name, price, qty
# Violates 2NF (price depends only on product) and 3NF if email depends on name.
# Write your normalized CREATE TABLE statements here:

# run('''CREATE TABLE customers ...''')
# run('''CREATE TABLE products ...''')
# run('''CREATE TABLE orders ...''')
# run('''CREATE TABLE order_items ...''')
✅ Practice Checklist
29. JSON in SQL

Store, extract, modify, and filter JSON data in SQLite using json_extract, json_set, json_insert, and json_remove.

json_extract: read top-level fields
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# The payload column holds JSON serialized as plain text.
run('''CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY,
    payload TEXT)''')
data = [
    (1, json.dumps({'user':'alice','action':'login','meta':{'ip':'1.2.3.4'}})),
    (2, json.dumps({'user':'bob','action':'purchase','meta':{'items':3,'total':59.99}})),
    (3, json.dumps({'user':'alice','action':'logout','meta':{'ip':'1.2.3.4'}})),
]
runmany('INSERT INTO events VALUES (?,?)', data)
# Extract top-level field with json_extract
rows = run("SELECT id, json_extract(payload,'$.user'), json_extract(payload,'$.action') FROM events")
for r in rows:
    print(r)
Nested and array element access
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS events2 (id INTEGER PRIMARY KEY, payload TEXT)''')
runmany('INSERT INTO events2 VALUES (?,?)', [
    (1, json.dumps({'user':'alice','tags':['sql','python'],'score':95})),
    (2, json.dumps({'user':'bob','tags':['ml','python'],'score':80})),
])
# Access nested and array elements; '$.tags[0]' addresses the first array item.
rows = run('''
    SELECT id,
        json_extract(payload, '$.user') AS user,
        json_extract(payload, '$.score') AS score,
        json_extract(payload, '$.tags[0]') AS first_tag
    FROM events2''')
for r in rows:
    print(r)
Modify JSON with json_set and json_insert
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS configs (
    id INTEGER PRIMARY KEY, settings TEXT)''')
run('''INSERT INTO configs VALUES (1, ?)''',
    (json.dumps({'theme':'dark','lang':'en','max_rows':100}),))
# Modify JSON with json_set and json_insert.
# json_set overwrites the existing '$.theme' value ...
run('''UPDATE configs SET settings = json_set(settings, '$.theme', 'light') WHERE id=1''')
# ... while json_insert only adds a key that is not present yet.
run('''UPDATE configs SET settings = json_insert(settings, '$.debug', 1) WHERE id=1''')
row = run('SELECT settings FROM configs WHERE id=1')
settings = json.loads(row[0][0])
print(settings)
Filter rows by JSON field values
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS products3 (
    id INTEGER PRIMARY KEY, name TEXT, attributes TEXT)''')
runmany('INSERT INTO products3 VALUES (?,?,?)', [
    (1,'Widget', json.dumps({'color':'red','weight':0.5,'tags':['sale','new']})),
    (2,'Gadget', json.dumps({'color':'blue','weight':1.2,'tags':['new']})),
    (3,'Doohickey', json.dumps({'color':'red','weight':0.3,'tags':['sale']})),
])
# Filter on one JSON field (color) and sort by another (weight).
rows = run('''
    SELECT name,
        json_extract(attributes,'$.color') AS color,
        json_extract(attributes,'$.weight') AS weight
    FROM products3
    WHERE json_extract(attributes,'$.color')='red'
    ORDER BY weight''')
for r in rows:
    print(r)
💼 Real-World: User Event Logging
An app stores user events as JSON blobs. Query to find all purchase events, extract the total amount, and rank users by spend.
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS user_events (
    id INTEGER PRIMARY KEY, payload TEXT)''')
runmany('INSERT INTO user_events VALUES (?,?)', [
    (1,json.dumps({'user':'alice','action':'purchase','amount':49.99})),
    (2,json.dumps({'user':'bob','action':'view','amount':0})),
    (3,json.dumps({'user':'alice','action':'purchase','amount':29.99})),
    (4,json.dumps({'user':'carol','action':'purchase','amount':99.99})),
])
# Keep purchase events only, total the amount per user, biggest spender first.
rows = run('''
    SELECT
        json_extract(payload,'$.user') AS user,
        SUM(CAST(json_extract(payload,'$.amount') AS REAL)) AS total_spend
    FROM user_events
    WHERE json_extract(payload,'$.action')='purchase'
    GROUP BY user ORDER BY total_spend DESC''')
for r in rows:
    print(r)
🏋️ Practice: JSON Querying Practice
Create a 'logs' table with an id and a 'data' JSON column. Insert 3 log entries with fields: level (INFO/ERROR), message, and timestamp. Write a query to show only ERROR logs with their message and timestamp.
Starter Code
import contextlib
import json
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY, data TEXT)''')
runmany('INSERT INTO logs VALUES (?,?)', [
    (1,json.dumps({'level':'INFO','message':'Started','timestamp':'2024-01-01T09:00:00'})),
    (2,json.dumps({'level':'ERROR','message':'Null ref','timestamp':'2024-01-01T09:05:00'})),
    (3,json.dumps({'level':'ERROR','message':'Timeout','timestamp':'2024-01-01T09:10:00'})),
])
# Write your json_extract filter query here
✅ Practice Checklist
30. Full-Text Search

Use SQLite's FTS5 virtual tables to perform keyword search, phrase matching, boolean operators, column-specific search, and ranked results with snippets.

Create FTS5 table and basic MATCH search
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# FTS5 virtual table for full-text search
run('''CREATE VIRTUAL TABLE IF NOT EXISTS articles
    USING fts5(title, body, tokenize='porter ascii')''')
runmany('INSERT INTO articles VALUES (?,?)', [
    ('SQL Basics','Learn the fundamentals of SQL queries and databases'),
    ('Python for Data','Python is great for data analysis and machine learning'),
    ('Advanced SQL','Window functions and CTEs are powerful SQL features'),
    ('NumPy Guide','NumPy provides fast array operations for numerical data'),
])
# Simple search: matches the term in any indexed column.
rows = run("SELECT title FROM articles WHERE articles MATCH 'SQL'")
for r in rows:
    print(r)
Column-specific search and ranked results
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE VIRTUAL TABLE IF NOT EXISTS docs
    USING fts5(title, content)''')
runmany('INSERT INTO docs VALUES (?,?)', [
    ('Intro to ML','Machine learning uses statistical models to make predictions'),
    ('Deep Learning','Neural networks form the basis of deep learning systems'),
    ('SQL Mastery','Master SQL with window functions and query optimization'),
    ('Data Wrangling','Pandas and NumPy help with data cleaning and transformation'),
])
# Column filter: 'title:SQL' restricts the match to the title column.
title_rows = run("SELECT title FROM docs WHERE docs MATCH 'title:SQL'")
print('Title match:', title_rows)
# Ranked results: ordering by the hidden rank column lists best matches first.
rows = run("SELECT title, rank FROM docs WHERE docs MATCH 'data' ORDER BY rank")
for r in rows:
    print(r)
Prefix search and AND/OR boolean operators
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE VIRTUAL TABLE IF NOT EXISTS notes
    USING fts5(author, text)''')
runmany('INSERT INTO notes VALUES (?,?)', [
    ('Alice','Python decorators and closures are advanced features'),
    ('Bob','SQL joins and subqueries are essential for data analysis'),
    ('Alice','NumPy broadcasting makes array operations concise'),
    ('Bob','Window functions in SQL enable running totals and rankings'),
])
# Prefix search: a trailing '*' matches any token starting with the prefix.
prefix_rows = run("SELECT author, text FROM notes WHERE notes MATCH 'decor*'")
print('Prefix:', prefix_rows)
# Boolean operators: AND requires both terms, OR accepts either.
and_rows = run("SELECT author, text FROM notes WHERE notes MATCH 'SQL AND data'")
print('AND:', and_rows)
or_rows = run("SELECT author, text FROM notes WHERE notes MATCH 'Python OR NumPy'")
print('OR:', [r[0] for r in or_rows])
Snippet extraction with highlighting
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE VIRTUAL TABLE IF NOT EXISTS kb
    USING fts5(title, body)''')
runmany('INSERT INTO kb VALUES (?,?)', [
    ('Error Handling','Use try-except blocks to catch and handle Python exceptions gracefully'),
    ('Logging Best Practices','Configure logging with handlers, formatters and log levels'),
    ('Testing Strategies','Unit tests and integration tests ensure code reliability'),
    ('Code Review Tips','Peer code review improves code quality and knowledge sharing'),
])
# Snippet and highlight: snippet() takes the column index (1 = body), the
# open/close markers for matched terms, the ellipsis text, and a token budget.
rows = run('''
    SELECT title, snippet(kb, 1, '<b>', '</b>', '...', 8)
    FROM kb WHERE kb MATCH 'code'
    ORDER BY rank''')
for r in rows:
    print(r[0], '|', r[1])
💼 Real-World: Knowledge Base Search
A support team has a knowledge base table. Implement full-text search that returns articles matching a query, ranked by relevance, with a highlighted snippet.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE VIRTUAL TABLE IF NOT EXISTS support_kb
    USING fts5(title, content, tokenize='porter ascii')''')
runmany('INSERT INTO support_kb VALUES (?,?)', [
    ('Password Reset','To reset your password go to settings and click reset'),
    ('Billing FAQ','For billing questions contact support or view your invoice'),
    ('Account Setup','Set up your account by verifying your email address'),
    ('Password Policy','Passwords must be 8 characters with a mix of letters and numbers'),
])
query = 'password'
# The search term is passed as a bound parameter, never formatted into the SQL.
rows = run('''
    SELECT title,
        snippet(support_kb, 1, '**', '**', '...', 6) AS preview,
        rank
    FROM support_kb
    WHERE support_kb MATCH ?
    ORDER BY rank
''', (query,))
for r in rows:
    print(r[0], '|', r[1])
🏋️ Practice: FTS5 Practice
Create an FTS5 table called 'recipes' with columns (name, ingredients, instructions). Insert 4 recipes. Write a query to find all recipes where the ingredients contain 'garlic' AND instructions contain 'stir'.
Starter Code
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE VIRTUAL TABLE IF NOT EXISTS recipes
    USING fts5(name, ingredients, instructions)''')
runmany('INSERT INTO recipes VALUES (?,?,?)', [
    ('Pasta','flour eggs garlic olive oil','stir pasta in boiling water'),
    ('Soup','carrots garlic onion broth','simmer and stir until soft'),
    ('Salad','lettuce tomato cucumber','toss with dressing'),
    ('Stir Fry','garlic ginger soy sauce vegetables','stir fry on high heat'),
])
# Write your FTS5 AND query here
✅ Practice Checklist
31. Performance Tuning & EXPLAIN

Use EXPLAIN QUERY PLAN to understand query execution, create regular, composite, covering, and partial indexes, and run ANALYZE to update planner statistics.

Index impact benchmark: with vs without
import contextlib
import random
import sqlite3
import time

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS big (
    id INTEGER PRIMARY KEY, val INTEGER, cat TEXT)''')
# Insert sample data (fixed seed so every run builds the same table)
random.seed(42)
rows = [(i, random.randint(1,1000), random.choice(['A','B','C'])) for i in range(1,5001)]
runmany('INSERT INTO big VALUES (?,?,?)', rows)
# Query without index
t0 = time.perf_counter()
run('SELECT COUNT(*) FROM big WHERE val > 500')
t1 = time.perf_counter()
# Add index; t2 is taken afterwards so index build time is not measured.
run('CREATE INDEX IF NOT EXISTS idx_val ON big(val)')
t2 = time.perf_counter()
run('SELECT COUNT(*) FROM big WHERE val > 500')
t3 = time.perf_counter()
print(f'Without index: {(t1-t0)*1000:.2f}ms')
print(f'With index:    {(t3-t2)*1000:.2f}ms')
EXPLAIN QUERY PLAN before and after composite index
import contextlib
import random
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS orders4 (
    id INTEGER PRIMARY KEY, customer_id INTEGER,
    status TEXT, amount REAL)''')
random.seed(0)
rows = [(i, random.randint(1,100), random.choice(['new','paid','shipped']),
         round(random.uniform(10,500),2)) for i in range(1,2001)]
runmany('INSERT INTO orders4 VALUES (?,?,?,?)', rows)
# EXPLAIN QUERY PLAN before any index exists
plan = run('''EXPLAIN QUERY PLAN
    SELECT customer_id, SUM(amount)
    FROM orders4
    WHERE status='paid'
    GROUP BY customer_id''')
for p in plan:
    print(p)
# Add composite index: filter column first, grouping column second
run('CREATE INDEX IF NOT EXISTS idx_status_cust ON orders4(status, customer_id)')
plan2 = run('''EXPLAIN QUERY PLAN
    SELECT customer_id, SUM(amount)
    FROM orders4 WHERE status='paid'
    GROUP BY customer_id''')
for p in plan2:
    print(p)
Covering index to avoid table scans
import contextlib
import random
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS logs2 (
    id INTEGER PRIMARY KEY,
    user_id INTEGER, event TEXT, ts TEXT)''')
random.seed(1)
events = ['login','logout','purchase','view']
rows = [(i, random.randint(1,50),
         random.choice(events),
         f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}')
        for i in range(1,3001)]
runmany('INSERT INTO logs2 VALUES (?,?,?,?)', rows)
# Covering index: every column the query reads is stored in the index, so
# the table itself does not have to be visited.
run('CREATE INDEX IF NOT EXISTS idx_cov ON logs2(user_id, event, ts)')
plan = run('''EXPLAIN QUERY PLAN
    SELECT user_id, event, ts FROM logs2
    WHERE user_id=1 ORDER BY ts''')
for p in plan:
    print(p)
Partial index and ANALYZE
import contextlib
import random
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS items (
    id INTEGER PRIMARY KEY, name TEXT, price REAL, stock INTEGER)''')
random.seed(5)
rows = [(i,f'item_{i}',round(random.uniform(1,100),2),random.randint(0,500)) for i in range(1,1001)]
runmany('INSERT INTO items VALUES (?,?,?,?)', rows)
# Partial index: only index in-stock items
run('CREATE INDEX IF NOT EXISTS idx_instock ON items(price) WHERE stock > 0')
# ANALYZE updates statistics for the query planner. Run it after the index
# exists so statistics are collected for the index as well as the table.
run('ANALYZE')
plan = run('''EXPLAIN QUERY PLAN
    SELECT name, price FROM items
    WHERE stock > 0 AND price < 20
    ORDER BY price''')
for p in plan:
    print(p)
print(run('SELECT COUNT(*) FROM items WHERE stock>0'))
💼 Real-World: Slow Query Investigation
A production query joining orders and customers is slow. Use EXPLAIN QUERY PLAN to find the bottleneck, add an appropriate index, and verify the improvement.
import contextlib
import random
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS cust2 (id INTEGER PRIMARY KEY, name TEXT, tier TEXT)''')
run('''CREATE TABLE IF NOT EXISTS ord2 (
    id INTEGER PRIMARY KEY, cust_id INTEGER, amount REAL, status TEXT)''')
random.seed(7)
runmany('INSERT INTO cust2 VALUES (?,?,?)', [(i,f'C{i}',random.choice(['gold','silver'])) for i in range(1,501)])
runmany('INSERT INTO ord2 VALUES (?,?,?,?)',
    [(i,random.randint(1,500),round(random.uniform(10,500),2),
      random.choice(['paid','pending'])) for i in range(1,2001)])
# Step 1: inspect the plan before adding any secondary index.
plan = run('''EXPLAIN QUERY PLAN
    SELECT c.name, SUM(o.amount)
    FROM ord2 o JOIN cust2 c ON o.cust_id=c.id
    WHERE o.status='paid' AND c.tier='gold'
    GROUP BY c.id''')
for p in plan:
    print(p)
# Step 2: index the filter/join columns of both tables.
run('CREATE INDEX IF NOT EXISTS idx_ord_status ON ord2(status, cust_id)')
run('CREATE INDEX IF NOT EXISTS idx_cust_tier ON cust2(tier)')
# Step 3: re-run the same EXPLAIN to verify the plan changed.
plan2 = run('''EXPLAIN QUERY PLAN
    SELECT c.name, SUM(o.amount)
    FROM ord2 o JOIN cust2 c ON o.cust_id=c.id
    WHERE o.status='paid' AND c.tier='gold'
    GROUP BY c.id''')
for p in plan2:
    print(p)
🏋️ Practice: Index Optimization Practice
Create a 'transactions2' table (id, account_id INTEGER, type TEXT, amount REAL, created_at TEXT). Insert 500 rows. Use EXPLAIN QUERY PLAN to check a query filtering by account_id and type, then add the right index to make it efficient.
Starter Code
import contextlib
import random
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


random.seed(9)
run('''CREATE TABLE IF NOT EXISTS transactions2 (
    id INTEGER PRIMARY KEY, account_id INTEGER,
    type TEXT, amount REAL, created_at TEXT)''')
rows = [(i, random.randint(1,50), random.choice(['debit','credit']),
         round(random.uniform(1,1000),2), f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}')
        for i in range(1,501)]
runmany('INSERT INTO transactions2 VALUES (?,?,?,?,?)', rows)
# First check the plan, then add an index
plan = run('''EXPLAIN QUERY PLAN
    SELECT * FROM transactions2
    WHERE account_id=5 AND type='debit' ORDER BY created_at''')
for p in plan:
    print(p)
# Add your index here
✅ Practice Checklist
32. SQL for Data Analysis Workflow

Apply SQL as a full data analysis tool: ingest and clean dirty data, compute monthly trends with running totals, compare performance vs. group averages, and pivot long-format survey data.

Ingest and clean dirty text amounts
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


# End-to-end: ingest, clean, transform, analyze
run('''CREATE TABLE IF NOT EXISTS raw_sales (
    id INTEGER PRIMARY KEY,
    rep TEXT, region TEXT,
    amount TEXT,  -- stored as text (dirty data)
    sale_date TEXT)''')
runmany('INSERT INTO raw_sales VALUES (?,?,?,?,?)', [
    (1,'Alice','North','1,200.50','2024-01-15'),
    (2,'Bob','South','800','2024-01-20'),
    (3,'Alice','North','  950.00 ','2024-02-01'),
    (4,'Carol','East','','2024-02-10'),   # missing amount
    (5,'Bob','South','1100.75','2024-02-20'),
])
# Clean: drop thousands separators, strip whitespace, map empty to NULL, cast
rows = run('''
    SELECT rep, region,
        CASE WHEN TRIM(REPLACE(amount,',',''))='' THEN NULL
             ELSE CAST(TRIM(REPLACE(amount,',','')) AS REAL)
        END AS clean_amount,
        sale_date
    FROM raw_sales''')
for r in rows:
    print(r)
Monthly trend with running total CTE
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS cleaned_sales (
    rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO cleaned_sales VALUES (?,?,?,?)', [
    ('Alice','North',1200.50,'2024-01-15'),
    ('Bob','South',800.00,'2024-01-20'),
    ('Alice','North',950.00,'2024-02-01'),
    ('Bob','South',1100.75,'2024-02-20'),
    ('Carol','East',750.00,'2024-03-05'),
    ('Alice','North',1350.00,'2024-03-10'),
])
# Monthly trend and running total: the CTE buckets sales by 'YYYY-MM' and
# rep, then the window sum accumulates each rep's totals in month order.
rows = run('''
    WITH monthly AS (
        SELECT SUBSTR(sale_date,1,7) AS month,
               rep, SUM(amount) AS total
        FROM cleaned_sales GROUP BY month, rep)
    SELECT month, rep, total,
        SUM(total) OVER(PARTITION BY rep ORDER BY month) AS running_total
    FROM monthly ORDER BY rep, month''')
for r in rows:
    print(r)
Rep performance vs. regional average
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS sales_f (
    rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO sales_f VALUES (?,?,?,?)', [
    ('Alice','North',1200,'2024-01-15'),('Bob','South',800,'2024-01-20'),
    ('Alice','North',950,'2024-02-01'),('Carol','East',750,'2024-02-10'),
    ('Bob','South',1100,'2024-02-20'),('Alice','North',1350,'2024-03-10'),
    ('Carol','East',900,'2024-03-15'),('Bob','South',1250,'2024-03-20'),
])
# Rep performance vs. regional average: the window AVG runs over the
# per-rep SUMs produced by GROUP BY, partitioned by region.
rows = run('''
    SELECT rep, region, ROUND(SUM(amount),2) AS rep_total,
        ROUND(AVG(SUM(amount)) OVER(PARTITION BY region),2) AS region_avg,
        ROUND(SUM(amount) - AVG(SUM(amount)) OVER(PARTITION BY region),2) AS vs_avg
    FROM sales_f GROUP BY rep, region ORDER BY region, rep''')
for r in rows:
    print(r)
Pivot survey data with CASE WHEN
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite ignores WAL for ':memory:' databases; this PRAGMA only
# takes effect when the snippet is pointed at a file-backed database.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute one statement with optional bound parameters.

    Returns the fetched rows when the statement produces a result set;
    otherwise commits and returns the cursor's rowcount.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute ``sql`` once per parameter tuple in ``rows``, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS survey (
    respondent_id INTEGER, question TEXT, score INTEGER)''')
runmany('INSERT INTO survey VALUES (?,?,?)', [
    (1,'Q1',4),(1,'Q2',5),(1,'Q3',3),
    (2,'Q1',2),(2,'Q2',3),(2,'Q3',4),
    (3,'Q1',5),(3,'Q2',5),(3,'Q3',5),
    (4,'Q1',3),(4,'Q2',2),(4,'Q3',3),
])
# Pivot: one row per respondent, one column per question.
# Each CASE yields the score only for its question (NULL otherwise), and
# MAX collapses the group to that single non-NULL value.
rows = run('''
    SELECT respondent_id,
        MAX(CASE WHEN question='Q1' THEN score END) AS Q1,
        MAX(CASE WHEN question='Q2' THEN score END) AS Q2,
        MAX(CASE WHEN question='Q3' THEN score END) AS Q3,
        ROUND(AVG(score),2) AS avg_score
    FROM survey GROUP BY respondent_id ORDER BY respondent_id''')
for r in rows:
    print(r)
💼 Real-World: Sales Dashboard Pipeline
Build a SQL pipeline: clean raw data, compute monthly totals, calculate month-over-month growth, and rank reps by total sales within their region.
# Sales dashboard pipeline: monthly totals per rep, month-over-month growth,
# and a per-region, per-month ranking of reps.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite does not switch an in-memory database to WAL, so this PRAGMA is
# effectively a no-op here; it only matters if the snippet targets a file DB.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute a single SQL statement on the shared connection.

    Args:
        sql: SQL text, optionally containing ``?`` placeholders.
        params: Values bound to the placeholders.

    Returns:
        A list of row tuples when the statement produces a result set;
        otherwise the change is committed and the cursor's rowcount is
        returned.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        # description is only set for statements that return columns.
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS pipeline_sales (
    rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO pipeline_sales VALUES (?,?,?,?)', [
    ('Alice','North',1200,'2024-01-10'),('Bob','North',950,'2024-01-20'),
    ('Alice','North',1400,'2024-02-05'),('Bob','North',1100,'2024-02-15'),
    ('Carol','South',800,'2024-01-12'),('Dave','South',900,'2024-01-25'),
    ('Carol','South',1050,'2024-02-08'),('Dave','South',750,'2024-02-18'),
])
# Stage 1 (monthly): total per rep per 'YYYY-MM' month.
# Stage 2 (with_growth): previous month's total via LAG and the percent change;
#   both are NULL for a rep's first month because LAG has no earlier row.
# Final SELECT: rank reps inside each (region, month) by that month's total.
rows = run('''
    WITH monthly AS (
        SELECT rep, region, SUBSTR(sale_date,1,7) AS month,
               SUM(amount) AS total
        FROM pipeline_sales GROUP BY rep, region, month),
    with_growth AS (
        SELECT *,
            LAG(total) OVER(PARTITION BY rep ORDER BY month) AS prev,
            ROUND(100.0*(total - LAG(total) OVER(PARTITION BY rep ORDER BY month))
                  / LAG(total) OVER(PARTITION BY rep ORDER BY month), 1) AS pct_growth
        FROM monthly)
    SELECT *, RANK() OVER(PARTITION BY region, month ORDER BY total DESC) AS region_rank
    FROM with_growth ORDER BY region, month, region_rank''')
for r in rows:
    print(r)
🏋️ Practice: End-to-End Analysis Practice
Create a 'web_logs' table with (session_id INTEGER, user_id INTEGER, page TEXT, duration_sec INTEGER, log_date TEXT). Insert 10 rows. Write a query that: (1) filters sessions over 30s, (2) counts sessions per user per day, and (3) shows each user's max daily sessions using a window function.
Starter Code
# Starter code: creates and fills the web_logs table for the practice exercise.
import contextlib
import sqlite3

conn = sqlite3.connect(':memory:')
# NOTE: SQLite does not switch an in-memory database to WAL, so this PRAGMA is
# effectively a no-op here; it only matters if the snippet targets a file DB.
conn.execute('PRAGMA journal_mode=WAL')


def run(sql, params=()):
    """Execute a single SQL statement on the shared connection.

    Args:
        sql: SQL text, optionally containing ``?`` placeholders.
        params: Values bound to the placeholders.

    Returns:
        A list of row tuples when the statement produces a result set;
        otherwise the change is committed and the cursor's rowcount is
        returned.
    """
    with contextlib.closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        # description is only set for statements that return columns.
        if cur.description:
            return cur.fetchall()
        conn.commit()
        return cur.rowcount


def runmany(sql, rows):
    """Execute *sql* once per parameter tuple in *rows*, then commit."""
    with contextlib.closing(conn.cursor()) as cur:
        cur.executemany(sql, rows)
        conn.commit()


run('''CREATE TABLE IF NOT EXISTS web_logs (
    session_id INTEGER, user_id INTEGER,
    page TEXT, duration_sec INTEGER, log_date TEXT)''')
runmany('INSERT INTO web_logs VALUES (?,?,?,?,?)', [
    (1,1,'home',45,'2024-01-01'),(2,1,'about',20,'2024-01-01'),
    (3,2,'home',60,'2024-01-01'),(4,2,'shop',90,'2024-01-01'),
    (5,1,'home',35,'2024-01-02'),(6,1,'shop',55,'2024-01-02'),
    (7,3,'home',15,'2024-01-01'),(8,3,'about',40,'2024-01-01'),
    (9,2,'home',80,'2024-01-02'),(10,3,'shop',50,'2024-01-02'),
])
# Write your query: filter >30s, count per user/day, window max
✅ Practice Checklist