package main import ( "bufio" "context" "fmt" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "log/slog" // "math/rand" "os" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/metric" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" "net/http" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // River Worker type MessageArgs struct { Message string `json:"string"` } func (MessageArgs) Kind() string { return "message" } type MessageWorker struct { dbPool *pgxpool.Pool river.WorkerDefaults[MessageArgs] } func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) { slog.Info("Initializing River workers") workers := river.NewWorkers() // need to specify generic type err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool}) if err != nil { slog.Error("Failed to add workers") } return workers, err } // Old function that does use transactions so when the worker SIGKILL it lost track of itself //func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { // randTime := rand.Intn(10) // fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) // slog.Info("Sleeping to simulate some work", "sleep", randTime) // time.Sleep(time.Second * time.Duration(randTime)) // err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) // if err != nil { // slog.Error("Failed to save file") // } // slog.Info("Saved message!", "filename", fileName) // return err //} func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { //https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions tx, err := w.dbPool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx) // our logic //randTime := rand.Intn(10) randTime := 10 fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) slog.Info("Sleeping to simulate some work", "sleep", randTime) time.Sleep(time.Second * time.Duration(randTime)) err = os.WriteFile(fileName, []byte(job.Args.Message), 0777) if err != nil { slog.Error("Failed to save file") } slog.Info("Saved message!", "filename", fileName) _, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) if err != nil { return err } return tx.Commit(ctx) } func (w *MessageWorker) Timeout(job *river.Job[MessageArgs]) time.Duration { return time.Second * 30 } func setupOTel() (func(context.Context) error, error) { // OTLP gRPC exporters for traces and metrics traceExp, err := otlptracegrpc.New(context.Background(), otlptracegrpc.WithEndpoint("localhost:4317"), otlptracegrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), ) if err != nil { return nil, err } metricExp, err := otlpmetricgrpc.New(context.Background(), otlpmetricgrpc.WithEndpoint("localhost:4317"), otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), ) if err != nil { return nil, err } res, err := resource.New(context.Background(), resource.WithAttributes( semconv.ServiceName("go-background-jobs"), ), ) if err != nil { return nil, err } // Tracer provider tp := trace.NewTracerProvider( trace.WithBatcher(traceExp), trace.WithResource(res), ) otel.SetTracerProvider(tp) // Meter provider mp := metric.NewMeterProvider( metric.WithReader(metric.NewPeriodicReader(metricExp)), metric.WithResource(res), ) otel.SetMeterProvider(mp) return tp.Shutdown, nil } func main() { ctx := context.Background() shutdown, err := setupOTel() if err != nil { panic(err) } defer shutdown(ctx) dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) if err != nil { slog.Error("Error setting up db", "error", err.Error()) panic(err.Error()) } defer dbPool.Close() workers, err := InitWorkers(dbPool) if err != nil { // abort! panic(err.Error()) } // create a River client rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ RescueStuckJobsAfter: 1 * time.Minute, Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { slog.Error("Failed to setup river client", "error", err.Error()) panic(err.Error()) } // In K8s you could separate our the worker process from a HTTP/gRPC service // by using rc.Enqueue in the server, and rc.Start in the worker go func() { slog.Info("Starting river background worker process") err = rc.Start(ctx) if err != nil { slog.Error("Failed to start river background workers", "error", err.Error()) panic(err.Error()) } }() // enter the loop for { inputReader := bufio.NewReader(os.Stdin) fmt.Println("Type in a message and I will save it to ./data:") msg, err := inputReader.ReadString('\n') if err != nil { slog.Error("Failed to read message. Try again", "error", err.Error()) } // TODO use rc.InsertTx. See https://riverqueue.com/docs/transactional-enqueueing _, err = rc.Insert(ctx, MessageArgs{Message: msg}, nil) if err != nil { slog.Error("Failed to enqueue message. Sorry", "error", err.Error()) continue } slog.Info("Submitted message for processing", "msg", msg) } }