go-queue's Introduction

go-queue provides a generic interface that abstracts the implementation details of queue systems.

Similar to the database/sql package, it exposes a common interface for interacting with different queue systems in a unified way.

Currently, only AMQP queues and an in-memory queue are supported.

Installation

The recommended way to install go-queue is:

go get -u gopkg.in/src-d/go-queue.v1/...

Usage

This example shows how to publish and consume a Job using the in-memory implementation, which is very useful for unit tests.

The queue implementations that NewBroker should support must be imported for their side effects, as the example shows.

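package main

import (
	"fmt"
	"log"

	"gopkg.in/src-d/go-queue.v1"
	// Blank import: makes the in-memory implementation available to NewBroker.
	_ "gopkg.in/src-d/go-queue.v1/memory"
)

func main() {
	b, err := queue.NewBroker("memory://")
	if err != nil {
		log.Fatal(err)
	}

	q, err := b.Queue("test-queue")
	if err != nil {
		log.Fatal(err)
	}

	// Create a job and encode its payload.
	j, err := queue.NewJob()
	if err != nil {
		log.Fatal(err)
	}

	if err := j.Encode("hello world!"); err != nil {
		log.Fatal(err)
	}

	if err := q.Publish(j); err != nil {
		log.Fatal(err)
	}

	// Consume returns an iterator over the jobs in the queue.
	iter, err := q.Consume(1)
	if err != nil {
		log.Fatal(err)
	}

	consumedJob, err := iter.Next()
	if err != nil {
		log.Fatal(err)
	}

	var payload string
	if err := consumedJob.Decode(&payload); err != nil {
		log.Fatal(err)
	}

	fmt.Println(payload)
	// Output: hello world!
}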
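
The blank import in the example above follows the same registration pattern as database/sql drivers. The sketch below illustrates that pattern only; it is not go-queue's actual code. The names Register, BrokerBuilder, and memoryBroker are hypothetical and chosen for illustration, and the real library may differ in its types and error messages.

```go
package main

import (
	"fmt"
	"net/url"
)

// Broker is a stand-in for the real broker interface (hypothetical).
type Broker interface {
	Name() string
}

// BrokerBuilder creates a Broker from a connection URI (hypothetical name).
type BrokerBuilder func(uri string) (Broker, error)

// builders maps a URI scheme such as "memory" to its builder.
var builders = map[string]BrokerBuilder{}

// Register makes an implementation available under the given scheme.
func Register(scheme string, b BrokerBuilder) {
	builders[scheme] = b
}

// NewBroker picks the builder registered for the URI's scheme.
func NewBroker(uri string) (Broker, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}
	build, ok := builders[u.Scheme]
	if !ok {
		return nil, fmt.Errorf("unsupported protocol %q", u.Scheme)
	}
	return build(uri)
}

type memoryBroker struct{}

func (memoryBroker) Name() string { return "memory" }

// In a real layout this init would live in the implementation's own package,
// which is why importing that package for side effects is enough.
func init() {
	Register("memory", func(string) (Broker, error) {
		return memoryBroker{}, nil
	})
}

func main() {
	if b, err := NewBroker("memory://"); err == nil {
		fmt.Println("resolved broker:", b.Name())
	}
	if _, err := NewBroker("nats://localhost"); err != nil {
		fmt.Println("error:", err)
	}
}
```

With this design, the core package never needs to know which implementations exist; a scheme that nobody registered simply yields an error from NewBroker.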

Configuration

AMQP

The following environment variables are available:

  • AMQP_BACKOFF_MIN (default: 20ms): Minimum time to wait before retrying the connection or queue channel assignment.
  • AMQP_BACKOFF_MAX (default: 30s): Maximum time to wait before retrying the connection or queue channel assignment.
  • AMQP_BACKOFF_FACTOR (default: 2): Factor by which the wait time is multiplied on each retry.
  • AMQP_BURIED_QUEUE_SUFFIX (default: .buriedQueue): Suffix for the buried queue name.
  • AMQP_BURIED_EXCHANGE_SUFFIX (default: .buriedExchange): Suffix for the buried exchange name.
  • AMQP_BURIED_TIMEOUT (default: 500): Time in milliseconds to wait for new jobs from the buried queue.
  • AMQP_RETRIES_HEADER (default: x-retries): Message header to set the number of retries.
  • AMQP_ERROR_HEADER (default: x-error-type): Message header to set the error type.

License

Apache License Version 2.0, see LICENSE

go-queue's People

Contributors

ajnavarro, alcortesm, carlosms, erizocosmico, jfontan, juanjux, mcarmonaa, mcuadros, serabe, smacker, smola

go-queue's Issues

Test for delayed jobs fails spuriously

=== RUN   TestAMQPSuite/TestTransaction_not_supported
--- FAIL: TestAMQPSuite (2.93s)
    --- PASS: TestAMQPSuite/TestConsume_empty (0.90s)
    --- PASS: TestAMQPSuite/TestConsumersCanShareJobIteratorConcurrently (0.04s)
    --- FAIL: TestAMQPSuite/TestDelayed (1.02s)
    	suite.go:358: 
    			Error Trace:	suite.go:358
    			Error:      	Should be true
    			Test:       	TestAMQPSuite/TestDelayed
    --- PASS: TestAMQPSuite/TestJobIter_Next_empty (0.02s)
    --- PASS: TestAMQPSuite/TestJob_Reject_no_requeue (0.07s)
    --- PASS: TestAMQPSuite/TestJob_Reject_requeue (0.03s)
    --- PASS: TestAMQPSuite/TestPublishAndConsume_immediate_ack (0.10s)
    --- PASS: TestAMQPSuite/TestPublishDelayed_empty (0.02s)
    --- PASS: TestAMQPSuite/TestPublishDelayed_nil (0.03s)
    --- PASS: TestAMQPSuite/TestPublish_empty (0.02s)
    --- PASS: TestAMQPSuite/TestPublish_nil (0.03s)
    --- PASS: TestAMQPSuite/TestRetryQueue (0.54s)
    --- PASS: TestAMQPSuite/TestTransaction (0.04s)
    --- PASS: TestAMQPSuite/TestTransaction_Error (0.07s)
    --- SKIP: TestAMQPSuite/TestTransaction_not_supported (0.01s)
    	suite.go:435: transactions supported

NATS implementation

I've got some code for connecting to NATS littered around in a bunch of places. I was going to write a library like database/sql to abstract it away, but I figured somebody would have beaten me to it.

Is a PR for a NATS implementation something you'd merge? Or is that outside of the scope of this?

Sometimes rabbitmq does not start on time in AppVeyor

It seems that RabbitMQ takes a while to start. The first tests that use it fail, while the later ones pass.

Full log:

Build started
git clone -q https://github.com/src-d/go-queue.git c:\gopath\src\gopkg.in\src-d\go-queue.v1
git fetch -q origin +refs/pull/9/merge:
git checkout -qf FETCH_HEAD
set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
go version
go version go1.10 windows/amd64
go get -v -t ./...
github.com/satori/go.uuid (download)
Fetching https://gopkg.in/src-d/go-errors.v0?go-get=1
Parsing meta tags from https://gopkg.in/src-d/go-errors.v0?go-get=1 (status code 200)
get "gopkg.in/src-d/go-errors.v0": found meta tag get.metaImport{Prefix:"gopkg.in/src-d/go-errors.v0", VCS:"git", RepoRoot:"https://gopkg.in/src-d/go-errors.v0"} at https://gopkg.in/src-d/go-errors.v0?go-get=1
gopkg.in/src-d/go-errors.v0 (download)
github.com/pkg/errors (download)
Fetching https://gopkg.in/src-d/go-errors.v1?go-get=1
Parsing meta tags from https://gopkg.in/src-d/go-errors.v1?go-get=1 (status code 200)
get "gopkg.in/src-d/go-errors.v1": found meta tag get.metaImport{Prefix:"gopkg.in/src-d/go-errors.v1", VCS:"git", RepoRoot:"https://gopkg.in/src-d/go-errors.v1"} at https://gopkg.in/src-d/go-errors.v1?go-get=1
gopkg.in/src-d/go-errors.v1 (download)
Fetching https://gopkg.in/vmihailenco/msgpack.v2?go-get=1
Parsing meta tags from https://gopkg.in/vmihailenco/msgpack.v2?go-get=1 (status code 200)
get "gopkg.in/vmihailenco/msgpack.v2": found meta tag get.metaImport{Prefix:"gopkg.in/vmihailenco/msgpack.v2", VCS:"git", RepoRoot:"https://gopkg.in/vmihailenco/msgpack.v2"} at https://gopkg.in/vmihailenco/msgpack.v2?go-get=1
gopkg.in/vmihailenco/msgpack.v2 (download)
github.com/stretchr/testify (download)
github.com/jpillora/backoff (download)
github.com/kelseyhightower/envconfig (download)
github.com/streadway/amqp (download)
Fetching https://gopkg.in/src-d/go-log.v1?go-get=1
Parsing meta tags from https://gopkg.in/src-d/go-log.v1?go-get=1 (status code 200)
get "gopkg.in/src-d/go-log.v1": found meta tag get.metaImport{Prefix:"gopkg.in/src-d/go-log.v1", VCS:"git", RepoRoot:"https://gopkg.in/src-d/go-log.v1"} at https://gopkg.in/src-d/go-log.v1?go-get=1
gopkg.in/src-d/go-log.v1 (download)
github.com/sirupsen/logrus (download)
Fetching https://golang.org/x/crypto/ssh/terminal?go-get=1
Parsing meta tags from https://golang.org/x/crypto/ssh/terminal?go-get=1 (status code 200)
get "golang.org/x/crypto/ssh/terminal": found meta tag get.metaImport{Prefix:"golang.org/x/crypto", VCS:"git", RepoRoot:"https://go.googlesource.com/crypto"} at https://golang.org/x/crypto/ssh/terminal?go-get=1
get "golang.org/x/crypto/ssh/terminal": verifying non-authoritative meta tag
Fetching https://golang.org/x/crypto?go-get=1
Parsing meta tags from https://golang.org/x/crypto?go-get=1 (status code 200)
golang.org/x/crypto (download)
Fetching https://golang.org/x/sys/windows?go-get=1
Parsing meta tags from https://golang.org/x/sys/windows?go-get=1 (status code 200)
get "golang.org/x/sys/windows": found meta tag get.metaImport{Prefix:"golang.org/x/sys", VCS:"git", RepoRoot:"https://go.googlesource.com/sys"} at https://golang.org/x/sys/windows?go-get=1
get "golang.org/x/sys/windows": verifying non-authoritative meta tag
Fetching https://golang.org/x/sys?go-get=1
Parsing meta tags from https://golang.org/x/sys?go-get=1 (status code 200)
golang.org/x/sys (download)
github.com/src-d/envconfig (download)
github.com/serenize/snaker (download)
github.com/x-cray/logrus-prefixed-formatter (download)
github.com/mgutz/ansi (download)
github.com/mattn/go-colorable (download)
github.com/mattn/go-isatty (download)
github.com/pkg/errors
github.com/satori/go.uuid
gopkg.in/src-d/go-errors.v0
gopkg.in/src-d/go-errors.v1
gopkg.in/vmihailenco/msgpack.v2/codes
gopkg.in/vmihailenco/msgpack.v2
github.com/jpillora/backoff
github.com/kelseyhightower/envconfig
gopkg.in/src-d/go-queue.v1
github.com/streadway/amqp
golang.org/x/sys/windows
github.com/serenize/snaker
github.com/src-d/envconfig
golang.org/x/crypto/ssh/terminal
github.com/mattn/go-isatty
github.com/sirupsen/logrus
github.com/mattn/go-colorable
github.com/mgutz/ansi
gopkg.in/src-d/go-queue.v1/memory
github.com/x-cray/logrus-prefixed-formatter
github.com/stretchr/testify/vendor/github.com/davecgh/go-spew/spew
gopkg.in/src-d/go-log.v1
github.com/stretchr/testify/vendor/github.com/pmezard/go-difflib/difflib
gopkg.in/src-d/go-queue.v1/amqp
github.com/stretchr/testify/assert
github.com/stretchr/testify/require
github.com/stretchr/testify/suite
gopkg.in/src-d/go-queue.v1/test
set PATH=C:\Program Files\erl9.2\bin;%PATH%
choco install rabbitmq --ignoredependencies -y
Chocolatey v0.10.8
Installing the following packages:
rabbitmq
By installing you accept licenses for the packages.
Progress: Downloading rabbitmq 3.7.3... 100%
rabbitmq v3.7.3 [Approved]
rabbitmq package files install completed. Performing other installation steps.
Downloading rabbitmq 
  from 'https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-3.7.3.exe'
Progress: 100% - Completed download of C:\Users\appveyor\AppData\Local\Temp\1\chocolatey\rabbitmq\3.7.3\rabbitmq-server-3.7.3.exe (11.04 MB).
Download of rabbitmq-server-3.7.3.exe (11.04 MB) completed.
Installing rabbitmq...
rabbitmq has been installed.
C:\Program Files\erl9.2\erts-9.2\bin\erlsrv: Service RabbitMQ enabled.
Enabling plugins on node rabbit@APPVYR-WIN:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@APPVYR-WIN...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
enabled 3 plugins.
Offline change; changes will take effect at broker restart.
RabbitMQ service is already present - only updating service parameters
  rabbitmq may be able to be automatically uninstalled.
Environment Vars (like PATH) have changed. Close/reopen your shell to
 see the changes (or in powershell/cmd.exe just type `refreshenv`).
 The install of rabbitmq was successful.
  Software installed as 'EXE', install location is likely default.
Chocolatey installed 1/1 packages. 
 See the log for details (C:\ProgramData\chocolatey\logs\chocolatey.log).
Start-Sleep -s 2
go test -v ./...
=== RUN   TestNewBroker
--- PASS: TestNewBroker (0.00s)
=== RUN   ExampleMemoryQueue
--- PASS: ExampleMemoryQueue (0.00s)
PASS
ok  	gopkg.in/src-d/go-queue.v1	0.057s
=== RUN   TestAMQPSuite
=== RUN   TestAMQPSuite/TestConsume_empty
=== RUN   TestAMQPSuite/TestConsumersCanShareJobIteratorConcurrently
=== RUN   TestAMQPSuite/TestDelayed
=== RUN   TestAMQPSuite/TestJobIter_Next_empty
=== RUN   TestAMQPSuite/TestJob_Reject_no_requeue
=== RUN   TestAMQPSuite/TestJob_Reject_requeue
=== RUN   TestAMQPSuite/TestPublishAndConsume_immediate_ack
=== RUN   TestAMQPSuite/TestPublishDelayed_empty
=== RUN   TestAMQPSuite/TestPublishDelayed_nil
=== RUN   TestAMQPSuite/TestPublish_empty
=== RUN   TestAMQPSuite/TestPublish_nil
=== RUN   TestAMQPSuite/TestRetryQueue
=== RUN   TestAMQPSuite/TestTransaction
=== RUN   TestAMQPSuite/TestTransaction_Error
=== RUN   TestAMQPSuite/TestTransaction_not_supported
--- FAIL: TestAMQPSuite (10.60s)
    --- FAIL: TestAMQPSuite/TestConsume_empty (1.05s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestConsume_empty
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestConsume_empty
    --- FAIL: TestAMQPSuite/TestConsumersCanShareJobIteratorConcurrently (1.20s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestConsumersCanShareJobIteratorConcurrently
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestConsumersCanShareJobIteratorConcurrently
    --- FAIL: TestAMQPSuite/TestDelayed (1.00s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestDelayed
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestDelayed
    --- FAIL: TestAMQPSuite/TestJobIter_Next_empty (1.00s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestJobIter_Next_empty
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestJobIter_Next_empty
    --- FAIL: TestAMQPSuite/TestJob_Reject_no_requeue (1.03s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestJob_Reject_no_requeue
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestJob_Reject_no_requeue
    --- FAIL: TestAMQPSuite/TestJob_Reject_requeue (1.02s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestJob_Reject_requeue
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestJob_Reject_requeue
    --- FAIL: TestAMQPSuite/TestPublishAndConsume_immediate_ack (1.01s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestPublishAndConsume_immediate_ack
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestPublishAndConsume_immediate_ack
    --- FAIL: TestAMQPSuite/TestPublishDelayed_empty (1.02s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestPublishDelayed_empty
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestPublishDelayed_empty
    --- FAIL: TestAMQPSuite/TestPublishDelayed_nil (1.02s)
    	suite.go:39: 
    			Error Trace:	suite.go:39
    			            				suite.go:88
    			Error:      	Received unexpected error:
    			            	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			            	
    			            	gopkg.in/src-d/go-queue.v1/amqp.New
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:78
    			            	gopkg.in/src-d/go-queue.v1/amqp.init.0.func1
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/amqp/amqp.go:27
    			            	gopkg.in/src-d/go-queue%2ev1.NewBroker
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/register.go:49
    			            	gopkg.in/src-d/go-queue.v1/test.(*QueueSuite).SetupTest
    			            		c:/gopath/src/gopkg.in/src-d/go-queue.v1/test/suite.go:38
    			            	github.com/stretchr/testify/suite.Run.func2
    			            		c:/gopath/src/github.com/stretchr/testify/suite/suite.go:88
    			            	testing.tRunner
    			            		C:/go/src/testing/testing.go:777
    			            	runtime.goexit
    			            		C:/go/src/runtime/asm_amd64.s:2361
    			Test:       	TestAMQPSuite/TestPublishDelayed_nil
    	suite.go:40: 
    			Error Trace:	suite.go:40
    			            				suite.go:88
    			Error:      	failed to connect to RabbitMQ: dial tcp 127.0.0.1:5672: connectex: No connection could be made because the target machine actively refused it.
    			Test:       	TestAMQPSuite/TestPublishDelayed_nil
    --- PASS: TestAMQPSuite/TestPublish_empty (0.53s)
    --- PASS: TestAMQPSuite/TestPublish_nil (0.09s)
    --- PASS: TestAMQPSuite/TestRetryQueue (0.53s)
    --- PASS: TestAMQPSuite/TestTransaction (0.02s)
    --- PASS: TestAMQPSuite/TestTransaction_Error (0.07s)
    --- SKIP: TestAMQPSuite/TestTransaction_not_supported (0.00s)
    	suite.go:438: transactions supported
=== RUN   TestDefaultConfig
--- PASS: TestDefaultConfig (0.00s)
=== RUN   TestNewAMQPBroker_bad_url
--- PASS: TestNewAMQPBroker_bad_url (0.00s)
=== RUN   TestAMQPPriorities
--- PASS: TestAMQPPriorities (0.33s)
=== RUN   TestAMQPHeaders
=== RUN   TestAMQPHeaders/with_x-retries_and_x-error-type_headers
=== RUN   TestAMQPHeaders/with_x-retries_header
=== RUN   TestAMQPHeaders/with_x-error-type_headers
=== RUN   TestAMQPHeaders/with_no_headers
--- PASS: TestAMQPHeaders (0.05s)
    --- PASS: TestAMQPHeaders/with_x-retries_and_x-error-type_headers (0.00s)
    --- PASS: TestAMQPHeaders/with_x-retries_header (0.00s)
    --- PASS: TestAMQPHeaders/with_x-error-type_headers (0.00s)
    --- PASS: TestAMQPHeaders/with_no_headers (0.00s)
=== RUN   TestAMQPRepublishBuried
--- PASS: TestAMQPRepublishBuried (4.71s)
FAIL
FAIL	gopkg.in/src-d/go-queue.v1/amqp	16.470s
=== RUN   TestMemorySuite
=== RUN   TestMemorySuite/TestConsume_empty
=== RUN   TestMemorySuite/TestConsumersCanShareJobIteratorConcurrently
=== RUN   TestMemorySuite/TestDelayed
=== RUN   TestMemorySuite/TestIntegration
=== RUN   TestMemorySuite/TestJobIter_Next_empty
=== RUN   TestMemorySuite/TestJob_Reject_no_requeue
=== RUN   TestMemorySuite/TestJob_Reject_requeue
=== RUN   TestMemorySuite/TestPublishAndConsume_immediate_ack
=== RUN   TestMemorySuite/TestPublishDelayed_empty
=== RUN   TestMemorySuite/TestPublishDelayed_nil
=== RUN   TestMemorySuite/TestPublish_empty
=== RUN   TestMemorySuite/TestPublish_nil
=== RUN   TestMemorySuite/TestRetryQueue
=== RUN   TestMemorySuite/TestTransaction
=== RUN   TestMemorySuite/TestTransaction_Error
=== RUN   TestMemorySuite/TestTransaction_not_supported
--- PASS: TestMemorySuite (1.33s)
    --- PASS: TestMemorySuite/TestConsume_empty (0.00s)
    --- PASS: TestMemorySuite/TestConsumersCanShareJobIteratorConcurrently (0.00s)
    --- PASS: TestMemorySuite/TestDelayed (1.25s)
    --- PASS: TestMemorySuite/TestIntegration (0.06s)
    --- PASS: TestMemorySuite/TestJobIter_Next_empty (0.00s)
    --- PASS: TestMemorySuite/TestJob_Reject_no_requeue (0.00s)
    --- PASS: TestMemorySuite/TestJob_Reject_requeue (0.00s)
    --- PASS: TestMemorySuite/TestPublishAndConsume_immediate_ack (0.00s)
    --- PASS: TestMemorySuite/TestPublishDelayed_empty (0.00s)
    --- PASS: TestMemorySuite/TestPublishDelayed_nil (0.00s)
    --- PASS: TestMemorySuite/TestPublish_empty (0.00s)
    --- PASS: TestMemorySuite/TestPublish_nil (0.00s)
    --- PASS: TestMemorySuite/TestRetryQueue (0.00s)
    --- PASS: TestMemorySuite/TestTransaction (0.00s)
    --- PASS: TestMemorySuite/TestTransaction_Error (0.00s)
    --- SKIP: TestMemorySuite/TestTransaction_not_supported (0.00s)
    	suite.go:438: transactions supported
PASS
ok  	gopkg.in/src-d/go-queue.v1/memory	1.879s
?   	gopkg.in/src-d/go-queue.v1/test	[no test files]
Command exited with code 1

v1 is not compilable in go 1.14

Running go get gopkg.in/src-d/go-queue.v1 with go 1.14 results in this error message:

# gopkg.in/src-d/go-queue.v1
/Users/mhek/go/pkg/mod/gopkg.in/src-d/go-queue.v1@<version>/job.go:50:9: assignment mismatch: 2 variables but uuid.NewV4 returns 1 values

Change republish logic to get messages with blocking call

The current code does a non-blocking read and retries after 50ms if no message is received. This wastes 50ms per job published:

time="2018-06-12T13:11:51.289605092Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.3399403Z" level=debug msg="republished job" duration=62.436µs id=264fee23-13a2-4835-b091-138ed0c0cfb3 source="amqp/amqp.go:400"
time="2018-06-12T13:11:51.340008442Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.390390193Z" level=debug msg="republished job" duration=102.438µs id=4441f18d-425b-41fd-90d8-e030b929076b source="amqp/amqp.go:400"
time="2018-06-12T13:11:51.390468293Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.440788261Z" level=debug msg="republished job" duration=66.511µs id=c2daff16-97a7-4bd8-a7e7-0d5e6b6011b2 source="amqp/amqp.go:400"
time="2018-06-12T13:11:51.440856836Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.491297249Z" level=debug msg="republished job" duration=109.879µs id=7ab5e472-ef93-4596-8c45-380096ca1425 source="amqp/amqp.go:400"
time="2018-06-12T13:11:51.491402971Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.541833233Z" level=debug msg="republished job" duration=97.156µs id=7d5d28f5-5c88-4927-bfa5-0d1c13e34242 source="amqp/amqp.go:400"
time="2018-06-12T13:11:51.541908884Z" level=debug msg="received empty job" retries=0 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.592080956Z" level=debug msg="received empty job" retries=1 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.642332159Z" level=debug msg="received empty job" retries=2 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.692657338Z" level=debug msg="received empty job" retries=3 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.743047815Z" level=debug msg="received empty job" retries=4 source="amqp/amqp.go:360"
time="2018-06-12T13:11:51.743236765Z" level=debug msg="maximum number of retries reached" max-retries=3 retries=4 source="amqp/amqp.go:371"
time="2018-06-12T13:11:51.743286551Z" level=debug msg="rejecting 0 non complying jobs" source="go-log.v1/default.go:51"
time="2018-06-12T13:11:51.745363Z" level=info msg="stopping republishing jobs" source="go-log.v1/default.go:56"

Change to a blocking channel read with a timer:

https://github.com/src-d/go-queue/blob/master/amqp/amqp.go#L502-L513
