package main import ( "bufio" "context" "errors" "flag" "fmt" "io" "log" "os" "strings" "github.com/cloudwego/eino-ext/components/model/openai" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/schema" ) // sourceEnv a crude .env file reader func sourceEnv() { file, err := os.Open(".env") if err != nil { fmt.Println(".env not found or cannot be read") return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { split_env := strings.Split(scanner.Text(), "=") if len(split_env) != 2 { log.Fatal(".env file expexted '=' to delimit key and value. Check .env file for proper format.") } fmt.Printf("Setting from .env: %s\n", split_env[0]) os.Setenv(split_env[0], split_env[1]) } if err := scanner.Err(); err != nil { log.Fatal(err) } } type AgentConfig struct { BaseUrl string ModelId string ApiKey string } func NewAgentConfig() *AgentConfig { url, ok := os.LookupEnv("OPENAI_BASE_URL") if !ok { log.Fatal("An OPENAI_BASE_URL must be specified as an environment variable.") } modelId, ok := os.LookupEnv("OPENAI_API_MODEL") if !ok { log.Fatal("A model id must be specified with OPENAI_API_MODEL as an environment variable.") } apiKey, ok := os.LookupEnv("OPENAI_API_KEY") if !ok { fmt.Println("No API found as OPENAI_API_KEY. Using dummy value") apiKey = "dummyvalue" } return &AgentConfig{url, modelId, apiKey} } func main() { sourceEnv() var instruction string flag.StringVar(&instruction, "instruction", "You are a helpful assistant.", "Set a system prompt for your agent.") flag.Parse() ctx := context.Background() agentCfg := NewAgentConfig() cm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: agentCfg.ModelId, BaseURL: agentCfg.BaseUrl, APIKey: agentCfg.ApiKey, }) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } // Introducing agent agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ Name: "Ch02ChatModelAgent", Description: "A minimal ChatAgent with in-memory multi-turn history.", Instruction: instruction, Model: cm, }) // A runner manages the agent's life cyle including start up, interruption. // Are a unified entrypoint of an agent and enables additional features like checkpoints for interruption /// recovery. I also converts the Agent's Event Stream into a consumable AysncIterator[*AgentEvent] runner := adk.NewRunner(ctx, adk.RunnerConfig{ Agent: agent, EnableStreaming: true, }) history := make([]*schema.Message, 0, 16) scanner := bufio.NewScanner(os.Stdin) // Note there is an alternative runner.Query method which will also produce an AysncIterator[*AgentEvent] // Each Run innvocation creates a new iterator. Iterators cannot be reused once consumed. for { _, _ = fmt.Fprint(os.Stdout, "you> ") if !scanner.Scan() { break } line := strings.TrimSpace(scanner.Text()) if line == "" { break } history = append(history, schema.UserMessage(line)) events := runner.Run(ctx, history) content, err := printAndCollectAssistantFromEvents(events) if err != nil { log.Fatal(err) } history = append(history, schema.AssistantMessage(content, nil)) } if err := scanner.Err(); err != nil { log.Fatal(err) } } func printAndCollectAssistantFromEvents(events *adk.AsyncIterator[*adk.AgentEvent]) (string, error) { var sb strings.Builder for { event, ok := events.Next() if !ok { break } if event.Err != nil { return "", event.Err } if event.Output == nil || event.Output.MessageOutput == nil { continue } msgVariant := event.Output.MessageOutput if msgVariant.Role != schema.Assistant { continue } if msgVariant.IsStreaming { // msgVariant.MessageStream.SetAutomaticClose() for { frame, err := msgVariant.MessageStream.Recv() if errors.Is(err, io.EOF) { break } if err != nil { return "", err } if frame != nil && frame.Content != "" { sb.WriteString(frame.Content) _, _ = fmt.Fprint(os.Stdout, frame.Content) } } _, _ = fmt.Fprintln(os.Stdout) continue } if msgVariant.Message != nil { sb.WriteString(msgVariant.Message.Content) _, _ = fmt.Fprintln(os.Stdout, msgVariant.Message.Content) } else { _, _ = fmt.Fprint(os.Stdout) } } return sb.String(), nil }