diff --git a/main.go b/main.go index 1c0cbfe..22b9217 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,7 @@ import ( "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "log/slog" - "math/rand" + // "math/rand" "os" "time" ) @@ -24,46 +24,72 @@ func (MessageArgs) Kind() string { } type MessageWorker struct { + dbPool *pgxpool.Pool river.WorkerDefaults[MessageArgs] } -func InitWorkers() (*river.Workers, error) { +func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) { slog.Info("Initializing River workers") workers := river.NewWorkers() - err := river.AddWorkerSafely(workers, &MessageWorker{}) + // need to specify generic type + err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool}) if err != nil { slog.Error("Failed to add workers") } return workers, err } +// Old function that does use transactions so when the worker SIGKILL it lost track of itself +//func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { +// randTime := rand.Intn(10) +// fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) +// slog.Info("Sleeping to simulate some work", "sleep", randTime) +// time.Sleep(time.Second * time.Duration(randTime)) + +// err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) +// if err != nil { +// slog.Error("Failed to save file") +// } +// slog.Info("Saved message!", "filename", fileName) +// return err +//} + func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { - randTime := rand.Intn(10) + //https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions + tx, err := w.dbPool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + // our logic + //randTime := rand.Intn(10) + randTime := 10 fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) slog.Info("Sleeping to simulate some work", "sleep", randTime) time.Sleep(time.Second * time.Duration(randTime)) - err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) + err = os.WriteFile(fileName, []byte(job.Args.Message), 0777) if err != nil { slog.Error("Failed to save file") } slog.Info("Saved message!", "filename", fileName) - return err + + _, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) + if err != nil { + return err + } + + return tx.Commit(ctx) } -// +func (w *MessageWorker) Timeout(job *river.Job[MessageArgs]) time.Duration { + return time.Second * 30 +} func main() { - workers, err := InitWorkers() - if err != nil { - // abort! - panic(err.Error()) - } - ctx := context.Background() - // create a River client - dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) if err != nil { slog.Error("Error setting up db", "error", err.Error()) @@ -71,7 +97,15 @@ func main() { } defer dbPool.Close() + workers, err := InitWorkers(dbPool) + if err != nil { + // abort! + panic(err.Error()) + } + + // create a River client rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + RescueStuckJobsAfter: 1 * time.Minute, Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, },