vgarvardt / gue
Golang queue on top of PostgreSQL
License: MIT License
Can't use more than 5 workers. Gue client is blocked for unknown reasons
I am researching queue systems and this looks really promising, but I was wondering how I can add context to a given worker. At minimum I need to get a numeric ID, but being able to provide it with additional context would be nice as well.
Apparently otel removed some of their previously-working import paths (open-telemetry/opentelemetry-go#2897), so we might need to bump their versions in the v4 branch.
$ go get -u github.com/vgarvardt/gue/v4
go: github.com/vgarvardt/gue/v4 imports
go.opentelemetry.io/otel/metric/instrument: cannot find module providing package go.opentelemetry.io/otel/metric/instrument
go: github.com/vgarvardt/gue/v4 imports
go.opentelemetry.io/otel/metric/instrument/syncint64: cannot find module providing package go.opentelemetry.io/otel/metric/instrument/syncint64
go: github.com/vgarvardt/gue/v4 imports
go.opentelemetry.io/otel/metric/unit: cannot find module providing package go.opentelemetry.io/otel/metric/unit
Actually, I have code that creates some async jobs, but I need to report that the queue item is done from within the job itself, which is a completely separate context.
Can this be achieved?
A pattern I find myself using is one job that runs at a given interval and creates lots of other jobs. Right now I am doing this via the gueClient, but I was wondering if there is a way to do this from within a job so that it can run in the same transaction.
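For what it's worth, gue's Client exposes an EnqueueTx method that enqueues within a caller-supplied transaction, so if the generator job's transaction is accessible, jobs can be inserted there. As a stdlib-only illustration of the SQL involved, here is a sketch that inserts directly into gue_jobs inside an existing *sql.Tx; the column list is based on my reading of gue's migration, so verify it against the schema version you actually run:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// Column list based on my reading of gue's gue_jobs migration -- verify it
// against the schema version you run before using this.
const enqueueSQL = `INSERT INTO gue_jobs
	(queue, priority, run_at, job_type, args, created_at, updated_at)
	VALUES ($1, $2, $3, $4, $5, now(), now())`

// enqueueInTx inserts a job inside an already-open transaction, so the
// periodic "generator" job and everything it spawns commit atomically.
func enqueueInTx(ctx context.Context, tx *sql.Tx, queue, jobType string, args []byte, runAt time.Time) error {
	_, err := tx.ExecContext(ctx, enqueueSQL, queue, int16(0), runAt, jobType, args)
	return err
}

func main() {
	fmt.Println(len(enqueueSQL) > 0)
}
```

Because everything happens in one transaction, either the generator's work and all spawned jobs commit together, or nothing does.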
In case a worker runs a WorkFunc that returns an error, hooksJobDone will not run within the same transaction, because calling op.Error() defers a call to op.Done(), which commits the transaction; see line 232 in worker.go (line 232 in 859d3f2).
One solution would be to remove the defer job.Done() from job.Error(), since worker.go already defers j.Done():
A second, backwards-compatible solution would be to add an alwaysDone flag to the Job type, which would conditionally run the defer block at line 133 in 859d3f2.
The example hook in the readme will break if the WorkerFunc
returns an error.
After updating from v5.0.1 to v5.1.0 I have the following error:
# github.com/vgarvardt/gue/v5
../../../../go/pkg/mod/github.com/vgarvardt/gue/[email protected]/client.go:218:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/[email protected]/client.go:226:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/[email protected]/worker.go:276:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/[email protected]/worker.go:284:23: cannot use unit.Milliseconds (constant "ms" of type unit.Unit) as string value in argument to instrument.WithUnit
These package versions also affect the error:
before (no errors)
go.opentelemetry.io/otel v1.13.0 // indirect
go.opentelemetry.io/otel/metric v0.36.0 // indirect
go.opentelemetry.io/otel/trace v1.13.0 // indirect
after (there is an error)
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
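Until gue is updated for the new otel API, a workaround consistent with the error above is to pin the metric module to the last compatible versions (the ones from the "before" block) in go.mod:

```
require (
    go.opentelemetry.io/otel v1.13.0
    go.opentelemetry.io/otel/metric v0.36.0
    go.opentelemetry.io/otel/trace v1.13.0
)
```

The breakage comes from go.opentelemetry.io/otel/metric v0.37.0 changing instrument.WithUnit to take a string and dropping the unit package, so staying on v0.36.0 keeps the old signatures until gue's code is adjusted.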
Hi there,
I have this workers pool set
workers, err := gue.NewWorkerPool(
gc,
wm,
10,
gue.WithPoolQueue(printerQueue),
gue.WithPoolPollInterval(time.Minute*5),
gue.WithPoolLogger(azap.New(zap.NewExample())),
gue.WithPoolPollStrategy(gue.PriorityPollStrategy),
gue.WithPoolHooksJobDone(finishedJobsLog),
)
My impression was that gue would process any added messages right away, based on priority, but it looks like it is waiting for the 5-minute poll interval.
Is this expected behavior?
My use case is that I want to process newly added jobs right away and have them retry in 5 minutes if they fail.
Thanks
Worker failed to lock a job level=error worker-pool-id=959ce5 worker-id=959ce5/worker-0 error=could not lock a job (rollback result: ): ERROR: relation "gue_jobs" does not exist (SQLSTATE 42P01)
Have you considered supporting dependent jobs? I have not dug deep into the scheduler code of this project yet, but since there are configuration options to not run until a given time, or to run again on error, I believe it would be possible to add an option for this.
My main use case for this is creating a workflow like this:
Job V1: Read and validate config.
Jobs P1-P3: Run 3 different tasks in parallel.
Job G1: Wait until P1-P3 are done, then generate a job list of hundreds of jobs R.
Jobs R1-Rx: Run all the jobs.
Job P4: Wait until all R are done, then run another job.
Job G2: Wait until P4 is done, then generate yet more hundreds of jobs of a different type S.
Jobs S1-Sx: Run all the hundreds of jobs.
Job P5: Wait until all S are done, then run a final job.
In this case the queue for P, R, and S is the same, but the job types are different; in fact, the job type for each P is different. (This queue runs transactions and needs identity on the worker.) The queue for G and V is the same, and those jobs only generate items, they do not do blockchain work.
All this is possible to handle without support from the framework, but it would result in lots of errors, since every time you want to schedule a job that is not yet ready to run, you have to return an error, or I guess adjust it to check again in a couple of minutes. I approximately know how long each task takes here.
I would like to use the changes committed in this PR in my Go application. It would be very nice if a new release could be made. When is the next release scheduled?
Hi,
first of all thanks for this project, it's exactly what I was looking for.
I'm currently playing around with it and saw that finished jobs are removed from the table afterwards. Have you thought of adding an option to keep finished jobs in the database to get some sort of log?
robin
Currently the library forces a Job's Args to be encoded as valid JSON, by storing it in a JSON field in the database table and casting it to json.RawMessage in
Line 100 in 667e4cb
But in my case I would like to transport the Job args as a Protobuf message (for forward compatibility). The database field is easy to change, of course, because I run the migration myself, but I currently can't change the json.RawMessage cast.
It would make this project more flexible if developers could bring their own encoding format. Maybe just as an option on the Client, with the default set to JSON?
Let me know if this would be interesting to you, and I can probably set up a PR and discuss it a bit further.
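To make the proposal concrete, the Client option could accept a small codec interface, with JSON as the backwards-compatible default. This API is hypothetical (it is not in gue today); a stdlib sketch with the JSON default:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ArgsCodec is a hypothetical interface a Client option could accept,
// letting callers swap JSON for Protobuf or any other encoding.
type ArgsCodec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// JSONCodec would be the backwards-compatible default implementation.
type JSONCodec struct{}

func (JSONCodec) Marshal(v any) ([]byte, error)   { return json.Marshal(v) }
func (JSONCodec) Unmarshal(d []byte, v any) error { return json.Unmarshal(d, v) }

func main() {
	var codec ArgsCodec = JSONCodec{}
	raw, _ := codec.Marshal(map[string]int{"id": 1})
	var out map[string]int
	_ = codec.Unmarshal(raw, &out)
	fmt.Println(out["id"])
}
```

A Protobuf user would provide their own implementation of the same two methods, and the database column type becomes the user's concern, as described above.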
Any idea how to prevent enqueuing the same job?
For example, if more than 3 jobs with the same args already exist, we could prevent enqueuing it again.
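One approach, outside of gue itself, is to count pending jobs with the same type and args before enqueuing, and to do both inside one transaction so the check and the insert cannot race. A stdlib sketch; the table and column names come from my reading of gue's migration (verify against your schema), and the threshold logic is my own:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Counts pending jobs with identical type and args; column names per my
// reading of gue's gue_jobs migration -- verify against your schema version.
const countDuplicatesSQL = `SELECT count(*) FROM gue_jobs
	WHERE job_type = $1 AND args = $2`

// shouldEnqueue is the pure decision: allow at most maxPending copies.
func shouldEnqueue(pending, maxPending int) bool {
	return pending < maxPending
}

// dedupEnqueueCheck queries the count and applies the threshold. Run it in
// the same transaction as the enqueue to avoid a check-then-insert race.
func dedupEnqueueCheck(ctx context.Context, tx *sql.Tx, jobType string, args []byte, maxPending int) (bool, error) {
	var pending int
	if err := tx.QueryRowContext(ctx, countDuplicatesSQL, jobType, args).Scan(&pending); err != nil {
		return false, err
	}
	return shouldEnqueue(pending, maxPending), nil
}

func main() {
	fmt.Println(shouldEnqueue(2, 3), shouldEnqueue(3, 3))
}
```
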
I'm getting a lot of logs of this type:
Worker failed to lock a job level=error cannot set transaction read-write mode during recovery.
Do you know what this is?
Hi, I want to persist the start and end of a job in the same database row in a history table. For this I am using the hooks and can record the end time of jobs that return a normal error just fine.
Having jobs that panic does not work as expected; as best I can tell, the hooks registered with WithPoolHooksJobDone are not called at all in this case.
Version 5.5.0.
Hooks registered via WithPoolHooksJobLocked are executed, though.
Hello,
I tried the example merged in #98, but I get an error when I run the client task. After starting the client and putting messages in the db, I run the task client command and get the following error:
task worker
task: [deps-up] docker compose up --detach --wait
[+] Running 2/2
⠿ Container outbox-worker-kafka-redpanda-1 Healthy 0.5s
⠿ Container outbox-worker-kafka-postgres-1 Healthy 0.5s
task: [_start_worker] go run ./... worker
Error: could not create test topic: dial tcp [::1]:9092: connect: connection refused
Looking in docker logs i see
outbox-worker-kafka-postgres-1 | 2022-10-17 12:17:08.136 UTC [76] FATAL: role "test" does not exist
Hi, thank you for this awesome library.
We have been using it for a while now, but we have a new requirement in our projects that I am not sure we can implement with the current functionality.
Basically we have several services running, each one has its own job types and they all run in parallel in the same DB.
But now we have a new service that has to have several job types, but the same queue. And the job types are of a different nature: some tasks are really fast (<20ms), others really slow (>2 minutes).
We need to have many workers for the quick tasks, and only a couple of workers for the slow task.
Right now you can only specify the number of workers per pool/queue.
The quick tasks must be processed in near real-time fashion. The slow tasks can wait.
If I create a workmap for this queue and we have many slow tasks, the workers get stuck doing slow tasks, and the quick tasks are not resolved until all the slow tasks created before them are processed.
(This happens when tasks fail: they are mainly prioritized by run_at, and the priority is ignored... unfortunately we have a lot of failures due to unreliable 3rd-party services.)
If we separate the tasks into different workmaps or pools of workers, then because they share the same queue we get a lot of "job with unknown type" errors, as both workers try to pull and lock job types of the other kind. (Even if we handle the unknown type, we do not want those tasks to be locked constantly until the right pool picks them up.)
This is also a problem if you have more job types in your queue than your service understands, since you could have several services listening on the same queue with workers/handlers for different job types. Even if you can ignore them, right now a worker fetches jobs of any type, which is not ideal or optimal.
So, first of all, is there a way to achieve this with the current v5 version? (sorry if we have missed it)
If it is not possible, would you be OK with us creating a pull request with the following changes: add a WithWorkerJobTypes(types ...string) option, store these types in the worker, and filter the polling query with job_type = ANY(types).
I believe these changes would cover it.
Let us know how we can proceed.
Thank you!
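To sketch the proposal above: the worker keeps the allowed types, and the lock query gains a job_type = ANY(...) predicate. The query below is modeled loosely on the shape of gue's polling query but simplified, and the option function is the hypothetical API from the issue; treat both as illustrative, not as gue's actual code:

```go
package main

import "fmt"

// Illustrative lock query with the proposed job-type filter; modeled loosely
// on gue's polling query, not copied from worker.go.
const lockJobByTypesSQL = `SELECT job_id, queue, priority, run_at, job_type, args, error_count
	FROM gue_jobs
	WHERE queue = $1 AND job_type = ANY($2) AND run_at <= now()
	ORDER BY priority ASC
	LIMIT 1
	FOR UPDATE SKIP LOCKED`

// Hypothetical worker option, as proposed in the issue.
type workerConfig struct {
	jobTypes []string
}

func WithWorkerJobTypes(types ...string) func(*workerConfig) {
	return func(c *workerConfig) { c.jobTypes = types }
}

func main() {
	var cfg workerConfig
	WithWorkerJobTypes("fast-email", "fast-sms")(&cfg)
	fmt.Println(cfg.jobTypes)
}
```

With this in place, a "fast" pool and a "slow" pool could share one queue without locking each other's job types.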
Lines 284 to 291 in c9efae2
Maybe a silly question, but why truncate the stacktrace?
Aside from writing bug-free code 🤣, is there a better way to extract errors than relying on the built-in last_error field? This often removes the useful bit when there's tracing and other things wrapping the job.
We're using gue together with Gorm, and specifically the otelgorm library, which provides otel spans when Gorm executes a SQL command.
The issue we see is that because the poll query occurs before the first span is emitted by gue, the span that ends up being emitted by the "poll" Gorm call becomes its own root span without any context about gue. For us, it would be desirable if the gue span included the poll query, so that the poll is part of the worker span.
Of course, the downside of this switch is that any users of gue would start getting job spans for every poll, regardless of whether the poll found a job. I could see how this could be undesirable.
Aside from an IncludePollInTrace option, I'm not sure there is a "good" solution to this.
Let's suppose that I have an API and a worker, both using gue; what approach do you recommend for migrating the database?
Some ways that I had been thinking of:
For the 1st, I am worried about the execution order.
For the 2nd, I am thinking about the shared code needed to enable this.
I am trying to mutate the job.RunAt field inside a job, to have it try to run itself at a later time. I want to control when to do this myself in the job, and not as a policy.
Currently when I do this it works, but I get the error:
2022/11/09 11:26:42 Got an error on deleting a job level=error worker-pool-id=8d9cd5 worker-id=8d9cd5/worker-1 job-id=3 job-type=ChallengeFinisher error=conn busy
I have a wrapper around a job where I check whether this job's dependentIds have finished running; if not, it should wait for a given interval and try again. I would very much like to keep the id of this job as it is and not report this as an error.
Hey there!
I’m trying to implement a generic jobType consumer with this library, but I ran into an issue: it is required to define the jobType when creating the workerPool.
Is there any chance I can solve my problem?
It seems like the pgx dependency points to a module that is no longer available.
github.com/vgarvardt/gue/v2 imports
github.com/jackc/pgtype tested by
github.com/jackc/pgtype.test imports
github.com/jackc/pgx/v4 tested by
github.com/jackc/pgx/v4.test imports
github.com/jackc/pgtype/ext/satori-uuid: module github.com/jackc/pgtype@latest found (v1.6.1), but does not contain package github.com/jackc/pgtype/ext/satori-uuid
I was able to fix on my working copy by bumping pgx to latest.
- github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186
+ github.com/jackc/pgx/v4 v4.9.0
Let me know if you would like a PR for that. Tests still pass but I didn't test the functionality (since I'm not using that adapter).
Is it possible to create an example that is more production-ready (the one in the readme has some comments), or to point to a repo that has this?
We are strongly considering using this framework as the workhorse of our small workflow engine, but would really love some more exhaustive examples.
CREATE TABLE IF NOT EXISTS finished_jobs_log
(
job_id BIGSERIAL NOT NULL PRIMARY KEY,
run_at TIMESTAMPTZ NOT NULL,
type TEXT NOT NULL,
queue TEXT NOT NULL
);
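Tying the finished_jobs_log table above to the WithPoolHooksJobDone option used earlier: a done-hook can copy the finished job's fields into this table. A stdlib sketch of the insert side; the hook itself would pass in fields read off the *gue.Job (ID, RunAt, Type, and Queue in the versions I've used; check yours), so the parameter names here are mine:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// Insert matching the finished_jobs_log DDL above. job_id is taken from the
// finished job rather than generated, so the BIGSERIAL default is bypassed.
const logFinishedJobSQL = `INSERT INTO finished_jobs_log (job_id, run_at, type, queue)
	VALUES ($1, $2, $3, $4)`

// logFinishedJob would be called from a gue "job done" hook, with the
// fields read off the finished *gue.Job.
func logFinishedJob(ctx context.Context, db *sql.DB, jobID int64, runAt time.Time, jobType, queue string) error {
	_, err := db.ExecContext(ctx, logFinishedJobSQL, jobID, runAt, jobType, queue)
	return err
}

func main() {
	fmt.Println(len(logFinishedJobSQL) > 0)
}
```

Note that if the hook runs in the job's transaction, the log row commits or rolls back together with the job itself, which is usually what you want for an audit trail.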
I've noticed that panic error messages are not output/logged and can only be checked in the database last_error column. I've tried debugging the code when a panic is produced, and the logger is indeed set to NoOpLogger in the recovery code scope:
Passing either StdLogger or zap.Logger when creating a gue client didn't help. Here is an example of how I'm setting the custom logger:
gc, err := gue.NewClient(poolAdapter, gue.WithClientLogger(adapter.NewStdLogger()))
Is this the expected behavior and I've misunderstood how custom loggers should work, or is it a bug?
I'm using v4
gue version: github.com/vgarvardt/gue/v4 v4.1.0
defer recoverPanic(ctx, ll, j) in worker.go:206 doesn't log the panic and populates lastError with an empty string, even though it increases the errorCount value.
I'm using the zap and pq adapters.
I spent some time looking into the issue but couldn't find the reason.
I am trying to implement logic where, when I cancel the context a workerPool runs in, the active jobs are allowed to finish but no new jobs are accepted. Is this a pattern that gue supports?
I tried to experiment a little, but I did not find an obvious way to do this.
Is there a way to limit the number of calls to a worker function if that worker function always returns an err?
Something like specifying how many retry attempts are allowed.
First of all, great work. It works as expected with tonnes of options, but I will just point out the obvious thing that came to my mind.
While polling works great, Postgres's Listen/Notify interface offers more significant benefits. In the Elixir world there's a popular job-processing package leveraging this to be much more scalable:
https://github.com/sorentwo/oban
Interestingly, that project also offers paid licenses for more plugins and a web UI. That package actually falls back to polling if for some reason pub/sub stops working.
Understandably this is an open-source project, but please consider this as an eventual path forward. I am open to collaborating on this.
https://getoban.pro/articles/one-million-jobs-a-minute-with-oban
Using this adapter.Tx makes it hard to write portable functions; it would be better to just use sql.DB instead of adapter.Tx.
Currently, gue attaches some span attributes when a job is being run. This is great, although the attributes do not match the OpenTelemetry naming conventions:
"For each multi-word dot-delimited component of the attribute name separate the words by underscores (i.e. use snake_case). For example http.response.status_code denotes the status code in the http namespace."
Thus job-id would be better named job_id, etc.
I encountered a problem; why does this happen? Thank you for your answer.
Details:
[gue-logger.go:26::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Debug()] Feb 23 19:09:36.946 [D] Tried to enqueue a job
[gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:36.946 [E] Worker failed to lock a job
......
[gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:46.954 [E] Worker failed to lock a job
[gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker finished
[gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker pool finished
Would it be possible to add a new string field to the job table that identifies a particular job as belonging to a particular run/batch of said job? The use case here is around checking for dependent items in the queue.
If we had a field job_batch_id (tentative name) that could optionally be added to a job, checking this would be very easy. In the wrapper I use to check whether a job should run, I could check if there are any active jobs with a given subQueue and, if there are, reschedule the job to run later.
Also, then I could say that if a job enqueues other jobs, it can use the same subQueue as its parent, or at least use that as a prefix.
I know this is pushing the boundaries of the project a little, but it is just a single field added from the perspective of this project.
Btw, do you have a Patreon set up? You are doing amazing work and should be rewarded.
The scenario I'm running into is this:
Is there a way to implement this behaviour in the library, or would I need to build something around the library instead?
Hi, what do you think of a grpc interface? If I would build it would you accept a pull request?
Sorry to bother you again @vgarvardt.
We were using an internal library before moving to gue (we had been using both until recently, when we removed the old one).
That library had some functionality that might be ported to gue and might be interesting.
Basically, gue works by polling jobs from the DB at an interval.
If you have many job types, you basically have to find a balance for that interval: not polling too much, but also not adding too much delay to processing important jobs.
The idea would be to:
- use the NOTIFY [channel] functionality when a job is enqueued;
- use the LISTEN [channel] functionality in the worker pool, so workers can poll as soon as a job arrives instead of waiting for the next interval.
PS: I've read your comment about the plugin system, but I believe this would need to be CORE, to access the timer, workmap, etc.
Let me know what you think, thank you.