python异步编程-asyncio

asyncio是什么

asyncioPython 标准库中的一个模块,用于编写异步(asynchronous)程序。它提供了一套完整的工具,让你可以用 async/await 语法编写并发代码,特别适合处理 I/O 密集型任务(比如网络请求、文件读写、数据库查询等),而不会阻塞整个程序。

概念学习

并发

并发(Concurrency)是一种“效果”——多个任务在一段时间内交替或同时推进。 它可以通过多种方式实现,常见的有:

  1. 多线程(Multithreading)
  2. 异步编程(Asynchronous programming,如 asyncio)
  3. 多进程(Multiprocessing)(严格说更偏向“并行”,但也支持并发)

进程(Process)线程(Thread)协程(Coroutine)

概念 定义
进程(Process) 操作系统进行资源分配和调度的基本单位。它是程序的一次执行实例,拥有独立的虚拟地址空间、文件描述符表、环境变量、信号处理表等内核资源。
线程(Thread) 进程内的执行流(execution context),是 CPU 调度的基本单位。同一进程内的多个线程共享该进程的地址空间和大部分资源(如堆、全局变量、打开的文件),但各自拥有独立的栈、寄存器状态和线程局部存储(TLS)。
协程(Coroutine) 一种用户态的轻量级并发原语,属于协作式多任务(cooperative multitasking)模型。协程的切换由程序显式控制(如通过 awaityield),不依赖操作系统调度,上下文切换在用户空间完成,无内核介入。

线程”在 CPU 核心上运行。 进程是资源容器,线程是执行单位。

内存与资源共享模型

模型 地址空间 堆(Heap) 栈(Stack) 同步机制
进程 独立 独立 独立 需 IPC(如管道、消息队列、共享内存 + 信号量)
线程 共享(同进程内) 共享 独立(每个线程一个栈) 需互斥锁(Mutex)、条件变量等防止数据竞争
协程 共享(同一线程内) 共享 逻辑独立(由运行时管理协程栈或使用生成器状态) 通常无需锁(因单线程串行执行),但需注意异步回调中的状态一致性

在 Python 中的实现

模型 标准库模块 关键 API
进程 multiprocessing Process, Pool, Queue, Pipe, Manager
线程 threading Thread, Lock, Condition, Semaphore
协程 asyncio + async/await async def, await, asyncio.run(), create_task(), gather()

进程资源隔离与并行计算的基石;

线程操作系统级并发的传统手段,但在 Python 中受 GIL 限制;

协程高并发 I/O 的现代解决方案,以极低开销实现大规模并发,已成为 Python 异步编程的事实标准。

1
2
3
4
操作系统
└── 进程(Process) ← 资源容器(内存、文件描述符等)
└── 线程(Thread) ← CPU 调度单元(至少一个)
└── 协程(Coroutine) ← 用户态逻辑任务(可多个,协作式切换)

当你运行一个 Python 脚本(如 python app.py),操作系统会:

  1. 创建一个新进程(Process)
  2. 加载 Python 解释器(CPython)
  3. 在该进程中启动主线程(Main Thread)
  4. 执行你的代码

📌 所以:一个正在运行的 Python 程序 = 1 个进程 + 至少 1 个线程(主线程)

但注意:

  • 程序可以创建更多进程(通过 multiprocessing
  • 程序可以创建更多线程(通过 threading
  • 所以“一个程序”最终可能对应 多个进程、多个线程

Python 异步编程的核心机制

  1. async def 定义协程函数 → 返回 协程对象(Coroutine Object)
  2. await 用于挂起当前协程,等待另一个协程或异步操作完成
  3. 事件循环(Event Loop)(由 asyncio 提供)负责:
    • 调度协程
    • 管理 I/O 多路复用(如 epoll/kqueue
    • 在 I/O 就绪时恢复对应协程

实战学习

sync_demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from time import sleep, perf_counter

def fetch_url(url):
print('Fetching the URL')
sleep(1)#模拟阻塞
print('Finished fetching')
return 'url_content'

def read_file(filepath):
print('Reading the file')
sleep(1)#模拟阻塞
print('Finished reading')
return 'file_content'

def main():
url = 'example.com'
filepath = 'example.txt'
fetch_result = fetch_url(url)
read_result = read_file(filepath)

if __name__ == '__main__':
start_time = perf_counter()
main()
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上展示一份同步的代码,sleep用来模拟阻塞

1
2
3
4
5
Fetching the URL
Finished fetching
Reading the file
Finished reading
Time taken: 2.00 seconds

async_demo.py

异步编程的三步核心流程

1. 定义协程函数 → 2. 包装协程为任务 → 3. 建立事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from time import sleep, perf_counter
import asyncio

#定义协程函数
async def fetch_url(url):
print(f'Fetching the url')
#把阻塞操作包装成协程
await asyncio.sleep(1)
print('Finished fetching')
return 'url_content'

async def read_file(filepath):
print('Reading the file')
await asyncio.sleep(1)
print('Finished reading')
return 'file_content'

#若想使用await,需要把main函数定义成协程函数
async def main():
url = 'example.com'
filepath = 'example.txt'
#创建任务
tasks = [asyncio.create_task(coro) for coro in [
fetch_url(url),
read_file(filepath)]]
fetch_result = await tasks[0]
read_result = await tasks[1]

if __name__ == '__main__':
start_time = perf_counter()
#创建事件循环
asyncio.run(main())
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上则是改为了异步的代码

1
2
3
4
5
Fetching the url
Reading the file
Finished fetching
Finished reading
Time taken: 1.01 seconds

协程函数

协程函数(Coroutine Function)是 Python 中使用 async def 语法定义的函数,它是异步编程的核心构建单元。调用协程函数不会立即执行其内部代码,而是返回一个 协程对象(Coroutine Object),该对象必须由事件循环(如 asyncio)驱动或通过 await 在另一个协程中调用,才能真正执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

#协程函数
async def coroutine_func():
return 'coroutine_result'

def main():
print(coroutine_func())
result = asyncio.run(coroutine_func())
print("---")
print(result)


if __name__ == '__main__':
main()
1
2
3
4
5
6
<coroutine object coroutine_func at 0x000001958D88A4B0>
d:\code\python\learn_pythontips\learn_async\coroutine_func.py:8: RuntimeWarning: coroutine 'coroutine_func' was never awaited
print(coroutine_func())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
---
coroutine_result

由输出可见,coroutine object,返回的是一个协程对象

await关键字

await 是 Python 异步编程中的核心关键字,它的作用是:

暂停当前协程的执行,等待一个“可等待对象”(awaitable)完成,并获取其结果,同时将控制权交还给事件循环,使其能运行其他任务。

✅ 三大功能:

  1. 挂起(Suspend):当前协程在此处暂停,不阻塞线程。
  2. 等待(Wait):等待一个异步操作(如网络请求、文件读写、定时器)完成。
  3. 恢复(Resume):当被等待的对象完成后,协程从此处恢复执行,并拿到结果。

⚠️ 关键:await 不会阻塞整个线程,而是让事件循环去执行其他就绪的协程。

事件循环(Event Loop)与任务(Task)

事件循环(Event Loop) 是异步编程的核心引擎,尤其在 Python 的 asyncio 模型中,它是驱动协程执行、管理异步 I/O、调度任务的中枢系统

事件循环是一个程序结构,用于监听和分发事件或消息,实现非阻塞 I/O 和协作式多任务调度。

在 Python asyncio 中,事件循环:

  • 维护一个待执行协程队列
  • 管理定时器(如 asyncio.sleep
  • 使用操作系统提供的 I/O 多路复用机制(如 Linux 的 epoll、macOS 的 kqueue、Windows 的 IOCP)来高效监听大量文件描述符(如 socket)
  • 在 I/O 就绪时,恢复对应的协程

那事件循环如何知道哪些协程可以执行,哪些协程需要暂停呢

在 Python 异步编程(特别是 asyncio)中,任务(Task) 是是对协程(Coroutine)的封装,用于被事件循环调度和并发执行

Taskasyncio 中表示“未来会完成的异步操作”的对象,它是 Future 的子类,用于包装协程并自动调度其执行。

核心作用:

  1. 将协程注册到事件循环中,使其能够并发运行(而非顺序等待)
  2. 提供状态管理(如是否完成、是否取消、结果或异常)
  3. 支持取消操作task.cancel()
  4. 允许多次 await(协程对象只能 await 一次,但 Task 可以)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def work(name, delay):
print(f"开始 {name}")
await asyncio.sleep(delay)
print(f"完成 {name}")
return f"result-{name}"

async def main():
# 创建任务 → 立即开始执行!
task1 = asyncio.create_task(work("A", 2))
task2 = asyncio.create_task(work("B", 1))

# 等待任务完成(可获取结果)
r1 = await task1
r2 = await task2
print(r1, r2)

asyncio.run(main())
1
2
3
4
5
开始 A
开始 B
完成 B
完成 A
result-A result-B

💡 Task 是实现并发的关键:它让多个协程“同时启动”,而不是“一个接一个等”。

asyncio.gather

asyncio.gather用于并发运行多个 awaitable 对象(如协程、Task、Future),并按顺序返回它们的结果列表

默认:任意一个任务出错,其他任务会被取消(除非 return_exceptions=True

1
2
3
4
5
6
7
async def main():
url = 'example.com'
filepath = 'example.txt'
results = await asyncio.gather(
fetch_url(url),
read_file(filepath))
print(results)
1
2
3
4
5
6
Fetching the url
Reading the file
Finished fetching
Finished reading
['url_content', 'file_content']
Time taken: 1.00 seconds

asyncio.as_completed

asyncio.as_completed返回一个异步迭代器(async iterator),按任务完成的先后顺序,逐个产出已完成的 awaitable 对象(通常是 Task 或 Future)。

1
2
3
4
5
results = asyncio.as_completed([
fetch_url(url),
read_file(filepath)])
for result in results:
print(await result)

特点:先完成的任务先被处理,无需等待所有任务结束

asyncio.to_thread

await asyncio.to_thread(func, \*args)默认线程池中运行同步函数 func(*args),并返回结果。 它是 loop.run_in_executor(None, func, *args)高层封装

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import time

def blocking_io():
time.sleep(1)
return "Done!"

async def main():
result = await asyncio.to_thread(blocking_io)
print(result)

asyncio.run(main())

aiohttp和aiofiles

aiohttp:异步 HTTP 客户端与服务器框架

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio

async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

asyncio.run(fetch("https://example.com"))

aiofiles 是一个为标准文件操作提供异步接口的库,允许你在 async/await 代码中安全地读写文件,而不会阻塞事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import aiofiles
import asyncio

async def main():
# 写文件
async with aiofiles.open('data.txt', 'w') as f:
await f.write('Hello, async files!')

# 读文件
async with aiofiles.open('data.txt', 'r') as f:
content = await f.read()
print(content)

asyncio.run(main())

参考资料

【Py】asyncio:为异步编程而生 | Python 特性 | 并发编程 | 协程_哔哩哔哩_bilibili

【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。_哔哩哔哩_bilibili