
machinery's Introduction

Machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.


V2

I recommend using V2 in order to avoid having to import all dependencies for brokers and backends you are not using.

Instead of using a factory, you inject the broker, backend and lock objects into the server constructor:

import (
  "github.com/RichardKnop/machinery/v2"
  backendsiface "github.com/RichardKnop/machinery/v2/backends/iface"
  brokersiface "github.com/RichardKnop/machinery/v2/brokers/iface"
  locksiface "github.com/RichardKnop/machinery/v2/locks/iface"
)

var broker brokersiface.Broker
var backend backendsiface.Backend
var lock locksiface.Lock
server := machinery.NewServer(cnf, broker, backend, lock)
// server.NewWorker("machinery", 10)

First Steps

To install the recommended v2 release:

go get github.com/RichardKnop/machinery/v2

If you want to use the legacy v1 version, you still can:

go get github.com/RichardKnop/machinery

First, you will need to define some tasks. Look at sample tasks in v2/example/tasks/tasks.go to see a few examples.

Second, you will need to launch a worker process with one of these commands (v2 is recommended since it doesn't import dependencies for all brokers / backends, only those you actually need):

cd v2/
go run example/amqp/main.go worker
go run example/redigo/main.go worker // Redis with redigo driver
go run example/go-redis/main.go worker // Redis with Go Redis driver

go run example/amqp/main.go worker
go run example/redis/main.go worker

Finally, once you have a worker running and waiting for tasks to consume, send some tasks with one of these commands (v2 is recommended since it doesn't import dependencies for all brokers / backends, only those you actually need):

cd v2/
go run example/amqp/main.go send
go run example/redigo/main.go send // Redis with redigo driver
go run example/go-redis/main.go send // Redis with Go Redis driver

You will be able to see the tasks being processed asynchronously in the worker's output.

Configuration

The config package has convenience methods for loading configuration from environment variables or a YAML file. For example, load configuration from environment variables:

cnf, err := config.NewFromEnvironment()

Or load from YAML file:

cnf, err := config.NewFromYaml("config.yml", true)

The second boolean flag enables live reloading of the configuration every 10 seconds. Use false to disable live reloading.

Machinery configuration is encapsulated by a Config struct and injected as a dependency to objects that need it.

Lock

Redis

Use Redis URL in one of these formats:

redis://[password@]host[:port][/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
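To show how the parts of this URL format map onto a connection target, here is a minimal sketch using only the Go standard library. `RedisTarget` and `ParseRedisURL` are hypothetical names introduced for illustration; this is not Machinery's own parser, and it only handles the plain redis:// form.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// RedisTarget is a hypothetical holder for the parts of a redis:// URL.
type RedisTarget struct {
	Password string // text before '@', empty if absent
	Host     string // host, including ":port" when present
	DB       string // database number as written in the URL, empty if absent
}

// ParseRedisURL is an illustrative helper, not Machinery's own parser.
func ParseRedisURL(raw string) (RedisTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return RedisTarget{}, err
	}
	if u.Scheme != "redis" {
		return RedisTarget{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	target := RedisTarget{
		Host: u.Host,
		DB:   strings.TrimPrefix(u.Path, "/"),
	}
	if u.User != nil {
		// With redis://password@host, net/url reports the password as the user name.
		target.Password = u.User.Username()
	}
	return target, nil
}

func main() {
	for _, raw := range []string{
		"redis://localhost:6379",
		"redis://password@localhost:6379",
		"redis://password@localhost:6379/2",
	} {
		target, err := ParseRedisURL(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %+v\n", raw, target)
	}
}
```

Note that the single credential before the @ is the password, not a user name, which is why the sketch reads it from the user part of the parsed URL.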

Broker

A message broker. Currently supported brokers are:

AMQP

Use AMQP URL in the format:

amqp://[username:password@]host[:port]

For example:

  1. amqp://guest:guest@localhost:5672

AMQP also supports multiple broker URLs. You need to specify the URL separator in the MultipleBrokerSeparator field.

Redis

Use Redis URL in one of these formats:

redis://[password@]host[:port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
  2. redis+socket://password@/path/to/file.sock:/0

AWS SQS

Use AWS SQS URL in the format:

https://sqs.us-east-2.amazonaws.com/123456789012

See AWS SQS docs for more information. AWS_REGION must also be configured; otherwise an error is thrown.

To use a manually configured SQS Client:

var sqsClient = sqs.New(session.Must(session.NewSession(&aws.Config{
  Region:         aws.String("YOUR_AWS_REGION"),
  Credentials:    credentials.NewStaticCredentials("YOUR_AWS_ACCESS_KEY", "YOUR_AWS_ACCESS_SECRET", ""),
  HTTPClient:     &http.Client{
    Timeout: time.Second * 120,
  },
})))
var visibilityTimeout = 20
var cnf = &config.Config{
  Broker:          "YOUR_SQS_URL",
  DefaultQueue:    "machinery_tasks",
  ResultBackend:   "YOUR_BACKEND_URL",
  SQS: &config.SQSConfig{
    Client: sqsClient,
    // if VisibilityTimeout is nil default to the overall visibility timeout setting for the queue
    // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
    VisibilityTimeout: &visibilityTimeout,
    WaitTimeSeconds: 30,
  },
}

GCP Pub/Sub

Use GCP Pub/Sub URL in the format:

gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME

To use a manually configured Pub/Sub Client:

pubsubClient, err := pubsub.NewClient(
    context.Background(),
    "YOUR_GCP_PROJECT_ID",
    option.WithServiceAccountFile("YOUR_GCP_SERVICE_ACCOUNT_FILE"),
)

cnf := &config.Config{
  Broker:          "gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME",
  DefaultQueue:    "YOUR_PUBSUB_TOPIC_NAME",
  ResultBackend:   "YOUR_BACKEND_URL",
  GCPPubSub: config.GCPPubSubConfig{
    Client: pubsubClient,
  },
}

DefaultQueue

Default queue name, e.g. machinery_tasks.

ResultBackend

Result backend to use for keeping task states and results.

Currently supported backends are:

Redis

Use Redis URL in one of these formats:

redis://[password@]host[:port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
  2. redis+socket://password@/path/to/file.sock:/0
  3. cluster redis://host1:port1,host2:port2,host3:port3
  4. cluster with password redis://pass@host1:port1,host2:port2,host3:port3

Memcache

Use Memcache URL in the format:

memcache://host1[:port1][,host2[:port2],...[,hostN[:portN]]]

For example:

  1. memcache://localhost:11211 for a single instance, or
  2. memcache://10.0.0.1:11211,10.0.0.2:11211 for a cluster

AMQP

Use AMQP URL in the format:

amqp://[username:password@]host[:port]

For example:

  1. amqp://guest:guest@localhost:5672

Keep in mind that AMQP is not recommended as a result backend. See Keeping Results.

MongoDB

Use MongoDB URL in the format:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

For example:

  1. mongodb://localhost:27017/taskresults

See MongoDB docs for more information.

ResultsExpireIn

How long to store task results for in seconds. Defaults to 3600 (1 hour).

AMQP

RabbitMQ related configuration. Not necessary if you are using another broker/backend.

  • Exchange: exchange name, e.g. machinery_exchange
  • ExchangeType: exchange type, e.g. direct
  • QueueBindingArguments: an optional map of additional arguments used when binding to an AMQP queue
  • BindingKey: the queue is bound to the exchange with this key, e.g. machinery_task
  • PrefetchCount: how many tasks to prefetch (set to 1 if you have long running tasks)
  • DelayedQueue: delayed queue name to be used for task retry or delayed task (if empty, delayed queues are created and deleted automatically)

DynamoDB

DynamoDB related configuration. Not necessary if you are using another backend.

  • TaskStatesTable: Custom table name for saving task states. The default is task_states. Make sure to create this table in your AWS admin first, using TaskUUID as the table's primary key.
  • GroupMetasTable: Custom table name for saving group metas. The default is group_metas. Make sure to create this table in your AWS admin first, using GroupUUID as the table's primary key.

For example:

dynamodb:
  task_states_table: 'task_states'
  group_metas_table: 'group_metas'

If these tables are not found, a fatal error is thrown.

If you wish to expire the records, you can configure the TTL field in AWS admin for these tables. The TTL field is set based on the ResultsExpireIn value in the Server's config. See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html for more information.

Redis

Redis related configuration. Not necessary if you are using another backend.

See: config (TODO)

GCPPubSub

GCPPubSub related configuration. Not necessary if you are using another broker.

See: config (TODO)

Custom Logger

You can define a custom logger by implementing the following interface:

type Interface interface {
  Print(...interface{})
  Printf(string, ...interface{})
  Println(...interface{})

  Fatal(...interface{})
  Fatalf(string, ...interface{})
  Fatalln(...interface{})

  Panic(...interface{})
  Panicf(string, ...interface{})
  Panicln(...interface{})
}

Then set the logger in your setup code by calling the Set function exported by the github.com/RichardKnop/machinery/v1/log package:

log.Set(myCustomLogger)

Server

Machinery must be instantiated before use by creating a Server instance. Server is a base object which stores Machinery configuration and registered tasks. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/config"
  "github.com/RichardKnop/machinery/v1"
)

var cnf = &config.Config{
  Broker:        "amqp://guest:guest@localhost:5672/",
  DefaultQueue:  "machinery_tasks",
  ResultBackend: "amqp://guest:guest@localhost:5672/",
  AMQP: &config.AMQPConfig{
    Exchange:     "machinery_exchange",
    ExchangeType: "direct",
    BindingKey:   "machinery_task",
  },
}

server, err := machinery.NewServer(cnf)
if err != nil {
  // do something with the error
}

Workers

In order to consume tasks, you need to have one or more workers running. All you need to run a worker is a Server instance with registered tasks. E.g.:

worker := server.NewWorker("worker_name", 10)
err := worker.Launch()
if err != nil {
  // do something with the error
}

Each worker will only consume registered tasks. For each task on the queue the Worker.Process() method will be run in a goroutine. Use the second parameter of server.NewWorker to limit the number of concurrently running Worker.Process() calls (per worker). Example: 1 will serialize task execution while 0 makes the number of concurrently executed tasks unlimited (default).

Tasks

Tasks are a building block of Machinery applications. A task is a function which defines what happens when a worker receives a message.

Each task needs to return an error as its last return value. In addition to the error, tasks can return any number of other values.

Examples of valid tasks:

func Add(args ...int64) (int64, error) {
  sum := int64(0)
  for _, arg := range args {
    sum += arg
  }
  return sum, nil
}

func Multiply(args ...int64) (int64, error) {
  sum := int64(1)
  for _, arg := range args {
    sum *= arg
  }
  return sum, nil
}

// You can use context.Context as first argument to tasks, useful for open tracing
func TaskWithContext(ctx context.Context, arg Arg) error {
  // ... use ctx ...
  return nil
}

// Tasks need to return at least error as a minimal requirement
func DummyTask(arg string) error {
  return errors.New(arg)
}

// You can also return multiple results from the task
func DummyTask2(arg1, arg2 string) (string, string, error) {
  return arg1, arg2, nil
}
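The rule that the last return value must be an error can be checked with reflection before a function is used as a task. The following is a minimal sketch of such a check. `CheckTaskSignature` and `BadAdd` are hypothetical names for illustration, and the check is an assumption about the general approach rather than Machinery's own validation code.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// CheckTaskSignature is a hypothetical helper: it reports whether task is a
// function whose last return value is an error.
func CheckTaskSignature(task interface{}) error {
	t := reflect.TypeOf(task)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("task must be a function")
	}
	if t.NumOut() == 0 {
		return errors.New("task must return at least an error")
	}
	if last := t.Out(t.NumOut() - 1); !last.Implements(errorType) {
		return fmt.Errorf("last return value must be error, got %s", last)
	}
	return nil
}

// Add is a valid task: its last return value is an error.
func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

// BadAdd is not a valid task: it returns no error.
func BadAdd(a, b int) int {
	return a + b
}

func main() {
	fmt.Println("Add:", CheckTaskSignature(Add))
	fmt.Println("BadAdd:", CheckTaskSignature(BadAdd))
}
```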

Registering Tasks

Before your workers can consume a task, you need to register it with the server. This is done by assigning a task a unique name:

server.RegisterTasks(map[string]interface{}{
  "add":      Add,
  "multiply": Multiply,
})

Tasks can also be registered one by one:

server.RegisterTask("add", Add)
server.RegisterTask("multiply", Multiply)

Simply put, when a worker receives a message like this:

{
  "UUID": "48760a1a-8576-4536-973b-da09048c2ac5",
  "Name": "add",
  "RoutingKey": "",
  "ETA": null,
  "GroupUUID": "",
  "GroupTaskCount": 0,
  "Args": [
    {
      "Type": "int64",
      "Value": 1
    },
    {
      "Type": "int64",
      "Value": 1,
    }
  ],
  "Immutable": false,
  "RetryCount": 0,
  "RetryTimeout": 0,
  "OnSuccess": null,
  "OnError": null,
  "ChordCallback": null
}

It will call Add(1, 1). Each task should return an error as well so we can handle failures.

Ideally, tasks should be idempotent which means there will be no unintended consequences when a task is called multiple times with the same arguments.
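To make the step from message to function call concrete, here is a simplified sketch using only the standard library. It decodes a message, looks the task up by name and calls it through reflection. `Message` and `Dispatch` are hypothetical names, only the int64 argument type is handled, and this is not how the worker is actually implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Arg mirrors the Type/Value pairs in the message above.
type Arg struct {
	Type  string
	Value interface{}
}

// Message is a hypothetical, trimmed-down view of the task message.
type Message struct {
	Name string
	Args []Arg
}

func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

// Dispatch is an illustrative helper: it decodes payload, finds the task in
// registry and calls it. It assumes the task returns an error last.
func Dispatch(registry map[string]interface{}, payload []byte) ([]interface{}, error) {
	var msg Message
	if err := json.Unmarshal(payload, &msg); err != nil {
		return nil, err
	}
	task, ok := registry[msg.Name]
	if !ok {
		return nil, fmt.Errorf("task %q is not registered", msg.Name)
	}
	in := make([]reflect.Value, 0, len(msg.Args))
	for _, arg := range msg.Args {
		if arg.Type != "int64" {
			return nil, fmt.Errorf("type %q is not handled by this sketch", arg.Type)
		}
		// encoding/json decodes JSON numbers into float64, so the Type
		// field is what tells us to convert back to int64.
		number, ok := arg.Value.(float64)
		if !ok {
			return nil, fmt.Errorf("%v is not %s", arg.Value, arg.Type)
		}
		in = append(in, reflect.ValueOf(int64(number)))
	}
	out := reflect.ValueOf(task).Call(in)
	if errValue := out[len(out)-1]; !errValue.IsNil() {
		return nil, errValue.Interface().(error)
	}
	results := make([]interface{}, 0, len(out)-1)
	for _, value := range out[:len(out)-1] {
		results = append(results, value.Interface())
	}
	return results, nil
}

func main() {
	registry := map[string]interface{}{"add": Add}
	payload := []byte(`{"Name": "add", "Args": [{"Type": "int64", "Value": 1}, {"Type": "int64", "Value": 1}]}`)
	results, err := Dispatch(registry, payload)
	fmt.Println(results, err)
}
```

The explicit Type field matters because JSON itself does not distinguish integer widths; without it the receiver could not know whether 1 was meant as an int64, a uint8 or a float64.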

Signatures

A signature wraps calling arguments, execution options (such as immutability) and success/error callbacks of a task so it can be sent across the wire to workers. Task signatures implement a simple interface:

// Arg represents a single argument passed to invocation of a task
type Arg struct {
  Type  string
  Value interface{}
}

// Headers represents the headers which should be used to direct the task
type Headers map[string]interface{}

// Signature represents a single task invocation
type Signature struct {
  UUID           string
  Name           string
  RoutingKey     string
  ETA            *time.Time
  GroupUUID      string
  GroupTaskCount int
  Args           []Arg
  Headers        Headers
  Immutable      bool
  RetryCount     int
  RetryTimeout   int
  OnSuccess      []*Signature
  OnError        []*Signature
  ChordCallback  *Signature
}

UUID is a unique ID of a task. You can either set it yourself or it will be automatically generated.

Name is the unique task name by which it is registered against a Server instance.

RoutingKey is used for routing a task to correct queue. If you leave it empty, the default behaviour will be to set it to the default queue's binding key for direct exchange type and to the default queue name for other exchange types.

ETA is a timestamp used for delaying a task. If it's nil, the task will be published for workers to consume immediately. If it is set, the task will be delayed until the ETA timestamp.

GroupUUID, GroupTaskCount are useful for creating groups of tasks.

Args is a list of arguments that will be passed to the task when it is executed by a worker.

Headers is a list of headers that will be used when publishing the task to AMQP queue.

Immutable is a flag which defines whether a result of the executed task can be modified or not. This is important with OnSuccess callbacks. An immutable task will not pass its result to its success callbacks, while a mutable task will append its result to the args sent to callback tasks. Long story short, set Immutable to false if you want to pass the result of the first task in a chain to the second task.

RetryCount specifies how many times a failed task should be retried (defaults to 0). Retry attempts will be spaced out in time, after each failure another attempt will be scheduled further to the future.

RetryTimeout specifies how long to wait before resending task to the queue for retry attempt. Default behaviour is to use fibonacci sequence to increase the timeout after each failed retry attempt.

OnSuccess defines tasks which will be called after the task has executed successfully. It is a slice of task signature structs.

OnError defines tasks which will be called after the task execution fails. The first argument passed to error callbacks will be the error string returned from the failed task.

ChordCallback is used to create a callback to a group of tasks.

Supported Types

Machinery encodes tasks to JSON before sending them to the broker. Task results are also stored in the backend as JSON encoded strings. Therefore only types with native JSON representation can be supported. Currently supported types are:

  • bool
  • int
  • int8
  • int16
  • int32
  • int64
  • uint
  • uint8
  • uint16
  • uint32
  • uint64
  • float32
  • float64
  • string
  • []bool
  • []int
  • []int8
  • []int16
  • []int32
  • []int64
  • []uint
  • []uint8
  • []uint16
  • []uint32
  • []uint64
  • []float32
  • []float64
  • []string

Sending Tasks

Tasks can be called by passing an instance of Signature to a Server instance. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
)

signature := &tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

asyncResult, err := server.SendTask(signature)
if err != nil {
  // failed to send the task
  // do something with the error
}

Delayed Tasks

You can delay a task by setting the ETA timestamp field on the task signature.

// Delay the task by 5 seconds
eta := time.Now().UTC().Add(time.Second * 5)
signature.ETA = &eta

Retry Tasks

You can set a number of retry attempts before declaring task as failed. Fibonacci sequence will be used to space out retry requests over time. (See RetryTimeout for details.)

// If the task fails, retry it up to 3 times
signature.RetryCount = 3

Alternatively, you can return tasks.ErrRetryTaskLater from your task and specify duration after which the task should be retried, e.g.:

return tasks.NewErrRetryTaskLater("some error", 4 * time.Hour)

Get Pending Tasks

Tasks currently waiting in the queue to be consumed by workers can be inspected, e.g.:

server.GetBroker().GetPendingTasks("some_queue")

Currently only supported by Redis broker.

Keeping Results

If you configure a result backend, the task states and results will be persisted. Possible states:

const (
	// StatePending - initial state of a task
	StatePending = "PENDING"
	// StateReceived - when task is received by a worker
	StateReceived = "RECEIVED"
	// StateStarted - when the worker starts processing the task
	StateStarted = "STARTED"
	// StateRetry - when failed task has been scheduled for retry
	StateRetry = "RETRY"
	// StateSuccess - when the task is processed successfully
	StateSuccess = "SUCCESS"
	// StateFailure - when processing of the task fails
	StateFailure = "FAILURE"
)

When using AMQP as a result backend, task states will be persisted in separate queues for each task. Although RabbitMQ can scale up to thousands of queues, it is strongly advised to use a better suited result backend (e.g. Memcache) when you are expecting to run a large number of parallel tasks.

// TaskResult represents an actual return value of a processed task
type TaskResult struct {
  Type  string      `bson:"type"`
  Value interface{} `bson:"value"`
}

// TaskState represents a state of a task
type TaskState struct {
  TaskUUID  string        `bson:"_id"`
  State     string        `bson:"state"`
  Results   []*TaskResult `bson:"results"`
  Error     string        `bson:"error"`
}

// GroupMeta stores useful metadata about tasks within the same group
// E.g. UUIDs of all tasks which are used in order to check if all tasks
// completed successfully or not and thus whether to trigger chord callback
type GroupMeta struct {
  GroupUUID      string   `bson:"_id"`
  TaskUUIDs      []string `bson:"task_uuids"`
  ChordTriggered bool     `bson:"chord_triggered"`
  Lock           bool     `bson:"lock"`
}

TaskResult represents a single return value of a processed task. The Results field of TaskState holds one TaskResult per return value.

TaskState struct will be serialized and stored every time a task state changes.

GroupMeta stores useful metadata about tasks within the same group. E.g. UUIDs of all tasks which are used in order to check if all tasks completed successfully or not and thus whether to trigger chord callback.

AsyncResult object allows you to check for the state of a task:

taskState := asyncResult.GetState()
fmt.Printf("Current state of %v task is:\n", taskState.TaskUUID)
fmt.Println(taskState.State)

There are a couple of convenient methods to inspect the task status:

asyncResult.GetState().IsCompleted()
asyncResult.GetState().IsSuccess()
asyncResult.GetState().IsFailure()
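As a rough model of what these helpers express, the sketch below reimplements them over the state constants listed earlier. It assumes that a task counts as completed once it has either succeeded or failed. The trimmed `TaskState` type is redeclared here for illustration and is not the library's code.

```go
package main

import "fmt"

const (
	StatePending  = "PENDING"
	StateReceived = "RECEIVED"
	StateStarted  = "STARTED"
	StateRetry    = "RETRY"
	StateSuccess  = "SUCCESS"
	StateFailure  = "FAILURE"
)

// TaskState is a trimmed-down stand-in for the struct shown above.
type TaskState struct {
	TaskUUID string
	State    string
}

// IsSuccess reports whether the task finished without error.
func (s TaskState) IsSuccess() bool { return s.State == StateSuccess }

// IsFailure reports whether the task finished with an error.
func (s TaskState) IsFailure() bool { return s.State == StateFailure }

// IsCompleted reports whether the task reached a final state
// (assumption: success or failure; every other state is still in flight).
func (s TaskState) IsCompleted() bool { return s.IsSuccess() || s.IsFailure() }

func main() {
	for _, state := range []string{StatePending, StateRetry, StateSuccess, StateFailure} {
		s := TaskState{TaskUUID: "example", State: state}
		fmt.Printf("%-8s completed=%t success=%t failure=%t\n",
			state, s.IsCompleted(), s.IsSuccess(), s.IsFailure())
	}
}
```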

You can also do a synchronous blocking call to wait for a task result:

results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a task failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Error Handling

When a task returns with an error, the default behavior is to first attempt to retry the task if it's retriable, otherwise log the error and then eventually call any error callbacks.

To customize this, you can set a custom error handler on the worker which can do more than just logging after retries fail and error callbacks are triggered:

worker.SetErrorHandler(func (err error) {
  customHandler(err)
})

Workflows

Running a single asynchronous task is fine, but often you will want to design a workflow of tasks to be executed in an orchestrated way. There are a couple of useful functions to help you design workflows.

Groups

Group is a set of tasks which will be executed in parallel, independent of each other. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

group, _ := tasks.NewGroup(&signature1, &signature2)
asyncResults, err := server.SendGroup(group, 0) //The second parameter specifies the number of concurrent sending tasks. 0 means unlimited.
if err != nil {
  // failed to send the group
  // do something with the error
}

SendGroup returns a slice of AsyncResult objects, so you can do a blocking call and wait for the results of the group's tasks:

for _, asyncResult := range asyncResults {
  results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
  if err != nil {
    // getting result of a task failed
    // do something with the error
  }
  for _, result := range results {
    fmt.Println(result.Interface())
  }
}

Chords

Chord allows you to define a callback to be executed after all tasks in a group finished processing, e.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
}

group, _ := tasks.NewGroup(&signature1, &signature2)
chord, _ := tasks.NewChord(group, &signature3)
chordAsyncResult, err := server.SendChord(chord, 0) //The second parameter specifies the number of concurrent sending tasks. 0 means unlimited.
if err != nil {
  // failed to send the chord
  // do something with the error
}

The above example executes task1 and task2 in parallel, aggregates their results and passes them to task3. Therefore what would end up happening is:

multiply(add(1, 1), add(5, 5))

More explicitly:

(1 + 1) * (5 + 5) = 2 * 10 = 20
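The same data flow can be simulated locally with goroutines. The sketch below is only a model of the chord semantics described above (parallel group, then one callback receiving all results), with no broker involved. `Task` and `RunChord` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is the shape shared by Add and Multiply.
type Task func(args ...int64) (int64, error)

func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func Multiply(args ...int64) (int64, error) {
	product := int64(1)
	for _, arg := range args {
		product *= arg
	}
	return product, nil
}

// RunChord is a hypothetical helper: it runs member once per argument list
// in group, concurrently, then passes all results (in group order) to callback.
func RunChord(group [][]int64, member Task, callback Task) (int64, error) {
	results := make([]int64, len(group))
	errs := make([]error, len(group))
	var wg sync.WaitGroup
	for i, args := range group {
		wg.Add(1)
		go func(i int, args []int64) {
			defer wg.Done()
			results[i], errs[i] = member(args...)
		}(i, args)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return 0, err // the callback only runs if every member succeeded
		}
	}
	return callback(results...)
}

func main() {
	result, err := RunChord([][]int64{{1, 1}, {5, 5}}, Add, Multiply)
	fmt.Println(result, err) // prints 20 <nil>
}
```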

SendChord returns ChordAsyncResult which follows AsyncResult's interface. So you can do a blocking call and wait for the result of the callback:

results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a chord failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Chains

Chain is simply a set of tasks which will be executed one by one, each successful task triggering the next task in the chain. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 4,
    },
  },
}

chain, _ := tasks.NewChain(&signature1, &signature2, &signature3)
chainAsyncResult, err := server.SendChain(chain)
if err != nil {
  // failed to send the chain
  // do something with the error
}

The above example executes task1, then task2 and then task3. When a task completes successfully, its result is appended to the end of the list of arguments for the next task in the chain. Therefore what would end up happening is:

multiply(4, add(5, 5, add(1, 1)))

More explicitly:

  4 * (5 + 5 + (1 + 1))   # task1: add(1, 1)        returns 2
= 4 * (5 + 5 + 2)         # task2: add(5, 5, 2)     returns 12
= 4 * (12)                # task3: multiply(4, 12)  returns 48
= 48
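The argument passing in a chain can be reproduced with a few lines of plain Go. This sketch only models the rule stated above (each result is appended to the next task's own arguments). `Step` and `RunChain` are hypothetical names and no broker is involved.

```go
package main

import "fmt"

// Task is the shape shared by Add and Multiply.
type Task func(args ...int64) (int64, error)

// Step pairs a task with the arguments from its own signature.
type Step struct {
	Task Task
	Args []int64
}

func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func Multiply(args ...int64) (int64, error) {
	product := int64(1)
	for _, arg := range args {
		product *= arg
	}
	return product, nil
}

// RunChain is a hypothetical helper: it runs steps in order and appends the
// previous step's result to the end of the next step's arguments.
func RunChain(steps []Step) (int64, error) {
	var previous []int64
	var result int64
	for _, step := range steps {
		// Copy the step's own args first, then append the previous result.
		args := append(append([]int64{}, step.Args...), previous...)
		var err error
		result, err = step.Task(args...)
		if err != nil {
			return 0, err // a failed task stops the chain
		}
		previous = []int64{result}
	}
	return result, nil
}

func main() {
	result, err := RunChain([]Step{
		{Task: Add, Args: []int64{1, 1}},
		{Task: Add, Args: []int64{5, 5}},
		{Task: Multiply, Args: []int64{4}},
	})
	fmt.Println(result, err) // prints 48 <nil>
}
```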

SendChain returns ChainAsyncResult which follows AsyncResult's interface. So you can do a blocking call and wait for the result of the whole chain:

results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a chain failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Periodic Tasks & Workflows

Machinery now supports scheduling periodic tasks and workflows. See examples below.

Periodic Tasks

import (
  "github.com/RichardKnop/machinery/v1/tasks"
)

signature := &tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}
err := server.RegisterPeriodicTask("0 6 * * ?", "periodic-task", signature)
if err != nil {
  // failed to register periodic task
}

Periodic Groups

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

group, _ := tasks.NewGroup(&signature1, &signature2)
err := server.RegisterPeriodicGroup("0 6 * * ?", "periodic-group", group)
if err != nil {
  // failed to register periodic group
}

Periodic Chains

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 4,
    },
  },
}

chain, _ := tasks.NewChain(&signature1, &signature2, &signature3)
err := server.RegisterPeriodicChain("0 6 * * ?", "periodic-chain", chain)
if err != nil {
  // failed to register periodic chain
}

Periodic Chords

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
}

group, _ := tasks.NewGroup(&signature1, &signature2)
chord, _ := tasks.NewChord(group, &signature3)
err := server.RegisterPeriodicChord("0 6 * * ?", "periodic-chord", chord)
if err != nil {
  // failed to register periodic chord
}

Development

Requirements

  • Go
  • RabbitMQ (optional)
  • Redis
  • Memcached (optional)
  • MongoDB (optional)

On OS X systems, you can install requirements using Homebrew:

brew install go
brew install rabbitmq
brew install redis
brew install memcached
brew install mongodb

Or optionally use the corresponding Docker containers:

docker run -d -p 5672:5672 rabbitmq
docker run -d -p 6379:6379 redis
docker run -d -p 11211:11211 memcached
docker run -d -p 27017:27017 mongo
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

Dependencies

Since Go 1.11, modules are the recommended dependency management system.

Dependency management has historically been one of Go's slight weaknesses. Go previously recommended the dep tool officially, but that has now been abandoned in favor of modules.

Testing

The easiest (and platform agnostic) way to run tests is via docker-compose:

make ci

This runs the following docker-compose command:

(docker-compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut)

An alternative approach is to set up a development environment on your machine.

In order to enable integration tests, you will need to install all required services (RabbitMQ, Redis, Memcache, MongoDB) and export these environment variables:

export AMQP_URL=amqp://guest:guest@localhost:5672/
export REDIS_URL=localhost:6379
export MEMCACHE_URL=localhost:11211
export MONGODB_URL=localhost:27017

To run integration tests against an SQS instance, you will need to create a "test_queue" in SQS and export these environment variables:

export SQS_URL=https://YOUR_SQS_URL
export AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
export AWS_DEFAULT_REGION=YOUR_AWS_DEFAULT_REGION

Then just run:

make test

If the environment variables are not exported, make test will only run unit tests.

machinery's Issues

Runtime Error: index out of range

file: ./worker.go
line: 139

if results[1].IsNil() 

On my machine this code fails with "Error = runtime error: index out of range".

The value of "results" is [<int Value>].

go version: go1.6.3 darwin/amd64

task:

func Add(a int, b int) int {
    return a + b
}

worker:

task := signatures.TaskSignature{
    Name: "add",
    Args: []signatures.TaskArg{
        {
            Type:  "int",
            Value: 1,
        },
        {
            Type:  "int",
            Value: 1,
        },
    },
}
_, sendTaskErr := server.SendTask(&task)

worker doesn't make use of concurrency

My task logic sends new tasks from within a task body. In this case, the worker does not receive any new tasks until it finishes the current one. This can be worked around by starting multiple workers on one node, but that seems strange.

To me, it's a waste of resources. But is it a valid scenario? Or should I avoid initiating tasks within the task body?

Task stuck due to "Error: Reflect task args: test is not xxx"

If a task argument value of an incorrect type is passed, the tasksignature.Get method gets stuck waiting for a result.

Example client:

    signatures.TaskArg{
            Type:  "int64",
            Value: "this-is-a-string",
    },

    asyncResult, err := server.SendTask(&task)
    if err != nil {
        fmt.Println(err)
    }

    result, err := asyncResult.Get()

The worker displays the following error:

2016/01/04 16:18:47 Failed processing task_488a2edb-433b-4fda-a4eb-d5a59568f3a3. Error = this-is-a-string is not int64
2016/01/04 16:18:47 Going to retry launching the worker. Error: Reflect task args: this-is-a-string is not int64

I think it would be better to set the task state to failed and return the error.

Sidekiq

Hey,

just a suggestion - use the Sidekiq way of doing things to make this useful for a much broader user base. It will help with adoption/migration for sure, since a lot of people are using that.

I can document everything (how sidekiq works with redis) and also help with some implementation, although my Go skills leave a lot to be desired ;)

Codeship Build Fails

Running tests on Codeship fails. Investigate and try to find a source of the problem.

There have been intermittent issues with integration tests on Travis as well, but that is less pressing.

Refactor Tests

I have been going over tests recently and while the test coverage is decent, there is a lot of low hanging fruit to improve the tests. Make them more readable, reuse code.

I would like to use the testify library instead of just Go's core testing module. I have been using testify in my other projects and it improves test readability and maintainability.

See: https://github.com/stretchr/testify

support customized result backend

The latest server.go changes the following error handling.

backend, err := BackendFactory(cnf)
if err != nil {
    return nil, err
}

It used to be:

// Backend is optional so we ignore the error
backend, _ := BackendFactory(cnf)

This change breaks my customised result backend. I still want to monitor the error returned by NewServer(). Can you revert it or provide an alternative to support customised result backend?

Support unix domain socket Redis connections

Hello,

I'm working on an application which uses Machinery with a Redis backend, both embedded on a single desktop client. For a few reasons, we've concluded that using a unix domain socket is preferable to a network socket. Socket connections are already supported in redigo, so I'd like to propose extending that support to Machinery.

This may not fit perfectly with the current means of instantiating a backend/broker, but here are some thoughts:

  • Add support for redis.DialOptions in RedisBackend or in the NewRedisBackend constructor. This would be the most open-ended and probably best way of customizing use of Redis, since it lets the user granularly set a password, database number, and connection options.
  • The existing ParseRedisURL function wouldn't work well with file paths — currently it assumes that what comes after the final / is a DB number — so maybe a separate URL scheme would be needed. E.g.:
"redissocket:///path/to/file.sock"
"redissocket:///path/to/file.sock:/db" -- use ":" to separate final segment

In either case, the user should be able to supply redis.DialOptions to modify options even if the URL scheme can't.

I'd love to hear your thoughts on this proposal and am willing to work on a pull request if there is a design proposal that you find acceptable. Let me know what you think and how I can help!

A kafka broker

I think it would be really awesome if Kafka could be added as a broker. I am currently investigating using Kafka to send messages as part of an event-sourced system on Mesos. Being able to use Kafka would mean that existing Kafka infrastructure can be used, rather than having to build an AMQP cluster on Mesos.

worker stopped process task after a while

I haven't nailed down the root cause yet, but my worker seems to stop working after it receives a new message, and it leaves the message unacknowledged. I tried purging the queue, and 3 messages were left unacknowledged. Then I shut down the worker, and the 3 messages changed back to ready.

I always use the latest code base and it has happened twice. I will try again to see if there is a pattern.

Catch error on startup

So I have something like this:

    func init() {
    ...
        server, err := machinery.NewServer(&cnf)
        if err != nil {
            // some error handling
        }

        server.RegisterTasks(tasks)

        worker = server.NewWorker("machinery_worker")
    ...

    func main() {
        err := worker.Launch()
        if err != nil {
            // some error handling
        }
    }

which leads to this if the worker cannot connect to the broker on startup:

    2016/04/20 16:01:59 Launching a worker with the following settings:
    2016/04/20 16:01:59 - Broker: amqp://some_url
    2016/04/20 16:01:59 - ResultBackend: amqp://some_url
    2016/04/20 16:01:59 - Exchange: machinery_exchange
    2016/04/20 16:01:59 - ExchangeType: direct
    2016/04/20 16:01:59 - DefaultQueue: machinery_tasks
    2016/04/20 16:01:59 - BindingKey: machinery_task
    panic: runtime error: invalid memory address or nil pointer dereference
    [signal 0xb code=0x1 addr=0x20 pc=0x3495e9]

    goroutine 5 [running]:
    panic(0x4d3680, 0xc82000a0d0)
        /usr/local/go/src/runtime/panic.go:464 +0x3e6
    github.com/RichardKnop/machinery/vendor/github.com/streadway/amqp.(*Channel).Close(0x0, 0x0, 0x0)
        /Users/andy/checkouts/golang/src/github.com/RichardKnop/machinery/vendor/github.com/streadway/amqp/channel.go:402 +0x39
    github.com/RichardKnop/machinery/v1/brokers.(*AMQPBroker).StartConsuming(0xc8200165c0, 0x5d7220, 0x10, 0xc44828, 0xc8200bd500, 0x1, 0xc40028, 0xc8200e2050)
        /Users/andy/checkouts/golang/src/github.com/RichardKnop/machinery/v1/brokers/amqp.go:57 +0x156
    github.com/RichardKnop/machinery/v1.(*Worker).Launch.func1(0xc44740, 0xc8200165c0, 0xc8200bd500, 0xc820010540)
        /Users/andy/checkouts/golang/src/github.com/RichardKnop/machinery/v1/worker.go:38 +0xa2
    created by github.com/RichardKnop/machinery/v1.(*Worker).Launch
        /Users/andy/checkouts/golang/src/github.com/RichardKnop/machinery/v1/worker.go:47 +0x67d

I'm trying to figure out where the process dies: in server.RegisterTasks(tasks) or in worker = server.NewWorker("machinery_worker") (I'm catching all other errors).

Although the output would suggest that the error actually occurs during worker.Launch(), which leads me to believe that there's a bug in the error handling (because err comes back as nil)?

Converting `error` to type `string` has weird consequences

When my function is called and it returns an error, it is converted to string, rather than an error. This causes error to be a string containing <error Value>. Here is an example:

result, err := asyncResult.Get()
if err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    log.Println(err)
    return
}

When I run this, I see <error Value> in the log where I print err. It is because in worker.go, on line 92, the second returned argument is converted to string, and the string representation of an error type is <error Value>.

extensible brokers and backend?

The current implementation of brokers and backends uses a factory that selects the implementation by connection URL prefix, and I think it is not possible to extend it with a custom broker or backend (correct me if I'm wrong). How about database/sql style backends and brokers?
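
For readers unfamiliar with the pattern, a database/sql style design keeps a registry of named drivers that third-party packages can add to. A minimal sketch of that idea follows. Broker, BrokerFactory, Register, Open and memBroker are all hypothetical names chosen for illustration; none of them are Machinery's API.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Broker is a stand-in interface for this sketch.
type Broker interface{ Name() string }

// BrokerFactory builds a Broker from a connection URL.
type BrokerFactory func(url string) (Broker, error)

var (
	mu        sync.RWMutex
	factories = map[string]BrokerFactory{}
)

// Register makes a factory available under a URL scheme, much like
// sql.Register does for database drivers.
func Register(scheme string, f BrokerFactory) {
	mu.Lock()
	defer mu.Unlock()
	factories[scheme] = f
}

// Open selects a registered factory by the scheme of url.
func Open(url string) (Broker, error) {
	i := strings.Index(url, "://")
	if i < 0 {
		return nil, fmt.Errorf("missing scheme in %q", url)
	}
	mu.RLock()
	f, ok := factories[url[:i]]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown broker scheme %q", url[:i])
	}
	return f(url)
}

// memBroker is a trivial custom broker defined outside the "library".
type memBroker struct{}

func (memBroker) Name() string { return "mem" }

func main() {
	Register("mem", func(url string) (Broker, error) { return memBroker{}, nil })

	b, err := Open("mem://local")
	fmt.Println(b.Name(), err) // mem <nil>

	_, err = Open("kafka://localhost")
	fmt.Println(err) // unknown broker scheme "kafka"
}
```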

amqp problems

Working with AMQP is not stable:
from time to time the sender hangs on results.Get().
Workaround: Ctrl+C and run again. Sometimes it only works on the 3rd try.

Today I saw the worker crash with this output:
/home/user/go/bin/src/github.com/RichardKnop/machinery/v1/brokers/amqp.go:161 +0x1c9

goroutine 13329 [select]:
github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*Connection).heartbeater(0xc820076240, 0x2540be400, 0xc8200e6300)
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:494 +0x505
created by github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*Connection).openTune
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:713 +0x805

goroutine 13330 [chan receive]:
github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.bufferDeliveries(0xc8200e65a0, 0xc8200e6540)
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/consumers.go:38 +0x7b
created by github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*consumers).add
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/consumers.go:74 +0x10b

goroutine 13372 [runnable]:
github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*Connection).heartbeater(0xc82391f320, 0x2540be400, 0xc8239754a0)
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:494 +0x505
created by github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*Connection).openTune
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:713 +0x805

goroutine 13371 [runnable]:
net.runtime_pollWait(0x7fb9dba4ab98, 0x72, 0xc820010180)
/usr/local/go/src/runtime/netpoll.go:157 +0x60
net.(*pollDesc).Wait(0xc823934610, 0x72, 0x0, 0x0)
/usr/local/go/src/net/fd_poll_runtime.go:73 +0x3a
net.(*pollDesc).WaitRead(0xc823934610, 0x0, 0x0)
/usr/local/go/src/net/fd_poll_runtime.go:78 +0x36
net.(*netFD).Read(0xc8239345b0, 0xc82393b000, 0x1000, 0x1000, 0x0, 0x7fb9dba45050, 0xc820010180)
/usr/local/go/src/net/fd_unix.go:232 +0x23a
net.(*conn).Read(0xc820028628, 0xc82393b000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:172 +0xe4
bufio.(*Reader).fill(0xc823975440)
/usr/local/go/src/bufio/bufio.go:97 +0x1e9
bufio.(*Reader).Read(0xc823975440, 0xc82393d2d0, 0x7, 0x7, 0x1, 0x0, 0x0)
/usr/local/go/src/bufio/bufio.go:207 +0x260
io.ReadAtLeast(0x7fb9dba4aef8, 0xc823975440, 0xc82393d2d0, 0x7, 0x7, 0x7, 0x0, 0x0, 0x0)
/usr/local/go/src/io/io.go:298 +0xe6
io.ReadFull(0x7fb9dba4aef8, 0xc823975440, 0xc82393d2d0, 0x7, 0x7, 0x1, 0x0, 0x0)
/usr/local/go/src/io/io.go:316 +0x62
github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*reader).ReadFrame(0xc821d99ee0, 0x0, 0x0, 0x0, 0x0)
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/read.go:49 +0xbd
github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.(*Connection).reader(0xc82391f320, 0x7fb9dba4ae60, 0xc820028628)
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:464 +0x172
created by github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp.Open
/home/user/go/bin/src/github.com/RichardKnop/machinery/Godeps/_workspace/src/github.com/streadway/amqp/connection.go:220 +0x3e3

AMQP, the server and the workers all run on the same machine.

Worker's concurrency limit

Hi, is there a way to limit the number of concurrent tasks running on one worker server? For example: I know that server A is slow but server B is fast, and I want server B to perform more tasks than server A. I want to tell the Machinery worker that it can run only 10 goroutines on server A, and 20 on server B.

Or another method: on the servers I have some metric (GPU memory usage) and I want to configure the worker to accept new tasks only if that metric is low.

Any suggestions?
Thanks!

RedisBroker.StopConsuming() Does Not Work

It seems like RedisBroker.StopConsuming() doesn't properly shut down the Redis receiving goroutine.

This causes one of the integration tests to fail (I have commented it out for now).

Not a high priority issue as StopConsuming() methods are only used by integration tests for now.

Could not send task

Hi,
When I test with the example, I got this error:

Could not send task: Set State Pending: Queue Declare: Exception (406) Reason: "PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'task_5d2b652d-8428-4834-8c5e-bb4c6bc94d50' in vhost '/': {value_negative,-694967296}"
panic: Could not send task: Set State Pending: Queue Declare: Exception (406) Reason: "PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'task_5d2b652d-8428-4834-8c5e-bb4c6bc94d50' in vhost '/': {value_negative,-694967296}"

I am running RabbitMQ 3.6.4 on my mac and use the default exampleconfig.yml,

I also did a search; the problem seems to be that the queue is declared with an invalid x-message-ttl, so I removed this line:

results_expire_in: 3600000

then it works fine, is there other way to solve this problem?

thanks.
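
The negative number in the error message can be reproduced with plain arithmetic. Assuming (this is an inference from the numbers, not something the report confirms) that results_expire_in is treated as seconds, multiplied by 1000 to get milliseconds, and then stored in a signed 32-bit integer, 3600000 wraps around to exactly -694967296. toInt32Millis is a hypothetical helper that demonstrates the wraparound:

```go
package main

import "fmt"

// toInt32Millis converts seconds to milliseconds and truncates the result to
// a signed 32-bit integer, which wraps around for large inputs.
func toInt32Millis(seconds int64) int32 {
	return int32(seconds * 1000)
}

func main() {
	const expireIn = 3600000 // results_expire_in from the example config

	fmt.Println(int64(expireIn) * 1000)  // 3600000000
	fmt.Println(toInt32Millis(expireIn)) // -694967296
	fmt.Println(toInt32Millis(3600))     // 3600000
}
```

Under that assumption, a smaller value such as 3600 would stay within range instead of removing the setting altogether.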

Chord Bug With Redis/Memcache Backend

There seems to be a race issue with chords when using Redis or Memcache backends.

I have observed an intermittent bug when chord callbacks would not get triggered. Probably caused by inconsistent state of TaskStateGroup.

Needs some investigation.

Add logic to query current tasks in a queue

@RichardKnop been working with Machinery for a few months now and it is really great. Thank you so much for your work on the project!

I am interested in the ability to query a broker for tasks that are currently enqueued. I've searched through the project and don't believe this is currently supported.

Unless I am mistaken, would you consider a PR that provides this functionality? I'm interested in this for testing purposes, i.e. I want to test some behavior that should publish a few tasks, then verify that they actually made it to the queue with the appropriate data.

For example, I have the following implemented on a fork for the Redis broker.

// PendingMessages returns a slice of task.Signatures currently enqueued.
func (redisBroker *RedisBroker) PendingMessages() ([]*signatures.TaskSignature, error) {
    conn, err := redisBroker.open()
    if err != nil {
        return nil, fmt.Errorf("Dial: %s", err)
    }
    defer conn.Close()

    bytes, err := conn.Do("LRANGE", redisBroker.config.DefaultQueue, 0, 10)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return nil, err
    }
    results, err := redis.ByteSlices(bytes, err)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return nil, err
    }

    var taskSignatures []*signatures.TaskSignature
    for _, result := range results {
        var taskSignature signatures.TaskSignature
        if err := json.Unmarshal(result, &taskSignature); err != nil {
            return nil, err
        }
        taskSignatures = append(taskSignatures, &taskSignature)
    }
    return taskSignatures, nil
}

I have additionally implemented a function to purge a queue, which is also useful for testing.

// PurgeQueue deletes the default queue, dropping all tasks still enqueued.
func (redisBroker *RedisBroker) PurgeQueue() error {
    conn, err := redisBroker.open()
    if err != nil {
        // Report the connection failure instead of silently succeeding.
        return err
    }
    defer conn.Close()

    _, err = conn.Do("DEL", redisBroker.config.DefaultQueue)
    return err
}

Would love to know your thoughts on the above.

Update task state with metadata

Awesome work on this project!

Did you have any plans to add the ability for a task to update its state with metadata? I'm currently using this project to import data and it would be beneficial to be able to determine the progress of the task. I wanted to gauge the interest in having this feature before implementing it. My initial thoughts would be to add a field into the TaskState struct which would hold any metadata on the task.

type TaskState struct {
  TaskUUID string
  State    string
  Result   *TaskResult
  Metadata map[interface{}]interface{}
  Error    string
}

There could then be a flag set in the task signature that would tell the worker to pass along a function that would be used by the task to set its metadata.

type TaskSignature struct {
  UUID           string
  Name           string
  RoutingKey     string
  GroupUUID      string
  GroupTaskCount int
  Args           []TaskArg
  Immutable      bool
  HasMetadata    bool // tells worker to pass along a function to update metadata
  OnSuccess      []*TaskSignature
  OnError        []*TaskSignature
  ChordCallback  *TaskSignature
}

Avoid logging potentially sensitive data

I have a use-case where semi-sensitive data is passed around through a redis queue. This data should not be revealed in the logs, but https://github.com/RichardKnop/machinery/blob/master/v1/brokers/redis.go#L211 causes the data to appear.

Ideally it should be possible to suppress this output without suppressing all log output (currently it is possible to provide an alternative logger implementation but since the log line doesn't set any log level such as Debug, it can't be selectively filtered)

Using a custom broker seems to be the only possibility, but this is awkward since it requires copying the code of brokers/redis.go and factories.go to create an alternative redis implementation (an issue which seems to be related to #97).

Could a flag be added to denote data as sensitive, and therefore not to be logged anywhere? Alternatively, could that log line be changed to debug level (or other log lines be changed to warn level as appropriate) so that this could be filtered by the logger?

If redis crashes a worker won't quit

    redisBroker.pool = redisBroker.newPool()
    defer redisBroker.pool.Close()

    _, err := redisBroker.pool.Get().Do("PING")
    if err != nil {
        redisBroker.retryFunc()
        return true, err // retry true
    }

I have a test case where I check how my program handles Redis crashes. Once I identify the crash, I want to close the worker and create a new one, since Redis's IP might have changed.
So when Redis crashes, I try to call Quit() in the hope of stopping the previous worker goroutine. The stopping channel isn't sampled, so the worker won't quit, since it will keep on retrying.

AMPQ broker connection reusing

Is there a reason to open and close the broker connection every time a message is published?
In a single goroutine with AMQP connection reuse I get 110 msg/sec published vs 55 msg/sec with the current implementation.
Also, when using goroutines to publish messages I was able to get 34000 msg/sec on the same configuration. It looks like reusing the AMQP connection from multiple goroutines is safe.

ResultBackend is not optional

The documentation says: "Result backend to use for keeping task states and results. This setting is optional, you can run Machinery without keeping track of task results."

But in fact when I try to leave ResultBackend empty there is a panic at

src/github.com/RichardKnop/machinery/v1/server.go:102 (0x2a4492)
    (*Server).SendTask: if err := server.backend.SetStatePending(signature); err != nil

Feature print stack on panic

Print the stack when panic happens when processing task.

At the moment the stack is lost when a panic happens inside a task. I do not know if it's best to require people to add their own handling for panics, or to update the global worker panic recovery to include a stack trace, or both.

At the moment you can get into situations like this:

2016/08/18 07:56:45 Failed processing task_225ef62d-6c6b-42dd-9bce-d91d4c7edea9. Error = runtime error: invalid memory address or nil pointer dereference

Which makes it impossible to debug.

If it's a good idea to put this in the global worker recover, let me know and I will make a pull request for it.
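
Capturing the stack inside a recover handler needs only the standard library: runtime/debug.Stack() returns the trace of the calling goroutine, and inside a deferred function that trace still includes the frames that panicked. A minimal sketch, where safeRun and crash are hypothetical names and not Machinery's worker code:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// safeRun executes task and converts a panic into an error that carries
// both the panic value and the stack trace captured during recovery.
func safeRun(task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v\n%s", r, debug.Stack())
		}
	}()
	task()
	return nil
}

// crash stands in for a task with a bug.
func crash() {
	var counts map[string]int
	counts["x"] = 1 // writing to a nil map panics
}

func main() {
	if err := safeRun(crash); err != nil {
		fmt.Println(err) // panic message followed by the goroutine's stack
	}
}
```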

A broker written with Go channels

I was thinking of having a broker written entirely in Go. Its benefits would be:

  • No external dependencies: you don't need Redis or AMQP. It's all Go.
  • You're not limited to marshalable data: I have to do some weird things because I can't pass a channel to my task handler function. If this was all in Go, I could pass any value I wanted.

Would it work with Machinery? Would there be a point in it? I was thinking that it would reduce a lot of boilerplate code in projects which want to do some asynchronous jobs such as sending emails, without depending on a messaging queue, or people like me, who want to pass in unmarshalable values to their functions.

Feature Request: Delay task

Ability to delay tasks for later processing, e.g. run a task in 5 minutes.

Could be implemented by adding an execution time to the task signature, set through a helper method, e.g.

func (t *TaskSignature) Delay(d time.Duration) 

When the worker gets the task, it holds onto it until the execution time and then runs it.

You want to store the time to execute and not just the delay, since you want to guarantee that the task runs as close as possible to that time. Otherwise, if the worker crashes, the delay would start again.

Launch machinery worker within application?

I'm trying to use machinery within a larger application. I was wondering if there was any way to launch machinery workers within the application, so that it can send and work on applications at the same time...

Or am I fundamentally misusing machinery?

Retrying Doesn't Reset Fibonacci Sequence

When a connection drops, worker will keep retrying to connect to the broker using Fibonacci sequence, so it will retry in 1, 1, 2, 3, 5, 8... seconds.

However, when worker successfully connects and starts consuming messages, the retry Fibonacci sequence should be reset so next time connection drops we start from 1 again.

amqp high cpu usage

When I start the server and there are no workers with the tasks the server sends registered, there are many queries to AMQP and CPU usage is near 100%.

Publish Message error when use redis as broker

Publish Message: write tcp 192.168.56.1:14302->192.168.56.6:6379: wsasend: An existing connection was forcibly closed by the remote host.

I started a chord task which has more than 1500 subtasks, each of which contains a 1 KB message.

Feature Request: Pluggable Logger

Something simple where we can have this library use a logger that follows the standard interface would be good.

I'm looking to use logrus to hide a good chunk of the printf spam that I get from using this library.

Dependencies

Running it produced this:

/code/go/src/github.com/RichardKnop/machinery$ go run examples/worker/worker.go
v1/amqp.go:11:2: cannot find package "github.com/streadway/amqp" in any of:
    /usr/lib/go/src/pkg/github.com/streadway/amqp (from $GOROOT)
    /home/vanhalt/code/go/src/github.com/streadway/amqp (from $GOPATH)
v1/config/config.go:7:2: cannot find package "gopkg.in/yaml.v2" in any of:
    /usr/lib/go/src/pkg/gopkg.in/yaml.v2 (from $GOROOT)
    /home/vanhalt/code/go/src/gopkg.in/yaml.v2 (from $GOPATH)

Pass struct as result value?

Is there a way for a task to return a struct or map value in order to pass more information? For instance, if I'm doing file processing tasks, I'd like to convey information like file paths, sizes, stats, etc. Currently I'm just writing the data to a JSON file and sending back a string value of the config file path as a result. Is it problematic to pass such a struct value to chained tasks?

sendGroup is very slow when task number is large

I tried to use the send group feature. However, it turns out to be quite slow. The scheduling frequency seems to be fixed, so I ended up sending tasks one by one.

What exactly is the benefit of using send group instead of sending tasks one by one?

Chord Callback Called Multiple Time

Chord callback is being called N times where N is number of tasks in the group.

This affects Redis and Memcache backends. AMQP backend is not affected.

Workaround until this is fixed is to make sure chord callback is idempotent.

Tasks only works with string and float64

If I try the example with

func Add(args ...int64) (int64, error) {
    sum := int64(0)
    for _, arg := range args {
        sum += arg
    }
    return sum, nil
}

i get "Failed processing {54740152-8C08-4F6F-BA2A-579B092E0615}. Error = 1 is not int64", from worker.go

I think it is because of the default behavior of encoding/json: when it finds a number in JSON and the Go target is an interface, it converts the number to float64.

Support for scheduled tasks

I'd love to see some kind of scheduled task support, where you can submit a task for execution at a point of time in the future.

Priority queues

We've been evaluating different queuing alternatives for golang with a Redis backend. Machinery seemed to be promising, but we were worried about different priority queues usage.

We're building a system in which several workers should share a queuing system to pick and perform tasks. We want some tasks to have higher priority than others and workers to pick first from higher priority queues if they contain messages.

From the examples we reviewed, it seems Machinery is only capable of working with a single default queue. Is there any way to implement the desired behaviour? If not, is there any plan to include this at some point?

We had other systems written in Python and we switched from Celery to rq because of the priority management in the former.

Thanks!

Is machinery ready for production?

Hi @RichardKnop , I'm moving from python to golang and I need a celery like architecture to handle asynchronous tasks for almost real time execution. Machinery looks a great fit for it, but I would like to get your honest review about whether it is ready to use in production and how much time it will take?
