Rescueing stuck jobs successfully

master
Drew Bednar 3 weeks ago
parent 8d48cc7e9b
commit 3fc0073dec

@ -8,7 +8,7 @@ import (
"github.com/riverqueue/river" "github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/riverdriver/riverpgxv5"
"log/slog" "log/slog"
"math/rand" // "math/rand"
"os" "os"
"time" "time"
) )
@ -24,46 +24,72 @@ func (MessageArgs) Kind() string {
} }
type MessageWorker struct { type MessageWorker struct {
dbPool *pgxpool.Pool
river.WorkerDefaults[MessageArgs] river.WorkerDefaults[MessageArgs]
} }
func InitWorkers() (*river.Workers, error) { func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) {
slog.Info("Initializing River workers") slog.Info("Initializing River workers")
workers := river.NewWorkers() workers := river.NewWorkers()
err := river.AddWorkerSafely(workers, &MessageWorker{}) // need to specify generic type
err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool})
if err != nil { if err != nil {
slog.Error("Failed to add workers") slog.Error("Failed to add workers")
} }
return workers, err return workers, err
} }
// Old function that does use transactions so when the worker SIGKILL it lost track of itself
//func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error {
// randTime := rand.Intn(10)
// fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix())
// slog.Info("Sleeping to simulate some work", "sleep", randTime)
// time.Sleep(time.Second * time.Duration(randTime))
// err := os.WriteFile(fileName, []byte(job.Args.Message), 0777)
// if err != nil {
// slog.Error("Failed to save file")
// }
// slog.Info("Saved message!", "filename", fileName)
// return err
//}
func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error {
randTime := rand.Intn(10) //https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions
tx, err := w.dbPool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// our logic
//randTime := rand.Intn(10)
randTime := 10
fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix()) fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix())
slog.Info("Sleeping to simulate some work", "sleep", randTime) slog.Info("Sleeping to simulate some work", "sleep", randTime)
time.Sleep(time.Second * time.Duration(randTime)) time.Sleep(time.Second * time.Duration(randTime))
err := os.WriteFile(fileName, []byte(job.Args.Message), 0777) err = os.WriteFile(fileName, []byte(job.Args.Message), 0777)
if err != nil { if err != nil {
slog.Error("Failed to save file") slog.Error("Failed to save file")
} }
slog.Info("Saved message!", "filename", fileName) slog.Info("Saved message!", "filename", fileName)
_, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
if err != nil {
return err return err
}
return tx.Commit(ctx)
} }
// func (w *MessageWorker) Timeout(job *river.Job[MessageArgs]) time.Duration {
return time.Second * 30
}
func main() { func main() {
workers, err := InitWorkers()
if err != nil {
// abort!
panic(err.Error())
}
ctx := context.Background() ctx := context.Background()
// create a River client
dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI"))
if err != nil { if err != nil {
slog.Error("Error setting up db", "error", err.Error()) slog.Error("Error setting up db", "error", err.Error())
@ -71,7 +97,15 @@ func main() {
} }
defer dbPool.Close() defer dbPool.Close()
workers, err := InitWorkers(dbPool)
if err != nil {
// abort!
panic(err.Error())
}
// create a River client
rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
RescueStuckJobsAfter: 1 * time.Minute,
Queues: map[string]river.QueueConfig{ Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100}, river.QueueDefault: {MaxWorkers: 100},
}, },

Loading…
Cancel
Save