hibiken / asynq
Simple, reliable, and efficient distributed task queue in Go
License: MIT License
We should support a timeout per task and allow the user to cancel task processing using the CLI.
Cancellation would be useful in two cases:
Optional timeout per task is used in cases such as:
In order to support both cancel and timeout, we need to change the Handler
interface.
type Handler interface {
    ProcessTask(ctx context.Context, t *Task) error
}
and add a Timeout function that creates a timeout option for use when scheduling a task.
client.Schedule(task, time.Now().Add(time.Hour), asynq.Timeout(10 * time.Second))
The CLI will have a new command to cancel an in-progress task:
asynqmon cancel [task_id]
A cancelled task should be moved to the retry state, consuming a retry count.
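For illustration, a minimal sketch of a handler written against the proposed interface; the select on ctx.Done() is how a handler would observe both the per-task timeout and a CLI cancellation (the 5-second sleep stands in for real work):

func slowHandler(ctx context.Context, t *asynq.Task) error {
    select {
    case <-time.After(5 * time.Second): // simulated work
        return nil
    case <-ctx.Done():
        // Fires when the per-task timeout elapses or the task is
        // cancelled via the CLI; returning the error lets the task
        // be moved to the retry state.
        return ctx.Err()
    }
}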
Is your feature request related to a problem? Please describe.
When using asynq in my program, go mod download
pulls in a large number of dependencies, many of which are related to the nice but "optional" CLI tool.
Describe the solution you'd like
Move the CLI to a separate project such as asynq-cli or similar.
Or possibly create a separate nested go.mod file under the CLI folder.
Describe alternatives you've considered
Very nice and simple library; it would be nice if it could be used with only the bare minimum of dependencies. I wanted to point this issue out since it was a bit of a frustration when building the project multiple times in Docker containers.
Additional context
N/A
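For reference, a sketch of what the nested-module layout could look like; the paths and module names here are hypothetical, not the project's actual layout:

asynq/
    go.mod        # module github.com/hibiken/asynq — library dependencies only
    tools/
        go.mod    # separate module for the CLI, so its dependencies
        asynqmon/ # no longer land in the library's dependency graph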
Is your feature request related to a problem? Please describe.
When you have a lot of tasks, it'd be useful if the ls command supported filtering.
Describe the solution you'd like
The filter should support:
Describe the bug
When using RedisFailoverClientOpt to take advantage of automatic failover, the tasks that are in progress when the failover happens don't get removed from the in-progress list.
To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):
1. Run Background with RedisFailoverClientOpt.
2. Run redis-cli -p <port> DEBUG sleep 30 while some tasks are in progress to force a failover.
3. Background will resume processing tasks, but the tasks that were in-progress never get removed from the asynq:in_progress list in Redis.
Expected behavior
Upon successful failover, the tasks that were in-progress should be removed from the asynq:in_progress LIST.
Environment (please complete the following information):
asynq package version: v0.1.0
Is your feature request related to a problem? Please describe.
We should make the inspector package public to let users create their own CLI or Web UI to monitor queues and tasks.
Note: since the API is tightly coupled to the internals of the core asynq package, work on this once the internals have become stable.
Is your feature request related to a problem? Please describe.
The user of the package should be able to set the minimum log level.
This would also be useful in testing, since logs in CI are currently a bit noisy when running tests with the -v flag.
Describe the solution you'd like
Add a LogLevel field in Config, define a Level type, and expose predefined level constants for each level:
srv := asynq.NewServer(r, asynq.Config{
    LogLevel: asynq.InfoLevel,
})
Describe alternatives you've considered
Expose DefaultLogger and allow the user to set the level through that object:
asynq.DefaultLogger.SetLevel(asynq.InfoLevel)
The con of this approach is that we have to define a new type for DefaultLogger that has SetLevel(l Level) as a method.
Additional context
None for now
Is your feature request related to a problem? Please describe.
We should provide a hook to allow users of the library to customize error reporting.
Describe the solution you'd like
Add "Listener" field in Config
for events that users would care about
Describe alternatives you've considered
None for now.
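A sketch of what such a hook might look like; the interface shape and method name below are assumptions, not the library's API:

// Hypothetical listener interface; names are illustrative only.
type Listener interface {
    HandleError(task *Task, err error, retried, maxRetry int)
}

srv := asynq.NewServer(r, asynq.Config{
    Listener: sentryListener{}, // e.g. an implementation that reports to Sentry
})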
Is your feature request related to a problem? Please describe.
I'm always frustrated when I run asynqmon ls when there are a lot of tasks in the queue. It'd be nice if there were a way to paginate the list.
Describe the solution you'd like
Example: asynqmon ls -size=30 -offset=100 to list 30 tasks starting from the 101st task.
Describe alternatives you've considered
Alternatively, I could use the command in conjunction with a pager command such as less, but ideally I'd like to see the column header so that it's easy to understand what each value means.
Is your feature request related to a problem? Please describe.
I'm currently trying to integrate asynq into my web framework (heavily inspired by Rails; it also supports SPA prerendering using chromedp when running in a Docker image, with more feature improvements coming from Rails experience), which already has its own logger.
Describe the solution you'd like
I'd like to plug a custom logger into the Background instance (I currently call it worker; it is triggered by the command go run . worker in debug builds or <BINARY> worker in release builds).
Side note
I have reviewed a few other solutions (one of them isn't very reliable, which is why they came up with a v2). I decided to go with asynq, as I believe this library will grow to be as much like Sidekiq as possible, given what was mentioned in the README.md. Thanks.
Describe the bug
When a task is processed successfully, it should be removed from the asynq:in_progress list in Redis. However, the operation to remove the task fails when the payload contains a large number (caused by the JSON number overflowing the target type).
You'll see an error message like this when this happens.
WARN: Could not remove task id=brgdeqquof2o9md7qoq0 type="test2" from "asynq:in_progress": NOT FOUND; Will retry syncing
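For context, a self-contained sketch of what appears to be the underlying problem, given the note above about JSON numbers overflowing the target type: encoding/json decodes numbers into float64 by default, so integers beyond 2^53 change value on a decode/encode round trip, and the re-encoded message no longer matches the one stored in the Redis list:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // 9007199254740993 = 2^53 + 1, not exactly representable as float64.
    in := []byte(`{"id": 9007199254740993}`)

    var m map[string]interface{}
    if err := json.Unmarshal(in, &m); err != nil { // numbers decode to float64
        panic(err)
    }

    out, _ := json.Marshal(m)
    fmt.Println(string(out)) // {"id":9007199254740992} — the value changed
}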
The following errors occur when running against Redis Cluster (Lua scripts touching keys in different slots):
startJob.Enqueue enqueue QTTMemberID:702303,OutKey:user_main-coin_to_sub_702303_1587398400 Error:ERR 'EVALSHA' command keys must in same slot
Dequeue error: ERR Error running script (call to f_119a32ad4f4ac4f23fba1b8be91aacd2948feb88): @user_script:4: @user_script: 4: Lua script attempted to access a non local key in a cluster node
Is your feature request related to a problem? Please describe.
Currently we only show the overall processed and failed counts, and an error rate based on those counts.
Describe the solution you'd like
It'd be useful if asynq tracked processed and failed counts per task type.
Currently we increment the Retried field on TaskMessage when we move the task to the retry queue. This data is visible to the user via asynqmon, and the user will see Retried: 1 after the first attempt to process the task fails. This could be confusing, since the task is scheduled for a retry but hasn't actually been retried yet.
Add a command to asynqmon to show historic stats (processed/failed for the last x days)
Is it safe to use 4x workers with for example concurrency 20 each? For example in a containerized environment, where each worker runs in its own container?
Just wondering because I did not see this addressed specifically in other questions or documentation.
Is your feature request related to a problem? Please describe.
It'd be nice if we could pause a queue without shutting down the background worker process.
Sidekiq Pro has this exact feature documented here.
If a queue is paused then the background workers shouldn't process any tasks from that queue.
Describe the solution you'd like
Add a command to the CLI to pause/unpause a queue.
asynq queue pause [queue name]
asynq queue unpause [queue name]
Describe alternatives you've considered
None for now.
Additional context
None for now.
The user should be able to specify how many times a task should be retried.
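For illustration, this could follow the Option pattern used by Timeout earlier in this document; the MaxRetry name and signature here are assumptions:

// Hypothetical option: give up after 5 failed attempts.
client.Schedule(task, time.Now(), asynq.MaxRetry(5))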
Is your feature request related to a problem? Please describe.
Do you plan to support middleware functions that behave the same way as, for example, gorilla mux.Middleware does? If not, how do you do logging?
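For what it's worth, gorilla-style middleware can already be sketched on top of the Handler and HandlerFunc types that appear elsewhere in this document; a hedged example of a logging middleware:

// A sketch assuming the asynq.Handler / asynq.HandlerFunc types shown in this document.
func logging(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        log.Printf("type=%s took=%v err=%v", t.Type, time.Since(start), err)
        return err
    })
}

// Usage: bg.Run(logging(mux))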
https://github.com/hibiken/asynq/blob/master/background.go#L253
Normally, packages have two methods, Run and Stop, so I can clean up other resources before or after Stop(). Another scenario is that a user will run multiple background servers in main(), like a Kafka consumer.
Is your feature request related to a problem? Please describe.
Sometimes I don't really want to schedule a task; I just want it to get processed as soon as possible (in the next available slot). It feels a bit awkward to need to provide a timestamp for that, even if time.Now() will work in most cases (assuming the clocks are well synchronized, which normally would not be a problem).
Describe the solution you'd like
It would be nice if I could do something like:
err := client.ScheduleNext(t)
and the task would be processed in next available slot without any time dependence.
Describe alternatives you've considered
Using time.Now() is currently the way to go, but as I mentioned, any time differences can lead to delays in processing. In many cases the goal is to have tasks processed right away, in which case any possible delays are undesirable.
Additional context
This basically came from my experience of using the library. I could be wrong here, or not fully understand how the internals handle the timing; in that case, the documentation could be improved a bit to clarify.
Hi,
I am getting the error below when running the example code on Windows. I have the Redis server enabled.
> D:\go-work\pkg\mod\github.com\hibiken\[email protected]\background.go:257:55: undefined: syscall.SIGTSTP
> D:\go-work\pkg\mod\github.com\hibiken\[email protected]\background.go:260:13: undefined: syscall.SIGTSTP
Add a doc to help people contribute to the project.
It should contain:
I am thinking about choosing it.
Is your feature request related to a problem? Please describe.
Any chance we would be supporting this? More details can be found here. Thanks.
Is there an easy way to determine whether the current task being processed is a retry? It is not in the context passed to ProcessTask, I guess; it would be nice to have it in the Task perhaps.
This is useful if I have already completed some set of actions but then need to retry some aspect that failed. Otherwise, do I need to store some relationship recording that a particular task failed and thus a subsequent run will be a retry?
I'd prefer not to store this myself, since I can see that we already store the number of retries in Redis, which could be used as a determinant.
Maybe I have missed some means to be able to just get this information somehow?
I like how, unlike most Redis-backed task queues, you have abstracted away connection pooling so I don't have to think about which client to connect with. However, I had to write my own Redis ParseURL function because it was a bit annoying to pass multiple configuration values for just one resource while I pass just one for all the others.
Describe the solution you'd like
Though it's trivial, it would be really great to have one provided by the package, with the following signature:
func ParseRedisURL(url string) (*asynq.RedisClientOpt, error)
Describe alternatives you've considered
I wrote my own.
Additional context
The task is simple enough that I could open a PR if you approve.
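A minimal sketch of the proposed helper, assuming RedisClientOpt has Addr, Password, and DB fields (only Addr appears elsewhere in this document; the other two are assumptions):

import (
    "fmt"
    "net/url"
    "strconv"
    "strings"
)

// ParseRedisURL parses e.g. "redis://:secret@localhost:6379/2".
func ParseRedisURL(rawurl string) (*asynq.RedisClientOpt, error) {
    u, err := url.Parse(rawurl)
    if err != nil {
        return nil, err
    }
    opt := &asynq.RedisClientOpt{Addr: u.Host}
    if pw, ok := u.User.Password(); ok {
        opt.Password = pw
    }
    if p := strings.TrimPrefix(u.Path, "/"); p != "" {
        db, err := strconv.Atoi(p)
        if err != nil {
            return nil, fmt.Errorf("invalid redis db number %q: %v", p, err)
        }
        opt.DB = db
    }
    return opt, nil
}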
Is your feature request related to a problem? Please describe.
We could offer client libraries in other languages. Web applications written in another language can use the client library to enqueue tasks.
Describe the solution you'd like
Add a client library for
Additional context
Each client library should be its own repo.
Use the log.Logger type to set the prefix and writer.
Is your feature request related to a problem? Please describe.
When I have tasks that take a while to complete, I want to know how long it's taking to process each task.
Describe the solution you'd like
asynqmon ls inprogress should show the elapsed time for each task.
Describe alternatives you've considered
None.
CLI documented here: https://github.com/gojp/goreportcard#command-line-interface
Run this command in CI so that we can make sure all of the checks pass before merging PRs.
Is your feature request related to a problem? Please describe.
Client should be able to schedule a task with an expiration time.
If a task doesn't get processed by the expiration time, the task should be skipped.
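For illustration, this could be an Option in the style of Timeout shown earlier in this document; the Deadline name is an assumption:

// Hypothetical option: skip the task if it hasn't been processed within 24 hours.
client.Schedule(task, time.Now(), asynq.Deadline(time.Now().Add(24*time.Hour)))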
Add an asynqmon ps command that lists all background processes and, for each process, shows which queue it's processing.
It should show the status of each process (Paused | Running).
Experiment with other serialization formats and benchmark performance and memory usage.
Other serialization formats to consider:
Is your feature request related to a problem? Please describe.
The current Logger interface is hard to satisfy without writing a wrapper around popular third-party logging libraries (e.g. logrus, zap).
Describe the solution you'd like
Change the interface to the following so that logrus users don't have to write an adapter wrapper around logrus.Logger:
type Logger interface {
    Debug(v ...interface{})
    Info(v ...interface{})
    Warn(v ...interface{})
    Error(v ...interface{})
    Fatal(v ...interface{})
}
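With that signature, a *logrus.Logger would satisfy the interface without any adapter; a quick compile-time check:

// logrus.Logger already has variadic Debug/Info/Warn/Error/Fatal methods,
// so it satisfies the proposed interface directly.
var _ asynq.Logger = logrus.New()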
The user should be able to specify the priority of a task.
Is your feature request related to a problem? Please describe.
I'm always frustrated when I need to call a GetX method for every value in the Payload to get all the data (I also need to make sure that I don't make a typo in the payload key string). It'd be nice if asynq supported encoding/decoding protobuf or gob messages directly so that I can load the data into an object with one method call.
Describe the solution you'd like
initial proposal:
type EmailTaskPayload struct {
    UserID int
}

// client side.
func main() {
    c := asynq.NewClient(asynq.RedisClientOpt{Addr: ":6379"})
    t := asynq.NewTask("email:welcome", EmailTaskPayload{UserID: 42})
    err := c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
}

// background workers side
func main() {
    r := asynq.RedisClientOpt{Addr: ":6379"} // connection options (added; elided in the original)
    bg := asynq.NewBackground(r, &asynq.Config{
        Concurrency: 10,
    })
    mux := asynq.NewMux()
    mux.HandleFunc("email:welcome", emailHandler)
    bg.Run(mux)
}

func emailHandler(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := t.ReadPayload(&p); err != nil {
        return err
    }
    fmt.Printf("Send Email to User %d\n", p.UserID)
    return nil
}
Describe alternatives you've considered
None for now.
Additional context
None for now.
I would like to be able to ask either the *asynq.Client or the *asynq.Server whether it is healthy, i.e. whether it can connect to Redis etc.
It seems that at the moment this is not possible. I want this so my health-check route can come back with a valid response if they are not well.
Hopefully this is not something hard to add; or, if you can, enlighten me on how to get this status (if it is available). Thanks!
Maybe a quick:
// Sketch: ping Redis and report health. The RDB receiver and redisClient
// field names are assumptions about the internals.
func (r *RDB) Healthy() bool {
    _, err := r.redisClient.Ping().Result()
    return err == nil
}
Is your feature request related to a problem? Please describe.
Currently, both the Client and Background constructors take a go-redis/redis/v7 client as a parameter. This will become an issue when there is a new version, or if we want to switch to another Redis client library: the change would require a major version update, which we don't want to do just for a Redis client library change.
Describe the solution you'd like
We should abstract this away from asynq package users and take a RedisClientOption as a parameter.
Describe alternatives you've considered
None so far.
Is your feature request related to a problem? Please describe.
Currently, we can only see how many tasks have been enqueued in each queue using the CLI.
It'd be better if we could show the latency of each queue in the output, so that the user can tell when the oldest task in the queue was enqueued.
Describe the solution you'd like
Show the latency within the stats command output.
Describe alternatives you've considered
None for now.
Additional context
None for now.
Is your feature request related to a problem? Please describe.
It's tedious and error-prone to specify the queue name as an Option when enqueuing a task.
If a certain type of task needs to go into a specific queue, it's better to specify that once, rather than every time that type of task is enqueued.
This also applies to other options: it's more convenient and common to say, for this type of task, set the timeout to this duration, or, for this type of task, set the unique TTL to this duration.
Describe the solution you'd like
Add a method to Client to set default options for a task type:
func (c *Client) SetDefaultOptions(pattern string, opts ...Option)
Example:
c := asynq.NewClient(r)

// Routes map task types to queue names.
c.SetDefaultOptions("web:email", asynq.Queue("web"))
c.SetDefaultOptions("feed:import", asynq.Queue("feed"), asynq.Timeout(time.Hour))

feedTask := asynq.NewTask("feed:import", payload)
c.Enqueue(feedTask) // Should be routed to "feed" queue.

webTask := asynq.NewTask("web:email", payload)
c.Enqueue(webTask) // Should be routed to "web" queue.

otherTask := asynq.NewTask("other:task", payload)
c.Enqueue(otherTask) // No matching route, so use the "default" queue.

// You should be able to override routing at enqueue time.
feedTask = asynq.NewTask("feed:import", payload)
c.Enqueue(feedTask, asynq.Queue("other_queue")) // Overrides the default "feed" queue.
Describe alternatives you've considered
None for now.
Additional context
Inspiration: Celery has a task routing configuration option.
Describe the bug
Could not get the detailed error in https://github.com/hibiken/asynq/blob/master/processor.go#L254.
Only this is logged to stdout:
asynq: pid=8419 2020/06/10 11:18:00.494785 WARN: Could not remove task id=brgc2pquof2i2r2uml3g type="test1" from "asynq:in_progress"; Will retry syncing
Expected behavior
If an error happens, log it with the details, e.g. logger.Errorf("Could not remove task id=%s err: %+v", task.ID, err).
Environment (please complete the following information):
benchcmp is deprecated in favor of benchstat: https://pkg.go.dev/golang.org/x/perf/cmd/benchstat
Useful link: golang/go#23471
Describe the bug
When you call client.Enqueue(task), it shouldn't be calling schedule().
To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):
just call client.Enqueue(task)
Expected behavior
When you call client.Enqueue(task), it should call enqueue().
Environment (please complete the following information):
asynq package version: a38f628
Additional context
t1 := time.Now()
t2 := time.Now()
fmt.Printf("delta nanos: %v\n", t2.UnixNano()-t1.UnixNano()) // Probably 0
fmt.Printf("t2 after t1: %v\n", t2.After(t1)) // Probably false
fmt.Printf("t2 not before t1: %v\n", !t2.Before(t1)) // Should be true
https://play.golang.org/p/K41KEKdMIQg
I guess line 235 can be changed a little to make it more reliable, e.g. any of:
if time.Now().UnixNano() >= t.UnixNano() {
if !time.Now().Before(t) {
if !t.After(time.Now()) {
(Lines 235 to 239 in a38f628)
Describe the bug
The problem is that I spawned a task queuer that queued tasks ranging from now - 10 minutes to now + 10 minutes, with 4 workers running (each at concurrency 1).
The output I got was:
taskrunner2_1 | Creating redis worker
taskrunner2_1 | asynq: pid=1 2020/02/18 22:46:41.351359 INFO: Starting processing
taskrunner_1 | Creating redis worker
taskrunner_1 | asynq: pid=1 2020/02/18 22:46:41.359319 INFO: Starting processing
taskrunner_1 | asynq: pid=1 2020/02/18 22:46:41.443077 INFO: Send signal TSTP to stop processing new tasks
taskrunner_1 | asynq: pid=1 2020/02/18 22:46:41.443085 INFO: Send signal TERM or INT to terminate the process
taskrunner_1 | Got task: task 3
taskrunner_1 | (*asynq.Task)(0xc00000e220)({
taskrunner_1 | Type: (string) (len=6) "task 3",
taskrunner_1 | Payload: (asynq.Payload) {
taskrunner_1 | data: (map[string]interface {}) (len=1) {
taskrunner_1 | (string) (len=1) "i": (float64) -8
taskrunner_1 | }
taskrunner_1 | }
taskrunner_1 | })
taskrunner_1 | Got task: task 4
taskrunner_1 | (*asynq.Task)(0xc00000e440)({
taskrunner_1 | Type: (string) (len=6) "task 4",
taskrunner_1 | Payload: (asynq.Payload) {
taskrunner_1 | data: (map[string]interface {}) (len=1) {
taskrunner_1 | (string) (len=1) "i": (float64) -7
taskrunner_1 | }
taskrunner_1 | }
taskrunner_1 | })
taskrunner2_1 | Got task: task 1
taskrunner2_1 | (*asynq.Task)(0xc00000e380)({
taskrunner2_1 | Type: (string) (len=6) asynq: pid=1 2020/02/18 22:46:41.443166 INFO: Send signal TSTP to stop processing new tasks
taskrunner2_1 | asynq: pid=1 2020/02/18 22:46:41.443173 INFO: Send signal TERM or INT to terminate the process
taskrunner2_1 | "task 1",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -10
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
taskrunner2_1 | Got task: task 2
taskrunner2_1 | (*asynq.Task)(0xc00000e540)({
taskrunner2_1 | Type: (string) (len=6) "task 2",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -9
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
taskrunner2_1 | Got task: task 3
taskrunner2_1 | (*asynq.Task)(0xc00007c900)({
taskrunner2_1 | Type: (string) (len=6) "task 3",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -8
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
taskrunner2_1 | Got task: task 4
taskrunner2_1 | (*asynq.Task)(0xc00000e640)({
taskrunner2_1 | Type: (string) (len=6) "task 4",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -7
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
taskrunner2_1 | Got task: task 6
taskrunner2_1 | (*asynq.Task)(0xc00007ca00)({
taskrunner2_1 | Type: (string) (len=6) "task 6",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -5
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
taskrunner2_1 | Got task: task 7
taskrunner2_1 | (*asynq.Task)(0xc00000e740)({
taskrunner2_1 | Type: (string) (len=6) "task 7",
taskrunner2_1 | Payload: (asynq.Payload) {
taskrunner2_1 | data: (map[string]interface {}) (len=1) {
taskrunner2_1 | (string) (len=1) "i": (float64) -4
taskrunner2_1 | }
taskrunner2_1 | }
taskrunner2_1 | })
...
So tasks 3 and 4 were received twice, which could lead to problems, although I admit the case I am working with is a bit strange, i.e. minutes in the past, etc.
To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):
// Worker side (r is the RedisClientOpt from setup, elided here).
func main() {
    bg := asynq.NewBackground(r, &asynq.Config{
        Concurrency: 1,
    })
    bg.Run(asynq.HandlerFunc(handler))
}
func handler(ctx context.Context, t *asynq.Task) error {
    fmt.Printf("Got task: %s\n", t.Type)
    spew.Dump(t)
    return nil
}
fmt.Printf("Creating redis client\n")
client := asynq.NewClient(r)
n := 1
for i := -10; i < 10; i++ {
name := fmt.Sprintf("task %d", n)
t := asynq.NewTask(name, map[string]interface{}{"i": i})
err := client.Schedule(t, time.Now().Add(time.Duration(i)*time.Minute))
if err != nil {
fmt.Printf("Error scheduling\n")
panic(err)
}
n+
}
Note that, given the way I am spawning the workers and the queuer, it is possible that tasks are queued before some workers start; everything starts at the same time. Ideally that would not matter.
Expected behavior
Expected that each task could only be received by one worker / processed once.
Screenshots
N/A
Environment (please complete the following information):
Additional context
If needed, I can clean up my docker compose environment and provide a fully contained example.