Yii Queue Extension


An extension for running tasks asynchronously via queues.

Documentation is at docs/guide/README.md.

Installation

The preferred way to install this extension is through composer.

Either run

composer require yiisoft/queue

or add

"yiisoft/queue": "~3.0"

to the require section of your composer.json file.

Ready for yiisoft/config

If you are using yiisoft/config, this package already ships defaults in the common and params configurations, which saves you time. Things you should change to start working with the queue:

  • Optionally: define default \Yiisoft\Queue\Adapter\AdapterInterface implementation.
  • And/or define channel-specific AdapterInterface implementations in the channel-definitions params key to be used with the queue factory.
  • Define message handlers in the handlers params key to be used with the QueueWorker.
  • Resolve other \Yiisoft\Queue\Queue dependencies (PSR-compliant event dispatcher).

Differences to yii2-queue

If you have experience with yiisoft/yii2-queue, you will find this package similar. However, there are some key differences, which are described in the "migrating from yii2-queue" article.

Basic Usage

Each queue task consists of two parts:

  1. A message is a class implementing MessageInterface. For simple cases you can use the default implementation, Yiisoft\Queue\Message\Message. For more complex cases you should implement the interface yourself.
  2. A message handler is a callable called by a Yiisoft\Queue\Worker\Worker. The handler handles each queue message.

For example, if you need to download and save a file, your message may look like the following:

$data = [
    'url' => $url,
    'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);

Then you should push it to the queue:

$queue->push($message);

Its handler may look like the following:

class FileDownloader
{
    private string $absolutePath;

    public function __construct(string $absolutePath) 
    {
        $this->absolutePath = $absolutePath;
    }

    public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
    {
        $fileName = $downloadMessage->getData()['destinationFile'];
        $path = "$this->absolutePath/$fileName"; 
        file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
    }
}

The last step is to create a configuration for the Yiisoft\Queue\Worker\Worker:

$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
    $handlers, // Here it is
    $logger,
    $injector,
    $container
);
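
To make the name-to-handler mapping concrete, here is a minimal, library-free sketch of the lookup a worker conceptually performs with the $handlers array. The dispatchByName() function and the array-shaped message data are hypothetical stand-ins for illustration, not the actual Worker implementation.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a worker's lookup step: find the callable
 * registered under the message's handler name and invoke it with the data.
 *
 * @param array<string, callable> $handlers
 */
function dispatchByName(array $handlers, string $name, array $data): mixed
{
    if (!isset($handlers[$name])) {
        throw new RuntimeException("No handler for message $name");
    }

    return $handlers[$name]($data);
}

$handlers = [
    'file-download' => static fn (array $data): string
        => "saving {$data['url']} to {$data['destinationFile']}",
];

echo dispatchByName($handlers, 'file-download', [
    'url' => 'https://example.com/a.txt',
    'destinationFile' => 'a.txt',
]), PHP_EOL;
```

A message whose name has no entry in the map cannot be processed, which is why the message name and the handler key must match exactly.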

There is a way to run all the messages that are already in the queue, and then exit:

$queue->run(); // this will execute all the existing messages
$queue->run(10); // while this will execute only 10 messages as a maximum before exit

If you don't want your script to exit immediately, you can use the listen method:

$queue->listen();

You can also check the status of a pushed message (the queue adapter you are using must support this feature):

$queue->push($message);
$id = $message->getId();

// Get status of the job
$status = $queue->status($id);

// Check whether the job is waiting for execution.
$status->isWaiting();

// Check whether a worker got the job from the queue and executes it.
$status->isReserved();

// Check whether a worker has executed the job.
$status->isDone();

Different queue channels

Often we need to push to different queue channels from a single application. The QueueFactory class creates a separate Queue object for each channel. With this factory, channel-specific Queue creation is as simple as

$queue = $factory->get('channel-name');

The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in the $definitions constructor parameter of the factory, where keys are channel names and values are definitions for the Yiisoft\Factory\Factory. Below are some examples:

use Yiisoft\Queue\Adapter\SynchronousAdapter;

[
    'channel1' => new SynchronousAdapter(),
    'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
    'channel3' => [
        'class' => SynchronousAdapter::class,
        '__constructor' => ['channel' => 'channel3'],
    ],
]

For more information about the available definition formats, see the factory documentation.

Another queue factory usage strategy is implicit adapter creation via a withChannel() method call. To use this approach you should pass some specific constructor parameters:

  • true to the $enableRuntimeChannelDefinition
  • a default AdapterInterface implementation to the $defaultAdapter.

In this case, a $factory->get('channel-name') call is converted to $this->queue->withAdapter($this->defaultAdapter->withChannel($channel)) when there is no explicit adapter definition in $definitions.

Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes in channel names.
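
The difference between the two strategies can be modeled with a small self-contained sketch. ChannelResolver is a hypothetical class that represents adapters as plain strings; it only mirrors the lookup order described above and is not the QueueFactory implementation.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical model of the two lookup strategies. Adapters are plain
 * strings here so the sketch stays self-contained.
 */
final class ChannelResolver
{
    /** @param array<string, string> $definitions channel name => adapter label */
    public function __construct(
        private array $definitions,
        private bool $enableRuntimeChannelDefinition = false,
        private ?string $defaultAdapter = null,
    ) {
    }

    public function resolve(string $channel): string
    {
        // Explicit definitions always win.
        if (isset($this->definitions[$channel])) {
            return $this->definitions[$channel];
        }

        // Implicit creation happens only when explicitly enabled.
        if ($this->enableRuntimeChannelDefinition && $this->defaultAdapter !== null) {
            return "{$this->defaultAdapter}@{$channel}";
        }

        throw new InvalidArgumentException("Channel \"$channel\" is not configured.");
    }
}

$strict = new ChannelResolver(['channel1' => 'adapter-for-channel1']);
$lenient = new ChannelResolver([], true, 'default-adapter');

echo $strict->resolve('channel1'), PHP_EOL;
// A misspelled name is silently accepted by the implicit strategy.
echo $lenient->resolve('chanel1'), PHP_EOL;
```

With explicit definitions only, the misspelled 'chanel1' would raise an exception instead of quietly producing a new channel, which is the protection the warning refers to.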

Console execution

The exact way of task execution depends on the adapter used. Most adapters can be run using console commands, which the component automatically registers in your application.

The following command obtains and executes tasks in a loop until the queue is empty:

yii queue:run

The following command launches a daemon which infinitely queries the queue:

yii queue:listen

See the documentation for more details about adapter specific console commands and their options.

The component also has the ability to track the status of a job which was pushed into queue.

For more details see the guide.

Middleware pipelines

Any message pushed to a queue or consumed from it passes through two different middleware pipelines: one on message push and another on message consume. The process is the same as for an HTTP request, but it is executed twice for a queue message. That means you can add extra functionality on message pushing and consuming by configuring two classes: PushMiddlewareDispatcher and ConsumeMiddlewareDispatcher, respectively.

You can use any of these formats to define a middleware:

  • A ready-to-use middleware object: new FooMiddleware(). It must implement MiddlewarePushInterface, MiddlewareConsumeInterface or MiddlewareFailureInterface depending on the place you use it.
  • An array in the format of yiisoft/definitions. Only if you use yiisoft/definitions and yiisoft/di.
  • A callable: fn() => // do stuff, $object->foo(...), etc. It will be executed through the yiisoft/injector, so all the dependencies of your callable will be resolved.
  • A string for your DI container to resolve the middleware, e.g. FooMiddleware::class

Middlewares are executed in the order they are defined. If you define them as [$middleware1, $middleware2], the execution will look like this:

graph LR
    StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue)
    -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1]
    PushMiddleware1[$middleware1] -.-> EndPush((End))
    

    StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle)
    -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1]
    ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))
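
The same ordering can be reproduced with plain closures. The sketch below is a generic "onion" pipeline written for illustration; buildPipeline() and the tracing middlewares are hypothetical and do not use the package's dispatcher classes.

```php
<?php

declare(strict_types=1);

/**
 * Builds an "onion" from a list of middleware callables. Each middleware
 * receives the request and the next handler in the chain.
 */
function buildPipeline(array $middlewares, callable $core): callable
{
    $next = $core;
    // Wrap from the last middleware to the first so the first runs outermost.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn (string $request): string => $middleware($request, $next);
    }

    return $next;
}

$log = [];

// Creates a middleware that records when it is entered and left.
$trace = static function (string $name) use (&$log): callable {
    return static function (string $request, callable $next) use ($name, &$log): string {
        $log[] = "$name: before";
        $result = $next($request);
        $log[] = "$name: after";

        return $result;
    };
};

$pipeline = buildPipeline(
    [$trace('middleware1'), $trace('middleware2')],
    static function (string $request) use (&$log): string {
        $log[] = 'push to a queue';

        return $request;
    },
);

$pipeline('message');
print_r($log);
```

The recorded log runs middleware1, then middleware2, then the core action, and unwinds in reverse, matching the solid and dotted arrows of the diagram.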

Push pipeline

When you push a message, you can use middlewares to modify both message and queue adapter. With message modification you can add extra data, obfuscate data, collect metrics, etc.
With queue adapter modification you can redirect message to another queue, delay message consuming, and so on.

To use this feature you have to create a middleware class, which implements MiddlewarePushInterface, and return a modified PushRequest object from the processPush method:

return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);

With push middlewares you can define an adapter object at runtime, not in the Queue constructor. There is a restriction: by the time all middlewares are executed in the forward order, the adapter must be specified in the PushRequest object. You will get an AdapterNotConfiguredException if it isn't.

You have three places to define push middlewares:

  1. PushMiddlewareDispatcher. You can pass middlewares either to its constructor or to the withMiddlewares() method, which creates a completely new dispatcher object containing only the middlewares passed as arguments. If you use yiisoft/config, you can add middleware to the middlewares-push key of the yiisoft/queue array in the params.
  2. Pass middlewares to either Queue::withMiddlewares() or Queue::withMiddlewaresAdded() methods. The difference is that the former will completely replace an existing middleware stack, while the latter will add passed middlewares to the end of the existing stack. These middlewares will be executed after the common ones, passed directly to the PushMiddlewareDispatcher. It's useful when defining a queue channel. Both methods return a new instance of the Queue class.
  3. Put middlewares into the Queue::push() method like this: $queue->push($message, ...$middlewares). These middlewares have the lowest priority and will be executed after those which are in the PushMiddlewareDispatcher and the ones passed to the Queue::withMiddlewares() and Queue::withMiddlewaresAdded() and only for the message passed along with them.
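
The resulting priority of the three places can be summarized in a few lines. This is a simplified model under the assumption that middlewares are identified by plain strings; effectivePushStack() is a hypothetical helper, not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the effective execution order for one push,
 * given the three places where push middlewares can be defined.
 */
function effectivePushStack(array $dispatcher, array $queue, array $perPush): array
{
    // Dispatcher-level middlewares run first, then queue-level ones,
    // then those passed along with the single push() call.
    return [...$dispatcher, ...$queue, ...$perPush];
}

$order = effectivePushStack(
    ['common-metrics'],      // defined on the dispatcher
    ['channel-routing'],     // defined on the queue
    ['one-off-delay'],       // passed to push() with the message
);

print_r($order);
```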

Consume pipeline

You can set a middleware pipeline for a message when it is consumed from a queue server. This is useful to collect metrics, modify message data, etc. Paired with a push middleware, you can deduplicate messages in the queue, calculate the time from push to consume, handle errors (push to a queue again, redirect a failed message to another queue, send a notification, etc.). Unlike the push pipeline, you have only one place to define the middleware stack: the ConsumeMiddlewareDispatcher, either in the constructor or in the withMiddlewares() method. If you use yiisoft/config, you can add middleware to the middlewares-consume key of the yiisoft/queue array in the params.

Error handling pipeline

Often when a job fails, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in yiisoft/queue with the failure middleware pipeline. It is triggered each time message processing via the consume middleware pipeline is interrupted by any Throwable. The key differences from the previous two pipelines:

  • You should set up the middleware pipeline separately for each queue channel. That means, the format should be ['channel-name' => [FooMiddleware::class]] instead of [FooMiddleware::class], like for the other two pipelines. There is also a default key, which will be used for those channels without their own one: FailureMiddlewareDispatcher::DEFAULT_PIPELINE.
  • The last middleware will throw the exception, which will come with the FailureHandlingRequest object. If you don't want the exception to be thrown, your middlewares should return a request without calling $handler->handleFailure().

You can declare error handling middleware pipeline in the FailureMiddlewareDispatcher, either in the constructor, or in the withMiddlewares() method. If you use yiisoft/config, you can add middleware to the middlewares-fail key of the yiisoft/queue array in the params.
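
The per-channel format with a default key can be sketched as a simple lookup. EXAMPLE_DEFAULT_PIPELINE is a hypothetical stand-in for FailureMiddlewareDispatcher::DEFAULT_PIPELINE (its real value is not stated here), the middleware names are placeholders, and returning an empty list when nothing matches is an assumption of this sketch.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for FailureMiddlewareDispatcher::DEFAULT_PIPELINE.
const EXAMPLE_DEFAULT_PIPELINE = 'example-default-pipeline';

/**
 * Picks the failure pipeline for a channel, falling back to the default key.
 *
 * @param array<string, list<string>> $config channel name => middleware list
 * @return list<string>
 */
function failurePipelineFor(array $config, string $channel): array
{
    return $config[$channel] ?? $config[EXAMPLE_DEFAULT_PIPELINE] ?? [];
}

$config = [
    'emails' => ['RetryMiddleware', 'NotifyMiddleware'],
    EXAMPLE_DEFAULT_PIPELINE => ['LogMiddleware'],
];

print_r(failurePipelineFor($config, 'emails'));  // channel-specific pipeline
print_r(failurePipelineFor($config, 'reports')); // falls back to the default
```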

See error handling docs for details.

Extra

Unit testing

The package is tested with PHPUnit. To run tests:

./vendor/bin/phpunit

Mutation testing

The package tests are checked with Infection mutation framework. To run it:

./vendor/bin/infection

Static analysis

The code is statically analyzed with Psalm. To run static analysis:

./vendor/bin/psalm

Support the project

Open Collective

Follow updates

Official website Twitter Telegram Facebook Slack

License

The Yii Queue Extension is free software. It is released under the terms of the BSD License. Please see LICENSE for more information.

Maintained by Yii Software.

Contributors

airani, arhell, bscheshirwork, cebe, dependabot-preview[bot], dependabot[bot], devanych, fantom409, iamsaint, ischenko, kids-return, lesha701, luke-, machour, mikehaertl, mkubenka, romkatsu, rustamwin, s1lver, samdark, samnela, silverfire, softark, stylecibot, terabytesoftw, thenotsoft, viktorprogger, vjik, xepozz, zhuravljov

queue's Issues

Create console commands

New console commands (old ones were deleted during refactoring) were not introduced yet. We need to have one for Queue::run and one for Queue::listen methods.

amqp: incorrect message arguments (on second and following messages)

What steps will reproduce the problem?

see fork pull request (link: semsty/yii3-demo#1)

docker-compose up worker (or run ./yii queue/listen channel-1 -vvv in another window)

push multiple messages

./yii queue-test -s 1
./yii queue-test -s 2
./yii queue-test -s 3

What is the expected result?

worker_1    | {"some_id":"1","time":1671276909}
worker_1    | object_id: 7348
worker_1    | {"some_id":"2","time": ...}
worker_1    | object_id: another_message_object_id
worker_1    | {"some_id":"3","time": ...}
worker_1    | object_id: another_message_object_id_2

What do you get instead?

worker_1    | {"some_id":"1","time":1671276909}
worker_1    | object_id: 7348
worker_1    | {"some_id":"1","time":1671276909}
worker_1    | object_id: 7348
worker_1    | {"some_id":"1","time":1671276909}
worker_1    | object_id: 7348

problem there ??? https://github.com/yiisoft/yii-queue/blob/master/src/Worker/Worker.php#L62 (injector->invoke will return handler with first message??)

"bad" fix semsty#1

after fix result is correct

worker_1    | {"some_id":"1","time":1671278169}
worker_1    | object_id: 7348
worker_1    | {"some_id":"2","time":1671278170}
worker_1    | object_id: 7341
worker_1    | {"some_id":"3","time":1671278171}
worker_1    | object_id: 7336

Additional info

Q A
Version master ?
PHP version 8.1
Operating system

Dependabot can't resolve your PHP dependency files

Dependabot can't resolve your PHP dependency files.

As a result, Dependabot couldn't update your dependencies.

The error Dependabot encountered was:

Your requirements could not be resolved to an installable set of packages.
  Problem 1
    - Installation request for yiisoft/yii-console ^3.0@dev -> satisfiable by yiisoft/yii-console[3.0.x-dev].
    - yiisoft/yii-console 3.0.x-dev requires psr/container-implementation 1.0.0 -> no matching package found.

Potential causes:
 - A typo in the package name
 - The package is not available in a stable-enough version according to your minimum-stability setting
   see <https://getcomposer.org/doc/04-schema.md#minimum-stability> for more details.
 - It's a private package and you forgot to add a custom repository to find it

Read <https://getcomposer.org/doc/articles/troubleshooting.md> for further common problems.

If you think the above is an error on Dependabot's side please don't hesitate to get in touch - we'll do whatever we can to fix it.

View the update logs.

Handlers should have access to the CLI output of the worker

Workers that process messages from the queue are often run through a process manager, for example supervisor.

[program:my-queue-program]
command=php yii queue/listen --channel=download-images-queue
stdout_logfile=/logs/download-images-queue.log

It is very convenient when there is a log file for one worker.

The log file at the same time contains approximately the following:

Start processing queue messages.
Processed 1 queue messages.

And I would like to be able to write messages to output from the queue handler.

I suggest transmitting not only a message to the worker, but also a console logger.

Use CallableFactory from yii-event instead of it copy

\Yiisoft\Yii\Queue\Worker\Worker::prepare() is just a copy of CallableFactory::prepare(). I think it's not the last place where we need to create a callable from its definition.
I suggest either moving the factory from yii-event to injector, or creating a separate package.

Avoid parasitic dependencies inside job

For now while using yii2-like jobs with DI autowiring we create them like this:

final class SomeJob implements JobInterface
{
    private ServiceInterface $service;
    private int $data;

    public function __construct(int $data, ServiceInterface $service)
    {
        $this->service = $service;
        $this->data = $data;
    }

    public function execute(): void
    {
        $this->service->doStuff($this->data);
    }
}

It means we will push the whole job to the queue, including service dependency, which may be very heavy and contain closures which are hard to serialize.

I propose to change the concept of Job and declare it as a Message. Message in this case will be an event/DTO and will have to have subscriber(s) which will do stuff with received data. Message handlers will be declared via EventDispatcher. This approach is used via symfony/messenger.

Pros:

  • Message can be any object
  • Simple message serialization

Cons:

  • I don't see them for now
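
A minimal sketch of the message-as-DTO idea, assuming JSON as the wire format: only the scalar payload travels through the queue, while the service stays on the consumer side inside the handler. SomeMessage, SomeMessageHandler and the anonymous service are hypothetical names used for illustration.

```php
<?php

declare(strict_types=1);

interface ServiceInterface
{
    public function doStuff(int $data): string;
}

// The message carries data only, so it is trivial to serialize.
final class SomeMessage
{
    public function __construct(public readonly int $data)
    {
    }
}

// The handler owns the heavy dependency and never goes through the queue.
final class SomeMessageHandler
{
    public function __construct(private ServiceInterface $service)
    {
    }

    public function handle(SomeMessage $message): string
    {
        return $this->service->doStuff($message->data);
    }
}

// Producer side: serialize the payload only.
$payload = json_encode(['data' => (new SomeMessage(42))->data], JSON_THROW_ON_ERROR);

// Consumer side: rebuild the message and pass it to a handler with its service.
$decoded = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
$handler = new SomeMessageHandler(new class implements ServiceInterface {
    public function doStuff(int $data): string
    {
        return "processed $data";
    }
});

echo $handler->handle(new SomeMessage($decoded['data'])), PHP_EOL;
```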

Another way is to continue using yii2-like jobs, but execute them through Injector to inject dependencies into execute method.

Pros:

  • We still have familiar yii2-like jobs

Cons:

  • We require jobs to have execute method but can't declare an interface for it because of variadic parameter count.

The third way to avoid parasitic dependencies inside queue server message is to provide general hardcoded serialization and deserialization way which will allow for Job to serialize sensitive data only and to inject dependencies on deserialization.

Pros:

  • We still have familiar yii2-like jobs
  • More control on how to serialize job

Cons:

  • It is a hard way
  • We can't take into consideration some complicated cases
  • The second pro is achievable by custom serializer usage in the two other cases.

More convenient way to configure adapters

It's a pain now to configure different queue channels in an application. With yii-queue-amqp, the user has to create an aliased configuration for 5 classes per channel. This means 50 classes for 10 channels!

I have a suggestion to solve the problem:

  • Queue class should have withAdapter(AdapterInterface $adapter) method
  • AdapterInterface should have withChannel(string $channel) method
  • Create a queue factory with the following interface:
interface QueueFactory {
    public function get(?string $channel = null): Queue;
}

It will return a Queue object with either the default adapter when $channel is null, or an adapter configured with the given channel. It means some configuration array should be passed to the factory. In my vision this array will have channel names as keys and values in one of the formats:

  • No key/value pair. The factory will return $queue->withAdapter($adapter->withChannel($channel)). This ability should be disabled by default in order to prevent user mistakes. User should explicitly allow this behavior.
  • A callable that returns AdapterInterface. It will be called with the Injector.
  • An array configuration of AdapterInterface to be passed to the yiisoft/factory.

Adjust namespaces

Currently namespaces are using plurals i.e. exceptions, events. We are using singular elsewhere i.e. exception, event.

Should be adjusted.

Behavior refactoring

Consider refactoring message behavior into middleware stack with middlewares returning re-configured Adapters.
E.g. for deferred message with amqp adapter the behavior middleware will return a new Adapter implementation configured to send the message to a temporary queue with redirection to the final target queue after the given TTL.

There is just one more question: how to make a unique interface for all the possible adapters?

update Tests folder links

What steps will reproduce the problem?

http=>https

What is the expected result?

What do you get instead?

Additional info

Q A
Version 1.0.?
PHP version
Operating system

Improve Job management via "Full duplex" Queue

FOR BRAINSTORMING

Target:

  • provide more information to end-user about job status
  • add possibility to cancel job

How to achieve:

JobStatus contains:

  • id
  • state (unknown, waiting, running, done, cancelled, failed....)
  • process indicator (sequential number)
  • process indicator [0.0 - 100.0]
  • fail reason
  • cancel initiator (user, ...)
  • additional info (e.g. PID for cancel, timestamps, display info , ...)

JobStatus is Message
Job Handler periodically (within processing loop) pushes JobStatus via Queue to Producer.

JobCancel is Message
Monitoring app. pushes JobCancel to Queue, on the other side QueueLoop (implements LoopInterface) receives it ...

Function "run Job" moved from Adapter to Queue.

Flow is generic, does not depend on specific functionality of Adapter.

Add Features for run-time checking of possible flow

Two examples from the documentation:

  • Important: Not every adapter (such as synchronous adapter) supports delayed execution
  • You can also check the status of a pushed message (the queue adapter you are using must support this feature)

Possibly

  • Features should be added as first class citizens
  • AdapterInterface will return features supported by used adapter

This functionality will allow to develop common code for different adapters.
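
One possible shape for such a capability check is sketched below. AdapterFeature, FeatureAwareAdapter and ExampleAdapter are hypothetical names invented for this illustration, and the feature set reported by ExampleAdapter is an assumption, not a statement about any real adapter.

```php
<?php

declare(strict_types=1);

// Hypothetical list of optional adapter capabilities.
enum AdapterFeature
{
    case DelayedExecution;
    case StatusTracking;
}

// Hypothetical interface an adapter could implement to report its features.
interface FeatureAwareAdapter
{
    /** @return list<AdapterFeature> */
    public function features(): array;
}

final class ExampleAdapter implements FeatureAwareAdapter
{
    public function features(): array
    {
        // Assumed for the example: status tracking only, no delays.
        return [AdapterFeature::StatusTracking];
    }
}

function supports(FeatureAwareAdapter $adapter, AdapterFeature $feature): bool
{
    return in_array($feature, $adapter->features(), true);
}

$adapter = new ExampleAdapter();

// Common code can branch on capabilities instead of on adapter classes.
echo supports($adapter, AdapterFeature::DelayedExecution)
    ? 'push with delay'
    : 'push immediately', PHP_EOL;
```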

Preventing job overlaps

Sometimes several tasks in the queue do the same thing. For example, you need to index a product (which consists of options, pictures, prices, and so on).
When the price changes, an indexing task is sent; when the name changes, another indexing task is sent; and so on. There will be several tasks in the queue with the same product ID, and when one of them is processed, the earlier ones still in the queue should be skipped.
Or you may simply need to limit how often certain tasks run (for example, no more than once a minute).
How do you think this relates to the queue? I think this can be done with events, or by providing middleware for workers.
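
The skipping rule can be illustrated on an in-memory list of tasks. This is only a sketch of the idea: collapseByKey() is a hypothetical helper, and a real queue would need adapter or middleware support to inspect pending messages.

```php
<?php

declare(strict_types=1);

/**
 * Keeps only the latest task for each key, preserving queue order.
 *
 * @param list<array> $tasks
 * @param callable(array): (int|string) $keyOf
 * @return list<array>
 */
function collapseByKey(array $tasks, callable $keyOf): array
{
    $latestPosition = [];
    foreach ($tasks as $position => $task) {
        // A later task with the same key overwrites the earlier position.
        $latestPosition[$keyOf($task)] = $position;
    }

    // Keep the tasks whose position is the last one seen for their key.
    return array_values(array_intersect_key($tasks, array_flip($latestPosition)));
}

$tasks = [
    ['productId' => 1, 'reason' => 'price changed'],
    ['productId' => 2, 'reason' => 'name changed'],
    ['productId' => 1, 'reason' => 'name changed'],
];

$collapsed = collapseByKey($tasks, static fn (array $task): int => $task['productId']);
print_r($collapsed);
```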

channel isn't overridden (amqp, withChannel with another adapter does not work?)

What steps will reproduce the problem?

see fork pull request (link: semsty/yii3-demo#1)

run ./yii queue-test -s 1

then check queue list with rabbitmqctl list_queues -p app

What is the expected result?

channel-1 1

What do you get instead?

yii-queue 1

fix there https://github.com/yiisoft/yii-queue/blob/master/src/QueueFactory.php#L89

            return $this->queue->withAdapter($adapter);

to

            return $this->queue->withAdapter($adapter->withChannel($channel));

after that, messages are pushed into the correct channel, but in QueueTestCommand $channel->getChannelName() still returns yii-queue

Additional info

Q A
Version master ?
PHP version 8.1
Operating system

Rename MessageInterface.getName to MessageInterface.getHandlerName

When creating a message, the name parameter is misleading:

final class Message extends AbstractMessage
{
    ....
    public function __construct(string $name, $data)
    {
        $this->name = $name;
        $this->data = $data;
    }
}

'yiisoft/yii-queue' => [
    'handlers' => [
        'yii-handler-1' => [new \App\Job\HandlerOne(), 'handle'],
        'yii-handler-2' => [new \App\Job\HandlerTwo(), 'handle'],
    ],
    'channel-definitions' => [
        'yii-queue-1' =>  static fn(\Yiisoft\Yii\Queue\AMQP\Adapter $adapter) => $adapter->withChannel('yii-queue-1'),
    ],
],

$message1 = new \Yiisoft\Yii\Queue\Message\Message('yii-handler-1', ['message-1']);
$message2 = new \Yiisoft\Yii\Queue\Message\Message('yii-handler-2', ['message-2']);
$message3 = new \Yiisoft\Yii\Queue\Message\Message('yii-handler-3', ['message-3']);

$this->queueFactory->get('yii-queue-1')->push($message1);
$this->queueFactory->get('yii-queue-1')->push($message2);
$this->queueFactory->get('yii-queue-1')->push($message3);

In fact, this parameter defines the name of the handler.

#yii queue/listen yii-queue-1
Handle HandlerOne, yii-handler-1, message - ["message-1"]
Handle HandlerTwo, yii-handler-2, message - ["message-2"]
Error: No handler for message yii-handler-3 

I suggest renaming the parameter to increase the clarity of what is happening

Delayed jobs

Introduce jobs with delay time that are handled after a certain time.

Enable logging

Queue logging was removed during refactoring. We need to add it again, but in another way. I suggest creating a new class which will listen to the queue events and write messages to the log. I think a good place for such a subscription is in console commands, where we can also create an option to disable logging.

Adjust dependencies

There are lots of redundant things in composer.json now. After the refactoring is finished we should remove extra packages from it, check versions and add drivers to the suggestions block.

Need to initialize

Currently unable to fork
You can use yii2-queue master and fix composer.json to submit to packagist
I will do some initial porting work to ensure normal work, and then carry out related optimization.

Pipelines

Add middleware pipelines for

  • Pushing a message to a queue. Every middleware should have an ability to modify both message and adapter through which the message will be pushed. Message can be enriched with new data/metadata, push can be canceled, and adapter can be changed (i.e. in order to push message with a delay) and other modifications.
  • Pulling message from a queue. Every middleware should have an ability to modify a message it is reading.

Configuration ways:

  1. Application config
    • Filter by channel name. Middleware will be applied only for channels it is related to. Wildcards should be supported.
    • Filter by message name. Middleware will be applied only for messages it is related to. Wildcards should be supported.
  2. On message push (for push-related middlewares). It will be useful for changing the push strategy of a message, e.g. when you want to push a single message with some delay, while the others should be pushed without delay: $queue->push($message, $delayMiddleware);.

Job failure strategy per channel

It would be useful to be able to specify job failure handling strategy per channel.

Off the top of my head, the following are valid strategies:

  1. Ignore i.e. do nothing.
  2. Retry with exponentially raising delay and a limit.
  3. Send to another channel.
  4. Custom-made class implementing FailureStrategyInterface.

It's a good idea if strategies are done as middleware pipeline. Each strategy can either pass handling to next one or handle failure itself. Example:

Retry 5 times then send to another channel.
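
The example strategy can be expressed as a small decision function. This is a sketch of the proposal only: decideOnFailure(), its parameters and the returned array shape are hypothetical, and the doubling delay is one possible reading of "exponentially raising delay".

```php
<?php

declare(strict_types=1);

/**
 * Decides what to do after a failure.
 *
 * @param int $failedAttempts how many times the message has failed so far (1-based)
 * @return array{action: string, delay?: int, channel?: string}
 */
function decideOnFailure(
    int $failedAttempts,
    int $maxRetries,
    int $baseDelaySeconds,
    string $fallbackChannel,
): array {
    if ($failedAttempts <= $maxRetries) {
        // The delay doubles with every failure: base, 2*base, 4*base, ...
        return [
            'action' => 'retry',
            'delay' => $baseDelaySeconds * 2 ** ($failedAttempts - 1),
        ];
    }

    // Retries are exhausted: hand the message over to another channel.
    return ['action' => 'redirect', 'channel' => $fallbackChannel];
}

foreach (range(1, 6) as $failure) {
    echo $failure, ': ', json_encode(decideOnFailure($failure, 5, 1, 'failed-jobs')), PHP_EOL;
}
```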

Foreign (non-PHP) Workers

Message format allows processing of jobs using non-PHP workers.

Foreign Worker requirements:

  • Worker is CLI application
  • listens on YiiQueue, receives Messages and processes Jobs
  • supports the same protocol and set of drivers (RabbitMQ, Redis, etc)
  • extendable: possibility to add
    • new type of driver
    • job handlers

"All the message data is fully serializable (that means message data must be serializable too)"

Additional requirements for PHP side:

Consider usage of Cloud Events as envelope for submitting jobs to Foreign Workers.

Remove events

There are no use cases for the events except metrics (which can be configured by other means), so we decided to remove them.

Queue is created with the wrong channel

What steps will reproduce the problem?

Configure queue

    'yiisoft/yii-queue' => [
        'handlers' => [
            'yii-queue-1' => [new \App\Job\HandlerOne(), 'handle'],
        ],
        'channel-definitions' => [
            'yii-queue-1' => \Yiisoft\Yii\Queue\AMQP\Adapter::class,
        ],
    ],

Push message:

$message = new \Yiisoft\Yii\Queue\Message\Message('yii-queue-1', ['message']);
$this->queueFactory->get('yii-queue-1')->push($message);

What is the expected result?

A queue named 'yii-queue-1' is created

What do you get instead?

A queue has been created with the default name 'yii-queue'

(image)

Additional info

There will be no problem if configured like this:

'channel-definitions' => [
    'yii-queue-1' =>  static fn(\Yiisoft\Yii\Queue\AMQP\Adapter $adapter) => $adapter->withChannel('yii-queue-1'),
],

The problem is in the factory, here:
https://github.com/yiisoft/yii-queue/blob/master/src/QueueFactory.php#L89

StreamTarget (stdout) does not work with queue/listen

What steps will reproduce the problem?

add StreamTarget to logger

run queue/listen (with messages to process in the queue) with -vvv arg

What is the expected result?

with queue/listen -vvv all messages from logger are visible on stdout

What do you get instead?

logs are not visible in stdout

Additional info

Q A
Version master ?
PHP version 8.1
Operating system

Enhance queue and validator collectors

Here was added an integration with yii-queue.

I think it would be useful to record the class that called any of the "queue" methods, such as push(), status() and the rest.
Sometimes the user may want to know which class pushed a message to a queue, even if it may be pushed from several handlers.
I mean the same functionality as in EventDispatcherInterfaceProxy:

public function dispatch(object $event): object
{
    [$callStack] = debug_backtrace();

    $this->collector->collect($event, $callStack['file'] . ':' . $callStack['line']);

    return $this->dispatcher->dispatch($event);
}

Also the same thing is needed for the ValidatorCollector

"multiqueue" worker [feature request]

useful functionality in production

with command queue/listen ... worker starts to listen several queues

there are several options for message processing:

  1. randomly (messages are processed with random order from the channels list)
  2. round-robin (messages are processed in order by channels list by one or batch)
  3. by priority in channel list order (e.g. messages from queue-1 have priority over messages from queue-2, etc.)
  4. ... other custom variations
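
As an illustration of option 2, the sketch below drains several channels in round-robin order, one message at a time. Channels are modeled as in-memory arrays and drainRoundRobin() is a hypothetical function; a real worker would poll adapters instead.

```php
<?php

declare(strict_types=1);

/**
 * Takes one message from each non-empty channel in turn until all are empty.
 *
 * @param array<string, list<string>> $channels channel name => pending messages
 * @return list<string> messages in processing order
 */
function drainRoundRobin(array $channels): array
{
    $order = [];
    while ($channels !== []) {
        foreach ($channels as $name => $messages) {
            if ($messages === []) {
                // Nothing left in this channel, stop visiting it.
                unset($channels[$name]);
                continue;
            }
            $order[] = array_shift($channels[$name]);
        }
    }

    return $order;
}

$order = drainRoundRobin([
    'queue-1' => ['a1', 'a2', 'a3'],
    'queue-2' => ['b1'],
]);

print_r($order);
```

Replacing the inner loop with "always take from the first non-empty channel" would give the priority variant from option 3.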

Move commands

@xepozz suggests moving console commands from this package to yii-console to remove yii-console configuration from yii-queue code and to decouple these packages.

Ensure logging

  • Adding job to a queue, info level
  • Processing a job, info level
  • Errors, error level

Write tests

Improve test coverage and MSI up to 100%, set Psalm level 1.
