Comments (14)
That's exactly what I'm looking for too.
from laravel-amqp.
I'm using a command to run the "Consume messages forever" code like this:
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Bschmitt\Amqp\Consumer;
use Bschmitt\Amqp\Facades\Amqp;
use MongoDB\Model\BSONDocument;

class ListenRabbit extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'queue:custom';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Catches Data from RabbitMQ';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        try {
            Amqp::consume('queue_name', function ($message, $resolver) {
                $data = $message->body;
                // do things with data here
                $resolver->acknowledge($message);
            });
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
            echo "Connection timed out. Restarting.";
        }
    }
}
On the supervisor side, I set the command to php artisan queue:custom
from laravel-amqp.
@tpaksu thanks for posting this! Just activated the php syntax highlighting if you don't mind.
@marcstreeter @gutierrezps Does that answer your question?
I would also consider using Laravel Logging instead of just echoing it out.
from laravel-amqp.
Hi @stevenklar, I'm using supervisor's worker.log file, so I needed the echo statement :) One more thing: with this approach, if I don't set this config:
'connect_options' => [
    "read_write_timeout" => 120,
    "keepalive" => true,
    "heartbeat" => 60
],
the CPU usage gets high. Do you have any comments about it, @stevenklar?
from laravel-amqp.
@tpaksu Sorry. I have no idea.
Maybe you can ask this question in the underlying package's repository.
There are some suggestions when searching for this issue with amqp in general. ¯\_(ツ)_/¯
from laravel-amqp.
this is awesome, I'll give it a try. I'm currently bogged down with some other tasks at the moment, but this looks promising.
from laravel-amqp.
Ok @stevenklar thanks, I'll ask them.
from laravel-amqp.
Hi @tpaksu and @stevenklar, I've implemented @tpaksu's code, but it doesn't keep consuming indefinitely.
public function handle()
{
    $this->info('Queue worker started');
    try {
        Amqp::consume('queue_name', function ($message, $resolver) {
            $data = $message->body;
            // do things with data here
            $this->line('[*] Message received: '.$data);
            $resolver->acknowledge($message);
        });
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
        echo "Connection timed out. Restarting.";
    }
    $this->info('Queue worker stopped');
}
After executing the command, both start and stop messages are printed. Here's my config/amqp.php file:
<?php

return [
    /*
    |--------------------------------------------------------------------------
    | Define which configuration should be used
    |--------------------------------------------------------------------------
    */
    'use' => 'production',

    /*
    |--------------------------------------------------------------------------
    | AMQP properties separated by key
    |--------------------------------------------------------------------------
    */
    'properties' => [
        'production' => [
            'host' => 'localhost',
            'port' => 5672,
            'username' => 'my_username',
            'password' => 'my_password',
            'vhost' => '/',
            'connect_options' => [],
            'ssl_options' => [],
            'exchange' => 'my_exchange',
            'exchange_type' => 'direct',
            'exchange_passive' => false,
            'exchange_durable' => true,
            'exchange_auto_delete' => false,
            'exchange_internal' => false,
            'exchange_nowait' => false,
            'exchange_properties' => [],
            'queue_force_declare' => false,
            'queue_passive' => false,
            'queue_durable' => true,
            'queue_exclusive' => false,
            'queue_auto_delete' => false,
            'queue_nowait' => false,
            'queue_properties' => ['x-ha-policy' => ['S', 'all']],
            'consumer_tag' => '',
            'consumer_no_local' => false,
            'consumer_no_ack' => false,
            'consumer_exclusive' => false,
            'consumer_nowait' => false,
            'timeout' => 0,
            'persistent' => false,
        ],
    ],
];
Am I missing something? Since this didn't work, right now I'm following RabbitMQ's tutorial, using its receive.php as a base.
from laravel-amqp.
@gutierrezps are you using supervisor? Can you try the connect_options settings I shared in my next post?
from laravel-amqp.
@gutierrezps AFAIK the code I posted here checks the queue once and if it has messages inside, it'll process them until the queue becomes empty. If your queue is empty, it's normal to see both messages printed at once. I'm trying to keep the consumer open with the keep-alive option, and if it times out, supervisor restarts it.
from laravel-amqp.
Persistent should be true because of https://github.com/bschmitt/laravel-amqp/blob/master/src/Consumer.php#L32.
If the "persistent" config parameter is false and there are no messages, the consumer just stops.
I recommend reading the consume method in Consumer.php. It explains how this works. :)
EDIT: Also, don't overcomplicate it. You can add an outer "while" loop to your code, so supervisor doesn't need to restart the process when it isn't necessary.
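For anyone reading later, the outer "while" idea could look roughly like the sketch below. Treat it as an illustration only: runConsumerLoop, $consumeOnce and $shouldStop are hypothetical names, not part of laravel-amqp. In a real command, $consumeOnce would wrap the Amqp::consume(...) call shown earlier in the thread, and the catch would target \PhpAmqpLib\Exception\AMQPTimeoutException; a generic \RuntimeException and a fake consumer are used here so the example runs on its own.

```php
<?php

/**
 * Hypothetical helper (not part of laravel-amqp): call $consumeOnce again and
 * again until $shouldStop returns true. Returns the number of attempts made.
 */
function runConsumerLoop(callable $consumeOnce, callable $shouldStop, int $pauseSeconds = 1): int
{
    $attempts = 0;

    while (!$shouldStop($attempts)) {
        $attempts++;
        try {
            $consumeOnce();
        } catch (\RuntimeException $e) {
            // Assumption: in a real worker this would be the AMQP timeout exception.
            echo "Consumer stopped: {$e->getMessage()}. Restarting.\n";
        }
        // A short pause avoids a busy loop when the queue stays empty.
        if ($pauseSeconds > 0) {
            sleep($pauseSeconds);
        }
    }

    return $attempts;
}

// Stand-in for Amqp::consume(...): the second call simulates a timeout.
$calls = 0;
$fakeConsume = function () use (&$calls) {
    $calls++;
    if ($calls === 2) {
        throw new \RuntimeException('timed out');
    }
};

// Stop after three attempts so the demo terminates; a real worker would
// normally loop until it receives a shutdown signal.
$stopAfterThree = function (int $attempts): bool {
    return $attempts >= 3;
};

$attempts = runConsumerLoop($fakeConsume, $stopAfterThree, 0);
```

With a loop like this, supervisor only has to restart the process when it actually crashes, not every time the queue runs empty.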
from laravel-amqp.
@stevenklar yes, I first coded it like that, but I had doubts about while(true) regarding memory usage. Later I saw the while loops inside this package and the php-amqplib package it uses, so I started thinking about another way to do this that is more specific to AMQP itself.
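One way to ease the memory worry with a long-running loop is to check the process's memory on each pass and exit once it crosses a threshold, leaving supervisor to start a fresh process. The sketch below assumes a 128 MB limit and uses a hypothetical memoryExceeded helper; neither comes from laravel-amqp. The iteration cap exists only so the demo terminates.

```php
<?php

/**
 * Hypothetical helper: true once this PHP process has allocated at least
 * $limitMb megabytes, as reported by memory_get_usage(true).
 */
function memoryExceeded(int $limitMb): bool
{
    return (memory_get_usage(true) / 1024 / 1024) >= $limitMb;
}

$limitMb = 128;   // assumed threshold, tune for your worker
$iterations = 0;

// Demo cap of 5 passes; a real worker would drop the cap and keep looping.
while (!memoryExceeded($limitMb) && $iterations < 5) {
    // The Amqp::consume(...) call from the thread would go here.
    $iterations++;
}
```

When the loop ends because of the memory check, the command returns normally and supervisor restarts it, which releases whatever memory had accumulated.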
from laravel-amqp.
I assume this question is too old to keep open.
@tpaksu and @marcstreeter please reopen if you want to continue the question/discussion.
from laravel-amqp.
Hi! @tpaksu @stevenklar
I'm going to use this library in a project. Is it recommended to keep listening forever? Have you had any performance issues on the PHP side?
My configuration is:
namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Jobs\SubscribeCliente;
use Amqp;

class listeningSga extends Command
{
    protected $signature = 'listening:sga';

    protected $description = 'Command description';

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        Amqp::consume('cliente', function ($message, $resolver) {
            $resolver->acknowledge($message);
            //logic to handle the data...
        }, ['exchange' => 'cliente', 'routing' => 'default']);
    }
}
'properties' => [
    'production' => [
        'host' => 'localhost',
        'port' => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost' => 'sga',
        'connect_options' => [],
        'ssl_options' => [],
        'exchange' => 'amq.direct',
        'exchange_type' => 'direct',
        'exchange_passive' => false,
        'exchange_durable' => true,
        'exchange_auto_delete' => false,
        'exchange_internal' => false,
        'exchange_nowait' => false,
        'exchange_properties' => [],
        'queue_force_declare' => false,
        'queue_passive' => false,
        'queue_durable' => true,
        'queue_exclusive' => false,
        'queue_auto_delete' => false,
        'queue_nowait' => false,
        'queue_properties' => ['x-ha-policy' => ['S', 'all']],
        'consumer_tag' => '',
        'consumer_no_local' => false,
        'consumer_no_ack' => false,
        'consumer_exclusive' => false,
        'consumer_nowait' => false,
        'consumer_properties' => [],
        'timeout' => 0,
        'persistent' => true,
        'publish_timeout' => 0, // Only applicable when a publish is marked as mandatory
        'qos' => false,
        'qos_prefetch_size' => 0,
        'qos_prefetch_count' => 1,
        'qos_a_global' => false
    ],
],
from laravel-amqp.