操作系统概论

操作系统的主要特征

并发性 (Concurrency)

🔍 核心定义

并发性是指两个或两个以上的事件或活动在同一时间间隔内发生。

  • 关键点:“同一时间间隔” ≠ “同一时刻”。它强调的是“看起来同时”,而不是“真正同时”。

📌 操作系统中的体现

  1. 多个 I/O 设备同时工作
    • 你的键盘在输入,打印机在打印,网卡在收发数据。这些设备都在“同时”工作。
  2. I/O 和 CPU 计算同时进行
    • 当 CPU 在计算时,I/O 设备可能在后台传输数据。CPU 不需要等待 I/O 完成,可以去处理其他任务。
  3. 内存中多个程序交替执行
    • 这是最核心的体现。操作系统通过时间片轮转(Time-Slicing)等调度算法,让多个程序“轮流”使用 CPU,从而实现“宏观上的并发”。

🖼️ 并发 vs 并行

这是你 PPT 中提出的关键问题!

特性 并发 (Concurrency) 并行 (Parallelism)
定义 多个任务在同一时间间隔内交替执行。 多个任务在同一时刻真正同时执行。
物理基础 单 CPU 系统即可实现。 需要多核 CPU 或多处理器系统。
效果 “看起来”同时进行。 “真正”同时进行。
类比 一个人在厨房里,一会儿切菜,一会儿炒菜,一会儿洗碗。 三个人在厨房里,一个人切菜,一个人炒菜,一个人洗碗。

一句话总结并行是并发的一种特例。并发是“逻辑上的同时”,并行是“物理上的同时”。

共享性 (Sharing)

🔍 核心定义

共享性指操作系统中的资源(包括硬件资源和软件资源)可被多个并发执行的进程共同使用,而不是被一个进程所独占。

  • 关键点:共享不等于“无限制访问”,它必须在操作系统管理下进行,以保证安全和有序。

📌 资源共享的方式

1️⃣ 透明资源共享 / 同时共享方式

  • 含义:允许多个进程在同一时间段内对资源进行访问,好像每个进程都独占资源一样。
  • 特点
    • 访问的次序对结果无影响。
    • 通常用于可重入只读的资源。
  • 例子
    • CPU:通过时间片轮转,让多个进程“同时”使用 CPU。
    • 主存 (RAM):多个进程的代码和数据可以同时存在于内存中。
    • 磁盘:多个进程可以同时读取磁盘上的不同文件。
    • 打印机:虽然物理上一次只能打印一个任务,但操作系统可以通过“打印队列”实现逻辑上的“同时共享”。

2️⃣ 独占资源共享 / 互斥共享方式

  • 含义:在同一时间段内只允许一个进程访问资源。
  • 特点
    • 这类资源称为临界资源 (Critical Resource)
    • 必须通过互斥机制(如锁、信号量)来保护。
  • 例子
    • 磁带机:一次只能由一个进程控制。
    • 扫描仪:一次只能扫描一份文档。
    • 数据库中的某一行记录:如果两个事务同时修改同一行,会导致数据不一致。

🛠️ 操作系统如何管理共享?

  • 提供显式资源共享机制:如 fork(), semaphore, mutex, lock 等系统调用。
  • 将互斥访问下放给用户决策:程序员需要自己负责加锁和解锁,操作系统提供工具。

异步性 (Asynchrony)

🔍 核心定义

异步性指在多道程序环境中,由于资源有限而进程众多,多数情况下,进程的执行不是一气呵成,而是“走走停停”。

  • 关键点:进程的执行是不可预测的,它的推进速度取决于系统调度、I/O 等待、中断等多种因素。

📌 异步性的表现

  1. 作业到达系统的时间和类型不确定
    • 用户随时可能启动一个新程序。
  2. 操作员发出命令或操作的时间和类型不确定
    • 用户可能随时按下键盘或点击鼠标。
  3. 程序运行发生错误或异常的类型和时刻不确定
    • 程序可能因为除零、内存溢出等原因崩溃。
  4. 中断事件发生的时刻不确定
    • 时钟中断、I/O 中断、硬件故障中断等都是随机发生的。
1
2
3
4
5
6
7
graph TD
A[并发性] --> B[多个任务同时执行]
C[共享性] --> D[资源被多个任务共同使用]
E[异步性] --> F[任务执行“走走停停”]

B & D & F --> G[现代操作系统的核心特征]
G --> H[实现多任务、多用户环境]

多道程序设计

核心思想

多道程序设计是指允许多个程序同时驻留在内存中,并由操作系统统一管理和调度,使它们交替(并发)地使用 CPU 和其他系统资源。

  • 核心目的掩盖 I/O 等待时间,提高 CPU 和系统资源的利用率
  • 终极目标:让昂贵的 CPU 永远不要闲着
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
gantt
title 单道程序 vs 多道程序
dateFormat HH:mm:ss
axisFormat %Ss

section 单道程序
作业A-CPU :crit, a1, 00:00:00, 2s
作业A-I/O :active, a2, after a1, 8s
作业A-CPU :crit, a3, after a2, 2s
作业A-I/O :active, a4, after a3, 8s

section 多道程序
作业A-CPU :crit, b1, 00:00:00, 2s
作业B-CPU :crit, b2, after b1, 2s
作业C-CPU :crit, b3, after b2, 2s
作业A-I/O :active, b4, after b1, 8s
作业B-I/O :active, b5, after b2, 8s
作业C-I/O :active, b6, after b3, 8s

对比:在多道程序中,当作业 A 在等待 I/O 时,CPU 立刻去执行作业 B 和 C。CPU 几乎没有空闲时间,利用率接近 100%!

cpu利用率

CPU利用率 = 1 - p^n

🔍 假设条件

  1. 系统中有 n 个程序 同时在内存中。
  2. 每个程序平均有 p 的概率在等待 I/O 操作
    • 例如,p = 0.8 表示一个程序有 80% 的时间在等磁盘读写、键盘输入等,只有 20% 的时间在真正使用 CPU。
  3. 各个程序的等待操作是相互独立的
    • 这是一个关键假设,意味着一个程序是否在等 I/O,不影响其他程序。

💡 公式推导

  • CPU 空闲的概率:当且仅当所有 n 个程序都在等待 I/O 时,CPU 才会空闲。
  • 因为每个程序等待 I/O 的概率是 p,且它们相互独立,所以所有 n 个程序都等待 I/O 的概率是 p^n
  • 因此,CPU 空闲的概率 = p^n
  • CPU 利用率 = 1 - CPU 空闲的概率 = 1 - p^n

若进程平均花费 80% 的时间等待 I/O,则为了使得 CPU 利用率不低于 80%,应至少有多少道程序在主存中运行?

计算过程

根据公式:

CPU利用率 = 1 - p^n ≥ 0.8

移项得:

p^n ≤ 0.2

代入 p = 0.8

0.8^n ≤ 0.2

两边取对数(以 10 为底或自然对数均可):

n * log(0.8) ≤ log(0.2)

注意:log(0.8) 是负数,所以在除的时候要反转不等号方向

n ≥ log(0.2) / log(0.8)

计算数值:

  • log(0.2) ≈ -0.69897
  • log(0.8) ≈ -0.09691
  • n ≥ (-0.69897) / (-0.09691) ≈ 7.21

因为 n 必须是整数,且要满足 n ≥ 7.21,所以:

n = 8

✅ 最终答案

为了使得 CPU 利用率不低于 80%,应至少有 8 道程序在主存中运行。

是不是同时运行的程序越多越好?

不是!同时运行的程序(道数)并不是越多越好。存在一个最优的“道数”,超过这个值,系统的整体效率反而会下降。

当道数 n 超过某个临界值后,系统性能会急剧下降。主要原因有:

1️⃣ 上下文切换开销 (Context Switching Overhead)

什么是上下文切换?

  • 当操作系统从一个进程切换到另一个进程时,它需要保存当前进程的状态(寄存器、内存映射、程序计数器等),并加载下一个进程的状态。

2️⃣ 内存压力 (Memory Pressure)

  • 每个进程都需要内存:代码段、数据段、堆、栈、页表等。

3️⃣ 资源竞争加剧 (Resource Contention)

  • 锁竞争:多个进程同时访问共享资源(如数据库连接池、文件锁),需要排队等待,增加了延迟。
  • 缓存失效:多个进程的指令和数据交替进入 CPU 缓存,导致缓存命中率降低,CPU 需要更频繁地从内存读取数据。

处理器状态

为什么需要两种处理器状态?

现代计算机是一个多用户、多任务的环境。如果所有程序都能随意执行任何指令,那么一个不小心的 bug 或一个恶意程序就可能:

  • 格式化硬盘。
  • 修改系统时间。
  • 访问其他用户的隐私数据。
  • 导致系统崩溃。

为了避免这种情况,CPU 被设计成有两种工作模式:

  1. 用户态 (User Mode):普通程序运行的状态,只能执行“安全”的指令。
  2. 核心态 (Kernel Mode / Supervisor Mode):操作系统内核运行的状态,可以执行所有指令,包括“危险”的特权指令。

程序状态字 (PSW)

Program Status Word (PSW) 是一个非常重要的寄存器。

  • 定义:PSW 是 CPU 内部的一个特殊寄存器,用于存储当前处理器的各种状态信息。
  • 关键作用:PSW 中有一个标志位(通常是最高位或某一位),用来标识当前 CPU 处于用户态还是核心态
    • PSW[bit] = 0 → 用户态
    • PSW[bit] = 1 → 核心态

这就是 CPU 判断当前是否可以执行特权指令的依据!

当 CPU 执行一条指令时,它会检查 PSW 中的这个标志位:

  • 如果是用户态,并且指令是特权指令,则触发一个异常 (Exception),操作系统会介入处理(通常是终止该程序)。
  • 如果是核心态,则允许执行。

CPU 如何判断当前是否可以执行特权指令?

答案:CPU 通过检查 程序状态字 (PSW) 中的一个特定标志位来判断。

  • 如果该标志位表示当前处于用户态,并且遇到的是特权指令,则 CPU 会触发一个异常(通常是“非法指令”或“特权指令违规”),并将控制权交给操作系统内核。
  • 操作系统内核会根据情况决定是终止该程序,还是进行其他处理。
image-20251116191358137

进程控制和管理

进程定义与属性

进程(Process)是程序在计算机上的一次执行实例,是操作系统进行资源分配、调度和保护的基本单位

为什么要引入“进程”?

1️⃣ 刻画系统的动态性(Dynamic Nature)

  • 问题:程序是静态的代码,无法描述“执行中”的状态。
  • 解决方案:进程是一个动态实体,它有生命周期(创建 → 运行 → 阻塞 → 终止)。
  • 意义:操作系统可以精确跟踪每个任务的当前状态,做出调度决策。

2️⃣ 发挥系统的并发性(Concurrency)

  • 问题:CPU 和 I/O 设备速度不匹配。程序在等待 I/O(如读文件、网络请求)时,CPU 就空闲了。
  • 解决方案:通过进程切换,让 CPU 在等待期间去执行其他任务。
  • 意义:提高了 CPU 利用率和系统吞吐量。

3️⃣ 解决资源共享与隔离的矛盾

  • 问题:多个程序可能需要共享资源(如文件、打印机),但又不能互相干扰。
  • 解决方案
    • 共享性:进程可以通过合法机制(如共享内存、消息队列)共享资源。
    • 独立性/保护性:每个进程拥有独立的地址空间,操作系统通过内存管理单元(MMU)确保 A 进程不能访问 B 进程的内存。
  • 意义:既实现了协作,又保证了安全和稳定。

进程的五大核心属性

属性 含义 举例说明
1. 动态性 进程是动态的,有生命周期(创建 → 运行 → 阻塞 → 终止)。 uvicorn 启动时创建进程,Ctrl+C 终止时销毁进程。
2. 并发性 多个进程可以“同时”运行(宏观并发,微观交替)。 一台服务器同时处理成百上千个用户的 HTTP 请求。
3. 独立性 每个进程有独立的地址空间和资源,互不干扰。 一个 Python 进程崩溃,不会导致另一个 Python 进程退出。
4. 制约性 进程间可能存在同步或互斥关系(如竞争资源、等待结果)。 多个进程写同一个日志文件,需要用文件锁避免内容错乱。
5. 共享性 进程可以通过操作系统提供的机制共享资源(如内存、文件)。 多个 FastAPI worker 进程共享一个 Redis 缓存连接池。

进程状态转换

五态模型

image-20251115204410504

七态模型

image-20251115204451732

七态模型在五态模型的基础上,显式增加了“挂起(Suspend)”的概念

挂起 = 进程被换出到外存(Swap)

  • 目的:当系统内存紧张时,操作系统会将一些暂时不活跃的进程(比如长时间阻塞的进程)从内存移到硬盘上的“交换区(Swap Space)”,以腾出内存给更紧急的任务。

挂起就绪态 (Ready/Suspend)

定义:

进程具备运行条件(即它已经准备好执行),但目前在外存中。只有当它被换入内存后,才能被调度器选中运行。

挂起等待态 (Blocked/Suspend)

定义:

进程正在等待某一个事件发生(如 I/O 完成、用户输入、网络响应),并且目前在外存中。

进程描述和组成

进程映像

进程映像(Process Image)是指进程在内存中的完整内容,包括代码、数据、堆、栈以及内核数据结构(如 PCB)等所有组成部分的集合。

image-20251115210257885

进程上下文

image-20251115210715693

寄存器上下文 (Register Context)存储在 PCB 中 包含:通用寄存器、程序计数器、栈指针、程序状态字

这是进程“灵魂”的一部分——CPU 执行时最直接依赖的状态。

PCB(Process Control Block,进程控制块)

PCB 是操作系统为每个进程创建的一个数据结构,用来记录和刻画该进程的所有状态和相关信息。

1️⃣ 进程标识信息 (Identification Information)

字段 说明
PID (Process ID) 进程的唯一数字标识,如 12345
PPID (Parent PID) 父进程的 PID,用于构建进程树。
UID/GID (User/Group ID) 进程所属用户的 ID 和组 ID,用于权限控制。

🌰 你的例子

1
2
3
import os
print(f"当前进程 PID: {os.getpid()}")
print(f"父进程 PID: {os.getppid()}")
这些值就是从 PCB 中读取的!

2️⃣ 处理器状态信息 (Processor State Information) —— 这就是“寄存器上下文”

这是 PCB 最关键的部分,用于上下文切换

字段 说明
程序计数器 (PC) 下一条要执行的指令地址。
通用寄存器 (AX, BX, CX…) 存放临时计算结果、变量地址等。
程序状态字 (PSW) 包含标志位(零标志 Z、进位标志 C、溢出标志 O 等)、中断允许位、特权级别。
栈指针 (SP) 指向当前函数调用栈的顶部。

关键点:每次进程切换时,操作系统都会将当前 CPU 寄存器的值“倾倒”进 PCB,再从新进程的 PCB “倒回”寄存器。这就是“上下文切换”的核心开销。

3️⃣ 进程调度信息 (Scheduling Information)

字段 说明
进程状态 就绪、运行、阻塞、挂起等。
进程优先级 决定调度顺序。
时间片剩余量 用于时间片轮转调度。
等待事件 如等待键盘输入、网络数据包到达等。

🌰 你的例子: 在 FastAPI 中,当一个请求在 await httpx.get(...) 时,其对应协程/线程的状态会被标记为“阻塞”,并被放入等待队列。这就是 PCB 中“进程状态”字段的作用。

4️⃣ 内存管理信息 (Memory Management Information)

字段 说明
页表基址 / 段表指针 用于虚拟内存到物理内存的地址转换。
内存分配情况 代码段、数据段、堆、栈的起始地址和大小。

💡 关键点:确保进程访问的是自己的内存空间,实现“内存保护”。

5️⃣ I/O 状态信息 (I/O Status Information)

字段 说明
打开的文件列表 文件描述符(fd)、文件指针、访问模式等。
分配的 I/O 设备 如打印机、网卡等。

🌰 你的例子: 当你在 Python 中 f = open("log.txt", "a") 时,操作系统会在 PCB 的“打开文件列表”中添加一条记录,记录这个文件句柄 f 对应的 fd。

6️⃣ 记账信息 (Accounting Information)

字段 说明
CPU 使用时间 进程已使用的 CPU 时间总和。
累计运行时间 从创建到现在的总时间。
最大内存使用量 历史峰值。

📊 用途:用于性能监控、计费、调试等。

进程队列

链接方式

image-20251115212223846

索引方式

image-20251115212241091

进程切换和处理器状态转换

image-20251115212521578

模式切换 vs. 进程切换

  1. 模式切换 (Mode Switch)

定义:CPU 在“用户态(User Mode)”和“核心态(Kernel Mode)”之间的切换。 触发方式:由中断(Interrupt)或系统调用(System Call) 引起。 目的:让操作系统获得控制权,执行特权指令(如访问硬件、修改内存映射)。

  1. 进程切换 (Process Switch / Context Switch)

定义:操作系统暂停当前正在运行的进程,保存其状态,并加载另一个进程的状态,使其开始运行。 触发方式:通常发生在核心态下,由中断或系统调用引发。 目的:实现多任务并发,公平分配 CPU 时间。

image-20251115215300054

当进程开始运行时,操作系统如何重新获得控制?

果进程一直在运行,操作系统就永远没机会调度其他进程了,系统就会卡死。

答案:中断 (Interrupt) 是关键!

  • 什么是中断? 中断就像一个“紧急电话”,它能打断 CPU 当前正在执行的程序,强制 CPU 去处理一个更高优先级的事情——通常是操作系统内核。
    • 硬件中断:由外部设备触发,比如键盘敲击、鼠标移动、网络数据包到达、定时器到期。
    • 软件中断/异常:由程序自身触发,比如除零错误、访问非法内存地址、或者程序主动发起的系统调用(如 open(), read())。

进程需要保存哪些状态?

当操作系统获得控制权后,它必须把当前正在运行的进程(比如进程0)的“工作状态”完整地记录下来,以便将来能恢复执行。这个过程叫做“保存现场 (Save Context)”。

需要保存哪些状态?

这些状态主要存储在一个叫做 PCB (Process Control Block, 进程控制块) 的数据结构里。PCB 就像是进程的“身份证 + 工作日志 + 资源清单”。

如何选择下一个待执行的进程/线程?

当操作系统保存完当前进程的状态后,它需要决定“接下来该让谁干活”。这个决策过程叫做“进程调度 (Process Scheduling)”。

如何选择?

这取决于操作系统的调度算法 (Scheduling Algorithm)

线程

为什么需要线程?—— 引入线程的动机

❓ 问题:进程模型有什么不足?

  1. 切换开销大:进程切换需要保存/恢复整个内存空间(代码、数据、堆、栈)和 PCB,开销大。
  2. 通信困难:进程间通信(IPC)需要管道、消息队列、共享内存等复杂机制,效率低。
  3. 不适合细粒度并发:比如一个 Web 服务器,每个请求都创建一个进程,成本太高。

✅ 解决方案:引入线程!

线程是进程内的一个执行单元,是 CPU 调度和分派的基本单位。

  • 同一个进程内的所有线程
    • 共享:代码段、数据段、堆、打开的文件等进程资源
    • 私有:各自的寄存器上下文

💡 核心价值实现进程内部的并发,降低切换和通信开销

什么是线程?—— 核心定义

线程(Thread)是进程中一个可并发执行的控制流,它拥有自己独立的栈和寄存器状态,但与其他线程共享进程的地址空间和资源。

线程如何工作?—— 线程的生命周期与切换

  1. 线程的生命周期状态

和进程类似,线程也有状态:新建 → 就绪 → 运行 → 阻塞 → 终止

  1. 线程切换(Thread Switching)
  • 开销远小于进程切换!因为不需要切换地址空间(页表),只需要保存/恢复寄存器上下文和栈指针
  • 切换由线程调度器(在 OS 内核或用户态库中)管理。
  1. 线程的实现方式
类型 说明 例子
用户级线程 (User-Level Threads) 由用户态线程库(如 Java Green Threads)管理,内核 unaware Python 的 greenlet(非标准)
内核级线程 (Kernel-Level Threads) 由操作系统内核直接管理,每个线程对应一个内核调度实体 Python 的 threading 模块
混合模式 用户级线程映射到少量内核线程 Go 的 Goroutine

💡 Python 的 threading 是内核级线程,但受 GIL 限制,无法真正并行执行 Python 字节码。

处理器调度

调度层次

1️⃣ 高级调度(High-Level Scheduling)—— 作业调度 / 长程调度

目标:决定哪些“作业”被允许进入系统参与 CPU 竞争。 对象:作业(Job)→ 通常是一个完整的程序或任务(如编译一个文件、运行一个脚本)。 发生频率(几分钟到几小时一次)。 执行者:操作系统内核。

🔍 核心功能:

  • 选作业进内存:从后备队列中选择作业,将其加载到内存,创建进程。
  • 控制多道程序的道数:决定同时在内存中运行多少个作业(即并发度)。太多会耗尽内存,太少会浪费 CPU。

2️⃣ 中级调度(Medium-Level Scheduling)—— 平衡调度 / 内存调度

目标:根据内存状态,决定哪些进程可以在内存中运行,哪些需要换出到外存。 对象:进程(Process)。 发生频率中等(几秒到几分钟一次)。 执行者:操作系统内核。

🔍 核心功能:

  • 选进程进出内存:当内存紧张时,将一些不活跃的进程(如长时间阻塞的进程)换出到 Swap 分区;当内存空闲时,再换回。
  • 平衡系统负载:防止内存溢出,提高系统吞吐量。

3️⃣ 低级调度(Low-Level Scheduling)—— 进程调度 / CPU 调度

目标:决定哪个就绪队列中的进程/线程获得 CPU 执行权。 对象:进程或线程(内核级线程)。 发生频率(毫秒级,每几十到几百毫秒一次)。 执行者:操作系统内核 → 这是操作系统最核心的部分

🔍 核心功能:

  • 选进程分配 CPU:从就绪队列中选出下一个要运行的进程/线程。
  • 执行上下文切换:保存当前进程上下文,恢复新进程上下文。
  • 实现公平与效率:通过调度算法(如 RR、优先级、MLFQ)保证所有进程都能得到 CPU 时间。
1
2
3
4
5
6
7
8
graph LR
A[高级调度] -->|选作业进内存| B[中级调度]
B -->|选进程进出内存| C[低级调度]
C -->|选进程分配 CPU| D[CPU 执行]

style A fill:#f9d5e5,stroke:#333
style B fill:#e3eaa7,stroke:#333
style C fill:#b2d3c2,stroke:#333

调度算法评价指标

image-20251116131448708

七种调度策略

先来先服务 (First Come First Serverd, FCFS)

image-20251116132753390

短作业优先 (Shortest Job First, SJF)

image-20251116132925377

最短剩余时间优先 (Shortest Remaining Time First, SRTF)

image-20251116133352988

最高响应比优先 (Highest Response Ratio First, HRRF)

image-20251116133432719

优先级调度 (Priority Scheduling)

image-20251116141512909

轮转调度 (Round Robin Scheduling, RR)

image-20251116141611150

多级反馈队列调度 (Multi-Level Feedback Queue, MLFQ)

image-20251116144745818

并发:互斥与同步

进程交互

为什么需要“进程交互”?

在单进程时代,程序是“独占”的——它不需要考虑别人。但在现代操作系统中:

  • 多个进程/线程同时运行。
  • 它们可能共享资源(如内存、文件、数据库连接)。
  • 它们可能需要协同完成一个复杂任务(如一个 Web 请求涉及多个微服务)。

这就产生了两个根本性问题:

  1. 竞争(Competition):多个进程争抢同一个资源,导致结果不可预测。
  2. 协作(Cooperation):多个进程需要按特定顺序执行,才能完成共同目标。

进程交互就是解决这两个问题的机制

竞争关系(进程互斥)

✅ 核心定义:

进程互斥是指若干进程因相互争夺独占型资源而产生的竞争制约关系。

📌 关键词解析:

  • “相互争夺”:多个进程都想使用同一个资源。
  • “独占型资源”:一次只能被一个进程使用的资源,如打印机、临界区代码、全局变量、数据库连接等。
  • “竞争制约关系”:一个进程的执行会制约另一个进程的执行。

🧱 两个核心控制问题:

  1. 死锁问题(Deadlock)
    • 定义:多个进程互相等待对方释放资源,导致所有进程都无法继续执行。
    • 经典例子:“哲学家就餐问题”——五个哲学家围坐圆桌,每人左右各有一根筷子。他们必须拿到两根筷子才能吃饭。如果每个人都拿起左边的筷子,然后等待右边的筷子,就会陷入死锁。
    • 四个必要条件
      • 互斥条件
      • 请求与保持条件
      • 不剥夺条件
      • 环路等待条件
  2. 饥饿问题(Starvation)
    • 定义:某个进程因为优先级低或资源分配策略不当,长时间得不到所需资源,导致无法执行。
    • 例子:在一个高优先级任务永远不结束的系统中,低优先级任务可能永远得不到 CPU。

协作关系(进程同步)

✅ 核心定义:

进程同步是指为完成共同任务的并发进程基于某个条件来协调其活动,因为需要在某些位置上排定执行的先后次序而等待、传递信号或消息所产生的协作制约关系。

📌 关键词解析:

  • “完成共同任务”:多个进程/线程需要合作才能达成目标。
  • “协调活动”:它们需要按特定顺序执行。
  • “排定执行先后次序”:比如 A 必须在 B 之前执行。
  • “等待、传递信号或消息”:通过同步机制(如信号量、条件变量、管道)实现通信和协调。

🧱 核心思想:

  • “生产者-消费者”模型:生产者生成数据,消费者消费数据,它们必须同步。
  • “读者-写者”模型:读者可以同时读,但写者必须独占。
  • “屏障(Barrier)”:所有进程到达某个点后才能继续执行。

临界区管理

什么是“临界区”?

✅ 核心定义:

并发进程中,与共享变量有关的程序段叫做“临界区”(Critical Section)。

📌 关键词解析:

  • “并发进程”:多个进程/线程同时运行。
  • “共享变量”:多个进程都能访问和修改的变量(如全局变量、数据库连接、文件句柄)。
  • “程序段”:一段代码,比如 counter += 1 这样的操作。

💡 简单说临界区 = 操作共享资源的那一小段代码。

🎯 为什么重要?

因为这段代码如果被多个进程同时执行,会导致竞态条件(Race Condition),产生不可预测的结果。

如何避免错误?—— 互斥访问临界区

如果保证进程在临界区执行时,不让另一个进程进入临界区,即各进程对共享变量的访问是互斥的,就不会造成与时间有关的错误。

这就是“进程互斥”的核心思想。

临界区调度的三个原则(经典!)

这是解决临界区问题的黄金法则,任何同步机制都必须满足这三个条件:

✅ 原则 1:一次至多一个进程能够进入临界区内执行

互斥性(Mutual Exclusion)

  • 这是最基本的要求。任何时候,最多只能有一个进程在临界区内。
  • 如果 A 在临界区,B 就不能进入,必须等待。

✅ 原则 2:如果已有进程在临界区,其他试图进入的进程应等待

忙则等待(Progress)

  • 如果临界区空闲,想进入的进程可以立即进入。
  • 如果临界区被占用,其他进程必须等待,不能“自旋”浪费 CPU(虽然有些实现会自旋,但理想情况下应该阻塞等待)。

✅ 原则 3:进入临界区内的进程应在有限时间内退出,以便让等待进程中的一个进入

有限等待(Bounded Waiting)

  • 防止“饥饿”。不能让某个进程永远等下去。
  • 例如,使用队列来管理等待的进程,确保每个进程最终都能获得进入临界区的机会。

实现临界区管理的软件方法一Peterson方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int turn;           // turn 表示轮到谁进入
boolean flag[2]; // flag[i] 表示进程 i 想进入临界区

// 初始化
flag[0] = flag[1] = false;

Process P0() {
flag[0] = true;
turn = 1; // 谦让给 P1
while (flag[1] && turn == 1); // 等待 P1 退出或谦让
/* critical section */
flag[0] = false;
/* remainder section */
}

Process P1() {
flag[1] = true;
turn = 0; // 谦让给 P0
while (flag[0] && turn == 0); // 等待 P0 退出或谦让
/* critical section */
flag[1] = false;
/* remainder section */
}

✅ 1. 互斥性 (Mutual Exclusion)

定义:一次至多一个进程能进入临界区。

📌 证明思路:

  • 假设 P0 和 P1 同时进入临界区。
  • 那么 flag[0] = trueflag[1] = true
  • 根据算法,P0 在进入前设置了 turn = 1,P1 设置了 turn = 0
  • 由于 turn 只能取值 0 或 1,不可能同时为 0 和 1。
  • 所以,当 P0 检查 while (flag[1] && turn == 1) 时,如果 turn == 0,它就会阻塞。
  • 同理,P1 也会被阻塞。
  • 结论:不可能同时进入。

✅ 2. 空闲让进 (Progress)

定义:如果临界区空闲,且有进程想进入,则该进程应该能进入。

📌 证明思路:

  • 如果 P1 不想进入临界区,则 flag[1] = false
  • 此时,无论 turn 是多少,P0 的 while (flag[1] && turn == 1) 条件都会失败(因为 flag[1]false),所以 P0 可以立即进入临界区。

✅ 3. 有限等待 (Bounded Waiting)

定义:一个进程最多等待另一个进程执行完临界区一次,就能获得进入的机会。

📌 证明思路:

  • 假设 P0 被阻塞,说明 turn = 1flag[1] = true,即 P1 在临界区。
  • 当 P1 执行完临界区后,它会设置 flag[1] = false
  • 此时,如果 P0 还想进入,它的 while 条件会失败,从而进入临界区。
  • 如果 P1 在 flag[1] = false 后又想进入,则它会设置 flag[1] = trueturn = 0
  • 此时,P0 会被阻塞,但 P1 执行完后,P0 就能进入。
  • 结论:P0 最多等待 P1 执行一次临界区,就能进入。

信号量与PV操作

信号量(Semaphore)

✅ 核心定义:

信号量是一种软件资源,用于表示物理资源的实体,是一个与队列有关的整型变量。

📌 关键词解析:

  • “表示物理资源”:比如打印机、数据库连接池、线程池中的可用线程数。
  • “整型变量”:信号量的值代表当前可用资源的数量
  • “与队列有关”:当资源不足时,等待的进程会被放入一个等待队列

P/V 操作:信号量的“原子操作”

✅ 定义:

P (Proberen, 尝试) 和 V (Verhogen, 增加) 是对信号量进行操作的原语。

  • P 操作:尝试获取资源。如果资源可用(信号量 > 0),则减 1;否则,进程进入等待队列。
  • V 操作:释放资源。增加信号量值,并唤醒一个等待的进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// P 操作 (Wait)
void P(semaphore s) {
s.value = s.value - 1;
if (s.value < 0) {
// 资源不足,将当前进程加入等待队列并阻塞
block(current_process);
}
}

// V 操作 (Signal)
void V(semaphore s) {
s.value = s.value + 1;
if (s.value <= 0) {
// 有进程在等待,唤醒一个
wakeup(one_waiting_process);
}
}

⚠️ 关键点:P/V 操作必须是原子操作(Atomic Operation),即在执行过程中不能被中断。否则会导致竞态条件。

哲学家进餐问题

哲学家进餐问题:核心描述

✅ 问题设定:

  • 5 位哲学家 围坐在一张圆桌旁。
  • 每位哲学家面前有一盘意大利面
  • 桌子上有 5 把叉子,每两位哲学家之间放一把。
  • 哲学家的生活只有两件事:
    • 思考(Think):什么都不做。
    • 吃饭(Eat):必须同时拿到左右两边的叉子才能吃。
  • 吃完后,会放下叉子,继续思考。

💡 目标:设计一个算法,让所有哲学家都能吃饱,且不会发生死锁或饥饿。

为什么会出现死锁?

📌 死锁的四个必要条件:

  1. 互斥条件:叉子一次只能被一个人使用。
  2. 请求与保持条件:哲学家拿起一把叉子后,会继续等待另一把。
  3. 不剥夺条件:不能强行从哲学家手中拿走叉子。
  4. 环路等待条件:每位哲学家都在等右边的人放下叉子,形成一个循环等待链。

解决方案:打破死锁的四个条件之一

要避免死锁,只需破坏其中一个必要条件即可。以下是几种经典的解决方案:

✅ 解决方案 1:限制同时就餐的哲学家数量(破坏“环路等待”)

最多允许 4 位哲学家同时吃面。

📌 原理:

  • 如果只有 4 个人尝试拿叉子,那么至少有一把叉子是空闲的。
  • 这样,总会有一个人能拿到两把叉子并吃完,从而释放资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading
import time

# 5 把叉子(信号量)
forks = [threading.Semaphore(1) for _ in range(5)]
# 限制同时就餐人数为 4
dining_room = threading.Semaphore(4)

def philosopher(i):
while True:
think()
dining_room.acquire() # 进入餐厅(最多 4 人)

forks[i].acquire() # 拿起左边叉子
forks[(i + 1) % 5].acquire() # 拿起右边叉子

eat(i)

forks[i].release() # 放下左边叉子
forks[(i + 1) % 5].release() # 放下右边叉子

dining_room.release() # 离开餐厅

def think():
time.sleep(0.1)

def eat(i):
print(f"Philosopher {i} is eating...")
time.sleep(0.1)

# 创建 5 个哲学家线程
threads = []
for i in range(5):
t = threading.Thread(target=philosopher, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()

✅ 解决方案 2:奇偶号哲学家取叉子顺序不同(破坏“环路等待”)

奇数号哲学家先取左边叉子,再取右边;偶数号哲学家先取右边叉子,再取左边。

📌 原理:

  • 这样就不会形成环路等待。
  • 例如,哲学家 0(偶数)先拿右边叉子(叉子 1),哲学家 1(奇数)先拿左边叉子(叉子 1)→ 他们争抢同一把叉子,但最终只会有一个成功,另一个等待,从而打破环路。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def philosopher(i):
while True:
think()

if i % 2 == 0: # 偶数号哲学家
forks[(i + 1) % 5].acquire() # 先拿右边叉子
forks[i].acquire() # 再拿左边叉子
else: # 奇数号哲学家
forks[i].acquire() # 先拿左边叉子
forks[(i + 1) % 5].acquire() # 再拿右边叉子

eat(i)

forks[i].release() # 放下左边叉子
forks[(i + 1) % 5].release() # 放下右边叉子

✅ 解决方案 3:拿起两把叉子才开始吃(破坏“请求与保持”)

每位哲学家必须同时拿到两把叉子才能开始吃,否则一把也不拿。

1️⃣ 全局变量定义

1
2
3
4
5
6
7
#define THINKING 0
#define HUNGRY 1
#define EATING 2

semaphore s[5]; // 用于阻塞哲学家的信号量
semaphore mutex = 1; // 互斥锁,保护 state 和 s
int state[5]; // 哲学家的状态
  • s[i] 初始值为 0,因为一开始没有人需要等待。
  • state[i] 初始化为 THINKING

2️⃣ take_fork(int i) 函数

1
2
3
4
5
6
7
void take_fork(int i) {
P(mutex); // 获取互斥锁
state[i] = HUNGRY; // 哲学家 i 变成饥饿状态
test(i); // 尝试让 i 吃饭
V(mutex); // 释放互斥锁
P(s[i]); // 如果 test(i) 没有让 i 吃上饭,这里会阻塞
}

📌 关键点:

  • state[i] = HUNGRY: 告诉“管家”,我饿了。
  • test(i): “管家”检查我是否能吃。
    • 如果能吃,test(i) 会执行 V(s[i]),唤醒我。
    • 如果不能吃,test(i) 不做任何事。
  • P(s[i]): 如果我没被唤醒,我就在这里阻塞,等待邻居放叉子。

这个函数是“非阻塞”的:它只负责声明“我饿了”,然后立即返回。真正的等待发生在 P(s[i])

3️⃣ put_fork(int i) 函数

1
2
3
4
5
6
7
void put_fork(int i) {
P(mutex); // 获取互斥锁
state[i] = THINKING; // 哲学家 i 变成思考状态
test((i + 1) % 5); // 检查右边邻居
test((i + 4) % 5); // 检查左边邻居((i+4)%5 == (i-1)%5)
V(mutex); // 释放互斥锁
}

📌 关键点:

  • state[i] = THINKING: 我吃饱了,不再占用叉子。
  • test((i+1)%5)test((i+4)%5): 告诉“管家”,我的邻居们可能现在可以吃饭了。
    • 例如,哲学家 0 放下叉子后,哲学家 1 和 4 可能现在能拿到两把叉子了。
    • “管家”会检查他们是否处于 HUNGRY 状态,并且邻居都不在吃,如果是,就唤醒他们。

4️⃣ test(int i) 函数 —— 核心逻辑!

1
2
3
4
5
6
7
8
void test(int i) {
if (state[i] == HUNGRY &&
state[(i + 1) % 5] != EATING &&
state[(i + 4) % 5] != EATING) {
state[i] = EATING; // 可以吃了!
V(s[i]); // 唤醒哲学家 i
}
}

📌 关键点:

  • 检查三个条件
    1. state[i] == HUNGRY: 我确实想吃饭。
    2. state[(i+1)%5] != EATING: 我右边的邻居没在吃。
    3. state[(i+4)%5] != EATING: 我左边的邻居没在吃。
  • 如果都满足:说明我现在可以拿到两把叉子!
    • 设置 state[i] = EATING
    • 执行 V(s[i]),唤醒我自己(因为我在 take_forkP(s[i]) 阻塞了)。

这个函数是“原子”的:因为它在 mutex 保护下执行,不会被其他哲学家打断。

生产者消费者问题

mutex 的作用就是:

保护对共享变量(或临界区)的访问只在真正操作这些共享资源的前后“加锁”和“解锁”

(防死锁铁律):

永远不要在持有互斥锁(mutex)的情况下,调用可能阻塞的操作(如 P(empty)、P(full)、sleep、wait 等)。

什么是生产者-消费者问题?

这是一个经典的多线程同步问题,用于模拟现实中的“生产”与“消费”场景:

  • 生产者 (Producer):负责制造数据或产品。
  • 消费者 (Consumer):负责处理或消费这些数据/产品。
  • 缓冲区 (Buffer):一个有限大小的共享空间,用来暂存生产者的产品,供消费者取用。

📌 核心挑战

  1. 互斥 (Mutual Exclusion):多个生产者/消费者不能同时操作缓冲区的同一个位置,否则数据会错乱。
  2. 同步 (Synchronization)
    • 生产者不能在缓冲区满时继续生产(要等待)。
    • 消费者不能在缓冲区空时继续消费(要等待)。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
item B[n];
Semaphore empty; /*可用的空缓冲区个数*/
Semaphore full; /*可用的产品数*/
Semaphore mutex; /*互斥信号量*/
empty = n; full = 0; mutex = 1;
int in = 0; out = 0; /*in为放入缓冲区指针, out为取出缓冲区指针*/

Process producer_i( ) {
while(true) {
item product = produce();
P(empty);
P(mutex);
B[in] = product;
in = (in+1) % n;
V(mutex);
V(full);
}
}

Process consumer_i( ) {
while(true) {
P(full);
P(mutex);
Item product = B[out];
out = (out+1) % n;
V(mutex);
V(empty);
consume(product);
}
}

问题:如果将P操作的顺序交换,会出现什么情况?

生产者霸占着 mutex 锁,等待 empty,消费者等待 mutex 锁,导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
sequenceDiagram
participant Producer as 生产者 P1
participant Consumer as 消费者 C1
participant Mutex as 互斥锁 (mutex)
participant Empty as 空闲缓冲区 (empty)

Note over Producer,Consumer: 初始状态: empty=0 (缓冲区满), mutex=1

Producer->>Mutex: P(mutex) // 成功获取锁,mutex=0
Producer->>Empty: P(empty) // empty=0,阻塞!等待空位...
Note over Producer: 生产者 P1 霸占 mutex 锁,等待 empty

Consumer->>Full: P(full) // full=1,成功,full=0
Consumer->>Mutex: P(mutex) // mutex=0,阻塞!等待锁...
Note over Consumer: 消费者 C1 等待 mutex 锁

Note over Producer,Consumer: 💥 死锁!
Note right of Producer: 我要等 empty (需 C1 消费)
Note left of Consumer: 我要等 mutex (需 P1 释放)

问题:当前生产者消费者共用一个互斥锁会造成竞争

1
2
3
4
Semaphore pmutex, cmutex; // 两个独立的互斥锁
...
P(pmutex); // 生产者只锁自己的写入区域
P(cmutex); // 消费者只锁自己的读取区域

优势

  • 生产者之间:仍然需要 pmutex 来互斥,因为多个生产者可能同时想写入 in 指针指向的位置。
  • 消费者之间:仍然需要 cmutex 来互斥,因为多个消费者可能同时想读取 out 指针指向的位置。
  • 生产者 vs 消费者它们可以并行! 只要生产者在写一个位置,消费者在读另一个位置,两者互不干扰,完全可以同时进行。

死锁

死锁产生

什么是死锁

在多进程/多线程系统中,死锁是指两个或多个进程因竞争资源而造成的一种互相等待的现象,若无外力作用,它们都将无法向前推进。

简单说:A 等 B,B 等 C,C 又等 A,大家谁也不让步,结果全都卡住。

死锁的4个必要条件

只要系统发生死锁,以下4个条件必然同时成立。缺一不可!

1️⃣ 互斥访问 (Mutual Exclusion)

  • 定义:系统中存在临界资源,进程应互斥地使用这些资源。
  • 通俗解释:资源一次只能被一个进程使用。比如,打印机、文件、数据库连接、内存中的某个变量等。
  • 为什么是必要条件?如果资源可以被多个进程同时共享(如只读文件),那就不存在竞争,也就不会死锁。

2️⃣ 占有和等待 (Hold and Wait)

  • 定义:进程在请求资源得不到满足而等待时,不释放已占有的资源。
  • 通俗解释:一个进程已经拿着一些资源,但它还需要其他资源才能完成工作,于是它一边等着新资源,一边还紧紧攥着自己手里的旧资源,不肯放手。
  • 为什么是必要条件?如果一个进程在等待新资源时能主动释放旧资源,那么它就不会阻塞别人,死锁也就不会形成。

3️⃣ 不剥夺 (No Preemption)

  • 定义:已被占用的资源只能由属主进程自愿释放,而不允许被其他进程剥夺。
  • 通俗解释:资源一旦被某个进程拿走,除非它自己愿意还回来,否则谁也不能强行抢走。这保证了进程的“自主性”,但也为死锁埋下了隐患。
  • 为什么是必要条件?如果系统能强行剥夺资源(比如操作系统强制回收),那么就可以打破死锁链。

4️⃣ 循环等待 (Circular Wait)

  • 定义:存在循环等待链,每个进程在链中等待下一个进程所持有的资源,造成这组进程处于永远等待状态。
  • 通俗解释:这是一个闭环。A 等 B 的资源,B 等 C 的资源,C 又等 A 的资源,形成了一个“等待环”。
  • 为什么是必要条件?如果没有循环,等待链最终会指向一个“不等待”的进程,这个进程完成后会释放资源,从而解开整个等待链。

死锁防止

image-20251116173459401

死锁避免

银行家算法

image-20251116181426684

死锁检测和解除

资源分配图

image-20251116182605135
  • 阻塞节点 (Blocked Node):一个进程,它正在请求一个或多个资源,但这些资源当前都被其他进程占用,且没有空闲实例可用。它必须等待。
  • 非阻塞节点 (Non-blocked Node):一个进程,它要么没有请求任何资源,要么它请求的资源当前有空闲实例可以立即满足。它可以继续执行。

如何通过资源分配图判断死锁?

✅ 死锁的充分条件(当资源类型只有一个实例时)

如果资源分配图中存在一个环,则系统一定发生死锁。

  • 原因:在一个环中,每个进程都在等待下一个进程所持有的资源,而下一个进程又在等待再下一个……形成一个无限等待的闭环。

⚠️ 当资源类型有多个实例时

环的存在是死锁的必要条件,但不是充分条件。

  • 原因:即使图中有环,但如果环中的某个资源类型有多个实例,那么可能还有空闲实例可以满足某个进程的需求,从而打破死锁。

资源分配图的简化

image-20251116182901353

死锁检测算法

与银行家算法的安全性检测类似

image-20251116184516361

参考资料

操作系统原理 (2025 春季学期)

01 - AI 时代的操作系统课2025 南京大学操作系统原理]_哔哩哔哩_bilibili

为什么使用milvus

非结构化数据(如文本、图像和音频)格式各异,蕴含丰富的潜在语义,因此分析起来极具挑战性。为了处理这种复杂性,Embeddings 被用来将非结构化数据转换成能够捕捉其基本特征的数字向量。然后将这些向量存储在向量数据库中,从而实现快速、可扩展的搜索和分析。

Milvus 提供强大的数据建模功能,使您能够将非结构化或多模式数据组织成结构化的 Collections。它支持多种数据类型,适用于不同的属性模型,包括常见的数字和字符类型、各种向量类型、数组、集合和 JSON,为您节省了维护多个数据库系统的精力。

image-20250908091053654

部署(windows)

在 Docker(Linux)中运行 Milvus | Milvus 文档

1
2
3
4
5
# Download the configuration file and rename it as docker-compose.yml
C:\>Invoke-WebRequest https://github.com/milvus-io/milvus/releases/download/v2.6.0/milvus-standalone-docker-compose.yml -OutFile docker-compose.yml

# Start Milvus
C:\>docker compose up -d

注意设置环境变量DOCKER_VOLUME_DIRECTORY来决定卷映射的路径

容器 镜像 在 Milvus 中的角色 一句话说明
etcd quay.io/coreos/etcd:v3.5.18 元数据与协调中心 负责“记帐”——存索引结构、集合信息、节点心跳等,相当于 Milvus 的“大脑备忘录”。
minio minio/minio:RELEASE.2024-12-18T13-15-44Z 对象存储 负责“存文件”——把向量索引文件、大字段、日志快照等落地成对象,相当于 Milvus 的“硬盘”。
standalone milvusdb/milvus:v2.6.0 计算节点(单机版) 负责“干活”——接受 SDK 请求、做向量检索、构建索引,相当于 Milvus 的“工人”。

安装

1
pip install -U pymilvus

基本概念

数据库

在 Milvus 中,数据库是组织和管理数据的逻辑单元。为了提高数据安全性并实现多租户,你可以创建多个数据库,为不同的应用程序或租户从逻辑上隔离数据。例如,创建一个数据库用于存储用户 A 的数据,另一个数据库用于存储用户 B 的数据。

collections

在 Milvus 上,您可以创建多个 Collections 来管理数据,并将数据作为实体插入到 Collections 中。Collections 和实体类似于关系数据库中的表和记录

Collection 是一个二维表,具有固定的列和变化的行。每列代表一个字段,每行代表一个实体。

下图显示了一个有 8 列和 6 个实体的 Collection。

image-20250908094418551

schema

Schema 定义了 Collections 的数据结构。在创建一个 Collection 之前,你需要设计出它的 Schema。

设计良好的 Schema 至关重要,因为它抽象了数据模型,并决定能否通过搜索实现业务目标。此外,由于插入 Collections 的每一行数据都必须遵循 Schema,因此有助于保持数据的一致性和长期质量。从技术角度看,定义明确的 Schema 会带来组织良好的列数据存储和更简洁的索引结构,从而提升搜索性能。

一个 Collection Schema 有一个主键、最多四个向量字段和几个标量字段。下图说明了如何将文章映射到模式字段列表。

image-20250908094645183

与langchain集成

Milvus | 🦜️🔗 LangChain

使用 Milvus 和 LangChain 的检索增强生成(RAG) | Milvus 文档

本人实现的用于分块后存储入milvus的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
class MilvusStorage:
"""Milvus向量存储管理类

负责将分块后的文档内容存储到Milvus向量数据库中,
支持向量检索和BM25全文检索。
"""

def __init__(self,
embedding_function: Embeddings,
uri: Optional[str] = None,
db_name: Optional[str] = None,
token: Optional[str] = None,
collection_name: Optional[str] = None):
"""初始化Milvus存储客户端

Args:
embedding_function: LangChain embedding模型实例(必需)
uri: Milvus服务地址,默认从环境变量MILVUS_URI获取
db_name: 数据库名称,默认从环境变量MILVUS_DB_NAME获取
token: 认证令牌,默认从环境变量MILVUS_TOKEN获取(可选)
collection_name: 集合名称,默认从环境变量MILVUS_COLLECTION_NAME获取
"""
# 验证必需参数
if not embedding_function:
raise ValueError("embedding_function是必需参数,必须提供LangChain Embeddings实例")

# 从环境变量读取配置,如果参数没有提供的话
self.uri = uri or os.getenv('MILVUS_URI', 'http://localhost:19530')
self.db_name = db_name or os.getenv('MILVUS_DB_NAME', 'rag')
self.token = token or os.getenv('MILVUS_TOKEN') or None
self.collection_name = collection_name or os.getenv('MILVUS_COLLECTION_NAME', 'chunks')

# 设置embedding函数
self.embedding_function = embedding_function

# 初始化LangChain Milvus向量存储
self.vector_store = Milvus(
embedding_function=self.embedding_function,
connection_args={
"uri": self.uri,
"db_name": self.db_name,
"token": self.token
} if self.token else {
"uri": self.uri,
"db_name": self.db_name
},
collection_name=self.collection_name,
index_params={"index_type": "HNSW", "metric_type": "COSINE", "params": {"M": 16, "efConstruction": 200}}
)

def store_chunks(self, chunk_result: ChunkResult) -> Dict[str, Any]:
"""存储分块结果到Milvus

Args:
chunk_result: 分块结果对象

Returns:
Dict: 插入结果,包含插入状态和记录数

Raises:
ValueError: 当向量存储未初始化时
Exception: Milvus操作异常
"""
if not self.vector_store:
raise ValueError("向量存储未初始化")

if not chunk_result.chunks:
return {
"status": "success",
"inserted_count": 0,
"message": "无数据需要插入"
}

try:
# 转换为LangChain Document格式
documents = self._convert_chunks_to_langchain_docs(chunk_result)

# 为每个文档生成UUID作为主键
from uuid import uuid4
uuids = [str(uuid4()) for _ in range(len(documents))]

# 使用LangChain Milvus添加文档,指定IDs
ids = self.vector_store.add_documents(documents=documents, ids=uuids)

return {
"status": "success",
"inserted_count": len(documents),
"document_ids": ids,
"document_name": chunk_result.document_name,
"strategy": chunk_result.strategy.value,
"collection_name": self.collection_name
}

except Exception as e:
raise Exception(f"Milvus插入失败: {str(e)}")

def _convert_chunks_to_langchain_docs(self, chunk_result: ChunkResult) -> List[Document]:
"""为现有Documents添加存储所需的元数据

Args:
chunk_result: 分块结果,chunks已经是Document列表

Returns:
List[Document]: 添加了元数据的Document列表
"""
documents = []

for idx, chunk in enumerate(chunk_result.chunks):
# 创建符合Milvus集合schema的元数据
# 注意:page_content会自动映射到text_content字段
updated_metadata = {
**chunk.metadata, # 保留原有元数据
"document_name": chunk_result.document_name,
"chunk_index": idx,
"chunk_size": len(chunk.page_content)
}

# 创建新Document以避免修改原始数据
# LangChain会自动将page_content映射到Milvus的text_content字段
# embedding字段会由embedding_function自动生成
doc = Document(
page_content=chunk.page_content,
metadata=updated_metadata
)
documents.append(doc)

return documents

def store_chunks_batch(self, chunk_results: List[ChunkResult]) -> Dict[str, Any]:
"""批量存储多个分块结果到Milvus

Args:
chunk_results: 分块结果列表

Returns:
Dict: 批量插入结果

Raises:
ValueError: 当向量存储未初始化时
Exception: Milvus操作异常
"""
if not self.vector_store:
raise ValueError("向量存储未初始化")

if not chunk_results:
return {
"status": "success",
"message": "没有分块结果需要存储",
"total_chunks": 0,
"document_count": 0
}

try:
# 收集所有文档
all_documents = []
total_chunks = 0

for chunk_result in chunk_results:
if chunk_result.chunks:
documents = self._convert_chunks_to_langchain_docs(chunk_result)
all_documents.extend(documents)
total_chunks += len(documents)

if not all_documents:
return {
"status": "success",
"message": "没有文档需要存储",
"total_chunks": 0,
"document_count": len(chunk_results)
}

# 为所有文档生成UUID作为主键
from uuid import uuid4
uuids = [str(uuid4()) for _ in range(len(all_documents))]

# 批量添加所有文档,指定IDs
ids = self.vector_store.add_documents(documents=all_documents, ids=uuids)

return {
"status": "success",
"message": f"成功存储 {len(chunk_results)} 个文档的 {total_chunks} 个分块",
"total_chunks": total_chunks,
"document_count": len(chunk_results),
"ids": ids,
"collection_name": self.collection_name
}

except Exception as e:
raise Exception(f"Milvus批量插入失败: {str(e)}")

def delete_document(self,
document_name: str,
collection_name: Optional[str] = None) -> Dict[str, Any]:
"""删除指定文档的所有chunks

Args:
document_name: 文档名称
collection_name: collection名称

Returns:
Dict: 删除结果
"""
target_collection = collection_name or self.collection_name

try:
if not self.vector_store:
raise ValueError("向量存储未初始化")

# 使用LangChain Milvus删除功能
# 注意:LangChain Milvus可能不支持按元数据过滤删除,这里提供基本实现
return {
"status": "error",
"document_name": document_name,
"error": "LangChain Milvus不支持按文档名删除,请使用其他方式",
"collection_name": target_collection
}

except Exception as e:
return {
"status": "error",
"document_name": document_name,
"error": str(e)
}

def get_document_stats(self,
document_name: Optional[str] = None,
collection_name: Optional[str] = None) -> Dict[str, Any]:
"""获取文档统计信息

Args:
document_name: 文档名称,None则统计所有文档
collection_name: collection名称

Returns:
Dict: 统计信息
"""
target_collection = collection_name or self.collection_name

try:
if not self.vector_store:
raise ValueError("向量存储未初始化")

# 使用LangChain Milvus获取基本信息
# 注意:LangChain Milvus没有直接的统计方法,这里提供基本信息
return {
"status": "success",
"collection_name": target_collection,
"vector_store_type": "LangChain Milvus",
"embedding_function": str(type(self.embedding_function).__name__),
"connection_uri": self.uri,
"database_name": self.db_name,
"message": "详细统计信息需要通过其他方式获取"
}

except Exception as e:
return {
"status": "error",
"error": str(e)
}

基本ANN搜索

近似近邻(ANN)搜索以记录向量嵌入排序顺序的索引文件为基础,根据接收到的搜索请求中携带的查询向量查找向量嵌入子集,将查询向量与子群中的向量进行比较,并返回最相似的结果。

ANN 和 k-Nearest Neighbors (kNN) 搜索是向量相似性搜索的常用方法。在 kNN 搜索中,必须将向量空间中的所有向量与搜索请求中携带的查询向量进行比较,然后找出最相似的向量,这既耗时又耗费资源。

与 kNN 搜索不同,ANN 搜索算法要求提供一个索引文件,记录向量 Embeddings 的排序顺序。当收到搜索请求时,可以使用索引文件作为参考,快速找到可能包含与查询向量最相似的向量嵌入的子组。然后,你可以使用指定的度量类型来测量查询向量与子组中的向量之间的相似度,根据与查询向量的相似度对组成员进行排序,并找出前 K 个组成员。

ANN 搜索依赖于预建索引,搜索吞吐量、内存使用量和搜索正确性可能会因选择的索引类型而不同。您需要在搜索性能和正确性之间取得平衡。

混合检索

多向量混合搜索 | Milvus 文档

image-20250908092720695

让我们考虑一个真实世界的使用案例,其中每个产品都包含文字描述和图片。根据可用数据,我们可以进行三种类型的搜索:

  • 语义文本搜索:这涉及使用密集向量查询产品的文本描述。可以使用BERTTransformers等模型或OpenAI 等服务生成文本嵌入。
  • 全文搜索:在这里,我们使用稀疏向量的关键词匹配来查询产品的文本描述。BM25等算法或BGE-M3SPLADE等稀疏嵌入模型可用于此目的。
  • 多模态图像搜索:这种方法使用带有密集向量的文本查询对图像进行查询。可以使用CLIP 等模型生成图像嵌入。

混合检索的构建流程:

  1. 创建具有多个向量场的 Collections

    • 定义 Collections Schema
    • 配置索引参数
    • 创建 Collections
  2. 插入数据‘

  3. 执行混合搜索

    • 创建多个 AnnSearchRequest 实例

      混合搜索是通过在hybrid_search() 函数中创建多个AnnSearchRequest 来实现的,其中每个AnnSearchRequest 代表一个特定向量场的基本 ANN 搜索请求。因此,在进行混合搜索之前,有必要为每个向量场创建一个AnnSearchRequest

    • 配置 Rerankers 策略

Rerankers 策略

RRF 排序器 | Milvus 文档

要对 ANN 搜索结果集进行合并和重新排序,选择适当的重新排序策略至关重要。Milvus 提供两种重排策略:

  • 加权排名:如果结果需要强调某个向量场,请使用该策略。WeightedRanker 可以为某些向量场赋予更大的权重,使其更加突出。
  • RRFRanker(互易排名融合排名器):在不需要特别强调的情况下选择此策略。RRFRanker 能有效平衡每个向量场的重要性。

加权排名

加权排名器通过为每个搜索路径分配不同的重要性权重,智能地组合来自多个搜索路径的结果并确定其优先级。与技艺高超的厨师平衡多种配料以制作完美菜肴的方式类似,加权排名器也会平衡不同的搜索结果,以提供最相关的综合结果。这种方法非常适合在多个向量场或模式中进行搜索,其中某些场对最终排名的贡献应比其他场更大。

image-20250908092538807

RRFRanker

互惠排名融合(RRF)排名器是 Milvus 混合搜索的一种重新排名策略,它根据多个向量搜索路径的排名位置而不是原始相似度得分来平衡搜索结果。就像体育比赛考虑的是球员的排名而不是个人统计数据一样,RRF Ranker 根据每个项目在不同搜索路径中的排名高低来组合搜索结果,从而创建一个公平、均衡的最终排名。

RRF Ranker 专门设计用于混合搜索场景,在这种场景中,您需要平衡来自多个向量搜索路径的结果,而无需分配明确的重要性权重。

image-20250908092346065

多租户

实施多租户 | Milvus 文档

Milvus 支持四个级别的多租户:数据库CollectionPartitionPartition Key

** 数据库级** Collections 级 分区级 分区 Key 级
数据隔离 物理 物理 物理 物理 + 逻辑
最大租户数 默认为 64 个。您可以通过修改 Milvus.yaml 配置文件中的maxDatabaseNum 参数来增加租户数。 默认为 65,536。可以通过修改 Milvus.yaml 配置文件中的maxCollectionNum 参数来增加。 每个 Collection 最多 1,024 个。 百万
数据 Schema 灵活性
RBAC 支持 支持 支持 支持 不支持
搜索性能 中等 中等
跨租户搜索支持 不支持 支持 支持
支持有效处理冷热数据 支持 否 目前不支持 Partition Key 级策略。

比较有意思的教程

多模态rag用 Milvus 制作多模态 RAG | Milvus 文档

使用 Milvus 进行文本到图像搜索 | Milvus 文档

使用 Milvus 搜索图像 | Milvus 文档

convertapi

ConvertAPI: Powerful File Conversion API for Developers & Businesses

档次 官方名称 月费 (CNY) 每月包含转换次数 单文件上限 并发任务数
1 Developer ¥249 1,000 次 200 MB 1
2 Startup ¥677 5,000 次 300 MB 2
3 Growth ¥1,247 15,000 次 500 MB 3
4 Business ¥2,495 50,000 次 1 GB 无限制

CloudConvert

模式 价格 包含内容
一次性购买积分 $9 美元 500 个转换积分
月度订阅 $9 美元/月 每月 1000 个转换积分,未用完可滚存

每天免费 10 次转换

pdf转ppt,一次要花费4积分,平均下来一份需要0.47元,付费情况下可以做到 5个并发任务

定价 |云转换

PDF to PowerPoint | CloudConvert

GroupDocs.Conversion/Aspose.PDF Cloud

每个月1000次以内的api调用是30美金,平均下来是一份0.21元,但是如果超过1000次每个月,就要0.09美金一次转换

付费默认5并发

https://products.groupdocs.cloud/conversion/python/pdf-to-ppt/

Dashboard

Dashboard

Pricing Guide - Purchase - groupdocs.cloud

Adobe PDF

每个月五百次的免费转换

Adobe PDF Services API Pricing | PDF Embed API Pricing | Adobe Acrobat Services Pricing - Adobe Developers

度慧科技

这个很便宜,我在腾讯云上看,300r可以买五千次,500r可以买5万次转换。阿里云,100r可以买3000次,有效期一个月

并发数为200

度慧文档转换

[度慧]PDF转Word,PPT,Excel,TXT,OFD(OCR高级版)-腾讯云市场

【度慧文档转换】PDF转Word/PPT/Excel/TXT/OFD - 支持扫描版OCR【最新版】_数据API_OCR_API-_云市场-阿里云

技术路线

方案 A:html → PDF → pptx(我尝试下来不大可行,无法再进行编辑了)

方案 B:html → PPTXGenJS

方案 C:html → python-pptx (主流)

方案 D:markdown → Slidev

我尝试了当前市面的ppt生成产品,在网页里面展示还会有良好的动画效果,但是一旦导出成pptx,都是会变成静态页面,没有动画

reveal.js可以利用页面生成丰富的ppt动画效果;Slidev可以将markdown语法转化成ppt,可以导出为pdf或pptx,需要注意的是,PPTX 文件中的所有幻灯片都会被导出为图片。

PPTXGenJS和python-pptx的原理基本一致,区别一个是使用js一个是python,使用方法都是通过解析HTML标签内容,定义一个ppt实例,将html的内容一点点加入这个示例中,最后导出pptx。这样都仅能实现最基本的ppt演示,不会有复杂的结构,而且经常会出现一个问题——某个标签内文字太多往往会超出ppt演示范围

直接让AI来生成非常自由的PPT,最终的效果一般来说都比较烂,大部分都是预定义一个html模板,然后让AI来自动的选择模板往里面填充内容

相关工具

Slidev 是一个为开发者设计的基于 Web 的幻灯片制作工具。它帮助您以 Markdown 的形式专注于编写幻灯片的内容,并制作出具有交互式演示功能的、高度可自定义的幻灯片。

reveal.js 是一个开源的 HTML 演示框架,用 JavaScript 写成。只要你会写 HTML/CSS/JS,就可以像做网页一样做出 酷炫、响应式、支持键盘/鼠标/触控交互 的幻灯片。

PptxGenJS 允许您使用 JavaScript 生成专业的 PowerPoint 演示文稿——直接从 Node、React、Vite、Electron,甚至浏览器中生成。

python-pptx 是一个用于创建、读取和更新 PowerPoint (.pptx)文件的 Python 库。典型的使用场景是从动态内容(如数据库查询、分析输出或 JSON 负载)生成 PowerPoint 演示文稿,可能是在响应 HTTP 请求时生成 PPTX 文件并下载。

python-pptx使用方式:根据标签解析html文件,如h1,div等,然后一点点添加到定义的页中

市面同类产品

  1. Genspark
    • Genspark 是 MainFunc 公司(由前小度 CEO 景鲲和前小度 CTO 朱凯华联合创立)推出的 AI Agent 搜索引擎(或称“AI 原生搜索引擎”)。
  2. skywork
    • Skywork 是昆仑万维(Kunlun Inc.)旗下 SkyWork AI 推出的一系列 开源大模型AI 技术品牌
  3. manus
  4. Gamma 是一个 “AI 驱动的在线内容工作站”:输入一句话、一段大纲或任何资料,它就能在 1-3 分钟内 帮你生成 高颜值、品牌化、可互动 的演示文稿、网站、社媒图文或 PDF,并可一键导出为 PPT / Google Slides / PDF / 网站链接

manus是每页ppt都是一个html文件,我猜测应该是使用像python-pptx的库生成

ppt-mcp

可以参考其中的工具实现,这两个我看下来都是使用python-pptx包

GongRzhe/Office-PowerPoint-MCP-Server: A MCP (Model Context Protocol) server for PowerPoint manipulation using python-pptx. This server provides tools for creating, editing, and manipulating PowerPoint presentations through the MCP protocol.

ltc6539/mcp-ppt: A mcp server supporting you to generate powerpoint using LLM and natural language automatically.

架构思考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
flowchart TB
subgraph "Plan-and-Execute阶段"
A["用户输入"] --> B["Planner Agent"]
B --> C["Agent Executor"]
C --> D["Replanner"]
D -->|"需要更多信息"| C
D -->|"信息充足"| E["输出结构化信息"]
end

subgraph "内容生成阶段"
E --> F["大纲设计节点"]
F --> G["页面内容生成节点"]
G --> H["HTML代码生成节点"]
end

subgraph "文件转换阶段"
H --> I["html演示生成"]
I --> J["转换pptx节点"]
J --> K["输出PPTX文件"]
end

利用APRYSE将pdf转成pptx

Apryse(曾用名 PDFTron)是一家加拿大公司推出的商用 SDK 家族,专注 “任何格式进、任何格式出” 的文档处理。

获取apikeyFree trial key for Apryse SDK | Apryse documentation

Python 3.X PDF Library for Windows, Linux and Mac | Apryse documentation

安装 Apryse SDK 的“结构化输出模块”(Structured Output Module)。该模块是一个可选的扩展包,PDF → PPTX、PDF → Word 等高级转换功能都依赖它。库插件:OCR、CAD 转 PDF - 适用于服务器/桌面 SDK | Apryse 文档 — Library Add-ons: OCR, CAD to PDF - for Server/Desktop SDK | Apryse documentation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from apryse_sdk import PDFNet, PDFDoc, Convert, StructuredOutputModule

# 1. 初始化(许可证)
PDFNet.Initialize("demo:1756369085114:")

# 2. 告诉 SDK 模块放在哪里
PDFNet.AddResourceSearchPath(r"F:\project python\test\StructuredOutputWindows\Lib\Windows")

# 3. 可选:确认模块已就位
if not StructuredOutputModule.IsModuleAvailable():
raise RuntimeError("StructuredOutput module not found!")

# 4. 正常调用
doc = PDFDoc("input.pdf")
Convert.ToPowerPoint(doc, "output.pptx")

Overview

参考资料

动手实现一个做PPT的MCP服务器_哔哩哔哩_bilibili

前言

这个agentic rag主要是作用于检索部分,由是否需要调用检索工具判定是否进入检索阶段,当检索到相关的文章,则进行回答,否则对问题进行改写,再次检索

代码见learn-rag-langchain/agentic-rag at main · zxj-2023/learn-rag-langchain

在这个教程中,我们将构建一个检索代理。当您希望 LLM 决定是否从向量存储中检索上下文或直接响应用户时,检索代理非常有用。

完成教程后,我们将完成以下工作:

  1. 获取并预处理用于检索的文档。
  2. 为这些文档建立语义索引,并为代理创建一个检索工具。
  3. 构建一个能够决定何时使用检索工具的代理式 RAG 系统。
image-20250819165309335

1. 预处理文档

获取用于我们 RAG 系统的文档。我们将使用 Lilian Weng 优秀博客中最新的三页。我们将从使用 WebBaseLoader 工具获取页面内容开始:

1
2
3
4
5
6
7
8
9
from langchain_community.document_loaders import WebBaseLoader

urls = [
"https://lilianweng.github.io/posts/2024-11-28-reward-hacking/",
"https://lilianweng.github.io/posts/2024-07-07-hallucination/",
"https://lilianweng.github.io/posts/2024-04-12-diffusion-video/",
]

docs = [WebBaseLoader(url).load() for url in urls]
1
docs[0][0].page_content.strip()[:1000]

将获取的文档分割成更小的块,以便索引到我们的向量存储中:

1
2
3
4
5
6
7
8
from langchain_text_splitters import RecursiveCharacterTextSplitter

docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=100, chunk_overlap=50
)
doc_splits = text_splitter.split_documents(docs_list)
1
doc_splits[0].page_content.strip()

2. 创建检索工具

现在我们已经有了分割的文档,我们可以将它们索引到一个向量存储中,我们将使用这个向量存储进行语义搜索。

使用内存向量存储和 OpenAI 嵌入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_chroma import Chroma  # 导入 Chroma
from langchain_openai import OpenAIEmbeddings
import os

# 确保安装了 langchain-chroma
# pip install langchain-chroma

embedding = OpenAIEmbeddings(
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="text-embedding-v4",
check_embedding_ctx_length=False,
dimensions=1536,
chunk_size=5 # 设置较小的批次大小
)

# 使用 Chroma 替代 InMemoryVectorStore
vectorstore = Chroma.from_documents(
documents=doc_splits,
embedding=embedding,
persist_directory="./chroma_db" # 指定持久化目录
)
1
2
3
4
5
6
7
# 重新加载已存在的 Chroma 数据库
vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embedding
)

retriever = vectorstore.as_retriever()

使用 LangChain 的预构建 create_retriever_tool 创建检索工具

1
2
3
4
5
6
7
from langchain.tools.retriever import create_retriever_tool

retriever_tool = create_retriever_tool(
retriever, # 【输入】一个已经配置好的检索器(例如:向量数据库的检索器)
"retrieve_blog_posts", # 【工具名称】这个工具的唯一标识名(供模型内部调用)
"Search and return information about Lilian Weng blog posts." # 【工具描述】模型看到的说明,用于决定是否调用它
)
1
retriever_tool.invoke({"query": "types of reward hacking"})

3. 生成查询

现在我们将开始构建我们智能体 RAG 图中的组件(节点和边)。

构建一个 generate_query_or_respond 节点。它将调用 LLM 来根据当前图状态(消息列表)生成响应。根据输入消息,它将决定使用检索工具进行检索,或直接响应用户。请注意,我们通过 .bind_tools 向聊天模型提供了先前创建的 retriever_tool 访问权限:

1
2
3
4
5
6
7
from langchain_community.chat_models import ChatTongyi
llm = ChatTongyi(
model="qwen3-235b-a22b",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model_kwargs={"enable_thinking": False} # 关键在这里
)
1
2
3
4
5
6
7
8
9
10
from langgraph.graph import MessagesState

def generate_query_or_respond(state: MessagesState):
"""调用模型,根据当前状态生成响应。根据问题,模型将决定是使用检索工具进行检索,还是直接回复用户。
"""
response = (
llm
.bind_tools([retriever_tool]).invoke(state["messages"])
)
return {"messages": [response]}

提出一个需要语义搜索的问题:

1
2
3
4
5
6
7
8
9
input = {
"messages": [
{
"role": "user",
"content": "What does Lilian Weng say about types of reward hacking?",
}
]
}
generate_query_or_respond(input)["messages"][-1].pretty_print()

4.评定文件

添加一个条件边 — grade_documents — 来判断检索到的文档是否与问题相关。

我们将使用一个具有结构化输出模式 GradeDocuments 的模型来对文档进行评分。 grade_documents 函数将根据评分决策( generate_answerrewrite_question )返回要前往的节点的名称:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from pydantic import BaseModel, Field
from typing import Literal

# 定义评分提示模板
GRADE_PROMPT = (
"你是一个评分员,负责评估检索到的文档与用户问题的相关性。\n "
"以下是检索到的文档内容:\n\n {context} \n\n"
"以下是用户的问题:{question} \n"
"如果文档包含与用户问题相关的关键词或语义含义,则将其评为相关。\n"
"请给出一个二元评分:'yes'(是)表示相关,'no'(否)表示不相关。"
)

# 定义用于评估文档相关性的 Pydantic 模型
class GradeDocuments(BaseModel):
"""使用二元评分对文档进行相关性评估。"""

binary_score: str = Field(
description="相关性评分:'yes' 表示相关,'no' 表示不相关"
)

# 初始化用于评分的聊天模型
grader_model = llm

def grade_documents(
state: MessagesState,
) -> Literal["generate_answer", "rewrite_question"]:
"""
判断检索到的文档是否与用户问题相关。

参数:
state: 包含消息历史的状态对象,其中第一条消息是用户问题,
最后一条消息是检索到的文档内容。

返回:
如果文档相关,返回 "generate_answer";
如果不相关,返回 "rewrite_question",表示需要重写问题并重新检索。
"""
question = state["messages"][0].content # 获取用户问题
context = state["messages"][-1].content # 获取检索到的文档内容

# 将问题和文档内容填入提示模板
prompt = GRADE_PROMPT.format(question=question, context=context)

# 调用模型,并以结构化输出(Pydantic 模型)的形式获取评分结果
response = (
grader_model
.with_structured_output(GradeDocuments)
.invoke([{"role": "user", "content": prompt}])
)
#print(response)
score = response.binary_score # 获取二元评分结果

# 根据评分决定下一步操作
if score == "yes":
return "generate_answer" # 文档相关,生成答案
else:
return "rewrite_question" # 文档不相关,重写问题后重新检索
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.messages import convert_to_messages

input = {
"messages": convert_to_messages(#将一系列消息转换为 BaseMessage 类型的消息列表。
[
{
"role": "user",
"content": "What does Lilian Weng say about types of reward hacking?",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "1",
"name": "retrieve_blog_posts",
"args": {"query": "types of reward hacking"},
}
],
},
{"role": "tool", "content": "meow", "tool_call_id": "1"},
]
)
}
grade_documents(input)

5. 重写问题

构建 rewrite_question 节点。

检索工具可能会返回潜在的不相关文档,这表明需要改进原始用户问题。为此,我们将调用 rewrite_question 节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
REWRITE_PROMPT = (
"Look at the input and try to reason about the underlying semantic intent / meaning.\n"
"Here is the initial question:"
"\n ------- \n"
"{question}"
"\n ------- \n"
"Formulate an improved question:"
)

def rewrite_question(state: MessagesState):
"""
重写用户最初的提问,以更好地表达其语义意图。

参数:
state: 包含消息历史的状态对象,其中第一条消息是用户原始问题。

返回:
一个字典,包含一条新的用户消息,内容为改写后的问题。
该消息将用于后续的检索步骤,以提高检索结果的相关性。
"""
messages = state["messages"]
question = messages[0].content # 获取用户最初的提问
prompt = REWRITE_PROMPT.format(question=question) # 将问题填入提示模板
response = llm.invoke([{"role": "user", "content": prompt}]) # 调用模型生成改写后的问题

# 返回新的消息结构,内容为改写后的问题
return {"messages": [{"role": "user", "content": response.content}]}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
input = {
"messages": convert_to_messages(
[
{
"role": "user",
"content": "What does Lilian Weng say about types of reward hacking?",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "1",
"name": "retrieve_blog_posts",
"args": {"query": "types of reward hacking"},
}
],
},
{"role": "tool", "content": "meow", "tool_call_id": "1"},
]
)
}

response = rewrite_question(input)
print(response["messages"][-1]["content"])

6. 生成答案

构建 generate_answer 节点:如果我们通过了评分器的检查,我们可以根据原始问题和检索到的上下文生成最终答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GENERATE_PROMPT = (
"You are an assistant for question-answering tasks. "
"Use the following pieces of retrieved context to answer the question. "
"If you don't know the answer, just say that you don't know. "
"Use three sentences maximum and keep the answer concise.\n"
"Question: {question} \n"
"Context: {context}"
)


def generate_answer(state: MessagesState):
"""Generate an answer."""
question = state["messages"][0].content
context = state["messages"][-1].content
prompt = GENERATE_PROMPT.format(question=question, context=context)
response = llm.invoke([{"role": "user", "content": prompt}])
return {"messages": [response]}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
input = {
"messages": convert_to_messages(
[
{
"role": "user",
"content": "What does Lilian Weng say about types of reward hacking?",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "1",
"name": "retrieve_blog_posts",
"args": {"query": "types of reward hacking"},
}
],
},
{
"role": "tool",
"content": "reward hacking can be categorized into two types: environment or goal misspecification, and reward tampering",
"tool_call_id": "1",
},
]
)
}

response = generate_answer(input)
response["messages"][-1].pretty_print()

7. 组装图表

generate_query_or_respond 开头,并确定是否需要调用 retriever_tool

使用 tools_condition 跳转到下一步:

  • 如果 generate_query_or_respond 返回 tool_calls ,调用 retriever_tool 获取上下文
  • 否则,直接回复用户

对检索到的文档内容按与问题的相关性( grade_documents )进行评分,并路由到下一步:

  • 如果不相关,使用 rewrite_question 重写问题,然后再次调用 generate_query_or_respond
  • 如果相关,请继续到 generate_answer 并使用检索到的文档上下文生成最终响应 ToolMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition

# 创建一个基于状态图(StateGraph)的流程,用于管理对话或任务的执行流程
workflow = StateGraph(MessagesState)

# 定义流程中将循环执行的各个节点
workflow.add_node(generate_query_or_respond) # 判断是生成检索查询还是直接回复用户
workflow.add_node("retrieve", ToolNode([retriever_tool])) # 检索节点:使用检索工具(retriever_tool)从知识库中查找相关文档
workflow.add_node(rewrite_question) # 重写问题节点:当检索结果不相关时,优化并重写用户的问题
workflow.add_node(generate_answer) # 生成答案节点:基于检索到的信息生成最终回答

# 设置流程的起始点:从 `generate_query_or_respond` 节点开始
workflow.add_edge(START, "generate_query_or_respond")

# 添加条件边:决定是否进行文档检索
workflow.add_conditional_edges(
"generate_query_or_respond",
# 使用 `tools_condition` 函数判断 LLM 的输出意图:
# 如果 LLM 决定调用 `retriever_tool` 工具,则进入检索;如果选择直接回复,则结束流程
tools_condition,
{
# 将条件判断结果映射到图中的具体节点
"tools": "retrieve", # 若需调用工具,则跳转到检索节点
END: END # 若无需调用工具(即可以直接回答),则结束流程
},
)

# 在 `retrieve` 节点执行后,根据文档相关性判断下一步操作
workflow.add_conditional_edges(
"retrieve",
# 调用 `grade_documents` 函数评估检索到的文档是否与问题相关
grade_documents,
# 根据评分结果决定流向:
# - 如果相关,进入 `generate_answer`
# - 如果不相关,进入 `rewrite_question`
# (该逻辑在 `grade_documents` 函数中返回 "generate_answer" 或 "rewrite_question")
)

# 添加固定边:生成答案后流程结束
workflow.add_edge("generate_answer", END)

# 重写问题后,回到初始节点重新判断是否需要检索
workflow.add_edge("rewrite_question", "generate_query_or_respond")

# 编译整个工作流,生成可执行的图结构
graph = workflow.compile()
image-20250826170834571

参考资料

《Agentic RAG》 — Agentic RAG

前言

代码见learn-rag-langchain/multi-agent at main · zxj-2023/learn-rag-langchain

什么是多智能体

当我们谈论”多智能体”时,我们指的是由llm驱动的多个独立的agent以特定方式连接在一起。

每个agent可以拥有自己的提示、LLM、工具和其他自定义代码,以最佳方式与其他智能体协作。

这种思维方式非常适合用图来表示,就像 langgraph 所提供的那样。在这种方法中,每个智能体都是图中的一个节点,而它们之间的连接则表示为一条边控制流由边管理,它们通过向图的状态中添加信息来进行通信

多智能体架构梳理

langgraph给我们提供了几种多智能体架构

image-20250818163359517

Network: 每个智能体可以与其他所有智能体通信。任何智能体都可以决定下一步调用哪个其他智能体。

Multi-agent network

Supervisor:每个智能体与一个单一的监督者智能体通信。监督者智能体决定下一步应该调用哪个智能体。

代理监督者 — Agent Supervisor

Hierarchical: 你可以定义一个具有监督者监督者的多代理系统。这是监督者架构的泛化,并允许更复杂的控制流程。

层级代理团队 — Hierarchical Agent Teams

Custom multi-agent workflow: 每个代理只与代理子集通信。流程的部分是确定的,只有一些代理可以决定下一步调用哪些其他代理。

Agent Supervisor

在本教程中,你将构建一个包含两个代理的监督者系统——一个研究专家和一个数学专家。

环境

1
pip install -U langgraph langgraph-supervisor langchain-tavily "langchain[openai]"

1. 创建工作代理

首先,让我们创建我们的专业工作代理——研究代理和数学代理:

研究代理

对于网络搜索,我们将使用 TavilySearch 工具来自 langchain-tavily :

1
2
3
4
5
6
from langchain_tavily import TavilySearch

web_search = TavilySearch(max_results=3,tavily_api_key="tvly-dev-")
web_search_results = web_search.invoke("南京在哪")

print(web_search_results["results"][0]["content"])

为了创建单个工作代理,我们将使用 LangGraph 的预构建代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

llm=ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

research_agent = create_react_agent(
model=llm,
tools=[web_search],
prompt=(
"你是一个研究代理。\n\n指令:\n- 仅协助与研究相关的任务,不得进行任何数学计算\n- 完成任务后,直接向主管回复\n- 仅回复你的工作结果,不得包含任何其他文字。"
),
name="research_agent",
)

让我们运行代理来验证它的行为是否符合预期。我们将使用 pretty_print_messages 辅助工具来美观地渲染流式代理输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from langchain_core.messages import convert_to_messages


def pretty_print_message(message, indent=False):
"""
美化打印单条消息

Args:
message: 要打印的消息对象
indent: 是否需要缩进打印
"""
# 将消息转换为美观的HTML格式表示
pretty_message = message.pretty_repr(html=True)
if not indent:
# 如果不需要缩进,直接打印
print(pretty_message)
return

# 如果需要缩进,为每一行添加制表符前缀
indented = "\n".join("\t" + c for c in pretty_message.split("\n"))
print(indented)


def pretty_print_messages(update, last_message=False):
"""
美化打印消息更新

Args:
update: 包含消息更新的数据结构
last_message: 是否只打印最后一条消息
"""
is_subgraph = False # 标记是否为子图更新

# 检查更新是否为元组格式(包含命名空间信息)
if isinstance(update, tuple):
ns, update = update
# 如果命名空间为空,跳过父图更新的打印
if len(ns) == 0:
return

# 提取图ID并打印子图更新信息
graph_id = ns[-1].split(":")[0]
print(f"来自子图 {graph_id} 的更新:")
print("\n")
is_subgraph = True

# 遍历每个节点的更新
for node_name, node_update in update.items():
# 构造更新标签
update_label = f"来自节点 {node_name} 的更新:"
if is_subgraph:
# 如果是子图,添加缩进
update_label = "\t" + update_label

print(update_label)
print("\n")

# 将节点更新中的消息转换为消息对象列表
messages = convert_to_messages(node_update["messages"])
# 如果只要求最后一条消息,则截取最后一条
if last_message:
messages = messages[-1:]

# 打印每条消息
for m in messages:
pretty_print_message(m, indent=is_subgraph)
print("\n")
1
2
3
4
for chunk in research_agent.stream(
{"messages": [{"role": "user", "content": "南京在哪?"}]}
):
pretty_print_messages(chunk)
数学代理

对于数学代理工具,我们将使用纯 Python 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def add(a: float, b: float):
"""将两个数字相加。"""
return a + b


def multiply(a: float, b: float):
"""将两个数字相乘。"""
return a * b


def divide(a: float, b: float):
"""将两个数字相除。"""
return a / b


math_agent = create_react_agent(
model=llm,
tools=[add, multiply, divide],
prompt=(
"你是一个数学代理。\n\n"
"指令:\n"
"- 仅协助处理数学相关任务\n"
"- 完成任务后,直接回复给主管\n"
"- 仅回复你的工作结果,不要包含任何其他文字。"
),
name="math_agent",
)

让我们运行数学代理:

1
2
3
4
for chunk in math_agent.stream(
{"messages": [{"role": "user", "content": "what's (3 + 5) x 7"}]}
):
pretty_print_messages(chunk)

2.创建监督者 langgraph-supervisor

为了实现我们的多智能体系统,我们将使用预构建的 langgraph-supervisor 库中的 create_supervisor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langgraph_supervisor import create_supervisor
from langchain.chat_models import init_chat_model

supervisor = create_supervisor(
model=llm,
agents=[research_agent, math_agent],
prompt=(
"你是一个管理两个代理的主管:\n"
"- 一个研究代理。将研究相关任务分配给这个代理\n"
"- 一个数学代理。将数学相关任务分配给这个代理\n"
"一次只分配工作给一个代理,不要并行调用代理。\n"
"不要自己做任何工作。"
),
add_handoff_back_messages=True,
output_mode="full_history",
).compile()
1
2
3
from IPython.display import display, Image

display(Image(supervisor.get_graph().draw_mermaid_png()))
image-20250819113009108

现在让我们用一个需要两个代理的查询来运行它:

研究代理将查找必要的 GDP 信息;数学代理将执行除法以找到纽约州 GDP 的百分比,如所请求

3.从头创建监督者

现在让我们从头实现这个多智能体系统。我们需要:

  1. 设置主管如何与各个代理进行沟通
  2. 创建监督代理
  3. 将监督代理和工作代理组合成一个多代理图。
设置代理通信

我们需要定义一种方式,让监督代理能够与工作代理进行通信。在多代理架构中,实现这一功能的一种常见方法是使用handoffs,即一个代理将控制权交给另一个代理。交接允许你指定:

  • destination:要转移到的目标代理
  • payload:要传递给该智能体的信息

我们将通过handoff tools(转接工具)实现转接,并将这些工具交给监督代理:当监督代理调用这些工具时,它将控制权转交给工作代理,并将完整消息历史传递给该代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from typing import Annotated
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.prebuilt import InjectedState
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.types import Command


def create_handoff_tool(*, agent_name: str, description: str | None = None):
"""
创建一个“交接”工具函数,用于在 LangGraph 的 Supervisor-Worker 架构中
把当前对话状态移交给指定名称的子 Agent。

参数
----
agent_name : str
目标子 Agent 的名称,必须与 Supervisor 图中注册的节点名一致。
description : str | None
工具的描述文本。如果为 None,则使用默认描述 "Ask {agent_name} for help."。

返回
----
handoff_tool : Callable
一个已用 @tool 装饰的函数,可直接注入到 Supervisor 的工具列表。
"""
# 动态生成工具名,例如 agent_name="math_agent" -> "transfer_to_math_agent"
name = f"transfer_to_{agent_name}"

# 如果调用者没有提供描述,则使用默认描述
description = description or f"Ask {agent_name} for help."

# 用 LangGraph 的 @tool 装饰器注册工具
@tool(name, description=description)
def handoff_tool(
state: Annotated[MessagesState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
"""
实际执行交接逻辑的工具函数。

参数
----
state : MessagesState
当前对话状态,由 LangGraph 注入。
tool_call_id : str
本次工具调用的唯一 ID,由 LangGraph 注入。

返回
----
Command
一个 LangGraph Command 对象,告诉框架:
- goto=agent_name : 跳转到哪个子 Agent
- update : 更新后的状态
- graph=Command.PARENT : 在父图(Supervisor)作用域内执行
"""
# 构造一条工具消息,记录交接动作
tool_message = {
"role": "tool",
"content": f"Successfully transferred to {agent_name}",
"name": name,
"tool_call_id": tool_call_id,
}

# 使用 Command 把对话状态连同新消息一起发送到目标 Agent
return Command(
goto=agent_name,
update={**state, "messages": state["messages"] + [tool_message]},
graph=Command.PARENT,
)

# 返回已装饰的工具函数,供 Supervisor 添加进 tools 列表
return handoff_tool


# 创建研究代理的交接工具
assign_to_research_agent = create_handoff_tool(
agent_name="research_agent",
description="Assign task to a researcher agent.",
)

# 创建数学代理的交接工具
assign_to_math_agent = create_handoff_tool(
agent_name="math_agent",
description="Assign task to a math agent.",
)
创建监督代理

然后,我们使用刚刚定义的交接工具来创建监督代理。我们将使用预构建的 create_react_agent :

1
2
3
4
5
6
7
8
9
10
11
12
supervisor_agent = create_react_agent(
model=llm,
tools=[assign_to_research_agent, assign_to_math_agent],
prompt=(
"你是一个管理两个代理的主管:\n"
"- 一个研究代理。将研究相关任务分配给这个代理\n"
"- 一个数学代理。将数学相关任务分配给这个代理\n"
"一次只分配工作给一个代理,不要并行调用代理。\n"
"不要自己做任何工作。"
),
name="supervisor",
)
创建多智能体图

将这些内容整合起来,让我们为我们的整体多代理系统创建一个图。我们将添加监督代理和各个代理作为子图节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from langgraph.graph import END

# 定义多代理主管图
supervisor = (
StateGraph(MessagesState)
# 注意:`destinations` 仅用于可视化,不影响运行时行为
.add_node(supervisor_agent, destinations=("research_agent", "math_agent", END))
.add_node(research_agent)
.add_node(math_agent)
.add_edge(START, "supervisor")
# 总是返回到主管
.add_edge("research_agent", "supervisor")
.add_edge("math_agent", "supervisor")
.compile()
)

在这个代码中,去 research_agentmath_agent 的条件边是通过工具调用实现的,而不是显式的条件边。

工作机制:

  1. 工具作为交接手段

    • assign_to_research_agentassign_to_math_agent 这两个工具被添加到 supervisor_agent
    • 当 supervisor_agent 决定需要某个代理帮助时,它会调用相应的工具
  2. 工具内部实现交接

    1
    2
    3
    4
    5
    6
    def handoff_tool(...) -> Command:
    return Command(
    goto=agent_name, # 这里指定了要跳转到哪个代理
    update={...},
    graph=Command.PARENT,
    )
  3. 隐式的条件边

    • 当 supervisor_agent 调用 assign_to_research_agent 工具时 → 自动跳转到 research_agent
    • 当 supervisor_agent 调用 assign_to_math_agent 工具时 → 自动跳转到 math_agent

什么是 Command 机制

Command 机制是 LangGraph 提供的一种显式控制流程跳转的方式。它允许工具或节点直接指定下一步要执行什么操作,而不需要通过传统的条件边路由。

Command 的核心概念

1
2
3
4
5
6
7
from langgraph.types import Command

Command(
goto=agent_name, # 要跳转到的目标节点
update=state_update, # 要更新的状态
graph=Command.PARENT # 在哪个图中执行(父图/子图)
)

请注意,我们已经从工作代理添加了明确的边回到主管——这意味着它们保证会将控制权返回给主管。如果你希望代理直接响应用户(即,将系统转变为路由器),你可以移除这些边。

Multi-agent network

一个单一智能体通常可以使用单个领域内的一小批工具来有效运作,但即使使用像 gpt-4 这样强大的模型,使用多个工具时也可能效果不佳。

处理复杂任务的一种方法是采用“分而治之”的方法:为每个任务或领域创建一个专门的智能体,并将任务路由到正确的“专家”。这是一个多智能体网络架构的例子。

image-20250819150039818

这个多agent架构,就像多个agent进行讨论,所以也叫Multi Agent Collaboration,但是给我的感觉,比较混乱,agent直接的路由很难去定义,agent一多就搞不清楚了,所以这里也不实战了。

Hierarchical Agent Teams

对于某些应用,如果工作按层次分布,系统可能会更有效。你可以通过组合不同的子图,并创建一个顶层监督者以及中层监督者来实现这一点。

image-20250819151813264

使用预设的supervisor构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 1. 定义研究团队的代理
@tool
def web_search(query: str) -> str:
"""执行网络搜索"""
return f"搜索结果:关于'{query}'的最新信息..."

@tool
def analyze_data(data: str) -> str:
"""分析数据"""
return f"数据分析结果:{data}的趋势显示..."

research_agent = create_react_agent(
model=llm,
tools=[web_search, analyze_data],
prompt="你是一个研究专家,负责进行网络搜索和数据分析。",
name="research_specialist"
)

# 2. 定义数学团队的代理
@tool
def calculate_statistics(numbers: list[float]) -> str:
"""计算统计值"""
if not numbers:
return "错误:数据列表为空"
avg = sum(numbers) / len(numbers)
return f"统计结果:平均值={avg:.2f},数据点数量={len(numbers)}"

@tool
def solve_equation(equation: str) -> str:
"""解方程"""
return f"方程 {equation} 的解为:x = 42"

math_agent = create_react_agent(
model=llm,
tools=[calculate_statistics, solve_equation],
prompt="你是一个数学专家,负责统计计算和方程求解。",
name="math_specialist"
)

# 3. 创建研究团队主管
research_supervisor = create_supervisor(
model=llm,
agents=[research_agent],
prompt=(
"你是研究团队的主管。\n"
"你的团队有一个研究专家,负责网络搜索和数据分析。\n"
"根据任务需求,将工作分配给研究专家。\n"
"等待专家完成任务后,总结结果并报告给上级主管。"
),
name="research_supervisor"
).compile(name="research_supervisor")

# 4. 创建数学团队主管
math_supervisor = create_supervisor(
model=llm,
agents=[math_agent],
prompt=(
"你是数学团队的主管。\n"
"你的团队有一个数学专家,负责统计计算和方程求解。\n"
"根据任务需求,将工作分配给数学专家。\n"
"等待专家完成任务后,总结结果并报告给上级主管。"
),
name="math_supervisor"
).compile(name="math_supervisor")

# 5. 创建顶层主管
top_supervisor = create_supervisor(
model=llm,
agents=[research_supervisor, math_supervisor],
prompt=(
"你是顶层主管,管理两个专业团队:\n"
"- 研究团队:负责市场调研、数据分析等任务\n"
"- 数学团队:负责统计计算、方程求解等任务\n"
"根据任务的性质,将工作分配给相应的团队主管。\n"
"等待团队完成任务后,整合所有结果并给出最终报告。"
),
name="top_supervisor"
).compile(name="top_supervisor")

参考资料

LangGraph:多智能体工作流 — LangGraph: Multi-Agent Workflows

Overview

前言

为了后续自己搭建全栈项目做准备,对react做一定的了解

学习目标:大致看懂react的基本语法,可以在ai的协助下完成前端的搭建

介绍

React 是 Facebook(现 Meta)于 2013 年开源的一套用于构建用户界面的 JavaScript 库,现由 React 核心团队与社区共同维护。

项目搭建

项目创建

1
npx create-react-app my-app

npx 是什么?

npm 5.2+ 自带的“包运行器”(Node Package eXecute)。类似uv

脚手架(Scaffold / Boilerplate)是什么?

  1. 定义:官方或社区提供的“项目模板生成器”,一条命令就能创建带目录结构、配置、脚本、依赖的完整项目骨架。
  2. 目的: • 省掉繁琐的初始化、Webpack/Rollup/Vite 配置、ESLint/TypeScript/测试等环境搭建。 • 统一团队规范,降低新人上手成本。

启动开发服务器

1
2
cd my-app
npm start # 或 yarn start

目录速览(核心)

1
2
3
4
5
6
my-app
├─ public/ # 静态资源,index.html 是页面模板
├─ src/
│ ├─ App.js # 根组件
│ ├─ index.js # 应用入口(ReactDOM.createRoot)
└─ package.json # 依赖与脚本

JSX

JSX(JavaScript XML 的缩写)是 React 引入的一种语法糖(syntactic sugar)。它让你在 JavaScript 文件里直接写类 HTML 标记,然后由构建工具(Babel、TypeScript、esbuild、swc)把它翻译成普通的 JavaScript 函数调用

如下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 找到 public/index.html 中 id="root" 的 DOM 节点,作为 React 应用的挂载点
const root = ReactDOM.createRoot(document.getElementById('root'));

// 2. 将根组件 <App /> 渲染到该挂载点
root.render(
// 3. <React.StrictMode> 是 React 提供的开发模式辅助工具
// 作用:在开发阶段自动检测潜在问题(如过时的 API、副作用重复执行等)
// 注意:它仅在开发环境生效,生产环境不会渲染任何额外 DOM
<React.StrictMode>
{/* 4. 项目真正的根组件 App,所有业务逻辑都从这里开始 */}
<App />
</React.StrictMode>
);

箭头函数

React(以及所有现代 JavaScript)里,“箭头”指的是 箭头函数(Arrow Function),语法是:

1
const 函数名 = (参数) => 返回值或语句块

它的作用可以概括为 “更简洁的函数声明 + 词法作用域的 this”

通俗理解:把小括号的内容变成箭头后的内容

函数组件

函数组件 + JSX 的组合作用是: 以函数的形式返回“虚拟 DOM 描述”,交由 React 渲染成真实 DOM,而不是直接返回 HTML 组件或字符串。

  1. 函数组件的“返回值”
1
2
3
function Welcome(props) {
return <h1>Hello {props.name}</h1>;
}

经过 Babel 编译后等价于:

1
2
3
function Welcome(props) {
return React.createElement('h1', null, 'Hello ', props.name);
}

React.createElement 会生成一个纯 JS 对象(虚拟节点),而不是一段 HTML 字符串。

使用示例

1
2
3
4
5
6
7
8
9
10
// 1. 接收父组件传来的 props
function Card({ title, children }) {
// 2. 返回一段 JSX(最终会被编译成虚拟 DOM)
return (
<div className="card">
<h2>{title}</h2>
{children}
</div>
);
}

使用:

1
2
3
<Card title="函数组件">
<p>Hello, world!</p>
</Card>

DOM(Document Object Model,文档对象模型)是浏览器在内存里把一份 HTML/XML 文档表示成树形结构编程接口(API)。

每个节点(元素、文本、注释…)都是一个对象,拥有属性与方法,例如:

1
2
3
const title = document.getElementById('title');
title.textContent = 'Hi React'; // 改文本
title.style.color = 'red'; // 改样式

插值写法

在 React 中,“插值”专指把一段 JavaScript 表达式的实时结果塞进 JSX 的写法。 核心符号只有一对花括号 { },记住口诀:“JSX 里凡是 {} 包起来的,就是 JavaScript 运行后的值。”

基本文本插值

1
2
const name = 'React';
<h1>Hello, {name}!</h1> // → Hello, React!

属性插值

1
2
3
4
5
6
function App() {
const mytitle="hello"
return (
<div title={mytitle}></div>
);
}

数据渲染

条件渲染

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function App() {
const mytitle="hello"

let mycontent=null
const flag=true
if(flag){
mycontent=<h2>hello</h2>
}
else{
mycontent=<h2>world</h2>
}
return (
<div title={mytitle}>{mycontent}</div>
);
}

列表渲染

1
2
3
4
5
6
7
8
9
function App() {
const list=['1','2','3']
const mycontent=list.map((item)=>{
return <li>{item}</li>
})
return (
<div>{mycontent}</div>
);
}
  1. .map((item) => { ... })Array.prototype.map:遍历数组,把每个元素依次交给回调函数处理,并返回一个新数组。 ‑ (item) 是每次循环拿到的当前元素。
  2. return <li>{item}</li> ‑ 每一次循环里,把当前元素 item 用 JSX 插值语法 {item} 放进 <li> 标签里。

状态处理

1
2
3
4
5
6
7
8
9
10
11
12
13
import { useState } from 'react';
function App() {
const [mycontent,setmycontent]=useState("hello world");
function changeContent(){
setmycontent("hello world2");
}
return (
<>
<div>{mycontent}</div>
<button onClick={changeContent}>change</button>
</>
);
}

useState 是 React 提供的 Hook,让函数组件也能拥有内部状态(state)。可以通过更新函数,调用后触发重新渲染。

对象的状态更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { useState } from 'react';
function App() {
const [mycontent,setmycontent]=useState({
title:'hello world',
content :'hello world content'
});
function changeContent(){
setmycontent({
...mycontent,
content:'new content'
});
}
return (
<>
<div title={mycontent.title}>{mycontent.content}</div>
<button onClick={changeContent}>change</button>
</>
);
}

...mycontent 是 ES6 的 对象展开运算符(object spread)。 一句话:把 mycontent 里所有“旧属性”先抄出来,然后再覆盖/新增你后面写的属性。

react组件的使用

1
2
3
4
5
6
7
8
import { useState } from 'react';
function App() {
return (
<>
<img src={logo} className="App-logo" alt="logo" style={{ width: '100px',backgroundColor: 'grey'}}/>
</>
);
}
  1. className 代替 class 传统 HTML 写 <img class="App-logo">;React 组件里必须用 className,因为 JSX 最终会被编译成 JavaScript 对象,而 class 是 JS 的保留关键字。

  2. 样式写成对象

HTML 写行内样式:style="width:100px;background-color:grey" React 必须写成对象:

1
2
3
4
style={{
width: '100px',
backgroundColor: 'grey' // 驼峰命名
}}

因为 JSX 属性最终会变成 JS 对象的键值对,键名必须合法(驼峰),值可以是任何 JS 值(数字、变量、计算结果)。

  1. 最终产物是虚拟 DOM 节点

<img src={logo} ... /> 在浏览器里不会直接变成 <img> 标签,而是先被编译成:

1
2
3
4
5
6
React.createElement('img', {
src: logo,
className: 'App-logo',
alt: 'logo',
style: { width: '100px', backgroundColor: 'grey' }
});

React 再拿这个对象去做 diff、更新真实 DOM,而不是直接 innerHTML。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function App() {

const imgdata={
className:"App-logo",
style:{
width:'100px',
backgroundColor:'grey'
}
}

return (
<>
<img src={logo} alt="logo" {...imgdata}/>
</>
);
}

利用 JSX 展开运算符(spread attributes)imgdata 里的所有键值一次性“拍平”到 <img> 标签上

组件复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function Article(props) {
return (
<div>
<h2>{props.title}</h2>
<p>{props.content}</p>
</div>
);
}

function App() {
return (
<>
<Article title="标签1" content="内容1" />
<Article title="标签2" content="内容2" />
</>
);
}

组件通信

组件通信的 4 条主线

1️⃣ 父 → 子:props 2️⃣ 子 → 父:回调函数 3️⃣ 隔代/任意:Context 4️⃣ 全局/远端:状态管理库(Zustand、Redux、React Query)

父 → 子

1
2
3
4
5
6
7
8
function Parent() {
const title = 'Hello React';
return <Child title={title} />;
}

function Child({ title }) {
return <h1>{title}</h1>;
}

子 → 父

1
2
3
4
5
6
7
8
9
10
11
12
13
function Parent() {
const [count, setCount] = useState(0);
return (
<>
<p>父:{count}</p>
<Child onInc={() => setCount(c => c + 1)} />
</>
);
}

function Child({ onInc }) {
return <button onClick={onInc}>子按钮 +1</button>;
}

父组件把“修改函数”通过 props 传给子组件,子组件在合适的时机调用它,把数据作为参数传回去。

react hooks

Hook 是什么? Hook 是 React 16.8 引入的 函数级 API,让函数组件拥有

  • 状态(useState)
  • 生命周期(useEffect)
  • 上下文(useContext)
  • 自定义逻辑(自定义 Hook) 而不必写 class。

参考资料

20分钟学会React Hooks 前端开发必看 AI编程工具 CodeGeeX 体验_哔哩哔哩_bilibili

前言

就是client调用agent那一块,感觉还是比较困惑,我看例子是要通过定义给的execut和cancel函数,那就意味着agent提供者都要去自己去定义这些怎么执行的函数,还有描述agent的skill和card,工作量明显比mcp大了很多,可能这也是现在a2a传播没有mcp好的一大原因吧,我的理解,不知道对不对


思考:现在利用a2a搭建多agent的现实例子多吗,从概念上,我认为a2a的思路是没问题的,但感觉下来,现在大多数的多agent的实现方式还是像langgraph中条件边来控制使用哪个agent,是不是因为a2a对于中小开发者搭建起来还是有些复杂,但我还是认为他这种于mcp类似,模块化,可以自定义的形式会是后续方向。就像现在的mcp client,可以在市场上下载自己想要的mcp,利用a2a协议,用户可以在市场上下载想用的agent,搭建自己的多agent管家,现在市场上有类似的产品吗?

a2a协议其实与mcp类似,对象不同,一个是mcp client与mcp server(tool),一个是agent client与agent server。具体实现中,需要完成对agent server的信息暴露与executor的编写,以便让client正确调用agent,调用前要启动服务。

一个agent server所要包含的要素包括:1.AgentSkill,用于描述agent可以实现的能力

2.AgentCard,描述agent的信息,包括运行的url,输入和返回的数据类型,所包含的skills

3.AgentExecutor,定义了如何执行智能体,通过定义execute方法,以便正确调用agent server

4.通过DefaultRequestHandler,封装调用agent的接口,不用再手写接口,只要提供一个 executor 和一个 store 即可,收到对话内容后,DefaultRequestHandler 会把对话打包成任务,交给 HelloWorldAgentExecutor 去执行。、

5.通过A2AStarletteApplication打包成应用(如fastapi),他的作用如下:1.把这个 handler 注册成真正的 HTTP 路由,于是外部就能通过 POST / 调用上述 JSON-RPC 方法。2.对外暴露名片

什么是A2A协议

A2A 协议(Agent2Agent Protocol,智能体间通信协议)是 Google 在 2025 年 4 月发布并开源的首个 AI 智能体交互标准。它通过统一的通信规范,解决不同团队、不同框架、不同供应商开发的 AI 智能体如何“对话”和协同工作的问题。

与mcp区分,MCP 解决 “单个智能体如何调用外部工具/数据” 的问题,而A2A 解决 “多个智能体如何协同完成任务” 的问题。

image-20250809222720192

为什么要使用A2A协议

随着 AI 应用深化,单一“万能”模型难以兼顾所有领域。A2A 鼓励构建“小而专”的智能体生态:

  • 每个智能体专注一个领域(如订票、报税、图像处理)。
  • 通过 A2A 协议,它们像乐高积木一样自由组合,快速响应新的业务需求。

比如你让一个agent使用多个工具,不仅会浪费tokens,也会降低其调用工具的准确性。所有,专业的领域使用专业的agent,而agent间的通信便要依靠A2A协议

环境配置

克隆仓库

如果你还没有克隆,请克隆 A2A Samples 仓库:

1
2
git clone https://github.com/a2aproject/a2a-samples.git -b main --depth 1
cd a2a-samples

Python 环境和 SDK 安装

我们推荐为 Python 项目使用虚拟环境。A2A Python SDK 使用 uv 进行依赖管理,但你也可以使用 pipvenv

  1. 创建并激活虚拟环境:

    使用 venv(标准库):

    1
    2
    python -m venv .venv
    source .venv/bin/activate
  2. 安装所需的 Python 依赖项以及 A2A SDK 及其依赖项:

    1
    pip install -r samples/python/requirements.txt

Agent Skills & Agent Card

Agent Skills

一个代理技能描述了代理可以执行的具体能力或功能。它是告诉客户端代理擅长哪些任务的构建模块。

AgentSkill 的关键属性(定义在 a2a.types 中):

  • id: 技能的唯一标识符。
  • name: 人类可读的名称。
  • description:对技能功能的更详细说明。
  • tags:用于分类和发现的关键词。
  • examples:示例提示或使用案例。
  • inputModes / outputModes: 支持的输入和输出媒体类型(例如,“text/plain”,“application/json”)。

__main__.py 中,你可以看到如何为 Helloworld 代理定义一个技能:

1
2
3
4
5
6
7
skill = AgentSkill(
id='hello_world',
name='Returns hello world',
description='just returns hello world',
tags=['hello world'],
examples=['hi', 'hello world'],
)

这个技能非常简单:它的名称是 “Returns hello world”,并且主要处理文本。

Agent Card

代理卡是一个 A2A 服务器提供的 JSON 文档,通常位于 .well-known/agent-card.json 端点。它就像代理的数字名片。

AgentCard 的关键属性(定义在 a2a.types 中):

  • name, description, version: 基本身份信息。
  • url:A2A 服务可访问的端点。
  • capabilities:指定支持的 A2A 功能,如 streamingpushNotifications
  • defaultInputModes / defaultOutputModes: 代理的默认媒体类型。
  • skills: 代理提供的 AgentSkill 对象列表。

helloworld 示例定义其 Agent Card 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# This will be the public-facing agent card
public_agent_card = AgentCard(
name='Hello World Agent',
description='Just a hello world agent',
url='http://localhost:9999/',
version='1.0.0',
# 默认输入模式:Agent 能够接收的输入类型列表,这里仅支持纯文本
default_input_modes=['text'],
# 默认输出模式:Agent 能够产生的输出类型列表,这里仅返回纯文本
default_output_modes=['text'],
# 能力声明:告知调用方 Agent 支持的能力,例如是否支持流式输出(streaming)
capabilities=AgentCapabilities(streaming=True),
skills=[skill], # Only the basic skill for the public card
supports_authenticated_extended_card=True,
)

这张卡片告诉我们代理名为 “Hello World Agent”,运行在 http://localhost:9999/,支持文本交互,并具有 hello_world 技能。它还表明支持公开认证,意味着无需特定凭证。

Agent Executor

A2A 代理处理请求和生成响应/事件的核心逻辑由一个 Agent Executor 负责。A2A Python SDK 提供了一个抽象基类 a2a.server.agent_execution.AgentExecutor 供你实现。

AgentExecutor 接口

AgentExecutor 类定义了两个主要方法:

  • async def execute(self, context: RequestContext, event_queue: EventQueue) : 处理期望响应或事件流的传入请求。它处理用户输入(可通过 context 获取)并使用 event_queue 发送 MessageTaskTaskStatusUpdateEventTaskArtifactUpdateEvent 对象。
  • async def cancel(self, context: RequestContext, event_queue: EventQueue) : 处理取消正在进行的任务的请求。

RequestContext 提供有关传入请求的信息,例如用户消息和任何现有的任务详情。EventQueue 由执行器使用,用于将事件发送回客户端。

Helloworld AgentExecutor

让我们看看 agent_executor.py。它定义了 HelloWorldAgentExecutor

  1. 代理(HelloWorldAgent:这是一个简单的辅助类,封装了实际的“业务逻辑”。

    1
    2
    3
    4
    5
    class HelloWorldAgent:
    """Hello World Agent."""

    async def invoke(self) -> str:
    return 'Hello World'

    它有一个简单的 invoke 方法,返回字符串”Hello World”。

  2. 执行器(HelloWorldAgentExecutor:这个类实现了 AgentExecutor 接口。

    • __init__:

      1
      2
      3
      4
      5
      class HelloWorldAgentExecutor(AgentExecutor):
      """Test AgentProxy Implementation."""

      def __init__(self):
      self.agent = HelloWorldAgent()

      它实例化了 HelloWorldAgent

    • execute:

      1
      2
      3
      4
      5
      6
      7
      async def execute(
      self,
      context: RequestContext,
      event_queue: EventQueue,
      ) -> None:
      result = await self.agent.invoke()
      await event_queue.enqueue_event(new_agent_text_message(result))

      当收到一个 message/sendmessage/stream 请求时(这两种请求在这个简化的执行器中均由 execute 处理):

      1. 它调用 self.agent.invoke() 来获取 “Hello World” 字符串。
      2. 它使用 new_agent_text_message 工具函数创建一个 A2A Message 对象。
      3. 它将此消息入队到 event_queue。底层的 DefaultRequestHandler 随后会处理这个队列以向客户端发送响应。对于像这样的一条消息,在流关闭之前,它将导致一个 message/send 的单一响应或一个 message/stream 的单一事件。
    • cancel: Helloworld 示例的 cancel 方法简单地抛出一个异常,表明这个基本代理不支持取消操作。

      1
      2
      3
      4
      async def cancel(
      self, context: RequestContext, event_queue: EventQueue
      ) -> None:
      raise Exception('cancel not supported')

AgentExecutor 充当 A2A 协议(由请求处理器和服务器应用程序管理)与您的代理特定逻辑之间的桥梁。它接收关于请求的上下文信息,并使用事件队列来通信结果或更新。

启动server

现在我们已经有了 Agent Card 和 Agent Executor,可以设置并启动 A2A 服务器。

A2A Python SDK 提供了一个 A2AStarletteApplication 类,简化了运行符合 A2A 标准的 HTTP 服务器。它使用 Starlette 作为 Web 框架,通常与 Uvicorn 等 ASGI 服务器一起运行。

让我们再次查看 __main__.py,看看服务器是如何初始化和启动的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import uvicorn

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
AgentCapabilities,
AgentCard,
AgentSkill,
)
from agent_executor import (
HelloWorldAgentExecutor, # type: ignore[import-untyped]
)


if __name__ == '__main__':
skill = AgentSkill(
id='hello_world',
name='返回 hello world',
description='简单地返回 hello world',
tags=['hello world'],
examples=['hi', 'hello world'],
)

extended_skill = AgentSkill(
id='super_hello_world',
name='返回 SUPER Hello World',
description='仅限已认证用户使用的更热情的问候。',
tags=['hello world', 'super', 'extended'],
examples=['super hi', 'give me a super hello'],
)

# 这是面向公众的 Agent 卡片
public_agent_card = AgentCard(
name='Hello World Agent',
description='只是一个 hello world 代理',
url='http://localhost:9999/',
version='1.0.0',
default_input_modes=['text'],
default_output_modes=['text'],
capabilities=AgentCapabilities(streaming=True),
skills=[skill], # 公开卡片仅包含基础技能
supports_authenticated_extended_card=True,
)

# 这是已认证用户的扩展 Agent 卡片
# 额外包含 'extended_skill'
specific_extended_agent_card = public_agent_card.model_copy(
update={
'name': 'Hello World Agent - Extended Edition', # 使用不同名称以便区分
'description': '面向已认证用户的完整功能 hello world 代理。',
'version': '1.0.1', # 甚至可以是不同的版本
# capabilities 及其他字段(如 url、default_input_modes、default_output_modes、
# supports_authenticated_extended_card)均从 public_agent_card 继承,
# 除非在此处另行指定。
'skills': [
skill,
extended_skill,
], # 扩展卡片包含两个技能
}
)

request_handler = DefaultRequestHandler(
agent_executor=HelloWorldAgentExecutor(),
task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
agent_card=public_agent_card,
http_handler=request_handler,
extended_agent_card=specific_extended_agent_card,
)

# 使用 uvicorn 启动服务,监听 0.0.0.0:9999
uvicorn.run(server.build(), host='0.0.0.0', port=9999)

我们来分解一下:

  1. DefaultRequestHandler:
    • SDK 提供了 DefaultRequestHandler。这个处理器接收你的 AgentExecutor 实现(这里,HelloWorldAgentExecutor)和一个 TaskStore(这里,InMemoryTaskStore)。
    • 它将传入的 A2A RPC 调用路由到你的执行器的适当方法上(比如 executecancel)。
    • TaskStoreDefaultRequestHandler 用来管理任务的生命周期,特别是对于有状态交互、流式传输和重新订阅。即使你的代理执行器很简单,处理器也需要一个任务存储。
  2. A2AStarletteApplication:
    • A2AStarletteApplication 类使用 agent_cardrequest_handler(在其构造函数中称为 http_handler)进行实例化。
    • agent_card 至关重要,因为服务器将在 /.well-known/agent-card.json 端点(默认情况下)上公开它。
    • request_handler 负责通过与其 AgentExecutor 交互来处理所有传入的 A2A 方法调用。
  3. uvicorn.run(server_app_builder.build(), ...):
    • A2AStarletteApplication 有一个 build() 方法,用于构建实际的 Starlette 应用程序。
    • 然后使用 uvicorn.run() 运行该应用程序,使您的代理可通过 HTTP 访问。
    • host='0.0.0.0' 使服务器可在您机器上的所有网络接口上访问。
    • port=9999 指定监听的端口。这需要与 AgentCard 中的 url 匹配。
  4. specific_extended_agent_card
    • 给同一个 Agent 准备“两张不同权限的名片”,分别用于“普通访客”和“已认证用户”。、

与服务器交互

Helloworld A2A 服务器运行后,让我们向它发送一些请求。SDK 包含一个客户端(A2AClient),可以简化这些交互。

让我们看一下 test_client.py 的关键部分:

  1. 获取代理卡 & 初始化客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    base_url = 'http://localhost:9999'

    async with httpx.AsyncClient() as httpx_client:
    # 初始化 A2ACardResolver
    resolver = A2ACardResolver(
    httpx_client=httpx_client,
    base_url=base_url,
    # agent_card_path 使用默认值,extended_agent_card_path 也使用默认值
    )

    A2ACardResolver 类是一个便捷工具。它首先从服务器端的 /.well-known/agent-card.json 端点(基于提供的基 URL)获取 AgentCard,然后使用它初始化客户端。

  2. 发送非流式消息 (send_message):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    client = A2AClient(
    httpx_client=httpx_client,
    agent_card=final_agent_card_to_use#这个card为经过认证处理后暴露的card
    )
    logger.info('A2AClient initialized.')

    send_message_payload: dict[str, Any] = {
    'message': {
    'role': 'user',
    'parts': [
    {'kind': 'text', 'text': 'how much is 10 USD in INR?'}
    ],
    'messageId': uuid4().hex,
    },
    }
    request = SendMessageRequest(
    id=str(uuid4()), params=MessageSendParams(**send_message_payload)
    )

    response = await client.send_message(request)
    print(response.model_dump(mode='json', exclude_none=True))
    • send_message_payload 构建了 MessageSendParams 的数据。
    • 这些数据被封装在 SendMessageRequest 中。
    • 它包含一个 message 对象,其中 role 设置为”用户”,内容在 parts 中。
    • Helloworld 代理的 execute 方法将入队一条”Hello World”消息。DefaultRequestHandler 将获取这条消息并将其作为响应发送。
    • response 将是一个 SendMessageResponse 对象,其中包含 SendMessageSuccessResponse(以代理的 Message 作为结果)或 JSONRPCErrorResponse
  3. 处理任务 ID(Helloworld 的说明性注释):

    Helloworld 客户端(test_client.py)不会直接尝试 get_taskcancel_task,因为简单的 Helloworld 代理的 execute 方法,通过 message/send 调用时,会导致 DefaultRequestHandler 返回一个直接的 Message 响应,而不是 Task 对象。更复杂的、明确管理任务的代理(如 LangGraph 示例)会从 message/send 返回一个 Task 对象,然后其 id 可用于 get_taskcancel_task

  4. 发送流式消息(send_message_streaming

    1
    2
    3
    4
    5
    6
    7
    8
    streaming_request = SendStreamingMessageRequest(
    id=str(uuid4()), params=MessageSendParams(**send_message_payload)
    )

    stream_response = client.send_message_streaming(streaming_request)

    async for chunk in stream_response:
    print(chunk.model_dump(mode='json', exclude_none=True))
    • 此方法调用代理的 message/stream 端点。DefaultRequestHandler 将调用 HelloWorldAgentExecutor.execute 方法。
    • execute 方法将一个”Hello World”消息入队,然后关闭事件队列。
    • 客户端将接收这条单条消息为一个 SendStreamingMessageResponse 事件,然后流将终止。
    • stream_response 是一个 AsyncGenerator

参考资料

a2aproject/a2a-samples: Samples using the Agent2Agent (A2A) Protocol

Agent2Agent (A2A) Protocol

a2aproject/a2a-python: Agent2Agent (A2A) 协议的官方 Python SDK — a2aproject/a2a-python: Official Python SDK for the Agent2Agent (A2A) Protocol

0%