Chapter 10: Advanced Topics — Asynchronous Programming with Asyncio

⚡ Asynchronous Programming with Asyncio — Efficient Concurrency in Python

Asynchronous programming enables your Python programs to handle many tasks at once without blocking.
Instead of waiting for one task to finish (like downloading a webpage or reading a file), asyncio allows your program to pause, perform other work, and come back when the result is ready.


🧩 1. Concurrency Models in Python

| Model | Description | Best For |
|---|---|---|
| Threads | Multiple tasks in shared memory, managed by the OS | I/O-bound (network, disk) |
| Processes | Independent memory and execution | CPU-bound (computation) |
| Asyncio | Single-threaded, event-driven concurrency | I/O-bound (many small tasks) |

⚙️ Asyncio shines when you have many slow I/O operations — like API calls, database queries, or scraping hundreds of pages.
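To see why asyncio suits many slow I/O operations, here is a minimal timing sketch, using asyncio.sleep() to stand in for real I/O such as a network call:

```python
import asyncio
import time

async def io_task(delay):
    # Simulates a slow I/O operation (e.g., a network request)
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # Three 1-second "I/O" waits overlap instead of running back to back
    results = await asyncio.gather(io_task(1), io_task(1), io_task(1))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} tasks in {elapsed:.1f}s")  # roughly 1 second total, not 3

asyncio.run(main())
```

Run sequentially, the three waits would take about 3 seconds; overlapped on the event loop, they take about as long as the single slowest one.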


🧠 2. Understanding Asyncio Core Concepts

| Term | Description |
|---|---|
| Coroutine | A function defined with `async def` that can pause at `await` |
| Await | Tells Python to wait for a coroutine or async task to complete |
| Event Loop | The core scheduler that runs coroutines |
| Task | A wrapper that schedules and tracks a coroutine |

🧱 3. Basic Asyncio Example

import asyncio

async def greet():
    print("Hello...")
    await asyncio.sleep(1)
    print("World!")

# Modern way to run coroutines
asyncio.run(greet())

How it works:

1. `async def greet()` defines a coroutine; calling `greet()` creates a coroutine object but does not run it.
2. `asyncio.run(greet())` starts the event loop and executes the coroutine to completion.
3. `await asyncio.sleep(1)` pauses `greet()` for one second, during which the event loop is free to run other work.
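One common pitfall worth seeing directly: calling a coroutine function does not execute it, it only creates a coroutine object. A quick sketch:

```python
import asyncio

async def greet():
    await asyncio.sleep(0)
    return "Hello"

coro = greet()                # nothing runs yet; this is just a coroutine object
print(type(coro).__name__)   # coroutine
print(asyncio.run(coro))     # the event loop actually executes it: Hello
```

Forgetting the `await` (or `asyncio.run()`) produces a "coroutine was never awaited" warning and silently skips the work.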

⏱️ 4. Running Multiple Coroutines Concurrently

You can run multiple tasks at once using asyncio.create_task().

import asyncio

async def foo():
    await asyncio.sleep(2)
    print("Foo finished!")

async def bar():
    await asyncio.sleep(1)
    print("Bar finished!")

async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    print("Tasks started...")
    await task1
    await task2

asyncio.run(main())

Output:

Tasks started...
Bar finished!
Foo finished!

⚡ Notice that both coroutines run “concurrently” — the shorter task finishes first.


🔄 5. Running Many Tasks Together with asyncio.gather()

gather() runs multiple coroutines concurrently and returns all their results.

import asyncio

async def countdown(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} done"

async def main():
    results = await asyncio.gather(
        countdown("Task 1", 3),
        countdown("Task 2", 1),
        countdown("Task 3", 2),
    )
    print("Results:", results)

asyncio.run(main())
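Note that gather() returns results in the order the coroutines were passed in, not the order they finished. A quick check:

```python
import asyncio

async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value

async def main():
    # "slow" finishes last but still appears first in the results list
    results = await asyncio.gather(
        delayed("slow", 0.3),
        delayed("fast", 0.1),
    )
    print(results)  # ['slow', 'fast']

asyncio.run(main())
```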

🌍 6. Real-World Example — Asynchronous Web Requests

Using aiohttp, you can fetch many URLs concurrently.

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        content = await response.text()
        print(f"Fetched {url} ({len(content)} bytes)")
        return content

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://github.com",
    ]
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch_url(session, u) for u in urls))

asyncio.run(main())

⚙️ aiohttp enables thousands of concurrent requests efficiently — ideal for APIs, crawlers, and async web clients.
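In practice you usually want to cap concurrency rather than fire thousands of requests at once. One common pattern is an asyncio.Semaphore; in this sketch, asyncio.sleep() stands in for the real network call so the pattern is visible on its own:

```python
import asyncio

async def limited_fetch(sem, url):
    async with sem:               # only N coroutines pass this point at once
        await asyncio.sleep(0.1)  # stands in for the real network request
        return url

async def main():
    sem = asyncio.Semaphore(2)    # at most 2 "requests" in flight
    urls = [f"https://example.com/{i}" for i in range(6)]
    results = await asyncio.gather(*(limited_fetch(sem, u) for u in urls))
    print(len(results), "fetched")

asyncio.run(main())
```

The same wrapper works around an aiohttp request: acquire the semaphore, make the call, and release it automatically when the `async with` block exits.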


🧮 7. Using asyncio.as_completed() for Early Results

Instead of waiting for all tasks to finish, process results as they arrive.

import asyncio, aiohttp

async def fetch(url):
    # One session per request keeps the example short; real code should
    # share a single ClientSession across requests, as in section 6.
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return url, r.status

async def main():
    urls = ["https://example.com", "https://python.org", "https://github.com"]
    tasks = [asyncio.create_task(fetch(u)) for u in urls]

    for coro in asyncio.as_completed(tasks):
        url, status = await coro
        print(f"{url} -> {status}")

asyncio.run(main())

🚨 8. Error Handling in Async Code

Use try/except inside coroutines, or pass return_exceptions=True so gather() returns exceptions as results instead of raising the first one.

import asyncio

async def risky_divide(x, y):
    try:
        await asyncio.sleep(1)
        return x / y
    except ZeroDivisionError:
        print("Division by zero!")
        return None

async def main():
    results = await asyncio.gather(
        risky_divide(10, 2),
        risky_divide(10, 0),
    )
    print("Results:", results)

asyncio.run(main())
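By default, the first exception raised inside gather() propagates to the caller. With return_exceptions=True, exceptions come back as values in the results list, so one failure doesn't hide the successes. A sketch:

```python
import asyncio

async def risky_divide(x, y):
    await asyncio.sleep(0.1)
    return x / y   # may raise ZeroDivisionError

async def main():
    results = await asyncio.gather(
        risky_divide(10, 2),
        risky_divide(10, 0),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    for r in results:
        if isinstance(r, Exception):
            print("Failed:", r)
        else:
            print("Result:", r)

asyncio.run(main())
```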

🧩 9. Task Cancellation and Timeouts

Use asyncio.wait_for() to enforce time limits.

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
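The section title also promises cancellation: a running task can be cancelled explicitly with Task.cancel(), which raises asyncio.CancelledError inside the coroutine at its next await point. A minimal sketch:

```python
import asyncio

async def slow_operation():
    try:
        await asyncio.sleep(5)
        return "Done"
    except asyncio.CancelledError:
        print("Cancelled mid-sleep!")
        raise  # re-raise so the task is actually marked as cancelled

async def main():
    task = asyncio.create_task(slow_operation())
    await asyncio.sleep(0.1)   # give the task a chance to start
    task.cancel()              # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
```

Re-raising CancelledError after cleanup is important; swallowing it makes the task appear to finish normally.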

🏭 10. Producer–Consumer Pattern with asyncio.Queue

import asyncio
import random

async def producer(queue):
    for i in range(5):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    prod = asyncio.create_task(producer(queue))
    cons = asyncio.create_task(consumer(queue))
    await prod
    await queue.join()
    cons.cancel()

asyncio.run(main())

🧠 This pattern is common in streaming pipelines, event systems, and task schedulers.


⚡ 11. Async File I/O Example

While the built-in open() is synchronous, you can use aiofiles for non-blocking file operations.

First install the package:

pip install aiofiles

Then:

import aiofiles, asyncio

async def write_log():
    async with aiofiles.open("log.txt", "w") as f:
        await f.write("Async I/O in action!\n")

asyncio.run(write_log())

⚖️ 12. Comparison — Asyncio vs Threads vs Processes

| Feature | Asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Model | Single-threaded, event-driven | Multi-threaded | Multi-process |
| Memory | Shared | Shared | Isolated |
| Best For | Many small I/O tasks | Blocking I/O | CPU-heavy work |
| Overhead | Very low | Moderate | High |
| Example | Web scraping | File downloads | Image processing |

🧭 13. Best Practices for Asyncio

✅ Always use asyncio.run() instead of manual loops.
✅ Never call blocking functions (time.sleep, requests.get) inside async code — use await asyncio.sleep() and aiohttp.
✅ Use asyncio.create_task() for concurrency, not for sequential execution.
✅ Wrap critical async calls in try/except.
✅ Use timeouts to avoid hanging tasks.
✅ Use gather() for batch operations and as_completed() for streaming.
✅ Consider frameworks like FastAPI, aiohttp, and Quart for async web apps.
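When a blocking call has no async equivalent, asyncio.to_thread() (Python 3.9+) runs it in a worker thread so the event loop stays responsive. A minimal sketch, with time.sleep standing in for the blocking call:

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.5)   # stands in for a blocking call with no async version
    return "done"

async def main():
    # blocking_io runs in a thread; the event loop keeps running meanwhile
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(0.1),   # still runs concurrently with the blocking call
    )
    print(result)

asyncio.run(main())
```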


🚀 14. Real-World Example — Concurrent API Requests with Timing

import aiohttp, asyncio, time

async def fetch(session, url):
    async with session.get(url) as resp:
        await asyncio.sleep(0.2)
        return url, resp.status

async def main():
    urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 4)]
    start = time.perf_counter()

    async with aiohttp.ClientSession() as s:
        results = await asyncio.gather(*(fetch(s, u) for u in urls))

    end = time.perf_counter()
    print("Results:", results)
    print(f"Completed in {end - start:.2f}s")

asyncio.run(main())

🌟 Instead of taking ~6 seconds sequentially (1 + 2 + 3), this finishes in roughly 3 seconds, bounded by the slowest single request: true async concurrency!


🧠 Summary

| Concept | Description | Example |
|---|---|---|
| Coroutine | Async function that can pause/resume | `async def fetch()` |
| Await | Waits for an async operation to complete | `await asyncio.sleep(1)` |
| Event Loop | Scheduler running async tasks | `asyncio.run(main())` |
| Gather | Run many coroutines concurrently | `await asyncio.gather(...)` |
| aiohttp | Async web client | Fetch thousands of URLs efficiently |

Asyncio transforms how Python handles concurrency — unlocking high-performance, non-blocking systems perfect for modern APIs, scraping, and automation.