You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

264 lines
8.7 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/bytedance/sonic"
"github.com/cloudwego/eino-examples/flow/agent/multiagent/plan_execute/debug"
"github.com/cloudwego/eino-examples/flow/agent/multiagent/plan_execute/tools"
clc "github.com/cloudwego/eino-ext/callbacks/cozeloop"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino-ext/components/model/deepseek"
callbacks2 "github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/flow/agent"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino/utils/callbacks"
"github.com/coze-dev/cozeloop-go"
)
func main() {
cozeloopApiToken := os.Getenv("COZELOOP_API_TOKEN")
cozeloopWorkspaceID := os.Getenv("COZELOOP_WORKSPACE_ID") // use cozeloop trace, from https://loop.coze.cn/open/docs/cozeloop/go-sdk#4a8c980e
ctx := context.Background()
var handlers []callbacks2.Handler
if cozeloopApiToken != "" && cozeloopWorkspaceID != "" {
client, err := cozeloop.NewClient(
cozeloop.WithAPIToken(cozeloopApiToken),
cozeloop.WithWorkspaceID(cozeloopWorkspaceID),
)
if err != nil {
panic(err)
}
defer client.Close(ctx)
handlers = append(handlers, clc.NewLoopHandler(client))
}
callbacks2.AppendGlobalHandlers(handlers...)
deepSeekModel, err := deepseek.NewChatModel(ctx, &deepseek.ChatModelConfig{
Model: os.Getenv("DEEPSEEK_MODEL_NAME"),
APIKey: os.Getenv("DEEPSEEK_API_KEY"),
BaseURL: os.Getenv("DEEPSEEK_BASE_URL"),
})
if err != nil {
log.Fatalf("new DeepSeek model failed: %v", err)
}
arkModel, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("ARK_MODEL_NAME"),
})
if err != nil {
log.Fatalf("new Ark model failed: %v", err)
}
toolsConfig, err := tools.GetTools(ctx)
if err != nil {
log.Fatalf("get tools config failed: %v", err)
}
// 创建多智能体的配置system prompt 都用默认值
config := &Config{
// planner 在调试时大部分场景不需要真的去生成,可以用 mock 输出替代
PlannerModel: &debug.ChatModelDebugDecorator{
Model: deepSeekModel,
},
ExecutorModel: arkModel,
ToolsConfig: compose.ToolsNodeConfig{Tools: toolsConfig},
ReviserModel: &debug.ChatModelDebugDecorator{
Model: deepSeekModel,
},
}
planExecuteAgent, err := NewMultiAgent(ctx, config)
if err != nil {
log.Fatalf("new plan execute multi agent failed: %v", err)
}
printer := newIntermediateOutputPrinter() // 创建一个中间结果打印器
printer.printStream() // 开始异步输出到 console
handler := printer.toCallbackHandler() // 转化为 Eino 框架的 callback handler
// 以流式方式调用多智能体,实际的 OutputStream 不再需要关注,因为所有输出都由 intermediateOutputPrinter 处理了
_, err = planExecuteAgent.Stream(ctx, []*schema.Message{schema.UserMessage("我们一家三口去乐园玩,孩子身高 120 cm预算 2000 元,希望能尽可能多的看表演,游乐设施则比较偏爱刺激项目,希望能在一天内尽可能多体验不同的活动,请帮忙规划一个可操作的一日行程。我们会在乐园开门的时候入场,玩到晚上闭园的时候。")},
agent.WithComposeOptions(compose.WithCallbacks(handler)), // 将中间结果打印的 callback handler 注入进来
// 给 planner 指定 mock 输出
//agent.WithComposeOptions(compose.WithChatModelOption(debug.WithDebugOutput(schema.AssistantMessage(debug.PlannerOutput, nil))).DesignateNode(nodeKeyPlanner)),
// 给 reviser 指定 mock 输出
//agent.WithComposeOptions(compose.WithChatModelOption(debug.WithDebugOutput(schema.AssistantMessage("最终答案", nil))).DesignateNode(nodeKeyReviser)),
)
if err != nil {
log.Fatalf("stream error: %v", err)
}
printer.wait() // 等待所有输出都处理完再结束
time.Sleep(3 * time.Second) // 确保trace上报后再结束
}
type coloredString struct {
str string
code string
}
// intermediateOutputPrinter 利用 Eino 的 callback 机制,收集多智能体各步骤的实时输出.
type intermediateOutputPrinter struct {
ch chan coloredString
currentAgentName string // 当前智能体名称
agentReasoning map[string]bool // 智能体处在“推理”阶段还是“最终答案”阶段
mu sync.Mutex
wg sync.WaitGroup
}
func newIntermediateOutputPrinter() *intermediateOutputPrinter {
return &intermediateOutputPrinter{
ch: make(chan coloredString),
agentReasoning: map[string]bool{
nodeKeyPlanner: false,
nodeKeyExecutor: false,
nodeKeyReviser: false,
},
}
}
func (s *intermediateOutputPrinter) printStream() {
go func() {
for m := range s.ch {
fmt.Print(m.code + m.str + Reset)
}
}()
}
func (s *intermediateOutputPrinter) toCallbackHandler() callbacks2.Handler {
return callbacks.NewHandlerHelper().ChatModel(&callbacks.ModelCallbackHandler{
OnEndWithStreamOutput: s.onChatModelEndWithStreamOutput,
}).Tool(&callbacks.ToolCallbackHandler{
OnStart: s.onToolStart,
OnEnd: s.onToolEnd,
}).Handler()
}
func (s *intermediateOutputPrinter) wait() {
s.wg.Wait()
}
const (
Reset = "\033[0m"
Red = "\033[31m"
Green = "\033[32m"
Yellow = "\033[33m"
Blue = "\033[34m"
White = "\033[97m"
Cyan = "\033[36m"
Gray = "\033[37m"
)
// onChatModelEndWithStreamOutput 当 ChatModel 结束时,获取它的流式输出并格式化处理.
func (s *intermediateOutputPrinter) onChatModelEndWithStreamOutput(ctx context.Context, runInfo *callbacks2.RunInfo, output *schema.StreamReader[*model.CallbackOutput]) context.Context {
name := runInfo.Name
if name != s.currentAgentName {
s.ch <- coloredString{fmt.Sprintf("\n\n=======\n%s: \n=======\n", name), Cyan}
s.currentAgentName = name
}
s.wg.Add(1)
go func() {
defer output.Close()
defer s.wg.Done()
for {
chunk, err := output.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Fatalf("internal error: %s\n", err)
}
if len(chunk.Message.Content) > 0 {
if s.agentReasoning[name] { // 切换到最终答案阶段
s.ch <- coloredString{"\nanswer begin: \n", Green}
s.mu.Lock()
s.agentReasoning[name] = false
s.mu.Unlock()
}
s.ch <- coloredString{chunk.Message.Content, Yellow}
} else if reasoningContent, ok := deepseek.GetReasoningContent(chunk.Message); ok {
if !s.agentReasoning[name] { // 切换到推理阶段
s.ch <- coloredString{"\nreasoning begin: \n", Green}
s.mu.Lock()
s.agentReasoning[name] = true
s.mu.Unlock()
}
s.ch <- coloredString{reasoningContent, White}
}
}
}()
return ctx
}
// onToolStart 当 Tool 执行开始时,获取并输出调用信息.
func (s *intermediateOutputPrinter) onToolStart(ctx context.Context, info *callbacks2.RunInfo, input *tool.CallbackInput) context.Context {
arguments := make(map[string]any)
err := sonic.Unmarshal([]byte(input.ArgumentsInJSON), &arguments)
if err != nil {
s.ch <- coloredString{fmt.Sprintf("\ncall %s: %s\n", info.Name, input.ArgumentsInJSON), Red}
return ctx
}
formatted, err := sonic.MarshalIndent(arguments, " ", " ")
if err != nil {
s.ch <- coloredString{fmt.Sprintf("\ncall %s: %s\n", info.Name, input.ArgumentsInJSON), Red}
return ctx
}
s.ch <- coloredString{fmt.Sprintf("\ncall %s: %s\n", info.Name, string(formatted)), Red}
return ctx
}
// onToolEnd 当 Tool 执行结束时,获取并输出返回结果.
func (s *intermediateOutputPrinter) onToolEnd(ctx context.Context, info *callbacks2.RunInfo, output *tool.CallbackOutput) context.Context {
response := make(map[string]any)
err := sonic.Unmarshal([]byte(output.Response), &response)
if err != nil {
s.ch <- coloredString{fmt.Sprintf("\ncall %s: %s\n", info.Name, output.Response), Blue}
return ctx
}
formatted, err := sonic.MarshalIndent(response, " ", " ")
if err != nil {
s.ch <- coloredString{fmt.Sprintf("\ncall %s: %s\n", info.Name, output.Response), Blue}
return ctx
}
s.ch <- coloredString{fmt.Sprintf("\ncall %s result: %s\n", info.Name, string(formatted)), Blue}
return ctx
}