---
title: "Chapter 8: Graph Tool (Complex Workflows)"
---
The goal of this chapter is to understand the concept of Graph Tools, implement parallel chunk recall for large files, and introduce the compose package for building complex workflows.
## Code Location
- Entry code: [cmd/ch08/main.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/cmd/ch08/main.go)
- RAG implementation: [rag/rag.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/rag/rag.go)
## Prerequisites
Same as Chapter 1: you need to configure an available ChatModel (OpenAI or Ark).
## Running
In the `examples/quickstart/chatwitheino` directory, run:
```bash
# Set the project root directory
export PROJECT_ROOT=/path/to/your/project
go run ./cmd/ch08
```
Output example:
```text
you> Please analyze the WebSocket handshake section in the RFC6455 document
[assistant] Let me analyze the document for you...
[tool call] answer_from_document(file_path: "rfc6455.txt", question: "WebSocket handshake process")
[tool result] Found 3 relevant fragments, generating answer...
[assistant] According to the RFC6455 document, the WebSocket handshake process is as follows...
```
## From Simple Tools to Graph Tools: Why We Need Complex Workflows
In Chapter 4, we created simple Tools where each Tool performs a single task. But in real-world scenarios, many tasks require multiple steps working together.
**Limitations of simple Tools:**
- Single responsibility: Each Tool does only one thing
- No parallelism: Multiple independent tasks cannot execute simultaneously
- Hard to reuse: Complex logic is difficult to split and compose
**Important note: This chapter only showcases a small part of compose/graph/workflow capabilities.**
From a broader perspective, Eino's `compose` package provides very general-purpose, deterministic orchestration capabilities: you can organize any system that requires "deterministic business flows" into an executable pipeline using `compose`'s Graph/Chain/Workflow. It can **natively orchestrate all Eino components** (such as ChatModel, Prompt, Tools, Retriever, Embedding, Indexer, etc.), with a complete **callback** system and **interrupt/resume + checkpoint** support.
**The role of Graph Tools:**
- **Graph Tool is a Tool-wrapped compose workflow**: Wraps compilable orchestration artifacts like `compose.Graph / compose.Chain / compose.Workflow` into a Tool that an Agent can call
- **Supports parallelism/branching/composition**: Provided by compose (parallelism, branching, field mapping, subgraphs, etc.); Graph Tool simply exposes them as a Tool entry point
- **Supports state management and persistence**: Passes data between nodes, and saves/restores run state via checkpoints
- **Supports interrupt/resume**: Both workflow-internal interrupts (triggering interrupt within a node) and tool-level interrupt wrapping (nested interrupt scenarios)
**Simple analogy:**
- **Simple Tool** = "single-step operation" (read a file)
- **Graph Tool** = "pipeline" (read -> chunk -> score -> filter -> generate answer)
## Key Concepts
### compose.Workflow
`compose.Workflow` is the core component for building workflows in Eino:
```go
wf := compose.NewWorkflow[Input, Output]()
// Add nodes
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).AddInput("chunk")
wf.AddLambdaNode("answer", answerFunc).AddInput("score")
// Connect to end node
wf.End().AddInput("answer")
```
**Core concepts:**
- **Node**: A processing unit in the workflow
- **Edge**: The data flow direction between nodes
- **START**: The workflow entry point
- **END**: The workflow exit point
### BatchNode
`BatchNode` is used for parallel processing of multiple tasks:
```go
scorer := batch.NewBatchNode(&batch.NodeConfig[Task, Result]{
	Name:           "ChunkScorer",
	InnerTask:      scoreOneChunk, // Processing function for a single task
	MaxConcurrency: 5,             // Maximum concurrency
})
```
**How it works:**
1. Receives a task list as input
2. Executes each task in parallel (limited by MaxConcurrency)
3. Collects and returns all results
### FieldMapping
`FieldMapping` is used to pass data across nodes:
```go
wf.AddLambdaNode("answer", answerFunc).
	AddInputWithOptions("filter", // Get data from the filter node
		[]*compose.FieldMapping{compose.ToField("TopK")},
		compose.WithNoDirectDependency()).
	AddInputWithOptions(compose.START, // Get data from the START node
		[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
		compose.WithNoDirectDependency())
```
**Why do we need FieldMapping?**
- Pass data between non-adjacent nodes
- Merge multiple data sources into a single node
- Rename data fields
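To see what the mappings replace, here is the equivalent manual plumbing written as plain Go: the answer node's input is assembled from two sources, the filter node's output and the `Question` field of the workflow's START input. The struct shapes mirror those used later in this chapter, but `buildSynthIn` is an invented helper for illustration only.

```go
package main

import "fmt"

// Shapes mirroring the chapter's node outputs (illustrative).
type scoredChunk struct {
	Content string
	Score   float64
}

type synthIn struct {
	TopK     []scoredChunk
	Question string
}

// buildSynthIn does by hand what the two FieldMappings declare:
// ToField("TopK") routes the filter output into TopK, and
// MapFields("Question", "Question") copies Question from the START input.
func buildSynthIn(filtered []scoredChunk, question string) synthIn {
	return synthIn{TopK: filtered, Question: question}
}

func main() {
	in := buildSynthIn(
		[]scoredChunk{{Content: "handshake details...", Score: 0.9}},
		"WebSocket handshake?",
	)
	fmt.Println(len(in.TopK), in.Question) // prints 1 WebSocket handshake?
}
```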
## Graph Tool Implementation
### 1. Define Input/Output Structures
```go
type Input struct {
	FilePath string `json:"file_path" jsonschema:"description=Absolute path to the document"`
	Question string `json:"question" jsonschema:"description=The question to answer"`
}

type Output struct {
	Answer  string   `json:"answer"`
	Sources []string `json:"sources"`
}
```
### 2. Build the Workflow
```go
func buildWorkflow(cm model.BaseChatModel) *compose.Workflow[Input, Output] {
	wf := compose.NewWorkflow[Input, Output]()

	// load: Read the file
	wf.AddLambdaNode("load", compose.InvokableLambda(
		func(ctx context.Context, in Input) ([]*schema.Document, error) {
			data, err := os.ReadFile(in.FilePath)
			if err != nil {
				return nil, err
			}
			return []*schema.Document{{Content: string(data)}}, nil
		},
	)).AddInput(compose.START)

	// chunk: Split into chunks
	wf.AddLambdaNode("chunk", compose.InvokableLambda(
		func(ctx context.Context, docs []*schema.Document) ([]*schema.Document, error) {
			var out []*schema.Document
			for _, d := range docs {
				out = append(out, splitIntoChunks(d.Content, 800)...)
			}
			return out, nil
		},
	)).AddInput("load")

	// score: Parallel scoring
	scorer := batch.NewBatchNode(&batch.NodeConfig[scoreTask, scoredChunk]{
		Name:           "ChunkScorer",
		InnerTask:      newScoreWorkflow(cm),
		MaxConcurrency: 5,
	})
	wf.AddLambdaNode("score", compose.InvokableLambda(
		func(ctx context.Context, in scoreIn) ([]scoredChunk, error) {
			tasks := make([]scoreTask, len(in.Chunks))
			for i, c := range in.Chunks {
				tasks[i] = scoreTask{Text: c.Content, Question: in.Question}
			}
			return scorer.Invoke(ctx, tasks)
		},
	)).
		AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
		AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

	// filter: Select top-k
	wf.AddLambdaNode("filter", compose.InvokableLambda(
		func(ctx context.Context, scored []scoredChunk) ([]scoredChunk, error) {
			sort.Slice(scored, func(i, j int) bool {
				return scored[i].Score > scored[j].Score
			})
			// Return top-3
			if len(scored) > 3 {
				scored = scored[:3]
			}
			return scored, nil
		},
	)).AddInput("score")

	// answer: Generate the answer
	wf.AddLambdaNode("answer", compose.InvokableLambda(
		func(ctx context.Context, in synthIn) (Output, error) {
			return synthesize(ctx, cm, in)
		},
	)).
		AddInputWithOptions("filter", []*compose.FieldMapping{compose.ToField("TopK")}, compose.WithNoDirectDependency()).
		AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

	wf.End().AddInput("answer")
	return wf
}
```
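The `chunk` node above calls a `splitIntoChunks` helper that is not shown here. One plausible sketch is a fixed-size rune window; the real helper in rag/rag.go may split on different boundaries (paragraphs, overlap) and wraps each chunk in a `*schema.Document`, while this self-contained version returns plain strings.

```go
package main

import "fmt"

// splitIntoChunks splits text into fixed-size rune windows. Hypothetical
// sketch of the helper referenced above; not the rag/rag.go implementation.
func splitIntoChunks(text string, size int) []string {
	runes := []rune(text) // split on runes so multi-byte characters stay intact
	var chunks []string
	for start := 0; start < len(runes); start += size {
		end := start + size
		if end > len(runes) {
			end = len(runes)
		}
		chunks = append(chunks, string(runes[start:end]))
	}
	return chunks
}

func main() {
	fmt.Println(splitIntoChunks("abcdefg", 3)) // prints [abc def g]
}
```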
### 3. Wrap as a Tool
```go
func BuildTool(ctx context.Context, cm model.BaseChatModel) (tool.BaseTool, error) {
	wf := buildWorkflow(cm)
	return graphtool.NewInvokableGraphTool[Input, Output](
		wf,
		"answer_from_document",
		"Search a large document for relevant content and synthesize an answer.",
	)
}
```
**Key code snippet** (simplified and not directly runnable; for the complete code, see [rag/rag.go](https://github.com/cloudwego/eino-examples/blob/main/quickstart/chatwitheino/rag/rag.go)):
```go
// Build the workflow
wf := compose.NewWorkflow[Input, Output]()

// Add nodes
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).
	AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
	AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

// Wrap as a Tool
return graphtool.NewInvokableGraphTool[Input, Output](wf, "answer_from_document", "...")
```
## Graph Tool Execution Flow
```
+------------------------------------------+
|        Input: file_path, question        |
+------------------------------------------+
                     |
         +------------------------+
         | load: Read file        |
         | Output: []*Document    |
         +------------------------+
                     |
         +------------------------+
         | chunk: Split into      |
         | chunks                 |
         | Output: []*Document    |
         +------------------------+
                     |
         +------------------------+
         | score: Parallel        |
         | scoring                |
         | (MaxConcurrency=5)     |
         | Output: []scoredChunk  |
         +------------------------+
                     |
         +------------------------+
         | filter: Select top-k   |
         | Output: []scoredChunk  |
         +------------------------+
                     |
         +------------------------+
         | answer: Generate       |
         | answer                 |
         | Output: Output         |
         +------------------------+
                     |
         +------------------------+
         | Return result          |
         | {answer, sources}      |
         +------------------------+
```
## Chapter Summary
- **Graph Tool**: Wraps complex workflows as a Tool, supporting multi-step coordination
- **compose.Workflow**: The core component for building workflows
- **BatchNode**: Parallel processing of multiple tasks
- **FieldMapping**: Passing data across nodes
- **Interrupt/Resume support**: Graph Tools support the Checkpoint mechanism
## Further Thinking
**Other Graph Tool applications:**
- Multi-document RAG: Process multiple documents in parallel
- Multi-model collaboration: Different models handle different tasks
- Complex decision trees: Choose different branches based on conditions
**Performance optimization:**
- Adjust MaxConcurrency to control parallelism
- Use caching to avoid redundant computation
- Use streaming output to improve user experience
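As one concrete take on the caching point: chunk scores can be memoized by content hash, so re-analyzing the same document does not re-invoke the model for unchanged chunks. This is a hypothetical sketch; `scoreCache` and its methods are not part of the chapter's code.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

// scoreCache memoizes chunk scores keyed by a SHA-256 hash of the chunk text.
// Hypothetical sketch, not part of rag/rag.go.
type scoreCache struct {
	mu sync.Mutex
	m  map[[32]byte]float64
}

func newScoreCache() *scoreCache {
	return &scoreCache{m: make(map[[32]byte]float64)}
}

// score returns the cached score for chunk, or computes and stores it.
func (c *scoreCache) score(chunk string, compute func(string) float64) float64 {
	key := sha256.Sum256([]byte(chunk))
	c.mu.Lock()
	if s, ok := c.m[key]; ok {
		c.mu.Unlock()
		return s
	}
	c.mu.Unlock()
	s := compute(chunk) // compute outside the lock; model calls are slow
	c.mu.Lock()
	c.m[key] = s
	c.mu.Unlock()
	return s
}

func main() {
	calls := 0
	cache := newScoreCache()
	compute := func(string) float64 { calls++; return 0.5 }
	cache.score("same chunk", compute)
	cache.score("same chunk", compute) // second lookup hits the cache
	fmt.Println(calls)                 // prints 1
}
```

Under concurrent use two goroutines may race to compute the same chunk once each; that wastes one model call but keeps the cache correct, which is usually an acceptable trade-off.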