# Count how many answers the expert has already given.
# isinstance(m, AIMessage) checks whether the message m is an instance of AIMessage,
# which is how we identify messages generated by the AI model.
num_responses = len(
    [m for m in messages if isinstance(m, AIMessage) and m.name == name]
)
# If the expert has reached the maximum number of turns, end the interview.
if num_responses >= max_num_turns:
    return 'save_interview'
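For context, one way these two fragments fit together is inside a routing function that decides whether to keep interviewing. A minimal sketch under that assumption (the 'ask_question' node name and the default of two turns are assumptions, not from the original):

def route_messages(state, name: str = "expert"):
    """Decide whether to ask another question or save the interview."""
    messages = state["messages"]
    max_num_turns = 2  # assumed default; pass this in via state or config in practice
    num_responses = len(
        [m for m in messages if isinstance(m, AIMessage) and m.name == name]
    )
    if num_responses >= max_num_turns:
        return 'save_interview'
    return 'ask_question'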
from typing import Annotated
from typing_extensions import TypedDict

def sorting_reducer(left, right):
    """Merge the two values and keep the combined list sorted."""
    # Wrap left in a list if it is not one already.
    if not isinstance(left, list):
        left = [left]
    # Wrap right in a list if it is not one already.
    if not isinstance(right, list):
        right = [right]
    # Combine left and right, then sort in ascending order.
    return sorted(left + right, reverse=False)

class State(TypedDict):
    # The sorting_reducer keeps the values written to `state` sorted.
    state: Annotated[list, sorting_reducer]
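The reducer can be exercised directly, outside a graph, to confirm the merge-and-sort behaviour:

sorting_reducer([3, 1], 2)        # -> [1, 2, 3]
sorting_reducer("b", ["a", "c"])  # -> ['a', 'b', 'c']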
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

================================ Human Message =================================

Hi, my name is Lance

================================== Ai Message ==================================
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Viewing the memory
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value
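For context, a value like this has to be written to the cross-thread store with put before it can be read back. A minimal sketch, assuming across_thread_memory is an InMemoryStore; the stored dict content is purely illustrative:

from langgraph.store.memory import InMemoryStore

across_thread_memory = InMemoryStore()
user_id = "1"
namespace = ("memory", user_id)
# Key "user_memory" under the ("memory", user_id) namespace; the content is illustrative.
across_thread_memory.put(namespace, "user_memory", {"memory": "Lance likes to bike around San Francisco."})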
Trustcall for creating and updating profile schemas
from langchain_core.messages import HumanMessage

messages = [HumanMessage(content="Hi, what is 2 times 2?")]
messages = graph.invoke({"messages": messages})
for m in messages['messages']:
    m.pretty_print()
Output
================================ Human Message =================================
# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()

# Show
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
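As a quick check, the compiled ReAct graph can be invoked with a message list. A minimal sketch; the question is illustrative and assumes the assistant node and tools wired up above:

from langchain_core.messages import HumanMessage

result = react_graph.invoke({"messages": [HumanMessage(content="What is 2 multiplied by 3?")]})
for m in result["messages"]:
    m.pretty_print()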
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage, AIMessage, HumanMessage
from langgraph.graph.message import add_messages
# Initial state
initial_messages = [
    AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
    HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2"),
]

# New message to add (same id as the second message, so it will overwrite it)
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

# Test
add_messages(initial_messages, new_message)
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'), HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]
from langchain_core.messages import RemoveMessage

# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))

# Isolate the messages to delete (everything except the last two)
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)
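Passing these RemoveMessage objects through the add_messages reducer drops the messages with matching ids. A quick check, reusing the list above:

add_messages(messages, delete_messages)
# -> only the last two messages (ids "3" and "4") remain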
Multiple Schemas

Private State
First, let's look at passing private state between nodes. This is useful for anything the graph needs in its intermediate working logic but that is not part of the graph's overall input or output.
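A minimal sketch of the pattern (the schema names OverallState/PrivateState and the foo/baz keys are illustrative): node_1 writes to a private channel that only node_2 reads, and the private key never shows up in the graph's output.

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

# The overall (public) graph state.
class OverallState(TypedDict):
    foo: int

# Private state shared only between node_1 and node_2.
class PrivateState(TypedDict):
    baz: int

def node_1(state: OverallState) -> PrivateState:
    # Write to the private channel only.
    return {"baz": state["foo"] + 1}

def node_2(state: PrivateState) -> OverallState:
    # Read the private channel and write back to the overall state.
    return {"foo": state["baz"] + 1}

builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)
graph = builder.compile()

graph.invoke({"foo": 1})  # -> {'foo': 3}; 'baz' is never exposed in the output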
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
# Define a new graph
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node("summarize_conversation", summarize_conversation)

# Set the entrypoint as conversation
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)
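The workflow is then typically compiled with the MemorySaver imported above so the conversation persists across turns of the same thread (a sketch; the thread_id is illustrative):

memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "1"}}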
input_message = HumanMessage(content="我喜欢玩lol,你知道这个游戏吗") output = graph.invoke({"messages": [input_message]}, config) for m in output['messages'][-1:]: m.pretty_print()
Once the conversation grows beyond six messages, a summary can be generated.
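That threshold lives in the should_continue router used by the conditional edge above. A minimal sketch of it, assuming the summary-augmented State defined for this chatbot:

def should_continue(state: State):
    """Route to summarization once the conversation exceeds six messages, otherwise end the turn."""
    messages = state["messages"]
    if len(messages) > 6:
        return "summarize_conversation"
    return END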
graph.get_state(config).values.get("summary","")
Chatbot with message summarization & external DB memory
# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="Hi, I'm zxj")]}, config, stream_mode="updates"):
    chunk['conversation']["messages"].pretty_print()
Now we print the state updates directly.
================================== Ai Message ==================================

Hello, zxj! Great to see you again~ 😊 How can I help you?
stream_mode="values"
# Start conversation, again
config = {"configurable": {"thread_id": "2"}}

# Start conversation
input_message = HumanMessage(content="Hi, I'm zxj")
for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    for m in event['messages']:
        m.pretty_print()
    print("---"*25)
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================

2 times 3
While the graph is interrupted at this state, we can apply a state update directly. Remember that updates to the messages key use the add_messages reducer: if we want to overwrite an existing message, we supply a message with the same id. If we just want to append to the message list, we pass a message without specifying an id, as shown below.
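For example, appending a new user message to the interrupted thread could look like this (a minimal sketch; the message content is illustrative and thread is the config used in the runs above):

graph.update_state(
    thread,
    {"messages": [HumanMessage(content="No, actually multiply 3 and 3!")]},
)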
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

class State(TypedDict):
    input: str

def step_1(state: State) -> State:
    print("---Step 1---")
    return state

def step_2(state: State) -> State:
    # Optionally raise a NodeInterrupt if the input string is longer than 5 characters.
    if len(state['input']) > 5:
        raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
    print("---Step 2---")
    return state

def step_3(state: State) -> State:
    print("---Step 3---")
    return state
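Wiring these steps into a graph and triggering the interrupt might look like the following sketch (the node names and thread_id are illustrative; a checkpointer is required for the graph to pause):

builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

graph = builder.compile(checkpointer=MemorySaver())
thread = {"configurable": {"thread_id": "1"}}

# An input longer than 5 characters stops the run inside step_2.
for event in graph.stream({"input": "hello world"}, thread, stream_mode="values"):
    print(event)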
# The top-level context arg is typed as Context for autocomplete and type checking
result = graph.invoke(
    {'input': 'abc'},
    context=Context(user_id='123', db_conn='conn_mock')
)
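For this call to work, Context has to be declared and registered as the graph's context schema. A minimal sketch under that assumption (field names follow the invoke call above; the rest of the graph construction is elided):

from dataclasses import dataclass

@dataclass
class Context:
    user_id: str
    db_conn: str

# Registered when building the graph, e.g.:
# builder = StateGraph(State, context_schema=Context)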
def stream_graph_updates(user_input: str):
    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
        print(event)
        # Each event returned by stream() is a dict keyed by node name,
        # whose value is that node's state update.
        for value in event.values():
            # The text content of the last message in the message list.
            print("Assistant:", value["messages"][-1].content)

while True:
    try:
        user_input = input("User: ")
        # Exit
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        # Run the graph on the user's input
        stream_graph_updates(user_input)
    except:
        # fallback if input() is not available
        user_input = "What do you know about LangGraph?"
        print("User: " + user_input)
        stream_graph_updates(user_input)
        break
# The config is the **second positional argument** to stream() or invoke()!
events = graph.stream(
    {"messages": [{"role": "user", "content": user_input}]},
    config,
    stream_mode="values",
)
for event in events:
    event["messages"][-1].pretty_print()

events = graph.stream(
    {"messages": [{"role": "user", "content": user_input}]},
    config,
    stream_mode="values",
)
for event in events:
    if "messages" in event:
        event["messages"][-1].pretty_print()
# Iterate over every historical state of `graph` for the given `config`.
# `graph.get_state_history(config)` returns an iterator over all state snapshots from the start of the thread to now.
for state in graph.get_state_history(config):
    # Print a few details from each snapshot so we can inspect it and pick one.
    # `len(state.values["messages"])` is the number of messages in the conversation at that point.
    # `state.next` is the node or step the graph would execute next from that state.
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    # Separator to keep the output readable.
    print("-" * 80)
    # This is the key logic for choosing the "time travel" target.
    # We are somewhat arbitrarily selecting the snapshot whose conversation holds exactly 4 messages;
    # in a real application the selection logic can be as elaborate as you need.
    if len(state.values["messages"]) == 4:
        # Save the matching snapshot. After the loop, `to_replay` holds the full state of that
        # historical moment, which we can use to resume or modify execution.
        to_replay = state
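To resume from that snapshot, stream with the snapshot's own config, which carries the checkpoint id of that moment (a sketch, assuming to_replay was set by the loop above):

print(to_replay.next)    # the node that would run next from this snapshot
print(to_replay.config)  # includes the checkpoint_id identifying the snapshot

# Passing None as the input resumes execution from the selected checkpoint.
for event in graph.stream(None, to_replay.config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()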
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

# 1. Define the tools
class WeatherInput(BaseModel):
    location: str = Field(description="Name of the city")
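A hypothetical continuation of the tool definition, attaching the input schema to a stubbed weather tool (the tool name and the returned string are placeholders, not from the original):

@tool("get_weather", args_schema=WeatherInput)
def get_weather(location: str) -> str:
    """Look up the current weather for a city (stubbed for illustration)."""
    return f"It is sunny in {location} today."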
from langchain_core.tools import tool
from typing import Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
# Handle the non-streaming response
else:
    try:
        events = graph.stream({"messages": prompt}, config)
        for event in events:
            for value in event.values():
                result = value["messages"][-1].content
    except Exception as e:
        logger.info(f"Error processing response: {str(e)}")