Chapter 10: Advanced Topics — Asynchronous Programming with Asyncio
⚡ Asynchronous Programming with Asyncio — Efficient Concurrency in Python
Asynchronous programming enables your Python programs to handle many tasks at once without blocking.
Instead of waiting for one task to finish (like downloading a webpage or reading a file), asyncio allows your program to pause, perform other work, and come back when the result is ready.
🧩 1. Concurrency Models in Python
| Model | Description | Best For |
|---|---|---|
| Threads | Multiple tasks in shared memory, managed by the OS | I/O-bound (network, disk) |
| Processes | Independent memory and execution | CPU-bound (computation) |
| Asyncio | Single-threaded, event-driven concurrency | I/O-bound (many small tasks) |
⚙️ Asyncio shines when you have many slow I/O operations — like API calls, database queries, or scraping hundreds of pages.
🧠 2. Understanding Asyncio Core Concepts
| Term | Description |
|---|---|
| Coroutine | A function defined with async def that can pause (await) |
| Await | Pauses the current coroutine until the awaited operation completes, letting the event loop run other tasks |
| Event Loop | The core scheduler that runs coroutines |
| Task | A wrapper that schedules and tracks a coroutine |
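To make these terms concrete, here is a minimal sketch (the function names are illustrative) showing how they relate:

```python
import asyncio

async def work():                       # coroutine: defined with async def
    await asyncio.sleep(0.1)            # await: pause without blocking
    return "result"

async def main():
    task = asyncio.create_task(work())  # task: schedules the coroutine
    print(await task)                   # resumes main() once the task is done

asyncio.run(main())                     # event loop: driven by asyncio.run()
```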
🧱 3. Basic Asyncio Example
```python
import asyncio

async def greet():
    print("Hello...")
    await asyncio.sleep(1)
    print("World!")

# Modern way to run coroutines
asyncio.run(greet())
```
How it works:
- `async def` defines an asynchronous coroutine.
- `await` pauses execution without blocking the entire program.
- `asyncio.run()` starts and closes the event loop automatically.
⏱️ 4. Running Multiple Coroutines Concurrently
You can run multiple tasks at once using asyncio.create_task().
```python
import asyncio

async def foo():
    await asyncio.sleep(2)
    print("Foo finished!")

async def bar():
    await asyncio.sleep(1)
    print("Bar finished!")

async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    print("Tasks started...")
    await task1
    await task2

asyncio.run(main())
```
Output:

```
Tasks started...
Bar finished!
Foo finished!
```
⚡ Notice that both coroutines run “concurrently” — the shorter task finishes first.
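For contrast, awaiting the coroutines directly instead of wrapping them in tasks runs them one after another. A minimal sketch, reusing the same foo() and bar():

```python
import asyncio

async def foo():
    await asyncio.sleep(2)
    print("Foo finished!")

async def bar():
    await asyncio.sleep(1)
    print("Bar finished!")

async def main():
    # Each await runs to completion before the next coroutine starts,
    # so this takes ~3 seconds instead of ~2 with create_task().
    await foo()
    await bar()

asyncio.run(main())
```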
🔄 5. Running Many Tasks Together with asyncio.gather()
gather() runs multiple coroutines concurrently and returns all their results.
```python
import asyncio

async def countdown(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} done"

async def main():
    results = await asyncio.gather(
        countdown("Task 1", 3),
        countdown("Task 2", 1),
        countdown("Task 3", 2),
    )
    print("Results:", results)

asyncio.run(main())
```
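Note that gather() returns results in the order the coroutines were passed, not the order they finished. The run above should print something like:

```
Task 1 started
Task 2 started
Task 3 started
Task 2 finished
Task 3 finished
Task 1 finished
Results: ['Task 1 done', 'Task 2 done', 'Task 3 done']
```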
🌍 6. Real-World Example — Asynchronous Web Requests
Using aiohttp, you can fetch many URLs concurrently.
```python
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        content = await response.text()
        print(f"Fetched {url} ({len(content)} bytes)")
        return content

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://github.com",
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_url(session, u) for u in urls))

asyncio.run(main())
```
⚙️ aiohttp enables thousands of concurrent requests efficiently — ideal for APIs, crawlers, and async web clients.
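When the URL list grows large, it is often wise to cap how many requests run at once. One common approach (a sketch with made-up URLs, not part of the example above) uses asyncio.Semaphore:

```python
import asyncio
import aiohttp

async def fetch_limited(session, sem, url):
    async with sem:  # at most N requests in flight at a time
        async with session.get(url) as resp:
            return url, resp.status

async def main():
    urls = [f"https://example.com/?page={i}" for i in range(20)]  # hypothetical
    sem = asyncio.Semaphore(5)  # allow 5 concurrent requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_limited(session, sem, u) for u in urls))
    print(results)

asyncio.run(main())
```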
🧮 7. Using asyncio.as_completed() for Early Results
Instead of waiting for all tasks to finish, process results as they arrive.
```python
import asyncio
import aiohttp

async def fetch(url):
    # Note: opening a session per request keeps the example short;
    # in production, share one ClientSession across all requests.
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            await asyncio.sleep(0.5)  # simulate extra processing time
            return url, r.status

async def main():
    urls = ["https://example.com", "https://python.org", "https://github.com"]
    tasks = [asyncio.create_task(fetch(u)) for u in urls]
    for coro in asyncio.as_completed(tasks):
        url, status = await coro
        print(f"{url} -> {status}")

asyncio.run(main())
```
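To see the completion-order behavior in isolation, without any network calls, here is a minimal sketch with plain sleeps:

```python
import asyncio

async def waiter(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.create_task(waiter(n, d))
             for n, d in [("slow", 2), ("fast", 0.5), ("medium", 1)]]
    for coro in asyncio.as_completed(tasks):
        print(await coro)  # prints fast, then medium, then slow

asyncio.run(main())
```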
🚨 8. Error Handling in Async Code
Use try/except inside coroutines or wrap them when using gather().
```python
import asyncio

async def risky_divide(x, y):
    try:
        await asyncio.sleep(1)
        return x / y
    except ZeroDivisionError:
        print("Division by zero!")
        return None

async def main():
    results = await asyncio.gather(
        risky_divide(10, 2),
        risky_divide(10, 0),
    )
    print("Results:", results)

asyncio.run(main())
```
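Alternatively, if a coroutine does not catch its own exceptions, passing return_exceptions=True to gather() collects them as results instead of aborting the whole batch. A sketch:

```python
import asyncio

async def divide(x, y):
    await asyncio.sleep(1)
    return x / y  # may raise ZeroDivisionError

async def main():
    results = await asyncio.gather(
        divide(10, 2),
        divide(10, 0),
        return_exceptions=True,  # exceptions come back as result objects
    )
    print("Results:", results)  # [5.0, ZeroDivisionError('division by zero')]

asyncio.run(main())
```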
🧩 9. Task Cancellation and Timeouts
Use asyncio.wait_for() to enforce time limits.
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
```
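Cancellation can also be triggered directly: calling task.cancel() raises asyncio.CancelledError inside the task, which it can catch for cleanup. A minimal sketch:

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Worker cancelled, cleaning up...")
        raise  # re-raise so the task is properly marked as cancelled

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
```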
🏭 10. Producer–Consumer Pattern with asyncio.Queue
```python
import asyncio
import random

async def producer(queue):
    for i in range(5):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()  # mark the item as processed

async def main():
    queue = asyncio.Queue()
    prod = asyncio.create_task(producer(queue))
    cons = asyncio.create_task(consumer(queue))
    await prod            # wait until the producer is done
    await queue.join()    # wait until every queued item has been processed
    cons.cancel()         # stop the (otherwise endless) consumer loop

asyncio.run(main())
```
🧠 This pattern is common in streaming pipelines, event systems, and task schedulers.
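The same pattern scales to several consumers pulling from one queue; a sketch (the consumer count is arbitrary):

```python
import asyncio

async def consumer(name, queue):
    while True:
        item = await queue.get()
        print(f"{name} consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # Three consumers share the same queue and take items as they arrive.
    consumers = [asyncio.create_task(consumer(f"C{i}", queue)) for i in range(3)]
    for item in range(10):
        await queue.put(item)
    await queue.join()  # wait until every item has been processed
    for c in consumers:
        c.cancel()

asyncio.run(main())
```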
⚡ 11. Async File I/O Example
While the built-in open() is synchronous, you can use aiofiles for non-blocking file operations.
```bash
pip install aiofiles
```

```python
import asyncio
import aiofiles

async def write_log():
    async with aiofiles.open("log.txt", "w") as f:
        await f.write("Async I/O in action!\n")

asyncio.run(write_log())
```
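Reading works the same way. A short sketch, assuming the log.txt written above exists:

```python
import asyncio
import aiofiles

async def read_log():
    async with aiofiles.open("log.txt", "r") as f:
        contents = await f.read()
    print(contents)

asyncio.run(read_log())
```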
⚖️ 12. Comparison — Asyncio vs Threads vs Processes
| Feature | Asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Model | Single-threaded, event-driven | Multi-threaded | Multi-process |
| Memory | Shared | Shared | Isolated |
| Best For | Many small I/O tasks | Blocking I/O | CPU-heavy work |
| Overhead | Very low | Moderate | High |
| Example | Web scraping | File downloads | Image processing |
🧭 13. Best Practices for Asyncio
✅ Always use asyncio.run() instead of creating and managing event loops by hand.
✅ Never call blocking functions (time.sleep, requests.get) inside async code — use await asyncio.sleep() and aiohttp instead; if blocking code is unavoidable, offload it to a thread (see the sketch after this list).
✅ Use asyncio.create_task() for concurrency, not for sequential execution.
✅ Wrap critical async calls in try/except.
✅ Use timeouts to avoid hanging tasks.
✅ Use gather() for batch operations and as_completed() for streaming.
✅ Consider frameworks like FastAPI, aiohttp, and Quart for async web apps.
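When a blocking call truly cannot be avoided (say, a legacy library without an async API), asyncio.to_thread() (Python 3.9+) offloads it to a worker thread so the event loop stays responsive. A sketch:

```python
import asyncio
import time

def blocking_io():
    time.sleep(1)  # stands in for any blocking call
    return "blocking result"

async def main():
    # blocking_io() runs in a thread while other async work continues.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(0.5),
    )
    print(result)

asyncio.run(main())
```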
🚀 14. Real-World Example — Concurrent API Requests with Timing
```python
import asyncio
import time

import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        await asyncio.sleep(0.2)  # simulate a little post-processing
        return url, resp.status

async def main():
    urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 4)]
    start = time.perf_counter()
    async with aiohttp.ClientSession() as s:
        results = await asyncio.gather(*(fetch(s, u) for u in urls))
    end = time.perf_counter()
    print("Results:", results)
    print(f"Completed in {end - start:.2f}s")

asyncio.run(main())
```
🌟 Instead of taking ~6 seconds sequentially (the 1, 2, and 3 second delays added together), this finishes in roughly the time of the slowest request, about 3 seconds: true async concurrency!
🧠 Summary
| Concept | Description | Example |
|---|---|---|
| Coroutine | Async function that can pause/resume | async def fetch() |
| Await | Waits for an async operation to complete | await asyncio.sleep(1) |
| Event Loop | Scheduler running async tasks | asyncio.run(main()) |
| Gather | Run many coroutines concurrently | await asyncio.gather(...) |
| aiohttp | Async web client | Fetch thousands of URLs efficiently |
Asyncio transforms how Python handles concurrency — unlocking high-performance, non-blocking systems perfect for modern APIs, scraping, and automation.