Compare commits

...

3 Commits

@ -17,3 +17,11 @@ river-migrate:
db-connect: db-connect:
psql $(PG_URI) psql $(PG_URI)
.PHONY: db-connect .PHONY: db-connect
local-traces:
xdg-open http://localhost:16686
.PHONY: local-traces
local-metrics:
xdg-open http://localhost:9090
.PHONY: local-metrics

@ -0,0 +1,23 @@
receivers:
otlp:
protocols:
grpc:
http:
exporters:
otlp:
endpoint: "jaeger:4317"
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:8889"
namespace: "go_background_jobs"
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlp]
metrics:
receivers: [otlp]
exporters: [prometheus]

@ -0,0 +1,11 @@
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:8889']
# Add additional targets for your Go app metrics endpoint as needed, e.g. 'host.docker.internal:PORT' or 'app:PORT'

@ -13,3 +13,36 @@ services:
volumes: volumes:
- ./.pg_data:/var/lib/postgresql/data - ./.pg_data:/var/lib/postgresql/data
user: "${UID}:${GID}" user: "${UID}:${GID}"
river-ui:
image: ghcr.io/riverqueue/riverui:latest
environment:
- DATABASE_URL=postgresql://myuser:mypassword@postgres:5432/mydatabase
ports:
- "6543:8080"
depends_on:
- postgres
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # Jaeger UI
- "14268:14268" # Jaeger collector (legacy)
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./data/prometheus.yml:/etc/prometheus/prometheus.yml
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./data/otel-collector-config.yaml:/etc/otel-collector-config.yaml
depends_on:
- jaeger
- prometheus

@ -3,25 +3,50 @@ module goback
go 1.24.1 go 1.24.1
require ( require (
github.com/jackc/pgx/v5 v5.7.5
github.com/riverqueue/river v0.23.1
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1
github.com/riverqueue/river/rivertype v0.23.1
github.com/riverqueue/rivercontrib/otelriver v0.5.0
go.opentelemetry.io/otel v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0
go.opentelemetry.io/otel/sdk v1.36.0
go.opentelemetry.io/otel/sdk/metric v1.36.0
google.golang.org/grpc v1.73.0
)
require (
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.5 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river v0.23.1 // indirect
github.com/riverqueue/river/riverdriver v0.23.1 // indirect github.com/riverqueue/river/riverdriver v0.23.1 // indirect
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1 // indirect
github.com/riverqueue/river/rivershared v0.23.1 // indirect github.com/riverqueue/river/rivershared v0.23.1 // indirect
github.com/riverqueue/river/rivertype v0.23.1 // indirect
github.com/stretchr/testify v1.10.0 // indirect github.com/stretchr/testify v1.10.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect github.com/tidwall/sjson v1.2.5 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
go.uber.org/goleak v1.3.0 // indirect go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.37.0 // indirect golang.org/x/crypto v0.39.0 // indirect
golang.org/x/sync v0.14.0 // indirect golang.org/x/net v0.40.0 // indirect
golang.org/x/text v0.25.0 // indirect golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

@ -1,6 +1,25 @@
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@ -9,18 +28,40 @@ github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs=
github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/riverqueue/river v0.23.1 h1:/iwpDJ4ypgoVNMDDtQ7PYUKQd+lk6z414fGmp3nei84= github.com/riverqueue/river v0.23.1 h1:/iwpDJ4ypgoVNMDDtQ7PYUKQd+lk6z414fGmp3nei84=
github.com/riverqueue/river v0.23.1/go.mod h1:+02PXpjXtHnV5QzARe9BfltC52Kcm8y+BzaD6s6a2J4= github.com/riverqueue/river v0.23.1/go.mod h1:+02PXpjXtHnV5QzARe9BfltC52Kcm8y+BzaD6s6a2J4=
github.com/riverqueue/river/riverdriver v0.23.1 h1:KG7uUg2l2TWsPGcDfYD3U2ZAHXnZ/iZNH+JT0LjOq20= github.com/riverqueue/river/riverdriver v0.23.1 h1:KG7uUg2l2TWsPGcDfYD3U2ZAHXnZ/iZNH+JT0LjOq20=
github.com/riverqueue/river/riverdriver v0.23.1/go.mod h1:GN3r8XgDN/YwY1mudkPdrtyFTE3Pq/AMKrUePlcH0Uc= github.com/riverqueue/river/riverdriver v0.23.1/go.mod h1:GN3r8XgDN/YwY1mudkPdrtyFTE3Pq/AMKrUePlcH0Uc=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.23.1 h1:WIVKfmyprocrZfSjtM5lNNu+Hul+r64HHoR1CEbQ1g0=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.23.1/go.mod h1:v9OaTsxzr52ZCjGdfsaV5OIIQL84fcFuENQzaVRV5gI=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1 h1:hztWRKCHcsf9jkSjBCfQ6FQgoKoCtmd6A8EualE4ZEk= github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1 h1:hztWRKCHcsf9jkSjBCfQ6FQgoKoCtmd6A8EualE4ZEk=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1/go.mod h1:Wn8rY1a3a4I5nvskpebNK+LCkkopVFTUNPW9UklW02g= github.com/riverqueue/river/riverdriver/riverpgxv5 v0.23.1/go.mod h1:Wn8rY1a3a4I5nvskpebNK+LCkkopVFTUNPW9UklW02g=
github.com/riverqueue/river/riverdriver/riversqlite v0.23.1 h1:v5onNmGdbsmyQAIYG3I77/RG8wkfrAOrLRTAJ8vXgNU=
github.com/riverqueue/river/riverdriver/riversqlite v0.23.1/go.mod h1:yRc5N+kod5r4oIvHSK9GNDddP13zm1/VEFwG55pYhO8=
github.com/riverqueue/river/rivershared v0.23.1 h1:ZC6ybv5KguD/mpLkaXrtUCES6FyKbGsavk25YNJdp0s= github.com/riverqueue/river/rivershared v0.23.1 h1:ZC6ybv5KguD/mpLkaXrtUCES6FyKbGsavk25YNJdp0s=
github.com/riverqueue/river/rivershared v0.23.1/go.mod h1:8/jFVQNfUesv5y+qQZ55XULMCOdM5yj9F4MG7/UA8LA= github.com/riverqueue/river/rivershared v0.23.1/go.mod h1:8/jFVQNfUesv5y+qQZ55XULMCOdM5yj9F4MG7/UA8LA=
github.com/riverqueue/river/rivertype v0.23.1 h1:vaIIm54BVzvy2iXT/iP7isIPSv2k99DElJNI6hWQ1lc= github.com/riverqueue/river/rivertype v0.23.1 h1:vaIIm54BVzvy2iXT/iP7isIPSv2k99DElJNI6hWQ1lc=
github.com/riverqueue/river/rivertype v0.23.1/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs= github.com/riverqueue/river/rivertype v0.23.1/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
github.com/riverqueue/rivercontrib/otelriver v0.5.0 h1:dZF4Fy7/3RaIRsyCPdpIJtzEip0pCvoJ44YpSDum8e4=
github.com/riverqueue/rivercontrib/otelriver v0.5.0/go.mod h1:rXANcBrlgRvg+auD3/O6Xfs59AWeWNpa/kim62mkxGo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -36,15 +77,59 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0 h1:gAU726w9J8fwr4qRDqu1GYMNNs4gXrU+Pv20/N1UpB4=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0/go.mod h1:RboSDkp7N292rgu+T0MgVt2qgFGu6qa1RpZDOtpL76w=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 h1:dNzwXjZKpMpE2JhmO+9HsPl42NIXFIFSUSSs0fiqra0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0/go.mod h1:90PoxvaEB5n6AOdZvi+yWJQoE95U8Dhhw2bSyRqnTD0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 h1:nRVXXvf78e00EwY6Wp0YII8ww2JVWshZ20HfTlE11AM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0/go.mod h1:r49hO7CgrxY9Voaj3Xe8pANWtr0Oq916d0XAmOoCZAQ=
go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.opentelemetry.io/proto/otlp v1.6.0 h1:jQjP+AQyTf+Fe7OKj/MfkDrmK4MNVtw2NpXsf9fefDI=
go.opentelemetry.io/proto/otlp v1.6.0/go.mod h1:cicgGehlFuNdgZkcALOCh3VE6K/u2tAjzlRhDwmVpZc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 h1:R84qjqJb5nVJMxqWYb3np9L5ZsaDtB+a39EqjV0JSUM=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0/go.mod h1:S9Xr4PYopiDyqSyp5NjCrhFrqg6A5zA2E/iPHPhqnS8=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 h1:Kog3KlB4xevJlAcbbbzPfRG0+X9fdoGM+UBRKVz6Wr0=
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237/go.mod h1:ezi0AVyMKDWy5xAncvjLWH7UcLBB5n7y2fQ8MzjJcto=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 h1:cJfm9zPbe1e873mHJzmQ1nwVEeRDU/T1wXDK2kUSU34=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/libc v1.64.0 h1:U0k8BD2d3cD3e9I8RLcZgJBHAcsJzbXx5mKGSb5pyJA=
modernc.org/libc v1.64.0/go.mod h1:7m9VzGq7APssBTydds2zBcxGREwvIGpuUBaKTXdm2Qs=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.10.0 h1:fzumd51yQ1DxcOxSO+S6X7+QTuVU+n8/Aj7swYjFfC4=
modernc.org/memory v1.10.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=

@ -9,6 +9,15 @@ import (
"github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/riverdriver/riverpgxv5"
"log/slog" "log/slog"
// "math/rand" // "math/rand"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/rivercontrib/otelriver"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"os" "os"
"time" "time"
) )
@ -25,6 +34,7 @@ func (MessageArgs) Kind() string {
type MessageWorker struct { type MessageWorker struct {
dbPool *pgxpool.Pool dbPool *pgxpool.Pool
Writer func(filename string, data []byte, perm os.FileMode) error
river.WorkerDefaults[MessageArgs] river.WorkerDefaults[MessageArgs]
} }
@ -32,7 +42,7 @@ func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) {
slog.Info("Initializing River workers") slog.Info("Initializing River workers")
workers := river.NewWorkers() workers := river.NewWorkers()
// need to specify generic type // need to specify generic type
err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool}) err := river.AddWorkerSafely[MessageArgs](workers, &MessageWorker{dbPool: pool, Writer: nil})
if err != nil { if err != nil {
slog.Error("Failed to add workers") slog.Error("Failed to add workers")
} }
@ -55,6 +65,14 @@ func InitWorkers(pool *pgxpool.Pool) (*river.Workers, error) {
//} //}
func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error { func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) error {
if w.dbPool == nil {
// For unit tests: just call the writer and return
writer := w.Writer
if writer == nil {
writer = os.WriteFile
}
return writer("testfile.txt", []byte(job.Args.Message), 0777)
}
//https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions //https://riverqueue.com/docs/reliable-workers#idempotency-with-transactions
tx, err := w.dbPool.Begin(ctx) tx, err := w.dbPool.Begin(ctx)
if err != nil { if err != nil {
@ -69,7 +87,11 @@ func (w *MessageWorker) Work(ctx context.Context, job *river.Job[MessageArgs]) e
slog.Info("Sleeping to simulate some work", "sleep", randTime) slog.Info("Sleeping to simulate some work", "sleep", randTime)
time.Sleep(time.Second * time.Duration(randTime)) time.Sleep(time.Second * time.Duration(randTime))
err = os.WriteFile(fileName, []byte(job.Args.Message), 0777) writer := w.Writer
if writer == nil {
writer = os.WriteFile
}
err = writer(fileName, []byte(job.Args.Message), 0777)
if err != nil { if err != nil {
slog.Error("Failed to save file") slog.Error("Failed to save file")
} }
@ -87,9 +109,57 @@ func (w *MessageWorker) Timeout(job *river.Job[MessageArgs]) time.Duration {
return time.Second * 30 return time.Second * 30
} }
func main() { func setupOTel() (func(context.Context) error, error) {
// OTLP HTTP exporters for traces and metrics
traceExp, err := otlptracehttp.New(context.Background(),
otlptracehttp.WithEndpoint("0.0.0.0:4318"),
)
if err != nil {
return nil, err
}
metricExp, err := otlpmetrichttp.New(context.Background(),
otlpmetrichttp.WithEndpoint("0.0.0.0:4318"),
)
if err != nil {
return nil, err
}
res, err := resource.New(context.Background(),
resource.WithAttributes(
semconv.ServiceName("go-background-jobs"),
),
)
if err != nil {
return nil, err
}
// Tracer provider
tp := trace.NewTracerProvider(
trace.WithBatcher(traceExp),
trace.WithResource(res),
)
otel.SetTracerProvider(tp)
// Meter provider
mp := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(metricExp)),
metric.WithResource(res),
)
otel.SetMeterProvider(mp)
return tp.Shutdown, nil
}
func main() {
ctx := context.Background() ctx := context.Background()
shutdown, err := setupOTel()
if err != nil {
panic(err)
}
defer shutdown(ctx)
dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI")) dbPool, err := pgxpool.New(ctx, os.Getenv("PG_URI"))
if err != nil { if err != nil {
slog.Error("Error setting up db", "error", err.Error()) slog.Error("Error setting up db", "error", err.Error())
@ -110,6 +180,9 @@ func main() {
river.QueueDefault: {MaxWorkers: 100}, river.QueueDefault: {MaxWorkers: 100},
}, },
Workers: workers, Workers: workers,
Middleware: []rivertype.Middleware{
otelriver.NewMiddleware(nil),
},
}) })
if err != nil { if err != nil {
slog.Error("Failed to setup river client", "error", err.Error()) slog.Error("Failed to setup river client", "error", err.Error())

@ -0,0 +1,72 @@
package main
import (
"context"
"errors"
"github.com/riverqueue/river"
"os"
"testing"
)
type mockWriter struct {
called bool
filename string
data []byte
perm uint32
fail bool
}
func (m *mockWriter) WriteFile(filename string, data []byte, perm uint32) error {
m.called = true
m.filename = filename
m.data = data
m.perm = perm
if m.fail {
return errors.New("mock write error")
}
return nil
}
func TestMessageWorker_Work_Success(t *testing.T) {
called := false
mw := &MessageWorker{
dbPool: nil, // not used in this test
Writer: func(filename string, data []byte, perm os.FileMode) error {
called = true
if string(data) != "hello test" {
t.Errorf("expected message 'hello test', got '%s'", string(data))
}
return nil
},
}
job := &river.Job[MessageArgs]{
Args: MessageArgs{Message: "hello test"},
}
// The DB logic will fail, so we expect an error
ctx := context.Background()
// Call Work and expect error due to nil dbPool
err := mw.Work(ctx, job)
if err != nil {
t.Errorf("expected no error, got: %v", err)
}
if !called {
t.Error("expected writer to be called")
}
}
func TestMessageWorker_Work_WriterError(t *testing.T) {
mw := &MessageWorker{
dbPool: nil,
Writer: func(filename string, data []byte, perm os.FileMode) error {
return errors.New("fail")
},
}
job := &river.Job[MessageArgs]{
Args: MessageArgs{Message: "fail test"},
}
ctx := context.Background()
err := mw.Work(ctx, job)
if err == nil {
t.Error("expected error due to writer fail, got nil")
}
}
Loading…
Cancel
Save