Learning how Async Iterator works
parent
0c6f453845
commit
1e05d4e444
@ -0,0 +1,160 @@
|
|||||||
|
# Learn Async Iterator Pattern
|
||||||
|
|
||||||
|
The async iterator pattern is used in the eino-adk for the agent.Run implementation. Since an Agent is simply defined an interface we can implement our own Agent implementation conforming to the interface that will work seemlessly within the eino-adk framework. To better understand how this can be achieved we have to have a solid understanding of what the async iterator pattern is, and why it's used.
|
||||||
|
|
||||||
|
The magic of this pattern lies in the use of goroutines, channels, blocking reads, and generics. This pattern is usually useful when work takes time, streaming matters, the results arrive incrementally, and/or you don't want to wait for everything before doing some kind of work with the initial results. So this can include websocket events, logs, filesystem walks, DB cursor iteration, senor readings, progress updates, and of course LLM token streaming.
|
||||||
|
|
||||||
|
## Eino's use case
|
||||||
|
|
||||||
|
In Eino they describe the async iterator as allowing callers to consume a series of AgentEvents produced by the Agent (it's internal Chatmodel component actually) in an ordered blocking manner.
|
||||||
|
|
||||||
|
We can start with a mental model of one goroutine producing values over time and another goroutine consuming them one-by-one. The iterator object is what sits between these two threads and serves as a buffer between them. This let's us progressively capture emitted values as they become available.
|
||||||
|
|
||||||
|
## A Non-agent example
|
||||||
|
|
||||||
|
```go
|
||||||
|
type AsyncIterator[T any] struct {
|
||||||
|
ch chan T
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Here we define what an AsyncIterator, which is simply a struct that stores a channel. This channel carries values of Type T. Since this is defined as a generic we can later use this same definition for multiple types.
|
||||||
|
|
||||||
|
```go
|
||||||
|
AsyncIterator[int]
|
||||||
|
AsyncIterator[string]
|
||||||
|
AsyncIterator[User]
|
||||||
|
```
|
||||||
|
|
||||||
|
Like the Eino framework, we will implement a receiver function called Next() that produces the next value from it's internal Channel. This blocks until another value arrives on the channel or the channel closes. If ok is true a new value was produced, if it's false the channel has been closed, and iteration is over.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func (it *AysncIterator[T]) Next() (T, bool){
|
||||||
|
v, ok := <-it.ch
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
```go
|
||||||
|
func CountToFive() *AsyncIterator[int] {
|
||||||
|
it := &AsyncIterator[int]{
|
||||||
|
ch: make(chan int),
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(it.ch)
|
||||||
|
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
it.ch <- i
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return it
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This launches a goroutine which functions as the value producer running concurrently to the main thread. The function immediately returns the iterator, which is the "async" part of the flow.
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AsyncIterator[T any] struct {
|
||||||
|
ch chan T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *AsyncIterator[T]) Next() (T, bool) {
|
||||||
|
v, ok := <-it.ch
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func CountToFive() *AsyncIterator[int] {
|
||||||
|
it := &AsyncIterator[int]{
|
||||||
|
ch: make(chan int),
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(it.ch)
|
||||||
|
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
it.ch <- i
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return it
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
iter := CountToFive()
|
||||||
|
|
||||||
|
for {
|
||||||
|
value, ok := iter.Next()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fmt.Println(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
When you run this, Each value of the iterator is printed to the terminal with a one second delay between values to simulate the blocking nature of the iterators channel communication.
|
||||||
|
|
||||||
|
This is pattern is have Eino handles AgentEvents, and a `handler()` is called within the iteration loop to perform additional operations. You will notice that, this is simply a wrapper over top of channels. This gives a cleaner api, uses generic composition, hides channel implementation details, and resembles iterators from other languages.
|
||||||
|
|
||||||
|
## Not the modern stle of iteration
|
||||||
|
|
||||||
|
Eino's async iterator pattern is a more traditional style of iterator. It's important to know that in golang 1.23+ a Range-over-Func implementation wass added that allows us to use a custom object in a loop. In order to use this feature the function called in the loop has to match one of these signatures.
|
||||||
|
|
||||||
|
|Type | Signature | Description |
|
||||||
|
|-------------|-----------------------------|-------------------------------------------------|
|
||||||
|
|Single Value | func(yield func(V) bool) | Yields one value (like a slice index or value). |
|
||||||
|
|Dual Value | func(yield func(K, V) bool) | Yields two values (like a map's key and value). |
|
||||||
|
|No Value | func(yield func() bool) | Just iterates (useful for simple loops). |
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
```go
|
||||||
|
func CountToFive(yield func(int) bool) {
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
if !yield(i) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
These are very different internally. The Eino style AsyncIterator is one where the consumer pulls ("Next value please"). The Range-over-Func is a Producer push `yield(value)` and the Golang runtime transforms the syntax into a callback-style control flow. The Range-over-Func style is usually Synchronous, as you can see there is no goroutine call in that implementation, so everything happens within one call stack. You can make a async iterator with Range-over-Func.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func AsyncCountToFive(yield func(int) bool) {
|
||||||
|
ch := make(chan int)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
ch <- i
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for v := range ch {
|
||||||
|
if !yield(v) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for v:= range AsyncCountToFive {
|
||||||
|
fmt.Println(v)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
As you can see this doesn't get you much because you are still using a channel and a goroutine, and you actually lose a lot of control when compared to the more traditional method. Libraries will typically still use the traditional pattern because it gives them explicit blocking semantics, easier cancelation handling, easier propagation, compatiability with older go versions, more control over the life. While these too implementation feel similar then are not equivalent because they are different protocols.
|
||||||
|
|
||||||
|
Range-over-Func is a "Call this callback for every value" model, where the Eino style is waits until the next value becomes available. It's the waiting capability that makes the traditional async iterator pattern powerful.
|
||||||
@ -0,0 +1,3 @@
|
|||||||
|
module asynciterator
|
||||||
|
|
||||||
|
go 1.26.1
|
||||||
@ -0,0 +1,61 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AsyncIterator[T any] struct {
|
||||||
|
ch chan T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *AsyncIterator[T]) Next() (T, bool) {
|
||||||
|
v, ok := <-it.ch
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func CountToFive() *AsyncIterator[int] {
|
||||||
|
it := &AsyncIterator[int]{
|
||||||
|
ch: make(chan int),
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(it.ch)
|
||||||
|
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
it.ch <- i
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return it
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModernCountToFive is an implementation of the Range-over-Func pattern of iterator.
|
||||||
|
// This was introduced in a golang 1.23+
|
||||||
|
func ModernCountToFive(yield func(int) bool) {
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
if !yield(i) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
iter := CountToFive()
|
||||||
|
|
||||||
|
fmt.Println("Traditional (Eino style) iterator.")
|
||||||
|
for {
|
||||||
|
value, ok := iter.Next()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fmt.Println(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Modern Range-over-Func iterator.")
|
||||||
|
for i := range ModernCountToFive {
|
||||||
|
fmt.Println(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue