
taskq's Introduction

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends


taskq is brought to you by ⭐ uptrace/uptrace. Uptrace is an open source and blazingly fast distributed tracing tool powered by OpenTelemetry and ClickHouse. Give it a star as well!

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatic scaling of the number of goroutines used to fetch messages (fetchers) and process them (workers).
  • Global rate limiting.
  • Global worker limit.
  • Call once: messages with the same name are deduplicated.
  • Automatic retries with exponential backoff.
  • Automatic pausing when all messages in a queue fail.
  • Fallback handler for processing failed messages.
  • Message batching: the SQS and IronMQ backends add/delete messages in batches.
  • Automatic message compression using snappy / s2.
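
A few of these options surface directly when registering a task. A hedged sketch (RetryLimit and FallbackHandler appear in the issues below; MinBackoff and the values here are assumptions, not taskq's documented defaults):

import (
    "log"
    "time"

    "github.com/vmihailenco/taskq/v3"
)

// sendEmail is a hypothetical stand-in for real work.
func sendEmail(to string) error {
    log.Println("sending email to", to)
    return nil
}

var EmailTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name:       "send-email",
    RetryLimit: 10,          // automatic retries with exponential backoff
    MinBackoff: time.Second, // assumed option name: delay before the first retry
    Handler:    sendEmail,
    // FallbackHandler runs once all retries are exhausted.
    FallbackHandler: func(to string) error {
        log.Printf("giving up on %s", to)
        return nil
    },
})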


Getting started

To get started, see Golang Task Queue documentation.

Producer:

import (
    "context"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/vmihailenco/taskq/v3"
    "github.com/vmihailenco/taskq/v3/redisq"
)

// Create a go-redis client.
var Redis = redis.NewClient(&redis.Options{
    Addr: "localhost:6379", // assumed address
})

// Create a queue factory.
var QueueFactory = redisq.NewFactory()

// Create a queue.
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

// Register a task.
var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter() // assumed to be defined elsewhere
        return nil
    },
})

func main() {
    ctx := context.Background()

    // And start producing.
    for {
        // Call the task without any args.
        err := MainQueue.Add(CountTask.WithArgs(ctx))
        if err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
    }
}

Consumer:

// Start consuming the queue.
if err := MainQueue.Start(context.Background()); err != nil {
    log.Fatal(err)
}
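
Handlers can also take typed arguments. A hedged sketch following the examples above (GreetTask is hypothetical):

// Register a task whose handler takes typed arguments.
var GreetTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "greeter",
    Handler: func(name string, times int) error {
        for i := 0; i < times; i++ {
            log.Println("hello,", name)
        }
        return nil
    },
})

// The arguments given to WithArgs are serialized into the message and
// decoded back into the handler's parameters on the consumer side.
if err := MainQueue.Add(GreetTask.WithArgs(context.Background(), "world", 3)); err != nil {
    log.Fatal(err)
}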

Contributors

Thanks to all the people who already contributed!


taskq's Issues

memqueue_test.go makes use of Redis

Perhaps I'm missing something obvious, but I don't see why memqueue/memqueue_test.go uses a Redis Ring. Given that memqueue/queue.go is supposed to be an entirely in-memory queue, it seems odd that the unit tests are some weird hybrid of in-memory and Redis.

Please add q.wg.Wait() to redisq.go

After a careful look, I found that the wg in redisq.go is used only in RunQueue() and has no Wait calls.

Should we add q.wg.Wait() to CloseTimeout or somewhere else?

Because I'd really like to be able to stop a running queue.

Please take this small request into consideration. Thanks!

Unable to get the package

Description

Running go get github.com/vmihailenco/taskq/v3 fails with an error caused by go-redis.

Error trace:

go: github.com/vmihailenco/taskq/[email protected] requires
	github.com/go-redis/redis/v8@v8.0.0-alpha.1: reading github.com/go-redis/redis/go.mod at revision v8.0.0-alpha.1: unknown revision v8.0.0-alpha.1
##[error]Process completed with exit code 1.

Cross compilation fails due to gozstd

When I try to cross-compile a project that includes this library, I get the following errors:

# github.com/valyala/gozstd
../../go/pkg/mod/github.com/valyala/[email protected]/stream.go:31:59: undefined: CDict
../../go/pkg/mod/github.com/valyala/[email protected]/stream.go:35:64: undefined: CDict
../../go/pkg/mod/github.com/valyala/[email protected]/stream.go:47:20: undefined: Writer

I build using the following:

env GOOS=linux GOARCH=arm go build ./cmd/api/main.go

I am doing this on macOS 10.15.2.

If I build natively on the Mac (no cross-compilation), it works without errors.
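
A likely cause, though not confirmed in the issue: cross-compiling disables cgo by default, and gozstd is a cgo binding to libzstd, so its cgo-backed types such as CDict become undefined. Enabling cgo and pointing it at an ARM C cross-compiler may help, assuming one such as arm-linux-gnueabihf-gcc is installed:

env CGO_ENABLED=1 CC=arm-linux-gnueabihf-gcc GOOS=linux GOARCH=arm go build ./cmd/api/main.go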

Propagate TraceContext from producer to consumer

@vmihailenco Thank you for taskq and the support of opentelemetry out of the box in your libraries.

In taskqext/otel.go, support tracing with opentelemetry while processing the message in the worker not in the producer as well. I have a trace started in the producer I want to propagate this to the consumer as async trace. I want to pass trace information to consumers.
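
For illustration, a hedged sketch of context propagation using OpenTelemetry's propagation API directly (this is not taskq's API; the carrier would have to be carried as a task argument or header):

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// injectTrace captures the current trace context on the producer side.
func injectTrace(ctx context.Context) propagation.MapCarrier {
	carrier := propagation.MapCarrier{}
	otel.GetTextMapPropagator().Inject(ctx, carrier)
	return carrier // pass this map along with the message
}

// extractTrace restores the trace context on the consumer side.
func extractTrace(carrier propagation.MapCarrier) context.Context {
	return otel.GetTextMapPropagator().Extract(context.Background(), carrier)
}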

Cleanup completed job in redis

Thank you for the great library.
I can see only the Purge function, which cleans ALL jobs in the Redis queue. What should I do if I need to clean ONLY completed jobs?
Is there any expire / time-to-live function available for Redis jobs?

Maybe Millisecond??

In redisq/queue.go, func cleanZombieConsumers,

line 382: if time.Duration(consumer.Idle)*time.Second > q.opt.ConsumerIdleTimeout {

Maybe it should be time.Millisecond? XINFO reports idle time in milliseconds.

Sorry if I am wrong, just want to help.

Delayed memqueue tasks are still queued after a Purge() and consumed after Close()

memqueue/queue.go uses time.AfterFunc to handle delayed messages, however, the handler doesn't recheck q.closed() before handing the taskq.Message to the consumer, which means that delayed tasks are still consumed after the queue is closed. Seems like this should be something like:

		time.AfterFunc(msg.Delay, func() {
			if q.closed() {
				q.wg.Done()
				return
			}
			msg.Delay = 0
			_ = q.consumer.Add(msg)
		})

This means that Close() ends up timing out while waiting for the waitgroup to complete.

Additionally, memqueue's Purge doesn't purge any delayed messages, as they exist in delayed goroutines. Seems a fix for this is a bit more complicated, as the *Timer objects returned from time.AfterFunc need to be tracked, so that .Stop() can be called on all of them when Purge is called. Also, that tracker will need a way to map each taskq.Message back to a *Timer, so that the tracked Timer can be removed when the AfterFunc handler is called.

Example code: taskqdelay.zip

I'll try to submit a PR for both of these later today.

Add a manager class to wrap services

What?

Example usage:

man := msgqueue.NewSQSManager(accountId, key)
man.NewQueue("myname") - calls azsqs.NewQueue
man.Queues() - calls azsqs.Queues()

It would be nice to have a more central manager class for interacting with the various services. For SQS, this would mean wrapping the client:

sess := session.Must(session.NewSession())
app.sqsClient = sqs.New(sess)

Why?

  • We can internalise the different services more easily and maybe remove user dependencies on sqs/iron/etc.
  • Simplifies usage. In the SQS case we would not have to think about client/session management.

Should create a periodic job to clean zombie consumers

**:6379> xinfo consumers company  cg1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 3
   5) "idle"
   6) (integer) 4785470

Once the idle time is longer than some duration (6 hours, maybe), we should delete this consumer.

If a consumer is killed unexpectedly, it is left behind as an orphan.

Lack of CHANGELOG.md file

In the latest 3.2.5 release, the release notes say to check CHANGELOG.md, but there is no CHANGELOG.md in the repo.

Struct members passed to sync/atomic calls need to be 64-bit aligned on ARM and MIPS-32

Per the Go docs for sync/atomic at https://pkg.go.dev/sync/atomic#pkg-note-BUG:

On ARM, 386, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

taskq uses sync/atomic operations on quite a few struct members that are 32 bits in size, which in turn makes them 32-bit aligned in memory. Under heavy stress on ARM64 and MIPS-32 platforms, we have seen quite a few instances of this causing issues.

Since none of the int32/uint32 values are repeated (that is, they are all just a single value rather than in arrays), the easiest fix for this is to simply make them int64 values and update the corresponding sync/atomic calls. Making this change has minimal impact on overall memory usage - approximately 100 bytes (25 single instance int32/uint32 variables change to int64/uint64).
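
An illustration of the problem with a hypothetical struct (not taskq's actual code):

import "sync/atomic"

// On ARM, 386, and 32-bit MIPS the int64 field below is only guaranteed
// 32-bit alignment, because it follows an int32 field.
type counters struct {
	flag int32
	n    int64 // not necessarily 64-bit aligned here
}

func bump(c *counters) {
	atomic.AddInt64(&c.n, 1) // requires &c.n to be 64-bit aligned; may fault
}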

redisq isn't acknowledging processed messages.

Please correct me if I'm wrong. I have been running a script to generate events, and a consumer, with the code below:

// Define Our Queues
 var (
	ScheduledQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
		Name:  "ScheduledQueue",
		Redis: redisClient,
	})
 )

// Define Task
 var (
	WorkProcessor = taskq.RegisterTask(&taskq.TaskOptions{
		Name:       "WorkProcessor",
		RetryLimit: 10,
		Handler: func(job *Job) error {
			log.Printf("Processing Job - %s", job.ID)
			
			return nil
		},
	})
 )

// Schedule task.
 err := ScheduledQueue.Add(msg)
 if err != nil {
	log.Printf("Error occurred scheduling job - %s", job.ID)
 }

When I check the logs after the ReservationTimeout has passed, I start to see multiple lines of:

taskq: 2021/10/12 13:27:24 queue.go:296: redisq: pending failed: redisq: can't find peding message id="1634040420749-0" in stream="taskq:{ScheduledQueue}:stream"

Then I go into redis-cli and perform xack taskq:{ScheduledQueue}:stream taskq 1634040419532-1 and the entry is acknowledged and it starts to complain about the next item in the Pending List.

Am I doing something wrong or is this a bug?

Thanks

Dynamic BufferSize or Consumer.Add() spawns a goroutine?

The current BufferSize implementation in queue.go makes it so that if a task is added to the queue and the queue is full, the AddTask call will block until a task completes. This has caused me issues in two cases:

Two different tasks in two different queues might add work to each other, and as soon as buffer size is exceeded, you end up in a deadlock where taskA.handler is calling queueB.Add() and taskB.handler is calling queueA.Add(). Sure, I can keep upping BufferSize, but there are cases, like packet processing, where the number of queued tasks is beyond the developer's control.

Similarly, if a queue is full and one of the tasks in that queue tries to add another task to the queue, the calling task becomes deadlocked. There are plenty of instances where you might have something like PreprocessTask and PostprocessTask in the same queue and when Preprocess finishes, it queues Postprocess.

I've attached some code that demonstrates these two.

It seems like there are two possible fixes, but I haven't fully wrapped my head around the entire taskq package enough to submit a PR. It would seem that either adding a dynamic BufferSize that isn't tied to ReservationSize would fix this, or having Consumer.Add() do the channel send in a goroutine:

func (c *Consumer) Add(msg *Message) error {
	_ = c.limiter.Reserve(msg.Ctx, 1)	
	go func() {
		c.buffer <- msg
	}()
	return nil
}

... which keeps BufferSize as is, but at the overhead of spawning goroutines.

taskqchaos.zip

Named tasks can not be recreated after they complete

I had left this comment in #106, but figured it was worth asking more directly...

I find it very strange that named tasks will always Exists once added to a queue when it seems like they should stop existing once successfully executed... but Storage only has a single Exists method, which means Consumer has no way to remove a successfully executed task from storage.

RedisStorage holds these for 24hr and LocalStorage will hold them until 128,000 additional tasks have been queued.

An example:

  1. Create a task and give it a name like spider_www_google_com
  2. Attempt to add a second task with the name spider_www_google_com - taskq rightfully returns taskq.ErrDuplicate
  3. The spider_www_google_com task completes successfully and consumer.Put is called, which calls consumer.delete, which calls Queue.Delete ... this calls Redis.XDel on the ID for Redis and scheduler.Remove for memqueue
  4. Attempt to add a new task with the name spider_www_google_com - taskq returns taskq.ErrDuplicate

I would expect step 4 to succeed, but it fails because RedisStorage.Exists calls SetNX on the task's name, whereas the call to XDel is on the task's ID ... which leaves the name lock lingering for 24hr. Similarly, LocalStorage.Exists puts the name in an LRU cache (of size 128,000), whereas scheduler.Remove is map-based with the task's pointer as its key.

The easy fix is to extend the Storage interface to have a Delete method that can be called when a task is completed successfully... however, for Redis it is probably best to simply pass both the task's ID and name to XDel to avoid needing a second round-trip to Redis and then leaving RedisStorage.Delete to do nothing.

Thoughts? Am I missing something here?
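
For concreteness, a hedged sketch of the proposed extension (the exact signature of taskq's current Storage interface may differ; Delete is the new method):

type Storage interface {
	Exists(key string) bool
	// Delete releases a name once its task has completed successfully,
	// so the name can be reused.
	Delete(key string) error
}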

Can't pass multiple arguments to the task handler

I can't seem to be able to pass multiple arguments to a Task's handler function

When calling like tasks.MainQueue.Add(tasks.CountTask.WithArgs(context.Background(), "hello", "example", "test")) I can't retrieve all the 3 arguments on the handler

CountTask = taskq.RegisterTask(&taskq.TaskOptions{
		Name: "counter",
		Handler: func(arg1, arg2, arg3 string) error {
			println("handler:", arg1, arg2, arg3)
			IncrLocalCounter()
			return nil
		},
	})

I get consumer.go:616: task="counter" failed (will retry=1 in dur=30s): taskq: got 1 args, wanted 3

When I try

CountTask = taskq.RegisterTask(&taskq.TaskOptions{
		Name: "counter",
		Handler: func(arg1 *taskq.Message) error {
			println("handler:", arg1.Args)
			IncrLocalCounter()
			return nil
		},
	})

I get an empty array handler: [0/0]0x0

With the handler function accepting just one string argument, it prints the last argument:

CountTask = taskq.RegisterTask(&taskq.TaskOptions{
		Name: "counter",
		Handler: func(arg1 string) error {
			println("handler:", arg1)
			IncrLocalCounter()
			return nil
		},
	})

The output is handler: test

I wonder how I can pass multiple arguments to a handler function, or if this is a bug.

Redis message ID

The XAck method needs a message ID to delete, but the message ID isn't passed back to the client: XAdd uses "*" as the stream ID by default.

XAdd returns a StringCmd whose Val is the stream ID generated by Redis. The redis queue's Add method could return that stream ID (along with the error) so it can later be used to delete the message from the stream.
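
A hedged sketch of that suggestion using the go-redis v8 client (helper name hypothetical):

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// addMessage returns the stream ID Redis generated for the entry, which
// the queue could surface for a later XAck/XDel.
func addMessage(ctx context.Context, rdb *redis.Client, stream string, body []byte) (string, error) {
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		Values: map[string]interface{}{"body": body},
	}).Result()
}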

Ability to provide generic fallback handler

Currently the fallback handler is passed through the same handler machinery as the normal handler. This makes sense when you want to take a task-specific fallback action, but when you just want to log or alert on the message in a generic way, it is slightly onerous to write a custom fallback handler that basically does the same thing - in my case, log the task and arguments to an error reporter. If I didn't have to match the type and number of arguments of the message, I could just reuse the same implementation.

To support this fnArgs could accept a certain function signature regardless of the shape of the message. The two candidates I can think of are:

func(ctx context.Context, args ...interface{})

or:

func(ctx context.Context, msg *Message)

modulo the Context obviously

I think I prefer the second because it makes other potentially useful information available like the message Name and the ReservedCount.

It might make sense to allow handlers too to take a single Message argument - where they too could also potentially benefit from the metadata or possibly be defined generically.

The implementation of this would be very simple - if there is a single Message arg then just pass that. However I think there may be an issue with the message compression whereby the Args field is not necessarily filled - what would be the best strategy here?

Timers are not cleaned up properly on Stop and Reset

Timers in consumer.go and memqueue/queue.go are not cleaned up properly on Stop() and Reset() calls.

Per the Go docs:

  • https://pkg.go.dev/time#Timer.Stop

    To ensure the channel is empty after a call to Stop, check the return value and drain the channel. For example, assuming the program has not received from t.C already

  • https://pkg.go.dev/time#Timer.Reset

    For a Timer created with NewTimer, Reset should be invoked only on stopped or expired timers with drained channels.

    If a program has already received a value from t.C, the timer is known to have expired and the channel drained, so t.Reset can be used directly. If a program has not yet received a value from t.C, however, the timer must be stopped and—if Stop reports that the timer expired before being stopped—the channel explicitly drained

Both recommend this pattern:

if !timer.Stop() {
    <-timer.C
}

Unfortunately, the <-t.C channel read can block if the channel has already been read. As such, this pattern will prevent the read from blocking:

if !timer.Stop() {
    select {
    case <-timer.C:
    default:
    }
}

Chaining API to create messages

E.g. replace

msg := task.OnceWithArgs(ctx, 15*time.Second, arg1, arg2, arg3, arg4, arg5)

with

msg := task.NewMessage(arg1, arg2, arg3, arg4, arg5).
    WithContext(ctx).
    OnceInPeriod(15*time.Second)

undefined: syscall.Sysinfo_t

# github.com/capnm/sysinfo
vendor/github.com/capnm/sysinfo/sysinfo.go:62:9: undefined: syscall.Sysinfo_t
vendor/github.com/capnm/sysinfo/sysinfo.go:65:9: undefined: syscall.Sysinfo

Redis queue visibility

Is there an easy way to see what jobs are currently in the queue for Redis backend? The closest I could get to was this:

127.0.0.1:6379> XREAD COUNT 112 STREAMS "taskq:{api-worker}:stream" writers 0-0 0-0
1) 1) "taskq:{api-worker}:stream"
   2) 1) 1) "1642584445592-0"
         2) 1) "body"
            2) "\x83\xa11\xb0\xe1\r\xf4a\x1c\x05B\t\x95\xdbK\x8ex\xec\xc6X\xa13\xc4\x10\x91\xaeHello world 87\xa15\xa7counter"
      2) 1) "1642584447593-0"
         2) 1) "body"
            2) "\x83\xa11\xb0'\xad\x95\xb1c\xb5F\x92\xb4u%\xc9|\x0b\xc2N\xa13\xc4\x0f\x91\xadHello world 6\xa15\xa7counter"
      3) 1) "1642584449594-0"
         2) 1) "body"
            2) "\x83\xa11\xb0\x147\ba\x954E\xdf\x8aH8\x9ePG\xbd\xf7\xa13\xc4\x10\x91\xaeHello world 31\xa15\xa7counter"

Ideally I could start some web UI that would show it :) I could even try to build one if somebody could provide more details on how to read and de-serialize the values stored in the Redis stream.

Structured logging

Many of the log statements have a semi-structured format with key=value substrings. It is useful operationally to have distinct key-values for downstream loggers like elasticsearch, logstash, etc.

I propose we swap out the Go std logger for an interface like:

https://github.com/go-kit/kit/blob/master/log/log.go#L11

or

https://github.com/inconshreveable/log15/blob/master/handler.go#L19

And use one of those libraries to handle the logging. Both work well. The former has simplest possible interface, the latter a little bit more structure, but both essentially transmit the same information.

I also personally much prefer to not share a global logging instance and pass it down the stack through options, but I appreciate it is a matter of taste. I think we could optionally set it on the factory and have it passed down from there.

Will submit PR if you agree.
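
For reference, the go-kit interface linked above is minimal:

type Logger interface {
	Log(keyvals ...interface{}) error
}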

sigsegv

Oct 16 12:05:15  worker[6431]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x78 pc=0x8fcf93]
Oct 16 12:05:15 worker[6431]: goroutine 70 [running]:
Oct 16 12:05:15  worker[6431]: github.com/go-msgqueue/msgqueue/azsqs.(*Queue).deleteBatch(0xc4202531f0, 0xc43e035a80, 0xa, 0x
Oct 16 12:05:15  worker[6431]:         

Option to not use global registry

I have a dispatcher type with some internal state. The dispatcher is passed to my worker orchestrator 'workers', which then calls Init() on the dispatcher. This registers the tasks, and the dispatcher holds a reference to them. The state in the dispatcher is local, but it is closed over by the handler functions, which are global, which is the cause of my issue.

I could work around this by implementing a namespace for task names, with a prefix for instance. But it would be cleaner if I could just use a separate registry for each queue/consumer.

How would you feel about having Queue hold a reference to a registry? I propose we:

  • Export the TaskRegistry type
  • Add Tasks *TaskRegistry to QueueOptions
  • Default the registry to the global Tasks

Will submit PR if you agree.
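
A hedged sketch of the proposed shape (TaskRegistry is currently unexported and the Tasks field does not exist yet; both are hypothetical here):

var myTasks taskq.TaskRegistry

var MyQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
	Name:  "my-queue",
	Redis: Redis,
	Tasks: &myTasks, // proposed per-queue registry, defaulting to the global one
})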

How to stop a running queue at runtime??

I'm curious about how to stop a running queue without stopping the program, because I have to start queues on demand and can't just leave them running.

Please help..

S2 compression

According to recent post of @klauspost on Reddit:

S2 is meant to be the choice for absolute speed. While it is only available in Go other languages can simply use Snappy for compression and implementing the decoder changes should be quite trivial, since the changes are very small.

For the rest I would probably go for zstd - especially if I can find the time to implement even stronger compression for the Go version.

S2, Go data compression at GBs per second

Maybe it could be implemented in taskq?

Release function - why do we not use XCLAIM

Thanks for the library, I was having issues with another redis queue - I didn't know about consumer groups so this looks great.

I was just looking at the message recovery code and I have a question about the Release function in redisq (https://github.com/vmihailenco/taskq/blob/master/redisq/queue.go#L205-L213)

You use XDEL to remove a timed-out message, then re-add it to the queue. This would usually be okay, but since it is not atomic, if we were to crash during Release we could delete a message without re-adding it.

I was expecting to find a usage of XCLAIM (https://redis.io/commands/xclaim) which seems to be designed for this purpose. Is there a particular reason you did not use this? Would it be a good idea to use it?
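
A hedged sketch of the alternative using the go-redis v8 client (stream, group, and consumer names assumed, not taskq's actual code): XCLAIM atomically transfers a timed-out message to this consumer instead of deleting and re-adding it.

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

func reclaim(ctx context.Context, rdb *redis.Client, id string) error {
	return rdb.XClaim(ctx, &redis.XClaimArgs{
		Stream:   "taskq:{api-worker}:stream",
		Group:    "taskq",
		Consumer: "consumer-1", // assumed consumer name
		MinIdle:  time.Minute,  // only claim messages idle at least this long
		Messages: []string{id},
	}).Err()
}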

Add support for Kafka/NSQ/RabbitMQ

Would love to see support for some of the other non-hosted queuing systems out there.

Kafka/NSQ/RabbitMQ are all pretty solid and those options here would be super interesting!

Why is the consumer started in the memqueue.NewQueue() method?

I'm wondering why the consumer in https://github.com/vmihailenco/taskq/blob/v3/memqueue/queue.go.NewQueue() is started before returning the queue. Doesn't this go against the current design of other queues? Perhaps there is a reason why?

func NewQueue(opt *taskq.QueueOptions) *Queue {
	opt.Init()

	q := &Queue{
		opt: opt,
	}

	q.consumer = taskq.NewConsumer(q)
	if err := q.consumer.Start(context.Background()); err != nil {
		panic(err)
	}

	return q
}


Encountering crash when upgrading to v3.2.7

[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x7637f7]

goroutine 254 [running]:
github.com/go-redis/redis/v8/internal/pool.(*ConnPool).waitTurn(0xc0003d3c20, 0x0, 0x0, 0xc00182f9d0, 0xdef904)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/internal/pool/pool.go:292 +0x37
github.com/go-redis/redis/v8/internal/pool.(*ConnPool).Get(0xc0003d3c20, 0x0, 0x0, 0x0, 0xc00182f7d8, 0x51c116)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/internal/pool/pool.go:249 +0x58
github.com/go-redis/redis/v8.(*baseClient)._getConn(0xc000272a60, 0x0, 0x0, 0x203000, 0x15, 0x15)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:194 +0x4c
github.com/go-redis/redis/v8.(*baseClient).getConn(0xc000272a60, 0x0, 0x0, 0x862aa5, 0xc00182f8c8, 0x40f87b)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:182 +0x66
github.com/go-redis/redis/v8.(*baseClient).withConn(0xc000272a60, 0x0, 0x0, 0xc001040f90, 0x0, 0x0)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:274 +0x8a
github.com/go-redis/redis/v8.(*baseClient)._generalProcessPipeline(0xc000272a60, 0x0, 0x0, 0xc000679040, 0x4, 0x4, 0xc000f22d50, 0x10, 0xf5a220)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:424 +0x113
github.com/go-redis/redis/v8.(*baseClient).generalProcessPipeline(0xc000272a60, 0x0, 0x0, 0xc000679040, 0x4, 0x4, 0xc000f22d50, 0x0, 0x0)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:404 +0x77
github.com/go-redis/redis/v8.(*baseClient).processTxPipeline(...)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:396
github.com/go-redis/redis/v8.hooks.processPipeline(0xc000c00f10, 0x1, 0x1, 0x0, 0x0, 0xc000679040, 0x4, 0x4, 0xc00182fbd0, 0x70, ...)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:103 +0x32a
github.com/go-redis/redis/v8.hooks.processTxPipeline(0xc000c00f10, 0x1, 0x1, 0x0, 0x0, 0xc000f3e820, 0x2, 0x2, 0xc00182fbd0, 0xe3214e, ...)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:120 +0x2b7
github.com/go-redis/redis/v8.(*Client).processTxPipeline(...)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/redis.go:604
github.com/go-redis/redis/v8.(*Pipeline).Exec(0xc000419310, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/tmp/cache/go-path/pkg/mod/github.com/go-redis/redis/[email protected]/pipeline.go:115 +0x102
github.com/vmihailenco/taskq/v3/redisq.(*Queue).Release(0xc00020f290, 0xc000dfe5b0, 0x0, 0x0)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:240 +0x1ae
github.com/vmihailenco/taskq/v3/redisq.(*Queue).schedulePending(0xc00020f290, 0x11d7130, 0xc000126008, 0x84f5e5, 0xc001040b40, 0x11d7130)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:406 +0x318
github.com/vmihailenco/taskq/v3/redisq.(*Queue).scheduler.func1(0x11d7130, 0xc000126008, 0x7f66551c35c0, 0xc00159d280)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:296 +0x42
github.com/vmihailenco/taskq/v3/redisq.(*Queue).withRedisLock(0xc00020f290, 0x11d7130, 0xc000126008, 0xc00003d650, 0x21, 0xc000e9ff38, 0x0, 0x0)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:437 +0x15a
github.com/vmihailenco/taskq/v3/redisq.(*Queue).scheduler(0xc00020f290, 0x108c8e0, 0x7, 0xc00182ffa8)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:294 +0x111
github.com/vmihailenco/taskq/v3/redisq.NewQueue.func2(0xc00020f290)
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:102 +0x90
created by github.com/vmihailenco/taskq/v3/redisq.NewQueue
	/tmp/cache/go-path/pkg/mod/github.com/vmihailenco/taskq/[email protected]/redisq/queue.go:100 +0x309
2021/11/09 04:03:04 exit status 2

azsqs Release failed

Release failed: InvalidParameterValue: Value ... for parameter ReceiptHandle is invalid. Reason: Message does not exist or is not available for visibility timeout change.
status code: 400, request id: ....

Expose interfaces when possible

When using this library it's hard to write mocks.
This is due to the fact that some methods expose structs.

func (q *Queue) Consumer() *taskq.Consumer {
	if q.consumer == nil {
		q.consumer = taskq.NewConsumer(q)
	}
	return q.consumer
}

see here

So I would propose exposing interfaces wherever objects provide interactions, to allow writing mocks for the API without needing something like the workaround described here.

I will prepare a PR containing the changes.
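
A hedged sketch of an interface Queue.Consumer() could return instead of the concrete type (the method set is approximated from *taskq.Consumer and may differ):

type Consumer interface {
	Start(ctx context.Context) error
	Stop() error
	Add(msg *taskq.Message) error
}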

Add ability to disable autotuner

Currently taskq consumer starts with 1 fetcher and 1 worker. Then it automatically adds and removes fetchers and workers based on the current load. It usually works fine but it is not very well tested. So there should be an ability to disable this behavior.

/cc @silasdavis just in case it affects you. This is recent v2 addition and I am not sure how it performs on different workloads...

redis queue implementation of schedulePending queries future time range

The redis queue implementation of schedulePending seems to run XPENDING over a future time range (from now to now + 5min)?
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L333-L342

When monitoring on redis side, the log looks like this:

1630486661.736435 [0 172.17.0.1:55854] "xpending" "taskq:{claim4-tester}:stream" "taskq" "1630486961735" "+" "100"

The time 1630486961 used as the lower time bound of the xpending command is 5 minutes after the time xpending is called (at 1630486661). So I suppose there won't be any messages returned...

I'm confused by the intention of schedulePending:

  • Is it meant to release the message if the message is "older" than 5min? e.g., should it be querying time range from - to now - 5min?
  • Is it meant to requeue any pending messages reserved by any consumers in the same group (including other consumers in the group, not necessarily the "current consumer")? Because the xpending command doesn't specify the consumer's name.
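
In case it helps, the range I would have expected looks something like this with the go-redis v8 client (a hedged sketch, not taskq's code): messages pending since the beginning of the stream up to five minutes ago.

import (
	"context"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

func stalePending(ctx context.Context, rdb *redis.Client) ([]redis.XPendingExt, error) {
	// Stream IDs encode a millisecond timestamp, so a ms timestamp works
	// as the upper bound of the range.
	end := strconv.FormatInt(time.Now().Add(-5*time.Minute).UnixMilli(), 10)
	return rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: "taskq:{claim4-tester}:stream",
		Group:  "taskq",
		Start:  "-",
		End:    end,
		Count:  100,
	}).Result()
}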

redis `schedulePending` target messages continue to increase

Example

  • Create stream(taskq:{test}:stream) and consumer group(taskq)
127.0.0.1:6379> XGROUP CREATE taskq:{test}:stream taskq $ MKSTREAM
OK
  • Add 3 messages
127.0.0.1:6379> XADD taskq:{test}:stream * body apple
"1634099225543-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body orange
"1634099233517-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body banana
"1634099237506-0"
127.0.0.1:6379> XLEN taskq:{test}:stream
(integer) 3
  • This time pending list is empty
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
(empty array)
  • Consumer foo allocated 3 messages
127.0.0.1:6379> XREADGROUP GROUP taskq foo STREAMS taskq:{test}:stream >
1) 1) "taskq:{test}:stream"
   2) 1) 1) "1634099225543-0"
         2) 1) "body"
            2) "apple"
      2) 1) "1634099233517-0"
         2) 1) "body"
            2) "orange"
      3) 1) "1634099237506-0"
         2) 1) "body"
            2) "banana"
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
  • When a message is processed successfully it is deleted, but the pending list still contains 3 messages
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099225543-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1

All messages remain in the pending list, so the schedulePending method gets an error trying to release already-deleted messages.
issue: #143

When doing Delete and Release, we should execute XACK and then XDEL:
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L225
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L245

127.0.0.1:6379> XACK taskq:{test}:stream taskq 1634099233517-0
(integer) 1
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099233517-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1
2) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1
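
A hedged sketch of that fix with the go-redis v8 client (helper name hypothetical): acknowledging before deleting removes the entry from the pending list as well as from the stream.

import (
	"context"

	"github.com/go-redis/redis/v8"
)

func ackAndDelete(ctx context.Context, rdb *redis.Client, stream, group, id string) error {
	pipe := rdb.TxPipeline()
	pipe.XAck(ctx, stream, group, id)
	pipe.XDel(ctx, stream, id)
	_, err := pipe.Exec(ctx)
	return err
}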
