diff --git a/compose/batch/README.md b/compose/batch/README.md new file mode 100644 index 0000000..3fda325 --- /dev/null +++ b/compose/batch/README.md @@ -0,0 +1,290 @@ +# BatchNode Example + +This example demonstrates how to build a **BatchNode** component that processes multiple inputs through a Graph or Workflow with configurable concurrency and interrupt/resume support. + +## Overview + +BatchNode is a reusable component that: +- Accepts `[]I` (slice of inputs) and returns `[]O` (slice of outputs) +- Runs a Graph or Workflow for each input item +- Supports configurable concurrency (sequential or parallel) +- Handles errors and interrupts from individual tasks +- Integrates with Eino's callback and checkpoint systems + +## Business Scenario + +**Document Review Pipeline**: A compliance team needs to review multiple documents. Each document goes through an automated review workflow, with high-priority documents requiring human approval before completion. + +## Quick Start + +```bash +cd compose/batch +go run . +``` + +## Project Structure + +``` +compose/batch/ +├── batch/ +│ ├── types.go # Type definitions (NodeConfig, NodeInterruptState, etc.) +│ ├── options.go # Batch invocation options (WithInnerOptions) +│ ├── store.go # Internal checkpoint store for sub-tasks +│ └── node.go # Core BatchNode implementation +├── main.go # Example scenarios +└── README.md +``` + +## Key Concepts + +### 1. Creating a BatchNode + +```go +batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{ + Name: "DocumentReviewer", + InnerTask: workflow, // Graph or Workflow + MaxConcurrency: 3, // 0=sequential, >0=parallel limit + InnerCompileOptions: []compose.GraphCompileOption{ + compose.WithGraphName("SingleDocReview"), + }, +}) +``` + +### 2. 
Concurrency Control + +| MaxConcurrency | Behavior | +|----------------|----------| +| `0` | Sequential: process one task at a time | +| `>0` | Concurrent: up to N parallel tasks (first task runs on main goroutine) | + +### 3. Options + +**Compile-time options** (in `NodeConfig.InnerCompileOptions`): +- Applied when compiling the inner Graph/Workflow +- Example: `compose.WithGraphName("...")` + +**Request-time options** (via `batch.WithInnerOptions`): +- Applied to each inner task invocation +- Example: `compose.WithCallbacks(handler)` + +```go +results, err := batchNode.Invoke(ctx, inputs, + batch.WithInnerOptions( + compose.WithCallbacks(progressHandler), + ), +) +``` + +### 4. Error Handling + +- **Normal errors**: BatchNode returns the first error encountered +- **Interrupt errors**: Collected and bundled via `compose.CompositeInterrupt` + +### 5. Interrupt & Resume + +BatchNode supports human-in-the-loop workflows: + +```go +// In your inner workflow's lambda: +if needsHumanReview { + wasInterrupted, _, _ := compose.GetInterruptState[any](ctx) + if !wasInterrupted { + // First run: interrupt for human review + return Result{}, compose.Interrupt(ctx, map[string]string{ + "document_id": docID, + "reason": "Requires human approval", + }) + } + + // Resume: get human decision + isTarget, hasData, decision := compose.GetResumeContext[*Decision](ctx) + if isTarget && hasData && decision != nil { + return Result{Approved: decision.Approved}, nil + } +} +``` + +Resume with approval decisions: + +```go +// Extract interrupt contexts +info, _ := compose.ExtractInterruptInfo(err) + +// Prepare resume data (keyed by interrupt ID) +resumeData := make(map[string]any) +for _, iCtx := range info.InterruptContexts { + resumeData[iCtx.ID] = &Decision{Approved: true} +} + +// Resume +resumeCtx := compose.BatchResumeWithData(ctx, resumeData) +results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID)) +``` + +## Scenarios + +### Scenario 1: Basic 
Sequential Processing +Process documents one at a time with `MaxConcurrency: 0`. + +### Scenario 2: Concurrent Processing +Process multiple documents in parallel with `MaxConcurrency: 3`. + +### Scenario 3: With Compile Options +Configure inner workflow at compile time using `InnerCompileOptions`. + +### Scenario 4: With Invoke Options (Callbacks) +Add callbacks for monitoring using `callbacks.InitCallbacks`. + +### Scenario 5: Normal Error Handling +Demonstrates how BatchNode handles errors from individual tasks. + +### Scenario 6: Interrupt & Resume +Human-in-the-loop workflow: +1. High-priority documents interrupt for human review +2. Extract interrupt contexts with document IDs +3. Resume with approval decisions using `BatchResumeWithData` + +### Scenario 7: Parent Graph with Reduce Node +- Integrate BatchNode in a larger pipeline +- Use `WithInnerOptions` for progress tracking callbacks +- Reduce pattern: aggregate batch results into a summary report + +## Key APIs Used + +| API | Purpose | +|-----|---------| +| `compose.NewWorkflow` | Create inner workflow | +| `compose.AppendAddressSegment` | Create unique address for each sub-task | +| `compose.GetInterruptState` | Check if resuming from interrupt | +| `compose.GetResumeContext` | Get resume data for this component | +| `compose.Interrupt` | Interrupt execution for human input | +| `compose.CompositeInterrupt` | Bundle multiple interrupt errors | +| `compose.ExtractInterruptInfo` | Extract interrupt contexts from error | +| `compose.BatchResumeWithData` | Resume with data for multiple targets | +| `compose.WithCheckPointStore` | Enable checkpoint persistence | +| `compose.WithCheckPointID` | Identify checkpoint for resume | +| `callbacks.EnsureRunInfo` | Setup callback context | +| `callbacks.OnStart/OnEnd/OnError` | Trigger callbacks | +| `schema.RegisterName` | Register types for serialization | + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ BatchNode │ +│ 
┌─────────────────────────────────────────────────────┐ │ +│ │ Input: []ReviewRequest │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Concurrency Control (MaxConcurrency) │ │ +│ │ - Sequential (0): one at a time │ │ +│ │ - Concurrent (>0): parallel with semaphore │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────┼─────────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Inner Task │ │ Inner Task │ │ Inner Task │ │ +│ │ (index: 0) │ │ (index: 1) │ │ (index: 2) │ │ +│ │ │ │ │ │ │ │ +│ │ Workflow/ │ │ Workflow/ │ │ Workflow/ │ │ +│ │ Graph │ │ Graph │ │ Graph │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Result Collection │ │ +│ │ - Success: store in outputs[index] │ │ +│ │ - Error: return first error │ │ +│ │ - Interrupt: collect for CompositeInterrupt │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Output: []ReviewResult │ │ +│ └─────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Interrupt & Resume Flow + +``` +First Invocation: +┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ +│ DOC-001 │ │ DOC-002 │ │ DOC-003 │ │ DOC-004 │ +│ (high) │ │ (medium) │ │ (high) │ │ (low) │ +└────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ + │ │ │ │ + ▼ ▼ ▼ ▼ + INTERRUPT COMPLETE INTERRUPT COMPLETE + │ │ │ │ + └───────────────┴───────────────┴───────────────┘ + │ + ▼ + ┌────────────────────────┐ + │ CompositeInterrupt │ + │ - InterruptContexts │ + │ - NodeInterruptState │ + └────────────────────────┘ + +Resume with Approval: +┌──────────┐ ┌──────────┐ +│ DOC-001 │ │ DOC-003 │ +│ (high) │ │ (high) │ +└────┬─────┘ └────┬─────┘ + │ │ + ▼ ▼ + 
GetResumeContext GetResumeContext + → Decision{Approved: true} → Decision{Approved: true} + │ │ + ▼ ▼ + COMPLETE COMPLETE + │ │ + └───────────────┬───────────────┘ + │ + ▼ + ┌─────────────────────┐ + │ Final Results │ + │ DOC-001: ✓ │ + │ DOC-002: ✓ │ + │ DOC-003: ✓ │ + │ DOC-004: ✓ │ + └─────────────────────┘ +``` + +## Sample Output + +``` +=== Document Review Pipeline Example === + +--- Scenario 6: Interrupt & Resume --- +First invocation (will interrupt for high priority docs): + Document DOC-001 requires human review (high priority) + Document DOC-003 requires human review (high priority) + + Interrupt detected! Found 2 interrupt context(s): + 1. ID=fd49cbc4-deca-4f02-bdf9-02f921c0c1f5 + Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:0;... + DocumentID=DOC-001, Reason=High priority document requires human approval + 2. ID=af4a3f99-2414-4d6c-9c06-b9b4b1786044 + Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:2;... + DocumentID=DOC-003, Reason=High priority document requires human approval + + Resuming with approval decisions... 
+ Document DOC-001 resumed with decision: approved=true + Document DOC-003 resumed with decision: approved=true + + Final results after resume: + - DOC-001: approved=true, comments=Human review: Approved by supervisor + - DOC-002: approved=true, comments=Auto-approved (non-high priority) + - DOC-003: approved=true, comments=Human review: Approved by supervisor + - DOC-004: approved=true, comments=Auto-approved (non-high priority) +``` + +## Learn More + +- [Eino Documentation](https://github.com/cloudwego/eino) +- [Compose Package](https://github.com/cloudwego/eino/tree/main/compose) +- [Human-in-the-Loop Examples](../graph/react_with_interrupt/) diff --git a/compose/batch/batch/node.go b/compose/batch/batch/node.go new file mode 100644 index 0000000..cdc202d --- /dev/null +++ b/compose/batch/batch/node.go @@ -0,0 +1,275 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package batch + +import ( + "context" + "fmt" + "strconv" + "sync" + + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/compose" +) + +// Node is a batch processor that runs a Graph/Workflow for each input item. +// It supports configurable concurrency, interrupt/resume, and callbacks. 
+// +// Type parameters: +// - I: Input type for each item +// - O: Output type for each item +type Node[I, O any] struct { + name string + innerTask Compilable[I, O] + maxConcurrency int + innerCompileOptions []compose.GraphCompileOption +} + +// NewBatchNode creates a new batch processing node. +// +// Example: +// +// batchNode := batch.NewBatchNode(&batch.NodeConfig[Request, Response]{ +// Name: "MyBatchProcessor", +// InnerTask: myWorkflow, +// MaxConcurrency: 5, +// }) +func NewBatchNode[I, O any](config *NodeConfig[I, O]) *Node[I, O] { + name := config.Name + if name == "" { + name = "Node" + } + return &Node[I, O]{ + name: name, + innerTask: config.InnerTask, + maxConcurrency: config.MaxConcurrency, + innerCompileOptions: config.InnerCompileOptions, + } +} + +// GetType returns the node name for callback identification. +// Implements components.Typer interface. +func (b *Node[I, O]) GetType() string { + return b.name +} + +// IsCallbacksEnabled returns true to enable callback support. +// Implements components.Checker interface. +func (b *Node[I, O]) IsCallbacksEnabled() bool { + return true +} + +// Invoke processes all inputs through the inner task and returns results. +// It handles concurrency, errors, and interrupts according to configuration. +// +// Parameters: +// - ctx: Context for cancellation and deadline +// - inputs: Slice of input items to process +// - opts: Optional batch options (e.g., WithInnerOptions) +// +// Returns: +// - []O: Results in the same order as inputs +// - error: First normal error, or CompositeInterrupt if any task interrupted +func (b *Node[I, O]) Invoke(ctx context.Context, inputs []I, opts ...Option) ([]O, error) { + batchOpts := applyBatchOptions(opts...) 
+ + // Setup callbacks for batch-level monitoring + ctx = callbacks.EnsureRunInfo(ctx, b.name, ComponentOfBatchNode) + ctx = callbacks.OnStart(ctx, &CallbackInput[I]{ + Inputs: inputs, + MaxConcurrency: b.maxConcurrency, + }) + + outputs, err := b.invoke(ctx, inputs, batchOpts) + if err != nil { + callbacks.OnError(ctx, err) + return nil, err + } + + callbacks.OnEnd(ctx, &CallbackOutput[O]{Outputs: outputs}) + return outputs, nil +} + +// invoke is the internal implementation of batch processing. +func (b *Node[I, O]) invoke(ctx context.Context, inputs []I, batchOpts *options) ([]O, error) { + // Check if this is a resume from a previous interrupt + wasInterrupted, hasState, prevState := compose.GetInterruptState[*NodeInterruptState](ctx) + + var store *batchBridgeStore + var indicesToProcess []int + var effectiveInputs []I + + if wasInterrupted && hasState && prevState != nil { + // RESUME PATH: Restore state from previous interrupt + // Use fresh store (don't restore checkpoint data - it causes input issues) + store = newBatchBridgeStore() + indicesToProcess = prevState.InterruptedIndices + + // Restore original inputs from interrupt state + // (inputs parameter is nil during resume) + effectiveInputs = make([]I, prevState.TotalCount) + for i, v := range prevState.OriginalInputs { + if typedInput, ok := v.(I); ok { + effectiveInputs[i] = typedInput + } + } + } else { + // FIRST RUN PATH: Process all inputs + store = newBatchBridgeStore() + effectiveInputs = inputs + indicesToProcess = make([]int, len(inputs)) + for i := range inputs { + indicesToProcess[i] = i + } + } + + // Allocate output slice + outputs := make([]O, len(effectiveInputs)) + + // Restore completed results from previous run (if resuming) + if wasInterrupted && hasState && prevState != nil { + for idx, result := range prevState.CompletedResults { + if idx < len(outputs) { + if typedResult, ok := result.(O); ok { + outputs[idx] = typedResult + } + } + } + } + + // Compile inner task with checkpoint 
store + compileOpts := append([]compose.GraphCompileOption{ + compose.WithCheckPointStore(store), + }, b.innerCompileOptions...) + + runner, err := b.innerTask.Compile(ctx, compileOpts...) + if err != nil { + return nil, fmt.Errorf("failed to compile inner task: %w", err) + } + + // Nothing to process (all completed in previous run) + if len(indicesToProcess) == 0 { + return outputs, nil + } + + // Task result for collecting outputs from goroutines + type taskResult struct { + index int + output O + err error + } + + resultCh := make(chan taskResult, len(indicesToProcess)) + var wg sync.WaitGroup + + // runTask executes a single inner task + runTask := func(index int, input I) { + defer wg.Done() + + // Create sub-context with unique address segment for this task + // This enables proper interrupt ID generation (e.g., "batch_process:0") + subCtx := compose.AppendAddressSegment(ctx, AddressSegmentBatchProcess, strconv.Itoa(index)) + + // Combine checkpoint ID with user-provided inner options + invokeOpts := append([]compose.Option{ + compose.WithCheckPointID(makeBatchCheckpointID(index)), + }, batchOpts.innerOptions...) + + output, taskErr := runner.Invoke(subCtx, input, invokeOpts...) 
+ resultCh <- taskResult{index: index, output: output, err: taskErr} + } + + // Execute tasks based on concurrency setting + if b.maxConcurrency == 0 { + // Sequential: Run one task at a time + for _, idx := range indicesToProcess { + wg.Add(1) + runTask(idx, effectiveInputs[idx]) + } + } else { + // Concurrent: Use semaphore to limit parallelism + sem := make(chan struct{}, b.maxConcurrency) + + for i, idx := range indicesToProcess { + wg.Add(1) + if i == 0 { + // First task runs on main goroutine (optimization) + runTask(idx, effectiveInputs[idx]) + } else { + // Subsequent tasks run in goroutines with semaphore + go func(index int, input I) { + sem <- struct{}{} + defer func() { <-sem }() + runTask(index, input) + }(idx, effectiveInputs[idx]) + } + } + } + + // Close result channel when all tasks complete + go func() { + wg.Wait() + close(resultCh) + }() + + // Collect results and categorize errors + var normalErr error + var interruptErrs []error + completedResults := make(map[int]any) + interruptedIndices := make([]int, 0) + + for result := range resultCh { + if result.err != nil { + if _, ok := compose.ExtractInterruptInfo(result.err); ok { + // Interrupt error: collect for CompositeInterrupt + interruptErrs = append(interruptErrs, result.err) + interruptedIndices = append(interruptedIndices, result.index) + } else if normalErr == nil { + // Normal error: keep first one + normalErr = fmt.Errorf("task %d failed: %w", result.index, result.err) + } + } else { + // Success: store result + outputs[result.index] = result.output + completedResults[result.index] = result.output + } + } + + // Return first normal error (if any) + if normalErr != nil { + return nil, normalErr + } + + // Return composite interrupt (if any tasks interrupted) + if len(interruptErrs) > 0 { + // Store original inputs for resume (inputs will be nil on resume call) + originalInputs := make([]any, len(effectiveInputs)) + for i, v := range effectiveInputs { + originalInputs[i] = v + } + state 
:= &NodeInterruptState{ + OriginalInputs: originalInputs, + CompletedResults: completedResults, + InterruptedIndices: interruptedIndices, + TotalCount: len(effectiveInputs), + } + // CompositeInterrupt bundles all interrupt errors with state for resume + return nil, compose.CompositeInterrupt(ctx, nil, state, interruptErrs...) + } + + return outputs, nil +} diff --git a/compose/batch/batch/options.go b/compose/batch/batch/options.go new file mode 100644 index 0000000..15993e2 --- /dev/null +++ b/compose/batch/batch/options.go @@ -0,0 +1,56 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package batch + +import "github.com/cloudwego/eino/compose" + +// options holds runtime configuration for a batch invocation. +type options struct { + // innerOptions are compose.Option values passed to each inner task invocation. + // These are request-time options (vs compile-time options in NodeConfig). + innerOptions []compose.Option +} + +// Option is a function that configures batch invocation options. +type Option func(*options) + +// WithInnerOptions passes compose.Option values to each inner task invocation. 
+// Use this for request-time options like: +// - compose.WithCallbacks: Add callbacks for progress tracking +// - compose.WithMaxRunSteps: Limit execution steps per task +// +// Example: +// +// batchNode.Invoke(ctx, inputs, +// batch.WithInnerOptions( +// compose.WithCallbacks(progressHandler), +// ), +// ) +func WithInnerOptions(opts ...compose.Option) Option { + return func(o *options) { + o.innerOptions = append(o.innerOptions, opts...) + } +} + +// applyBatchOptions creates an options struct from the given Option functions. +func applyBatchOptions(opts ...Option) *options { + o := &options{} + for _, opt := range opts { + opt(o) + } + return o +} diff --git a/compose/batch/batch/store.go b/compose/batch/batch/store.go new file mode 100644 index 0000000..fb58ca8 --- /dev/null +++ b/compose/batch/batch/store.go @@ -0,0 +1,90 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package batch + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" +) + +// batchBridgeStore implements compose.CheckPointStore for batch processing. +// It stores checkpoint data keyed by batch index, allowing each sub-task +// to have its own checkpoint namespace. +// +// This store is used internally by BatchNode and is not meant for external use. +// For interrupt/resume, the BatchNode stores its state via CompositeInterrupt, +// not through this checkpoint store. 
+type batchBridgeStore struct { + mu sync.RWMutex + data map[int][]byte // index -> checkpoint data +} + +// newBatchBridgeStore creates a new empty checkpoint store. +func newBatchBridgeStore() *batchBridgeStore { + return &batchBridgeStore{ + data: make(map[int][]byte), + } +} + +// makeBatchCheckpointID creates a checkpoint ID for a given batch index. +// Format: "batch_0", "batch_1", etc. +func makeBatchCheckpointID(index int) string { + return fmt.Sprintf("batch_%d", index) +} + +// parseBatchIndex extracts the batch index from a checkpoint ID. +// Returns error if the ID format is invalid. +func parseBatchIndex(checkPointID string) (int, error) { + if !strings.HasPrefix(checkPointID, "batch_") { + return 0, fmt.Errorf("invalid batch checkpoint ID: %s", checkPointID) + } + indexStr := strings.TrimPrefix(checkPointID, "batch_") + return strconv.Atoi(indexStr) +} + +// Get retrieves checkpoint data for a batch index. +// Implements compose.CheckPointStore interface. +func (m *batchBridgeStore) Get(_ context.Context, checkPointID string) ([]byte, bool, error) { + index, err := parseBatchIndex(checkPointID) + if err != nil { + return nil, false, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + data, ok := m.data[index] + return data, ok, nil +} + +// Set stores checkpoint data for a batch index. +// Implements compose.CheckPointStore interface. 
+func (m *batchBridgeStore) Set(_ context.Context, checkPointID string, checkPoint []byte) error { + index, err := parseBatchIndex(checkPointID) + if err != nil { + return err + } + + m.mu.Lock() + defer m.mu.Unlock() + + m.data[index] = checkPoint + return nil +} diff --git a/compose/batch/batch/types.go b/compose/batch/batch/types.go new file mode 100644 index 0000000..484985d --- /dev/null +++ b/compose/batch/batch/types.go @@ -0,0 +1,106 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package batch provides a BatchNode implementation for processing multiple inputs +// through a Graph or Workflow with configurable concurrency and interrupt/resume support. +// +// Key features: +// - Generic batch processing: Accept []I, return []O +// - Configurable concurrency: Sequential (0) or concurrent with limit (>0) +// - Interrupt handling: Collects interrupts from sub-tasks using CompositeInterrupt +// - Resume support: Restores state and only re-runs interrupted tasks +// - Callbacks: Implements Typer and Checker interfaces for callback support +package batch + +import ( + "context" + + "github.com/cloudwego/eino/components" + "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" +) + +func init() { + // Register NodeInterruptState for serialization during checkpoint save/restore. + // This is required for the interrupt state to be properly persisted. 
+ schema.RegisterName[*NodeInterruptState]("batch.NodeInterruptState") +} + +// ComponentOfBatchNode is the component type identifier for callbacks. +// Used by callbacks.EnsureRunInfo to identify this component in the callback chain. +const ComponentOfBatchNode components.Component = "Batch" + +// AddressSegmentBatchProcess is the address segment type for batch processing. +// Used by compose.AppendAddressSegment to create unique addresses for each sub-task, +// enabling proper interrupt ID generation (e.g., "batch_process:0", "batch_process:1"). +const AddressSegmentBatchProcess compose.AddressSegmentType = "batch_process" + +// Compilable represents a Graph or Workflow that can be compiled into a Runnable. +// Both compose.Graph and compose.Workflow implement this interface. +type Compilable[I, O any] interface { + Compile(ctx context.Context, opts ...compose.GraphCompileOption) (compose.Runnable[I, O], error) +} + +// NodeConfig contains configuration for creating a BatchNode. +type NodeConfig[I, O any] struct { + // Name is the node name used for callbacks and logging. Defaults to "Node" if empty. + Name string + + // InnerTask is the Graph or Workflow to run for each input item. + // Must implement Compilable[I, O] interface. + InnerTask Compilable[I, O] + + // MaxConcurrency controls parallel execution: + // - 0: Sequential processing (one task at a time) + // - >0: Concurrent processing with this many parallel tasks + // First task runs on main goroutine, rest run in goroutines + MaxConcurrency int + + // InnerCompileOptions are passed to InnerTask.Compile() for each invocation. + // Use this for compile-time options like WithGraphName. + InnerCompileOptions []compose.GraphCompileOption +} + +// NodeInterruptState stores the batch node's state when an interrupt occurs. +// This state is persisted via CompositeInterrupt and restored on resume. +type NodeInterruptState struct { + // OriginalInputs stores all input items (as []any for serialization). 
+ // Required because inputs are not passed during resume invocation. + OriginalInputs []any + + // CompletedResults maps index -> result for tasks that completed before interrupt. + // These results are restored directly without re-running the tasks. + CompletedResults map[int]any + + // InterruptedIndices lists which task indices were interrupted. + // Only these tasks will be re-run on resume. + InterruptedIndices []int + + // TotalCount is the total number of input items. + // Used to allocate the correct output slice size on resume. + TotalCount int +} + +// CallbackInput is passed to callbacks.OnStart when batch processing begins. +type CallbackInput[I any] struct { + Inputs []I // All input items to be processed + MaxConcurrency int // Configured concurrency limit +} + +// CallbackOutput is passed to callbacks.OnEnd when batch processing completes. +type CallbackOutput[O any] struct { + Outputs []O // All output results +} diff --git a/compose/batch/main.go b/compose/batch/main.go new file mode 100644 index 0000000..232fe4d --- /dev/null +++ b/compose/batch/main.go @@ -0,0 +1,611 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This example demonstrates the BatchNode component for processing multiple inputs +// through a Graph or Workflow with configurable concurrency and interrupt/resume support. 
//
// Business Scenario: Document Review Pipeline
// A compliance team needs to review multiple documents. Each document goes through
// an automated review workflow, with high-priority documents requiring human approval.
//
// Scenarios covered:
// 1. Basic Sequential Processing - Process documents one at a time
// 2. Concurrent Processing - Process multiple documents in parallel
// 3. Compile Options - Configure inner workflow at compile time
// 4. Invoke Options (Callbacks) - Add callbacks for monitoring
// 5. Error Handling - Handle errors from individual tasks
// 6. Interrupt & Resume - Human-in-the-loop for high-priority documents
// 7. Parent Graph with Reduce - Integrate BatchNode in a larger pipeline
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cloudwego/eino/callbacks"
	"github.com/cloudwego/eino/compose"
	"github.com/cloudwego/eino/schema"

	"github.com/cloudwego/eino-examples/compose/batch/batch"
)

func init() {
	// Register types for serialization (required for interrupt/resume checkpoint).
	// Done in init() so registration is guaranteed to happen before any checkpoint
	// is written or restored by the scenarios below.
	schema.RegisterName[ReviewRequest]("batch_example.ReviewRequest")
	schema.RegisterName[ReviewResult]("batch_example.ReviewResult")
	schema.RegisterName[*ApprovalDecision]("batch_example.ApprovalDecision")
	schema.RegisterName[[]ReviewRequest]("batch_example.ReviewRequestSlice")
	schema.RegisterName[[]ReviewResult]("batch_example.ReviewResultSlice")
}

// ReviewRequest represents a document to be reviewed
type ReviewRequest struct {
	DocumentID string
	Content    string
	Priority   string // "high", "medium", "low"
}

// ReviewResult contains the outcome of a document review
type ReviewResult struct {
	DocumentID string
	Approved   bool
	Score      float64
	Comments   string
	ReviewedAt time.Time
}

// ReviewReport aggregates results from batch processing
type ReviewReport struct {
	TotalDocuments   int
	ApprovedCount    int
	RejectedCount    int
	AverageScore     float64
	HighPriorityPass int // NOTE(review): never populated in this file — confirm whether the reduce step should derive it
	Results          []ReviewResult
	GeneratedAt      time.Time
}

// ApprovalDecision is the human decision for interrupted documents
type ApprovalDecision struct {
	Approved bool
	Comments string
}

// BatchReviewInput wraps documents with batch metadata
type BatchReviewInput struct {
	Documents []ReviewRequest
	BatchName string
}

// main runs all seven demo scenarios in sequence; each scenario is
// self-contained and prints its own results/errors rather than failing fast.
func main() {
	ctx := context.Background()

	fmt.Println("=== Document Review Pipeline Example ===")
	fmt.Println()

	fmt.Println("--- Scenario 1: Basic Sequential Processing ---")
	runBasicSequential(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 2: Concurrent Processing ---")
	runConcurrent(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 3: With Compile Options ---")
	runWithCompileOptions(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 4: With Invoke Options (Callbacks) ---")
	runWithInvokeOptions(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 5: Normal Error Handling ---")
	runWithError(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 6: Interrupt & Resume ---")
	runInterruptAndResume(ctx)
	fmt.Println()

	fmt.Println("--- Scenario 7: Parent Graph with Reduce Node ---")
	runParentGraphWithReduce(ctx)
	fmt.Println()

	fmt.Println("=== All Scenarios Completed ===")
}

// createSampleDocuments generates test documents with rotating priorities
// ("high", "medium", "low", repeating) and IDs DOC-001..DOC-00N.
func createSampleDocuments(count int) []ReviewRequest {
	priorities := []string{"high", "medium", "low"}
	docs := make([]ReviewRequest, count)
	for i := 0; i < count; i++ {
		docs[i] = ReviewRequest{
			DocumentID: fmt.Sprintf("DOC-%03d", i+1),
			Content:    fmt.Sprintf("Document content for review #%d. This is a sample compliance document.", i+1),
			Priority:   priorities[i%len(priorities)],
		}
	}
	return docs
}

// createSimpleReviewWorkflow creates a basic document review workflow
// that simulates automated review with random scoring
func createSimpleReviewWorkflow() *compose.Workflow[ReviewRequest, ReviewResult] {
	workflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()

	workflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		time.Sleep(50 * time.Millisecond) // Simulate processing time

		// Score lands in [0.5, 1.0); approved iff score >= 0.7.
		// math/rand is auto-seeded since Go 1.20, so runs vary without explicit seeding.
		score := 0.5 + rand.Float64()*0.5
		approved := score >= 0.7

		return ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   approved,
			Score:      score,
			Comments:   fmt.Sprintf("Auto-reviewed document %s with priority %s", req.DocumentID, req.Priority),
			ReviewedAt: time.Now(),
		}, nil
	})).AddInput(compose.START)

	workflow.End().AddInput("analyze")

	return workflow
}

// Scenario 1: Basic Sequential Processing
// Demonstrates: MaxConcurrency=0 for sequential execution
func runBasicSequential(ctx context.Context) {
	docs := createSampleDocuments(3)
	workflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "SequentialReviewer",
		InnerTask:      workflow,
		MaxConcurrency: 0, // Sequential: process one at a time
	})

	start := time.Now()
	results, err := batchNode.Invoke(ctx, docs)
	elapsed := time.Since(start)

	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Processed %d documents sequentially in %v\n", len(results), elapsed)
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v, score=%.2f\n", r.DocumentID, r.Approved, r.Score)
	}
}

// Scenario 2: Concurrent Processing
// Demonstrates: MaxConcurrency>0 for parallel execution with limit
func runConcurrent(ctx context.Context) {
	docs := createSampleDocuments(5)
	workflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "ConcurrentReviewer",
		InnerTask:      workflow,
		MaxConcurrency: 3, // Concurrent: up to 3 parallel tasks
	})

	start := time.Now()
	results, err := batchNode.Invoke(ctx, docs)
	elapsed := time.Since(start)

	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Processed %d documents concurrently (max 3) in %v\n", len(results), elapsed)
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v, score=%.2f\n", r.DocumentID, r.Approved, r.Score)
	}
}

// Scenario 3: With Compile Options
// Demonstrates: InnerCompileOptions for configuring inner workflow at compile time
func runWithCompileOptions(ctx context.Context) {
	docs := createSampleDocuments(3)
	workflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "NamedReviewer",
		InnerTask:      workflow,
		MaxConcurrency: 2,
		InnerCompileOptions: []compose.GraphCompileOption{
			compose.WithGraphName("SingleDocumentReviewWorkflow"), // Name for debugging/tracing
		},
	})

	results, err := batchNode.Invoke(ctx, docs)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Processed %d documents with named inner workflow\n", len(results))
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v\n", r.DocumentID, r.Approved)
	}
}

// Scenario 4: With Invoke Options (Callbacks)
// Demonstrates: Callbacks for batch-level monitoring via context
func runWithInvokeOptions(ctx context.Context) {
	docs := createSampleDocuments(3)
	workflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "CallbackReviewer",
		InnerTask:      workflow,
		MaxConcurrency: 0,
	})

	// Create callback handler for monitoring.
	// Each hook returns the (possibly augmented) context it was given.
	handler := callbacks.NewHandlerBuilder().
		OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
			fmt.Printf(" [Callback] OnStart: %s/%s\n", info.Component, info.Name)
			return ctx
		}).
		OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
			fmt.Printf(" [Callback] OnEnd: %s/%s\n", info.Component, info.Name)
			return ctx
		}).
		OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
			fmt.Printf(" [Callback] OnError: %s/%s - %v\n", info.Component, info.Name, err)
			return ctx
		}).
		Build()

	// Initialize callbacks in context
	ctxWithCallback := callbacks.InitCallbacks(ctx, nil, handler)

	results, err := batchNode.Invoke(ctxWithCallback, docs)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Processed %d documents with callbacks\n", len(results))
}

// Scenario 5: Normal Error Handling
// Demonstrates: How BatchNode handles errors from individual tasks
// (DOC-002 is hard-wired to fail; the batch surfaces that error to the caller).
func runWithError(ctx context.Context) {
	workflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()

	workflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		// Simulate validation failure for specific document
		if req.DocumentID == "DOC-002" {
			return ReviewResult{}, fmt.Errorf("validation failed for document %s: content too short", req.DocumentID)
		}
		return ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   true,
			Score:      0.9,
			Comments:   "Passed validation",
			ReviewedAt: time.Now(),
		}, nil
	})).AddInput(compose.START)

	workflow.End().AddInput("analyze")

	docs := createSampleDocuments(3)
	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "ErrorHandlingReviewer",
		InnerTask:      workflow,
		MaxConcurrency: 0,
	})

	results, err := batchNode.Invoke(ctx, docs)
	if err != nil {
		// Expected path: createSampleDocuments(3) includes DOC-002.
		fmt.Printf("Expected error occurred: %v\n", err)
		return
	}

	fmt.Printf("Results: %v\n", results)
}

// Scenario 6: Interrupt & Resume
// Demonstrates: Human-in-the-loop workflow using compose.Interrupt and compose.BatchResumeWithData
//
// Flow:
// 1. First invocation: High-priority documents interrupt for human review
// 2. Extract interrupt contexts with document IDs
// 3. Resume with approval decisions using BatchResumeWithData
// 4. Interrupted tasks complete with human decisions
func runInterruptAndResume(ctx context.Context) {
	innerWorkflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()

	innerWorkflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		if req.Priority == "high" {
			// Check if this is a resume from previous interrupt
			wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
			if !wasInterrupted {
				// First run: interrupt for human review
				fmt.Printf(" Document %s requires human review (high priority)\n", req.DocumentID)
				return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
					"document_id": req.DocumentID,
					"reason":      "High priority document requires human approval",
				})
			}

			// Resume: check if we have approval decision
			isResumeTarget, hasData, decision := compose.GetResumeContext[*ApprovalDecision](ctx)
			if isResumeTarget && hasData && decision != nil {
				fmt.Printf(" Document %s resumed with decision: approved=%v\n", req.DocumentID, decision.Approved)
				return ReviewResult{
					DocumentID: req.DocumentID,
					Approved:   decision.Approved,
					Score:      1.0,
					Comments:   fmt.Sprintf("Human review: %s", decision.Comments),
					ReviewedAt: time.Now(),
				}, nil
			}

			// Still waiting for decision: re-interrupt so the task stays pending
			// until resume data for this interrupt ID is supplied.
			return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
				"document_id": req.DocumentID,
				"reason":      "Still waiting for human approval",
			})
		}

		// Non-high priority: auto-approve
		return ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   true,
			Score:      0.85,
			Comments:   "Auto-approved (non-high priority)",
			ReviewedAt: time.Now(),
		}, nil
	})).AddInput(compose.START)

	innerWorkflow.End().AddInput("analyze")

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "InterruptReviewer",
		InnerTask:      innerWorkflow,
		MaxConcurrency: 0,
	})

	// Wrap BatchNode in a parent graph for proper interrupt handling
	parentGraph := compose.NewGraph[[]ReviewRequest, []ReviewResult]()
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		return batchNode.Invoke(ctx, inputs)
	}))
	_ = parentGraph.AddEdge(compose.START, "batch_review")
	_ = parentGraph.AddEdge("batch_review", compose.END)

	// Compile with checkpoint store for state persistence
	store := newMemoryCheckpointStore()
	runner, err := parentGraph.Compile(ctx,
		compose.WithGraphName("InterruptResumeDemo"),
		compose.WithCheckPointStore(store),
	)
	if err != nil {
		fmt.Printf("Failed to compile graph: %v\n", err)
		return
	}

	docs := []ReviewRequest{
		{DocumentID: "DOC-001", Content: "Content 1", Priority: "high"},
		{DocumentID: "DOC-002", Content: "Content 2", Priority: "medium"},
		{DocumentID: "DOC-003", Content: "Content 3", Priority: "high"},
		{DocumentID: "DOC-004", Content: "Content 4", Priority: "low"},
	}

	checkpointID := "interrupt-resume-demo-001"

	// Step 1: First invocation - high priority docs will interrupt
	fmt.Println("First invocation (will interrupt for high priority docs):")
	results, err := runner.Invoke(ctx, docs, compose.WithCheckPointID(checkpointID))

	if err != nil {
		// Step 2: Extract interrupt info
		info, infoOk := compose.ExtractInterruptInfo(err)
		if infoOk && len(info.InterruptContexts) > 0 {
			fmt.Printf("\n Interrupt detected! Found %d interrupt context(s):\n", len(info.InterruptContexts))

			// Step 3: Prepare resume data with approval decisions.
			// Resume data is keyed by interrupt ID; each ID maps to the
			// decision delivered to that interrupted task on resume.
			resumeData := make(map[string]any)
			for i, iCtx := range info.InterruptContexts {
				// Info carries the map[string]string passed to compose.Interrupt above;
				// the ignored ok keeps this tolerant of other payload types.
				infoMap, _ := iCtx.Info.(map[string]string)
				docID := infoMap["document_id"]
				fmt.Printf(" %d. ID=%s\n", i+1, iCtx.ID)
				fmt.Printf(" Address=%v\n", iCtx.Address)
				fmt.Printf(" DocumentID=%s, Reason=%s\n", docID, infoMap["reason"])
				resumeData[iCtx.ID] = &ApprovalDecision{
					Approved: true,
					Comments: fmt.Sprintf("Approved by supervisor for %s", docID),
				}
			}

			// Step 4: Resume with approval decisions.
			// Input is nil: state is restored from the checkpoint identified by checkpointID.
			fmt.Println("\n Resuming with approval decisions...")
			resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
			results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
			if err != nil {
				fmt.Printf(" Resume error: %v\n", err)
				return
			}

			fmt.Println("\n Final results after resume:")
			for _, r := range results {
				fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
			}
			return
		}
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Println("\nFinal results:")
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
	}
}

// memoryCheckpointStore is a simple in-memory checkpoint store for demos.
// Safe for concurrent use: all map access is guarded by mu.
type memoryCheckpointStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

// newMemoryCheckpointStore returns an empty, ready-to-use store.
func newMemoryCheckpointStore() *memoryCheckpointStore {
	return &memoryCheckpointStore{
		data: make(map[string][]byte),
	}
}

// Get returns the checkpoint bytes stored under id and whether it exists.
// The error is always nil for this in-memory implementation.
func (m *memoryCheckpointStore) Get(_ context.Context, id string) ([]byte, bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	data, ok := m.data[id]
	return data, ok, nil
}

// Set stores the checkpoint bytes under id, overwriting any previous value.
// The error is always nil for this in-memory implementation.
func (m *memoryCheckpointStore) Set(_ context.Context, id string, data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[id] = data
	return nil
}

// Scenario 7: Parent Graph with Reduce Node
// Demonstrates:
// - BatchNode as a node in a larger pipeline
// - WithInnerOptions for passing runtime options (callbacks) to inner tasks
// - Reduce pattern: aggregate batch results into a summary report
func runParentGraphWithReduce(ctx context.Context) {
	innerWorkflow := createSimpleReviewWorkflow()

	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "BatchDocumentReviewer",
		InnerTask:      innerWorkflow,
		MaxConcurrency: 3,
		InnerCompileOptions: []compose.GraphCompileOption{
			compose.WithGraphName("SingleDocReview"),
		},
	})

	parentGraph := compose.NewGraph[BatchReviewInput, ReviewReport]()

	// Node 1: Preprocess - extract documents from batch input
	_ = parentGraph.AddLambdaNode("preprocess", compose.InvokableLambda(func(ctx context.Context, input BatchReviewInput) ([]ReviewRequest, error) {
		fmt.Printf(" Preprocessing batch '%s' with %d documents\n", input.BatchName, len(input.Documents))
		return input.Documents, nil
	}))

	// Node 2: Batch Review - process all documents with progress tracking
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		fmt.Printf(" Starting batch review of %d documents\n", len(inputs))

		// Progress tracking with atomic counter (thread-safe for concurrent processing)
		var completedCount int32
		totalCount := int32(len(inputs))

		// Create callback for progress tracking.
		// NOTE(review): the Component == "Workflow" filter assumes each inner task
		// reports its RunInfo.Component as "Workflow" — confirm against eino's
		// callback RunInfo conventions.
		reviewProgressHandler := callbacks.NewHandlerBuilder().
			OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
				if info.Component == "Workflow" {
					current := atomic.AddInt32(&completedCount, 1)
					fmt.Printf(" [Progress] %d/%d documents reviewed\n", current, totalCount)
				}
				return ctx
			}).
			Build()

		// Use WithInnerOptions to pass callbacks to each inner task
		return batchNode.Invoke(ctx, inputs,
			batch.WithInnerOptions(compose.WithCallbacks(reviewProgressHandler)),
		)
	}))

	// Node 3: Reduce - aggregate results into a report
	_ = parentGraph.AddLambdaNode("reduce", compose.InvokableLambda(func(ctx context.Context, results []ReviewResult) (ReviewReport, error) {
		fmt.Printf(" Reducing %d results into report\n", len(results))

		report := ReviewReport{
			TotalDocuments: len(results),
			Results:        results,
			GeneratedAt:    time.Now(),
		}

		var totalScore float64
		for _, r := range results {
			totalScore += r.Score
			if r.Approved {
				report.ApprovedCount++
			} else {
				report.RejectedCount++
			}
		}

		// Guard against division by zero for an empty batch.
		if len(results) > 0 {
			report.AverageScore = totalScore / float64(len(results))
		}

		return report, nil
	}))

	// Connect nodes: preprocess -> batch_review -> reduce
	_ = parentGraph.AddEdge(compose.START, "preprocess")
	_ = parentGraph.AddEdge("preprocess", "batch_review")
	_ = parentGraph.AddEdge("batch_review", "reduce")
	_ = parentGraph.AddEdge("reduce", compose.END)

	runner, err := parentGraph.Compile(ctx, compose.WithGraphName("DocumentReviewPipeline"))
	if err != nil {
		fmt.Printf("Failed to compile parent graph: %v\n", err)
		return
	}

	input := BatchReviewInput{
		Documents: createSampleDocuments(5),
		BatchName: "Q4-Compliance-Review",
	}

	fmt.Println("Running document review pipeline:")
	report, err := runner.Invoke(ctx, input)
	if err != nil {
		fmt.Printf("Pipeline error: %v\n", err)
		return
	}

	fmt.Println("\n=== Review Report ===")
	fmt.Printf(" Total Documents: %d\n", report.TotalDocuments)
	fmt.Printf(" Approved: %d\n", report.ApprovedCount)
	fmt.Printf(" Rejected: %d\n", report.RejectedCount)
	fmt.Printf(" Average Score: %.2f\n", report.AverageScore)
	fmt.Printf(" Generated At: %s\n", report.GeneratedAt.Format(time.RFC3339))
	fmt.Println("\n Individual Results:")
	for _, r := range report.Results {
		status := "✓"
		if !r.Approved {
			status = "✗"
		}
		fmt.Printf(" %s %s (score: %.2f)\n", status, r.DocumentID, r.Score)
	}
}