# BatchNode Example

This example demonstrates how to build a **BatchNode** component that processes multiple inputs through a Graph or Workflow with configurable concurrency and interrupt/resume support.

## Overview

BatchNode is a reusable component that:
- Accepts `[]I` (slice of inputs) and returns `[]O` (slice of outputs)
- Runs a Graph or Workflow for each input item
- Supports configurable concurrency (sequential or parallel)
- Handles errors and interrupts from individual tasks
- Integrates with Eino's callback and checkpoint systems

## Business Scenario

**Document Review Pipeline**: A compliance team needs to review multiple documents. Each document goes through an automated review workflow, with high-priority documents requiring human approval before completion.

## Quick Start

```bash
cd compose/batch
go run .
```

## Project Structure

```
compose/batch/
├── batch/
│   ├── types.go      # Type definitions (NodeConfig, NodeInterruptState, etc.)
│   ├── options.go    # Batch invocation options (WithInnerOptions)
│   ├── store.go      # Internal checkpoint store for sub-tasks
│   └── node.go       # Core BatchNode implementation
├── main.go           # Example scenarios
└── README.md
```

## Key Concepts

### 1. Creating a BatchNode

```go
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
	Name:           "DocumentReviewer",
	InnerTask:      workflow, // Graph or Workflow
	MaxConcurrency: 3,        // 0=sequential, >0=parallel limit
	InnerCompileOptions: []compose.GraphCompileOption{
		compose.WithGraphName("SingleDocReview"),
	},
})
```

### 2. Concurrency Control

| MaxConcurrency | Behavior |
|----------------|----------|
| `0` | Sequential: process one task at a time |
| `>0` | Concurrent: up to N parallel tasks (first task runs on main goroutine) |

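
The two modes in the table can be illustrated with a small standard-library sketch. It is not BatchNode's implementation: `runBatch` is a hypothetical helper, it starts a goroutine for every task instead of running the first task on the calling goroutine, and in concurrent mode it reports the failing task with the lowest index rather than the first one to fail in time. It only shows the two ideas from the table: `0` means a plain loop, and a positive value caps the number of tasks in flight, here with a buffered channel used as a semaphore.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatch applies fn to every input and stores each result at the index of
// its input. maxConcurrency == 0 means sequential; a positive value caps the
// number of tasks in flight. Hypothetical helper, not BatchNode's real code.
func runBatch[I, O any](inputs []I, maxConcurrency int, fn func(I) (O, error)) ([]O, error) {
	outputs := make([]O, len(inputs))

	// Sequential mode: stop at the first failing task.
	if maxConcurrency <= 0 {
		for i, in := range inputs {
			out, err := fn(in)
			if err != nil {
				return nil, fmt.Errorf("task %d: %w", i, err)
			}
			outputs[i] = out
		}
		return outputs, nil
	}

	// Concurrent mode: a buffered channel acts as a counting semaphore.
	errs := make([]error, len(inputs))
	sem := make(chan struct{}, maxConcurrency)
	var wg sync.WaitGroup
	for i, in := range inputs {
		sem <- struct{}{} // blocks while maxConcurrency tasks are running
		wg.Add(1)
		go func(i int, in I) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only to its own index, so no lock is needed.
			outputs[i], errs[i] = fn(in)
		}(i, in)
	}
	wg.Wait()

	// Report the failing task with the lowest index (a simplification).
	for i, err := range errs {
		if err != nil {
			return nil, fmt.Errorf("task %d: %w", i, err)
		}
	}
	return outputs, nil
}

func main() {
	docs := []string{"DOC-001", "DOC-002", "DOC-003", "DOC-004"}
	results, err := runBatch(docs, 3, func(id string) (string, error) {
		return id + ": reviewed", nil
	})
	fmt.Println(results, err)
}
```

Writing each result to `outputs[i]` keeps the outputs aligned with the inputs regardless of completion order, which is what the `[]I` to `[]O` contract in the Overview implies.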
### 3. Options

**Compile-time options** (in `NodeConfig.InnerCompileOptions`):
- Applied when compiling the inner Graph/Workflow
- Example: `compose.WithGraphName("...")`

**Request-time options** (via `batch.WithInnerOptions`):
- Applied to each inner task invocation
- Example: `compose.WithCallbacks(handler)`

```go
results, err := batchNode.Invoke(ctx, inputs,
	batch.WithInnerOptions(
		compose.WithCallbacks(progressHandler),
	),
)
```

### 4. Error Handling

- **Normal errors**: BatchNode returns the first error encountered
- **Interrupt errors**: Collected and bundled via `compose.CompositeInterrupt`

### 5. Interrupt & Resume

BatchNode supports human-in-the-loop workflows:

```go
// In your inner workflow's lambda:
if needsHumanReview {
	wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
	if !wasInterrupted {
		// First run: interrupt for human review
		return Result{}, compose.Interrupt(ctx, map[string]string{
			"document_id": docID,
			"reason":      "Requires human approval",
		})
	}

	// Resume: get human decision
	isTarget, hasData, decision := compose.GetResumeContext[*Decision](ctx)
	if isTarget && hasData && decision != nil {
		return Result{Approved: decision.Approved}, nil
	}
}
```

Resume with approval decisions:

```go
// Extract interrupt contexts
info, _ := compose.ExtractInterruptInfo(err)

// Prepare resume data (keyed by interrupt ID)
resumeData := make(map[string]any)
for _, iCtx := range info.InterruptContexts {
	resumeData[iCtx.ID] = &Decision{Approved: true}
}

// Resume
resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
```

## Scenarios

### Scenario 1: Basic Sequential Processing
Process documents one at a time with `MaxConcurrency: 0`.

### Scenario 2: Concurrent Processing
Process multiple documents in parallel with `MaxConcurrency: 3`.

### Scenario 3: With Compile Options
Configure inner workflow at compile time using `InnerCompileOptions`.

### Scenario 4: With Invoke Options (Callbacks)
Add callbacks for monitoring using `callbacks.InitCallbacks`.

### Scenario 5: Normal Error Handling
Demonstrates how BatchNode handles errors from individual tasks.

### Scenario 6: Interrupt & Resume
Human-in-the-loop workflow:
1. High-priority documents interrupt for human review
2. Extract interrupt contexts with document IDs
3. Resume with approval decisions using `BatchResumeWithData`

### Scenario 7: Parent Graph with Reduce Node
- Integrate BatchNode in a larger pipeline
- Use `WithInnerOptions` for progress tracking callbacks
- Reduce pattern: aggregate batch results into a summary report

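
The reduce step in Scenario 7 can be pictured as a plain function from a slice of results to one report. The sketch below is an assumption-laden illustration: the `ReviewResult` fields are inferred from the sample output in this README, and `Summary` and `reduceResults` are hypothetical names that may not match `main.go`.

```go
package main

import "fmt"

// ReviewResult mirrors the fields visible in the sample output; the actual
// struct in main.go may differ.
type ReviewResult struct {
	DocumentID string
	Approved   bool
	Comments   string
}

// Summary is a hypothetical aggregate report produced by the reduce step.
type Summary struct {
	Total       int
	Approved    int
	RejectedIDs []string
}

// reduceResults folds the per-document results into a single summary.
func reduceResults(results []ReviewResult) Summary {
	s := Summary{Total: len(results)}
	for _, r := range results {
		if r.Approved {
			s.Approved++
		} else {
			s.RejectedIDs = append(s.RejectedIDs, r.DocumentID)
		}
	}
	return s
}

func main() {
	report := reduceResults([]ReviewResult{
		{DocumentID: "DOC-001", Approved: true, Comments: "Human review: Approved by supervisor"},
		{DocumentID: "DOC-002", Approved: true, Comments: "Auto-approved (non-high priority)"},
	})
	fmt.Printf("%d/%d approved, rejected: %v\n", report.Approved, report.Total, report.RejectedIDs)
}
```

In a parent graph, a function of this shape would sit in the node that follows the BatchNode, taking its `[]ReviewResult` output as input.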
## Key APIs Used

| API | Purpose |
|-----|---------|
| `compose.NewWorkflow` | Create inner workflow |
| `compose.AppendAddressSegment` | Create unique address for each sub-task |
| `compose.GetInterruptState` | Check if resuming from interrupt |
| `compose.GetResumeContext` | Get resume data for this component |
| `compose.Interrupt` | Interrupt execution for human input |
| `compose.CompositeInterrupt` | Bundle multiple interrupt errors |
| `compose.ExtractInterruptInfo` | Extract interrupt contexts from error |
| `compose.BatchResumeWithData` | Resume with data for multiple targets |
| `compose.WithCheckPointStore` | Enable checkpoint persistence |
| `compose.WithCheckPointID` | Identify checkpoint for resume |
| `callbacks.EnsureRunInfo` | Setup callback context |
| `callbacks.OnStart/OnEnd/OnError` | Trigger callbacks |
| `schema.RegisterName` | Register types for serialization |

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                          BatchNode                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Input: []ReviewRequest                             │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│                             ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Concurrency Control (MaxConcurrency)               │    │
│  │  - Sequential (0): one at a time                    │    │
│  │  - Concurrent (>0): parallel with semaphore         │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│           ┌─────────────────┼─────────────────┐             │
│           ▼                 ▼                 ▼             │
│    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐      │
│    │ Inner Task  │   │ Inner Task  │   │ Inner Task  │      │
│    │ (index: 0)  │   │ (index: 1)  │   │ (index: 2)  │      │
│    │             │   │             │   │             │      │
│    │ Workflow/   │   │ Workflow/   │   │ Workflow/   │      │
│    │ Graph       │   │ Graph       │   │ Graph       │      │
│    └─────────────┘   └─────────────┘   └─────────────┘      │
│           │                 │                 │             │
│           ▼                 ▼                 ▼             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Result Collection                                  │    │
│  │  - Success: store in outputs[index]                 │    │
│  │  - Error: return first error                        │    │
│  │  - Interrupt: collect for CompositeInterrupt        │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│                             ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Output: []ReviewResult                             │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```

## Interrupt & Resume Flow

```
First Invocation:
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ DOC-001  │    │ DOC-002  │    │ DOC-003  │    │ DOC-004  │
│ (high)   │    │ (medium) │    │ (high)   │    │ (low)    │
└────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │               │
     ▼               ▼               ▼               ▼
 INTERRUPT       COMPLETE        INTERRUPT       COMPLETE
     │               │               │               │
     └───────────────┴───────────────┴───────────────┘
                             │
                             ▼
                ┌────────────────────────┐
                │   CompositeInterrupt   │
                │   - InterruptContexts  │
                │   - NodeInterruptState │
                └────────────────────────┘

Resume with Approval:
┌──────────┐                    ┌──────────┐
│ DOC-001  │                    │ DOC-003  │
│ (high)   │                    │ (high)   │
└────┬─────┘                    └────┬─────┘
     │                               │
     ▼                               ▼
GetResumeContext                GetResumeContext
→ Decision{Approved: true}      → Decision{Approved: true}
     │                               │
     ▼                               ▼
 COMPLETE                        COMPLETE
     │                               │
     └───────────────┬───────────────┘
                     │
                     ▼
          ┌─────────────────────┐
          │   Final Results     │
          │   DOC-001: ✓        │
          │   DOC-002: ✓        │
          │   DOC-003: ✓        │
          │   DOC-004: ✓        │
          └─────────────────────┘
```

## Sample Output

```
=== Document Review Pipeline Example ===

--- Scenario 6: Interrupt & Resume ---
First invocation (will interrupt for high priority docs):
Document DOC-001 requires human review (high priority)
Document DOC-003 requires human review (high priority)

Interrupt detected! Found 2 interrupt context(s):
1. ID=fd49cbc4-deca-4f02-bdf9-02f921c0c1f5
   Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:0;...
   DocumentID=DOC-001, Reason=High priority document requires human approval
2. ID=af4a3f99-2414-4d6c-9c06-b9b4b1786044
   Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:2;...
   DocumentID=DOC-003, Reason=High priority document requires human approval

Resuming with approval decisions...
Document DOC-001 resumed with decision: approved=true
Document DOC-003 resumed with decision: approved=true

Final results after resume:
- DOC-001: approved=true, comments=Human review: Approved by supervisor
- DOC-002: approved=true, comments=Auto-approved (non-high priority)
- DOC-003: approved=true, comments=Human review: Approved by supervisor
- DOC-004: approved=true, comments=Auto-approved (non-high priority)
```

## Learn More

- [Eino Documentation](https://github.com/cloudwego/eino)
- [Compose Package](https://github.com/cloudwego/eino/tree/main/compose)
- [Human-in-the-Loop Examples](../graph/react_with_interrupt/)