You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
225 lines
5.8 KiB
Go
225 lines
5.8 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/riverqueue/river"
|
|
"github.com/riverqueue/river/riverdriver/riverpgxv5"
|
|
"log/slog"
|
|
// "math/rand"
|
|
"github.com/riverqueue/river/rivertype"
|
|
"github.com/riverqueue/rivercontrib/otelriver"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/sdk/metric"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
"go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
// River Worker
|
|
|
|
type MessageArgs struct {
|
|
Message string `json:"string"`
|
|
}
|
|
|
|
func (MessageArgs) Kind() string {
|
|
return "message"
|
|
}
|
|
|
|
type MessageWorker struct {
|
|
dbPool *pgxpool.Pool
|
|
Writer func(filename string, data []byte, perm os.FileMode) error
|
|
river.WorkerDefaults[MessageArgs]
|
|
}
|
|
|
|
func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) {
|
|
slog.Info("Initializing River workers")
|
|
workers := river.NewWorkers()
|
|
// need to specify generic type
|
|
err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool, Writer: nil})
|
|
if err != nil {
|
|
slog.Error("Failed to add workers")
|
|
}
|
|
return workers, err
|
|
}
|
|
|
|
// Old function that does use transactions so when the worker SIGKILL it lost track of itself
|
|
//func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error {
|
|
// randTime := rand.Intn(10)
|
|
// fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix())
|
|
// slog.Info("Sleeping to simulate some work", "sleep", randTime)
|
|
// time.Sleep(time.Second * time.Duration(randTime))
|
|
|
|
// err := os.WriteFile(fileName, []byte(job.Args.Message), 0777)
|
|
// if err != nil {
|
|
// slog.Error("Failed to save file")
|
|
// }
|
|
// slog.Info("Saved message!", "filename", fileName)
|
|
// return err
|
|
//}
|
|
|
|
func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error {
|
|
if w.dbPool == nil {
|
|
// For unit tests: just call the writer and return
|
|
writer := w.Writer
|
|
if writer == nil {
|
|
writer = os.WriteFile
|
|
}
|
|
return writer("testfile.txt", []byte(job.Args.Message), 0777)
|
|
}
|
|
//https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions
|
|
tx, err := w.dbPool.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
// our logic
|
|
//randTime := rand.Intn(10)
|
|
randTime := 10
|
|
fileName := fmt.Sprintf("./data/msg-%d.txt", time.Now().Unix())
|
|
slog.Info("Sleeping to simulate some work", "sleep", randTime)
|
|
time.Sleep(time.Second * time.Duration(randTime))
|
|
|
|
writer := w.Writer
|
|
if writer == nil {
|
|
writer = os.WriteFile
|
|
}
|
|
err = writer(fileName, []byte(job.Args.Message), 0777)
|
|
if err != nil {
|
|
slog.Error("Failed to save file")
|
|
}
|
|
slog.Info("Saved message!", "filename", fileName)
|
|
|
|
_, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
func (w *MessageWorker) Timeout(job *river.Job[MessageArgs]) time.Duration {
|
|
return time.Second * 30
|
|
}
|
|
|
|
func setupOTel() (func(context.Context) error, error) {
|
|
// OTLP gRPC exporters for traces and metrics
|
|
traceExp, err := otlptracegrpc.New(context.Background(),
|
|
otlptracegrpc.WithEndpoint("localhost:4317"),
|
|
otlptracegrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
metricExp, err := otlpmetricgrpc.New(context.Background(),
|
|
otlpmetricgrpc.WithEndpoint("localhost:4317"),
|
|
otlpmetricgrpc.WithDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := resource.New(context.Background(),
|
|
resource.WithAttributes(
|
|
semconv.ServiceName("go-background-jobs"),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Tracer provider
|
|
tp := trace.NewTracerProvider(
|
|
trace.WithBatcher(traceExp),
|
|
trace.WithResource(res),
|
|
)
|
|
otel.SetTracerProvider(tp)
|
|
|
|
// Meter provider
|
|
mp := metric.NewMeterProvider(
|
|
metric.WithReader(metric.NewPeriodicReader(metricExp)),
|
|
metric.WithResource(res),
|
|
)
|
|
otel.SetMeterProvider(mp)
|
|
|
|
return tp.Shutdown, nil
|
|
}
|
|
|
|
func main() {
|
|
ctx := context.Background()
|
|
|
|
shutdown, err := setupOTel()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer shutdown(ctx)
|
|
|
|
dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI"))
|
|
if err != nil {
|
|
slog.Error("Error setting up db", "error", err.Error())
|
|
panic(err.Error())
|
|
}
|
|
defer dbPool.Close()
|
|
|
|
workers, err := InitWorkers(dbPool)
|
|
if err != nil {
|
|
// abort!
|
|
panic(err.Error())
|
|
}
|
|
|
|
// create a River client
|
|
rc, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
|
|
RescueStuckJobsAfter: 1 * time.Minute,
|
|
Queues: map[string]river.QueueConfig{
|
|
river.QueueDefault: {MaxWorkers: 100},
|
|
},
|
|
Workers: workers,
|
|
Middleware: []rivertype.Middleware{
|
|
otelriver.NewMiddleware(nil),
|
|
},
|
|
})
|
|
if err != nil {
|
|
slog.Error("Failed to setup river client", "error", err.Error())
|
|
panic(err.Error())
|
|
}
|
|
|
|
// In K8s you could separate our the worker process from a HTTP/gRPC service
|
|
// by using rc.Enqueue in the server, and rc.Start in the worker
|
|
go func() {
|
|
slog.Info("Starting river background worker process")
|
|
err = rc.Start(ctx)
|
|
if err != nil {
|
|
slog.Error("Failed to start river background workers", "error", err.Error())
|
|
panic(err.Error())
|
|
}
|
|
}()
|
|
|
|
// enter the loop
|
|
for {
|
|
inputReader := bufio.NewReader(os.Stdin)
|
|
fmt.Println("Type in a message and I will save it to ./data:")
|
|
msg, err := inputReader.ReadString('\n')
|
|
if err != nil {
|
|
slog.Error("Failed to read message. Try again", "error", err.Error())
|
|
}
|
|
|
|
// TODO use rc.InsertTx. See https://riverqueue.com/docs/transactional-enqueueing
|
|
_, err = rc.Insert(ctx, MessageArgs{Message: msg}, nil)
|
|
if err != nil {
|
|
slog.Error("Failed to enqueue message. Sorry", "error", err.Error())
|
|
continue
|
|
}
|
|
slog.Info("Submitted message for processing", "msg", msg)
|
|
}
|
|
}
|