|
|
---
|
|
|
title: "第七章:Interrupt/Resume(中断与恢复)"
|
|
|
---
|
|
|
|
|
|
本章目标:理解 Interrupt/Resume 机制,实现 Tool 审批流程,让用户在敏感操作前进行确认。
|
|
|
|
|
|
## 代码位置
|
|
|
|
|
|
- 入口代码:[cmd/ch07/main.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/cmd/ch07/main.go)
|
|
|
|
|
|
## 前置条件
|
|
|
|
|
|
与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。同时,需要与第四章一样设置 `PROJECT_ROOT`:
|
|
|
|
|
|
```bash
|
|
|
export PROJECT_ROOT=/path/to/eino # Eino 核心库根目录(不设置则默认使用当前目录)
|
|
|
```
|
|
|
|
|
|
## 运行
|
|
|
|
|
|
在 `examples/quickstart/chatwitheino` 目录下执行:
|
|
|
|
|
|
```bash
|
|
|
# 设置项目根目录
|
|
|
export PROJECT_ROOT=/path/to/your/project
|
|
|
|
|
|
go run ./cmd/ch07
|
|
|
```
|
|
|
|
|
|
输出示例:
|
|
|
|
|
|
```text
|
|
|
you> 请执行命令 echo hello
|
|
|
|
|
|
⚠️ Approval Required ⚠️
|
|
|
Tool: execute
|
|
|
Arguments: {"command":"echo hello"}
|
|
|
|
|
|
Approve this action? (y/n): y
|
|
|
[tool result] hello
|
|
|
|
|
|
hello
|
|
|
```
|
|
|
|
|
|
## 从自动执行到人工审批:为什么需要 Interrupt
|
|
|
|
|
|
前几章我们实现的 Agent 会自动执行所有 Tool 调用,但在某些场景下这是危险的:
|
|
|
|
|
|
**自动执行的风险:**
|
|
|
- 删除文件:误删重要数据
|
|
|
- 发送邮件:发送错误内容
|
|
|
- 执行命令:执行危险操作
|
|
|
- 修改配置:破坏系统设置
|
|
|
|
|
|
**Interrupt 的定位:**
|
|
|
- **Interrupt 是 Agent 的暂停机制**:在关键操作前暂停,等待用户确认
|
|
|
- **Interrupt 可携带信息**:向用户展示即将执行的操作
|
|
|
- **Interrupt 可恢复**:用户确认后继续执行,拒绝后返回错误
|
|
|
|
|
|
**简单类比:**
|
|
|
- **自动执行** = "自动驾驶"(完全信任系统)
|
|
|
- **Interrupt** = "人工接管"(关键决策由人来做)
|
|
|
|
|
|
## 关键概念
|
|
|
|
|
|
### Interrupt 机制
|
|
|
|
|
|
`Interrupt` 是 Eino 中实现人机协作的核心机制。
|
|
|
|
|
|
**核心思想:在执行关键操作前暂停,等待用户确认后继续。**
|
|
|
|
|
|
一个需要审批的 Tool 的执行被分成**两个阶段**:
|
|
|
|
|
|
1. **第一次调用(触发中断)**:Tool 保存当前参数,然后返回一个中断信号。Runner 暂停执行,向调用侧返回 Interrupt 事件。
|
|
|
2. **用户审批后恢复(Resume)**:Runner 重新调用 Tool,此时 Tool 检测到"已中断过",直接读取用户的审批结果并执行(或拒绝)。
|
|
|
|
|
|
**简化版伪代码:**
|
|
|
|
|
|
```
|
|
|
func myTool(ctx, args):
|
|
|
if 第一次调用:
|
|
|
保存 args
|
|
|
return 中断信号 // Runner 暂停,展示审批提示
|
|
|
else: // Resume 后的第二次调用
|
|
|
if 用户批准:
|
|
|
return 执行操作(保存的 args)
|
|
|
else:
|
|
|
return "操作被用户拒绝"
|
|
|
```
|
|
|
|
|
|
**完整代码及关键字段说明:**
|
|
|
|
|
|
```go
|
|
|
// 在 Tool 中触发中断
|
|
|
func myTool(ctx context.Context, args string) (string, error) {
|
|
|
// wasInterrupted: 是否是 Resume 后的第二次调用(第一次为 false,Resume 后为 true)
|
|
|
// storedArgs: 第一次调用时通过 StatefulInterrupt 保存的参数,Resume 后可取回
|
|
|
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
|
|
|
|
|
|
if !wasInterrupted {
|
|
|
// 第一次调用:触发中断,同时保存 args 供 Resume 后使用
|
|
|
return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
|
|
|
ToolName: "my_tool",
|
|
|
ArgumentsInJSON: args,
|
|
|
}, args) // 第三个参数是要保存的状态(Resume 后通过 storedArgs 取回)
|
|
|
}
|
|
|
|
|
|
// Resume 后的第二次调用:读取用户审批结果
|
|
|
// isTarget: 本次 Resume 是否针对当前 Tool(一次 Resume 只针对一个 Tool)
|
|
|
// hasData: Resume 时是否携带了审批结果数据
|
|
|
// data: 用户传入的审批结果
|
|
|
isTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
|
|
|
if isTarget && hasData {
|
|
|
if data.Approved {
|
|
|
return doSomething(storedArgs) // 使用保存的参数执行实际操作
|
|
|
}
|
|
|
return "Operation rejected by user", nil
|
|
|
}
|
|
|
|
|
|
// 其他情况(isTarget=false 意味着本次 Resume 目标不是当前 Tool):重新中断
|
|
|
return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
|
|
|
ToolName: "my_tool",
|
|
|
ArgumentsInJSON: storedArgs,
|
|
|
}, storedArgs)
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### ApprovalMiddleware
|
|
|
|
|
|
`ApprovalMiddleware` 是一个通用的审批中间件,可以拦截特定 Tool 的调用:
|
|
|
|
|
|
```go
|
|
|
type approvalMiddleware struct {
|
|
|
*adk.BaseChatModelAgentMiddleware
|
|
|
}
|
|
|
|
|
|
func (m *approvalMiddleware) WrapInvokableToolCall(
|
|
|
_ context.Context,
|
|
|
endpoint adk.InvokableToolCallEndpoint,
|
|
|
tCtx *adk.ToolContext,
|
|
|
) (adk.InvokableToolCallEndpoint, error) {
|
|
|
// 只拦截需要审批的 Tool
|
|
|
if tCtx.Name != "execute" {
|
|
|
return endpoint, nil
|
|
|
}
|
|
|
|
|
|
return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
|
|
|
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
|
|
|
|
|
|
if !wasInterrupted {
|
|
|
return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
|
|
|
ToolName: tCtx.Name,
|
|
|
ArgumentsInJSON: args,
|
|
|
}, args)
|
|
|
}
|
|
|
|
|
|
isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
|
|
|
if isTarget && hasData {
|
|
|
if data.Approved {
|
|
|
return endpoint(ctx, storedArgs, opts...)
|
|
|
}
|
|
|
if data.DisapproveReason != nil {
|
|
|
return fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason), nil
|
|
|
}
|
|
|
return fmt.Sprintf("tool '%s' disapproved", tCtx.Name), nil
|
|
|
}
|
|
|
|
|
|
// 重新中断
|
|
|
return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
|
|
|
ToolName: tCtx.Name,
|
|
|
ArgumentsInJSON: storedArgs,
|
|
|
}, storedArgs)
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
func (m *approvalMiddleware) WrapStreamableToolCall(
|
|
|
_ context.Context,
|
|
|
endpoint adk.StreamableToolCallEndpoint,
|
|
|
tCtx *adk.ToolContext,
|
|
|
) (adk.StreamableToolCallEndpoint, error) {
|
|
|
// 如果 agent 配置了 StreamingShell,则 execute 会走流式调用,需要实现该方法才能拦截到
|
|
|
if tCtx.Name != "execute" {
|
|
|
return endpoint, nil
|
|
|
}
|
|
|
return func(ctx context.Context, args string, opts ...tool.Option) (*schema.StreamReader[string], error) {
|
|
|
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
|
|
|
if !wasInterrupted {
|
|
|
return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
|
|
|
ToolName: tCtx.Name,
|
|
|
ArgumentsInJSON: args,
|
|
|
}, args)
|
|
|
}
|
|
|
|
|
|
isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
|
|
|
if isTarget && hasData {
|
|
|
if data.Approved {
|
|
|
return endpoint(ctx, storedArgs, opts...)
|
|
|
}
|
|
|
if data.DisapproveReason != nil {
|
|
|
return singleChunkReader(fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason)), nil
|
|
|
}
|
|
|
return singleChunkReader(fmt.Sprintf("tool '%s' disapproved", tCtx.Name)), nil
|
|
|
}
|
|
|
|
|
|
isTarget, _, _ = tool.GetResumeContext[any](ctx)
|
|
|
if !isTarget {
|
|
|
return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
|
|
|
ToolName: tCtx.Name,
|
|
|
ArgumentsInJSON: storedArgs,
|
|
|
}, storedArgs)
|
|
|
}
|
|
|
|
|
|
return endpoint(ctx, storedArgs, opts...)
|
|
|
}, nil
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### CheckPointStore
|
|
|
|
|
|
`CheckPointStore` 是实现中断恢复的关键组件:
|
|
|
|
|
|
```go
|
|
|
type CheckPointStore interface {
|
|
|
// 保存检查点
|
|
|
Put(ctx context.Context, key string, checkpoint *Checkpoint) error
|
|
|
|
|
|
// 获取检查点
|
|
|
Get(ctx context.Context, key string) (*Checkpoint, error)
|
|
|
}
|
|
|
```
|
|
|
|
|
|
**为什么需要 CheckPointStore?**
|
|
|
- 中断时保存状态:Tool 参数、执行位置等
|
|
|
- 恢复时加载状态:从中断点继续执行
|
|
|
- 支持跨进程恢复:进程重启后仍可恢复
|
|
|
|
|
|
## Interrupt/Resume 的实现
|
|
|
|
|
|
### 1. 配置 Runner 使用 CheckPointStore
|
|
|
|
|
|
```go
|
|
|
runner := adk.NewRunner(ctx, adk.RunnerConfig{
|
|
|
Agent: agent,
|
|
|
EnableStreaming: true,
|
|
|
CheckPointStore: adkstore.NewInMemoryStore(), // 内存存储
|
|
|
})
|
|
|
```
|
|
|
|
|
|
### 2. 配置 Agent 使用 ApprovalMiddleware
|
|
|
|
|
|
```go
|
|
|
agent, err := deep.New(ctx, &deep.Config{
|
|
|
// ... 其他配置
|
|
|
Handlers: []adk.ChatModelAgentMiddleware{
|
|
|
&approvalMiddleware{}, // 添加审批中间件
|
|
|
&safeToolMiddleware{}, // 将 Tool 错误转换为字符串(中断类错误会继续向上抛出)
|
|
|
},
|
|
|
})
|
|
|
```
|
|
|
|
|
|
### 3. 处理中断事件
|
|
|
|
|
|
```go
|
|
|
checkPointID := sessionID
|
|
|
|
|
|
events := runner.Run(ctx, history, adk.WithCheckPointID(checkPointID))
|
|
|
content, interruptInfo, err := printAndCollectAssistantFromEvents(events)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
if interruptInfo != nil {
|
|
|
// 注意:建议使用同一个 stdin reader 同时读取「用户输入」与「审批 y/n」
|
|
|
// 避免审批输入被当成下一轮 you> 的消息
|
|
|
content, err = handleInterrupt(ctx, runner, checkPointID, interruptInfo, reader)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
_ = session.Append(schema.AssistantMessage(content, nil))
|
|
|
```
|
|
|
|
|
|
## Interrupt/Resume 执行流程
|
|
|
|
|
|
```
|
|
|
┌─────────────────────────────────────────┐
|
|
|
│ 用户:执行命令 echo hello │
|
|
|
└─────────────────────────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ Agent 分析意图 │
|
|
|
│ 决定调用 execute │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ ApprovalMiddleware │
|
|
|
│ 拦截 Tool 调用 │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 触发 Interrupt │
|
|
|
│ 保存状态到 Store │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 返回 Interrupt 事件 │
|
|
|
│ 等待用户审批 │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 用户输入 y/n │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ runner.ResumeWith... │
|
|
|
│ 恢复执行 │
|
|
|
└──────────────────────┘
|
|
|
↓
|
|
|
┌──────────────────────┐
|
|
|
│ 执行 execute │
|
|
|
│ 或返回拒绝信息 │
|
|
|
└──────────────────────┘
|
|
|
```
|
|
|
|
|
|
## 本章小结
|
|
|
|
|
|
- **Interrupt**:Agent 的暂停机制,在关键操作前暂停等待确认
|
|
|
- **Resume**:恢复执行,用户确认后继续或拒绝后返回错误
|
|
|
- **ApprovalMiddleware**:通用审批中间件,拦截特定 Tool 调用
|
|
|
- **CheckPointStore**:保存中断状态,支持跨进程恢复
|
|
|
- **人机协作**:关键决策由人来确认,提高安全性
|
|
|
|
|
|
## 扩展思考
|
|
|
|
|
|
**其他 Interrupt 场景:**
|
|
|
- 多选项审批:用户选择多个选项之一
|
|
|
- 参数补全:用户提供缺失的参数
|
|
|
- 条件分支:用户决定执行路径
|
|
|
|
|
|
**审批策略:**
|
|
|
- 白名单:只审批敏感操作
|
|
|
- 黑名单:审批所有操作,除了安全的
|
|
|
- 动态规则:根据参数内容决定是否审批
|