feat(compose): batch node
Change-Id: Ifc5038a5ef1ff5918bec2fc053a24c607d061fee
parent
9835e6abd6
commit
30b31e535f
@ -0,0 +1,290 @@
|
||||
# BatchNode Example
|
||||
|
||||
This example demonstrates how to build a **BatchNode** component that processes multiple inputs through a Graph or Workflow with configurable concurrency and interrupt/resume support.
|
||||
|
||||
## Overview
|
||||
|
||||
BatchNode is a reusable component that:
|
||||
- Accepts `[]I` (slice of inputs) and returns `[]O` (slice of outputs)
|
||||
- Runs a Graph or Workflow for each input item
|
||||
- Supports configurable concurrency (sequential or parallel)
|
||||
- Handles errors and interrupts from individual tasks
|
||||
- Integrates with Eino's callback and checkpoint systems
|
||||
|
||||
## Business Scenario
|
||||
|
||||
**Document Review Pipeline**: A compliance team needs to review multiple documents. Each document goes through an automated review workflow, with high-priority documents requiring human approval before completion.
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
cd compose/batch
|
||||
go run .
|
||||
```
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
compose/batch/
|
||||
├── batch/
|
||||
│ ├── types.go # Type definitions (NodeConfig, NodeInterruptState, etc.)
|
||||
│ ├── options.go # Batch invocation options (WithInnerOptions)
|
||||
│ ├── store.go # Internal checkpoint store for sub-tasks
|
||||
│ └── node.go # Core BatchNode implementation
|
||||
├── main.go # Example scenarios
|
||||
└── README.md
|
||||
```
|
||||
|
||||
## Key Concepts
|
||||
|
||||
### 1. Creating a BatchNode
|
||||
|
||||
```go
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "DocumentReviewer",
|
||||
InnerTask: workflow, // Graph or Workflow
|
||||
MaxConcurrency: 3, // 0=sequential, >0=parallel limit
|
||||
InnerCompileOptions: []compose.GraphCompileOption{
|
||||
compose.WithGraphName("SingleDocReview"),
|
||||
},
|
||||
})
|
||||
```
|
||||
|
||||
### 2. Concurrency Control
|
||||
|
||||
| MaxConcurrency | Behavior |
|
||||
|----------------|----------|
|
||||
| `0` | Sequential: process one task at a time |
|
||||
| `>0` | Concurrent: up to N parallel tasks (first task runs on main goroutine) |
|
||||
|
||||
### 3. Options
|
||||
|
||||
**Compile-time options** (in `NodeConfig.InnerCompileOptions`):
|
||||
- Applied when compiling the inner Graph/Workflow
|
||||
- Example: `compose.WithGraphName("...")`
|
||||
|
||||
**Request-time options** (via `batch.WithInnerOptions`):
|
||||
- Applied to each inner task invocation
|
||||
- Example: `compose.WithCallbacks(handler)`
|
||||
|
||||
```go
|
||||
results, err := batchNode.Invoke(ctx, inputs,
|
||||
batch.WithInnerOptions(
|
||||
compose.WithCallbacks(progressHandler),
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
### 4. Error Handling
|
||||
|
||||
- **Normal errors**: BatchNode returns the first error encountered
|
||||
- **Interrupt errors**: Collected and bundled via `compose.CompositeInterrupt`
|
||||
|
||||
### 5. Interrupt & Resume
|
||||
|
||||
BatchNode supports human-in-the-loop workflows:
|
||||
|
||||
```go
|
||||
// In your inner workflow's lambda:
|
||||
if needsHumanReview {
|
||||
wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
|
||||
if !wasInterrupted {
|
||||
// First run: interrupt for human review
|
||||
return Result{}, compose.Interrupt(ctx, map[string]string{
|
||||
"document_id": docID,
|
||||
"reason": "Requires human approval",
|
||||
})
|
||||
}
|
||||
|
||||
// Resume: get human decision
|
||||
isTarget, hasData, decision := compose.GetResumeContext[*Decision](ctx)
|
||||
if isTarget && hasData && decision != nil {
|
||||
return Result{Approved: decision.Approved}, nil
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Resume with approval decisions:
|
||||
|
||||
```go
|
||||
// Extract interrupt contexts
|
||||
info, _ := compose.ExtractInterruptInfo(err)
|
||||
|
||||
// Prepare resume data (keyed by interrupt ID)
|
||||
resumeData := make(map[string]any)
|
||||
for _, iCtx := range info.InterruptContexts {
|
||||
resumeData[iCtx.ID] = &Decision{Approved: true}
|
||||
}
|
||||
|
||||
// Resume
|
||||
resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
|
||||
results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
|
||||
```
|
||||
|
||||
## Scenarios
|
||||
|
||||
### Scenario 1: Basic Sequential Processing
|
||||
Process documents one at a time with `MaxConcurrency: 0`.
|
||||
|
||||
### Scenario 2: Concurrent Processing
|
||||
Process multiple documents in parallel with `MaxConcurrency: 3`.
|
||||
|
||||
### Scenario 3: With Compile Options
|
||||
Configure inner workflow at compile time using `InnerCompileOptions`.
|
||||
|
||||
### Scenario 4: With Invoke Options (Callbacks)
|
||||
Add callbacks for monitoring using `callbacks.InitCallbacks`.
|
||||
|
||||
### Scenario 5: Normal Error Handling
|
||||
Demonstrates how BatchNode handles errors from individual tasks.
|
||||
|
||||
### Scenario 6: Interrupt & Resume
|
||||
Human-in-the-loop workflow:
|
||||
1. High-priority documents interrupt for human review
|
||||
2. Extract interrupt contexts with document IDs
|
||||
3. Resume with approval decisions using `BatchResumeWithData`
|
||||
|
||||
### Scenario 7: Parent Graph with Reduce Node
|
||||
- Integrate BatchNode in a larger pipeline
|
||||
- Use `WithInnerOptions` for progress tracking callbacks
|
||||
- Reduce pattern: aggregate batch results into a summary report
|
||||
|
||||
## Key APIs Used
|
||||
|
||||
| API | Purpose |
|
||||
|-----|---------|
|
||||
| `compose.NewWorkflow` | Create inner workflow |
|
||||
| `compose.AppendAddressSegment` | Create unique address for each sub-task |
|
||||
| `compose.GetInterruptState` | Check if resuming from interrupt |
|
||||
| `compose.GetResumeContext` | Get resume data for this component |
|
||||
| `compose.Interrupt` | Interrupt execution for human input |
|
||||
| `compose.CompositeInterrupt` | Bundle multiple interrupt errors |
|
||||
| `compose.ExtractInterruptInfo` | Extract interrupt contexts from error |
|
||||
| `compose.BatchResumeWithData` | Resume with data for multiple targets |
|
||||
| `compose.WithCheckPointStore` | Enable checkpoint persistence |
|
||||
| `compose.WithCheckPointID` | Identify checkpoint for resume |
|
||||
| `callbacks.EnsureRunInfo` | Setup callback context |
|
||||
| `callbacks.OnStart/OnEnd/OnError` | Trigger callbacks |
|
||||
| `schema.RegisterName` | Register types for serialization |
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ BatchNode │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ Input: []ReviewRequest │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ Concurrency Control (MaxConcurrency) │ │
|
||||
│ │ - Sequential (0): one at a time │ │
|
||||
│ │ - Concurrent (>0): parallel with semaphore │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ┌─────────────────┼─────────────────┐ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
||||
│ │ Inner Task │ │ Inner Task │ │ Inner Task │ │
|
||||
│ │ (index: 0) │ │ (index: 1) │ │ (index: 2) │ │
|
||||
│ │ │ │ │ │ │ │
|
||||
│ │ Workflow/ │ │ Workflow/ │ │ Workflow/ │ │
|
||||
│ │ Graph │ │ Graph │ │ Graph │ │
|
||||
│ └─────────────┘ └─────────────┘ └─────────────┘ │
|
||||
│ │ │ │ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ Result Collection │ │
|
||||
│ │ - Success: store in outputs[index] │ │
|
||||
│ │ - Error: return first error │ │
|
||||
│ │ - Interrupt: collect for CompositeInterrupt │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ Output: []ReviewResult │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Interrupt & Resume Flow
|
||||
|
||||
```
|
||||
First Invocation:
|
||||
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
|
||||
│ DOC-001 │ │ DOC-002 │ │ DOC-003 │ │ DOC-004 │
|
||||
│ (high) │ │ (medium) │ │ (high) │ │ (low) │
|
||||
└────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
|
||||
│ │ │ │
|
||||
▼ ▼ ▼ ▼
|
||||
INTERRUPT COMPLETE INTERRUPT COMPLETE
|
||||
│ │ │ │
|
||||
└───────────────┴───────────────┴───────────────┘
|
||||
│
|
||||
▼
|
||||
┌────────────────────────┐
|
||||
│ CompositeInterrupt │
|
||||
│ - InterruptContexts │
|
||||
│ - NodeInterruptState │
|
||||
└────────────────────────┘
|
||||
|
||||
Resume with Approval:
|
||||
┌──────────┐ ┌──────────┐
|
||||
│ DOC-001 │ │ DOC-003 │
|
||||
│ (high) │ │ (high) │
|
||||
└────┬─────┘ └────┬─────┘
|
||||
│ │
|
||||
▼ ▼
|
||||
GetResumeContext GetResumeContext
|
||||
→ Decision{Approved: true} → Decision{Approved: true}
|
||||
│ │
|
||||
▼ ▼
|
||||
COMPLETE COMPLETE
|
||||
│ │
|
||||
└───────────────┬───────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────┐
|
||||
│ Final Results │
|
||||
│ DOC-001: ✓ │
|
||||
│ DOC-002: ✓ │
|
||||
│ DOC-003: ✓ │
|
||||
│ DOC-004: ✓ │
|
||||
└─────────────────────┘
|
||||
```
|
||||
|
||||
## Sample Output
|
||||
|
||||
```
|
||||
=== Document Review Pipeline Example ===
|
||||
|
||||
--- Scenario 6: Interrupt & Resume ---
|
||||
First invocation (will interrupt for high priority docs):
|
||||
Document DOC-001 requires human review (high priority)
|
||||
Document DOC-003 requires human review (high priority)
|
||||
|
||||
Interrupt detected! Found 2 interrupt context(s):
|
||||
1. ID=fd49cbc4-deca-4f02-bdf9-02f921c0c1f5
|
||||
Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:0;...
|
||||
DocumentID=DOC-001, Reason=High priority document requires human approval
|
||||
2. ID=af4a3f99-2414-4d6c-9c06-b9b4b1786044
|
||||
Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:2;...
|
||||
DocumentID=DOC-003, Reason=High priority document requires human approval
|
||||
|
||||
Resuming with approval decisions...
|
||||
Document DOC-001 resumed with decision: approved=true
|
||||
Document DOC-003 resumed with decision: approved=true
|
||||
|
||||
Final results after resume:
|
||||
- DOC-001: approved=true, comments=Human review: Approved by supervisor
|
||||
- DOC-002: approved=true, comments=Auto-approved (non-high priority)
|
||||
- DOC-003: approved=true, comments=Human review: Approved by supervisor
|
||||
- DOC-004: approved=true, comments=Auto-approved (non-high priority)
|
||||
```
|
||||
|
||||
## Learn More
|
||||
|
||||
- [Eino Documentation](https://github.com/cloudwego/eino)
|
||||
- [Compose Package](https://github.com/cloudwego/eino/tree/main/compose)
|
||||
- [Human-in-the-Loop Examples](../graph/react_with_interrupt/)
|
||||
@ -0,0 +1,275 @@
|
||||
/*
|
||||
* Copyright 2025 CloudWeGo Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/cloudwego/eino/callbacks"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
)
|
||||
|
||||
// Node is a batch processor that runs a Graph/Workflow for each input item.
// It supports configurable concurrency, interrupt/resume, and callbacks.
//
// Type parameters:
//   - I: Input type for each item
//   - O: Output type for each item
type Node[I, O any] struct {
	// name identifies this node in callbacks and logging (defaults to "Node").
	name string
	// innerTask is the Graph/Workflow compiled and run once per input item.
	innerTask Compilable[I, O]
	// maxConcurrency: 0 = sequential, >0 = parallel task limit.
	maxConcurrency int
	// innerCompileOptions are applied when compiling innerTask.
	innerCompileOptions []compose.GraphCompileOption
}
|
||||
|
||||
// NewBatchNode creates a new batch processing node.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// batchNode := batch.NewBatchNode(&batch.NodeConfig[Request, Response]{
|
||||
// Name: "MyBatchProcessor",
|
||||
// InnerTask: myWorkflow,
|
||||
// MaxConcurrency: 5,
|
||||
// })
|
||||
func NewBatchNode[I, O any](config *NodeConfig[I, O]) *Node[I, O] {
|
||||
name := config.Name
|
||||
if name == "" {
|
||||
name = "Node"
|
||||
}
|
||||
return &Node[I, O]{
|
||||
name: name,
|
||||
innerTask: config.InnerTask,
|
||||
maxConcurrency: config.MaxConcurrency,
|
||||
innerCompileOptions: config.InnerCompileOptions,
|
||||
}
|
||||
}
|
||||
|
||||
// GetType returns the node name for callback identification.
// Implements components.Typer interface so callbacks can label this node.
func (b *Node[I, O]) GetType() string {
	return b.name
}
|
||||
|
||||
// IsCallbacksEnabled returns true to enable callback support.
// Implements components.Checker interface; always on for this node.
func (b *Node[I, O]) IsCallbacksEnabled() bool {
	return true
}
|
||||
|
||||
// Invoke processes all inputs through the inner task and returns results.
// It handles concurrency, errors, and interrupts according to configuration.
//
// It is a thin wrapper around invoke that adds batch-level callback
// notifications: OnStart before processing, then exactly one of OnError
// (on any failure, including interrupts) or OnEnd (on full success).
//
// Parameters:
//   - ctx: Context for cancellation and deadline
//   - inputs: Slice of input items to process
//   - opts: Optional batch options (e.g., WithInnerOptions)
//
// Returns:
//   - []O: Results in the same order as inputs
//   - error: First normal error, or CompositeInterrupt if any task interrupted
func (b *Node[I, O]) Invoke(ctx context.Context, inputs []I, opts ...Option) ([]O, error) {
	batchOpts := applyBatchOptions(opts...)

	// Setup callbacks for batch-level monitoring
	ctx = callbacks.EnsureRunInfo(ctx, b.name, ComponentOfBatchNode)
	ctx = callbacks.OnStart(ctx, &CallbackInput[I]{
		Inputs:         inputs,
		MaxConcurrency: b.maxConcurrency,
	})

	outputs, err := b.invoke(ctx, inputs, batchOpts)
	if err != nil {
		// Interrupt errors also flow through OnError; resume happens via
		// a fresh Invoke call with the checkpoint ID.
		callbacks.OnError(ctx, err)
		return nil, err
	}

	callbacks.OnEnd(ctx, &CallbackOutput[O]{Outputs: outputs})
	return outputs, nil
}
|
||||
|
||||
// invoke is the internal implementation of batch processing.
//
// Two entry modes:
//   - First run: all input indices are scheduled.
//   - Resume: only previously interrupted indices are re-run; results of
//     tasks that completed before the interrupt are restored from the
//     persisted NodeInterruptState instead of being recomputed.
//
// On any non-interrupt task error the first such error is returned and all
// partial outputs are discarded (outputs slice is not returned).
func (b *Node[I, O]) invoke(ctx context.Context, inputs []I, batchOpts *options) ([]O, error) {
	// Check if this is a resume from a previous interrupt
	wasInterrupted, hasState, prevState := compose.GetInterruptState[*NodeInterruptState](ctx)

	var store *batchBridgeStore
	var indicesToProcess []int
	var effectiveInputs []I

	if wasInterrupted && hasState && prevState != nil {
		// RESUME PATH: Restore state from previous interrupt
		// Use fresh store (don't restore checkpoint data - it causes input issues)
		store = newBatchBridgeStore()
		indicesToProcess = prevState.InterruptedIndices

		// Restore original inputs from interrupt state
		// (inputs parameter is nil during resume)
		effectiveInputs = make([]I, prevState.TotalCount)
		for i, v := range prevState.OriginalInputs {
			// Type assertion may fail if serialization round-tripped to a
			// different concrete type; such slots keep the zero value.
			if typedInput, ok := v.(I); ok {
				effectiveInputs[i] = typedInput
			}
		}
	} else {
		// FIRST RUN PATH: Process all inputs
		store = newBatchBridgeStore()
		effectiveInputs = inputs
		indicesToProcess = make([]int, len(inputs))
		for i := range inputs {
			indicesToProcess[i] = i
		}
	}

	// Allocate output slice
	outputs := make([]O, len(effectiveInputs))

	// Restore completed results from previous run (if resuming)
	if wasInterrupted && hasState && prevState != nil {
		for idx, result := range prevState.CompletedResults {
			// Guard against out-of-range indices in persisted state.
			if idx < len(outputs) {
				if typedResult, ok := result.(O); ok {
					outputs[idx] = typedResult
				}
			}
		}
	}

	// Compile inner task with checkpoint store.
	// The store option comes first so user-provided options can override it.
	compileOpts := append([]compose.GraphCompileOption{
		compose.WithCheckPointStore(store),
	}, b.innerCompileOptions...)

	runner, err := b.innerTask.Compile(ctx, compileOpts...)
	if err != nil {
		return nil, fmt.Errorf("failed to compile inner task: %w", err)
	}

	// Nothing to process (all completed in previous run)
	if len(indicesToProcess) == 0 {
		return outputs, nil
	}

	// Task result for collecting outputs from goroutines
	type taskResult struct {
		index  int
		output O
		err    error
	}

	// Buffered to the task count so runTask never blocks on send,
	// which makes the sequential path safe to run on the main goroutine.
	resultCh := make(chan taskResult, len(indicesToProcess))
	var wg sync.WaitGroup

	// runTask executes a single inner task
	runTask := func(index int, input I) {
		defer wg.Done()

		// Create sub-context with unique address segment for this task
		// This enables proper interrupt ID generation (e.g., "batch_process:0")
		subCtx := compose.AppendAddressSegment(ctx, AddressSegmentBatchProcess, strconv.Itoa(index))

		// Combine checkpoint ID with user-provided inner options
		invokeOpts := append([]compose.Option{
			compose.WithCheckPointID(makeBatchCheckpointID(index)),
		}, batchOpts.innerOptions...)

		output, taskErr := runner.Invoke(subCtx, input, invokeOpts...)
		resultCh <- taskResult{index: index, output: output, err: taskErr}
	}

	// Execute tasks based on concurrency setting
	if b.maxConcurrency == 0 {
		// Sequential: Run one task at a time
		for _, idx := range indicesToProcess {
			wg.Add(1)
			runTask(idx, effectiveInputs[idx])
		}
	} else {
		// Concurrent: Use semaphore to limit parallelism
		sem := make(chan struct{}, b.maxConcurrency)

		for i, idx := range indicesToProcess {
			wg.Add(1)
			if i == 0 {
				// First task runs on main goroutine (optimization).
				// It completes before any goroutine starts, so the
				// semaphore bound still holds.
				runTask(idx, effectiveInputs[idx])
			} else {
				// Subsequent tasks run in goroutines with semaphore
				go func(index int, input I) {
					sem <- struct{}{}
					defer func() { <-sem }()
					runTask(index, input)
				}(idx, effectiveInputs[idx])
			}
		}
	}

	// Close result channel when all tasks complete
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect results and categorize errors
	var normalErr error
	var interruptErrs []error
	completedResults := make(map[int]any)
	interruptedIndices := make([]int, 0)

	for result := range resultCh {
		if result.err != nil {
			if _, ok := compose.ExtractInterruptInfo(result.err); ok {
				// Interrupt error: collect for CompositeInterrupt
				interruptErrs = append(interruptErrs, result.err)
				interruptedIndices = append(interruptedIndices, result.index)
			} else if normalErr == nil {
				// Normal error: keep first one
				normalErr = fmt.Errorf("task %d failed: %w", result.index, result.err)
			}
		} else {
			// Success: store result
			outputs[result.index] = result.output
			completedResults[result.index] = result.output
		}
	}

	// Return first normal error (if any).
	// Normal errors take precedence over interrupts.
	if normalErr != nil {
		return nil, normalErr
	}

	// Return composite interrupt (if any tasks interrupted)
	if len(interruptErrs) > 0 {
		// Store original inputs for resume (inputs will be nil on resume call)
		originalInputs := make([]any, len(effectiveInputs))
		for i, v := range effectiveInputs {
			originalInputs[i] = v
		}
		state := &NodeInterruptState{
			OriginalInputs:     originalInputs,
			CompletedResults:   completedResults,
			InterruptedIndices: interruptedIndices,
			TotalCount:         len(effectiveInputs),
		}
		// CompositeInterrupt bundles all interrupt errors with state for resume
		return nil, compose.CompositeInterrupt(ctx, nil, state, interruptErrs...)
	}

	return outputs, nil
}
|
||||
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2025 CloudWeGo Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package batch
|
||||
|
||||
import "github.com/cloudwego/eino/compose"
|
||||
|
||||
// options holds runtime configuration for a single batch invocation.
type options struct {
	// innerOptions are compose.Option values passed to each inner task invocation.
	// These are request-time options (vs compile-time options in NodeConfig).
	innerOptions []compose.Option
}
|
||||
|
||||
// Option is a function that configures batch invocation options.
// Options are applied in order by applyBatchOptions.
type Option func(*options)
|
||||
|
||||
// WithInnerOptions passes compose.Option values to each inner task invocation.
|
||||
// Use this for request-time options like:
|
||||
// - compose.WithCallbacks: Add callbacks for progress tracking
|
||||
// - compose.WithMaxRunSteps: Limit execution steps per task
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// batchNode.Invoke(ctx, inputs,
|
||||
// batch.WithInnerOptions(
|
||||
// compose.WithCallbacks(progressHandler),
|
||||
// ),
|
||||
// )
|
||||
func WithInnerOptions(opts ...compose.Option) Option {
|
||||
return func(o *options) {
|
||||
o.innerOptions = append(o.innerOptions, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// applyBatchOptions creates an options struct from the given Option functions.
|
||||
func applyBatchOptions(opts ...Option) *options {
|
||||
o := &options{}
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
return o
|
||||
}
|
||||
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2025 CloudWeGo Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// batchBridgeStore implements compose.CheckPointStore for batch processing.
// It stores checkpoint data keyed by batch index, allowing each sub-task
// to have its own checkpoint namespace.
//
// This store is used internally by BatchNode and is not meant for external use.
// For interrupt/resume, the BatchNode stores its state via CompositeInterrupt,
// not through this checkpoint store.
type batchBridgeStore struct {
	mu   sync.RWMutex   // guards data; sub-tasks may write concurrently
	data map[int][]byte // index -> checkpoint data
}
|
||||
|
||||
// newBatchBridgeStore creates a new empty checkpoint store.
|
||||
func newBatchBridgeStore() *batchBridgeStore {
|
||||
return &batchBridgeStore{
|
||||
data: make(map[int][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
// makeBatchCheckpointID creates a checkpoint ID for a given batch index.
// Format: "batch_0", "batch_1", etc. (inverse of parseBatchIndex).
func makeBatchCheckpointID(index int) string {
	return "batch_" + strconv.Itoa(index)
}
|
||||
|
||||
// parseBatchIndex extracts the batch index from a checkpoint ID produced by
// makeBatchCheckpointID (e.g. "batch_3" -> 3).
//
// Returns an error if the ID lacks the "batch_" prefix, the suffix is not a
// valid integer, or the index is negative (batch indices are slice indices,
// so a negative value can never be legitimate).
func parseBatchIndex(checkPointID string) (int, error) {
	if !strings.HasPrefix(checkPointID, "batch_") {
		return 0, fmt.Errorf("invalid batch checkpoint ID: %s", checkPointID)
	}
	indexStr := strings.TrimPrefix(checkPointID, "batch_")
	index, err := strconv.Atoi(indexStr)
	if err != nil {
		// Wrap so callers see which ID failed, not just the bare Atoi error.
		return 0, fmt.Errorf("invalid batch checkpoint ID %q: %w", checkPointID, err)
	}
	if index < 0 {
		return 0, fmt.Errorf("invalid batch checkpoint ID %q: negative index", checkPointID)
	}
	return index, nil
}
|
||||
|
||||
// Get retrieves checkpoint data for a batch index.
|
||||
// Implements compose.CheckPointStore interface.
|
||||
func (m *batchBridgeStore) Get(_ context.Context, checkPointID string) ([]byte, bool, error) {
|
||||
index, err := parseBatchIndex(checkPointID)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
data, ok := m.data[index]
|
||||
return data, ok, nil
|
||||
}
|
||||
|
||||
// Set stores checkpoint data for a batch index.
|
||||
// Implements compose.CheckPointStore interface.
|
||||
func (m *batchBridgeStore) Set(_ context.Context, checkPointID string, checkPoint []byte) error {
|
||||
index, err := parseBatchIndex(checkPointID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.data[index] = checkPoint
|
||||
return nil
|
||||
}
|
||||
@ -0,0 +1,106 @@
|
||||
/*
|
||||
* Copyright 2025 CloudWeGo Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Package batch provides a BatchNode implementation for processing multiple inputs
|
||||
// through a Graph or Workflow with configurable concurrency and interrupt/resume support.
|
||||
//
|
||||
// Key features:
|
||||
// - Generic batch processing: Accept []I, return []O
|
||||
// - Configurable concurrency: Sequential (0) or concurrent with limit (>0)
|
||||
// - Interrupt handling: Collects interrupts from sub-tasks using CompositeInterrupt
|
||||
// - Resume support: Restores state and only re-runs interrupted tasks
|
||||
// - Callbacks: Implements Typer and Checker interfaces for callback support
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cloudwego/eino/components"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
)
|
||||
|
||||
// init registers NodeInterruptState for serialization during checkpoint
// save/restore. This is required for the interrupt state to be properly
// persisted and rehydrated on resume.
func init() {
	schema.RegisterName[*NodeInterruptState]("batch.NodeInterruptState")
}
|
||||
|
||||
// ComponentOfBatchNode is the component type identifier for callbacks.
// Used by callbacks.EnsureRunInfo to identify this component in the callback chain.
const ComponentOfBatchNode components.Component = "Batch"

// AddressSegmentBatchProcess is the address segment type for batch processing.
// Used by compose.AppendAddressSegment to create unique addresses for each sub-task,
// enabling proper interrupt ID generation (e.g., "batch_process:0", "batch_process:1").
const AddressSegmentBatchProcess compose.AddressSegmentType = "batch_process"
|
||||
|
||||
// Compilable represents a Graph or Workflow that can be compiled into a Runnable.
// Both compose.Graph and compose.Workflow implement this interface, so either
// can serve as a BatchNode's inner task.
type Compilable[I, O any] interface {
	// Compile builds a Runnable from this graph/workflow with the given options.
	Compile(ctx context.Context, opts ...compose.GraphCompileOption) (compose.Runnable[I, O], error)
}
|
||||
|
||||
// NodeConfig contains configuration for creating a BatchNode via NewBatchNode.
type NodeConfig[I, O any] struct {
	// Name is the node name used for callbacks and logging. Defaults to "Node" if empty.
	Name string

	// InnerTask is the Graph or Workflow to run for each input item.
	// Must implement Compilable[I, O] interface.
	InnerTask Compilable[I, O]

	// MaxConcurrency controls parallel execution:
	//   - 0: Sequential processing (one task at a time)
	//   - >0: Concurrent processing with this many parallel tasks
	//     First task runs on main goroutine, rest run in goroutines
	MaxConcurrency int

	// InnerCompileOptions are passed to InnerTask.Compile() for each invocation.
	// Use this for compile-time options like WithGraphName.
	InnerCompileOptions []compose.GraphCompileOption
}
|
||||
|
||||
// NodeInterruptState stores the batch node's state when an interrupt occurs.
// This state is persisted via CompositeInterrupt and restored on resume.
// It is registered for serialization in this package's init function.
type NodeInterruptState struct {
	// OriginalInputs stores all input items (as []any for serialization).
	// Required because inputs are not passed during resume invocation.
	OriginalInputs []any

	// CompletedResults maps index -> result for tasks that completed before interrupt.
	// These results are restored directly without re-running the tasks.
	CompletedResults map[int]any

	// InterruptedIndices lists which task indices were interrupted.
	// Only these tasks will be re-run on resume.
	InterruptedIndices []int

	// TotalCount is the total number of input items.
	// Used to allocate the correct output slice size on resume.
	TotalCount int
}
|
||||
|
||||
// CallbackInput is passed to callbacks.OnStart when batch processing begins.
type CallbackInput[I any] struct {
	Inputs         []I // All input items to be processed
	MaxConcurrency int // Configured concurrency limit (0 = sequential)
}
|
||||
|
||||
// CallbackOutput is passed to callbacks.OnEnd when batch processing completes.
type CallbackOutput[O any] struct {
	Outputs []O // All output results, in input order
}
|
||||
@ -0,0 +1,611 @@
|
||||
/*
|
||||
* Copyright 2025 CloudWeGo Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// This example demonstrates the BatchNode component for processing multiple inputs
|
||||
// through a Graph or Workflow with configurable concurrency and interrupt/resume support.
|
||||
//
|
||||
// Business Scenario: Document Review Pipeline
|
||||
// A compliance team needs to review multiple documents. Each document goes through
|
||||
// an automated review workflow, with high-priority documents requiring human approval.
|
||||
//
|
||||
// Scenarios covered:
|
||||
// 1. Basic Sequential Processing - Process documents one at a time
|
||||
// 2. Concurrent Processing - Process multiple documents in parallel
|
||||
// 3. Compile Options - Configure inner workflow at compile time
|
||||
// 4. Invoke Options (Callbacks) - Add callbacks for monitoring
|
||||
// 5. Error Handling - Handle errors from individual tasks
|
||||
// 6. Interrupt & Resume - Human-in-the-loop for high-priority documents
|
||||
// 7. Parent Graph with Reduce - Integrate BatchNode in a larger pipeline
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cloudwego/eino/callbacks"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
|
||||
"github.com/cloudwego/eino-examples/compose/batch/batch"
|
||||
)
|
||||
|
||||
// init registers this example's concrete types with eino's schema registry so
// the checkpoint store can (de)serialize them during interrupt/resume.
func init() {
	// Register types for serialization (required for interrupt/resume checkpoint)
	schema.RegisterName[ReviewRequest]("batch_example.ReviewRequest")
	schema.RegisterName[ReviewResult]("batch_example.ReviewResult")
	// The pointer type is registered because resume data carries *ApprovalDecision.
	schema.RegisterName[*ApprovalDecision]("batch_example.ApprovalDecision")
	// Slice types must be registered separately from their element types.
	schema.RegisterName[[]ReviewRequest]("batch_example.ReviewRequestSlice")
	schema.RegisterName[[]ReviewResult]("batch_example.ReviewResultSlice")
}
|
||||
|
||||
// ReviewRequest represents a document to be reviewed
type ReviewRequest struct {
	DocumentID string // unique identifier, e.g. "DOC-001"
	Content    string // raw document text to review
	Priority   string // "high", "medium", "low"; "high" triggers human review in the interrupt demo
}
|
||||
|
||||
// ReviewResult contains the outcome of a document review
type ReviewResult struct {
	DocumentID string    // ID of the reviewed document
	Approved   bool      // whether the document passed review
	Score      float64   // review score; in this example >= 0.7 counts as approved
	Comments   string    // auto- or human-generated review notes
	ReviewedAt time.Time // when the review completed
}
|
||||
|
||||
// ReviewReport aggregates results from batch processing
type ReviewReport struct {
	TotalDocuments   int            // total number of documents reviewed
	ApprovedCount    int            // how many results were approved
	RejectedCount    int            // how many results were rejected
	AverageScore     float64        // mean score across all results (0 when there are none)
	HighPriorityPass int            // NOTE(review): never populated in this example — confirm intended use
	Results          []ReviewResult // individual per-document outcomes
	GeneratedAt      time.Time      // when the report was assembled
}
|
||||
|
||||
// ApprovalDecision is the human decision for interrupted documents
type ApprovalDecision struct {
	Approved bool   // supervisor's approve/reject decision
	Comments string // free-form justification attached to the result
}
|
||||
|
||||
// BatchReviewInput wraps documents with batch metadata
type BatchReviewInput struct {
	Documents []ReviewRequest // documents to route through the batch pipeline
	BatchName string          // human-readable batch label used in log output
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
fmt.Println("=== Document Review Pipeline Example ===")
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 1: Basic Sequential Processing ---")
|
||||
runBasicSequential(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 2: Concurrent Processing ---")
|
||||
runConcurrent(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 3: With Compile Options ---")
|
||||
runWithCompileOptions(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 4: With Invoke Options (Callbacks) ---")
|
||||
runWithInvokeOptions(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 5: Normal Error Handling ---")
|
||||
runWithError(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 6: Interrupt & Resume ---")
|
||||
runInterruptAndResume(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("--- Scenario 7: Parent Graph with Reduce Node ---")
|
||||
runParentGraphWithReduce(ctx)
|
||||
fmt.Println()
|
||||
|
||||
fmt.Println("=== All Scenarios Completed ===")
|
||||
}
|
||||
|
||||
// createSampleDocuments generates test documents with rotating priorities
|
||||
func createSampleDocuments(count int) []ReviewRequest {
|
||||
priorities := []string{"high", "medium", "low"}
|
||||
docs := make([]ReviewRequest, count)
|
||||
for i := 0; i < count; i++ {
|
||||
docs[i] = ReviewRequest{
|
||||
DocumentID: fmt.Sprintf("DOC-%03d", i+1),
|
||||
Content: fmt.Sprintf("Document content for review #%d. This is a sample compliance document.", i+1),
|
||||
Priority: priorities[i%len(priorities)],
|
||||
}
|
||||
}
|
||||
return docs
|
||||
}
|
||||
|
||||
// createSimpleReviewWorkflow creates a basic document review workflow
|
||||
// that simulates automated review with random scoring
|
||||
func createSimpleReviewWorkflow() *compose.Workflow[ReviewRequest, ReviewResult] {
|
||||
workflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()
|
||||
|
||||
workflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
|
||||
time.Sleep(50 * time.Millisecond) // Simulate processing time
|
||||
|
||||
score := 0.5 + rand.Float64()*0.5
|
||||
approved := score >= 0.7
|
||||
|
||||
return ReviewResult{
|
||||
DocumentID: req.DocumentID,
|
||||
Approved: approved,
|
||||
Score: score,
|
||||
Comments: fmt.Sprintf("Auto-reviewed document %s with priority %s", req.DocumentID, req.Priority),
|
||||
ReviewedAt: time.Now(),
|
||||
}, nil
|
||||
})).AddInput(compose.START)
|
||||
|
||||
workflow.End().AddInput("analyze")
|
||||
|
||||
return workflow
|
||||
}
|
||||
|
||||
// Scenario 1: Basic Sequential Processing
|
||||
// Demonstrates: MaxConcurrency=0 for sequential execution
|
||||
func runBasicSequential(ctx context.Context) {
|
||||
docs := createSampleDocuments(3)
|
||||
workflow := createSimpleReviewWorkflow()
|
||||
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "SequentialReviewer",
|
||||
InnerTask: workflow,
|
||||
MaxConcurrency: 0, // Sequential: process one at a time
|
||||
})
|
||||
|
||||
start := time.Now()
|
||||
results, err := batchNode.Invoke(ctx, docs)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Processed %d documents sequentially in %v\n", len(results), elapsed)
|
||||
for _, r := range results {
|
||||
fmt.Printf(" - %s: approved=%v, score=%.2f\n", r.DocumentID, r.Approved, r.Score)
|
||||
}
|
||||
}
|
||||
|
||||
// Scenario 2: Concurrent Processing
|
||||
// Demonstrates: MaxConcurrency>0 for parallel execution with limit
|
||||
func runConcurrent(ctx context.Context) {
|
||||
docs := createSampleDocuments(5)
|
||||
workflow := createSimpleReviewWorkflow()
|
||||
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "ConcurrentReviewer",
|
||||
InnerTask: workflow,
|
||||
MaxConcurrency: 3, // Concurrent: up to 3 parallel tasks
|
||||
})
|
||||
|
||||
start := time.Now()
|
||||
results, err := batchNode.Invoke(ctx, docs)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Processed %d documents concurrently (max 3) in %v\n", len(results), elapsed)
|
||||
for _, r := range results {
|
||||
fmt.Printf(" - %s: approved=%v, score=%.2f\n", r.DocumentID, r.Approved, r.Score)
|
||||
}
|
||||
}
|
||||
|
||||
// Scenario 3: With Compile Options
|
||||
// Demonstrates: InnerCompileOptions for configuring inner workflow at compile time
|
||||
func runWithCompileOptions(ctx context.Context) {
|
||||
docs := createSampleDocuments(3)
|
||||
workflow := createSimpleReviewWorkflow()
|
||||
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "NamedReviewer",
|
||||
InnerTask: workflow,
|
||||
MaxConcurrency: 2,
|
||||
InnerCompileOptions: []compose.GraphCompileOption{
|
||||
compose.WithGraphName("SingleDocumentReviewWorkflow"), // Name for debugging/tracing
|
||||
},
|
||||
})
|
||||
|
||||
results, err := batchNode.Invoke(ctx, docs)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Processed %d documents with named inner workflow\n", len(results))
|
||||
for _, r := range results {
|
||||
fmt.Printf(" - %s: approved=%v\n", r.DocumentID, r.Approved)
|
||||
}
|
||||
}
|
||||
|
||||
// Scenario 4: With Invoke Options (Callbacks)
|
||||
// Demonstrates: Callbacks for batch-level monitoring via context
|
||||
func runWithInvokeOptions(ctx context.Context) {
|
||||
docs := createSampleDocuments(3)
|
||||
workflow := createSimpleReviewWorkflow()
|
||||
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "CallbackReviewer",
|
||||
InnerTask: workflow,
|
||||
MaxConcurrency: 0,
|
||||
})
|
||||
|
||||
// Create callback handler for monitoring
|
||||
handler := callbacks.NewHandlerBuilder().
|
||||
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
|
||||
fmt.Printf(" [Callback] OnStart: %s/%s\n", info.Component, info.Name)
|
||||
return ctx
|
||||
}).
|
||||
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
|
||||
fmt.Printf(" [Callback] OnEnd: %s/%s\n", info.Component, info.Name)
|
||||
return ctx
|
||||
}).
|
||||
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
|
||||
fmt.Printf(" [Callback] OnError: %s/%s - %v\n", info.Component, info.Name, err)
|
||||
return ctx
|
||||
}).
|
||||
Build()
|
||||
|
||||
// Initialize callbacks in context
|
||||
ctxWithCallback := callbacks.InitCallbacks(ctx, nil, handler)
|
||||
|
||||
results, err := batchNode.Invoke(ctxWithCallback, docs)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Processed %d documents with callbacks\n", len(results))
|
||||
}
|
||||
|
||||
// Scenario 5: Normal Error Handling
|
||||
// Demonstrates: How BatchNode handles errors from individual tasks
|
||||
func runWithError(ctx context.Context) {
|
||||
workflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()
|
||||
|
||||
workflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
|
||||
// Simulate validation failure for specific document
|
||||
if req.DocumentID == "DOC-002" {
|
||||
return ReviewResult{}, fmt.Errorf("validation failed for document %s: content too short", req.DocumentID)
|
||||
}
|
||||
return ReviewResult{
|
||||
DocumentID: req.DocumentID,
|
||||
Approved: true,
|
||||
Score: 0.9,
|
||||
Comments: "Passed validation",
|
||||
ReviewedAt: time.Now(),
|
||||
}, nil
|
||||
})).AddInput(compose.START)
|
||||
|
||||
workflow.End().AddInput("analyze")
|
||||
|
||||
docs := createSampleDocuments(3)
|
||||
batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
|
||||
Name: "ErrorHandlingReviewer",
|
||||
InnerTask: workflow,
|
||||
MaxConcurrency: 0,
|
||||
})
|
||||
|
||||
results, err := batchNode.Invoke(ctx, docs)
|
||||
if err != nil {
|
||||
fmt.Printf("Expected error occurred: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Results: %v\n", results)
|
||||
}
|
||||
|
||||
// Scenario 6: Interrupt & Resume
// Demonstrates: Human-in-the-loop workflow using compose.Interrupt and compose.BatchResumeWithData
//
// Flow:
// 1. First invocation: High-priority documents interrupt for human review
// 2. Extract interrupt contexts with document IDs
// 3. Resume with approval decisions using BatchResumeWithData
// 4. Interrupted tasks complete with human decisions
func runInterruptAndResume(ctx context.Context) {
	innerWorkflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()

	innerWorkflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		if req.Priority == "high" {
			// Check if this is a resume from previous interrupt
			wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
			if !wasInterrupted {
				// First run: interrupt for human review.
				// The map passed to Interrupt is surfaced to the caller via
				// InterruptContext.Info, which is how the document ID gets back out.
				fmt.Printf(" Document %s requires human review (high priority)\n", req.DocumentID)
				return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
					"document_id": req.DocumentID,
					"reason": "High priority document requires human approval",
				})
			}

			// Resume: check if we have approval decision
			isResumeTarget, hasData, decision := compose.GetResumeContext[*ApprovalDecision](ctx)
			if isResumeTarget && hasData && decision != nil {
				fmt.Printf(" Document %s resumed with decision: approved=%v\n", req.DocumentID, decision.Approved)
				return ReviewResult{
					DocumentID: req.DocumentID,
					Approved: decision.Approved,
					Score: 1.0,
					Comments: fmt.Sprintf("Human review: %s", decision.Comments),
					ReviewedAt: time.Now(),
				}, nil
			}

			// Still waiting for decision: interrupt again until data arrives.
			return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
				"document_id": req.DocumentID,
				"reason": "Still waiting for human approval",
			})
		}

		// Non-high priority: auto-approve
		return ReviewResult{
			DocumentID: req.DocumentID,
			Approved: true,
			Score: 0.85,
			Comments: "Auto-approved (non-high priority)",
			ReviewedAt: time.Now(),
		}, nil
	})).AddInput(compose.START)

	innerWorkflow.End().AddInput("analyze")

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name: "InterruptReviewer",
		InnerTask: innerWorkflow,
		MaxConcurrency: 0,
	})

	// Wrap BatchNode in a parent graph for proper interrupt handling
	parentGraph := compose.NewGraph[[]ReviewRequest, []ReviewResult]()
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		return batchNode.Invoke(ctx, inputs)
	}))
	_ = parentGraph.AddEdge(compose.START, "batch_review")
	_ = parentGraph.AddEdge("batch_review", compose.END)

	// Compile with checkpoint store for state persistence
	store := newMemoryCheckpointStore()
	runner, err := parentGraph.Compile(ctx,
		compose.WithGraphName("InterruptResumeDemo"),
		compose.WithCheckPointStore(store),
	)
	if err != nil {
		fmt.Printf("Failed to compile graph: %v\n", err)
		return
	}

	// DOC-001 and DOC-003 are "high" priority and will interrupt;
	// DOC-002 and DOC-004 complete on the first pass.
	docs := []ReviewRequest{
		{DocumentID: "DOC-001", Content: "Content 1", Priority: "high"},
		{DocumentID: "DOC-002", Content: "Content 2", Priority: "medium"},
		{DocumentID: "DOC-003", Content: "Content 3", Priority: "high"},
		{DocumentID: "DOC-004", Content: "Content 4", Priority: "low"},
	}

	// The same checkpoint ID ties the first invocation and the resume together.
	checkpointID := "interrupt-resume-demo-001"

	// Step 1: First invocation - high priority docs will interrupt
	fmt.Println("First invocation (will interrupt for high priority docs):")
	results, err := runner.Invoke(ctx, docs, compose.WithCheckPointID(checkpointID))

	if err != nil {
		// Step 2: Extract interrupt info
		info, infoOk := compose.ExtractInterruptInfo(err)
		if infoOk && len(info.InterruptContexts) > 0 {
			fmt.Printf("\n Interrupt detected! Found %d interrupt context(s):\n", len(info.InterruptContexts))

			// Step 3: Prepare resume data with approval decisions,
			// keyed by interrupt context ID (simulating the supervisor approving all).
			resumeData := make(map[string]any)
			for i, iCtx := range info.InterruptContexts {
				infoMap, _ := iCtx.Info.(map[string]string)
				docID := infoMap["document_id"]
				fmt.Printf(" %d. ID=%s\n", i+1, iCtx.ID)
				fmt.Printf(" Address=%v\n", iCtx.Address)
				fmt.Printf(" DocumentID=%s, Reason=%s\n", docID, infoMap["reason"])
				resumeData[iCtx.ID] = &ApprovalDecision{
					Approved: true,
					Comments: fmt.Sprintf("Approved by supervisor for %s", docID),
				}
			}

			// Step 4: Resume with approval decisions.
			// Input is nil on resume: original inputs are restored from the checkpoint.
			fmt.Println("\n Resuming with approval decisions...")
			resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
			results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
			if err != nil {
				fmt.Printf(" Resume error: %v\n", err)
				return
			}

			fmt.Println("\n Final results after resume:")
			for _, r := range results {
				fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
			}
			return
		}
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Reached only if no document interrupted (would require no "high" priority docs).
	fmt.Println("\nFinal results:")
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
	}
}
|
||||
|
||||
// memoryCheckpointStore is a simple in-memory checkpoint store for demos
|
||||
type memoryCheckpointStore struct {
|
||||
mu sync.RWMutex
|
||||
data map[string][]byte
|
||||
}
|
||||
|
||||
func newMemoryCheckpointStore() *memoryCheckpointStore {
|
||||
return &memoryCheckpointStore{
|
||||
data: make(map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryCheckpointStore) Get(_ context.Context, id string) ([]byte, bool, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
data, ok := m.data[id]
|
||||
return data, ok, nil
|
||||
}
|
||||
|
||||
func (m *memoryCheckpointStore) Set(_ context.Context, id string, data []byte) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.data[id] = data
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scenario 7: Parent Graph with Reduce Node
// Demonstrates:
// - BatchNode as a node in a larger pipeline
// - WithInnerOptions for passing runtime options (callbacks) to inner tasks
// - Reduce pattern: aggregate batch results into a summary report
func runParentGraphWithReduce(ctx context.Context) {
	innerWorkflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name: "BatchDocumentReviewer",
		InnerTask: innerWorkflow,
		MaxConcurrency: 3,
		InnerCompileOptions: []compose.GraphCompileOption{
			compose.WithGraphName("SingleDocReview"),
		},
	})

	// Pipeline shape: BatchReviewInput -> []ReviewRequest -> []ReviewResult -> ReviewReport.
	parentGraph := compose.NewGraph[BatchReviewInput, ReviewReport]()

	// Node 1: Preprocess - extract documents from batch input
	_ = parentGraph.AddLambdaNode("preprocess", compose.InvokableLambda(func(ctx context.Context, input BatchReviewInput) ([]ReviewRequest, error) {
		fmt.Printf(" Preprocessing batch '%s' with %d documents\n", input.BatchName, len(input.Documents))
		return input.Documents, nil
	}))

	// Node 2: Batch Review - process all documents with progress tracking
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		fmt.Printf(" Starting batch review of %d documents\n", len(inputs))

		// Progress tracking with atomic counter (thread-safe for concurrent processing)
		var completedCount int32
		totalCount := int32(len(inputs))

		// Create callback for progress tracking.
		// The Component filter keeps the counter to one increment per inner
		// workflow completion rather than per node inside it.
		reviewProgressHandler := callbacks.NewHandlerBuilder().
			OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
				if info.Component == "Workflow" {
					current := atomic.AddInt32(&completedCount, 1)
					fmt.Printf(" [Progress] %d/%d documents reviewed\n", current, totalCount)
				}
				return ctx
			}).
			Build()

		// Use WithInnerOptions to pass callbacks to each inner task
		return batchNode.Invoke(ctx, inputs,
			batch.WithInnerOptions(compose.WithCallbacks(reviewProgressHandler)),
		)
	}))

	// Node 3: Reduce - aggregate results into a report
	_ = parentGraph.AddLambdaNode("reduce", compose.InvokableLambda(func(ctx context.Context, results []ReviewResult) (ReviewReport, error) {
		fmt.Printf(" Reducing %d results into report\n", len(results))

		report := ReviewReport{
			TotalDocuments: len(results),
			Results: results,
			GeneratedAt: time.Now(),
		}

		// Tally approvals/rejections and accumulate the score total.
		var totalScore float64
		for _, r := range results {
			totalScore += r.Score
			if r.Approved {
				report.ApprovedCount++
			} else {
				report.RejectedCount++
			}
		}

		// Guard against division by zero for an empty batch.
		if len(results) > 0 {
			report.AverageScore = totalScore / float64(len(results))
		}

		return report, nil
	}))

	// Connect nodes: preprocess -> batch_review -> reduce
	_ = parentGraph.AddEdge(compose.START, "preprocess")
	_ = parentGraph.AddEdge("preprocess", "batch_review")
	_ = parentGraph.AddEdge("batch_review", "reduce")
	_ = parentGraph.AddEdge("reduce", compose.END)

	runner, err := parentGraph.Compile(ctx, compose.WithGraphName("DocumentReviewPipeline"))
	if err != nil {
		fmt.Printf("Failed to compile parent graph: %v\n", err)
		return
	}

	input := BatchReviewInput{
		Documents: createSampleDocuments(5),
		BatchName: "Q4-Compliance-Review",
	}

	fmt.Println("Running document review pipeline:")
	report, err := runner.Invoke(ctx, input)
	if err != nil {
		fmt.Printf("Pipeline error: %v\n", err)
		return
	}

	// Print the aggregated report followed by per-document outcomes.
	fmt.Println("\n=== Review Report ===")
	fmt.Printf(" Total Documents: %d\n", report.TotalDocuments)
	fmt.Printf(" Approved: %d\n", report.ApprovedCount)
	fmt.Printf(" Rejected: %d\n", report.RejectedCount)
	fmt.Printf(" Average Score: %.2f\n", report.AverageScore)
	fmt.Printf(" Generated At: %s\n", report.GeneratedAt.Format(time.RFC3339))
	fmt.Println("\n Individual Results:")
	for _, r := range report.Results {
		status := "✓"
		if !r.Approved {
			status = "✗"
		}
		fmt.Printf(" %s %s (score: %.2f)\n", status, r.DocumentID, r.Score)
	}
}
|
||||
Loading…
Reference in New Issue