Code Monkey home page Code Monkey logo

humusamqpmodule's People

Contributors

fntlnz avatar prolic avatar thomasvargiu avatar zhaovitapublic avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

humusamqpmodule's Issues

Use case for possible improvement

Hello

I want to show you one use case which out team have used for our project. I hope it will show ways to improve current approach.
So, we believe, that current Consumer::consume() method which use get() method to get messages from queue isn't the best solution. Php AMQP extension object AMQPQueue has method consume() and it's work like a daemon.
We haven't found any possible solution to create Consumer with consume() method instead of default with get() method. And we did this.

In our config we have

'plugin_managers' => [
            'consumer' => [
                'abstract_factories' => [
                    \Application\Core\Infrastructure\Humus\Consumer\ConsumerAbstractServiceFactory::class
                ]
            ],
            'callback' => [
                'factories' => [
                    'import.core.application.console.history.consumer.HistoryConsumer' =>
//for example                        \Application\Core\Application\Console\History\Consumer\HistoryConsumerFactory::class,
                ],
            ],
        ],

We copied all code from default ConsumerAbstractServiceFactory and have changed only one line to use our Consumer.

$consumer = new Consumer($queues, $idleTimeout, $waitTimeout);

So, there is first bad place. Redefining of consumer should be easier.
And there is our Consumer code

class Consumer extends \HumusAmqpModule\Consumer
{
    /**
     * @param int $msgAmount
     */
    public function consume($msgAmount = 0)
    {
        $this->target = $msgAmount;
        /** @var \AMQPQueue $queue */
        foreach($this->queues as $queue) {
            $queue->consume(function ($message, $queue) {
                if (!$this->timestampLastAck) {
                    $this->timestampLastAck = microtime(1);
                }

                try {
                    $processFlag = $this->handleDelivery($message, $queue);
                } catch (\Exception $e) {
                    $this->handleDeliveryException($e);
                    $processFlag = false;
                }
                $this->handleProcessFlag($message, $processFlag);

                $now = microtime(1);

                if ($this->countMessagesUnacked > 0
                    && ($this->countMessagesUnacked == $this->blockSize
                        || ($now - $this->timestampLastAck) > $this->idleTimeout
                    )
                ) {
                    $this->ackOrNackBlock();
                }

                if ($this->usePcntlSignalDispatch) {
                    // Check for signals
                    pcntl_signal_dispatch();
                }

                if (!$this->keepAlive || (0 != $this->target && $this->countMessagesConsumed >= $this->target)) {
                    return;
                }
            });
        }
    }
}

As you can see we just moved original consumer code to callback function in consume() method.

This approach gives us better usage of module for our needs.
It would be great, if there will be easier way to overwrite Consumer or possibility to choose between two options - use get() method or consume() for running messages.

Migration to Laminas

Hi!
I'm one of the contributors to LaminasCommons (which is now the new ZendFramework Commons eg. ZfcUser).
The company for which I work is using this module and we are now migrating to Laminas.
Is there any chance that this module is migrated to Lamins in the next time?
Maybe we could add this module to the LaminasCommons repository so I could help you with the migration and people get to know it easier :)
What do you think about ?
Regards Alex

Consumer should return an object/value that can be checked as result

Actually, consumer handle delivery like this:

    /**
     * @param AMQPEnvelope $message
     * @param AMQPQueue $queue
     * @return bool|null
     * @triggers delivery
     */
    public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue)
    {
        $params = compact('message', 'queue');
        $results = $this->getEventManager()->trigger('delivery', $this, $params);
        return $results->last();
    }

I think it's a problem. Anyone should be able to attach a listener at any point of the application, just for logging, debug, events. If a listener is attached after the real consumer process, the return value is not correct.

I suggest to create an object for the response and do something like this:

    public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue)
    {
        $params = compact('message', 'queue');
        $results = $this->getEventManager()->triggerUntil('delivery', $this, $params, function($v) {
            return ($v instanceof Object);
        });

        if ($results->stopped()) {
            return $results->last();
        }

        // no ack (?)...
    }

We should also allow users to specify listener priorities in the configuration, something like that:

return [
    'humus_amqp_module' => [
        'consumers' => [
            'my-consumer' => [
                'listeners' => [
                    ['My\\Listener' => 100]
                ]
            ],
        ],
    ],
];

I'm starting to implement this, I need to code and to test it.

Wait timeout glitch for consumers with more than one queue

Consumers with more than one queue one of which is empty will usleep($this->waitTimeout). Which will lead to delayed processing of the other queues.
Ideally there should be a configuration with which it will be possible to configure the strategy of processing several queues (round-robin, fully-process-non-empty-queue). Do you think such a configuration would make sense?

Migration from ZF2 to ZF3

I have migrated my app from ZF2 to ZF3, but HumusAmqpModule now is not working.
I had version v0.3.1 before and config like this:

return [
    'controllers' => [
        'factories' => [
            TopicProducerController::class => TopicProducerControllerFactory::class,
        ]
    ],

    'humus_amqp_module' => [
        'exchanges' => [
            'opm-exchange' => [
                'name' => 'opm-exchange',
                'type' => 'direct',
                'arguments' => [],
            ],
            'demo.error' => [
                'name' => 'demo.error',
                'type' => 'direct',
                'arguments' => [],
            ],
        ],
        'queues' => [
            'opm-mail' => [
                'name' => 'opm-mail-queue',
                'exchange' => 'opm-exchange',
                'routing_keys' => [],
                'arguments' => [
                    'x-dead-letter-exchange' => 'demo.error' // must be defined as exchange before
                ],
                'bind_arguments' => [],
            ],
        ],
        'connections' => [
            'default' => [
                'host' => 'localhost',
                'port' => 5672,
                'login' => 'guest',
                'password' => 'guest',
                'vhost' => '/',
                'persistent' => true,
                'read_timeout' => 1, //sec, float allowed
                'write_timeout' => 1, //sec, float allowed
            ],
        ],
        'producers' => [
            'opm-producer' => [
                'exchange' => 'opm-exchange',
                'qos' => [
                    'prefetch_size' => 0,
                    'prefetch_count' => 10
                ],
                'auto_setup_fabric' => true
            ],
        ],
        'consumers' => [
            'opm-consumer' => [
                'queues' => [
                    'opm-mail'
                ],
                'auto_setup_fabric' => true,
                'callback' => 'mail',
                'idle_timeout' => 10,
                'logger' => 'consumer-logger',
                'error_callback' => 'errorcallback'
            ],
        ],

        'plugin_managers' => [
            'callback' => [
                'invokables' => [
                    'echo'          => EchoCallback::class,
                    'error'         => EchoErrorCallback::class,
                    'poweroftwo'    => PowerOfTwoCallback::class,
                    'randomint'     => RandomIntCallback::class,
                    'errorcallback' => ErrorCallback::class,
                ],

                'factories' => [
                    'mail' => MailCallbackFactory::class
                ]
            ]
        ]
    ],
];

After migration my composer.json looks like that:

{
  "require": {
    "php": "~7.0",
    "ext-amqp": ">=1.7.0",
    "ext-zip": "*",
    "ext-intl": "*",
    "ext-mbstring": "*",
    "ext-gd": "*",
    "ext-xml": "*",
    "ext-curl": "*",
    "ext-bcmath": "*",
    "roave/security-advisories": "dev-master",
    "zendframework/zendframework": "^3.0",
    "zendframework/zend-mvc-console": "^1.1",
    "phpoffice/phpexcel": "dev-master",
    "imagine/Imagine": "dev-master",
    "php-amqplib/php-amqplib": "2.6.2",
    "prolic/humus-amqp": "^1.1",
    "prolic/humus-amqp-module": "^1.0",
    ...
  }
}

Also in application.config.php in modules section added HumusAmqpModule

What I need to change to make everything work.
Thanks.

Socket Error uncaught exception when trying to consume messages

https://github.com/prolic/HumusAmqpModule/blob/master/src/HumusAmqpModule/Consumer.php#L271

======================================================================
   The application has thrown an exception!
======================================================================
 AMQPException
 Library error: a socket error occurred
----------------------------------------------------------------------
/usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php:271
#0 /usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php(271): AMQPQueue->get()
   /usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Controller/ConsumerController.php(87): HumusAmqpModule\Consumer->consume(0)
#2 /usr/lib/composer/vendor/zendframework/zend-mvc/src/DispatchListener.php(114): HumusAmqpModule\Controller\ConsumerController->dispatch(Object(Zend\Console\Request), Object(Zend\Console\Response))
#3 [internal function]: Zend\Mvc\DispatchListener->onDispatch(Object(Zend\Mvc\MvcEvent))
#4 /usr/lib/composer/vendor/zendframework/zend-eventmanager/src/EventManager.php(490): call_user_func(Array, Object(Zend\Mvc\MvcEvent))
#5 /usr/lib/composer/vendor/zendframework/zend-eventmanager/src/EventManager.php(263): Zend\EventManager\EventManager->triggerListeners('dispatch', Object(Zend\Mvc\MvcEvent), Object(Closure))
#6 /usr/lib/composer/vendor/zendframework/zend-mvc/src/Application.php(340): Zend\EventManager\EventManager->triggerEventUntil(Object(Closure), Object(Zend\Mvc\MvcEvent))
#7 /var/www/public/index.php(21): Zend\Mvc\Application->run()
#8 {main}
======================================================================

Wondering if anything what could be causing this exception and if it should be encapsulated within a try/catch

Add events on Consumer and RpcServer

In order to use any class and to maintain a clean code, will be useful to have a CallbackInterface to be implemented in any class.

It's better to implement events. The idea is to implement events on Consumer and RpcServer and use classes that implements ListenerAggregateInterface to handle events.

RpcServer Response handling

Hi,
thanks for provider this great module. We have implemented several rpc-server and consumer project with this module.
Recently, I got a situation, where i need to modify the success flag and message of response by a rpc-server, but not throw an exception within customized CallBack class. In my callback locate an input filter with validators. If the validation process failed, I want return an array of error messages and give an unsuccess flag to the rpc-client.
Can you make a feature, that in callback we not only affacts the result, but also the success flag?
Or do you have some idea, that I can do it better in the case above?
Thanks in advance

Segmentation fault (core dumped) on consumer run

Spotted on different servers

one

php -v
PHP 5.6.18 (cli) (built: Feb  3 2016 12:50:09) 
Copyright (c) 1997-2016 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2016 Zend Technologies
    with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2016, by Zend Technologies

two

php -v
PHP 5.6.18-1+deb.sury.org~wily+1 (cli) 
Copyright (c) 1997-2016 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2016 Zend Technologies
    with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2016, by Zend Technologies

Queues is registered in rabbit. Messages was added successful, but consumer fails.
Our command to run looks like this

/usr/bin/php /home/project/public/index.php humus amqp consumer reporting.history.cpm.consumer

strace output below

open("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/QueueFactoryInterface.php", O_RDONLY) = 7
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
mmap(NULL, 1355, PROT_READ, MAP_SHARED, 7, 0) = 0x7f9bb8e92000
stat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/QueueFactoryInterface.php", {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fcntl(3, F_SETLKW, {type=F_WRLCK, whence=SEEK_SET, start=0, len=1}) = 0
fcntl(3, F_SETLK, {type=F_UNLCK, whence=SEEK_SET, start=0, len=1}) = 0
munmap(0x7f9bb8e92000, 1355)            = 0
close(7)                                = 0
sendto(6, "\1\0\1\0\0\0!\0002\0\n\0\0\25reporting.history."..., 41, MSG_NOSIGNAL, NULL, 0) = 41
recvfrom(6, 0x2c672b0, 131072, 0, 0, 0) = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=6, events=POLLIN}], 1, -1)    = 1 ([{fd=6, revents=POLLIN}])
recvfrom(6, "\1\0\1\0\0\0\"\0002\0\v\25reporting.history.cp"..., 131072, 0, NULL, NULL) = 42
sendto(6, "\1\0\1\0\0\0004\0002\0\24\0\0\25reporting.history."..., 60, MSG_NOSIGNAL, NULL, 0) = 60
recvfrom(6, 0x2c672b0, 131072, 0, 0, 0) = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=6, events=POLLIN}], 1, -1)    = 1 ([{fd=6, revents=POLLIN}])
recvfrom(6, "\1\0\1\0\0\0\4\0002\0\25\316", 131072, 0, NULL, NULL) = 12
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
open("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", O_RDONLY) = 7
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
mmap(NULL, 11161, PROT_READ, MAP_SHARED, 7, 0) = 0x7f9ba072a000
stat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fcntl(3, F_SETLKW, {type=F_WRLCK, whence=SEEK_SET, start=0, len=1}) = 0
--- SIGSEGV {si_signo=SIGSEGV, si_code=SI_KERNEL, si_addr=0} ---
+++ killed by SIGSEGV (core dumped) +++
Segmentation fault (core dumped)

unable to get service locator from inside callback function

I'm currently implementing some functionality that relies on fetching a Domain service from inside the callback function. What is the appropriate way to get the service manager?

I've defined the invokable class inside of configuration as well as added the ServiceLocatorAwareInterface to my callback class. This is the typical approach that I would expect to work in order to be able to get access to the service manager; however, this does not appear to be the case.

Service "ExampleService" has been requested to plugin manager of type "HumusAmqpModule\PluginManager\Callback",
 but couldn't be retrieved.
A previous exception of type "Zend\ServiceManager\Exception\ServiceNotFoundException" has been raised in the process.
By the way, a service with the name "ExampleService" has been found in the parent service locator "Zend\ServiceManager\ServiceManager": did you forget to use $parentLocator = $serviceLocator->getServiceLocator() in your factory code?

is the exception that is thrown.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.