Python asyncio

I. Python asyncio — Principles & Core Mechanics#

Overview: asyncio is Python's asynchronous concurrency framework (异步并发框架) that uses an Event Loop (事件循环), Coroutines (协程), and non-blocking I/O (非阻塞 I/O) to efficiently handle many I/O-bound tasks within a single thread. The core principle: when a coroutine reaches an await, it yields control back to the event loop, which uses OS-level I/O multiplexing to resume the coroutine once the I/O operation is ready.

1. How asyncio Works — The Big Picture#

The three pillars of asyncio:

  • Event Loop (事件循环) — schedules and runs coroutines, monitors I/O events, resumes tasks when they are ready
  • Coroutines (协程) — define asynchronous tasks using async/await; can pause and resume execution
  • Non-blocking I/O (非阻塞 I/O) — allows the program to perform other work while waiting for I/O operations to complete
Note: The principle of asyncio is: use an event loop to schedule coroutines (使用事件循环调度协程). When a coroutine reaches await, it yields control (让出执行权) back to the event loop, which then uses non-blocking I/O and OS I/O multiplexing (I/O 多路复用) to resume the coroutine (恢复协程执行) once the I/O operation is ready.
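The yield-and-resume cycle can be seen in a minimal sketch (the names job and main are illustrative): both coroutines start on the same thread, each suspends at its await, and the event loop resumes whichever one's timer fires first.

```python
import asyncio

async def job(name: str, delay: float, log: list):
    # Hitting await suspends this coroutine and hands control to the event loop
    await asyncio.sleep(delay)
    log.append(name)

async def main():
    log = []
    # Both coroutines run concurrently on one thread; the event loop
    # resumes each when its sleep expires
    await asyncio.gather(job("slow", 0.2, log), job("fast", 0.1, log))
    return log

print(asyncio.run(main()))  # ['fast', 'slow'] (completion order, not call order)
```

The "fast" job finishes first even though it was submitted second, which is exactly the interleaving the three pillars describe.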

2. Event Loop (事件循环)#

1) What the Event Loop Does#

The Event Loop is the core scheduler (核心调度器) of asyncio. It is responsible for:

  • Running coroutines and tasks
  • Monitoring I/O events (sockets, file descriptors, timers)
  • Resuming coroutines when their awaited operation completes

2) Starting the Event Loop#

The standard way is via asyncio.run():

import asyncio

async def main():
    print("hello")

asyncio.run(main())

What happens here:

  • asyncio.run() creates and starts the event loop
  • The event loop executes the main() coroutine to completion
  • The event loop is closed when the coroutine returns

3. Coroutines (协程)#

1) What Is a Coroutine?#

A Coroutine is a special function that can pause and resume execution. When it encounters an I/O wait, it pauses and returns control to the event loop, allowing other tasks to run in the meantime.

2) How to Define and Use Coroutines#

Define with async def, await with await:

import asyncio

async def task():
    await asyncio.sleep(1)
    print("done")

asyncio.run(task())

What happens here:

  • async def defines the coroutine
  • await pauses the coroutine until the operation completes, yielding control back to the event loop
Note: Calling an async def function does not execute it — it returns a Coroutine Object (协程对象). The coroutine only runs when it is await-ed or wrapped in a Task via asyncio.create_task().
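This is easy to verify directly (greet is an illustrative name): calling the function produces an object; only the event loop actually executes it.

```python
import asyncio

async def greet():
    return "hi"

coro = greet()               # Nothing runs yet; this is just a coroutine object
print(type(coro).__name__)   # coroutine
result = asyncio.run(coro)   # Now the event loop actually drives it
print(result)                # hi
```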

💡 One-line Takeaway
asyncio achieves concurrency on a single thread by having the Event Loop (事件循环) continuously schedule Coroutines (协程) — each coroutine runs until it hits an await, yields control, and is resumed by the event loop once its I/O is ready.

II. Python asyncio — Complete API Reference & Usage Scenarios#

Overview: This note is a complete API reference for Python's asyncio library, organized by category. Every interface is paired with a real-world usage scenario so you can immediately see when and why to use it. The library is built on a single-threaded Event Loop (事件循环) that schedules Coroutines (协程) cooperatively, making it ideal for I/O-bound (I/O 密集型) workloads.

1. Entry Points — Running Coroutines#

1) asyncio.run(coro)#

The top-level entry point for running an async program. Creates a new Event Loop (事件循环), runs the coroutine to completion, then closes the loop.

import asyncio

async def main():
    print("Hello from asyncio!")

asyncio.run(main())
Note: Never call asyncio.run() inside an already-running event loop (e.g., inside a Jupyter notebook). Use await coro directly instead, or apply nest_asyncio.

Scenario: Entry point of any standalone async application — CLI tools, scripts, servers.


2) asyncio.get_event_loop() / asyncio.get_running_loop()#

async def main():
    loop = asyncio.get_running_loop()  # Preferred inside async context
    print(loop)

loop = asyncio.get_event_loop()  # Can be used outside async context

API                  When to Use
get_running_loop()   Inside a coroutine — raises RuntimeError if no loop is running
get_event_loop()     Outside a coroutine — may create a new loop if none exists

2. Coroutines & Tasks (协程与任务)#

1) async def / await — Defining and Awaiting Coroutines#

async def fetch(url: str) -> str:
    await asyncio.sleep(1)  # Yield control to event loop
    return f"data from {url}"

async def main():
    result = await fetch("https://api.example.com")
    print(result)

Scenario: Any function that performs I/O — HTTP requests, DB queries, file reads.


2) asyncio.create_task(coro) — Schedule Concurrently#

Wraps a coroutine into a Task (任务) and schedules it to run on the current event loop immediately — without blocking the caller.

async def worker(name: str, delay: float):
    await asyncio.sleep(delay)
    print(f"{name} done")

async def main():
    t1 = asyncio.create_task(worker("A", 2.0))
    t2 = asyncio.create_task(worker("B", 1.0))
    await t1
    await t2
    # Total time ≈ 2s, not 3s

Scenario: Fire multiple independent I/O operations simultaneously (parallel API calls, parallel DB queries).

Note: A task that is created but never awaited still runs, but if it raises, the exception is only reported as "Task exception was never retrieved" once the task is garbage-collected. The event loop holds only a weak reference to tasks, so a task you keep no reference to may even be garbage-collected mid-execution. Keep a reference and await your tasks, or attach a done callback via add_done_callback().
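One way to surface exceptions from fire-and-forget tasks is a done callback that retrieves and logs them (a sketch; report_crash and boom are illustrative names):

```python
import asyncio

def report_crash(task: asyncio.Task):
    # Runs when the task finishes; retrieving the exception marks it as handled
    if not task.cancelled() and task.exception() is not None:
        print(f"Task {task.get_name()} failed: {task.exception()!r}")

async def boom():
    raise ValueError("oops")

async def main():
    t = asyncio.create_task(boom(), name="boom-task")
    t.add_done_callback(report_crash)
    await asyncio.sleep(0.1)  # Give the task time to run; we never await it

asyncio.run(main())  # prints: Task boom-task failed: ValueError('oops')
```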

3) asyncio.Task Methods#

async def main():
    task = asyncio.create_task(worker("A", 5.0))
    task.cancel()            # Request cancellation
    print(task.done())       # True if finished/cancelled/errored
    print(task.cancelled())  # True if cancelled
    print(task.result())     # Returns result (raises if not done)
    task.add_done_callback(lambda t: print("finished:", t))

Method                  Purpose
cancel()                Request cancellation — injects CancelledError at next await
done()                  True if completed, cancelled, or raised an exception
result()                Returns the return value, or re-raises the exception
exception()             Returns the exception if one was raised, else None
add_done_callback(fn)   Register a callback to run when the task finishes

3. Concurrency Helpers (并发工具)#

1) asyncio.gather(*coros, return_exceptions=False)#

Runs multiple awaitables (可等待对象) concurrently, returns a list of results in the same order as input.

async def main():
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
    print(results)  # ["data from url1", "data from url2", "data from url3"]

With exception handling:

results = await asyncio.gather(
    fetch("url1"),
    failing_fetch(),
    return_exceptions=True,  # Exceptions returned as values, not raised
)
for r in results:
    if isinstance(r, Exception):
        print(f"Error: {r}")

Scenario: Batch HTTP requests, parallel DB lookups, loading multiple config files simultaneously.


2) asyncio.wait(tasks, return_when=...)#

Returns two sets: done and pending. Gives fine-grained control over when to stop waiting.

async def main():
    tasks = {asyncio.create_task(fetch(url)) for url in urls}
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()

return_when       Behavior
ALL_COMPLETED     Wait for all tasks (default)
FIRST_COMPLETED   Return as soon as any task finishes
FIRST_EXCEPTION   Return as soon as any task raises

Scenario: Racing requests — take the first result and cancel the rest (e.g., query multiple replicas and use whichever responds first).


3) asyncio.as_completed(coros)#

Yields tasks in completion order (not submission order).

async def main():
    coros = [fetch(url) for url in urls]
    for future in asyncio.as_completed(coros):
        result = await future
        print(f"Got: {result}")  # Processed as each one finishes

Scenario: Show results to the user as they arrive, without waiting for the slowest request.


4) asyncio.TaskGroup (Python 3.11+) — Structured Concurrency (结构化并发)#

If any task raises, all remaining tasks are automatically cancelled.

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch("url1"))
        t2 = tg.create_task(fetch("url2"))
    # All done here — or all cancelled if one failed
    print(t1.result(), t2.result())

Scenario: Any workflow where subtasks are all required — if one fails, the whole group should abort (e.g., a multi-step pipeline).


4. Timeouts & Cancellation (超时与取消)#

1) asyncio.wait_for(coro, timeout)#

async def main():
    try:
        result = await asyncio.wait_for(fetch("url"), timeout=3.0)
    except asyncio.TimeoutError:
        print("Request timed out after 3s")

Scenario: Any network call that must complete within a deadline (SLA enforcement, user-facing APIs).


2) asyncio.timeout(seconds) (Python 3.11+)#

A context-manager (上下文管理器) version of timeout — more composable than wait_for.

async def main():
    try:
        async with asyncio.timeout(5.0):
            result = await fetch("url")
            await process(result)
    except TimeoutError:
        print("Entire block timed out")

Scenario: Apply a single deadline across multiple awaits inside a block.


3) asyncio.shield(coro) — Protect from Cancellation#

Prevents the inner coroutine from being cancelled when the outer task is cancelled.

async def important_cleanup():
    await asyncio.sleep(1)
    print("Cleanup done")

async def main():
    task = asyncio.create_task(important_cleanup())
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        print("Outer cancelled, but cleanup still runs!")
        await task  # Wait for it to actually finish

Scenario: Protect a critical cleanup/commit operation from being interrupted by a cancellation signal.


4) Handling CancelledError#

async def worker():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Cleaning up before cancel...")
        await do_cleanup()
        raise  # Always re-raise!
Note: Always re-raise CancelledError after cleanup. Swallowing it breaks the cancellation chain. In Python 3.8+, CancelledError is a subclass of BaseException, not Exception.

5. Synchronization Primitives (同步原语)#

1) asyncio.Lock — Mutual Exclusion (互斥锁)#

lock = asyncio.Lock()

async def safe_write(db, data):
    async with lock:
        await db.write(data)  # Only one coroutine at a time

Scenario: Protecting shared in-memory state (counters, caches, connection pools) from concurrent modification.


2) asyncio.Semaphore — Concurrency Limiter (并发限制)#

sem = asyncio.Semaphore(10)  # Max 10 concurrent requests

async def rate_limited_fetch(session, url):
    async with sem:
        return await session.get(url)

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [rate_limited_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

Scenario: Rate-limiting API calls, capping DB connection count, controlling concurrent file handles.


3) asyncio.BoundedSemaphore#

Same as Semaphore but raises ValueError if release() is called more times than acquire().

Scenario: Safety-critical resource pools where over-releasing would be a bug.
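A minimal illustration of the difference: a plain Semaphore silently accepts an extra release(), while BoundedSemaphore turns the same bug into an immediate error.

```python
import asyncio

async def main():
    plain = asyncio.Semaphore(1)
    plain.release()  # Silently bumps the counter to 2; the bug goes unnoticed

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # Over-release is caught immediately
    except ValueError as e:
        print(f"Caught: {e}")

asyncio.run(main())
```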


4) asyncio.Event — Signal Between Coroutines (协程间信号)#

event = asyncio.Event()

async def producer():
    await asyncio.sleep(2)
    print("Data ready")
    event.set()  # Signal the consumer

async def consumer():
    await event.wait()  # Block until set
    print("Processing data")

async def main():
    await asyncio.gather(producer(), consumer())

Scenario: One-shot notification — signal consumers when data/resource becomes available.


5) asyncio.Condition — Wait + Notify Pattern#

condition = asyncio.Condition()
buffer = []

async def producer():
    async with condition:
        buffer.append("item")
        condition.notify_all()  # Wake all waiting consumers

async def consumer():
    async with condition:
        await condition.wait_for(lambda: len(buffer) > 0)
        item = buffer.pop()

Scenario: Multiple consumers waiting on a shared resource to reach a specific state.


6) asyncio.Queue — Producer-Consumer (生产者-消费者)#

async def producer(q: asyncio.Queue):
    for i in range(10):
        await q.put(i)
    await q.put(None)  # Sentinel

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        if item is None:
            break
        await process(item)
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=5)
    await asyncio.gather(producer(q), consumer(q))

Queue Type      Behavior
Queue           FIFO (先进先出)
LifoQueue       LIFO / stack (后进先出)
PriorityQueue   Smallest item dequeued first (优先队列)

Scenario: Pipeline architectures — web crawlers, log processors, streaming data ingestion.
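A quick look at PriorityQueue ordering (a sketch; the payload strings are illustrative): items go in as (priority, payload) tuples and come out smallest priority first.

```python
import asyncio

async def main():
    pq = asyncio.PriorityQueue()
    # (priority, payload) tuples; the smallest priority is dequeued first
    for item in [(3, "low"), (1, "urgent"), (2, "normal")]:
        await pq.put(item)
    order = [(await pq.get())[1] for _ in range(pq.qsize())]
    print(order)  # ['urgent', 'normal', 'low']

asyncio.run(main())
```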


6. Async Context Managers & Iterators (异步上下文管理器与迭代器)#

1) async with — Async Context Manager (异步上下文管理器)#

Implement __aenter__ and __aexit__:

class AsyncDB:
    async def __aenter__(self):
        self.conn = await connect_db()
        return self.conn
    async def __aexit__(self, *args):
        await self.conn.close()

async def main():
    async with AsyncDB() as conn:
        result = await conn.query("SELECT 1")

Scenario: Any resource requiring async setup/teardown — DB connections, HTTP sessions, file handles, locks.


2) @asynccontextmanager — Decorator Shortcut#

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_connection():
    conn = await connect_db()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with managed_connection() as conn:
        await conn.query("SELECT 1")

Scenario: Simpler alternative to writing a full class when you need a one-off async context manager.


3) async for — Async Iterator (异步迭代器)#

Implement __aiter__ and __anext__, or use an async generator:

async def paginated_api(base_url: str):
    page = 1
    while True:
        data = await fetch(f"{base_url}?page={page}")
        if not data:
            break
        yield data
        page += 1

async def main():
    async for page in paginated_api("https://api.example.com/items"):
        await process(page)

Scenario: Paginated APIs, streaming database cursors, real-time event streams (WebSocket, SSE).


7. Running Blocking Code (在异步中运行阻塞代码)#

1) asyncio.to_thread(func, *args) (Python 3.9+)#

import time

async def main():
    # Run blocking I/O in a thread without freezing the event loop
    result = await asyncio.to_thread(time.sleep, 2)

Scenario: Legacy blocking libraries (e.g., requests, psycopg2, time.sleep), file system operations.


2) loop.run_in_executor(executor, func, *args)#

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    # Thread pool — for blocking I/O
    result = await loop.run_in_executor(None, blocking_io_func, arg)
    # Process pool — for CPU-bound work
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func, arg)

Executor                    Use Case
None (default ThreadPool)   Blocking I/O, legacy libraries
ThreadPoolExecutor          Explicit thread pool sizing
ProcessPoolExecutor         CPU-bound tasks (bypasses GIL)

Scenario: Image processing, ML inference on CPU, compression, encryption — any heavy computation alongside async I/O.


8. Streams — High-Level Network I/O (高层网络 I/O)#

1) asyncio.open_connection(host, port) — TCP Client#

async def tcp_client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
    writer.write(b"Hello\n")
    await writer.drain()
    data = await reader.readline()
    print(f"Received: {data.decode()}")
    writer.close()
    await writer.wait_closed()

Scenario: Custom TCP clients — talking to Redis, custom protocols, game servers.


2) asyncio.start_server(handler, host, port) — TCP Server#

async def handle_client(reader, writer):
    data = await reader.read(1024)
    writer.write(data)  # Echo back
    await writer.drain()
    writer.close()

async def main():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()

Scenario: Building lightweight TCP/protocol servers (chat, telnet, custom RPC).


9. Subprocesses (异步子进程)#

1) asyncio.create_subprocess_exec() / asyncio.create_subprocess_shell()#

async def run_command():
    proc = await asyncio.create_subprocess_exec(
        "ls", "-la",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    print(stdout.decode())

async def run_shell():
    proc = await asyncio.create_subprocess_shell(
        "echo hello && sleep 1 && echo world",
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    print(stdout.decode())

API                       Use Case
create_subprocess_exec    Safe — no shell injection, explicit args
create_subprocess_shell   Convenient — supports pipes/redirects, shell expansion

Scenario: Running external tools (ffmpeg, git, compilers) without blocking the event loop.


10. Utilities & Introspection (工具与内省)#

1) asyncio.sleep(delay, result=None)#

async def main():
    await asyncio.sleep(0)    # Yield control without waiting (common pattern)
    await asyncio.sleep(1.5)  # Wait 1.5 seconds
    val = await asyncio.sleep(2, result="done")  # Returns result after delay
    print(val)  # "done"

Scenario: sleep(0) is used to yield control voluntarily in tight loops, preventing event loop starvation (事件循环饥饿).


2) asyncio.current_task() / asyncio.all_tasks()#

async def main():
    me = asyncio.current_task()
    me.set_name("main-task")
    all_running = asyncio.all_tasks()
    print(f"Running tasks: {len(all_running)}")

Scenario: Debugging, logging task names, graceful shutdown (cancel all tasks on SIGINT).


3) asyncio.ensure_future(coro_or_future)#

Schedules a coroutine or wraps a Future (期约) into a Task. Largely superseded by create_task() in modern code.

task = asyncio.ensure_future(my_coro()) # Legacy — prefer create_task()

4) asyncio.wrap_future(future) — Bridge with concurrent.futures#

Wraps a concurrent.futures.Future into an asyncio-compatible asyncio.Future.

import concurrent.futures

def blocking():
    return 42

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        future = pool.submit(blocking)
        result = await asyncio.wrap_future(future)
        print(result)  # 42

Scenario: Integrating existing concurrent.futures-based code into an asyncio application.


5) asyncio.isfuture() / asyncio.iscoroutine() / asyncio.iscoroutinefunction()#

import asyncio
async def my_coro(): pass
print(asyncio.iscoroutinefunction(my_coro)) # True
print(asyncio.iscoroutine(my_coro())) # True
print(asyncio.isfuture(asyncio.Future())) # True

Scenario: Writing framework code or decorators that need to handle both sync and async callables.


11. Low-level Event Loop APIs (底层事件循环接口)#

1) loop.call_soon(callback, *args) / loop.call_later(delay, callback)#

Schedule a plain (non-coroutine) callback:

loop = asyncio.get_event_loop()
loop.call_soon(print, "scheduled immediately")
loop.call_later(2.0, print, "scheduled in 2s")
loop.call_at(loop.time() + 5.0, print, "scheduled at absolute time")

Scenario: Integrating callback-based legacy code into an asyncio event loop.


2) loop.add_reader(fd, callback) / loop.add_writer(fd, callback)#

Register a callback to fire when a file descriptor (文件描述符) becomes readable/writable.

loop.add_reader(sock.fileno(), on_data_received)
loop.remove_reader(sock.fileno())

Scenario: Building low-level protocol handlers — custom socket management, raw I/O multiplexing.


3) asyncio.Protocol / asyncio.DatagramProtocol#

The low-level callback-based protocol interface, underlying StreamReader/StreamWriter.

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
    def data_received(self, data: bytes):
        self.transport.write(data)  # Echo
    def connection_lost(self, exc):
        print("Connection closed")

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()

Scenario: High-performance servers where the overhead of StreamReader/StreamWriter is unacceptable, or when implementing a custom protocol (e.g., custom binary framing).


12. Graceful Shutdown Pattern (优雅关闭模式)#

import asyncio
import signal

async def main():
    loop = asyncio.get_running_loop()
    stop = loop.create_future()
    loop.add_signal_handler(signal.SIGINT, stop.set_result, None)
    loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    await stop  # Block until SIGINT/SIGTERM
    print("Shutting down...")
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    print("All tasks cancelled. Bye.")

asyncio.run(main())

Scenario: Any long-running async service (web server, bot, background processor) that must clean up gracefully on Ctrl+C or a system signal.


13. Quick API Comparison Table#

API                 Category       Key Trait
asyncio.run()       Entry point    Creates + closes loop; top-level only
create_task()       Scheduling     Non-blocking schedule; returns Task
gather()            Concurrency    All results in order; short-circuits on exception
wait()              Concurrency    Returns done/pending sets; fine control
as_completed()      Concurrency    Yields in completion order
TaskGroup           Structured     Auto-cancel on failure (3.11+)
wait_for()          Timeout        Cancels task on timeout
timeout()           Timeout        Context manager; covers multiple awaits (3.11+)
shield()            Cancellation   Protects inner task from outer cancel
Lock                Sync           Mutex; one at a time
Semaphore           Sync           N at a time; rate limiting
Event               Sync           One-shot signal
Condition           Sync           Wait-for-state with notify
Queue               Sync           FIFO producer-consumer
to_thread()         Blocking       Offload to thread pool (3.9+)
run_in_executor()   Blocking       Thread or process pool
open_connection()   Streams        High-level TCP client
start_server()      Streams        High-level TCP server
Protocol            Low-level      Callback-based; max performance

💡 One-line Takeaway
Master the three tiers: use gather / TaskGroup for everyday concurrency, Lock / Semaphore / Queue for coordination, and to_thread / run_in_executor to escape blocking code — everything else (streams, protocols, signals) builds on these foundations.
Source: https://lxy-alexander.github.io/blog/posts/python/python-asyncio/
Author: Alexander Lee
Published: 2026-03-08
License: CC BY-NC-SA 4.0