prolic / humusamqpmodule Goto Github PK
View Code? Open in Web Editor NEWAMQP module for Zend Framework 2 to integrate RabbitMQ
Home Page: https://humusamqp.readthedocs.io
License: MIT License
AMQP module for Zend Framework 2 to integrate RabbitMQ
Home Page: https://humusamqp.readthedocs.io
License: MIT License
(rabbitmq extension)
Hello
I want to show you one use case which out team have used for our project. I hope it will show ways to improve current approach.
So, we believe, that current Consumer::consume() method which use get() method to get messages from queue isn't the best solution. Php AMQP extension object AMQPQueue has method consume() and it's work like a daemon.
We haven't found any possible solution to create Consumer with consume() method instead of default with get() method. And we did this.
In our config we have
'plugin_managers' => [
'consumer' => [
'abstract_factories' => [
\Application\Core\Infrastructure\Humus\Consumer\ConsumerAbstractServiceFactory::class
]
],
'callback' => [
'factories' => [
'import.core.application.console.history.consumer.HistoryConsumer' =>
//for example \Application\Core\Application\Console\History\Consumer\HistoryConsumerFactory::class,
],
],
],
We copied all code from default ConsumerAbstractServiceFactory and have changed only one line to use our Consumer.
$consumer = new Consumer($queues, $idleTimeout, $waitTimeout);
So, there is first bad place. Redefining of consumer should be easier.
And there is our Consumer code
class Consumer extends \HumusAmqpModule\Consumer
{
/**
* @param int $msgAmount
*/
public function consume($msgAmount = 0)
{
$this->target = $msgAmount;
/** @var \AMQPQueue $queue */
foreach($this->queues as $queue) {
$queue->consume(function ($message, $queue) {
if (!$this->timestampLastAck) {
$this->timestampLastAck = microtime(1);
}
try {
$processFlag = $this->handleDelivery($message, $queue);
} catch (\Exception $e) {
$this->handleDeliveryException($e);
$processFlag = false;
}
$this->handleProcessFlag($message, $processFlag);
$now = microtime(1);
if ($this->countMessagesUnacked > 0
&& ($this->countMessagesUnacked == $this->blockSize
|| ($now - $this->timestampLastAck) > $this->idleTimeout
)
) {
$this->ackOrNackBlock();
}
if ($this->usePcntlSignalDispatch) {
// Check for signals
pcntl_signal_dispatch();
}
if (!$this->keepAlive || (0 != $this->target && $this->countMessagesConsumed >= $this->target)) {
return;
}
});
}
}
}
As you can see we just moved original consumer code to callback function in consume() method.
This approach gives us better usage of module for our needs.
It would be great, if there will be easier way to overwrite Consumer or possibility to choose between two options - use get() method or consume() for running messages.
Hi!
I'm one of the contributors to LaminasCommons (which is now the new ZendFramework Commons eg. ZfcUser).
The company for which I work is using this module and we are now migrating to Laminas.
Is there any chance that this module is migrated to Lamins in the next time?
Maybe we could add this module to the LaminasCommons repository so I could help you with the migration and people get to know it easier :)
What do you think about ?
Regards Alex
Actually, consumer handle delivery like this:
/**
* @param AMQPEnvelope $message
* @param AMQPQueue $queue
* @return bool|null
* @triggers delivery
*/
public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue)
{
$params = compact('message', 'queue');
$results = $this->getEventManager()->trigger('delivery', $this, $params);
return $results->last();
}
I think it's a problem. Anyone should be able to attach a listener at any point of the application, just for logging, debug, events. If a listener is attached after the real consumer process, the return value is not correct.
I suggest to create an object for the response and do something like this:
public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue)
{
$params = compact('message', 'queue');
$results = $this->getEventManager()->triggerUntil('delivery', $this, $params, function($v) {
return ($v instanceof Object);
});
if ($results->stopped()) {
return $results->last();
}
// no ack (?)...
}
We should also allow users to specify listener priorities in the configuration, something like that:
return [
'humus_amqp_module' => [
'consumers' => [
'my-consumer' => [
'listeners' => [
['My\\Listener' => 100]
]
],
],
],
];
I'm starting to implement this, I need to code and to test it.
Consumers with more than one queue one of which is empty will usleep($this->waitTimeout)
. Which will lead to delayed processing of the other queues.
Ideally there should be a configuration with which it will be possible to configure the strategy of processing several queues (round-robin, fully-process-non-empty-queue). Do you think such a configuration would make sense?
I have migrated my app from ZF2 to ZF3, but HumusAmqpModule now is not working.
I had version v0.3.1
before and config like this:
return [
'controllers' => [
'factories' => [
TopicProducerController::class => TopicProducerControllerFactory::class,
]
],
'humus_amqp_module' => [
'exchanges' => [
'opm-exchange' => [
'name' => 'opm-exchange',
'type' => 'direct',
'arguments' => [],
],
'demo.error' => [
'name' => 'demo.error',
'type' => 'direct',
'arguments' => [],
],
],
'queues' => [
'opm-mail' => [
'name' => 'opm-mail-queue',
'exchange' => 'opm-exchange',
'routing_keys' => [],
'arguments' => [
'x-dead-letter-exchange' => 'demo.error' // must be defined as exchange before
],
'bind_arguments' => [],
],
],
'connections' => [
'default' => [
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
'persistent' => true,
'read_timeout' => 1, //sec, float allowed
'write_timeout' => 1, //sec, float allowed
],
],
'producers' => [
'opm-producer' => [
'exchange' => 'opm-exchange',
'qos' => [
'prefetch_size' => 0,
'prefetch_count' => 10
],
'auto_setup_fabric' => true
],
],
'consumers' => [
'opm-consumer' => [
'queues' => [
'opm-mail'
],
'auto_setup_fabric' => true,
'callback' => 'mail',
'idle_timeout' => 10,
'logger' => 'consumer-logger',
'error_callback' => 'errorcallback'
],
],
'plugin_managers' => [
'callback' => [
'invokables' => [
'echo' => EchoCallback::class,
'error' => EchoErrorCallback::class,
'poweroftwo' => PowerOfTwoCallback::class,
'randomint' => RandomIntCallback::class,
'errorcallback' => ErrorCallback::class,
],
'factories' => [
'mail' => MailCallbackFactory::class
]
]
]
],
];
After migration my composer.json looks like that:
{
"require": {
"php": "~7.0",
"ext-amqp": ">=1.7.0",
"ext-zip": "*",
"ext-intl": "*",
"ext-mbstring": "*",
"ext-gd": "*",
"ext-xml": "*",
"ext-curl": "*",
"ext-bcmath": "*",
"roave/security-advisories": "dev-master",
"zendframework/zendframework": "^3.0",
"zendframework/zend-mvc-console": "^1.1",
"phpoffice/phpexcel": "dev-master",
"imagine/Imagine": "dev-master",
"php-amqplib/php-amqplib": "2.6.2",
"prolic/humus-amqp": "^1.1",
"prolic/humus-amqp-module": "^1.0",
...
}
}
Also in application.config.php
in modules
section added HumusAmqpModule
What I need to change to make everything work.
Thanks.
HI!
thanks for making a new Tag but could you please update the package on packagist ?
https://packagist.org/packages/prolic/humus-amqp-module
There is still no new version for your module...
Thanks :)
You could use an adapter pattern for instance to give the opportunity to use another client to communicate with RabbitMQ. Some people can not or don't want use the PHP AMQP extension. In my case, we use videlalvaro/php-amqplib
package for that.
https://github.com/prolic/HumusAmqpModule/blob/master/src/HumusAmqpModule/Consumer.php#L271
======================================================================
The application has thrown an exception!
======================================================================
AMQPException
Library error: a socket error occurred
----------------------------------------------------------------------
/usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php:271
#0 /usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php(271): AMQPQueue->get()
/usr/lib/composer/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Controller/ConsumerController.php(87): HumusAmqpModule\Consumer->consume(0)
#2 /usr/lib/composer/vendor/zendframework/zend-mvc/src/DispatchListener.php(114): HumusAmqpModule\Controller\ConsumerController->dispatch(Object(Zend\Console\Request), Object(Zend\Console\Response))
#3 [internal function]: Zend\Mvc\DispatchListener->onDispatch(Object(Zend\Mvc\MvcEvent))
#4 /usr/lib/composer/vendor/zendframework/zend-eventmanager/src/EventManager.php(490): call_user_func(Array, Object(Zend\Mvc\MvcEvent))
#5 /usr/lib/composer/vendor/zendframework/zend-eventmanager/src/EventManager.php(263): Zend\EventManager\EventManager->triggerListeners('dispatch', Object(Zend\Mvc\MvcEvent), Object(Closure))
#6 /usr/lib/composer/vendor/zendframework/zend-mvc/src/Application.php(340): Zend\EventManager\EventManager->triggerEventUntil(Object(Closure), Object(Zend\Mvc\MvcEvent))
#7 /var/www/public/index.php(21): Zend\Mvc\Application->run()
#8 {main}
======================================================================
Wondering if anything what could be causing this exception and if it should be encapsulated within a try/catch
Consumer-Interfaces for processing messages can be removed, a callable is enough.
In order to use any class and to maintain a clean code, will be useful to have a CallbackInterface to be implemented in any class.
It's better to implement events. The idea is to implement events on Consumer and RpcServer and use classes that implements ListenerAggregateInterface to handle events.
One channel is idle...why?
Hi,
thanks for provider this great module. We have implemented several rpc-server and consumer project with this module.
Recently, I got a situation, where i need to modify the success flag and message of response by a rpc-server, but not throw an exception within customized CallBack class. In my callback locate an input filter with validators. If the validation process failed, I want return an array of error messages and give an unsuccess flag to the rpc-client.
Can you make a feature, that in callback we not only affacts the result, but also the success flag?
Or do you have some idea, that I can do it better in the case above?
Thanks in advance
(rabbitmq extension)
Spotted on different servers
one
php -v
PHP 5.6.18 (cli) (built: Feb 3 2016 12:50:09)
Copyright (c) 1997-2016 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2016 Zend Technologies
with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2016, by Zend Technologies
two
php -v
PHP 5.6.18-1+deb.sury.org~wily+1 (cli)
Copyright (c) 1997-2016 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2016 Zend Technologies
with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2016, by Zend Technologies
Queues is registered in rabbit. Messages was added successful, but consumer fails.
Our command to run looks like this
/usr/bin/php /home/project/public/index.php humus amqp consumer reporting.history.cpm.consumer
strace output below
open("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/QueueFactoryInterface.php", O_RDONLY) = 7
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
mmap(NULL, 1355, PROT_READ, MAP_SHARED, 7, 0) = 0x7f9bb8e92000
stat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/QueueFactoryInterface.php", {st_mode=S_IFREG|0644, st_size=1355, ...}) = 0
fcntl(3, F_SETLKW, {type=F_WRLCK, whence=SEEK_SET, start=0, len=1}) = 0
fcntl(3, F_SETLK, {type=F_UNLCK, whence=SEEK_SET, start=0, len=1}) = 0
munmap(0x7f9bb8e92000, 1355) = 0
close(7) = 0
sendto(6, "\1\0\1\0\0\0!\0002\0\n\0\0\25reporting.history."..., 41, MSG_NOSIGNAL, NULL, 0) = 41
recvfrom(6, 0x2c672b0, 131072, 0, 0, 0) = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=6, events=POLLIN}], 1, -1) = 1 ([{fd=6, revents=POLLIN}])
recvfrom(6, "\1\0\1\0\0\0\"\0002\0\v\25reporting.history.cp"..., 131072, 0, NULL, NULL) = 42
sendto(6, "\1\0\1\0\0\0004\0002\0\24\0\0\25reporting.history."..., 60, MSG_NOSIGNAL, NULL, 0) = 60
recvfrom(6, 0x2c672b0, 131072, 0, 0, 0) = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=6, events=POLLIN}], 1, -1) = 1 ([{fd=6, revents=POLLIN}])
recvfrom(6, "\1\0\1\0\0\0\4\0002\0\25\316", 131072, 0, NULL, NULL) = 12
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module/src", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic/humus-amqp-module", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
lstat("/home/project/vendor/prolic", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
open("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", O_RDONLY) = 7
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fstat(7, {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
mmap(NULL, 11161, PROT_READ, MAP_SHARED, 7, 0) = 0x7f9ba072a000
stat("/home/project/vendor/prolic/humus-amqp-module/src/HumusAmqpModule/Consumer.php", {st_mode=S_IFREG|0644, st_size=11161, ...}) = 0
fcntl(3, F_SETLKW, {type=F_WRLCK, whence=SEEK_SET, start=0, len=1}) = 0
--- SIGSEGV {si_signo=SIGSEGV, si_code=SI_KERNEL, si_addr=0} ---
+++ killed by SIGSEGV (core dumped) +++
Segmentation fault (core dumped)
I'm currently implementing some functionality that relies on fetching a Domain service from inside the callback function. What is the appropriate way to get the service manager?
I've defined the invokable class inside of configuration as well as added the ServiceLocatorAwareInterface to my callback class. This is the typical approach that I would expect to work in order to be able to get access to the service manager; however, this does not appear to be the case.
Service "ExampleService" has been requested to plugin manager of type "HumusAmqpModule\PluginManager\Callback",
but couldn't be retrieved.
A previous exception of type "Zend\ServiceManager\Exception\ServiceNotFoundException" has been raised in the process.
By the way, a service with the name "ExampleService" has been found in the parent service locator "Zend\ServiceManager\ServiceManager": did you forget to use $parentLocator = $serviceLocator->getServiceLocator() in your factory code?
is the exception that is thrown.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.