🐍 Python Basics
34 topics • Click any card to expand
Python is dynamically typed — you don't declare types, Python infers them. The core types are int, float, str, bool, and NoneType.
# Integers
age = 25
year = 2024
# Floats
price = 9.99
pi = 3.14159
# String
name = "Alice"
# Boolean
active = True
done = False
# NoneType
result = None
# Check types
print(type(age)) # <class 'int'>
print(type(price)) # <class 'float'>
print(type(name)) # <class 'str'>
print(type(active)) # <class 'bool'>
print(type(result)) # <class 'NoneType'># Convert between types
x = "42"
print(int(x) + 8) # 50 (str β int)
print(float(x) * 1.5) # 63.0
n = 3.9
print(int(n)) # 3 (truncates, not rounds)
print(round(n)) # 4 (rounds)
print(str(100) + " items") # "100 items"
print(bool(0)) # False
print(bool("")) # False
print(bool("hello")) # True
print(bool(42)) # True# Multiple assignment
a, b, c = 10, 20, 30
x = y = z = 0
print(a, b, c) # 10 20 30
print(x, y, z) # 0 0 0
# Swap without temp variable
a, b = b, a
print(a, b) # 20 10
# Augmented assignment operators
score = 100
score += 15 # 115
score -= 5 # 110
score *= 2 # 220
score //= 3 # 73
score **= 2 # 5329
print("Score:", score)
# Readable large numbers
population = 8_100_000_000
pi_approx = 3.141_592_653
print(f"Population: {population:,}")# Complex numbers
z1 = 3 + 4j
z2 = complex(1, -2)
print(f"z1 = {z1}, real={z1.real}, imag={z1.imag}")
print(f"|z1| = {abs(z1)}") # magnitude: 5.0
print(f"z1 + z2 = {z1 + z2}")
print(f"z1 * z2 = {z1 * z2}")
# None checks β always use 'is' / 'is not', never ==
result = None
if result is None:
print("result is None")
data = [0, "", None, False, 42, "hello"]
for item in data:
falsy = "falsy" if not item else "truthy"
none_check = " (is None)" if item is None else ""
print(f" {str(item):8s} -> {falsy}{none_check}")
# isinstance β safer than type() ==
values = [42, 3.14, "hi", True, None, [1,2]]
for v in values:
print(f" {str(v):8s} int={isinstance(v, int)} "
f"float={isinstance(v, float)} str={isinstance(v, str)}")# Simulating user input processing
def parse_order(quantity_str, price_str, discount_str):
    """Parse raw order strings and compute a discounted total.

    Args:
        quantity_str: quantity as a string, e.g. "3".
        price_str: unit price as a string, e.g. "29.99".
        discount_str: discount percentage as a string, e.g. "10".

    Returns:
        On success, a dict with quantity, price, discount (formatted as
        a percent string) and the rounded total; on any failed numeric
        conversion, an "Invalid input: ..." error string.
    """
    try:
        quantity = int(quantity_str)
        price = float(price_str)
        discount = float(discount_str) / 100  # "10" -> 0.10
    except ValueError as e:
        return f"Invalid input: {e}"
    subtotal = quantity * price
    total = subtotal * (1 - discount)
    return {
        "quantity": quantity,
        "price": price,
        "discount": f"{discount:.0%}",
        "total": round(total, 2)
    }
print(parse_order("3", "29.99", "10"))
print(parse_order("abc", "9.99", "5"))name = "YOUR_NAME"
age = 25 # set your age
height = 1.75 # set height in meters
# TODO: swap age and height using one line
# age, height = ???
# TODO: check if the original age (now stored in height) is 18-65
is_working_age = ???
# Expected: "Alice | Age: 1.75 | Height: 25m | Working age: True"
print(f"{name} | Age: {age} | Height: {height}m | Working age: {is_working_age}")Strings are sequences of characters. Python provides rich built-in methods for slicing, formatting, searching, and transforming text.
text = " Hello, World! "
print(text.strip()) # remove whitespace
print(text.lower()) # lowercase
print(text.upper()) # uppercase
print(text.replace("World", "Python"))
print(text.strip().split(", ")) # ['Hello', 'World!']
# Slicing
s = "Python"
print(s[0]) # P
print(s[-1]) # n
print(s[1:4]) # yth
print(s[::-1]) # nohtyP (reverse)
print(len(s)) # 6name = "Alice"
score = 98.567
rank = 1
# f-string (recommended)
print(f"Name: {name}, Score: {score:.2f}, Rank: #{rank}")
# Padding and alignment
for item, price in [("Apple", 0.5), ("Banana", 0.25), ("Cherry", 1.99)]:
print(f"{item:<10} ${price:>6.2f}")
# Multi-line string
message = (
f"Congratulations {name}!
"
f"Your score of {score:.1f} earned rank #{rank}."
)
print(message)sentence = "Python is powerful, Python is readable, Python is fun"
print(sentence.count("Python")) # 3
print(sentence.find("readable")) # index of first match
print(sentence.startswith("Python")) # True
print(sentence.endswith("fun")) # True
# Split and join
parts = sentence.split(", ")
rejoined = " | ".join(parts)
print(rejoined)
# strip variants
messy = " hello world "
print(repr(messy.strip())) # 'hello world'
# partition β splits at first match only
before, sep, after = sentence.partition(" is ")
print(f"Before: '{before}'")
print(f"After: '{after[:30]}...'")
# replace with count limit
print(sentence.replace("Python", "Ruby", 1)) # only firstimport math
# Format spec: [[fill]align][sign][width][grouping][.precision][type]
pi = math.pi
print(f"{'pi':>12s}: {pi:>12.6f}") # right-align, 6 decimals
print(f"{'pi':>12s}: {pi:>12.4e}") # scientific notation
print(f"{'pi':>12s}: {pi:>12.2%}") # as percentage
# Table with column alignment
header = f"{'Name':<15} {'Score':>8} {'Grade':>6} {'Bar':}"
print(header)
print("-" * 45)
students = [("Alice", 92.5), ("Bob", 74.3), ("Carol Marie", 88.0)]
for name, score in students:
grade = "A" if score >= 90 else "B" if score >= 80 else "C"
bar = "#" * int(score // 10)
print(f"{name:<15} {score:>8.1f} {grade:>6} {bar}")
# Nested expressions inside f-strings
items = [3, 1, 4, 1, 5, 9, 2, 6]
print(f"max={max(items)}, sum={sum(items)}, avg={sum(items)/len(items):.2f}")
# Debug format (Python 3.8+): variable=value
x = 42
print(f"{x=}, {x**2=}, {math.sqrt(x)=:.4f}")import datetime
logs = [
"[2024-01-15 09:23:11] ERROR login_service: Invalid credentials for user bob@example.com",
"[2024-01-15 09:24:55] INFO auth_service: Token issued for alice@example.com",
"[2024-01-15 09:25:03] WARNING api_gateway: Rate limit 80% for IP 192.168.1.42",
]
print(f"{'Time':9s} {'Level':8s} {'Service':15s} {'Message'}")
print("-" * 65)
for log in logs:
# Parse: [datetime] LEVEL service: message
ts = log[1:20]
rest = log[22:].strip()
parts = rest.split(None, 2)
level = parts[0]
service = parts[1].rstrip(":")
msg = parts[2] if len(parts) > 2 else ""
print(f"{ts[11:]:9s} {level:8s} {service:15s} {msg}")raw = " super-pro Widget X200 "
# 1. Strip whitespace and title-case
clean = raw.strip().title()
# 2. TODO: Replace hyphens with spaces
# clean = clean.replace(???)
# 3. TODO: Check 'pro' in original string (case-insensitive)
# has_pro = "pro" in raw.???()
# 4. TODO: Build 6-char code: first 3 + last 3 of clean (no spaces), uppercase
# no_spaces = clean.replace(" ", "")
# code = (no_spaces[:3] + ???).upper()
print(f"Clean: '{clean}'")
# print(f"Has pro: {has_pro}")
# print(f"Code: '{code}'")
# Expected: Clean='Super Pro Widget X200', Code='SUP200'Lists are ordered, mutable sequences. They're the most commonly used Python container β used for collections, stacks, queues, and more.
fruits = ["apple", "banana", "cherry", "date"]
print(fruits[0]) # apple
print(fruits[-1]) # date
print(fruits[1:3]) # ['banana', 'cherry']
# Modify
fruits.append("elderberry") # add to end
fruits.insert(1, "avocado") # insert at index 1
fruits.remove("banana") # remove by value
popped = fruits.pop() # remove & return last
print(fruits)
print("Popped:", popped)
print("Length:", len(fruits))nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3]
nums.sort()
print("Sorted:", nums)
print("Reversed:", nums[::-1])
print("Count of 5:", nums.count(5))
print("Index of 9:", nums.index(9))
print("Sum:", sum(nums))
print("Max:", max(nums), "Min:", min(nums))
# List comprehension
squares = [x**2 for x in range(1, 6)]
evens = [x for x in range(20) if x % 2 == 0]
print("Squares:", squares)
print("Evens:", evens)# Nested list (2D matrix)
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print("Center:", matrix[1][1]) # 5
# Flatten nested list
flat = [x for row in matrix for x in row]
print("Flat:", flat)
# Transpose with comprehension
transposed = [[matrix[r][c] for r in range(3)] for c in range(3)]
print("Transposed[0]:", transposed[0])
# map and filter
nums = [1, -2, 3, -4, 5, -6]
doubled = list(map(lambda x: x * 2, nums))
positives = list(filter(lambda x: x > 0, nums))
print("Doubled: ", doubled)
print("Positives:", positives)
# any / all
print("any > 4:", any(x > 4 for x in nums))
print("all > 0:", all(x > 0 for x in nums))import bisect
# sort() mutates in place; sorted() returns a new list
nums = [5, 2, 8, 1, 9, 3]
new_sorted = sorted(nums) # original unchanged
nums.sort() # in-place
print("sorted():", new_sorted)
print("sort() in-place:", nums)
# key= β sort by custom criteria
words = ["banana", "Apple", "cherry", "date", "FIG"]
print(sorted(words)) # case-sensitive lexicographic
print(sorted(words, key=str.lower)) # case-insensitive
print(sorted(words, key=len)) # by length
print(sorted(words, key=lambda w: (-len(w), w.lower()))) # len desc, alpha asc
# Sorting tuples: sort by 2nd element desc, then 1st asc
people = [("Bob",25), ("Alice",30), ("Carol",25), ("Dave",30)]
print(sorted(people, key=lambda p: (-p[1], p[0])))
# bisect β fast insertion point in a sorted list (binary search)
scores = [45, 58, 67, 74, 82, 88, 95]
new_score = 79
pos = bisect.bisect_left(scores, new_score)
bisect.insort(scores, new_score) # inserts in sorted order
print(f"Inserted {new_score} at index {pos}: {scores}")
print(f"Rank from top: {len(scores) - pos} of {len(scores)}")students = [
("Alice", 92), ("Bob", 74), ("Carol", 88),
("Dave", 51), ("Eve", 96), ("Frank", 63),
("Grace", 85), ("Hank", 47), ("Iris", 79),
]
scores = [s[1] for s in students]
avg = sum(scores) / len(scores)
passing = [(n, s) for n, s in students if s >= 60]
failing = [(n, s) for n, s in students if s < 60]
ranked = sorted(students, key=lambda x: x[1], reverse=True)
print(f"Class average: {avg:.1f}")
print(f"Passing ({len(passing)}): {[n for n,_ in passing]}")
print(f"Failing ({len(failing)}): {[(n,s) for n,s in failing]}")
print("Top 3:", ranked[:3])temps_c = [22.5, 35.1, 18.0, 40.2, 28.7, 15.3, 33.8, 25.0]
# 1. TODO: Convert to Fahrenheit using list comprehension
# temps_f = [??? for t in temps_c]
# 2. TODO: Filter days above 30Β°C
# hot_days = [??? for t in temps_c if ???]
# 3. TODO: Sort temps_c descending
# sorted_desc = sorted(???, reverse=True)
# 4. TODO: Min and max
# lo, hi = min(temps_c), max(temps_c)
print("Fahrenheit:", [round(f, 1) for f in temps_f])
print("Hot days:", sorted(hot_days))
print("Sorted desc:", sorted_desc)
print(f"Range: {lo}Β°C β {hi}Β°C")Tuples are immutable sequences; sets are unordered unique collections; dictionaries are key-value mappings.
# Tuple β immutable
point = (3, 7)
x, y = point # unpacking
print(f"x={x}, y={y}")
rgb = (255, 128, 0)
print("Red channel:", rgb[0])
# Set β unique, unordered
a = {1, 2, 3, 4, 5}
b = {3, 4, 5, 6, 7}
print("Union: ", a | b)
print("Intersection:", a & b)
print("Difference: ", a - b)
tags = ["python", "data", "python", "ml", "data"]
unique_tags = set(tags)
print("Unique tags:", unique_tags)person = {"name": "Alice", "age": 30, "city": "NYC"}
print(person["name"]) # Alice
print(person.get("email", "N/A")) # safe get with default
# Add / update
person["email"] = "alice@example.com"
person["age"] = 31
del person["city"]
print(person)
print("Keys:", list(person.keys()))
print("Values:", list(person.values()))
# Iterate
for k, v in person.items():
print(f" {k}: {v}")# Dict comprehension from zip
students = ["Alice", "Bob", "Carol", "Dave"]
scores = [92, 74, 88, 51]
grade_map = dict(zip(students, scores))
print("Grade map:", grade_map)
# Filter with dict comprehension
passing = {name: score for name, score in grade_map.items() if score >= 60}
print("Passing:", passing)
# Map scores to letter grades
def letter(s):
    # Map a numeric score to a letter grade:
    # A >= 90, B >= 80, C >= 70, D >= 60, otherwise F.
    return "A" if s >= 90 else "B" if s >= 80 else "C" if s >= 70 else "D" if s >= 60 else "F"
letters = {name: letter(score) for name, score in grade_map.items()}
print("Letters:", letters)
# Dict merging with ** operator (Python 3.5+)
defaults = {"timeout": 30, "retries": 3, "verbose": False}
overrides = {"retries": 5, "verbose": True}
config = {**defaults, **overrides} # overrides wins on conflict
print("Config:", config)
# Python 3.9+ merge operator (| and |=)
# config = defaults | overridesfrom collections import OrderedDict, ChainMap
# OrderedDict β remembers insertion order (useful for LRU-style caches)
od = OrderedDict()
od["first"] = 1
od["second"] = 2
od["third"] = 3
od.move_to_end("first") # move 'first' to the end
print("OrderedDict:", list(od.keys()))
# popitem(last=False) removes from the front (FIFO)
key, val = od.popitem(last=False)
print(f"Popped first: {key}={val}, remaining: {list(od.keys())}")
# ChainMap β single view over multiple dicts (first match wins)
defaults = {"color": "blue", "size": "M", "font": "Arial"}
user_prefs = {"color": "red", "size": "L"}
session = {"font": "Helvetica"}
merged = ChainMap(session, user_prefs, defaults)
print("color:", merged["color"]) # 'red' (user_prefs wins)
print("font:", merged["font"]) # 'Helvetica' (session wins)
# Dict views are live β they reflect changes
d = {"a": 1, "b": 2, "c": 3}
keys_view = d.keys()
d["d"] = 4
print("Live keys view:", list(keys_view)) # includes 'd'
# Set operations with update / intersection_update
a = {1, 2, 3, 4, 5}
b = {3, 4, 5, 6, 7}
a.update({8, 9}) # union in-place (|=)
print("After update:", sorted(a))
a.intersection_update(b | {8}) # keep only items in both (a &= ...)
print("After intersection_update:", sorted(a))inventory = {
"apple": {"qty": 150, "price": 0.50, "min_stock": 50},
"banana": {"qty": 30, "price": 0.25, "min_stock": 40},
"milk": {"qty": 10, "price": 2.99, "min_stock": 20},
"bread": {"qty": 80, "price": 3.49, "min_stock": 15},
"cheese": {"qty": 5, "price": 5.99, "min_stock": 10},
}
reorder = {item for item, data in inventory.items()
if data["qty"] < data["min_stock"]}
total_value = sum(d["qty"] * d["price"] for d in inventory.values())
print(f"Total inventory value: ${total_value:.2f}")
print(f"Items to reorder ({len(reorder)}): {reorder}")
for item in sorted(reorder):
d = inventory[item]
print(f" {item:8s} qty={d['qty']:3d} min={d['min_stock']:3d} (order {d['min_stock']*2 - d['qty']} units)")students = ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank"]
scores = [92, 58, 76, 45, 88, 63]
# TODO: Build grade_book dict from zip(students, scores)
# grade_book = dict(???)
# TODO: Find failed students (score < 60) using a set comprehension
# failed = {name for name, score in ???.items() if ???}
# TODO: Map each student to a letter grade with dict comprehension
# Use: A>=90, B>=80, C>=70, D>=60, F otherwise
# Hint: define a helper or use nested ternary
# letter_grades = {name: ??? for name, score in grade_book.items()}
# TODO: Merge grade_book with a "class_info" dict using **
# class_info = {"class": "Python 101", "semester": "Spring 2024"}
# full_record = {**class_info, "grades": letter_grades}
print("Grade book:", grade_book)
print("Failed:", failed)
print("Letter grades:", letter_grades)if/elif/else controls which code runs. Python uses indentation (4 spaces) instead of curly braces to define blocks.
# Basic if-elif-else
temperature = 28
if temperature > 35:
status = "Heat warning"
elif temperature > 25:
status = "Warm"
elif temperature > 15:
status = "Comfortable"
elif temperature > 5:
status = "Cool"
else:
status = "Cold"
print(f"{temperature}Β°C β {status}")
# Ternary (one-liner)
label = "Pass" if temperature > 20 else "Fail"
print("Label:", label)
# Chained comparisons
x = 15
if 10 < x < 20:
print(f"{x} is between 10 and 20")# and, or, not
age = 22
income = 55000
eligible = age >= 18 and income >= 30000
print("Eligible:", eligible)
username = ""
display = username or "Anonymous"
print("Display name:", display)
# in / not in
role = "editor"
allowed = ["admin", "editor", "moderator"]
if role in allowed:
print(f"{role} has access")
# Walrus operator (Python 3.8+)
data = [1, 2, 3]
if n := len(data):
print(f"List has {n} items")# match statement (Python 3.10+) β structured pattern matching
def http_status(code):
match code:
case 200:
return "OK"
case 201:
return "Created"
case 400:
return "Bad Request"
case 401 | 403:
return "Auth error"
case 404:
return "Not Found"
case 500:
return "Server Error"
case _:
return f"Unknown ({code})"
for code in [200, 201, 403, 404, 418]:
print(f" {code} β {http_status(code)}")
# any() / all() for password strength
def check_password(pw):
    """Print a per-rule report and return True only if every rule passes.

    Rules: length >= 8, at least one uppercase letter, at least one
    digit, and at least one symbol from !@#$%^&*().
    """
    checks = {
        "length >= 8": len(pw) >= 8,
        "has uppercase": any(c.isupper() for c in pw),
        "has digit": any(c.isdigit() for c in pw),
        "has symbol": any(c in "!@#$%^&*()" for c in pw),
    }
    for rule, ok in checks.items():
        print(f" {'OK' if ok else 'FAIL':4s} {rule}")
    return all(checks.values())
print("Strong:", check_password("Secure@9"))
print("Strong:", check_password("weakpass"))# Short-circuit evaluation
# 'and' stops at the first falsy value, 'or' stops at first truthy
def expensive():
print(" [expensive() called]")
return True
print("--- short-circuit AND ---")
result = False and expensive() # expensive() never called
print("Result:", result)
print("--- short-circuit OR ---")
result = True or expensive() # expensive() never called
print("Result:", result)
# Practical: safe attribute access via short-circuit
user = None
name = user and user.get("name", "") # won't crash if user is None
print("Name:", name) # None (short-circuited)
user = {"name": "Alice", "role": "admin"}
name = user and user.get("name", "")
print("Name:", name) # "Alice"
# assert β for debugging invariants (disabled with python -O)
def divide(a, b):
    """Divide a by b, guarding the invariant with assert.

    NOTE: assert is stripped under `python -O`, so this pattern is for
    catching programmer errors during debugging — not for validating
    untrusted input.
    """
    assert b != 0, f"Divisor must not be zero, got b={b}"
    return a / b
print(divide(10, 2))
try:
divide(5, 0)
except AssertionError as e:
print(f"AssertionError: {e}")
# Conditional import β try fast C lib, fall back to pure Python
try:
import ujson as json_lib # fast third-party JSON
print("Using ujson")
except ImportError:
import json as json_lib # stdlib fallback
print("Using stdlib json")
data = json_lib.dumps({"key": "value", "nums": [1, 2, 3]})
print("Encoded:", data)

def check_loan(age, income, credit_score, existing_debt):
    """Screen a loan application.

    Returns a (status, message) tuple: ("REJECTED", reason) when any
    hard rule fails, otherwise ("APPROVED", tier-and-rate message).
    """
    # Basic eligibility — hard rejections first (guard clauses)
    if age < 18:
        return "REJECTED", "Must be 18+"
    if income < 20000:
        return "REJECTED", "Minimum income $20,000"
    if credit_score < 580:
        return "REJECTED", "Credit score below 580"
    debt_to_income = existing_debt / income
    if debt_to_income > 0.5:
        return "REJECTED", f"Debt-to-income {debt_to_income:.0%} exceeds 50%"
    # Approved — determine pricing tier from score and leverage
    if credit_score >= 750 and debt_to_income < 0.2:
        rate = 4.5
        tier = "Prime"
    elif credit_score >= 680:
        rate = 6.9
        tier = "Standard"
    else:
        rate = 11.5
        tier = "Subprime"
    return "APPROVED", f"{tier} rate: {rate}%"
applicants = [
(25, 65000, 720, 5000),
(17, 80000, 800, 0),
(35, 90000, 760, 8000),
(30, 25000, 620, 15000),
]
for a in applicants:
status, msg = check_loan(*a)
print(f" Age={a[0]}, Income=${a[1]:,}, Score={a[2]} β {status}: {msg}")def traffic_action(color, has_pedestrian=False, is_emergency=False):
# TODO: if is_emergency, all lights should yield β return "All yield for emergency"
# TODO: use if/elif/else on color:
# "green" -> "Go" (but if has_pedestrian -> "Go, watch for pedestrians")
# "yellow" -> "Slow down" (but if has_pedestrian -> "Stop for pedestrians")
# "red" -> "Stop" (but if has_pedestrian -> "Stop β pedestrians crossing")
# default -> f"Unknown signal: {color}"
pass
# Test cases
print(traffic_action("green")) # Go
print(traffic_action("green", has_pedestrian=True)) # Go, watch for pedestrians
print(traffic_action("yellow")) # Slow down
print(traffic_action("red", has_pedestrian=True)) # Stop β pedestrians crossing
print(traffic_action("red", is_emergency=True)) # All yield for emergency
print(traffic_action("purple")) # Unknown signal: purplefor iterates over any iterable (list, range, string, dict). while loops run while a condition is True. Use break, continue, and enumerate for control.
# Loop over list
fruits = ["apple", "banana", "cherry"]
for fruit in fruits:
print(fruit)
# Range
for i in range(1, 6):
print(i, end=" ")
print()
# enumerate β get index + value
for i, fruit in enumerate(fruits, start=1):
print(f"{i}. {fruit}")
# zip β loop two lists together
prices = [0.5, 0.25, 1.99]
for fruit, price in zip(fruits, prices):
print(f" {fruit}: ${price}")# while loop
count = 0
total = 0
while count < 5:
total += count
count += 1
print(f"Sum 0..4 = {total}")
# break β exit early
for n in range(100):
if n * n > 50:
print(f"First n where nΒ²>50: {n}")
break
# continue β skip current iteration
for n in range(10):
if n % 2 == 0:
continue # skip even numbers
print(n, end=" ")
print()
# else on for loop (runs if not broken)
for n in range(2, 10):
if 7 % n == 0 and n != 7:
print("7 is not prime"); break
else:
print("7 is prime")# Multiplication table using nested loops
print("Multiplication table (1-5):")
for i in range(1, 6):
row = ""
for j in range(1, 6):
row += f"{i*j:4d}"
print(row)
# itertools.product β Cartesian product (like nested loops)
import itertools
suits = ["β ", "β₯", "β¦", "β£"]
values = ["A", "K", "Q"]
cards = list(itertools.product(values, suits))
print(f"\n{len(cards)} high cards:", cards[:4], "...")
# Running maximum accumulator pattern
readings = [12, 7, 25, 18, 30, 14, 42, 9, 36]
running_max = []
current_max = float("-inf")
for val in readings:
if val > current_max:
current_max = val
running_max.append(current_max)
print("\nReadings: ", readings)
print("Running max: ", running_max)import itertools
# chain β iterate multiple iterables as one
a = [1, 2, 3]
b = ("four", "five")
c = range(6, 9)
for item in itertools.chain(a, b, c):
print(item, end=" ")
print()
# islice β lazy slice of an iterator (no list copy)
gen = (x**2 for x in itertools.count(1)) # infinite squares
first_10 = list(itertools.islice(gen, 10))
print("First 10 squares:", first_10)
# takewhile / dropwhile β conditional iteration
data = [2, 4, 6, 7, 8, 10, 12]
taken = list(itertools.takewhile(lambda x: x % 2 == 0, data))
dropped = list(itertools.dropwhile(lambda x: x % 2 == 0, data))
print("takewhile even:", taken) # [2, 4, 6] β stops at 7
print("dropwhile even:", dropped) # [7, 8, 10, 12] β starts at 7
# groupby β group consecutive items by a key (data must be sorted by key first)
entries = [
("Alice", "Engineering"), ("Bob", "Engineering"),
("Carol", "Marketing"), ("Dave", "Marketing"),
("Eve", "Engineering"),
]
entries.sort(key=lambda e: e[1]) # sort by department first
for dept, group in itertools.groupby(entries, key=lambda e: e[1]):
names = [name for name, _ in group]
print(f" {dept}: {names}")weekly_sales = [42000, 38500, 51000, 47200, 29800, 55600, 48900, 61000, 39700, 52300]
target = 45000
best_week = 0
best_amount = 0
total = 0
above_target = 0
for week, sales in enumerate(weekly_sales, start=1):
total += sales
if sales > best_amount:
best_amount = sales
best_week = week
status = "β" if sales >= target else "β"
if sales >= target:
above_target += 1
print(f" Week {week:2d}: ${sales:>7,} {status}")
avg = total / len(weekly_sales)
print(f"
Total: ${total:>9,}")
print(f"Average: ${avg:>9,.0f}")
print(f"Best week: Week {best_week} (${best_amount:,})")
print(f"On target: {above_target}/{len(weekly_sales)} weeks")results = []
for n in range(1, 31):
label = ""
# TODO: if divisible by 3, add "Fizz" to label
# if n % 3 == 0: label += ???
# TODO: if divisible by 5, add "Buzz" to label
# TODO: if divisible by 7, add "Zap" to label
# TODO: if label is still empty, use the number itself
# results.append(label if label else str(n))
pass
# Print 10 per line
for i in range(0, 30, 10):
print(" " + " ".join(f"{v:8s}" for v in results[i:i+10]))
# Expected row 1: 1 2 Fizz 4 Buzz Fizz Zap 8 Fizz BuzzFunctions let you encapsulate reusable logic. Python supports default arguments, *args, **kwargs, and lambda (anonymous) functions.
def greet(name, greeting="Hello"):
# Returns a greeting string
return f"{greeting}, {name}!"
print(greet("Alice"))
print(greet("Bob", "Hi"))
print(greet(name="Carol", greeting="Hey"))
# Multiple return values (returns a tuple)
def stats(numbers):
return min(numbers), max(numbers), sum(numbers)/len(numbers)
lo, hi, avg = stats([4, 8, 2, 9, 1, 7])
print(f"min={lo}, max={hi}, avg={avg:.2f}")# *args β variable positional arguments
def add_all(*args):
return sum(args)
print(add_all(1, 2, 3)) # 6
print(add_all(10, 20, 30, 40)) # 100
# **kwargs β variable keyword arguments
def build_profile(**kwargs):
return {k: v for k, v in kwargs.items()}
print(build_profile(name="Alice", age=30, role="admin"))
# Lambda (anonymous function)
square = lambda x: x ** 2
multiply = lambda x, y: x * y
nums = [3, 1, 4, 1, 5, 9, 2, 6]
print(sorted(nums))
print(sorted(nums, key=lambda x: -x)) # descendingimport time, functools
# Closure β inner function captures outer variable
def make_counter(start=0):
    """Return a counter closure; each call increments and returns the count.

    The count lives in a one-element list so the inner function can
    mutate it (an alternative to `nonlocal`). Every counter returned
    keeps its own independent state.
    """
    count = [start]  # mutable container so inner fn can modify
    def counter():
        count[0] += 1
        return count[0]
    return counter
c1 = make_counter()
c2 = make_counter(10)
print(c1(), c1(), c1()) # 1 2 3
print(c2(), c2()) # 11 12 (independent state)
# Decorator β wraps a function to add behaviour
def timer(func):
    """Decorator that prints how long each call to `func` takes.

    functools.wraps preserves the wrapped function's __name__/__doc__
    so introspection still sees the original function.
    """
    @functools.wraps(func)  # preserves __name__, __doc__
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.6f}s")
        return result
    return wrapper
@timer
def slow_sum(n):
return sum(range(n))
total = slow_sum(1_000_000)
print(f"Sum = {total:,}")
print("Function name preserved:", slow_sum.__name__)from typing import List, Dict, Optional, Union, Callable
import functools, inspect
# Type hints β document intent, checked by mypy (not enforced at runtime)
def calculate_total(
    prices: List[float],
    tax_rate: float = 0.08,
    discount: Optional[float] = None,
) -> Dict[str, float]:
    """Compute a receipt breakdown for a list of prices.

    Args:
        prices: item prices.
        tax_rate: fractional tax applied after discount (default 8%).
        discount: optional fractional discount (e.g. 0.1 for 10%).

    Returns:
        Dict with rounded subtotal, discount amount, tax, and total.
    """
    subtotal = sum(prices)
    disc_amt = subtotal * discount if discount else 0.0
    taxable = subtotal - disc_amt  # tax applies to discounted amount
    total = taxable * (1 + tax_rate)
    return {"subtotal": round(subtotal, 2),
            "discount": round(disc_amt, 2),
            "tax": round(taxable * tax_rate, 2),
            "total": round(total, 2)}
result = calculate_total([9.99, 24.50, 4.99], discount=0.1)
for k, v in result.items():
print(f" {k:10s}: ${v:.2f}")
# functools.reduce β fold sequence into single value
from functools import reduce
factorial = reduce(lambda acc, x: acc * x, range(1, 8)) # 7! = 5040
print(f"7! = {factorial}")
running_totals = []
reduce(lambda acc, x: (running_totals.append(acc + x), acc + x)[1],
[10, 20, 30, 40], 0)
print("Running totals:", running_totals)
# inspect β introspect function signatures at runtime
def my_func(a: int, b: float = 3.14, *args, keyword: str = "hi", **kwargs):
pass
sig = inspect.signature(my_func)
for name, param in sig.parameters.items():
kind = str(param.kind).split(".")[-1]
default = param.default if param.default is not inspect.Parameter.empty else "required"
print(f" {name:10s} [{kind:20s}] default={default}")

def clean_name(name):
    """Collapse whitespace and capitalize each word of a name."""
    return " ".join(w.capitalize() for w in name.strip().split())

def clean_email(email):
    """Strip surrounding whitespace and lowercase an email address."""
    return email.strip().lower()

def validate_age(age, min_age=0, max_age=120):
    """Return age as an int if it parses and is in range, else None."""
    try:
        a = int(age)
        return a if min_age <= a <= max_age else None
    except (ValueError, TypeError):
        # non-numeric string or None input
        return None

def clean_record(record):
    """Normalize one raw record dict; an invalid age becomes None."""
    return {
        "name": clean_name(record.get("name", "")),
        "email": clean_email(record.get("email", "")),
        "age": validate_age(record.get("age")),
    }
raw_records = [
{"name": " alice SMITH ", "email": "Alice@Example.COM ", "age": "28"},
{"name": "BOB jones", "email": "bob@company.com", "age": "abc"},
{"name": "carol White", "email": " CAROL@test.org", "age": "200"},
]
for rec in raw_records:
cleaned = clean_record(rec)
valid = "OK" if cleaned["age"] is not None else "INVALID AGE"
print(f" {cleaned['name']:18s} | {cleaned['email']:25s} | age={cleaned['age']} {valid}")def memoize(func):
cache = {}
# TODO: define wrapper(*args) that:
# 1. checks if args is already in cache
# 2. if yes, returns cache[args]
# 3. if no, calls func(*args), stores in cache, returns result
# TODO: use functools.wraps(func) to preserve metadata
# TODO: return wrapper
pass
import functools
@memoize
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# Test: should complete instantly even for large n
print([fibonacci(i) for i in range(10)]) # [0,1,1,2,3,5,8,13,21,34]
print(fibonacci(35)) # 9227465 β fast with memoize!Classes define blueprints for objects. Python supports encapsulation, inheritance, and special (dunder) methods like __str__ and __repr__.
class BankAccount:
    """A simple account tracking a balance and a transaction history."""

    def __init__(self, owner, balance=0):
        self.owner = owner
        self.balance = balance
        self._history = []  # convention: private

    def deposit(self, amount):
        # Silently ignore non-positive deposits.
        if amount > 0:
            self.balance += amount
            self._history.append(f"+{amount:.2f}")

    def withdraw(self, amount):
        # Refuse overdrafts; otherwise debit and record the transaction.
        if amount > self.balance:
            print("Insufficient funds")
        else:
            self.balance -= amount
            self._history.append(f"-{amount:.2f}")

    def __str__(self):
        return f"Account({self.owner}, ${self.balance:.2f})"
acc = BankAccount("Alice", 1000)
acc.deposit(500)
acc.withdraw(200)
print(acc)
print("History:", acc._history)

class Animal:
    """Base class: an animal with a name and a characteristic sound."""

    def __init__(self, name, sound):
        self.name = name
        self.sound = sound

    def speak(self):
        return f"{self.name} says {self.sound}!"

class Dog(Animal):
    """A dog: fixed "Woof" sound, plus a breed and a fetch trick."""

    def __init__(self, name, breed):
        super().__init__(name, "Woof")  # every dog shares the same sound
        self.breed = breed

    def fetch(self, item):
        return f"{self.name} fetches the {item}!"

class Cat(Animal):
    """A cat: fixed "Meow" sound, plus purring."""

    def __init__(self, name):
        super().__init__(name, "Meow")

    def purr(self):
        return f"{self.name} purrs..."
dog = Dog("Rex", "Labrador")
cat = Cat("Whiskers")
print(dog.speak(), dog.fetch("ball"))
print(cat.speak(), cat.purr())

class Temperature:
    """A Celsius temperature with validation, conversion and operators."""

    def __init__(self, celsius):
        self._celsius = celsius

    @property
    def celsius(self):
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # Reject physically impossible values.
        if value < -273.15:
            raise ValueError("Temperature below absolute zero!")
        self._celsius = value

    @property
    def fahrenheit(self):
        # Derived, read-only Fahrenheit view.
        return self._celsius * 9/5 + 32

    @classmethod
    def from_fahrenheit(cls, f):
        # Alternate constructor: build from a Fahrenheit reading.
        return cls((f - 32) * 5/9)

    def __repr__(self):
        return f"Temperature({self._celsius:.2f}°C / {self.fahrenheit:.2f}°F)"

    def __lt__(self, other):
        return self._celsius < other._celsius

    def __eq__(self, other):
        return self._celsius == other._celsius

    def __add__(self, other):
        return Temperature(self._celsius + other._celsius)
t1 = Temperature(100)
t2 = Temperature.from_fahrenheit(32) # 0Β°C
t3 = t1 + t2
print(t1) # 100Β°C / 212Β°F
print(t2) # 0Β°C / 32Β°F
print(t3) # 100Β°C sum
print(t2 < t1) # True
print(sorted([t1, t2, t3]))from abc import ABC, abstractmethod
from dataclasses import dataclass, field
import sys
# Abstract Base Class β define an interface that subclasses must implement
class Shape(ABC):
    """Interface that every concrete shape must implement."""

    @abstractmethod
    def area(self) -> float:
        ...

    @abstractmethod
    def perimeter(self) -> float:
        ...

    def describe(self):
        # Shared behaviour built on top of the abstract methods.
        return f"{type(self).__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"

class Circle(Shape):
    def __init__(self, radius: float):
        self.radius = radius

    def area(self):
        import math
        return math.pi * self.radius ** 2

    def perimeter(self):
        import math
        return 2 * math.pi * self.radius

class Rectangle(Shape):
    def __init__(self, w: float, h: float):
        self.w, self.h = w, h

    def area(self): return self.w * self.h
    def perimeter(self): return 2 * (self.w + self.h)
for shape in [Circle(5), Rectangle(4, 6)]:
print(shape.describe())
# @dataclass β auto-generates __init__, __repr__, __eq__
@dataclass(order=True)
class Point:
    """A 2-D point; order=True makes points sortable by (x, y).

    `label` is excluded from comparisons via compare=False.
    """
    x: float
    y: float
    label: str = field(default="", compare=False)

    def distance_to(self, other: "Point") -> float:
        # Euclidean distance between this point and `other`.
        return ((self.x - other.x)**2 + (self.y - other.y)**2) ** 0.5
p1 = Point(0, 0, "origin")
p2 = Point(3, 4, "target")
print(p1, p2)
print(f"Distance: {p1.distance_to(p2):.2f}")
print("Sorted:", sorted([p2, p1]))
# __slots__ β restrict attributes, save memory
class SlottedPoint:
    """2-D point using __slots__: fixed attribute set, lower memory.

    Instances have no __dict__, so assigning any attribute other than
    x or y raises AttributeError.
    """
    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x, self.y = x, y
sp = SlottedPoint(1, 2)
print(f"SlottedPoint: ({sp.x}, {sp.y})")
try:
sp.z = 99 # can't add new attributes
except AttributeError as e:
print(f"AttributeError: {e}")

class Product:
    """A store item: name, unit price, and category."""

    def __init__(self, name, price, category):
        self.name = name
        self.price = price
        self.category = category

    def __repr__(self):
        return f"{self.name} (${self.price:.2f})"

class Cart:
    """A per-user shopping cart of {product, qty} line items."""

    def __init__(self, user):
        self.user = user
        self.items = []

    def add(self, product, qty=1):
        # Each line item is a dict: {"product": Product, "qty": int}.
        self.items.append({"product": product, "qty": qty})

    def subtotal(self):
        return sum(i["product"].price * i["qty"] for i in self.items)

    def apply_discount(self, code):
        # Known codes map to fractional discounts; unknown codes give 0.
        discounts = {"SAVE10": 0.10, "HALF50": 0.50, "VIP20": 0.20}
        return discounts.get(code.upper(), 0)

    def checkout(self, code=""):
        """Print an itemized receipt, applying an optional discount code."""
        sub = self.subtotal()
        discount = self.apply_discount(code)
        total = sub * (1 - discount)
        print(f"Cart for {self.user}:")
        for i in self.items:
            print(f" {i['product'].name:15s} x{i['qty']} ${i['product'].price * i['qty']:.2f}")
        print(f" Subtotal: ${sub:.2f}")
        if discount:
            print(f" Discount: -{discount:.0%}")
        print(f" Total: ${total:.2f}")
cart = Cart("Alice")
cart.add(Product("Laptop", 999.99, "Electronics"), 1)
cart.add(Product("Mouse", 29.99, "Electronics"), 2)
cart.add(Product("Notebook", 5.99, "Stationery"), 3)
cart.checkout("SAVE10")class Stack:
def __init__(self):
# TODO: initialise internal list self._data = []
pass
def push(self, item):
# TODO: append item to self._data
pass
def pop(self):
# TODO: raise IndexError("pop from empty stack") if empty
# TODO: otherwise remove and return the top item
pass
def peek(self):
# TODO: raise IndexError("peek from empty stack") if empty
# TODO: otherwise return top item WITHOUT removing it
pass
def __len__(self):
# TODO: return number of items
pass
def __repr__(self):
# TODO: return something like Stack([1, 2, 3]) β top is rightmost
pass
# Tests
s = Stack()
s.push(1); s.push(2); s.push(3)
print(s) # Stack([1, 2, 3])
print(len(s)) # 3
print(s.peek()) # 3
print(s.pop()) # 3
print(s) # Stack([1, 2])
try:
Stack().pop()
except IndexError as e:
print(f"Caught: {e}")Use try/except/finally to handle exceptions gracefully. Raise custom exceptions to signal application-level errors.
# Basic exception handling
def safe_divide(a, b):
    """Divide a by b; return an error string instead of raising."""
    try:
        quotient = a / b
    except TypeError as exc:
        return f"Error: {exc}"
    except ZeroDivisionError:
        return "Error: cannot divide by zero"
    else:
        return quotient  # reached only when no exception occurred
    finally:
        print("safe_divide() called")  # runs on every call, even after a return
print(safe_divide(10, 2))
print(safe_divide(10, 0))
print(safe_divide("x", 2))class InsufficientFundsError(Exception):
def __init__(self, amount, balance):
self.amount = amount
self.balance = balance
super().__init__(f"Tried to withdraw ${amount:.2f}, only ${balance:.2f} available")
def withdraw(balance, amount):
if not isinstance(amount, (int, float)):
raise TypeError(f"Amount must be a number, got {type(amount).__name__}")
if amount <= 0:
raise ValueError("Amount must be positive")
if amount > balance:
raise InsufficientFundsError(amount, balance)
return balance - amount
for args in [(100, 30), (100, 200), (100, -10), (100, "abc")]:
try:
new_bal = withdraw(*args)
print(f"Withdrew {args[1]}, new balance: {new_bal}")
except (InsufficientFundsError, ValueError, TypeError) as e:
print(f"Error: {e}")import time
# Custom context manager using __enter__ / __exit__
class Timer:
def __enter__(self):
self.start = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.elapsed = time.perf_counter() - self.start
print(f"Elapsed: {self.elapsed:.6f}s")
return False # don't suppress exceptions
with Timer() as t:
total = sum(range(500_000))
print(f"Sum = {total:,}")
# Exception chaining β raise X from Y
class DatabaseError(Exception):
    """Application-level error raised when a lookup against the store fails."""
    pass

def fetch_user(user_id, data):
    """Return data[user_id]; on a miss, raise DatabaseError chained to the KeyError."""
    try:
        return data[user_id]
    except KeyError as missing:
        # `from missing` preserves the original KeyError as __cause__.
        raise DatabaseError(f"User {user_id} not found") from missing
records = {"alice": {"age": 30}, "bob": {"age": 25}}
for uid in ["alice", "carol"]:
try:
user = fetch_user(uid, records)
print(f"Found: {user}")
except DatabaseError as e:
print(f"DB Error: {e}")
print(f" Caused by: {e.__cause__}")import logging
import io
from contextlib import suppress, redirect_stdout, contextmanager
# suppress β silently ignore specific exceptions (replaces try/except/pass)
with suppress(FileNotFoundError):
open("nonexistent_file.txt") # no error raised
print("suppress: FileNotFoundError silently ignored")
# redirect_stdout β capture print() output into a buffer
buffer = io.StringIO()
with redirect_stdout(buffer):
print("This goes into the buffer, not the terminal")
print("So does this line")
captured = buffer.getvalue()
print(f"Captured {len(captured.splitlines())} lines: {captured.splitlines()[0]!r}")
# @contextmanager β create a context manager with a generator
@contextmanager
def managed_resource(name):
print(f" [open] {name}")
try:
yield name.upper() # value bound to 'as' target
except Exception as e:
print(f" [error] {e}")
raise
finally:
print(f" [close] {name}")
with managed_resource("database_connection") as res:
print(f" Using: {res}")
# logging module basics
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)-8s %(name)s: %(message)s"
)
log = logging.getLogger("myapp")
log.debug("Debug-level detail (only shown at DEBUG+)")
log.info("Server started on port 8080")
log.warning("Disk usage at 85%%")
log.error("Failed to connect to database")import json
def read_config(filepath):
    """Load a JSON config file, returning {} on a missing or malformed file."""
    try:
        with open(filepath, "r") as f:
            data = json.load(f)
        return data
    except FileNotFoundError:
        print(f"Config file not found: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"Invalid JSON in {filepath}: {e}")
        return {}

def get_setting(config, key, default=None, required=False):
    """Fetch a setting from config; raise KeyError when a required key is absent."""
    value = config.get(key, default)
    if required and value is None:
        raise KeyError(f"Required setting '{key}' is missing from config")
    return value

# Simulate loading a config
sample_config = {"db_host": "localhost", "db_port": 5432, "debug": True}
try:
    host = get_setting(sample_config, "db_host", required=True)
    port = get_setting(sample_config, "db_port", required=True)
    timeout = get_setting(sample_config, "timeout", default=30)
    api_key = get_setting(sample_config, "api_key", required=True)
except KeyError as e:
    print(f"Configuration error: {e}")
    host, port, timeout = "localhost", 5432, 30
    print(f"Using defaults: {host}:{port}, timeout={timeout}s")

def parse_record(line):
    """Parse a 'name,age,score' CSV line into a dict; return None if malformed."""
    try:
        parts = line.split(",")
        name = parts[0].strip()
        age = int(parts[1].strip())  # may raise ValueError or IndexError
        score = float(parts[2].strip())  # may raise ValueError or IndexError
        return {"name": name, "age": age, "score": score}
    except (ValueError, IndexError):
        return None
# Test cases
test_lines = [
"Alice,28,92.5", # valid
"Bob,thirty,88.0", # bad age
"Carol,22", # missing score (IndexError)
"Dave,19,invalid", # bad score
"", # empty
]
for line in test_lines:
result = parse_record(line)
print(f" {line!r:25s} -> {result}")Read and write files using open(). Use the with statement to ensure files are always closed. Python handles text and binary files.
import os
# Write a file (the "\n" escapes were lost in the original, making it a SyntaxError)
with open("demo.txt", "w") as f:
    f.write("Line 1: Hello World\n")
    f.write("Line 2: Python File I/O\n")
    f.writelines(["Line 3: data\n", "Line 4: more data\n"])
# Read entire file
with open("demo.txt", "r") as f:
    content = f.read()
print("Full content:\n", content)
# Read line by line
with open("demo.txt", "r") as f:
    for i, line in enumerate(f, 1):
        print(f" [{i}] {line.rstrip()}")
os.remove("demo.txt")  # cleanup
import json, csv, io
# JSON
data = {"name": "Alice", "scores": [95, 87, 91], "active": True}
json_str = json.dumps(data, indent=2)
print("JSON:
", json_str)
loaded = json.loads(json_str)
print("Avg score:", sum(loaded["scores"]) / len(loaded["scores"]))
# CSV (using in-memory buffer)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["name", "age", "city"])
writer.writerows([["Alice",30,"NYC"],["Bob",25,"LA"],["Carol",35,"Chicago"]])
output.seek(0)
reader = csv.DictReader(output)
for row in reader:
print(dict(row))import pathlib, io, tempfile
# pathlib β modern, object-oriented path handling
p = pathlib.Path.home()
print("Home dir:", p)
print("Exists:", p.exists())
# Build paths with / operator
tmp = pathlib.Path(tempfile.gettempdir())
data_file = tmp / "demo_data.txt"
# Write and read with pathlib
data_file.write_text("Hello from pathlib!\nLine 2\nLine 3\n", encoding="utf-8")
content = data_file.read_text(encoding="utf-8")
print("Read back:", content.splitlines())
# Inspect path parts
print("Name: ", data_file.name)
print("Stem: ", data_file.stem)
print("Suffix: ", data_file.suffix)
print("Parent: ", data_file.parent)
data_file.unlink() # delete
# io.BytesIO β in-memory binary buffer (like a file but in RAM)
buf = io.BytesIO()
buf.write(b"\x89PNG\r\n") # fake PNG header bytes
buf.write(b"binary data here")
buf.seek(0)
header = buf.read(6)
print("Bytes header:", header)
print("Buffer size:", buf.getbuffer().nbytes, "bytes")import pathlib, tempfile, os
# Create a temporary directory to experiment in
with tempfile.TemporaryDirectory() as tmpdir:
root = pathlib.Path(tmpdir)
# Create nested structure
(root / "src").mkdir()
(root / "src" / "utils").mkdir()
(root / "data").mkdir()
(root / "src" / "main.py").write_text("# main", encoding="utf-8")
(root / "src" / "helper.py").write_text("# helper", encoding="utf-8")
(root / "src" / "utils" / "tools.py").write_text("# tools", encoding="utf-8")
(root / "data" / "report.csv").write_text("a,b,c", encoding="utf-8")
(root / "data" / "notes.txt").write_text("notes", encoding="utf-8")
(root / "README.md").write_text("# Project", encoding="utf-8")
# iterdir() β immediate children only (non-recursive)
print("Top-level items:")
for item in sorted(root.iterdir()):
kind = "DIR " if item.is_dir() else "FILE"
print(f" {kind} {item.name}")
# glob() β match pattern in direct children
print("\n*.md files (glob):", [p.name for p in root.glob("*.md")])
# rglob() β recursive glob across all subdirectories
print("All .py files (rglob):")
for py in sorted(root.rglob("*.py")):
print(f" {py.relative_to(root)}")
print("All files (rglob **):")
all_files = sorted(root.rglob("*"))
for f in all_files:
if f.is_file():
print(f" {f.relative_to(root)} ({f.stat().st_size} bytes)")
# tempfile β create named temp files that auto-delete
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as tf:
tf.write('{"status": "ok"}')
tmp_path = pathlib.Path(tf.name)
print(f"\nTemp file: {tmp_path.name}")
print("Content:", tmp_path.read_text(encoding="utf-8"))
tmp_path.unlink() # manual cleanup since delete=False
print("Temp file deleted:", not tmp_path.exists())import csv, json, io
# Simulate CSV content
csv_data = "
".join([
"date,product,region,qty,price",
"2024-01-01,Widget,North,10,9.99",
"2024-01-01,Gadget,South,5,49.99",
"2024-01-02,Widget,East,15,9.99",
"2024-01-02,Doohickey,North,8,19.99",
"2024-01-03,Gadget,East,3,49.99",
"2024-01-03,Widget,South,12,9.99",
])
reader = csv.DictReader(io.StringIO(csv_data))
summary = {}
for row in reader:
revenue = float(row["qty"]) * float(row["price"])
product = row["product"]
region = row["region"]
if product not in summary:
summary[product] = {"total_revenue": 0, "total_qty": 0, "regions": {}}
summary[product]["total_revenue"] += revenue
summary[product]["total_qty"] += int(row["qty"])
summary[product]["regions"][region] = summary[product]["regions"].get(region, 0) + revenue
report = {k: {"revenue": round(v["total_revenue"],2), "qty": v["total_qty"],
"top_region": max(v["regions"], key=v["regions"].get)}
for k, v in summary.items()}
print(json.dumps(report, indent=2))import io
log_data = """2024-01-15 INFO Server started on port 8080
2024-01-15 DEBUG Loading config file
2024-01-15 INFO Database connected
2024-01-15 WARNING Disk usage at 80%
2024-01-15 ERROR Failed to connect to cache: timeout
2024-01-15 INFO Request received: GET /home
2024-01-15 ERROR Database query failed: syntax error
2024-01-15 WARNING Memory usage high: 75%
2024-01-15 INFO Request completed in 120ms
2024-01-15 CRITICAL Disk full — writes disabled"""
# Treat the string as a file-like object and tally occurrences of each level.
f = io.StringIO(log_data)
level_counts = {}
error_lines = []
for line in f:
    parts = line.split()
    level = parts[1]  # lines are "DATE LEVEL message..."
    level_counts[level] = level_counts.get(level, 0) + 1
    if level == "ERROR":
        error_lines.append(line.rstrip("\n"))
print("Level counts:", level_counts)
print("Error lines:")
for err in error_lines:
    print(" ", err)
# List comprehension
squares = [x**2 for x in range(10)]
evens = [x for x in range(20) if x % 2 == 0]
matrix = [[i*j for j in range(1,4)] for i in range(1,4)]
print("Squares:", squares[:5])
print("Evens:", evens)
print("Matrix:", matrix)
# Dict comprehension
word = "mississippi"
counts = {ch: word.count(ch) for ch in set(word)}
print("Char counts:", dict(sorted(counts.items())))
# Set comprehension
text = ["hello", "world", "hello", "python"]
unique_upper = {w.upper() for w in text}
print("Unique upper:", unique_upper)# Generator function (yields values lazily)
def fibonacci(n):
    """Lazily yield the first n Fibonacci numbers, starting from 0."""
    current, nxt = 0, 1
    for _ in range(n):
        yield current
        current, nxt = nxt, current + nxt

fibs = list(fibonacci(10))
print("Fibonacci:", fibs)
# Generator expression (lazy list comprehension)
big_squares = (x**2 for x in range(1_000_000))
print("First 5:", [next(big_squares) for _ in range(5)])
# sum() with generator β no list created in memory
total = sum(x**2 for x in range(1000))
print("Sum of squares 0..999:", total)# Chain generators together β each processes values lazily
def read_numbers(data):
    """Yield numbers one at a time from a list."""
    yield from data

def filter_positive(numbers):
    """Yield only the strictly positive numbers."""
    for value in numbers:
        if value > 0:
            yield value

def square(numbers):
    """Yield the square of each incoming number."""
    for value in numbers:
        yield value * value

def running_total(numbers):
    """Yield the cumulative sum after each incoming number."""
    acc = 0
    for value in numbers:
        acc += value
        yield acc

# Build the pipeline
raw = [-3, 1, -1, 4, 5, -9, 2, 6]
pipeline = running_total(square(filter_positive(read_numbers(raw))))
print("Pipeline output:", list(pipeline))
# positives: 1,4,5,2,6 squares: 1,16,25,4,36 running: 1,17,42,46,82
# Nested comprehension β flatten a matrix
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = [cell for row in matrix for cell in row]
print("Flat matrix:", flat)
# Nested comprehension β all pairs where i != j
pairs = [(i, j) for i in range(4) for j in range(4) if i != j]
print(f"Pairs (i!=j): {len(pairs)} pairs, first 4: {pairs[:4]}")import itertools
# chain.from_iterable β flatten one level of nested iterables
nested = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flat = list(itertools.chain.from_iterable(nested))
print("chain.from_iterable:", flat)
# zip_longest β zip unequal-length iterables, filling with a default
names = ["Alice", "Bob", "Carol"]
scores = [92, 85]
grades = ["A"]
for row in itertools.zip_longest(names, scores, grades, fillvalue="N/A"):
print(f" {row[0]:8s} score={row[1]:>4} grade={row[2]}")
# starmap β map with argument unpacking (like map but for tuple arguments)
pairs = [(2, 10), (3, 4), (10, 2), (5, 3)]
powers = list(itertools.starmap(pow, pairs))
print("starmap(pow, pairs):", powers) # [1024, 81, 100, 125]
# Practical: generate a multiplication table with starmap
import operator
combos = itertools.product(range(1, 4), range(1, 4))
table = list(itertools.starmap(operator.mul, combos))
print("3x3 mul table (flat):", table)
# accumulate β running totals / cumulative operations
sales = [1200, 850, 1400, 980, 1100]
running = list(itertools.accumulate(sales))
print("Running sales totals:", running)
running_max = list(itertools.accumulate(sales, max))
print("Running maximums: ", running_max)import io
# Simulate a large log file as a generator
def stream_logs(file_obj, min_level="WARNING"):
    """Yield parsed log entries at or above min_level, one dict per valid line."""
    levels = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}
    min_n = levels.get(min_level, 0)
    for line in file_obj:
        line = line.strip()
        if not line: continue
        parts = line.split(None, 3)  # ts, level, service, message
        if len(parts) < 4: continue  # skip malformed lines
        level = parts[1]
        if levels.get(level, 0) >= min_n:
            yield {"ts": parts[0], "level": level, "service": parts[2], "msg": parts[3]}
# The '"\n".join' below was garbled into a literal line break in the original.
sample_log = io.StringIO("\n".join([
    "2024-01-15 DEBUG db_pool: Connection acquired",
    "2024-01-15 INFO auth_service: User login alice@co.com",
    "2024-01-15 WARNING api_gateway: Rate limit 90% for 192.168.1.1",
    "2024-01-15 ERROR payment_svc: Timeout after 30s for order #8821",
    "2024-01-15 INFO cache: Cache miss for key user:42",
    "2024-01-15 CRITICAL db_pool: Connection pool exhausted!",
]))
alerts = list(stream_logs(sample_log, min_level="WARNING"))
print(f"Found {len(alerts)} alerts:")
for a in alerts:
    print(f" [{a['level']:8s}] {a['service']:12s} {a['msg']}")
import io
csv_data = """date,product,qty,price
2024-01-01,Widget,10,9.99
2024-01-02,Gadget,5,49.99
2024-01-03,Widget,15,9.99
2024-01-04,SuperGadget,2,199.99
2024-01-05,Widget,8,9.99
2024-01-06,Gadget,3,49.99"""

def csv_rows(text):
    """Yield non-empty, stripped data lines from CSV text, skipping the header."""
    buf = io.StringIO(text)
    next(buf)  # skip header row
    for line in buf:
        line = line.strip()
        if line:
            yield line

def parse_sales(rows):
    """Turn 'date,product,qty,price' rows into dicts with a computed revenue."""
    for row in rows:
        date, product, qty, price = row.split(",")
        yield {"date": date, "product": product, "revenue": int(qty) * float(price)}

def high_value(sales, threshold=100):
    """Pass through only the sales whose revenue exceeds threshold."""
    for sale in sales:
        if sale["revenue"] > threshold:
            yield sale

# Chain the pipeline
pipeline = high_value(parse_sales(csv_rows(csv_data)))
for sale in pipeline:
    print(f" {sale['date']} {sale['product']:12s} ${sale['revenue']:.2f}")
import os
import math
import datetime
# os β file system and environment
cwd = os.getcwd()
print("CWD:", cwd)
print("Home:", os.path.expanduser("~"))
print("Path exists:", os.path.exists(cwd))
# datetime
today = datetime.date.today()
now = datetime.datetime.now()
delta = datetime.timedelta(days=30)
print("Today:", today)
print("In 30 days:", today + delta)
print("Day of week:", today.strftime("%A"))
# math
print("pi:", round(math.pi, 4))
print("sqrt(2):", round(math.sqrt(2), 4))
print("log2(1024):", math.log2(1024))from collections import Counter, defaultdict, namedtuple
import itertools
# Counter
words = "the quick brown fox jumps over the lazy dog the".split()
c = Counter(words)
print("Most common:", c.most_common(3))
# defaultdict
from collections import defaultdict
group = defaultdict(list)
data = [("fruit","apple"),("veg","carrot"),("fruit","banana"),("veg","pea")]
for category, item in data:
group[category].append(item)
print(dict(group))
# namedtuple
Point = namedtuple("Point", ["x", "y"])
p = Point(3, 7)
print(f"Point: x={p.x}, y={p.y}")
# itertools
pairs = list(itertools.combinations("ABCD", 2))
print("Combinations:", pairs)import functools, random, secrets
# functools.reduce β fold a sequence into a single value
from functools import reduce
product = reduce(lambda acc, x: acc * x, range(1, 6)) # 5! = 120
print("5! =", product)
# functools.partial β fix some arguments of a function
def power(base, exponent):
return base ** exponent
square = functools.partial(power, exponent=2)
cube = functools.partial(power, exponent=3)
print("Squares:", [square(x) for x in range(1, 6)])
print("Cubes: ", [cube(x) for x in range(1, 6)])
# functools.lru_cache β memoize automatically
@functools.lru_cache(maxsize=None)
def fib(n):
if n < 2: return n
return fib(n-1) + fib(n-2)
print("fib(35):", fib(35))
print("Cache info:", fib.cache_info())
# random vs secrets
# random β reproducible (seeded), for simulations
random.seed(42)
sample = random.sample(range(100), 5)
print("Random sample:", sample)
# secrets β cryptographically secure, for tokens/passwords
token = secrets.token_hex(16) # 32-char hex string
print("Secure token:", token)
pin = secrets.randbelow(10000) # 0-9999
print("Secure PIN: ", str(pin).zfill(4))import sys
import importlib
import pprint
# sys.path β where Python searches for modules
print("sys.path entries (first 3):")
for p in sys.argv[0:1]: # avoid printing too many paths
pass
for path in sys.path[:3]:
print(f" {path!r}")
# sys.argv β command-line arguments
print(f"Script name: {sys.argv[0]!r}")
# sys.version / sys.platform β runtime info
print(f"Python {sys.version.split()[0]} on {sys.platform}")
# importlib β dynamic import by string name
math_mod = importlib.import_module("math")
print(f"math.tau = {math_mod.tau:.6f}")
json_mod = importlib.import_module("json")
encoded = json_mod.dumps({"key": "value"})
print("Dynamic json.dumps:", encoded)
# __name__ == "__main__" pattern
# This block only runs when the script is executed directly,
# NOT when it is imported as a module.
if __name__ == "__main__":
print("Running as main script β __name__:", __name__)
# pprint β pretty-print complex nested structures
data = {
"users": [
{"id": 1, "name": "Alice", "roles": ["admin", "editor"],
"prefs": {"theme": "dark", "lang": "en"}},
{"id": 2, "name": "Bob", "roles": ["viewer"],
"prefs": {"theme": "light", "lang": "fr"}},
],
"meta": {"version": "2.1", "count": 2}
}
print("\npprint output:")
pprint.pprint(data, width=60, sort_dicts=False)from collections import Counter, defaultdict
import datetime
# Simulated access log entries: (ip, method, path, status, ts)
logs = [
("192.168.1.10", "GET", "/home", 200, "2024-01-15 09:00:01"),
("10.0.0.5", "POST", "/login", 401, "2024-01-15 09:00:03"),
("10.0.0.5", "POST", "/login", 401, "2024-01-15 09:00:04"),
("10.0.0.5", "POST", "/login", 401, "2024-01-15 09:00:05"),
("192.168.1.10", "GET", "/products", 200, "2024-01-15 09:01:00"),
("172.16.0.1", "GET", "/admin", 403, "2024-01-15 09:01:30"),
("172.16.0.1", "GET", "/admin", 403, "2024-01-15 09:01:32"),
("192.168.1.20", "GET", "/home", 200, "2024-01-15 09:02:00"),
("10.0.0.5", "POST", "/login", 200, "2024-01-15 09:02:10"),
]
status_counts = Counter(entry[3] for entry in logs)
ip_requests = Counter(entry[0] for entry in logs)
failures_by_ip = defaultdict(int)
for ip, method, path, status, ts in logs:
if status in (401, 403):
failures_by_ip[ip] += 1
print("Status codes:", dict(status_counts))
print("
Top IPs:")
for ip, count in ip_requests.most_common():
fails = failures_by_ip[ip]
flag = " β οΈ SUSPICIOUS" if fails >= 2 else ""
print(f" {ip:16s} {count:3d} requests, {fails} failures{flag}")import statistics
from collections import Counter
scores = [72, 88, 95, 63, 79, 91, 55, 84, 76, 90,
          67, 83, 58, 97, 71, 80, 89, 62, 75, 93]
# Central tendency and spread via the statistics module
mean = statistics.mean(scores)
median = statistics.median(scores)
stdev = statistics.stdev(scores)
print(f"Mean: {mean:.1f}, Median: {median}, StdDev: {stdev:.1f}")
# Five highest and five lowest scores
top_5 = sorted(scores, reverse=True)[:5]
bottom_5 = sorted(scores)[:5]
# Letter-grade histogram
def grade_bin(s):
    """Map a numeric score to a letter grade (A >= 90, ..., F < 60)."""
    return "A" if s >= 90 else "B" if s >= 80 else "C" if s >= 70 else "D" if s >= 60 else "F"
bins = Counter(grade_bin(s) for s in scores)
print("Grade bins:", dict(sorted(bins.items())))
print("Top 5: ", top_5)
print("Bottom 5:", bottom_5)
import tempfile, pathlib, os
# Bad pattern: manual open/close risks resource leak
# f = open('data.txt')
# data = f.read() # if this raises, f never closes
# f.close()
# Good pattern: with statement guarantees close()
tmp = pathlib.Path(tempfile.mktemp(suffix='.txt'))
tmp.write_text('line 1\nline 2\nline 3')
with open(tmp) as f:
data = f.read()
print('Read:', repr(data))
# Write mode
with open(tmp, 'a') as f:
f.write('\nline 4')
# File is closed here even if an exception happened inside
# Reading line by line (memory-efficient for large files)
with open(tmp) as f:
for i, line in enumerate(f, 1):
print(f' {i}: {line.rstrip()}')
tmp.unlink()import tempfile, pathlib
src = pathlib.Path(tempfile.mktemp(suffix='.txt'))
dst = pathlib.Path(tempfile.mktemp(suffix='.txt'))
src.write_text('hello from source')
# Open multiple files in one with statement
with open(src) as fin, open(dst, 'w') as fout:
for line in fin:
fout.write(line.upper())
print('Copied and uppercased:', dst.read_text())
src.unlink(); dst.unlink()
# Also works for nested managers of different types
import io
with io.StringIO('a,b,c\n1,2,3') as buf:
print('StringIO:', buf.read())import time
class Timer:
    """Context manager that measures a block's wall-clock time and prints it."""

    def __init__(self, name='block'):
        self.name = name

    def __enter__(self):
        # perf_counter is the recommended clock for measuring short intervals.
        self.start = time.perf_counter()
        return self  # bound to the 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        end = time.perf_counter()
        self.elapsed = end - self.start
        print(f'[{self.name}] elapsed: {self.elapsed*1000:.2f} ms')
        # Returning False (the default) lets any exception propagate.
        return False
with Timer('sum of squares') as t:
result = sum(x**2 for x in range(1_000_000))
print(f'Result: {result:,}, time stored: {t.elapsed*1000:.2f} ms')
# Suppress specific exceptions by returning True from __exit__
class Suppress:
    """Context manager that swallows the given exception types.

    Uses issubclass() so that subclasses of a listed type are suppressed
    too, matching contextlib.suppress semantics (the original membership
    test `exc_type in self.exc_types` only matched exact types).
    """

    def __init__(self, *exc_types):
        self.exc_types = exc_types

    def __enter__(self):
        return self

    def __exit__(self, exc_type, *_):
        # exc_type is None when the block exited cleanly; returning True
        # from __exit__ suppresses the exception.
        return exc_type is not None and issubclass(exc_type, self.exc_types)
with Suppress(ZeroDivisionError):
x = 1 / 0 # suppressed!
print('Continued after ZeroDivisionError')from contextlib import contextmanager, suppress
import tempfile, pathlib
@contextmanager
def temporary_file(suffix='.txt', content=''):
    """Create a temp file, yield its path, delete on exit.

    Uses NamedTemporaryFile(delete=False) to create the file atomically;
    the original used tempfile.mktemp(), which is deprecated and racy
    (another process could claim the name between mktemp and write_text).
    """
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tf:
        path = pathlib.Path(tf.name)
    path.write_text(content)
    try:
        yield path
    finally:
        if path.exists():
            path.unlink()
            print(f'Cleaned up {path.name}')
@contextmanager
def log_section(name):
print(f'>>> START: {name}')
try:
yield
except Exception as e:
print(f'>>> ERROR in {name}: {e}')
raise
finally:
print(f'>>> END: {name}')
with temporary_file(content='hello world') as tmp:
data = tmp.read_text()
print('File content:', data)
# File is deleted here
with log_section('data processing'):
result = [x**2 for x in range(5)]
print('Result:', result)
# contextlib.suppress replaces try/except for known ignorable errors
with suppress(FileNotFoundError):
pathlib.Path('nonexistent.txt').unlink()
print('Suppressed FileNotFoundError cleanly')

class DatabaseConnection:
    """Context manager mimicking a DB connection with commit/rollback semantics."""

    def __init__(self, url):
        self.url = url
        self.connected = False

    def __enter__(self):
        """Open the (simulated) connection and hand it to the with-block."""
        self.connected = True
        print(f'Connected to {self.url}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit on clean exit, roll back if an exception escaped the block."""
        if exc_type is not None:
            print('Rolling back')
        else:
            print('Committed')
        print('Disconnected')
        self.connected = False
        return False  # propagate exceptions to the caller

# Test: should commit
with DatabaseConnection('sqlite:///app.db') as db:
    print(f' Using connection (connected={db.connected})')
# Test: should rollback
try:
    with DatabaseConnection('sqlite:///app.db') as db:
        raise ValueError('Oops!')
except ValueError:
    pass
import re
text = 'Contact us at support@example.com or sales@company.org for help.'
# re.search β find first match anywhere in string
match = re.search(r'[\w.+-]+@[\w-]+\.[\w.]+', text)
if match:
print('First email:', match.group())
# re.findall β return all matches as list
emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.]+', text)
print('All emails:', emails)
# re.match β only matches at START of string
print('match at start:', re.match(r'Contact', text)) # matches
print('match at start:', re.match(r'support', text)) # None
# re.fullmatch β entire string must match
phone = '555-1234'
valid = re.fullmatch(r'\d{3}-\d{4}', phone)
print('Valid phone:', bool(valid))
# Flags: case-insensitive
print(re.findall(r'contact', text, re.IGNORECASE))import re
log_line = '2024-03-15 09:23:41 ERROR [auth] Login failed for user: alice'
# Named groups with (?P<name>...)
pattern = r'(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+) \[(?P<module>\w+)\] (?P<message>.+)'
m = re.match(pattern, log_line)
if m:
    print('Date: ', m.group('date'))
    print('Level: ', m.group('level'))
    print('Module: ', m.group('module'))
    print('Message:', m.group('message'))
    print('Dict: ', m.groupdict())
# Non-capturing groups (?:...)
urls = ['http://example.com', 'https://secure.org', 'ftp://old.net']
for url in urls:
    m = re.match(r'(?:https?|ftp)://([\w.-]+)', url)
    if m:
        # Fixed: the closing ')' of this print call was missing (SyntaxError).
        print(f' Domain: {m.group(1)}')  # group(1) = first capturing group
import re
text = 'Call us at (555) 123-4567 or 555.987.6543 today!'
# re.sub β replace pattern
cleaned = re.sub(r'[()\s.-]', '', text)
print('Cleaned:', cleaned)
# Replace with backreference
normalized = re.sub(r'[()\s.-]+?(\d{3})[)\s.-]+(\d{3})[.-](\d{4})', r'\1-\2-\3', text)
print('Normalized:', normalized)
# re.split β split on pattern
sentence = 'one, two; three | four'
words = re.split(r'[,;|]\s*', sentence)
print('Split:', words)
# Compile for reuse (faster in loops)
EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+', re.IGNORECASE)
texts = ['alice@example.com is admin', 'no email here', 'bob@test.org rocks']
for t in texts:
found = EMAIL_RE.findall(t)
if found:
print(f' Found in "{t}": {found}')import re
PATTERNS = {
'email': r'[\w.+-]+@[\w-]+\.[\w.]{2,}',
'url': r'https?://[\w/:%#\$&\?\(\)~\.=\+\-]+',
'date': r'\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b',
'phone': r'\b\d{3}[-.]\d{3}[-.]\d{4}\b',
'ipv4': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
'hashtag': r'#[\w]+',
}
sample = '''
Email me at john.doe@example.com by 2024-03-15.
Visit https://example.com/page?id=42 for details.
Call 555-123-4567. Server IP: 192.168.1.100
Twitter: #DataScience #Python
'''
for name, pattern in PATTERNS.items():
matches = re.findall(pattern, sample)
if matches:
print(f'{name:8s}: {matches}')import re
text = '''
Please contact billing@company.com or support@help.org.
Call 555-123-4567 or 800-555-9999 for support.
Invoice total: $1,234.56. Discount applied: $50.00.
Admin: admin@internal.net | Helpdesk: 312-555-0100
'''
# Email: local part, '@', a domain label, then a dot and an alphanumeric TLD
# (final \w+ stops before sentence-ending punctuation like 'help.org.').
EMAIL_PATTERN = re.compile(r'[\w.+-]+@[\w-]+\.\w+')
# Phone: NNN-NNN-NNNN bounded so longer digit runs don't match.
PHONE_PATTERN = re.compile(r'\b\d{3}-\d{3}-\d{4}\b')
# Money: dollar sign, digits with optional thousands commas, exactly two decimals.
MONEY_PATTERN = re.compile(r'\$[\d,]+\.\d{2}')
print('Emails:', EMAIL_PATTERN.findall(text))
print('Phones:', PHONE_PATTERN.findall(text))
print('Amounts:', MONEY_PATTERN.findall(text))
from typing import Optional, Union, List
def greet(name: str, times: int = 1) -> str:
    """Return 'Hello, <name>! ' repeated `times` times."""
    greeting = 'Hello, ' + name + '! '
    return greeting * times

def parse_int(value: Union[str, int]) -> Optional[int]:
    """Convert value to int; return None when conversion is impossible."""
    try:
        return int(value)
    except (ValueError, TypeError):
        return None

def process(items: List[Union[int, float]]) -> float:
    """Arithmetic mean of items, or 0.0 for an empty list."""
    if not items:
        return 0.0
    return sum(items) / len(items)
print(greet('Alice'))
print(greet('Bob', 3))
print(parse_int('42'))
print(parse_int('abc')) # returns None
print(process([1, 2.5, 3, 4]))
# Python 3.10+ union syntax: int | str instead of Union[int, str]
def modern(x: int | str) -> str:
return str(x)
print(modern(42))from typing import Dict, List, Tuple, Callable, TypeVar
# Fixed: Optional is used in the annotations below but was only imported
# AFTER these definitions; annotations are evaluated at def time, so that
# raised NameError. Re-importing here is harmless and makes the defs valid.
from typing import Optional

T = TypeVar('T')

def first(items: List[T]) -> Optional[T]:
    """Return the first element of items, or None when the list is empty."""
    return items[0] if items else None

def apply_all(funcs: List[Callable[[int], int]], value: int) -> List[int]:
    """Apply every function in funcs to value, collecting the results in order."""
    return [f(value) for f in funcs]

def parse_config(raw: Dict[str, str]) -> Dict[str, int]:
    """Keep only the all-digit values of raw, converted to int."""
    return {k: int(v) for k, v in raw.items() if v.isdigit()}

Point = Tuple[float, float]

def distance(p1: Point, p2: Point) -> float:
    """Euclidean distance between two 2-D points."""
    return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2) ** 0.5
from typing import Optional
print(first([1, 2, 3])) # 1
print(first([])) # None
print(apply_all([lambda x: x*2, lambda x: x+1], 5)) # [10, 6]
print(parse_config({'a': '10', 'b': 'hello', 'c': '5'}))
print(distance((0.0, 0.0), (3.0, 4.0))) # 5.0from dataclasses import dataclass, field
from typing import List
@dataclass
class Point:
x: float
y: float
def distance_to(self, other: 'Point') -> float:
return ((self.x - other.x)**2 + (self.y - other.y)**2)**0.5
@dataclass
class Product:
name: str
price: float
tags: List[str] = field(default_factory=list)
in_stock: bool = True
def __post_init__(self):
if self.price < 0:
raise ValueError(f'Price cannot be negative: {self.price}')
p1 = Point(0, 0)
p2 = Point(3, 4)
print(p1) # Point(x=0, y=0)
print(p2) # Point(x=3, y=4)
print(p1 == Point(0,0)) # True β __eq__ auto-generated
print(p1.distance_to(p2)) # 5.0
laptop = Product('Laptop', 999.99, ['electronics', 'computers'])
print(laptop)
print(laptop.tags)
try:
Product('Bad', -1)
except ValueError as e:
print('Caught:', e)from dataclasses import dataclass, field
from typing import List
import functools
@dataclass(frozen=True) # immutable β can be used in sets/dict keys
class Version:
major: int
minor: int
patch: int = 0
def __str__(self) -> str:
return f'{self.major}.{self.minor}.{self.patch}'
@dataclass(order=True)  # auto-generates __lt__, __le__, __gt__, __ge__
class Employee:
    """Employee record ordered by salary.

    sort_index is listed first so that the generated comparison methods
    (which compare fields in declaration order) effectively sort by salary.
    It is excluded from __init__ and repr and filled in __post_init__.
    """
    sort_index: float = field(init=False, repr=False)
    name: str
    salary: float
    dept: str

    def __post_init__(self):
        # Removed dead code: the original had an
        # `object.__setattr__(...) if False else None` line that never ran.
        self.sort_index = self.salary  # used for ordering
v1 = Version(1, 2, 3)
v2 = Version(1, 2, 3)
print(v1 == v2) # True
print(hash(v1)) # hashable because frozen
try:
v1.major = 2 # raises FrozenInstanceError
except Exception as e:
print(type(e).__name__, e)
employees = [Employee('Carol', 95000, 'Eng'), Employee('Bob', 80000, 'Sales'), Employee('Alice', 110000, 'Eng')]
employees.sort()
for e in employees:
print(f' {e.name}: ${e.salary:,.0f}')from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Person:
    """A single address-book contact."""
    name: str
    age: int
    email: str
    # Optional phone number; None when not provided.
    phone: Optional[str] = None
@dataclass
class AddressBook:
    """Collection of Person contacts with simple lookup helpers."""
    contacts: List["Person"] = field(default_factory=list)

    def add(self, person: "Person") -> None:
        """Append *person* to the book."""
        self.contacts.append(person)

    def find_by_name(self, name: str) -> Optional["Person"]:
        """Return the first contact whose name matches exactly, or None."""
        return next((p for p in self.contacts if p.name == name), None)

    def adults(self) -> List["Person"]:
        """Return all contacts aged 18 or older."""
        return [p for p in self.contacts if p.age >= 18]
book = AddressBook()
book.add(Person('Alice', 30, 'alice@example.com', '555-1234'))
book.add(Person('Bob', 17, 'bob@example.com'))
book.add(Person('Carol', 25, 'carol@example.com', '555-5678'))
print(book.find_by_name('Alice'))
print('Adults:', [p.name for p in book.adults()])Speed up I/O-bound tasks with threading and asyncio, CPU-bound tasks with multiprocessing, and understand the GIL. Use concurrent.futures for clean parallel execution.
import threading
import time
import random
results = {}
lock = threading.Lock()
def fetch_data(url_id):
'''Simulate an I/O-bound network call.'''
time.sleep(random.uniform(0.05, 0.15)) # simulate latency
data = f'data_from_endpoint_{url_id}'
with lock:
results[url_id] = data
# Sequential (slow)
t0 = time.perf_counter()
for i in range(5):
fetch_data(i)
t_seq = time.perf_counter() - t0
print(f'Sequential: {t_seq:.3f}s')
# Threaded (fast for I/O)
results.clear()
threads = [threading.Thread(target=fetch_data, args=(i,)) for i in range(5)]
t0 = time.perf_counter()
for th in threads: th.start()
for th in threads: th.join()
t_thread = time.perf_counter() - t0
print(f'Threaded: {t_thread:.3f}s ({t_seq/t_thread:.1f}x faster)')
print('Results:', list(results.keys()))from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time, math
def cpu_task(n):
'''CPU-bound: compute sum of first n primes.'''
primes, count = [], 2
while len(primes) < n:
if all(count % p != 0 for p in primes): primes.append(count)
count += 1
return sum(primes)
def io_task(delay):
time.sleep(delay)
return f'done after {delay:.2f}s'
# ThreadPool for I/O
delays = [0.05, 0.08, 0.06, 0.07, 0.05]
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as ex:
futures = {ex.submit(io_task, d): d for d in delays}
for f in as_completed(futures):
pass
print(f'ThreadPool I/O: {time.perf_counter()-t0:.3f}s (sum={sum(delays):.2f}s serial)')
# ProcessPool for CPU (bypasses GIL)
tasks = [50, 60, 55, 65, 45]
t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as ex:
results = list(ex.map(cpu_task, tasks))
print(f'ProcessPool CPU: {time.perf_counter()-t0:.2f}s')
print('Sum of primes results:', results[:3], '...')import asyncio
import time
async def fetch(session_id, delay):
'''Simulate async HTTP request.'''
await asyncio.sleep(delay)
return f'response_{session_id}'
async def main():
delays = [0.1, 0.05, 0.08, 0.12, 0.06]
# Sequential async (still fast but ordered)
t0 = time.perf_counter()
results = []
for i, d in enumerate(delays):
r = await fetch(i, d)
results.append(r)
print(f'Sequential async: {time.perf_counter()-t0:.3f}s')
# Concurrent async (all at once)
t0 = time.perf_counter()
tasks = [fetch(i, d) for i, d in enumerate(delays)]
results = await asyncio.gather(*tasks)
print(f'Concurrent async: {time.perf_counter()-t0:.3f}s')
print('Results:', results)
asyncio.run(main())import threading
import queue
import time
import random
def producer(q, n_items):
for i in range(n_items):
item = f'item_{i}'
q.put(item)
time.sleep(random.uniform(0.01, 0.03))
q.put(None) # sentinel
def consumer(q, results):
    """Drain *q*, appending uppercased items to *results*.

    Stops when the None sentinel is received. Every q.get() is matched
    with a q.task_done() — including the sentinel — so q.join() can
    complete (the original never acknowledged the sentinel).
    """
    while True:
        item = q.get()
        if item is None:
            q.task_done()  # account for the sentinel so q.join() won't hang
            break
        # Simulate processing latency
        time.sleep(random.uniform(0.005, 0.015))
        results.append(item.upper())
        q.task_done()
q = queue.Queue(maxsize=5)
results = []
t0 = time.perf_counter()
prod = threading.Thread(target=producer, args=(q, 10))
cons = threading.Thread(target=consumer, args=(q, results))
prod.start(); cons.start()
prod.join(); cons.join()
print(f'Processed {len(results)} items in {time.perf_counter()-t0:.3f}s')
print('Processed:', results[:5], '...')from concurrent.futures import ThreadPoolExecutor
import asyncio, time, random
URLS = [f'https://example.com/page/{i}' for i in range(15)]
def sync_fetch(url):
time.sleep(random.uniform(0.05, 0.3))
return f'<html>{url}</html>'
async def async_fetch(url):
await asyncio.sleep(random.uniform(0.05, 0.3))
return f'<html>{url}</html>'
# TODO: (1) ThreadPoolExecutor: fetch all URLs, measure time
# TODO: (2) asyncio.gather: fetch all URLs, measure time
# TODO: (3) Print speedup vs sequential (sum of delays)
Apply classic Gang-of-Four patterns in Python: Singleton, Factory, Observer, Strategy, and Decorator. Understand when and why to use each.
# Singleton: one instance per process
class DatabasePool:
    """Singleton connection pool: exactly one instance per process."""
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Lazily create the single shared instance on first construction.
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.connections = []
            print('Creating new DatabasePool')
            cls._instance = instance
        return cls._instance

    def connect(self, host):
        """Record a connection to *host* and return a status string."""
        self.connections.append(host)
        return f'Connected to {host}'
pool1 = DatabasePool()
pool2 = DatabasePool()
print('Same instance:', pool1 is pool2)
pool1.connect('db1.server.com')
print('Connections visible from pool2:', pool2.connections)
# Factory: create objects without knowing exact class
class Shape:
def area(self): raise NotImplementedError
class Circle:
def __init__(self, r): self.r = r
def area(self): return 3.14159 * self.r**2
def __repr__(self): return f'Circle(r={self.r})'
class Rectangle:
def __init__(self, w, h): self.w, self.h = w, h
def area(self): return self.w * self.h
def __repr__(self): return f'Rectangle({self.w}x{self.h})'
def shape_factory(kind, **kwargs):
shapes = {'circle': Circle, 'rectangle': Rectangle}
if kind not in shapes: raise ValueError(f'Unknown shape: {kind}')
return shapes[kind](**kwargs)
for spec in [('circle', {'r': 5}), ('rectangle', {'w': 4, 'h': 6})]:
s = shape_factory(spec[0], **spec[1])
print(f'{s}: area={s.area():.2f}')from typing import Callable, Dict, List
class EventBus:
    '''Simple publish-subscribe event system.'''

    def __init__(self):
        # Maps event name -> list of handler callables.
        self._handlers: Dict[str, List[Callable]] = {}

    def subscribe(self, event: str, handler: Callable):
        """Register *handler* for *event*; returns self for chaining."""
        self._handlers.setdefault(event, []).append(handler)
        return self  # fluent API

    def publish(self, event: str, **data):
        """Invoke every handler registered for *event* with **data."""
        for registered in self._handlers.get(event, []):
            registered(**data)

    def unsubscribe(self, event: str, handler: Callable):
        """Remove *handler* from *event* (no-op if never subscribed)."""
        existing = self._handlers.get(event)
        if existing is not None:
            self._handlers[event] = [h for h in existing if h != handler]
# Usage
bus = EventBus()
def on_order_placed(order_id, amount, user):
print(f'[EMAIL] Order #{order_id} placed by {user}: ${amount:.2f}')
def on_order_placed_analytics(order_id, amount, **_):
print(f'[ANALYTICS] Recorded order #{order_id}, revenue=${amount:.2f}')
def on_order_placed_inventory(order_id, **_):
print(f'[INVENTORY] Reducing stock for order #{order_id}')
bus.subscribe('order.placed', on_order_placed)
bus.subscribe('order.placed', on_order_placed_analytics)
bus.subscribe('order.placed', on_order_placed_inventory)
# Trigger event
bus.publish('order.placed', order_id=1042, amount=149.99, user='Alice')from abc import ABC, abstractmethod
from typing import List
class SortStrategy(ABC):
@abstractmethod
def sort(self, data: list) -> list: ...
class BubbleSort(SortStrategy):
def sort(self, data):
arr = data.copy()
n = len(arr)
for i in range(n):
for j in range(n-i-1):
if arr[j] > arr[j+1]:
arr[j], arr[j+1] = arr[j+1], arr[j]
return arr
class MergeSort(SortStrategy):
def sort(self, data):
if len(data) <= 1: return data[:]
mid = len(data) // 2
L, R = self.sort(data[:mid]), self.sort(data[mid:])
result, i, j = [], 0, 0
while i < len(L) and j < len(R):
if L[i] <= R[j]: result.append(L[i]); i += 1
else: result.append(R[j]); j += 1
return result + L[i:] + R[j:]
class Sorter:
def __init__(self, strategy: SortStrategy):
self._strategy = strategy
def set_strategy(self, strategy: SortStrategy):
self._strategy = strategy
def sort(self, data: list) -> list:
return self._strategy.sort(data)
import time, random
data = random.sample(range(1000), 20)
sorter = Sorter(BubbleSort())
print('Bubble:', sorter.sort(data)[:5], '...')
sorter.set_strategy(MergeSort()) # swap strategy at runtime
print('Merge: ', sorter.sort(data)[:5], '...')import time, functools
# Function decorator: retry with backoff
def retry(max_attempts=3, delay=0.01):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts+1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts: raise
print(f' Attempt {attempt} failed: {e}. Retrying...')
time.sleep(delay)
return wrapper
return decorator
import random
@retry(max_attempts=4)
def flaky_api_call():
if random.random() < 0.6: raise ConnectionError('Timeout')
return 'Success!'
random.seed(42)
print('Result:', flaky_api_call())
# Mixin pattern: add logging capability to any class
class LogMixin:
def log(self, msg): print(f'[{self.__class__.__name__}] {msg}')
class TimeMixin:
def timed(self, func, *args, **kwargs):
t0 = time.perf_counter()
result = func(*args, **kwargs)
self.log(f'{func.__name__} took {(time.perf_counter()-t0)*1000:.2f}ms')
return result
class DataProcessor(LogMixin, TimeMixin):
def process(self, data):
import math
return [math.sqrt(abs(x)) for x in data]
p = DataProcessor()
p.log('Starting processing')
result = p.timed(p.process, list(range(-100, 100)))
print('Sample output:', [round(x,2) for x in result[:5]])from abc import ABC, abstractmethod
class BaseExporter(ABC):
    """Interface for data exporters (Factory pattern product)."""
    @abstractmethod
    def export(self, data, path): ...


class CSVExporter(BaseExporter):
    def export(self, data, path):
        """Write *data* (a list of flat dicts) to *path* as CSV."""
        import csv
        # Derive the header from the first row; empty data -> header only.
        fieldnames = list(data[0].keys()) if data else []
        with open(path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)


class JSONExporter(BaseExporter):
    def export(self, data, path):
        """Write *data* to *path* as a JSON document."""
        import json
        with open(path, 'w') as f:
            json.dump(data, f)


def exporter_factory(fmt: str) -> BaseExporter:
    """Return an exporter instance for 'csv' or 'json' (case-insensitive).

    Raises ValueError for an unknown format.
    """
    exporters = {'csv': CSVExporter, 'json': JSONExporter}
    try:
        return exporters[fmt.lower()]()
    except KeyError:
        raise ValueError(f'Unknown format: {fmt}') from None
# TODO: EventBus or simple list of observers
# TODO: Logger observer: print 'Exported N rows to path'
# TODO: FileSizeChecker observer: print file size
data = [{'id': i, 'value': i*2, 'name': f'item_{i}'} for i in range(100)]
# TODO: export to 'output.csv' and 'output.json', trigger events
Write unit tests, parametrized tests, fixtures, and mocks with pytest. Apply TDD principles and measure code coverage.
# test_math_utils.py (run with: pytest test_math_utils.py -v)
# Here we demonstrate by running inline
import traceback
def add(a, b): return a + b
def divide(a, b):
if b == 0: raise ZeroDivisionError('Cannot divide by zero')
return a / b
def is_prime(n):
if n < 2: return False
return all(n % i != 0 for i in range(2, int(n**0.5)+1))
# --- Tests ---
def test_add():
assert add(2, 3) == 5
assert add(-1, 1) == 0
assert add(0, 0) == 0
def test_divide():
assert divide(10, 2) == 5.0
assert abs(divide(1, 3) - 0.333) < 0.001
def test_divide_by_zero():
try:
divide(5, 0)
assert False, 'Should have raised'
except ZeroDivisionError:
pass # expected
def test_is_prime():
primes = [2, 3, 5, 7, 11, 13]
non_primes = [0, 1, 4, 6, 9, 15]
assert all(is_prime(p) for p in primes)
assert not any(is_prime(n) for n in non_primes)
# Run all tests
tests = [test_add, test_divide, test_divide_by_zero, test_is_prime]
for t in tests:
try: t(); print(f'PASS {t.__name__}')
except AssertionError as e: print(f'FAIL {t.__name__}: {e}')# Demonstrate pytest fixture and parametrize patterns
import os, tempfile
# === Fixture pattern ===
class FakeDB:
def __init__(self):
self.data = {}
def insert(self, key, val): self.data[key] = val
def get(self, key): return self.data.get(key)
def count(self): return len(self.data)
# In pytest: @pytest.fixture
def db_fixture():
'''Provide a fresh DB for each test.'''
return FakeDB()
# === Parametrize pattern ===
# In pytest: @pytest.mark.parametrize('a,b,expected', [...])
def check_multiply(a, b, expected):
assert a * b == expected, f'{a}*{b} should be {expected}'
params = [(2, 3, 6), (0, 100, 0), (-1, -1, 1), (7, 8, 56)]
for a, b, exp in params:
try: check_multiply(a, b, exp); print(f'PASS multiply({a},{b})={exp}')
except AssertionError as e: print(f'FAIL: {e}')
# === Fixture with temp file ===
def test_file_write():
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('hello world')
fname = f.name
try:
content = open(fname).read()
assert content == 'hello world'
print('PASS test_file_write')
finally:
os.unlink(fname)
db = db_fixture()
db.insert('user1', {'name': 'Alice'})
assert db.count() == 1
assert db.get('user1')['name'] == 'Alice'
print('PASS db fixture test')
test_file_write()from unittest.mock import patch, MagicMock, call
import json
# Function that depends on an external service
def fetch_user(user_id: int) -> dict:
import urllib.request
url = f'https://api.example.com/users/{user_id}'
with urllib.request.urlopen(url) as resp:
return json.loads(resp.read())
def process_user(user_id: int) -> str:
user = fetch_user(user_id)
return f'{user["name"]} ({user["email"]})'
# Test without hitting real API
mock_response = MagicMock()
mock_response.read.return_value = json.dumps({'name': 'Alice', 'email': 'alice@co.com'}).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch('urllib.request.urlopen', return_value=mock_response):
result = process_user(42)
print(f'PASS: process_user(42) = {result!r}')
# Test exception handling
def robust_fetch(user_id):
try:
return fetch_user(user_id)
except Exception as e:
return {'error': str(e)}
with patch('urllib.request.urlopen', side_effect=ConnectionError('Network down')):
r = robust_fetch(99)
assert 'error' in r
print(f'PASS: error handled: {r}')
# Verify mock was called correctly
mock_fn = MagicMock(return_value=42)
mock_fn(1, 2, key='val')
mock_fn(3, 4)
mock_fn.assert_called_with(3, 4)
print('PASS: mock call verification')try:
from hypothesis import given, strategies as st, settings
# Property: sort is idempotent
@given(st.lists(st.integers(), max_size=50))
@settings(max_examples=200)
def test_sort_idempotent(lst):
sorted_once = sorted(lst)
sorted_twice = sorted(sorted_lst := sorted(lst))
assert sorted_once == sorted_twice
# Property: reversed reversed = original
@given(st.lists(st.integers(), max_size=100))
def test_reverse_involution(lst):
assert list(reversed(list(reversed(lst)))) == lst
# Property: split+join roundtrip
@given(st.text(alphabet='abcdefghijklmnopqrstuvwxyz ', min_size=1, max_size=50))
def test_split_join_roundtrip(s):
words = s.split()
rejoined = ' '.join(words)
assert rejoined == ' '.join(s.split())
test_sort_idempotent()
test_reverse_involution()
test_split_join_roundtrip()
print('PASS: all hypothesis property tests')
except ImportError:
print('pip install hypothesis')
print('Hypothesis generates hundreds of random inputs automatically.')
print('Properties to test: commutativity, idempotence, round-trips, invariants.')
# Demo without hypothesis: manual property tests
import random
random.seed(42)
for _ in range(100):
lst = [random.randint(-100, 100) for _ in range(random.randint(0, 30))]
assert sorted(sorted(lst)) == sorted(lst), 'Sort not idempotent!'
print('PASS: manual sort idempotence test (100 random lists)')import pandas as pd
import numpy as np
class DataValidator:
    """Lightweight schema/quality checks for pandas DataFrames."""

    def validate_types(self, df: pd.DataFrame, expected: dict) -> list:
        '''Return list of (col, actual, expected) for mismatches.

        *expected* maps column name -> dtype.kind character
        ('i' int, 'f' float, 'O' object, 'b' bool, ...).
        '''
        mismatches = []
        for col, kind in expected.items():
            actual = df[col].dtype.kind
            if actual != kind:
                mismatches.append((col, actual, kind))
        return mismatches

    def validate_ranges(self, df: pd.DataFrame, rules: dict) -> list:
        '''rules = {col: (min, max)}. Return list of violations.

        Each violation is (col, count_of_out_of_range_values); NaN values
        are never counted (NaN comparisons are False).
        '''
        violations = []
        for col, (lo, hi) in rules.items():
            bad = int(((df[col] < lo) | (df[col] > hi)).sum())
            if bad:
                violations.append((col, bad))
        return violations

    def validate_no_nulls(self, df: pd.DataFrame, cols: list) -> list:
        '''Return the subset of *cols* that contain at least one null.'''
        return [c for c in cols if df[c].isna().any()]
# Test functions
def test_valid_types(): ... # TODO
def test_invalid_type(): ... # TODO
def test_range_pass(): ... # TODO
def test_range_fail(): ... # TODO
def test_no_nulls_pass(): ... # TODO
def test_no_nulls_fail(): ... # TODO
# Run all
for t in [test_valid_types, test_invalid_type, test_range_pass,
test_range_fail, test_no_nulls_pass, test_no_nulls_fail]:
t()
Python supports functional programming with map(), filter(), reduce(), and functools. These let you transform data declaratively without explicit loops.
nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# map() applies a function to every element
squares = list(map(lambda x: x**2, nums))
print("Squares:", squares)
# filter() keeps elements where function returns True
evens = list(filter(lambda x: x % 2 == 0, nums))
print("Evens:", evens)
# Chaining: square the even numbers
result = list(map(lambda x: x**2, filter(lambda x: x % 2 == 0, nums)))
print("Squared evens:", result)
# map with multiple iterables
a, b = [1, 2, 3], [10, 20, 30]
sums = list(map(lambda x, y: x + y, a, b))
print("Pairwise sums:", sums)from functools import reduce, partial
nums = [1, 2, 3, 4, 5]
# reduce() accumulates a result across an iterable
total = reduce(lambda acc, x: acc + x, nums)
print("Sum via reduce:", total)
product = reduce(lambda acc, x: acc * x, nums)
print("Product:", product)
# partial() freezes some arguments of a function
def power(base, exp):
return base ** exp
square = partial(power, exp=2)
cube = partial(power, exp=3)
print("5 squared:", square(5))
print("3 cubed: ", cube(3))
# partial with data processing
def scale(value, factor=1.0, offset=0.0):
return value * factor + offset
normalize = partial(scale, factor=0.1, offset=-0.5)
data = [0, 5, 10, 15, 20]
print("Normalized:", list(map(normalize, data)))from functools import reduce
# A function that returns a function
def make_multiplier(n):
return lambda x: x * n
double = make_multiplier(2)
triple = make_multiplier(3)
print("double(7):", double(7))
print("triple(7):", triple(7))
# Build a pipeline of transformations
def pipeline(*funcs):
def apply(data):
return reduce(lambda v, f: f(v), funcs, data)
return apply
process = pipeline(
lambda x: [v for v in x if v > 0], # keep positives
lambda x: list(map(lambda v: v**0.5, x)), # sqrt
lambda x: [round(v, 2) for v in x], # round
)
data = [-3, 4, 9, -1, 16, 25]
print("Input:", data)
print("Output:", process(data))from functools import reduce, partial
records = [
{"item": "apple", "qty": 3, "price": 1.20, "valid": True},
{"item": "banana", "qty": -1, "price": 0.50, "valid": False},
{"item": "cherry", "qty": 10, "price": 2.00, "valid": True},
]
# Filter valid records
valid = list(filter(lambda r: r["valid"] and r["qty"] > 0, records))
# Map to compute total
with_total = list(map(lambda r: {**r, "total": r["qty"] * r["price"]}, valid))
# Reduce to grand total
grand = reduce(lambda acc, r: acc + r["total"], with_total, 0.0)
for r in with_total:
print(f' {r["item"]:8s}: ${r["total"]:.2f}')
print(f"Grand total: ${grand:.2f}")from functools import reduce, partial
def process_data(numbers):
    """Filter out negatives, triple each value, and sum the result.

    Implemented with the three classic functional tools: filter() drops
    negatives, map() scales by 3, reduce() accumulates the sum.
    """
    non_negative = filter(lambda n: n >= 0, numbers)
    tripled = map(lambda n: n * 3, non_negative)
    # Initial value 0 makes the empty input return 0 instead of raising.
    return reduce(lambda acc, n: acc + n, tripled, 0)
# Test
print(process_data([1, -2, 3, -4, 5])) # expect 27
def keep_small(numbers, limit=100):
return [n for n in numbers if abs(n) < limit]
process_small = partial(process_data, ...) # TODO: use partial with keep_small
The itertools module provides fast, memory-efficient tools for working with iterables. Essential for combinatorics, grouping, and chaining data streams.
import itertools
# chain: join multiple iterables
combined = list(itertools.chain([1, 2], [3, 4], [5]))
print("chain:", combined)
# islice: slice an iterable (works on generators too)
first5 = list(itertools.islice(range(100), 5))
print("islice first 5:", first5)
skip3_take4 = list(itertools.islice(range(100), 3, 7))
print("islice [3:7]:", skip3_take4)
# cycle: repeat sequence infinitely β take 7
colors = list(itertools.islice(itertools.cycle(['R', 'G', 'B']), 7))
print("cycle 7:", colors)
# repeat: repeat a value n times
zeros = list(itertools.repeat(0, 5))
print("repeat:", zeros)
# accumulate: running totals
import itertools
data = [1, 3, 2, 5, 4]
running = list(itertools.accumulate(data))
print("accumulate (sum):", running)import itertools
items = ['A', 'B', 'C']
# combinations: order does not matter, no repeats
combs = list(itertools.combinations(items, 2))
print("combinations(2):", combs)
# permutations: order matters
perms = list(itertools.permutations(items, 2))
print("permutations(2):", perms)
# product: Cartesian product (like nested loops)
colors = ['red', 'blue']
sizes = ['S', 'M', 'L']
variants = list(itertools.product(colors, sizes))
print("product:", variants)
# product with repeat: like rolling dice twice
dice = list(itertools.product(range(1, 4), repeat=2))
print("dice pairs:", dice[:6], "...")
print(f"Combinations: {len(combs)}, Permutations: {len(perms)}, Product: {len(dice)}")import itertools
# groupby: group consecutive elements by a key
# NOTE: input must be sorted by the key first!
data = [
{"dept": "eng", "name": "Alice"},
{"dept": "eng", "name": "Bob"},
{"dept": "sales","name": "Carol"},
{"dept": "sales","name": "Dave"},
{"dept": "hr", "name": "Eve"},
]
data.sort(key=lambda x: x["dept"])
for dept, members in itertools.groupby(data, key=lambda x: x["dept"]):
names = [m["name"] for m in members]
print(f" {dept}: {names}")
# takewhile: take elements while condition is True
nums = [2, 4, 6, 1, 8, 10]
taken = list(itertools.takewhile(lambda x: x % 2 == 0, nums))
print("takewhile even:", taken) # stops at 1
# dropwhile: skip elements while condition is True
dropped = list(itertools.dropwhile(lambda x: x % 2 == 0, nums))
print("dropwhile even:", dropped) # starts from 1import itertools
param_grid = {
"learning_rate": [0.01, 0.1, 0.001],
"max_depth": [3, 5, 7],
"n_estimators": [50, 100],
}
keys = list(param_grid.keys())
values = list(param_grid.values())
configs = list(itertools.product(*values))
print(f"Total configs: {len(configs)}")
for i, combo in enumerate(itertools.islice(configs, 3)):
cfg = dict(zip(keys, combo))
print(f" Config {i+1}: {cfg}")
print(" ...")import itertools
def all_pairs(items):
    """Return all unique unordered 2-element combinations of *items*."""
    return list(itertools.combinations(items, 2))
def team_schedules(teams):
    """Return every ordered (home, away) matchup between distinct teams."""
    return list(itertools.permutations(teams, 2))
def batch(iterable, n):
    """Yield successive chunks of at most *n* items from *iterable*.

    The final chunk may be shorter; an empty iterable yields nothing.
    """
    source = iter(iterable)
    for head in source:
        # Take the next element plus up to n-1 more without materializing
        # the whole input.
        yield [head, *itertools.islice(source, n - 1)]
# Tests
print(all_pairs(['A','B','C','D'])) # 6 pairs
print(len(team_schedules(['X','Y','Z']))) # 6 matchups
print(list(batch(range(10), 3))) # [[0,1,2],[3,4,5],[6,7,8],[9]]
Python resolves names using the LEGB rule (Local, Enclosing, Global, Built-in). Closures capture variables from enclosing scopes and are the foundation of decorators and factories.
x = "global"
def outer():
x = "enclosing"
def inner():
x = "local"
print("inner sees:", x) # local
inner()
print("outer sees:", x) # enclosing
outer()
print("module sees:", x) # global
# Built-in scope: Python's built-in names (len, print, etc.)
print("built-in len:", len([1,2,3])) # 3
# global keyword β modify a global from inside a function
counter = 0
def increment():
global counter
counter += 1
increment()
increment()
print("counter:", counter) # 2
# nonlocal keyword β modify an enclosing variable
def make_counter():
count = 0
def inc():
nonlocal count
count += 1
return count
return inc
c = make_counter()
print(c(), c(), c()) # 1 2 3# A closure captures variables from its defining scope
def make_adder(n):
# n is captured in the closure
def add(x):
return x + n
return add
add5 = make_adder(5)
add10 = make_adder(10)
print("add5(3):", add5(3)) # 8
print("add10(3):", add10(3)) # 13
# Each closure has its own cell
print("Different objects:", add5 is not add10) # True
# Closure with mutable state
def make_accumulator():
total = 0
def accumulate(value):
nonlocal total
total += value
return total
return accumulate
acc = make_accumulator()
for v in [10, 25, 5, 60]:
print(f" +{v} -> running total: {acc(v)}")# Common closure gotcha: late binding in loops
# All closures share the SAME variable i
funcs_bad = [lambda: i for i in range(5)]
print("Late binding:", [f() for f in funcs_bad]) # [4, 4, 4, 4, 4]!
# Fix 1: capture current value as default argument
funcs_good = [lambda i=i: i for i in range(5)]
print("Default arg fix:", [f() for f in funcs_good]) # [0, 1, 2, 3, 4]
# Fix 2: use a factory function
def make_func(i):
def f():
return i
return f
funcs_factory = [make_func(i) for i in range(5)]
print("Factory fix:", [f() for f in funcs_factory]) # [0, 1, 2, 3, 4]
# Inspecting closure cells
import inspect
def outer(x):
def inner():
return x * 2
return inner
fn = outer(7)
print("Closure cell value:", fn.__closure__[0].cell_contents) # 7def make_range_validator(min_val, max_val, field="value"):
def validate(x):
if not (min_val <= x <= max_val):
raise ValueError(f"{field} {x} out of range [{min_val}, {max_val}]")
return True
return validate
def make_str_validator(max_len, allowed_chars=None):
    """Return a validator enforcing a max length and optional character set.

    The returned closure raises ValueError on violation, else returns True.
    """
    def validate(s):
        if len(s) > max_len:
            # Bug fix: the original message hard-coded "72,362" instead of
            # reporting the configured limit.
            raise ValueError(f"String too long: {len(s)} > {max_len}")
        if allowed_chars and not all(c in allowed_chars for c in s):
            raise ValueError(f"Invalid characters in: {s!r}")
        return True
    return validate
validate_age = make_range_validator(0, 120, "age")
validate_score = make_range_validator(0.0, 1.0, "score")
validate_name = make_str_validator(50, allowed_chars="abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ")
tests = [(validate_age, 25), (validate_score, 0.85), (validate_name, "Alice Smith")]
for validator, val in tests:
    try:
        # Bug fix: run the validator BEFORE reporting success — the original
        # printed " OK" unconditionally and only then validated, so failing
        # values were reported OK before the FAIL line.
        validator(val)
        print(f" OK: {val!r}")
    except ValueError as e:
        print(f" FAIL: {e}")
print(f" FAIL: {e}")def memoize(func):
cache = {} # closure variable
def wrapper(*args):
# TODO: if args in cache, return cached result
# TODO: otherwise, call func(*args), store, return
pass
return wrapper
@memoize
def fib(n):
if n <= 1:
return n
return fib(n-1) + fib(n-2)
import time
t0 = time.time()
print(fib(35)) # should be fast after memoize
print(f"Time: {time.time()-t0:.4f}s")
Decorators wrap functions or classes to add behavior without modifying their source. Master stacked, parameterized, and class-based decorators.
import functools, time
def timer(func):
@functools.wraps(func) # preserves __name__, __doc__
def wrapper(*args, **kwargs):
t0 = time.perf_counter()
result = func(*args, **kwargs)
print(f"[timer] {func.__name__} took {(time.perf_counter()-t0)*1000:.2f}ms")
return result
return wrapper
def logger(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"[logger] calling {func.__name__} with args={args}, kwargs={kwargs}")
return func(*args, **kwargs)
return wrapper
# Decorators apply bottom-up: logger wraps timer-wrapped function
@logger
@timer
def compute(n):
return sum(range(n))
result = compute(100_000)
print("Result:", result)
print("Name preserved:", compute.__name__) # compute, not wrapperimport functools
def retry(times=3, exceptions=(Exception,)):
# Outer function receives decorator arguments
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, times + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
print(f" Attempt {attempt} failed: {e}")
if attempt == times:
raise
return wrapper
return decorator
attempt_count = 0
@retry(times=3, exceptions=(ValueError,))
def unstable_fetch(url):
global attempt_count
attempt_count += 1
if attempt_count < 3:
raise ValueError(f"Connection failed (attempt {attempt_count})")
return f"Data from {url}"
result = unstable_fetch("https://api.example.com")
print("Got:", result)import functools
class CallCounter:
# A class-based decorator that counts calls
def __init__(self, func):
functools.update_wrapper(self, func)
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"[CallCounter] {self.func.__name__} called {self.count}x")
return self.func(*args, **kwargs)
@CallCounter
def add(a, b):
return a + b
add(1, 2)
add(3, 4)
add(5, 6)
print("Total calls:", add.count) # 3
# Decorator that works on both functions and methods
class validate_positive:
def __init__(self, func):
functools.update_wrapper(self, func)
self.func = func
def __call__(self, *args, **kwargs):
for arg in args:
if isinstance(arg, (int, float)) and arg < 0:
raise ValueError(f"Expected positive, got {arg}")
return self.func(*args, **kwargs)
@validate_positive
def sqrt(x):
return x ** 0.5
print(sqrt(9)) # 3.0
try: sqrt(-1)
except ValueError as e: print("Caught:", e)import functools, time
def rate_limit(calls_per_second=1):
min_interval = 1.0 / calls_per_second
last_called = [0.0] # mutable container to allow mutation in closure
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
wait = min_interval - elapsed
if wait > 0:
print(f" Rate limit: waiting {wait:.2f}s")
time.sleep(wait)
last_called[0] = time.time()
return func(*args, **kwargs)
return wrapper
return decorator
@rate_limit(calls_per_second=2)
def fetch(url):
return f"Response from {url}"
urls = ["http://a.com", "http://b.com", "http://c.com"]
for url in urls:
print(fetch(url))import functools, time
def cache(ttl=60):
    """Decorator factory: memoize by positional args with a TTL in seconds.

    Cached entries older than *ttl* are recomputed; fresh hits are served
    from the store (and announced with a '[cache hit]' message).
    """
    def decorator(func):
        store = {}  # {args: (result, timestamp)}

        @functools.wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in store:
                result, ts = store[args]
                if now - ts < ttl:
                    print(f" [cache hit] age={now-ts:.1f}s")
                    return result
            # Miss or expired entry: recompute and record a fresh timestamp.
            result = func(*args)
            store[args] = (result, now)
            return result
        return wrapper
    return decorator
@cache(ttl=2)
def get_value(key):
return f"{key}:{time.time():.2f}"
print(get_value("x"))
print(get_value("x")) # should be cache hit
time.sleep(2.1)
print(get_value("x")) # should re-fetch after TTL
ABCs enforce interface contracts at class creation time. Protocols (PEP 544) enable structural subtyping β duck typing with type-checker support.
from abc import ABC, abstractmethod
class Shape(ABC):
@abstractmethod
def area(self) -> float:
pass
@abstractmethod
def perimeter(self) -> float:
pass
def describe(self):
# Concrete method shared by all subclasses
return f"{type(self).__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"
class Circle(Shape):
def __init__(self, r): self.r = r
def area(self): return 3.14159 * self.r ** 2
def perimeter(self): return 2 * 3.14159 * self.r
class Rectangle(Shape):
def __init__(self, w, h): self.w, self.h = w, h
def area(self): return self.w * self.h
def perimeter(self): return 2 * (self.w + self.h)
for shape in [Circle(5), Rectangle(4, 6)]:
print(shape.describe())
# Cannot instantiate ABC directly
try:
s = Shape()
except TypeError as e:
print("Cannot instantiate:", e)from typing import Protocol, runtime_checkable
@runtime_checkable
class Drawable(Protocol):
def draw(self) -> str: ...
def get_color(self) -> str: ...
# Any class with draw() and get_color() satisfies Drawable
# No explicit inheritance required!
class Circle:
def draw(self): return "O"
def get_color(self): return "red"
class Square:
def draw(self): return "[]"
def get_color(self): return "blue"
class TextLabel:
def draw(self): return "TEXT"
def get_color(self): return "black"
def render(item: Drawable) -> str:
return f"Drawing {item.draw()} in {item.get_color()}"
shapes = [Circle(), Square(), TextLabel()]
for s in shapes:
print(render(s))
print(f" isinstance check: {isinstance(s, Drawable)}")from abc import ABC, abstractmethod
class Sized(ABC):
    """ABC that recognizes ANY class defining __len__ as a virtual subclass."""

    @abstractmethod
    def __len__(self): ...

    @classmethod
    def __subclasshook__(cls, C):
        # Automatically treat ANY class with __len__ as Sized.
        # Called by issubclass()/isinstance(); scanning C.__mro__ means
        # inherited __len__ implementations also qualify.
        if cls is Sized:
            if any("__len__" in B.__dict__ for B in C.__mro__):
                return True
        # NotImplemented (not False) lets normal subclass checks proceed
        # for subclasses of Sized itself.
        return NotImplemented
# list, dict, str all have __len__ β they are virtual subclasses
print(isinstance([], Sized)) # True
print(isinstance({}, Sized)) # True
print(isinstance("hi", Sized)) # True
print(isinstance(42, Sized)) # False
# Register a virtual subclass without inheritance
class SparseVector:
def __init__(self, data): self.data = data
def __len__(self): return len(self.data)
print(isinstance(SparseVector({0: 1.0}), Sized)) # True
print(issubclass(SparseVector, Sized)) # Truefrom abc import ABC, abstractmethod
from typing import Iterator, Any
class DataSource(ABC):
    """Template for streaming sources: connect() -> read() -> close()."""

    @abstractmethod
    def connect(self) -> bool: ...

    @abstractmethod
    def read(self) -> Iterator[Any]: ...

    @abstractmethod
    def close(self) -> None: ...

    def stream(self):
        # Template method: yields records only when connect() succeeds.
        # NOTE(review): close() only runs after the generator is fully
        # consumed; an abandoned iterator never closes — confirm acceptable.
        if self.connect():
            yield from self.read()
            self.close()
class CSVSource(DataSource):
def __init__(self, rows):
self.rows = rows
def connect(self):
print("CSV: opening"); return True
def read(self):
return iter(self.rows)
def close(self):
print("CSV: closed")
class APISource(DataSource):
def __init__(self, data):
self.data = data
def connect(self):
print("API: authenticated"); return True
def read(self):
return iter(self.data)
def close(self):
print("API: session ended")
for src in [CSVSource([1,2,3]), APISource(["a","b"])]:
for record in src.stream():
print(" ", record)from typing import Protocol, runtime_checkable
from dataclasses import dataclass
@runtime_checkable
class Serializable(Protocol):
def to_dict(self) -> dict: ...
# Note: classmethods in Protocols are tricky β just include to_dict for now
@dataclass
class Product:
    """Simple inventory record; satisfies the Serializable protocol via to_dict()."""
    name: str
    price: float
    qty: int

    def to_dict(self) -> dict:
        """Serialize to a plain dict (round-trips through from_dict)."""
        return {"name": self.name, "price": self.price, "qty": self.qty}

    @classmethod
    def from_dict(cls, d: dict) -> "Product":
        """Alternate constructor from a dict produced by to_dict().

        Raises KeyError if any of the three required keys is missing.
        """
        return cls(d["name"], d["price"], d["qty"])
def save_all(items):
results = []
for item in items:
if isinstance(item, Serializable):
results.append(item.to_dict())
else:
print(f"Skipped: {item!r} is not Serializable")
return results
products = [Product("apple", 1.2, 50), Product("banana", 0.5, 200)]
print(save_all(products))
Descriptors control attribute access via __get__, __set__, __delete__. The property() built-in is the most common descriptor. __slots__ reduces memory overhead.
class Temperature:
    """Temperature stored in Celsius, exposed through validating properties."""

    def __init__(self, celsius=0):
        self._celsius = celsius  # private storage behind the property

    @property
    def celsius(self):
        """Current temperature in degrees Celsius."""
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # Physical lower bound: nothing is colder than absolute zero.
        if value < -273.15:
            raise ValueError(f"Temperature {value} below absolute zero!")
        self._celsius = value

    @celsius.deleter
    def celsius(self):
        print("Resetting temperature to 0")
        self._celsius = 0

    @property
    def fahrenheit(self):
        """Read-only conversion of the stored Celsius value to Fahrenheit."""
        return self._celsius * 9/5 + 32
t = Temperature(25)
print(f"{t.celsius}C = {t.fahrenheit}F")
t.celsius = 100
print(f"Boiling: {t.celsius}C = {t.fahrenheit}F")
del t.celsius
print(f"Reset: {t.celsius}C")
try:
t.celsius = -300
except ValueError as e:
    print("Caught:", e)
class Validated:
    # A reusable descriptor for validated attributes: bounds are checked in
    # __set__, and the value is stored in the owner instance's __dict__.
    def __init__(self, min_val=None, max_val=None):
        self.min_val = min_val
        self.max_val = max_val
        self.name = None  # set by __set_name__

    def __set_name__(self, owner, name):
        self.name = name  # called when class is defined

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # class-level access returns descriptor itself
        # Unset attributes read as None rather than raising AttributeError.
        return obj.__dict__.get(self.name, None)

    def __set__(self, obj, value):
        # Either bound may be None, meaning "unbounded" on that side.
        if self.min_val is not None and value < self.min_val:
            raise ValueError(f"{self.name} must be >= {self.min_val}, got {value}")
        if self.max_val is not None and value > self.max_val:
            raise ValueError(f"{self.name} must be <= {self.max_val}, got {value}")
        obj.__dict__[self.name] = value
class Person:
age = Validated(min_val=0, max_val=150)
salary = Validated(min_val=0)
def __init__(self, name, age, salary):
self.name = name
self.age = age
self.salary = salary
p = Person("Alice", 30, 75000)
print(f"{p.name}: age={p.age}, salary={p.salary}")
try:
p.age = -5
except ValueError as e:
print("Caught:", e)import sys
class PointNormal:
def __init__(self, x, y):
self.x, self.y = x, y
class PointSlots:
__slots__ = ('x', 'y') # declare allowed attributes
def __init__(self, x, y):
self.x, self.y = x, y
n = PointNormal(1.0, 2.0)
s = PointSlots(1.0, 2.0)
print(f"Without slots: {sys.getsizeof(n)} bytes, has __dict__: {hasattr(n, '__dict__')}")
print(f"With slots: {sys.getsizeof(s)} bytes, has __dict__: {hasattr(s, '__dict__')}")
# Slots prevents adding arbitrary attributes
try:
s.z = 3.0
except AttributeError as e:
print("Cannot add:", e)
# Memory comparison with many instances
normal_mem = sum(sys.getsizeof(PointNormal(i, i)) for i in range(1000))
slots_mem = sum(sys.getsizeof(PointSlots(i, i)) for i in range(1000))
print(f"1000 objects β normal: {normal_mem} bytes, slots: {slots_mem} bytes")
print(f"Slots saves: {normal_mem - slots_mem} bytes ({(1-slots_mem/normal_mem)*100:.1f}%)")
class TypedAttr:
    # Descriptor that enforces an exact expected type on assignment and
    # returns a per-attribute default before the first assignment.
    def __init__(self, expected_type, default=None):
        self.expected_type = expected_type
        self.default = default
        self.name = None  # filled in by __set_name__

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None: return self  # class-level access returns the descriptor
        return obj.__dict__.get(self.name, self.default)

    def __set__(self, obj, value):
        # NOTE(review): bool is a subclass of int, so True/False would be
        # accepted for an int attribute — confirm that's intended.
        if not isinstance(value, self.expected_type):
            raise TypeError(
                f"{self.name} must be {self.expected_type.__name__}, "
                f"got {type(value).__name__}"
            )
        obj.__dict__[self.name] = value
class AppConfig:
host = TypedAttr(str, "localhost")
port = TypedAttr(int, 8080)
debug = TypedAttr(bool, False)
timeout = TypedAttr(float, 30.0)
cfg = AppConfig()
cfg.host = "0.0.0.0"
cfg.port = 443
cfg.debug = True
cfg.timeout = 5.0
print(f"Config: {cfg.host}:{cfg.port} debug={cfg.debug} timeout={cfg.timeout}s")
try:
cfg.port = "8080" # wrong type!
except TypeError as e:
print("Caught:", e)from collections import namedtuple
class UnitFloat:
    """Descriptor for a validated numeric attribute read back as a (value, unit) pair.

    Assignment accepts int or float (bools rejected), enforces optional
    min/max bounds, and stores the value as float; reads return a
    Reading namedtuple, or None when the attribute was never set.
    """
    Reading = namedtuple("Reading", ["value", "unit"])

    def __init__(self, unit, min_val=None, max_val=None):
        self.unit = unit
        self.min_val = min_val
        self.max_val = max_val
        self.name = None  # filled in by __set_name__

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # class-level access returns the descriptor itself
        val = obj.__dict__.get(self.name)
        return UnitFloat.Reading(val, self.unit) if val is not None else None

    def __set__(self, obj, value):
        # bool subclasses int, so reject it explicitly before the number check.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"{self.name} must be a number, got {type(value).__name__}")
        if self.min_val is not None and value < self.min_val:
            raise ValueError(f"{self.name} must be >= {self.min_val}, got {value}")
        if self.max_val is not None and value > self.max_val:
            raise ValueError(f"{self.name} must be <= {self.max_val}, got {value}")
        obj.__dict__[self.name] = float(value)
class Measurement:
temperature = UnitFloat("C", min_val=-273.15)
pressure = UnitFloat("Pa", min_val=0)
humidity = UnitFloat("%", min_val=0, max_val=100)
m = Measurement()
m.temperature = 22.5
m.pressure = 101325.0
m.humidity = 65.0
print(m.temperature) # Reading(value=22.5, unit='C')
print(m.humidity)
Python manages memory via reference counting and a cyclic garbage collector. Use sys, gc, tracemalloc, and cProfile to find memory leaks and performance bottlenecks.
import sys
# Basic sizes
for obj in [0, 1, 255, 2**100, 3.14, "hi", "hello world", [], [1,2,3], {}, {"a":1}]:
print(f" {repr(obj):<25} {sys.getsizeof(obj):>6} bytes")
# id() returns memory address
a = [1, 2, 3]
b = a # same object
c = a.copy() # different object
print("a is b:", a is b) # True
print("a is c:", a is c) # False
print("id(a)==id(b):", id(a) == id(b)) # True
# Small integers are cached
x, y = 100, 100
print("100 is 100:", x is y) # True (cached)
x, y = 1000, 1000
print("1000 is 1000:", x is y) # False (not cached)
# Nested containers: getsizeof is shallow!
lst = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print("Shallow size:", sys.getsizeof(lst)) # just the list objectimport gc
print("GC enabled:", gc.isenabled())
print("GC thresholds:", gc.get_threshold()) # (700, 10, 10)
# Reference cycle: a -> b -> a, both become unreachable
class Node:
def __init__(self, name):
self.name = name
self.ref = None
a = Node("A")
b = Node("B")
a.ref = b # a -> b
b.ref = a # b -> a (cycle!)
# Delete our references
del a, b
before = gc.collect(0)
print(f"GC collected {before} objects in gen-0")
# Check what gc is tracking
tracked = gc.get_count()
print("GC counts (gen0, gen1, gen2):", tracked)
# Use __del__ to observe collection
class Tracked:
def __del__(self):
print(f" {self!r} collected")
x = Tracked()
del x # collected immediately (refcount -> 0)
gc.collect() # collect cyclesimport tracemalloc, cProfile, io, pstats
# --- tracemalloc: trace memory allocations ---
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()
big_list = [i**2 for i in range(10_000)]
snapshot2 = tracemalloc.take_snapshot()
stats = snapshot2.compare_to(snapshot1, "lineno")
for stat in stats[:3]:
print(f" {stat}")
tracemalloc.stop()
del big_list
# --- cProfile: find slow functions ---
def slow_sum(n):
return sum(i**2 for i in range(n))
def fast_sum(n):
return n * (n-1) * (2*n-1) // 6 # formula
pr = cProfile.Profile()
pr.enable()
slow_sum(50_000)
fast_sum(50_000)
pr.disable()
sio = io.StringIO()
ps = pstats.Stats(pr, stream=sio).sort_stats("cumulative")
ps.print_stats(5)
print(sio.getvalue())import tracemalloc, sys
def deep_size(obj, seen=None):
    """Recursively estimate the total in-memory size of a container, in bytes.

    `seen` collects ids of visited objects so shared or cyclic references
    are only counted once. sys.getsizeof alone is shallow; this walks
    dict keys/values and generic iterables (strings/bytes excluded).
    """
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0  # already counted via another reference
    seen.add(id(obj))
    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        total += sum(deep_size(v, seen) for v in obj.values())
        total += sum(deep_size(k, seen) for k in obj.keys())
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        total += sum(deep_size(item, seen) for item in obj)
    return total
# Simulate a request that leaks memory
cache = {}
def handle_request(key, data):
cache[key] = data # intentional "leak" into global cache
return len(data)
tracemalloc.start()
snap1 = tracemalloc.take_snapshot()
for i in range(5):
handle_request(f"req_{i}", list(range(1000)))
snap2 = tracemalloc.take_snapshot()
top = snap2.compare_to(snap1, "lineno")[:2]
for stat in top:
print(f" Memory diff: {stat}")
print(f"Cache deep size: {deep_size(cache):,} bytes")
tracemalloc.stop()import cProfile, timeit
def trial_division(n):
    """Return all primes <= n by testing each candidate against divisors up to sqrt."""
    found = []
    for candidate in range(2, n + 1):
        limit = int(candidate ** 0.5) + 1
        if not any(candidate % d == 0 for d in range(2, limit)):
            found.append(candidate)
    return found
def sieve(n):
    """Sieve of Eratosthenes: return all primes <= n (requires n >= 1)."""
    flags = [True] * (n + 1)
    flags[0] = flags[1] = False
    i = 2
    while i * i <= n:
        if flags[i]:
            # Start striking at i*i: smaller multiples were already marked.
            for multiple in range(i * i, n + 1, i):
                flags[multiple] = False
        i += 1
    return [p for p, is_p in enumerate(flags) if is_p]
N = 10_000
# Benchmark
t1 = timeit.timeit(lambda: trial_division(N), number=3)
t2 = timeit.timeit(lambda: sieve(N), number=3)
print(f"trial_division: {t1:.3f}s")
print(f"sieve: {t2:.3f}s")
print(f"Speedup: {t1/t2:.1f}x")
# Profile trial_division
cProfile.run("trial_division(5000)", sort="cumulative")
Use the logging module instead of print() for production code. It supports levels, handlers, formatters, and log rotation — all configurable without code changes.
import logging
# Configure root logger
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)-8s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("myapp")
# Five standard levels (low to high)
logger.debug("Detailed info for debugging")
logger.info("Normal operation: user logged in")
logger.warning("Something unexpected but not fatal")
logger.error("A failure occurred β function returned None")
logger.critical("Service is down!")
# Log exceptions with traceback
try:
result = 1 / 0
except ZeroDivisionError:
logger.exception("Division failed") # includes traceback
# Extra context
user_id = 42
logger.info("Processing order", extra={"user": user_id})
# Check effective level
print("Effective level:", logger.getEffectiveLevel()) # 10 = DEBUGimport logging, io
logger = logging.getLogger("pipeline")
logger.setLevel(logging.DEBUG)
logger.handlers.clear() # avoid duplicate handlers in notebooks
# Handler 1: console with simple format
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING) # console only shows WARNING+
ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
# Handler 2: "file" (using StringIO here for demo)
log_buffer = io.StringIO()
fh = logging.StreamHandler(log_buffer)
fh.setLevel(logging.DEBUG) # file gets everything
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-8s [%(funcName)s:%(lineno)d] %(message)s",
datefmt="%H:%M:%S"
))
logger.addHandler(ch)
logger.addHandler(fh)
def process(data):
logger.debug("Starting process with %d items", len(data))
logger.info("Processing...")
if not data:
logger.warning("Empty input")
logger.debug("Done")
process([1, 2, 3])
process([])
print("--- File log ---")
print(log_buffer.getvalue())import logging
# Best practice: use __name__ as logger name
# This creates a hierarchy: "myapp" -> "myapp.db" -> "myapp.db.query"
root = logging.getLogger()
app = logging.getLogger("myapp")
db = logging.getLogger("myapp.db")
qry = logging.getLogger("myapp.db.query")
# Set up root handler for the demo
logging.basicConfig(
level=logging.DEBUG,
format="%(name)-20s %(levelname)s: %(message)s"
)
# Child loggers propagate to parent by default
app.setLevel(logging.INFO)
db.setLevel(logging.DEBUG) # db subtree shows DEBUG
app.info("App started")
app.debug("This won't show β app is INFO level")
db.debug("DB connection established")
qry.debug("SELECT * FROM users")
# Disable propagation to avoid double-logging
# child_logger.propagate = False
# Silence noisy third-party libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
print("Third-party loggers silenced")import logging, time, io
def setup_logger(name, level=logging.DEBUG):
    """Return a named logger with a single formatted stream handler.

    Idempotent: the handler is only attached when the logger has none,
    so repeated calls don't duplicate output.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            "%(asctime)s %(name)s %(levelname)-8s %(message)s",
            datefmt="%H:%M:%S",
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
log = setup_logger("etl")
def extract(source):
log.info("Extracting from %s", source)
data = list(range(100)) # simulated data
log.debug("Extracted %d records", len(data))
return data
def transform(data):
log.info("Transforming %d records", len(data))
t0 = time.time()
result = [x * 2 for x in data if x % 5 != 0]
log.debug("Transform took %.3fs, %d records remain", time.time()-t0, len(result))
return result
def load(data, target):
log.info("Loading %d records to %s", len(data), target)
# Simulate occasional error
if len(data) > 70:
log.warning("Large batch β consider chunking")
log.info("Load complete")
try:
d = extract("sales.csv")
d = transform(d)
load(d, "warehouse")
except Exception:
log.exception("Pipeline failed")import logging, re
from collections import Counter
def parse_log_line(line):
    """Parse 'HH:MM:SS LEVEL name: message' into a dict, or None on no match."""
    # Pattern: HH:MM:SS LEVEL name: message
    pattern = r"(\d{2}:\d{2}:\d{2}) (\w+) (\S+): (.+)"
    m = re.match(pattern, line)
    if m:
        return {"time": m.group(1), "level": m.group(2),
                "name": m.group(3), "msg": m.group(4)}
    return None

def analyze_logs(lines):
    """Parse each line and return a Counter mapping level name -> count.

    Lines that don't match the log format are skipped silently.
    """
    counts = Counter()
    for line in lines:
        record = parse_log_line(line)
        if record is not None:
            counts[record["level"]] += 1
    return counts
sample_logs = [
"12:00:01 INFO myapp: started",
"12:00:02 DEBUG myapp.db: query took 0.1s",
"12:00:03 WARNING myapp: memory 80% full",
"12:00:04 ERROR myapp: connection refused",
"12:00:05 WARNING myapp: retry 1/3",
]
counts = analyze_logs(sample_logs)
print("Level counts:", counts)
argparse is Python's standard library for building command-line interfaces. It handles argument parsing, type validation, help generation, and subcommands.
import argparse
# Simulate command-line arguments (replace sys.argv for demo)
parser = argparse.ArgumentParser(
description="Process a data file",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Positional argument (required)
parser.add_argument("filename", help="Input CSV file path")
# Optional arguments
parser.add_argument("-o", "--output", default="output.csv", help="Output file")
parser.add_argument("-n", "--rows", type=int, default=100, help="Number of rows")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument("--format", choices=["csv","json","parquet"], default="csv")
# Parse a fake argument list
args = parser.parse_args(["data.csv", "--rows", "500", "--verbose", "--format", "json"])
print(f"File: {args.filename}")
print(f"Output: {args.output}")
print(f"Rows: {args.rows}")
print(f"Verbose: {args.verbose}")
print(f"Format: {args.format}")import argparse
parser = argparse.ArgumentParser(prog="datool", description="Data pipeline tool")
subs = parser.add_subparsers(dest="command", required=True)
# Subcommand: convert
convert = subs.add_parser("convert", help="Convert file format")
convert.add_argument("input", help="Input file")
convert.add_argument("output", help="Output file")
convert.add_argument("--compression", choices=["none","gzip","snappy"], default="none")
# Subcommand: stats
stats = subs.add_parser("stats", help="Show file statistics")
stats.add_argument("file", help="File to analyze")
stats.add_argument("--col", action="append", dest="cols", help="Column to analyze (repeatable)")
# Subcommand: validate
validate = subs.add_parser("validate", help="Validate schema")
validate.add_argument("file")
validate.add_argument("--schema", required=True)
# Demo: parse "convert" command
args = parser.parse_args(["convert", "input.csv", "output.parquet", "--compression", "snappy"])
print(f"Command: {args.command}")
print(f"Input: {args.input}")
print(f"Output: {args.output}")
print(f"Compression: {args.compression}")
# Demo: parse "stats" command
args2 = parser.parse_args(["stats", "data.csv", "--col", "price", "--col", "qty"])
print(f"Stats cols: {args2.cols}")import argparse
parser = argparse.ArgumentParser(description="Model training CLI")
# Argument group for visual organization in --help
data_group = parser.add_argument_group("Data options")
data_group.add_argument("--train", required=True, help="Training data path")
data_group.add_argument("--val", required=True, help="Validation data path")
data_group.add_argument("--test", help="Test data path")
# Argument group for model options
model_group = parser.add_argument_group("Model options")
model_group.add_argument("--lr", type=float, default=0.001)
model_group.add_argument("--epochs", type=int, default=10)
# Mutually exclusive: can't use --gpu and --cpu together
device = parser.add_mutually_exclusive_group()
device.add_argument("--gpu", action="store_true")
device.add_argument("--cpu", action="store_true")
# Custom type validator
def positive_int(value):
    """argparse type callable: parse *value* as int and require it to be > 0."""
    parsed = int(value)
    if parsed <= 0:
        raise argparse.ArgumentTypeError(f"{value} must be a positive integer")
    return parsed
model_group.add_argument("--batch", type=positive_int, default=32)
args = parser.parse_args(["--train", "train.csv", "--val", "val.csv",
"--lr", "0.01", "--gpu", "--batch", "64"])
print(vars(args))import argparse, sys
def run_etl(args):
    """Print a summary of the ETL job described by *args*; return exit code 0.

    When args.dry_run is set, nothing is "written" and the function
    returns before the write step.
    """
    summary = [
        f"ETL Job: {args.job_name}",
        f" Source: {args.source} (format={args.format})",
        f" Target: {args.target}",
        f" Batch: {args.batch_size}",
        f" Dry run: {args.dry_run}",
    ]
    for line in summary:
        print(line)
    if args.dry_run:
        print(" [DRY RUN] No data written.")
        return 0
    print(" Writing data...")
    return 0
parser = argparse.ArgumentParser(description="ETL Pipeline Runner")
parser.add_argument("job_name", help="Job identifier")
parser.add_argument("source", help="Source connection string")
parser.add_argument("target", help="Target connection string")
parser.add_argument("--format", choices=["csv","json","parquet"], default="csv")
parser.add_argument("--batch-size", type=int, default=1000, dest="batch_size")
parser.add_argument("--dry-run", action="store_true", dest="dry_run")
# Demo
args = parser.parse_args([
"daily_sales", "s3://bucket/sales.parquet", "postgres://db/warehouse",
"--format", "parquet", "--batch-size", "5000", "--dry-run"
])
sys.exit(run_etl(args))import argparse, csv, re
parser = argparse.ArgumentParser(prog="fileproc")
subs = parser.add_subparsers(dest="cmd", required=True)
# count subcommand
count_p = subs.add_parser("count", help="Count lines matching pattern")
count_p.add_argument("file")
count_p.add_argument("--pattern", default=".*", help="Regex pattern")
# summary subcommand
sum_p = subs.add_parser("summary", help="Summarize CSV columns")
sum_p.add_argument("file")
sum_p.add_argument("--col", action="append", dest="cols")
def cmd_count(args):
    """Count (and print) lines in args.file matching the args.pattern regex.

    Returns the match count. Raises OSError if the file can't be opened.
    """
    pattern = re.compile(args.pattern)
    with open(args.file) as f:
        count = sum(1 for line in f if pattern.search(line))
    print(f"{count} lines match {args.pattern!r}")
    return count

def cmd_summary(args):
    """Print first/last/count for each column in args.cols of a CSV file.

    Returns {col: {"first": ..., "last": ..., "count": ...}}; an empty
    args.cols (or None) yields an empty dict.
    """
    with open(args.file, newline="") as f:
        rows = list(csv.DictReader(f))
    stats = {}
    for col in args.cols or []:
        values = [row[col] for row in rows if col in row]
        first = values[0] if values else None
        last = values[-1] if values else None
        stats[col] = {"first": first, "last": last, "count": len(values)}
        print(f"{col}: first={first} last={last} count={len(values)}")
    return stats
args = parser.parse_args(["count", "data.txt", "--pattern", "ERROR"])
if args.cmd == "count":
cmd_count(args)
elif args.cmd == "summary":
cmd_summary(args)
Python's json module handles serialization to/from JSON. For Python-specific objects, use pickle. For configuration, use configparser or tomllib.
import json
from datetime import datetime, date
from decimal import Decimal
# Basic usage
data = {"name": "Alice", "scores": [95, 87, 92], "active": True}
text = json.dumps(data, indent=2)
print("JSON string:")
print(text)
loaded = json.loads(text)
print("Loaded back:", loaded)
# Custom encoder for non-serializable types
class AppEncoder(json.JSONEncoder):
    """JSON encoder for common non-JSON types.

    datetimes/dates become ISO strings, Decimals become floats, and
    sets become sorted lists; anything else defers to the base class.
    """

    def default(self, obj):
        converters = (
            ((datetime, date), lambda o: o.isoformat()),
            ((Decimal,), float),
            ((set,), lambda o: sorted(list(o))),
        )
        for types, convert in converters:
            if isinstance(obj, types):
                return convert(obj)
        return super().default(obj)
record = {
"created": datetime(2024, 1, 15, 9, 30),
"price": Decimal("29.99"),
"tags": {"python", "data", "tutorial"},
}
print(json.dumps(record, cls=AppEncoder, indent=2))import json
from datetime import datetime
# Custom decoder using object_hook
def decode_record(d):
    """json object_hook: upgrade ISO-datetime-looking strings to datetime objects.

    Mutates and returns *d*. Values that look like ISO timestamps
    (>= 19 chars, containing 'T') but fail to parse are left untouched.
    """
    for key, val in d.items():
        looks_like_iso = isinstance(val, str) and len(val) >= 19 and "T" in val
        if not looks_like_iso:
            continue
        try:
            d[key] = datetime.fromisoformat(val)
        except ValueError:
            pass  # not actually a timestamp; keep the original string
    return d
json_str = '''
{
"id": 42,
"name": "Order #42",
"created_at": "2024-01-15T09:30:00",
"updated_at": "2024-03-20T14:00:00",
"amount": 299.99
}
'''
obj = json.loads(json_str, object_hook=decode_record)
print("Type of created_at:", type(obj["created_at"])) # datetime
print("Year:", obj["created_at"].year)
# Simple schema validation pattern
def validate(data, schema):
    """Return a list of schema-violation messages (empty when data conforms).

    *schema* maps field name -> expected type; missing fields and
    type mismatches each produce one message.
    """
    errors = []
    for field, expected_type in schema.items():
        if field not in data:
            errors.append(f"Missing: {field}")
            continue
        value = data[field]
        if not isinstance(value, expected_type):
            errors.append(
                f"{field}: expected {expected_type.__name__}, got {type(value).__name__}"
            )
    return errors
schema = {"id": int, "name": str, "amount": float}
print("Errors:", validate(obj, schema) or "None")import pickle, configparser, io
# βββ pickle: serialize any Python object βββββββββββββββββββββββββββββββββββ
class Model:
    """Minimal linear model: prediction is the dot product of weights and input."""

    def __init__(self, weights):
        self.weights = weights

    def predict(self, x):
        total = 0
        for weight, feature in zip(self.weights, x):
            total += weight * feature
        return total
model = Model([0.5, -0.3, 1.2])
buf = io.BytesIO()
pickle.dump(model, buf)
print("Pickled size:", buf.tell(), "bytes")
buf.seek(0)
loaded_model = pickle.load(buf)
print("Prediction:", loaded_model.predict([1.0, 2.0, 3.0]))
# βββ configparser: INI-format config files βββββββββββββββββββββββββββββββββ
config_text = '''
[database]
host = localhost
port = 5432
name = mydb
[app]
debug = true
workers = 4
log_level = INFO
'''
cfg = configparser.ConfigParser()
cfg.read_string(config_text)
print("DB host:", cfg["database"]["host"])
print("DB port:", cfg.getint("database", "port"))
print("Debug: ", cfg.getboolean("app", "debug"))
print("Workers:", cfg.getint("app", "workers"))
print("Sections:", cfg.sections())import json, hashlib
from datetime import datetime
class APICache:
    """In-memory cache of API responses keyed by an MD5 digest of (url, params).

    Entries are stored as JSON strings with a cached_at timestamp; get()
    reports the entry's age on a hit and returns None on a miss.
    """

    def __init__(self):
        self._store = {}  # in memory; use file I/O in production

    def _key(self, url, params):
        # Canonical JSON (sorted keys) so equal params always hash identically.
        payload = json.dumps({"url": url, "params": params}, sort_keys=True)
        return hashlib.md5(payload.encode()).hexdigest()

    def get(self, url, params=None):
        cache_key = self._key(url, params or {})
        if cache_key not in self._store:
            return None
        entry = json.loads(self._store[cache_key])
        age = (datetime.now() - datetime.fromisoformat(entry["cached_at"])).seconds
        print(f" [cache hit] age={age}s, key={cache_key[:8]}")
        return entry["data"]

    def set(self, url, params, data):
        cache_key = self._key(url, params or {})
        entry = {"data": data, "cached_at": datetime.now().isoformat(), "url": url}
        self._store[cache_key] = json.dumps(entry)
        print(f" [cache set] key={cache_key[:8]}")
cache = APICache()
url = "https://api.example.com/prices"
params = {"symbol": "AAPL", "period": "1d"}
result = cache.get(url, params)
if result is None:
data = {"symbol": "AAPL", "price": 195.50, "volume": 1_200_000}
cache.set(url, params, data)
result = data
print("Result:", result)
cache.get(url, params) # should be cache hitimport json, pathlib
class ConfigManager:
    """JSON-file-backed key/value configuration with in-memory defaults.

    Values loaded from the file override the supplied defaults; set()
    only changes memory until save() writes the merged data back.
    """

    def __init__(self, path, defaults=None):
        self.path = pathlib.Path(path)
        self._data = dict(defaults or {})
        if self.path.exists():
            # File values win over defaults on key collisions.
            self._data.update(json.loads(self.path.read_text()))

    def get(self, key, default=None):
        """Return the value for *key*, or *default* when absent."""
        return self._data.get(key, default)

    def set(self, key, value):
        """Update *key* in memory (call save() to persist)."""
        self._data[key] = value

    def save(self):
        """Write the current data to self.path as pretty-printed JSON."""
        self.path.write_text(json.dumps(self._data, indent=2))
# Test
import tempfile, os
with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as f:
json.dump({"theme": "dark"}, f)
tmp = f.name
cfg = ConfigManager(tmp, defaults={"theme": "light", "font_size": 12})
print("theme:", cfg.get("theme")) # dark (from file)
print("font:", cfg.get("font_size")) # 12 (from defaults)
cfg.set("font_size", 14)
cfg.save()
cfg2 = ConfigManager(tmp)
print("reloaded font:", cfg2.get("font_size")) # 14
os.unlink(tmp)
pathlib.Path is the modern way to handle filesystem paths in Python. It's cross-platform, object-oriented, and integrates with all standard file operations.
from pathlib import Path
# Create a Path object β cross-platform!
p = Path("/home/user/data/sales_2024.csv")
# Path components
print("name: ", p.name) # sales_2024.csv
print("stem: ", p.stem) # sales_2024
print("suffix: ", p.suffix) # .csv
print("suffixes: ", Path("a.tar.gz").suffixes) # ['.tar', '.gz']
print("parent: ", p.parent) # /home/user/data
print("parts: ", p.parts)
# Building paths with / operator
base = Path("/home/user")
data = base / "data"
outfile = data / "reports" / "q1.xlsx"
print("Built path:", outfile)
# Resolve, absolute, relative_to
cwd = Path.cwd()
print("CWD:", cwd)
print("Home:", Path.home())
# Check existence
print("exists:", p.exists())
print("is_file:", p.is_file())
print("is_dir: ", p.is_dir())
# Change suffix
renamed = p.with_suffix(".parquet")
print("With new suffix:", renamed)import tempfile, pathlib
# Create a temp directory structure for demo
tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "data").mkdir()
(tmp / "data" / "sales.csv").write_text("a,b")
(tmp / "data" / "costs.csv").write_text("c,d")
(tmp / "reports").mkdir()
(tmp / "reports" / "q1.xlsx").write_text("x")
(tmp / "reports" / "q2.xlsx").write_text("y")
(tmp / "config.json").write_text("{}")
# glob: match in one directory
csvs = list(tmp.glob("data/*.csv"))
print("CSVs:", [f.name for f in csvs])
# rglob: recursive glob
all_files = list(tmp.rglob("*"))
print("All files:")
for f in sorted(all_files):
print(" ", f.relative_to(tmp))
# Filter only files (not directories)
only_files = [f for f in tmp.rglob("*") if f.is_file()]
print("File count:", len(only_files))
# Cleanup
import shutil
shutil.rmtree(tmp)
print("Temp dir removed")import tempfile, pathlib, shutil
tmp = pathlib.Path(tempfile.mkdtemp())
# Write and read text
(tmp / "hello.txt").write_text("Hello, World!")
content = (tmp / "hello.txt").read_text()
print("read_text:", content)
# Write and read bytes
(tmp / "data.bin").write_bytes(b"\x00\x01\x02\x03")
raw = (tmp / "data.bin").read_bytes()
print("read_bytes:", raw.hex())
# Open with context manager for large files
log = tmp / "log.txt"
with log.open("w") as f:
for i in range(5):
f.write(f"line {i}\n")
with log.open() as f:
for line in f:
print(" ", line.rstrip())
# stat: file metadata
s = log.stat()
print(f"Size: {s.st_size} bytes")
# mkdir, rename, unlink, shutil operations
(tmp / "subdir").mkdir(parents=True, exist_ok=True)
shutil.copy(log, tmp / "subdir" / "log_copy.txt")
print("Copied:", list((tmp / "subdir").iterdir()))
shutil.rmtree(tmp)
print("Done")import tempfile, pathlib, shutil
# Setup demo files
src = pathlib.Path(tempfile.mkdtemp())
for name in ["sales.csv", "costs.csv", "model.pkl", "report.pdf",
"config.json", "weights.pkl", "notes.txt"]:
(src / name).write_text(f"content of {name}")
print("Input files:", [f.name for f in sorted(src.iterdir())])
# Classification map
TYPE_MAP = {
".csv": "data",
".pkl": "models",
".json": "config",
".pdf": "reports",
".txt": "misc",
}
moved = []
for file in src.iterdir():
if not file.is_file():
continue
category = TYPE_MAP.get(file.suffix, "other")
dest_dir = src / category
dest_dir.mkdir(exist_ok=True)
dest = dest_dir / file.name
shutil.move(str(file), dest)
moved.append(f"{file.name} -> {category}/")
for m in moved:
print(" ", m)
# Show final structure
for subdir in sorted(src.iterdir()):
if subdir.is_dir():
print(f" {subdir.name}/:", [f.name for f in subdir.iterdir()])
shutil.rmtree(src)import pathlib, shutil, tempfile
from datetime import datetime, timedelta
def archive_logs(log_dir, archive_dir, days_old=7):
    """Move *.log files older than *days_old* into archive_dir/YYYY-MM/ folders.

    Files are bucketed by their modification month. Returns a list of
    (filename, destination Path) tuples for the files that were moved;
    newer files are left in place.
    """
    log_dir = pathlib.Path(log_dir)
    archive_dir = pathlib.Path(archive_dir)
    cutoff = datetime.now() - timedelta(days=days_old)
    moved = []
    for log_file in log_dir.glob("*.log"):
        mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
        if mtime < cutoff:
            # Bucket by the month the file was last modified.
            month_dir = archive_dir / mtime.strftime("%Y-%m")
            month_dir.mkdir(parents=True, exist_ok=True)
            dest = month_dir / log_file.name
            shutil.move(str(log_file), dest)
            moved.append((log_file.name, dest))
    return moved
# Demo setup
import os, time
tmp_logs = pathlib.Path(tempfile.mkdtemp())
tmp_archive = pathlib.Path(tempfile.mkdtemp())
# Create fake old log files
for i in range(3):
f = tmp_logs / f"app_{i}.log"
f.write_text(f"log content {i}")
# Make it 10 days old
old_time = time.time() - 10 * 86400
os.utime(f, (old_time, old_time))
(tmp_logs / "recent.log").write_text("recent") # should NOT be archived
result = archive_logs(tmp_logs, tmp_archive, days_old=7)
print("Archived:", result)
shutil.rmtree(tmp_logs); shutil.rmtree(tmp_archive)
Master Python's string formatting mini-language: f-strings, format(), format spec DSL, textwrap, and Template strings for safe user-controlled formatting.
# Format spec: [[fill]align][sign][z][#][0][width][grouping][.precision][type]
pi = 3.14159265358979
# Width, precision, type
print(f"{pi:.2f}") # 3.14
print(f"{pi:10.4f}") # right-aligned in width 10
print(f"{pi:<10.4f}|") # left-aligned
print(f"{pi:^10.4f}|") # center-aligned
print(f"{pi:+.3f}") # force + sign
# Integer formatting
n = 1_234_567
print(f"{n:,}") # 1,234,567
print(f"{n:_}") # 1_234_567
print(f"{n:>15,}") # right-aligned width 15
print(f"{255:#x}") # 0xff hex with prefix
print(f"{255:08b}") # 11111111 binary, zero-padded
# Percentage
print(f"{0.857:.1%}") # 85.7%
# Datetime in f-string
from datetime import datetime
now = datetime(2024, 3, 15, 9, 5, 7)
print(f"{now:%Y-%m-%d %H:%M:%S}") # 2024-03-15 09:05:07
print(f"{now:%B %d, %Y}") # March 15, 2024
# Expression in f-string
data = [1, 2, 3, 4, 5]
print(f"Mean: {sum(data)/len(data):.2f}, Max: {max(data)}")
# Self-documenting expressions (Python 3.8+)
x = 42
print(f"{x=}") # x=42import textwrap
from string import Template
# textwrap.wrap / fill: wrap long text
long_text = ("Python is a high-level, interpreted, general-purpose programming language. "
             "Its design philosophy emphasizes code readability with the use of significant indentation.")
wrapped = textwrap.fill(long_text, width=50)
print(wrapped)
print()
# Dedent: remove common leading whitespace (useful after triple-quote strings)
indented = '''
    def foo():
        return 42
    '''
print(repr(textwrap.dedent(indented).strip()))
# Template: safe for user-provided format strings (no code execution risk).
# Fix: Template placeholders are plain identifiers -- a format spec like
# ${balance:.2f} is an invalid placeholder and makes substitute() raise
# ValueError, so format the number first and substitute the finished string.
tmpl = Template("Hello $name, your balance is $$$balance")
print(tmpl.substitute(name="Alice", balance=f"{1234.56:.2f}"))
# safe_substitute: does not raise for missing keys
tmpl2 = Template("Dear $name, ref: $ref_id")
print(tmpl2.safe_substitute(name="Bob"))  # $ref_id stays
# str methods useful for formatting
cols = ["id", "name", "price", "qty"]
print(" | ".join(c.ljust(10) for c in cols))
print("-" * 45)
row = [1, "apple", 1.20, 50]
print(" | ".join(str(v).ljust(10) for v in row))
# format() with the mini-language directly
print(format(3.14159, ".2f"))
print(format(1234567, ","))
print(format("hello", ">20"))
# Building a text table
headers = ["Product", "Qty", "Price", "Total"]
rows = [
    ("Apple", 50, 1.20, 60.00),
    ("Banana", 200, 0.50, 100.00),
    ("Cherry", 30, 2.00, 60.00),
    ("Durian", 5, 8.75, 43.75),
]
# Column widths
w = [12, 6, 8, 10]
fmt_h = " ".join(f"{h:>{ww}}" for h, ww in zip(headers, w))
sep = " ".join("-" * ww for ww in w)
print(fmt_h)
print(sep)
for row in rows:
    # Fix: the Qty field was missing its closing brace (SyntaxError).
    vals = [f"{row[0]:<{w[0]}}", f"{row[1]:>{w[1]}}",
            f"{row[2]:>{w[2]}.2f}", f"{row[3]:>{w[3]}.2f}"]
    print(" ".join(vals))
total = sum(r[3] for r in rows)
print(sep)
print(f"{'TOTAL':>{sum(w)+6}}: {total:.2f}")
# Fix: restore the import that was fused onto the line above.
from datetime import date
import textwrap
def format_report(title, data, width=60):
    """Render a fixed-width sales report and return it as one string.

    *data* is a list of dicts with 'region', 'revenue', and 'units' keys.
    """
    border = "=" * width
    today = date.today().strftime("%B %d, %Y")
    lines = [
        border,
        f" {title}".center(width),
        f" Generated: {today}".center(width),
        border,
        "",
        f" {'Region':<15} {'Revenue':>12} {'Units':>8} {'Avg/Unit':>10}",
        " " + "-" * (width - 2),
    ]
    for row in data:
        # Guard against zero units to avoid ZeroDivisionError.
        per_unit = row["revenue"] / row["units"] if row["units"] else 0
        lines.append(
            f" {row['region']:<15} ${row['revenue']:>11,.0f} {row['units']:>8,} ${per_unit:>9.2f}"
        )
    grand_total = sum(row["revenue"] for row in data)
    lines += [" " + "-" * (width - 2),
              f" {'TOTAL':<15} ${grand_total:>11,.0f}",
              "", border]
    return "\n".join(lines)
# Sample regional sales data for the report demo.
data = [
    {"region": "North", "revenue": 1_450_000, "units": 9_800},
    {"region": "South", "revenue": 980_000, "units": 7_200},
    {"region": "East", "revenue": 2_100_000, "units": 14_500},
    {"region": "West", "revenue": 1_750_000, "units": 11_000},
]
print(format_report("Q1 2024 Sales Report", data))
# Fix: restore the import that was fused onto the line above.
from datetime import date
def format_invoice(company, items, tax_rate=0.08):
today = date.today()
subtotal = sum(qty * price for _, qty, price in items)
tax = subtotal * tax_rate
total = subtotal + tax
w = 55
print("=" * w)
print(f" {company}".center(w))
print(f" Invoice Date: {today}".center(w))
print("=" * w)
print(f" {'Description':<22} {'Qty':>4} {'Unit':>8} {'Total':>10}")
print(" " + "-" * (w-2))
for desc, qty, price in items:
# TODO: print each line with f-string formatting
pass
print(" " + "-" * (w-2))
# TODO: print subtotal, tax, and grand total rows
print("=" * w)
format_invoice("Acme Corp", [
("Python Training", 1, 2500.00),
("Jupyter Setup", 3, 150.00),
("Cloud Credits", 10, 49.99),
], tax_rate=0.09)
Profile before optimizing. Use timeit for micro-benchmarks, functools.cache for memoization, __slots__ for memory, and algorithmic improvements for the biggest wins.
import timeit
# Compare list comprehension vs map() vs for-loop
setup = "data = list(range(10_000))"
t_comp = timeit.timeit("[x**2 for x in data]", setup=setup, number=1000)
t_map = timeit.timeit("list(map(lambda x: x**2, data))", setup=setup, number=1000)
t_loop = timeit.timeit('''
result = []
for x in data:
    result.append(x**2)
''', setup=setup, number=1000)
print(f"List comprehension: {t_comp:.3f}s")
print(f"map(lambda): {t_map:.3f}s")
print(f"for loop + append: {t_loop:.3f}s")
# Compare string joining methods
setup2 = "parts = ['a'] * 1000"
t_join = timeit.timeit("''.join(parts)", setup=setup2, number=5000)
# Fix: this statement string was a broken (unterminated) literal; use a
# triple-quoted string so the loop survives as a multi-line statement.
t_plus = timeit.timeit('''
s = ''
for p in parts:
    s += p
''', setup=setup2, number=5000)
print(f"join(): {t_join:.4f}s")
print(f"+=: {t_plus:.4f}s")
print(f"join speedup: {t_plus/t_join:.1f}x")
# Fix: restore the imports that were fused onto the line above.
import functools, time
# lru_cache: memoize with a max size limit
@functools.lru_cache(maxsize=128)
def fib_lru(n):
    """n-th Fibonacci number, memoized with a bounded LRU cache."""
    if n <= 1:
        return n
    return fib_lru(n - 1) + fib_lru(n - 2)
# functools.cache: unlimited cache (Python 3.9+)
@functools.cache
def fib_cache(n):
    """n-th Fibonacci number, memoized with an unbounded cache."""
    if n <= 1:
        return n
    return fib_cache(n - 1) + fib_cache(n - 2)
t0 = time.perf_counter()
result = fib_lru(40)
print(f"fib(40) = {result}, lru_cache time: {(time.perf_counter()-t0)*1000:.2f}ms")
print("Cache info:", fib_lru.cache_info())
# cached_property: compute once per instance, then return the stored value
class DataStats:
    def __init__(self, data):
        self._data = data
    @functools.cached_property
    def mean(self):
        print(" (computing mean...)")
        return sum(self._data) / len(self._data)
    @functools.cached_property
    def std(self):
        print(" (computing std...)")
        m = self.mean  # reuses the cached mean
        return (sum((x - m) ** 2 for x in self._data) / len(self._data)) ** 0.5
ds = DataStats(list(range(1000)))
print("mean:", ds.mean)
print("mean:", ds.mean)  # no recompute
print("std: ", ds.std)
# Fix: restore the imports that were fused onto the line above.
import timeit, collections
# Membership test: 'in' on a list is O(n) per test, on a set O(1)
data_list = list(range(10_000))
data_set = set(data_list)
t_list = timeit.timeit("9999 in data_list", globals=locals(), number=100_000)
t_set = timeit.timeit("9999 in data_set", globals=locals(), number=100_000)
print(f"list 'in': {t_list:.4f}s")
print(f"set 'in': {t_set:.4f}s")
print(f"Set speedup: {t_list/t_set:.0f}x")
# Counter vs manual counting
words = "the quick brown fox jumps over the lazy dog the fox".split()
t_manual = timeit.timeit('''
counts = {}
for w in words:
    counts[w] = counts.get(w, 0) + 1
''', globals={"words": words}, number=50_000)
t_counter = timeit.timeit("collections.Counter(words)",
                          globals={"collections": collections, "words": words},
                          number=50_000)
print(f"Manual count: {t_manual:.4f}s")
print(f"Counter: {t_counter:.4f}s")
# Use sorted() with a key function instead of cmp
records = [{"name": n, "score": s} for n, s in [("Bob", 72), ("Alice", 95), ("Charlie", 88)]]
sorted_records = sorted(records, key=lambda r: r["score"], reverse=True)
for r in sorted_records:
    print(f" {r['name']:10s}: {r['score']}")
# Fix: restore the imports that were fused onto the line above.
import functools, collections, statistics
class FastAggregator:
    """Aggregate {'group', 'value'} records with a lazily built index
    and cached summary statistics."""
    def __init__(self, records):
        self._records = records
        self._by_key = None  # built on first use
    def _ensure_index(self):
        # Group values by their 'group' key exactly once.
        if self._by_key is not None:
            return
        index = collections.defaultdict(list)
        for rec in self._records:
            index[rec["group"]].append(rec["value"])
        self._by_key = index
    @functools.cached_property
    def group_means(self):
        self._ensure_index()
        return {grp: statistics.mean(vals) for grp, vals in self._by_key.items()}
    @functools.cached_property
    def group_counts(self):
        self._ensure_index()
        return {grp: len(vals) for grp, vals in self._by_key.items()}
    @functools.cached_property
    def overall_mean(self):
        return statistics.mean([rec["value"] for rec in self._records])
import random
random.seed(42)  # deterministic demo data
records = [{"group": f"G{i%5}", "value": random.gauss(50, 10)} for i in range(10_000)]
agg = FastAggregator(records)
print("Group means:", {k: f"{v:.2f}" for k, v in agg.group_means.items()})
print("Group counts:", agg.group_counts)
print("Overall mean:", f"{agg.overall_mean:.2f}")
print("(Accessing again — no recompute):", f"{agg.overall_mean:.2f}")
# Fix: restore the imports that were fused onto the line above.
import timeit, collections, random
random.seed(42)
data = [random.randint(0, 500) for _ in range(10_000)]
def brute_force(lst):
    """O(n^2) duplicate finder: compare every pair (reference only)."""
    dups = set()
    for i in range(len(lst)):
        for j in range(i + 1, len(lst)):
            if lst[i] == lst[j]:
                dups.add(lst[i])
    return list(dups)
def sort_based(lst):
    """O(n log n) duplicate finder: sort, then scan adjacent elements."""
    ordered = sorted(lst)
    dups = set()
    for a, b in zip(ordered, ordered[1:]):
        if a == b:
            dups.add(a)
    return list(dups)
def hash_based(lst):
    """O(n) duplicate finder: count occurrences with collections.Counter."""
    counts = collections.Counter(lst)
    return [value for value, cnt in counts.items() if cnt > 1]
# Only benchmark sort_based and hash_based (brute force is too slow)
for name, fn in [("sort_based", sort_based), ("hash_based", hash_based)]:
    t = timeit.timeit(lambda: fn(data), number=100)
    print(f"{name}: {t:.4f}s, found {len(fn(data))} duplicates")
Virtual environments isolate project dependencies. pip manages packages, and importlib enables dynamic imports at runtime β essential for building extensible systems.
# These commands are run in the terminal (not runnable as Python code);
# they are shown here as strings for educational purposes.
venv_commands = '''
# Create a virtual environment
python -m venv .venv
# Activate (macOS/Linux)
source .venv/bin/activate
# Activate (Windows)
.venv\\Scripts\\activate
# Install packages
pip install requests pandas scikit-learn
# Install from requirements file
pip install -r requirements.txt
# Freeze current environment
pip freeze > requirements.txt
# Upgrade a package
pip install --upgrade numpy
# Show installed packages
pip list
pip show numpy
# Deactivate
deactivate
'''
# requirements.txt format:
req_txt = '''
# requirements.txt
numpy>=1.24,<2.0
pandas==2.1.0
scikit-learn>=1.3
requests>=2.31
matplotlib>=3.7; python_version >= "3.9"
'''
# pyproject.toml format (modern, preferred):
pyproject_toml = '''
[project]
name = "my-ml-project"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "numpy>=1.24",
    "pandas>=2.1",
    "scikit-learn>=1.3",
]
[project.optional-dependencies]
dev = ["pytest", "black", "mypy"]
'''
print("Common venv workflow:")
for cmd in ["python -m venv .venv", "source .venv/bin/activate", "pip install -r requirements.txt"]:
    print(f" $ {cmd}")
# Fix: restore the imports that were fused onto the line above.
import importlib, sys
# Basic dynamic import
math = importlib.import_module("math")
print("sqrt(16):", math.sqrt(16))
# Import a module by name at runtime
pprint = importlib.import_module("pprint")
pprint.pprint({"a": 1, "b": [2, 3]})
# Conditional import: use the fast version if available
def import_or_fallback(preferred, fallback):
    """Import *preferred* if installed, otherwise import *fallback*."""
    try:
        return importlib.import_module(preferred)
    except ImportError:
        print(f" {preferred} not found, using {fallback}")
        return importlib.import_module(fallback)
json_mod = import_or_fallback("ujson", "json")  # ujson is faster if installed
print("json module:", json_mod.__name__)
# importlib.util: check if a module is available without importing it
import importlib.util
for pkg in ["numpy", "pandas", "flask", "fastapi", "nonexistent_pkg"]:
    spec = importlib.util.find_spec(pkg)
    status = "installed" if spec else "NOT installed"
    print(f" {pkg:<20} {status}")
# Fix: restore the imports that were fused onto the line above.
import tempfile, pathlib, sys, importlib
# Create a minimal package structure in a temp directory
tmp = pathlib.Path(tempfile.mkdtemp())
pkg = tmp / "mypackage"
pkg.mkdir()
# Package init: version marker plus re-exported helpers.
# (Embedded sources are indented properly so they are valid Python.)
(pkg / "__init__.py").write_text('''
__version__ = "1.0.0"
from mypackage.utils import add, multiply
''')
(pkg / "utils.py").write_text('''
def add(a, b):
    return a + b
def multiply(a, b):
    return a * b
''')
(pkg / "models.py").write_text('''
class LinearModel:
    def __init__(self, slope=1, intercept=0):
        self.slope = slope
        self.intercept = intercept
    def predict(self, x):
        return self.slope * x + self.intercept
''')
# Add tmp to sys.path so Python can find our package
sys.path.insert(0, str(tmp))
# Import our package
mypackage = importlib.import_module("mypackage")
print("Version:", mypackage.__version__)
print("add:", mypackage.add(3, 4))
print("multiply:", mypackage.multiply(3, 4))
models = importlib.import_module("mypackage.models")
m = models.LinearModel(slope=2.5, intercept=-1)
print("predict(10):", m.predict(10))
sys.path.pop(0)
import shutil
shutil.rmtree(tmp)
# Fix: restore the imports that were fused onto the line above.
import importlib, importlib.util, pathlib, sys, tempfile, shutil
# Create a plugin directory with two demo plugins.
# (Plugin sources are indented properly so they are valid Python.)
tmp = pathlib.Path(tempfile.mkdtemp())
plugin_dir = tmp / "plugins"
plugin_dir.mkdir()
(plugin_dir / "plugin_stats.py").write_text('''
def run(data):
    n = len(data)
    mean = sum(data) / n
    return {"plugin": "stats", "count": n, "mean": round(mean, 2)}
''')
(plugin_dir / "plugin_filter.py").write_text('''
def run(data):
    filtered = [x for x in data if x > 0]
    return {"plugin": "filter", "kept": len(filtered), "dropped": len(data)-len(filtered)}
''')
def load_plugins(plugin_dir):
    """Load every plugin_*.py in *plugin_dir* as a module, keyed by file stem."""
    plugins = {}
    for path in sorted(pathlib.Path(plugin_dir).glob("plugin_*.py")):
        name = path.stem
        spec = importlib.util.spec_from_file_location(name, path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        plugins[name] = mod
        print(f" Loaded: {name}")
    return plugins
sys.path.insert(0, str(plugin_dir))
plugins = load_plugins(plugin_dir)
data = [3, -1, 7, 0, -2, 5, 9]
for name, plugin in plugins.items():
    print(f" {name}: {plugin.run(data)}")
sys.path.pop(0)
shutil.rmtree(tmp)
# Fix: restore the import that was fused onto the line above.
import importlib.util
def check_dependencies(requirements):
    """Partition *requirements* into installed vs missing import names.

    Note: distribution names may differ from import names
    (e.g. scikit-learn -> sklearn); this uses a simple heuristic.
    """
    installed = []
    missing = []
    for pkg in requirements:
        # Strip version pins and normalize dashes to underscores.
        import_name = pkg.replace("-", "_").split(">=")[0].split("==")[0].strip()
        spec = importlib.util.find_spec(import_name)
        if spec:
            installed.append(pkg)
        else:
            missing.append(pkg)
    return {"installed": installed, "missing": missing}
def install_missing(missing):
    """Print the pip install command for each missing package."""
    for pkg in missing:
        print(f" $ pip install {pkg}")
packages = ["numpy", "pandas", "requests", "flask", "nonexistent_lib", "anotherMissingPkg"]
result = check_dependencies(packages)
print("Installed:", result["installed"])
print("Missing:", result["missing"])
install_missing(result["missing"])
Python's runtime lets you inspect and modify objects, classes, and functions dynamically. Use inspect, dir(), getattr(), and metaclasses for powerful abstractions.
import inspect
class Rectangle:
    width: float
    height: float
    def __init__(self, w, h):
        self.width = w
        self.height = h
    def area(self):
        return self.width * self.height
    def perimeter(self):
        return 2 * (self.width + self.height)
r = Rectangle(4, 6)
# dir() lists every attribute and method; keep only the public ones.
attrs = [name for name in dir(r) if not name.startswith("_")]
print("Public attrs:", attrs)
# type() and isinstance()
print("type:", type(r).__name__)
print("isinstance(r, Rectangle):", isinstance(r, Rectangle))
print("isinstance(r, object): ", isinstance(r, object))
# getattr / hasattr: probe and call methods by name.
for method in ["area", "perimeter", "nonexistent"]:
    if not hasattr(r, method):
        print(f"{method}: not found")
        continue
    bound = getattr(r, method)
    print(f"{method}(): {bound()}")
# inspect module
print("Source file:", inspect.getfile(Rectangle))
sig = inspect.signature(Rectangle.__init__)
print("Signature:", sig)
print("Parameters:", list(sig.parameters.keys()))
# Fix: the class definition below was fused onto the print line above.
class Animal:
    kingdom = "Animalia"
    def __init__(self, name, species):
        self.name = name
        self.species = species
    def speak(self):
        return "..."
class Dog(Animal):
    def __init__(self, name):
        super().__init__(name, "Canis lupus familiaris")
    def speak(self):
        return "Woof!"
class GoldenRetriever(Dog):
    breed = "Golden Retriever"
g = GoldenRetriever("Buddy")
# Instance __dict__: instance attributes only
print("Instance __dict__:", g.__dict__)
# Class __dict__: class attributes only
print("Class __dict__ keys:", list(GoldenRetriever.__dict__.keys()))
# vars(): same as __dict__ for objects
print("vars(g):", vars(g))
# Method Resolution Order (MRO)
print("MRO:", [c.__name__ for c in GoldenRetriever.__mro__])
# Class attribute lookup falls back through the MRO to Animal
print("Class attr 'kingdom':", g.kingdom)  # inherited from Animal
g.kingdom = "override"  # creates an instance attribute that shadows it
print("Instance attr 'kingdom':", g.__dict__["kingdom"])
print("Class still has:", Animal.kingdom)
# Metaclass: controls how classes are created
class SingletonMeta(type):
    """Metaclass that ensures at most one instance per class."""
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
class AppConfig(metaclass=SingletonMeta):
    def __init__(self):
        self.debug = False
        self.host = "localhost"
c1 = AppConfig()
c2 = AppConfig()
c1.debug = True
print("Same object:", c1 is c2)  # True
print("c2.debug:", c2.debug)  # True — same instance!
# __init_subclass__: called when a subclass is defined
class PluginBase:
    _registry = {}
    def __init_subclass__(cls, plugin_name=None, **kwargs):
        super().__init_subclass__(**kwargs)
        name = plugin_name or cls.__name__.lower()
        PluginBase._registry[name] = cls
        print(f"Registered plugin: {name!r}")
class CSVPlugin(PluginBase, plugin_name="csv"):
    def run(self): return "csv output"
class JSONPlugin(PluginBase, plugin_name="json"):
    def run(self): return "json output"
print("Registry:", list(PluginBase._registry.keys()))
plugin = PluginBase._registry["csv"]()
print("CSV plugin run:", plugin.run())
# Fix: restore the import that was fused onto the line above.
import inspect
class APIRouter:
    """Collect route handlers and auto-generate docs via introspection."""
    def __init__(self):
        self.routes = {}
    def route(self, path, method="GET"):
        """Decorator factory: register the decorated function under
        'METHOD path', capturing its docstring and parameter metadata."""
        def decorator(func):
            sig = inspect.signature(func)
            doc = inspect.getdoc(func) or "No description"
            params = {}
            for name, p in sig.parameters.items():
                # Fix: handlers here are plain functions, so only skip a
                # genuine self/cls parameter instead of always dropping
                # the first one (which lost the first real parameter).
                if name in ("self", "cls"):
                    continue
                ann = (p.annotation.__name__
                       if p.annotation is not inspect.Parameter.empty else "any")
                default = (str(p.default)
                           if p.default is not inspect.Parameter.empty else "required")
                params[name] = {"annotation": str(ann), "default": default}
            self.routes[f"{method} {path}"] = {
                "handler": func.__name__,
                "doc": doc,
                "params": params,
            }
            return func
        return decorator
    def docs(self):
        """Print a plain-text summary of every registered route."""
        for endpoint, info in self.routes.items():
            print(f"\n{endpoint} -> {info['handler']}")
            print(f" {info['doc']}")
            for p, meta in info["params"].items():
                print(f" - {p}: {meta['annotation']} (default={meta['default']})")
router = APIRouter()
@router.route("/users", "GET")
def list_users(limit: int = 20, offset: int = 0):
    """Return paginated list of users."""
    # Fix: the description was a '#' comment, so inspect.getdoc() saw
    # nothing and docs() printed "No description"; use a docstring.
    pass
@router.route("/users/{id}", "GET")
def get_user(user_id: int, include_meta: bool = False):
    """Fetch a single user by ID."""
    pass
router.docs()
# Fix: restore the import that was fused onto the line above.
import inspect
def inspect_class(cls):
    """Summarize a class: name, bases, MRO, public class attributes,
    and public methods with their signatures."""
    summary = {
        "name": cls.__name__,
        "bases": [base.__name__ for base in cls.__bases__],
        "mro": [c.__name__ for c in cls.__mro__],
        "class_attrs": {},
        "methods": {},
    }
    for attr, value in cls.__dict__.items():
        if attr.startswith("_"):
            continue  # skip dunders and private members
        if callable(value):
            summary["methods"][attr] = str(inspect.signature(value))
        else:
            summary["class_attrs"][attr] = repr(value)
    return summary
class Vehicle:
    wheels = 4
    fuel = "gasoline"
    def __init__(self, brand, speed):
        self.brand = brand
        self.speed = speed
    def drive(self, distance: float) -> float:
        return distance / self.speed
class ElectricCar(Vehicle):
    fuel = "electric"
    def charge(self, hours: int) -> str:
        return f"Charging for {hours}h"
for cls in [Vehicle, ElectricCar]:
    info = inspect_class(cls)
    print(f"\n{info['name']}:")
    print(f" bases: {info['bases']}")
    print(f" attrs: {info['class_attrs']}")
    print(f" methods: {info['methods']}")
Python's typing module enables static analysis with TypeVar, Generic, Protocol, overload, and Literal. Well-typed code is self-documenting and catches bugs before runtime.
from typing import TypeVar, Generic, Iterable, Optional
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
# Generic function: returns the first element, or None when empty
def first(items: list[T]) -> Optional[T]:
    return items[0] if items else None
print(first([1, 2, 3]))  # int
print(first(["a", "b"]))  # str
print(first([]))  # None
# Generic class: type-safe stack
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []
    def push(self, item: T) -> None:
        self._items.append(item)
    def pop(self) -> T:
        if not self._items:
            raise IndexError("pop from empty stack")
        return self._items.pop()
    def peek(self) -> Optional[T]:
        return self._items[-1] if self._items else None
    def __len__(self) -> int:
        return len(self._items)
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
int_stack.push(3)
print("peek:", int_stack.peek())  # 3
print("pop: ", int_stack.pop())  # 3
print("len: ", len(int_stack))  # 2
# Fix: restore the import that was fused into the comment above; the
# next snippet needs these typing names.
from typing import Union, Optional, Literal, Final
import sys
# Union: accepts multiple types (Python 3.10+: int | str)
def process(value: Union[int, str, float]) -> str:
    return f"Got {type(value).__name__}: {value}"
print(process(42))
print(process("hello"))
print(process(3.14))
# Optional[T] is shorthand for Union[T, None]
def find_user(user_id: int) -> Optional[dict]:
    db = {1: {"name": "Alice"}, 2: {"name": "Bob"}}
    return db.get(user_id)
user = find_user(1)
if user:
    print("Found:", user["name"])
# Literal: restrict to specific values
Mode = Literal["read", "write", "append"]
def open_file(path: str, mode: Mode) -> str:
    return f"Opening {path} in {mode} mode"
print(open_file("data.csv", "read"))
# Final: constant that cannot be reassigned
MAX_RETRIES: Final = 3
API_URL: Final[str] = "https://api.example.com"
# TypeAlias (Python 3.10+)
if sys.version_info >= (3, 10):
    from typing import TypeAlias
    Vector: TypeAlias = list[float]
    Matrix: TypeAlias = list[list[float]]
print(f"MAX_RETRIES: {MAX_RETRIES}")
# Fix: restore the import that was fused onto the line above.
from typing import overload, Union
# @overload allows multiple type signatures for the same function
# Only the implementation signature uses the body
@overload
def parse(value: str) -> int: ...
@overload
def parse(value: bytes) -> float: ...
@overload
def parse(value: int) -> str: ...
def parse(value: Union[str, bytes, int]) -> Union[int, float, str]:
if isinstance(value, str):
return int(value)
elif isinstance(value, bytes):
return float(value.decode())
else:
return str(value)
print(parse("42")) # int
print(parse(b"3.14")) # float
print(parse(100)) # str
# TypedDict: dict with typed keys
from typing import TypedDict, NotRequired
class UserRecord(TypedDict):
id: int
name: str
email: str
age: NotRequired[int] # optional key
def create_user(data: UserRecord) -> str:
return f"User {data['name']} ({data['email']})"
user: UserRecord = {"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30}
print(create_user(user))
user2: UserRecord = {"id": 2, "name": "Bob", "email": "bob@example.com"}
print(create_user(user2)) # age is optionalfrom typing import TypedDict, Generic, TypeVar, Optional, Callable
from dataclasses import dataclass, field
T = TypeVar("T")
R = TypeVar("R")
class RawRecord(TypedDict):
    id: int
    name: str
    value: float
    valid: bool
class CleanRecord(TypedDict):
    id: int
    name: str
    value: float
@dataclass
class Pipeline(Generic[T, R]):
    """Chainable list-processing pipeline; a step may return None to
    drop a record (filtered out before the next step runs)."""
    steps: list[Callable[[T], R]] = field(default_factory=list)
    def add_step(self, fn: Callable) -> "Pipeline":
        self.steps.append(fn)
        return self  # enables fluent chaining
    def run(self, data: list[T]) -> list:
        result = data
        for step in self.steps:
            # Drop None entries produced by the previous step.
            result = [step(r) for r in result if r is not None]
        return result
def filter_valid(r: RawRecord) -> Optional[RawRecord]:
    return r if r["valid"] and r["value"] > 0 else None
def normalize(r: RawRecord) -> CleanRecord:
    return {"id": r["id"], "name": r["name"].strip().title(), "value": round(r["value"], 2)}
records: list[RawRecord] = [
    {"id": 1, "name": "alice smith", "value": 129.5, "valid": True},
    {"id": 2, "name": "BOB JONES", "value": -5.0, "valid": False},
    {"id": 3, "name": " carol lee ", "value": 89.99, "valid": True},
]
pipeline: Pipeline[RawRecord, CleanRecord] = Pipeline()
pipeline.add_step(filter_valid).add_step(normalize)
result = pipeline.run(records)
for r in result:
    print(f" {r}")
# Fix: restore the import that was fused onto the line above.
from typing import Generic, TypeVar, Callable, Optional
from dataclasses import dataclass
T = TypeVar("T")
E = TypeVar("E")
U = TypeVar("U")
@dataclass
class Result(Generic[T, E]):
    """Rust-style Result: holds either a success value or an error."""
    _value: Optional[T] = None
    _error: Optional[E] = None
    @classmethod
    def ok(cls, value: T) -> "Result[T, E]":
        return cls(_value=value)
    @classmethod
    def err(cls, error: E) -> "Result[T, E]":
        return cls(_error=error)
    def is_ok(self) -> bool:
        return self._error is None
    def is_err(self) -> bool:
        return self._error is not None
    def unwrap(self) -> T:
        """Return the value; raise ValueError when this is an Err."""
        if self.is_err():
            raise ValueError(f"Called unwrap on Err: {self._error}")
        return self._value
    def unwrap_or(self, default: T) -> T:
        """Return the value when ok, otherwise *default*."""
        return self._value if self.is_ok() else default
    def map(self, fn: Callable[[T], U]) -> "Result[U, E]":
        """Apply *fn* to the value when ok; propagate the Err unchanged."""
        if self.is_ok():
            return Result.ok(fn(self._value))
        return self
# Tests
r1 = Result.ok(42)
r2 = Result.err("not found")
print(r1.is_ok(), r1.unwrap())
print(r2.is_err(), r2.unwrap_or(-1))
print(r1.map(lambda x: x * 2).unwrap())
try:
    r2.unwrap()
except ValueError as e:
    print("Caught:", e)