事件流是 SimpleLLMFunc v0.5.0+ 引入的高级特性,允许你实时观察 ReAct 循环的完整执行过程。通过启用事件流,你可以监控 LLM 调用、工具调用、流式响应等所有关键环节,实现更精细的控制和更好的用户体验。
在当前版本中,事件流还提供了统一来源元数据(EventYield.origin),用于稳定识别主会话与 fork 子会话。
适用范围:事件流系统同时支持 @llm_chat 和 @llm_function 装饰器,提供统一的事件观测体验。
快速理解
事件流(Event Stream)允许你在 ReAct 循环执行过程中持续拿到事件,而不是只在最后拿到一个结果。
LLM 调用事件
包括开始、流式 chunk 到达、调用结束等阶段。
工具调用事件
包括工具启动、执行完成、批次处理和异常信息。
执行统计
可观测 Token 用量、耗时、调用次数等性能指标。
自定义 UI
可据此构建流式消息区、进度条、工具调用面板等界面。
状态管理
可通过 wrapper 模式做上下文压缩和状态持久化。
为什么需要事件流
在传统使用方式下,你通常只能获得最终响应。而事件流带来的价值包括:
- 实时监控 LLM 和工具调用状态
- 获取详细的性能指标
- 构建更丰富的 UI
- 深入调试 ReAct 循环
- 配合 wrapper 做状态管理
设计哲学:Agent 作为无状态函数
SimpleLLMFunc 遵循一个核心设计哲学:Agent 本身是一个函数,不管理自己的状态。
这意味着:
- ✅ 无状态设计:
@llm_chat 装饰的函数是纯函数,每次调用都是独立的
- ✅ 状态外置:所有状态(包括
history)都通过参数传入和返回值传出
- ✅ 可组合性:通过 wrapper 函数实现状态管理和高级功能
为什么这样设计?
- 函数式编程:符合函数式编程理念,易于测试和推理
- 灵活性:状态管理完全由用户控制,可以实现各种自定义逻辑
- 可扩展性:通过 wrapper 函数可以轻松添加 Context Compression、状态持久化等功能
如何实现状态管理和修改?
通过事件流,你可以:
- 监控状态:通过事件获取 Agent 的内部状态信息(消息历史、工具调用等)
- 修改状态:在 wrapper 函数中拦截和修改
history,实现 Context Compression 等功能
- 状态持久化:在 wrapper 中实现状态的保存和恢复
下面我们将详细展示如何通过 wrapper 函数实现这些功能。
启用事件流
事件流同时支持 @llm_chat 和 @llm_function。两者都会返回 ReactOutput,区别只在于 ResponseYield.response 的内容形式。
基本用法
在 @llm_chat 或 @llm_function 装饰器中设置 enable_event=True 即可启用事件流:
llm_chat 的事件流
@llm_chat 装饰器在启用事件流时,返回一个生成器,yield ReactOutput:
from SimpleLLMFunc import llm_chat
from SimpleLLMFunc.hooks import ReactOutput, ResponseYield, EventYield
@llm_chat(
llm_interface=llm,
toolkit=[calculate, get_weather],
stream=True,
enable_event=True, # 🔑 启用事件流
)
async def chat(message: str, history=None):
"""智能助手"""
pass
# 使用事件流
async for output in chat("Hello"):
if isinstance(output, ResponseYield):
# 处理响应数据
print(output.response)
elif isinstance(output, EventYield):
# 处理事件
event = output.event
print(f"事件类型: {event.event_type}")
llm_function 的事件流
@llm_function 装饰器在启用事件流时,也返回一个生成器,yield ReactOutput:
from SimpleLLMFunc import llm_function
from SimpleLLMFunc.hooks import ReactOutput, ResponseYield, EventYield
@llm_function(
llm_interface=llm,
toolkit=[calculate, get_weather],
enable_event=True, # 🔑 启用事件流
)
async def analyze_text(text: str) -> str:
"""分析文本内容"""
pass
# 使用事件流
async for output in analyze_text("Hello world"):
if isinstance(output, ResponseYield):
# 处理响应数据(最终结果)
result = output.response
print(f"分析结果: {result}")
elif isinstance(output, EventYield):
# 处理事件(LLM 调用、工具调用等)
event = output.event
print(f"事件: {event.event_type}")
返回值变化
启用事件流后,函数的返回值类型会发生变化:
llm_chat 默认模式
llm_chat 事件流模式
llm_function 默认模式
llm_function 事件流模式
async for chunk, updated_history in chat("Hello"):
print(chunk)
async for output in chat("Hello"):
if isinstance(output, ResponseYield):
print(output.response)
elif isinstance(output, EventYield):
print(f"事件: {output.event.event_type}")
result = await analyze_text("Hello")
print(result)
async for output in analyze_text("Hello"):
if isinstance(output, ResponseYield):
print(output.response)
elif isinstance(output, EventYield):
print(f"事件: {output.event.event_type}")
核心类型
ReactOutput
事件流的统一输出类型,本质是 ResponseYield | EventYield。
ResponseYield
代表响应数据,在 llm_function 中通常是解析后的最终结果。
EventYield
代表执行过程中的事件,包含事件对象与来源元数据。
EventOrigin
描述事件来源,可用于区分主链路和 fork 子链路。
ReactOutput
ReactOutput 是一个 Tagged Union 类型,包含两种可能的值:
from SimpleLLMFunc.hooks import ReactOutput, ResponseYield, EventYield
# ReactOutput = ResponseYield | EventYield
ResponseYield
响应数据,包含 LLM 的响应内容和消息历史:
@dataclass
class ResponseYield:
response: Union[LLMResponse, LLMStreamChunk, str]
messages: MessageList
type: Literal["response"] = "response"
说明:
llm_chat 事件流模式中,response 为原始响应对象或流式 chunk
llm_function 事件流模式中,response 为解析后的最终结果(如 str / Pydantic 对象)
EventYield
事件数据,包含 ReAct 循环中的各种事件:
@dataclass
class EventYield:
event: ReActEvent # 事件对象
origin: EventOrigin # 事件来源(会话/分叉/工具关联元数据)
type: Literal["event"] = "event"
EventOrigin(来源元数据)
EventOrigin 用于描述事件在调用树中的来源。常用字段如下:
session_id: 当前会话 ID(同一会话内稳定)
agent_call_id: 当前 agent 调用 ID
parent_agent_call_id: 父 agent 调用 ID(如果存在)
event_seq: 会话内递增事件序号
fork_id: fork 场景下的分叉 ID(主会话通常为 None)
fork_depth: 分叉深度(主会话通常为 0)
fork_seq: fork 内事件序号(若存在)
selfref_instance_id: SelfReference 实例标识(若存在)
memory_key / source_memory_key: selfref 记忆键上下文
tool_name / tool_call_id: 工具事件关联信息
快速示例:
from SimpleLLMFunc.hooks import is_event_yield
async for output in chat("请并行处理多个子任务"):
if not is_event_yield(output):
continue
if output.origin.fork_id:
print(
f"[fork:{output.origin.fork_id} depth={output.origin.fork_depth}] "
f"{output.event.event_type}"
)
else:
print(f"[main] {output.event.event_type}")
如何阅读这页
先上手
如果你第一次接触事件流,先看启用方式和返回值变化。
理解类型
重点理解 ReactOutput、ResponseYield、EventYield 和 EventOrigin。
看完整事件表
当你需要做 UI 路由、埋点或调试时,再读完整事件类型。
看 FAQ
如果你关心性能、状态修改或 llm_function 行为差异,直接看 FAQ。
事件类型
事件流包含以下类型的事件,按执行顺序排列:
1. ReactStartEvent
ReAct 循环开始事件,在循环开始时触发。
@dataclass
class ReactStartEvent(ReActEvent):
user_task_prompt: str # 用户任务提示
initial_messages: MessageList # 初始消息列表
available_tools: ToolDefinitionList # 可用工具列表
使用场景:初始化 UI、显示开始提示、记录 trace_id
2. ReactIterationStartEvent
ReAct 迭代开始事件,每次迭代开始时触发。
@dataclass
class ReactIterationStartEvent(ReActEvent):
current_messages: MessageList # 当前消息历史
使用场景:显示迭代进度、更新消息历史
3. LLMCallStartEvent
LLM 调用开始事件,在 LLM 调用前触发。
@dataclass
class LLMCallStartEvent(ReActEvent):
messages: MessageList # 消息列表
tools: ToolDefinitionList # 工具定义列表
llm_kwargs: Dict[str, Any] # LLM 调用参数
stream: bool # 是否流式调用
使用场景:显示”正在思考…”提示、记录调用参数
4. LLMChunkArriveEvent
LLM 流式 chunk 到达事件(仅流式模式)。
@dataclass
class LLMChunkArriveEvent(ReActEvent):
chunk: LLMStreamChunk # LLM 返回的 chunk 对象
accumulated_content: str # 累积的内容
chunk_index: int # Chunk 序号
使用场景:实时渲染流式响应、显示打字效果
5. LLMCallEndEvent
LLM 调用结束事件,在调用完成后触发。
@dataclass
class LLMCallEndEvent(ReActEvent):
response: LLMResponse # LLM 响应对象
messages: MessageList # 更新后的消息列表
tool_calls: List[ToolCall] # 提取的工具调用列表
execution_time: float # 执行耗时
usage: Optional[LLMUsage] # Token 使用统计
使用场景:显示 Token 使用量、记录性能指标、检查工具调用
工具调用批次开始事件,当 LLM 返回多个工具调用时触发。
@dataclass
class ToolCallsBatchStartEvent(ReActEvent):
tool_calls: List[ToolCall] # 工具调用列表
batch_size: int # 批次大小
使用场景:显示工具调用批次信息、准备并行执行
单个工具调用开始事件。
@dataclass
class ToolCallStartEvent(ReActEvent):
tool_name: str # 工具名称
tool_call_id: str # 工具调用 ID
arguments: ToolCallArguments # 工具调用参数
tool_call: ToolCall # 完整的工具调用对象
使用场景:显示工具调用信息、记录调用参数
工具调用参数增量事件(仅流式模式)。
@dataclass
class ToolCallArgumentsDeltaEvent(ReActEvent):
tool_name: str
tool_call_id: str
argname: str
argcontent_delta: str
使用场景:在工具调用尚未开始前,增量渲染参数拼装过程(适合 TUI 实时面板)。
单个工具调用结束事件(关键:立即获取结果)。
@dataclass
class ToolCallEndEvent(ReActEvent):
tool_name: str # 工具名称
tool_call_id: str # 工具调用 ID
arguments: ToolCallArguments # 工具调用参数
result: ToolResult # 工具执行结果
execution_time: float # 执行耗时
success: bool # 是否成功执行
使用场景:显示工具执行结果、更新 UI、记录执行时间
10. CustomEvent
工具内部主动发射的自定义事件,用于进度、日志、阶段状态等实时反馈。
@dataclass
class CustomEvent(ReActEvent):
event_name: str # 自定义事件名
data: Any = None # 自定义数据
tool_name: Optional[str] = None # 所属工具(如果可用)
tool_call_id: Optional[str] = None # 所属工具调用 ID(如果可用)
使用场景:在 TUI 或自定义 UI 中增量渲染工具执行过程(例如 PyRepl stdout/stderr、批处理进度)。
工具调用批次结束事件,批次全部完成后触发。
@dataclass
class ToolCallsBatchEndEvent(ReActEvent):
tool_results: List[ToolCallResult] # 所有工具调用结果
batch_size: int # 批次大小
total_execution_time: float # 总执行时间
success_count: int # 成功数量
error_count: int # 失败数量
使用场景:显示批次统计、性能分析
12. ReactIterationEndEvent
ReAct 迭代结束事件,每次迭代完成时触发。
@dataclass
class ReactIterationEndEvent(ReActEvent):
messages: MessageList # 更新后的消息历史
iteration_time: float # 迭代耗时
tool_calls_count: int # 本次迭代的工具调用数量
使用场景:显示迭代统计、更新进度
13. ReactEndEvent
ReAct 循环结束事件,循环结束时触发。
@dataclass
class ReactEndEvent(ReActEvent):
final_response: str # 最终响应内容
final_messages: MessageList # 完整的消息历史
total_iterations: int # 总迭代次数
total_execution_time: float # 总执行时间
total_tool_calls: int # 总工具调用次数
total_llm_calls: int # 总 LLM 调用次数
total_token_usage: Optional[LLMUsage] # 总 token 使用统计
当通过 AbortSignal 中断回合时,ReactEndEvent.extra 会附加:
aborted: true
abort_reason: <reason>(如果调用 abort(reason) 时提供了 reason)
使用场景:显示完整统计、性能分析、保存执行记录
Fork 场景事件路由
在 fork agent 场景中,事件大致分为两类:
-
标准 ReAct 事件(如
LLMCallStartEvent、ToolCallStartEvent)
- 通过
EventYield.origin.fork_id / fork_depth 标识归属 fork。
- 适合在 UI 层按 fork 维度拆分消息流与工具卡片。
-
selfref 生命周期与流式自定义事件(
CustomEvent)
- 生命周期:
selfref_fork_start / selfref_fork_spawned / selfref_fork_end / selfref_fork_error
- 流式文本:
selfref_fork_stream_open / selfref_fork_stream_delta / selfref_fork_stream_close
- 适合展示 fork 启停状态与子任务增量输出。
如果你的处理链只保留了 ReActEvent 对象,也可从 event.extra["origin"] 读取同源元数据。
使用示例
示例 1: 基本事件处理
import asyncio
from SimpleLLMFunc import llm_chat, tool
from SimpleLLMFunc.hooks import (
ReactOutput,
ResponseYield,
EventYield,
ReactStartEvent,
LLMCallStartEvent,
LLMChunkArriveEvent,
ToolCallStartEvent,
ToolCallEndEvent,
ReactEndEvent,
)
@tool(name="calculate", description="执行数学计算")
async def calculate(expression: str) -> str:
"""计算数学表达式"""
return str(eval(expression))
@llm_chat(
llm_interface=llm,
toolkit=[calculate],
stream=True,
enable_event=True,
)
async def chat(message: str, history=None):
"""智能助手"""
pass
async def main():
async for output in chat("帮我计算 25 * 4 + 18"):
if isinstance(output, EventYield):
event = output.event
if isinstance(event, ReactStartEvent):
print(f"🚀 开始处理 (trace_id: {event.trace_id[:8]}...)")
elif isinstance(event, LLMCallStartEvent):
print(f"🤖 LLM 调用开始 ({'流式' if event.stream else '非流式'})")
elif isinstance(event, LLMChunkArriveEvent):
# 实时显示流式响应
chunk_content = event.chunk.choices[0].delta.content
if chunk_content:
print(chunk_content, end="", flush=True)
elif isinstance(event, ToolCallStartEvent):
print(f"\n🛠️ 调用工具: {event.tool_name}")
print(f" 参数: {event.arguments}")
elif isinstance(event, ToolCallEndEvent):
print(f" ✅ 结果: {event.result} ({event.execution_time:.2f}s)")
elif isinstance(event, ReactEndEvent):
print(f"\n📊 统计:")
print(f" 总耗时: {event.total_execution_time:.2f}s")
print(f" LLM 调用: {event.total_llm_calls} 次")
print(f" 工具调用: {event.total_tool_calls} 次")
if event.total_token_usage:
print(f" Token: {event.total_token_usage.total_tokens}")
elif isinstance(output, ResponseYield):
# 处理响应数据(如果需要)
pass
asyncio.run(main())
示例 2: 使用类型守卫
from SimpleLLMFunc.hooks import is_response_yield, is_event_yield
async for output in chat("Hello"):
if is_response_yield(output):
# 类型检查器会知道 output 是 ResponseYield
print(output.response)
elif is_event_yield(output):
# 类型检查器会知道 output 是 EventYield
print(output.event.event_type)
示例 3: 使用过滤器
from SimpleLLMFunc.hooks import (
responses_only,
events_only,
filter_events,
ReActEventType,
)
# 只获取响应(向后兼容)
async for response, history in responses_only(chat("Hello")):
print(response)
# 只获取事件
async for event in events_only(chat("Hello")):
print(f"事件: {event.event_type}")
# 过滤特定事件类型
async for event in filter_events(
chat("查询天气"),
event_types={ReActEventType.TOOL_CALL_END}
):
print(f"工具调用完成: {event.tool_name}")
print(f"结果: {event.result}")
示例 4: 事件观测器
使用 @with_event_observer 装饰器自动处理所有事件:
from SimpleLLMFunc.hooks import with_event_observer
async def log_event(event: ReActEvent):
"""记录所有事件到日志系统"""
print(f"[{event.timestamp}] {event.event_type}: {event.func_name}")
@with_event_observer(log_event)
@llm_chat(llm_interface=llm, enable_event=True)
async def observed_chat(message: str, history=None):
"""带事件观测的聊天"""
pass
# 使用
async for output in observed_chat("Hello"):
# 所有事件都会自动被 log_event 处理
if is_response_yield(output):
print(output.response)
示例 5: 完整的事件流 Chatbot
参考 examples/event_stream_chatbot.py 获取一个功能完整的示例,包括:
- 实时流式响应渲染(使用 Rich 库)
- 工具调用可视化
- 完整的执行统计
- 美观的终端 UI
最佳实践
1. 事件处理顺序
事件按照 ReAct 循环的执行顺序产生,建议按照以下顺序处理:
ReactStartEvent
→ ReactIterationStartEvent
→ LLMCallStartEvent
→ LLMChunkArriveEvent (流式模式)
→ LLMCallEndEvent
→ ToolCallsBatchStartEvent (如果有工具调用)
→ ToolCallStartEvent
→ ToolCallEndEvent
→ ...
→ ToolCallsBatchEndEvent
→ ReactIterationEndEvent
→ (重复迭代...)
→ ReactEndEvent
2. 性能考虑
- 事件处理应该是轻量级的,避免阻塞主流程
- 使用异步操作处理事件(如日志记录、UI 更新)
- 对于高频事件(如
LLMChunkArriveEvent),考虑批量处理
3. 错误处理
事件处理中的错误不应影响主流程:
async for output in chat("Hello"):
if is_event_yield(output):
try:
# 处理事件
handle_event(output.event)
except Exception as e:
# 记录错误但不中断流程
logger.error(f"事件处理失败: {e}")
4. UI 更新
使用事件流构建响应式 UI:
class ChatUI:
def __init__(self):
self.response_text = ""
self.tool_calls = []
async def handle_event(self, event: ReActEvent):
if isinstance(event, LLMChunkArriveEvent):
# 实时更新响应文本
self.response_text = event.accumulated_content
self.update_ui()
elif isinstance(event, ToolCallEndEvent):
# 更新工具调用列表
self.tool_calls.append({
"name": event.tool_name,
"result": event.result,
})
self.update_ui()
5. 状态管理和修改(Wrapper 模式)
由于 Agent 是无状态的,所有状态都通过参数和返回值传递。通过 wrapper 函数,你可以:
- 监控状态:通过事件获取内部状态
- 修改状态:拦截和修改
history 参数
- 实现高级功能:Context Compression、状态持久化等
5.1 基础 Wrapper:状态监控
from typing import List, Dict, Any
from SimpleLLMFunc.hooks import ReactOutput, ResponseYield, EventYield, ReactEndEvent
def with_state_monitoring(chat_func):
"""Wrapper:监控 Agent 的状态变化"""
async def wrapper(message: str, history: List[Dict[str, str]] = None):
# 记录初始状态
initial_history_length = len(history) if history else 0
print(f"初始历史长度: {initial_history_length}")
# 代理调用
async for output in chat_func(message, history):
if isinstance(output, EventYield):
event = output.event
# 监控状态变化
if isinstance(event, ReactEndEvent):
print(f"最终历史长度: {len(event.final_messages)}")
print(f"总迭代次数: {event.total_iterations}")
print(f"总工具调用: {event.total_tool_calls}")
yield output
return wrapper
# 使用
@llm_chat(llm_interface=llm, enable_event=True)
async def chat(message: str, history=None):
"""智能助手"""
pass
# 包装函数
monitored_chat = with_state_monitoring(chat)
# 使用包装后的函数
async for output in monitored_chat("Hello", history=[]):
if isinstance(output, ResponseYield):
print(output.response)
5.2 高级 Wrapper:Context Compression
通过修改 history 参数,可以实现 Context Compression(上下文压缩):
from typing import List, Dict, Optional
from SimpleLLMFunc.hooks import ResponseYield, EventYield
from copy import deepcopy
def with_context_compression(
chat_func,
max_history_length: int = 10,
compression_strategy: str = "keep_recent"
):
"""
Wrapper:实现上下文压缩
Args:
chat_func: 原始的 chat 函数
max_history_length: 最大历史长度
compression_strategy: 压缩策略("keep_recent" 或 "keep_important")
"""
def compress_history(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""压缩历史记录"""
if not history or len(history) <= max_history_length:
return history
if compression_strategy == "keep_recent":
# 保留最近的消息
return history[-max_history_length:]
elif compression_strategy == "keep_important":
# 保留系统消息和最近的消息
system_messages = [msg for msg in history if msg.get("role") == "system"]
recent_messages = history[-max_history_length:]
# 去重(如果系统消息已经在最近消息中)
combined = system_messages + recent_messages
seen = set()
result = []
for msg in combined:
msg_id = id(msg) # 使用对象 ID 去重
if msg_id not in seen:
seen.add(msg_id)
result.append(msg)
return result
else:
return history[-max_history_length:]
async def wrapper(message: str, history: List[Dict[str, str]] = None):
# 压缩输入的历史记录
compressed_history = compress_history(history or [])
# 代理调用
async for output in chat_func(message, compressed_history):
if isinstance(output, ResponseYield):
# 压缩返回的历史记录
compressed_messages = compress_history(output.messages)
# 创建新的 ResponseYield,包含压缩后的历史
yield ResponseYield(
response=output.response,
messages=compressed_messages
)
else:
yield output
return wrapper
# 使用
@llm_chat(llm_interface=llm, enable_event=True)
async def chat(message: str, history=None):
"""智能助手"""
pass
# 包装函数(最多保留 10 条历史)
compressed_chat = with_context_compression(chat, max_history_length=10)
# 使用
async for output in compressed_chat("Hello", history=long_history):
if isinstance(output, ResponseYield):
print(output.response)
5.3 高级 Wrapper:状态持久化
import json
from pathlib import Path
from typing import List, Dict, Optional
def with_state_persistence(chat_func, state_file: str = "chat_state.json"):
"""
Wrapper:实现状态持久化
自动保存和恢复对话历史
"""
state_path = Path(state_file)
async def wrapper(message: str, history: List[Dict[str, str]] = None):
# 恢复状态
if history is None and state_path.exists():
try:
with open(state_path, 'r', encoding='utf-8') as f:
history = json.load(f)
print(f"已恢复历史记录: {len(history)} 条消息")
except Exception as e:
print(f"恢复状态失败: {e}")
history = []
# 代理调用
final_history = history or []
async for output in chat_func(message, history):
if isinstance(output, ResponseYield):
# 更新历史
final_history = output.messages
# 保存状态
try:
with open(state_path, 'w', encoding='utf-8') as f:
json.dump(final_history, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存状态失败: {e}")
yield output
return wrapper
# 使用
@llm_chat(llm_interface=llm, enable_event=True)
async def chat(message: str, history=None):
"""智能助手"""
pass
# 包装函数
persistent_chat = with_state_persistence(chat, "my_chat_state.json")
# 使用(自动保存和恢复)
async for output in persistent_chat("Hello"):
if isinstance(output, ResponseYield):
print(output.response)
5.4 组合多个 Wrapper
你可以组合多个 wrapper 实现复杂的功能:
# 组合多个 wrapper
@llm_chat(llm_interface=llm, enable_event=True)
async def chat(message: str, history=None):
"""智能助手"""
pass
# 组合:监控 + 压缩 + 持久化
enhanced_chat = with_state_persistence(
with_context_compression(
with_state_monitoring(chat),
max_history_length=10
),
state_file="chat_state.json"
)
# 使用
async for output in enhanced_chat("Hello"):
if isinstance(output, ResponseYield):
print(output.response)
6. 向后兼容
如果需要同时支持事件流和传统模式,可以使用 responses_only:
# 启用事件流但只处理响应
@llm_chat(llm_interface=llm, enable_event=True)
async def chat(message: str, history=None):
pass
# 使用 responses_only 获得传统 API
async for response, history in responses_only(chat("Hello")):
print(response)
7. 中断与取消(AbortSignal)
事件流模式下可以通过 AbortSignal 中断当前回合,停止流式输出并取消正在执行的工具调用。
中断后仍会收到 ReactEndEvent(extra.aborted == true)。
from SimpleLLMFunc.hooks import AbortSignal, ABORT_SIGNAL_PARAM
abort_signal = AbortSignal()
async for output in chat(
"请列出 30 个要点",
history=[],
**{ABORT_SIGNAL_PARAM: abort_signal},
):
...
abort_signal.abort("user_interrupt")
更完整的说明请参阅 中断与取消。
常见问题
Q: 事件流会影响性能吗?
A: 事件流本身是轻量级的,不会显著影响性能。事件处理是异步的,不会阻塞主流程。如果你担心性能,可以在生产环境中禁用事件流(enable_event=False)。
Q: 如何只监听特定类型的事件?
A: 使用 filter_events 函数:
async for event in filter_events(
chat("Hello"),
event_types={ReActEventType.TOOL_CALL_END}
):
# 只处理工具调用结束事件
pass
Q: 事件流和流式响应有什么区别?
A:
- 流式响应 (
stream=True):控制 LLM 是否以流式方式返回响应
- 事件流 (
enable_event=True):控制是否产生事件,用于观察执行过程
两者可以独立使用,也可以同时启用。
Q: 可以在事件处理中修改执行流程吗?
A: 事件是只读的观察点,不能直接修改执行流程。但你可以通过 wrapper 函数在调用前后修改 history 参数,从而间接影响 Agent 的行为。例如,可以实现 Context Compression、状态过滤等功能。
Q: 如何修改 Agent 的状态?
A: 由于 Agent 是无状态的,所有状态都通过 history 参数传递。你可以:
- 在调用前修改:通过 wrapper 函数修改传入的
history 参数
- 在调用后修改:通过 wrapper 函数修改返回的
history(在 ResponseYield 中)
- 通过事件监控:通过事件获取状态信息,但修改需要通过 wrapper 实现
详见“状态管理和修改(Wrapper 模式)”章节。
Q: llm_function 支持事件流吗?
A: 是的!@llm_function 也执行 ReAct 循环,从 v0.5.0+ 开始完全支持事件流。当 enable_event=True 时,@llm_function 返回一个生成器,yield 事件和最终响应。
重要区别:@llm_function 的 ResponseYield.response 包含的是解析后的结果(str、Pydantic 对象等),而不是原始的 ChatCompletion 对象。这符合 llm_function 的设计理念:将 LLM 响应转换为指定的返回类型。
示例:
from SimpleLLMFunc import llm_function
from SimpleLLMFunc.hooks.stream import is_response_yield
@llm_function(llm_interface=llm, enable_event=True)
async def summarize(text: str) -> str:
"""生成摘要"""
pass
async for output in summarize(text="..."):
if is_response_yield(output):
# output.response 是解析后的字符串,不是 ChatCompletion
print(output.response) # 直接是摘要文本
参考示例:
Q: 事件中的 trace_id 有什么用?
A: trace_id 用于追踪整个 ReAct 循环的执行过程,可以用于日志关联、调试和性能分析。
相关资源