|
|
---
|
|
|
title: "第六章:Callback 与 Trace(可观测性)"
|
|
|
---
|
|
|
|
|
|
本章目标:理解 Callback 机制,集成 CozeLoop 实现链路追踪和可观测性。
|
|
|
|
|
|
## 代码位置
|
|
|
|
|
|
- 入口代码:[cmd/ch06/main.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/cmd/ch06/main.go)
|
|
|
|
|
|
## 前置条件
|
|
|
|
|
|
与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。同时,需要与第四章一样设置 `PROJECT_ROOT`:
|
|
|
|
|
|
```bash
|
|
|
export PROJECT_ROOT=/path/to/eino # Eino 核心库根目录(不设置则默认使用当前目录)
|
|
|
```
|
|
|
|
|
|
可选:配置 CozeLoop 实现链路追踪:
|
|
|
|
|
|
```bash
|
|
|
export COZELOOP_WORKSPACE_ID=your_workspace_id
|
|
|
export COZELOOP_API_TOKEN=your_token
|
|
|
```
|
|
|
|
|
|
## 运行
|
|
|
|
|
|
在 `examples/quickstart/chatwitheino` 目录下执行:
|
|
|
|
|
|
```bash
|
|
|
# 设置项目根目录
|
|
|
export PROJECT_ROOT=/path/to/your/project
|
|
|
|
|
|
# 可选:配置 CozeLoop
|
|
|
export COZELOOP_WORKSPACE_ID=your_workspace_id
|
|
|
export COZELOOP_API_TOKEN=your_token
|
|
|
|
|
|
go run ./cmd/ch06
|
|
|
```
|
|
|
|
|
|
输出示例:
|
|
|
|
|
|
```text
|
|
|
[trace] starting session: 083d16da-6b13-4fe6-afb0-c45d8f490ce1
|
|
|
you> 你好
|
|
|
[trace] chat_model_generate: model=gpt-4.1-mini tokens=150
|
|
|
[trace] tool_call: name=list_files duration=23ms
|
|
|
[assistant] 你好!有什么我可以帮助你的吗?
|
|
|
```
|
|
|
|
|
|
## 从黑盒到白盒:为什么需要 Callback
|
|
|
|
|
|
前几章我们实现的 Agent 是一个"黑盒":输入问题,输出答案,但中间发生了什么我们并不清楚。
|
|
|
|
|
|
**黑盒的问题:**
|
|
|
- 不知道模型调用了多少次
|
|
|
- 不知道 Tool 执行了多长时间
|
|
|
- 不知道 Token 消耗了多少
|
|
|
- 出问题时难以定位原因
|
|
|
|
|
|
**Callback 的定位:**
|
|
|
- **Callback 是 Eino 的旁路机制**:从 component 到 compose(下文详谈)到 adk,一以贯之
|
|
|
- **Callback 在固定点位触发**:组件生命周期的 5 个关键时机
|
|
|
- **Callback 可抽取实时信息**:输入、输出、错误、流式数据等
|
|
|
- **Callback 用途广泛**:观测、日志、指标、追踪、调试、审计等
|
|
|
|
|
|
**简单类比:**
|
|
|
- **Agent** = "业务逻辑"(主路)
|
|
|
- **Callback** = "旁路钩子"(在固定点位抽取信息)
|
|
|
|
|
|
## 关键概念
|
|
|
|
|
|
### Handler 接口
|
|
|
|
|
|
`Handler` 是 Eino 中定义回调处理器的核心接口:
|
|
|
|
|
|
```go
|
|
|
type Handler interface {
|
|
|
// 非流式输入(组件开始处理前)
|
|
|
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
|
|
|
|
|
|
// 非流式输出(组件成功返回后)
|
|
|
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
|
|
|
|
|
|
// 错误(组件返回错误时)
|
|
|
OnError(ctx context.Context, info *RunInfo, err error) context.Context
|
|
|
|
|
|
// 流式输入(组件接收流式输入时)
|
|
|
OnStartWithStreamInput(ctx context.Context, info *RunInfo,
|
|
|
input *schema.StreamReader[CallbackInput]) context.Context
|
|
|
|
|
|
// 流式输出(组件返回流式输出时)
|
|
|
OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
|
|
|
output *schema.StreamReader[CallbackOutput]) context.Context
|
|
|
}
|
|
|
```
|
|
|
|
|
|
**设计理念:**
|
|
|
- **旁路机制**:不干扰主流程,在固定点位抽取信息
|
|
|
- **全流程覆盖**:从 component 到 compose 到 adk,所有组件都支持
|
|
|
- **状态传递**:同一 Handler 的 OnStart→OnEnd 可通过 context 传递状态
|
|
|
- **性能优化**:实现 `TimingChecker` 接口可跳过不需要的时机
|
|
|
|
|
|
**RunInfo 结构:**
|
|
|
```go
|
|
|
type RunInfo struct {
|
|
|
Name string // 业务名称(节点名或用户指定)
|
|
|
Type string // 实现类型(如 "OpenAI")
|
|
|
Component string // 组件类型(如 "ChatModel")
|
|
|
}
|
|
|
```
|
|
|
|
|
|
**重要提示:**
|
|
|
- 流式回调必须关闭 StreamReader,否则会导致 goroutine 泄漏
|
|
|
- 不要修改 Input/Output,它们被所有下游共享
|
|
|
- RunInfo 可能为 nil,使用前需要检查
|
|
|
|
|
|
### CozeLoop
|
|
|
|
|
|
CozeLoop 是字节跳动开源的 AI 应用可观测性平台,提供了:
|
|
|
|
|
|
- **链路追踪**:完整的调用链路可视化
|
|
|
- **指标监控**:延迟、Token 消耗、错误率等
|
|
|
- **日志聚合**:集中管理所有日志
|
|
|
- **调试支持**:在线查看和调试
|
|
|
|
|
|
**集成方式:**
|
|
|
|
|
|
```go
|
|
|
import (
|
|
|
clc "github.com/cloudwego/eino-ext/callbacks/cozeloop"
|
|
|
"github.com/cloudwego/eino/callbacks"
|
|
|
"github.com/coze-dev/cozeloop-go"
|
|
|
)
|
|
|
|
|
|
// 创建 CozeLoop 客户端
|
|
|
client, err := cozeloop.NewClient(
|
|
|
cozeloop.WithAPIToken(apiToken),
|
|
|
cozeloop.WithWorkspaceID(workspaceID),
|
|
|
)
|
|
|
|
|
|
// 注册为全局 Callback
|
|
|
callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))
|
|
|
```
|
|
|
|
|
|
### Callback 的触发时机
|
|
|
|
|
|
Callback 在组件生命周期的 5 个关键时机触发。下表中 `Timing*` 是 Eino 内部常量名(用于 `TimingChecker` 接口),对应的 Handler 接口方法是右侧所示:
|
|
|
|
|
|
| 时机常量 | 对应 Handler 方法 | 触发点 | 输入/输出 |
|
|
|
|------|--------|--------|-----------|
|
|
|
| `TimingOnStart` | `OnStart` | 组件开始处理前 | CallbackInput |
|
|
|
| `TimingOnEnd` | `OnEnd` | 组件成功返回后 | CallbackOutput |
|
|
|
| `TimingOnError` | `OnError` | 组件返回错误时 | error |
|
|
|
| `TimingOnStartWithStreamInput` | `OnStartWithStreamInput` | 组件接收流式输入时 | StreamReader[CallbackInput] |
|
|
|
| `TimingOnEndWithStreamOutput` | `OnEndWithStreamOutput` | 组件返回流式输出时 | StreamReader[CallbackOutput] |
|
|
|
|
|
|
**示例:ChatModel 调用流程**
|
|
|
|
|
|
```
|
|
|
┌─────────────────────────────────────────┐
|
|
|
│ ChatModel.Generate(ctx, messages) │
|
|
|
└─────────────────────────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ OnStart │ ← 输入: CallbackInput (messages)
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 模型处理 │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ OnEnd │ ← 输出: CallbackOutput (response)
|
|
|
└──────────────────────┘
|
|
|
```
|
|
|
|
|
|
**示例:流式输出流程**
|
|
|
|
|
|
```
|
|
|
┌─────────────────────────────────────────┐
|
|
|
│ ChatModel.Stream(ctx, messages) │
|
|
|
└─────────────────────────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ OnStart │ ← 输入: CallbackInput (messages)
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 模型处理(流式) │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ OnEndWithStreamOutput │ ← 输出: StreamReader[CallbackOutput]
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 逐个 chunk 返回 │
|
|
|
└──────────────────────┘
|
|
|
```
|
|
|
|
|
|
**注意:**
|
|
|
- 流式错误(stream 中途出错)不会触发 OnError,而是在 StreamReader 中返回
|
|
|
- 同一 Handler 的 OnStart→OnEnd 可通过 context 传递状态
|
|
|
- 不同 Handler 之间没有执行顺序保证
|
|
|
|
|
|
## Callback 的实现
|
|
|
|
|
|
### 1. 实现自定义 Callback Handler
|
|
|
|
|
|
完整实现 `Handler` 接口需要实现所有 5 个方法,较为繁琐。Eino 提供了 `callbacks.HandlerHelper` 帮助类来简化实现:
|
|
|
|
|
|
```go
|
|
|
import "github.com/cloudwego/eino/callbacks"
|
|
|
|
|
|
// 使用 NewHandlerHelper 注册感兴趣的回调
|
|
|
handler := callbacks.NewHandlerHelper().
|
|
|
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
|
|
|
log.Printf("[trace] %s/%s start", info.Component, info.Name)
|
|
|
return ctx
|
|
|
}).
|
|
|
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
|
|
|
log.Printf("[trace] %s/%s end", info.Component, info.Name)
|
|
|
return ctx
|
|
|
}).
|
|
|
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
|
|
|
log.Printf("[trace] %s/%s error: %v", info.Component, info.Name, err)
|
|
|
return ctx
|
|
|
}).
|
|
|
Handler()
|
|
|
|
|
|
// 注册为全局 Callback
|
|
|
callbacks.AppendGlobalHandlers(handler)
|
|
|
```
|
|
|
|
|
|
**注意**:`RunInfo` 可能为 `nil`(如顶层调用没有 RunInfo),使用前请检查。
|
|
|
|
|
|
### 2. 集成 CozeLoop
|
|
|
|
|
|
```go
|
|
|
func setupCozeLoop(ctx context.Context) (*cozeloop.Client, error) {
|
|
|
apiToken := os.Getenv("COZELOOP_API_TOKEN")
|
|
|
workspaceID := os.Getenv("COZELOOP_WORKSPACE_ID")
|
|
|
|
|
|
if apiToken == "" || workspaceID == "" {
|
|
|
return nil, nil // 未配置则跳过
|
|
|
}
|
|
|
|
|
|
client, err := cozeloop.NewClient(
|
|
|
cozeloop.WithAPIToken(apiToken),
|
|
|
cozeloop.WithWorkspaceID(workspaceID),
|
|
|
)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
// 注册为全局 Callback
|
|
|
callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))
|
|
|
|
|
|
return client, nil
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### 3. 在 main 中使用
|
|
|
|
|
|
```go
|
|
|
func main() {
|
|
|
ctx := context.Background()
|
|
|
|
|
|
// 设置 CozeLoop(可选)
|
|
|
client, err := setupCozeLoop(ctx)
|
|
|
if err != nil {
|
|
|
log.Printf("cozeloop setup failed: %v", err)
|
|
|
}
|
|
|
if client != nil {
|
|
|
defer func() {
|
|
|
time.Sleep(5 * time.Second) // 等待数据上报
|
|
|
client.Close(ctx)
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
// 创建 Agent 并运行...
|
|
|
}
|
|
|
```
|
|
|
|
|
|
**关键代码片段(**注意:这是简化后的代码片段,不能直接运行,完整代码请参考** [cmd/ch06/main.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/cmd/ch06/main.go)):
|
|
|
|
|
|
```go
|
|
|
// 设置 CozeLoop 追踪
|
|
|
cozeloopApiToken := os.Getenv("COZELOOP_API_TOKEN")
|
|
|
cozeloopWorkspaceID := os.Getenv("COZELOOP_WORKSPACE_ID")
|
|
|
if cozeloopApiToken != "" && cozeloopWorkspaceID != "" {
|
|
|
client, err := cozeloop.NewClient(
|
|
|
cozeloop.WithAPIToken(cozeloopApiToken),
|
|
|
cozeloop.WithWorkspaceID(cozeloopWorkspaceID),
|
|
|
)
|
|
|
if err != nil {
|
|
|
log.Fatalf("cozeloop.NewClient failed: %v", err)
|
|
|
}
|
|
|
defer func() {
|
|
|
time.Sleep(5 * time.Second)
|
|
|
client.Close(ctx)
|
|
|
}()
|
|
|
callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))
|
|
|
}
|
|
|
```
|
|
|
|
|
|
## 可观测性的价值
|
|
|
|
|
|
### 1. 性能分析
|
|
|
|
|
|
通过 Callback 收集的数据,可以分析:
|
|
|
- 模型调用延迟分布
|
|
|
- Tool 执行时间排行
|
|
|
- Token 消耗趋势
|
|
|
|
|
|
### 2. 错误追踪
|
|
|
|
|
|
当 Agent 出现问题时:
|
|
|
- 查看完整的调用链路
|
|
|
- 定位是哪个环节出错
|
|
|
- 分析错误原因
|
|
|
|
|
|
### 3. 成本优化
|
|
|
|
|
|
通过 Token 消耗数据:
|
|
|
- 识别高消耗的对话
|
|
|
- 优化 Prompt 减少 Token
|
|
|
- 选择更经济的模型
|
|
|
|
|
|
## 本章小结
|
|
|
|
|
|
- **Callback**:Eino 的观测钩子,在关键节点触发回调
|
|
|
- **CozeLoop**:字节跳动的 AI 应用可观测性平台
|
|
|
- **全局注册**:通过 `callbacks.AppendGlobalHandlers` 注册全局 Callback
|
|
|
- **非侵入式**:业务代码不需要修改,Callback 自动触发
|
|
|
- **可观测性价值**:性能分析、错误追踪、成本优化
|
|
|
|
|
|
## 扩展思考
|
|
|
|
|
|
**其他 Callback 实现:**
|
|
|
- OpenTelemetry Callback:对接标准可观测性协议
|
|
|
- 自定义日志 Callback:记录到本地文件
|
|
|
- 指标 Callback:对接 Prometheus 等监控系统
|
|
|
|
|
|
**高级用法:**
|
|
|
- 在 Callback 中实现采样(只记录部分请求)
|
|
|
- 在 Callback 中实现限流(根据 Token 消耗)
|
|
|
- 在 Callback 中实现告警(错误率过高时通知)
|