asyncio是什么

asyncioPython 标准库中的一个模块,用于编写异步(asynchronous)程序。它提供了一套完整的工具,让你可以用 async/await 语法编写并发代码,特别适合处理 I/O 密集型任务(比如网络请求、文件读写、数据库查询等),而不会阻塞整个程序。

概念学习

并发

并发(Concurrency)是一种“效果”——多个任务在一段时间内交替或同时推进。 它可以通过多种方式实现,常见的有:

  1. 多线程(Multithreading)
  2. 异步编程(Asynchronous programming,如 asyncio)
  3. 多进程(Multiprocessing)(严格说更偏向“并行”,但也支持并发)

进程(Process)线程(Thread)协程(Coroutine)

概念 定义
进程(Process) 操作系统进行资源分配和调度的基本单位。它是程序的一次执行实例,拥有独立的虚拟地址空间、文件描述符表、环境变量、信号处理表等内核资源。
线程(Thread) 进程内的执行流(execution context),是 CPU 调度的基本单位。同一进程内的多个线程共享该进程的地址空间和大部分资源(如堆、全局变量、打开的文件),但各自拥有独立的栈、寄存器状态和线程局部存储(TLS)。
协程(Coroutine) 一种用户态的轻量级并发原语,属于协作式多任务(cooperative multitasking)模型。协程的切换由程序显式控制(如通过 awaityield),不依赖操作系统调度,上下文切换在用户空间完成,无内核介入。

线程”在 CPU 核心上运行。 进程是资源容器,线程是执行单位。

内存与资源共享模型

模型 地址空间 堆(Heap) 栈(Stack) 同步机制
进程 独立 独立 独立 需 IPC(如管道、消息队列、共享内存 + 信号量)
线程 共享(同进程内) 共享 独立(每个线程一个栈) 需互斥锁(Mutex)、条件变量等防止数据竞争
协程 共享(同一线程内) 共享 逻辑独立(由运行时管理协程栈或使用生成器状态) 通常无需锁(因单线程串行执行),但需注意异步回调中的状态一致性

在 Python 中的实现

模型 标准库模块 关键 API
进程 multiprocessing Process, Pool, Queue, Pipe, Manager
线程 threading Thread, Lock, Condition, Semaphore
协程 asyncio + async/await async def, await, asyncio.run(), create_task(), gather()

进程资源隔离与并行计算的基石;

线程操作系统级并发的传统手段,但在 Python 中受 GIL 限制;

协程高并发 I/O 的现代解决方案,以极低开销实现大规模并发,已成为 Python 异步编程的事实标准。

1
2
3
4
操作系统
└── 进程(Process) ← 资源容器(内存、文件描述符等)
└── 线程(Thread) ← CPU 调度单元(至少一个)
└── 协程(Coroutine) ← 用户态逻辑任务(可多个,协作式切换)

当你运行一个 Python 脚本(如 python app.py),操作系统会:

  1. 创建一个新进程(Process)
  2. 加载 Python 解释器(CPython)
  3. 在该进程中启动主线程(Main Thread)
  4. 执行你的代码

📌 所以:一个正在运行的 Python 程序 = 1 个进程 + 至少 1 个线程(主线程)

但注意:

  • 程序可以创建更多进程(通过 multiprocessing
  • 程序可以创建更多线程(通过 threading
  • 所以“一个程序”最终可能对应 多个进程、多个线程

Python 异步编程的核心机制

  1. async def 定义协程函数 → 返回 协程对象(Coroutine Object)
  2. await 用于挂起当前协程,等待另一个协程或异步操作完成
  3. 事件循环(Event Loop)(由 asyncio 提供)负责:
    • 调度协程
    • 管理 I/O 多路复用(如 epoll/kqueue
    • 在 I/O 就绪时恢复对应协程

实战学习

sync_demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from time import sleep, perf_counter

def fetch_url(url):
print('Fetching the URL')
sleep(1)#模拟阻塞
print('Finished fetching')
return 'url_content'

def read_file(filepath):
print('Reading the file')
sleep(1)#模拟阻塞
print('Finished reading')
return 'file_content'

def main():
url = 'example.com'
filepath = 'example.txt'
fetch_result = fetch_url(url)
read_result = read_file(filepath)

if __name__ == '__main__':
start_time = perf_counter()
main()
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上展示一份同步的代码,sleep用来模拟阻塞

1
2
3
4
5
Fetching the URL
Finished fetching
Reading the file
Finished reading
Time taken: 2.00 seconds

async_demo.py

异步编程的三步核心流程

1. 定义协程函数 → 2. 包装协程为任务 → 3. 建立事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from time import sleep, perf_counter
import asyncio

#定义协程函数
async def fetch_url(url):
print(f'Fetching the url')
#把阻塞操作包装成协程
await asyncio.sleep(1)
print('Finished fetching')
return 'url_content'

async def read_file(filepath):
print('Reading the file')
await asyncio.sleep(1)
print('Finished reading')
return 'file_content'

#若想使用await,需要把main函数定义成协程函数
async def main():
url = 'example.com'
filepath = 'example.txt'
#创建任务
tasks = [asyncio.create_task(coro) for coro in [
fetch_url(url),
read_file(filepath)]]
fetch_result = await tasks[0]
read_result = await tasks[1]

if __name__ == '__main__':
start_time = perf_counter()
#创建事件循环
asyncio.run(main())
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上则是改为了异步的代码

1
2
3
4
5
Fetching the url
Reading the file
Finished fetching
Finished reading
Time taken: 1.01 seconds

协程函数

协程函数(Coroutine Function)是 Python 中使用 async def 语法定义的函数,它是异步编程的核心构建单元。调用协程函数不会立即执行其内部代码,而是返回一个 协程对象(Coroutine Object),该对象必须由事件循环(如 asyncio)驱动或通过 await 在另一个协程中调用,才能真正执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

#协程函数
async def coroutine_func():
return 'coroutine_result'

def main():
print(coroutine_func())
result = asyncio.run(coroutine_func())
print("---")
print(result)


if __name__ == '__main__':
main()
1
2
3
4
5
6
<coroutine object coroutine_func at 0x000001958D88A4B0>
d:\code\python\learn_pythontips\learn_async\coroutine_func.py:8: RuntimeWarning: coroutine 'coroutine_func' was never awaited
print(coroutine_func())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
---
coroutine_result

由输出可见,coroutine object,返回的是一个协程对象

await关键字

await 是 Python 异步编程中的核心关键字,它的作用是:

暂停当前协程的执行,等待一个“可等待对象”(awaitable)完成,并获取其结果,同时将控制权交还给事件循环,使其能运行其他任务。

✅ 三大功能:

  1. 挂起(Suspend):当前协程在此处暂停,不阻塞线程。
  2. 等待(Wait):等待一个异步操作(如网络请求、文件读写、定时器)完成。
  3. 恢复(Resume):当被等待的对象完成后,协程从此处恢复执行,并拿到结果。

⚠️ 关键:await 不会阻塞整个线程,而是让事件循环去执行其他就绪的协程。

事件循环(Event Loop)与任务(Task)

事件循环(Event Loop) 是异步编程的核心引擎,尤其在 Python 的 asyncio 模型中,它是驱动协程执行、管理异步 I/O、调度任务的中枢系统

事件循环是一个程序结构,用于监听和分发事件或消息,实现非阻塞 I/O 和协作式多任务调度。

在 Python asyncio 中,事件循环:

  • 维护一个待执行协程队列
  • 管理定时器(如 asyncio.sleep
  • 使用操作系统提供的 I/O 多路复用机制(如 Linux 的 epoll、macOS 的 kqueue、Windows 的 IOCP)来高效监听大量文件描述符(如 socket)
  • 在 I/O 就绪时,恢复对应的协程

那事件循环如何知道哪些协程可以执行,哪些协程需要暂停呢

在 Python 异步编程(特别是 asyncio)中,任务(Task) 是是对协程(Coroutine)的封装,用于被事件循环调度和并发执行

Taskasyncio 中表示“未来会完成的异步操作”的对象,它是 Future 的子类,用于包装协程并自动调度其执行。

核心作用:

  1. 将协程注册到事件循环中,使其能够并发运行(而非顺序等待)
  2. 提供状态管理(如是否完成、是否取消、结果或异常)
  3. 支持取消操作task.cancel()
  4. 允许多次 await(协程对象只能 await 一次,但 Task 可以)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def work(name, delay):
print(f"开始 {name}")
await asyncio.sleep(delay)
print(f"完成 {name}")
return f"result-{name}"

async def main():
# 创建任务 → 立即开始执行!
task1 = asyncio.create_task(work("A", 2))
task2 = asyncio.create_task(work("B", 1))

# 等待任务完成(可获取结果)
r1 = await task1
r2 = await task2
print(r1, r2)

asyncio.run(main())
1
2
3
4
5
开始 A
开始 B
完成 B
完成 A
result-A result-B

💡 Task 是实现并发的关键:它让多个协程“同时启动”,而不是“一个接一个等”。

asyncio.gather

asyncio.gather用于并发运行多个 awaitable 对象(如协程、Task、Future),并按顺序返回它们的结果列表

默认:任意一个任务出错,其他任务会被取消(除非 return_exceptions=True

1
2
3
4
5
6
7
async def main():
url = 'example.com'
filepath = 'example.txt'
results = await asyncio.gather(
fetch_url(url),
read_file(filepath))
print(results)
1
2
3
4
5
6
Fetching the url
Reading the file
Finished fetching
Finished reading
['url_content', 'file_content']
Time taken: 1.00 seconds

asyncio.as_completed

asyncio.as_completed返回一个异步迭代器(async iterator),按任务完成的先后顺序,逐个产出已完成的 awaitable 对象(通常是 Task 或 Future)。

1
2
3
4
5
results = asyncio.as_completed([
fetch_url(url),
read_file(filepath)])
for result in results:
print(await result)

特点:先完成的任务先被处理,无需等待所有任务结束

asyncio.to_thread

await asyncio.to_thread(func, \*args)默认线程池中运行同步函数 func(*args),并返回结果。 它是 loop.run_in_executor(None, func, *args)高层封装

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import time

def blocking_io():
time.sleep(1)
return "Done!"

async def main():
result = await asyncio.to_thread(blocking_io)
print(result)

asyncio.run(main())

aiohttp和aiofiles

aiohttp:异步 HTTP 客户端与服务器框架

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio

async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

asyncio.run(fetch("https://example.com"))

aiofiles 是一个为标准文件操作提供异步接口的库,允许你在 async/await 代码中安全地读写文件,而不会阻塞事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import aiofiles
import asyncio

async def main():
# 写文件
async with aiofiles.open('data.txt', 'w') as f:
await f.write('Hello, async files!')

# 读文件
async with aiofiles.open('data.txt', 'r') as f:
content = await f.read()
print(content)

asyncio.run(main())

参考资料

【Py】asyncio:为异步编程而生 | Python 特性 | 并发编程 | 协程_哔哩哔哩_bilibili

【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。_哔哩哔哩_bilibili

为什么要有设计模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class DatabaseConnection:
def __init__(self, host, port, username, password):
self.host = host
self.port = port
self.username = username
self.password = password

def connect(self):
return f"Connecting to database at {self.host}:{self.port} with username '{self.username}'"


def client():
main_db = DatabaseConnection('localhost', 3306, 'root', 'password123')
analytics_db = DatabaseConnection('192.168.1.1', 5432, 'admin', 'securepass')
cache_db = DatabaseConnection('10.0.0.1', 27017, 'cacheuser', 'cachepass')

print(main_db.connect())
print(analytics_db.connect())
print(cache_db.connect())


client()

这是数据库连接类的范例,其存在以下问题

  1. 数据库连接信息(主机、端口、用户名、密码)直接写在代码中,缺乏配置管理,难以在不同环境间切换,如果要更改数据库的信息,就要更改项目中每一处的连接数据库的参数,难以维护
  2. 每次都需要手动传入相同类型的参数,增加出错的风险

工厂模式(Factory Pattern)

工厂模式 是一种创建型设计模式,它提供了一种创建对象的接口,但让子类决定实例化哪一个类。换句话说:把对象的创建过程封装起来,调用者不需要关心具体创建的是哪个类,只需要知道“我要一个某种类型的东西”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def connection_factory(db_type):
db_configs = {
'main': {
'host': 'localhost',
'port': 3306,
'username': 'root',
'password': 'password123'
},
'analytics': {
'host': '192.168.1.1',
'port': 5432,
'username': 'admin',
'password': 'securepass'
},
'cache': {
'host': '10.0.0.1',
'port': 27017,
'username': 'cacheuser',
'password': 'cachepass'
}
}

return DatabaseConnection(**db_configs[db_type])


# 测试工厂模式
if __name__ == "__main__":
# 使用工厂函数创建不同类型的数据库连接
main_db = connection_factory('main')
analytics_db = connection_factory('analytics')
cache_db = connection_factory('cache')

# 测试连接
print(main_db.connect())
print(analytics_db.connect())
print(cache_db.connect())

将数据库的配置信息提取到config.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#factory_pattern.py
def connection_factory(db_type):
"""
工厂函数:根据数据库类型创建相应的数据库连接

Args:
db_type (str): 数据库类型 ('main', 'analytics', 'cache')

Returns:
DatabaseConnection: 数据库连接实例

Raises:
KeyError: 当提供的数据库类型不存在时
"""
from config import DATABASE_CONFIGS
if db_type not in DATABASE_CONFIGS:
raise ValueError(f"Unknown database type: {db_type}. Available types: {list(DATABASE_CONFIGS.keys())}")

return DatabaseConnection(**DATABASE_CONFIGS[db_type])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#config.py
# 数据库配置字典
DATABASE_CONFIGS = {
'main': {
'host': 'localhost',
'port': 3306,
'username': 'root',
'password': 'password123'
},
'analytics': {
'host': '192.168.1.1',
'port': 5432,
'username': 'admin',
'password': 'securepass'
},
'cache': {
'host': '10.0.0.1',
'port': 27017,
'username': 'cacheuser',
'password': 'cachepass'
}
}

为什么要这么做

  • 所有配置信息都在一个地方,便于统一管理和维护
  • 易于修改 :配置变更不需要修改业务逻辑代码

工厂模式的核心就是工厂函数

工厂函数是一个返回对象的函数(而不是类),它封装了对象的创建逻辑,根据输入参数决定返回哪种具体对象。

建造者模式(Builder Pattern)

当你需要创建一个有很多属性、配置步骤繁多的对象时(比如一辆汽车、一个 HTTP 请求、一个数据库连接配置),直接用构造函数会非常混乱。 建造者模式通过“分步构建 + 最终组装”的方式,让创建过程清晰、灵活、可读性强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class DatabaseConnectionBuilder:
"""数据库连接建造者类"""

def __init__(self,host,port,username,password):
self._config = {
'host': host,
'port': port,
'username': username,
'password': password,
}

def set_max_connections(self, max_connections):
"""设置最大连接数"""
if max_connections is not None and max_connections <= 0:
raise ValueError("Max connections must be positive")
self._config['max_connections'] = max_connections
return self

def set_timeout(self, timeout):
"""设置超时时间"""
if timeout is not None and timeout <= 0:
raise ValueError("Connect timeout must be positive")
self._config['timeout'] = timeout
return self

def build(self):
"""构建最终的数据库连接对象"""
return DatabaseConnection(**self._config)


# 演示建造者模式的使用
if __name__ == "__main__":
# 创建一个数据库连接建造者
builder = DatabaseConnectionBuilder(
host='localhost',
port=3306,
username='root',
password='password123'
)

# 使用建造者模式构建数据库连接
connection = builder.set_max_connections(100) \
.set_timeout(30) \
.enable_ssl() \
.set_connection_pool('my_pool') \
.set_retry_attempts(3) \
.enable_compression() \
.set_read_preference('secondaryPreferred') \
.build()

单例模式(Singleton Pattern)

单例模式确保一个类只有一个实例,并提供一个全局访问点来获取该实例。

换句话说: 无论你调用多少次“创建对象”,返回的始终是同一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class DatabaseConnection:
"""数据库连接单例类"""

_instance = None
_lock = threading.Lock() # 线程锁,确保线程安全

def __new__(cls, *args, **kwargs):
"""重写__new__方法实现单例模式"""
if cls._instance is None:
with cls._lock: # 使用双重检查锁定
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self, host, port, username, password):
"""初始化数据库连接参数"""
# 防止重复初始化
if hasattr(self, '_initialized'):
return

self.host = host
self.port = port
self.username = username
self.password = password
self._initialized = True

def connect(self):
"""连接数据库"""
return f"Connecting to database at {self.host}:{self.port} with username '{self.username}'"

def client():
db1 = DatabaseConnection(host='localhost', port=3306, username='root', password='password')
db2 = DatabaseConnection(host='localhost', port=3306, username='root', password='password')
print(db1 is db2)

client()

参考资料

【设计模式 inPy】一个视频搞懂三种设计模式:工厂、建造者和单例_哔哩哔哩_bilibili

为什么你应该忘掉设计模式?_哔哩哔哩_bilibili

【设计模式inPython】策略模式:不要再用一个类装所有方法啦!_哔哩哔哩_bilibili

GIL是什么

GIL(Global Interpreter Lock,全局解释器锁)CPython 解释器(Python 的官方实现)中的一个互斥锁(mutex),它确保同一时刻只有一个线程执行 Python 字节码

🔒 互斥锁的定义:

互斥锁是一种同步原语,用于防止多个线程同时访问共享资源

GIL 正是这样一把锁:

  • 共享资源 = CPython 解释器的内部状态(如对象引用计数、内存管理器)
  • 保护方式 = 任何线程要执行 Python 代码,必须先获取 GIL

为什么要设计GIL

GIL 的核心原因:CPython 的内存管理模型

🔑 关键点:

CPython 使用“引用计数(Reference Counting)”作为主要的内存管理机制,而引用计数的增减操作必须是原子的

🔑 核心逻辑链:

Python(CPython)使用引用计数(Reference Counting)管理内存引用计数的增减必须是原子操作(否则会出错)为了保证原子性,CPython 引入了 GIL(全局互斥锁)GIL 确保同一时刻只有一个线程能修改引用计数

什么是引用计数(Reference Counting)

在 CPython 中,每个 Python 对象(如 list, str, 自定义类实例)都有一个字段叫 ob_refcnt,记录“有多少变量/容器引用了它”。

1
2
3
4
a = [1, 2, 3]      # ob_refcnt = 1
b = a # ob_refcnt = 2
c = [a, "hello"] # ob_refcnt = 3(因为 c[0] 也引用了它)
del b # ob_refcnt = 2

ob_refcnt 降到 0 时,对象立即被销毁(内存回收)。

如果没有GIL

假设两个线程同时执行 b = a

线程 A 线程 B 实际 ob_refcnt
读取 ob_refcnt = 1 读取 ob_refcnt = 1 1
计算 1 + 1 = 2 计算 1 + 1 = 2
写回 ob_refcnt = 2 写回 ob_refcnt = 2 2(但正确值应为 3!)

👉 结果:引用计数错误 → 对象可能被提前释放(程序崩溃)或内存泄漏

💥 这就是竞态条件(Race Condition):多个线程/进程并发访问共享资源时,最终结果依赖于它们的执行顺序或 timing(时序),导致程序行为不可预测、错误或崩溃。

为什么 Java/Go 不需要GIL?

语言 内存管理 是否需要全局锁
Python (CPython) 引用计数(运行时增减) ✅ 需要(GIL)
Java / Go / C# 垃圾回收(GC) ❌ 不需要
Rust 编译期所有权检查 ❌ 不需要
  • GC 语言:对象分配/回收由专用 GC 线程处理,用户线程不直接操作引用计数
  • Rust:内存安全在编译期保证,运行时无引用计数开销

什么是GC

GC(Garbage Collection,垃圾回收) 是现代编程语言中自动管理内存的核心机制。其核心思想为, 自动找出“不再使用的对象”,并回收其内存,无需程序员手动释放。

🆚 对比:手动管理 vs 自动管理

方式 代表语言 特点
手动管理 C, C++ 程序员用malloc/freenew/delete管理内存 → 容易出错(内存泄漏、野指针)
自动管理 Java, Go, C#, Python 语言运行时自动回收内存 → 安全,但有性能开销

GC是如何工作的

主流 GC(如 Java、Go)使用 “可达性分析”(Reachability Analysis)判断对象是否存活:

🌳 核心概念:根对象(Roots)

  • 全局变量
  • 当前函数的局部变量
  • CPU 寄存器中的引用

📌 判断规则:

从根对象出发,能通过引用链到达的对象 = 存活对象 无法到达的对象 = 垃圾(可回收)

python的程序并行

由于 CPython 的 GIL(全局解释器锁)存在:

  • 多线程无法在多核 CPU 上并行执行 Python 字节码(尤其是 CPU 密集型任务)
  • 但多进程可以绕过 GIL,每个进程拥有独立的解释器和内存空间,因此能真正并行,充分利用多核 CPU。
类型 是否受 GIL 限制 能否利用多核 适用场景
多线程(Threading) ✅ 受限(CPU 任务串行) ❌ CPU 任务不能✅ I/O 任务可以(因释放 GIL) 网络请求、文件读写、数据库查询等 I/O 密集型
多进程(Multiprocessing) ❌ 不受限 ✅ 能(真并行) 图像处理、模型推理、加密计算等 CPU 密集型

📌 注意:“并行”(parallelism)≠ “并发”(concurrency)
- 多线程在 Python 中实现的是 并发(交替执行),不是 并行(同时执行)(对 CPU 任务而言)。

参考资料

带大家感受一下没有GIL的CPython_哔哩哔哩_bilibili

【python】天使还是魔鬼?GIL的前世今生。一期视频全面了解GIL!_哔哩哔哩_bilibili

Python项目结构和打包

pip install 的本质

PyPI(默认)或其他源(如私有仓库、本地文件)查找指定名称的包,下载对应的.whl文件

whl文件

  • .whlWheel 的缩写,是 Python 的一种标准打包格式(PEP 427 定义)。
  • 它本质上是一个 ZIP 格式的压缩包,扩展名改为 .whl

安装 .whl 文件

1
pip install package_name.whl

hachling

hatchling 是 Python 生态中一个现代的、轻量级的构建后端(build backend),主要用于将 Python 项目打包成可分发的格式(如 .whl 或源码包)。它是 Hatch项目的一部分,由 PyPA(Python Packaging Authority)推荐使用。

在pyproject.toml添加

1
2
3
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

当你运行:

1
2
3
pip install .
# 或
python -m build

如果项目配置了 hatchling 作为构建后端,pipbuild 工具就会调用 hatchling 来完成打包和安装。

例如以下项目结构

1
2
3
4
5
6
7
8
my_project/
├── pyproject.toml
├── src/
│ └── my_utils/ ← 你写的包
│ ├── __init__.py
│ └── math.py
├── scripts/
│ └── run_demo.py ← 想在这里用 my_utils

想让项目的其他文件使用自己编写的包,在pyproject.toml增加

1
2
3
# 👇 新增:显式声明包位置
[tool.hatch.build.targets.wheel]
packages = ["src/my_utils"]

在项目根目录(my_project/)执行:

1
uv pip install -e .

✅ 这会把 src/my_utils/ 注册为一个可导入的包。

src layout

src layout(也称为 src 布局src 目录结构)是 Python 项目中一种推荐的源代码组织方式,其核心思想是:将你的 Python 包(package)放在一个名为 src/ 的子目录下,而不是直接放在项目根目录中。

❌ 传统布局(不推荐)

1
2
3
4
5
6
7
my_project/
├── my_package/
│ ├── __init__.py
│ └── module.py
├── tests/
├── setup.py
└── README.md

src 布局(推荐)

1
2
3
4
5
6
7
8
my_project/
├── src/
│ └── my_package/
│ ├── __init__.py
│ └── module.py
├── tests/
├── pyproject.toml
└── README.md

uv build和uv pip install .什么区别

一、uv build

✅ 作用:构建分发包(不安装)

  • 调用项目的构建后端(如 hatchlingsetuptools 等)。
  • 生成标准的分发文件:
    • 一个 Wheel 文件.whl
    • 一个 源码分发包(sdist,.tar.gz
  • 输出到项目根目录下的 dist/ 文件夹。
  • 不会将包安装到当前 Python 环境中

🔧 示例:

1
uv build

输出:

1
2
dist/my_package-0.1.0-py3-none-any.whl
dist/my_package-0.1.0.tar.gz

image-20251018192905210

二、uv pip install .

✅ 作用:安装当前项目到当前环境

  • 首先(隐式)构建项目(类似 uv build 的过程)。
  • 然后将构建结果安装到当前激活的 Python 环境(如虚拟环境或系统环境)。
  • 安装后,你可以在 Python 中 import 该包。
  • 默认是 “非可编辑安装”(即代码改动不会自动生效,除非重新安装)。

🔧 示例:

1
uv pip install .

效果: - 包被安装到 site-packages/ - 可在 Python 中 import my_package

Editable install(可编辑安装)

Editable install(可编辑安装) 是 Python 包管理中的一种安装模式,它让你在安装一个包的同时,保留对源代码的直接引用

1
uv pip install -e .
  • 核心机制
    • 不复制代码到 site-packages/
    • 而是在 site-packages/ 中创建一个 .pth 文件my_package.egg-link,指向你本地项目中的 src/(或包目录)。
    • Python 解释器在导入时,会顺着这个链接去读你本地的源码。
image-20251018193022973

python是从哪里查找模块的

Python 查找模块(module)的机制由 模块搜索路径(module search path) 决定,这个路径是一个字符串列表,存储在 sys.path 中。当你执行 import some_module 时,Python 会按顺序在这个列表中的每个目录里查找对应的模块文件。

可以通过以下代码查看当前 Python 的模块搜索路径:

1
2
import sys
print(sys.path)

sys.path 通常包含以下几类路径(顺序很重要):

  1. 脚本所在目录(或当前工作目录)
  • 如果你运行 python /path/to/script.py,那么 /path/to/ 会被加到 sys.path[0]
  • 如果你运行 python 进入交互模式,或运行 python -c "...",则当前工作目录(os.getcwd() 会被放在首位。
  • ⚠️ 这是很多“意外导入”问题的根源(比如项目根目录下有同名包)。
  1. 环境变量 PYTHONPATH 中的目录
  • 类似系统的 PATH,你可以通过设置 PYTHONPATH 添加自定义搜索路径。
  • 示例(Linux/macOS):
    1
    2
    export PYTHONPATH="/my/custom/modules:$PYTHONPATH"
    python my_script.py
  • Windows(PowerShell):
    1
    $env:PYTHONPATH = "C:\my\custom\modules;" + $env:PYTHONPATH
  1. 标准库目录
  • Python 自带的模块(如 os, sys, json)所在位置。
  • 通常位于 Python 安装目录下的 lib/ 子目录中。
  1. 第三方包安装目录(site-packages)
  • 通过 pip installuv pip install 等安装的包,会被放到 site-packages 目录。
  • 路径可通过以下命令查看:
    1
    2
    3
    import site
    print(site.getsitepackages()) # 全局环境
    print(site.getusersitepackages()) # 用户级安装
  1. .pth 文件中指定的路径
  • 某些包(尤其是 editable install)会在 site-packages/ 中放置 .pth 文件,动态添加路径到 sys.path

参考资料

build + hatchling 15分钟搞懂Python项目结构和打包_哔哩哔哩_bilibili

@field_validator()

field_validator 是 Pydantic v2 的字段级校验与转换装饰器,用来在模型创建或赋值时,对指定字段做规则检查和/或值变换。例如

1
2
3
4
5
6
@field_validator("name")
def _validate_name(cls, v: str) -> str:#cls 表示当前模型类
v = str(v).strip()
if not v:
raise ValueError("name 不能为空")
return v

这个验证器的作用是确保 name 字段不能为空,这个 _validate_name 验证器会在 Pydantic 模型实例化(创建对象)时自动调用

生成项目结构

1
2
3
4
─src
│ └─pytest_demo
│ calculator.py
│ __init__.py

生成calculator.py类用于后续学习pytest,以上树状结构使用tree /f生成

__init__.py 文件的作用

__init__.py 文件的主要作用是告诉Python解释器这个目录是一个Python包(package)。当Python看到一个包含 __init__.py 文件的目录时,就会将其识别为一个可导入的包。

1
2
3
4
5
6
7
"""
pytest_demo package
"""

from .calculator import Calculator

__all__ = ['Calculator']

相对导入 : from .calculator import Calculator

  • .calculator 表示从当前包内的 calculator.py 模块导入 Calculator 类
  • 点号 . 表示相对导入,指向当前包

__all__ 列表 : __all__ = ['Calculator']

  • 定义了当使用 from pytest_demo import * 时会导入哪些对象
  • 这是一个显式的公共API声明

pytest 的自动发现规则

pytest 会自动查找以下内容:

  • 文件名:test_*.py*_test.py
  • 函数名:test_*()
  • 类名:Test*(类中方法也需以 test_ 开头)

测试文件

1
2
└─tests
│ test_calculator.py
1
2
3
4
5
6
7
8
9
10
from pytest_demo.calculator import Calculator

def test_add() -> None:
calc = Calculator()
assert calc.add(2, 1) == 3


def test_div() -> None:
calc = Calculator()
assert calc.divide(2, 1) == 2

运行 uv run pytest

image-20251018194456963

vscode支持pytest的可视化页面

ctrl+shift+p搜索

image-20251018194539467
image-20251018194723165

使用pytest.raises验证异常

1
2
3
4
5
6
import pytest

def test_divide_by_zero() -> None:
calc = Calculator()
with pytest.raises(ZeroDivisionError):
calc.divide(2, 0)

当你测试的函数应该在特定条件下抛出异常(比如传入非法参数、除零错误等),你可以用 pytest.raises 来验证:

“这段代码是否如预期那样,抛出了我们想要的异常?”

如果:

  • ✅ 抛出了指定类型的异常 → 测试通过
  • ❌ 没有抛出异常 → 测试失败
  • ❌ 抛出了其他类型的异常 → 测试失败

@pytest.fixture提供测试参数

@pytest.fixturepytest 中最核心、最强大的功能之一,它的作用是:

为测试函数提供可复用的、隔离的“测试依赖”(如对象、数据、资源、环境等),并管理它们的生命周期。

1
2
3
4
5
6
7
#工厂函数
@pytest.fixture
def calc() -> Calculator:
return Calculator()

def test_add(calc: Calculator) -> None:
assert calc.add(2, 1) == 3

在 pytest 中,当测试函数(或另一个 fixture)的参数名与某个 fixture 的名称相同时,pytest 会自动调用该 fixture,并将其返回值传入测试函数

这是 pytest 依赖注入机制的核心,也是 fixture 能“自动生效”的原因。

工厂函数(Factory Function) 是一种返回对象(通常是类的实例或其他函数)的函数,它的名字来源于“工厂模式”——就像工厂生产产品一样,这个函数“生产”对象。

scope 参数

1
2
3
4
5
6
7
8
9
10
@pytest.fixture(scope="session")  # 整个测试会话只启动一次
def browser():
driver = webdriver.Chrome()
yield driver
driver.quit()

def test_login(browser):
browser.get("/login")
def test_profile(browser):
browser.get("/profile")

好处:避免创建多个实例

支持的作用域:

  • function(默认):每个测试函数
  • class:每个测试类
  • module:每个 .py 文件
  • session:整个测试运行

自动管理资源生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pytest.fixture
def temp_file():
# setup: 创建临时文件
path = "temp.txt"
with open(path, "w") as f:
f.write("hello")

yield path # 测试函数在此处获得 path

# teardown: 清理
os.remove(path)

def test_read_file(temp_file):
with open(temp_file) as f:
assert f.read() == "hello"

正变换,逆变换与逆向映射法

正变换(Forward Mapping):

  • 思路:遍历原图每个像素 (x, y) → 计算它在新图中的位置 (u, v)
  • 问题:
    • (u, v) 很可能是非整数坐标(如 (123.7, 45.2)
    • 无法直接赋值给新图的整数像素位置
    • 即使四舍五入,也会导致:
      • 空洞:某些新图像素从未被赋值 → 黑色或空白
      • 重叠:多个原图像素映射到同一新图像素 → 颜色混合或丢失

图像插值方法(Interpolation Methods)

当你对图像做以下操作时,就需要插值:

  • 放大(zoom in):像素变多,新像素值从哪来?
  • 缩小(zoom out):像素变少,如何保留信息?
  • 旋转、仿射变换、透视变换:目标像素位置可能落在原图两个像素之间,需要“估算”颜色。

插值就是估算非整数坐标处像素值的方法

最近邻插值(Nearest Neighbor)

  • 原理:新像素的值 = 离它最近的原图像素值
  • 计算:不计算!直接“抄最近的”
  • 特点
    • ⚡ 速度最快
    • 🧱 放大后有明显“马赛克”或“锯齿”(blocky / jagged)
    • ❌ 不适合高质量图像处理

双线性插值(Bilinear Interpolation)

  • 原理:在 2×2 邻域(4个像素)内,先水平插值,再垂直插值
  • 计算:加权平均(距离越近,权重越大)
  • 特点
    • ⏱ 速度较快
    • 🌫 图像变“柔和”,边缘轻微模糊
    • ✅ 是默认的 resize 插值方法(OpenCV 默认值)

🖼 举例(1D 简化): 原值:A=100(位置0),B=200(位置1) 在 0.3 处插值 → 100×(1−0.3) + 200×0.3 = 130 2D 就是两次这样的操作(x方向 + y方向)

双三次插值(Bicubic Interpolation)

  • 原理:在 4×4 邻域(16个像素)内,用三次多项式拟合曲面
  • 计算:复杂,涉及更多像素和权重函数(常用 Catmull-Rom 样条)
  • 特点
    • 🐢 速度较慢(约是 bilinear 的 4~8 倍)
    • 🔍 边缘更锐利,细节保留更好
    • ⚠️ 可能产生“过冲”(overshoot):边缘出现亮/暗 halo(光晕)

图像逻辑运算

单尺度和多尺度Retinex增强方法实现图像增强

图像模板运算

邀请码

image-20250921100514116

邀请链接

image-20250921100529781
image-20250921100700770

在表里面增加一个邀请链接的字段,每个用户有专属的邀请链接

算法流程

image-20250924135355928
  1. 对文本进行分块后,分别存储在kvstorageVectorDB Storage

  2. 然后调用大模型进行实体与关系抽取Extract Entities & Relations

  3. 然后进行实体与关系的存储Entities Data / Relations Data),去重后Deduplication),再次调用embedding模型存储于VectorDB Storage

  4. 再次调用大模型,对关系与实体的描述进行精炼或合并(Update Description),并存储到Knowledge Graph

image

LightRAG 使用 4 种类型的存储用于不同目的:

  • KV_STORAGE:llm 响应缓存、文本块、文档信息
  • VECTOR_STORAGE:实体向量、关系向量、块向量
  • GRAPH_STORAGE:实体关系图
  • DOC_STATUS_STORAGE:文档索引状态
image-20250924142554264

值得注意的是,local检索模式仅使用low_level_keywords,而global检索模式仅支持high_level_keywords,从算法流程图可以看出来,前者更侧重于检索实体,后者则侧重于关系

高关键词(High-level Keywords)用于捕获查询的核心意图和主题概念;低关键词(Low-level Keywords)用于识别具体的实体和细节信息

text_units的作用是:

  • 每条实体/关系记录在 KV 里都挂着 chunk_ids 列表。
  • 把 Top-K 结果里的 chunk_ids合并 + 去重,得到原始文本块序号。
  • 用序号去 Nano VectorDB 里把对应 text_units(原始句子/段落) 拉回,作为 上下文原始语料
image-20250924135917313

安装

安装LightRAG服务器

LightRAG服务器旨在提供Web UI和API支持。Web UI便于文档索引、知识图谱探索和简单的RAG查询界面。LightRAG服务器还提供兼容Ollama的接口,旨在将LightRAG模拟为Ollama聊天模型。这使得AI聊天机器人(如Open WebUI)可以轻松访问LightRAG。

1
2
3
pip install "lightrag-hku[api]"
cp env.example .env
lightrag-server

在此获取LightRAG docker镜像历史版本: LightRAG Docker Images

安装 LightRAG Core

1
pip install lightrag-hku

LightRAG 服务器和 WebUI

LightRAG/lightrag/api/README-zh.md at main · HKUDS/LightRAG

LightRAG 服务器旨在提供 Web 界面和 API 支持。Web 界面便于文档索引、知识图谱探索和简单的 RAG 查询界面。

使用环境变量来配置 LightRAG 服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
LLM_BINDING=openai
LLM_MODEL=qwen3-max-preview
LLM_BINDING_HOST=https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_BINDING_API_KEY=sk-

EMBEDDING_BINDING=openai
EMBEDDING_MODEL=text-embedding-v4
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY=sk-
EMBEDDING_BINDING_HOST=https://dashscope.aliyuncs.com/compatible-mode/v1

############################
### 数据存储选择
############################
### 默认存储(推荐用于小规模部署)
LIGHTRAG_KV_STORAGE=JsonKVStorage
LIGHTRAG_DOC_STATUS_STORAGE=JsonDocStatusStorage
LIGHTRAG_GRAPH_STORAGE=NetworkXStorage
LIGHTRAG_VECTOR_STORAGE=NanoVectorDBStorage

### 图存储(推荐用于生产部署)
# LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
# LIGHTRAG_GRAPH_STORAGE=MemgraphStorage

### PostgreSQL
# LIGHTRAG_KV_STORAGE=PGKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
# LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage

### PostgreSQL 配置
# POSTGRES_HOST=localhost
# POSTGRES_PORT=5432
# POSTGRES_USER=您的用户名
# POSTGRES_PASSWORD='您的密码'
# POSTGRES_DATABASE=您的数据库

### Neo4j 配置
# NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
# NEO4J_USERNAME=neo4j
# NEO4J_PASSWORD='您的密码'
# NEO4J_DATABASE=noe4j

### Milvus 配置
# MILVUS_URI=http://localhost:19530
# MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=您的密码

启动

1
lightrag-server

这是因为每次启动时,LightRAG Server会将.env文件中的环境变量加载至系统环境变量,且系统环境变量的设置具有更高优先级。

启动时可以通过命令行参数覆盖.env文件中的配置。常用的命令行参数包括:

  • --host:服务器监听地址(默认:0.0.0.0)
  • --port:服务器监听端口(默认:9621)
  • --working-dir:数据库持久化目录(默认:./rag_storage)
  • --input-dir:上传文件存放目录(默认:./inputs)
  • --workspace: 工作空间名称,用于逻辑上隔离多个LightRAG实例之间的数据(默认:空)

启动多个LightRAG实例

所有实例共享一套相同的.env配置文件,然后通过命令行参数来为每个实例指定不同的服务器监听端口和工作空间。你可以在同一个工作目录中通过不同的命令行参数启动多个LightRAG实例。例如:

1
2
3
4
5
# 启动实例1
lightrag-server --port 9621 --workspace space1

# 启动实例2
lightrag-server --port 9622 --workspace space2

运行

image-20250926162648415
image-20250926162703167
image-20250926162714883

LightRAG的数据隔离

需要通过配置工作空间来实现数据隔离,否则不同实例的数据将会出现冲突并被破坏。

LightRAG 使用 4 种类型的存储用于不同目的:

  • KV_STORAGE:llm 响应缓存、文本块、文档信息
  • VECTOR_STORAGE:实体向量、关系向量、块向量
  • GRAPH_STORAGE:实体关系图
  • DOC_STATUS_STORAGE:文档索引状态

每种存储类型都有多种存储实现方式。LightRAG Server默认的存储实现为内存数据库,数据通过文件持久化保存到WORKING_DIR目录。LightRAG还支持PostgreSQL、MongoDB、FAISS、Milvus、Qdrant、Neo4j、Memgraph和Redis等存储实现方式。

您可以通过环境变量选择存储实现。例如,在首次启动 API 服务器之前,您可以将以下环境变量设置为特定的存储实现名称:

1
2
3
4
LIGHTRAG_KV_STORAGE=PGKVStorage
LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage

文档和块处理逻辑说明

LightRAG 中的文档处理流程有些复杂,分为两个主要阶段:提取阶段(实体和关系提取)和合并阶段(实体和关系合并)。有两个关键参数控制流程并发性:并行处理的最大文件数(MAX_PARALLEL_INSERT)和最大并发 LLM 请求数(MAX_ASYNC)。工作流程描述如下:

  1. MAX_ASYNC 限制系统中并发 LLM 请求的总数,包括查询、提取和合并的请求。LLM 请求具有不同的优先级:查询操作优先级最高,其次是合并,然后是提取。
  2. MAX_PARALLEL_INSERT 控制提取阶段并行处理的文件数量。MAX_PARALLEL_INSERT建议设置为2~10之间,通常设置为 MAX_ASYNC/3,设置太大会导致合并阶段不同文档之间实体和关系重名的机会增大,降低合并阶段的效率。
  3. 在单个文件中,来自不同文本块的实体和关系提取是并发处理的,并发度由 MAX_ASYNC 设置。只有在处理完 MAX_ASYNC 个文本块后,系统才会继续处理同一文件中的下一批文本块。
  4. 当一个文件完成实体和关系提后,将进入实体和关系合并阶段。这一阶段也会并发处理多个实体和关系,其并发度同样是由 MAX_ASYNC 控制。
  5. 合并阶段的 LLM 请求的优先级别高于提取阶段,目的是让进入合并阶段的文件尽快完成处理,并让处理结果尽快更新到向量数据库中。
  6. 为防止竞争条件,合并阶段会避免并发处理同一个实体或关系,当多个文件中都涉及同一个实体或关系需要合并的时候他们会串行执行。
  7. 每个文件在流程中被视为一个原子处理单元。只有当其所有文本块都完成提取和合并后,文件才会被标记为成功处理。如果在处理过程中发生任何错误,整个文件将被标记为失败,并且必须重新处理。
  8. 当由于错误而重新处理文件时,由于 LLM 缓存,先前处理的文本块可以快速跳过。尽管 LLM 缓存在合并阶段也会被利用,但合并顺序的不一致可能会限制其在此阶段的有效性。
  9. 如果在提取过程中发生错误,系统不会保留任何中间结果。如果在合并过程中发生错误,已合并的实体和关系可能会被保留;当重新处理同一文件时,重新提取的实体和关系将与现有实体和关系合并,而不会影响查询结果。
  10. 在合并阶段结束时,所有实体和关系数据都会在向量数据库中更新。如果此时发生错误,某些更新可能会被保留。但是,下一次处理尝试将覆盖先前结果,确保成功重新处理的文件不会影响未来查询结果的完整性。

大家好,我想请教一下如何更好的使用这个项目,我现在处理的好慢,昨天从晚上十点跑到今天早上7点,只处理好33份文件[苦涩]我看日志,一份47页的pdf花了一个小时(mineru处理了十分钟)。 我自己总结下是有以下几个 1.我应该使用milvus和neo4j存储,而不是图方便存本地 2.设置一下并发,而不是每次只处理一个文件 3.不对图片进行处理了(感觉对我当前的场景没有必要) 针对第一点,我想问一下,存储到数据库是影响检索速率还是存储(我个人感觉是不是只影响检索啊)

Embedding模型应该本地部署,这样速度才比较快。一个文本块的Embedding速度在本地是毫秒级别的。调用云端API通常是秒级别的。 没有必要都用RagAnything来处理,先试一下用LightRAG来处理看一把速度如何。日后LightRAG会进程RagAnything,到时候可以灵活地指定每个文件的处理方式。 LightRAG的处理速度组要受到LLM的影响,与使用什么存储关系不是十分大。 LLM每秒能够输出的Tokens数量和支持的最大并发数决定了文档处理的速度。 例如一个LLM在8个并发的时候能够达到 400Tokens/秒 的峰值,预计处理速度达到15~20秒处理一个文本块。文本块的处理速度与识别出来的实体数量和实体关系需要合并的数量是有关的,因此不同文档是有所不同的。 可以用以上经验值来评估一下自己的系统处理速度是否合理。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class LightRAGStorage:
"""LightRAG存储和检索类

支持多种存储后端:
- KV存储: PostgreSQL
- 文档状态存储: PostgreSQL
- 图存储: Neo4j
- 向量存储: Milvus

使用workspace实现数据隔离
"""

def __init__(self, workspace: str = "default"):
"""初始化LightRAG存储

Args:
workspace: 工作空间名称,用于数据隔离
"""
self.workspace = workspace

# 加载环境变量
load_dotenv()

# 使用文件同级的 lightrag_storage 目录
self.working_dir = os.path.join(os.path.dirname(__file__), "lightrag_storage")

self.rag: Optional[LightRAG] = None

# 确保工作目录存在
os.makedirs(self.working_dir, exist_ok=True)

async def _get_llm_model_func(self):
"""LLM模型函数"""
async def llm_model_func(
prompt,
system_prompt=None,
history_messages=[],
keyword_extraction=False,
**kwargs
) -> str:
return await openai_complete_if_cache(
model=os.getenv("DASHSCOPE_MODEL", "qwen-max"),
prompt=prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
**kwargs
)
return llm_model_func

async def _get_embedding_func(self):
"""嵌入模型函数"""
async def embedding_func(texts: List[str]) -> np.ndarray:
return await openai_embed(
texts,
model=os.getenv("DASHSCOPE_EMBEDDING_MODEL", "text-embedding-v4"),
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1")
)
return embedding_func


async def initialize(self) -> None:
"""初始化LightRAG实例"""
if self.rag is not None:
return

# 获取模型函数
llm_model_func = await self._get_llm_model_func()
embedding_func = await self._get_embedding_func()

# 存储配置 - 统一使用字符串方式,让LightRAG自动处理
# 图存储配置
graph_storage = os.getenv("LIGHTRAG_GRAPH_STORAGE", "Neo4JStorage")

# KV存储配置
kv_storage = os.getenv("LIGHTRAG_KV_STORAGE", "PGKVStorage")

# 文档状态存储配置
doc_status_storage = os.getenv("LIGHTRAG_DOC_STATUS_STORAGE", "PGDocStatusStorage")

# 向量存储配置
vector_storage = os.getenv("LIGHTRAG_VECTOR_STORAGE", "MilvusVectorDBStorage")

# 创建LightRAG实例
self.rag = LightRAG(
working_dir=self.working_dir,
embedding_func=EmbeddingFunc(
func=embedding_func,
embedding_dim=int(os.getenv("EMBEDDING_DIM", 1024))
),
llm_model_func=llm_model_func,
workspace=self.workspace,
graph_storage=graph_storage,
kv_storage=kv_storage,
doc_status_storage=doc_status_storage,
vector_storage=vector_storage
)

# 初始化存储
await self.rag.initialize_storages()
await initialize_pipeline_status()

async def insert_text(self, text: str) -> None:
"""插入文本到LightRAG

Args:
text: 要插入的文本内容
"""
if self.rag is None:
await self.initialize()

await self.rag.ainsert(text)

async def insert_texts(self, texts: List[str]) -> None:
"""批量插入文本

Args:
texts: 文本列表
"""
for text in texts:
await self.insert_text(text)

async def query(
self,
query: str,
mode: str = "hybrid",
**kwargs
) -> str:
"""执行查询

Args:
query: 查询文本
mode: 查询模式 ("naive", "local", "global", "hybrid")
**kwargs: 其他查询参数

Returns:
查询结果
"""
if self.rag is None:
await self.initialize()

return await self.rag.aquery(
query,
param=QueryParam(mode=mode, **kwargs)
)

参考资料

通过实际案例拆解Light RAG,构建可视化知识图谱,解析Graph RAG的运行原理和关键概念(一)_哔哩哔哩_bilibili

AI知识图谱 GraphRAG 是怎么回事?_哔哩哔哩_bilibili

LightRAG/README-zh.md 在主 ·香港科技大学/LightRAG — LightRAG/README-zh.md at main · HKUDS/LightRAG

认识数据

数据的属性

属性类型 定义 举例
标称属性 (Nominal) 仅用于区分不同类别,无顺序关系 性别(男/女)、颜色(红/绿/蓝)
二元属性 (Binary) 仅有两个类别的标称属性 是否存活(是/否)、性别(男/女)
序数属性 (Ordinal) 类别之间存在明确的顺序关系 教育水平(小学/中学/大学)、舱位等级(1等/2等/3等)
数值属性 (Numeric) 可度量的数值,可进行数学运算 年龄、票价

数据相似度的计算方法

关联分析算法

频繁项集

apriori算法

fp-growth

十分钟速通 | FP-Tree算法_哔哩哔哩_bilibili

频繁模式挖掘FP-growth算法分析例题讲解_哔哩哔哩_bilibili

规则评估方法

聚类

1. K-means(K均值聚类)

✅ 核心思想:

  • 假设:簇是“球状”、“大小相近”、“围绕中心分布”的。
  • 目标:把数据分成 K 个簇,使得 每个点到其所属簇中心的距离平方和最小

🔄 算法步骤(迭代优化):

Step 1:初始化

  • 随机选择 K 个点作为初始簇中心(centroids)。
    • 也可以用 K-means++ 更智能地选初始点(避免坏初始化)。

Step 2:分配阶段(Assignment)

  • 对每个数据点 *xi* ,计算它到 每个簇中心** 的距离(通常用欧氏距离);
  • 将 *xi* 分配给 距离最近的簇中心** 所在的簇。

💡 例如:点 (1,2) 到中心 A(0,0) 距离是 2.24,到 B(3,3) 是 2.24 → 可能分给 A(打破平局规则)。

Step 3:更新阶段(Update)

  • 对每个簇,重新计算其新的簇中心:取该簇中所有点的坐标均值
    • 例如:簇中有 (0,0)、(1,1)、(2,2) → 新中心 = ((0+1+2)/3, (0+1+2)/3) = (1,1)

Step 4:判断收敛

  • 如果簇中心不再变化(或变化小于阈值),算法结束;
  • 否则,回到 Step 2 继续迭代。

✅ 优点:

  • 简单、高效、易于实现;
  • 适合大规模数据(时间复杂度 ≈ O(nKIt))。

❌ 缺点:

  • 必须提前知道 K(簇数量);
  • 对异常值敏感;
  • 只能识别凸形(球状)簇,无法处理月牙形、环形等。

2. AGNES(Agglomerative Nesting,凝聚层次聚类)

✅ 核心思想:

  • 自底向上:开始时每个点自己是一个簇,然后不断合并最相似的两个簇,直到只剩一个簇。
  • 树状图(dendrogram) 展示合并过程,你可以“切一刀”得到任意数量的簇。

🔄 算法步骤:

Step 1:初始化

  • 将每个数据点视为一个独立的簇。共 n 个簇。

Step 2:计算簇间距离

  • 对每一对簇,计算它们之间的距离。
    • 距离定义方式(关键!):
      • Single Linkage:两个簇中最近两点的距离;
      • Complete Linkage:两个簇中最远两点的距离;
      • Average Linkage:所有点对的平均距离

Step 3:合并最近的两个簇

  • 找到距离最小的两个簇,将它们合并为一个新簇

Step 4:更新距离矩阵

  • 从距离矩阵中删除这两个旧簇;
  • 添加新簇,并计算它与其他所有簇的距离。

Step 5:重复

  • 重复 Step 2–4,直到只剩一个簇(或达到预设簇数)。

🔗 簇间距离定义(关键!):

链接方式 定义 特点
Single Linkage 两个簇中最近两点的距离 能识别链状结构(如 Moons),但易受噪声影响
Complete Linkage 两个簇中最远两点的距离 倾向生成大小相近的紧凑簇
Average Linkage 所有点对的平均距离 折中方案,较稳健

✅ 优点:

  • 不需要预设 K(后期从树状图决定);
  • 能发现嵌套结构;
  • 可处理非球形簇(尤其 Single Linkage)。

❌ 缺点:

  • 时间复杂度高(O(n²) 或 O(n³)),不适合大数据;
  • 一旦合并不能撤销(贪心算法)。

3. DBSCAN(基于密度的聚类)

✅ 核心思想:

  • 高密度区域 = 簇,低密度区域 = 噪声
  • 不需要指定簇数量;
  • 能发现任意形状的簇,并自动识别离群点

🔑 两个关键参数:

  • eps:邻域半径(多近算邻居?)
  • min_samples:成为核心点所需的最小邻居数(多密才算高密度?)

🔄 点的类型:

类型 定义
核心点 eps范围内 ≥min_samples个点
边界点 不是核心点,但在某个核心点的邻域内
噪声点 既不是核心点,也不被任何核心点覆盖(标记为 -1)

🔄 算法步骤:

Step 1:初始化

  • 所有点标记为 未访问(unvisited)
  • 簇编号 cluster_id = 0

Step 2:遍历每个点

  • 对每个未访问的点 P
    1. 标记 P 为 已访问
    2. 找出 P 的 eps 邻域内的所有点(称为 neighbors);
    3. 如果 |neighbors| < MinPts
      • 将 P 标记为 噪声(noise)
      • 继续下一个点;
    4. 否则(P 是核心点)
      • cluster_id += 1,创建新簇;
      • 将 P 加入新簇;
      • neighbors 中所有点加入一个种子集合(seeds)
      • 扩展簇
        • 从 seeds 中取出一个点 Q;
        • 如果 Q 未被访问,标记为已访问,并检查其邻域;
          • 如果 Q 是核心点,将其邻居加入 seeds;
        • 将 Q 加入当前簇;
        • 重复直到 seeds 为空。

Step 3:输出

  • 返回每个点的簇标签(噪声点为 -1)。

✅ 优点:

  • 不需要 K;
  • 能处理任意形状(月牙、环形等);
  • 对噪声鲁棒;
  • 一次扫描完成(高效)。

❌ 缺点:

  • epsmin_samples 敏感;
  • 密度差异大的数据上表现差(比如一个密簇 + 一个稀疏簇);
  • 高维数据中“距离失效”(维度灾难)。

轮廓系数

轮廓系数(Silhouette Coefficient)是一种无监督聚类评估指标,用于衡量聚类结果的“好坏”——它告诉我们:簇内是否紧密,簇间是否分离

它的取值范围是 [-1, 1]: - 接近 1:簇内紧密,簇间远离 → 聚类效果好; - 接近 0:簇间边界模糊 → 聚类效果一般; - 接近 -1:样本可能被分错簇 → 聚类效果差。

下面我将一步步带你理解轮廓系数的计算过程,并配上具体数值例子 + 公式 + Python 验证

轮廓系数的定义(对单个样本)

对数据集中的每一个样本 i,计算其轮廓系数 $ s(i) $:

$$ s(i) = \frac{b(i) - a(i)}{\max\{a(i), b(i)\}} $$

其中: - $ a(i) $:样本 i 到同簇其他点平均距离簇内不相似度); - $ b(i) $:样本 i 到最近的其他簇中所有点的平均距离的最小值(最近簇间距离)。

💡 直观理解: - $ a(i) $ 越小越好(和自己人很近); - $ b(i) $ 越大越好(离别人很远); - 所以 $ b(i) - a(i) $ 越大,$ s(i) $ 越接近 1。

0%