You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shentong.martin fb76b1c544 feat(graphtool): add InvokableGraphTool and StreamableGraphTool for wrapping compose types as tools
- Add graphtool package with InvokableGraphTool and StreamableGraphTool
- Support compose.Graph, compose.Chain, and compose.Workflow via Compilable interface
- Full interrupt/resume support with checkpoint persistence
- Add 4 examples demonstrating different use cases:
  - 1_chain_summarize: Document summarization with compose.Chain
  - 2_graph_research: Multi-source research with compose.Graph + streaming
  - 3_workflow_order: Order processing with compose.Workflow + approval
  - 4_nested_interrupt: Nested interrupts (outer approval + inner risk check)
- Update InvokableApprovableTool to support nested interrupts
- Add comprehensive README documentation for package and examples

Change-Id: Ia7cceebb9e25baf6ca4db25b36eddc74eec273cb
4 months ago
..
README.md feat(graphtool): add InvokableGraphTool and StreamableGraphTool for wrapping compose types as tools 4 months ago
main.go feat(graphtool): add InvokableGraphTool and StreamableGraphTool for wrapping compose types as tools 4 months ago

README.md

Example 2: Multi-Source Research with compose.Graph + Streaming

This example demonstrates using StreamableGraphTool with compose.Graph to create a research tool that queries multiple sources in parallel and streams the synthesized results.

What This Example Shows

  • Using compose.Graph with edge-based connections
  • Wrapping a graph as a StreamableTool
  • Parallel execution within a graph node
  • Streaming output with ReturnDirectly
  • ChatModel integration for result synthesis

Architecture

Research Query
      │
      ▼
┌─────────────────────────────────────┐
│         parallel_search             │
│  ┌─────────┬─────────┬──────────┐   │
│  │   Web   │   KB    │  Local   │   │  ← Parallel goroutines
│  │ Search  │ Search  │  Search  │   │
│  └────┬────┴────┬────┴────┬─────┘   │
│       └─────────┼─────────┘         │
└─────────────────┼───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│       prepare_prompt_input          │  ← Format for template
└─────────────────┬───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│          prepare_prompt             │  ← ChatTemplate node
└─────────────────┬───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│           synthesize                │  ← ChatModel (streams output)
└─────────────────┬───────────────────┘
                  │
                  ▼
        Streaming Response

Key Components

Parallel Search Implementation

graph.AddLambdaNode("parallel_search", compose.InvokableLambda(func(ctx context.Context, input *ResearchInput) (*searchResults, error) {
    resultCh := make(chan result, 3)
    
    // Launch parallel searches
    go func() { /* web search */ }()
    go func() { /* KB search */ }()
    go func() { /* local search */ }()
    
    // Collect results
    for i := 0; i < 3; i++ {
        r := <-resultCh
        // aggregate results
    }
    return results, nil
}))

Graph Edge Connections

graph.AddEdge(compose.START, "parallel_search")
graph.AddEdge("parallel_search", "prepare_prompt_input")
graph.AddEdge("prepare_prompt_input", "prepare_prompt")
graph.AddEdge("prepare_prompt", "synthesize")
graph.AddEdge("synthesize", compose.END)

Streaming Tool Creation

tool, err := graphtool.NewStreamableGraphTool[*ResearchInput, *schema.Message](
    graph,
    "research_topic",
    "Research a topic by querying multiple sources...",
)

ReturnDirectly Configuration

agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
    ToolsConfig: adk.ToolsConfig{
        ToolsNodeConfig: compose.ToolsNodeConfig{
            Tools: []tool.BaseTool{researchTool},
        },
        ReturnDirectly: map[string]bool{
            "research_topic": true,  // Stream directly to user
        },
    },
})

Running the Example

# Set your OpenAI API key
export OPENAI_API_KEY=your-api-key

# Run the example
go run main.go

Expected Output

=== Multi-Source Research Example (using compose.Graph + StreamableGraphTool) ===

This example demonstrates:
1. StreamableGraphTool with compose.Graph
2. Parallel search execution within a graph node
3. Streaming output from ChatModel via ReturnDirectly

  [Graph] Starting parallel searches...
  [Graph] Local file search completed
  [Graph] Knowledge base search completed
  [Graph] Web search completed
  [Graph] All searches completed, preparing synthesis...

{"role":"assistant","content":"Based"...}
{"role":"assistant","content":" on"...}
{"role":"assistant","content":" the"...}
... (streaming chunks)

Key Takeaways

  1. Graph for Complex Flows: compose.Graph allows flexible node connections via edges
  2. Parallel Execution: Use goroutines within a node for concurrent operations
  3. Streaming Output: StreamableGraphTool + ReturnDirectly enables real-time streaming to users
  4. Message Output: Output *schema.Message directly for proper streaming chunk handling