LangGraphChatBot

环境配置

python虚拟环境构建

1
python -m venv .venv
1
2
3
4
5
pip install langgraph==0.2.74                  
pip install langchain-openai==0.3.6
pip install fastapi==0.115.8
pip install uvicorn==0.34.0
pip install gradio==5.18.0

查看包pip list

构建一个基本的fastapi+langgraph应用

llm示例的构建(利用ChatOpenAI)

1
2
3
4
5
6
7
8
9
# 创建LLM实例
llm = ChatOpenAI(
base_url=config["base_url"],
api_key=config["api_key"],
model=config["model"],
temperature=DEFAULT_TEMPERATURE,
timeout=30, # 添加超时配置(秒)
max_retries=2 # 添加重试次数
)

数据类型的构建

继承于pydantic

规范化 API 请求和响应的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 定义消息类,用于封装API接口返回数据
#基于 Pydantic 的数据模型
# 定义Message类
class Message(BaseModel):
'''
role (角色): 这是一个字符串,表示消息的发送者。常见的角色包括:

- user (用户): 表示用户输入的消息。
- assistant (助手): 表示聊天机器人或模型生成的消息。
- system (系统): 表示为模型提供上下文或指令的系统消息。
'''
role: str
content: str

# 定义ChatCompletionRequest类
#聊天 API 请求
class ChatCompletionRequest(BaseModel):
messages: List[Message]
stream: Optional[bool] = False#是否流式方式响应
userId: Optional[str] = None#用于标识发起请求的用户
conversationId: Optional[str] = None#用于标识特定的对话会话,这对于管理对话上下文或历史记录非常有用

# 定义ChatCompletionResponseChoice类
#聊天完成响应中的一个“选择”或一个生成的回复
class ChatCompletionResponseChoice(BaseModel):
index: int
message: Message
finish_reason: Optional[str] = None

# 定义ChatCompletionResponse类
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
choices: List[ChatCompletionResponseChoice]#模型生成的所有可能的回复选项
system_fingerprint: Optional[str] = None

定义fastapi应用并管理应用的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph

try:
logger.info("正在初始化模型、定义Graph...")
#(1)初始化LLM
llm = get_llm(llm_type)
#(2)定义Graph
graph = create_graph(llm)
#(3)将Graph可视化图保存
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
# raise 关键字重新抛出异常,以确保程序不会在错误状态下继续运行
raise

# yield 关键字将控制权交还给FastAPI框架,使应用开始运行
# 分隔了启动和关闭的逻辑。在yield 之前的代码在应用启动时运行,yield 之后的代码在应用关闭时运行
yield
# 关闭时执行
logger.info("正在关闭...")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

langgraph核心逻辑

创建langgraph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 定义chatbot的状态
class State(TypedDict):
messages: Annotated[list, add_messages]

# 创建和配置chatbot的状态图
def create_graph(llm) -> StateGraph:
try:
# 构建graph
#创建一个 StateGraph 的实例,并将其配置为使用 State 类作为其状态管理的数据模型
graph_builder = StateGraph(State)

# 定义chatbot的node
def chatbot(state: State) -> dict:
# 处理当前状态并返回 LLM 响应
return {"messages": [llm.invoke(state["messages"])]}

# 配置graph
#第二个参数 chatbot :这是一个可调用对象(通常是一个函数或方法),它定义了当执行流程到达这个名为 "chatbot" 的节点时,应该执行什么操作。
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# 这里使用内存存储 也可以持久化到数据库
memory = MemorySaver()

# 编译生成graph并返回
#checkpointer 参数将 memory 实例传递给编译过程,使得图能够管理其状态的保存和加载。编译后的图对象被返回,这个对象可以被调用来运行聊天机器人。
return graph_builder.compile(checkpointer=memory)

except Exception as e:
raise RuntimeError(f"Failed to create graph: {str(e)}")

可视化langgraph节点

1
2
3
4
5
6
7
8
# 将构建的graph可视化保存为 PNG 文件
def save_graph_visualization(graph: StateGraph, filename: str = "graph.png") -> None:
try:
with open(filename, "wb") as f:
f.write(graph.get_graph().draw_mermaid_png())
logger.info(f"Graph visualization saved as {filename}")
except IOError as e:
logger.info(f"Warning: Failed to save graph visualization: {str(e)}")

封装接口

包含流式输出与非流式输出的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 封装POST请求接口,与大模型进行问答
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
# 判断初始化是否完成
if not graph:
logger.error("服务未初始化")
raise HTTPException(status_code=500, detail="服务未初始化")

try:
logger.info(f"收到聊天完成请求: {request}")

query_prompt = request.messages[-1].content
logger.info(f"用户问题是: {query_prompt}")

config = {"configurable": {"thread_id": request.userId+"@@"+request.conversationId}}
logger.info(f"用户当前会话信息: {config}")

prompt_template_system = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_SYS)
prompt_template_user = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_USER)
prompt = [
{"role": "system", "content": prompt_template_system.template},
{"role": "user", "content": prompt_template_user.template.format(query=query_prompt)}
]

# 处理流式响应
if request.stream:
async def generate_stream():
chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
async for message_chunk, metadata in graph.astream({"messages": prompt}, config, stream_mode="messages"):
chunk = message_chunk.content
logger.info(f"chunk: {chunk}")
# 在处理过程中产生每个块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {'content': chunk},'finish_reason': None}]})}\n\n"
# 流结束的最后一块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {},'finish_reason': 'stop'}]})}\n\n"
# 返回fastapi.responses中StreamingResponse对象
return StreamingResponse(generate_stream(), media_type="text/event-stream")

# 处理非流式响应处理
else:
try:
events = graph.stream({"messages": prompt}, config)
for event in events:
for value in event.values():
result = value["messages"][-1].content
except Exception as e:
logger.info(f"Error processing response: {str(e)}")

formatted_response = str(format_response(result))
logger.info(f"格式化的搜索结果: {formatted_response}")
#封装响应
response = ChatCompletionResponse(
choices=[
ChatCompletionResponseChoice(
index=0,
message=Message(role="assistant", content=formatted_response),
finish_reason="stop"
)
]
)
logger.info(f"发送响应内容: \n{response}")
# 返回fastapi.responses中JSONResponse对象
# model_dump()方法通常用于将Pydantic模型实例的内容转换为一个标准的Python字典,以便进行序列化
return JSONResponse(content=response.model_dump())

except Exception as e:
logger.error(f"处理聊天完成时出错:\n\n {str(e)}")
raise HTTPException(status_code=500, detail=str(e))

langgraph的短期记忆与长期记忆

LangGraph支持两种对于构建对话代理至关重要的内存类型:

  • 短期内存:通过在会话中维护消息历史来跟踪正在进行的对话。
  • 长期内存:在不同会话之间存储用户特定或应用程序级别的数据。
image-20250715094855646

在LangGraph中

  • 短期内存也称为线程级内存
  • 长期内存也称为跨线程内存

教程地址

NanGePlus/LangGraphChatBot: 使用LangGraph+DeepSeek-R1+FastAPI+Gradio实现一个带有记忆功能的流量包推荐智能客服web端用例,同时也支持gpt大模型、国产大模型(OneApi方式)、Ollama本地开源大模型、阿里通义千问大模型

LangGraph+deepseek-r1+FastAPI+Gradio实现拥有记忆的流量包推荐智能客服web端用例,同时也支持gpt、国产大模型、Ollama_哔哩哔哩_bilibili