/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This example demonstrates the BatchNode component for processing multiple inputs
// through a Graph or Workflow with configurable concurrency and interrupt/resume support.
//
// Business Scenario: Document Review Pipeline
// A compliance team needs to review multiple documents. Each document goes through
// an automated review workflow, with high-priority documents requiring human approval.
//
// Scenarios covered:
// 1. Basic Sequential Processing - Process documents one at a time
// 2. Concurrent Processing - Process multiple documents in parallel
// 3. Compile Options - Configure inner workflow at compile time
// 4. Invoke Options (Callbacks) - Add callbacks for monitoring
// 5. Error Handling - Handle errors from individual tasks
// 6. Interrupt & Resume - Human-in-the-loop for high-priority documents
// 7. Parent Graph with Reduce - Integrate BatchNode in a larger pipeline
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-examples/compose/batch/batch"
)
// init registers every type that crosses the checkpoint-serialization
// boundary. Registration by stable name is what lets the interrupt/resume
// demo (Scenario 6) encode these values into the checkpoint store and
// decode them again on resume.
func init() {
	// Register types for serialization (required for interrupt/resume checkpoint)
	schema.RegisterName[ReviewRequest]("batch_example.ReviewRequest")
	schema.RegisterName[ReviewResult]("batch_example.ReviewResult")
	// Pointer type: resume data is delivered as *ApprovalDecision (see Scenario 6).
	schema.RegisterName[*ApprovalDecision]("batch_example.ApprovalDecision")
	// Slice types are registered separately because the parent graph's
	// input/output are []ReviewRequest and []ReviewResult.
	schema.RegisterName[[]ReviewRequest]("batch_example.ReviewRequestSlice")
	schema.RegisterName[[]ReviewResult]("batch_example.ReviewResultSlice")
}
// ReviewRequest represents a document to be reviewed.
type ReviewRequest struct {
	DocumentID string // unique identifier, e.g. "DOC-001"
	Content    string // document body submitted for review
	Priority   string // "high", "medium", "low"; "high" triggers human review in Scenario 6
}
// ReviewResult contains the outcome of a single document review.
type ReviewResult struct {
	DocumentID string    // matches ReviewRequest.DocumentID
	Approved   bool      // final approval verdict
	Score      float64   // review score; auto-review produces values in [0.5, 1.0)
	Comments   string    // human- or machine-generated review note
	ReviewedAt time.Time // timestamp taken when the review completed
}
// ReviewReport aggregates results from batch processing (built by the
// "reduce" node in Scenario 7).
type ReviewReport struct {
	TotalDocuments   int            // number of results aggregated
	ApprovedCount    int            // results with Approved == true
	RejectedCount    int            // results with Approved == false
	AverageScore     float64        // mean of Score over all results; 0 for an empty batch
	HighPriorityPass int            // NOTE(review): never written in this file — confirm intended use or remove
	Results          []ReviewResult // the individual per-document outcomes
	GeneratedAt      time.Time      // when the report was assembled
}
// ApprovalDecision is the human decision supplied when resuming documents
// that interrupted for review (passed via compose.BatchResumeWithData).
type ApprovalDecision struct {
	Approved bool   // the supervisor's verdict
	Comments string // free-form justification recorded in the final result
}
// BatchReviewInput wraps documents with batch metadata; it is the input
// type of the Scenario 7 parent graph.
type BatchReviewInput struct {
	Documents []ReviewRequest // documents to feed into the batch node
	BatchName string          // human-readable label for logging
}
// main runs the seven BatchNode demo scenarios in order, printing a header
// before each one and a blank line after it.
func main() {
	ctx := context.Background()
	fmt.Println("=== Document Review Pipeline Example ===")
	fmt.Println()

	// Table-driven dispatch: every scenario is self-contained and takes
	// only the context.
	scenarios := []struct {
		header string
		run    func(context.Context)
	}{
		{"--- Scenario 1: Basic Sequential Processing ---", runBasicSequential},
		{"--- Scenario 2: Concurrent Processing ---", runConcurrent},
		{"--- Scenario 3: With Compile Options ---", runWithCompileOptions},
		{"--- Scenario 4: With Invoke Options (Callbacks) ---", runWithInvokeOptions},
		{"--- Scenario 5: Normal Error Handling ---", runWithError},
		{"--- Scenario 6: Interrupt & Resume ---", runInterruptAndResume},
		{"--- Scenario 7: Parent Graph with Reduce Node ---", runParentGraphWithReduce},
	}
	for _, s := range scenarios {
		fmt.Println(s.header)
		s.run(ctx)
		fmt.Println()
	}
	fmt.Println("=== All Scenarios Completed ===")
}
// createSampleDocuments generates count test documents whose priorities
// cycle through "high", "medium", "low".
func createSampleDocuments(count int) []ReviewRequest {
	priorityCycle := []string{"high", "medium", "low"}
	docs := make([]ReviewRequest, 0, count)
	for n := 1; n <= count; n++ {
		docs = append(docs, ReviewRequest{
			DocumentID: fmt.Sprintf("DOC-%03d", n),
			Content:    fmt.Sprintf("Document content for review #%d. This is a sample compliance document.", n),
			Priority:   priorityCycle[(n-1)%len(priorityCycle)],
		})
	}
	return docs
}
// createSimpleReviewWorkflow creates a basic single-document review
// workflow that simulates automated review with random scoring: it sleeps
// briefly, draws a score in [0.5, 1.0), and approves anything >= 0.7.
func createSimpleReviewWorkflow() *compose.Workflow[ReviewRequest, ReviewResult] {
	wf := compose.NewWorkflow[ReviewRequest, ReviewResult]()
	analyze := compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		time.Sleep(50 * time.Millisecond) // Simulate processing time
		score := 0.5 + rand.Float64()*0.5
		res := ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   score >= 0.7,
			Score:      score,
			Comments:   fmt.Sprintf("Auto-reviewed document %s with priority %s", req.DocumentID, req.Priority),
			ReviewedAt: time.Now(),
		}
		return res, nil
	})
	wf.AddLambdaNode("analyze", analyze).AddInput(compose.START)
	wf.End().AddInput("analyze")
	return wf
}
// Scenario 1: Basic Sequential Processing
// Demonstrates: MaxConcurrency=0 for sequential execution.
func runBasicSequential(ctx context.Context) {
	documents := createSampleDocuments(3)
	reviewer := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "SequentialReviewer",
		InnerTask:      createSimpleReviewWorkflow(),
		MaxConcurrency: 0, // Sequential: process one document at a time
	})
	begin := time.Now()
	results, err := reviewer.Invoke(ctx, documents)
	took := time.Since(begin)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Processed %d documents sequentially in %v\n", len(results), took)
	for _, res := range results {
		fmt.Printf(" - %s: approved=%v, score=%.2f\n", res.DocumentID, res.Approved, res.Score)
	}
}
// Scenario 2: Concurrent Processing
// Demonstrates: MaxConcurrency>0 for parallel execution with a cap.
func runConcurrent(ctx context.Context) {
	documents := createSampleDocuments(5)
	reviewer := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "ConcurrentReviewer",
		InnerTask:      createSimpleReviewWorkflow(),
		MaxConcurrency: 3, // Concurrent: up to 3 parallel tasks
	})
	begin := time.Now()
	results, err := reviewer.Invoke(ctx, documents)
	took := time.Since(begin)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Processed %d documents concurrently (max 3) in %v\n", len(results), took)
	for _, res := range results {
		fmt.Printf(" - %s: approved=%v, score=%.2f\n", res.DocumentID, res.Approved, res.Score)
	}
}
// Scenario 3: With Compile Options
// Demonstrates: InnerCompileOptions for configuring the inner workflow at
// compile time (here: an explicit graph name for debugging/tracing).
func runWithCompileOptions(ctx context.Context) {
	documents := createSampleDocuments(3)
	compileOpts := []compose.GraphCompileOption{
		compose.WithGraphName("SingleDocumentReviewWorkflow"), // Name for debugging/tracing
	}
	reviewer := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:                "NamedReviewer",
		InnerTask:           createSimpleReviewWorkflow(),
		MaxConcurrency:      2,
		InnerCompileOptions: compileOpts,
	})
	results, err := reviewer.Invoke(ctx, documents)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Processed %d documents with named inner workflow\n", len(results))
	for _, res := range results {
		fmt.Printf(" - %s: approved=%v\n", res.DocumentID, res.Approved)
	}
}
// Scenario 4: With Invoke Options (Callbacks)
// Demonstrates: callbacks for batch-level monitoring installed via context.
func runWithInvokeOptions(ctx context.Context) {
	documents := createSampleDocuments(3)
	reviewer := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "CallbackReviewer",
		InnerTask:      createSimpleReviewWorkflow(),
		MaxConcurrency: 0,
	})
	// Monitoring hooks: each logs the lifecycle event and returns the
	// context unchanged.
	onStart := func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
		fmt.Printf(" [Callback] OnStart: %s/%s\n", info.Component, info.Name)
		return ctx
	}
	onEnd := func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
		fmt.Printf(" [Callback] OnEnd: %s/%s\n", info.Component, info.Name)
		return ctx
	}
	onError := func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
		fmt.Printf(" [Callback] OnError: %s/%s - %v\n", info.Component, info.Name, err)
		return ctx
	}
	monitor := callbacks.NewHandlerBuilder().
		OnStartFn(onStart).
		OnEndFn(onEnd).
		OnErrorFn(onError).
		Build()
	// Initialize callbacks in the context so the batch run reports to them.
	results, err := reviewer.Invoke(callbacks.InitCallbacks(ctx, nil, monitor), documents)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Processed %d documents with callbacks\n", len(results))
}
// Scenario 5: Normal Error Handling
// Demonstrates: how BatchNode surfaces an error from an individual task
// (the workflow deliberately fails for DOC-002).
func runWithError(ctx context.Context) {
	wf := compose.NewWorkflow[ReviewRequest, ReviewResult]()
	validate := func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		// Simulate validation failure for one specific document.
		if req.DocumentID == "DOC-002" {
			return ReviewResult{}, fmt.Errorf("validation failed for document %s: content too short", req.DocumentID)
		}
		res := ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   true,
			Score:      0.9,
			Comments:   "Passed validation",
			ReviewedAt: time.Now(),
		}
		return res, nil
	}
	wf.AddLambdaNode("analyze", compose.InvokableLambda(validate)).AddInput(compose.START)
	wf.End().AddInput("analyze")

	reviewer := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "ErrorHandlingReviewer",
		InnerTask:      wf,
		MaxConcurrency: 0,
	})
	results, err := reviewer.Invoke(ctx, createSampleDocuments(3))
	if err != nil {
		fmt.Printf("Expected error occurred: %v\n", err)
		return
	}
	fmt.Printf("Results: %v\n", results)
}
// Scenario 6: Interrupt & Resume
// Demonstrates: Human-in-the-loop workflow using compose.Interrupt and compose.BatchResumeWithData
//
// Flow:
// 1. First invocation: High-priority documents interrupt for human review
// 2. Extract interrupt contexts with document IDs
// 3. Resume with approval decisions using BatchResumeWithData
// 4. Interrupted tasks complete with human decisions
func runInterruptAndResume(ctx context.Context) {
	// Inner per-document workflow: auto-approves non-high-priority docs,
	// interrupts for human review on high-priority ones.
	innerWorkflow := compose.NewWorkflow[ReviewRequest, ReviewResult]()
	innerWorkflow.AddLambdaNode("analyze", compose.InvokableLambda(func(ctx context.Context, req ReviewRequest) (ReviewResult, error) {
		if req.Priority == "high" {
			// Check if this is a resume from previous interrupt
			wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
			if !wasInterrupted {
				// First run: interrupt for human review. The map payload is
				// surfaced to the caller via InterruptContext.Info below.
				fmt.Printf(" Document %s requires human review (high priority)\n", req.DocumentID)
				return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
					"document_id": req.DocumentID,
					"reason":      "High priority document requires human approval",
				})
			}
			// Resume: check if we have approval decision
			isResumeTarget, hasData, decision := compose.GetResumeContext[*ApprovalDecision](ctx)
			if isResumeTarget && hasData && decision != nil {
				fmt.Printf(" Document %s resumed with decision: approved=%v\n", req.DocumentID, decision.Approved)
				return ReviewResult{
					DocumentID: req.DocumentID,
					Approved:   decision.Approved,
					Score:      1.0,
					Comments:   fmt.Sprintf("Human review: %s", decision.Comments),
					ReviewedAt: time.Now(),
				}, nil
			}
			// Still waiting for decision: interrupt again rather than guess.
			return ReviewResult{}, compose.Interrupt(ctx, map[string]string{
				"document_id": req.DocumentID,
				"reason":      "Still waiting for human approval",
			})
		}
		// Non-high priority: auto-approve
		return ReviewResult{
			DocumentID: req.DocumentID,
			Approved:   true,
			Score:      0.85,
			Comments:   "Auto-approved (non-high priority)",
			ReviewedAt: time.Now(),
		}, nil
	})).AddInput(compose.START)
	innerWorkflow.End().AddInput("analyze")
	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "InterruptReviewer",
		InnerTask:      innerWorkflow,
		MaxConcurrency: 0,
	})
	// Wrap BatchNode in a parent graph for proper interrupt handling
	parentGraph := compose.NewGraph[[]ReviewRequest, []ReviewResult]()
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		return batchNode.Invoke(ctx, inputs)
	}))
	_ = parentGraph.AddEdge(compose.START, "batch_review")
	_ = parentGraph.AddEdge("batch_review", compose.END)
	// Compile with checkpoint store for state persistence across the
	// interrupt; the resume call below restores from the same store.
	store := newMemoryCheckpointStore()
	runner, err := parentGraph.Compile(ctx,
		compose.WithGraphName("InterruptResumeDemo"),
		compose.WithCheckPointStore(store),
	)
	if err != nil {
		fmt.Printf("Failed to compile graph: %v\n", err)
		return
	}
	// Two of the four documents are high priority, so the first invocation
	// is expected to interrupt twice.
	docs := []ReviewRequest{
		{DocumentID: "DOC-001", Content: "Content 1", Priority: "high"},
		{DocumentID: "DOC-002", Content: "Content 2", Priority: "medium"},
		{DocumentID: "DOC-003", Content: "Content 3", Priority: "high"},
		{DocumentID: "DOC-004", Content: "Content 4", Priority: "low"},
	}
	checkpointID := "interrupt-resume-demo-001"
	// Step 1: First invocation - high priority docs will interrupt
	fmt.Println("First invocation (will interrupt for high priority docs):")
	results, err := runner.Invoke(ctx, docs, compose.WithCheckPointID(checkpointID))
	if err != nil {
		// Step 2: Extract interrupt info
		info, infoOk := compose.ExtractInterruptInfo(err)
		if infoOk && len(info.InterruptContexts) > 0 {
			fmt.Printf("\n Interrupt detected! Found %d interrupt context(s):\n", len(info.InterruptContexts))
			// Step 3: Prepare resume data with approval decisions
			resumeData := make(map[string]any)
			for i, iCtx := range info.InterruptContexts {
				// iCtx.Info carries the payload passed to compose.Interrupt
				// above. A failed type assertion leaves infoMap nil; map
				// reads on nil then yield "".
				infoMap, _ := iCtx.Info.(map[string]string)
				docID := infoMap["document_id"]
				fmt.Printf(" %d. ID=%s\n", i+1, iCtx.ID)
				fmt.Printf(" Address=%v\n", iCtx.Address)
				fmt.Printf(" DocumentID=%s, Reason=%s\n", docID, infoMap["reason"])
				// Key the decision by interrupt-context ID so it is routed
				// back to the task that raised the interrupt.
				resumeData[iCtx.ID] = &ApprovalDecision{
					Approved: true,
					Comments: fmt.Sprintf("Approved by supervisor for %s", docID),
				}
			}
			// Step 4: Resume with approval decisions. Input is nil because
			// state is restored from the checkpoint under the same ID.
			fmt.Println("\n Resuming with approval decisions...")
			resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
			results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))
			if err != nil {
				fmt.Printf(" Resume error: %v\n", err)
				return
			}
			fmt.Println("\n Final results after resume:")
			for _, r := range results {
				fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
			}
			return
		}
		// Error was not an interrupt: report it as a plain failure.
		fmt.Printf("Error: %v\n", err)
		return
	}
	// Reached only if no interrupt occurred on the first invocation.
	fmt.Println("\nFinal results:")
	for _, r := range results {
		fmt.Printf(" - %s: approved=%v, comments=%s\n", r.DocumentID, r.Approved, r.Comments)
	}
}
// memoryCheckpointStore is a simple in-memory checkpoint store for demos.
// It provides the Get/Set pair expected by compose.WithCheckPointStore and
// is safe for concurrent use. Contents are lost when the process exits.
type memoryCheckpointStore struct {
	mu   sync.RWMutex      // guards data
	data map[string][]byte // checkpoint ID -> serialized checkpoint bytes
}
// newMemoryCheckpointStore returns an empty, ready-to-use in-memory
// checkpoint store.
func newMemoryCheckpointStore() *memoryCheckpointStore {
	store := &memoryCheckpointStore{}
	store.data = map[string][]byte{}
	return store
}
// Get returns the checkpoint bytes stored under id and whether an entry
// existed; the error is always nil for this in-memory store.
func (m *memoryCheckpointStore) Get(_ context.Context, id string) ([]byte, bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if stored, found := m.data[id]; found {
		return stored, true, nil
	}
	return nil, false, nil
}
// Set stores (or overwrites) the serialized checkpoint bytes under id.
// It never fails; the error return exists to satisfy the store interface.
func (m *memoryCheckpointStore) Set(_ context.Context, id string, data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[id] = data
	return nil
}
// Scenario 7: Parent Graph with Reduce Node
// Demonstrates:
// - BatchNode as a node in a larger pipeline
// - WithInnerOptions for passing runtime options (callbacks) to inner tasks
// - Reduce pattern: aggregate batch results into a summary report
func runParentGraphWithReduce(ctx context.Context) {
	innerWorkflow := createSimpleReviewWorkflow()
	batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
		Name:           "BatchDocumentReviewer",
		InnerTask:      innerWorkflow,
		MaxConcurrency: 3,
		InnerCompileOptions: []compose.GraphCompileOption{
			compose.WithGraphName("SingleDocReview"),
		},
	})
	// Three-stage pipeline: preprocess -> batch_review -> reduce.
	parentGraph := compose.NewGraph[BatchReviewInput, ReviewReport]()
	// Node 1: Preprocess - extract documents from batch input
	_ = parentGraph.AddLambdaNode("preprocess", compose.InvokableLambda(func(ctx context.Context, input BatchReviewInput) ([]ReviewRequest, error) {
		fmt.Printf(" Preprocessing batch '%s' with %d documents\n", input.BatchName, len(input.Documents))
		return input.Documents, nil
	}))
	// Node 2: Batch Review - process all documents with progress tracking
	_ = parentGraph.AddLambdaNode("batch_review", compose.InvokableLambda(func(ctx context.Context, inputs []ReviewRequest) ([]ReviewResult, error) {
		fmt.Printf(" Starting batch review of %d documents\n", len(inputs))
		// Progress tracking with atomic counter (thread-safe for concurrent processing)
		var completedCount int32
		totalCount := int32(len(inputs))
		// Create callback for progress tracking. The Component filter keeps
		// the counter to one increment per inner run.
		// NOTE(review): assumes each inner workflow run fires OnEnd with
		// Component == "Workflow" — confirm against eino callbacks docs.
		reviewProgressHandler := callbacks.NewHandlerBuilder().
			OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
				if info.Component == "Workflow" {
					current := atomic.AddInt32(&completedCount, 1)
					fmt.Printf(" [Progress] %d/%d documents reviewed\n", current, totalCount)
				}
				return ctx
			}).
			Build()
		// Use WithInnerOptions to pass callbacks to each inner task
		return batchNode.Invoke(ctx, inputs,
			batch.WithInnerOptions(compose.WithCallbacks(reviewProgressHandler)),
		)
	}))
	// Node 3: Reduce - aggregate results into a report
	_ = parentGraph.AddLambdaNode("reduce", compose.InvokableLambda(func(ctx context.Context, results []ReviewResult) (ReviewReport, error) {
		fmt.Printf(" Reducing %d results into report\n", len(results))
		report := ReviewReport{
			TotalDocuments: len(results),
			Results:        results,
			GeneratedAt:    time.Now(),
		}
		var totalScore float64
		for _, r := range results {
			totalScore += r.Score
			if r.Approved {
				report.ApprovedCount++
			} else {
				report.RejectedCount++
			}
		}
		// Guard against division by zero for an empty batch.
		if len(results) > 0 {
			report.AverageScore = totalScore / float64(len(results))
		}
		return report, nil
	}))
	// Connect nodes: preprocess -> batch_review -> reduce
	_ = parentGraph.AddEdge(compose.START, "preprocess")
	_ = parentGraph.AddEdge("preprocess", "batch_review")
	_ = parentGraph.AddEdge("batch_review", "reduce")
	_ = parentGraph.AddEdge("reduce", compose.END)
	runner, err := parentGraph.Compile(ctx, compose.WithGraphName("DocumentReviewPipeline"))
	if err != nil {
		fmt.Printf("Failed to compile parent graph: %v\n", err)
		return
	}
	input := BatchReviewInput{
		Documents: createSampleDocuments(5),
		BatchName: "Q4-Compliance-Review",
	}
	fmt.Println("Running document review pipeline:")
	report, err := runner.Invoke(ctx, input)
	if err != nil {
		fmt.Printf("Pipeline error: %v\n", err)
		return
	}
	// Print the aggregated report followed by one line per document.
	fmt.Println("\n=== Review Report ===")
	fmt.Printf(" Total Documents: %d\n", report.TotalDocuments)
	fmt.Printf(" Approved: %d\n", report.ApprovedCount)
	fmt.Printf(" Rejected: %d\n", report.RejectedCount)
	fmt.Printf(" Average Score: %.2f\n", report.AverageScore)
	fmt.Printf(" Generated At: %s\n", report.GeneratedAt.Format(time.RFC3339))
	fmt.Println("\n Individual Results:")
	for _, r := range report.Results {
		status := "✓"
		if !r.Approved {
			status = "✗"
		}
		fmt.Printf(" %s %s (score: %.2f)\n", status, r.DocumentID, r.Score)
	}
}