ποΈ SQL with Python
32 topics • Click any card to expand
Python's built-in sqlite3 module connects to relational databases β no server needed. Use :memory: for prototyping.
import sqlite3
conn = sqlite3.connect(":memory:") # in-memory DB
cur = conn.cursor()
cur.execute(
"CREATE TABLE employees ("
" id INTEGER PRIMARY KEY,"
" name TEXT NOT NULL,"
" dept TEXT,"
" salary REAL,"
" hire_date TEXT)"
)
data = [
(1, "Alice", "Engineering", 95000, "2022-03-15"),
(2, "Bob", "Marketing", 72000, "2021-08-01"),
(3, "Carol", "Engineering", 88000, "2023-01-10"),
]
cur.executemany("INSERT INTO employees VALUES (?,?,?,?,?)", data)
conn.commit()
count = conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0]
print(f"Inserted {count} rows.")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row # rows behave like dicts
conn.execute("CREATE TABLE products (id INT, name TEXT, price REAL)")
conn.executemany("INSERT INTO products VALUES (?,?,?)", [
(1,"Widget",9.99),(2,"Gadget",49.99),(3,"Doohickey",19.99),
])
for row in conn.execute("SELECT * FROM products"):
print(f" [{row['id']}] {row['name']:12s} ${row['price']:.2f}")
conn.close()import sqlite3
# 'with' ensures commit on success, rollback on exception
with sqlite3.connect(":memory:") as conn:
conn.executescript("""
CREATE TABLE categories (id INT PRIMARY KEY, name TEXT);
CREATE TABLE items (id INT, cat_id INT, name TEXT, qty INT);
INSERT INTO categories VALUES (1,'Electronics'),(2,'Clothing');
INSERT INTO items VALUES
(1,1,'Laptop',10),(2,1,'Phone',25),
(3,2,'T-Shirt',100),(4,2,'Jeans',50);
""")
rows = conn.execute(
"SELECT c.name, COUNT(i.id) as items, SUM(i.qty) as total_qty "
"FROM categories c JOIN items i ON c.id=i.cat_id "
"GROUP BY c.id"
).fetchall()
for r in rows:
print(f" {r[0]:14s} items={r[1]} qty={r[2]}")import sqlite3
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE employees (id INT PRIMARY KEY, name TEXT, dept TEXT, salary REAL);
CREATE TABLE departments (id INT PRIMARY KEY, name TEXT, budget REAL);
CREATE INDEX idx_dept ON employees(dept);
CREATE INDEX idx_salary ON employees(salary);
""")
# Inspect all objects via sqlite_master
print("Schema objects:")
for row in conn.execute(
"SELECT type, name, sql FROM sqlite_master ORDER BY type, name"
).fetchall():
print(f" [{row[0]:5s}] {row[1]}")
# List only tables
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
).fetchall()
print("Tables:", [t[0] for t in tables])
# List columns of a table using PRAGMA
print("Columns in employees:")
for col in conn.execute("PRAGMA table_info(employees)").fetchall():
print(f" col#{col[0]} {col[1]:10s} type={col[2]:8s} notnull={col[3]} pk={col[5]}")
conn.close()import sqlite3, random, datetime
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE sales ("
" id INT, customer TEXT, product TEXT,"
" amount REAL, sale_date TEXT)"
)
customers = ["Alice","Bob","Carol","Dave","Eve"]
products = ["Widget","Gadget","Doohickey","Gizmo"]
base = datetime.date(2024, 1, 1)
records = [
(i, random.choice(customers), random.choice(products),
round(random.uniform(20, 500), 2),
(base + datetime.timedelta(days=random.randint(0, 180))).isoformat())
for i in range(1, 1001)
]
conn.executemany("INSERT INTO sales VALUES (?,?,?,?,?)", records)
conn.commit()
row = conn.execute(
"SELECT COUNT(*), ROUND(SUM(amount),2), ROUND(AVG(amount),2) FROM sales"
).fetchone()
print(f"Rows: {row[0]} Total: ${row[1]:,.2f} Avg: ${row[2]:.2f}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
# TODO: Create the 'books' table
# conn.execute("CREATE TABLE books (...)")
# TODO: Create the 'members' table
# conn.execute("CREATE TABLE members (...)")
# TODO: Insert at least 5 books
# conn.executemany("INSERT INTO books VALUES (?,?,?,?,?)", [...])
# TODO: Insert at least 3 members
# conn.executemany("INSERT INTO members VALUES (?,?,?)", [...])
conn.commit()
# TODO: Query all books published after 2000, ordered by year DESC
# rows = conn.execute("SELECT title, author, year FROM books WHERE ...").fetchall()
# for r in rows:
# print(r)
conn.close()SELECT retrieves columns. WHERE filters rows. Use LIKE, IN, BETWEEN, and IS NULL for flexible filtering.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, dept TEXT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
(1,"Alice","Eng",95000),(2,"Bob","Marketing",72000),
(3,"Carol","Eng",88000),(4,"Dave","HR",65000),(5,"Eve","Marketing",78000),
])
# All Engineering employees, sorted by salary
rows = conn.execute(
"SELECT name, salary FROM emp WHERE dept='Eng' ORDER BY salary DESC"
).fetchall()
print("Engineering staff:")
for r in rows: print(f" {r[0]:8s} ${r[1]:,}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL, category TEXT)")
conn.executemany("INSERT INTO products VALUES (?,?,?)", [
("Apple",0.50,"Fruit"),("Avocado",1.50,"Fruit"),
("Milk",2.99,"Dairy"),("Cheese",4.50,"Dairy"),
("Bread",3.29,"Bakery"),("Cake",None,"Bakery"),
])
q = "SELECT name FROM products WHERE"
print(conn.execute(q + " name LIKE 'A%'").fetchall())
print(conn.execute(q + " category IN ('Dairy','Bakery')").fetchall())
print(conn.execute(q + " price BETWEEN 1 AND 4").fetchall())
print(conn.execute(q + " price IS NULL").fetchall())
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?)", [
("Alice","Eng",95000),("Bob","HR",65000),("Carol","Eng",88000),
("Dave","HR",70000),("Eve","Marketing",78000),("Frank","Eng",102000),
])
# Top 3 earners
print("Top 3 earners:")
for r in conn.execute(
"SELECT name, salary FROM emp ORDER BY salary DESC LIMIT 3"
).fetchall():
print(f" {r[0]:8s} ${r[1]:,}")
# Page 2 (rows 3-4) when sorting by name
print("Page 2 by name:")
for r in conn.execute(
"SELECT name FROM emp ORDER BY name LIMIT 2 OFFSET 2"
).fetchall():
print(f" {r[0]}")
# Distinct departments
depts = conn.execute("SELECT DISTINCT dept FROM emp ORDER BY dept").fetchall()
print("Departments:", [d[0] for d in depts])
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL, bonus REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
("Alice","Eng",95000,5000),("Bob","HR",65000,None),
("Carol","Eng",88000,3000),("Dave","Marketing",72000,None),
("Eve","Eng",110000,8000),("Frank","HR",60000,0),
])
rows = conn.execute(
"SELECT name, dept, salary,"
" COALESCE(bonus, 0) as safe_bonus,"
" CASE "
" WHEN salary >= 100000 THEN 'Senior'"
" WHEN salary >= 80000 THEN 'Mid'"
" ELSE 'Junior'"
" END as band,"
" NULLIF(bonus, 0) as nonzero_bonus "
"FROM emp ORDER BY salary DESC"
).fetchall()
print(f"{'Name':8s} {'Band':7s} {'Salary':>9s} {'Bonus':>7s} {'NZ Bonus':>10s}")
for r in rows:
print(f"{r[0]:8s} {r[4]:7s} ${r[2]:>8,} ${r[3]:>6,.0f} {str(r[5]):>10s}")
# IS NULL / IS NOT NULL filter
missing = conn.execute(
"SELECT name FROM emp WHERE bonus IS NULL OR bonus = 0"
).fetchall()
print("No effective bonus:", [m[0] for m in missing])
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE orders ("
" id INT, customer TEXT, status TEXT, amount REAL, order_date TEXT)"
)
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", [
(1,"Alice","shipped",120.0,"2024-01-05"),
(2,"Bob","pending",80.0,"2024-01-10"),
(3,"Alice","delivered",250.0,"2024-01-12"),
(4,"Carol","pending",450.0,"2024-01-15"),
(5,"Bob","shipped",310.0,"2024-01-18"),
(6,"Dave","cancelled",90.0,"2024-01-20"),
])
rows = conn.execute(
"SELECT id, customer, amount, order_date "
"FROM orders "
"WHERE status IN ('pending','shipped') AND amount > 100 "
"ORDER BY amount DESC"
).fetchall()
print("Priority orders:")
for r in rows:
print(f" #{r[0]} {r[1]:8s} ${r[2]:6.2f} {r[3]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INT, name TEXT, department TEXT, salary REAL, city TEXT)")
conn.executemany("INSERT INTO employees VALUES (?,?,?,?,?)", [
# TODO: add at least 8 rows across multiple departments and cities
# (1, "Alice", "Engineering", 95000, "NYC"),
# ...
])
conn.commit()
# TODO: Query 1 β Engineering employees earning > 80000, sorted by salary DESC
# rows = conn.execute("SELECT name, salary FROM employees WHERE ...").fetchall()
# print("Engineering >80k:")
# for r in rows: print(f" {r[0]:10s} ${r[1]:,}")
# TODO: Query 2 β Employees in NYC or LA
# rows = conn.execute("SELECT name, city FROM employees WHERE city IN (...)").fetchall()
# print("NYC/LA staff:", rows)
# TODO: Query 3 β Top 3 earners company-wide
# rows = conn.execute("SELECT name, salary FROM employees ORDER BY ... LIMIT ...").fetchall()
# print("Top 3:", rows)
conn.close()Aggregate functions (COUNT, SUM, AVG, MAX) summarize groups. HAVING filters groups after aggregation β like WHERE but for groups.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, rep TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("North","Alice",1000),("North","Bob",1500),
("South","Carol",800),("South","Dave",1200),
("East","Eve",2000),("East","Alice",900),
])
rows = conn.execute(
"SELECT region,"
" COUNT(*) as reps,"
" SUM(amount) as total,"
" ROUND(AVG(amount),0) as avg_amt,"
" MAX(amount) as top "
"FROM sales GROUP BY region ORDER BY total DESC"
).fetchall()
print(f"{'Region':8s} {'Reps':>5} {'Total':>8} {'Avg':>8} {'Top':>8}")
for r in rows: print(f"{r[0]:8s} {r[1]:>5} {r[2]:>8} {r[3]:>8} {r[4]:>8}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (dept TEXT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?)", [
("Eng",120000),("Eng",95000),("Eng",110000),
("HR",65000),
("Marketing",85000),("Marketing",90000),("Marketing",72000),
])
rows = conn.execute(
"SELECT dept, COUNT(*) as headcount, ROUND(AVG(salary),0) as avg_sal "
"FROM emp "
"GROUP BY dept "
"HAVING COUNT(*) >= 2 "
"ORDER BY avg_sal DESC"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (region TEXT, product TEXT, qty INT, revenue REAL)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?)", [
("North","Widget",10,500),("North","Gadget",5,250),
("South","Widget",8,400),("South","Widget",12,600),
("East","Gadget",20,1000),("East","Widget",3,150),
])
# Group by two columns
print("Region + Product breakdown:")
for r in conn.execute(
"SELECT region, product, SUM(qty) as units, ROUND(SUM(revenue),2) as rev "
"FROM orders GROUP BY region, product ORDER BY region, rev DESC"
).fetchall():
print(f" {r[0]:8s} {r[1]:8s} units={r[2]} rev=${r[3]:.2f}")
# Simulated ROLLUP: per-region total
print("Per-region totals:")
for r in conn.execute(
"SELECT region, SUM(qty) as total_units, ROUND(SUM(revenue),2) as total_rev "
"FROM orders GROUP BY region ORDER BY total_rev DESC"
).fetchall():
print(f" {r[0]:8s} units={r[1]} rev=${r[2]:.2f}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, product TEXT, amount REAL, status TEXT)")
conn.executemany("INSERT INTO sales VALUES (?,?,?,?)", [
("Alice","Widget",1200,"won"),("Alice","Gadget",800,"lost"),
("Alice","Widget",500,"won"),("Bob","Gadget",1500,"won"),
("Bob","Widget",300,"lost"),("Bob","Gadget",900,"won"),
("Carol","Widget",2000,"won"),("Carol","Gadget",600,"lost"),
])
# Pivot-style: won vs lost amounts per rep in a single query
rows = conn.execute(
"SELECT rep,"
" COUNT(*) as deals,"
" SUM(CASE WHEN status='won' THEN amount ELSE 0 END) as won_amt,"
" SUM(CASE WHEN status='lost' THEN amount ELSE 0 END) as lost_amt,"
" ROUND(100.0 * SUM(CASE WHEN status='won' THEN 1 ELSE 0 END) / COUNT(*), 1) as win_pct "
"FROM sales GROUP BY rep ORDER BY won_amt DESC"
).fetchall()
print(f"{'Rep':8s} {'Deals':>6} {'Won $':>8} {'Lost $':>8} {'Win%':>6}")
for r in rows:
print(f"{r[0]:8s} {r[1]:>6} ${r[2]:>7,.0f} ${r[3]:>7,.0f} {r[4]:>5.1f}%")
conn.close()import sqlite3, random, datetime
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txn (user_id INT, category TEXT, amount REAL, date TEXT)")
cats = ["Food","Transport","Shopping","Entertainment","Utilities"]
rows = [
(random.randint(1,100), random.choice(cats),
round(random.uniform(5,300),2),
(datetime.date(2024,1,1)+datetime.timedelta(days=random.randint(0,89))).isoformat())
for _ in range(2000)
]
conn.executemany("INSERT INTO txn VALUES (?,?,?,?)", rows)
report = conn.execute(
"SELECT category, COUNT(*) as txns,"
" ROUND(SUM(amount),2) as total,"
" ROUND(AVG(amount),2) as avg_amt,"
" ROUND(SUM(amount)*100.0/(SELECT SUM(amount) FROM txn),1) as pct "
"FROM txn GROUP BY category ORDER BY total DESC"
).fetchall()
print(f"{'Category':15s} {'Txns':>5} {'Total':>10} {'Avg':>8} {'%':>6}")
print("-" * 50)
for r in report:
print(f"{r[0]:15s} {r[1]:>5} ${r[2]:>9,.2f} ${r[3]:>7.2f} {r[4]:>5.1f}%")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, region TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
# TODO: insert 10+ rows, vary reps and regions
# ("Alice", "North", 1200), ...
])
conn.commit()
# TODO: Query 1 β total and avg per region, ORDER BY total DESC
# rows = conn.execute(
# "SELECT region, SUM(amount) as total, ROUND(AVG(amount),2) as avg_amt "
# "FROM sales GROUP BY region ORDER BY total DESC"
# ).fetchall()
# for r in rows: print(r)
# TODO: Query 2 β reps with total sales > 3000
# rows = conn.execute(
# "SELECT rep, SUM(amount) as total FROM sales "
# "GROUP BY rep HAVING total > 3000"
# ).fetchall()
# print("High performers:", rows)
# TODO: Query 3 β region with highest average sale
# row = conn.execute(
# "SELECT region, ROUND(AVG(amount),2) as avg_amt FROM sales "
# "GROUP BY region ORDER BY avg_amt DESC LIMIT 1"
# ).fetchone()
# print("Best avg region:", row)
conn.close()JOINs combine rows from multiple tables. INNER keeps only matches; LEFT keeps all rows from the left table even without a match.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dept (id INT, name TEXT)")
conn.execute("CREATE TABLE emp (id INT, name TEXT, dept_id INT, salary REAL)")
conn.executemany("INSERT INTO dept VALUES (?,?)", [(1,"Eng"),(2,"HR"),(3,"Sales")])
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
(1,"Alice",1,100000),(2,"Bob",2,65000),
(3,"Carol",1,95000),(4,"Dave",3,80000),(5,"Eve",None,75000),
])
print("INNER JOIN:")
for r in conn.execute(
"SELECT e.name, d.name, e.salary "
"FROM emp e INNER JOIN dept d ON e.dept_id=d.id"
).fetchall(): print(" ", r)
print("LEFT JOIN (all employees):")
for r in conn.execute(
"SELECT e.name, COALESCE(d.name,'Unknown'), e.salary "
"FROM emp e LEFT JOIN dept d ON e.dept_id=d.id"
).fetchall(): print(" ", r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INT, name TEXT, city TEXT)")
conn.execute("CREATE TABLE orders (id INT, cust_id INT, amount REAL)")
conn.executemany("INSERT INTO customers VALUES (?,?,?)", [
(1,"Alice","NYC"),(2,"Bob","LA"),(3,"Carol","NYC"),(4,"Dave","Chicago"),
])
conn.executemany("INSERT INTO orders VALUES (?,?,?)", [
(1,1,150),(2,1,200),(3,2,80),(4,3,320),(5,3,100),
])
rows = conn.execute(
"SELECT c.name, c.city, COUNT(o.id) as orders, "
" COALESCE(SUM(o.amount),0) as spent "
"FROM customers c LEFT JOIN orders o ON c.id=o.cust_id "
"GROUP BY c.id ORDER BY spent DESC"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
# Self-join: employee + their manager
conn.execute("CREATE TABLE emp (id INT, name TEXT, mgr_id INT, dept TEXT)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
(1,"CEO",None,"Exec"),(2,"CTO",1,"Tech"),(3,"CFO",1,"Finance"),
(4,"Dev Lead",2,"Tech"),(5,"Alice",4,"Tech"),(6,"Bob",4,"Tech"),
])
print("Employee -> Manager:")
for r in conn.execute(
"SELECT e.name as employee, COALESCE(m.name,'β') as manager, e.dept "
"FROM emp e LEFT JOIN emp m ON e.mgr_id=m.id "
"ORDER BY e.id"
).fetchall():
print(f" {r[0]:10s} manager={r[1]:10s} dept={r[2]}")
# Three-table join
conn.execute("CREATE TABLE projects (id INT, name TEXT, lead_id INT)")
conn.executemany("INSERT INTO projects VALUES (?,?,?)", [
(1,"Alpha",4),(2,"Beta",2),(3,"Gamma",3),
])
print("Projects with leads and their department:")
for r in conn.execute(
"SELECT p.name, e.name as lead, e.dept "
"FROM projects p JOIN emp e ON p.lead_id=e.id"
).fetchall():
print(f" {r[0]:8s} lead={r[1]:10s} dept={r[2]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
# CROSS JOIN produces every combination of rows (cartesian product)
conn.execute("CREATE TABLE sizes (size TEXT)")
conn.execute("CREATE TABLE colors (color TEXT)")
conn.executemany("INSERT INTO sizes VALUES (?)", [("S",),("M",),("L",),("XL",)])
conn.executemany("INSERT INTO colors VALUES (?)", [("Red",),("Blue",),("Green",)])
print("All size/color combinations (CROSS JOIN):")
variants = conn.execute(
"SELECT s.size, c.color FROM sizes s CROSS JOIN colors c ORDER BY s.size, c.color"
).fetchall()
for r in variants:
print(f" {r[0]:4s} / {r[1]}")
print(f"Total variants: {len(variants)}")
# Use CROSS JOIN to build a multiplication table
print("3x3 multiplication table:")
conn.execute("CREATE TABLE nums (n INT)")
conn.executemany("INSERT INTO nums VALUES (?)", [(1,),(2,),(3,)])
for r in conn.execute(
"SELECT a.n, b.n, a.n * b.n as product "
"FROM nums a CROSS JOIN nums b ORDER BY a.n, b.n"
).fetchall():
print(f" {r[0]} x {r[1]} = {r[2]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
for ddl in [
"CREATE TABLE products (id INT, name TEXT, category TEXT)",
"CREATE TABLE inventory (product_id INT, warehouse TEXT, qty INT)",
"CREATE TABLE sales_30d (product_id INT, qty_sold INT)",
]:
conn.execute(ddl)
conn.executemany("INSERT INTO products VALUES (?,?,?)", [
(1,"Widget","Hardware"),(2,"Gadget","Electronics"),
(3,"Doohickey","Hardware"),(4,"Gizmo","Electronics"),
])
conn.executemany("INSERT INTO inventory VALUES (?,?,?)", [
(1,"East",500),(1,"West",300),(2,"East",80),(3,"West",200),(4,"East",50),
])
conn.executemany("INSERT INTO sales_30d VALUES (?,?)", [
(1,420),(2,75),(3,60),(4,45),
])
rows = conn.execute(
"SELECT p.name, p.category, SUM(i.qty) as stock, "
" COALESCE(s.qty_sold,0) as sold_30d, "
" ROUND(SUM(i.qty)*1.0/NULLIF(COALESCE(s.qty_sold,0),0),1) as weeks "
"FROM products p "
"JOIN inventory i ON p.id=i.product_id "
"LEFT JOIN sales_30d s ON p.id=s.product_id "
"GROUP BY p.id ORDER BY weeks ASC"
).fetchall()
print(f"{'Product':12s} {'Category':12s} {'Stock':>6} {'Sold':>6} {'Weeks':>6}")
for r in rows:
print(f"{r[0]:12s} {r[1]:12s} {r[2]:>6} {r[3]:>6} {str(r[4]):>6}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INT, name TEXT, grade TEXT)")
conn.execute("CREATE TABLE enrollments (student_id INT, course TEXT, score REAL)")
conn.executemany("INSERT INTO students VALUES (?,?,?)", [
# TODO: 5 students, at least one with no enrollment
# (1,"Alice","A"), ...
])
conn.executemany("INSERT INTO enrollments VALUES (?,?,?)", [
# TODO: 8 enrollments across the enrolled students
# (1,"Math",88), ...
])
conn.commit()
# TODO: INNER JOIN β student name, course, score
# rows = conn.execute(
# "SELECT s.name, e.course, e.score "
# "FROM students s INNER JOIN enrollments e ON s.id=e.student_id"
# ).fetchall()
# print("Enrolled:")
# for r in rows: print(" ", r)
# TODO: LEFT JOIN β all students (NULL course for unenrolled)
# rows = conn.execute(
# "SELECT s.name, COALESCE(e.course,'β') as course "
# "FROM students s LEFT JOIN enrollments e ON s.id=e.student_id"
# ).fetchall()
# print("All students:")
# for r in rows: print(" ", r)
# TODO: Average score per student
# rows = conn.execute(
# "SELECT s.name, ROUND(AVG(e.score),1) as avg_score "
# "FROM students s JOIN enrollments e ON s.id=e.student_id "
# "GROUP BY s.id ORDER BY avg_score DESC"
# ).fetchall()
# print("Avg scores:", rows)
conn.close()Subqueries run a query inside another. CTEs (WITH clause) make complex logic readable and reusable β they're like named temp tables.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (name TEXT, dept TEXT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?)", [
("Alice","Eng",120000),("Bob","HR",65000),("Carol","Eng",95000),
("Dave","Mkt",80000),("Eve","Eng",110000),("Frank","HR",72000),
])
# Employees earning more than the company average
rows = conn.execute(
"SELECT name, dept, salary FROM emp "
"WHERE salary > (SELECT AVG(salary) FROM emp) "
"ORDER BY salary DESC"
).fetchall()
print("Above-average earners:")
for r in rows: print(f" {r[0]:8s} {r[1]:5s} ${r[2]:,}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, cust TEXT, amount REAL, month TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?)", [
(1,"Alice",100,"Jan"),(2,"Bob",200,"Jan"),
(3,"Alice",150,"Feb"),(4,"Carol",300,"Feb"),
(5,"Bob",250,"Mar"),(6,"Alice",400,"Mar"),
])
rows = conn.execute(
"WITH monthly AS ("
" SELECT month, SUM(amount) as total FROM orders GROUP BY month"
"),"
"avg_m AS (SELECT AVG(total) as avg_total FROM monthly) "
"SELECT m.month, m.total, "
" ROUND(m.total - a.avg_total, 2) as vs_avg "
"FROM monthly m, avg_m a ORDER BY m.month"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, dept TEXT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
(1,"Alice","Eng",120000),(2,"Bob","HR",65000),(3,"Carol","Eng",95000),
(4,"Dave","Mkt",80000),(5,"Eve","Eng",110000),(6,"Frank","HR",72000),
])
# Correlated subquery: employees earning above their own dept average
print("Above dept average:")
for r in conn.execute(
"SELECT name, dept, salary, "
" ROUND((SELECT AVG(salary) FROM emp e2 WHERE e2.dept=e1.dept),0) as dept_avg "
"FROM emp e1 "
"WHERE salary > (SELECT AVG(salary) FROM emp e2 WHERE e2.dept=e1.dept) "
"ORDER BY dept, salary DESC"
).fetchall():
print(f" {r[0]:8s} {r[1]:5s} ${r[2]:,} dept_avg=${r[3]:,}")
# EXISTS subquery: departments that have at least one employee over 100k
print("Depts with 100k+ earner:")
for r in conn.execute(
"SELECT DISTINCT dept FROM emp e1 "
"WHERE EXISTS (SELECT 1 FROM emp e2 WHERE e2.dept=e1.dept AND e2.salary>100000)"
).fetchall():
print(f" {r[0]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, cust TEXT, product TEXT, amount REAL, month INT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", [
(1,"Alice","Widget",200,1),(2,"Bob","Gadget",150,1),(3,"Alice","Gadget",300,2),
(4,"Carol","Widget",450,2),(5,"Bob","Widget",120,3),(6,"Alice","Gizmo",500,3),
(7,"Carol","Gadget",250,3),(8,"Dave","Widget",80,1),
])
# Scalar subquery in SELECT: show each order's % of grand total
print("Each order as % of grand total:")
for r in conn.execute(
"SELECT cust, product, amount,"
" ROUND(amount * 100.0 / (SELECT SUM(amount) FROM orders), 1) as pct_of_total "
"FROM orders ORDER BY amount DESC"
).fetchall():
print(f" {r[0]:8s} {r[1]:8s} ${r[2]:>6.0f} ({r[3]}%)")
# Multi-CTE chain: monthly totals -> rank months -> show top 2
print("Top 2 months by revenue:")
for r in conn.execute(
"WITH monthly AS ("
" SELECT month, SUM(amount) as total FROM orders GROUP BY month"
"),"
"ranked AS ("
" SELECT month, total, RANK() OVER (ORDER BY total DESC) as rnk FROM monthly"
") "
"SELECT month, total, rnk FROM ranked WHERE rnk <= 2 ORDER BY rnk"
).fetchall():
print(f" Month {r[0]} total=${r[1]:.0f} rank={r[2]}")
conn.close()import sqlite3, random, datetime
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INT, event TEXT, ts TEXT)")
random.seed(5)
funnel = ["signup","view_product","add_to_cart","purchase"]
rows = []
for uid in range(1, 501):
n = random.randint(1, 4)
base = datetime.datetime(2024,1,1) + datetime.timedelta(days=random.randint(0,60))
for ev in funnel[:n]:
rows.append((uid, ev, (base+datetime.timedelta(hours=random.randint(1,48))).isoformat()))
base += datetime.timedelta(days=random.randint(0,3))
conn.executemany("INSERT INTO events VALUES (?,?,?)", rows)
report = conn.execute(
"WITH steps AS ("
" SELECT event, COUNT(DISTINCT user_id) as users "
" FROM events "
" WHERE event IN ('signup','view_product','add_to_cart','purchase') "
" GROUP BY event"
"),"
"top AS (SELECT users as n FROM steps WHERE event='signup') "
"SELECT event, users, "
" ROUND(users*100.0/(SELECT n FROM top),1) as pct "
"FROM steps ORDER BY users DESC"
).fetchall()
print("Conversion Funnel:")
for r in report: print(f" {r[0]:16s} {r[1]:4d} users ({r[2]}%)")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, customer TEXT, product TEXT, amount REAL, order_date TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", [
(1,"Alice","Widget",150,"2024-01-05"),
(2,"Bob","Gadget",80,"2024-01-10"),
(3,"Alice","Gadget",220,"2024-02-03"),
(4,"Carol","Widget",310,"2024-02-15"),
(5,"Bob","Widget",95,"2024-03-01"),
(6,"Alice","Gizmo",400,"2024-03-20"),
(7,"Carol","Gadget",175,"2024-04-08"),
(8,"Dave","Widget",60,"2024-04-12"),
])
conn.commit()
# TODO: 1. Orders above overall average amount
# rows = conn.execute(
# "SELECT customer, amount FROM orders "
# "WHERE amount > (SELECT AVG(amount) FROM orders) "
# "ORDER BY amount DESC"
# ).fetchall()
# print("Above avg:", rows)
# TODO: 2. Top customer by total spend (subquery in FROM)
# row = conn.execute(
# "SELECT customer, total FROM "
# " (SELECT customer, SUM(amount) as total FROM orders GROUP BY customer) "
# "ORDER BY total DESC LIMIT 1"
# ).fetchone()
# print("Top customer:", row)
# TODO: 3. CTE β months above average monthly total
# rows = conn.execute(
# "WITH monthly AS ( "
# " SELECT strftime('%Y-%m', order_date) as month, SUM(amount) as total "
# " FROM orders GROUP BY month "
# ") "
# "SELECT month, total FROM monthly "
# "WHERE total > (SELECT AVG(total) FROM monthly) "
# "ORDER BY total DESC"
# ).fetchall()
# print("Above-avg months:", rows)
conn.close()Window functions compute values across rows related to the current row β ranking, running totals, lag/lead β without collapsing groups.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (month TEXT, rep TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("Jan","Alice",5000),("Jan","Bob",4000),("Jan","Carol",6000),
("Feb","Alice",5500),("Feb","Bob",3800),("Feb","Carol",7000),
])
rows = conn.execute(
"SELECT month, rep, amount,"
" RANK() OVER (PARTITION BY month ORDER BY amount DESC) as rnk,"
" SUM(amount) OVER (PARTITION BY month) as month_total,"
" ROUND(amount*100.0/SUM(amount) OVER (PARTITION BY month),1) as pct "
"FROM sales ORDER BY month, rnk"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rev (week INT, store TEXT, rev REAL)")
conn.executemany("INSERT INTO rev VALUES (?,?,?)", [
(1,"A",10000),(2,"A",12000),(3,"A",9000),(4,"A",14000),(5,"A",11000),
(1,"B",8000),(2,"B",9500),(3,"B",11000),(4,"B",10500),(5,"B",12000),
])
rows = conn.execute(
"SELECT week, store, rev,"
" LAG(rev) OVER (PARTITION BY store ORDER BY week) as prev,"
" rev - LAG(rev) OVER (PARTITION BY store ORDER BY week) as wow,"
" AVG(rev) OVER (PARTITION BY store ORDER BY week "
" ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as ma3 "
"FROM rev ORDER BY store, week"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (student TEXT, subject TEXT, score INT)")
conn.executemany("INSERT INTO scores VALUES (?,?,?)", [
("Alice","Math",92),("Bob","Math",78),("Carol","Math",85),
("Dave","Math",91),("Eve","Math",67),
("Alice","Science",88),("Bob","Science",95),("Carol","Science",72),
])
# ROW_NUMBER and RANK per subject
print("Math rankings:")
for r in conn.execute(
"SELECT student, score,"
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num,"
" RANK() OVER (ORDER BY score DESC) as rnk "
"FROM scores WHERE subject='Math'"
).fetchall():
print(f" {r[0]:8s} score={r[1]} row={r[2]} rank={r[3]}")
# Running total
print("Running total (Math by score):")
for r in conn.execute(
"SELECT student, score,"
" SUM(score) OVER (ORDER BY score DESC ROWS UNBOUNDED PRECEDING) as running_total "
"FROM scores WHERE subject='Math' ORDER BY score DESC"
).fetchall():
print(f" {r[0]:8s} score={r[1]} running_total={r[2]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, month TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("Alice","Jan",5000),("Alice","Feb",6200),("Alice","Mar",4800),
("Alice","Apr",7100),("Alice","May",5500),("Alice","Jun",6800),
("Bob","Jan",4200),("Bob","Feb",3800),("Bob","Mar",5100),
("Bob","Apr",4600),("Bob","May",5900),("Bob","Jun",4300),
])
# FIRST_VALUE and LAST_VALUE show best/worst months in the window
print("First and last month amount per rep (by month order):")
for r in conn.execute(
"SELECT rep, month, amount,"
" FIRST_VALUE(amount) OVER (PARTITION BY rep ORDER BY month "
" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as first_month,"
" LAST_VALUE(amount) OVER (PARTITION BY rep ORDER BY month "
" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_month "
"FROM sales ORDER BY rep, month"
).fetchall():
print(f" {r[0]:6s} {r[1]:4s} ${r[2]:>6,.0f} first=${r[3]:>6,.0f} last=${r[4]:>6,.0f}")
# NTILE divides rows into N equal buckets (quartiles here)
print("NTILE(3) buckets for Alice:")
for r in conn.execute(
"SELECT month, amount, NTILE(3) OVER (ORDER BY amount DESC) as bucket "
"FROM sales WHERE rep='Alice' ORDER BY bucket, amount DESC"
).fetchall():
print(f" bucket={r[2]} {r[0]} ${r[1]:,.0f}")
conn.close()import sqlite3, random, datetime
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (id INT, symbol TEXT, price REAL, date TEXT)")
random.seed(10)
for sym in ["AAPL","GOOG"]:
p = random.uniform(150, 400)
for d in range(20):
p *= (1 + random.gauss(0.001, 0.015))
date = (datetime.date(2024,1,2)+datetime.timedelta(days=d)).isoformat()
conn.execute("INSERT INTO prices VALUES (?,?,?,?)",
(d+1 if sym=="AAPL" else d+21, sym, round(p,2), date))
conn.commit()
result = conn.execute(
"SELECT symbol, date, price,"
" ROUND(AVG(price) OVER (PARTITION BY symbol ORDER BY date "
" ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),2) as ma3,"
" CASE WHEN price > AVG(price) OVER "
" (PARTITION BY symbol ORDER BY date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
" THEN 'BUY' ELSE 'HOLD' END as signal "
"FROM prices WHERE symbol='AAPL' ORDER BY date LIMIT 8"
).fetchall()
print(f"{'Date':12s} {'Price':>8} {'MA3':>8} {'Signal':>7}")
for r in result:
print(f"{r[1]} {r[2]:>8.2f} {r[3]:>8.2f} {r[4]:>7}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, month TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("Alice","Jan",5000),("Bob","Jan",4200),("Carol","Jan",6100),
("Alice","Feb",5500),("Bob","Feb",3900),("Carol","Feb",7200),
("Alice","Mar",4800),("Bob","Mar",5100),("Carol","Mar",6800),
("Alice","Apr",6000),("Bob","Apr",4600),("Carol","Apr",5900),
])
conn.commit()
# TODO: 1. Rank reps within each month by amount DESC
# rows = conn.execute(
# "SELECT month, rep, amount, "
# " RANK() OVER (PARTITION BY month ORDER BY amount DESC) as rnk "
# "FROM sales ORDER BY month, rnk"
# ).fetchall()
# for r in rows: print(r)
# TODO: 2. Running total of amount per rep, ordered by month
# rows = conn.execute(
# "SELECT rep, month, amount, "
# " SUM(amount) OVER (PARTITION BY rep ORDER BY month "
# " ROWS UNBOUNDED PRECEDING) as running_total "
# "FROM sales ORDER BY rep, month"
# ).fetchall()
# for r in rows: print(r)
# TODO: 3. Month-over-month change per rep using LAG
# rows = conn.execute(
# "SELECT rep, month, amount, "
# " LAG(amount) OVER (PARTITION BY rep ORDER BY month) as prev_month, "
# " amount - LAG(amount) OVER (PARTITION BY rep ORDER BY month) as change "
# "FROM sales ORDER BY rep, month"
# ).fetchall()
# for r in rows: print(r)
conn.close()UPDATE modifies existing rows. DELETE removes rows. Always use WHERE β without it you affect the entire table.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (id INT, item TEXT, qty INT, price REAL)")
conn.executemany("INSERT INTO inventory VALUES (?,?,?,?)", [
(1,"Widget",100,9.99),(2,"Gadget",50,49.99),(3,"Doohickey",200,4.99),
])
# Raise price 10% for low-stock items
conn.execute("UPDATE inventory SET price=ROUND(price*1.1,2) WHERE qty < 100")
# Discontinue very cheap items
conn.execute("DELETE FROM inventory WHERE price < 5")
conn.commit()
print("After update/delete:")
for r in conn.execute("SELECT * FROM inventory").fetchall():
print(f" {r[1]:12s} qty={r[2]:4d} price=${r[3]:.2f}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE config ("
" key TEXT PRIMARY KEY, value TEXT, updated_at TEXT)"
)
def upsert(key, value, ts):
conn.execute(
"INSERT INTO config VALUES (?,?,?) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at",
(key, value, ts)
)
conn.commit()
upsert("theme", "dark", "2024-01-01")
upsert("page_size", "25", "2024-01-01")
upsert("theme", "light", "2024-03-01") # update existing
for r in conn.execute("SELECT * FROM config").fetchall():
print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INT, name TEXT, category TEXT, price REAL)")
conn.execute("CREATE TABLE discounts (category TEXT, pct REAL)")
conn.executemany("INSERT INTO products VALUES (?,?,?,?)", [
(1,"Widget","Hardware",10.00),(2,"Gadget","Electronics",50.00),
(3,"Cable","Electronics",8.00),(4,"Hammer","Hardware",25.00),
(5,"Phone","Electronics",600.00),
])
conn.executemany("INSERT INTO discounts VALUES (?,?)", [
("Electronics",0.10),("Hardware",0.05),
])
conn.commit()
# Apply per-category discount using subquery in SET
conn.execute(
"UPDATE products SET price = ROUND(price * (1 - "
" (SELECT pct FROM discounts d WHERE d.category=products.category)), 2) "
"WHERE category IN (SELECT category FROM discounts)"
)
conn.commit()
print("After category discounts:")
for r in conn.execute("SELECT name, category, price FROM products ORDER BY category, price").fetchall():
print(f" {r[0]:10s} {r[1]:14s} ${r[2]:.2f}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE page_views ("
" url TEXT PRIMARY KEY, views INT, last_seen TEXT)"
)
# INSERT OR IGNORE β skip if the primary key already exists
initial = [
("https://example.com/home", 100, "2024-01-01"),
("https://example.com/about", 40, "2024-01-01"),
("https://example.com/blog", 75, "2024-01-01"),
]
conn.executemany("INSERT OR IGNORE INTO page_views VALUES (?,?,?)", initial)
# Try to insert a duplicate β silently ignored
conn.execute("INSERT OR IGNORE INTO page_views VALUES (?,?,?)",
("https://example.com/home", 999, "2024-06-01"))
print("After INSERT OR IGNORE:")
for r in conn.execute("SELECT url, views FROM page_views").fetchall():
print(f" {r[0]:40s} views={r[1]}")
# INSERT OR REPLACE β delete + re-insert if conflict (resets the whole row)
conn.execute("INSERT OR REPLACE INTO page_views VALUES (?,?,?)",
("https://example.com/home", 250, "2024-06-01"))
print("After INSERT OR REPLACE (home views updated to 250):")
for r in conn.execute("SELECT url, views, last_seen FROM page_views").fetchall():
print(f" {r[0]:40s} views={r[1]} last_seen={r[2]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE subs ("
" id INT, user TEXT, plan TEXT, price REAL,"
" start_date TEXT, end_date TEXT, status TEXT)"
)
conn.executemany("INSERT INTO subs VALUES (?,?,?,?,?,?,?)", [
(1,"Alice","basic", 9.99,"2024-01-01","2024-12-31","active"),
(2,"Bob", "pro", 29.99,"2023-01-01","2024-01-15","active"),
(3,"Carol","basic", 9.99,"2024-01-01","2024-06-30","active"),
(4,"Dave", "ent", 99.99,"2022-01-01","2024-12-31","active"),
])
today = "2024-07-01"
# Expire overdue subscriptions
conn.execute("UPDATE subs SET status='expired' WHERE end_date < ?", (today,))
# 15% discount for customers active >= 2 years
conn.execute(
"UPDATE subs SET price=ROUND(price*0.85,2) "
"WHERE status='active' "
"AND CAST(strftime('%Y',?) AS INT) - CAST(strftime('%Y',start_date) AS INT) >= 2",
(today,)
)
conn.commit()
print(f"{'User':8s} {'Plan':6s} {'Price':>8} {'Status':>8}")
for r in conn.execute("SELECT user,plan,price,status FROM subs").fetchall():
print(f"{r[0]:8s} {r[1]:6s} ${r[2]:>7.2f} {r[3]:>8}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INT PRIMARY KEY, owner TEXT, balance REAL)")
conn.executemany("INSERT INTO accounts VALUES (?,?,?)", [
(1, "Alice", 1000.0),
(2, "Bob", 500.0),
(3, "Carol", 250.0),
])
conn.commit()
def get_balance(conn, account_id):
row = conn.execute("SELECT balance FROM accounts WHERE id=?", (account_id,)).fetchone()
return row[0] if row else None
def transfer(conn, from_id, to_id, amount):
# TODO: Check sender balance
# balance = get_balance(conn, from_id)
# if balance is None or balance < amount:
# print(f"Transfer failed: insufficient funds (balance={balance})")
# return False
# TODO: Perform transfer in a transaction
# try:
# conn.execute("UPDATE accounts SET balance=balance-? WHERE id=?", (amount, from_id))
# conn.execute("UPDATE accounts SET balance=balance+? WHERE id=?", (amount, to_id))
# conn.commit()
# print(f"Transferred ${amount} from account {from_id} to {to_id}")
# return True
# except Exception as e:
# conn.rollback()
# print(f"Transfer error: {e}")
# return False
pass
# TODO: Test successful transfer: Alice -> Bob, $200
# transfer(conn, 1, 2, 200)
# TODO: Test failing transfer: Carol -> Alice, $500 (Carol only has $250)
# transfer(conn, 3, 1, 500)
# TODO: Print final balances
# for r in conn.execute("SELECT owner, balance FROM accounts").fetchall():
# print(f" {r[0]:8s} ${r[1]:.2f}")
conn.close()Indexes dramatically speed up queries on large tables. Use EXPLAIN QUERY PLAN to see whether SQLite uses your indexes.
import sqlite3, time, random
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INT, user_id INT, event_type TEXT, ts TEXT)")
rows = [(i, random.randint(1,10000), random.choice(["click","view","buy"]),
f"2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}")
for i in range(200000)]
conn.executemany("INSERT INTO events VALUES (?,?,?,?)", rows)
conn.commit()
t0 = time.perf_counter()
conn.execute("SELECT COUNT(*) FROM events WHERE user_id=42").fetchone()
no_idx = time.perf_counter() - t0
conn.execute("CREATE INDEX idx_user ON events(user_id)")
t0 = time.perf_counter()
conn.execute("SELECT COUNT(*) FROM events WHERE user_id=42").fetchone()
with_idx = time.perf_counter() - t0
print(f"No index: {no_idx*1000:.2f}ms")
print(f"With index: {with_idx*1000:.2f}ms")
print(f"Speedup: {no_idx/max(with_idx,1e-9):.0f}x")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, cust TEXT, status TEXT, amount REAL)")
conn.execute("CREATE INDEX idx_status ON orders(status)")
plan = conn.execute(
"EXPLAIN QUERY PLAN "
"SELECT cust, SUM(amount) FROM orders "
"WHERE status='pending' GROUP BY cust"
).fetchall()
print("Query plan:")
for row in plan: print(" ", row)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE users ("
" id INT PRIMARY KEY, email TEXT UNIQUE, role TEXT, active INT)"
)
conn.executemany("INSERT INTO users VALUES (?,?,?,?)", [
(1,"alice@x.com","admin",1),(2,"bob@x.com","user",1),
(3,"carol@x.com","user",0),(4,"dave@x.com","mod",1),
])
# UNIQUE index is implicit on email β test enforcement
try:
conn.execute("INSERT INTO users VALUES (5,'alice@x.com','user',1)")
except Exception as e:
print("UNIQUE violation caught:", e)
# Partial-like index: index only active users
conn.execute("CREATE INDEX idx_active_role ON users(role) WHERE active=1")
# Verify index is used
plan = conn.execute(
"EXPLAIN QUERY PLAN SELECT id FROM users WHERE role='admin' AND active=1"
).fetchall()
print("Query plan (with partial index):")
for row in plan: print(" ", row)
# Count using the index
n = conn.execute("SELECT COUNT(*) FROM users WHERE active=1").fetchone()[0]
print(f"Active users: {n}")
conn.close()import sqlite3, time, random
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE orders ("
" id INT, cust_id INT, status TEXT, amount REAL, created TEXT)"
)
rows = [
(i, random.randint(1, 500), random.choice(["pending","shipped","done"]),
round(random.uniform(10, 1000), 2),
f"2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}")
for i in range(100000)
]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?)", rows)
conn.commit()
# Query that only needs (status, amount) β a covering index can satisfy it entirely
query = (
"SELECT status, SUM(amount) as total "
"FROM orders WHERE status='pending' GROUP BY status"
)
# Without covering index
plan_before = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
t0 = time.perf_counter()
conn.execute(query).fetchall()
t_before = time.perf_counter() - t0
# Create a covering index β includes all columns touched by the query
conn.execute("CREATE INDEX idx_cover ON orders(status, amount)")
plan_after = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
t0 = time.perf_counter()
conn.execute(query).fetchall()
t_after = time.perf_counter() - t0
print("Plan WITHOUT covering index:", plan_before[0][-1] if plan_before else "N/A")
print("Plan WITH covering index: ", plan_after[0][-1] if plan_after else "N/A")
print(f"Time before: {t_before*1000:.2f}ms after: {t_after*1000:.2f}ms")
conn.close()import sqlite3, time, random, datetime
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE orders ("
" id INT, cust_id INT, product_id INT,"
" status TEXT, amount REAL, created TEXT)"
)
conn.execute("CREATE TABLE products (id INT PRIMARY KEY, name TEXT, category TEXT)")
conn.executemany("INSERT INTO products VALUES (?,?,?)", [
(i, f"Prod{i}", random.choice(["Electronics","Food","Clothing","Hardware"]))
for i in range(1, 201)
])
rows = [(i, random.randint(1,5000), random.randint(1,200),
random.choice(["pending","shipped","delivered"]),
round(random.uniform(10,1000),2),
(datetime.date(2024,1,1)+datetime.timedelta(days=random.randint(0,90))).isoformat())
for i in range(1,50001)]
conn.executemany("INSERT INTO orders VALUES (?,?,?,?,?,?)", rows)
for idx in [
"CREATE INDEX idx_status ON orders(status, created)",
"CREATE INDEX idx_prod ON orders(product_id)",
]:
conn.execute(idx)
t0 = time.perf_counter()
result = conn.execute(
"SELECT p.category, o.status, COUNT(*) as cnt, ROUND(SUM(o.amount),2) as rev "
"FROM orders o JOIN products p ON o.product_id=p.id "
"WHERE o.status != 'pending' "
"GROUP BY p.category, o.status ORDER BY rev DESC LIMIT 8"
).fetchall()
print(f"Query: {(time.perf_counter()-t0)*1000:.1f}ms ({len(result)} rows)")
for r in result[:4]:
print(f" {r[0]:14s} | {r[1]:10s} | {r[2]:>5d} | ${r[3]:>10,.2f}")
conn.close()import sqlite3, random, time, datetime
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE log_events (id INT, user_id INT, event_type TEXT, created_at TEXT)"
)
base = datetime.date(2024, 1, 1)
events = ["login", "logout", "purchase", "view"]
rows = [
(i, random.randint(1, 1000), random.choice(events),
(base + datetime.timedelta(days=random.randint(0, 180))).isoformat())
for i in range(1, 100001)
]
conn.executemany("INSERT INTO log_events VALUES (?,?,?,?)", rows)
conn.commit()
# TODO: 1. Time query WITHOUT index
# t0 = time.perf_counter()
# conn.execute("SELECT COUNT(*) FROM log_events WHERE user_id=42").fetchone()
# print(f"No index: {(time.perf_counter()-t0)*1000:.2f}ms")
# TODO: 2. Create index on user_id and re-time
# conn.execute("CREATE INDEX idx_user_id ON log_events(user_id)")
# t0 = time.perf_counter()
# conn.execute("SELECT COUNT(*) FROM log_events WHERE user_id=42").fetchone()
# print(f"With index: {(time.perf_counter()-t0)*1000:.2f}ms")
# TODO: 3. Compound index on (event_type, created_at)
# conn.execute("CREATE INDEX idx_event_date ON log_events(event_type, created_at)")
# plan = conn.execute(
# "EXPLAIN QUERY PLAN SELECT * FROM log_events "
# "WHERE event_type='purchase' AND created_at >= '2024-06-01'"
# ).fetchall()
# print("Query plan:", plan)
conn.close()Pandas integrates directly with SQLite: read query results into DataFrames with pd.read_sql(), write DataFrames back with .to_sql().
import sqlite3
import pandas as pd
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (date TEXT, region TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("2024-01-01","North",1200),("2024-01-01","South",800),
("2024-01-02","North",950),("2024-01-02","East",1100),
])
# SQL β DataFrame
df = pd.read_sql("SELECT * FROM sales", conn)
print(df)
print(df.groupby("region")["amount"].sum())
conn.close()import sqlite3
import pandas as pd
import numpy as np
conn = sqlite3.connect(":memory:")
df = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=30).astype(str),
"value": np.random.randn(30).cumsum() + 100,
"category": np.random.choice(["A","B","C"], 30),
})
df.to_sql("ts", conn, index=False, if_exists="replace")
result = pd.read_sql(
"SELECT category, COUNT(*) as n, ROUND(AVG(value),2) as avg_val "
"FROM ts GROUP BY category ORDER BY avg_val DESC",
conn
)
print(result)
conn.close()import sqlite3
import pandas as pd
import numpy as np
conn = sqlite3.connect(":memory:")
np.random.seed(7)
# Build a large DataFrame and write in chunks
df = pd.DataFrame({
"user_id": np.random.randint(1, 201, 5000),
"product": np.random.choice(["A","B","C","D"], 5000),
"amount": np.round(np.random.uniform(5, 500, 5000), 2),
"month": np.random.choice(["Jan","Feb","Mar","Apr"], 5000),
})
df.to_sql("txn", conn, index=False, if_exists="replace", chunksize=1000)
# Parameterized SQL via pd.read_sql with params argument
month_filter = "Mar"
result = pd.read_sql(
"SELECT product, COUNT(*) as sales, ROUND(SUM(amount),2) as revenue "
"FROM txn WHERE month=? GROUP BY product ORDER BY revenue DESC",
conn, params=(month_filter,)
)
print(f"Sales in {month_filter}:")
print(result.to_string(index=False))
conn.close()import sqlite3
import pandas as pd
import numpy as np
conn = sqlite3.connect(":memory:")
np.random.seed(3)
base_df = pd.DataFrame({
"id": range(1, 11),
"name": [f"Product_{i}" for i in range(1, 11)],
"price": np.round(np.random.uniform(5, 200, 10), 2),
"stock": np.random.randint(0, 500, 10),
})
# First write: create the table
base_df.to_sql("products", conn, index=False, if_exists="replace")
print(f"Initial rows: {pd.read_sql('SELECT COUNT(*) as n FROM products', conn).iloc[0,0]}")
# Append new rows with if_exists='append'
new_rows = pd.DataFrame({
"id": [11, 12], "name": ["Product_11","Product_12"],
"price": [19.99, 34.99], "stock": [100, 50],
})
new_rows.to_sql("products", conn, index=False, if_exists="append")
print(f"After append: {pd.read_sql('SELECT COUNT(*) as n FROM products', conn).iloc[0,0]}")
# Replace entirely with if_exists='replace'
replacement = base_df.head(3).copy()
replacement.to_sql("products", conn, index=False, if_exists="replace")
print(f"After replace: {pd.read_sql('SELECT COUNT(*) as n FROM products', conn).iloc[0,0]}")
# Read back with column type check
df_back = pd.read_sql("SELECT * FROM products", conn)
print(df_back.dtypes.to_string())
conn.close()import sqlite3
import pandas as pd
import numpy as np
conn = sqlite3.connect(":memory:")
customers = pd.DataFrame({
"id": range(1, 101),
"segment": np.random.choice(["retail","wholesale","online"], 100),
"region": np.random.choice(["North","South","East","West"], 100),
})
transactions = pd.DataFrame({
"customer_id": np.random.randint(1, 101, 2000),
"product": np.random.choice(["A","B","C","D"], 2000),
"amount": np.random.uniform(10, 500, 2000).round(2),
})
customers.to_sql("customers", conn, index=False)
transactions.to_sql("transactions", conn, index=False)
report = pd.read_sql(
"SELECT c.segment, c.region, "
" COUNT(DISTINCT t.customer_id) as customers, "
" ROUND(SUM(t.amount),2) as revenue, "
" ROUND(AVG(t.amount),2) as avg_order "
"FROM transactions t "
"JOIN customers c ON t.customer_id=c.id "
"GROUP BY c.segment, c.region "
"ORDER BY revenue DESC LIMIT 8",
conn
)
print(report.to_string(index=False))
conn.close()import sqlite3
import pandas as pd
import numpy as np
conn = sqlite3.connect(":memory:")
np.random.seed(42)
conn.execute(
"CREATE TABLE transactions "
"(id INT, customer TEXT, category TEXT, amount REAL, txn_date TEXT)"
)
import datetime, random
base = datetime.date(2024, 1, 1)
cats = ["Food", "Electronics", "Clothing", "Sports"]
customers = ["Alice","Bob","Carol","Dave","Eve"]
rows = [
(i, random.choice(customers), random.choice(cats),
round(random.uniform(10, 300), 2),
(base + datetime.timedelta(days=random.randint(0, 89))).isoformat())
for i in range(1, 201)
]
conn.executemany("INSERT INTO transactions VALUES (?,?,?,?,?)", rows)
conn.commit()
# TODO: 1. Create a VIEW named 'monthly_summary'
# conn.execute(
# "CREATE VIEW monthly_summary AS "
# "SELECT strftime('%Y-%m', txn_date) as month, "
# " category, COUNT(*) as txns, ROUND(SUM(amount),2) as revenue "
# "FROM transactions GROUP BY month, category"
# )
# TODO: 2. Query the view directly
# for r in conn.execute("SELECT * FROM monthly_summary ORDER BY revenue DESC LIMIT 5").fetchall():
# print(r)
# TODO: 3. Load into DataFrame and show top 3 by revenue
# df = pd.read_sql("SELECT * FROM monthly_summary ORDER BY revenue DESC", conn)
# print(df.head(3).to_string(index=False))
conn.close()Recursive CTEs traverse hierarchical data (org charts, tree structures). CASE expressions and COALESCE handle conditional logic elegantly.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INT, name TEXT, mgr_id INT, salary REAL)")
conn.executemany("INSERT INTO emp VALUES (?,?,?,?)", [
(1,"CEO",None,300000),(2,"CTO",1,200000),(3,"CFO",1,190000),
(4,"Dev Lead",2,130000),(5,"Dev1",4,100000),(6,"Dev2",4,95000),
(7,"Finance1",3,80000),
])
rows = conn.execute(
"WITH RECURSIVE org AS ("
" SELECT id, name, mgr_id, 0 AS depth FROM emp WHERE mgr_id IS NULL "
" UNION ALL "
" SELECT e.id, e.name, e.mgr_id, o.depth+1 "
" FROM emp e JOIN org o ON e.mgr_id=o.id"
") "
"SELECT depth, name FROM org ORDER BY depth, name"
).fetchall()
for d, name in rows:
print(" " * d + name)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INT, amount REAL, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?,?)", [
(1,500,"delivered"),(2,120,"pending"),(3,None,"cancelled"),
(4,800,"delivered"),(5,200,"shipped"),(6,None,"pending"),
])
rows = conn.execute(
"SELECT id,"
" COALESCE(amount, 0) as safe_amount,"
" CASE "
" WHEN amount > 500 THEN 'high'"
" WHEN amount > 100 THEN 'medium'"
" WHEN amount IS NULL THEN 'unknown'"
" ELSE 'low'"
" END as tier,"
" status "
"FROM orders ORDER BY id"
).fetchall()
for r in rows: print(r)
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
# Generate Fibonacci numbers using a recursive CTE
rows = conn.execute(
"WITH RECURSIVE fib(n, a, b) AS ("
" SELECT 1, 0, 1 "
" UNION ALL "
" SELECT n+1, b, a+b FROM fib WHERE n < 15"
") "
"SELECT n, a as fib_value FROM fib"
).fetchall()
print("Fibonacci sequence:")
for n, v in rows:
print(f" F({n:2d}) = {v}")
# Generate a date series using recursive CTE
rows2 = conn.execute(
"WITH RECURSIVE dates(d) AS ("
" SELECT '2024-01-01' "
" UNION ALL "
" SELECT date(d, '+1 day') FROM dates WHERE d < '2024-01-07'"
") "
"SELECT d, strftime('%A', d) as weekday FROM dates"
).fetchall()
print("Week of 2024-01-01:")
for d, wd in rows2:
print(f" {d} {wd}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
c = conn.cursor()
c.executescript(
"CREATE TABLE employees ("
" id INTEGER PRIMARY KEY, name TEXT, dept TEXT, salary REAL, hire_date TEXT"
");"
"INSERT INTO employees VALUES"
" (1,'Alice','Engineering',95000,'2020-03-15'),"
" (2,'Bob','Marketing',72000,'2019-07-01'),"
" (3,'Carol','Engineering',105000,'2018-11-20'),"
" (4,'Dave','HR',65000,'2021-02-28'),"
" (5,'Eve','Marketing',78000,'2020-09-10'),"
" (6,'Frank','Engineering',88000,'2022-01-05');"
)
# View: department salary summary
c.execute(
"CREATE VIEW dept_summary AS "
"SELECT dept, COUNT(*) AS headcount, "
" ROUND(AVG(salary),2) AS avg_salary, "
" MAX(salary) AS max_salary, MIN(salary) AS min_salary "
"FROM employees GROUP BY dept"
)
# View: employees with tenure
c.execute(
"CREATE VIEW emp_tenure AS "
"SELECT name, dept, salary, "
" ROUND((julianday('now') - julianday(hire_date)) / 365.25, 1) AS years_tenure "
"FROM employees"
)
# Query the views
print("Department summary:")
c.execute("SELECT * FROM dept_summary ORDER BY avg_salary DESC")
for row in c.fetchall():
print(f" {row[0]:12s} headcount={row[1]}, avg=${row[2]:,.0f}, max=${row[3]:,.0f}")
print("\nEmployee tenure:")
c.execute("SELECT * FROM emp_tenure WHERE years_tenure > 3 ORDER BY years_tenure DESC")
for row in c.fetchall():
print(f" {row[0]:6s} ({row[1]}) - {row[3]} years, ${row[2]:,.0f}")
# Drop view
c.execute("DROP VIEW emp_tenure")
print("\nViews remaining:", [r[0] for r in c.execute("SELECT name FROM sqlite_master WHERE type='view'")])
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE categories (id INT PRIMARY KEY, name TEXT, parent_id INT)")
conn.executemany("INSERT INTO categories VALUES (?,?,?)", [
(1,"All",None),(2,"Electronics",1),(3,"Computers",2),
(4,"Laptops",3),(5,"Gaming Laptops",4),(6,"Ultrabooks",4),
(7,"Phones",2),(8,"Android",7),(9,"Clothing",1),(10,"Mens",9),
])
rows = conn.execute(
"WITH RECURSIVE tree AS ("
" SELECT id, name, parent_id, CAST(name AS TEXT) as path, 0 as depth "
" FROM categories WHERE parent_id IS NULL "
" UNION ALL "
" SELECT c.id, c.name, c.parent_id, t.path||' > '||c.name, t.depth+1 "
" FROM categories c JOIN tree t ON c.parent_id=t.id"
") "
"SELECT id, depth, path FROM tree ORDER BY path"
).fetchall()
for r in rows:
print(f" {' '*r[1]}[{r[0]}] {r[2]}")
conn.close()import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (rep TEXT, quarter TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?,?,?)", [
("Alice","Q1",12000),("Alice","Q2",15000),("Alice","Q3",11000),("Alice","Q4",18000),
("Bob","Q1",9000),("Bob","Q2",10500),("Bob","Q3",12000),("Bob","Q4",8500),
("Carol","Q1",20000),("Carol","Q2",18500),("Carol","Q3",22000),("Carol","Q4",19000),
])
conn.commit()
# TODO: Write a CTE query
# rows = conn.execute(
# "WITH rep_totals AS ( "
# " SELECT rep, SUM(amount) as total FROM sales GROUP BY rep "
# "), "
# "company_avg AS ( "
# " SELECT AVG(total) as avg_total FROM rep_totals "
# ") "
# "SELECT r.rep, r.total, "
# " CASE "
# " WHEN r.total > c.avg_total * 1.1 THEN 'above_avg' "
# " WHEN r.total < c.avg_total * 0.9 THEN 'below_avg' "
# " ELSE 'average' "
# " END as performance "
# "FROM rep_totals r, company_avg c "
# "ORDER BY r.total DESC"
# ).fetchall()
# for r in rows:
# print(f" {r[0]:8s} total=${r[1]:,} performance={r[2]}")
conn.close()Write readable, modular SQL with WITH clauses. Break complex queries into named steps and use recursive CTEs for hierarchical data.
import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE orders (order_id INT, customer_id INT, amount REAL, order_date TEXT);
INSERT INTO orders VALUES
(1,101,250,'2024-01-05'),(2,102,80,'2024-01-10'),(3,101,120,'2024-01-15'),
(4,103,400,'2024-02-01'),(5,102,60,'2024-02-10'),(6,101,90,'2024-02-20'),
(7,104,300,'2024-03-01'),(8,103,150,'2024-03-05'),(9,102,200,'2024-03-15');
""")
query = """
WITH customer_totals AS (
SELECT customer_id,
SUM(amount) AS total_spend,
COUNT(*) AS order_count,
AVG(amount) AS avg_order
FROM orders
GROUP BY customer_id
),
high_value AS (
SELECT * FROM customer_totals WHERE total_spend > 300
)
SELECT customer_id,
ROUND(total_spend, 2) AS total_spend,
order_count,
ROUND(avg_order, 2) AS avg_order
FROM high_value
ORDER BY total_spend DESC;
"""
print(pd.read_sql(query, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE sales (sale_id INT, product TEXT, region TEXT, amount REAL, sale_date TEXT);
INSERT INTO sales VALUES
(1,'Laptop','East',1200,'2024-01-10'),(2,'Mouse','West',30,'2024-01-15'),
(3,'Laptop','West',1100,'2024-01-20'),(4,'Monitor','East',400,'2024-02-05'),
(5,'Mouse','East',25,'2024-02-10'),(6,'Monitor','West',380,'2024-02-20'),
(7,'Laptop','East',1300,'2024-03-01'),(8,'Keyboard','East',80,'2024-03-10');
""")
query = """
WITH regional_totals AS (
SELECT region, SUM(amount) AS regional_total FROM sales GROUP BY region
),
product_totals AS (
SELECT product, SUM(amount) AS product_total FROM sales GROUP BY product
),
combined AS (
SELECT s.product, s.region, s.amount,
r.regional_total, p.product_total,
ROUND(s.amount * 100.0 / r.regional_total, 1) AS pct_of_region
FROM sales s
JOIN regional_totals r ON s.region = r.region
JOIN product_totals p ON s.product = p.product
)
SELECT product, region, amount, pct_of_region
FROM combined
ORDER BY region, pct_of_region DESC;
"""
print(pd.read_sql(query, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE employees (id INT, name TEXT, manager_id INT);
INSERT INTO employees VALUES
(1,'Alice',NULL),(2,'Bob',1),(3,'Carol',1),
(4,'Dave',2),(5,'Eve',2),(6,'Frank',3),(7,'Grace',3),(8,'Hank',4);
""")
query = """
WITH RECURSIVE org_chart AS (
-- Base case: root (CEO)
SELECT id, name, manager_id, 0 AS depth, name AS path
FROM employees WHERE manager_id IS NULL
UNION ALL
-- Recursive step: employees who report to current level
SELECT e.id, e.name, e.manager_id, oc.depth + 1,
oc.path || ' -> ' || e.name
FROM employees e
JOIN org_chart oc ON e.manager_id = oc.id
)
SELECT id, depth, name, path FROM org_chart ORDER BY path;
"""
df = pd.read_sql(query, conn)
for _, row in df.iterrows():
print(' ' * row['depth'] + f"[{row['id']}] {row['name']}")
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE sales (product TEXT, amount REAL);
INSERT INTO sales VALUES
('A',100),('A',200),('B',50),('B',300),('C',150),('C',250),('C',100);
""")
# Subquery version (harder to read)
q_sub = """
SELECT product, avg_amount
FROM (
SELECT product, AVG(amount) AS avg_amount
FROM sales GROUP BY product
) WHERE avg_amount > 100;
"""
# CTE version (same result, easier to read)
q_cte = """
WITH product_avgs AS (
SELECT product, AVG(amount) AS avg_amount
FROM sales GROUP BY product
)
SELECT product, ROUND(avg_amount,2) AS avg_amount
FROM product_avgs
WHERE avg_amount > 100;
"""
print('Subquery result:'); print(pd.read_sql(q_sub, conn).to_string(index=False))
print('CTE result:'); print(pd.read_sql(q_cte, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (order_date TEXT, amount REAL)')
import random; random.seed(42)
rows = [(f'2023-{m:02d}-15', random.uniform(10000, 50000)) for m in range(1,13)]
conn.executemany('INSERT INTO orders VALUES (?,?)', rows)
conn.commit()
query = """
-- TODO: CTE 1: monthly_revenue - SUM(amount) GROUP BY month
-- TODO: CTE 2: with_prev - add prev1, prev2 using LAG()
-- TODO: Final: add rolling_avg = (revenue+prev1+prev2)/3
-- add below_avg = CASE WHEN revenue < rolling_avg THEN 1 ELSE 0 END
"""
# df = pd.read_sql(query, conn)
# print(df.to_string(index=False))
conn.close()Perform calculations across related rows without collapsing them β rankings, running totals, lag/lead comparisons, and moving averages.
import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE scores (player TEXT, region TEXT, score INT);
INSERT INTO scores VALUES
('Alice','East',95),('Bob','East',88),('Carol','East',95),
('Dave','West',78),('Eve','West',91),('Frank','West',85),
('Grace','North',74),('Hank','North',80),('Ivy','North',80);
""")
query = """
SELECT player, region, score,
ROW_NUMBER() OVER (PARTITION BY region ORDER BY score DESC) AS row_num,
RANK() OVER (PARTITION BY region ORDER BY score DESC) AS rank,
DENSE_RANK() OVER (PARTITION BY region ORDER BY score DESC) AS dense_rank
FROM scores
ORDER BY region, score DESC;
"""
print(pd.read_sql(query, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE daily_sales (sale_date TEXT, revenue REAL);
INSERT INTO daily_sales VALUES
('2024-01-01',1200),('2024-01-02',800),('2024-01-03',1500),
('2024-01-04',600),('2024-01-05',2000),('2024-01-06',900),
('2024-01-07',1100);
""")
query = """
SELECT sale_date, revenue,
SUM(revenue) OVER (ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total,
AVG(revenue) OVER (ORDER BY sale_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg_3d,
SUM(revenue) OVER () AS grand_total,
ROUND(revenue * 100.0 / SUM(revenue) OVER (), 1) AS pct_of_total
FROM daily_sales ORDER BY sale_date;
"""
print(pd.read_sql(query, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE monthly_revenue (month TEXT, category TEXT, revenue REAL);
INSERT INTO monthly_revenue VALUES
('2024-01','Electronics',50000),('2024-02','Electronics',55000),
('2024-03','Electronics',48000),('2024-04','Electronics',62000),
('2024-01','Clothing',30000),('2024-02','Clothing',28000),
('2024-03','Clothing',35000),('2024-04','Clothing',32000);
""")
query = """
SELECT month, category, revenue,
LAG(revenue) OVER (PARTITION BY category ORDER BY month) AS prev_month,
LEAD(revenue) OVER (PARTITION BY category ORDER BY month) AS next_month,
ROUND((revenue - LAG(revenue) OVER (PARTITION BY category ORDER BY month))
* 100.0 / LAG(revenue) OVER (PARTITION BY category ORDER BY month), 1) AS mom_pct
FROM monthly_revenue
ORDER BY category, month;
"""
print(pd.read_sql(query, conn).to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE customers (name TEXT, lifetime_value REAL);
INSERT INTO customers VALUES
('Alice',1200),('Bob',450),('Carol',3400),('Dave',890),('Eve',2100),
('Frank',200),('Grace',4500),('Hank',700),('Ivy',1800),('Jake',350),
('Kim',2800),('Leo',600),('Mia',950),('Ned',1100),('Ora',3100);
""")
query = """
SELECT name, lifetime_value,
NTILE(4) OVER (ORDER BY lifetime_value) AS quartile,
NTILE(10) OVER (ORDER BY lifetime_value) AS decile,
RANK() OVER (ORDER BY lifetime_value DESC) AS rank
FROM customers
ORDER BY lifetime_value DESC;
"""
df = pd.read_sql(query, conn)
print(df.to_string(index=False))
print('\nQuartile summary:')
print(df.groupby('quartile')['lifetime_value'].agg(['min','max','mean']).round(0))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE sales (salesperson TEXT, region TEXT, revenue REAL);
INSERT INTO sales VALUES
('Alice','East',95000),('Bob','East',88000),('Carol','East',102000),
('Dave','West',78000),('Eve','West',91000),('Frank','West',85000),
('Grace','North',67000),('Hank','North',74000);
""")
query = """
-- TODO: DENSE_RANK within region by revenue desc
-- TODO: AVG(revenue) OVER region
-- TODO: revenue - avg as diff_from_avg
-- TODO: CASE WHEN rank=1 THEN 'CHAMPION' ELSE '' END
"""
# df = pd.read_sql(query, conn)
# print(df.to_string(index=False))
conn.close()Write faster SQL β understand EXPLAIN QUERY PLAN, use indexes effectively, and rewrite slow patterns as efficient alternatives.
import sqlite3, time
conn = sqlite3.connect(':memory:')
import random; random.seed(42)
n = 100000
conn.execute('CREATE TABLE orders (order_id INT, customer_id INT, amount REAL, status TEXT)')
conn.executemany('INSERT INTO orders VALUES (?,?,?,?)',
[(i, random.randint(1,1000), random.uniform(10,500), random.choice(['pending','shipped','done']))
for i in range(n)])
conn.commit()
# Query WITHOUT index
t0 = time.perf_counter()
conn.execute('SELECT * FROM orders WHERE customer_id = 42').fetchall()
t1 = time.perf_counter()
print(f'Without index: {(t1-t0)*1000:.2f} ms')
# Create index
conn.execute('CREATE INDEX idx_customer ON orders(customer_id)')
t0 = time.perf_counter()
conn.execute('SELECT * FROM orders WHERE customer_id = 42').fetchall()
t1 = time.perf_counter()
print(f'With index: {(t1-t0)*1000:.2f} ms')
# Composite index for common filter+sort
conn.execute('CREATE INDEX idx_status_amount ON orders(status, amount DESC)')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
import random; random.seed(0)
conn.execute('CREATE TABLE sales (id INT, region TEXT, amount REAL)')
conn.executemany('INSERT INTO sales VALUES (?,?,?)',
[(i, random.choice(['East','West','North']), random.uniform(10,1000)) for i in range(10000)])
conn.commit()
def explain(conn, query, label):
plan = conn.execute(f'EXPLAIN QUERY PLAN {query}').fetchall()
print(f'\n--- {label} ---')
for row in plan:
print(' ', row)
explain(conn, 'SELECT * FROM sales WHERE region = "East"', 'No index')
conn.execute('CREATE INDEX idx_region ON sales(region)')
explain(conn, 'SELECT * FROM sales WHERE region = "East"', 'With index')
explain(conn, 'SELECT region, SUM(amount) FROM sales GROUP BY region', 'Aggregation')
conn.close()import sqlite3, time
conn = sqlite3.connect(':memory:')
import random; random.seed(42)
conn.execute('CREATE TABLE customers (id INT PRIMARY KEY, name TEXT)')
conn.execute('CREATE TABLE orders (id INT, customer_id INT, amount REAL)')
conn.executemany('INSERT INTO customers VALUES (?,?)', [(i,f'C{i}') for i in range(1000)])
conn.executemany('INSERT INTO orders VALUES (?,?,?)',
[(i, random.randint(1,1000), random.uniform(10,500)) for i in range(50000)])
conn.commit()
conn.execute('CREATE INDEX idx_oid ON orders(customer_id)')
queries = {
'IN subquery': 'SELECT * FROM customers WHERE id IN (SELECT DISTINCT customer_id FROM orders WHERE amount > 400)',
'EXISTS': 'SELECT * FROM customers c WHERE EXISTS (SELECT 1 FROM orders o WHERE o.customer_id=c.id AND o.amount>400)',
'JOIN': 'SELECT DISTINCT c.* FROM customers c JOIN orders o ON c.id=o.customer_id WHERE o.amount>400',
}
for label, q in queries.items():
t0 = time.perf_counter()
rows = conn.execute(q).fetchall()
print(f'{label:15s}: {(time.perf_counter()-t0)*1000:.2f} ms, {len(rows)} rows')
conn.close()import sqlite3, time
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE authors (id INT, name TEXT);
CREATE TABLE books (id INT, title TEXT, author_id INT, pages INT);
""")
import random; random.seed(0)
authors = [(i, f'Author {i}') for i in range(50)]
books = [(i, f'Book {i}', random.randint(0,49), random.randint(100,600)) for i in range(500)]
conn.executemany('INSERT INTO authors VALUES (?,?)', authors)
conn.executemany('INSERT INTO books VALUES (?,?,?,?)', books)
conn.commit()
# N+1 pattern (BAD)
t0 = time.perf_counter()
all_books = conn.execute('SELECT * FROM books').fetchall()
results = []
for b in all_books:
auth = conn.execute('SELECT name FROM authors WHERE id=?', (b[2],)).fetchone()
results.append((b[1], auth[0] if auth else 'Unknown'))
print(f'N+1 queries: {(time.perf_counter()-t0)*1000:.2f} ms ({len(results)} results)')
# Single JOIN (GOOD)
t0 = time.perf_counter()
results2 = conn.execute('SELECT b.title, a.name FROM books b LEFT JOIN authors a ON b.author_id=a.id').fetchall()
print(f'Single JOIN: {(time.perf_counter()-t0)*1000:.2f} ms ({len(results2)} results)')
conn.close()import sqlite3, time
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INT, customer_id INT, amount REAL, status TEXT)')
import random; random.seed(42)
rows = [(i, random.randint(1,1000), random.uniform(10,500), random.choice(['pending','done'])) for i in range(100000)]
conn.executemany('INSERT INTO orders VALUES (?,?,?,?)', rows)
conn.commit()
q = 'SELECT * FROM orders WHERE customer_id = 42'
# TODO: time query without index
# TODO: EXPLAIN QUERY PLAN before index
# TODO: CREATE INDEX idx_cust ON orders(customer_id)
# TODO: time query with index
# TODO: EXPLAIN QUERY PLAN after index
# TODO: print speedup factor
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER,
amount REAL, status TEXT, order_date TEXT);
INSERT INTO orders SELECT value, (value%100)+1, RANDOM()*1000,
CASE value%3 WHEN 0 THEN 'pending' WHEN 1 THEN 'completed' ELSE 'cancelled' END,
date('2024-01-01', '+'||(value%365)||' days')
FROM generate_series(1, 10000);
''')
plan = conn.execute('''
EXPLAIN QUERY PLAN
SELECT customer_id, COUNT(*), AVG(amount)
FROM orders WHERE status='completed'
GROUP BY customer_id ORDER BY AVG(amount) DESC
''').fetchall()
print('Query plan (no index):')
for row in plan: print(' ', row)
conn.close()import sqlite3, time
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE sales (id INTEGER PRIMARY KEY, region TEXT, amount REAL, sale_date TEXT);
INSERT INTO sales SELECT value, CASE value%4 WHEN 0 THEN 'North' WHEN 1 THEN 'South'
WHEN 2 THEN 'East' ELSE 'West' END, RANDOM()*1000,
date('2024-01-01', '+'||(value%365)||' days')
FROM generate_series(1, 100000);
''')
query = "SELECT region, AVG(amount) FROM sales WHERE sale_date > '2024-06-01' GROUP BY region"
t0 = time.time(); conn.execute(query).fetchall(); t_slow = time.time()-t0
conn.execute('CREATE INDEX idx_date ON sales(sale_date)')
t0 = time.time(); conn.execute(query).fetchall(); t_fast = time.time()-t0
print(f'Without index: {t_slow*1000:.1f}ms')
print(f'With index: {t_fast*1000:.1f}ms')
print(f'Speedup: {t_slow/max(t_fast,0.001):.1f}x')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, dept_id INTEGER, salary REAL);
CREATE TABLE departments (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO departments VALUES (1,'Engineering'),(2,'Sales'),(3,'HR');
INSERT INTO employees SELECT v, 'Emp '||v, (v%3)+1, 50000+RANDOM()*50000
FROM generate_series(1,30) v;
''')
print('=== N+1 (avoid) ===')
for eid,name,dept_id in conn.execute('SELECT id,name,dept_id FROM employees LIMIT 3').fetchall():
dept = conn.execute('SELECT name FROM departments WHERE id=?',(dept_id,)).fetchone()
print(f' {name} -> {dept[0]}')
print('=== Single JOIN (preferred) ===')
for row in conn.execute('SELECT e.name, d.name, e.salary FROM employees e JOIN departments d ON e.dept_id=d.id LIMIT 3').fetchall():
print(f' {row[0]} | {row[1]} | ${row[2]:,.0f}')
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE transactions (id INTEGER PRIMARY KEY, user_id INTEGER,
amount REAL, type TEXT, ts TEXT);
INSERT INTO transactions SELECT v, (v%50)+1, RANDOM()*500,
CASE v%3 WHEN 0 THEN 'purchase' WHEN 1 THEN 'refund' ELSE 'fee' END,
date('2024-01-01','+'||(v%300)||' days') FROM generate_series(1,1000) v;
''')
cte_query = '''
WITH purchase_summary AS (
SELECT user_id, SUM(amount) total, COUNT(*) cnt
FROM transactions WHERE type='purchase' GROUP BY user_id
), avg_thresh AS (
SELECT AVG(amount)*5 threshold FROM transactions WHERE type='purchase'
)
SELECT ps.user_id, ROUND(ps.total,2) total, ps.cnt
FROM purchase_summary ps, avg_thresh at WHERE ps.total > at.threshold
ORDER BY ps.total DESC LIMIT 5
'''
df = pd.read_sql(cte_query, conn)
print('Top buyers via CTE:')
print(df.to_string(index=False))
conn.close()import sqlite3, time
conn = sqlite3.connect(':memory:')
# TODO: create orders table with 100K rows (id, customer_id, amount, status)
# TODO: time filter query on customer_id=42
# TODO: EXPLAIN QUERY PLAN before index
# TODO: CREATE INDEX idx_cust ON orders(customer_id)
# TODO: time query with index, print speedup
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE sales (year INTEGER, quarter TEXT, region TEXT, revenue REAL);
INSERT INTO sales VALUES
(2024,'Q1','North',120000),(2024,'Q2','North',135000),(2024,'Q3','North',118000),(2024,'Q4','North',145000),
(2024,'Q1','South',98000),(2024,'Q2','South',105000),(2024,'Q3','South',112000),(2024,'Q4','South',125000);
''')
df = pd.read_sql('''
SELECT region,
SUM(CASE WHEN quarter='Q1' THEN revenue ELSE 0 END) Q1,
SUM(CASE WHEN quarter='Q2' THEN revenue ELSE 0 END) Q2,
SUM(CASE WHEN quarter='Q3' THEN revenue ELSE 0 END) Q3,
SUM(CASE WHEN quarter='Q4' THEN revenue ELSE 0 END) Q4,
SUM(revenue) Total
FROM sales GROUP BY region ORDER BY Total DESC
''', conn)
print(df.to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE server_metrics (server TEXT, cpu_pct REAL, mem_pct REAL, disk_pct REAL, net_pct REAL);
INSERT INTO server_metrics VALUES ('web-01',72.5,65.2,45.0,30.1),
('web-02',55.0,70.8,52.3,28.7),('db-01',88.2,91.5,78.4,15.2);
''')
df = pd.read_sql('''
SELECT server,'CPU' metric, cpu_pct value FROM server_metrics
UNION ALL SELECT server,'Memory',mem_pct FROM server_metrics
UNION ALL SELECT server,'Disk',disk_pct FROM server_metrics
UNION ALL SELECT server,'Network',net_pct FROM server_metrics
ORDER BY server, metric
''', conn)
print(df.to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE survey (respondent INTEGER, question TEXT, score INTEGER);
INSERT INTO survey VALUES (1,'Q1',4),(1,'Q2',5),(1,'Q3',3),(1,'Q4',4),
(2,'Q1',3),(2,'Q2',4),(2,'Q3',5),(2,'Q4',2),(3,'Q1',5),(3,'Q2',5),(3,'Q3',4),(3,'Q4',5);
''')
questions = [r[0] for r in conn.execute('SELECT DISTINCT question FROM survey ORDER BY question')]
cases = ','.join(f"MAX(CASE WHEN question='{q}' THEN score END) AS {q}" for q in questions)
df = pd.read_sql(f'SELECT respondent, {cases}, ROUND(AVG(score),2) avg_score FROM survey GROUP BY respondent ORDER BY avg_score DESC', conn)
print(df.to_string(index=False))
conn.close()import sqlite3, pandas as pd
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE feedback (category TEXT, sentiment TEXT, count INTEGER);
INSERT INTO feedback VALUES
('Product','Positive',450),('Product','Neutral',120),('Product','Negative',80),
('Service','Positive',380),('Service','Neutral',95),('Service','Negative',125),
('Price','Positive',200),('Price','Neutral',180),('Price','Negative',220);
''')
df = pd.read_sql('''
SELECT category,
SUM(CASE WHEN sentiment='Positive' THEN count ELSE 0 END) positive,
SUM(CASE WHEN sentiment='Neutral' THEN count ELSE 0 END) neutral,
SUM(CASE WHEN sentiment='Negative' THEN count ELSE 0 END) negative,
SUM(count) total,
ROUND(100.0*SUM(CASE WHEN sentiment='Positive' THEN count ELSE 0 END)/SUM(count),1) pct_pos
FROM feedback GROUP BY category ORDER BY pct_pos DESC
''', conn)
print(df.to_string(index=False))
conn.close()import sqlite3, pandas as pd, numpy as np
conn = sqlite3.connect(':memory:')
np.random.seed(42)
reps = ['Alice','Bob','Carol','Dave','Eve']
quarters = ['Q1','Q2','Q3','Q4']
rows = [(r,q,int(np.random.exponential(80000))) for r in reps for q in quarters]
conn.executescript('CREATE TABLE rep_sales (rep TEXT, quarter TEXT, revenue INTEGER);')
conn.executemany('INSERT INTO rep_sales VALUES (?,?,?)', rows)
# TODO: get distinct quarters dynamically
# TODO: build CASE WHEN pivot query
# TODO: read to DataFrame and print
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL);
CREATE TABLE price_audit (log_id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER, old_price REAL, new_price REAL,
changed_at DATETIME DEFAULT CURRENT_TIMESTAMP);
INSERT INTO products VALUES (1,'Widget',9.99),(2,'Gadget',24.99);
CREATE TRIGGER log_price AFTER UPDATE OF price ON products
BEGIN INSERT INTO price_audit(product_id,old_price,new_price)
VALUES(NEW.id,OLD.price,NEW.price); END;
''')
conn.execute('UPDATE products SET price=12.99 WHERE id=1')
conn.execute('UPDATE products SET price=29.99 WHERE id=2')
for row in conn.execute('SELECT * FROM price_audit').fetchall():
print(f'Product {row[1]}: ${row[2]} -> ${row[3]}')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE inventory (product_id INTEGER PRIMARY KEY, quantity INTEGER CHECK(quantity >= 0));
CREATE TABLE orders (id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, qty INTEGER);
INSERT INTO inventory VALUES (1,100),(2,5);
CREATE TRIGGER decrement_stock AFTER INSERT ON orders
BEGIN UPDATE inventory SET quantity=quantity-NEW.qty WHERE product_id=NEW.product_id; END;
''')
conn.execute('INSERT INTO orders(product_id,qty) VALUES(1,30)')
print('After ordering 30 of product 1:')
for row in conn.execute('SELECT * FROM inventory').fetchall():
print(f' Product {row[0]}: {row[1]} units')
try:
conn.execute('INSERT INTO orders(product_id,qty) VALUES(2,10)')
except Exception as e:
print(f'Order failed (CHECK constraint): {type(e).__name__}')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT, balance REAL);
CREATE TABLE transfers (id INTEGER PRIMARY KEY AUTOINCREMENT,
from_id INTEGER, to_id INTEGER, amount REAL, ts DATETIME DEFAULT CURRENT_TIMESTAMP);
INSERT INTO accounts VALUES (1,'Alice',5000),(2,'Bob',1500),(3,'Carol',3200);
''')
def transfer(conn, from_id, to_id, amount):
bal = conn.execute('SELECT balance FROM accounts WHERE id=?',(from_id,)).fetchone()
if not bal or bal[0] < amount:
print(f' FAILED: ${bal[0] if bal else 0:.2f} < ${amount:.2f}')
return
conn.execute('BEGIN')
conn.execute('UPDATE accounts SET balance=balance-? WHERE id=?',(amount,from_id))
conn.execute('UPDATE accounts SET balance=balance+? WHERE id=?',(amount,to_id))
conn.execute('INSERT INTO transfers(from_id,to_id,amount) VALUES(?,?,?)',(from_id,to_id,amount))
conn.execute('COMMIT')
print(f' OK: ${amount:.2f} from acct {from_id} to acct {to_id}')
transfer(conn,1,2,500)
transfer(conn,2,3,3000) # fails
for row in conn.execute('SELECT id,name,balance FROM accounts').fetchall():
print(f'{row[1]}: ${row[2]:.2f}')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, salary REAL, dept TEXT);
INSERT INTO employees VALUES (1,'Alice',75000,'Eng'),(2,'Bob',68000,'Sales');
CREATE VIEW emp_view AS SELECT id, name, salary, dept FROM employees;
CREATE TRIGGER update_via_view INSTEAD OF UPDATE OF salary ON emp_view
BEGIN UPDATE employees SET salary=NEW.salary WHERE id=OLD.id; END;
''')
conn.execute("UPDATE emp_view SET salary=90000 WHERE name='Alice'")
for row in conn.execute('SELECT name, salary FROM employees').fetchall():
print(f'{row[0]}: ${row[1]:,.0f}')
conn.close()import sqlite3
conn = sqlite3.connect(':memory:')
# TODO: CREATE TABLE accounts (id, name, balance)
# TODO: CREATE TABLE large_withdrawals (id AUTOINCREMENT, account_id, amount, ts)
# TODO: CREATE TRIGGER log_large after INSERT on transactions WHEN amount > 1000
# TODO: enforce non-negative balance via CHECK or trigger
# TODO: insert test transactions, show results
conn.close()Subqueries (nested SELECT) can appear in WHERE, FROM, SELECT, and HAVING clauses. Master scalar, column, table, and correlated subqueries for complex filtering and derived metrics.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE sales (id INTEGER, rep TEXT, amount REAL, region TEXT)''')
runmany('INSERT INTO sales VALUES (?,?,?,?)', [
(1,'Alice',1200,'North'),(2,'Bob',800,'South'),(3,'Carol',1500,'North'),
(4,'Dave',600,'South'),(5,'Eve',2000,'North'),(6,'Frank',950,'East'),
])
# Scalar subquery: single value in WHERE
rows = run('''
SELECT rep, amount FROM sales
WHERE amount > (SELECT AVG(amount) FROM sales)
ORDER BY amount DESC
''')
print("Above-average reps:")
for r in rows: print(f" {r[0]}: ${r[1]:,.0f}")
# IN with subquery
rows = run('''
SELECT rep, amount FROM sales
WHERE region IN (SELECT DISTINCT region FROM sales WHERE amount > 1000)
AND amount < 1000
ORDER BY amount
''')
print("In high-value regions but low individual amount:")
for r in rows: print(f" {r[0]}: ${r[1]:,.0f}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE employees (id INTEGER, name TEXT, dept TEXT, salary REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?,?)', [
(1,'Alice','Eng',90000),(2,'Bob','Eng',85000),(3,'Carol','Sales',70000),
(4,'Dave','Sales',72000),(5,'Eve','HR',65000),(6,'Frank','Eng',95000),
(7,'Grace','HR',68000),(8,'Henry','Sales',75000),
])
# Correlated subquery: runs ONCE PER ROW in outer query
# Find employees earning above their department average
rows = run('''
SELECT e.name, e.dept, e.salary,
ROUND((SELECT AVG(salary) FROM employees i WHERE i.dept = e.dept), 0) AS dept_avg
FROM employees e
WHERE e.salary > (SELECT AVG(salary) FROM employees i WHERE i.dept = e.dept)
ORDER BY e.dept, e.salary DESC
''')
print("Above-department-average earners:")
for r in rows:
print(f" {r[0]} ({r[1]}): ${r[2]:,.0f} (dept avg ${r[3]:,.0f})")
# EXISTS: check for related rows
rows = run('''
SELECT DISTINCT dept FROM employees e
WHERE EXISTS (
SELECT 1 FROM employees i
WHERE i.dept = e.dept AND i.salary > 80000
)
''')
print("Departments with at least one high earner:", [r[0] for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE orders (id INTEGER, cust_id INTEGER, total REAL, yr INTEGER)''')
runmany('INSERT INTO orders VALUES (?,?,?,?)', [
(1,1,500,2023),(2,1,300,2023),(3,2,1200,2023),(4,2,800,2024),
(5,3,200,2024),(6,1,900,2024),(7,3,600,2023),(8,2,400,2024),
])
# Derived table: aggregate first, then filter/join
rows = run('''
SELECT cust_id, total_2023, total_2024,
ROUND(total_2024 - total_2023, 0) AS growth
FROM (
SELECT cust_id,
SUM(CASE WHEN yr=2023 THEN total ELSE 0 END) AS total_2023,
SUM(CASE WHEN yr=2024 THEN total ELSE 0 END) AS total_2024
FROM orders
GROUP BY cust_id
) yearly
ORDER BY growth DESC
''')
print("Year-over-year growth by customer:")
for r in rows:
print(f" Cust {r[0]}: 2023=${r[1]:,.0f} -> 2024=${r[2]:,.0f} ({r[3]:+,.0f})")
# Derived table with LIMIT for top-N analysis
rows = run('''
SELECT cust_id, avg_order
FROM (SELECT cust_id, AVG(total) AS avg_order FROM orders GROUP BY cust_id)
WHERE avg_order > 500
''')
print("Customers with avg order > 500:", rows)import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE sales (id INTEGER, rep TEXT, region TEXT, amount REAL, quarter INTEGER)''')
runmany('INSERT INTO sales VALUES (?,?,?,?,?)', [
(1,'Alice','North',1200,1),(2,'Bob','North',800,1),(3,'Carol','South',1500,1),
(4,'Dave','South',600,1),(5,'Eve','North',2000,2),(6,'Frank','South',950,2),
(7,'Grace','East',1100,1),(8,'Henry','East',750,1),(9,'Iris','East',1300,2),
])
company_avg = run('SELECT AVG(amount) FROM sales')[0][0]
print(f"Company avg: ${company_avg:,.0f}")
rows = run(f'''
SELECT s.rep, s.region, s.amount,
ROUND(regional.avg_amount, 0) AS regional_avg
FROM sales s
JOIN (
SELECT region, AVG(amount) AS avg_amount
FROM sales GROUP BY region
) regional ON s.region = regional.region
WHERE s.amount > regional.avg_amount
AND s.amount > {company_avg}
ORDER BY s.region, s.amount DESC
''')
print("Reps beating both regional AND company average:")
for r in rows:
print(f" {r[0]} ({r[1]}): ${r[2]:,.0f} > regional ${r[3]:,.0f}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE products (id INT, name TEXT, category TEXT, price REAL, cost REAL)''')
runmany('INSERT INTO products VALUES (?,?,?,?,?)', [
(1,'Apple','Fruit',1.2,0.4),(2,'Banana','Fruit',0.5,0.15),(3,'Carrot','Veg',0.8,0.3),
(4,'Potato','Veg',0.6,0.2),(5,'Milk','Dairy',1.5,0.8),(6,'Cheese','Dairy',4.0,1.5),
(7,'Yogurt','Dairy',2.0,0.9),(8,'Broccoli','Veg',1.1,0.4),
])
# 1. Each product with its category avg price
print("=== Product vs Category Avg ===")
# TODO: write query with scalar correlated subquery for category avg
# 2. Cheapest product in each category
print("=== Cheapest per Category ===")
# TODO: use correlated subquery to find min price per category
# 3. Categories with avg margin > 30%
print("=== High-Margin Categories ===")
# TODO: derived table with margin calc, then filter
SQL provides rich string functions: UPPER/LOWER, SUBSTR, TRIM, REPLACE, LENGTH, INSTR, and LIKE/GLOB for pattern matching. Essential for cleaning and searching text data.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE contacts (id INTEGER, name TEXT, email TEXT, phone TEXT)''')
runmany('INSERT INTO contacts VALUES (?,?,?,?)', [
(1,' Alice Smith ','alice@EXAMPLE.com','(555) 123-4567'),
(2,'Bob Jones','bob.jones@mail.com','555.987.6543'),
(3,'carol lee','CAROL@COMPANY.ORG','5551234567'),
(4,'DAVE BROWN','dave@test.net','555-111-2222'),
])
rows = run('''
SELECT
id,
TRIM(name) AS name_clean,
LOWER(TRIM(name)) AS name_lower,
UPPER(SUBSTR(TRIM(name), 1, 1)) AS initial,
LOWER(email) AS email_lower,
LENGTH(TRIM(name)) AS name_len,
INSTR(TRIM(name), " ") AS space_pos,
-- Extract first name (up to first space)
SUBSTR(TRIM(name), 1, INSTR(TRIM(name)||" ", " ")-1) AS first_name
FROM contacts
''')
for r in rows:
print(f" ID {r[0]}: '{r[1]}' | initial={r[2]} | fname={r[7]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE products (id INTEGER, sku TEXT, name TEXT, category TEXT)''')
runmany('INSERT INTO products VALUES (?,?,?,?)', [
(1,'FRUIT-APL-001','Apple (Red)','Fruit'),
(2,'FRUIT-BAN-002','Banana','Fruit'),
(3,'VEG-CAR-001','Carrot','Vegetable'),
(4,'VEG-POT-002','Potato','Vegetable'),
(5,'DAIRY-MLK-001','Whole Milk','Dairy'),
(6,'FRUIT-APL-002','Apple (Green)','Fruit'),
(7,'DAIRY-CHZ-001','Cheddar Cheese','Dairy'),
])
# LIKE: case-insensitive %, _ wildcards
rows = run("SELECT name FROM products WHERE name LIKE 'Apple%'")
print("Starts with 'Apple':", [r[0] for r in rows])
rows = run("SELECT name FROM products WHERE name LIKE '%e%'")
print("Contains 'e':", [r[0] for r in rows])
rows = run("SELECT sku FROM products WHERE sku LIKE 'FRUIT-___-___'")
print("SKUs matching FRUIT-???-???:", [r[0] for r in rows])
# GLOB: case-sensitive, uses * and ? (Unix-style)
rows = run("SELECT sku FROM products WHERE sku GLOB 'FRUIT-*'")
print("GLOB FRUIT-*:", [r[0] for r in rows])
rows = run("SELECT sku FROM products WHERE sku GLOB '*-001'")
print("GLOB *-001:", [r[0] for r in rows])
# NOT LIKE
rows = run("SELECT name FROM products WHERE name NOT LIKE '%Apple%'")
print("Not apple:", [r[0] for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE raw_data (id INTEGER, raw_phone TEXT, raw_email TEXT)''')
runmany('INSERT INTO raw_data VALUES (?,?,?)', [
(1,'(555) 123-4567',' User@Example.COM '),
(2,'555.987.6543','bob.jones@MAIL.COM'),
(3,'5551234567','CAROL@company.org'),
])
rows = run('''
SELECT
id,
-- Normalize phone: keep only digits via REPLACE chain
REPLACE(REPLACE(REPLACE(REPLACE(raw_phone, "(", ""), ")", ""), "-", ""), ".", "") AS phone_digits,
-- Trim spaces and lowercase email
LOWER(TRIM(raw_email)) AS email_clean,
-- Extract domain from email
SUBSTR(
LOWER(TRIM(raw_email)),
INSTR(LOWER(TRIM(raw_email)), "@") + 1
) AS domain,
-- Check email format (has @ and .)
CASE WHEN INSTR(raw_email, "@") > 0 AND INSTR(raw_email, ".") > 0
THEN "valid" ELSE "invalid" END AS email_status
FROM raw_data
''')
for r in rows:
print(f" ID {r[0]}: phone={r[1]}, email={r[2]}, domain={r[3]}, {r[4]}")
# REPLACE for data masking
rows = run('''
SELECT id,
SUBSTR(phone_digits, 1, 3) || "****" || SUBSTR(phone_digits, 8) AS masked
FROM (SELECT id, REPLACE(REPLACE(REPLACE(raw_phone,"(",""),")",""),"-","") AS phone_digits
FROM raw_data)
''')
print("Masked phones:", [(r[0], r[1]) for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE customers (id INTEGER, email TEXT, tier TEXT)''')
runmany('INSERT INTO customers VALUES (?,?,?)', [
(1,'alice@ACME.COM','gold'),(2,'bob@tech.org','silver'),
(3,'carol@acme.com','gold'),(4,'dave@STARTUP.IO','bronze'),
(5,'eve@TECH.ORG','silver'),(6,'frank@enterprise.net','gold'),
(7,'grace@startup.io','bronze'),(8,'henry@acme.com','silver'),
(9,'iris@ENTERPRISE.NET','gold'),(10,'jack@freelance.dev','bronze'),
])
rows = run('''
SELECT
LOWER(SUBSTR(email, INSTR(email, "@") + 1)) AS domain,
COUNT(*) AS customers,
SUM(CASE WHEN tier = "gold" THEN 1 ELSE 0 END) AS gold,
SUM(CASE WHEN tier = "silver" THEN 1 ELSE 0 END) AS silver,
SUM(CASE WHEN tier = "bronze" THEN 1 ELSE 0 END) AS bronze
FROM customers
GROUP BY LOWER(SUBSTR(email, INSTR(email, "@") + 1))
ORDER BY customers DESC
''')
print("Domain analysis:")
for r in rows:
print(f" {r[0]:<20} total={r[1]} gold={r[2]} silver={r[3]} bronze={r[4]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE people (id INT, full_name TEXT, email TEXT)''')
runmany('INSERT INTO people VALUES (?,?,?)', [
(1,'Alice Marie Smith','alice.smith@company.com'),
(2,'Bob Jones','bob@mail.net'),
(3,'Carol Ann Lee','carol.lee@invalid'),
(4,'Dave','dave@x.io'),
])
rows = run('''
SELECT
id,
-- first_name: text before first space
SUBSTR(full_name, 1,
CASE WHEN INSTR(full_name, " ") > 0
THEN INSTR(full_name, " ")-1
ELSE LENGTH(full_name) END) AS first_name,
-- TODO: last_name (after last space, or full_name if no space)
-- TODO: initials
-- TODO: username (lower_first + . + last 4 chars before @)
-- TODO: email_valid
email
FROM people
''')
for r in rows: print(r)
SQL date functions compute differences, extract components, and format timestamps. In SQLite, dates are stored as TEXT (ISO), INTEGER (Unix epoch), or REAL (Julian day).
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
# SQLite stores dates as text in ISO format: 'YYYY-MM-DD'
rows = run('''
SELECT
DATE('now') AS today,
DATE('now', '+7 days') AS next_week,
DATE('now', '-30 days') AS last_month,
DATE('now', 'start of month') AS month_start,
DATE('now', 'start of year') AS year_start,
DATE('now', '+1 year', '-1 day') AS end_of_next_year,
DATETIME('now') AS now_dt,
STRFTIME('%Y-%m-%d %H:%M', 'now') AS formatted
''')
for key, val in zip(['today','next_week','last_month','month_start','year_start','eony','now_dt','fmt'], rows[0]):
print(f" {key}: {val}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE employees (id INTEGER, name TEXT, hire_date TEXT, birth_date TEXT)''')
runmany('INSERT INTO employees VALUES (?,?,?,?)', [
(1,'Alice','2019-03-15','1990-07-22'),
(2,'Bob','2021-11-01','1985-02-14'),
(3,'Carol','2022-06-20','1995-09-08'),
(4,'Dave','2018-01-10','1988-12-01'),
])
rows = run('''
SELECT
name,
hire_date,
-- Extract parts
STRFTIME('%Y', hire_date) AS hire_year,
STRFTIME('%m', hire_date) AS hire_month,
STRFTIME('%d', hire_date) AS hire_day,
-- Days since hire
CAST(JULIANDAY('now') - JULIANDAY(hire_date) AS INTEGER) AS days_employed,
-- Years employed
CAST((JULIANDAY('now') - JULIANDAY(hire_date)) / 365.25 AS INTEGER) AS years_employed,
-- Age from birth_date
CAST((JULIANDAY('now') - JULIANDAY(birth_date)) / 365.25 AS INTEGER) AS age
FROM employees
ORDER BY hire_date
''')
print("Employee tenure and age:")
for r in rows:
print(f" {r[0]}: hired {r[1]}, {r[7]} days ({r[8]} yrs), age {r[9]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
import datetime
run('''CREATE TABLE events (id INTEGER, ts TEXT, event TEXT, value REAL)''')
import random; random.seed(42)
events_data = []
for i in range(30):
d = datetime.date(2024, 1, 1) + datetime.timedelta(days=i)
for _ in range(random.randint(1, 5)):
events_data.append((i*10+random.randint(1,9),
d.strftime('%Y-%m-%d'),
random.choice(['view','click','purchase']),
round(random.uniform(1, 100), 2)))
runmany('INSERT INTO events VALUES (?,?,?,?)', events_data)
# Group by week
rows = run('''
SELECT
STRFTIME('%Y-W%W', ts) AS week,
COUNT(*) AS events,
SUM(CASE WHEN event="purchase" THEN 1 ELSE 0 END) AS purchases,
ROUND(SUM(value), 2) AS total_value
FROM events
GROUP BY week
ORDER BY week
''')
print("Weekly breakdown:")
for r in rows: print(f" {r[0]}: {r[1]} events, {r[2]} purchases, ${r[3]}")
# Days with no purchases
rows = run('''
SELECT ts, COUNT(*) AS events
FROM events
WHERE event != "purchase"
AND ts NOT IN (SELECT DISTINCT ts FROM events WHERE event = "purchase")
GROUP BY ts
ORDER BY ts
LIMIT 3
''')
print("Days with no purchases (sample):", [(r[0],r[1]) for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE subscriptions (id INT, cust_id INT, start_date TEXT, end_date TEXT, plan TEXT)''')
runmany('INSERT INTO subscriptions VALUES (?,?,?,?,?)', [
(1,1,'2023-01-15','2024-01-15','annual'),
(2,2,'2023-03-01','2023-05-15','monthly'),
(3,3,'2023-06-01',None,'annual'),
(4,4,'2023-09-10','2023-10-05','monthly'),
(5,5,'2024-01-01',None,'monthly'),
(6,6,'2023-02-14','2023-11-30','annual'),
])
rows = run('''
SELECT
cust_id, plan, start_date,
COALESCE(end_date, DATE("now")) AS effective_end,
CAST((JULIANDAY(COALESCE(end_date, DATE("now"))) -
JULIANDAY(start_date)) AS INTEGER) AS tenure_days,
CASE WHEN end_date IS NOT NULL THEN "churned" ELSE "active" END AS status,
CASE WHEN end_date IS NOT NULL
AND JULIANDAY(end_date) - JULIANDAY(start_date) < 90
THEN "early_churn" ELSE "ok" END AS churn_flag
FROM subscriptions
ORDER BY start_date
''')
print("Subscription analysis:")
for r in rows:
print(f" Cust {r[0]} ({r[1]}): {r[4]} days, {r[5]}, {r[6]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE bookings (id INT, room TEXT, check_in TEXT, check_out TEXT, guest TEXT)''')
runmany('INSERT INTO bookings VALUES (?,?,?,?,?)', [
(1,'101','2024-05-28','2024-06-03','Alice'),
(2,'102','2024-06-05','2024-06-08','Bob'),
(3,'101','2024-06-10','2024-06-15','Carol'),
(4,'103','2024-06-01','2024-06-30','Dave'),
(5,'102','2024-07-01','2024-07-05','Alice'),
(6,'101','2024-08-01','2024-08-10','Bob'),
])
# 1. Overlapping bookings with date range
target_in, target_out = '2024-06-01', '2024-06-10'
print("=== Bookings overlapping Jun 1-10 ===")
rows = run(f'''
SELECT room, guest, check_in, check_out
FROM bookings
WHERE check_in < "{target_out}" AND check_out > "{target_in}"
ORDER BY check_in
''')
for r in rows: print(f" {r}")
# 2. Avg stay length by month
print("=== Avg stay by month ===")
rows = run('''
SELECT
STRFTIME("%Y-%m", check_in) AS month,
ROUND(AVG(JULIANDAY(check_out) - JULIANDAY(check_in)), 1) AS avg_nights,
COUNT(*) AS bookings
FROM bookings GROUP BY month ORDER BY month
''')
for r in rows: print(f" {r}")
# 3. TODO: rooms booked > 80% of Q2 days
# 4. TODO: return visits within 6 months
CASE WHEN is SQL's conditional expression. Use it for bucketing, pivoting, null handling, and complex conditional aggregations. Combined with COALESCE and NULLIF for robust null handling.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE students (id INTEGER, name TEXT, score INTEGER, grade TEXT)''')
runmany('INSERT INTO students VALUES (?,?,?,?)', [
(1,'Alice',95,None),(2,'Bob',72,None),(3,'Carol',88,None),
(4,'Dave',61,None),(5,'Eve',79,None),(6,'Frank',55,None),
])
rows = run('''
SELECT
name, score,
-- Searched CASE (most flexible)
CASE
WHEN score >= 90 THEN "A"
WHEN score >= 80 THEN "B"
WHEN score >= 70 THEN "C"
WHEN score >= 60 THEN "D"
ELSE "F"
END AS letter_grade,
-- Simple CASE (like switch)
CASE ROUND(score/10)*10
WHEN 100 THEN "Perfect"
WHEN 90 THEN "Excellent"
WHEN 80 THEN "Good"
WHEN 70 THEN "Pass"
ELSE "Fail"
END AS band,
-- Boolean expression
CASE WHEN score >= 70 THEN 1 ELSE 0 END AS passed
FROM students
ORDER BY score DESC
''')
for r in rows:
print(f" {r[0]}: {r[1]} -> {r[2]} ({r[3]}) passed={r[4]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE orders (id INTEGER, status TEXT, region TEXT, total REAL)''')
runmany('INSERT INTO orders VALUES (?,?,?,?)', [
(1,'completed','North',500),(2,'cancelled','South',300),(3,'completed','North',800),
(4,'pending','East',200),(5,'completed','South',650),(6,'cancelled','North',400),
(7,'completed','East',1200),(8,'pending','South',350),(9,'completed','East',900),
])
rows = run('''
SELECT
region,
COUNT(*) AS total_orders,
SUM(CASE WHEN status = "completed" THEN 1 ELSE 0 END) AS completed,
SUM(CASE WHEN status = "cancelled" THEN 1 ELSE 0 END) AS cancelled,
SUM(CASE WHEN status = "pending" THEN 1 ELSE 0 END) AS pending,
ROUND(100.0 * SUM(CASE WHEN status="completed" THEN 1 ELSE 0 END) / COUNT(*), 1) AS completion_pct,
ROUND(SUM(CASE WHEN status="completed" THEN total ELSE 0 END), 0) AS completed_revenue,
ROUND(AVG(CASE WHEN status="completed" THEN total ELSE NULL END), 0) AS avg_completed
FROM orders
GROUP BY region
ORDER BY completed_revenue DESC
''')
print("Order analysis by region:")
for r in rows:
print(f" {r[0]}: {r[1]} orders, {r[2]} completed ({r[5]}%), revenue=${r[6]}, avg=${r[7]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE metrics (id INTEGER, name TEXT, q1 REAL, q2 REAL, q3 REAL, q4 REAL)''')
runmany('INSERT INTO metrics VALUES (?,?,?,?,?,?)', [
(1,'Revenue',1000,None,1200,None),
(2,'Costs',600,700,None,650),
(3,'Users',500,520,None,None),
])
rows = run('''
SELECT
name,
-- COALESCE: return first non-NULL value
COALESCE(q1, 0) AS q1_safe,
COALESCE(q2, q1, 0) AS q2_or_q1, -- fill q2 from q1
COALESCE(q3, q2, q1, 0) AS q3_fill,
-- NULLIF: return NULL if two values are equal
NULLIF(q1, 0) AS q1_null_if_zero,
-- IIF (SQLite 3.32+): shorthand for simple CASE WHEN
IIF(q4 IS NULL, "missing", "present") AS q4_status,
-- Compute quarter growth (handles NULLs gracefully)
CASE WHEN q1 IS NOT NULL AND q2 IS NOT NULL
THEN ROUND((q2 - q1) / q1 * 100, 1)
ELSE NULL END AS q1_q2_growth_pct
FROM metrics
''')
print("Metric analysis:")
for r in rows:
print(f" {r[0]}: q1_safe={r[1]}, q2_or_q1={r[2]}, q3_fill={r[3]}, q4={r[5]}, growth={r[6]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE purchases (id INT, cust_id INT, amount REAL, purchase_date TEXT)''')
runmany('INSERT INTO purchases VALUES (?,?,?,?)', [
(1,1,150,'2024-01-10'),(2,1,200,'2024-02-15'),(3,1,180,'2024-03-01'),
(4,2,500,'2024-03-20'),(5,2,300,'2024-03-25'),
(6,3,50,'2023-12-01'),(7,4,1000,'2024-03-28'),(8,4,800,'2024-03-29'),
(9,4,600,'2024-03-30'),
])
rows = run('''
WITH rfm AS (
SELECT
cust_id,
CAST(JULIANDAY("2024-04-01") - JULIANDAY(MAX(purchase_date)) AS INT) AS recency_days,
COUNT(*) AS frequency,
ROUND(SUM(amount), 0) AS monetary
FROM purchases GROUP BY cust_id
)
SELECT
cust_id, recency_days, frequency, monetary,
CASE
WHEN recency_days <= 7 AND frequency >= 3 AND monetary >= 500 THEN "Platinum"
WHEN recency_days <= 30 AND frequency >= 2 THEN "Gold"
WHEN recency_days <= 60 THEN "Silver"
ELSE "Bronze"
END AS tier
FROM rfm ORDER BY monetary DESC
''')
print("Customer RFM tiers:")
for r in rows:
print(f" Cust {r[0]}: recency={r[1]}d, freq={r[2]}, monetary=${r[3]} -> {r[4]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE test_results (student_id INT, subject TEXT, score INT)''')
runmany('INSERT INTO test_results VALUES (?,?,?)', [
(1,'Math',85),(1,'English',72),(1,'Science',90),
(2,'Math',60),(2,'English',88),(2,'Science',55),
(3,'Math',95),(3,'English',91),(3,'Science',87),
(4,'Math',70),(4,'English',65),(4,'Science',80),
])
# 1. Pivot: count of each grade per subject
print("=== Grade Distribution by Subject ===")
rows = run('''
SELECT subject,
SUM(CASE WHEN score >= 90 THEN 1 ELSE 0 END) AS A_count,
SUM(CASE WHEN score >= 80 AND score < 90 THEN 1 ELSE 0 END) AS B_count,
SUM(CASE WHEN score >= 70 AND score < 80 THEN 1 ELSE 0 END) AS C_count,
SUM(CASE WHEN score < 70 THEN 1 ELSE 0 END) AS D_or_F_count
FROM test_results GROUP BY subject
''')
for r in rows: print(f" {r}")
# 2. TODO: letter grade + needs_review flag
# 3. TODO: weakest subject per student
# 4. TODO: students passing all subjects
SQL set operations combine results of multiple SELECT statements: UNION (all unique), UNION ALL (keep duplicates), INTERSECT (common rows), and EXCEPT/MINUS (rows in first but not second).
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE customers_2023 (id INTEGER, name TEXT, email TEXT)''')
run('''CREATE TABLE customers_2024 (id INTEGER, name TEXT, email TEXT)''')
runmany('INSERT INTO customers_2023 VALUES (?,?,?)', [
(1,'Alice','alice@mail.com'),(2,'Bob','bob@mail.com'),
(3,'Carol','carol@mail.com'),(4,'Dave','dave@mail.com'),
])
runmany('INSERT INTO customers_2024 VALUES (?,?,?)', [
(3,'Carol','carol@mail.com'),(4,'Dave','dave@mail.com'),
(5,'Eve','eve@mail.com'),(6,'Frank','frank@mail.com'),
])
# UNION: unique rows only (deduplicates)
rows = run('''
SELECT "2023" AS year, name FROM customers_2023
UNION
SELECT "2024" AS year, name FROM customers_2024
ORDER BY name
''')
print(f"UNION (unique names): {len(rows)} rows")
for r in rows: print(f" {r}")
# UNION ALL: keeps all rows including duplicates
rows = run('''
SELECT "2023" AS cohort, name FROM customers_2023
UNION ALL
SELECT "2024" AS cohort, name FROM customers_2024
ORDER BY name
''')
print(f"UNION ALL (all rows): {len(rows)} rows")
# Useful: combine logs from two tables
rows = run('''
SELECT "new_customer" AS event, name, "2023" AS yr FROM customers_2023
UNION ALL
SELECT "new_customer", name, "2024" FROM customers_2024
ORDER BY yr, name
''')
print("All customer events:", len(rows))import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE eligible_2023 (cust_id INTEGER)''')
run('''CREATE TABLE eligible_2024 (cust_id INTEGER)''')
run('''CREATE TABLE opted_out (cust_id INTEGER)''')
runmany('INSERT INTO eligible_2023 VALUES (?)', [(i,) for i in [1,2,3,4,5,6]])
runmany('INSERT INTO eligible_2024 VALUES (?)', [(i,) for i in [3,4,5,6,7,8]])
runmany('INSERT INTO opted_out VALUES (?)', [(i,) for i in [2,5,8]])
# INTERSECT: rows that appear in BOTH queries
rows = run('''
SELECT cust_id FROM eligible_2023
INTERSECT
SELECT cust_id FROM eligible_2024
ORDER BY cust_id
''')
print("Eligible BOTH years:", [r[0] for r in rows])
# EXCEPT: rows in first but NOT in second
rows = run('''
SELECT cust_id FROM eligible_2023
EXCEPT
SELECT cust_id FROM eligible_2024
''')
print("Only eligible in 2023:", [r[0] for r in rows])
# Combine: eligible both years, excluding opted-out
rows = run('''
SELECT cust_id FROM eligible_2023
INTERSECT
SELECT cust_id FROM eligible_2024
EXCEPT
SELECT cust_id FROM opted_out
ORDER BY cust_id
''')
print("Eligible both years AND not opted out:", [r[0] for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE source_records (id INTEGER, value REAL, ts TEXT)''')
run('''CREATE TABLE target_records (id INTEGER, value REAL, ts TEXT)''')
runmany('INSERT INTO source_records VALUES (?,?,?)', [
(1,100.0,'2024-01-01'),(2,200.0,'2024-01-02'),(3,300.0,'2024-01-03'),
(4,400.0,'2024-01-04'),(5,500.0,'2024-01-05'),
])
runmany('INSERT INTO target_records VALUES (?,?,?)', [
(1,100.0,'2024-01-01'),(2,201.0,'2024-01-02'), # id=2 has wrong value
(3,300.0,'2024-01-03'), # id=4,5 missing
(6,600.0,'2024-01-06'), # id=6 is extra
])
# Records in source but not in target (missing)
missing = run('''
SELECT id, value, "missing_from_target" AS issue FROM source_records
EXCEPT
SELECT id, value, "missing_from_target" FROM target_records
''')
print("Issues in reconciliation:")
for r in missing: print(f" Source id={r[0]}, value={r[1]} not in target")
# Records in target but not in source (extra)
extra = run('''
SELECT id, value FROM target_records
EXCEPT
SELECT id, value FROM source_records
''')
for r in extra: print(f" Target id={r[0]}, value={r[1]} not in source")
# Exact matches
matched = run('''
SELECT COUNT(*) FROM (
SELECT id, value FROM source_records
INTERSECT
SELECT id, value FROM target_records
)
''')
print(f"Exactly matched records: {matched[0][0]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE source_customers (id INT, name TEXT, email TEXT, tier TEXT)''')
run('''CREATE TABLE migrated_customers (id INT, name TEXT, email TEXT, tier TEXT)''')
runmany('INSERT INTO source_customers VALUES (?,?,?,?)', [
(1,'Alice','alice@a.com','gold'),(2,'Bob','bob@b.com','silver'),
(3,'Carol','carol@c.com','bronze'),(4,'Dave','dave@d.com','gold'),
(5,'Eve','eve@e.com','silver'),
])
runmany('INSERT INTO migrated_customers VALUES (?,?,?,?)', [
(1,'Alice','alice@a.com','gold'),(2,'Bob','bob_new@b.com','silver'),
(3,'Carol','carol@c.com','bronze'),(6,'Frank','frank@f.com','bronze'),
])
fully_matched = run('''
SELECT COUNT(*) FROM (
SELECT id,name,email,tier FROM source_customers
INTERSECT
SELECT id,name,email,tier FROM migrated_customers)''')[0][0]
missing_in_dest = run('''
SELECT id, name FROM source_customers
EXCEPT SELECT id, name FROM migrated_customers''')
extra_in_dest = run('''
SELECT id, name FROM migrated_customers
EXCEPT SELECT id, name FROM source_customers''')
print(f"Source: {run('SELECT COUNT(*) FROM source_customers')[0][0]} records")
print(f"Migrated: {run('SELECT COUNT(*) FROM migrated_customers')[0][0]} records")
print(f"Fully matched: {fully_matched}")
print(f"Missing from dest: {[(r[0],r[1]) for r in missing_in_dest]}")
print(f"Extra in dest: {[(r[0],r[1]) for r in extra_in_dest]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE active_jan (user_id INT)''')
run('''CREATE TABLE active_feb (user_id INT)''')
run('''CREATE TABLE active_mar (user_id INT)''')
runmany('INSERT INTO active_jan VALUES (?)', [(i,) for i in [1,2,3,4,5,6]])
runmany('INSERT INTO active_feb VALUES (?)', [(i,) for i in [2,3,4,5,7,8]])
runmany('INSERT INTO active_mar VALUES (?)', [(i,) for i in [3,4,5,6,8,9]])
# 1. All 3 months
rows = run('''
SELECT user_id FROM active_jan
INTERSECT SELECT user_id FROM active_feb
INTERSECT SELECT user_id FROM active_mar
''')
print("Active all 3 months:", [r[0] for r in rows])
# 2. Jan but not Feb
rows = run('SELECT user_id FROM active_jan EXCEPT SELECT user_id FROM active_feb')
print("Jan but not Feb:", [r[0] for r in rows])
# 3. All unique
rows = run('''
SELECT user_id FROM active_jan UNION
SELECT user_id FROM active_feb UNION
SELECT user_id FROM active_mar ORDER BY user_id
''')
print("All unique users:", [r[0] for r in rows])
# 4. TODO: Exactly 2 of 3 months
CTEs (WITH clauses) can be chained, referenced multiple times, and nested to build complex queries step by step. They replace temp tables in most cases and dramatically improve readability.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE transactions (id INT, cust_id INT, amount REAL, category TEXT, ts TEXT)''')
runmany('INSERT INTO transactions VALUES (?,?,?,?,?)', [
(1,1,150,'food','2024-01-05'),(2,1,300,'tech','2024-01-10'),
(3,2,200,'food','2024-01-08'),(4,2,100,'food','2024-01-12'),
(5,3,500,'tech','2024-01-03'),(6,3,250,'clothing','2024-01-15'),
(7,1,80,'food','2024-02-01'),(8,2,600,'tech','2024-02-05'),
(9,3,120,'food','2024-02-10'),
])
rows = run('''
WITH
-- CTE 1: customer totals
cust_totals AS (
SELECT cust_id, SUM(amount) AS total_spend,
COUNT(*) AS n_transactions, MAX(ts) AS last_purchase
FROM transactions GROUP BY cust_id
),
-- CTE 2: top category per customer
top_cat AS (
SELECT cust_id,
category,
SUM(amount) AS cat_total,
ROW_NUMBER() OVER (PARTITION BY cust_id ORDER BY SUM(amount) DESC) AS rn
FROM transactions GROUP BY cust_id, category
),
-- CTE 3: combine using previous CTEs
summary AS (
SELECT ct.cust_id, ct.total_spend, ct.n_transactions, ct.last_purchase,
tc.category AS top_category
FROM cust_totals ct
JOIN top_cat tc ON ct.cust_id = tc.cust_id AND tc.rn = 1
)
SELECT *, ROUND(total_spend / n_transactions, 2) AS avg_txn
FROM summary ORDER BY total_spend DESC
''')
print("Customer summary:")
for r in rows:
print(f" Cust {r[0]}: ${r[1]} total, {r[2]} txns, top={r[4]}, avg=${r[5]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE org (id INT, name TEXT, manager_id INT, level INT)''')
runmany('INSERT INTO org VALUES (?,?,?,?)', [
(1,'CEO',None,1),(2,'CTO',1,2),(3,'CFO',1,2),(4,'VP Eng',2,3),
(5,'VP Data',2,3),(6,'Dir Finance',3,3),(7,'Sr Dev',4,4),
(8,'Data Eng',5,4),(9,'Analyst',5,4),(10,'Accountant',6,4),
])
# Recursive CTE: traverse the org hierarchy
rows = run('''
WITH RECURSIVE hierarchy AS (
-- Base case: start from CEO (no manager)
SELECT id, name, manager_id, 0 AS depth, name AS path
FROM org WHERE manager_id IS NULL
UNION ALL
-- Recursive case: join to find reports
SELECT o.id, o.name, o.manager_id,
h.depth + 1,
h.path || " -> " || o.name
FROM org o
JOIN hierarchy h ON o.manager_id = h.id
)
SELECT depth, name, path FROM hierarchy ORDER BY path
''')
for r in rows:
indent = " " * r[0]
print(f"{indent}{r[1]}")
# Count reports under each manager
rows = run('''
WITH RECURSIVE reports AS (
SELECT id, manager_id FROM org
UNION ALL
SELECT r.id, o.manager_id FROM reports r JOIN org o ON o.id = r.manager_id
WHERE o.manager_id IS NOT NULL
)
SELECT o.name, COUNT(*) AS total_reports
FROM org o JOIN reports r ON r.manager_id = o.id
GROUP BY o.id ORDER BY total_reports DESC LIMIT 5
''')
print("Managers by total reports:", [(r[0], r[1]) for r in rows])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE raw_sales (rep TEXT, product TEXT, amount REAL, region TEXT, sale_date TEXT)''')
import random; random.seed(42)
reps = ['Alice','Bob','Carol','Dave','Eve']
prods = ['Widget','Gadget','Gizmo']
regions = ['North','South','East']
rows_data = [(random.choice(reps), random.choice(prods),
round(random.uniform(100,2000), 2),
random.choice(regions),
f"2024-0{random.randint(1,3)}-{random.randint(1,28):02d}")
for _ in range(50)]
runmany('INSERT INTO raw_sales VALUES (?,?,?,?,?)', rows_data)
rows = run('''
WITH
-- Step 1: normalize and add month
cleaned AS (
SELECT rep, product, region, amount,
STRFTIME("%Y-%m", sale_date) AS month
FROM raw_sales WHERE amount > 0
),
-- Step 2: rep-level monthly totals
rep_monthly AS (
SELECT rep, month, SUM(amount) AS monthly_total,
COUNT(*) AS n_sales
FROM cleaned GROUP BY rep, month
),
-- Step 3: rank reps per month
ranked AS (
SELECT *, RANK() OVER (PARTITION BY month ORDER BY monthly_total DESC) AS rank
FROM rep_monthly
)
-- Step 4: show top-2 per month
SELECT month, rank, rep, ROUND(monthly_total, 0) AS total, n_sales
FROM ranked WHERE rank <= 2
ORDER BY month, rank
''')
print("Top 2 reps per month:")
for r in rows: print(f" {r[0]} #{r[1]}: {r[2]} ${r[3]:,.0f} ({r[4]} sales)")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE funnel_events (user_id INT, event TEXT, ts TEXT)''')
import random; random.seed(42)
events_raw = []
for uid in range(1, 201):
events_raw.append((uid,'impression','2024-01'))
if random.random() < 0.6: events_raw.append((uid,'click','2024-01'))
if random.random() < 0.4: events_raw.append((uid,'add_to_cart','2024-01'))
if random.random() < 0.35: events_raw.append((uid,'purchase','2024-01'))
runmany('INSERT INTO funnel_events VALUES (?,?,?)', events_raw)
rows = run('''
WITH
stage_counts AS (
SELECT event,
COUNT(DISTINCT user_id) AS users
FROM funnel_events
GROUP BY event
),
ordered AS (
SELECT event, users,
CASE event
WHEN "impression" THEN 1
WHEN "click" THEN 2
WHEN "add_to_cart" THEN 3
WHEN "purchase" THEN 4
END AS stage_order
FROM stage_counts
),
with_prev AS (
SELECT event, users, stage_order,
LAG(users) OVER (ORDER BY stage_order) AS prev_users
FROM ordered
)
SELECT event, users,
CASE WHEN prev_users IS NOT NULL
THEN ROUND(100.0 * users / prev_users, 1)
ELSE 100.0 END AS step_pct,
ROUND(100.0 * users / MAX(users) OVER (), 1) AS overall_pct
FROM with_prev ORDER BY stage_order
''')
print("Conversion funnel:")
for r in rows:
bar = "#" * int(r[3] / 5)
print(f" {r[0]:15s}: {r[1]:>4d} users | step={r[2]:>5.1f}% | {bar}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE monthly_orders (product TEXT, month TEXT, revenue REAL)''')
runmany('INSERT INTO monthly_orders VALUES (?,?,?)', [
('Widget','2024-01',1000),('Widget','2024-02',950),('Widget','2024-03',700),
('Widget','2024-04',980),('Gadget','2024-01',500),('Gadget','2024-02',520),
('Gadget','2024-03',480),('Gadget','2024-04',200),
])
rows = run('''
WITH
-- CTE2: 3-month moving average using LAG
with_lags AS (
SELECT product, month, revenue,
LAG(revenue,1) OVER (PARTITION BY product ORDER BY month) AS prev1,
LAG(revenue,2) OVER (PARTITION BY product ORDER BY month) AS prev2
FROM monthly_orders
),
moving_avg AS (
SELECT product, month, revenue,
ROUND((revenue + COALESCE(prev1,revenue) + COALESCE(prev2,revenue)) / 3.0, 1) AS ma3
FROM with_lags
),
-- CTE3: flag months with >20% drop vs moving avg
flagged AS (
SELECT product, month, revenue, ma3,
ROUND((revenue - ma3) / ma3 * 100, 1) AS drop_pct
FROM moving_avg
WHERE revenue < ma3 * 0.8 -- more than 20% below MA
)
SELECT * FROM flagged ORDER BY drop_pct
''')
print("Revenue alerts (>20% below 3-month MA):")
for r in rows: print(f" {r[0]} {r[1]}: ${r[2]} vs MA ${r[3]} ({r[4]}%)")
Views are saved SELECT statements that behave like tables. They simplify complex queries, enforce access control, and create stable interfaces over evolving schema.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE employees (id INT, name TEXT, dept TEXT, salary REAL, hire_date TEXT)''')
run('''CREATE TABLE departments (id INT, name TEXT, budget REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?,?,?)', [
(1,'Alice','Eng',90000,'2020-03-15'),(2,'Bob','Eng',85000,'2021-06-01'),
(3,'Carol','Sales',70000,'2019-11-20'),(4,'Dave','Sales',72000,'2022-01-10'),
(5,'Eve','HR',65000,'2020-08-05'),(6,'Frank','Eng',95000,'2018-04-22'),
])
runmany('INSERT INTO departments VALUES (?,?,?)', [
(1,'Eng',500000),(2,'Sales',300000),(3,'HR',150000),
])
# Create a view
run('''
CREATE VIEW emp_summary AS
SELECT e.id, e.name, e.dept, e.salary,
ROUND(100.0 * e.salary / d.budget, 2) AS salary_pct_budget,
CAST((JULIANDAY("now") - JULIANDAY(e.hire_date)) / 365.25 AS INT) AS years
FROM employees e
JOIN departments d ON e.dept = d.name
''')
# Query the view like a table
rows = run('SELECT * FROM emp_summary ORDER BY salary DESC')
print("Employee summary (from view):")
for r in rows: print(f" {r[1]} ({r[2]}): ${r[3]:,}, {r[4]:.2f}% of dept budget, {r[5]} yrs")
# View within query
rows = run('''
SELECT dept, COUNT(*) AS n, ROUND(AVG(salary), 0) AS avg_sal
FROM emp_summary
GROUP BY dept ORDER BY avg_sal DESC
''')
for r in rows: print(f" {r[0]}: {r[1]} employees, avg ${r[2]:,}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE products (id INT, name TEXT, price REAL, active INT DEFAULT 1)''')
runmany('INSERT INTO products VALUES (?,?,?,?)', [
(1,'Apple',1.2,1),(2,'Banana',0.5,1),(3,'Carrot',0.8,0),(4,'Date',2.0,1),
])
run('''CREATE VIEW active_products AS
SELECT id, name, price FROM products WHERE active = 1''')
rows = run('SELECT * FROM active_products')
print("Active products:", rows)
# In SQLite, simple views can be updated using INSTEAD OF triggers
# But directly: INSERT INTO simple single-table view may work
run("INSERT INTO active_products VALUES (5, 'Elderberry', 3.5)")
rows = run('SELECT id, name, price, active FROM products WHERE id = 5')
print("Inserted via view:", rows) # active defaults to 1
# DROP VIEW
run('DROP VIEW active_products')
print("View dropped. Tables unaffected.")
rows = run('SELECT COUNT(*) FROM products')
print("Products table still has:", rows[0][0], "rows")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
# SQLite doesn't have native materialized views
# Simulate with CREATE TABLE AS SELECT
run('''CREATE TABLE employees (id INT, dept TEXT, salary REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?)', [
(i, ['Eng','Sales','HR'][i%3], 60000 + i*5000) for i in range(12)
])
# Simulated materialized view (refresh manually)
run('''DROP TABLE IF EXISTS dept_stats_mv''')
run('''
CREATE TABLE dept_stats_mv AS
SELECT dept,
COUNT(*) AS headcount,
ROUND(AVG(salary), 0) AS avg_salary,
MAX(salary) AS max_salary
FROM employees GROUP BY dept
''')
rows = run('SELECT * FROM dept_stats_mv ORDER BY avg_salary DESC')
print("Materialized dept stats:")
for r in rows: print(f" {r[0]}: {r[1]} people, avg ${r[2]:,}, max ${r[3]:,}")
# Refresh: insert more data, recreate
run('INSERT INTO employees VALUES (99, "Eng", 200000)')
run('DROP TABLE dept_stats_mv')
run('''
CREATE TABLE dept_stats_mv AS
SELECT dept, COUNT(*) AS headcount, ROUND(AVG(salary),0) AS avg_salary
FROM employees GROUP BY dept
''')
rows = run('SELECT * FROM dept_stats_mv ORDER BY dept')
print("Refreshed stats:", rows)import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE orders (id INT, cust_id INT, product_id INT, amount REAL, order_date TEXT)''')
run('''CREATE TABLE customers (id INT, name TEXT, tier TEXT, region TEXT)''')
run('''CREATE TABLE products (id INT, name TEXT, category TEXT, cost REAL)''')
runmany('INSERT INTO orders VALUES (?,?,?,?,?)', [
(1,1,1,500,'2024-01-10'),(2,1,2,300,'2024-01-15'),(3,2,1,800,'2024-02-05'),
(4,2,3,200,'2024-02-20'),(5,3,2,600,'2024-03-01'),(6,1,3,400,'2024-03-10'),
])
runmany('INSERT INTO customers VALUES (?,?,?,?)', [
(1,'Alice','gold','North'),(2,'Bob','silver','South'),(3,'Carol','bronze','East'),
])
runmany('INSERT INTO products VALUES (?,?,?,?)', [
(1,'Widget','Hardware',200),(2,'Gadget','Electronics',100),(3,'Gizmo','Software',50),
])
run('''CREATE VIEW order_detail AS
SELECT o.id AS order_id, c.name AS customer, c.tier, c.region,
p.name AS product, p.category,
o.amount, p.cost,
ROUND(o.amount - p.cost, 2) AS margin,
ROUND((o.amount - p.cost) / o.amount * 100, 1) AS margin_pct,
o.order_date
FROM orders o
JOIN customers c ON c.id = o.cust_id
JOIN products p ON p.id = o.product_id
''')
rows = run('SELECT customer, product, amount, margin_pct FROM order_detail ORDER BY margin_pct DESC')
print("Order detail (from view):")
for r in rows: print(f" {r[0]} - {r[1]}: ${r[2]} ({r[3]}% margin)")
rows = run('SELECT category, ROUND(AVG(margin_pct),1) AS avg_margin FROM order_detail GROUP BY category')
print("Avg margin by category:", rows)import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE sales (id INT, rep TEXT, product TEXT, amount REAL, region TEXT, sale_date TEXT)''')
runmany('INSERT INTO sales VALUES (?,?,?,?,?,?)', [
(1,'Alice','Widget',500,'North','2024-01-10'),
(2,'Alice','Gadget',300,'North','2024-01-15'),
(3,'Bob','Widget',800,'South','2024-02-01'),
(4,'Bob','Gizmo',200,'South','2024-02-10'),
(5,'Carol','Gadget',600,'East','2024-02-20'),
(6,'Alice','Gizmo',400,'North','2024-03-01'),
(7,'Carol','Widget',700,'East','2024-03-10'),
(8,'Bob','Gadget',350,'South','2024-03-15'),
])
# 1. Rep dashboard view
run('''CREATE VIEW rep_dashboard AS
SELECT rep, SUM(amount) AS total, COUNT(*) AS n_sales,
ROUND(AVG(amount),0) AS avg_sale,
RANK() OVER (ORDER BY SUM(amount) DESC) AS rank
FROM sales GROUP BY rep
''')
print("=== Rep Dashboard ===")
for r in run('SELECT * FROM rep_dashboard ORDER BY rank'): print(f" {r}")
# 2. Regional summary view
run('''CREATE VIEW regional_summary AS
SELECT region, ROUND(SUM(amount),0) AS total,
ROUND(100.0*SUM(amount)/SUM(SUM(amount)) OVER (), 1) AS pct_of_total
FROM sales GROUP BY region
''')
print("=== Regional Summary ===")
for r in run('SELECT * FROM regional_summary ORDER BY total DESC'): print(f" {r}")
# 3. TODO: top products view
Constraints (PRIMARY KEY, UNIQUE, NOT NULL, CHECK, FOREIGN KEY) enforce data quality at the database level β the last line of defense against bad data.
import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sku TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
price REAL NOT NULL CHECK (price > 0),
stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
category TEXT CHECK (category IN ("food", "tech", "clothing"))
)''')
# Valid insert
run("INSERT INTO products (sku, name, price, stock, category) VALUES ('SKU-001', 'Apple', 1.5, 100, 'food')")
run("INSERT INTO products (sku, name, price, stock, category) VALUES ('SKU-002', 'Phone', 699.0, 50, 'tech')")
print("Inserted valid products:", run('SELECT id, sku, name, price FROM products'))
# Test constraint violations
import sqlite3 as _sqlite3
tests = [
("Duplicate SKU", "INSERT INTO products (sku, name, price) VALUES ('SKU-001', 'Pear', 0.9)"),
("Negative price", "INSERT INTO products (sku, name, price) VALUES ('SKU-003', 'X', -5.0)"),
("Negative stock", "INSERT INTO products (sku, name, price, stock) VALUES ('SKU-004', 'Y', 1.0, -10)"),
("Invalid category", "INSERT INTO products (sku, name, price, category) VALUES ('SKU-005', 'Z', 2.0, 'toys')"),
("NULL name", "INSERT INTO products (sku, price) VALUES ('SKU-006', 3.0)"),
]
for label, sql in tests:
try:
run(sql)
print(f" FAIL: {label} should have been rejected!")
except Exception as e:
print(f" OK: {label} rejected -> {str(e)[:50]}")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE categories (id INT PRIMARY KEY, name TEXT NOT NULL UNIQUE)''')
run('''CREATE TABLE products (
id INT PRIMARY KEY,
name TEXT NOT NULL,
cat_id INT NOT NULL REFERENCES categories(id) ON DELETE RESTRICT ON UPDATE CASCADE
)''')
run('''CREATE TABLE orders (
id INT PRIMARY KEY,
product_id INT NOT NULL REFERENCES products(id) ON DELETE CASCADE
)''')
runmany('INSERT INTO categories VALUES (?,?)', [(1,'Food'),(2,'Tech'),(3,'Clothing')])
runmany('INSERT INTO products VALUES (?,?,?)', [(1,'Apple',1),(2,'Phone',2),(3,'Shirt',3)])
runmany('INSERT INTO orders VALUES (?,?)', [(1,1),(2,1),(3,2)])
# Violate FK
import sqlite3 as _sqlite3
try:
run('INSERT INTO products VALUES (4, "Widget", 99)') # cat_id=99 doesn't exist
print("FAIL: should have rejected")
except Exception as e:
print("OK: FK rejected:", str(e)[:50])
# ON DELETE CASCADE: delete product -> orders cascade deleted
print("Orders before:", run('SELECT COUNT(*) FROM orders')[0][0])
run('DELETE FROM products WHERE id = 1')
print("Orders after deleting product 1:", run('SELECT COUNT(*) FROM orders')[0][0])
# ON DELETE RESTRICT: cannot delete category if products reference it
try:
run('DELETE FROM categories WHERE id = 2')
print("FAIL: should have rejected")
except Exception as e:
print("OK: RESTRICT works:", str(e)[:50])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
run('''CREATE TABLE inventory (
sku TEXT PRIMARY KEY,
name TEXT NOT NULL,
stock INT NOT NULL DEFAULT 0 CHECK (stock >= 0)
)''')
runmany('INSERT INTO inventory VALUES (?,?,?)', [
('A001','Apple',100),('B002','Banana',200),
])
# ON CONFLICT IGNORE: skip duplicate inserts silently
run("INSERT OR IGNORE INTO inventory VALUES ('A001', 'Apple Updated', 150)")
print("After OR IGNORE:", run("SELECT stock FROM inventory WHERE sku='A001'")[0][0]) # still 100
# ON CONFLICT REPLACE: delete + re-insert
run("INSERT OR REPLACE INTO inventory VALUES ('A001', 'Apple Premium', 120)")
print("After OR REPLACE:", run("SELECT name, stock FROM inventory WHERE sku='A001'")[0])
# UPSERT (INSERT ... ON CONFLICT DO UPDATE) - SQLite 3.24+
run('''
INSERT INTO inventory (sku, name, stock) VALUES ("C003", "Cherry", 50)
ON CONFLICT(sku) DO UPDATE SET
stock = stock + excluded.stock,
name = excluded.name
''')
run('''
INSERT INTO inventory (sku, name, stock) VALUES ("C003", "Cherry Deluxe", 30)
ON CONFLICT(sku) DO UPDATE SET
stock = stock + excluded.stock,
name = excluded.name
''')
print("After UPSERT x2:", run("SELECT name, stock FROM inventory WHERE sku='C003'")[0])import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE warehouses (id INT PRIMARY KEY, location TEXT)''')
run('''CREATE TABLE products (id INT PRIMARY KEY, sku TEXT UNIQUE NOT NULL,
name TEXT NOT NULL, price REAL CHECK (price > 0))''')
run('''CREATE TABLE stock_levels (
product_id INT REFERENCES products(id) ON DELETE CASCADE,
warehouse_id INT REFERENCES warehouses(id),
qty INT NOT NULL DEFAULT 0 CHECK (qty >= 0),
PRIMARY KEY (product_id, warehouse_id)
)''')
runmany('INSERT INTO warehouses VALUES (?,?)', [(1,'NYC'),(2,'LA'),(3,'Chicago')])
runmany('INSERT INTO products VALUES (?,?,?,?)',
[(1,'SKU-A','Widget',29.99),(2,'SKU-B','Gadget',49.99)])
runmany('INSERT INTO stock_levels VALUES (?,?,?)',
[(1,1,500),(1,2,300),(2,1,200),(2,3,150)])
# UPSERT stock: add received quantity
def receive_stock(product_id, warehouse_id, qty):
run(f'''
INSERT INTO stock_levels (product_id, warehouse_id, qty)
VALUES ({product_id}, {warehouse_id}, {qty})
ON CONFLICT (product_id, warehouse_id) DO UPDATE SET qty = qty + {qty}
''')
receive_stock(1, 1, 100)
receive_stock(2, 2, 50) # new location
rows = run('''
SELECT p.name, w.location, sl.qty
FROM stock_levels sl
JOIN products p ON p.id = sl.product_id
JOIN warehouses w ON w.id = sl.warehouse_id
ORDER BY p.name, sl.qty DESC
''')
print("Current stock:")
for r in rows: print(f" {r[0]} @ {r[1]}: {r[2]} units")import sqlite3, contextlib
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA journal_mode=WAL')
def run(sql, params=()):
with contextlib.closing(conn.cursor()) as cur:
cur.execute(sql, params)
if cur.description:
return cur.fetchall()
conn.commit()
return cur.rowcount
def runmany(sql, rows):
with contextlib.closing(conn.cursor()) as cur:
cur.executemany(sql, rows)
conn.commit()
conn.execute('PRAGMA foreign_keys = ON')
run('''CREATE TABLE accounts (
id INT PRIMARY KEY,
holder TEXT NOT NULL,
balance REAL NOT NULL DEFAULT 0 CHECK (balance >= 0),
account_type TEXT CHECK (account_type IN ("checking", "savings")),
opened_date TEXT DEFAULT (DATE("now"))
)''')
run('''CREATE TABLE txns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id INT NOT NULL REFERENCES accounts(id),
amount REAL NOT NULL CHECK (amount > 0),
type TEXT NOT NULL CHECK (type IN ("debit","credit")),
ts TEXT DEFAULT (DATETIME("now"))
)''')
run("INSERT INTO accounts VALUES (1,'Alice',1000,'checking','2024-01-01')")
# 1. Try to go negative via debit > balance
import sqlite3 as _sqlite3
try:
run("INSERT INTO txns (account_id, amount, type) VALUES (1, 5000, 'debit')")
# Without trigger, balance isn't auto-updated. Add trigger below.
except Exception as e:
print("Error:", e)
# 2. TODO: CREATE TRIGGER after_txn AFTER INSERT ON txns
# that updates accounts.balance = balance - amount WHERE type='debit'
# or balance + amount WHERE type='credit'
# 3. TODO: UPSERT to create or update an account balance
print("Current balance:", run("SELECT balance FROM accounts WHERE id=1")[0][0])
Use FILTER clauses for conditional aggregation, emulate ROLLUP with UNION ALL, and combine window functions for multi-dimensional analysis.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY,
region TEXT, product TEXT,
amount REAL, q INTEGER)''')
runmany('INSERT INTO sales VALUES (?,?,?,?,?)', [
(1,'North','A',100,1),(2,'North','B',200,1),
(3,'South','A',150,2),(4,'South','B',250,2),
(5,'North','A',120,3),(6,'South','B',300,3)])
# FILTER clause β conditional aggregation
rows = run('''
SELECT
region,
SUM(amount) AS total,
SUM(amount) FILTER(WHERE q=1) AS q1,
SUM(amount) FILTER(WHERE q=2) AS q2
FROM sales GROUP BY region''')
print(rows)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS sales2 (
region TEXT, product TEXT, amount REAL)''')
runmany('INSERT INTO sales2 VALUES (?,?,?)', [
('North','A',100),('North','B',200),
('South','A',150),('South','B',250)])
# Emulate ROLLUP with UNION ALL
rows = run('''
SELECT region, product, SUM(amount)
FROM sales2 GROUP BY region, product
UNION ALL
SELECT region, NULL, SUM(amount)
FROM sales2 GROUP BY region
UNION ALL
SELECT NULL, NULL, SUM(amount) FROM sales2
ORDER BY 1,2''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS metrics (
dept TEXT, month INTEGER, revenue REAL)''')
runmany('INSERT INTO metrics VALUES (?,?,?)', [
('Eng',1,5000),('Eng',2,6000),('HR',1,2000),('HR',2,2500)])
# PERCENT_RANK and CUME_DIST
rows = run('''
SELECT dept, month, revenue,
RANK() OVER (ORDER BY revenue) AS rnk,
ROUND(100.0*RANK() OVER(ORDER BY revenue)/COUNT(*) OVER(),1) AS pct
FROM metrics ORDER BY revenue''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY, customer TEXT,
status TEXT, amount REAL)''')
runmany('INSERT INTO orders VALUES (?,?,?,?)', [
(1,'Alice','paid',300),(2,'Bob','pending',150),
(3,'Alice','paid',200),(4,'Bob','paid',400),(5,'Carol','refunded',100)])
# Multi-dimensional conditional aggregation
rows = run('''
SELECT customer,
COUNT(*) FILTER(WHERE status='paid') AS paid_cnt,
SUM(amount) FILTER(WHERE status='paid') AS paid_sum,
COUNT(*) FILTER(WHERE status='pending') AS pend_cnt
FROM orders GROUP BY customer ORDER BY customer''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS rev (
region TEXT, q INTEGER, amount REAL)''')
runmany('INSERT INTO rev VALUES (?,?,?)', [
('North',1,500),('North',2,700),
('South',1,400),('South',2,600)])
rows = run('''
SELECT region, q,
SUM(amount) AS total,
SUM(amount) FILTER(WHERE q=1) AS q1_total
FROM rev GROUP BY region, q
UNION ALL
SELECT NULL, NULL, SUM(amount), NULL FROM rev''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY, type TEXT, amount REAL)''')
runmany('INSERT INTO transactions VALUES (?,?,?)', [
(1,'credit',500),(2,'debit',200),(3,'credit',300),(4,'debit',150)])
# Write your FILTER aggregation query here
Use self-joins to query hierarchical data, non-equi joins for range matching, and interval joins to detect overlaps.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY, name TEXT,
manager_id INTEGER, salary REAL)''')
runmany('INSERT INTO employees VALUES (?,?,?,?)', [
(1,'Alice',None,9000),(2,'Bob',1,7000),
(3,'Carol',1,7500),(4,'Dave',2,5000),(5,'Eve',2,5500)])
# Self-join: each employee with their manager's name
rows = run('''
SELECT e.name AS employee, m.name AS manager
FROM employees e
LEFT JOIN employees m ON e.manager_id = m.id
ORDER BY e.id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS employees2 (
id INTEGER PRIMARY KEY, name TEXT, salary REAL, dept TEXT)''')
runmany('INSERT INTO employees2 VALUES (?,?,?,?)', [
(1,'Alice',9000,'Eng'),(2,'Bob',7000,'Eng'),
(3,'Carol',7500,'HR'),(4,'Dave',5000,'HR')])
# Non-equi join: employees earning more than others in same dept
rows = run('''
SELECT a.name AS higher, b.name AS lower,
a.salary - b.salary AS diff
FROM employees2 a
JOIN employees2 b
ON a.dept = b.dept AND a.salary > b.salary
ORDER BY diff DESC''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS prices (
product TEXT, price REAL, valid_from TEXT, valid_to TEXT)''')
runmany('INSERT INTO prices VALUES (?,?,?,?)', [
('Widget',10.0,'2024-01-01','2024-03-31'),
('Widget',12.0,'2024-04-01','2024-12-31'),
('Gadget',25.0,'2024-01-01','2024-12-31')])
run('''CREATE TABLE IF NOT EXISTS orders3 (
id INTEGER, product TEXT, order_date TEXT, qty INTEGER)''')
runmany('INSERT INTO orders3 VALUES (?,?,?,?)', [
(1,'Widget','2024-02-15',3),(2,'Widget','2024-05-20',5),(3,'Gadget','2024-03-10',2)])
# Range join: match orders to the correct price band
rows = run('''
SELECT o.id, o.product, o.order_date, p.price, o.qty*p.price AS total
FROM orders3 o
JOIN prices p
ON o.product=p.product
AND o.order_date BETWEEN p.valid_from AND p.valid_to
ORDER BY o.id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY, name TEXT,
start TEXT, end TEXT)''')
runmany('INSERT INTO tasks VALUES (?,?,?,?)', [
(1,'Deploy','2024-01-10','2024-01-15'),
(2,'Test','2024-01-12','2024-01-18'),
(3,'Review','2024-01-20','2024-01-25'),
(4,'Release','2024-01-16','2024-01-22')])
# Find overlapping task pairs
rows = run('''
SELECT a.name AS task1, b.name AS task2
FROM tasks a JOIN tasks b
ON a.id < b.id
AND a.start <= b.end
AND a.end >= b.start
ORDER BY a.id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS staff (
id INTEGER PRIMARY KEY, name TEXT,
manager_id INTEGER, salary REAL, dept TEXT)''')
runmany('INSERT INTO staff VALUES (?,?,?,?,?)', [
(1,'CEO',None,15000,'Exec'),(2,'CTO',1,12000,'Eng'),
(3,'Dev1',2,8000,'Eng'),(4,'Dev2',2,7500,'Eng'),(5,'CHRO',1,11000,'HR')])
rows = run('''
SELECT e.name, m.name AS mgr,
e.salary,
AVG(p.salary) OVER(PARTITION BY e.manager_id) AS peer_avg
FROM staff e
LEFT JOIN staff m ON e.manager_id=m.id
ORDER BY e.id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS staff (
id INTEGER PRIMARY KEY, name TEXT,
dept TEXT, salary REAL)''')
runmany('INSERT INTO staff VALUES (?,?,?,?)', [
(1,'Alice','Eng',9000),(2,'Bob','Eng',7000),
(3,'Carol','HR',8000),(4,'Dave','HR',6000)])
# Write a non-equi self-join here
Go beyond basic ranking: use LAG/LEAD for period comparisons, NTILE for bucketing, FIRST_VALUE/LAST_VALUE for partition anchors, and rolling window frames.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS stock (
dt TEXT, ticker TEXT, close REAL)''')
runmany('INSERT INTO stock VALUES (?,?,?)', [
('2024-01-01','AAPL',185.0),('2024-01-02','AAPL',186.5),
('2024-01-03','AAPL',184.0),('2024-01-04','AAPL',188.0),
('2024-01-05','AAPL',190.0)])
# Daily return using LAG
rows = run('''
SELECT dt, close,
LAG(close) OVER(ORDER BY dt) AS prev,
ROUND(close - LAG(close) OVER(ORDER BY dt), 2) AS change
FROM stock ORDER BY dt''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS sales3 (
month INTEGER, revenue REAL)''')
runmany('INSERT INTO sales3 VALUES (?,?)', [
(1,5000),(2,5500),(3,4800),(4,6200),(5,6800),(6,7100)])
# Compare to next month with LEAD; running avg
rows = run('''
SELECT month, revenue,
LEAD(revenue) OVER(ORDER BY month) AS next_month,
ROUND(AVG(revenue) OVER(
ORDER BY month ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),2) AS rolling3
FROM sales3''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS scores (
student TEXT, score INTEGER)''')
runmany('INSERT INTO scores VALUES (?,?)', [
('Alice',92),('Bob',75),('Carol',88),
('Dave',60),('Eve',95),('Frank',70)])
# NTILE: split into quartiles
rows = run('''
SELECT student, score,
NTILE(4) OVER(ORDER BY score) AS quartile
FROM scores ORDER BY score''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS temps (
city TEXT, day INTEGER, temp REAL)''')
runmany('INSERT INTO temps VALUES (?,?,?)', [
('NYC',1,5.0),('NYC',2,7.0),('NYC',3,4.0),
('LA',1,20.0),('LA',2,22.0),('LA',3,19.0)])
# FIRST_VALUE and LAST_VALUE per city
rows = run('''
SELECT city, day, temp,
FIRST_VALUE(temp) OVER(PARTITION BY city ORDER BY day) AS first_t,
MAX(temp) OVER(PARTITION BY city) AS peak
FROM temps ORDER BY city, day''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS monthly (
month INTEGER, revenue REAL)''')
runmany('INSERT INTO monthly VALUES (?,?)', [
(1,4000),(2,4500),(3,4200),(4,5000),(5,5500),(6,6000)])
rows = run('''
SELECT month, revenue,
LAG(revenue) OVER(ORDER BY month) AS prev,
ROUND(revenue - LAG(revenue) OVER(ORDER BY month),2) AS mom_change,
NTILE(3) OVER(ORDER BY revenue) AS tier
FROM monthly''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS visits (
user_id INTEGER, visit_date TEXT, pages_viewed INTEGER)''')
runmany('INSERT INTO visits VALUES (?,?,?)', [
(1,'2024-01-01',5),(1,'2024-01-05',8),(1,'2024-01-12',3),
(2,'2024-01-02',10),(2,'2024-01-10',7)])
# Write LAG and MAX window functions here
Apply 1NF, 2NF, and 3NF to eliminate redundancy, design proper primary and foreign keys, and inspect schemas with SQLite system tables.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# 1NF: atomic values, no repeating groups
run('''CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL)''')
run('''CREATE TABLE IF NOT EXISTS contact_phones (
contact_id INTEGER,
phone TEXT NOT NULL,
FOREIGN KEY(contact_id) REFERENCES contacts(id))''')
runmany('INSERT INTO contacts VALUES (?,?,?)', [
(1,'Alice','alice@ex.com'),(2,'Bob','bob@ex.com')])
runmany('INSERT INTO contact_phones VALUES (?,?)', [
(1,'555-0001'),(1,'555-0002'),(2,'555-0003')])
print(run('SELECT * FROM contacts'))
print(run('SELECT * FROM contact_phones'))
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# 2NF: remove partial dependencies (composite key fix)
run('''CREATE TABLE IF NOT EXISTS products (
product_id INTEGER PRIMARY KEY,
product_name TEXT NOT NULL,
price REAL NOT NULL)''')
run('''CREATE TABLE IF NOT EXISTS order_items (
order_id INTEGER,
product_id INTEGER,
qty INTEGER NOT NULL,
PRIMARY KEY(order_id, product_id),
FOREIGN KEY(product_id) REFERENCES products(product_id))''')
runmany('INSERT INTO products VALUES (?,?,?)', [
(10,'Widget',9.99),(11,'Gadget',24.99)])
runmany('INSERT INTO order_items VALUES (?,?,?)', [
(1,10,3),(1,11,1),(2,10,5)])
rows = run('''
SELECT oi.order_id, p.product_name, oi.qty, p.price*oi.qty AS subtotal
FROM order_items oi JOIN products p ON oi.product_id=p.product_id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# 3NF: remove transitive dependencies
run('''CREATE TABLE IF NOT EXISTS departments (
dept_id INTEGER PRIMARY KEY, dept_name TEXT, location TEXT)''')
run('''CREATE TABLE IF NOT EXISTS staff2 (
id INTEGER PRIMARY KEY, name TEXT,
dept_id INTEGER,
FOREIGN KEY(dept_id) REFERENCES departments(dept_id))''')
runmany('INSERT INTO departments VALUES (?,?,?)', [
(1,'Engineering','Floor 3'),(2,'HR','Floor 1')])
runmany('INSERT INTO staff2 VALUES (?,?,?)', [
(1,'Alice',1),(2,'Bob',1),(3,'Carol',2)])
rows = run('''
SELECT s.name, d.dept_name, d.location
FROM staff2 s JOIN departments d ON s.dept_id=d.dept_id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# Schema inspection and FK checks
run('''CREATE TABLE IF NOT EXISTS parent (id INTEGER PRIMARY KEY, val TEXT)''')
run('''CREATE TABLE IF NOT EXISTS child (
id INTEGER PRIMARY KEY,
parent_id INTEGER,
FOREIGN KEY(parent_id) REFERENCES parent(id))''')
run('''PRAGMA foreign_keys=ON''')
runmany('INSERT INTO parent VALUES (?,?)', [(1,'A'),(2,'B')])
runmany('INSERT INTO child VALUES (?,?)', [(10,1),(11,2)])
# List tables and schema
tables = run("SELECT name FROM sqlite_master WHERE type='table'")
print('Tables:', tables)
schema = run("SELECT sql FROM sqlite_master WHERE name='child'")
print(schema[0][0])
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY, name TEXT, email TEXT)''')
run('''CREATE TABLE IF NOT EXISTS products2 (
id INTEGER PRIMARY KEY, name TEXT, price REAL)''')
run('''CREATE TABLE IF NOT EXISTS orders2 (
id INTEGER PRIMARY KEY, customer_id INTEGER, order_date TEXT)''')
run('''CREATE TABLE IF NOT EXISTS order_lines (
order_id INTEGER, product_id INTEGER, qty INTEGER,
PRIMARY KEY(order_id, product_id))''')
runmany('INSERT INTO customers VALUES (?,?,?)', [(1,'Alice','a@ex.com')])
runmany('INSERT INTO products2 VALUES (?,?,?)', [(1,'Widget',10.0)])
runmany('INSERT INTO orders2 VALUES (?,?,?)', [(1,1,'2024-01-15')])
runmany('INSERT INTO order_lines VALUES (?,?,?)', [(1,1,3)])
rows = run('''
SELECT c.name, o.order_date, p.name, ol.qty, p.price*ol.qty AS total
FROM orders2 o
JOIN customers c ON o.customer_id=c.id
JOIN order_lines ol ON ol.order_id=o.id
JOIN products2 p ON p.id=ol.product_id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# Denormalized (bad):
# orders_flat: order_id, customer_name, customer_email, product_name, price, qty
# Violates 2NF (price depends only on product) and 3NF if email depends on name.
# Write your normalized CREATE TABLE statements here:
# run('''CREATE TABLE customers ...''')
# run('''CREATE TABLE products ...''')
# run('''CREATE TABLE orders ...''')
# run('''CREATE TABLE order_items ...''')
Store, extract, modify, and filter JSON data in SQLite using json_extract, json_set, json_insert, and json_remove.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
payload TEXT)''') -- stored as JSON text
data = [
(1, json.dumps({'user':'alice','action':'login','meta':{'ip':'1.2.3.4'}})),
(2, json.dumps({'user':'bob','action':'purchase','meta':{'items':3,'total':59.99}})),
(3, json.dumps({'user':'alice','action':'logout','meta':{'ip':'1.2.3.4'}})),
]
runmany('INSERT INTO events VALUES (?,?)', data)
# Extract top-level field with json_extract
rows = run("SELECT id, json_extract(payload,'$.user'), json_extract(payload,'$.action') FROM events")
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS events2 (id INTEGER PRIMARY KEY, payload TEXT)''')
runmany('INSERT INTO events2 VALUES (?,?)', [
(1, json.dumps({'user':'alice','tags':['sql','python'],'score':95})),
(2, json.dumps({'user':'bob','tags':['ml','python'],'score':80})),
])
# Access nested and array elements
rows = run('''
SELECT id,
json_extract(payload, '$.user') AS user,
json_extract(payload, '$.score') AS score,
json_extract(payload, '$.tags[0]') AS first_tag
FROM events2''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS configs (
id INTEGER PRIMARY KEY, settings TEXT)''')
run('''INSERT INTO configs VALUES (1, ?)''',
(json.dumps({'theme':'dark','lang':'en','max_rows':100}),))
# Modify JSON with json_set, json_insert, json_remove
run('''UPDATE configs SET settings = json_set(settings, '$.theme', 'light') WHERE id=1''')
run('''UPDATE configs SET settings = json_set(settings, '$.debug', 1) WHERE id=1''')
row = run('SELECT settings FROM configs WHERE id=1')
print(json.loads(row[0][0]))
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS products3 (
id INTEGER PRIMARY KEY, name TEXT, attributes TEXT)''')
runmany('INSERT INTO products3 VALUES (?,?,?)', [
(1,'Widget', json.dumps({'color':'red','weight':0.5,'tags':['sale','new']})),
(2,'Gadget', json.dumps({'color':'blue','weight':1.2,'tags':['new']})),
(3,'Doohickey', json.dumps({'color':'red','weight':0.3,'tags':['sale']})),
])
# Filter on JSON field and aggregate
rows = run('''
SELECT name,
json_extract(attributes,'$.color') AS color,
json_extract(attributes,'$.weight') AS weight
FROM products3
WHERE json_extract(attributes,'$.color')='red'
ORDER BY weight''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS user_events (
id INTEGER PRIMARY KEY, payload TEXT)''')
runmany('INSERT INTO user_events VALUES (?,?)', [
(1,json.dumps({'user':'alice','action':'purchase','amount':49.99})),
(2,json.dumps({'user':'bob','action':'view','amount':0})),
(3,json.dumps({'user':'alice','action':'purchase','amount':29.99})),
(4,json.dumps({'user':'carol','action':'purchase','amount':99.99})),
])
rows = run('''
SELECT
json_extract(payload,'$.user') AS user,
SUM(CAST(json_extract(payload,'$.amount') AS REAL)) AS total_spend
FROM user_events
WHERE json_extract(payload,'$.action')='purchase'
GROUP BY user ORDER BY total_spend DESC''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport json
run('''CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY, data TEXT)''')
runmany('INSERT INTO logs VALUES (?,?)', [
(1,json.dumps({'level':'INFO','message':'Started','timestamp':'2024-01-01T09:00:00'})),
(2,json.dumps({'level':'ERROR','message':'Null ref','timestamp':'2024-01-01T09:05:00'})),
(3,json.dumps({'level':'ERROR','message':'Timeout','timestamp':'2024-01-01T09:10:00'})),
])
# Write your json_extract filter query here
Use SQLite's FTS5 virtual tables to perform keyword search, phrase matching, boolean operators, column-specific search, and ranked results with snippets.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# FTS5 virtual table for full-text search
run('''CREATE VIRTUAL TABLE IF NOT EXISTS articles
USING fts5(title, body, tokenize='porter ascii')''')
runmany('INSERT INTO articles VALUES (?,?)', [
('SQL Basics','Learn the fundamentals of SQL queries and databases'),
('Python for Data','Python is great for data analysis and machine learning'),
('Advanced SQL','Window functions and CTEs are powerful SQL features'),
('NumPy Guide','NumPy provides fast array operations for numerical data'),
])
# Simple search
rows = run("SELECT title FROM articles WHERE articles MATCH 'SQL'")
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE VIRTUAL TABLE IF NOT EXISTS docs
USING fts5(title, content)''')
runmany('INSERT INTO docs VALUES (?,?)', [
('Intro to ML','Machine learning uses statistical models to make predictions'),
('Deep Learning','Neural networks form the basis of deep learning systems'),
('SQL Mastery','Master SQL with window functions and query optimization'),
('Data Wrangling','Pandas and NumPy help with data cleaning and transformation'),
])
# Phrase search and column filter
rows = run("SELECT title FROM docs WHERE docs MATCH 'title:SQL'")
print('Title match:', rows)
rows = run("SELECT title, rank FROM docs WHERE docs MATCH 'data' ORDER BY rank")
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE VIRTUAL TABLE IF NOT EXISTS notes
USING fts5(author, text)''')
runmany('INSERT INTO notes VALUES (?,?)', [
('Alice','Python decorators and closures are advanced features'),
('Bob','SQL joins and subqueries are essential for data analysis'),
('Alice','NumPy broadcasting makes array operations concise'),
('Bob','Window functions in SQL enable running totals and rankings'),
])
# Prefix search and boolean operators
rows = run("SELECT author, text FROM notes WHERE notes MATCH 'SQL AND data'")
print('AND:', rows)
rows = run("SELECT author, text FROM notes WHERE notes MATCH 'Python OR NumPy'")
print('OR:', [r[0] for r in rows])
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE VIRTUAL TABLE IF NOT EXISTS kb
USING fts5(title, body)''')
runmany('INSERT INTO kb VALUES (?,?)', [
('Error Handling','Use try-except blocks to catch and handle Python exceptions gracefully'),
('Logging Best Practices','Configure logging with handlers, formatters and log levels'),
('Testing Strategies','Unit tests and integration tests ensure code reliability'),
('Code Review Tips','Peer code review improves code quality and knowledge sharing'),
])
# Snippet and highlight
rows = run('''
SELECT title, snippet(kb, 1, '<b>', '</b>', '...', 8)
FROM kb WHERE kb MATCH 'code'
ORDER BY rank''')
for r in rows: print(r[0], '|', r[1])
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE VIRTUAL TABLE IF NOT EXISTS support_kb
USING fts5(title, content, tokenize='porter ascii')''')
runmany('INSERT INTO support_kb VALUES (?,?)', [
('Password Reset','To reset your password go to settings and click reset'),
('Billing FAQ','For billing questions contact support or view your invoice'),
('Account Setup','Set up your account by verifying your email address'),
('Password Policy','Passwords must be 8 characters with a mix of letters and numbers'),
])
query = 'password'
rows = run(f'''
SELECT title,
snippet(support_kb, 1, '**', '**', '...', 6) AS preview,
rank
FROM support_kb
WHERE support_kb MATCH ?
ORDER BY rank
''', (query,))
for r in rows: print(r[0], '|', r[1])
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE VIRTUAL TABLE IF NOT EXISTS recipes
USING fts5(name, ingredients, instructions)''')
runmany('INSERT INTO recipes VALUES (?,?,?)', [
('Pasta','flour eggs garlic olive oil','stir pasta in boiling water'),
('Soup','carrots garlic onion broth','simmer and stir until soft'),
('Salad','lettuce tomato cucumber','toss with dressing'),
('Stir Fry','garlic ginger soy sauce vegetables','stir fry on high heat'),
])
# Write your FTS5 AND query here
Use EXPLAIN QUERY PLAN to understand query execution, create regular, composite, covering, and partial indexes, and run ANALYZE to update planner statistics.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport time
run('''CREATE TABLE IF NOT EXISTS big (
id INTEGER PRIMARY KEY, val INTEGER, cat TEXT)''')
# Insert sample data
import random; random.seed(42)
rows = [(i, random.randint(1,1000), random.choice(['A','B','C'])) for i in range(1,5001)]
runmany('INSERT INTO big VALUES (?,?,?)', rows)
# Query without index
t0 = time.perf_counter()
run('SELECT COUNT(*) FROM big WHERE val > 500')
t1 = time.perf_counter()
# Add index
run('CREATE INDEX IF NOT EXISTS idx_val ON big(val)')
t2 = time.perf_counter()
run('SELECT COUNT(*) FROM big WHERE val > 500')
t3 = time.perf_counter()
print(f'Without index: {(t1-t0)*1000:.2f}ms')
print(f'With index: {(t3-t2)*1000:.2f}ms')
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS orders4 (
id INTEGER PRIMARY KEY, customer_id INTEGER,
status TEXT, amount REAL)''')
import random; random.seed(0)
rows = [(i, random.randint(1,100), random.choice(['new','paid','shipped']),
round(random.uniform(10,500),2)) for i in range(1,2001)]
runmany('INSERT INTO orders4 VALUES (?,?,?,?)', rows)
# EXPLAIN QUERY PLAN
plan = run('''EXPLAIN QUERY PLAN
SELECT customer_id, SUM(amount)
FROM orders4
WHERE status='paid'
GROUP BY customer_id''')
for p in plan: print(p)
# Add composite index
run('CREATE INDEX IF NOT EXISTS idx_status_cust ON orders4(status, customer_id)')
plan2 = run('''EXPLAIN QUERY PLAN
SELECT customer_id, SUM(amount)
FROM orders4 WHERE status='paid'
GROUP BY customer_id''')
for p in plan2: print(p)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS logs2 (
id INTEGER PRIMARY KEY,
user_id INTEGER, event TEXT, ts TEXT)''')
import random, string; random.seed(1)
events = ['login','logout','purchase','view']
rows = [(i, random.randint(1,50),
random.choice(events),
f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}')
for i in range(1,3001)]
runmany('INSERT INTO logs2 VALUES (?,?,?,?)', rows)
# Covering index: avoid table access for common query
run('CREATE INDEX IF NOT EXISTS idx_cov ON logs2(user_id, event, ts)')
plan = run('''EXPLAIN QUERY PLAN
SELECT user_id, event, ts FROM logs2
WHERE user_id=1 ORDER BY ts''')
for p in plan: print(p)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY, name TEXT, price REAL, stock INTEGER)''')
import random; random.seed(5)
rows = [(i,f'item_{i}',round(random.uniform(1,100),2),random.randint(0,500)) for i in range(1,1001)]
runmany('INSERT INTO items VALUES (?,?,?,?)', rows)
# ANALYZE updates statistics for the query planner
run('ANALYZE')
# Partial index: only index in-stock items
run('CREATE INDEX IF NOT EXISTS idx_instock ON items(price) WHERE stock > 0')
plan = run('''EXPLAIN QUERY PLAN
SELECT name, price FROM items
WHERE stock > 0 AND price < 20
ORDER BY price''')
for p in plan: print(p)
print(run('SELECT COUNT(*) FROM items WHERE stock>0'))
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS cust2 (id INTEGER PRIMARY KEY, name TEXT, tier TEXT)''')
run('''CREATE TABLE IF NOT EXISTS ord2 (
id INTEGER PRIMARY KEY, cust_id INTEGER, amount REAL, status TEXT)''')
import random; random.seed(7)
runmany('INSERT INTO cust2 VALUES (?,?,?)', [(i,f'C{i}',random.choice(['gold','silver'])) for i in range(1,501)])
runmany('INSERT INTO ord2 VALUES (?,?,?,?)',
[(i,random.randint(1,500),round(random.uniform(10,500),2),
random.choice(['paid','pending'])) for i in range(1,2001)])
plan = run('''EXPLAIN QUERY PLAN
SELECT c.name, SUM(o.amount)
FROM ord2 o JOIN cust2 c ON o.cust_id=c.id
WHERE o.status='paid' AND c.tier='gold'
GROUP BY c.id''')
for p in plan: print(p)
run('CREATE INDEX IF NOT EXISTS idx_ord_status ON ord2(status, cust_id)')
run('CREATE INDEX IF NOT EXISTS idx_cust_tier ON cust2(tier)')
plan2 = run('''EXPLAIN QUERY PLAN
SELECT c.name, SUM(o.amount)
FROM ord2 o JOIN cust2 c ON o.cust_id=c.id
WHERE o.status='paid' AND c.tier='gold'
GROUP BY c.id''')
for p in plan2: print(p)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nimport random; random.seed(9)
run('''CREATE TABLE IF NOT EXISTS transactions2 (
id INTEGER PRIMARY KEY, account_id INTEGER,
type TEXT, amount REAL, created_at TEXT)''')
rows = [(i, random.randint(1,50), random.choice(['debit','credit']),
round(random.uniform(1,1000),2), f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}')
for i in range(1,501)]
runmany('INSERT INTO transactions2 VALUES (?,?,?,?,?)', rows)
# First check the plan, then add an index
plan = run('''EXPLAIN QUERY PLAN
SELECT * FROM transactions2
WHERE account_id=5 AND type='debit' ORDER BY created_at''')
for p in plan: print(p)
# Add your index here
Apply SQL as a full data analysis tool: ingest and clean dirty data, compute monthly trends with running totals, compare performance vs. group averages, and pivot long-format survey data.
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\n# End-to-end: ingest, clean, transform, analyze
run('''CREATE TABLE IF NOT EXISTS raw_sales (
id INTEGER PRIMARY KEY,
rep TEXT, region TEXT,
amount TEXT, -- stored as text (dirty data)
sale_date TEXT)''')
runmany('INSERT INTO raw_sales VALUES (?,?,?,?,?)', [
(1,'Alice','North','1,200.50','2024-01-15'),
(2,'Bob','South','800','2024-01-20'),
(3,'Alice','North',' 950.00 ','2024-02-01'),
(4,'Carol','East','','2024-02-10'), -- missing amount
(5,'Bob','South','1100.75','2024-02-20'),
])
# Clean: strip whitespace, handle empty, cast
rows = run('''
SELECT rep, region,
CASE WHEN TRIM(REPLACE(amount,',',''))='' THEN NULL
ELSE CAST(TRIM(REPLACE(amount,',','')) AS REAL)
END AS clean_amount,
sale_date
FROM raw_sales''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS cleaned_sales (
rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO cleaned_sales VALUES (?,?,?,?)', [
('Alice','North',1200.50,'2024-01-15'),
('Bob','South',800.00,'2024-01-20'),
('Alice','North',950.00,'2024-02-01'),
('Bob','South',1100.75,'2024-02-20'),
('Carol','East',750.00,'2024-03-05'),
('Alice','North',1350.00,'2024-03-10'),
])
# Monthly trend and running total
rows = run('''
WITH monthly AS (
SELECT SUBSTR(sale_date,1,7) AS month,
rep, SUM(amount) AS total
FROM cleaned_sales GROUP BY month, rep)
SELECT month, rep, total,
SUM(total) OVER(PARTITION BY rep ORDER BY month) AS running_total
FROM monthly ORDER BY rep, month''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS sales_f (
rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO sales_f VALUES (?,?,?,?)', [
('Alice','North',1200,'2024-01-15'),('Bob','South',800,'2024-01-20'),
('Alice','North',950,'2024-02-01'),('Carol','East',750,'2024-02-10'),
('Bob','South',1100,'2024-02-20'),('Alice','North',1350,'2024-03-10'),
('Carol','East',900,'2024-03-15'),('Bob','South',1250,'2024-03-20'),
])
# Cohort-style: rep performance vs. regional average
rows = run('''
SELECT rep, region, ROUND(SUM(amount),2) AS rep_total,
ROUND(AVG(SUM(amount)) OVER(PARTITION BY region),2) AS region_avg,
ROUND(SUM(amount) - AVG(SUM(amount)) OVER(PARTITION BY region),2) AS vs_avg
FROM sales_f GROUP BY rep, region ORDER BY region, rep''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS survey (
respondent_id INTEGER, question TEXT, score INTEGER)''')
runmany('INSERT INTO survey VALUES (?,?,?)', [
(1,'Q1',4),(1,'Q2',5),(1,'Q3',3),
(2,'Q1',2),(2,'Q2',3),(2,'Q3',4),
(3,'Q1',5),(3,'Q2',5),(3,'Q3',5),
(4,'Q1',3),(4,'Q2',2),(4,'Q3',3),
])
# Pivot: one row per respondent, one column per question
rows = run('''
SELECT respondent_id,
MAX(CASE WHEN question='Q1' THEN score END) AS Q1,
MAX(CASE WHEN question='Q2' THEN score END) AS Q2,
MAX(CASE WHEN question='Q3' THEN score END) AS Q3,
ROUND(AVG(score),2) AS avg_score
FROM survey GROUP BY respondent_id ORDER BY respondent_id''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS pipeline_sales (
rep TEXT, region TEXT, amount REAL, sale_date TEXT)''')
runmany('INSERT INTO pipeline_sales VALUES (?,?,?,?)', [
('Alice','North',1200,'2024-01-10'),('Bob','North',950,'2024-01-20'),
('Alice','North',1400,'2024-02-05'),('Bob','North',1100,'2024-02-15'),
('Carol','South',800,'2024-01-12'),('Dave','South',900,'2024-01-25'),
('Carol','South',1050,'2024-02-08'),('Dave','South',750,'2024-02-18'),
])
rows = run('''
WITH monthly AS (
SELECT rep, region, SUBSTR(sale_date,1,7) AS month,
SUM(amount) AS total
FROM pipeline_sales GROUP BY rep, region, month),
with_growth AS (
SELECT *,
LAG(total) OVER(PARTITION BY rep ORDER BY month) AS prev,
ROUND(100.0*(total - LAG(total) OVER(PARTITION BY rep ORDER BY month))
/ LAG(total) OVER(PARTITION BY rep ORDER BY month), 1) AS pct_growth
FROM monthly)
SELECT *, RANK() OVER(PARTITION BY region, month ORDER BY total DESC) AS region_rank
FROM with_growth ORDER BY region, month, region_rank''')
for r in rows: print(r)
import sqlite3, contextlib\nconn = sqlite3.connect(':memory:')\nconn.execute('PRAGMA journal_mode=WAL')\ndef run(sql, params=()):\n with contextlib.closing(conn.cursor()) as cur:\n cur.execute(sql, params)\n if cur.description:\n return cur.fetchall()\n conn.commit()\n return cur.rowcount\ndef runmany(sql, rows):\n with contextlib.closing(conn.cursor()) as cur:\n cur.executemany(sql, rows)\n conn.commit()\nrun('''CREATE TABLE IF NOT EXISTS web_logs (
session_id INTEGER, user_id INTEGER,
page TEXT, duration_sec INTEGER, log_date TEXT)''')
runmany('INSERT INTO web_logs VALUES (?,?,?,?,?)', [
(1,1,'home',45,'2024-01-01'),(2,1,'about',20,'2024-01-01'),
(3,2,'home',60,'2024-01-01'),(4,2,'shop',90,'2024-01-01'),
(5,1,'home',35,'2024-01-02'),(6,1,'shop',55,'2024-01-02'),
(7,3,'home',15,'2024-01-01'),(8,3,'about',40,'2024-01-01'),
(9,2,'home',80,'2024-01-02'),(10,3,'shop',50,'2024-01-02'),
])
# Write your query: filter >30s, count per user/day, window max