Loading Module…

🐍 Python Basics

34 topics • Click any card to expand

1. Variables & Data Types

Python is dynamically typed — you don't declare types, Python infers them. The core types are int, float, str, bool, and NoneType.

Basic types and type checking
# Integers
age    = 25
year   = 2024

# Floats
price  = 9.99
pi     = 3.14159

# String
name   = "Alice"

# Boolean
active = True
done   = False

# NoneType
result = None

# Check types
print(type(age))       # <class 'int'>
print(type(price))     # <class 'float'>
print(type(name))      # <class 'str'>
print(type(active))    # <class 'bool'>
print(type(result))    # <class 'NoneType'>
Type conversion (casting)
# Convert between types
x = "42"
print(int(x) + 8)          # 50  (str β†’ int)
print(float(x) * 1.5)      # 63.0

n = 3.9
print(int(n))              # 3   (truncates, not rounds)
print(round(n))            # 4   (rounds)

print(str(100) + " items") # "100 items"
print(bool(0))             # False
print(bool(""))            # False
print(bool("hello"))       # True
print(bool(42))            # True
Multiple assignment and augmented operators
# Multiple assignment
a, b, c = 10, 20, 30
x = y = z = 0
print(a, b, c)   # 10 20 30
print(x, y, z)   # 0 0 0

# Swap without temp variable
a, b = b, a
print(a, b)      # 20 10

# Augmented assignment operators
score = 100
score += 15    # 115
score -= 5     # 110
score *= 2     # 220
score //= 3    # 73
score **= 2    # 5329
print("Score:", score)

# Readable large numbers
population = 8_100_000_000
pi_approx  = 3.141_592_653
print(f"Population: {population:,}")
Complex numbers, None checks, and type introspection
# Complex numbers
z1 = 3 + 4j
z2 = complex(1, -2)
print(f"z1 = {z1},  real={z1.real}, imag={z1.imag}")
print(f"|z1| = {abs(z1)}")          # magnitude: 5.0
print(f"z1 + z2 = {z1 + z2}")
print(f"z1 * z2 = {z1 * z2}")

# None checks β€” always use 'is' / 'is not', never ==
result = None
if result is None:
    print("result is None")

data = [0, "", None, False, 42, "hello"]
for item in data:
    falsy = "falsy" if not item else "truthy"
    none_check = " (is None)" if item is None else ""
    print(f"  {str(item):8s} -> {falsy}{none_check}")

# isinstance β€” safer than type() ==
values = [42, 3.14, "hi", True, None, [1,2]]
for v in values:
    print(f"  {str(v):8s}  int={isinstance(v, int)}  "
          f"float={isinstance(v, float)}  str={isinstance(v, str)}")
💼 Real-World: User Input Validation
A CLI app reads user input and converts it to the correct type before processing.
# Simulating user input processing
def parse_order(quantity_str, price_str, discount_str):
    try:
        quantity = int(quantity_str)
        price    = float(price_str)
        discount = float(discount_str) / 100
    except ValueError as e:
        return f"Invalid input: {e}"

    subtotal = quantity * price
    total    = subtotal * (1 - discount)
    return {
        "quantity": quantity,
        "price":    price,
        "discount": f"{discount:.0%}",
        "total":    round(total, 2)
    }

print(parse_order("3", "29.99", "10"))
print(parse_order("abc", "9.99", "5"))
🏋️ Practice: Variable Juggling
Create name (str), age (int), height (float). Swap age and height using tuple unpacking. Check if original age is between 18 and 65 (inclusive). Print a formatted f-string summary.
Starter Code
name   = "YOUR_NAME"
age    = 25          # set your age
height = 1.75        # set height in meters

# TODO: swap age and height using one line
# age, height = ???

# TODO: check if the original age (now stored in height) is 18-65
# is_working_age = ???

# Expected: "Alice | Age: 1.75 | Height: 25m | Working age: True"
print(f"{name} | Age: {age} | Height: {height}m | Working age: {is_working_age}")
✅ Practice Checklist
2. Strings

Strings are sequences of characters. Python provides rich built-in methods for slicing, formatting, searching, and transforming text.

String methods and slicing
text = "  Hello, World!  "

print(text.strip())           # remove whitespace
print(text.lower())           # lowercase
print(text.upper())           # uppercase
print(text.replace("World", "Python"))
print(text.strip().split(", "))  # ['Hello', 'World!']

# Slicing
s = "Python"
print(s[0])      # P
print(s[-1])     # n
print(s[1:4])    # yth
print(s[::-1])   # nohtyP  (reverse)
print(len(s))    # 6
f-strings and formatting
name  = "Alice"
score = 98.567
rank  = 1

# f-string (recommended)
print(f"Name: {name}, Score: {score:.2f}, Rank: #{rank}")

# Padding and alignment: <10 left-pads the name, >6.2f right-aligns the price
for item, price in [("Apple", 0.5), ("Banana", 0.25), ("Cherry", 1.99)]:
    print(f"{item:<10} ${price:>6.2f}")

# Multi-line string built from adjacent f-strings.
# NOTE: the line break must be an escaped \n — a literal newline inside
# an f-string is a SyntaxError.
message = (
    f"Congratulations {name}!\n"
    f"Your score of {score:.1f} earned rank #{rank}."
)
print(message)
String searching, splitting, and joining
sentence = "Python is powerful, Python is readable, Python is fun"

print(sentence.count("Python"))       # 3
print(sentence.find("readable"))      # index of first match
print(sentence.startswith("Python"))  # True
print(sentence.endswith("fun"))       # True

# Split and join
parts    = sentence.split(", ")
rejoined = " | ".join(parts)
print(rejoined)

# strip variants
messy = "   hello world   "
print(repr(messy.strip()))    # 'hello world'

# partition β€” splits at first match only
before, sep, after = sentence.partition(" is ")
print(f"Before: '{before}'")
print(f"After:  '{after[:30]}...'")

# replace with count limit
print(sentence.replace("Python", "Ruby", 1))  # only first
f-string advanced: format spec, alignment, padding, expressions
import math

# Format spec: [[fill]align][sign][width][grouping][.precision][type]
pi = math.pi
print(f"{'pi':>12s}: {pi:>12.6f}")      # right-align, 6 decimals
print(f"{'pi':>12s}: {pi:>12.4e}")      # scientific notation
print(f"{'pi':>12s}: {pi:>12.2%}")      # as percentage

# Table with column alignment
header = f"{'Name':<15} {'Score':>8} {'Grade':>6} {'Bar':}"
print(header)
print("-" * 45)
students = [("Alice", 92.5), ("Bob", 74.3), ("Carol Marie", 88.0)]
for name, score in students:
    grade = "A" if score >= 90 else "B" if score >= 80 else "C"
    bar   = "#" * int(score // 10)
    print(f"{name:<15} {score:>8.1f} {grade:>6}  {bar}")

# Nested expressions inside f-strings
items = [3, 1, 4, 1, 5, 9, 2, 6]
print(f"max={max(items)}, sum={sum(items)}, avg={sum(items)/len(items):.2f}")

# Debug format (Python 3.8+): variable=value
x = 42
print(f"{x=}, {x**2=}, {math.sqrt(x)=:.4f}")
💼 Real-World: Log File Parser
A DevOps engineer parses and formats structured log messages from an application server.
import datetime

logs = [
    "[2024-01-15 09:23:11] ERROR   login_service: Invalid credentials for user bob@example.com",
    "[2024-01-15 09:24:55] INFO    auth_service:  Token issued for alice@example.com",
    "[2024-01-15 09:25:03] WARNING api_gateway:   Rate limit 80% for IP 192.168.1.42",
]

print(f"{'Time':9s} {'Level':8s} {'Service':15s} {'Message'}")
print("-" * 65)
for log in logs:
    # Parse: [datetime] LEVEL  service: message
    ts      = log[1:20]
    rest    = log[22:].strip()
    parts   = rest.split(None, 2)
    level   = parts[0]
    service = parts[1].rstrip(":")
    msg     = parts[2] if len(parts) > 2 else ""
    print(f"{ts[11:]:9s} {level:8s} {service:15s} {msg}")
🏋️ Practice: String Cleaning Pipeline
Given raw = ' super-pro Widget X200 ', clean it: strip whitespace, title-case it, replace hyphens with spaces, check if 'pro' appears (case-insensitive), and build a 6-char product code from the first 3 + last 3 chars (no spaces) uppercased.
Starter Code
raw = "  super-pro Widget X200  "

# 1. Strip whitespace and title-case
clean = raw.strip().title()

# 2. TODO: Replace hyphens with spaces
# clean = clean.replace(???)

# 3. TODO: Check 'pro' in original string (case-insensitive)
# has_pro = "pro" in raw.???()

# 4. TODO: Build 6-char code: first 3 + last 3 of clean (no spaces), uppercase
# no_spaces = clean.replace(" ", "")
# code = (no_spaces[:3] + ???).upper()

print(f"Clean: '{clean}'")
# print(f"Has pro: {has_pro}")
# print(f"Code: '{code}'")
# Expected: Clean='Super Pro Widget X200', Code='SUP200'
✅ Practice Checklist
3. Lists

Lists are ordered, mutable sequences. They're the most commonly used Python container — used for collections, stacks, queues, and more.

Creating, accessing, modifying
fruits = ["apple", "banana", "cherry", "date"]

print(fruits[0])          # apple
print(fruits[-1])         # date
print(fruits[1:3])        # ['banana', 'cherry']

# Modify
fruits.append("elderberry")     # add to end
fruits.insert(1, "avocado")     # insert at index 1
fruits.remove("banana")         # remove by value
popped = fruits.pop()           # remove & return last
print(fruits)
print("Popped:", popped)
print("Length:", len(fruits))
List methods and list comprehension
nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3]

nums.sort()
print("Sorted:", nums)
print("Reversed:", nums[::-1])
print("Count of 5:", nums.count(5))
print("Index of 9:", nums.index(9))
print("Sum:", sum(nums))
print("Max:", max(nums), "Min:", min(nums))

# List comprehension
squares  = [x**2 for x in range(1, 6)]
evens    = [x for x in range(20) if x % 2 == 0]
print("Squares:", squares)
print("Evens:", evens)
Nested lists, map, filter, and any/all
# Nested list (2D matrix)
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print("Center:", matrix[1][1])   # 5

# Flatten nested list
flat = [x for row in matrix for x in row]
print("Flat:", flat)

# Transpose with comprehension
transposed = [[matrix[r][c] for r in range(3)] for c in range(3)]
print("Transposed[0]:", transposed[0])

# map and filter
nums = [1, -2, 3, -4, 5, -6]
doubled   = list(map(lambda x: x * 2, nums))
positives = list(filter(lambda x: x > 0, nums))
print("Doubled:  ", doubled)
print("Positives:", positives)

# any / all
print("any > 4:", any(x > 4 for x in nums))
print("all > 0:", all(x > 0 for x in nums))
sort vs sorted, key=, reverse, and bisect for sorted insertion
import bisect

# sort() mutates in place; sorted() returns a new list
nums = [5, 2, 8, 1, 9, 3]
new_sorted = sorted(nums)           # original unchanged
nums.sort()                         # in-place
print("sorted():", new_sorted)
print("sort() in-place:", nums)

# key= β€” sort by custom criteria
words = ["banana", "Apple", "cherry", "date", "FIG"]
print(sorted(words))                          # case-sensitive lexicographic
print(sorted(words, key=str.lower))           # case-insensitive
print(sorted(words, key=len))                 # by length
print(sorted(words, key=lambda w: (-len(w), w.lower())))  # len desc, alpha asc

# Sorting tuples: sort by 2nd element desc, then 1st asc
people = [("Bob",25), ("Alice",30), ("Carol",25), ("Dave",30)]
print(sorted(people, key=lambda p: (-p[1], p[0])))

# bisect β€” fast insertion point in a sorted list (binary search)
scores = [45, 58, 67, 74, 82, 88, 95]
new_score = 79
pos = bisect.bisect_left(scores, new_score)
bisect.insort(scores, new_score)   # inserts in sorted order
print(f"Inserted {new_score} at index {pos}: {scores}")
print(f"Rank from top: {len(scores) - pos} of {len(scores)}")
💼 Real-World: Student Grade Processor
A teacher processes a class grade list: compute stats, filter failing students, and build a ranking.
students = [
    ("Alice", 92), ("Bob", 74), ("Carol", 88),
    ("Dave", 51), ("Eve", 96), ("Frank", 63),
    ("Grace", 85), ("Hank", 47), ("Iris", 79),
]

scores = [s[1] for s in students]
avg    = sum(scores) / len(scores)

passing = [(n, s) for n, s in students if s >= 60]
failing = [(n, s) for n, s in students if s  < 60]
ranked  = sorted(students, key=lambda x: x[1], reverse=True)

print(f"Class average: {avg:.1f}")
print(f"Passing ({len(passing)}): {[n for n,_ in passing]}")
print(f"Failing ({len(failing)}): {[(n,s) for n,s in failing]}")
print("Top 3:", ranked[:3])
🏋️ Practice: Temperature Converter
Given temps_c = [22.5, 35.1, 18.0, 40.2, 28.7, 15.3, 33.8, 25.0], convert all to Fahrenheit (F = C*9/5+32) using a list comprehension. Filter hot days (>30°C). Sort descending. Find the min and max.
Starter Code
temps_c = [22.5, 35.1, 18.0, 40.2, 28.7, 15.3, 33.8, 25.0]

# 1. TODO: Convert to Fahrenheit using list comprehension
# temps_f = [??? for t in temps_c]

# 2. TODO: Filter days above 30°C
# hot_days = [??? for t in temps_c if ???]

# 3. TODO: Sort temps_c descending
# sorted_desc = sorted(???, reverse=True)

# 4. TODO: Min and max
# lo, hi = min(temps_c), max(temps_c)

print("Fahrenheit:", [round(f, 1) for f in temps_f])
print("Hot days:", sorted(hot_days))
print("Sorted desc:", sorted_desc)
print(f"Range: {lo}°C — {hi}°C")
✅ Practice Checklist
4. Tuples, Sets & Dictionaries

Tuples are immutable sequences; sets are unordered unique collections; dictionaries are key-value mappings.

Tuples and sets
# Tuple β€” immutable
point = (3, 7)
x, y  = point         # unpacking
print(f"x={x}, y={y}")

rgb   = (255, 128, 0)
print("Red channel:", rgb[0])

# Set β€” unique, unordered
a = {1, 2, 3, 4, 5}
b = {3, 4, 5, 6, 7}
print("Union:       ", a | b)
print("Intersection:", a & b)
print("Difference:  ", a - b)

tags = ["python", "data", "python", "ml", "data"]
unique_tags = set(tags)
print("Unique tags:", unique_tags)
Dictionaries
person = {"name": "Alice", "age": 30, "city": "NYC"}

print(person["name"])                 # Alice
print(person.get("email", "N/A"))     # safe get with default

# Add / update
person["email"]  = "alice@example.com"
person["age"]    = 31
del person["city"]

print(person)
print("Keys:",   list(person.keys()))
print("Values:", list(person.values()))

# Iterate
for k, v in person.items():
    print(f"  {k}: {v}")
Dict comprehension and merging
# Dict comprehension from zip
students = ["Alice", "Bob", "Carol", "Dave"]
scores   = [92, 74, 88, 51]

grade_map = dict(zip(students, scores))
print("Grade map:", grade_map)

# Filter with dict comprehension
passing = {name: score for name, score in grade_map.items() if score >= 60}
print("Passing:", passing)

# Map scores to letter grades
def letter(s):
    return "A" if s >= 90 else "B" if s >= 80 else "C" if s >= 70 else "D" if s >= 60 else "F"

letters = {name: letter(score) for name, score in grade_map.items()}
print("Letters:", letters)

# Dict merging with ** operator (Python 3.5+)
defaults = {"timeout": 30, "retries": 3, "verbose": False}
overrides = {"retries": 5, "verbose": True}
config = {**defaults, **overrides}   # overrides wins on conflict
print("Config:", config)

# Python 3.9+ merge operator (| and |=)
# config = defaults | overrides
OrderedDict, ChainMap, dict views, and set update operations
from collections import OrderedDict, ChainMap

# OrderedDict β€” remembers insertion order (useful for LRU-style caches)
od = OrderedDict()
od["first"]  = 1
od["second"] = 2
od["third"]  = 3
od.move_to_end("first")           # move 'first' to the end
print("OrderedDict:", list(od.keys()))

# popitem(last=False) removes from the front (FIFO)
key, val = od.popitem(last=False)
print(f"Popped first: {key}={val}, remaining: {list(od.keys())}")

# ChainMap β€” single view over multiple dicts (first match wins)
defaults = {"color": "blue", "size": "M", "font": "Arial"}
user_prefs = {"color": "red", "size": "L"}
session = {"font": "Helvetica"}
merged = ChainMap(session, user_prefs, defaults)
print("color:", merged["color"])   # 'red'   (user_prefs wins)
print("font:",  merged["font"])    # 'Helvetica' (session wins)

# Dict views are live β€” they reflect changes
d = {"a": 1, "b": 2, "c": 3}
keys_view = d.keys()
d["d"] = 4
print("Live keys view:", list(keys_view))  # includes 'd'

# Set operations with update / intersection_update
a = {1, 2, 3, 4, 5}
b = {3, 4, 5, 6, 7}
a.update({8, 9})                 # union in-place (|=)
print("After update:", sorted(a))
a.intersection_update(b | {8})   # keep only items in both (a &= ...)
print("After intersection_update:", sorted(a))
💼 Real-World: Inventory Tracking System
A small shop tracks stock levels with a dictionary and uses sets to find products needing reorder.
inventory = {
    "apple":   {"qty": 150, "price": 0.50, "min_stock": 50},
    "banana":  {"qty": 30,  "price": 0.25, "min_stock": 40},
    "milk":    {"qty": 10,  "price": 2.99, "min_stock": 20},
    "bread":   {"qty": 80,  "price": 3.49, "min_stock": 15},
    "cheese":  {"qty": 5,   "price": 5.99, "min_stock": 10},
}

reorder = {item for item, data in inventory.items()
           if data["qty"] < data["min_stock"]}

total_value = sum(d["qty"] * d["price"] for d in inventory.values())

print(f"Total inventory value: ${total_value:.2f}")
print(f"Items to reorder ({len(reorder)}): {reorder}")

for item in sorted(reorder):
    d = inventory[item]
    print(f"  {item:8s} qty={d['qty']:3d}  min={d['min_stock']:3d}  (order {d['min_stock']*2 - d['qty']} units)")
🏋️ Practice: Grade Book Manager
Create a grade book from two lists using zip, find failed students with a set comprehension, and map scores to letter grades with a dict comprehension.
Starter Code
students = ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank"]
scores   = [92, 58, 76, 45, 88, 63]

# TODO: Build grade_book dict from zip(students, scores)
# grade_book = dict(???)

# TODO: Find failed students (score < 60) using a set comprehension
# failed = {name for name, score in ???.items() if ???}

# TODO: Map each student to a letter grade with dict comprehension
# Use: A>=90, B>=80, C>=70, D>=60, F otherwise
# Hint: define a helper or use nested ternary
# letter_grades = {name: ??? for name, score in grade_book.items()}

# TODO: Merge grade_book with a "class_info" dict using **
# class_info = {"class": "Python 101", "semester": "Spring 2024"}
# full_record = {**class_info, "grades": letter_grades}

print("Grade book:", grade_book)
print("Failed:", failed)
print("Letter grades:", letter_grades)
✅ Practice Checklist
5. Control Flow

if/elif/else controls which code runs. Python uses indentation (4 spaces) instead of curly braces to define blocks.

if / elif / else
# Basic if-elif-else: branches are checked top-down, first match wins
temperature = 28

if temperature > 35:
    status = "Heat warning"
elif temperature > 25:
    status = "Warm"
elif temperature > 15:
    status = "Comfortable"
elif temperature > 5:
    status = "Cool"
else:
    status = "Cold"

print(f"{temperature}°C → {status}")

# Ternary (one-liner)
label = "Pass" if temperature > 20 else "Fail"
print("Label:", label)

# Chained comparisons — equivalent to (10 < x) and (x < 20)
x = 15
if 10 < x < 20:
    print(f"{x} is between 10 and 20")
Logical operators and truthiness
# and, or, not
age    = 22
income = 55000

eligible = age >= 18 and income >= 30000
print("Eligible:", eligible)

username = ""
display  = username or "Anonymous"
print("Display name:", display)

# in / not in
role = "editor"
allowed = ["admin", "editor", "moderator"]
if role in allowed:
    print(f"{role} has access")

# Walrus operator (Python 3.8+)
data = [1, 2, 3]
if n := len(data):
    print(f"List has {n} items")
match statement and password validator
# match statement (Python 3.10+) β€” structured pattern matching
def http_status(code):
    match code:
        case 200:
            return "OK"
        case 201:
            return "Created"
        case 400:
            return "Bad Request"
        case 401 | 403:
            return "Auth error"
        case 404:
            return "Not Found"
        case 500:
            return "Server Error"
        case _:
            return f"Unknown ({code})"

for code in [200, 201, 403, 404, 418]:
    print(f"  {code} β†’ {http_status(code)}")

# any() / all() for password strength
def check_password(pw):
    checks = {
        "length >= 8":  len(pw) >= 8,
        "has uppercase": any(c.isupper() for c in pw),
        "has digit":     any(c.isdigit() for c in pw),
        "has symbol":    any(c in "!@#$%^&*()" for c in pw),
    }
    for rule, ok in checks.items():
        print(f"  {'OK' if ok else 'FAIL':4s}  {rule}")
    return all(checks.values())

print("Strong:", check_password("Secure@9"))
print("Strong:", check_password("weakpass"))
Short-circuit evaluation, assert, and conditional imports
# Short-circuit evaluation
# 'and' stops at the first falsy value, 'or' stops at first truthy
def expensive():
    print("  [expensive() called]")
    return True

print("--- short-circuit AND ---")
result = False and expensive()    # expensive() never called
print("Result:", result)

print("--- short-circuit OR ---")
result = True or expensive()      # expensive() never called
print("Result:", result)

# Practical: safe attribute access via short-circuit
user = None
name = user and user.get("name", "")   # won't crash if user is None
print("Name:", name)                    # None (short-circuited)

user = {"name": "Alice", "role": "admin"}
name = user and user.get("name", "")
print("Name:", name)                    # "Alice"

# assert β€” for debugging invariants (disabled with python -O)
def divide(a, b):
    assert b != 0, f"Divisor must not be zero, got b={b}"
    return a / b

print(divide(10, 2))
try:
    divide(5, 0)
except AssertionError as e:
    print(f"AssertionError: {e}")

# Conditional import β€” try fast C lib, fall back to pure Python
try:
    import ujson as json_lib          # fast third-party JSON
    print("Using ujson")
except ImportError:
    import json as json_lib           # stdlib fallback
    print("Using stdlib json")

data = json_lib.dumps({"key": "value", "nums": [1, 2, 3]})
print("Encoded:", data)
💼 Real-World: Loan Eligibility Checker
A fintech app determines loan eligibility and interest rate tier based on applicant data.
def check_loan(age, income, credit_score, existing_debt):
    """Return a (status, message) tuple for a loan application.

    status is "REJECTED" or "APPROVED"; the message gives the rejection
    reason or the approved tier and interest rate.
    """
    # Basic eligibility — guard clauses reject early with a reason
    if age < 18:
        return "REJECTED", "Must be 18+"
    if income < 20000:
        return "REJECTED", "Minimum income $20,000"
    if credit_score < 580:
        return "REJECTED", "Credit score below 580"

    debt_to_income = existing_debt / income
    if debt_to_income > 0.5:
        return "REJECTED", f"Debt-to-income {debt_to_income:.0%} exceeds 50%"

    # Approved — determine tier (best terms require both high score AND low DTI)
    if credit_score >= 750 and debt_to_income < 0.2:
        rate = 4.5
        tier = "Prime"
    elif credit_score >= 680:
        rate = 6.9
        tier = "Standard"
    else:
        rate = 11.5
        tier = "Subprime"

    return "APPROVED", f"{tier} rate: {rate}%"

applicants = [
    (25, 65000, 720, 5000),
    (17, 80000, 800, 0),
    (35, 90000, 760, 8000),
    (30, 25000, 620, 15000),
]
for a in applicants:
    status, msg = check_loan(*a)   # * unpacks the tuple into 4 arguments
    print(f"  Age={a[0]}, Income=${a[1]:,}, Score={a[2]} → {status}: {msg}")
🏋️ Practice: Traffic Light Simulator
Implement traffic_action(color, has_pedestrian, is_emergency) that returns the correct action string using if/elif/else logic.
Starter Code
def traffic_action(color, has_pedestrian=False, is_emergency=False):
    """Return the action string for a traffic light state (student exercise)."""
    # TODO: if is_emergency, all lights should yield — return "All yield for emergency"

    # TODO: use if/elif/else on color:
    #   "green"  -> "Go" (but if has_pedestrian -> "Go, watch for pedestrians")
    #   "yellow" -> "Slow down" (but if has_pedestrian -> "Stop for pedestrians")
    #   "red"    -> "Stop" (but if has_pedestrian -> "Stop — pedestrians crossing")
    #   default  -> f"Unknown signal: {color}"
    pass

# Test cases (expected output shown in the trailing comments)
print(traffic_action("green"))                          # Go
print(traffic_action("green",  has_pedestrian=True))    # Go, watch for pedestrians
print(traffic_action("yellow"))                         # Slow down
print(traffic_action("red",    has_pedestrian=True))    # Stop — pedestrians crossing
print(traffic_action("red",    is_emergency=True))      # All yield for emergency
print(traffic_action("purple"))                         # Unknown signal: purple
✅ Practice Checklist
6. Loops

for iterates over any iterable (list, range, string, dict). while loops run while a condition is True. Use break, continue, and enumerate for control.

for loops
# Loop over list
fruits = ["apple", "banana", "cherry"]
for fruit in fruits:
    print(fruit)

# Range
for i in range(1, 6):
    print(i, end=" ")
print()

# enumerate β€” get index + value
for i, fruit in enumerate(fruits, start=1):
    print(f"{i}. {fruit}")

# zip β€” loop two lists together
prices = [0.5, 0.25, 1.99]
for fruit, price in zip(fruits, prices):
    print(f"  {fruit}: ${price}")
while, break, continue
# while loop
count = 0
total = 0
while count < 5:
    total += count
    count += 1
print(f"Sum 0..4 = {total}")

# break β€” exit early
for n in range(100):
    if n * n > 50:
        print(f"First n where nΒ²>50: {n}")
        break

# continue β€” skip current iteration
for n in range(10):
    if n % 2 == 0:
        continue      # skip even numbers
    print(n, end=" ")
print()

# else on for loop (runs if not broken)
for n in range(2, 10):
    if 7 % n == 0 and n != 7:
        print("7 is not prime"); break
else:
    print("7 is prime")
Nested loops and accumulator pattern
# Multiplication table using nested loops
print("Multiplication table (1-5):")
for i in range(1, 6):
    row = ""
    for j in range(1, 6):
        row += f"{i*j:4d}"   # :4d right-aligns each product in 4 columns
    print(row)

# itertools.product — Cartesian product (like nested loops)
import itertools
suits  = ["♠", "♥", "♦", "♣"]
values = ["A", "K", "Q"]
cards  = list(itertools.product(values, suits))   # 3 values x 4 suits = 12 pairs
print(f"\n{len(cards)} high cards:", cards[:4], "...")

# Running maximum accumulator pattern
readings = [12, 7, 25, 18, 30, 14, 42, 9, 36]
running_max = []
current_max = float("-inf")   # start below any possible reading
for val in readings:
    if val > current_max:
        current_max = val
    running_max.append(current_max)
print("\nReadings:    ", readings)
print("Running max: ", running_max)
itertools recipes: chain, islice, takewhile, dropwhile, groupby
import itertools

# chain β€” iterate multiple iterables as one
a = [1, 2, 3]
b = ("four", "five")
c = range(6, 9)
for item in itertools.chain(a, b, c):
    print(item, end=" ")
print()

# islice β€” lazy slice of an iterator (no list copy)
gen = (x**2 for x in itertools.count(1))   # infinite squares
first_10 = list(itertools.islice(gen, 10))
print("First 10 squares:", first_10)

# takewhile / dropwhile β€” conditional iteration
data = [2, 4, 6, 7, 8, 10, 12]
taken   = list(itertools.takewhile(lambda x: x % 2 == 0, data))
dropped = list(itertools.dropwhile(lambda x: x % 2 == 0, data))
print("takewhile even:", taken)    # [2, 4, 6] β€” stops at 7
print("dropwhile even:", dropped)  # [7, 8, 10, 12] β€” starts at 7

# groupby β€” group consecutive items by a key (data must be sorted by key first)
entries = [
    ("Alice", "Engineering"), ("Bob", "Engineering"),
    ("Carol", "Marketing"),   ("Dave", "Marketing"),
    ("Eve",   "Engineering"),
]
entries.sort(key=lambda e: e[1])   # sort by department first
for dept, group in itertools.groupby(entries, key=lambda e: e[1]):
    names = [name for name, _ in group]
    print(f"  {dept}: {names}")
💼 Real-World: Sales Report Generator
A sales manager loops through weekly data to compute running totals, find best weeks, and flag targets.
weekly_sales = [42000, 38500, 51000, 47200, 29800, 55600, 48900, 61000, 39700, 52300]
target       = 45000
best_week    = 0
best_amount  = 0
total        = 0
above_target = 0

# Single pass: accumulate the total, track the best week, count target hits
for week, sales in enumerate(weekly_sales, start=1):
    total += sales
    if sales > best_amount:
        best_amount = sales
        best_week   = week
    status = "✓" if sales >= target else "✗"
    if sales >= target:
        above_target += 1
    print(f"  Week {week:2d}: ${sales:>7,}  {status}")

avg = total / len(weekly_sales)
# \n escapes the line break — a literal newline inside an f-string is a SyntaxError
print(f"\nTotal:     ${total:>9,}")
print(f"Average:   ${avg:>9,.0f}")
print(f"Best week: Week {best_week} (${best_amount:,})")
print(f"On target: {above_target}/{len(weekly_sales)} weeks")
🏋️ Practice: FizzBuzz Plus
Loop from 1 to 30. For each number: if divisible by 3 add 'Fizz', by 5 add 'Buzz', by 7 add 'Zap'. Print the composed string, or the number if none apply.
Starter Code
results = []

for n in range(1, 31):
    label = ""
    # TODO: if divisible by 3, add "Fizz" to label
    # if n % 3 == 0: label += ???

    # TODO: if divisible by 5, add "Buzz" to label

    # TODO: if divisible by 7, add "Zap" to label

    # TODO: if label is still empty, use the number itself
    # results.append(label if label else str(n))
    pass

# Print 10 per line
for i in range(0, 30, 10):
    print("  " + "  ".join(f"{v:8s}" for v in results[i:i+10]))

# Expected row 1: 1  2  Fizz  4  Buzz  Fizz  Zap  8  Fizz  Buzz
✅ Practice Checklist
7. Functions

Functions let you encapsulate reusable logic. Python supports default arguments, *args, **kwargs, and lambda (anonymous) functions.

Defining functions and default arguments
def greet(name, greeting="Hello"):
    # Returns a greeting string
    return f"{greeting}, {name}!"

print(greet("Alice"))
print(greet("Bob", "Hi"))
print(greet(name="Carol", greeting="Hey"))

# Multiple return values (returns a tuple)
def stats(numbers):
    return min(numbers), max(numbers), sum(numbers)/len(numbers)

lo, hi, avg = stats([4, 8, 2, 9, 1, 7])
print(f"min={lo}, max={hi}, avg={avg:.2f}")
*args, **kwargs, lambda
# *args β€” variable positional arguments
def add_all(*args):
    return sum(args)

print(add_all(1, 2, 3))            # 6
print(add_all(10, 20, 30, 40))     # 100

# **kwargs β€” variable keyword arguments
def build_profile(**kwargs):
    return {k: v for k, v in kwargs.items()}

print(build_profile(name="Alice", age=30, role="admin"))

# Lambda (anonymous function)
square   = lambda x: x ** 2
multiply = lambda x, y: x * y

nums = [3, 1, 4, 1, 5, 9, 2, 6]
print(sorted(nums))
print(sorted(nums, key=lambda x: -x))   # descending
Closures and decorators
import time, functools

# Closure β€” inner function captures outer variable
def make_counter(start=0):
    count = [start]  # mutable container so inner fn can modify
    def counter():
        count[0] += 1
        return count[0]
    return counter

c1 = make_counter()
c2 = make_counter(10)
print(c1(), c1(), c1())   # 1 2 3
print(c2(), c2())          # 11 12  (independent state)

# Decorator β€” wraps a function to add behaviour
def timer(func):
    @functools.wraps(func)   # preserves __name__, __doc__
    def wrapper(*args, **kwargs):
        start  = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.6f}s")
        return result
    return wrapper

@timer
def slow_sum(n):
    return sum(range(n))

total = slow_sum(1_000_000)
print(f"Sum = {total:,}")
print("Function name preserved:", slow_sum.__name__)
Type hints, functools.reduce, and inspect signatures
from typing import List, Dict, Optional, Union, Callable
import functools, inspect

# Type hints β€” document intent, checked by mypy (not enforced at runtime)
def calculate_total(
    prices: List[float],
    tax_rate: float = 0.08,
    discount: Optional[float] = None,
) -> Dict[str, float]:
    subtotal = sum(prices)
    disc_amt  = subtotal * discount if discount else 0.0
    taxable   = subtotal - disc_amt
    total     = taxable * (1 + tax_rate)
    return {"subtotal": round(subtotal, 2),
            "discount": round(disc_amt, 2),
            "tax":      round(taxable * tax_rate, 2),
            "total":    round(total, 2)}

result = calculate_total([9.99, 24.50, 4.99], discount=0.1)
for k, v in result.items():
    print(f"  {k:10s}: ${v:.2f}")

# functools.reduce β€” fold sequence into single value
from functools import reduce
factorial = reduce(lambda acc, x: acc * x, range(1, 8))  # 7! = 5040
print(f"7! = {factorial}")

running_totals = []
reduce(lambda acc, x: (running_totals.append(acc + x), acc + x)[1],
       [10, 20, 30, 40], 0)
print("Running totals:", running_totals)

# inspect β€” introspect function signatures at runtime
def my_func(a: int, b: float = 3.14, *args, keyword: str = "hi", **kwargs):
    pass

sig = inspect.signature(my_func)
for name, param in sig.parameters.items():
    kind    = str(param.kind).split(".")[-1]
    default = param.default if param.default is not inspect.Parameter.empty else "required"
    print(f"  {name:10s} [{kind:20s}] default={default}")
💼 Real-World: Data Cleaning Pipeline
A data engineer writes a set of small, composable functions to clean and validate user records.
def clean_name(name):
    """Collapse runs of whitespace and capitalise each word of a name."""
    parts = [word.capitalize() for word in name.split()]
    return " ".join(parts)

def clean_email(email):
    """Lower-case an email address and drop surrounding whitespace."""
    return email.lower().strip()

def validate_age(age, min_age=0, max_age=120):
    """Return age as an int when it parses and lies in [min_age, max_age], else None."""
    try:
        parsed = int(age)
    except (ValueError, TypeError):
        return None
    if min_age <= parsed <= max_age:
        return parsed
    return None

def clean_record(record):
    """Run every field cleaner over one raw record dict."""
    name  = clean_name(record.get("name", ""))
    email = clean_email(record.get("email", ""))
    age   = validate_age(record.get("age"))
    return {"name": name, "email": email, "age": age}

# Demo: messy input records run through the cleaning pipeline above.
raw_records = [
    {"name": "  alice SMITH ", "email": "Alice@Example.COM ", "age": "28"},
    {"name": "BOB jones",      "email": "bob@company.com",    "age": "abc"},
    {"name": "carol  White",   "email": "  CAROL@test.org",  "age": "200"},
]

for rec in raw_records:
    cleaned = clean_record(rec)
    # validate_age returns None for unparseable or out-of-range ages
    valid = "OK" if cleaned["age"] is not None else "INVALID AGE"
    print(f"  {cleaned['name']:18s} | {cleaned['email']:25s} | age={cleaned['age']} {valid}")
🏋️ Practice: Memoize Decorator
Write a memoize(func) decorator that caches results in a dict keyed by args. Then decorate a recursive fibonacci function and observe the speedup.
Starter Code
# Practice starter — intentionally incomplete: memoize returns None, so the
# decorated fibonacci below will fail until the TODOs are implemented.
def memoize(func):
    cache = {}
    # TODO: define wrapper(*args) that:
    #   1. checks if args is already in cache
    #   2. if yes, returns cache[args]
    #   3. if no, calls func(*args), stores in cache, returns result
    # TODO: use functools.wraps(func) to preserve metadata
    # TODO: return wrapper
    pass

import functools

@memoize
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# Test: should complete instantly even for large n
print([fibonacci(i) for i in range(10)])  # [0,1,1,2,3,5,8,13,21,34]
print(fibonacci(35))                       # 9227465 — fast with memoize!
✅ Practice Checklist
8. Classes & OOP

Classes define blueprints for objects. Python supports encapsulation, inheritance, and special (dunder) methods like __str__ and __repr__.

Defining a class with __init__ and methods
class BankAccount:
    """A bank account with an owner, a balance, and a transaction log."""

    def __init__(self, owner, balance=0):
        self.owner   = owner
        self.balance = balance
        self._history = []        # leading underscore: internal by convention

    def deposit(self, amount):
        """Credit a positive amount; silently ignore non-positive input."""
        if amount <= 0:
            return
        self.balance += amount
        self._history.append(f"+{amount:.2f}")

    def withdraw(self, amount):
        """Debit the balance unless the amount exceeds the available funds."""
        if amount > self.balance:
            print("Insufficient funds")
            return
        self.balance -= amount
        self._history.append(f"-{amount:.2f}")

    def __str__(self):
        return f"Account({self.owner}, ${self.balance:.2f})"

# Demo: 1000 + 500 - 200 leaves a balance of 1300.
acc = BankAccount("Alice", 1000)
acc.deposit(500)
acc.withdraw(200)
print(acc)
print("History:", acc._history)
Inheritance
class Animal:
    """Base class: an animal knows its name and the sound it makes."""

    def __init__(self, name, sound):
        self.name, self.sound = name, sound

    def speak(self):
        """Return a sentence describing this animal's sound."""
        return f"{self.name} says {self.sound}!"

class Dog(Animal):
    """Animal subclass: fixes the sound to "Woof" and adds a breed."""
    def __init__(self, name, breed):
        super().__init__(name, "Woof")
        self.breed = breed

    def fetch(self, item):
        return f"{self.name} fetches the {item}!"

class Cat(Animal):
    """Animal subclass: fixes the sound to "Meow"."""
    def __init__(self, name):
        super().__init__(name, "Meow")

    def purr(self):
        return f"{self.name} purrs..."

dog = Dog("Rex", "Labrador")
cat = Cat("Whiskers")
print(dog.speak(), dog.fetch("ball"))
print(cat.speak(), cat.purr())
Properties, classmethods, and dunder comparison methods
class Temperature:
    """A temperature stored in Celsius with Fahrenheit conversion.

    Fixes over the original: __init__ now routes through the celsius setter
    so the absolute-zero check applies at construction time too, and the
    comparison/arithmetic dunders return NotImplemented for non-Temperature
    operands (instead of raising AttributeError), letting Python fall back
    to the reflected operation or a sensible TypeError.
    """

    def __init__(self, celsius):
        # Use the property setter so invalid values are rejected here as well
        self.celsius = celsius

    @property
    def celsius(self):
        """Current temperature in Celsius."""
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # -273.15 C is absolute zero; nothing colder is physical
        if value < -273.15:
            raise ValueError("Temperature below absolute zero!")
        self._celsius = value

    @property
    def fahrenheit(self):
        """Temperature converted to Fahrenheit (derived, read-only)."""
        return self._celsius * 9/5 + 32

    @classmethod
    def from_fahrenheit(cls, f):
        """Alternate constructor: build a Temperature from Fahrenheit."""
        return cls((f - 32) * 5/9)

    def __repr__(self):
        return f"Temperature({self._celsius:.2f}Β°C / {self.fahrenheit:.2f}Β°F)"

    def __lt__(self, other):
        if not isinstance(other, Temperature):
            return NotImplemented
        return self._celsius < other._celsius

    def __eq__(self, other):
        if not isinstance(other, Temperature):
            return NotImplemented
        return self._celsius == other._celsius

    def __add__(self, other):
        if not isinstance(other, Temperature):
            return NotImplemented
        return Temperature(self._celsius + other._celsius)

# Demo: alternate constructor, operator overloading, and sorting via __lt__.
t1 = Temperature(100)
t2 = Temperature.from_fahrenheit(32)   # 0°C
t3 = t1 + t2

print(t1)                  # 100°C / 212°F
print(t2)                  # 0°C / 32°F
print(t3)                  # 100°C sum
print(t2 < t1)             # True
print(sorted([t1, t2, t3]))
Abstract base classes (ABC), dataclasses, and __slots__
import math
import sys

from abc import ABC, abstractmethod
from dataclasses import dataclass, field

# Abstract Base Class — define an interface that subclasses must implement
class Shape(ABC):
    """Interface for 2-D shapes; subclasses must supply area and perimeter."""
    @abstractmethod
    def area(self) -> float:
        ...
    @abstractmethod
    def perimeter(self) -> float:
        ...
    def describe(self):
        # Works for any subclass because it relies only on the abstract API
        return f"{type(self).__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"

class Circle(Shape):
    """Concrete Shape: a circle defined by its radius.

    Fix: the original repeated a function-local ``import math; return ...``
    semicolon one-liner in each method; math is now imported once at module
    level (see the import block above).
    """

    def __init__(self, radius: float):
        self.radius = radius

    def area(self) -> float:
        """Area = pi * r^2."""
        return math.pi * self.radius ** 2

    def perimeter(self) -> float:
        """Circumference = 2 * pi * r."""
        return 2 * math.pi * self.radius

class Rectangle(Shape):
    """Concrete Shape: an axis-aligned rectangle of width w and height h."""
    def __init__(self, w: float, h: float):
        self.w, self.h = w, h
    def area(self):      return self.w * self.h
    def perimeter(self): return 2 * (self.w + self.h)

# Polymorphism: describe() works for every Shape subclass
for shape in [Circle(5), Rectangle(4, 6)]:
    print(shape.describe())

# @dataclass — auto-generates __init__, __repr__, __eq__ (order=True adds <, <=, ...)
@dataclass(order=True)
class Point:
    x: float
    y: float
    label: str = field(default="", compare=False)  # excluded from ==/ordering

    def distance_to(self, other: "Point") -> float:
        """Euclidean distance from this point to other."""
        return ((self.x - other.x)**2 + (self.y - other.y)**2) ** 0.5

p1 = Point(0, 0, "origin")
p2 = Point(3, 4, "target")
print(p1, p2)
print(f"Distance: {p1.distance_to(p2):.2f}")
print("Sorted:", sorted([p2, p1]))

# __slots__ — restrict attributes, save memory (no per-instance __dict__)
class SlottedPoint:
    __slots__ = ("x", "y")
    def __init__(self, x, y):
        self.x, self.y = x, y

sp = SlottedPoint(1, 2)
print(f"SlottedPoint: ({sp.x}, {sp.y})")
try:
    sp.z = 99    # can't add new attributes
except AttributeError as e:
    print(f"AttributeError: {e}")
💼 Real-World: E-Commerce Cart System
An online store uses OOP to model products and a shopping cart with discount logic.
class Product:
    """A store item: display name, unit price, and category tag."""

    def __init__(self, name, price, category):
        self.name, self.price, self.category = name, price, category

    def __repr__(self):
        return f"{self.name} (${self.price:.2f})"


class Cart:
    """Shopping cart for one user: line items plus discount/checkout logic."""

    def __init__(self, user):
        self.user  = user
        self.items = []

    def add(self, product, qty=1):
        """Append a line item (any object with .name and .price)."""
        self.items.append({"product": product, "qty": qty})

    def subtotal(self):
        """Total of price * qty across all line items, before discounts."""
        total = 0
        for entry in self.items:
            total += entry["product"].price * entry["qty"]
        return total

    def apply_discount(self, code):
        """Return the discount fraction for a known code, else 0."""
        known = {"SAVE10": 0.10, "HALF50": 0.50, "VIP20": 0.20}
        return known.get(code.upper(), 0)

    def checkout(self, code=""):
        """Print an itemised receipt, applying the discount code if valid."""
        subtotal_amt = self.subtotal()
        rate         = self.apply_discount(code)
        final        = subtotal_amt * (1 - rate)
        print(f"Cart for {self.user}:")
        for entry in self.items:
            line_total = entry['product'].price * entry['qty']
            print(f"  {entry['product'].name:15s} x{entry['qty']}  ${line_total:.2f}")
        print(f"  Subtotal: ${subtotal_amt:.2f}")
        if rate:
            print(f"  Discount: -{rate:.0%}")
        print(f"  Total:    ${final:.2f}")

# Demo: three line items checked out with the SAVE10 (10% off) code.
cart = Cart("Alice")
cart.add(Product("Laptop",  999.99, "Electronics"), 1)
cart.add(Product("Mouse",    29.99, "Electronics"), 2)
cart.add(Product("Notebook",  5.99, "Stationery"),  3)
cart.checkout("SAVE10")
🏋️ Practice: Build a Stack
Implement a Stack class with push, pop, peek, __len__, and __repr__. The stack should raise IndexError on pop/peek from an empty stack.
Starter Code
# Practice starter — intentionally incomplete; the tests below will fail
# until every TODO is implemented.
class Stack:
    def __init__(self):
        # TODO: initialise internal list self._data = []
        pass

    def push(self, item):
        # TODO: append item to self._data
        pass

    def pop(self):
        # TODO: raise IndexError("pop from empty stack") if empty
        # TODO: otherwise remove and return the top item
        pass

    def peek(self):
        # TODO: raise IndexError("peek from empty stack") if empty
        # TODO: otherwise return top item WITHOUT removing it
        pass

    def __len__(self):
        # TODO: return number of items
        pass

    def __repr__(self):
        # TODO: return something like Stack([1, 2, 3]) — top is rightmost
        pass

# Tests
s = Stack()
s.push(1); s.push(2); s.push(3)
print(s)            # Stack([1, 2, 3])
print(len(s))       # 3
print(s.peek())     # 3
print(s.pop())      # 3
print(s)            # Stack([1, 2])
try:
    Stack().pop()
except IndexError as e:
    print(f"Caught: {e}")
✅ Practice Checklist
9. Error Handling

Use try/except/finally to handle exceptions gracefully. Raise custom exceptions to signal application-level errors.

try / except / finally
# Basic exception handling
def safe_divide(a, b):
    """Divide a by b, returning an error string instead of raising."""
    try:
        quotient = a / b
    except TypeError as exc:
        return f"Error: {exc}"
    except ZeroDivisionError:
        return "Error: cannot divide by zero"
    else:
        return quotient                # success path only
    finally:
        print("safe_divide() called")  # runs regardless of which branch returned

# Demo: success (5.0), division by zero, and a type error.
print(safe_divide(10, 2))
print(safe_divide(10, 0))
print(safe_divide("x", 2))
Multiple exceptions and custom exceptions
class InsufficientFundsError(Exception):
    """Raised when a withdrawal request exceeds the available balance."""

    def __init__(self, amount, balance):
        message = f"Tried to withdraw ${amount:.2f}, only ${balance:.2f} available"
        super().__init__(message)
        self.amount  = amount
        self.balance = balance

def withdraw(balance, amount):
    """Validate a withdrawal and return the new balance.

    Raises:
        TypeError:  amount is not a number (checked first — the later
                    comparisons would themselves fail on e.g. a str).
        ValueError: amount is zero or negative.
        InsufficientFundsError: amount exceeds balance.
    """
    if not isinstance(amount, (int, float)):
        raise TypeError(f"Amount must be a number, got {type(amount).__name__}")
    if amount <= 0:
        raise ValueError("Amount must be positive")
    if amount > balance:
        raise InsufficientFundsError(amount, balance)
    return balance - amount

# Demo: one success, then each custom/builtin error in turn.
for args in [(100, 30), (100, 200), (100, -10), (100, "abc")]:
    try:
        new_bal = withdraw(*args)
        print(f"Withdrew {args[1]}, new balance: {new_bal}")
    except (InsufficientFundsError, ValueError, TypeError) as e:
        print(f"Error: {e}")
Context managers and exception chaining
import time

# Custom context manager using __enter__ / __exit__
class Timer:
    """Context manager that measures wall-clock time of its with-block."""

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        end = time.perf_counter()
        self.elapsed = end - self.start
        print(f"Elapsed: {self.elapsed:.6f}s")
        return False  # propagate any exception raised in the with-block

# Demo: time a half-million-element sum.
with Timer() as t:
    total = sum(range(500_000))
print(f"Sum = {total:,}")

# Exception chaining — raise X from Y
class DatabaseError(Exception):
    """Application-level error wrapping a lower-level lookup failure."""
    pass

def fetch_user(user_id, data):
    """Return data[user_id], raising a chained DatabaseError when absent."""
    try:
        return data[user_id]
    except KeyError as original:
        # 'from original' records the KeyError as __cause__ on the new error
        raise DatabaseError(f"User {user_id} not found") from original

records = {"alice": {"age": 30}, "bob": {"age": 25}}

# "carol" is absent, so fetch_user raises DatabaseError chained from KeyError
for uid in ["alice", "carol"]:
    try:
        user = fetch_user(uid, records)
        print(f"Found: {user}")
    except DatabaseError as e:
        print(f"DB Error: {e}")
        print(f"  Caused by: {e.__cause__}")
contextlib helpers and logging module basics
import logging
import io
from contextlib import suppress, redirect_stdout, contextmanager

# suppress — silently ignore specific exceptions (replaces try/except/pass)
with suppress(FileNotFoundError):
    open("nonexistent_file.txt")   # no error raised
print("suppress: FileNotFoundError silently ignored")

# redirect_stdout — capture print() output into a buffer
buffer = io.StringIO()
with redirect_stdout(buffer):
    print("This goes into the buffer, not the terminal")
    print("So does this line")
captured = buffer.getvalue()
print(f"Captured {len(captured.splitlines())} lines: {captured.splitlines()[0]!r}")

# @contextmanager — create a context manager with a generator
@contextmanager
def managed_resource(name):
    """Yield NAME uppercased while logging open/error/close lifecycle events."""
    print(f"  [open]  {name}")
    try:
        yield name.upper()     # this value is bound to the 'as' target
    except Exception as err:
        print(f"  [error] {err}")
        raise
    finally:
        print(f"  [close] {name}")

# Demo: prints [open], the body line, then [close].
with managed_resource("database_connection") as res:
    print(f"  Using: {res}")

# logging module basics
logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)-8s %(name)s: %(message)s"
)
log = logging.getLogger("myapp")
log.debug("Debug-level detail (only shown at DEBUG+)")
log.info("Server started on port 8080")
# Fix: "%%" is only needed when %-style args are passed; with no args the
# message is emitted verbatim, so a literal percent is written as "%".
log.warning("Disk usage at 85%")
log.error("Failed to connect to database")
💼 Real-World: Robust File & API Data Reader
A data pipeline gracefully handles missing files, JSON parse errors, and unexpected data formats.
import json

def read_config(filepath):
    """Load a JSON config file; return {} on a missing file or invalid JSON."""
    try:
        with open(filepath, "r") as fh:
            return json.load(fh)
    except FileNotFoundError:
        print(f"Config file not found: {filepath}")
        return {}
    except json.JSONDecodeError as err:
        print(f"Invalid JSON in {filepath}: {err}")
        return {}

def get_setting(config, key, default=None, required=False):
    """Fetch key from config; raise KeyError when required and missing."""
    if key in config:
        value = config[key]
    else:
        value = default
    if required and value is None:
        raise KeyError(f"Required setting '{key}' is missing from config")
    return value

# Simulate loading a config
sample_config = {"db_host": "localhost", "db_port": 5432, "debug": True}

# "api_key" is required but absent, so this falls through to the defaults below
try:
    host    = get_setting(sample_config, "db_host",    required=True)
    port    = get_setting(sample_config, "db_port",    required=True)
    timeout = get_setting(sample_config, "timeout",    default=30)
    api_key = get_setting(sample_config, "api_key",    required=True)
except KeyError as e:
    print(f"Configuration error: {e}")
    host, port, timeout = "localhost", 5432, 30
    print(f"Using defaults: {host}:{port}, timeout={timeout}s")
🏋️ Practice: Safe Data Parser
Write parse_record(line) that parses a CSV line like 'Alice,28,92.5' into a dict with name (str), age (int), score (float). Return None on any error.
Starter Code
# Practice starter — intentionally incomplete: parse_record returns None
# for every line until the TODOs are implemented.
def parse_record(line):
    # TODO: split line by ","
    # TODO: wrap in try/except to catch ValueError and IndexError
    # TODO: inside try:
    #   parts = line.split(",")
    #   name  = parts[0].strip()
    #   age   = int(parts[1].strip())     # may raise ValueError
    #   score = float(parts[2].strip())   # may raise ValueError
    #   return {"name": name, "age": age, "score": score}
    # TODO: on except, return None
    pass

# Test cases
test_lines = [
    "Alice,28,92.5",      # valid
    "Bob,thirty,88.0",    # bad age
    "Carol,22",           # missing score (IndexError)
    "Dave,19,invalid",    # bad score
    "",                   # empty
]

for line in test_lines:
    result = parse_record(line)
    print(f"  {line!r:25s} -> {result}")
✅ Practice Checklist
10. File I/O

Read and write files using open(). Use the with statement to ensure files are always closed. Python handles text and binary files.

Reading and writing text files
import os

# Write a file — fixed: the "\n" escapes were lost in the original, leaving
# string literals broken across physical lines (a SyntaxError).
with open("demo.txt", "w") as f:
    f.write("Line 1: Hello World\n")
    f.write("Line 2: Python File I/O\n")
    f.writelines(["Line 3: data\n", "Line 4: more data\n"])

# Read entire file
with open("demo.txt", "r") as f:
    content = f.read()
print("Full content:\n", content)

# Read line by line
with open("demo.txt", "r") as f:
    for i, line in enumerate(f, 1):
        print(f"  [{i}] {line.rstrip()}")

os.remove("demo.txt")  # cleanup
Working with CSV and JSON
import json, csv, io

# JSON — fixed: the "\n" escape in the print below was lost in the original
data = {"name": "Alice", "scores": [95, 87, 91], "active": True}
json_str = json.dumps(data, indent=2)
print("JSON:\n", json_str)

loaded = json.loads(json_str)
print("Avg score:", sum(loaded["scores"]) / len(loaded["scores"]))

# CSV (using in-memory buffer)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["name", "age", "city"])
writer.writerows([["Alice",30,"NYC"],["Bob",25,"LA"],["Carol",35,"Chicago"]])

# Rewind the buffer so DictReader starts from the header row
output.seek(0)
reader = csv.DictReader(output)
for row in reader:
    print(dict(row))
pathlib and binary I/O
import pathlib, io, tempfile

# pathlib — modern, object-oriented path handling
p = pathlib.Path.home()
print("Home dir:", p)
print("Exists:", p.exists())

# Build paths with / operator
tmp = pathlib.Path(tempfile.gettempdir())
data_file = tmp / "demo_data.txt"

# Write and read with pathlib
data_file.write_text("Hello from pathlib!\nLine 2\nLine 3\n", encoding="utf-8")
content = data_file.read_text(encoding="utf-8")
print("Read back:", content.splitlines())

# Inspect path parts
print("Name:     ", data_file.name)
print("Stem:     ", data_file.stem)
print("Suffix:   ", data_file.suffix)
print("Parent:   ", data_file.parent)

data_file.unlink()  # delete

# io.BytesIO — in-memory binary buffer (like a file but in RAM)
buf = io.BytesIO()
buf.write(b"\x89PNG\r\n")   # fake PNG header bytes
buf.write(b"binary data here")
buf.seek(0)
header = buf.read(6)
print("Bytes header:", header)
print("Buffer size:", buf.getbuffer().nbytes, "bytes")
pathlib.Path advanced: glob, rglob, iterdir, and tempfile module
import pathlib, tempfile, os

# Create a temporary directory to experiment in (auto-removed on exit)
with tempfile.TemporaryDirectory() as tmpdir:
    root = pathlib.Path(tmpdir)

    # Create nested structure
    (root / "src").mkdir()
    (root / "src" / "utils").mkdir()
    (root / "data").mkdir()
    (root / "src" / "main.py").write_text("# main", encoding="utf-8")
    (root / "src" / "helper.py").write_text("# helper", encoding="utf-8")
    (root / "src" / "utils" / "tools.py").write_text("# tools", encoding="utf-8")
    (root / "data" / "report.csv").write_text("a,b,c", encoding="utf-8")
    (root / "data" / "notes.txt").write_text("notes", encoding="utf-8")
    (root / "README.md").write_text("# Project", encoding="utf-8")

    # iterdir() — immediate children only (non-recursive)
    print("Top-level items:")
    for item in sorted(root.iterdir()):
        kind = "DIR " if item.is_dir() else "FILE"
        print(f"  {kind}  {item.name}")

    # glob() — match pattern in direct children
    print("\n*.md files (glob):", [p.name for p in root.glob("*.md")])

    # rglob() — recursive glob across all subdirectories
    print("All .py files (rglob):")
    for py in sorted(root.rglob("*.py")):
        print(f"  {py.relative_to(root)}")

    print("All files (rglob **):")
    all_files = sorted(root.rglob("*"))
    for f in all_files:
        if f.is_file():
            print(f"  {f.relative_to(root)}  ({f.stat().st_size} bytes)")

# tempfile — create named temp files that auto-delete
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as tf:
    tf.write('{"status": "ok"}')
    tmp_path = pathlib.Path(tf.name)

print(f"\nTemp file: {tmp_path.name}")
print("Content:", tmp_path.read_text(encoding="utf-8"))
tmp_path.unlink()   # manual cleanup since delete=False
print("Temp file deleted:", not tmp_path.exists())
💼 Real-World: Sales Report File Processor
A business analyst reads daily sales CSV files, aggregates totals, and writes a JSON summary report.
import csv, json, io

# Simulate CSV content — fixed: the "\n" in the join separator was lost in
# the original, breaking the string literal across physical lines.
csv_data = "\n".join([
    "date,product,region,qty,price",
    "2024-01-01,Widget,North,10,9.99",
    "2024-01-01,Gadget,South,5,49.99",
    "2024-01-02,Widget,East,15,9.99",
    "2024-01-02,Doohickey,North,8,19.99",
    "2024-01-03,Gadget,East,3,49.99",
    "2024-01-03,Widget,South,12,9.99",
])

reader  = csv.DictReader(io.StringIO(csv_data))
summary = {}

# Aggregate revenue and quantity per product, plus per-region revenue
for row in reader:
    revenue = float(row["qty"]) * float(row["price"])
    product = row["product"]
    region  = row["region"]

    if product not in summary:
        summary[product] = {"total_revenue": 0, "total_qty": 0, "regions": {}}
    summary[product]["total_revenue"] += revenue
    summary[product]["total_qty"]     += int(row["qty"])
    summary[product]["regions"][region] = summary[product]["regions"].get(region, 0) + revenue

# Final report: rounded revenue, total quantity, and best-selling region
report = {k: {"revenue": round(v["total_revenue"],2), "qty": v["total_qty"],
              "top_region": max(v["regions"], key=v["regions"].get)}
          for k, v in summary.items()}

print(json.dumps(report, indent=2))
🏋️ Practice: Log File Analyzer
Parse a multi-line log string (via io.StringIO), count occurrences of each log level, and collect all ERROR message lines.
Starter Code
# Practice starter — intentionally incomplete; fill in the TODOs below.
import io

log_data = """2024-01-15 INFO    Server started on port 8080
2024-01-15 DEBUG   Loading config file
2024-01-15 INFO    Database connected
2024-01-15 WARNING Disk usage at 80%
2024-01-15 ERROR   Failed to connect to cache: timeout
2024-01-15 INFO    Request received: GET /home
2024-01-15 ERROR   Database query failed: syntax error
2024-01-15 WARNING Memory usage high: 75%
2024-01-15 INFO    Request completed in 120ms
2024-01-15 CRITICAL Disk full β€” writes disabled"""

# TODO: create a file-like object from log_data using io.StringIO
# f = io.StringIO(???)

# TODO: iterate over lines, split each line to get the level (index 1)
# count level occurrences in a dict: level_counts = {}
# if the level is "ERROR", append the full line to error_lines list

# Expected output:
# Level counts: {'INFO': 4, 'DEBUG': 1, 'WARNING': 2, 'ERROR': 2, 'CRITICAL': 1}
# Error lines:
#   2024-01-15 ERROR   Failed to connect to cache: timeout
#   2024-01-15 ERROR   Database query failed: syntax error
✅ Practice Checklist
11. List Comprehensions & Generators

Comprehensions create lists, dicts, and sets concisely. Generators produce values lazily, saving memory for large sequences.

List, dict, and set comprehensions
# List comprehension
squares = [x**2 for x in range(10)]
evens   = [x for x in range(20) if x % 2 == 0]
matrix  = [[i*j for j in range(1,4)] for i in range(1,4)]  # nested: 3x3 table

print("Squares:", squares[:5])
print("Evens:", evens)
print("Matrix:", matrix)

# Dict comprehension — set(word) avoids counting the same char twice
word   = "mississippi"
counts = {ch: word.count(ch) for ch in set(word)}
print("Char counts:", dict(sorted(counts.items())))

# Set comprehension — duplicates collapse automatically
text  = ["hello", "world", "hello", "python"]
unique_upper = {w.upper() for w in text}
print("Unique upper:", unique_upper)
Generators and generator expressions
# Generator function (yields values lazily)
def fibonacci(n):
    """Yield the first n Fibonacci numbers, starting at 0."""
    current, nxt = 0, 1
    for _ in range(n):
        yield current
        current, nxt = nxt, current + nxt

# Demo: materialise a generator, then lazy generator expressions.
fibs = list(fibonacci(10))
print("Fibonacci:", fibs)

# Generator expression (lazy list comprehension)
big_squares = (x**2 for x in range(1_000_000))
print("First 5:", [next(big_squares) for _ in range(5)])

# sum() with generator — no list created in memory
total = sum(x**2 for x in range(1000))
print("Sum of squares 0..999:", total)
Generator pipeline
# Chain generators together — each stage processes values lazily

def read_numbers(data):
    """Yield numbers one at a time from a list."""
    yield from data

def filter_positive(numbers):
    """Yield only the strictly positive values."""
    yield from (value for value in numbers if value > 0)

def square(numbers):
    """Yield each number multiplied by itself."""
    yield from (value * value for value in numbers)

def running_total(numbers):
    """Yield the cumulative sum after each element."""
    accumulated = 0
    for value in numbers:
        accumulated += value
        yield accumulated

# Build the pipeline — nothing runs until list() pulls values through it
raw      = [-3, 1, -1, 4, 5, -9, 2, 6]
pipeline = running_total(square(filter_positive(read_numbers(raw))))

print("Pipeline output:", list(pipeline))
# positives: 1,4,5,2,6  squares: 1,16,25,4,36  running: 1,17,42,46,82

# Nested comprehension — flatten a matrix
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat   = [cell for row in matrix for cell in row]
print("Flat matrix:", flat)

# Nested comprehension — all pairs where i != j
pairs = [(i, j) for i in range(4) for j in range(4) if i != j]
print(f"Pairs (i!=j): {len(pairs)} pairs, first 4: {pairs[:4]}")
itertools.chain, zip_longest, and starmap
import itertools

# chain.from_iterable — flatten one level of nested iterables
nested = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flat   = list(itertools.chain.from_iterable(nested))
print("chain.from_iterable:", flat)

# zip_longest — zip unequal-length iterables, filling with a default
names  = ["Alice", "Bob", "Carol"]
scores = [92, 85]
grades = ["A"]
for row in itertools.zip_longest(names, scores, grades, fillvalue="N/A"):
    print(f"  {row[0]:8s}  score={row[1]:>4}  grade={row[2]}")

# starmap — map with argument unpacking (like map but for tuple arguments)
pairs  = [(2, 10), (3, 4), (10, 2), (5, 3)]
powers = list(itertools.starmap(pow, pairs))
print("starmap(pow, pairs):", powers)   # [1024, 81, 100, 125]

# Practical: generate a multiplication table with starmap
import operator
combos = itertools.product(range(1, 4), range(1, 4))
table  = list(itertools.starmap(operator.mul, combos))
print("3x3 mul table (flat):", table)

# accumulate — running totals / cumulative operations
sales   = [1200, 850, 1400, 980, 1100]
running = list(itertools.accumulate(sales))
print("Running sales totals:", running)

# Passing a binary function changes the fold (here: running maximum)
running_max = list(itertools.accumulate(sales, max))
print("Running maximums:    ", running_max)
💼 Real-World: Log File Streaming Processor
A data engineer uses generators to process large log files line-by-line without loading everything into memory.
import io

# Simulate a large log file as a generator
def stream_logs(file_obj, min_level="WARNING"):
    """Yield parsed log dicts for lines at or above min_level severity.

    Lines that are blank or have fewer than four fields are skipped;
    unknown levels rank lowest (0).
    """
    severity  = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}
    threshold = severity.get(min_level, 0)
    for raw in file_obj:
        raw = raw.strip()
        if not raw:
            continue
        fields = raw.split(None, 3)   # ts, level, service, rest-of-message
        if len(fields) < 4:
            continue
        ts, level, service, msg = fields
        if severity.get(level, 0) >= threshold:
            yield {"ts": ts, "level": level, "service": service, "msg": msg}

# Fixed: the "\n" join separator was lost in the original, breaking the
# string literal across physical lines.
sample_log = io.StringIO("\n".join([
    "2024-01-15 DEBUG   db_pool:      Connection acquired",
    "2024-01-15 INFO    auth_service: User login alice@co.com",
    "2024-01-15 WARNING api_gateway:  Rate limit 90% for 192.168.1.1",
    "2024-01-15 ERROR   payment_svc:  Timeout after 30s for order #8821",
    "2024-01-15 INFO    cache:        Cache miss for key user:42",
    "2024-01-15 CRITICAL db_pool:     Connection pool exhausted!",
]))

# Only WARNING and above survive the filter (3 of the 6 lines)
alerts = list(stream_logs(sample_log, min_level="WARNING"))
print(f"Found {len(alerts)} alerts:")
for a in alerts:
    print(f"  [{a['level']:8s}] {a['service']:12s} {a['msg']}")
🏋️ Practice: Data Processing Pipeline
Implement three chained generators: csv_rows() yields raw lines, parse_sales() parses each to a dict, high_value() keeps only sales above a threshold.
Starter Code
import io

csv_data = """date,product,qty,price
2024-01-01,Widget,10,9.99
2024-01-02,Gadget,5,49.99
2024-01-03,Widget,15,9.99
2024-01-04,SuperGadget,2,199.99
2024-01-05,Widget,8,9.99
2024-01-06,Gadget,3,49.99"""

def csv_rows(text):
    # TODO: use io.StringIO(text), skip the header line,
    # yield each remaining non-empty stripped line
    pass

def parse_sales(rows):
    # TODO: for each row, split by "," to get date, product, qty, price
    # yield dict: {"date": ..., "product": ..., "revenue": int(qty)*float(price)}
    pass

def high_value(sales, threshold=100):
    # TODO: yield only sales where revenue > threshold
    pass

# Chain the pipeline
pipeline = high_value(parse_sales(csv_rows(csv_data)))
for sale in pipeline:
    print(f"  {sale['date']}  {sale['product']:12s}  ${sale['revenue']:.2f}")
✅ Practice Checklist
12. Modules & Useful Built-ins

Python's standard library is vast. Key modules: os, sys, datetime, math, random, collections, itertools. Use import to access them.

os, datetime, math
import os
import math
import datetime

# os β€” file system and environment
cwd = os.getcwd()
print("CWD:", cwd)
print("Home:", os.path.expanduser("~"))
print("Path exists:", os.path.exists(cwd))

# datetime
today = datetime.date.today()
now   = datetime.datetime.now()
delta = datetime.timedelta(days=30)
print("Today:", today)
print("In 30 days:", today + delta)
print("Day of week:", today.strftime("%A"))

# math
print("pi:",    round(math.pi, 4))
print("sqrt(2):", round(math.sqrt(2), 4))
print("log2(1024):", math.log2(1024))
collections and itertools
from collections import Counter, defaultdict, namedtuple
import itertools

# Counter — frequency table with most_common()
words = "the quick brown fox jumps over the lazy dog the".split()
c = Counter(words)
print("Most common:", c.most_common(3))

# defaultdict — grouping without key-existence checks
from collections import defaultdict
group = defaultdict(list)
data  = [("fruit","apple"),("veg","carrot"),("fruit","banana"),("veg","pea")]
for category, item in data:
    group[category].append(item)
print(dict(group))

# namedtuple — lightweight record with named fields
Point = namedtuple("Point", ["x", "y"])
p = Point(3, 7)
print(f"Point: x={p.x}, y={p.y}")

# itertools — all 2-element combinations (order-independent)
pairs = list(itertools.combinations("ABCD", 2))
print("Combinations:", pairs)
functools and secrets
import functools, random, secrets

# functools.reduce — fold a sequence into a single value
from functools import reduce
product = reduce(lambda acc, x: acc * x, range(1, 6))  # 5! = 120
print("5! =", product)

# functools.partial — fix some arguments of a function
def power(base, exponent):
    return base ** exponent

square = functools.partial(power, exponent=2)
cube   = functools.partial(power, exponent=3)
print("Squares:", [square(x) for x in range(1, 6)])
print("Cubes:  ", [cube(x)   for x in range(1, 6)])

# functools.lru_cache — memoize automatically (makes recursive fib O(n))
@functools.lru_cache(maxsize=None)
def fib(n):
    if n < 2: return n
    return fib(n-1) + fib(n-2)

print("fib(35):", fib(35))
print("Cache info:", fib.cache_info())

# random vs secrets
# random — reproducible (seeded), for simulations
random.seed(42)
sample = random.sample(range(100), 5)
print("Random sample:", sample)

# secrets — cryptographically secure, for tokens/passwords
token = secrets.token_hex(16)     # 32-char hex string
print("Secure token:", token)
pin   = secrets.randbelow(10000)  # 0-9999
print("Secure PIN: ", str(pin).zfill(4))
importlib, sys.path, __name__ guard, and pprint
import sys
import importlib
import pprint

# sys.path — where Python searches for modules
# Fix: removed a dead `for p in sys.argv[0:1]: pass` loop that did nothing
# and sat misleadingly under this heading.
print("sys.path entries (first 3):")
for path in sys.path[:3]:
    print(f"  {path!r}")

# sys.argv β€” command-line arguments
print(f"Script name: {sys.argv[0]!r}")

# sys.version / sys.platform β€” runtime info
print(f"Python {sys.version.split()[0]} on {sys.platform}")

# importlib β€” dynamic import by string name
math_mod = importlib.import_module("math")
print(f"math.tau = {math_mod.tau:.6f}")

json_mod = importlib.import_module("json")
encoded  = json_mod.dumps({"key": "value"})
print("Dynamic json.dumps:", encoded)

# __name__ == "__main__" pattern
# This block only runs when the script is executed directly,
# NOT when it is imported as a module.
if __name__ == "__main__":
    print("Running as main script β€” __name__:", __name__)

# pprint β€” pretty-print complex nested structures
data = {
    "users": [
        {"id": 1, "name": "Alice", "roles": ["admin", "editor"],
         "prefs": {"theme": "dark", "lang": "en"}},
        {"id": 2, "name": "Bob",   "roles": ["viewer"],
         "prefs": {"theme": "light", "lang": "fr"}},
    ],
    "meta": {"version": "2.1", "count": 2}
}
print("\npprint output:")
pprint.pprint(data, width=60, sort_dicts=False)
💼 Real-World: Web Request Log Analysis
A backend engineer uses Counter and defaultdict to analyze HTTP access logs and detect suspicious patterns.
from collections import Counter, defaultdict
import datetime

# Simulated access log entries: (ip, method, path, status, ts)
logs = [
    ("192.168.1.10", "GET",  "/home",     200, "2024-01-15 09:00:01"),
    ("10.0.0.5",     "POST", "/login",    401, "2024-01-15 09:00:03"),
    ("10.0.0.5",     "POST", "/login",    401, "2024-01-15 09:00:04"),
    ("10.0.0.5",     "POST", "/login",    401, "2024-01-15 09:00:05"),
    ("192.168.1.10", "GET",  "/products", 200, "2024-01-15 09:01:00"),
    ("172.16.0.1",   "GET",  "/admin",    403, "2024-01-15 09:01:30"),
    ("172.16.0.1",   "GET",  "/admin",    403, "2024-01-15 09:01:32"),
    ("192.168.1.20", "GET",  "/home",     200, "2024-01-15 09:02:00"),
    ("10.0.0.5",     "POST", "/login",    200, "2024-01-15 09:02:10"),
]

# Frequency of each status code, and total requests per client IP
status_counts = Counter(entry[3] for entry in logs)
ip_requests   = Counter(entry[0] for entry in logs)
failures_by_ip = defaultdict(int)  # 401/403 count per IP

for ip, method, path, status, ts in logs:
    if status in (401, 403):  # auth failure or forbidden
        failures_by_ip[ip] += 1

print("Status codes:", dict(status_counts))
# FIX: the original had a raw newline inside the string literal
# (print(" / Top IPs:")), which is a SyntaxError; escape it as \n.
print("\nTop IPs:")
for ip, count in ip_requests.most_common():
    fails = failures_by_ip[ip]
    flag  = " ⚠️ SUSPICIOUS" if fails >= 2 else ""
    print(f"  {ip:16s} {count:3d} requests, {fails} failures{flag}")
🏋️ Practice: Analyze Dataset with Built-ins
Use the statistics module and built-in functions to compute mean/median/stdev, find top-5 and bottom-5 scores, and bin scores into letter grade counts with Counter.
Starter Code
import statistics
from collections import Counter

scores = [72, 88, 95, 63, 79, 91, 55, 84, 76, 90,
          67, 83, 58, 97, 71, 80, 89, 62, 75, 93]

# TODO: compute mean, median, stdev using statistics module
# mean   = statistics.mean(scores)
# median = statistics.median(scores)
# stdev  = statistics.stdev(scores)
# print(f"Mean: {mean:.1f}, Median: {median}, StdDev: {stdev:.1f}")

# TODO: use sorted() to get top_5 (highest) and bottom_5 (lowest)
# top_5    = sorted(scores, reverse=True)[:5]
# bottom_5 = sorted(scores)[:5]

# TODO: map each score to a letter grade bin
# def grade_bin(s): return "A" if s>=90 else "B" if s>=80 else "C" if s>=70 else "D" if s>=60 else "F"
# bins = Counter(grade_bin(s) for s in scores)
# print("Grade bins:", dict(sorted(bins.items())))

# NOTE(review): these prints raise NameError until the top_5/bottom_5 TODO
# above is completed — presumably intentional as the exercise's check.
print("Top 5:   ", top_5)
print("Bottom 5:", bottom_5)
✅ Practice Checklist
13. Context Managers

Manage resources safely and cleanly with the with statement. Guarantee teardown even when exceptions occur.

File handling with context managers
import tempfile, pathlib, os

# Bad pattern: manual open/close risks resource leak
# f = open('data.txt')
# data = f.read()  # if this raises, f never closes
# f.close()

# Good pattern: with statement guarantees close()
# FIX: tempfile.mktemp() is deprecated and race-prone (the name can be
# claimed by another process before we create the file); mkstemp()
# creates the file atomically and returns an open fd plus the path.
_fd, _name = tempfile.mkstemp(suffix='.txt')
os.close(_fd)  # we only need the path here
tmp = pathlib.Path(_name)
tmp.write_text('line 1\nline 2\nline 3')

with open(tmp) as f:
    data = f.read()
print('Read:', repr(data))

# Write mode ('a' appends without truncating)
with open(tmp, 'a') as f:
    f.write('\nline 4')

# File is closed here even if an exception happened inside

# Reading line by line (memory-efficient for large files)
with open(tmp) as f:
    for i, line in enumerate(f, 1):
        print(f'  {i}: {line.rstrip()}')

tmp.unlink()
Multiple context managers in one with
import tempfile, pathlib

src = pathlib.Path(tempfile.mktemp(suffix='.txt'))
dst = pathlib.Path(tempfile.mktemp(suffix='.txt'))
src.write_text('hello from source')

# Open multiple files in one with statement
with open(src) as fin, open(dst, 'w') as fout:
    for line in fin:
        fout.write(line.upper())

print('Copied and uppercased:', dst.read_text())
src.unlink(); dst.unlink()

# Also works for nested managers of different types
import io
with io.StringIO('a,b,c\n1,2,3') as buf:
    print('StringIO:', buf.read())
Custom context manager with __enter__ / __exit__
import time

class Timer:
    """Context manager that measures wall-clock time of its block."""
    def __init__(self, name='block'):
        self.name = name

    def __enter__(self):
        self.start = time.perf_counter()
        return self  # value bound to 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        print(f'[{self.name}] elapsed: {self.elapsed*1000:.2f} ms')
        # Return False (default) to re-raise any exception
        return False

with Timer('sum of squares') as t:
    result = sum(x**2 for x in range(1_000_000))

print(f'Result: {result:,}, time stored: {t.elapsed*1000:.2f} ms')

# Suppress specific exceptions by returning True from __exit__
class Suppress:
    """Swallow the listed exception types (like contextlib.suppress)."""
    def __init__(self, *exc_types):
        self.exc_types = exc_types
    def __enter__(self): return self
    def __exit__(self, exc_type, *_):
        # FIX: use issubclass, not 'in' — the original missed subclasses
        # (e.g. Suppress(ArithmeticError) would not catch ZeroDivisionError),
        # which diverges from contextlib.suppress semantics.
        return exc_type is not None and issubclass(exc_type, self.exc_types)

with Suppress(ZeroDivisionError):
    x = 1 / 0  # suppressed!
print('Continued after ZeroDivisionError')
contextlib.contextmanager decorator
from contextlib import contextmanager, suppress
import tempfile, pathlib, os

@contextmanager
def temporary_file(suffix='.txt', content=''):
    """Create a temp file, yield its path, delete on exit.

    FIX: uses mkstemp (atomic creation) instead of the deprecated,
    race-prone tempfile.mktemp.
    """
    fd, name = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    path = pathlib.Path(name)
    path.write_text(content)
    try:
        yield path
    finally:
        if path.exists():
            path.unlink()
        print(f'Cleaned up {path.name}')

@contextmanager
def log_section(name):
    """Print START/END markers around a block; log and re-raise errors."""
    print(f'>>> START: {name}')
    try:
        yield
    except Exception as e:
        print(f'>>> ERROR in {name}: {e}')
        raise
    finally:
        print(f'>>> END: {name}')

with temporary_file(content='hello world') as tmp:
    data = tmp.read_text()
    print('File content:', data)
# File is deleted here

with log_section('data processing'):
    result = [x**2 for x in range(5)]
    print('Result:', result)

# contextlib.suppress replaces try/except for known ignorable errors
with suppress(FileNotFoundError):
    pathlib.Path('nonexistent.txt').unlink()
print('Suppressed FileNotFoundError cleanly')
🏋️ Practice: Database Connection Manager
Write a context manager class DatabaseConnection that simulates opening/closing a DB connection (print messages). It should auto-rollback (print 'rolling back') if an exception occurs inside the with block, and auto-commit otherwise.
Starter Code
class DatabaseConnection:
    # Practice scaffold: simulate a DB connection context manager.
    # NOTE(review): until __enter__ is implemented to return self, the
    # 'as db' variable is None and db.connected below raises AttributeError.
    def __init__(self, url):
        self.url = url            # connection string (only printed, never parsed)
        self.connected = False    # toggled by __enter__/__exit__

    def __enter__(self):
        # TODO: set self.connected = True, print 'Connected to {url}'
        # TODO: return self
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # TODO: if exception, print 'Rolling back', else print 'Committed'
        # TODO: print 'Disconnected', set connected = False
        # TODO: return False to propagate exceptions
        pass

# Test: should commit
with DatabaseConnection('sqlite:///app.db') as db:
    print(f'  Using connection (connected={db.connected})')

# Test: should rollback
try:
    with DatabaseConnection('sqlite:///app.db') as db:
        raise ValueError('Oops!')
except ValueError:
    pass
✅ Practice Checklist
14. Regular Expressions

Pattern matching and text extraction with the re module β€” character classes, groups, lookaheads, and real-world parsing patterns.

Basic patterns β€” search, match, findall
import re

text = 'Contact us at support@example.com or sales@company.org for help.'

# Compile the email pattern once and reuse it (equivalent to passing the
# raw pattern string to each call)
EMAIL = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+')

# re.search — find first match anywhere in string
match = EMAIL.search(text)
if match:
    print('First email:', match.group())

# re.findall — return all matches as list
emails = EMAIL.findall(text)
print('All emails:', emails)

# re.match — only matches at START of string
# ('Contact' is at position 0, so it matches; 'support' is not, so None)
for probe in ('Contact', 'support'):
    print('match at start:', re.match(probe, text))

# re.fullmatch — entire string must match
phone = '555-1234'
valid = re.fullmatch(r'\d{3}-\d{4}', phone)
print('Valid phone:', bool(valid))

# Flags: case-insensitive
print(re.findall(r'contact', text, re.IGNORECASE))
Groups and named groups
import re

log_line = '2024-03-15 09:23:41 ERROR [auth] Login failed for user: alice'

# Named groups with (?P<name>...)
pattern = r'(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+) \[(?P<module>\w+)\] (?P<message>.+)'
m = re.match(pattern, log_line)
if m:
    print('Date:   ', m.group('date'))
    print('Level:  ', m.group('level'))
    print('Module: ', m.group('module'))
    print('Message:', m.group('message'))
    print('Dict:   ', m.groupdict())

# Non-capturing groups (?:...)
urls = ['http://example.com', 'https://secure.org', 'ftp://old.net']
for url in urls:
    m = re.match(r'(?:https?|ftp)://([\w.-]+)', url)
    if m:
        # FIX: the original line was missing its closing parenthesis
        # (SyntaxError). group(1) = first capturing group (the domain).
        print(f'  Domain: {m.group(1)}')
Substitution, splitting, and compiling
import re

text = 'Call us at (555) 123-4567 or 555.987.6543 today!'

# re.sub — replace pattern
cleaned = re.sub(r'[()\s.-]', '', text)
print('Cleaned:', cleaned)

# Replace with backreference
# FIX: the original pattern began with [()\s.-]+? which also consumed the
# space BEFORE each phone number, producing 'Call us at555-123-4567'.
# \(? makes only the opening paren optional, preserving surrounding text.
normalized = re.sub(r'\(?(\d{3})[)\s.-]+(\d{3})[.-](\d{4})', r'\1-\2-\3', text)
print('Normalized:', normalized)

# re.split — split on pattern
sentence = 'one, two;   three | four'
words = re.split(r'[,;|]\s*', sentence)
print('Split:', words)

# Compile for reuse (faster in loops)
EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+', re.IGNORECASE)
texts = ['alice@example.com is admin', 'no email here', 'bob@test.org rocks']
for t in texts:
    found = EMAIL_RE.findall(t)
    if found:
        print(f'  Found in "{t}": {found}')
Common patterns β€” email, URL, date, IP address
import re

# Reference patterns for common extraction tasks.
# NOTE: findall returns tuples for patterns with capturing groups ('date'
# here), and the whole match for group-free patterns.
# NOTE(review): the ipv4 pattern is naive — it also accepts out-of-range
# octets like 999.999.999.999; fine for a demo, not for validation.
PATTERNS = {
    'email':   r'[\w.+-]+@[\w-]+\.[\w.]{2,}',
    'url':     r'https?://[\w/:%#\$&\?\(\)~\.=\+\-]+',
    'date':    r'\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b',
    'phone':   r'\b\d{3}[-.]\d{3}[-.]\d{4}\b',
    'ipv4':    r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
    'hashtag': r'#[\w]+',
}

sample = '''
Email me at john.doe@example.com by 2024-03-15.
Visit https://example.com/page?id=42 for details.
Call 555-123-4567. Server IP: 192.168.1.100
Twitter: #DataScience #Python
'''

for name, pattern in PATTERNS.items():
    matches = re.findall(pattern, sample)
    if matches:
        print(f'{name:8s}: {matches}')
🏋️ Practice: Data Extractor
Write regex patterns to extract all email addresses, US phone numbers (xxx-xxx-xxxx format), and dollar amounts (e.g. $1,234.56) from the sample text below.
Starter Code
import re

text = '''
Please contact billing@company.com or support@help.org.
Call 555-123-4567 or 800-555-9999 for support.
Invoice total: $1,234.56. Discount applied: $50.00.
Admin: admin@internal.net | Helpdesk: 312-555-0100
'''

# NOTE: an empty pattern matches at every position, so until the TODOs
# below are filled in, findall returns a list of empty strings.
EMAIL_PATTERN = re.compile(r'')   # TODO
PHONE_PATTERN = re.compile(r'')   # TODO
MONEY_PATTERN = re.compile(r'')   # TODO

print('Emails:', EMAIL_PATTERN.findall(text))
print('Phones:', PHONE_PATTERN.findall(text))
print('Amounts:', MONEY_PATTERN.findall(text))
✅ Practice Checklist
15. Type Hints & Dataclasses

Write self-documenting, IDE-friendly code with type annotations and eliminate boilerplate from data containers with @dataclass.

Basic type hints for functions
from typing import Optional, Union, List

def greet(name: str, times: int = 1) -> str:
    return ('Hello, ' + name + '! ') * times

def parse_int(value: Union[str, int]) -> Optional[int]:
    try:
        return int(value)
    except (ValueError, TypeError):
        return None

def process(items: List[Union[int, float]]) -> float:
    return sum(items) / len(items) if items else 0.0

print(greet('Alice'))
print(greet('Bob', 3))
print(parse_int('42'))
print(parse_int('abc'))   # returns None
print(process([1, 2.5, 3, 4]))

# Python 3.10+ union syntax: int | str instead of Union[int, str]
def modern(x: int | str) -> str:
    return str(x)
print(modern(42))
Generic types β€” List, Dict, Tuple, Callable
from typing import Dict, List, Tuple, Callable, Optional, TypeVar

T = TypeVar('T')

# FIX: Optional was originally imported AFTER first() was defined, but
# annotations are evaluated at definition time — so defining first()
# raised NameError. Import everything up front.
def first(items: List[T]) -> Optional[T]:
    """Return the first element, or None for an empty list."""
    return items[0] if items else None

def apply_all(funcs: List[Callable[[int], int]], value: int) -> List[int]:
    """Apply each function in *funcs* to *value*, in order."""
    return [f(value) for f in funcs]

def parse_config(raw: Dict[str, str]) -> Dict[str, int]:
    """Keep only pure-digit values, converted to int."""
    return {k: int(v) for k, v in raw.items() if v.isdigit()}

Point = Tuple[float, float]
def distance(p1: Point, p2: Point) -> float:
    """Euclidean distance between two 2-D points."""
    return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2) ** 0.5

print(first([1, 2, 3]))          # 1
print(first([]))                 # None
print(apply_all([lambda x: x*2, lambda x: x+1], 5))  # [10, 6]
print(parse_config({'a': '10', 'b': 'hello', 'c': '5'}))
print(distance((0.0, 0.0), (3.0, 4.0)))  # 5.0
@dataclass basics β€” auto-generated methods
from dataclasses import dataclass, field
from typing import List

@dataclass
class Point:
    """2-D point; @dataclass generates __init__, __repr__ and __eq__."""
    x: float
    y: float

    def distance_to(self, other: 'Point') -> float:
        """Euclidean distance to *other*."""
        return ((self.x - other.x)**2 + (self.y - other.y)**2)**0.5

@dataclass
class Product:
    """Store item. tags uses default_factory — a bare [] default would be
    shared between every instance (the mutable-default pitfall)."""
    name: str
    price: float
    tags: List[str] = field(default_factory=list)
    in_stock: bool = True

    def __post_init__(self):
        # Runs after the generated __init__ — the validation hook.
        if self.price < 0:
            raise ValueError(f'Price cannot be negative: {self.price}')

p1 = Point(0, 0)
p2 = Point(3, 4)
print(p1)               # Point(x=0, y=0)
print(p2)               # Point(x=3, y=4)
print(p1 == Point(0,0)) # True — __eq__ auto-generated
print(p1.distance_to(p2))  # 5.0

laptop = Product('Laptop', 999.99, ['electronics', 'computers'])
print(laptop)
print(laptop.tags)

try:
    Product('Bad', -1)
except ValueError as e:
    print('Caught:', e)
Advanced dataclass β€” frozen, order, slots
from dataclasses import dataclass, field
from typing import List
import functools

@dataclass(frozen=True)   # immutable — can be used in sets/dict keys
class Version:
    """Semantic-version triple; frozen, so hashable and value-comparable."""
    major: int
    minor: int
    patch: int = 0

    def __str__(self) -> str:
        return f'{self.major}.{self.minor}.{self.patch}'

@dataclass(order=True)    # auto-generates __lt__, __le__, __gt__, __ge__
class Employee:
    # sort_index is excluded from __init__ and __repr__; it is the FIRST
    # field, so the generated comparisons order employees by salary.
    sort_index: float = field(init=False, repr=False)
    name: str
    salary: float
    dept: str

    def __post_init__(self):
        # FIX: removed the original dead line
        # "object.__setattr__(...) if False else None" — it never executed
        # (the class is not frozen, so plain assignment works).
        self.sort_index = self.salary  # used for ordering

v1 = Version(1, 2, 3)
v2 = Version(1, 2, 3)
print(v1 == v2)     # True
print(hash(v1))     # hashable because frozen

try:
    v1.major = 2    # raises FrozenInstanceError
except Exception as e:
    print(type(e).__name__, e)

employees = [Employee('Carol', 95000, 'Eng'), Employee('Bob', 80000, 'Sales'), Employee('Alice', 110000, 'Eng')]
employees.sort()
for e in employees:
    print(f'  {e.name}: ${e.salary:,.0f}')
🏋️ Practice: Typed Address Book
Create a Person dataclass (name: str, age: int, email: str, phone: Optional[str] = None). Create an AddressBook dataclass holding a List[Person]. Add methods: add(person), find_by_name(name) -> Optional[Person], adults() -> List[Person] (age >= 18).
Starter Code
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Person:
    # Contact record; phone is optional.
    name: str
    age: int
    email: str
    phone: Optional[str] = None

@dataclass
class AddressBook:
    # default_factory gives each book its own list (mutable-default pitfall).
    contacts: List[Person] = field(default_factory=list)

    def add(self, person: Person) -> None:
        # TODO
        pass

    def find_by_name(self, name: str) -> Optional[Person]:
        # TODO
        pass

    def adults(self) -> List[Person]:
        # TODO
        pass

# NOTE(review): with the stubs unimplemented, adults() returns None, so the
# final list comprehension raises TypeError — complete the TODOs first.
book = AddressBook()
book.add(Person('Alice', 30, 'alice@example.com', '555-1234'))
book.add(Person('Bob', 17, 'bob@example.com'))
book.add(Person('Carol', 25, 'carol@example.com', '555-5678'))

print(book.find_by_name('Alice'))
print('Adults:', [p.name for p in book.adults()])
✅ Practice Checklist
16. Concurrency & Async

Speed up I/O-bound tasks with threading and asyncio, CPU-bound tasks with multiprocessing, and understand the GIL. Use concurrent.futures for clean parallel execution.

Threading for I/O-bound tasks
import threading
import time
import random

results = {}
lock = threading.Lock()

def fetch_data(url_id):
    '''Simulate an I/O-bound network call.'''
    time.sleep(random.uniform(0.05, 0.15))  # simulate latency
    data = f'data_from_endpoint_{url_id}'
    with lock:
        results[url_id] = data

# Sequential (slow)
t0 = time.perf_counter()
for i in range(5):
    fetch_data(i)
t_seq = time.perf_counter() - t0
print(f'Sequential: {t_seq:.3f}s')

# Threaded (fast for I/O)
results.clear()
threads = [threading.Thread(target=fetch_data, args=(i,)) for i in range(5)]
t0 = time.perf_counter()
for th in threads: th.start()
for th in threads: th.join()
t_thread = time.perf_counter() - t0
print(f'Threaded:   {t_thread:.3f}s  ({t_seq/t_thread:.1f}x faster)')
print('Results:', list(results.keys()))
concurrent.futures ThreadPool & ProcessPool
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time, math

def cpu_task(n):
    '''CPU-bound: compute sum of first n primes (trial division).'''
    primes, count = [], 2
    while len(primes) < n:
        if all(count % p != 0 for p in primes): primes.append(count)
        count += 1
    return sum(primes)

def io_task(delay):
    '''I/O-bound stand-in: sleep for *delay* seconds, then report.'''
    time.sleep(delay)
    return f'done after {delay:.2f}s'

# ThreadPool for I/O
delays = [0.05, 0.08, 0.06, 0.07, 0.05]
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as ex:
    futures = {ex.submit(io_task, d): d for d in delays}
    for f in as_completed(futures):
        pass
print(f'ThreadPool I/O: {time.perf_counter()-t0:.3f}s (sum={sum(delays):.2f}s serial)')

# ProcessPool for CPU (bypasses GIL)
# FIX: ProcessPoolExecutor work must sit under the __main__ guard — on
# spawn-based platforms (Windows, macOS default) each worker re-imports
# this module, and unguarded pool creation recurses/raises.
if __name__ == '__main__':
    tasks = [50, 60, 55, 65, 45]
    t0 = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(cpu_task, tasks))
    print(f'ProcessPool CPU: {time.perf_counter()-t0:.2f}s')
    print('Sum of primes results:', results[:3], '...')
asyncio for async I/O
import asyncio
import time

async def fetch(session_id, delay):
    '''Simulate async HTTP request.'''
    # asyncio.sleep yields control to the event loop (non-blocking wait).
    await asyncio.sleep(delay)
    return f'response_{session_id}'

async def main():
    delays = [0.1, 0.05, 0.08, 0.12, 0.06]

    # Sequential async (still fast but ordered)
    # Each await completes before the next starts — total ≈ sum of delays.
    t0 = time.perf_counter()
    results = []
    for i, d in enumerate(delays):
        r = await fetch(i, d)
        results.append(r)
    print(f'Sequential async: {time.perf_counter()-t0:.3f}s')

    # Concurrent async (all at once)
    # gather runs all coroutines concurrently — total ≈ max of delays.
    t0 = time.perf_counter()
    tasks = [fetch(i, d) for i, d in enumerate(delays)]
    results = await asyncio.gather(*tasks)
    print(f'Concurrent async: {time.perf_counter()-t0:.3f}s')
    print('Results:', results)

asyncio.run(main())
Queue-based producer-consumer
import threading
import queue
import time
import random

def producer(q, n_items):
    # Push n_items work items, then a None sentinel to signal completion.
    for i in range(n_items):
        item = f'item_{i}'
        q.put(item)
        time.sleep(random.uniform(0.01, 0.03))
    q.put(None)  # sentinel
 
def consumer(q, results):
    # Loop until the sentinel arrives; q.get() blocks while the queue is empty.
    while True:
        item = q.get()
        if item is None:
            break
        # Simulate processing
        time.sleep(random.uniform(0.005, 0.015))
        results.append(item.upper())
        q.task_done()

# maxsize=5 applies backpressure: producer blocks when the queue is full.
q       = queue.Queue(maxsize=5)
results = []

t0 = time.perf_counter()
prod = threading.Thread(target=producer, args=(q, 10))
cons = threading.Thread(target=consumer, args=(q, results))
prod.start(); cons.start()
prod.join(); cons.join()
print(f'Processed {len(results)} items in {time.perf_counter()-t0:.3f}s')
print('Processed:', results[:5], '...')
🏋️ Practice: Parallel Web Scraper Simulation
Simulate fetching 15 URLs concurrently with ThreadPoolExecutor. Each 'fetch' sleeps for a random 0.05–0.3s and returns a fake HTML string. Collect results in order. Measure speedup vs sequential. Also implement a version using asyncio.gather. Report total time for both.
Starter Code
from concurrent.futures import ThreadPoolExecutor
import asyncio, time, random

URLS = [f'https://example.com/page/{i}' for i in range(15)]

def sync_fetch(url):
    # Blocking stand-in for an HTTP GET (random latency).
    time.sleep(random.uniform(0.05, 0.3))
    return f'<html>{url}</html>'

async def async_fetch(url):
    # Non-blocking equivalent for the asyncio version.
    await asyncio.sleep(random.uniform(0.05, 0.3))
    return f'<html>{url}</html>'

# TODO: (1) ThreadPoolExecutor: fetch all URLs, measure time
# TODO: (2) asyncio.gather: fetch all URLs, measure time
# TODO: (3) Print speedup vs sequential (sum of delays)
✅ Practice Checklist
17. Design Patterns

Apply classic Gang-of-Four patterns in Python: Singleton, Factory, Observer, Strategy, and Decorator. Understand when and why to use each.

Singleton and Factory patterns
# Singleton: one instance per process
class DatabasePool:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.connections = []
            print('Creating new DatabasePool')
        return cls._instance

    def connect(self, host):
        self.connections.append(host)
        return f'Connected to {host}'

pool1 = DatabasePool()
pool2 = DatabasePool()
print('Same instance:', pool1 is pool2)
pool1.connect('db1.server.com')
print('Connections visible from pool2:', pool2.connections)

# Factory: create objects without knowing exact class
class Shape:
    def area(self): raise NotImplementedError

class Circle:
    def __init__(self, r): self.r = r
    def area(self): return 3.14159 * self.r**2
    def __repr__(self): return f'Circle(r={self.r})'

class Rectangle:
    def __init__(self, w, h): self.w, self.h = w, h
    def area(self): return self.w * self.h
    def __repr__(self): return f'Rectangle({self.w}x{self.h})'

def shape_factory(kind, **kwargs):
    shapes = {'circle': Circle, 'rectangle': Rectangle}
    if kind not in shapes: raise ValueError(f'Unknown shape: {kind}')
    return shapes[kind](**kwargs)

for spec in [('circle', {'r': 5}), ('rectangle', {'w': 4, 'h': 6})]:
    s = shape_factory(spec[0], **spec[1])
    print(f'{s}: area={s.area():.2f}')
Observer pattern (event system)
from typing import Callable, Dict, List

class EventBus:
    '''Minimal publish-subscribe hub: handlers are kept per event name.'''
    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}

    def subscribe(self, event: str, handler: Callable):
        '''Register *handler* for *event*; returns self so calls chain.'''
        bucket = self._handlers.get(event)
        if bucket is None:
            bucket = self._handlers[event] = []
        bucket.append(handler)
        return self  # fluent API

    def publish(self, event: str, **data):
        '''Call every handler registered for *event* with **data.'''
        for fn in self._handlers.get(event, []):
            fn(**data)

    def unsubscribe(self, event: str, handler: Callable):
        '''Drop all registrations of *handler* for *event*.'''
        bucket = self._handlers.get(event)
        if bucket is not None:
            self._handlers[event] = [h for h in bucket if h != handler]

# Usage
bus = EventBus()

def on_order_placed(order_id, amount, user):
    print(f'[EMAIL]   Order #{order_id} placed by {user}: ${amount:.2f}')

def on_order_placed_analytics(order_id, amount, **_):
    print(f'[ANALYTICS] Recorded order #{order_id}, revenue=${amount:.2f}')

def on_order_placed_inventory(order_id, **_):
    print(f'[INVENTORY] Reducing stock for order #{order_id}')

# The fluent API lets subscriptions chain
(bus.subscribe('order.placed', on_order_placed)
    .subscribe('order.placed', on_order_placed_analytics)
    .subscribe('order.placed', on_order_placed_inventory))

# Trigger event
bus.publish('order.placed', order_id=1042, amount=149.99, user='Alice')
Strategy pattern
from abc import ABC, abstractmethod
from typing import List

class SortStrategy(ABC):
    '''Interface: sort() returns a new sorted list, input untouched.'''
    @abstractmethod
    def sort(self, data: list) -> list: ...

class BubbleSort(SortStrategy):
    '''O(n^2) bubble sort on a copy of the input.'''
    def sort(self, data):
        out = list(data)
        length = len(out)
        for outer in range(length):
            for k in range(length - outer - 1):
                if out[k] > out[k + 1]:
                    out[k], out[k + 1] = out[k + 1], out[k]
        return out

class MergeSort(SortStrategy):
    '''Top-down merge sort; returns a new list.'''
    def sort(self, data):
        if len(data) <= 1:
            return list(data)
        half = len(data) // 2
        left = self.sort(data[:half])
        right = self.sort(data[half:])
        merged, i, j = [], 0, 0
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                merged.append(left[i])
                i += 1
            else:
                merged.append(right[j])
                j += 1
        merged.extend(left[i:])
        merged.extend(right[j:])
        return merged

class Sorter:
    '''Context object that delegates to a swappable SortStrategy.'''
    def __init__(self, strategy: SortStrategy):
        self._strategy = strategy

    def set_strategy(self, strategy: SortStrategy):
        self._strategy = strategy

    def sort(self, data: list) -> list:
        return self._strategy.sort(data)

import time, random
data = random.sample(range(1000), 20)
sorter = Sorter(BubbleSort())
print('Bubble:', sorter.sort(data)[:5], '...')

sorter.set_strategy(MergeSort())  # swap strategy at runtime
print('Merge: ', sorter.sort(data)[:5], '...')
Decorator and Mixin patterns
import time, functools

# Function decorator: retry with backoff
def retry(max_attempts=3, delay=0.01):
    '''Retry the wrapped callable up to *max_attempts* times, sleeping
    *delay* seconds between failures; the final failure re-raises.'''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= max_attempts:
                        raise
                    print(f'  Attempt {attempt} failed: {e}. Retrying...')
                    time.sleep(delay)
        return wrapper
    return decorator

import random
@retry(max_attempts=4)
def flaky_api_call():
    if random.random() < 0.6: raise ConnectionError('Timeout')
    return 'Success!'

random.seed(42)
print('Result:', flaky_api_call())

# Mixin pattern: add logging capability to any class
class LogMixin:
    '''Adds log(), tagged with the concrete class name.'''
    def log(self, msg): print(f'[{self.__class__.__name__}] {msg}')

class TimeMixin:
    '''Adds timed(): run a callable and log its duration (needs log()).'''
    def timed(self, func, *args, **kwargs):
        started = time.perf_counter()
        outcome = func(*args, **kwargs)
        self.log(f'{func.__name__} took {(time.perf_counter()-started)*1000:.2f}ms')
        return outcome

class DataProcessor(LogMixin, TimeMixin):
    def process(self, data):
        import math
        return [math.sqrt(abs(x)) for x in data]

p = DataProcessor()
p.log('Starting processing')
result = p.timed(p.process, list(range(-100, 100)))
print('Sample output:', [round(x,2) for x in result[:5]])
🏋️ Practice: Plugin System with Factory + Observer
Build a data export plugin system: (1) Factory creates exporter objects (CSV, JSON, Parquet) based on format string. Each implements export(data, path). (2) Observer pattern: attach at least 2 listeners (Logger, FileSizeChecker) that react to 'export_complete' events. Test with a list of 100 dicts as data.
Starter Code
from abc import ABC, abstractmethod

class BaseExporter(ABC):
    @abstractmethod
    def export(self, data, path): ...

class CSVExporter(BaseExporter):
    def export(self, data, path):
        # TODO: write CSV using csv module or simple join
        pass

class JSONExporter(BaseExporter):
    def export(self, data, path):
        # TODO: write JSON using json module
        pass

def exporter_factory(fmt: str) -> BaseExporter:
    # TODO: return correct exporter based on fmt
    pass

# TODO: EventBus or simple list of observers
# TODO: Logger observer: print 'Exported N rows to path'
# TODO: FileSizeChecker observer: print file size

data = [{'id': i, 'value': i*2, 'name': f'item_{i}'} for i in range(100)]
# TODO: export to 'output.csv' and 'output.json', trigger events
✅ Practice Checklist
18. Testing with pytest

Write unit tests, parametrized tests, fixtures, and mocks with pytest. Apply TDD principles and measure code coverage.

Basic pytest structure and assertions
# test_math_utils.py  (run with: pytest test_math_utils.py -v)
# Here we demonstrate by running inline
import traceback

def add(a, b): return a + b
def divide(a, b):
    if b == 0: raise ZeroDivisionError('Cannot divide by zero')
    return a / b
def is_prime(n):
    if n < 2: return False
    return all(n % i != 0 for i in range(2, int(n**0.5)+1))

# --- Tests ---
def test_add():
    assert add(2, 3) == 5
    assert add(-1, 1) == 0
    assert add(0, 0) == 0

def test_divide():
    assert divide(10, 2) == 5.0
    assert abs(divide(1, 3) - 0.333) < 0.001

def test_divide_by_zero():
    try:
        divide(5, 0)
        assert False, 'Should have raised'
    except ZeroDivisionError:
        pass  # expected

def test_is_prime():
    primes     = [2, 3, 5, 7, 11, 13]
    non_primes = [0, 1, 4, 6, 9, 15]
    assert all(is_prime(p) for p in primes)
    assert not any(is_prime(n) for n in non_primes)

# Run all tests
tests = [test_add, test_divide, test_divide_by_zero, test_is_prime]
for t in tests:
    try: t(); print(f'PASS {t.__name__}')
    except AssertionError as e: print(f'FAIL {t.__name__}: {e}')
Fixtures and parametrize
# Demonstrate pytest fixture and parametrize patterns
import os, tempfile

# === Fixture pattern ===
class FakeDB:
    # Minimal in-memory key/value store standing in for a real database.
    def __init__(self):
        self.data = {}
    def insert(self, key, val): self.data[key] = val
    def get(self, key): return self.data.get(key)
    def count(self): return len(self.data)

# In pytest: @pytest.fixture
def db_fixture():
    '''Provide a fresh DB for each test.'''
    return FakeDB()

# === Parametrize pattern ===
# In pytest: @pytest.mark.parametrize('a,b,expected', [...])
def check_multiply(a, b, expected):
    assert a * b == expected, f'{a}*{b} should be {expected}'

params = [(2, 3, 6), (0, 100, 0), (-1, -1, 1), (7, 8, 56)]
for a, b, exp in params:
    try: check_multiply(a, b, exp); print(f'PASS multiply({a},{b})={exp}')
    except AssertionError as e: print(f'FAIL: {e}')

# === Fixture with temp file ===
def test_file_write():
    # delete=False so the file survives the 'with' and can be re-read;
    # the finally block handles cleanup.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
        f.write('hello world')
        fname = f.name
    try:
        # NOTE(review): this open() is never closed — fine for a demo,
        # but real tests should use a 'with' block here too.
        content = open(fname).read()
        assert content == 'hello world'
        print('PASS test_file_write')
    finally:
        os.unlink(fname)

db = db_fixture()
db.insert('user1', {'name': 'Alice'})
assert db.count() == 1
assert db.get('user1')['name'] == 'Alice'
print('PASS db fixture test')
test_file_write()
Mocking with unittest.mock
from unittest.mock import patch, MagicMock, call
import json

# Function that depends on an external service
def fetch_user(user_id: int) -> dict:
    # Local import so patch('urllib.request.urlopen') intercepts the call.
    import urllib.request
    url = f'https://api.example.com/users/{user_id}'
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())

def process_user(user_id: int) -> str:
    user = fetch_user(user_id)
    return f'{user["name"]} ({user["email"]})'

# Test without hitting real API
# The mock must behave like a context manager, so __enter__/__exit__ are
# wired up by hand (urlopen's result is used in a 'with' statement above).
mock_response = MagicMock()
mock_response.read.return_value = json.dumps({'name': 'Alice', 'email': 'alice@co.com'}).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)

with patch('urllib.request.urlopen', return_value=mock_response):
    result = process_user(42)
    print(f'PASS: process_user(42) = {result!r}')

# Test exception handling
def robust_fetch(user_id):
    try:
        return fetch_user(user_id)
    except Exception as e:
        return {'error': str(e)}

# side_effect makes the patched urlopen raise instead of returning.
with patch('urllib.request.urlopen', side_effect=ConnectionError('Network down')):
    r = robust_fetch(99)
    assert 'error' in r
    print(f'PASS: error handled: {r}')

# Verify mock was called correctly
# assert_called_with checks only the MOST RECENT call's arguments.
mock_fn = MagicMock(return_value=42)
mock_fn(1, 2, key='val')
mock_fn(3, 4)
mock_fn.assert_called_with(3, 4)
print('PASS: mock call verification')
Property-based testing with hypothesis
try:
    from hypothesis import given, strategies as st, settings

    # Property: sort is idempotent
    @given(st.lists(st.integers(), max_size=50))
    @settings(max_examples=200)
    def test_sort_idempotent(lst):
        # FIX: removed a pointless walrus (sorted_lst := ...) whose bound
        # name was never used — it only obscured the property being tested.
        sorted_once  = sorted(lst)
        sorted_twice = sorted(sorted(lst))
        assert sorted_once == sorted_twice

    # Property: reversed reversed = original
    @given(st.lists(st.integers(), max_size=100))
    def test_reverse_involution(lst):
        assert list(reversed(list(reversed(lst)))) == lst

    # Property: split+join roundtrip
    @given(st.text(alphabet='abcdefghijklmnopqrstuvwxyz ', min_size=1, max_size=50))
    def test_split_join_roundtrip(s):
        words = s.split()
        rejoined = ' '.join(words)
        assert rejoined == ' '.join(s.split())

    test_sort_idempotent()
    test_reverse_involution()
    test_split_join_roundtrip()
    print('PASS: all hypothesis property tests')

except ImportError:
    print('pip install hypothesis')
    print('Hypothesis generates hundreds of random inputs automatically.')
    print('Properties to test: commutativity, idempotence, round-trips, invariants.')

    # Demo without hypothesis: manual property tests
    import random
    random.seed(42)
    for _ in range(100):
        lst = [random.randint(-100, 100) for _ in range(random.randint(0, 30))]
        assert sorted(sorted(lst)) == sorted(lst), 'Sort not idempotent!'
    print('PASS: manual sort idempotence test (100 random lists)')
🏋️ Practice: Test a Data Validation Class
Implement and test a DataValidator class with methods: validate_types(df) checks column dtypes, validate_ranges(df, rules) checks min/max per column, validate_no_nulls(df, cols) checks specific columns. Write at least 6 tests covering: passing validation, each failure mode, and edge cases (empty df, single row).
Starter Code
import pandas as pd
import numpy as np

class DataValidator:
    """Starter skeleton for the exercise.

    Each method returns a list of violations; an empty list means the
    check passed for every column.
    """

    def validate_types(self, df: pd.DataFrame, expected: dict) -> list:
        '''Return list of (col, actual, expected) for mismatches.'''
        # TODO: compare df[col].dtype.kind vs expected type chars
        pass

    def validate_ranges(self, df: pd.DataFrame, rules: dict) -> list:
        '''rules = {col: (min, max)}. Return list of violations.'''
        # TODO: for each col, check if any values outside range
        pass

    def validate_no_nulls(self, df: pd.DataFrame, cols: list) -> list:
        '''Return cols that contain nulls.'''
        # TODO: check each column for nulls
        pass

# Test functions -- one per scenario. Each should build a small DataFrame,
# run the relevant validator method, and assert on the returned list.
def test_valid_types(): ...  # TODO
def test_invalid_type(): ...  # TODO
def test_range_pass(): ...   # TODO
def test_range_fail(): ...   # TODO
def test_no_nulls_pass(): ...  # TODO
def test_no_nulls_fail(): ...  # TODO

# Run all
for t in [test_valid_types, test_invalid_type, test_range_pass,
          test_range_fail, test_no_nulls_pass, test_no_nulls_fail]:
    t()
✅ Practice Checklist
19. Functional Programming

Python supports functional programming with map(), filter(), reduce(), and functools. These let you transform data declaratively without explicit loops.

map() and filter() for data transformation
# Declarative data transforms with map() and filter() -- no explicit loops.
def _square(x):
    return x ** 2

def _is_even(x):
    return x % 2 == 0

nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# map() applies a function to every element
squares = list(map(_square, nums))
print("Squares:", squares)

# filter() keeps only the elements where the predicate holds
evens = list(filter(_is_even, nums))
print("Evens:", evens)

# Chaining: square the even numbers
result = list(map(_square, filter(_is_even, nums)))
print("Squared evens:", result)

# map() also walks several iterables in lockstep
a, b = [1, 2, 3], [10, 20, 30]
sums = list(map(lambda left, right: left + right, a, b))
print("Pairwise sums:", sums)
functools.reduce() and partial()
from functools import reduce, partial

nums = [1, 2, 3, 4, 5]

# reduce() folds the iterable into a single accumulated value
total = reduce(lambda running, n: running + n, nums)
print("Sum via reduce:", total)

product = reduce(lambda running, n: running * n, nums)
print("Product:", product)

# partial() pre-binds some arguments of an existing function
def power(base, exp):
    return base ** exp

square = partial(power, exp=2)
cube   = partial(power, exp=3)

print("5 squared:", square(5))
print("3 cubed:  ", cube(3))

# partial applied to a data-processing helper
def scale(value, factor=1.0, offset=0.0):
    return value * factor + offset

normalize = partial(scale, factor=0.1, offset=-0.5)
data = [0, 5, 10, 15, 20]
print("Normalized:", list(map(normalize, data)))
Higher-order functions and function pipelines
from functools import reduce

# Higher-order function: make_multiplier returns a new function
# that remembers n in its closure.
def make_multiplier(n):
    def multiply(x):
        return x * n
    return multiply

double = make_multiplier(2)
triple = make_multiplier(3)

print("double(7):", double(7))
print("triple(7):", triple(7))

# Compose transformations into a single callable pipeline.
def pipeline(*funcs):
    """Return a function that threads data through funcs left-to-right."""
    def apply(data):
        value = data
        for fn in funcs:
            value = fn(value)
        return value
    return apply

process = pipeline(
    lambda xs: [v for v in xs if v > 0],    # keep positives
    lambda xs: [v ** 0.5 for v in xs],      # sqrt
    lambda xs: [round(v, 2) for v in xs],   # round
)

data = [-3, 4, 9, -1, 16, 25]
print("Input:", data)
print("Output:", process(data))
💼 Real-World: Sales Data Cleaner
A data pipeline uses functional tools to clean and transform a list of raw sales records without mutating state.
from functools import reduce, partial

# Raw sales records; invalid or non-positive rows must be dropped.
records = [
    {"item": "apple",  "qty": 3,  "price": 1.20, "valid": True},
    {"item": "banana", "qty": -1, "price": 0.50, "valid": False},
    {"item": "cherry", "qty": 10, "price": 2.00, "valid": True},
]

# Step 1 -- filter: keep only valid rows with a positive quantity
valid = list(filter(lambda rec: rec["valid"] and rec["qty"] > 0, records))

# Step 2 -- map: attach a computed line total to a *copy* of each record
with_total = list(map(lambda rec: {**rec, "total": rec["qty"] * rec["price"]}, valid))

# Step 3 -- reduce: fold the line totals into a grand total
grand = reduce(lambda acc, rec: acc + rec["total"], with_total, 0.0)

for rec in with_total:
    print(f'  {rec["item"]:8s}: ${rec["total"]:.2f}')
print(f"Grand total: ${grand:.2f}")
🏋️ Practice: Functional Data Processor
Write a function process_data(numbers) that uses ONLY map(), filter(), and reduce() (no loops): remove negatives, multiply each by 3, return the sum. Then create a partial called process_small that pre-filters values below 100 before calling process_data.
Starter Code
from functools import reduce, partial

def process_data(numbers):
    """Starter: drop negatives, triple the rest, return the sum.

    Must use only filter(), map(), and reduce() -- no explicit loops.
    """
    # Step 1: filter out negatives with filter()
    # Step 2: multiply each by 3 with map()
    # Step 3: sum with reduce()
    pass

# Test
print(process_data([1, -2, 3, -4, 5]))  # expect 27

def keep_small(numbers, limit=100):
    # Pre-filter helper: keep values whose magnitude is below limit
    return [n for n in numbers if abs(n) < limit]

process_small = partial(process_data, ...)  # TODO: use partial with keep_small
✅ Practice Checklist
20. Itertools in Depth

The itertools module provides fast, memory-efficient tools for working with iterables. Essential for combinatorics, grouping, and chaining data streams.

chain, islice, cycle, repeat for sequence control
import itertools

# chain: concatenate any number of iterables lazily
combined = list(itertools.chain([1, 2], [3, 4], [5]))
print("chain:", combined)

# islice: slice any iterable, including generators
first5 = list(itertools.islice(range(100), 5))
print("islice first 5:", first5)

skip3_take4 = list(itertools.islice(range(100), 3, 7))
print("islice [3:7]:", skip3_take4)

# cycle: endlessly repeat a sequence; islice takes the first 7
colors = list(itertools.islice(itertools.cycle(['R', 'G', 'B']), 7))
print("cycle 7:", colors)

# repeat: emit one value a fixed number of times
zeros = list(itertools.repeat(0, 5))
print("repeat:", zeros)

# accumulate: running totals over a sequence
data = [1, 3, 2, 5, 4]
running = list(itertools.accumulate(data))
print("accumulate (sum):", running)
combinations, permutations, product
import itertools

items = ['A', 'B', 'C']

# combinations: unordered selections, no repetition
combs = list(itertools.combinations(items, 2))
print("combinations(2):", combs)

# permutations: ordered arrangements
perms = list(itertools.permutations(items, 2))
print("permutations(2):", perms)

# product: Cartesian product, equivalent to nested for-loops
colors = ['red', 'blue']
sizes  = ['S', 'M', 'L']
variants = list(itertools.product(colors, sizes))
print("product:", variants)

# product with repeat=2: every ordered pair, like rolling dice twice
dice = list(itertools.product(range(1, 4), repeat=2))
print("dice pairs:", dice[:6], "...")

print(f"Combinations: {len(combs)}, Permutations: {len(perms)}, Product: {len(dice)}")
groupby and takewhile/dropwhile
import itertools

# groupby collapses *consecutive* runs sharing a key,
# so the data must be sorted by that same key first.
data = [
    {"dept": "eng",  "name": "Alice"},
    {"dept": "eng",  "name": "Bob"},
    {"dept": "sales","name": "Carol"},
    {"dept": "sales","name": "Dave"},
    {"dept": "hr",   "name": "Eve"},
]

def by_dept(row):
    return row["dept"]

data.sort(key=by_dept)

for dept, members in itertools.groupby(data, key=by_dept):
    names = [m["name"] for m in members]
    print(f"  {dept}: {names}")

def is_even(n):
    return n % 2 == 0

nums = [2, 4, 6, 1, 8, 10]

# takewhile: yield until the predicate first fails
taken = list(itertools.takewhile(is_even, nums))
print("takewhile even:", taken)  # stops at 1

# dropwhile: discard until the predicate first fails, then yield the rest
dropped = list(itertools.dropwhile(is_even, nums))
print("dropwhile even:", dropped)  # starts from 1
💼 Real-World: Grid Search Parameter Iterator
A machine learning hyperparameter search uses itertools.product to enumerate all combinations of parameters without nested loops.
import itertools

# Hyperparameter grid: one config per combination of the listed values.
param_grid = {
    "learning_rate": [0.01, 0.1, 0.001],
    "max_depth":     [3, 5, 7],
    "n_estimators":  [50, 100],
}

keys = list(param_grid.keys())
values = list(param_grid.values())

# itertools.product(*values) replaces three nested loops
configs = list(itertools.product(*values))
print(f"Total configs: {len(configs)}")

# Preview the first three configs as keyword dicts
for i, combo in enumerate(configs[:3]):
    cfg = dict(zip(keys, combo))
    print(f"  Config {i+1}: {cfg}")
print("  ...")
🏋️ Practice: Itertools Combinatorics
Write a function all_pairs(items) using itertools.combinations that returns all unique pairs. Write team_schedules(teams) using itertools.permutations(teams, 2) for home/away matchups. Write batch(iterable, n) using islice that yields chunks of size n.
Starter Code
import itertools

def all_pairs(items):
    # Return list of all unique 2-element combinations
    # TODO: use itertools.combinations(items, 2)
    pass

def team_schedules(teams):
    # Return list of (home, away) tuples for all matchups
    # TODO: use itertools.permutations(teams, 2)
    pass

def batch(iterable, n):
    """Yield successive n-sized chunks from iterable.

    islice pulls at most n items per pass; an empty chunk means the
    underlying iterator is exhausted.
    """
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, n))
        if not chunk:
            break
        yield chunk

# Tests
print(all_pairs(['A','B','C','D']))   # 6 pairs
print(len(team_schedules(['X','Y','Z'])))  # 6 matchups
print(list(batch(range(10), 3)))     # [[0,1,2],[3,4,5],[6,7,8],[9]]
✅ Practice Checklist
21. Closures & Scoping

Python resolves names using the LEGB rule (Local, Enclosing, Global, Built-in). Closures capture variables from enclosing scopes and are the foundation of decorators and factories.

LEGB scoping rule
x = "global"

def outer():
    x = "enclosing"

    def inner():
        x = "local"
        print("inner sees:", x)       # resolves to the Local binding

    inner()
    print("outer sees:", x)           # resolves to the Enclosing binding

outer()
print("module sees:", x)              # resolves to the Global binding

# Built-in scope is searched last (len, print, ...)
print("built-in len:", len([1, 2, 3]))  # 3

# 'global' lets a function rebind a module-level name
counter = 0

def increment():
    global counter
    counter += 1

for _ in range(2):
    increment()
print("counter:", counter)  # 2

# 'nonlocal' lets a nested function rebind an enclosing name
def make_counter():
    """Return a counter function with private state in its closure."""
    count = 0

    def bump():
        nonlocal count
        count += 1
        return count

    return bump

c = make_counter()
print(c(), c(), c())  # 1 2 3
Closure factories
# A closure remembers variables from the scope where it was defined.

def make_adder(n):
    """Return a function that adds the captured n to its argument."""
    def add(value):
        return value + n
    return add

add5  = make_adder(5)
add10 = make_adder(10)
print("add5(3):", add5(3))   # 8
print("add10(3):", add10(3)) # 13

# Each factory call produces an independent closure cell
print("Different objects:", add5 is not add10)  # True

# A closure can also carry mutable state across calls
def make_accumulator():
    """Return a running-total function with private state."""
    total = 0

    def accumulate(amount):
        nonlocal total
        total += amount
        return total

    return accumulate

acc = make_accumulator()
for delta in [10, 25, 5, 60]:
    print(f"  +{delta} -> running total: {acc(delta)}")
Late binding and closure gotcha
# Common closure gotcha: late binding in loops.
# Every lambda below closes over the SAME variable i, and i is only
# looked up when the lambda is *called* -- after the loop has finished.

funcs_bad = [lambda: i for i in range(5)]
print("Late binding:", [f() for f in funcs_bad])  # [4, 4, 4, 4, 4]!

# Fix 1: capture the current value as a default argument
funcs_good = [lambda i=i: i for i in range(5)]
print("Default arg fix:", [f() for f in funcs_good])  # [0, 1, 2, 3, 4]

# Fix 2: use a factory function (each call gets its own closure cell)
def make_func(i):
    def f():
        return i
    return f

funcs_factory = [make_func(i) for i in range(5)]
print("Factory fix:", [f() for f in funcs_factory])  # [0, 1, 2, 3, 4]

# Inspecting closure cells directly via __closure__
# (FIX: removed an unused `import inspect` -- the demo never used it)
def outer(x):
    def inner():
        return x * 2
    return inner

fn = outer(7)
print("Closure cell value:", fn.__closure__[0].cell_contents)  # 7
💼 Real-World: Configurable Validator Factory
A data validation system uses closures to create reusable validators with baked-in limits, avoiding class overhead.
def make_range_validator(min_val, max_val, field="value"):
    """Build a validator checking min_val <= x <= max_val.

    The limits and field name are baked into the closure, so the
    returned function needs no configuration at call time.
    """
    def validate(x):
        if not (min_val <= x <= max_val):
            raise ValueError(f"{field} {x} out of range [{min_val}, {max_val}]")
        return True
    return validate

def make_str_validator(max_len, allowed_chars=None):
    """Build a validator for string length and (optionally) alphabet."""
    def validate(s):
        if len(s) > max_len:
            # BUG FIX: the message hard-coded a bogus constant (71,948);
            # report the actual captured limit instead.
            raise ValueError(f"String too long: {len(s)} > {max_len}")
        if allowed_chars and not all(c in allowed_chars for c in s):
            raise ValueError(f"Invalid characters in: {s!r}")
        return True
    return validate

validate_age   = make_range_validator(0, 120, "age")
validate_score = make_range_validator(0.0, 1.0, "score")
validate_name  = make_str_validator(50, allowed_chars="abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ")

tests = [(validate_age, 25), (validate_score, 0.85), (validate_name, "Alice Smith")]
for validator, val in tests:
    try:
        # BUG FIX: validate first, then report success -- the original
        # printed "OK" before the validator had even run.
        validator(val)
        print(f"  OK: {val!r}")
    except ValueError as e:
        print(f"  FAIL: {e}")
🏋️ Practice: Memoize with Closure
Write a function memoize(func) that returns a new function. The new function caches results in a dict (stored in a closure). It should handle any positional arguments as the cache key. Test it with a slow Fibonacci function and verify the cache speeds it up.
Starter Code
def memoize(func):
    """Starter: return a wrapper that caches func's results by args."""
    cache = {}   # closure variable
    def wrapper(*args):
        # TODO: if args in cache, return cached result
        # TODO: otherwise, call func(*args), store, return
        pass
    return wrapper

@memoize
def fib(n):
    # Naive recursion: exponential time without memoization
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

import time
t0 = time.time()
print(fib(35))     # should be fast after memoize
print(f"Time: {time.time()-t0:.4f}s")
✅ Practice Checklist
22. Decorators in Depth

Decorators wrap functions or classes to add behavior without modifying their source. Master stacked, parameterized, and class-based decorators.

Stacked decorators and functools.wraps
import functools, time

def timer(func):
    """Decorator: print how long each call to func takes."""
    @functools.wraps(func)  # keeps __name__ / __doc__ of the wrapped func
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"[timer] {func.__name__} took {elapsed_ms:.2f}ms")
        return result
    return wrapper

def logger(func):
    """Decorator: print the arguments of each call to func."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"[logger] calling {func.__name__} with args={args}, kwargs={kwargs}")
        return func(*args, **kwargs)
    return wrapper

# Stacking applies bottom-up: timer wraps compute, logger wraps that
@logger
@timer
def compute(n):
    return sum(range(n))

result = compute(100_000)
print("Result:", result)
print("Name preserved:", compute.__name__)  # compute, not wrapper
Parameterized decorators (decorator factories)
import functools

def retry(times=3, exceptions=(Exception,)):
    """Decorator factory: re-invoke the function up to `times` times.

    Only the listed exception types trigger a retry; the final failure
    is re-raised to the caller.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    print(f"  Attempt {attempt} failed: {e}")
                    if attempt >= times:
                        raise
        return wrapper
    return decorator

attempt_count = 0

@retry(times=3, exceptions=(ValueError,))
def unstable_fetch(url):
    """Fail twice, then succeed -- simulates a flaky network call."""
    global attempt_count
    attempt_count += 1
    if attempt_count < 3:
        raise ValueError(f"Connection failed (attempt {attempt_count})")
    return f"Data from {url}"

result = unstable_fetch("https://api.example.com")
print("Got:", result)
Class-based decorators
import functools

class CallCounter:
    """Class-based decorator: count how many times a function is called."""

    def __init__(self, func):
        functools.update_wrapper(self, func)
        self.func  = func
        self.count = 0

    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"[CallCounter] {self.func.__name__} called {self.count}x")
        return self.func(*args, **kwargs)

@CallCounter
def add(a, b):
    return a + b

for pair in [(1, 2), (3, 4), (5, 6)]:
    add(*pair)
print("Total calls:", add.count)  # 3

class validate_positive:
    """Class-based decorator: reject negative numeric positional args."""

    def __init__(self, func):
        functools.update_wrapper(self, func)
        self.func = func

    def __call__(self, *args, **kwargs):
        for arg in args:
            if isinstance(arg, (int, float)) and arg < 0:
                raise ValueError(f"Expected positive, got {arg}")
        return self.func(*args, **kwargs)

@validate_positive
def sqrt(x):
    return x ** 0.5

print(sqrt(9))   # 3.0
try:
    sqrt(-1)
except ValueError as e:
    print("Caught:", e)
💼 Real-World: Rate Limiter Decorator
A web scraper applies a rate-limiting decorator to avoid overloading target servers, with configurable calls-per-second.
import functools, time

def rate_limit(calls_per_second=1):
    """Decorator factory: enforce a minimum interval between calls."""
    min_interval = 1.0 / calls_per_second
    last_called = [0.0]  # one-element list so the closure can mutate it

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = min_interval - (time.time() - last_called[0])
            if wait > 0:
                print(f"  Rate limit: waiting {wait:.2f}s")
                time.sleep(wait)
            last_called[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls_per_second=2)
def fetch(url):
    return f"Response from {url}"

for url in ["http://a.com", "http://b.com", "http://c.com"]:
    print(fetch(url))
🏋️ Practice: Cache Decorator with TTL
Write a parameterized decorator @cache(ttl=60) that caches function results for ttl seconds. After the TTL expires, re-call the function and refresh the cache. Use a dict with (args, timestamp) as cache entries. Test with a function that returns time.time() so you can observe expiry.
Starter Code
import functools, time

def cache(ttl=60):
    """Starter: decorator factory caching results for ttl seconds."""
    def decorator(func):
        store = {}  # {args: (result, timestamp)}
        @functools.wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in store:
                result, ts = store[args]
                if now - ts < ttl:
                    print(f"  [cache hit] age={now-ts:.1f}s")
                    return result
            # TODO: call func, store result with timestamp, return result
            pass
        return wrapper
    return decorator

@cache(ttl=2)
def get_value(key):
    # Embeds the current time so cache hits are visibly stale-free
    return f"{key}:{time.time():.2f}"

print(get_value("x"))
print(get_value("x"))  # should be cache hit
time.sleep(2.1)
print(get_value("x"))  # should re-fetch after TTL
✅ Practice Checklist
23. Abstract Base Classes & Protocols

ABCs enforce interface contracts at class creation time. Protocols (PEP 544) enable structural subtyping β€” duck typing with type-checker support.

abc.ABC and @abstractmethod
from abc import ABC, abstractmethod

class Shape(ABC):
    """Abstract interface: subclasses must supply area and perimeter."""

    @abstractmethod
    def area(self) -> float:
        ...

    @abstractmethod
    def perimeter(self) -> float:
        ...

    def describe(self):
        """Concrete helper shared by every subclass."""
        return f"{type(self).__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"

class Circle(Shape):
    def __init__(self, r):
        self.r = r

    def area(self):
        return 3.14159 * self.r ** 2

    def perimeter(self):
        return 2 * 3.14159 * self.r

class Rectangle(Shape):
    def __init__(self, w, h):
        self.w, self.h = w, h

    def area(self):
        return self.w * self.h

    def perimeter(self):
        return 2 * (self.w + self.h)

for shape in (Circle(5), Rectangle(4, 6)):
    print(shape.describe())

# Instantiating the ABC itself raises TypeError
try:
    s = Shape()
except TypeError as e:
    print("Cannot instantiate:", e)
typing.Protocol for structural subtyping
from typing import Protocol, runtime_checkable

@runtime_checkable
class Drawable(Protocol):
    """Structural interface: anything with draw() and get_color()."""
    def draw(self) -> str: ...
    def get_color(self) -> str: ...

# These classes never mention Drawable -- matching the method
# signatures is enough (structural subtyping, PEP 544).
class Circle:
    def draw(self):
        return "O"
    def get_color(self):
        return "red"

class Square:
    def draw(self):
        return "[]"
    def get_color(self):
        return "blue"

class TextLabel:
    def draw(self):
        return "TEXT"
    def get_color(self):
        return "black"

def render(item: Drawable) -> str:
    """Format any Drawable for display."""
    return f"Drawing {item.draw()} in {item.get_color()}"

shapes = [Circle(), Square(), TextLabel()]
for s in shapes:
    print(render(s))
    print(f"  isinstance check: {isinstance(s, Drawable)}")
__subclasshook__ and virtual subclasses
from abc import ABC, abstractmethod

class Sized(ABC):
    """ABC that recognizes ANY class defining __len__ as a subclass."""

    @abstractmethod
    def __len__(self): ...

    @classmethod
    def __subclasshook__(cls, C):
        # Only answer for Sized itself, not for subclasses of Sized
        if cls is Sized:
            has_len = any("__len__" in B.__dict__ for B in C.__mro__)
            if has_len:
                return True
        return NotImplemented

# Built-in containers define __len__, so they are virtual subclasses
print(isinstance([], Sized))    # True
print(isinstance({}, Sized))    # True
print(isinstance("hi", Sized))  # True
print(isinstance(42, Sized))    # False

# A user class qualifies too, with no inheritance at all
class SparseVector:
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

print(isinstance(SparseVector({0: 1.0}), Sized))  # True
print(issubclass(SparseVector, Sized))             # True
💼 Real-World: Plugin Architecture with ABC
A data pipeline enforces that all data sources implement a common interface using ABC, then iterates over any registered source.
from abc import ABC, abstractmethod
from typing import Iterator, Any

class DataSource(ABC):
    """Plugin contract: every source can connect, read, and close."""

    @abstractmethod
    def connect(self) -> bool: ...

    @abstractmethod
    def read(self) -> Iterator[Any]: ...

    @abstractmethod
    def close(self) -> None: ...

    def stream(self):
        """Template method: connect, yield every record, then close."""
        if self.connect():
            yield from self.read()
            self.close()

class CSVSource(DataSource):
    def __init__(self, rows):
        self.rows = rows

    def connect(self):
        print("CSV: opening")
        return True

    def read(self):
        return iter(self.rows)

    def close(self):
        print("CSV: closed")

class APISource(DataSource):
    def __init__(self, data):
        self.data = data

    def connect(self):
        print("API: authenticated")
        return True

    def read(self):
        return iter(self.data)

    def close(self):
        print("API: session ended")

for src in (CSVSource([1,2,3]), APISource(["a","b"])):
    for record in src.stream():
        print(" ", record)
🏋️ Practice: Serializable Protocol
Define a Protocol called Serializable with methods to_dict() -> dict and classmethod from_dict(cls, d: dict). Implement it on a Product(name, price, qty) class. Write a function save_all(items) that checks isinstance(item, Serializable) before converting each item to dict.
Starter Code
from typing import Protocol, runtime_checkable
from dataclasses import dataclass

@runtime_checkable
class Serializable(Protocol):
    """Structural interface: anything exposing to_dict() -> dict."""
    def to_dict(self) -> dict: ...
    # Note: classmethods in Protocols are tricky -- just include to_dict for now

@dataclass
class Product:
    name: str
    price: float
    qty: int

    def to_dict(self):
        # TODO: return {"name": ..., "price": ..., "qty": ...}
        pass

    @classmethod
    def from_dict(cls, d: dict):
        # TODO: return cls(d["name"], d["price"], d["qty"])
        pass

def save_all(items):
    """Convert every Serializable item to a dict; report and skip the rest."""
    results = []
    for item in items:
        if isinstance(item, Serializable):
            results.append(item.to_dict())
        else:
            print(f"Skipped: {item!r} is not Serializable")
    return results

products = [Product("apple", 1.2, 50), Product("banana", 0.5, 200)]
print(save_all(products))
✅ Practice Checklist
24. Descriptors & Properties

Descriptors control attribute access via __get__, __set__, __delete__. The property() built-in is the most common descriptor. __slots__ reduces memory overhead.

property getter, setter, deleter
class Temperature:
    """Celsius temperature with validation and a derived Fahrenheit view."""

    def __init__(self, celsius=0):
        self._celsius = celsius  # backing field for the property

    @property
    def celsius(self):
        """Current temperature in degrees Celsius."""
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # Reject physically impossible values
        if value < -273.15:
            raise ValueError(f"Temperature {value} below absolute zero!")
        self._celsius = value

    @celsius.deleter
    def celsius(self):
        print("Resetting temperature to 0")
        self._celsius = 0

    @property
    def fahrenheit(self):
        """Read-only view computed from the Celsius value."""
        return self._celsius * 9/5 + 32

t = Temperature(25)
print(f"{t.celsius}C = {t.fahrenheit}F")

t.celsius = 100
print(f"Boiling: {t.celsius}C = {t.fahrenheit}F")

del t.celsius
print(f"Reset: {t.celsius}C")

try:
    t.celsius = -300
except ValueError as e:
    print("Caught:", e)
Descriptor protocol (__get__, __set__, __delete__)
class Validated:
    """Reusable data descriptor enforcing an optional [min, max] range."""

    def __init__(self, min_val=None, max_val=None):
        self.min_val = min_val
        self.max_val = max_val
        self.name = None  # filled in by __set_name__

    def __set_name__(self, owner, name):
        # Python calls this once, when the owning class body executes
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            # Class-level access hands back the descriptor itself
            return self
        return obj.__dict__.get(self.name, None)

    def __set__(self, obj, value):
        low, high = self.min_val, self.max_val
        if low is not None and value < low:
            raise ValueError(f"{self.name} must be >= {low}, got {value}")
        if high is not None and value > high:
            raise ValueError(f"{self.name} must be <= {high}, got {value}")
        obj.__dict__[self.name] = value

class Person:
    age    = Validated(min_val=0, max_val=150)
    salary = Validated(min_val=0)

    def __init__(self, name, age, salary):
        self.name   = name
        self.age    = age
        self.salary = salary

p = Person("Alice", 30, 75000)
print(f"{p.name}: age={p.age}, salary={p.salary}")
try:
    p.age = -5
except ValueError as e:
    print("Caught:", e)
__slots__ for memory efficiency
import sys

class PointNormal:
    """Regular class: every instance carries a __dict__."""
    def __init__(self, x, y):
        self.x, self.y = x, y

class PointSlots:
    """Slotted class: fixed attribute set, no per-instance __dict__."""
    __slots__ = ('x', 'y')   # the only attributes instances may carry

    def __init__(self, x, y):
        self.x, self.y = x, y

n = PointNormal(1.0, 2.0)
s = PointSlots(1.0, 2.0)

print(f"Without slots: {sys.getsizeof(n)} bytes, has __dict__: {hasattr(n, '__dict__')}")
print(f"With    slots: {sys.getsizeof(s)} bytes, has __dict__: {hasattr(s, '__dict__')}")

# Assigning an undeclared attribute fails on a slotted instance
try:
    s.z = 3.0
except AttributeError as e:
    print("Cannot add:", e)

# Aggregate footprint over many instances
normal_mem = sum(sys.getsizeof(PointNormal(i, i)) for i in range(1000))
slots_mem  = sum(sys.getsizeof(PointSlots(i, i))  for i in range(1000))
print(f"1000 objects β€” normal: {normal_mem} bytes, slots: {slots_mem} bytes")
print(f"Slots saves: {normal_mem - slots_mem} bytes ({(1-slots_mem/normal_mem)*100:.1f}%)")
💼 Real-World: Validated Configuration Class
A configuration system uses descriptors to validate settings when they are set, providing clear error messages without if-statement clutter in __init__.
class TypedAttr:
    """Descriptor that enforces an attribute's type and supplies a default."""

    def __init__(self, expected_type, default=None):
        self.expected_type = expected_type
        self.default = default
        self.name = None  # set when the owning class is created

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        # Fall back to the default until a value has been assigned
        return obj.__dict__.get(self.name, self.default)

    def __set__(self, obj, value):
        if not isinstance(value, self.expected_type):
            raise TypeError(
                f"{self.name} must be {self.expected_type.__name__}, "
                f"got {type(value).__name__}"
            )
        obj.__dict__[self.name] = value

class AppConfig:
    """Settings object whose fields are type-checked on assignment."""
    host     = TypedAttr(str, "localhost")
    port     = TypedAttr(int, 8080)
    debug    = TypedAttr(bool, False)
    timeout  = TypedAttr(float, 30.0)

cfg = AppConfig()
cfg.host    = "0.0.0.0"
cfg.port    = 443
cfg.debug   = True
cfg.timeout = 5.0

print(f"Config: {cfg.host}:{cfg.port} debug={cfg.debug} timeout={cfg.timeout}s")

try:
    cfg.port = "8080"  # wrong type!
except TypeError as e:
    print("Caught:", e)
🏋️ Practice: Unit-Enforced Measurement
Create a descriptor class UnitFloat(unit, min_val, max_val) that stores a float and records its unit. On __get__, return a namedtuple (value, unit). Create a class Measurement with descriptors for temperature (unit='C', min=-273.15), pressure (unit='Pa', min=0), and humidity (unit='%', min=0, max=100).
Starter Code
from collections import namedtuple

class UnitFloat:
    """Starter: descriptor storing a float together with its unit label."""

    # Value returned on attribute access: the number plus its unit
    Reading = namedtuple("Reading", ["value", "unit"])

    def __init__(self, unit, min_val=None, max_val=None):
        self.unit    = unit
        self.min_val = min_val
        self.max_val = max_val
        self.name    = None

    def __set_name__(self, owner, name):
        # Called once, when the owning class body executes
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None: return self
        val = obj.__dict__.get(self.name)
        # TODO: return UnitFloat.Reading(val, self.unit) if val is not None else None
        pass

    def __set__(self, obj, value):
        # TODO: validate type is float or int, validate min/max, store
        pass

class Measurement:
    # Each field is a UnitFloat descriptor with its own unit and bounds
    temperature = UnitFloat("C", min_val=-273.15)
    pressure    = UnitFloat("Pa", min_val=0)
    humidity    = UnitFloat("%", min_val=0, max_val=100)

m = Measurement()
m.temperature = 22.5
m.pressure    = 101325.0
m.humidity    = 65.0
print(m.temperature)  # Reading(value=22.5, unit='C')
print(m.humidity)
✅ Practice Checklist
25. Memory Management & Profiling

Python manages memory via reference counting and a cyclic garbage collector. Use sys, gc, tracemalloc, and cProfile to find memory leaks and performance bottlenecks.

sys.getsizeof, id(), and reference counting
import sys

# Shallow sizes of common objects
for obj in [0, 1, 255, 2**100, 3.14, "hi", "hello world", [], [1,2,3], {}, {"a":1}]:
    print(f"  {repr(obj):<25} {sys.getsizeof(obj):>6} bytes")

# id() returns the object's identity (its address in CPython)
a = [1, 2, 3]
b = a          # alias: same object
c = a.copy()   # shallow copy: different object, equal contents

print("a is b:", a is b)  # True
print("a is c:", a is c)  # False
print("id(a)==id(b):", id(a) == id(b))  # True

# Small integers (-5..256) are cached singletons
x, y = 100, 100
print("100 is 100:", x is y)  # True (cached)

# BUG FIX: writing `x, y = 1000, 1000` in one statement lets CPython
# deduplicate the two constants in the same code object, so `x is y`
# printed True -- the opposite of what the demo claims. Build one of
# the values at runtime to genuinely show large ints are NOT cached.
x = 1000
y = int("1000")
print("1000 is 1000:", x is y)  # False (not cached)

# Nested containers: getsizeof is shallow!
lst = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print("Shallow size:", sys.getsizeof(lst))   # just the list object
gc module and reference cycles
import gc

print("GC enabled:", gc.isenabled())
print("GC thresholds:", gc.get_threshold())  # (700, 10, 10)

# Reference cycle: a -> b -> a, both become unreachable
class Node:
    def __init__(self, name):
        self.name = name
        self.ref = None

a = Node("A")
b = Node("B")
a.ref = b  # a -> b
b.ref = a  # b -> a (cycle!)

# Delete our references
del a, b

before = gc.collect(0)
print(f"GC collected {before} objects in gen-0")

# Check what gc is tracking
tracked = gc.get_count()
print("GC counts (gen0, gen1, gen2):", tracked)

# Use __del__ to observe collection
class Tracked:
    def __del__(self):
        print(f"  {self!r} collected")

x = Tracked()
del x           # collected immediately (refcount -> 0)
gc.collect()    # collect cycles
tracemalloc and cProfile
import tracemalloc, cProfile, io, pstats

# --- tracemalloc: snapshot allocations before/after some work ---
tracemalloc.start()

snapshot1 = tracemalloc.take_snapshot()
big_list = [i**2 for i in range(10_000)]
snapshot2 = tracemalloc.take_snapshot()

for stat in snapshot2.compare_to(snapshot1, "lineno")[:3]:
    print(f"  {stat}")

tracemalloc.stop()
del big_list

# --- cProfile: compare an O(n) loop with a closed-form formula ---
def slow_sum(n):
    """Sum of squares below n via a generator (O(n))."""
    return sum(i**2 for i in range(n))

def fast_sum(n):
    """Sum of squares below n via the closed-form formula (O(1))."""
    return n * (n-1) * (2*n-1) // 6

pr = cProfile.Profile()
pr.enable()
slow_sum(50_000)
fast_sum(50_000)
pr.disable()

buffer = io.StringIO()
pstats.Stats(pr, stream=buffer).sort_stats("cumulative").print_stats(5)
print(buffer.getvalue())
💼 Real-World: Memory Leak Detector
A long-running service monitors its own memory usage between requests to detect leaks early.
import tracemalloc, sys

def deep_size(obj, seen=None):
    """Recursively estimate the size of *obj* in bytes, following containers.

    ``seen`` collects ids of visited objects so shared or cyclic references
    are counted only once. Strings and bytes are treated as leaves.
    """
    if seen is None:
        seen = set()
    marker = id(obj)
    if marker in seen:
        return 0
    seen.add(marker)
    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        total += sum(deep_size(value, seen) for value in obj.values())
        total += sum(deep_size(key, seen) for key in obj.keys())
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        total += sum(deep_size(item, seen) for item in obj)
    return total

# Simulate a request that leaks memory
cache = {}

def handle_request(key, data):
    # Each call grows the module-level cache and nothing evicts entries --
    # this is the deliberate "leak" the snapshots below detect.
    cache[key] = data  # intentional "leak" into global cache
    return len(data)

tracemalloc.start()
snap1 = tracemalloc.take_snapshot()

for i in range(5):
    handle_request(f"req_{i}", list(range(1000)))

snap2 = tracemalloc.take_snapshot()
# Top allocation diffs between the two snapshots, grouped by line
top = snap2.compare_to(snap1, "lineno")[:2]
for stat in top:
    print(f"  Memory diff: {stat}")
print(f"Cache deep size: {deep_size(cache):,} bytes")
tracemalloc.stop()
🏋️ Practice: Profile and Optimize
Write two versions of a function that finds all prime numbers up to n: (1) trial_division(n) using a simple loop, (2) sieve(n) using the Sieve of Eratosthenes. Use timeit to benchmark both for n=10000. Use cProfile to show which lines of trial_division are slowest.
Starter Code
import cProfile, timeit

def trial_division(n):
    """Return every prime <= n by testing candidates against small divisors."""
    def _is_prime(candidate):
        # A composite number must have a divisor no larger than its sqrt.
        return all(candidate % d for d in range(2, int(candidate**0.5) + 1))

    return [candidate for candidate in range(2, n + 1) if _is_prime(candidate)]

def sieve(n):
    """Return every prime <= n using the Sieve of Eratosthenes.

    Fix: guard n < 2 -- the original raised IndexError for n == 0
    (and n < 0) when assigning ``is_prime[1]`` on a too-short list.
    """
    if n < 2:
        return []
    is_prime = [True] * (n+1)
    is_prime[0] = is_prime[1] = False
    for i in range(2, int(n**0.5)+1):
        if is_prime[i]:
            # every multiple of i from i*i upward is composite
            for j in range(i*i, n+1, i):
                is_prime[j] = False
    return [i for i, p in enumerate(is_prime) if p]

N = 10_000

# Benchmark: 3 runs each; timeit disables GC while timing and returns total time
t1 = timeit.timeit(lambda: trial_division(N), number=3)
t2 = timeit.timeit(lambda: sieve(N), number=3)
print(f"trial_division: {t1:.3f}s")
print(f"sieve:          {t2:.3f}s")
print(f"Speedup: {t1/t2:.1f}x")

# Profile trial_division (run on a smaller n to keep it quick)
cProfile.run("trial_division(5000)", sort="cumulative")
✅ Practice Checklist
26. Logging Best Practices

Use the logging module instead of print() for production code. It supports levels, handlers, formatters, and log rotation β€” all configurable without code changes.

Basic logging setup and levels
import logging

# Demo: root-logger configuration, the five standard levels, and
# exception logging with tracebacks.

# Configure root logger
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)-8s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)

logger = logging.getLogger("myapp")

# Five standard levels (low to high)
logger.debug("Detailed info for debugging")
logger.info("Normal operation: user logged in")
logger.warning("Something unexpected but not fatal")
logger.error("A failure occurred β€” function returned None")
logger.critical("Service is down!")

# Log exceptions with traceback
try:
    result = 1 / 0
except ZeroDivisionError:
    logger.exception("Division failed")  # includes traceback

# Extra context -- the keys land on the LogRecord (shown only if the format uses them)
user_id = 42
logger.info("Processing order", extra={"user": user_id})

# Check effective level
print("Effective level:", logger.getEffectiveLevel())  # 10 = DEBUG
Multiple handlers and formatters
import logging, io

# Demo: one logger fanning out to two handlers with different levels and formats.

logger = logging.getLogger("pipeline")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()  # avoid duplicate handlers in notebooks

# Handler 1: console with simple format
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)  # console only shows WARNING+
ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))

# Handler 2: "file" (using StringIO here for demo)
log_buffer = io.StringIO()
fh = logging.StreamHandler(log_buffer)
fh.setLevel(logging.DEBUG)   # file gets everything
fh.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)-8s [%(funcName)s:%(lineno)d] %(message)s",
    datefmt="%H:%M:%S"
))

logger.addHandler(ch)
logger.addHandler(fh)

def process(data):
    # Each record passes the logger's level, then is filtered per-handler,
    # so the console and the buffer receive different subsets.
    logger.debug("Starting process with %d items", len(data))
    logger.info("Processing...")
    if not data:
        logger.warning("Empty input")
    logger.debug("Done")

process([1, 2, 3])
process([])

print("--- File log ---")
print(log_buffer.getvalue())
Logger hierarchy and module-level loggers
import logging

# Demo: dotted logger names form a tree; records propagate to ancestors.

# Best practice: use __name__ as logger name
# This creates a hierarchy: "myapp" -> "myapp.db" -> "myapp.db.query"

root = logging.getLogger()
app  = logging.getLogger("myapp")
db   = logging.getLogger("myapp.db")
qry  = logging.getLogger("myapp.db.query")

# Set up root handler for the demo
logging.basicConfig(
    level=logging.DEBUG,
    format="%(name)-20s %(levelname)s: %(message)s"
)

# Child loggers propagate to parent by default
app.setLevel(logging.INFO)
db.setLevel(logging.DEBUG)   # db subtree shows DEBUG

app.info("App started")
app.debug("This won't show β€” app is INFO level")
db.debug("DB connection established")
qry.debug("SELECT * FROM users")  # qry has no level set, so it inherits db's

# Disable propagation to avoid double-logging
# child_logger.propagate = False

# Silence noisy third-party libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
print("Third-party loggers silenced")
💼 Real-World: Pipeline Logger
A data processing pipeline uses structured logging to track progress, errors, and timing without print statements.
import logging, time, io

def setup_logger(name, level=logging.DEBUG):
    """Return a named logger with a single stream handler, idempotently.

    Repeated calls reuse the existing handler so records are not duplicated.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        return logger
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s %(name)s %(levelname)-8s %(message)s",
        datefmt="%H:%M:%S"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

log = setup_logger("etl")

def extract(source):
    """Simulate pulling raw records from *source*; returns a list of ints."""
    log.info("Extracting from %s", source)
    data = list(range(100))  # simulated data
    log.debug("Extracted %d records", len(data))
    return data

def transform(data):
    """Double each value, dropping multiples of 5; logs elapsed time."""
    log.info("Transforming %d records", len(data))
    t0 = time.time()
    result = [x * 2 for x in data if x % 5 != 0]
    log.debug("Transform took %.3fs, %d records remain", time.time()-t0, len(result))
    return result

def load(data, target):
    """Simulate writing records to *target*; warns on large batches."""
    log.info("Loading %d records to %s", len(data), target)
    # Simulate occasional error
    if len(data) > 70:
        log.warning("Large batch β€” consider chunking")
    log.info("Load complete")

try:
    d = extract("sales.csv")
    d = transform(d)
    load(d, "warehouse")
except Exception:
    log.exception("Pipeline failed")  # records the full traceback at ERROR level
🏋️ Practice: Log Analyzer
Write a function parse_log_line(line) that extracts timestamp, level, and message from a log line like '12:34:56 WARNING myapp: disk 90% full'. Write analyze_logs(lines) that counts occurrences of each level and returns a dict like {'WARNING': 3, 'ERROR': 1}. Use the logging module to emit a summary.
Starter Code
import logging, re
from collections import Counter

def parse_log_line(line):
    """Parse 'HH:MM:SS LEVEL name: message' into a dict, or None on mismatch."""
    match = re.match(r"(\d{2}:\d{2}:\d{2}) (\w+) (\S+): (.+)", line)
    if match is None:
        return None
    time_part, level, name, msg = match.groups()
    return {"time": time_part, "level": level, "name": name, "msg": msg}

def analyze_logs(lines):
    # TODO: run parse_log_line over each line, tally the 'level' values
    # (collections.Counter fits), and return the mapping.
    pass

sample_logs = [
    "12:00:01 INFO myapp: started",
    "12:00:02 DEBUG myapp.db: query took 0.1s",
    "12:00:03 WARNING myapp: memory 80% full",
    "12:00:04 ERROR myapp: connection refused",
    "12:00:05 WARNING myapp: retry 1/3",
]

counts = analyze_logs(sample_logs)
print("Level counts:", counts)  # None until analyze_logs is implemented
✅ Practice Checklist
27. Argparse & CLI Tools

argparse is Python's standard library for building command-line interfaces. It handles argument parsing, type validation, help generation, and subcommands.

Basic ArgumentParser with positional and optional args
import argparse

# Demo: positional + optional args, parsed from a hand-built argv list.

# Simulate command-line arguments (replace sys.argv for demo)
parser = argparse.ArgumentParser(
    description="Process a data file",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter  # shows defaults in --help
)

# Positional argument (required)
parser.add_argument("filename", help="Input CSV file path")

# Optional arguments
parser.add_argument("-o", "--output",  default="output.csv", help="Output file")
parser.add_argument("-n", "--rows",    type=int, default=100, help="Number of rows")
parser.add_argument("-v", "--verbose", action="store_true",   help="Verbose output")
parser.add_argument("--format",        choices=["csv","json","parquet"], default="csv")

# Parse a fake argument list
args = parser.parse_args(["data.csv", "--rows", "500", "--verbose", "--format", "json"])

print(f"File:    {args.filename}")
print(f"Output:  {args.output}")
print(f"Rows:    {args.rows}")
print(f"Verbose: {args.verbose}")
print(f"Format:  {args.format}")
Subcommands (subparsers)
import argparse

# Demo: git-style subcommands, each with its own argument set.

parser = argparse.ArgumentParser(prog="datool", description="Data pipeline tool")
subs   = parser.add_subparsers(dest="command", required=True)

# Subcommand: convert
convert = subs.add_parser("convert", help="Convert file format")
convert.add_argument("input",  help="Input file")
convert.add_argument("output", help="Output file")
convert.add_argument("--compression", choices=["none","gzip","snappy"], default="none")

# Subcommand: stats
stats = subs.add_parser("stats", help="Show file statistics")
stats.add_argument("file",  help="File to analyze")
stats.add_argument("--col", action="append", dest="cols", help="Column to analyze (repeatable)")

# Subcommand: validate
validate = subs.add_parser("validate", help="Validate schema")
validate.add_argument("file")
validate.add_argument("--schema", required=True)

# Demo: parse "convert" command
args = parser.parse_args(["convert", "input.csv", "output.parquet", "--compression", "snappy"])
print(f"Command:     {args.command}")
print(f"Input:       {args.input}")
print(f"Output:      {args.output}")
print(f"Compression: {args.compression}")

# Demo: parse "stats" command -- action="append" collects repeated --col flags
args2 = parser.parse_args(["stats", "data.csv", "--col", "price", "--col", "qty"])
print(f"Stats cols:  {args2.cols}")
Argument groups, mutual exclusion, and type validators
import argparse

# Demo: argument groups (help layout), mutually exclusive flags,
# and a custom 'type' callable that validates values.

parser = argparse.ArgumentParser(description="Model training CLI")

# Argument group for visual organization in --help
data_group = parser.add_argument_group("Data options")
data_group.add_argument("--train",  required=True, help="Training data path")
data_group.add_argument("--val",    required=True, help="Validation data path")
data_group.add_argument("--test",   help="Test data path")

# Argument group for model options
model_group = parser.add_argument_group("Model options")
model_group.add_argument("--lr",     type=float, default=0.001)
model_group.add_argument("--epochs", type=int,   default=10)

# Mutually exclusive: can't use --gpu and --cpu together
device = parser.add_mutually_exclusive_group()
device.add_argument("--gpu", action="store_true")
device.add_argument("--cpu", action="store_true")

# Custom type validator
def positive_int(value):
    """argparse 'type' callable: parse an int and reject values <= 0."""
    ivalue = int(value)
    if ivalue <= 0:
        raise argparse.ArgumentTypeError(f"{value} must be a positive integer")
    return ivalue

model_group.add_argument("--batch", type=positive_int, default=32)

args = parser.parse_args(["--train", "train.csv", "--val", "val.csv",
                          "--lr", "0.01", "--gpu", "--batch", "64"])
print(vars(args))  # Namespace rendered as a plain dict
💼 Real-World: ETL Pipeline CLI
A data engineering team builds a CLI tool to run ETL jobs with configurable sources, targets, and options.
import argparse, sys

# Demo: a small ETL runner CLI; run_etl returns a process exit code.

def run_etl(args):
    """Print the parsed job configuration; return 0 (success) as exit code."""
    print(f"ETL Job: {args.job_name}")
    print(f"  Source:   {args.source} (format={args.format})")
    print(f"  Target:   {args.target}")
    print(f"  Batch:    {args.batch_size}")
    print(f"  Dry run:  {args.dry_run}")

    if args.dry_run:
        print("  [DRY RUN] No data written.")
        return 0
    print("  Writing data...")
    return 0

parser = argparse.ArgumentParser(description="ETL Pipeline Runner")
parser.add_argument("job_name",   help="Job identifier")
parser.add_argument("source",     help="Source connection string")
parser.add_argument("target",     help="Target connection string")
parser.add_argument("--format",   choices=["csv","json","parquet"], default="csv")
parser.add_argument("--batch-size", type=int, default=1000, dest="batch_size")
parser.add_argument("--dry-run",  action="store_true", dest="dry_run")

# Demo
args = parser.parse_args([
    "daily_sales", "s3://bucket/sales.parquet", "postgres://db/warehouse",
    "--format", "parquet", "--batch-size", "5000", "--dry-run"
])
sys.exit(run_etl(args))
🏋️ Practice: File Processor CLI
Build a CLI with two subcommands: (1) count β€” takes a filename, optional --pattern (regex), prints count of matching lines; (2) summary β€” takes a filename, --cols (repeatable), prints first/last/count for each column in a CSV. Use argparse with proper help strings and type validation.
Starter Code
import argparse, csv, re

# Practice starter: a two-subcommand CLI; the handlers are left as TODOs.

parser = argparse.ArgumentParser(prog="fileproc")
subs = parser.add_subparsers(dest="cmd", required=True)

# count subcommand
count_p = subs.add_parser("count", help="Count lines matching pattern")
count_p.add_argument("file")
count_p.add_argument("--pattern", default=".*", help="Regex pattern")

# summary subcommand
sum_p = subs.add_parser("summary", help="Summarize CSV columns")
sum_p.add_argument("file")
sum_p.add_argument("--col", action="append", dest="cols")

def cmd_count(args):
    """Count lines in args.file that match args.pattern (TODO)."""
    pattern = re.compile(args.pattern)
    # TODO: open args.file, count lines matching pattern
    pass

def cmd_summary(args):
    """Print first/last/count for each requested CSV column (TODO)."""
    # TODO: open CSV, for each col in args.cols, print first/last/count
    pass

args = parser.parse_args(["count", "data.txt", "--pattern", "ERROR"])
if args.cmd == "count":
    cmd_count(args)
elif args.cmd == "summary":
    cmd_summary(args)
✅ Practice Checklist
28. JSON & Data Serialization

Python's json module handles serialization to/from JSON. For Python-specific objects, use pickle. For configuration, use configparser or tomllib.

json.dumps / loads with custom encoder
import json
from datetime import datetime, date
from decimal import Decimal

# Demo: round-tripping with dumps/loads, then a custom encoder for rich types.

# Basic usage
data = {"name": "Alice", "scores": [95, 87, 92], "active": True}
text = json.dumps(data, indent=2)
print("JSON string:")
print(text)

loaded = json.loads(text)
print("Loaded back:", loaded)

# Custom encoder for non-serializable types
class AppEncoder(json.JSONEncoder):
    """json.JSONEncoder subclass handling datetime/date, Decimal, and set."""

    def default(self, obj):
        # Dispatch on type; anything unhandled falls through to the base
        # class, which raises TypeError as usual.
        if isinstance(obj, set):
            return sorted(list(obj))
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return super().default(obj)

record = {
    "created": datetime(2024, 1, 15, 9, 30),
    "price":   Decimal("29.99"),
    "tags":    {"python", "data", "tutorial"},
}

# cls= routes every non-JSON-native value through AppEncoder.default
print(json.dumps(record, cls=AppEncoder, indent=2))
Custom decoder and JSON schema validation pattern
import json
from datetime import datetime

# Custom decoder using object_hook
def decode_record(d):
    """object_hook: replace ISO-looking datetime strings with datetime objects.

    Mutates and returns the decoded dict. Non-datetime strings pass through.
    """
    for key in d:
        val = d[key]
        looks_iso = isinstance(val, str) and len(val) >= 19 and "T" in val
        if not looks_iso:
            continue
        try:
            d[key] = datetime.fromisoformat(val)
        except ValueError:
            pass  # not actually a datetime -- leave the string untouched
    return d

json_str = '''
{
    "id": 42,
    "name": "Order #42",
    "created_at": "2024-01-15T09:30:00",
    "updated_at": "2024-03-20T14:00:00",
    "amount": 299.99
}
'''

# object_hook runs on every decoded JSON object (dict)
obj = json.loads(json_str, object_hook=decode_record)
print("Type of created_at:", type(obj["created_at"]))  # datetime
print("Year:", obj["created_at"].year)

# Simple schema validation pattern
def validate(data, schema):
    """Return a list of error strings for fields missing or mis-typed in *data*.

    *schema* maps field name -> expected type; an empty list means valid.
    """
    errors = []
    for field, expected_type in schema.items():
        if field not in data:
            errors.append(f"Missing: {field}")
            continue
        actual = type(data[field])
        if not isinstance(data[field], expected_type):
            errors.append(f"{field}: expected {expected_type.__name__}, got {actual.__name__}")
    return errors

schema = {"id": int, "name": str, "amount": float}
print("Errors:", validate(obj, schema) or "None")  # empty list prints as "None"
pickle, configparser, and tomllib
import pickle, configparser, io

# ─── pickle: serialize any Python object ───────────────────────────────────
# SECURITY: never pickle.load() data from an untrusted source -- unpickling
# can execute arbitrary code.
class Model:
    def __init__(self, weights):
        self.weights = weights  # list of per-feature coefficients
    def predict(self, x):
        # dot product of weights and input features
        return sum(w * xi for w, xi in zip(self.weights, x))

model = Model([0.5, -0.3, 1.2])
buf = io.BytesIO()

pickle.dump(model, buf)
print("Pickled size:", buf.tell(), "bytes")

buf.seek(0)
loaded_model = pickle.load(buf)
print("Prediction:", loaded_model.predict([1.0, 2.0, 3.0]))

# ─── configparser: INI-format config files ─────────────────────────────────
config_text = '''
[database]
host = localhost
port = 5432
name = mydb

[app]
debug = true
workers = 4
log_level = INFO
'''
cfg = configparser.ConfigParser()
cfg.read_string(config_text)

# Values are strings; use getint/getboolean for typed access
print("DB host:", cfg["database"]["host"])
print("DB port:", cfg.getint("database", "port"))
print("Debug:  ", cfg.getboolean("app", "debug"))
print("Workers:", cfg.getint("app", "workers"))
print("Sections:", cfg.sections())
💼 Real-World: API Response Cache
A data ingestion service serializes API responses to JSON with metadata, then deserializes and validates them on re-read.
import json, hashlib
from datetime import datetime

class APICache:
    """In-memory JSON response cache keyed by an MD5 digest of url + params."""

    def __init__(self):
        # key digest -> serialized entry; swap for file I/O in production
        self._store = {}

    def _key(self, url, params):
        # Canonical JSON (sorted keys) so equal requests hash identically.
        raw = json.dumps({"url": url, "params": params}, sort_keys=True)
        return hashlib.md5(raw.encode()).hexdigest()

    def get(self, url, params=None):
        """Return cached data for the request, or None on a miss."""
        k = self._key(url, params or {})
        if k not in self._store:
            return None
        entry = json.loads(self._store[k])
        age = (datetime.now() - datetime.fromisoformat(entry["cached_at"])).seconds
        print(f"  [cache hit] age={age}s, key={k[:8]}")
        return entry["data"]

    def set(self, url, params, data):
        """Store *data* for the request with a cached_at timestamp."""
        k = self._key(url, params or {})
        entry = {"data": data, "cached_at": datetime.now().isoformat(), "url": url}
        self._store[k] = json.dumps(entry)
        print(f"  [cache set] key={k[:8]}")

cache = APICache()
url = "https://api.example.com/prices"
params = {"symbol": "AAPL", "period": "1d"}

# First lookup misses; populate the cache, then the second lookup hits.
result = cache.get(url, params)
if result is None:
    data = {"symbol": "AAPL", "price": 195.50, "volume": 1_200_000}
    cache.set(url, params, data)
    result = data

print("Result:", result)
cache.get(url, params)  # should be cache hit
🏋️ Practice: Config File Manager
Write a ConfigManager class that loads from a JSON file (on init) and falls back to defaults if the file does not exist. Support get(key, default=None), set(key, value), and save() (writes back to JSON). Write a test that creates a temp file, sets values, saves, reloads, and verifies.
Starter Code
import json, pathlib

# Practice starter: JSON-backed config with defaults; methods are TODOs.

class ConfigManager:
    def __init__(self, path, defaults=None):
        self.path = pathlib.Path(path)   # JSON file backing the config
        self._data = dict(defaults or {})  # defaults, overridden by file values
        # TODO: if self.path exists, load and merge with self._data
        pass

    def get(self, key, default=None):
        """Return the value for *key*, or *default* if absent (TODO)."""
        # TODO: return self._data.get(key, default)
        pass

    def set(self, key, value):
        """Set *key* in the in-memory config (TODO)."""
        # TODO: update self._data[key] = value
        pass

    def save(self):
        """Persist the config back to self.path as JSON (TODO)."""
        # TODO: write self._data to self.path as JSON (indent=2)
        pass

# Test
import tempfile, os
with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as f:
    json.dump({"theme": "dark"}, f)
    tmp = f.name

cfg = ConfigManager(tmp, defaults={"theme": "light", "font_size": 12})
print("theme:", cfg.get("theme"))      # dark (from file)
print("font:", cfg.get("font_size"))   # 12 (from defaults)
cfg.set("font_size", 14)
cfg.save()

cfg2 = ConfigManager(tmp)
print("reloaded font:", cfg2.get("font_size"))  # 14
os.unlink(tmp)
✅ Practice Checklist
29. Pathlib & File System Ops

pathlib.Path is the modern way to handle filesystem paths in Python. It's cross-platform, object-oriented, and integrates with all standard file operations.

Path manipulation and navigation
from pathlib import Path

# Demo: pure path manipulation -- only the exists/is_file/is_dir calls
# below actually touch the filesystem.

# Create a Path object β€” cross-platform!
p = Path("/home/user/data/sales_2024.csv")

# Path components
print("name:       ", p.name)         # sales_2024.csv
print("stem:       ", p.stem)         # sales_2024
print("suffix:     ", p.suffix)       # .csv
print("suffixes:   ", Path("a.tar.gz").suffixes)  # ['.tar', '.gz']
print("parent:     ", p.parent)       # /home/user/data
print("parts:      ", p.parts)

# Building paths with / operator
base    = Path("/home/user")
data    = base / "data"
outfile = data / "reports" / "q1.xlsx"
print("Built path:", outfile)

# Resolve, absolute, relative_to
cwd = Path.cwd()
print("CWD:", cwd)
print("Home:", Path.home())

# Check existence (these do hit the filesystem)
print("exists:", p.exists())
print("is_file:", p.is_file())
print("is_dir: ", p.is_dir())

# Change suffix
renamed = p.with_suffix(".parquet")
print("With new suffix:", renamed)
Glob patterns and directory walking
import tempfile, pathlib

# Demo: glob vs rglob over a throwaway directory tree.

# Create a temp directory structure for demo
tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "data").mkdir()
(tmp / "data" / "sales.csv").write_text("a,b")
(tmp / "data" / "costs.csv").write_text("c,d")
(tmp / "reports").mkdir()
(tmp / "reports" / "q1.xlsx").write_text("x")
(tmp / "reports" / "q2.xlsx").write_text("y")
(tmp / "config.json").write_text("{}")

# glob: match in one directory
csvs = list(tmp.glob("data/*.csv"))
print("CSVs:", [f.name for f in csvs])

# rglob: recursive glob (matches directories as well as files)
all_files = list(tmp.rglob("*"))
print("All files:")
for f in sorted(all_files):
    print("  ", f.relative_to(tmp))

# Filter only files (not directories)
only_files = [f for f in tmp.rglob("*") if f.is_file()]
print("File count:", len(only_files))

# Cleanup
import shutil
shutil.rmtree(tmp)
print("Temp dir removed")
Reading, writing, and file operations
import tempfile, pathlib, shutil

# Demo: text/bytes convenience I/O, streaming with open(), stat, and shutil.

tmp = pathlib.Path(tempfile.mkdtemp())

# Write and read text (whole-file convenience methods)
(tmp / "hello.txt").write_text("Hello, World!")
content = (tmp / "hello.txt").read_text()
print("read_text:", content)

# Write and read bytes
(tmp / "data.bin").write_bytes(b"\x00\x01\x02\x03")
raw = (tmp / "data.bin").read_bytes()
print("read_bytes:", raw.hex())

# Open with context manager for large files
log = tmp / "log.txt"
with log.open("w") as f:
    for i in range(5):
        f.write(f"line {i}\n")

with log.open() as f:
    for line in f:
        print(" ", line.rstrip())

# stat: file metadata
s = log.stat()
print(f"Size: {s.st_size} bytes")

# mkdir, rename, unlink, shutil operations
(tmp / "subdir").mkdir(parents=True, exist_ok=True)
shutil.copy(log, tmp / "subdir" / "log_copy.txt")
print("Copied:", list((tmp / "subdir").iterdir()))

shutil.rmtree(tmp)
print("Done")
💼 Real-World: Data File Organizer
A data engineer uses pathlib to scan a raw data directory, classify files by type, and move them to organized subdirectories.
import tempfile, pathlib, shutil

# Demo: classify files by suffix and move them into per-category subfolders.

# Setup demo files
src = pathlib.Path(tempfile.mkdtemp())
for name in ["sales.csv", "costs.csv", "model.pkl", "report.pdf",
             "config.json", "weights.pkl", "notes.txt"]:
    (src / name).write_text(f"content of {name}")

print("Input files:", [f.name for f in sorted(src.iterdir())])

# Classification map: file suffix -> destination subdirectory
TYPE_MAP = {
    ".csv":  "data",
    ".pkl":  "models",
    ".json": "config",
    ".pdf":  "reports",
    ".txt":  "misc",
}

moved = []
for file in src.iterdir():
    if not file.is_file():
        continue
    # Unknown suffixes fall back to "other"
    category = TYPE_MAP.get(file.suffix, "other")
    dest_dir = src / category
    dest_dir.mkdir(exist_ok=True)
    dest = dest_dir / file.name
    shutil.move(str(file), dest)
    moved.append(f"{file.name} -> {category}/")

for m in moved:
    print(" ", m)

# Show final structure
for subdir in sorted(src.iterdir()):
    if subdir.is_dir():
        print(f"  {subdir.name}/:", [f.name for f in subdir.iterdir()])

shutil.rmtree(src)
🏋️ Practice: Log File Archiver
Write a function archive_logs(log_dir, archive_dir, days_old=7) that uses pathlib to: (1) find all .log files in log_dir older than days_old days, (2) compress each with shutil.make_archive (or just move for simplicity), (3) move them to archive_dir/YYYY-MM/ subfolders based on file modification date. Return a list of moved files.
Starter Code
import pathlib, shutil, tempfile
from datetime import datetime, timedelta

# Practice starter: move .log files older than days_old into dated subfolders.

def archive_logs(log_dir, archive_dir, days_old=7):
    """Move .log files older than *days_old* into archive_dir/YYYY-MM/ (TODO)."""
    log_dir     = pathlib.Path(log_dir)
    archive_dir = pathlib.Path(archive_dir)
    cutoff      = datetime.now() - timedelta(days=days_old)
    moved       = []

    for log_file in log_dir.glob("*.log"):
        # mtime decides both eligibility and the YYYY-MM destination folder
        mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
        if mtime < cutoff:
            # TODO: create archive_dir/YYYY-MM/ folder
            # TODO: move log_file there
            # TODO: append (log_file.name, dest) to moved
            pass

    return moved

# Demo setup
import os, time
tmp_logs    = pathlib.Path(tempfile.mkdtemp())
tmp_archive = pathlib.Path(tempfile.mkdtemp())

# Create fake old log files
for i in range(3):
    f = tmp_logs / f"app_{i}.log"
    f.write_text(f"log content {i}")
    # Make it 10 days old (os.utime sets atime and mtime)
    old_time = time.time() - 10 * 86400
    os.utime(f, (old_time, old_time))

(tmp_logs / "recent.log").write_text("recent")  # should NOT be archived

result = archive_logs(tmp_logs, tmp_archive, days_old=7)
print("Archived:", result)
shutil.rmtree(tmp_logs); shutil.rmtree(tmp_archive)
✅ Practice Checklist
30. String Formatting Mastery

Master Python's string formatting mini-language: f-strings, format(), format spec DSL, textwrap, and Template strings for safe user-controlled formatting.

f-string advanced features and format spec
# Demo: the format-spec mini-language inside f-strings.
# Format spec: [[fill]align][sign][z][#][0][width][grouping][.precision][type]
pi = 3.14159265358979

# Width, precision, type
print(f"{pi:.2f}")        # 3.14
print(f"{pi:10.4f}")      # right-aligned in width 10
print(f"{pi:<10.4f}|")    # left-aligned
print(f"{pi:^10.4f}|")    # center-aligned
print(f"{pi:+.3f}")       # force + sign

# Integer formatting
n = 1_234_567
print(f"{n:,}")            # 1,234,567
print(f"{n:_}")            # 1_234_567
print(f"{n:>15,}")         # right-aligned width 15
print(f"{255:#x}")         # 0xff  hex with prefix
print(f"{255:08b}")        # 11111111  binary, zero-padded

# Percentage
print(f"{0.857:.1%}")      # 85.7%

# Datetime in f-string (format spec is passed to strftime-style __format__)
from datetime import datetime
now = datetime(2024, 3, 15, 9, 5, 7)
print(f"{now:%Y-%m-%d %H:%M:%S}")  # 2024-03-15 09:05:07
print(f"{now:%B %d, %Y}")          # March 15, 2024

# Expression in f-string
data = [1, 2, 3, 4, 5]
print(f"Mean: {sum(data)/len(data):.2f}, Max: {max(data)}")

# Self-documenting expressions (Python 3.8+)
x = 42
print(f"{x=}")   # x=42
textwrap, Template, and str methods
import textwrap
from string import Template

# Demo: wrapping/dedenting text, safe Template substitution, and str methods.

# textwrap.wrap / fill: wrap long text
long_text = ("Python is a high-level, interpreted, general-purpose programming language. "
             "Its design philosophy emphasizes code readability with the use of significant indentation.")

wrapped = textwrap.fill(long_text, width=50)
print(wrapped)
print()

# Dedent: remove common leading whitespace (useful after triple-quote strings)
indented = '''
    def foo():
        return 42
    '''
print(repr(textwrap.dedent(indented).strip()))

# Template: safe for user-provided format strings (no code execution risk)
# FIX: Template placeholders are plain identifiers only -- "${balance:.2f}"
# is an invalid placeholder and made substitute() raise ValueError.
# Format the number before substituting; "$$" still renders a literal "$".
tmpl = Template("Hello $name, your balance is $$$balance")
print(tmpl.substitute(name="Alice", balance=f"{1234.56:.2f}"))

# safe_substitute: does not raise for missing keys
tmpl2 = Template("Dear $name, ref: $ref_id")
print(tmpl2.safe_substitute(name="Bob"))  # $ref_id stays

# str methods useful for formatting
cols = ["id", "name", "price", "qty"]
print(" | ".join(c.ljust(10) for c in cols))
print("-" * 45)
row = [1, "apple", 1.20, 50]
print(" | ".join(str(v).ljust(10) for v in row))
Building tables and reports with format()
# Demo: format() builtin and a fixed-width text table with nested format specs.

# format() with the mini-language directly
print(format(3.14159, ".2f"))
print(format(1234567, ","))
print(format("hello", ">20"))

# Building a text table
headers = ["Product", "Qty", "Price", "Total"]
rows = [
    ("Apple",    50, 1.20, 60.00),
    ("Banana",  200, 0.50, 100.00),
    ("Cherry",   30, 2.00, 60.00),
    ("Durian",    5, 8.75, 43.75),
]

# Column widths
w = [12, 6, 8, 10]
fmt_h = "  ".join(f"{h:>{ww}}" for h, ww in zip(headers, w))
sep   = "  ".join("-"*ww for ww in w)
print(fmt_h)
print(sep)
for row in rows:
    # FIX: the qty field was missing its closing brace
    # (f"{row[1]:>{w[1]}" was a SyntaxError).
    vals = [f"{row[0]:<{w[0]}}", f"{row[1]:>{w[1]}}",
            f"{row[2]:>{w[2]}.2f}", f"{row[3]:>{w[3]}.2f}"]
    print("  ".join(vals))

total = sum(r[3] for r in rows)
print(sep)
print(f"{'TOTAL':>{sum(w)+6}}: {total:.2f}")
💼 Real-World: Report Generator
A finance team generates formatted summary reports from sales data using f-strings and textwrap.
from datetime import date
import textwrap

def format_report(title, data, width=60):
    """Render a fixed-width sales report as one newline-joined string.

    *data* is a list of dicts with 'region', 'revenue', and 'units' keys.
    """
    border = "=" * width
    rule = "  " + "-" * (width - 2)
    today = date.today().strftime("%B %d, %Y")

    out = [border,
           f"  {title}".center(width),
           f"  Generated: {today}".center(width),
           border,
           "",
           f"  {'Region':<15} {'Revenue':>12} {'Units':>8} {'Avg/Unit':>10}",
           rule]

    for row in data:
        # Guard against zero units to avoid ZeroDivisionError.
        avg = row["revenue"] / row["units"] if row["units"] else 0
        out.append(
            f"  {row['region']:<15} ${row['revenue']:>11,.0f} {row['units']:>8,} ${avg:>9.2f}"
        )

    grand_total = sum(row["revenue"] for row in data)
    out.append(rule)
    out.append(f"  {'TOTAL':<15} ${grand_total:>11,.0f}")
    out.append("")
    out.append(border)
    return "\n".join(out)

# Per-region sales figures fed to format_report above
data = [
    {"region": "North",  "revenue": 1_450_000, "units": 9_800},
    {"region": "South",  "revenue":   980_000, "units": 7_200},
    {"region": "East",   "revenue": 2_100_000, "units": 14_500},
    {"region": "West",   "revenue": 1_750_000, "units": 11_000},
]

print(format_report("Q1 2024 Sales Report", data))
🏋️ Practice: Invoice Formatter
Write a function format_invoice(company, items, tax_rate) where items is a list of (desc, qty, unit_price) tuples. Print a formatted invoice with: header (company name, date), line items table (description, qty, unit price, line total), subtotal, tax amount, and grand total. Use f-strings with format specs for alignment.
Starter Code
from datetime import date

# Practice starter: line-item loop and totals are left as TODOs.

def format_invoice(company, items, tax_rate=0.08):
    """Print a formatted invoice; *items* is a list of (desc, qty, unit_price)."""
    today    = date.today()
    subtotal = sum(qty * price for _, qty, price in items)
    tax      = subtotal * tax_rate
    total    = subtotal + tax

    w = 55  # overall invoice width in characters
    print("=" * w)
    print(f"  {company}".center(w))
    print(f"  Invoice Date: {today}".center(w))
    print("=" * w)
    print(f"  {'Description':<22} {'Qty':>4} {'Unit':>8} {'Total':>10}")
    print("  " + "-" * (w-2))

    for desc, qty, price in items:
        # TODO: print each line with f-string formatting
        pass

    print("  " + "-" * (w-2))
    # TODO: print subtotal, tax, and grand total rows
    print("=" * w)

format_invoice("Acme Corp", [
    ("Python Training",  1, 2500.00),
    ("Jupyter Setup",    3,  150.00),
    ("Cloud Credits",   10,   49.99),
], tax_rate=0.09)
✅ Practice Checklist
31. Performance Optimization & Caching

Profile before optimizing. Use timeit for micro-benchmarks, functools.cache for memoization, __slots__ for memory, and algorithmic improvements for the biggest wins.

timeit for micro-benchmarking
import timeit

# Compare list comprehension vs map() vs for-loop
setup = "data = list(range(10_000))"

t_comp  = timeit.timeit("[x**2 for x in data]",      setup=setup, number=1000)
t_map   = timeit.timeit("list(map(lambda x: x**2, data))", setup=setup, number=1000)
t_loop  = timeit.timeit('''
result = []
for x in data:
    result.append(x**2)
''', setup=setup, number=1000)

print(f"List comprehension: {t_comp:.3f}s")
print(f"map(lambda):        {t_map:.3f}s")
print(f"for loop + append:  {t_loop:.3f}s")

# Compare string joining methods
setup2 = "parts = ['a'] * 1000"
t_join  = timeit.timeit("''.join(parts)",        setup=setup2, number=5000)
# BUG FIX: the += snippet spans two physical lines, so it must be a
# triple-quoted string — a plain "..." literal cannot contain a newline
# (the original was a SyntaxError).
t_plus  = timeit.timeit('''
s=''
for p in parts: s += p
''', setup=setup2, number=5000)
print(f"join():     {t_join:.4f}s")
print(f"+=:         {t_plus:.4f}s")
print(f"join speedup: {t_plus/t_join:.1f}x")
functools.cache and lru_cache
import functools, time

# lru_cache: memoization with a bounded cache (least-recently-used eviction)
@functools.lru_cache(maxsize=128)
def fib_lru(n):
    """Return the n-th Fibonacci number via memoized recursion."""
    if n > 1:
        return fib_lru(n - 1) + fib_lru(n - 2)
    return n

# functools.cache: unbounded memoization (Python 3.9+)
@functools.cache
def fib_cache(n):
    """Return the n-th Fibonacci number using an unlimited cache."""
    return n if n <= 1 else fib_cache(n - 1) + fib_cache(n - 2)

# Time the memoized Fibonacci: the cache makes fib(40) near-instant.
t0 = time.perf_counter()
result = fib_lru(40)
print(f"fib(40) = {result}, lru_cache time: {(time.perf_counter()-t0)*1000:.2f}ms")
print("Cache info:", fib_lru.cache_info())

# cached_property: compute once, then return stored value
class DataStats:
    """Lazy statistics over a numeric sequence; each stat is computed once."""

    def __init__(self, data):
        self._data = data

    @functools.cached_property
    def mean(self):
        """Arithmetic mean; the print shows when the real computation runs."""
        print("  (computing mean...)")
        return sum(self._data) / len(self._data)

    @functools.cached_property
    def std(self):
        """Population standard deviation, reusing the (cached) mean."""
        print("  (computing std...)")
        m = self.mean
        variance = sum((x - m) ** 2 for x in self._data) / len(self._data)
        return variance ** 0.5

# First access computes; later accesses return the cached value silently.
ds = DataStats(list(range(1000)))
print("mean:", ds.mean)
print("mean:", ds.mean)  # no recompute
print("std: ", ds.std)
Algorithmic improvements and built-in speed
import timeit, collections

# Membership test: O(1) hash lookup for a set vs O(n) scan for a list
data_list = list(range(10_000))
data_set  = set(data_list)

t_list = timeit.timeit("9999 in data_list", globals=locals(), number=100_000)
t_set  = timeit.timeit("9999 in data_set",  globals=locals(), number=100_000)
print(f"list 'in': {t_list:.4f}s")
print(f"set  'in': {t_set:.4f}s")
print(f"Set speedup: {t_list/t_set:.0f}x")

# Counter vs manual counting
words = "the quick brown fox jumps over the lazy dog the fox".split()

t_manual = timeit.timeit('''
counts = {}
for w in words:
    counts[w] = counts.get(w, 0) + 1
''', globals={"words": words}, number=50_000)

t_counter = timeit.timeit("collections.Counter(words)",
                           globals={"collections": collections, "words": words},
                           number=50_000)
print(f"Manual count: {t_manual:.4f}s")
print(f"Counter:      {t_counter:.4f}s")

# sorted() takes a key function (the old cmp parameter was removed in Python 3)
records = [{"name": n, "score": s} for n, s in [("Bob", 72), ("Alice", 95), ("Charlie", 88)]]
sorted_records = sorted(records, key=lambda r: r["score"], reverse=True)
for r in sorted_records:
    print(f"  {r['name']:10s}: {r['score']}")
💼 Real-World: DataFrame-like Aggregator
A custom data aggregation class uses caching and efficient data structures to compute statistics on large datasets without pandas.
import functools, collections, statistics

class FastAggregator:
    """Pandas-free aggregator: lazy group index plus cached statistics."""

    def __init__(self, records):
        self._records = records
        self._by_key  = None  # built on first use

    def _ensure_index(self):
        """Group values by their 'group' key, exactly once."""
        if self._by_key is not None:
            return
        index = collections.defaultdict(list)
        for rec in self._records:
            index[rec["group"]].append(rec["value"])
        self._by_key = index

    @functools.cached_property
    def group_means(self):
        """Mean value per group (computed once, then cached)."""
        self._ensure_index()
        return {group: statistics.mean(vals) for group, vals in self._by_key.items()}

    @functools.cached_property
    def group_counts(self):
        """Number of records per group (computed once, then cached)."""
        self._ensure_index()
        return {group: len(vals) for group, vals in self._by_key.items()}

    @functools.cached_property
    def overall_mean(self):
        """Mean over every record, ignoring groups (computed once)."""
        return statistics.mean([r["value"] for r in self._records])

import random
random.seed(42)
records = [{"group": f"G{i%5}", "value": random.gauss(50, 10)} for i in range(10_000)]

agg = FastAggregator(records)
print("Group means:", {k: f"{v:.2f}" for k, v in agg.group_means.items()})
print("Group counts:", agg.group_counts)
print("Overall mean:", f"{agg.overall_mean:.2f}")
print("(Accessing again β€” no recompute):", f"{agg.overall_mean:.2f}")
🏋️ Practice: Benchmark Challenge
Write three versions of a function find_duplicates(lst) that returns a list of values appearing more than once: (1) brute_force using nested loops O(n^2), (2) sort_based by sorting first O(n log n), (3) hash_based using Counter O(n). Benchmark all three with timeit on a list of 10,000 integers. Report the speedups.
Starter Code
import timeit, collections, random

random.seed(42)  # reproducible benchmark input
# 10k values drawn from only 501 possibilities, so duplicates are guaranteed.
data = [random.randint(0, 500) for _ in range(10_000)]

def brute_force(lst):
    """O(n^2) reference implementation: compare every index pair."""
    found = set()
    n = len(lst)
    for i in range(n):
        for j in range(i + 1, n):
            if lst[i] == lst[j]:
                found.add(lst[i])
    return list(found)

def sort_based(lst):
    """Starter stub: O(n log n) duplicate finder (sort + adjacent compare)."""
    # TODO: sort, then check adjacent equal elements
    pass

def hash_based(lst):
    """Starter stub: O(n) duplicate finder using a hash-based counter."""
    # TODO: use collections.Counter, return keys with count > 1
    pass

# Only benchmark sort_based and hash_based (brute force is too slow)
# NOTE(review): until the TODO stubs return a list, len(fn(data)) below
# raises TypeError — expected for unfinished starter code.
for name, fn in [("sort_based", sort_based), ("hash_based", hash_based)]:
    t = timeit.timeit(lambda: fn(data), number=100)
    print(f"{name}: {t:.4f}s, found {len(fn(data))} duplicates")
✅ Practice Checklist
32. Virtual Environments & Package Management

Virtual environments isolate project dependencies. pip manages packages, and importlib enables dynamic imports at runtime β€” essential for building extensible systems.

venv and pip (commands and concepts)
# These commands are run in the terminal (not runnable as Python code)
# They are shown here as strings for educational purposes

# Typical venv lifecycle: create, activate, install, freeze, deactivate.
venv_commands = '''
# Create a virtual environment
python -m venv .venv

# Activate (macOS/Linux)
source .venv/bin/activate

# Activate (Windows)
.venv\\Scripts\\activate

# Install packages
pip install requests pandas scikit-learn

# Install from requirements file
pip install -r requirements.txt

# Freeze current environment
pip freeze > requirements.txt

# Upgrade a package
pip install --upgrade numpy

# Show installed packages
pip list
pip show numpy

# Deactivate
deactivate
'''

# requirements.txt format:
req_txt = '''
# requirements.txt
numpy>=1.24,<2.0
pandas==2.1.0
scikit-learn>=1.3
requests>=2.31
matplotlib>=3.7; python_version >= "3.9"
'''

# pyproject.toml format (modern, preferred):
pyproject_toml = '''
[project]
name = "my-ml-project"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "numpy>=1.24",
    "pandas>=2.1",
    "scikit-learn>=1.3",
]

[project.optional-dependencies]
dev = ["pytest", "black", "mypy"]
'''

# Print the three commands every project needs, in order.
print("Common venv workflow:")
for cmd in ["python -m venv .venv", "source .venv/bin/activate", "pip install -r requirements.txt"]:
    print(f"  $ {cmd}")
importlib: dynamic imports at runtime
import importlib, sys

# Basic dynamic import: equivalent to `import math`, but the name can be
# a runtime value instead of a literal.
math = importlib.import_module("math")
print("sqrt(16):", math.sqrt(16))

# Import another module by its (possibly computed) name
pprint = importlib.import_module("pprint")
pprint.pprint({"a": 1, "b": [2, 3]})

# Conditional import: use fast version if available
def import_or_fallback(preferred, fallback):
    """Return the *preferred* module if importable, else the *fallback* one."""
    try:
        module = importlib.import_module(preferred)
    except ImportError:
        print(f"  {preferred} not found, using {fallback}")
        module = importlib.import_module(fallback)
    return module

# Prefer the faster ujson when present; otherwise fall back to stdlib json.
json_mod = import_or_fallback("ujson", "json")  # ujson is faster if installed
print("json module:", json_mod.__name__)

# importlib.util: check if a module is available without importing it
import importlib.util

for pkg in ["numpy", "pandas", "flask", "fastapi", "nonexistent_pkg"]:
    spec = importlib.util.find_spec(pkg)  # None when the module is not importable
    status = "installed" if spec else "NOT installed"
    print(f"  {pkg:<20} {status}")
Package structure and __init__.py
import tempfile, pathlib, sys, importlib

# Create a minimal package structure in a temp directory
tmp = pathlib.Path(tempfile.mkdtemp())
pkg = tmp / "mypackage"
pkg.mkdir()

# Package init: runs on `import mypackage` and re-exports the utils helpers
(pkg / "__init__.py").write_text('''
__version__ = "1.0.0"
from mypackage.utils import add, multiply
''')

(pkg / "utils.py").write_text('''
def add(a, b):
    return a + b

def multiply(a, b):
    return a * b
''')

(pkg / "models.py").write_text('''
class LinearModel:
    def __init__(self, slope=1, intercept=0):
        self.slope = slope
        self.intercept = intercept

    def predict(self, x):
        return self.slope * x + self.intercept
''')

# Add tmp to path so Python can find our package
sys.path.insert(0, str(tmp))

# Import our package
mypackage = importlib.import_module("mypackage")
print("Version:", mypackage.__version__)
print("add:", mypackage.add(3, 4))
print("multiply:", mypackage.multiply(3, 4))

models = importlib.import_module("mypackage.models")
m = models.LinearModel(slope=2.5, intercept=-1)
print("predict(10):", m.predict(10))

# NOTE(review): the imported modules remain in sys.modules even after the
# path entry and temp directory are removed below.
sys.path.pop(0)
import shutil; shutil.rmtree(tmp)
💼 Real-World: Plugin Loader System
An application dynamically loads analysis plugins from a directory at startup using importlib, without hardcoding plugin names.
import importlib, importlib.util, pathlib, sys, tempfile, shutil

# Create plugin directory with two demo plugins
tmp = pathlib.Path(tempfile.mkdtemp())
plugin_dir = tmp / "plugins"
plugin_dir.mkdir()

# Each plugin module only needs to expose a run(data) callable.
(plugin_dir / "plugin_stats.py").write_text('''
def run(data):
    n = len(data)
    mean = sum(data) / n
    return {"plugin": "stats", "count": n, "mean": round(mean, 2)}
''')

(plugin_dir / "plugin_filter.py").write_text('''
def run(data):
    filtered = [x for x in data if x > 0]
    return {"plugin": "filter", "kept": len(filtered), "dropped": len(data)-len(filtered)}
''')

def load_plugins(plugin_dir):
    """Discover and import every plugin_*.py in *plugin_dir*.

    Returns a dict mapping the file stem (e.g. 'plugin_stats') to the
    loaded module object. Files are loaded in sorted name order.
    """
    loaded = {}
    for source in sorted(pathlib.Path(plugin_dir).glob("plugin_*.py")):
        stem = source.stem
        spec = importlib.util.spec_from_file_location(stem, source)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        loaded[stem] = module
        print(f"  Loaded: {stem}")
    return loaded

# Path entry lets plugin files import one another if they need to.
sys.path.insert(0, str(plugin_dir))
plugins = load_plugins(plugin_dir)

data = [3, -1, 7, 0, -2, 5, 9]
for name, plugin in plugins.items():
    print(f"  {name}: {plugin.run(data)}")

sys.path.pop(0)
shutil.rmtree(tmp)  # clean up the temporary plugin directory
🏋️ Practice: Dependency Checker
Write a function check_dependencies(requirements) that takes a list of package names and uses importlib.util.find_spec() to check if each is installed. Return a dict with 'installed' and 'missing' lists. Write another function install_missing(missing) that prints the pip install command needed (don't actually run it β€” just print it).
Starter Code
import importlib.util

def check_dependencies(requirements):
    """Classify each requirement as installed or missing.

    Uses importlib.util.find_spec so nothing is actually imported.
    Returns {"installed": [...], "missing": [...]} preserving input order.
    """
    report = {"installed": [], "missing": []}
    for pkg in requirements:
        # Note: package names may differ from import names (e.g. scikit-learn -> sklearn)
        import_name = pkg.replace("-", "_").split(">=")[0].split("==")[0].strip()
        bucket = "installed" if importlib.util.find_spec(import_name) else "missing"
        report[bucket].append(pkg)
    return report

def install_missing(missing):
    """Starter stub: print (do not run) the pip command to install *missing*."""
    # TODO: print pip install command for each missing package
    pass

# Demo: the last two names are intentionally bogus so 'missing' is non-empty.
packages = ["numpy", "pandas", "requests", "flask", "nonexistent_lib", "anotherMissingPkg"]
result = check_dependencies(packages)
print("Installed:", result["installed"])
print("Missing:",   result["missing"])
install_missing(result["missing"])
✅ Practice Checklist
33. Introspection & Metaprogramming

Python's runtime lets you inspect and modify objects, classes, and functions dynamically. Use inspect, dir(), getattr(), and metaclasses for powerful abstractions.

dir(), type(), getattr(), hasattr(), inspect
import inspect

class Rectangle:
    """Axis-aligned rectangle used as the introspection demo target."""

    width: float
    height: float

    def __init__(self, w, h):
        self.width = w
        self.height = h

    def area(self):
        """Width times height."""
        return self.height * self.width

    def perimeter(self):
        """Total edge length."""
        return 2 * self.width + 2 * self.height

r = Rectangle(4, 6)

# dir() lists all attributes and methods
attrs = [a for a in dir(r) if not a.startswith("_")]
print("Public attrs:", attrs)

# type() and isinstance()
print("type:", type(r).__name__)
print("isinstance(r, Rectangle):", isinstance(r, Rectangle))
print("isinstance(r, object):   ", isinstance(r, object))

# getattr / setattr / hasattr / delattr
# hasattr guards the lookup; getattr returns a bound method we can call.
for method in ["area", "perimeter", "nonexistent"]:
    if hasattr(r, method):
        fn = getattr(r, method)
        print(f"{method}(): {fn()}")
    else:
        print(f"{method}: not found")

# inspect module: where the class lives and what its constructor takes
print("Source file:", inspect.getfile(Rectangle))
sig = inspect.signature(Rectangle.__init__)
print("Signature:", sig)
print("Parameters:", list(sig.parameters.keys()))
__dict__, __class__, MRO, and vars()
class Animal:
    """Base class for the MRO / attribute-lookup demo."""

    kingdom = "Animalia"  # class attribute, shared by all instances

    def __init__(self, name, species):
        self.name    = name
        self.species = species

    def speak(self):
        """Default vocalisation; subclasses override this."""
        return "..."

class Dog(Animal):
    """Dog fixes the species and overrides speak()."""
    def __init__(self, name):
        super().__init__(name, "Canis lupus familiaris")

    def speak(self):
        return "Woof!"

class GoldenRetriever(Dog):
    # Only adds a class-level attribute; speak() is inherited from Dog.
    breed = "Golden Retriever"

g = GoldenRetriever("Buddy")

# Instance __dict__: instance attributes only
print("Instance __dict__:", g.__dict__)

# Class __dict__: class attributes only
print("Class __dict__ keys:", list(GoldenRetriever.__dict__.keys()))

# vars(): same as __dict__ for objects
print("vars(g):", vars(g))

# Method Resolution Order (MRO): lookup walks this chain left to right
print("MRO:", [c.__name__ for c in GoldenRetriever.__mro__])

# Class attributes vs instance attributes
print("Class attr 'kingdom':", g.kingdom)  # inherited from Animal
g.kingdom = "override"                      # creates instance attr (shadows class attr)
print("Instance attr 'kingdom':", g.__dict__["kingdom"])
print("Class still has:", Animal.kingdom)
Metaclasses and __init_subclass__
# Metaclass: controls how classes are created

class SingletonMeta(type):
    """Metaclass that caches one instance per class and always returns it."""

    _instances = {}  # class -> its sole instance

    def __call__(cls, *args, **kwargs):
        # First instantiation constructs; every later call reuses the cache.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class AppConfig(metaclass=SingletonMeta):
    """App-wide settings; the metaclass guarantees a single shared instance."""
    def __init__(self):
        # Runs only once: SingletonMeta returns the cached instance afterwards.
        self.debug = False
        self.host  = "localhost"

c1 = AppConfig()
c2 = AppConfig()  # returns the SAME object as c1
c1.debug = True

print("Same object:", c1 is c2)  # True
print("c2.debug:", c2.debug)     # True — same instance!

# __init_subclass__: called when a subclass is defined
class PluginBase:
    """Auto-registers every subclass in a shared name -> class registry."""

    _registry = {}

    def __init_subclass__(cls, plugin_name=None, **kwargs):
        super().__init_subclass__(**kwargs)
        # Falsy plugin_name falls back to the lowercased class name.
        key = plugin_name or cls.__name__.lower()
        PluginBase._registry[key] = cls
        print(f"Registered plugin: {key!r}")

# Defining these classes triggers PluginBase.__init_subclass__, which
# registers each one under its declared plugin_name.
class CSVPlugin(PluginBase, plugin_name="csv"):
    def run(self): return "csv output"

class JSONPlugin(PluginBase, plugin_name="json"):
    def run(self): return "json output"

print("Registry:", list(PluginBase._registry.keys()))
plugin = PluginBase._registry["csv"]()  # look up a class by name, then instantiate
print("CSV plugin run:", plugin.run())
💼 Real-World: Auto-Documented API
A REST API framework uses introspection to auto-generate documentation from function signatures and docstrings.
import inspect

class APIRouter:
    """Collects route handlers and auto-documents them via introspection."""

    def __init__(self):
        self.routes = {}

    def route(self, path, method="GET"):
        """Decorator: register the handler under 'METHOD path' with extracted docs."""
        def decorator(func):
            signature   = inspect.signature(func)
            description = inspect.getdoc(func) or "No description"
            empty       = inspect.Parameter.empty
            params = {}
            # NOTE(review): the first parameter is always skipped (assumes bound
            # methods with 'self'); a plain-function handler loses its first arg.
            for pname, p in list(signature.parameters.items())[1:]:
                params[pname] = {
                    "annotation": str(p.annotation.__name__ if p.annotation is not empty else "any"),
                    "default":    str(p.default) if p.default is not empty else "required",
                }
            self.routes[f"{method} {path}"] = {
                "handler": func.__name__,
                "doc":     description,
                "params":  params,
            }
            return func
        return decorator

    def docs(self):
        """Print a plain-text summary of every registered endpoint."""
        for endpoint, info in self.routes.items():
            print(f"\n{endpoint} -> {info['handler']}")
            print(f"  {info['doc']}")
            for p, meta in info["params"].items():
                print(f"  - {p}: {meta['annotation']} (default={meta['default']})")

router = APIRouter()

# BUG FIX: the router documents handlers via inspect.getdoc(), but these
# handlers described themselves with # comments, so docs() printed
# "No description" for both. Docstrings make the demo work as advertised.
# NOTE(review): APIRouter.route skips the first parameter (it assumes bound
# methods with 'self'), so a plain function's first argument is omitted
# from the generated docs.

@router.route("/users", "GET")
def list_users(limit: int = 20, offset: int = 0):
    """Return paginated list of users."""
    pass

@router.route("/users/{id}", "GET")
def get_user(user_id: int, include_meta: bool = False):
    """Fetch a single user by ID."""
    pass

router.docs()
🏋️ Practice: Class Inspector
Write a function inspect_class(cls) that returns a dict with: 'name' (class name), 'bases' (list of base class names), 'mro' (list of names), 'class_attrs' (non-dunder class-level attributes), 'methods' (public methods with their signatures as strings). Test it on a class you define with inheritance.
Starter Code
import inspect

def inspect_class(cls):
    """Summarize a class: name, bases, MRO, public class attrs, public methods.

    Only cls.__dict__ is examined, so inherited members are not included.
    Method signatures are rendered as strings via inspect.signature.
    """
    methods     = {}
    class_attrs = {}
    for attr_name, attr_val in cls.__dict__.items():
        if attr_name.startswith("_"):
            continue  # skip dunders and private names
        if callable(attr_val):
            methods[attr_name] = str(inspect.signature(attr_val))
        else:
            class_attrs[attr_name] = repr(attr_val)
    return {
        "name":        cls.__name__,
        "bases":       [b.__name__ for b in cls.__bases__],
        "mro":         [c.__name__ for c in cls.__mro__],
        "class_attrs": class_attrs,
        "methods":     methods,
    }

class Vehicle:
    """Demo class with class attrs and an annotated method for inspect_class."""

    wheels = 4
    fuel   = "gasoline"

    def __init__(self, brand, speed):
        self.brand = brand
        self.speed = speed

    def drive(self, distance: float) -> float:
        """Hours needed to cover *distance* at this vehicle's speed."""
        return distance / self.speed

class ElectricCar(Vehicle):
    """Overrides the 'fuel' class attribute and adds a charge() method."""
    fuel = "electric"

    def charge(self, hours: int) -> str:
        return f"Charging for {hours}h"

# Run the inspector over both classes; ElectricCar's own __dict__ holds only
# fuel and charge — inherited members live on Vehicle.
for cls in [Vehicle, ElectricCar]:
    info = inspect_class(cls)
    print(f"\n{info['name']}:")
    print(f"  bases: {info['bases']}")
    print(f"  attrs: {info['class_attrs']}")
    print(f"  methods: {info['methods']}")
✅ Practice Checklist
34. Advanced Type Hints

Python's typing module enables static analysis with TypeVar, Generic, Protocol, overload, and Literal. Well-typed code is self-documenting and catches bugs before runtime.

TypeVar and Generic classes
from typing import TypeVar, Generic, Iterable, Optional

# Type variables: placeholders that bind to a concrete type per call site.
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

# Generic function: type-safe identity
def first(items: list[T]) -> Optional[T]:
    """Return the first element of *items*, or None for an empty list."""
    if items:
        return items[0]
    return None

# T binds to the element type at each call site.
print(first([1, 2, 3]))        # int
print(first(["a", "b"]))       # str
print(first([]))               # None

# Generic class: type-safe stack
class Stack(Generic[T]):
    """LIFO stack parameterized over its element type."""

    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        """Place *item* on top of the stack."""
        self._items.append(item)

    def pop(self) -> T:
        """Remove and return the top item; raise IndexError when empty."""
        if not self._items:
            raise IndexError("pop from empty stack")
        return self._items.pop()

    def peek(self) -> Optional[T]:
        """Top item without removing it, or None when empty."""
        if self._items:
            return self._items[-1]
        return None

    def __len__(self) -> int:
        return len(self._items)

# The Stack[int] annotation documents intent; runtime does not enforce it.
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
int_stack.push(3)
print("peek:", int_stack.peek())  # 3
print("pop: ", int_stack.pop())   # 3
print("len: ", len(int_stack))    # 2
Union, Optional, Literal, Final, TypeAlias
from typing import Union, Optional, Literal, Final
import sys

# Union: accepts multiple types (Python 3.10+: int | str)
def process(value: Union[int, str, float]) -> str:
    """Describe *value* as 'Got <type name>: <value>'."""
    type_name = type(value).__name__
    return f"Got {type_name}: {value}"

# One function handles all three members of the Union.
print(process(42))
print(process("hello"))
print(process(3.14))

# Optional[T] is shorthand for Union[T, None]
def find_user(user_id: int) -> Optional[dict]:
    """Look up a user in the toy in-memory table; None when absent."""
    table = {1: {"name": "Alice"}, 2: {"name": "Bob"}}
    return table.get(user_id)

user = find_user(1)
if user:  # truthiness check doubles as the None guard
    print("Found:", user["name"])

# Literal: restrict to specific values
Mode = Literal["read", "write", "append"]

def open_file(path: str, mode: Mode) -> str:
    """Describe the open operation (demo only — no file is actually opened)."""
    return f"Opening {path} in {mode} mode"

print(open_file("data.csv", "read"))

# Final: type checkers flag any reassignment (not enforced at runtime)
MAX_RETRIES: Final = 3
API_URL:     Final[str] = "https://api.example.com"

# TypeAlias (Python 3.10+) — guarded so older interpreters still run this file
if sys.version_info >= (3, 10):
    from typing import TypeAlias
    Vector: TypeAlias = list[float]
    Matrix: TypeAlias = list[list[float]]

print(f"MAX_RETRIES: {MAX_RETRIES}")
@overload for multiple signatures
from typing import overload, Union

# @overload allows multiple type signatures for the same function
# Only the implementation signature uses the body

@overload
def parse(value: str) -> int: ...
@overload
def parse(value: bytes) -> float: ...
@overload
def parse(value: int) -> str: ...

def parse(value: Union[str, bytes, int]) -> Union[int, float, str]:
    """Convert by input type: str -> int, bytes -> float (decoded), int -> str."""
    if isinstance(value, bytes):
        return float(value.decode())
    if isinstance(value, str):
        return int(value)
    return str(value)

# Each call returns the type promised by the matching overload.
print(parse("42"))    # int
print(parse(b"3.14")) # float
print(parse(100))     # str

# TypedDict: dict with typed keys
from typing import TypedDict, NotRequired

class UserRecord(TypedDict):
    """Expected shape of a user dict; 'age' may be omitted."""
    id:    int
    name:  str
    email: str
    age:   NotRequired[int]  # optional key

def create_user(data: UserRecord) -> str:
    """Render a short display string for a user record."""
    return f"User {data['name']} ({data['email']})"

user: UserRecord = {"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30}
print(create_user(user))

user2: UserRecord = {"id": 2, "name": "Bob", "email": "bob@example.com"}
print(create_user(user2))  # age is optional
💼 Real-World: Typed Data Pipeline
A production data pipeline uses TypedDict, Generic, and Union to enforce type contracts across stages, catching mismatches early.
from typing import TypedDict, Generic, TypeVar, Optional, Callable
from dataclasses import dataclass, field

T = TypeVar("T")  # input row type of a pipeline step
R = TypeVar("R")  # output row type of a pipeline step

class RawRecord(TypedDict):
    """Row as ingested: carries a validity flag."""
    id:    int
    name:  str
    value: float
    valid: bool

class CleanRecord(TypedDict):
    """Row after cleaning: the 'valid' flag has been dropped."""
    id:    int
    name:  str
    value: float

@dataclass
class Pipeline(Generic[T, R]):
    """Ordered list of transforms; run() maps each step, dropping None rows."""

    steps: list[Callable[[T], R]] = field(default_factory=list)

    def add_step(self, fn: Callable) -> "Pipeline":
        """Append a transform and return self so calls can be chained."""
        self.steps.append(fn)
        return self

    def run(self, data: list[T]) -> list:
        """Apply every step in order; None rows are filtered before each step."""
        current = data
        for step in self.steps:
            current = [step(item) for item in current if item is not None]
        return current

def filter_valid(r: RawRecord) -> Optional[RawRecord]:
    """Keep only records flagged valid with a strictly positive value."""
    if r["valid"] and r["value"] > 0:
        return r
    return None

def normalize(r: RawRecord) -> CleanRecord:
    """Drop the 'valid' flag, tidy the name, and round the value."""
    return {
        "id":    r["id"],
        "name":  r["name"].strip().title(),
        "value": round(r["value"], 2),
    }

# id=2 fails both filter_valid checks (invalid flag, negative value) and is dropped.
records: list[RawRecord] = [
    {"id": 1, "name": "alice smith",  "value": 129.5,  "valid": True},
    {"id": 2, "name": "BOB JONES",    "value": -5.0,   "valid": False},
    {"id": 3, "name": "  carol lee ", "value": 89.99,  "valid": True},
]

pipeline: Pipeline[RawRecord, CleanRecord] = Pipeline()
pipeline.add_step(filter_valid).add_step(normalize)
result = pipeline.run(records)
for r in result:
    print(f"  {r}")
🏋️ Practice: Generic Result Type
Implement a generic Result[T, E] class (inspired by Rust) with two states: Ok(value: T) and Err(error: E). Add methods: is_ok(), is_err(), unwrap() (returns value or raises), unwrap_or(default), map(fn) (applies fn to value if Ok, returns new Result). Write tests using Result[int, str].
Starter Code
from typing import Generic, TypeVar, Callable, Optional
from dataclasses import dataclass

T = TypeVar("T")  # success value type
E = TypeVar("E")  # error type
U = TypeVar("U")  # value type after map()

@dataclass
class Result(Generic[T, E]):
    """Rust-style Result: exactly one of _value (Ok) or _error (Err) is set.

    Starter code — unwrap_or() and map() are left as TODOs for the learner.
    """
    _value: Optional[T] = None
    _error: Optional[E] = None

    @classmethod
    def ok(cls, value: T) -> "Result[T, E]":
        """Construct a success result."""
        return cls(_value=value)

    @classmethod
    def err(cls, error: E) -> "Result[T, E]":
        """Construct an error result."""
        return cls(_error=error)

    def is_ok(self) -> bool:
        return self._error is None

    def is_err(self) -> bool:
        return self._error is not None

    def unwrap(self) -> T:
        """Return the value, or raise ValueError if this is an Err."""
        if self.is_err():
            raise ValueError(f"Called unwrap on Err: {self._error}")
        return self._value

    def unwrap_or(self, default: T) -> T:
        # TODO: return value if ok, else default
        pass

    def map(self, fn: Callable[[T], U]) -> "Result[U, E]":
        # TODO: if ok, return Result.ok(fn(self._value)), else return self
        pass

# Tests
r1 = Result.ok(42)
r2 = Result.err("not found")

print(r1.is_ok(), r1.unwrap())
print(r2.is_err(), r2.unwrap_or(-1))
# NOTE(review): until map() is implemented it returns None, so the next
# line raises AttributeError — expected for unfinished starter code.
print(r1.map(lambda x: x * 2).unwrap())
try:    r2.unwrap()
except ValueError as e: print("Caught:", e)
✅ Practice Checklist