package main import ( "bufio" "context" "fmt" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "log/slog" "math/rand" "os" "time" ) // River Worker type MessageArgs struct { Message string `json:"string"` } func (MessageArgs) Kind() string { return "message" } type MessageWorker struct { river.WorkerDefaults[MessageArgs] } func InitWorkers() (*river.Workers, error) { slog.Info("Initializing River workers") workers := river.NewWorkers() err := river.AddWorkerSafely(workers, &MessageWorker{}) if err != nil { slog.Error("Failed to add workers") } return workers, err } func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { randTime := rand.Intn(10) fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) slog.Info("Sleeping to simulate some work", "sleep", randTime) time.Sleep(time.Second * time.Duration(randTime)) err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) if err != nil { slog.Error("Failed to save file") } slog.Info("Saved message!", "filename", fileName) return err } // func main() { workers, err := InitWorkers() if err != nil { // abort! panic(err.Error()) } ctx := context.Background() // create a River client dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) if err != nil { slog.Error("Error setting up db", "error", err.Error()) panic(err.Error()) } defer dbPool.Close() rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { slog.Error("Failed to setup river client", "error", err.Error()) panic(err.Error()) } // In K8s you could separate our the worker process from a HTTP/gRPC service // by using rc.Enqueue in the server, and rc.Start in the worker go func() { slog.Info("Starting river background worker process") err = rc.Start(ctx) if err != nil { slog.Error("Failed to start river background workers", "error", err.Error()) panic(err.Error()) } }() // enter the loop for { inputReader := bufio.NewReader(os.Stdin) fmt.Println("Type in a message and I will save it to ./data:") msg, err := inputReader.ReadString('\n') if err != nil { slog.Error("Failed to read message. Try again", "error", err.Error()) } // TODO use rc.InsertTx. See https://riverqueue.com/docs/transactional-enqueueing _, err = rc.Insert(ctx, MessageArgs{Message: msg}, nil) if err != nil { slog.Error("Failed to enqueue message. Sorry", "error", err.Error()) continue } slog.Info("Submitted message for processing", "msg", msg) } }