---
title: "Chapter 8: Graph Tool (Complex Workflows)"
---

The goal of this chapter is to understand the concept of Graph Tools, implement parallel chunk recall for large files, and introduce the compose package for building complex workflows.

## Code Location

- Entry code: [cmd/ch08/main.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/cmd/ch08/main.go)
- RAG implementation: [rag/rag.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/rag/rag.go)

## Prerequisites

Same as Chapter 1: you need to configure an available ChatModel (OpenAI or Ark).

## Running

In the `examples/quickstart/chatwitheino` directory, run:

```bash
# Set the project root directory
export PROJECT_ROOT=/path/to/your/project
go run ./cmd/ch08
```

Output example:

```text
you> Please analyze the WebSocket handshake section in the RFC6455 document
[assistant] Let me analyze the document for you...
[tool call] answer_from_document(file_path: "rfc6455.txt", question: "WebSocket handshake process")
[tool result] Found 3 relevant fragments, generating answer...
[assistant] According to the RFC6455 document, the WebSocket handshake process is as follows...
```

## From Simple Tools to Graph Tools: Why We Need Complex Workflows

In Chapter 4, we created simple Tools where each Tool performs a single task. But in real-world scenarios, many tasks require multiple steps working together.
**Limitations of simple Tools:**

- Single responsibility: Each Tool does only one thing
- No parallelism: Multiple independent tasks cannot execute simultaneously
- Hard to reuse: Complex logic is difficult to split and compose

**Important note: This chapter only showcases a small part of compose/graph/workflow capabilities.** From a broader perspective, Eino's `compose` package provides very general-purpose, deterministic orchestration capabilities: you can organize any system that requires "deterministic business flows" into an executable pipeline using `compose`'s Graph/Chain/Workflow. It can **natively orchestrate all Eino components** (such as ChatModel, Prompt, Tools, Retriever, Embedding, Indexer, etc.), with a complete **callback** system and **interrupt/resume + checkpoint** support.

**The role of Graph Tools:**

- **Graph Tool is a Tool-wrapped compose workflow**: Wraps compilable orchestration artifacts like `compose.Graph / compose.Chain / compose.Workflow` into a Tool that an Agent can call
- **Supports parallelism/branching/composition**: Provided by compose (parallelism, branching, field mapping, subgraphs, etc.); Graph Tool simply exposes them as a Tool entry point
- **Supports state management and persistence**: Passes data between nodes, and saves/restores run state via checkpoints
- **Supports interrupt/resume**: Both workflow-internal interrupts (triggering interrupt within a node) and tool-level interrupt wrapping (nested interrupt scenarios)

**Simple analogy:**

- **Simple Tool** = "single-step operation" (read a file)
- **Graph Tool** = "pipeline" (read -> chunk -> score -> filter -> generate answer)

## Key Concepts

### compose.Workflow

`compose.Workflow` is the core component for building workflows in Eino:

```go
wf := compose.NewWorkflow[Input, Output]()

// Add nodes
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).AddInput("chunk")
wf.AddLambdaNode("answer", answerFunc).AddInput("score")

// Connect to end node
wf.End().AddInput("answer")
```

**Core concepts:**

- **Node**: A processing unit in the workflow
- **Edge**: The data flow direction between nodes
- **START**: The workflow entry point
- **END**: The workflow exit point

### BatchNode

`BatchNode` is used for parallel processing of multiple tasks:

```go
scorer := batch.NewBatchNode(&batch.NodeConfig[Task, Result]{
	Name:           "ChunkScorer",
	InnerTask:      scoreOneChunk, // Processing function for a single task
	MaxConcurrency: 5,             // Maximum concurrency
})
```

**How it works:**

1. Receives a task list as input
2. Executes each task in parallel (limited by MaxConcurrency)
3. Collects and returns all results

### FieldMapping

`FieldMapping` is used to pass data across nodes:

```go
wf.AddLambdaNode("answer", answerFunc).
	AddInputWithOptions("filter", // Get data from the filter node
		[]*compose.FieldMapping{compose.ToField("TopK")},
		compose.WithNoDirectDependency()).
	AddInputWithOptions(compose.START, // Get data from the START node
		[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
		compose.WithNoDirectDependency())
```

**Why do we need FieldMapping?**

- Pass data between non-adjacent nodes
- Merge multiple data sources into a single node
- Rename data fields

## Graph Tool Implementation

### 1. Define Input/Output Structures

```go
type Input struct {
	FilePath string `json:"file_path" jsonschema:"description=Absolute path to the document"`
	Question string `json:"question" jsonschema:"description=The question to answer"`
}

type Output struct {
	Answer  string   `json:"answer"`
	Sources []string `json:"sources"`
}
```

### 2. Build the Workflow

```go
func buildWorkflow(cm model.BaseChatModel) *compose.Workflow[Input, Output] {
	wf := compose.NewWorkflow[Input, Output]()

	// load: Read the file
	wf.AddLambdaNode("load", compose.InvokableLambda(
		func(ctx context.Context, in Input) ([]*schema.Document, error) {
			data, err := os.ReadFile(in.FilePath)
			if err != nil {
				return nil, err
			}
			return []*schema.Document{{Content: string(data)}}, nil
		},
	)).AddInput(compose.START)

	// chunk: Split into chunks
	wf.AddLambdaNode("chunk", compose.InvokableLambda(
		func(ctx context.Context, docs []*schema.Document) ([]*schema.Document, error) {
			var out []*schema.Document
			for _, d := range docs {
				out = append(out, splitIntoChunks(d.Content, 800)...)
			}
			return out, nil
		},
	)).AddInput("load")

	// score: Parallel scoring
	scorer := batch.NewBatchNode(&batch.NodeConfig[scoreTask, scoredChunk]{
		Name:           "ChunkScorer",
		InnerTask:      newScoreWorkflow(cm),
		MaxConcurrency: 5,
	})
	wf.AddLambdaNode("score", compose.InvokableLambda(
		func(ctx context.Context, in scoreIn) ([]scoredChunk, error) {
			tasks := make([]scoreTask, len(in.Chunks))
			for i, c := range in.Chunks {
				tasks[i] = scoreTask{Text: c.Content, Question: in.Question}
			}
			return scorer.Invoke(ctx, tasks)
		},
	)).
		AddInputWithOptions("chunk",
			[]*compose.FieldMapping{compose.ToField("Chunks")},
			compose.WithNoDirectDependency()).
		AddInputWithOptions(compose.START,
			[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
			compose.WithNoDirectDependency())

	// filter: Select top-k
	wf.AddLambdaNode("filter", compose.InvokableLambda(
		func(ctx context.Context, scored []scoredChunk) ([]scoredChunk, error) {
			sort.Slice(scored, func(i, j int) bool { return scored[i].Score > scored[j].Score })
			// Return top-3
			if len(scored) > 3 {
				scored = scored[:3]
			}
			return scored, nil
		},
	)).AddInput("score")

	// answer: Generate the answer
	wf.AddLambdaNode("answer", compose.InvokableLambda(
		func(ctx context.Context, in synthIn) (Output, error) {
			return synthesize(ctx, cm, in)
		},
	)).
		AddInputWithOptions("filter",
			[]*compose.FieldMapping{compose.ToField("TopK")},
			compose.WithNoDirectDependency()).
		AddInputWithOptions(compose.START,
			[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
			compose.WithNoDirectDependency())

	wf.End().AddInput("answer")
	return wf
}
```

### 3. Wrap as a Tool

```go
func BuildTool(ctx context.Context, cm model.BaseChatModel) (tool.BaseTool, error) {
	wf := buildWorkflow(cm)
	return graphtool.NewInvokableGraphTool[Input, Output](
		wf,
		"answer_from_document",
		"Search a large document for relevant content and synthesize an answer.",
	)
}
```

**Key code snippet** (Note: this is a simplified snippet that cannot be run directly; for the complete code, see [rag/rag.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/rag/rag.go)):

```go
// Build the workflow
wf := compose.NewWorkflow[Input, Output]()

// Add nodes
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).
	AddInputWithOptions("chunk",
		[]*compose.FieldMapping{compose.ToField("Chunks")},
		compose.WithNoDirectDependency()).
	AddInputWithOptions(compose.START,
		[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
		compose.WithNoDirectDependency())

// Wrap as a Tool
return graphtool.NewInvokableGraphTool[Input, Output](wf, "answer_from_document", "...")
```

## Graph Tool Execution Flow

```
+------------------------------------------+
|       Input: file_path, question         |
+------------------------------------------+
                    |
        +------------------------+
        | load: Read file        |
        | Output: []*Document    |
        +------------------------+
                    |
        +------------------------+
        | chunk: Split into      |
        | chunks                 |
        | Output: []*Document    |
        +------------------------+
                    |
        +------------------------+
        | score: Parallel        |
        | scoring                |
        | (MaxConcurrency=5)     |
        | Output: []scoredChunk  |
        +------------------------+
                    |
        +------------------------+
        | filter: Select top-k   |
        | Output: []scoredChunk  |
        +------------------------+
                    |
        +------------------------+
        | answer: Generate       |
        | answer                 |
        | Output: Output         |
        +------------------------+
                    |
        +------------------------+
        | Return result          |
        | {answer, sources}      |
        +------------------------+
```

## Chapter Summary

- **Graph Tool**: Wraps complex workflows as a Tool, supporting multi-step coordination
- **compose.Workflow**: The core component for building workflows
- **BatchNode**: Parallel processing of multiple tasks
- **FieldMapping**: Passing data across nodes
- **Interrupt/Resume support**: Graph Tools support the Checkpoint mechanism

## Further Thinking

**Other Graph Tool applications:**

- Multi-document RAG: Process multiple documents in parallel
- Multi-model collaboration: Different models handle different tasks
- Complex decision trees: Choose different branches based on conditions

**Performance optimization:**

- Adjust MaxConcurrency to control parallelism
- Use caching to avoid redundant computation
- Use streaming output to improve user experience
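To see what a MaxConcurrency bound actually does, here is a minimal plain-Go sketch of bounded-concurrency fan-out with order-preserving result collection. This is not the Eino `batch` package — `runBatch` is a made-up name — but it illustrates the pattern a BatchNode-style component typically implements: a counting semaphore caps the number of in-flight tasks, while each result is written to its input index.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatch executes tasks with at most maxConcurrency running at once,
// and returns results in the same order as the input tasks.
func runBatch[T, R any](tasks []T, maxConcurrency int, f func(T) R) []R {
	results := make([]R, len(tasks))
	sem := make(chan struct{}, maxConcurrency) // counting semaphore
	var wg sync.WaitGroup
	for i, t := range tasks {
		wg.Add(1)
		go func(i int, t T) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release the slot
			results[i] = f(t)        // each goroutine owns index i
		}(i, t)
	}
	wg.Wait()
	return results
}

func main() {
	// Score three "chunks" in parallel (here: score = length).
	scores := runBatch([]string{"a", "bb", "ccc"}, 2, func(s string) int { return len(s) })
	fmt.Println(scores) // order matches the input: [1 2 3]
}
```

Raising the semaphore capacity is the equivalent of the "Adjust MaxConcurrency to control parallelism" tuning knob above: more slots means more simultaneous model calls, at the cost of higher load on the backend.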