# BatchNode Example
This example demonstrates how to build a **BatchNode** component that processes multiple inputs through a Graph or Workflow with configurable concurrency and interrupt/resume support.
## Overview
BatchNode is a reusable component that:
- Accepts `[]I` (slice of inputs) and returns `[]O` (slice of outputs)
- Runs a Graph or Workflow for each input item
- Supports configurable concurrency (sequential or parallel)
- Handles errors and interrupts from individual tasks
- Integrates with Eino's callback and checkpoint systems
## Business Scenario
**Document Review Pipeline**: A compliance team needs to review multiple documents. Each document goes through an automated review workflow, with high-priority documents requiring human approval before completion.
## Quick Start
```bash
cd compose/batch
go run .
```
## Project Structure
```
compose/batch/
├── batch/
│   ├── types.go    # Type definitions (NodeConfig, NodeInterruptState, etc.)
│   ├── options.go  # Batch invocation options (WithInnerOptions)
│   ├── store.go    # Internal checkpoint store for sub-tasks
│   └── node.go     # Core BatchNode implementation
├── main.go         # Example scenarios
└── README.md
```
## Key Concepts
### 1. Creating a BatchNode
```go
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
	Name:           "DocumentReviewer",
	InnerTask:      workflow, // Graph or Workflow
	MaxConcurrency: 3,        // 0=sequential, >0=parallel limit
	InnerCompileOptions: []compose.GraphCompileOption{
		compose.WithGraphName("SingleDocReview"),
	},
})
```
### 2. Concurrency Control
| MaxConcurrency | Behavior |
|----------------|----------|
| `0` | Sequential: process one task at a time |
| `>0` | Concurrent: up to N parallel tasks (first task runs on main goroutine) |
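
The two rows of the table can be sketched with a buffered channel used as a semaphore. This is a minimal, standard-library-only illustration, not the code in `batch/node.go`: `runBatch`, `review`, and the per-index error slice are hypothetical names, and the sketch does not reproduce the detail that the first task runs on the main goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatch is a hypothetical helper (not part of the batch package).
// maxConcurrency <= 0 runs tasks one at a time; > 0 caps in-flight tasks.
func runBatch[I, O any](inputs []I, maxConcurrency int, task func(I) (O, error)) ([]O, []error) {
	outputs := make([]O, len(inputs))
	errs := make([]error, len(inputs))

	if maxConcurrency <= 0 {
		for i, in := range inputs {
			outputs[i], errs[i] = task(in)
		}
		return outputs, errs
	}

	sem := make(chan struct{}, maxConcurrency) // one slot per allowed task
	var wg sync.WaitGroup
	for i, in := range inputs {
		sem <- struct{}{} // blocks while maxConcurrency tasks are running
		wg.Add(1)
		go func(i int, in I) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			// Each goroutine writes only its own index, so no mutex is needed.
			outputs[i], errs[i] = task(in)
		}(i, in)
	}
	wg.Wait()
	return outputs, errs
}

// review is a placeholder for the inner Graph/Workflow invocation.
func review(docID string) (string, error) {
	return fmt.Sprintf("%s: reviewed", docID), nil
}

func main() {
	docs := []string{"DOC-001", "DOC-002", "DOC-003", "DOC-004"}
	results, _ := runBatch(docs, 3, review)
	for _, r := range results {
		fmt.Println(r)
	}
}
```

Writing each result to `outputs[i]` keeps the output order aligned with the input order regardless of which task finishes first.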
### 3. Options
**Compile-time options** (in `NodeConfig.InnerCompileOptions`):
- Applied when compiling the inner Graph/Workflow
- Example: `compose.WithGraphName("...")`

**Request-time options** (via `batch.WithInnerOptions`):
- Applied to each inner task invocation
- Example: `compose.WithCallbacks(handler)`
```go
results, err := batchNode.Invoke(ctx, inputs,
	batch.WithInnerOptions(
		compose.WithCallbacks(progressHandler),
	),
)
```
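
One way a wrapper option like `WithInnerOptions` could forward request-time options to every inner task is the functional-options pattern. The sketch below is an assumption about the general shape, not the contents of `batch/options.go`: `innerOption`, `batchOption`, `withCallback`, `withInnerOptions`, and `invoke` are all hypothetical stand-ins written against the standard library only.

```go
package main

import "fmt"

// innerConfig holds per-invocation settings for one inner task (hypothetical).
type innerConfig struct{ callbacks []string }

// innerOption stands in for an option accepted by the inner Graph/Workflow.
type innerOption func(*innerConfig)

func withCallback(name string) innerOption {
	return func(c *innerConfig) { c.callbacks = append(c.callbacks, name) }
}

// batchConfig holds options addressed to the batch wrapper itself.
type batchConfig struct{ inner []innerOption }

type batchOption func(*batchConfig)

// withInnerOptions stores inner options so they can be replayed per task.
func withInnerOptions(opts ...innerOption) batchOption {
	return func(b *batchConfig) { b.inner = append(b.inner, opts...) }
}

// invoke resolves the batch options once, then applies the stored inner
// options to a fresh innerConfig for every input item.
func invoke(inputs []string, opts ...batchOption) []string {
	var bc batchConfig
	for _, o := range opts {
		o(&bc)
	}
	out := make([]string, 0, len(inputs))
	for _, in := range inputs {
		var ic innerConfig
		for _, o := range bc.inner {
			o(&ic)
		}
		out = append(out, fmt.Sprintf("%s handled with %d callback(s)", in, len(ic.callbacks)))
	}
	return out
}

func main() {
	lines := invoke([]string{"DOC-001", "DOC-002"}, withInnerOptions(withCallback("progress")))
	for _, line := range lines {
		fmt.Println(line)
	}
}
```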
### 4. Error Handling
- **Normal errors**: BatchNode returns the first error encountered
- **Interrupt errors**: Collected and bundled via `compose.CompositeInterrupt`
### 5. Interrupt & Resume
BatchNode supports human-in-the-loop workflows:
```go
// In your inner workflow's lambda:
if needsHumanReview {
	wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
	if !wasInterrupted {
		// First run: interrupt for human review
		return Result{}, compose.Interrupt(ctx, map[string]string{
			"document_id": docID,
			"reason":      "Requires human approval",
		})
	}
	// Resume: get human decision
	isTarget, hasData, decision := compose.GetResumeContext[*Decision](ctx)
	if isTarget && hasData && decision != nil {
		return Result{Approved: decision.Approved}, nil
	}
}
```
Resume with approval decisions:
```go
// Extract interrupt contexts
info, _ := compose.ExtractInterruptInfo(err)

// Prepare resume data (keyed by interrupt ID)
resumeData := make(map[string]any)
for _, iCtx := range info.InterruptContexts {
	resumeData[iCtx.ID] = &Decision{Approved: true}
}

// Resume
resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
```
## Scenarios
### Scenario 1: Basic Sequential Processing
Process documents one at a time with `MaxConcurrency: 0`.
### Scenario 2: Concurrent Processing
Process multiple documents in parallel with `MaxConcurrency: 3`.
### Scenario 3: With Compile Options
Configure inner workflow at compile time using `InnerCompileOptions`.
### Scenario 4: With Invoke Options (Callbacks)
Add callbacks for monitoring using `callbacks.InitCallbacks`.
### Scenario 5: Normal Error Handling
Demonstrates how BatchNode handles errors from individual tasks.
### Scenario 6: Interrupt & Resume
Human-in-the-loop workflow:
1. High-priority documents interrupt for human review
2. Extract interrupt contexts with document IDs
3. Resume with approval decisions using `BatchResumeWithData`
### Scenario 7: Parent Graph with Reduce Node
- Integrate BatchNode in a larger pipeline
- Use `WithInnerOptions` for progress tracking callbacks
- Reduce pattern: aggregate batch results into a summary report
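
The reduce step can be as small as a fold over the batch output. The sketch below is illustrative only: the `ReviewResult` fields are inferred from the sample output and may not match `main.go`, and `Summary` and `reduceResults` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// ReviewResult mirrors the fields visible in the sample output; the real
// struct in main.go may differ (assumption).
type ReviewResult struct {
	DocumentID string
	Approved   bool
	Comments   string
}

// Summary is a hypothetical aggregate report produced by the reduce node.
type Summary struct {
	Total       int
	Approved    int
	RejectedIDs []string
}

// reduceResults folds the []ReviewResult emitted by the batch step into
// a single Summary.
func reduceResults(results []ReviewResult) Summary {
	s := Summary{Total: len(results)}
	for _, r := range results {
		if r.Approved {
			s.Approved++
		} else {
			s.RejectedIDs = append(s.RejectedIDs, r.DocumentID)
		}
	}
	return s
}

// String renders the summary as a one-line report.
func (s Summary) String() string {
	return fmt.Sprintf("%d/%d approved; rejected: [%s]",
		s.Approved, s.Total, strings.Join(s.RejectedIDs, ", "))
}

func main() {
	report := reduceResults([]ReviewResult{
		{DocumentID: "DOC-001", Approved: true},
		{DocumentID: "DOC-002", Approved: false},
		{DocumentID: "DOC-003", Approved: true},
	})
	fmt.Println(report)
}
```

In a parent graph, a function of this shape would sit in the node that follows the batch node, taking its `[]O` output as input.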
## Key APIs Used
| API | Purpose |
|-----|---------|
| `compose.NewWorkflow` | Create inner workflow |
| `compose.AppendAddressSegment` | Create unique address for each sub-task |
| `compose.GetInterruptState` | Check if resuming from interrupt |
| `compose.GetResumeContext` | Get resume data for this component |
| `compose.Interrupt` | Interrupt execution for human input |
| `compose.CompositeInterrupt` | Bundle multiple interrupt errors |
| `compose.ExtractInterruptInfo` | Extract interrupt contexts from error |
| `compose.BatchResumeWithData` | Resume with data for multiple targets |
| `compose.WithCheckPointStore` | Enable checkpoint persistence |
| `compose.WithCheckPointID` | Identify checkpoint for resume |
| `callbacks.EnsureRunInfo` | Setup callback context |
| `callbacks.OnStart/OnEnd/OnError` | Trigger callbacks |
| `schema.RegisterName` | Register types for serialization |
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                          BatchNode                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Input: []ReviewRequest                             │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│                             ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Concurrency Control (MaxConcurrency)               │    │
│  │  - Sequential (0): one at a time                    │    │
│  │  - Concurrent (>0): parallel with semaphore         │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│           ┌─────────────────┼─────────────────┐             │
│           ▼                 ▼                 ▼             │
│    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐      │
│    │ Inner Task  │   │ Inner Task  │   │ Inner Task  │      │
│    │ (index: 0)  │   │ (index: 1)  │   │ (index: 2)  │      │
│    │             │   │             │   │             │      │
│    │ Workflow/   │   │ Workflow/   │   │ Workflow/   │      │
│    │ Graph       │   │ Graph       │   │ Graph       │      │
│    └─────────────┘   └─────────────┘   └─────────────┘      │
│           │                 │                 │             │
│           ▼                 ▼                 ▼             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Result Collection                                  │    │
│  │  - Success: store in outputs[index]                 │    │
│  │  - Error: return first error                        │    │
│  │  - Interrupt: collect for CompositeInterrupt        │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│                             ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Output: []ReviewResult                             │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```
## Interrupt & Resume Flow
```
First Invocation:
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ DOC-001  │    │ DOC-002  │    │ DOC-003  │    │ DOC-004  │
│ (high)   │    │ (medium) │    │ (high)   │    │ (low)    │
└────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │               │
     ▼               ▼               ▼               ▼
 INTERRUPT       COMPLETE        INTERRUPT       COMPLETE
     │               │               │               │
     └───────────────┴───────────────┴───────────────┘
                ┌────────────────────────┐
                │  CompositeInterrupt    │
                │  - InterruptContexts   │
                │  - NodeInterruptState  │
                └────────────────────────┘

Resume with Approval:
┌──────────┐                    ┌──────────┐
│ DOC-001  │                    │ DOC-003  │
│ (high)   │                    │ (high)   │
└────┬─────┘                    └────┬─────┘
     │                               │
     ▼                               ▼
GetResumeContext                GetResumeContext
→ Decision{Approved: true}      → Decision{Approved: true}
     │                               │
     ▼                               ▼
  COMPLETE                        COMPLETE
     │                               │
     └───────────────┬───────────────┘
          ┌─────────────────────┐
          │    Final Results    │
          │  DOC-001: ✓         │
          │  DOC-002: ✓         │
          │  DOC-003: ✓         │
          │  DOC-004: ✓         │
          └─────────────────────┘
```
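
The two passes in the diagram can be simulated in plain Go to make the control flow concrete. This is a simplified model under stated assumptions, not Eino's mechanism: `process`, `runPass`, and the `done` map (standing in for checkpointed results) are hypothetical, and decisions are keyed by document ID here, whereas the real resume data is keyed by interrupt ID.

```go
package main

import "fmt"

type doc struct{ ID, Priority string }

type decision struct{ Approved bool }

type result struct {
	Approved bool
	Comments string
}

// process models one inner task: high-priority documents pause until a
// human decision is available; everything else is auto-approved.
func process(d doc, decisions map[string]decision) (res result, interrupted bool) {
	if d.Priority != "high" {
		return result{Approved: true, Comments: "Auto-approved (non-high priority)"}, false
	}
	dec, ok := decisions[d.ID]
	if !ok {
		return result{}, true // no decision yet: interrupt
	}
	return result{
		Approved: dec.Approved,
		Comments: fmt.Sprintf("Human review: approved=%t", dec.Approved),
	}, false
}

// runPass skips documents that already completed, stores new results in
// done, and returns the IDs that are still waiting for a decision.
func runPass(docs []doc, done map[string]result, decisions map[string]decision) []string {
	var pending []string
	for _, d := range docs {
		if _, finished := done[d.ID]; finished {
			continue
		}
		res, interrupted := process(d, decisions)
		if interrupted {
			pending = append(pending, d.ID)
			continue
		}
		done[d.ID] = res
	}
	return pending
}

func main() {
	docs := []doc{{"DOC-001", "high"}, {"DOC-002", "medium"}, {"DOC-003", "high"}, {"DOC-004", "low"}}
	done := map[string]result{}

	// First invocation: no decisions exist, so high-priority docs interrupt.
	pending := runPass(docs, done, nil)
	fmt.Println("interrupted:", pending)

	// Resume: supply one decision per interrupted document.
	decisions := map[string]decision{}
	for _, id := range pending {
		decisions[id] = decision{Approved: true}
	}
	pending = runPass(docs, done, decisions)
	fmt.Println("pending after resume:", len(pending), "completed:", len(done))
}
```

The point of the model is that the second pass re-runs only the interrupted items; results from the first pass are reused rather than recomputed.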
## Sample Output
```
=== Document Review Pipeline Example ===
--- Scenario 6: Interrupt & Resume ---
First invocation (will interrupt for high priority docs):
Document DOC-001 requires human review (high priority)
Document DOC-003 requires human review (high priority)
Interrupt detected! Found 2 interrupt context(s):
1. ID=fd49cbc4-deca-4f02-bdf9-02f921c0c1f5
Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:0;...
DocumentID=DOC-001, Reason=High priority document requires human approval
2. ID=af4a3f99-2414-4d6c-9c06-b9b4b1786044
Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:2;...
DocumentID=DOC-003, Reason=High priority document requires human approval
Resuming with approval decisions...
Document DOC-001 resumed with decision: approved=true
Document DOC-003 resumed with decision: approved=true
Final results after resume:
- DOC-001: approved=true, comments=Human review: Approved by supervisor
- DOC-002: approved=true, comments=Auto-approved (non-high priority)
- DOC-003: approved=true, comments=Human review: Approved by supervisor
- DOC-004: approved=true, comments=Auto-approved (non-high priority)
```
## Learn More
- [Eino Documentation](https://github.com/cloudwego/eino)
- [Compose Package](https://github.com/cloudwego/eino/tree/main/compose)
- [Human-in-the-Loop Examples](../graph/react_with_interrupt/)