queue's Introduction

A collection of persistent queue implementations for Tarantool

Queue types

fifo - a simple queue

Features:

  • If there is only one consumer, tasks are scheduled in strict FIFO order.
  • If there are many concurrent consumers, FIFO order is preserved on average, but is less strict: concurrent consumers may complete tasks in a different order.

The following options can be specified when creating a fifo queue:

  • temporary - boolean - if true, the contents do not persist on disk (the queue is in-memory only)
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation; the expected function syntax is function(task, stats_data), where stats_data is the operation type, and task is normalized task data. NOTE: It's better to use :on_task_change() function.

fifo queue does not support:

  • task priorities (pri)
  • task time to live (ttl)
  • task time to execute (ttr)
  • delayed execution (delay)

Example:

local log = require('log')

-- add a log record on task completion
local function otc_cb(task, stats_data)
    if stats_data == 'delete' then
        log.info("task %s is done", task[1])
    end
end

queue.create_tube('tube_name', 'fifo', {temporary = true, on_task_change = otc_cb})
queue.tube.tube_name:put('my_task_data_1')
queue.tube.tube_name:put('my_task_data_2')

In the example above, the otc_cb function will be called 2 times, on each task completion. Values for the callback arguments will be taken from the queue.

fifottl - a simple priority queue with support for task time to live

The following options can be specified when creating a fifottl queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation

The following options can be specified when putting a task in a fifottl queue:

  • pri - task priority (0 is the highest priority and is the default)
  • ttl - numeric - time to live for a task put into the queue, in seconds; if ttl is not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed)
  • ttr - numeric - time allotted to the worker to work on a task, in seconds; if ttr is not specified, it is set to the same as ttl (if a task is being worked on for more than ttr seconds, its status is changed to 'ready' so another worker may take it)
  • delay - time to wait before starting to execute the task, in seconds

Example:

queue.create_tube('tube_name', 'fifottl', {temporary = true})
queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 })

In the example above, the task has 60.1 seconds to live, but the start of execution is delayed for 80 seconds. Thus the task will actually exist for up to 140.1 seconds (60.1 + 80).

A smaller priority value indicates a higher priority, so a task with priority 1 will be executed after a task with priority 0, if all other options are equal.

limfifottl - a simple size-limited priority queue with support for task time to live

Works the same as fifottl, but has a limited size and a put operation timeout.

The following options can be specified when creating a limfifottl queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation
  • capacity - number - limit size of the queue

The following options can be specified when putting a task in a limfifottl queue:

  • pri - task priority (0 is the highest priority and is the default)
  • ttl - numeric - time to live for a task put into the queue, in seconds; if ttl is not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed)
  • ttr - numeric - time allotted to the worker to work on a task, in seconds; if ttr is not specified, it is set to the same as ttl (if a task is being worked on for more than ttr seconds, its status is changed to 'ready' so another worker may take it)
  • delay - time to wait before starting to execute the task, in seconds
  • timeout - numeric - seconds to wait until the queue has free space; if timeout is not specified or the time is up, and the queue has no space, the method returns nil
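
The timeout behaviour described above can be sketched as follows. This is a minimal illustration, not part of the queue module: put_or_nil is a hypothetical helper name, and tube is assumed to be a limfifottl tube, for example queue.tube.tube_name created with queue.create_tube('tube_name', 'limfifottl', {capacity = 100}).

```lua
-- Hypothetical helper (not part of the queue module).
-- Try to put a task into a limfifottl tube, waiting up to `timeout`
-- seconds for free space. Per the option description above, put
-- returns nil when the queue is still full after the timeout.
local function put_or_nil(tube, data, timeout)
    local task = tube:put(data, {timeout = timeout})
    if task == nil then
        return nil, 'queue is full'
    end
    return task
end

-- Possible usage inside Tarantool (assumes the tube already exists):
-- local task, err = put_or_nil(queue.tube.tube_name, 'my_task_data', 0.5)
```

Returning an explicit error message next to nil lets the producer distinguish a full queue from other outcomes without inspecting the tube.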

utube - a queue with sub-queues inside

The main idea of this queue backend is the same as in a fifo queue: the tasks are executed in FIFO order. However, tasks may be grouped into sub-queues.

It is advised not to use utube methods inside transactions with read-confirmed isolation level. It can lead to errors when trying to make parallel tube methods calls with mvcc enabled.

The following options can be specified when creating a utube queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk

  • if_not_exists - boolean - if true, no error will be returned if the tube already exists

  • on_task_change - function name - a callback to be executed on every operation

  • storage_mode - string - one of

    • queue.driver.utube.STORAGE_MODE_DEFAULT ("default") - default implementation of utube
    • queue.driver.utube.STORAGE_MODE_READY_BUFFER ("ready_buffer") - allows processing take requests faster, at the cost of slower put operations. Right now this option is supported only for the memtx engine. WARNING: this is an experimental storage mode.

    Here is a benchmark comparison of these two modes:

    • Benchmark for simple put and take methods. 30k utubes are created with a single task each. Task creation time is calculated. After that 30k consumers are calling take + ack, each in the separate fiber. Time to ack all tasks is calculated. The results are as follows:

      mode     put (30k)  take+ack
      default  180ms      1.6s
      ready    270ms      1.7s

    • Benchmark for the busy utubes. 10 tubes are created. Each contains 1000 tasks. After that, 10 consumers are created (each works on its own tube only, one tube per consumer). Each consumer will take, then yield and then ack every task from its utube (1000 tasks each). After that, the same benchmark is run with 10k, 50k and 150k tasks on each utube, still with 10 utubes and 10 consumers. The results are as follows:

      mode     1k     10k   50k   150k
      default  53s    1.5h  100h  1000h
      ready    450ms  4.7s  26s   72s

The following options can be specified when putting a task in a utube queue:

  • utube - the name of the sub-queue. Sub-queues split the task stream according to the sub-queue name: it is not possible to take two tasks out of a sub-queue concurrently, each sub-queue is executed in strict FIFO order, one task at a time.

utube queue does not support:

  • task priorities (pri)
  • task time to live (ttl)
  • task time to execute (ttr)
  • delayed execution (delay)

Example:

Imagine a web crawler, fetching pages from the Internet and finding URLs to fetch more pages. The web crawler is based on a queue, and each task in the queue refers to a URL which the web crawler must download and process. If the web crawler is split into many worker processes, then the same URL may show up in the queue many times, because a single URL may be referred to by many linking pages. And the worker processes, working in parallel, can cause a denial-of-service on the site of the URL. As a result, the web crawler can end up in the web server's user-agent ban list -- not a desirable outcome.

If the URL's domain name is used as a sub-queue name, this problem can be solved: all the URLs with the same domain name can be fetched and processed in strict FIFO order.
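
The idea can be sketched as below. This is a minimal illustration under stated assumptions: domain_of and enqueue_url are hypothetical helper names, the pattern only handles plain http/https URLs, and tube is assumed to be a utube tube such as queue.tube.crawler.

```lua
-- Hypothetical helper: extract the host part of an http(s) URL.
local function domain_of(url)
    return url:match('^https?://([^/:?#]+)')
end

-- Hypothetical helper: put a URL into a utube queue, using the URL's
-- domain name as the sub-queue name, so that URLs of one domain are
-- processed one at a time in FIFO order.
local function enqueue_url(tube, url)
    local domain = domain_of(url)
    if domain == nil then
        return nil, 'cannot extract a domain from ' .. tostring(url)
    end
    return tube:put(url, {utube = domain})
end

-- Possible usage inside Tarantool (the tube name is an assumption):
-- queue.create_tube('crawler', 'utube', {if_not_exists = true})
-- enqueue_url(queue.tube.crawler, 'https://example.com/page')
```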

utubettl - extension of utube to support ttl

This queue type is effectively a combination of fifottl and utube.

It is advised not to use utubettl methods inside transactions with read-confirmed isolation level. It can lead to errors when trying to make parallel tube methods calls with mvcc enabled.

The following options can be specified when creating a utubettl queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk

  • if_not_exists - boolean - if true, no error will be returned if the tube already exists

  • on_task_change - function name - a callback to be executed on every operation

  • storage_mode - string - one of

    • queue.driver.utubettl.STORAGE_MODE_DEFAULT ("default") - default implementation of utubettl
    • queue.driver.utubettl.STORAGE_MODE_READY_BUFFER ("ready_buffer") - allows processing take requests faster, at the cost of slower put operations. Right now this option is supported only for the memtx engine. WARNING: this is an experimental storage mode.

    Here is a benchmark comparison of these two modes:

    • Benchmark for simple put and take methods. 30k utubes are created with a single task each. Task creation time is calculated. After that 30k consumers are calling take + ack, each in the separate fiber. Time to ack all tasks is calculated. The results are as follows:

      mode     put (30k)  take+ack
      default  200ms      1.7s
      ready    320ms      1.8s

    • Benchmark for the busy utubes. 10 tubes are created. Each contains 1000 tasks. After that, 10 consumers are created (each works on its own tube only, one tube per consumer). Each consumer will take, then yield and then ack every task from its utube (1000 tasks each). After that, the same benchmark is run with 10k, 50k and 140k tasks on each utube, still with 10 utubes and 10 consumers. The results are as follows:

      mode     1k     10k   50k   140k
      default  80s    1.6h  100h  1000h
      ready    520ms  5.4s  28s   83s

The following options can be specified when putting a task in a utubettl queue:

  • pri - task priority (0 is the highest priority and is the default)
  • utube - the name of the sub-queue
  • ttl - numeric - time to live for a task put into the queue, in seconds; if ttl is not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed)
  • ttr - numeric - time allotted to the worker to work on a task, in seconds; if ttr is not specified, it is set to the same as ttl (if a task is being worked on for more than ttr seconds, its status is changed to 'ready' so another worker may take it)
  • delay - time to wait before starting to execute the task, in seconds

The underlying spaces

The queue system consists of fibers, IPC channels, functions, and spaces. Here is how queues map to spaces in a Tarantool database.

The _queue space contains tuples for each queue and its properties. This space is created automatically when the queue system is initialized for the first time (for example, by "require 'queue'"), and is re-used on later occasions.

Fields of the _queue space

  1. tube - the name of the queue
  2. tube_id - queue ID, numeric
  3. space - the name of a space associated with the queue, which contains one tuple for each queue task
  4. type - the queue type ('fifo', 'fifottl', 'limfifottl', 'utube', 'utubettl')
  5. opts - additional options supplied when creating the queue, for example 'ttl'

The _queue_consumers temporary space contains tuples for each job which is working on a queue. Consumers may be simply waiting for tasks to be put in the queues.

Fields of the _queue_consumers space

  1. connection_id - connection ID of the client
  2. fid - client fiber ID
  3. tube_id - queue ID, referring to the tube_id field in the _queue space; the client waits for tasks in this queue
  4. timeout - the client wait timeout
  5. time - the time when the client took a task

The _queue_taken_2 (_queue_taken is deprecated) space contains tuples for each job which is processing a task in the queue.

Fields of the _queue_taken_2 space

  1. tube_id - queue ID, to which the task belongs
  2. task_id - task ID (of the task being taken)
  3. connection_id - connection ID of the client, referring to the connection_id field of the _queue_consumers space
  4. session_uuid - session UUID (string)
  5. time - the time when the client began to execute the task

The _queue_session_ids space contains a map: connection id (box session id) to the session UUID. This space is temporary if in_replicaset is set to false.

Fields of the _queue_session_ids space

  1. connection_id - connection id (numeric)
  2. session_uuid - session UUID (string)

Fields of the _queue_shared_sessions space

  1. uuid - session UUID (string)
  2. exp_time - session expiration time (numeric)
  3. active - session state (boolean)

This space is temporary if in_replicaset is set to false.

Also, there is a space which is associated with each queue, which is named in the space field of the _queue space. The associated space contains one tuple for each task.

Fields of the space associated with each queue

  1. task_id - numeric - see below
  2. task_state - 'r' for ready, 't' for taken, etc. - see below
  3. task_data - the contents of the task, usually a long string
  x. (additional fields if the queue type has options for ttl, priority, or delay)

The task_id value is assigned to a task when it's inserted into a queue. Currently, task_id values are simple integers for fifo and fifottl queues.

The task_state field takes one of the following values (different queue types support different sets of task_state values, so this is a superset):

  • 'r' - the task is ready for execution (the first consumer executing a take request will get it)
  • 't' - the task has been taken by a consumer
  • '-' - the task has been executed (done) (a task is removed from the queue after it has been executed, so this value will rarely be seen)
  • '!' - the task is buried (disabled temporarily until further changes)
  • '~' - the task is delayed for some time.

For details on the state transitions, refer to Task state diagram.

Task state diagram

The following diagram shows possible transitions between the task states. For information on the transition triggers, refer to the method descriptions under "Using the queue module".

flowchart LR
      INIT((" "))-->  |"put()"| READY
      INIT((" "))--> |"put('my_task_data', {delay = delay})"| DELAYED
      READY--> |"take()"| TAKEN
      READY--> |"delete() / ttl timeout"| DONE
      READY--> |"bury()"| BURIED
      TAKEN--> |"release() / ttr timeout"| READY
      TAKEN--> |"release\n(id, {delay = delay})"| DELAYED
      TAKEN--> |"ack() / delete()"| DONE
      TAKEN--> |"bury()"| BURIED
      BURIED--> |"delete() /\nttl timeout"| DONE
      BURIED--> |"kick()"| READY
      DELAYED--> |"timeout"| READY
      DELAYED--> |"delete()"| DONE
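
The transitions in the diagram can also be written down as a lookup table. The following is a minimal sketch in plain Lua, not part of the queue module: the TRANSITIONS table, the next_state helper, and the trigger labels put_delayed, release_delayed, ttl_timeout and ttr_timeout are names made up for illustration.

```lua
-- Transition table transcribed from the task state diagram.
-- Keys are task states; each value maps a trigger to the resulting state.
-- Trigger labels are illustrative, not identifiers of the queue module.
local TRANSITIONS = {
    INIT    = {put = 'READY', put_delayed = 'DELAYED'},
    READY   = {take = 'TAKEN', delete = 'DONE', ttl_timeout = 'DONE',
               bury = 'BURIED'},
    TAKEN   = {release = 'READY', ttr_timeout = 'READY',
               release_delayed = 'DELAYED', ack = 'DONE', delete = 'DONE',
               bury = 'BURIED'},
    BURIED  = {delete = 'DONE', ttl_timeout = 'DONE', kick = 'READY'},
    DELAYED = {timeout = 'READY', delete = 'DONE'},
}

-- Return the state reached from `state` by `trigger`,
-- or nil if the diagram has no such transition.
local function next_state(state, trigger)
    local row = TRANSITIONS[state]
    return row and row[trigger] or nil
end
```

For example, next_state('TAKEN', 'ack') yields 'DONE', while next_state('DELAYED', 'take') yields nil because a delayed task cannot be taken until its delay expires.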

Queue state diagram

Queue can be used in a master-replica scheme:

There are five states for queue:

  • INIT
  • STARTUP
  • RUNNING
  • ENDING
  • WAITING

When Tarantool is launched for the first time, the queue stays in the INIT state until box.info.ro becomes false.

States switching scheme:

flowchart LR
      I(("init"))-->S[startup]
      S[startup]-->R[running]
      W[waiting]--> |"(ro ->rw)"| S[startup]
      R[running]--> |"(rw ->ro)"| E[ending]
      E[ending]-->W[waiting]

The current queue state can be obtained with the queue.state() method.

In the STARTUP state, the queue waits for possible data synchronization with other cluster members for twice the largest upstream lag. After that, all taken tasks are released, except for tasks whose session UUID matches a shared session UUID. This makes it possible to take a task, switch roles on the cluster, and release the task within the timeout specified by the queue.cfg({ttr = N}) parameter. The last step in the STARTUP state is starting the tube driver using the start() method.

In the RUNNING state, the queue works as usual. The ENDING state calls the stop() method. In the WAITING state, the queue listens for a change in the read_only flag.

All states except INIT are controlled by a fiber called queue_state_fiber.
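
Because the queue passes through STARTUP before it reaches RUNNING, application code may want to wait for the RUNNING state before using tubes. The following is a minimal sketch, not part of the queue module: wait_until_running is a hypothetical helper, and the state getter and sleep function are passed in as arguments so that the snippet does not depend on anything beyond what is described here.

```lua
-- Hypothetical helper: poll a state getter until it reports 'RUNNING'
-- or the given number of attempts is exhausted.
-- Returns true if the RUNNING state was observed, false otherwise.
local function wait_until_running(get_state, sleep, attempts, interval)
    for _ = 1, attempts do
        if get_state() == 'RUNNING' then
            return true
        end
        sleep(interval)
    end
    return get_state() == 'RUNNING'
end

-- Possible usage inside Tarantool:
-- local fiber = require('fiber')
-- local ok = wait_until_running(queue.state, fiber.sleep, 50, 0.1)
```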

Installing

There are three alternative ways of installation.

  • Get the tarantool_queue package from a repository. For example, on Ubuntu, say: sudo apt-get install tarantool-queue
  • Take the Lua rock from rocks.tarantool.org.
  • Take the source files from https://github.com/tarantool/queue, then build and install.

Using the queue module

Initialization

queue = require 'queue'

The request "require 'queue'" causes automatic creation of the _queue space, unless it already exists. The same request also sets the necessary space triggers and other objects associated with queues.
If the instance hasn't been configured yet (box.cfg() hasn't been called), the initialization of the queue module is deferred until the instance is configured ("lazy start"). For the queue to work properly, the instance must run in rw mode. If the instance runs in ro mode, the initialization of the queue is deferred until the instance is switched to rw mode. After the instance has been started in rw mode and the queue has been initialized, switching it to ro mode is a bad idea: any attempt to do something with a persistent queue (the "temporary" option set to false) will fail, although a temporary queue will work fine. In addition, after the mode has been switched, triggers may fail (_on_consumer_disconnect, for example), which may leave the queue in an inconsistent state. The core drivers that use background fibers (fifottl, limfifottl, utubettl) check the instance mode on each iteration and wait until the instance is switched to rw mode.

Get the module version

queue._VERSION

Returns the current version of the module.

Creating a new queue

queue.create_tube(queue name, queue type [, {options} ])

Creates a queue.

The queue name must be alphanumeric and be up to 32 characters long.

The queue type must be 'fifo', 'fifottl', 'limfifottl', 'utube', or 'utubettl'.

The options, if specified, must be one or more of the options described above (temporary and/or ttl and/or ttr and/or pri, depending on the queue type). The ttr and ttl options can be regarded as defaults, which may be overridden when a task is put in a queue.

Effect: a tuple is added in the _queue space, and a new associated space is created.

Example: queue.create_tube('list_of_sites', 'fifo', {temporary = true})

Set queue settings

queue.cfg({options})

Set queue settings.
If an invalid value or an unknown option is used, an error will be thrown.
Available options:

  • ttr - time to release in seconds. The time after which, if there is no active connection in the session, it will be released with all its tasks.
  • in_replicaset - enable replication mode. Must be true if the queue is used in master and replica mode. With replication mode enabled, the potential loss of performance can be ~20% compared to single mode. Default value is false.

Session identify

queue.identify(session_uuid)

In the queue, each session has a unique UUID, and many connections may share one logical session. Also, the consumer can reconnect to the existing session during the ttr time.
To get the UUID of the current session, call the queue.identify() without parameters.
To connect to the existing session, call the queue.identify(session_uuid) with the UUID of the session.
In case of attempt to use an invalid format UUID or expired UUID, an error will be thrown.

Be careful: the UUID here is a 16-byte string generated by uuid.bin(), not an object of type UUID.

Usage example:
Sometimes we need the ability to acknowledge a task after a reconnect (because retrying it is undesirable) or even to acknowledge it using another connection.

Example of code for connecting to the old session in case of reconnection:

local netbox = require('net.box')

local conn = netbox.connect('localhost:1918', { reconnect_after = 5 })
local session_uuid = conn:call('queue.identify')
conn:on_connect(function()
    conn:call('queue.identify', {session_uuid})
end)

Putting a task in a queue

To insert a new task into a queue, use:

queue.tube.tube_name:put(task_data [, {options} ])

The tube_name must be the name which was specified by queue.create_tube.

The task_data contents are the user-defined description of the task, usually a long string.

The options, if specified, must be one or more of the options described above (ttl and/or ttr and/or pri and/or delay and/or utube, depending on the queue type). If an option is not specified, the default is what was specified during queue.create_tube, and if that was not specified, then the default is what was described above for the queue type. Note: if the delay option is specified, the delay time is added to the ttl time.

Effect: a new tuple is created in the queue's associated space, with

  • task_id = a number which is equal to the largest task_id so far, plus 1
  • task_state = 'r' (ready)
  • task_data = whatever the user put in the task_data parameter

Returns: the value of the new tuple in the queue's associated space, also called the "created task".

Example: queue.tube.list_of_sites:put('Your task is to do something', {pri=2})

After a task has been put in a queue, one of these things may happen: it may be removed from the queue because its ttl (time to live) expires, or it may be acted on by a worker (usually with a take request).

Taking a task from the queue ("consuming")

queue.tube.tube_name:take([timeout])

Take a queue task.

The take request searches for a task in the queue or sub-queue (that is, a tuple in the queue's associated space) which has task_state = 'r' (ready), and task_id = a value lower than any other tuple which also has task_state = 'r'.

If there is no such task, and timeout was specified, then the job waits until a task becomes ready or the timeout expires.

Effect: the value of task_state changes to 't' (taken). The take request tells the system that the task is being worked on. It should be followed by an ack request when the work is finished. Additional effect: a tuple is added to the _queue_taken_2 space.

Returns: the value of the taken tuple, or nil if none was found. The value of the first field in the tuple (task_id) is important for further requests. The value of the third field in the tuple (task_data) is important as it presumably contains user-defined instructions for what to do with the task.

Example: t_value = queue.tube.list_of_sites:take(15)

Increasing TTR and/or TTL for tasks

queue.tube.tube_name:touch(task_id, increment)

Increase the ttr of a running task. This is useful if you can't predict in advance how much time is needed to work on the task.

Effect: the values of ttr and ttl are increased by increment seconds. If the queue does not support ttr, an error is thrown. If increment is less than zero, an error is thrown. If increment is zero or nil, the call is a no-op. If the current ttr of the task is 500 years or greater, the call is also a no-op.

Example: t_value = queue.tube.list_of_sites:touch(15, 60)

Acknowledging the completion of a task

queue.tube.tube_name:ack(task_id)

The worker which has used 'take' to take the task should use 'ack' to signal that the task has been completed. The current task_state of the tuple should be 't' (taken), and the worker issuing the ack request must have the same ID as the worker which issued the take request.

Effect: the value of task_state changes to '-' (acknowledged). Shortly after this, it may be removed from the queue automatically.

If 'take' occurs but is not soon followed by 'ack' -- that is, if ttr (time to run) expires, or if the worker disconnects -- the effect is: task_state is changed from 't' (taken) back to 'r' (ready). This effect is the same as what would happen with a release request.

Example: queue.tube.list_of_sites:ack(15)

Releasing a task

queue.tube.tube_name:release(task_id, opts)

Put the task back in the queue. A worker which has used 'take' to take a task, but cannot complete it, may make a release request instead of an ack request. Effectively, 'ack' implies successful completion of a taken task, and 'release' implies unsuccessful completion of a taken task.

Effect: the value of task_state changes to 'r' (ready). After this, another worker may take it. This is an example of a situation where, due to user intervention, a task may not be successfully completed in strict FIFO order.

Example: queue.tube.list_of_sites:release(15, {delay=10})

Note: in the above example, the delay option means "the task cannot be executed again for 10 seconds".
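
The take, ack and release requests are typically combined in a worker. The following is a minimal sketch, not part of the queue module: process_one and handler are hypothetical names, the task fields are assumed to follow the order task_id, task_state, task_data described for the associated space, and the delay option is only meaningful for queue types that support delayed execution (for example fifottl).

```lua
-- Hypothetical worker step: take one task, run `handler` on its data,
-- then ack it on success or release it with a delay on failure.
-- Returns false if no task became ready within `take_timeout`;
-- otherwise returns true, the handler status, and the handler error.
local function process_one(tube, handler, take_timeout, retry_delay)
    local task = tube:take(take_timeout)
    if task == nil then
        return false
    end
    -- Assumed field order: task_id, task_state, task_data.
    local task_id, task_data = task[1], task[3]
    local ok, err = pcall(handler, task_data)
    if ok then
        tube:ack(task_id)
    else
        tube:release(task_id, {delay = retry_delay})
    end
    return true, ok, err
end

-- Possible usage inside Tarantool (tube name taken from the examples):
-- while true do
--     process_one(queue.tube.list_of_sites, my_handler, 15, 10)
-- end
```

Wrapping the handler in pcall keeps a failing task from terminating the worker, and the explicit release makes the task available again without waiting for ttr to expire.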

Peeking at a task

queue.tube.tube_name:peek(task_id)

Look at a task without changing its state.

Effect: this is the same as getting a tuple from the space associated with the queue: box.space.tube_name:select(task_id).

Returns: the tuple of the task.

Example: queue.tube.list_of_sites:peek(15)

Burying a task

queue.tube.tube_name:bury(task_id)

If it becomes clear that a task cannot be executed in the current circumstances, you can "bury" the task -- that is, disable it until the circumstances change.

Effect: the value of task_state changes to '!' (buried). Since '!' is not equal to 'r' (ready), the task can no longer be taken. Since '!' is not equal to '-' (complete), the task will not be deleted. The only thing that can affect a buried task is a kick request.

Returns: the tuple value.

Example: queue.tube.list_of_sites:bury(15)

Kicking a number of tasks

queue.tube.tube_name:kick(count)

Reverse the effect of a bury request on one or more tasks.

Effect: the value of task_state changes from '!' (buried) to 'r' (ready), for one or more tasks.

Returns: number of tasks actually kicked.

Example: queue.tube.list_of_sites:kick(99) (this will change up to 99 buried tasks)

Deleting a task

queue.tube.tube_name:delete(task_id)

Delete the task identified by task_id.

Effect: the current state of task_state is not checked. The task is removed from the queue.

Example: queue.tube.list_of_sites:delete(15)

Dropping a queue

queue.tube.tube_name:drop()

Reverse the effect of a create request.

Effect: remove the tuple from the _queue space, and drop the space associated with the queue.

Releasing all taken tasks

queue.tube.tube_name:release_all()

Forcibly returns all taken tasks to a ready state.

Setting the on_task_change callback

queue.tube.tube_name:on_task_change(callback)

Replace the old on_task_change callback or set a new one. The previously set callback is returned.

Getting statistics

queue.statistics( [queue name] )

Show the number of tasks in a queue broken down by task_state, and the number of requests broken down by the type of request. If the queue name is not specified, show these numbers for all queues. Statistics are temporary: they are reset whenever the Tarantool server restarts.

Example: get statistics for a given tube:

queue.statistics('list_of_sites')
---
- tasks:
     taken: 0
     buried: 0
     ready: 0
     done: 2
     delayed: 0
     total: 0
   calls:
     ack: 1
     take: 1
     kick: 1
     bury: 1
     put: 2
     delete: 1
...
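
The returned table can be processed like any other Lua table. The following is a minimal sketch, assuming the result has the shape shown in the output above (a tasks table with ready, taken, buried and delayed counters); pending_tasks is a hypothetical helper name, not part of the queue module.

```lua
-- Hypothetical helper: count the tasks that have not been completed yet,
-- given a statistics table shaped like the output shown above.
local function pending_tasks(stats)
    local t = stats.tasks
    return t.ready + t.taken + t.buried + t.delayed
end

-- Possible usage inside Tarantool:
-- local backlog = pending_tasks(queue.statistics('list_of_sites'))
```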

Queue and replication

Usage example:

-- Instance file for the master.
queue = require("queue")
-- Queue is in replicaset.
-- Clean up session after 5 minutes after disconnect.
queue.cfg({ttr = 300, in_replicaset = true})

box.cfg{
  listen = 3301,
  replication = {'replicator:password@127.0.0.1:3301',  -- Master URI.
                 'replicator:password@127.0.0.1:3302'}, -- Replica URI.
  read_only = false,
}

box.once("schema", function()
   box.schema.user.create('replicator', {password = 'password'})
   box.schema.user.grant('replicator', 'replication') -- grant replication role
end)

require('console').start()
os.exit()

-- Instance file for the replica.
queue = require("queue")
-- Queue is in replicaset.
-- Clean up session after 5 minutes after disconnect.
queue.cfg({ttr = 300, in_replicaset = true})
box.cfg{
  listen = 3302,
  replication = {'replicator:password@127.0.0.1:3301',  -- Master URI.
                 'replicator:password@127.0.0.1:3302'}, -- Replica URI.
  read_only = true
}

require('console').start()
os.exit()

Start master and replica instances and check queue state:

Master:

tarantool> queue.state()
---
- RUNNING
...

Replica:

tarantool> queue.state()
---
- INIT
...

Now reverse the read_only setting of the master and replica and check the status of the queue again.

Master:

tarantool> box.cfg({read_only = true})
tarantool> queue.state()
---
- WAITING
...

Replica:

tarantool> box.cfg({read_only = false})
tarantool> queue.state()
---
- RUNNING
...

Implementation details

The implementation is based on the common functions for all queues:

  1. controlling the consumers (watching connection state/wakeup)
  2. similarities of the API
  3. spaces to support each tube
  4. etc

Each new queue has a "driver" to support it.

Queue drivers

Mandatory requirements

  1. The driver works with tuples. The only thing the driver needs to know about the tuples is their first two fields: id and state.
  2. Whenever the driver notices that a task state has changed, it must notify the framework about the change.
  3. The driver must not throw exceptions, unless the driver API is misused. That is, during normal operation, including errors that occur during normal operation, there should be no exceptions.

Registering a custom driver

register_driver(driver_name, tube_ctr) - queue method is used to register a custom driver. The arguments are:

  • driver_name: unique driver name. Must be different from the core drivers names.
  • tube_ctr: implementation of tube control methods ("create_space" and "new").

Driver API

Driver class must implement the following API:

  1. new (constructs an instance of a driver), takes:
    • the space object, in which the driver must store its tasks
    • a callback to notify the main queue framework on a task state change (on_task_change)
    • options of the queue (a Lua table)
  2. create_space - creates the supporting space. The arguments are:
    • space name
    • space options
  3. start - initialize internal resources if any, e.g. start fibers.
  4. stop - clean up internal resources if any, e.g. stop fibers.

To sum up, when the user creates a new queue, the queue framework passes the request to the driver, asking it to create a space to support this queue, and then creates a driver instance, passing to it the created space object.

The same call sequence is used when the queue is "restarted" after Tarantool server restart.

The driver instance returned by the new method must provide the following API:

  • tube:normalize_task(task) - converts the task tuple to the object which is passed on to the user (removes the administrative fields)
  • tube:put(data[, opts]) - puts a task into the queue. Returns a normalized task which represents a tuple in the space
  • tube:take() - sets the task state to 'in progress' and returns the task. If there are no 'ready' tasks in the queue, returns nil.
  • tube:delete(task_id) - deletes a task from the queue. Returns the original task with a state changed to 'done'
  • tube:release(task_id, opts) - puts a task back to the queue (in the 'ready' state)
  • tube:bury(task_id) - buries a task
  • tube:kick(count) - digs out count tasks
  • tube:peek(task_id) - return the task state by ID
  • tube:touch(task_id, delta) - increases ttr and ttl of the task by delta seconds. If queue does not support ttr, error will be thrown. Returns the task
  • tube:tasks_by_state(task_state) - return the iterator to tasks in a certain state
  • tube:truncate() - delete all tasks from the tube. Note that tube:truncate must be called only by the user who created this tube (has space ownership) OR under a setuid function. Read more about setuid functions here.

queue's People

Contributors

0x501d avatar aleclarson avatar amdrozdov avatar andreinepsha avatar better0fdead avatar bigbes avatar derekbum avatar evgeny-sureev avatar grigory51 avatar grishnov avatar igorcoding avatar igorjan94 avatar ilmarkov avatar kasen avatar kostja avatar kyukhin avatar lenkis avatar leonidvas avatar mmelentiev-mail avatar nshy avatar oleg-jukovec avatar olegrok avatar opomuc avatar persdep avatar printercu avatar rtsisyk avatar rybakit avatar totktonada avatar unera avatar ylobankov avatar


queue's Issues

FR: add method "prolong ttr"

Hello.

We use queue to pass tasks to a background service. Unfortunately, the exact volume of work is unknown when a task is created and can vary between tasks by orders of magnitude. I would like to set a low TTR on all tasks initially, so that the majority of tasks finish before the TTR expires.

However, other, larger tasks need a way to prolong their TTR. It would be great if this could be done through the queue interface rather than by hacks on the underlying spaces.

Would you accept this feature, and should I start working on a pull request?

Static 'on_task_change' callback

Passing 'on_task_change' to 'queue.create_tube' makes the callback code impossible to change afterwards.
The callback function is saved statically right after the space is created and is never reloaded.

wtf bug in queue

Config file: /etc/tarantool/instances.enabled/queue.lua

#!/usr/bin/env tarantool

box.cfg{
  listen = '3301'
}

queue = require('queue')
queue.start()

local tube_name = 'trash'
if queue.tube[tube_name] == null then
   queue.create_tube(tube_name, 'fifottl')
end

After running tarantoolctl restart queue, I got the following lines in the log file:

2015-07-27 11:54:28.907 [17059] main C> got signal 15 - Terminated
Start failed: /usr/share/tarantool/queue/abstract.lua:287: attempt to index field '_queue' (a nil value)
2015-07-27 11:54:29.910 [18572] main/101/tarantoolctl C> version 1.6.5-274-gf84ad58

I can not reproduce this bug.

Tarantool packages:

tarantool-debug-1.5.3-79.x86_64
tarantool-queue-0.0.1-32.noarch
tarantool-common-1.6.5-274.noarch
tarantool-1.6.5-274.x86_64

tube:create() should check that a space with this name exists

If a space with the same name as the tube already exists, the queue module does not check whether its schema differs from the desired one and simply carries on (when the if_not_exists clause is set to true).
This is error-prone, since such a queue will malfunction later on.

Check that the space used for the queue was actually created with queue:create, and complain if it was not.

async way to work with queue

There is one connection and two workers trying to get a packet from the server:

import asyncio
import aiotarantool

@asyncio.coroutine
def process_job(tnt):
    while True:
        t = yield from tnt.call("box.queue.tube.t1:take", 0.1)
        if not t:
            break

        tid = t.data[0][0]
        print(t.data)
        k = yield from tnt.call("box.queue.tube.t1:ack", tid)

loop = asyncio.get_event_loop()

tnt = aiotarantool.connect("127.0.0.1", 3301)
# asyncio.async() is a deprecated alias of asyncio.ensure_future()
tasks = [asyncio.ensure_future(process_job(tnt))
         for _ in range(10)]

loop.run_until_complete(asyncio.wait(tasks))
loop.run_until_complete(tnt.close())
loop.close()

Result: the second worker raises an error if the queue is empty:

tarantool.error.DatabaseError: (3, b"Duplicate key exists in unique index 'consumer' in space '_queue_consumers'")

tube:bury() changes a task state to DONE

tarantool> tube = queue.create_tube('task_queue', 'utubettl')

tarantool> tube:put(1)

---
-  [0, 'r', 1]
...

tarantool> tube:bury(0)

---
-  [0, '-', 1]
...

Is it intended behaviour?

Add to readme appropriate grant setup.

To test queue, I have to set up this weird set of grants and functions:
https://github.com/tarantool/go-tarantool/blob/d26edc79c694c2672a6541f6a9869325d61b2427/queue/config.lua

Note that "execute universe" does not work unless a function is created for each action on a particular queue.

Could you provide a clean example of how to set up grants for an unprivileged user (i.e. not "read,write,execute universe") so that it can use a particular queue? Or provide a helper function "queue.tube.tubename:grant(user)"?

Probably the right way would be to add a functional interface that complements the object-oriented one:

queue.tube.tubename:put(data, cfg) => queue.put("tubename", data, cfg)
queue.tube.tubename:take() => queue.take("tubename")

But I'm not sure.
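
The proposed mapping from method calls on a tube object to plain functions taking the tube name can be sketched as a small dispatcher. This is a Python model of the idea only, not part of the queue module: EchoTube, TUBES, ALLOWED and queue_call are hypothetical names, and the whitelist is one possible way to keep such an entry point narrow enough to grant to an unprivileged user.

```python
class EchoTube:
    """Hypothetical stand-in for a tube object with put/take methods."""

    def __init__(self):
        self.items = []

    def put(self, data, cfg=None):
        self.items.append(data)
        return data

    def take(self):
        return self.items.pop(0) if self.items else None


# Hypothetical registry of tubes, keyed by tube name.
TUBES = {"tubename": EchoTube()}

# Only these methods may be reached through the functional interface.
ALLOWED = {"put", "take"}


def queue_call(method, tube_name, *args):
    """Functional form: queue_call('put', 'tubename', data) instead of tube:put(data)."""
    if method not in ALLOWED:
        raise ValueError("method not allowed: %s" % method)
    tube = TUBES.get(tube_name)
    if tube is None:
        raise LookupError("no such tube: %s" % tube_name)
    return getattr(tube, method)(*args)
```

With this shape, a grant would only need to cover a fixed set of functions instead of one function per action per queue.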

No way to check if tube exists without use of Eval iproto query.

I don't like calling eval from a client because it requires the 'execute universe' privilege.
But currently there is no way either to check whether a tube exists or to create a tube without an eval query.
create_tube fails because it returns a table that cannot be serialized to msgpack and sent in the response.
Without the ability to call create_tube, the only way to check tube existence is eval('return queue.tube.myqueue ~= nil').

Please add the ability to check tube existence without eval, and/or the ability to create a tube through an iproto connector.

Tube recreation failed

> foo = queue.create_tube('foo', 'fifo')

---
...

> foo:drop()

---
- true
...

> foo = queue.create_tube('foo', 'fifo')

---
- error: Duplicate key exists in unique index 'tube'
...

O(n^2) complexity in fifottl:take()

take() degrades to O(N^2) complexity where N is the number of expired tasks:

while true do
    task = self.space.index.status:select({state.READY},
        {offset=offset, limit=1, iterator='GE'})[1]
    if task == nil or task[i_status] ~= state.READY then
        return
    elseif is_expired(task) then
        offset = offset + 1
    else
        break
    end
end

Affects Cloud.
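
To see where the quadratic cost comes from, here is a small Python model of the loop above. It assumes, as the report implies, that a select with offset=k and limit=1 has to walk past k index entries before returning one; the function names and the step counting are illustrative and are not taken from the queue source.

```python
def take_with_offset(tasks, is_expired):
    """Mimic the reported loop; return (task, index_entries_visited)."""
    visited = 0
    offset = 0
    while True:
        # Assumed cost model: offset=k, limit=1 walks k + 1 entries
        # (or the whole index if the offset runs past its end).
        visited += min(offset + 1, len(tasks))
        if offset >= len(tasks):
            return None, visited
        task = tasks[offset]
        if is_expired(task):
            offset += 1
        else:
            return task, visited


def take_with_iterator(tasks, is_expired):
    """Single forward pass: each entry is visited at most once."""
    visited = 0
    for task in tasks:
        visited += 1
        if not is_expired(task):
            return task, visited
    return None, visited
```

With 1000 expired tasks in front of one live task, the first function visits 1 + 2 + ... + 1001 = 501501 entries under this model, while the single pass visits 1001.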

Master-master replication incompatibility.

Consider the following case:

A few (say, 3) nodes form a cluster with master-master replication.
Each node has its own queue, with a name that is unique within the cluster.
Since each tube needs its own space, these spaces will be created.
But the space IDs may not be unique, because the spaces are created on each node separately at virtually the same time.
As a result, errors like the following appear in the logs:

E> ER_TUPLE_FOUND: Duplicate key exists in unique index 'primary' in space '_space'
E> ER_TUPLE_FOUND: Duplicate key exists in unique index 'primary' in space '_index'

These errors are fatal for replication, so replication stops.

The following solutions could be proposed:

  1. Change in the application: change the cluster startup logic so that it creates all necessary tubes in box.once(). Later, each node can pick the appropriate queue somehow and use it.
  2. Change in tarantool-queue: ensure that the space ID is globally unique in the cluster. (But how to ensure it?)
  3. Mixed change: tarantool-queue provides an interface to specify the ID of a new tube space, so that the application can assign cluster-unique values.
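
For the third option, one simple way an application could pick cluster-unique IDs is to give each node its own arithmetic progression. The Python sketch below only illustrates the arithmetic: space_id_allocator is a hypothetical helper, the node index and node count are assumed to be known from the cluster configuration, and the default base of 512 is an assumption about where user space IDs start.

```python
from itertools import count


def space_id_allocator(node_index, node_count, base=512):
    """Yield space IDs that no other node in the cluster will produce.

    Node i gets base + i, base + i + node_count, base + i + 2 * node_count, ...
    """
    if not 0 <= node_index < node_count:
        raise ValueError("node_index must be in [0, node_count)")
    return (base + node_index + k * node_count for k in count())
```

For example, with three nodes the first node would draw 512, 515, 518, and so on, the second 513, 516, 519, and the third 514, 517, 520, so tubes created at the same time on different nodes never collide. The scheme breaks if the node count changes, so it would need a fixed upper bound on cluster size.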

ttr does not work in the fifottl queue

If a task has expired (its execution took longer than 'ttr' seconds), taking it again and then acking it results in the error "Task was not taken in the session".

IMPORTANT: if the commands below are run from a single script via dofile, everything works fine.

Commands to run in the console to reproduce the error:

queue = require 'queue'
box.cfg{wal_mode='none'}
queue.create_tube('test_ttr', 'fifottl')
queue.tube.test_ttr:put('foobar', {ttr=1})
queue.tube.test_ttr:take(0)

-- Wait a few seconds

queue.tube.test_ttr:take(0)
queue.tube.test_ttr:ack(0)
- error: Task was not taken in the session

Log:

➜ tarantool
tarantool: version 1.6.4-472-g53b61a8
type 'help' for interactive help
tarantool> queue = require 'queue'

---
...

tarantool> box.cfg{wal_mode='none'}
2015-04-03 16:59:04.273 [31212] main/101/interactive C> version 1.6.4-472-g53b61a8
2015-04-03 16:59:04.274 [31212] main/101/interactive C> log level 5
2015-04-03 16:59:04.274 [31212] main/101/interactive I> mapping 1073741824 bytes for a shared arena...
2015-04-03 16:59:04.275 [31212] main/101/interactive I> initializing an empty data directory
2015-04-03 16:59:04.284 [31250] snap/101/dumper I> creating `./00000000000000000000.snap.inprogress'
2015-04-03 16:59:04.285 [31250] snap/101/dumper I> saving snapshot `./00000000000000000000.snap.inprogress'
2015-04-03 16:59:04.285 [31250] snap/101/dumper I> done
2015-04-03 16:59:04.286 [31212] main/101/interactive I> ready to accept requests

---
...

tarantool> queue.create_tube('test_ttr', 'fifottl')
2015-04-03 16:59:16.577 [31212] main/103/fifottl I> Started queue fifottl fiber

---
- raw:
    fiber:
      status: suspended
      name: fifottl
      id: 103
    on_task_change: 'function: 0x09472e18'
    space:
      index:
        0: &0
          unique: true
          parts:
          - type: NUM
            fieldno: 1
          id: 0
          space_id: 515
          name: task_id
          type: TREE
        1: &1
          unique: true
          parts:
          - type: STR
            fieldno: 2
          - type: NUM
            fieldno: 6
          - type: NUM
            fieldno: 1
          id: 1
          space_id: 515
          name: status
          type: TREE
        2: &2
          unique: true
          parts:
          - type: STR
            fieldno: 2
          - type: NUM
            fieldno: 3
          id: 2
          space_id: 515
          name: watch
          type: TREE
        status: *1
        task_id: *0
        watch: *2
      on_replace: 'function: 0x09471a08'
      temporary: false
      id: 515
      engine: memtx
      enabled: true
      name: queue_fifottl_test_ttr
      field_count: 0
    opts: &3
      ttr: 15768000000
      ttl: 15768000000
      pri: 0
  type: fifottl
  tube_id: 0
  name: test_ttr
  opts: *3
...

tarantool> queue.tube.test_ttr:put('foobar', {ttr=1})

---
- [0, 'r', 'foobar']
...

tarantool> queue.tube.test_ttr:take(0)

---
- [0, 't', 'foobar']
...

tarantool> print("Wait few seconds")
Wait few seconds

---
...

tarantool> queue.tube.test_ttr:take(0)

---
- [0, 't', 'foobar']
...

tarantool> queue.tube.test_ttr:ack(0)

---
- error: Task was not taken in the session
...

tarantool>

Inconsistency in stats output format

Since this commit, queue.statistics() always returns the tasks array, filled with zeros if there is no data yet (tasks was nil before the change); note that the "done" key is still missing. However, the calls data was not updated to the new behaviour, so tasks and calls are now formatted inconsistently:

Before the change:

unix/:/var/run/tarantool/queues.control> queue.statistics()

---
- []
...

unix/:/var/run/tarantool/queues.control> t = create_tube('t', 'fifo')

---
...

unix/:/var/run/tarantool/queues.control> queue.statistics('t')

---
...

After the change:

unix/:/var/run/tarantool/queues.control> queue.statistics()

---
- []
...

unix/:/var/run/tarantool/queues.control> t = create_tube('t', 'fifo')

---
...

unix/:/var/run/tarantool/queues.control> queue.statistics('t')

---
- tasks:
    taken: 0
    buried: 0
    ready: 0
                        <- "done" is missing
    delayed: 0
    total: 0
  calls: []             <- Why it's []?
...

Expected result:

unix/:/var/run/tarantool/queues.control> queue.statistics('t')

---
- tasks:
    taken: 0
    buried: 0
    ready: 0
    done: 0
    delayed: 0
    total: 0
  calls:
    ack: 0
    delete: 0
    take: 0
    kick: 0
    release: 0
    put: 0
    bury: 0
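
The expected shape can be described as "start from all-zero counters, then overlay whatever has been recorded". A minimal Python sketch of that normalization follows; normalize_stats is a hypothetical helper, and the key lists are copied from the expected output above rather than from the queue source.

```python
TASK_STATES = ("taken", "buried", "ready", "done", "delayed", "total")
CALL_TYPES = ("ack", "delete", "take", "kick", "release", "put", "bury")


def normalize_stats(raw=None):
    """Return stats with every known key present, defaulting to 0."""
    raw = raw or {}
    tasks = dict.fromkeys(TASK_STATES, 0)
    calls = dict.fromkeys(CALL_TYPES, 0)
    # `or {}` also covers the empty-list form of `calls` shown in the report.
    tasks.update(raw.get("tasks") or {})
    calls.update(raw.get("calls") or {})
    return {"tasks": tasks, "calls": calls}
```

Clients can then read any counter without first checking whether the key exists.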

ack does not work

ack throws an error when I try to use it. Here is the code to reproduce:

box.cfg{
  listen = '0.0.0.0:3301',
  slab_alloc_arena = 0.5
}

local queue = require('queue')

box.once("1.0", function()
  box.schema.user.grant('guest', 'read,write,execute', 'universe')
  queue.create_tube('tasks', 'fifo', {temporary = true})
end)

queue.tube.tasks:put('my_task')
local task = queue.tube.tasks:take(1)
local ok, err = pcall(queue.tube.tasks.ack, queue.tube.tasks, task[1])
print(ok, err)

tarantool-docker version 1.7.2-1-g92ed6c4

queue.tube and box.space._queue are out of sync

queue.tube contains 2 tubes (events_queue && notifications_queue), but box.space._queue:select returns only one.

/usr/local/share/lua/5.1/queue/abstract.lua:75: attempt to index local 'tube' (a nil value) (0x20)

The first command in any connection is one of the following (a single connection inspects a single queue):

queue.create_tube('events_queue', 'fifottl', { if_not_exists = true })
queue.create_tube('notifications_queue', 'fifottl', { if_not_exists = true })

This is needed to make sure the tubes always exist (the server could create new tubes at runtime).

Tarantool snapshots, xlogs and logs are attached.

tarantool.tar.gz

touch() throws an error on nil interval

According to the documentation, nil increments passed to touch() should be handled without errors. Currently, this is not the case:

foo = queue.create_tube('foo', 'fifottl')
foo:put('touch_ttr_15', {ttr=15})
foo:take(.1)
foo:touch(0, msgpack.NULL)
foo:touch(0, nil)

...

- error: '/usr/local/share/lua/5.1/queue/abstract.lua:86: attempt to compare ''void
*'' with ''number'''

So, either documentation or implementation should be fixed.
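
If the implementation is the side that gets fixed, the guard could look roughly like the Python sketch below. It is a model of the behaviour only: MSGPACK_NULL stands in for Lua's msgpack.NULL, the task is represented as a plain dict, and touch here is a hypothetical free function rather than the real tube method.

```python
# Stand-in for msgpack.NULL, which is a distinct value from nil/None.
MSGPACK_NULL = object()


def touch(task, delta):
    """Extend ttl and ttr by delta seconds; a missing delta is a no-op."""
    if delta is None or delta is MSGPACK_NULL:
        return task  # nothing to add, and nothing to compare against a number
    return {**task, "ttl": task["ttl"] + delta, "ttr": task["ttr"] + delta}
```

The point of the sketch is that both "no value" forms are checked before any numeric comparison or arithmetic is attempted.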

Ability to set a custom task identifier

I need to fetch an arbitrary task, whose ID I know, at an arbitrary moment. I could store the task IDs separately, but that is overhead which brings synchronization problems, whereas the ability to set a custom task identifier solves the problem simply.

Move non-Lua code to a new repository

Please extract all non-Lua code into separate repositories.
For example, Perl should go to tarantool-queue-perl (as it was done by @bigbes92 for Python).

Start failed

-- /etc/tarantool/instances.enabled/test.lua

box.cfg {
    listen = 3301,
    log_level = 6,
}

queue = require 'queue'
queue.start()

local name = 'foobar'

if null == queue.tube[name] then
    queue.create_tube(name, 'fifottl')
end

queue.tube[name]:put({foo = 'bar'})

$ sudo tarantoolctl start test
$ sudo tarantoolctl stop test
$ sudo tarantoolctl start test
$ sudo tarantoolctl stop test
$ cat /var/log/tarantool/test.log
...
2015-03-26 14:23:24.937 [9801] main/103/console/unix/:/var/run/tarant I> started
2015-03-26 14:23:25.007 [9801] main/104/fifottl I> Started queue fifottl fiber
2015-03-26 14:23:25.008 [9801] main/101/test D> ClientError at /usr/local/share/lua/5.1/queue/abstract/driver/fifottl.lua:30
2015-03-26 14:23:24.928 [9801] main/101/tarantoolctl I> Space 'foobar': done
2015-03-26 14:23:24.928 [9801] main/101/tarantoolctl I> ready to accept requests
2015-03-26 14:23:24.929 [9801] main/101/tarantoolctl I> set 'log_level' configuration option to '6'
2015-03-26 14:23:24.929 [9801] main/101/test I> Run console at /var/run/tarantool/test.control
2015-03-26 14:23:24.932 [9801] main/101/test I> tcp_server: remove dead UNIX socket: /var/run/tarantool/test.control
2015-03-26 14:23:24.937 [9801] wal D> cord_thread_func: unlocking &ct_arg->start_mutex
2015-03-26 14:23:24.937 [9801] wal D> wal_writer_thread: locking &writer->mutex
2015-03-26 14:23:24.937 [9801] main/103/console/unix/:/var/run/tarant I> started
2015-03-26 14:23:25.007 [9801] main/104/fifottl I> Started queue fifottl fiber
2015-03-26 14:23:25.008 [9801] main/101/test D> ClientError at /usr/local/share/lua/5.1/queue/abstract/driver/fifottl.lua:30
2015-03-26 14:23:25.021 [9801] main C> entering the event loop
2015-03-26 14:23:26.935 [9801] main D> wal_writer_stop: locking &writer->mutex
2015-03-26 14:23:26.935 [9801] main D> wal_writer_stop: unlocking &writer->mutex
2015-03-26 14:23:26.935 [9801] wal D> wal_writer_thread: unlocking &writer->mutex
2015-03-26 14:23:26.935 [9801] wal D> wal_writer_thread: locking &writer->mutex
2015-03-26 14:23:26.935 [9801] wal D> wal_writer_thread: unlocking &writer->mutex
Start failed: stack traceback:
        /usr/local/share/lua/5.1/queue/abstract/driver/fifottl.lua:30: in function 'event_time'
        /usr/local/share/lua/5.1/queue/abstract/driver/fifottl.lua:192: in function 'put'
        /usr/local/share/lua/5.1/queue/abstract.lua:32: in function 'put'
        /etc/tarantool/instances.enabled/test.lua:15: in main chunk
        [builtin#25]: at 0x004eb3b0
        [C]: in function 'pcall_lua'
        builtin/tarantool.lua:24: in function 'pcall'
        /usr/bin/tarantoolctl:334: in function 'start'
        /usr/bin/tarantoolctl:346: in main chun

Taken task is not released back to 'ready' after Tarantool reboot

How to reproduce:

  1. Get into the docker container and add a task:
docker-compose exec tnt1 tarantoolctl connect /var/run/tarantool/tarantool.sock
queue.tube.default:put({test=true})
---
- [0, 'r', {'test': true}]
...
  2. Take a task:
queue.tube.default:take()
---
- [0, 't', {'test': true}]
...
  3. Restart the container:
docker-compose restart tnt1
  4. Check the task status:
queue.tube.default:peek(0)
---
- [0, 't', {'test': true}]
...

The issue can be reproduced with connectors:

https://github.com/igorcoding/asynctnt-queue:

import asyncio
import asynctnt
import asynctnt_queue
import sys

async def run():
    conn = asynctnt.Connection(host='tnt1', port=3301, username='tnt1', password='tnt1')
    await conn.connect()

    queue = asynctnt_queue.Queue(conn)
    test_tube = queue.tube('default')

    # Retrieve a task from queue
    task = await test_tube.take(1)

    print(task)

    # Restart Tarantool here
    sys.stdin.readline()

    await conn.disconnect()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

https://github.com/tarantool-php/queue:

<?php

$conn = new \Tarantool\Client\Connection\StreamConnection('tcp://127.0.0.1:3301');
$client = new \Tarantool\Client\Client($conn, new \Tarantool\Client\Packer\PurePacker());
$client->authenticate('tnt1', 'tnt1');

$queue = new \Tarantool\Queue\Queue($client, 'default');
$task = $queue->take(1);

var_dump($task);

readline('Restart Tarantool and press a key to continue');

Here is the simplified instance configuration:

box.cfg {
    listen = 3301,
    log_level = 5
}

local config = {
    user = 'tnt1',
    password = 'tnt1'
}

if not box.schema.user.exists(config.user) then
    box.schema.user.create(config.user, {password = config.password})
    box.schema.user.grant(config.user, 'read,write,execute', 'universe', nil)
end

queue = require('queue')
box.once('foobar:v0.1.0', function()
    queue.create_tube('default', 'fifottl', {if_not_exists = true})
end)

Rename statistics() to stats()

The statistics method name is cumbersome, especially compared to other method names, e.g. push, pop, kick, ack. Would it make sense to rename it to stats?

Luarocks downloading wrong[old] rockspec?

Hi, I am not sure if this is the correct place to post this issue (I am learning Lua and LuaRocks at the moment), but I tried to install this package with
luarocks install queue (after adding the rocks.tarantool.org repo), and the compat.lua file was missing.

I ran luarocks download queue and saw:

package = 'queue'
version = 'scm-1'
source  = {
    url    = 'git://github.com/tarantool/queue.git',
    branch = 'master',
}
description = {
    summary  = "A set of persistent in-memory queues",
    homepage = 'https://github.com/tarantool/queue.git',
    license  = 'BSD',
}
dependencies = {
    'lua >= 5.1'
}
build = {
    type = 'builtin',

    modules = {
        ['queue.abstract']                 = 'queue/abstract.lua',
        ['queue.abstract.state']           = 'queue/abstract/state.lua',
        ['queue.abstract.driver.fifottl']  = 'queue/abstract/driver/fifottl.lua',
        ['queue.abstract.driver.utubettl'] = 'queue/abstract/driver/utubettl.lua',
        ['queue.abstract.driver.fifo']     = 'queue/abstract/driver/fifo.lua',
        ['queue.abstract.driver.utube']    = 'queue/abstract/driver/utube.lua',
        ['queue']                          = 'queue/init.lua'
    }
}

-- vim: syntax=lua

Clearly, the downloaded rockspec doesn't contain the compat.lua file, while the entry and the file itself are present in the git repo.

utubettl doesn't reset to READY state when using ttr option

t = queue.create_tube('test', 'utubettl')

tarantool> t:put('ttr1', { ttr = 1 })

---
- [0, 'r', 'ttr1']
...

tarantool> t:take(.1)

---
- [0, 't', 'ttr1']
...

tarantool> -- wait 1 second

---
...

tarantool> t:peek(0)

---
- [0, 't', 'ttr1']
...

Tasks in 'taken' state

We have a number of random tasks in the Tarantool queue stuck in the odd state 'taken'. At the same time, there are no open sockets (sessions) associated with these tasks.
To check, we physically rebooted all applications and proxies connected to the Tarantool queue and found no change in the task states.
For now, the 'ttl' option is the only thing that saves us from accumulating such broken tasks.

We have noticed some warnings in the application:

/usr/share/tarantool/queue/abstract.lua:72: Task was not taken in the session

but these warnings do not strictly correlate with the appearance of broken tasks; moreover, the warning does not report a task number to investigate.

Here is an example of such a task from space 'box.space.common':

[17788457, 't', 1482752293369308, 86400000000, 15768000000000000, 9, 1482665893369308,
  {'handler': 'minute_log_prepare', 'volume': 500, 'value': 1482665760}]

Session is not found for this task:

box.space._queue_taken:pairs():grep(function (x) return x[3] == 17788457 end):length()
--- 0

Our tube init:

queue.create_tube('common', 'fifottl', {on_task_change = on_task_change_cb, if_not_exists = true})

Tarantool 1.6.8-771-g85df490

Concurrent consumers got same task from queue

Hello,

I've tried to build a system with one producer and 5 consumers using Tarantool queue, and later I noticed that sometimes two workers get the same task. Is there a way to avoid this, or should I just use another queue for my purposes?

Fifottl, utubettl and priorities

> fifottl:put('pri1', {pri=1})
> fifottl:put('pri2', {pri=2})
> task1 = fifottl:take(.1)
> task2 = fifottl:take(.1)
> task1

---
- [0, 't', 'pri1']
...
> task2

---
- [1, 't', 'pri2']
...
> utubettl:put('pri1', {pri=1})
> utubettl:put('pri2', {pri=2})
> task1 = utubettl:take(.1)
> task2 = utubettl:take(.1)
> task1

---
- [0, 't', 'pri1']
...
> task2

---
- null
...

Is it a bug or intended behaviour for utubettl queue?

fifottl slows down event loop iteration

One of our Mail.Ru users implemented an asynchronous Java adapter for tarantool/queue which emulates a producer/consumer and sends a lot of put()/take() requests in parallel. The tube type is "fifottl" and all tasks have "ttl < 5 sec". The producer is a little bit faster than the consumer, so the queue has some
We realized that one event loop iteration may take more than 10 seconds in this case. gdb showed that some fiber, probably "fiberttl", performs a lot of box_select() requests without yielding.

The customer doesn't have enough time to isolate a test case. Please try to emulate this situation and find out what happens.

fifottl fiber in read only mode

"fifottl fiber" modifies data in the queue's spaces and will obviously fail when "box.cfg.read_only=true".
Read-only mode can be changed dynamically, so it would be good to keep "fifottl fiber" running all the time, but with continuous checking of the read_only state.
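
The idea of a background worker that keeps running but skips its writes while the instance is read-only can be sketched as follows. This is a Python model, not queue code: expire_step, the tasks mapping of task ID to deadline, and the is_read_only callback are all hypothetical names used for illustration.

```python
def expire_step(tasks, now, is_read_only):
    """One iteration of a ttl worker; returns how many tasks were removed.

    The read-only flag is re-checked on every iteration, so the worker
    resumes by itself once the instance becomes writable again.
    """
    if is_read_only():
        return 0  # do not touch the data; try again on the next iteration
    expired = [task_id for task_id, deadline in tasks.items() if deadline <= now]
    for task_id in expired:
        del tasks[task_id]
    return len(expired)
```

The caller would invoke this step in a loop with a sleep in between, so a switch of the read-only flag in either direction is picked up within one iteration.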

add queue spaces format

The following spaces should be created with a format:

  • _queue
  • _queue_consumers
  • _queue_taken
  • fifo tubes
  • fifottl tubes
  • utube tubes
  • utubettl tubes

Statistics aren't cleared after dropping a tube

> t = queue.create_tube('t', 'fifottl')

---
...

> queue.statistics('t')

---
- []
...

> t:put(1)

---
- [0, 'r', 1]
...

> queue.statistics('t')

---
- - t:
      tasks:
      - total: 1
      - ready: 1
      - taken: 0
      - buried: 0
      - done: 0
      - delayed: 0
      calls:
      - put: 1

> t:drop()

---
- true
...

> queue.statistics() <should not raise an error

---
- error: '/home/vagrant/.luarocks/share/lua/5.1/queue/abstract.lua:380: attempt to
    index a nil value'
...

> queue.statistics('t')

---
- error: '/home/vagrant/.luarocks/share/lua/5.1/queue/abstract.lua:380: attempt to
    index a nil value'
...

> queue.statistics('non_exsisting_tube')

---
- []
...

> t = queue.create_tube('t', 'fifottl')

---
...

> queue.statistics('t')

---
- - t:
      tasks:
      - total: 0
      - ready: 0
      - taken: 0
      - buried: 0
      - done: 0
      - delayed: 0
      calls:
      - put: 1  <should be 0
...

Broken queue after updating Tarantool

Tarantool 1.7.5-195-gcaacfef
No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 16.04.3 LTS
Release:	16.04
Codename:	xenial

Output

tarantool> queue = require 'queue'
---
- error: Ambiguous field type, field 1. Requested type is string but the field has
    previously been defined as unsigned
...
