From 8d48cc7e9bc97b8103561fcab50c3eba0271f40d Mon Sep 17 00:00:00 2001 From: Drew Bednar Date: Mon, 9 Jun 2025 21:01:17 -0400 Subject: [PATCH] Working example of enqueing work --- .gitignore | 2 + data/.gitkeep | 0 main.go | 107 ++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 data/.gitkeep diff --git a/.gitignore b/.gitignore index 2157e0e..8d19eae 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ tmp/ # Profiles .local-profile +# Ignore data +data/*.txt diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/main.go b/main.go index 9533d40..1c0cbfe 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,113 @@ package main import ( + "bufio" + "context" "fmt" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "log/slog" + "math/rand" + "os" + "time" ) +// River Worker + +type MessageArgs struct { + Message string `json:"string"` +} + +func (MessageArgs) Kind() string { + return "message" +} + +type MessageWorker struct { + river.WorkerDefaults[MessageArgs] +} + +func InitWorkers() (*river.Workers, error) { + slog.Info("Initializing River workers") + workers := river.NewWorkers() + err := river.AddWorkerSafely(workers, &MessageWorker{}) + if err != nil { + slog.Error("Failed to add workers") + } + return workers, err +} + +func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { + randTime := rand.Intn(10) + fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) + slog.Info("Sleeping to simulate some work", "sleep", randTime) + time.Sleep(time.Second * time.Duration(randTime)) + + err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) + if err != nil { + slog.Error("Failed to save file") + } + slog.Info("Saved message!", "filename", fileName) + return err +} + +// + func main() { - slog.Info("dirp") - fmt.Println("Hello World") - slog.Info("my message", "error", "this is value") + + workers, err := InitWorkers() + if err != nil { + // abort! + panic(err.Error()) + } + + ctx := context.Background() + // create a River client + + dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) + if err != nil { + slog.Error("Error setting up db", "error", err.Error()) + panic(err.Error()) + } + defer dbPool.Close() + + rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Workers: workers, + }) + if err != nil { + slog.Error("Failed to setup river client", "error", err.Error()) + panic(err.Error()) + } + + // In K8s you could separate our the worker process from a HTTP/gRPC service + // by using rc.Enqueue in the server, and rc.Start in the worker + go func() { + slog.Info("Starting river background worker process") + err = rc.Start(ctx) + if err != nil { + slog.Error("Failed to start river background workers", "error", err.Error()) + panic(err.Error()) + } + }() + + // enter the loop + for { + inputReader := bufio.NewReader(os.Stdin) + fmt.Println("Type in a message and I will save it to ./data:") + msg, err := inputReader.ReadString('\n') + if err != nil { + slog.Error("Failed to read message. Try again", "error", err.Error()) + } + + // TODO use rc.InsertTx. See https://riverqueue.com/docs/transactional-enqueueing + _, err = rc.Insert(ctx, MessageArgs{Message: msg}, nil) + if err != nil { + slog.Error("Failed to enqueue message. Sorry", "error", err.Error()) + continue + } + slog.Info("Submitted message for processing", "msg", msg) + } }