GIL是什么

GIL(Global Interpreter Lock,全局解释器锁)CPython 解释器(Python 的官方实现)中的一个互斥锁(mutex),它确保同一时刻只有一个线程执行 Python 字节码

🔒 互斥锁的定义:

互斥锁是一种同步原语,用于防止多个线程同时访问共享资源

GIL 正是这样一把锁:

  • 共享资源 = CPython 解释器的内部状态(如对象引用计数、内存管理器)
  • 保护方式 = 任何线程要执行 Python 代码,必须先获取 GIL

为什么要设计GIL

GIL 的核心原因:CPython 的内存管理模型

🔑 关键点:

CPython 使用“引用计数(Reference Counting)”作为主要的内存管理机制,而引用计数的增减操作必须是原子的

🔑 核心逻辑链:

Python(CPython)使用引用计数(Reference Counting)管理内存

引用计数的增减必须是原子操作(否则会出错)

为了保证原子性,CPython 引入了 GIL(全局互斥锁)

GIL 确保同一时刻只有一个线程能修改引用计数

什么是引用计数(Reference Counting)

在 CPython 中,每个 Python 对象(如 list, str, 自定义类实例)都有一个字段叫 ob_refcnt,记录“有多少变量/容器引用了它”。

1
2
3
4
a = [1, 2, 3]      # ob_refcnt = 1
b = a # ob_refcnt = 2
c = [a, "hello"] # ob_refcnt = 3(因为 c[0] 也引用了它)
del b # ob_refcnt = 2

ob_refcnt 降到 0 时,对象立即被销毁(内存回收)。

如果没有GIL

假设两个线程同时执行 b = a

线程 A 线程 B 实际 ob_refcnt
读取 ob_refcnt = 1 读取 ob_refcnt = 1 1
计算 1 + 1 = 2 计算 1 + 1 = 2
写回 ob_refcnt = 2 写回 ob_refcnt = 2 2(但正确值应为 3!)

👉 结果:引用计数错误 → 对象可能被提前释放(程序崩溃)或内存泄漏

💥 这就是竞态条件(Race Condition):多个线程/进程并发访问共享资源时,最终结果依赖于它们的执行顺序或 timing(时序),导致程序行为不可预测、错误或崩溃。

为什么 Java/Go 不需要GIL?

语言 内存管理 是否需要全局锁
Python (CPython) 引用计数(运行时增减) ✅ 需要(GIL)
Java / Go / C# 垃圾回收(GC) ❌ 不需要
Rust 编译期所有权检查 ❌ 不需要
  • GC 语言:对象分配/回收由专用 GC 线程处理,用户线程不直接操作引用计数
  • Rust:内存安全在编译期保证,运行时无引用计数开销

什么是GC

GC(Garbage Collection,垃圾回收) 是现代编程语言中自动管理内存的核心机制。其核心思想为, 自动找出“不再使用的对象”,并回收其内存,无需程序员手动释放。

🆚 对比:手动管理 vs 自动管理

方式 代表语言 特点
手动管理 C, C++ 程序员用malloc/freenew/delete管理内存 → 容易出错(内存泄漏、野指针)
自动管理 Java, Go, C#, Python 语言运行时自动回收内存 → 安全,但有性能开销

GC是如何工作的

主流 GC(如 Java、Go)使用 “可达性分析”(Reachability Analysis)判断对象是否存活:

🌳 核心概念:根对象(Roots)

  • 全局变量
  • 当前函数的局部变量
  • CPU 寄存器中的引用

📌 判断规则:

从根对象出发,能通过引用链到达的对象 = 存活对象
无法到达的对象 = 垃圾(可回收)

python的程序并行

由于 CPython 的 GIL(全局解释器锁)存在:

  • 多线程无法在多核 CPU 上并行执行 Python 字节码(尤其是 CPU 密集型任务)
  • 但多进程可以绕过 GIL,每个进程拥有独立的解释器和内存空间,因此能真正并行,充分利用多核 CPU。
类型 是否受 GIL 限制 能否利用多核 适用场景
多线程(Threading) ✅ 受限(CPU 任务串行) ❌ CPU 任务不能✅ I/O 任务可以(因释放 GIL) 网络请求、文件读写、数据库查询等 I/O 密集型
多进程(Multiprocessing) ❌ 不受限 ✅ 能(真并行) 图像处理、模型推理、加密计算等 CPU 密集型

📌 注意:“并行”(parallelism)≠ “并发”(concurrency)

  • 多线程在 Python 中实现的是 并发(交替执行),不是 并行(同时执行)(对 CPU 任务而言)。

参考资料

带大家感受一下没有GIL的CPython_哔哩哔哩_bilibili

【python】天使还是魔鬼?GIL的前世今生。一期视频全面了解GIL!_哔哩哔哩_bilibili

asyncio是什么

asyncioPython 标准库中的一个模块,用于编写异步(asynchronous)程序。它提供了一套完整的工具,让你可以用 async/await 语法编写并发代码,特别适合处理 I/O 密集型任务(比如网络请求、文件读写、数据库查询等),而不会阻塞整个程序。

概念学习

并发

并发(Concurrency)是一种“效果”——多个任务在一段时间内交替或同时推进。
它可以通过多种方式实现,常见的有:

  1. 多线程(Multithreading)
  2. 异步编程(Asynchronous programming,如 asyncio)
  3. 多进程(Multiprocessing)(严格说更偏向“并行”,但也支持并发)

进程(Process)线程(Thread)协程(Coroutine)

概念 定义
进程(Process) 操作系统进行资源分配和调度的基本单位。它是程序的一次执行实例,拥有独立的虚拟地址空间、文件描述符表、环境变量、信号处理表等内核资源。
线程(Thread) 进程内的执行流(execution context),是 CPU 调度的基本单位。同一进程内的多个线程共享该进程的地址空间和大部分资源(如堆、全局变量、打开的文件),但各自拥有独立的栈、寄存器状态和线程局部存储(TLS)。
协程(Coroutine) 一种用户态的轻量级并发原语,属于协作式多任务(cooperative multitasking)模型。协程的切换由程序显式控制(如通过 awaityield),不依赖操作系统调度,上下文切换在用户空间完成,无内核介入。

线程”在 CPU 核心上运行。
进程是资源容器,线程是执行单位。

内存与资源共享模型

模型 地址空间 堆(Heap) 栈(Stack) 同步机制
进程 独立 独立 独立 需 IPC(如管道、消息队列、共享内存 + 信号量)
线程 共享(同进程内) 共享 独立(每个线程一个栈) 需互斥锁(Mutex)、条件变量等防止数据竞争
协程 共享(同一线程内) 共享 逻辑独立(由运行时管理协程栈或使用生成器状态) 通常无需锁(因单线程串行执行),但需注意异步回调中的状态一致性

在 Python 中的实现

模型 标准库模块 关键 API
进程 multiprocessing Process, Pool, Queue, Pipe, Manager
线程 threading Thread, Lock, Condition, Semaphore
协程 asyncio + async/await async def, await, asyncio.run(), create_task(), gather()

进程资源隔离与并行计算的基石;

线程操作系统级并发的传统手段,但在 Python 中受 GIL 限制;

协程高并发 I/O 的现代解决方案,以极低开销实现大规模并发,已成为 Python 异步编程的事实标准。

1
2
3
4
操作系统
└── 进程(Process) ← 资源容器(内存、文件描述符等)
└── 线程(Thread) ← CPU 调度单元(至少一个)
└── 协程(Coroutine) ← 用户态逻辑任务(可多个,协作式切换)

当你运行一个 Python 脚本(如 python app.py),操作系统会:

  1. 创建一个新进程(Process)
  2. 加载 Python 解释器(CPython)
  3. 在该进程中启动主线程(Main Thread)
  4. 执行你的代码

📌 所以:一个正在运行的 Python 程序 = 1 个进程 + 至少 1 个线程(主线程)

但注意:

  • 程序可以创建更多进程(通过 multiprocessing
  • 程序可以创建更多线程(通过 threading
  • 所以“一个程序”最终可能对应 多个进程、多个线程

Python 异步编程的核心机制

  1. async def 定义协程函数 → 返回 协程对象(Coroutine Object)
  2. await 用于挂起当前协程,等待另一个协程或异步操作完成
  3. 事件循环(Event Loop)(由 asyncio 提供)负责:
    • 调度协程
    • 管理 I/O 多路复用(如 epoll/kqueue
    • 在 I/O 就绪时恢复对应协程

实战学习

sync_demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from time import sleep, perf_counter

def fetch_url(url):
print('Fetching the URL')
sleep(1)#模拟阻塞
print('Finished fetching')
return 'url_content'

def read_file(filepath):
print('Reading the file')
sleep(1)#模拟阻塞
print('Finished reading')
return 'file_content'

def main():
url = 'example.com'
filepath = 'example.txt'
fetch_result = fetch_url(url)
read_result = read_file(filepath)

if __name__ == '__main__':
start_time = perf_counter()
main()
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上展示一份同步的代码,sleep用来模拟阻塞

1
2
3
4
5
Fetching the URL
Finished fetching
Reading the file
Finished reading
Time taken: 2.00 seconds

async_demo.py

异步编程的三步核心流程

1. 定义协程函数 → 2. 包装协程为任务 → 3. 建立事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from time import sleep, perf_counter
import asyncio

#定义协程函数
async def fetch_url(url):
print(f'Fetching the url')
#把阻塞操作包装成协程
await asyncio.sleep(1)
print('Finished fetching')
return 'url_content'

async def read_file(filepath):
print('Reading the file')
await asyncio.sleep(1)
print('Finished reading')
return 'file_content'

#若想使用await,需要把main函数定义成协程函数
async def main():
url = 'example.com'
filepath = 'example.txt'
#创建任务
tasks = [asyncio.create_task(coro) for coro in [
fetch_url(url),
read_file(filepath)]]
fetch_result = await tasks[0]
read_result = await tasks[1]

if __name__ == '__main__':
start_time = perf_counter()
#创建事件循环
asyncio.run(main())
end_time = perf_counter()
print(f'Time taken: {end_time - start_time:.2f} seconds')

以上则是改为了异步的代码

1
2
3
4
5
Fetching the url
Reading the file
Finished fetching
Finished reading
Time taken: 1.01 seconds

协程函数

协程函数(Coroutine Function)是 Python 中使用 async def 语法定义的函数,它是异步编程的核心构建单元。调用协程函数不会立即执行其内部代码,而是返回一个 协程对象(Coroutine Object),该对象必须由事件循环(如 asyncio)驱动或通过 await 在另一个协程中调用,才能真正执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

#协程函数
async def coroutine_func():
return 'coroutine_result'

def main():
print(coroutine_func())
result = asyncio.run(coroutine_func())
print("---")
print(result)


if __name__ == '__main__':
main()
1
2
3
4
5
6
<coroutine object coroutine_func at 0x000001958D88A4B0>
d:\code\python\learn_pythontips\learn_async\coroutine_func.py:8: RuntimeWarning: coroutine 'coroutine_func' was never awaited
print(coroutine_func())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
---
coroutine_result

由输出可见,coroutine object,返回的是一个协程对象

await关键字

await 是 Python 异步编程中的核心关键字,它的作用是:

暂停当前协程的执行,等待一个“可等待对象”(awaitable)完成,并获取其结果,同时将控制权交还给事件循环,使其能运行其他任务。

✅ 三大功能:

  1. 挂起(Suspend):当前协程在此处暂停,不阻塞线程。
  2. 等待(Wait):等待一个异步操作(如网络请求、文件读写、定时器)完成。
  3. 恢复(Resume):当被等待的对象完成后,协程从此处恢复执行,并拿到结果。

⚠️ 关键:await 不会阻塞整个线程,而是让事件循环去执行其他就绪的协程。

事件循环(Event Loop)与任务(Task)

事件循环(Event Loop) 是异步编程的核心引擎,尤其在 Python 的 asyncio 模型中,它是驱动协程执行、管理异步 I/O、调度任务的中枢系统

事件循环是一个程序结构,用于监听和分发事件或消息,实现非阻塞 I/O 和协作式多任务调度。

在 Python asyncio 中,事件循环:

  • 维护一个待执行协程队列
  • 管理定时器(如 asyncio.sleep
  • 使用操作系统提供的 I/O 多路复用机制(如 Linux 的 epoll、macOS 的 kqueue、Windows 的 IOCP)来高效监听大量文件描述符(如 socket)
  • 在 I/O 就绪时,恢复对应的协程

那事件循环如何知道哪些协程可以执行,哪些协程需要暂停呢

在 Python 异步编程(特别是 asyncio)中,任务(Task) 是是对协程(Coroutine)的封装,用于被事件循环调度和并发执行

Taskasyncio 中表示“未来会完成的异步操作”的对象,它是 Future 的子类,用于包装协程并自动调度其执行。

核心作用:

  1. 将协程注册到事件循环中,使其能够并发运行(而非顺序等待)
  2. 提供状态管理(如是否完成、是否取消、结果或异常)
  3. 支持取消操作task.cancel()
  4. 允许多次 await(协程对象只能 await 一次,但 Task 可以)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def work(name, delay):
print(f"开始 {name}")
await asyncio.sleep(delay)
print(f"完成 {name}")
return f"result-{name}"

async def main():
# 创建任务 → 立即开始执行!
task1 = asyncio.create_task(work("A", 2))
task2 = asyncio.create_task(work("B", 1))

# 等待任务完成(可获取结果)
r1 = await task1
r2 = await task2
print(r1, r2)

asyncio.run(main())
1
2
3
4
5
开始 A
开始 B
完成 B
完成 A
result-A result-B

💡 Task 是实现并发的关键:它让多个协程“同时启动”,而不是“一个接一个等”。

asyncio.gather

asyncio.gather用于并发运行多个 awaitable 对象(如协程、Task、Future),并按顺序返回它们的结果列表

默认:任意一个任务出错,其他任务会被取消(除非 return_exceptions=True

1
2
3
4
5
6
7
async def main():
url = 'example.com'
filepath = 'example.txt'
results = await asyncio.gather(
fetch_url(url),
read_file(filepath))
print(results)
1
2
3
4
5
6
Fetching the url
Reading the file
Finished fetching
Finished reading
['url_content', 'file_content']
Time taken: 1.00 seconds

asyncio.as_completed

asyncio.as_completed返回一个异步迭代器(async iterator),按任务完成的先后顺序,逐个产出已完成的 awaitable 对象(通常是 Task 或 Future)。

1
2
3
4
5
results = asyncio.as_completed([
fetch_url(url),
read_file(filepath)])
for result in results:
print(await result)

特点:先完成的任务先被处理,无需等待所有任务结束

asyncio.to_thread

await asyncio.to_thread(func, \*args)默认线程池中运行同步函数 func(*args),并返回结果。
它是 loop.run_in_executor(None, func, *args)高层封装

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import time

def blocking_io():
time.sleep(1)
return "Done!"

async def main():
result = await asyncio.to_thread(blocking_io)
print(result)

asyncio.run(main())

aiohttp和aiofiles

aiohttp:异步 HTTP 客户端与服务器框架

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio

async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

asyncio.run(fetch("https://example.com"))

aiofiles 是一个为标准文件操作提供异步接口的库,允许你在 async/await 代码中安全地读写文件,而不会阻塞事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import aiofiles
import asyncio

async def main():
# 写文件
async with aiofiles.open('data.txt', 'w') as f:
await f.write('Hello, async files!')

# 读文件
async with aiofiles.open('data.txt', 'r') as f:
content = await f.read()
print(content)

asyncio.run(main())

参考资料

【Py】asyncio:为异步编程而生 | Python 特性 | 并发编程 | 协程_哔哩哔哩_bilibili

【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。_哔哩哔哩_bilibili

为什么要有设计模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class DatabaseConnection:
def __init__(self, host, port, username, password):
self.host = host
self.port = port
self.username = username
self.password = password

def connect(self):
return f"Connecting to database at {self.host}:{self.port} with username '{self.username}'"


def client():
main_db = DatabaseConnection('localhost', 3306, 'root', 'password123')
analytics_db = DatabaseConnection('192.168.1.1', 5432, 'admin', 'securepass')
cache_db = DatabaseConnection('10.0.0.1', 27017, 'cacheuser', 'cachepass')

print(main_db.connect())
print(analytics_db.connect())
print(cache_db.connect())


client()

这是数据库连接类的范例,其存在以下问题

  1. 数据库连接信息(主机、端口、用户名、密码)直接写在代码中,缺乏配置管理,难以在不同环境间切换,如果要更改数据库的信息,就要更改项目中每一处的连接数据库的参数,难以维护
  2. 每次都需要手动传入相同类型的参数,增加出错的风险

工厂模式(Factory Pattern)

工厂模式 是一种创建型设计模式,它提供了一种创建对象的接口,但让子类决定实例化哪一个类。换句话说:把对象的创建过程封装起来,调用者不需要关心具体创建的是哪个类,只需要知道“我要一个某种类型的东西”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def connection_factory(db_type):
db_configs = {
'main': {
'host': 'localhost',
'port': 3306,
'username': 'root',
'password': 'password123'
},
'analytics': {
'host': '192.168.1.1',
'port': 5432,
'username': 'admin',
'password': 'securepass'
},
'cache': {
'host': '10.0.0.1',
'port': 27017,
'username': 'cacheuser',
'password': 'cachepass'
}
}

return DatabaseConnection(**db_configs[db_type])


# 测试工厂模式
if __name__ == "__main__":
# 使用工厂函数创建不同类型的数据库连接
main_db = connection_factory('main')
analytics_db = connection_factory('analytics')
cache_db = connection_factory('cache')

# 测试连接
print(main_db.connect())
print(analytics_db.connect())
print(cache_db.connect())

将数据库的配置信息提取到config.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#factory_pattern.py
def connection_factory(db_type):
"""
工厂函数:根据数据库类型创建相应的数据库连接

Args:
db_type (str): 数据库类型 ('main', 'analytics', 'cache')

Returns:
DatabaseConnection: 数据库连接实例

Raises:
KeyError: 当提供的数据库类型不存在时
"""
from config import DATABASE_CONFIGS
if db_type not in DATABASE_CONFIGS:
raise ValueError(f"Unknown database type: {db_type}. Available types: {list(DATABASE_CONFIGS.keys())}")

return DatabaseConnection(**DATABASE_CONFIGS[db_type])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#config.py
# 数据库配置字典
DATABASE_CONFIGS = {
'main': {
'host': 'localhost',
'port': 3306,
'username': 'root',
'password': 'password123'
},
'analytics': {
'host': '192.168.1.1',
'port': 5432,
'username': 'admin',
'password': 'securepass'
},
'cache': {
'host': '10.0.0.1',
'port': 27017,
'username': 'cacheuser',
'password': 'cachepass'
}
}

为什么要这么做

  • 所有配置信息都在一个地方,便于统一管理和维护
  • 易于修改 :配置变更不需要修改业务逻辑代码

工厂模式的核心就是工厂函数

工厂函数是一个返回对象的函数(而不是类),它封装了对象的创建逻辑,根据输入参数决定返回哪种具体对象。

建造者模式(Builder Pattern)

当你需要创建一个有很多属性、配置步骤繁多的对象时(比如一辆汽车、一个 HTTP 请求、一个数据库连接配置),直接用构造函数会非常混乱。
建造者模式通过“分步构建 + 最终组装”的方式,让创建过程清晰、灵活、可读性强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class DatabaseConnectionBuilder:
"""数据库连接建造者类"""

def __init__(self,host,port,username,password):
self._config = {
'host': host,
'port': port,
'username': username,
'password': password,
}

def set_max_connections(self, max_connections):
"""设置最大连接数"""
if max_connections is not None and max_connections <= 0:
raise ValueError("Max connections must be positive")
self._config['max_connections'] = max_connections
return self

def set_timeout(self, timeout):
"""设置超时时间"""
if timeout is not None and timeout <= 0:
raise ValueError("Connect timeout must be positive")
self._config['timeout'] = timeout
return self

def build(self):
"""构建最终的数据库连接对象"""
return DatabaseConnection(**self._config)


# 演示建造者模式的使用
if __name__ == "__main__":
# 创建一个数据库连接建造者
builder = DatabaseConnectionBuilder(
host='localhost',
port=3306,
username='root',
password='password123'
)

# 使用建造者模式构建数据库连接
connection = builder.set_max_connections(100) \
.set_timeout(30) \
.enable_ssl() \
.set_connection_pool('my_pool') \
.set_retry_attempts(3) \
.enable_compression() \
.set_read_preference('secondaryPreferred') \
.build()

单例模式(Singleton Pattern)

单例模式确保一个类只有一个实例,并提供一个全局访问点来获取该实例。

换句话说:
无论你调用多少次“创建对象”,返回的始终是同一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class DatabaseConnection:
"""数据库连接单例类"""

_instance = None
_lock = threading.Lock() # 线程锁,确保线程安全

def __new__(cls, *args, **kwargs):
"""重写__new__方法实现单例模式"""
if cls._instance is None:
with cls._lock: # 使用双重检查锁定
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self, host, port, username, password):
"""初始化数据库连接参数"""
# 防止重复初始化
if hasattr(self, '_initialized'):
return

self.host = host
self.port = port
self.username = username
self.password = password
self._initialized = True

def connect(self):
"""连接数据库"""
return f"Connecting to database at {self.host}:{self.port} with username '{self.username}'"

def client():
db1 = DatabaseConnection(host='localhost', port=3306, username='root', password='password')
db2 = DatabaseConnection(host='localhost', port=3306, username='root', password='password')
print(db1 is db2)

client()

参考资料

【设计模式 inPy】一个视频搞懂三种设计模式:工厂、建造者和单例_哔哩哔哩_bilibili

为什么你应该忘掉设计模式?_哔哩哔哩_bilibili

【设计模式inPython】策略模式:不要再用一个类装所有方法啦!_哔哩哔哩_bilibili

Python项目结构和打包

pip install 的本质

PyPI (默认)或其他源(如私有仓库、本地文件)查找指定名称的包,下载对应的.whl文件

whl文件

  • .whlWheel 的缩写,是 Python 的一种标准打包格式(PEP 427 定义)。
  • 它本质上是一个 ZIP 格式的压缩包,扩展名改为 .whl

安装 .whl 文件

1
pip install package_name.whl

hachling

hatchling 是 Python 生态中一个现代的、轻量级的构建后端(build backend),主要用于将 Python 项目打包成可分发的格式(如 .whl 或源码包)。它是 Hatch 项目的一部分,由 PyPA(Python Packaging Authority)推荐使用。

在pyproject.toml添加

1
2
3
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

当你运行:

1
2
3
pip install .
# 或
python -m build

如果项目配置了 hatchling 作为构建后端,pipbuild 工具就会调用 hatchling 来完成打包和安装。

例如以下项目结构

1
2
3
4
5
6
7
8
my_project/
├── pyproject.toml
├── src/
│ └── my_utils/ ← 你写的包
│ ├── __init__.py
│ └── math.py
├── scripts/
│ └── run_demo.py ← 想在这里用 my_utils

想让项目的其他文件使用自己编写的包,在pyproject.toml增加

1
2
3
# 👇 新增:显式声明包位置
[tool.hatch.build.targets.wheel]
packages = ["src/my_utils"]

在项目根目录(my_project/)执行:

1
uv pip install -e .

✅ 这会把 src/my_utils/ 注册为一个可导入的包。

src layout

src layout(也称为 src 布局src 目录结构)是 Python 项目中一种推荐的源代码组织方式,其核心思想是:将你的 Python 包(package)放在一个名为 src/ 的子目录下,而不是直接放在项目根目录中。

❌ 传统布局(不推荐)

1
2
3
4
5
6
7
my_project/
├── my_package/
│ ├── __init__.py
│ └── module.py
├── tests/
├── setup.py
└── README.md

src 布局(推荐)

1
2
3
4
5
6
7
8
my_project/
├── src/
│ └── my_package/
│ ├── __init__.py
│ └── module.py
├── tests/
├── pyproject.toml
└── README.md

uv build和uv pip install .什么区别

一、uv build

✅ 作用:构建分发包(不安装)

  • 调用项目的构建后端(如 hatchlingsetuptools 等)。
  • 生成标准的分发文件:
    • 一个 Wheel 文件.whl
    • 一个 源码分发包(sdist,.tar.gz
  • 输出到项目根目录下的 dist/ 文件夹。
  • 不会将包安装到当前 Python 环境中

🔧 示例:

1
uv build

输出:

1
2
dist/my_package-0.1.0-py3-none-any.whl
dist/my_package-0.1.0.tar.gz

image-20251018192905210

二、uv pip install .

✅ 作用:安装当前项目到当前环境

  • 首先(隐式)构建项目(类似 uv build 的过程)。
  • 然后将构建结果安装到当前激活的 Python 环境(如虚拟环境或系统环境)。
  • 安装后,你可以在 Python 中 import 该包。
  • 默认是 “非可编辑安装”(即代码改动不会自动生效,除非重新安装)。

🔧 示例:

1
uv pip install .

效果:

  • 包被安装到 site-packages/
  • 可在 Python 中 import my_package

Editable install(可编辑安装)

Editable install(可编辑安装) 是 Python 包管理中的一种安装模式,它让你在安装一个包的同时,保留对源代码的直接引用

1
uv pip install -e .
  • 核心机制
    • 不复制代码到 site-packages/
    • 而是在 site-packages/ 中创建一个 .pth 文件my_package.egg-link,指向你本地项目中的 src/(或包目录)。
    • Python 解释器在导入时,会顺着这个链接去读你本地的源码。

image-20251018193022973

python是从哪里查找模块的

Python 查找模块(module)的机制由 模块搜索路径(module search path) 决定,这个路径是一个字符串列表,存储在 sys.path 中。当你执行 import some_module 时,Python 会按顺序在这个列表中的每个目录里查找对应的模块文件。

可以通过以下代码查看当前 Python 的模块搜索路径:

1
2
import sys
print(sys.path)

sys.path 通常包含以下几类路径(顺序很重要):

  1. 脚本所在目录(或当前工作目录)
  • 如果你运行 python /path/to/script.py,那么 /path/to/ 会被加到 sys.path[0]
  • 如果你运行 python 进入交互模式,或运行 python -c "...",则当前工作目录(os.getcwd() 会被放在首位。
  • ⚠️ 这是很多“意外导入”问题的根源(比如项目根目录下有同名包)。
  1. 环境变量 PYTHONPATH 中的目录
  • 类似系统的 PATH,你可以通过设置 PYTHONPATH 添加自定义搜索路径。
  • 示例(Linux/macOS):
    1
    2
    export PYTHONPATH="/my/custom/modules:$PYTHONPATH"
    python my_script.py
  • Windows(PowerShell):
    1
    $env:PYTHONPATH = "C:\my\custom\modules;" + $env:PYTHONPATH
  1. 标准库目录
  • Python 自带的模块(如 os, sys, json)所在位置。
  • 通常位于 Python 安装目录下的 lib/ 子目录中。
  1. 第三方包安装目录(site-packages)
  • 通过 pip installuv pip install 等安装的包,会被放到 site-packages 目录。
  • 路径可通过以下命令查看:
    1
    2
    3
    import site
    print(site.getsitepackages()) # 全局环境
    print(site.getusersitepackages()) # 用户级安装
  1. .pth 文件中指定的路径
  • 某些包(尤其是 editable install)会在 site-packages/ 中放置 .pth 文件,动态添加路径到 sys.path

参考资料

build + hatchling 15分钟搞懂Python项目结构和打包_哔哩哔哩_bilibili

@field_validator()

field_validator 是 Pydantic v2 的字段级校验与转换装饰器,用来在模型创建或赋值时,对指定字段做规则检查和/或值变换。例如

1
2
3
4
5
6
@field_validator("name")
def _validate_name(cls, v: str) -> str:#cls 表示当前模型类
v = str(v).strip()
if not v:
raise ValueError("name 不能为空")
return v

这个验证器的作用是确保 name 字段不能为空,这个 _validate_name 验证器会在 Pydantic 模型实例化(创建对象)时自动调用

生成项目结构

1
2
3
4
─src
│ └─pytest_demo
│ calculator.py
│ __init__.py

生成calculator.py类用于后续学习pytest,以上树状结构使用tree /f生成

__init__.py 文件的作用

__init__.py 文件的主要作用是告诉Python解释器这个目录是一个Python包(package)。当Python看到一个包含 __init__.py 文件的目录时,就会将其识别为一个可导入的包。

1
2
3
4
5
6
7
"""
pytest_demo package
"""

from .calculator import Calculator

__all__ = ['Calculator']

相对导入 : from .calculator import Calculator

  • .calculator 表示从当前包内的 calculator.py 模块导入 Calculator 类
  • 点号 . 表示相对导入,指向当前包

__all__ 列表 : __all__ = ['Calculator']

  • 定义了当使用 from pytest_demo import * 时会导入哪些对象
  • 这是一个显式的公共API声明

pytest 的自动发现规则

pytest 会自动查找以下内容:

  • 文件名:test_*.py*_test.py
  • 函数名:test_*()
  • 类名:Test*(类中方法也需以 test_ 开头)

测试文件

1
2
└─tests
│ test_calculator.py
1
2
3
4
5
6
7
8
9
10
from pytest_demo.calculator import Calculator

def test_add() -> None:
calc = Calculator()
assert calc.add(2, 1) == 3


def test_div() -> None:
calc = Calculator()
assert calc.divide(2, 1) == 2

运行 uv run pytest

image-20251018194456963

vscode支持pytest的可视化页面

ctrl+shift+p搜索

image-20251018194539467

image-20251018194723165

使用pytest.raises验证异常

1
2
3
4
5
6
import pytest

def test_divide_by_zero() -> None:
calc = Calculator()
with pytest.raises(ZeroDivisionError):
calc.divide(2, 0)

当你测试的函数应该在特定条件下抛出异常(比如传入非法参数、除零错误等),你可以用 pytest.raises 来验证:

“这段代码是否如预期那样,抛出了我们想要的异常?”

如果:

  • ✅ 抛出了指定类型的异常 → 测试通过
  • ❌ 没有抛出异常 → 测试失败
  • ❌ 抛出了其他类型的异常 → 测试失败

@pytest.fixture提供测试参数

@pytest.fixturepytest 中最核心、最强大的功能之一,它的作用是:

为测试函数提供可复用的、隔离的“测试依赖”(如对象、数据、资源、环境等),并管理它们的生命周期。

1
2
3
4
5
6
7
#工厂函数
@pytest.fixture
def calc() -> Calculator:
return Calculator()

def test_add(calc: Calculator) -> None:
assert calc.add(2, 1) == 3

在 pytest 中,当测试函数(或另一个 fixture)的参数名与某个 fixture 的名称相同时,pytest 会自动调用该 fixture,并将其返回值传入测试函数

这是 pytest 依赖注入机制的核心,也是 fixture 能“自动生效”的原因。

工厂函数(Factory Function) 是一种返回对象(通常是类的实例或其他函数)的函数,它的名字来源于“工厂模式”——就像工厂生产产品一样,这个函数“生产”对象。

scope 参数

1
2
3
4
5
6
7
8
9
10
@pytest.fixture(scope="session")  # 整个测试会话只启动一次
def browser():
driver = webdriver.Chrome()
yield driver
driver.quit()

def test_login(browser):
browser.get("/login")
def test_profile(browser):
browser.get("/profile")

好处:避免创建多个实例

支持的作用域:

  • function(默认):每个测试函数
  • class:每个测试类
  • module:每个 .py 文件
  • session:整个测试运行

自动管理资源生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pytest.fixture
def temp_file():
# setup: 创建临时文件
path = "temp.txt"
with open(path, "w") as f:
f.write("hello")

yield path # 测试函数在此处获得 path

# teardown: 清理
os.remove(path)

def test_read_file(temp_file):
with open(temp_file) as f:
assert f.read() == "hello"

算法流程

image-20250924135355928

  1. 对文本进行分块后,分别存储在kvstorageVectorDB Storage

  2. 然后调用大模型进行实体与关系抽取Extract Entities & Relations

  3. 然后进行实体与关系的存储Entities Data / Relations Data),去重后Deduplication),再次调用embedding模型存储于VectorDB Storage

  4. 再次调用大模型,对关系与实体的描述进行精炼或合并(Update Description),并存储到Knowledge Graph

image

LightRAG 使用 4 种类型的存储用于不同目的:

  • KV_STORAGE:llm 响应缓存、文本块、文档信息
  • VECTOR_STORAGE:实体向量、关系向量、块向量
  • GRAPH_STORAGE:实体关系图
  • DOC_STATUS_STORAGE:文档索引状态

image-20250924142554264

值得注意的是,local检索模式仅使用low_level_keywords,而global检索模式仅支持high_level_keywords,从算法流程图可以看出来,前者更侧重于检索实体,后者则侧重于关系

高关键词(High-level Keywords)用于捕获查询的核心意图和主题概念;低关键词(Low-level Keywords)用于识别具体的实体和细节信息

text_units的作用是:

  • 每条实体/关系记录在 KV 里都挂着 chunk_ids 列表。
  • 把 Top-K 结果里的 chunk_ids合并 + 去重,得到原始文本块序号。
  • 用序号去 Nano VectorDB 里把对应 text_units(原始句子/段落) 拉回,作为 上下文原始语料

image-20250924135917313

安装

安装LightRAG服务器

LightRAG服务器旨在提供Web UI和API支持。Web UI便于文档索引、知识图谱探索和简单的RAG查询界面。LightRAG服务器还提供兼容Ollama的接口,旨在将LightRAG模拟为Ollama聊天模型。这使得AI聊天机器人(如Open WebUI)可以轻松访问LightRAG。

1
2
3
pip install "lightrag-hku[api]"
cp env.example .env
lightrag-server

在此获取LightRAG docker镜像历史版本: LightRAG Docker Images

安装 LightRAG Core

1
pip install lightrag-hku

LightRAG 服务器和 WebUI

LightRAG/lightrag/api/README-zh.md at main · HKUDS/LightRAG

LightRAG 服务器旨在提供 Web 界面和 API 支持。Web 界面便于文档索引、知识图谱探索和简单的 RAG 查询界面。

使用环境变量来配置 LightRAG 服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
LLM_BINDING=openai
LLM_MODEL=qwen3-max-preview
LLM_BINDING_HOST=https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_BINDING_API_KEY=sk-

EMBEDDING_BINDING=openai
EMBEDDING_MODEL=text-embedding-v4
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY=sk-
EMBEDDING_BINDING_HOST=https://dashscope.aliyuncs.com/compatible-mode/v1

############################
### 数据存储选择
############################
### 默认存储(推荐用于小规模部署)
LIGHTRAG_KV_STORAGE=JsonKVStorage
LIGHTRAG_DOC_STATUS_STORAGE=JsonDocStatusStorage
LIGHTRAG_GRAPH_STORAGE=NetworkXStorage
LIGHTRAG_VECTOR_STORAGE=NanoVectorDBStorage

### 图存储(推荐用于生产部署)
# LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
# LIGHTRAG_GRAPH_STORAGE=MemgraphStorage

### PostgreSQL
# LIGHTRAG_KV_STORAGE=PGKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
# LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage

### PostgreSQL 配置
# POSTGRES_HOST=localhost
# POSTGRES_PORT=5432
# POSTGRES_USER=您的用户名
# POSTGRES_PASSWORD='您的密码'
# POSTGRES_DATABASE=您的数据库

### Neo4j 配置
# NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
# NEO4J_USERNAME=neo4j
# NEO4J_PASSWORD='您的密码'
# NEO4J_DATABASE=noe4j

### Milvus 配置
# MILVUS_URI=http://localhost:19530
# MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=您的密码

启动

1
lightrag-server

这是因为每次启动时,LightRAG Server会将.env文件中的环境变量加载至系统环境变量,且系统环境变量的设置具有更高优先级。

启动时可以通过命令行参数覆盖.env文件中的配置。常用的命令行参数包括:

  • --host:服务器监听地址(默认:0.0.0.0)
  • --port:服务器监听端口(默认:9621)
  • --working-dir:数据库持久化目录(默认:./rag_storage)
  • --input-dir:上传文件存放目录(默认:./inputs)
  • --workspace: 工作空间名称,用于逻辑上隔离多个LightRAG实例之间的数据(默认:空)

启动多个LightRAG实例

所有实例共享一套相同的.env配置文件,然后通过命令行参数来为每个实例指定不同的服务器监听端口和工作空间。你可以在同一个工作目录中通过不同的命令行参数启动多个LightRAG实例。例如:

1
2
3
4
5
# 启动实例1
lightrag-server --port 9621 --workspace space1

# 启动实例2
lightrag-server --port 9622 --workspace space2

运行

image-20250926162648415

image-20250926162703167

image-20250926162714883

LightRAG的数据隔离

需要通过配置工作空间来实现数据隔离,否则不同实例的数据将会出现冲突并被破坏。

LightRAG 使用 4 种类型的存储用于不同目的:

  • KV_STORAGE:llm 响应缓存、文本块、文档信息
  • VECTOR_STORAGE:实体向量、关系向量、块向量
  • GRAPH_STORAGE:实体关系图
  • DOC_STATUS_STORAGE:文档索引状态

每种存储类型都有多种存储实现方式。LightRAG Server默认的存储实现为内存数据库,数据通过文件持久化保存到WORKING_DIR目录。LightRAG还支持PostgreSQL、MongoDB、FAISS、Milvus、Qdrant、Neo4j、Memgraph和Redis等存储实现方式。

您可以通过环境变量选择存储实现。例如,在首次启动 API 服务器之前,您可以将以下环境变量设置为特定的存储实现名称:

1
2
3
4
LIGHTRAG_KV_STORAGE=PGKVStorage
LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage

文档和块处理逻辑说明

LightRAG 中的文档处理流程有些复杂,分为两个主要阶段:提取阶段(实体和关系提取)和合并阶段(实体和关系合并)。有两个关键参数控制流程并发性:并行处理的最大文件数(MAX_PARALLEL_INSERT)和最大并发 LLM 请求数(MAX_ASYNC)。工作流程描述如下:

  1. MAX_ASYNC 限制系统中并发 LLM 请求的总数,包括查询、提取和合并的请求。LLM 请求具有不同的优先级:查询操作优先级最高,其次是合并,然后是提取。
  2. MAX_PARALLEL_INSERT 控制提取阶段并行处理的文件数量。MAX_PARALLEL_INSERT建议设置为2~10之间,通常设置为 MAX_ASYNC/3,设置太大会导致合并阶段不同文档之间实体和关系重名的机会增大,降低合并阶段的效率。
  3. 在单个文件中,来自不同文本块的实体和关系提取是并发处理的,并发度由 MAX_ASYNC 设置。只有在处理完 MAX_ASYNC 个文本块后,系统才会继续处理同一文件中的下一批文本块。
  4. 当一个文件完成实体和关系提后,将进入实体和关系合并阶段。这一阶段也会并发处理多个实体和关系,其并发度同样是由 MAX_ASYNC 控制。
  5. 合并阶段的 LLM 请求的优先级别高于提取阶段,目的是让进入合并阶段的文件尽快完成处理,并让处理结果尽快更新到向量数据库中。
  6. 为防止竞争条件,合并阶段会避免并发处理同一个实体或关系,当多个文件中都涉及同一个实体或关系需要合并的时候他们会串行执行。
  7. 每个文件在流程中被视为一个原子处理单元。只有当其所有文本块都完成提取和合并后,文件才会被标记为成功处理。如果在处理过程中发生任何错误,整个文件将被标记为失败,并且必须重新处理。
  8. 当由于错误而重新处理文件时,由于 LLM 缓存,先前处理的文本块可以快速跳过。尽管 LLM 缓存在合并阶段也会被利用,但合并顺序的不一致可能会限制其在此阶段的有效性。
  9. 如果在提取过程中发生错误,系统不会保留任何中间结果。如果在合并过程中发生错误,已合并的实体和关系可能会被保留;当重新处理同一文件时,重新提取的实体和关系将与现有实体和关系合并,而不会影响查询结果。
  10. 在合并阶段结束时,所有实体和关系数据都会在向量数据库中更新。如果此时发生错误,某些更新可能会被保留。但是,下一次处理尝试将覆盖先前结果,确保成功重新处理的文件不会影响未来查询结果的完整性。

大家好,我想请教一下如何更好的使用这个项目,我现在处理的好慢,昨天从晚上十点跑到今天早上7点,只处理好33份文件[苦涩]我看日志,一份47页的pdf花了一个小时(mineru处理了十分钟)。
我自己总结下是有以下几个
1.我应该使用milvus和neo4j存储,而不是图方便存本地
2.设置一下并发,而不是每次只处理一个文件
3.不对图片进行处理了(感觉对我当前的场景没有必要)
针对第一点,我想问一下,存储到数据库是影响检索速率还是存储(我个人感觉是不是只影响检索啊)

Embedding模型应该本地部署,这样速度才比较快。一个文本块的Embedding速度在本地是毫秒级别的。调用云端API通常是秒级别的。
没有必要都用RagAnything来处理,先试一下用LightRAG来处理看一把速度如何。日后LightRAG会进程RagAnything,到时候可以灵活地指定每个文件的处理方式。
LightRAG的处理速度组要受到LLM的影响,与使用什么存储关系不是十分大。
LLM每秒能够输出的Tokens数量和支持的最大并发数决定了文档处理的速度。
例如一个LLM在8个并发的时候能够达到 400Tokens/秒 的峰值,预计处理速度达到15~20秒处理一个文本块。文本块的处理速度与识别出来的实体数量和实体关系需要合并的数量是有关的,因此不同文档是有所不同的。
可以用以上经验值来评估一下自己的系统处理速度是否合理。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class LightRAGStorage:
"""LightRAG存储和检索类

支持多种存储后端:
- KV存储: PostgreSQL
- 文档状态存储: PostgreSQL
- 图存储: Neo4j
- 向量存储: Milvus

使用workspace实现数据隔离
"""

def __init__(self, workspace: str = "default"):
"""初始化LightRAG存储

Args:
workspace: 工作空间名称,用于数据隔离
"""
self.workspace = workspace

# 加载环境变量
load_dotenv()

# 使用文件同级的 lightrag_storage 目录
self.working_dir = os.path.join(os.path.dirname(__file__), "lightrag_storage")

self.rag: Optional[LightRAG] = None

# 确保工作目录存在
os.makedirs(self.working_dir, exist_ok=True)

async def _get_llm_model_func(self):
"""LLM模型函数"""
async def llm_model_func(
prompt,
system_prompt=None,
history_messages=[],
keyword_extraction=False,
**kwargs
) -> str:
return await openai_complete_if_cache(
model=os.getenv("DASHSCOPE_MODEL", "qwen-max"),
prompt=prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
**kwargs
)
return llm_model_func

async def _get_embedding_func(self):
"""嵌入模型函数"""
async def embedding_func(texts: List[str]) -> np.ndarray:
return await openai_embed(
texts,
model=os.getenv("DASHSCOPE_EMBEDDING_MODEL", "text-embedding-v4"),
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1")
)
return embedding_func


async def initialize(self) -> None:
"""初始化LightRAG实例"""
if self.rag is not None:
return

# 获取模型函数
llm_model_func = await self._get_llm_model_func()
embedding_func = await self._get_embedding_func()

# 存储配置 - 统一使用字符串方式,让LightRAG自动处理
# 图存储配置
graph_storage = os.getenv("LIGHTRAG_GRAPH_STORAGE", "Neo4JStorage")

# KV存储配置
kv_storage = os.getenv("LIGHTRAG_KV_STORAGE", "PGKVStorage")

# 文档状态存储配置
doc_status_storage = os.getenv("LIGHTRAG_DOC_STATUS_STORAGE", "PGDocStatusStorage")

# 向量存储配置
vector_storage = os.getenv("LIGHTRAG_VECTOR_STORAGE", "MilvusVectorDBStorage")

# 创建LightRAG实例
self.rag = LightRAG(
working_dir=self.working_dir,
embedding_func=EmbeddingFunc(
func=embedding_func,
embedding_dim=int(os.getenv("EMBEDDING_DIM", 1024))
),
llm_model_func=llm_model_func,
workspace=self.workspace,
graph_storage=graph_storage,
kv_storage=kv_storage,
doc_status_storage=doc_status_storage,
vector_storage=vector_storage
)

# 初始化存储
await self.rag.initialize_storages()
await initialize_pipeline_status()

async def insert_text(self, text: str) -> None:
"""插入文本到LightRAG

Args:
text: 要插入的文本内容
"""
if self.rag is None:
await self.initialize()

await self.rag.ainsert(text)

async def insert_texts(self, texts: List[str]) -> None:
"""批量插入文本

Args:
texts: 文本列表
"""
for text in texts:
await self.insert_text(text)

async def query(
self,
query: str,
mode: str = "hybrid",
**kwargs
) -> str:
"""执行查询

Args:
query: 查询文本
mode: 查询模式 ("naive", "local", "global", "hybrid")
**kwargs: 其他查询参数

Returns:
查询结果
"""
if self.rag is None:
await self.initialize()

return await self.rag.aquery(
query,
param=QueryParam(mode=mode, **kwargs)
)

参考资料

通过实际案例拆解Light RAG,构建可视化知识图谱,解析Graph RAG的运行原理和关键概念(一)_哔哩哔哩_bilibili

AI知识图谱 GraphRAG 是怎么回事?_哔哩哔哩_bilibili

LightRAG/README-zh.md 在主 ·香港科技大学/LightRAG —- LightRAG/README-zh.md at main · HKUDS/LightRAG

数据的规范化

  1. Min-Max 归一化(Normalization)
1
2
3
4
5
from sklearn.preprocessing import MinMaxScaler

# 公式: X_scaled = (X - X_min) / (X_max - X_min)
scaler = MinMaxScaler()
X_norm = scaler.fit_transform(X)
  • 结果范围:[0, 1](或可指定其他范围)
  • 保留原始分布形状
  • 对异常值敏感(因为依赖最大/最小值)
  1. Z-score 标准化(Standardization)
1
2
3
4
5
from sklearn.preprocessing import StandardScaler

# 公式: X_scaled = (X - μ) / σ
scaler = StandardScaler()
X_std = scaler.fit_transform(X)
  • 结果:均值 ≈ 0,标准差 ≈ 1
  • 不保证固定范围(可能有 -3 到 +3 甚至更大)
  • 对异常值相对稳健

RobustScaler (鲁棒标准化)与Z-score 标准化(Standardization)对比

StandardScaler (Z-score标准化)

1
2
3
4
5
6
7
8
9
公式: (x - 均值) / 标准差

问题: 均值和标准差都会被异常值严重影响!

例如:
正常值: [100, 150, 200, 180, 220] → 均值 = 170
加入异常值: [100, 150, 200, 180, 220, 6445] → 均值 = 1216 ❌

结果: 所有正常值都变成负数,异常值主导了整个标准化过程

RobustScaler (鲁棒标准化)

1
2
3
4
5
6
7
8
9
10
公式: (x - 中位数) / IQR
其中 IQR = Q3 - Q1 (四分位距)

优势: 中位数和IQR不受异常值影响!

例如:
正常值: [100, 150, 200, 180, 220] → 中位数 = 180, IQR = 70
加入异常值: [100, 150, 200, 180, 220, 6445] → 中位数 = 190, IQR = 70 ✅

结果: 异常值不会扭曲正常数据的标准化结果

Apriori算法

Apriori 算法 是一种用于 频繁项集挖掘(Frequent Itemset Mining)关联规则学习(Association Rule Learning) 的经典算法。

Apriori 基于一个非常重要的性质,叫 先验性质(Apriori Property)

如果一个项集是频繁的,那么它的所有子集也一定是频繁的。
反过来:如果一个项集是非频繁的,那么它的所有超集也一定是非频繁的。

这个性质可以用来 剪枝(prune),避免穷举所有可能的组合。

Apriori 的 候选生成规则

当且仅当它们的前 (k-1) 个项相同,两个频繁 k项集才可以连接生成 (k+1)项集。

剪枝规则只有一句话:

对候选 k-项集 c,只要存在任何一个 (k−1)-子集 不在 Lₖₖ₋₁ 里,
就把 c 整集扔掉,不再给它计数。

为什么这样做合法?

基于 Apriori 向下封闭性(频繁项集的所有子集必频繁)。
若 c 哪怕只有一个 (k−1)-子集是非频繁的,c 自己绝对不可能频繁
所以没必要浪费一次数据库扫描去数它的支持度。

例子

1
2
3
4
5
6
7
transactions = [
['牛奶', '面包', '黄油'],
['牛奶', '面包'],
['牛奶', '可乐'],
['面包', '黄油'],
['牛奶', '面包', '可乐'],
]

我们的目标是找出 频繁项集(比如哪些商品经常一起出现)。

步骤 1:设定最小支持度(min_support)

假设我们设 min_support = 2,意思是:至少出现在 2 个购物篮中才算“频繁”

支持度(Support) = 包含该项集的交易数 / 总交易数
这里我们直接用“出现次数 ≥ 2”来简化。

步骤 2:生成 1-项集(单个商品)

统计每个商品出现次数:

项集 出现次数
{‘牛奶’} 4
{‘面包’} 4
{‘黄油’} 2
{‘可乐’} 2

全部 ≥2 → 都是频繁 1-项集。

步骤 3:生成 2-项集(两两组合)

从频繁 1-项集中两两组合(注意:只组合那些“所有子集都频繁”的):

可能的组合:

  • {‘牛奶’, ‘面包’}
  • {‘牛奶’, ‘黄油’}
  • {‘牛奶’, ‘可乐’}
  • {‘面包’, ‘黄油’}
  • {‘面包’, ‘可乐’}
  • {‘黄油’, ‘可乐’}

现在统计它们在交易中出现的次数:

项集 出现次数
{‘牛奶’, ‘面包’} 3 ✅
{‘牛奶’, ‘黄油’} 1 ❌
{‘牛奶’, ‘可乐’} 2 ✅
{‘面包’, ‘黄油’} 2 ✅
{‘面包’, ‘可乐’} 1 ❌
{‘黄油’, ‘可乐’} 0 ❌

保留 ≥2 的 → 频繁 2-项集:

  • {‘牛奶’, ‘面包’}
  • {‘牛奶’, ‘可乐’}
  • {‘面包’, ‘黄油’}

步骤 4:生成 3-项集

从频繁 2-项集中尝试组合。
比如:{‘牛奶’,’面包’} 和 {‘牛奶’,’可乐’} → 可以组合成 {‘牛奶’,’面包’,’可乐’}
但必须检查它的所有 2-项子集是否都在频繁 2-项集中:

  • 子集:{‘牛奶’,’面包’} ✅
  • 子集:{‘牛奶’,’可乐’} ✅
  • 子集:{‘面包’,’可乐’} ❌(之前被剪掉了!)

→ 所以 {‘牛奶’,’面包’,’可乐’} 不合法,不能生成。

再试:{‘牛奶’,’面包’} + {‘面包’,’黄油’} → {‘牛奶’,’面包’,’黄油’}
检查子集:

  • {‘牛奶’,’面包’} ✅
  • {‘牛奶’,’黄油’} ❌(之前只有1次)
  • {‘面包’,’黄油’} ✅

→ 有一个子集不频繁 → 整个 3-项集被剪枝!

结论:没有频繁 3-项集。

算法结束。

FP-growth

FP-growth(Frequent Pattern Growth)算法,它是一种用于频繁项集挖掘的高效算法

  • Apriori 算法:通过逐层生成候选项集(先1项,再2项…),每次都要扫描整个数据库,效率低。
  • FP-growth不生成候选项集,而是构建一种压缩数据结构——FP树(Frequent Pattern Tree),只需扫描数据库 2 次,效率更高!

FP-growth 的核心思想(分两步)

第一步:构建 FP 树(FP-Tree)

  1. 第一次扫描:统计每个单项的支持度,过滤掉低于 min_support 的项。
  2. 对每条事务
    • 只保留频繁项
    • 按支持度从高到低排序
  3. 第二次扫描:将排序后的事务插入 FP 树。

第二步:从 FP 树中挖掘频繁项集

  • 支持度最低的频繁项开始(称为“后缀模式”)
  • 对每个项,找出它的条件模式基(Conditional Pattern Base)
  • 构建条件 FP 树(Conditional FP-Tree)
  • 递归挖掘,直到树为空

补充.4 关联FP算法哔哩哔哩_bilibili

模型评价指标

课后第一,二次作业

考虑表1中的候选3-项集,假设hash函数为h(x)=x mod 4,叶节点最大尺寸为2,构造hash树。

表1. 候选3-项集

编号 项集
1 {1, 4, 5}
2 {1, 5, 9}
3 {3, 5, 6}
4 {1, 2, 4}
5 {1, 3, 6}
6 {3, 5, 7}
7 {4, 5, 7}
8 {2, 3, 4}
9 {6, 8, 9}
10 {1, 2, 5}
11 {5, 6, 7}
12 {3, 6, 7}
13 {4, 5, 8}
14 {3, 4, 5}
15 {3, 6, 8}

IMG_20251027_094247

事务集如表2所示,最小支持度阈值是30%

根据表2的事务集,在格结构上对每个结点添加所有符合条件的字母标记:

N:如果该项集被Apriori算法认为不是候选项集。(一个项集不是候选项集有两种可能的原因,一个是它们没有在候选项集产生步骤产生,另一个是它虽然在候选项集产生步骤产生,但是在剪枝步骤被丢掉)

I:如果计算支持度计数后,该候选项集被认为是非频繁的。

表2. 事务集

TID 项集
1 {a, b, d, e}
2 {b, c, d}
3 {a, b, d, e}
4 {a, c, d, e}
5 {b, c, d, e}
6 {b, d, e}
7 {c, d}
8 {a, b, c}
9 {a, d, e}
10 {b, d}

img

IMG_20251029_160558

判断极大频繁项集(M)

极大频繁项集:是频繁的,且没有频繁的真超集

定义:极大频繁项集(Maximal Frequent Itemset)是一个频繁项集,其所有超集都不是频繁的

检查每个频繁项集:

  • ade:超集有 abde, acde, adde(无效), bcde 等,但所有 4-项集都不频繁 → 所以 ade 是极大
  • bde:同理,超集如 abde, bcde 都不频繁 → bde 是极大
  • 其他频繁项集(如 ad):有超集 ade(频繁)→ 所以 ad 不是极大
  • ab:超集 abd(非频繁),abe(非频繁)→ 但 ab 本身频繁,但有没有频繁超集?没有 → 等等!极大要求:没有频繁的超集。abd 支持度=2(非频繁),abe=2(非频繁)→ 所以 ab 没有频繁超集 → 那 ab 是极大?

判断闭频繁项集(C)

闭项集:一个项集 X 是闭的,如果不存在超集 Y ⊃ X 使得 support(Y) = support(X)

即:它的支持度严格大于所有超集的支持度(或者没有超集具有相同支持度)。

以单项集为例:

单项集:

  • a (5):超集 ad(4), ae(4), ab(3), ade(4) → 所有支持度 <5 → 没有超集支持度=5 → a 是闭
  • b (7):超集 bd(6), be(4), ab(3), bde(4) → 都 <7 → b 是闭
  • c (5):超集 bc(3), cd(4) → 都 <5 → c 是闭
  • d (9):超集 ad(4), bd(6), cd(4), de(6), ade(4), bde(4) → 都 <9 → d 是闭
  • e (6):超集 ae(4), be(4), de(6) → 注意:de 支持度=6,等于 e 的支持度!
    • e ⊂ de,且 support(e)=support(de)=6 → 所以 e 不是闭

课后第三次作业

假设最小支持度阈值为40%,基于以下事务集写出使用FP-growth挖掘频繁项集的过程。

表1. 事务集

TID 项集
1 {a,b,d,e}
2 {b,c,d}
3 {a,b,d,e}
4 {a,c,d,e}
5 {b,c,d,e}

image-20251117112729200

image-20251117112739932

条件 FP 树(Conditional FP-tree)是 FP-growth 算法在“递归”阶段用来压缩“条件模式基”的一棵小 FP 树。
它的构建过程与初始 FP 树几乎一样,只是输入数据换成了“条件模式基”,并且再扫一遍、删低计数、排序、插入即可。下面用 “以 e 为后缀” 的例子把每一步都写出来,你就能完全复现。

一、准备:条件模式基(Conditional Pattern Base)

从主 FP 树里提取所有以 e 结尾的路径,并记录该路径的出现次数:

路径(删去 e 本身) 该路径计数
d b a 2
d a c 1
d b c 1

这就是“e 的条件模式基”。

二、第一次扫描:算单项计数

把上面 3 条记录拆成单项累加:

  • d: 2+1+1 = 4
  • b: 2+0+1 = 3
  • a: 2+1+0 = 3
  • c: 0+1+1 = 2

最小支持度计数仍是 2,因此全部保留(若某项 <2 则直接丢弃)。

三、第二次扫描:排序 + 插入

  1. 排序规则:按全局频繁 1-项集的降序排(即 d≻b≻a≻c≻e)。
    因此每条记录内部重新排序:
原路径 排序后路径 计数
d b a d b a 2
d a c d a c 1
d b c d b c 1
  1. 逐条插入,构建“e 的条件 FP 树”:
  • 插入 d b a(计数 2)
    根 → d(2) → b(2) → a(2)
  • 插入 d a c(计数 1)
    根 → d(3) → a(1) → c(1)
  • 插入 d b c(计数 1)
    根 → d(4) → b(3) → c(1)

最终得到的条件 FP 树文字表示:

1
2
3
4
5
6
7
null
└── d(4)
├── b(3)
│ ├── a(2)
│ └── c(1)
└── a(1)
└── c(1)

用户和权限管理

创建四个系统用户

为系统添加以下四个用户:

  • alice
  • bob
  • john
  • mike
1
2
3
useradd -d /home/bob -m bob
useradd -d /home/john -m john
useradd -d /home/mike -m mike

为四个用户设置密码

1
2
3
4
passwd alice
passwd bob
passwd john
passwd mike

创建共享目录

/home 目录下创建一个名为 work 的共享目录:

1
mkdir /home/work

创建用户组并添加成员

创建一个名为 workgroup 的用户组:

1
groupadd workgroup

将用户 alice, bob, john 加入该组:

1
2
3
4
usermod -a -G workgroup alice   # 添加为附加组
usermod -g workgroup alice # 设置为 alice 的主组(primary group)
usermod -a -G workgroup bob
usermod -a -G workgroup john
  • 主组(Primary Group):每个用户有且只有一个,是用户创建文件时默认继承的组
  • 附加组(Supplementary Group):用户可以有多个,用于额外获得某些组的权限,但不会影响新建文件的默认组

查看某个组(group)的信息

1
getent group workgroup

修改共享目录的所有权

/home/work 目录的属主改为 alice,属组改为 workgroup

1
chown alice:workgroup /home/work

修改 work 目录的权限

  • 属组内的用户(workgroup 组成员)→ 完全访问权限(rwx)
  • 属组外的用户 → 没有访问权限(—-)
1
chmod ug+rwx,o-rwx /home/work
  • ug+rwx:用户(user)和组(group)都加上读、写、执行权限
  • o-rwx:其他人(others)去掉所有权限(读、写、执行)

尝试以 bob 用户身份在 work 目录下创建文件

切换到 bob 用户:

1
su - bob

创建文件:

1
touch /home/work/bob.txt

查看是否成功:

1
ls -l /home/work/bob.txt

尝试以 john 的身份查看或修改 bob.txt

1
2
3
4
su - john
cat /home/work/bob.txt #查看文件内容
echo "hello" >> /home/work/bob.txt
ls -l /home/work/bob.txt #查看权限

尝试以 mike 的身份查看或修改 bob.txt

1
2
3
4
su - mike
cat /home/work/bob.txt
echo "mike tried" >> /home/work/bob.txt
cd /home/work

进程管理与调试

编写 badproc.sh 脚本

Shell 程序(Shell Script) 就是一系列 Shell 命令的集合,写在一个文件里,可以像程序一样自动、批量、重复执行

1
2
3
4
5
6
7
8
#!/bin/bash
while echo "I'm making files!"
do
mkdir adir
cd adir
touch afile
sleep 10s
done

#!/bin/bash → 指定用 bash 解释器执行

while echo "..."无限循环,每次循环前先打印一句话(等价于 while true; do ... done

循环体内:

  • 创建目录 adir
  • 进入该目录
  • 创建文件 afile
  • 睡眠 10 秒 → 避免太快占满磁盘

添加可执行权限

1
2
3
chmod +x badproc.sh
#验证
ls -l badproc.sh

在后台运行脚本

1
./badproc.sh &

查看进程号

1
ps aux | grep badproc

杀死进程

1
kill 12345

删除脚本运行时创建的目录和文件

1
rm -rf adir

创建源文件 fork.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>

int main() {
pid_t pid;

/* fork another process */
pid = fork();
if (pid < 0) { /* error occurred */
fprintf(stderr, "Fork Failed\n");
exit(-1);
}
else if (pid == 0) { /* child process */
printf("This is child process, pid=%d\n", getpid());
execl("/bin/ls", "ls", NULL); // 执行 ls 命令
printf("Child process finished\n"); // 这句不会打印,除非 execl 失败
}
else { /* parent process */
printf("This is parent process, pid=%d\n", getpid());
wait(NULL); // 等待子进程结束
printf("Child Complete\n");
exit(0);
}
}

编译程序(带调试信息)

1
gcc -g -o fork fork.c

先运行一次,看看效果

1
./fork

gdb 调试 fork 程序

1
2
3
4
5
6
gdb ./fork
#在 fork() 调用前设置
(gdb) set follow-fork-mode child
#设置断点和 catch exec
break main
catch exec

Linux编程环境熟悉

C++编译

创建一个名为 helloworld.cpp 的文件,nano helloworld.cpp

1
2
3
4
5
6
7
8
#include <iostream>
using namespace std;

int main(void)
{
cout << "Hello world" << endl;
return 0;
}

Ctrl+O 保存,再按 Ctrl+X 退出 nano

1
2
3
4
#编译程序
g++ -o hello helloworld.cpp
#运行程序
./hello

创建小型函数库

创建源文件 fred.cbill.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* fred.c */
#include <stdio.h>

void fred(int arg)
{
printf("fred: we passed %d\n", arg);
}

/* bill.c */
#include <stdio.h>

void bill(char *arg)
{
printf("bill: we passed %s\n", arg);
}

编译成目标文件(.o)

1
gcc -c bill.c fred.c

创建头文件 lib.h

1
2
3
/* lib.h */
void bill(char *);
void fred(int);

创建主程序 program.c

1
2
3
4
5
6
7
8
9
/* program.c */
#include <stdlib.h>
#include "lib.h" // 引入我们自己写的头文件

int main()
{
bill("Hello World");
exit(0); // 正常退出
}

编译主程序(只编译,不链接)

1
gcc -c program.c

创建静态库 libfoo.a

使用 ar 命令把 bill.ofred.o 打包成一个静态库:

  • ar:archive 工具,用于创建静态库
  • c:创建新库
  • r:将文件插入到库中(如果不存在则添加)
  • v:显示详细信息
1
ar crv libfoo.a bill.o fred.o

链接主程序和静态库

现在我们要把 program.olibfoo.a 链接起来,生成最终可执行文件 program

1
gcc -o program program.o libfoo.a

运行程序

1
./program
0%