Code Monkey home page Code Monkey logo

php-amqplib's Introduction

php-amqplib

PHPUnit tests Latest Version on Packagist Total Downloads Software License

codecov Coverage Status Quality Score

This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It's been tested against RabbitMQ.

The library was used for the PHP examples of RabbitMQ in Action and the official RabbitMQ tutorials.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.

Project Maintainers

Thanks to videlalvaro and postalservice14 for creating php-amqplib.

The package is now maintained by Ramūnas Dronga, Luke Bakken and several VMware engineers working on RabbitMQ.

Supported RabbitMQ Versions

Starting with version 2.0 this library uses AMQP 0.9.1 by default and thus requires RabbitMQ 2.0 or later version. Usually server upgrades do not require any application code changes since the protocol changes very infrequently but please conduct your own testing before upgrading.

Supported RabbitMQ Extensions

Since the library uses AMQP 0.9.1 we added support for the following RabbitMQ extensions:

  • Exchange to Exchange Bindings
  • Basic Nack
  • Publisher Confirms
  • Consumer Cancel Notify

Extensions that modify existing methods like alternate exchanges are also supported.

Related libraries

  • enqueue/amqp-lib is a amqp interop compatible wrapper.

  • AMQProxy is a proxy library with connection and channel pooling/reusing. This allows for lower connection and channel churn when using php-amqplib, leading to less CPU usage of RabbitMQ.

Setup

Ensure you have composer installed, then run the following command:

$ composer require php-amqplib/php-amqplib

That will fetch the library and its dependencies inside your vendor folder. Then you can add the following to your .php files in order to use the library

require_once __DIR__.'/vendor/autoload.php';

Then you need to use the relevant classes, for example:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

Usage

With RabbitMQ running open two Terminals and on the first one execute the following commands to start the consumer:

$ cd php-amqplib/demo
$ php amqp_consumer.php

Then on the other Terminal do:

$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish

You should see the message arriving to the process on the other Terminal

Then to stop the consumer, send to it the quit message:

$ php amqp_publisher.php quit

If you need to listen to the sockets used to connect to RabbitMQ then see the example in the non blocking consumer.

$ php amqp_consumer_non_blocking.php

Change log

Please see CHANGELOG for more information what has changed recently.

API Documentation

http://php-amqplib.github.io/php-amqplib/

Tutorials

To not repeat ourselves, if you want to learn more about this library, please refer to the official RabbitMQ tutorials.

More Examples

  • amqp_ha_consumer.php: demos the use of mirrored queues.
  • amqp_consumer_exclusive.php and amqp_publisher_exclusive.php: demos fanout exchanges using exclusive queues.
  • amqp_consumer_fanout_{1,2}.php and amqp_publisher_fanout.php: demos fanout exchanges with named queues.
  • amqp_consumer_pcntl_heartbeat.php: demos signal-based heartbeat sender usage.
  • basic_get.php: demos obtaining messages from the queues by using the basic get AMQP call.

Multiple hosts connections

If you have a cluster of multiple nodes to which your application can connect, you can start a connection with an array of hosts. To do that you should use the create_connection static method.

For example:

$connection = AMQPStreamConnection::create_connection([
    ['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
    ['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
],
$options);

This code will try to connect to HOST1 first, and connect to HOST2 if the first connection fails. The method returns a connection object for the first successful connection. Should all connections fail it will throw the exception from the last connection attempt.

See demo/amqp_connect_multiple_hosts.php for more examples.

Batch Publishing

Let's say you have a process that generates a bunch of messages that are going to be published to the same exchange using the same routing_key and options like mandatory. Then you could make use of the batch_basic_publish library feature. You can batch messages like this:

$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);

$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);

and then send the batch like this:

$ch->publish_batch();

When do we publish the message batch?

Let's say our program needs to read from a file and then publish one message per line. Depending on the message size, you will have to decide when it's better to send the batch. You could send it every 50 messages, or every hundred. That's up to you.

Optimized Message Publishing

Another way to speed up your message publishing is by reusing the AMQPMessage message instances. You can create your new message like this:

$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);

Now let's say that while you want to change the message body for future messages, you will keep the same properties, that is, your messages will still be text/plain and the delivery_mode will still be AMQPMessage::DELIVERY_MODE_PERSISTENT. If you create a new AMQPMessage instance for every published message, then those properties would have to be re-encoded in the AMQP binary format. You could avoid all that by just reusing the AMQPMessage and then resetting the message body like this:

$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);

Truncating Large Messages

AMQP imposes no limit on the size of messages; if a very large message is received by a consumer, PHP's memory limit may be reached within the library before the callback passed to basic_consume is called.

To avoid this, you can call the method AMQPChannel::setBodySizeLimit(int $bytes) on your Channel instance. Body sizes exceeding this limit will be truncated, and delivered to your callback with a AMQPMessage::$is_truncated flag set to true. The property AMQPMessage::$body_size will reflect the true body size of a received message, which will be higher than strlen(AMQPMessage::getBody()) if the message has been truncated.

Note that all data above the limit is read from the AMQP Channel and immediately discarded, so there is no way to retrieve it within your callback. If you have another consumer which can handle messages with larger payloads, you can use basic_reject or basic_nack to tell the server (which still has a complete copy) to forward it to a Dead Letter Exchange.

By default, no truncation will occur. To disable truncation on a Channel that has had it enabled, pass 0 (or null) to AMQPChannel::setBodySizeLimit().

Connection recovery

Some RabbitMQ clients using automated connection recovery mechanisms to reconnect and recover channels and consumers in case of network errors.

Since this client is using a single-thread, you can set up connection recovery using exception handling mechanism.

Exceptions which might be thrown in case of connection errors:

PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException

Some other exceptions might be thrown, but connection can still be there. It's always a good idea to clean up an old connection when handling an exception before reconnecting.

For example, if you want to set up a recovering connection:

$connection = null;
$channel = null;
while(true){
    try {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        // Your application code goes here.
        do_something_with_connection($connection);
    } catch(AMQPRuntimeException $e) {
        echo $e->getMessage();
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\RuntimeException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\ErrorException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    }
}

A full example is in demo/connection_recovery_consume.php.

This code will reconnect and retry the application code every time the exception occurs. Some exceptions can still be thrown and should not be handled as a part of reconnection process, because they might be application errors.

This approach makes sense mostly for consumer applications, producers will require some additional application code to avoid publishing the same message multiple times.

This was a simplest example, in a real-life application you might want to control retr count and maybe gracefully degrade wait time to reconnection.

You can find a more excessive example in #444

UNIX Signals

If you have installed PCNTL extension dispatching of signal will be handled when consumer is not processing message.

$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stop consumer e.g. delete lock etc
            pcntl_signal($signal, SIG_DFL); // restore handler
            posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};

pcntl_signal(\SIGTERM, $pcntlHandler);
pcntl_signal(\SIGINT,  $pcntlHandler);
pcntl_signal(\SIGUSR1, $pcntlHandler);
pcntl_signal(\SIGHUP,  $pcntlHandler);

To disable this feature just define constant AMQP_WITHOUT_SIGNALS as true

<?php
define('AMQP_WITHOUT_SIGNALS', true);

... more code

Signal-based Heartbeat

If you have installed PCNTL extension and are using PHP 7.1 or greater, you can register a signal-based heartbeat sender.

<?php

$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
... code
$sender->unregister();

Debugging

If you want to know what's going on at a protocol level then add the following constant to your code:

<?php
define('AMQP_DEBUG', true);

... more code

?>

Benchmarks

To run the publishing/consume benchmark type:

$ make benchmark

Tests

To successfully run the tests you need to first have a stock RabbitMQ broker running locally.Then, run tests like this:

$ make test

Contributing

Please see CONTRIBUTING for details.

Using AMQP 0.8

If you still want to use the old version of the protocol then you can do it by setting the following constant in your configuration code:

define('AMQP_PROTOCOL', '0.8');

The default value is '0.9.1'.

Providing your own autoloader

If for some reason you don't want to use composer, then you need to have an autoloader in place fo the library classes. People have reported to use this autoloader with success.

Original README:

Below is the original README file content. Credits goes to the original authors.

PHP library implementing Advanced Message Queuing Protocol (AMQP).

The library is port of python code of py-amqplib http://barryp.org/software/py-amqplib/

It have been tested with RabbitMQ server.

Project home page: http://code.google.com/p/php-amqplib/

For discussion, please join the group:

http://groups.google.com/group/php-amqplib-devel

For bug reports, please use bug tracking system at the project page.

Patches are very welcome!

Author: Vadim Zaliva [email protected]

php-amqplib's People

Contributors

actions-user avatar anton-siardziuk avatar brikou avatar cthos avatar ehberg avatar fprochazka avatar fsalehpour avatar hairyhum avatar i3bepb avatar ikulis avatar imsop avatar jvalduvieco avatar laurynasgadl avatar lkorczewski avatar lukebakken avatar mdrollette avatar mente avatar michaelklishin avatar nubeiro avatar postalservice14 avatar prolic avatar ramunasd avatar rcatlin avatar ricardclau avatar roihq avatar romainneutron avatar tutu-robot avatar videlalvaro avatar yozhef avatar znarf avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

php-amqplib's Issues

Message not published, AMPQConnection not closed

We're experiencing a strange behavior which is only happening on our staging server:

When a producer publishes a message to rabbitMQ, that message is never received by rabbitmq, the rabbitMQ log shows that a connection is being opened, but the connection is never closed.

When this is done from the CLI, the php process hangs forever, you need to kill it manually.

The weird thing is that consumers can connect successfully to rabbitMQ.

Any hints how to debug this problem?

ParameterNotFoundException

Hi there,

When I try to deploy my app using php-amqplib I have this error :

** [out :: prod] [Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException]
** [out :: prod] You have requested a non-existent parameter "amqp_host".

I double check the "app/config/parameters.ini" and the parameter amqp_host is well written (amqp_host="fully.qualified.domain.name").

Any idea ?

PS : I'm using the last version of both php-amqplib and RabbitMqBundle.

Can't publish message of size > 127 bytes

The following will send a message of 127 bytes:

php php-amqplib/demo/amqp_publisher.php "$(printf "%127s")"

But when you try to send a message of 128 bytes or more:

php php-amqplib/demo/amqp_publisher.php "$(printf "%128s")"

...it fails.

[Feature request] Lazy initialization of the connection

Currently, the connection reaches the RabbitMQ server as soon as you instantiate it (it is done in the constructor).
But this is a bad idea for people using dependency injection: their code producing messages will then connect to RabbitMQ each time even when their logic is actually conditional.
To be friendly with DI, you should not put logic in the constructor, so that instantiating the object does not become an heavy operation.

Auto close channels on connection close

If I understand things correctly channels in Rabbit are a way to multiplex connections. It's my understanding that a channel without a connection doesn't really make sense.

Here https://github.com/videlalvaro/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L49 there is definitely thought given to cleaning up channels.

I think the reason why the code never got written is that the Connection itself should iterate over its channel array and close them off, since it's the aggregate and has sufficient knowledge of lifetimes.

Error reading data. Recevived 0 instead of expected 1 bytes

Hi, when I try to run

videlalvaro/php-amqplib/demo/amqp_consumer.php

I've got the error after process completed:

PHP Fatal error:  Uncaught exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Error reading data. Received 0 instead of expected 1 bytes' in /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:50
Stack trace:
#0 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106): PhpAmqpLib\Wire\IO\StreamIO->read(1)
#1 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(145): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#2 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(236): PhpAmqpLib\Wire\AMQPReader->read_octet()
#3 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(264): PhpAmqpLib\Connection\AbstractConnection->wait_frame(0)
#4 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(111): PhpAmqpLib\Connection\AbstractConnection->wait_channel(1, 0)
#5 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(207): PhpAmqpLib\Channel\AbstractChannel->next_fram in /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 50

What's the problem? Help me please.

Error while running demo example

I am a newbie at working upon messaging. Also, my php skills are a bit rusty. So, this might be a no-brainer question !

I installed php-amqplib and attempted to run the demo examples. The PHP error that I see when I run amqp_consumer.php is :-

PHP Parse error: syntax error, unexpected T_STRING, expecting T_CONSTANT_ENCAPSED_STRING or '(' in /opt/rabbitmq_server-2.8.6/php-amqplib/demo/amqp_consumer.php on line 4

Parse error: syntax error, unexpected T_STRING, expecting T_CONSTANT_ENCAPSED_STRING or '(' in /opt/rabbitmq_server-2.8.6/php-amqplib/demo/amqp_consumer.php on line 4

This is what I have in my Linux box :-
PHP Version : PHP 5.1.6
Rabbitmq : v2.8.6

I am not sure of how to troubleshoot this. Any inputs on this, please ?

Thanx !

Registering multiple consumers on a channel where messages are already in some of them

I have 2 queues, q1 and q2. q1 already has some messages in it, q2 is empty. I register a consumer for each of them via basic_consume(), in the same script, on the same channel. In the RabbitMQ management console, both messages in q1 move to unacked state. I then call wait() on the channel. Nothing happens, the registered callbacks don't get called unless a new message is inserted into either q1 or q2. Also, if there already is a message waiting in q2 when the script starts, the callbacks get called for all the messages from q1 and q2.

It seems that the messages get delivered to the consumers as soon as a consumer is registered, but if something else but wait() is called next, the consumer callbacks don't get called for the previously delivered messages.

This issue can be worked around by calling flow(false) before and flow(true) after registering the consumers to prevent delivery of messages.

I don't know which is the intended behavior here, I couldn't find anything about this case on the web. If calling flow() is the proper way, it should be mentioned somewhere (example scripts, documentation, readme, ...). If it is not, then this is a bug in php-amqplib, so please fix it :)

support pseudo-simultaneous channel processing

The library is currently unable to simultaneously process messages for different channels on the same connection. E.g. say you want a worker that listens on a job message queue (consumer1) as well as a management queue (to receive messages that trigger it to reconfigure itself, shutdown, etc.) (consumer2) and you want to have each consumer use their own channel.

Such a setup is currently impossible due to the nature of AbstractConnection::wait_channel().

Ironically, it seems this can be easily mitigated by modifying that method to always call wait() on any messages for other channels:

    protected function wait_channel($channel_id, $timeout = 0)
    {
        while (true) {
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($timeout);
            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // we introduce a check if the channel still exists since we now have
            // potential for other code to interfere
            if (isset($this->channels[$frame_channel])) {
              // Not the channel we were looking for.  Queue this frame
              //for later, when the other channel is looking for frames.
              array_push($this->channels[$frame_channel]->frame_queue,
                         array($frame_type, $payload));

              // Always invoke a 'sub-wait()' causing the according channel to process
              // the just queued frame and call us again
              $this->channels[$frame_channel]->wait();
            }
        }
    }

This potentially results in a somewhat ugly call chain, but should be safe due to the frame- and method queueing mechanisms in AbstractChannel.

Of course in a larger refactoring effort this could be done a lot cleaner, but this solution should work without modifying AbstractChannel and hence doesn't require people to change their code.

AMQPSSLConnection overwrites passed $vhost variable

It seems to me that the AMQPSSLConnection class overwrites the passed $vhost variable when constructing an object of this class. With the current code, even when I pass a specific VHost (e.g. '/some_host'), php-amqplib will try to connect to the standard '/' VHost. As I don't allow any users (except for admin) to use the standard '/' VHost, the following error is written to the RabbitMQ log:

exception on TCP connection <0.18196.2> from "ip-adress:port"
{channel0_error,opening,
            {amqp_error,access_refused,
                        "access to vhost '/' refused for user 'username'",
                        'connection.open'}}

The problem lies in code line 15 of the AMQPSSLConnection class:

parent::__construct($host, $port, $user, $password, $vhost="/",
{...}

By replacing $vhost="/" with simply $vhost, the error can be fixed. Code:

parent::__construct($host, $port, $user, $password, $vhost,
{...}

This does not remove default behavior, as $vhost is already defaulted to '/' in the __construct paramters (line 9):

public function __construct($host, $port,
                          $user, $password,
                          $vhost="/", $ssl_options = array(), $options = array())
{...}

As mentioned above, this is a problem of SSL connections only.

Multiple queue send/receive at one runtime

Say, we need to send a message to queue1, then send another message to queue2 in one script runtime. But when I'm trying to do this (another queue_declare and queue_bind call), the message will be delivered to ALL customers, even though exchange type is DIRECT.

What am I doing wrong?

Consider adding re-connection handling

I'm wondering whether you have thought about adding some logic for handling re-connects?

Especially for longer running processes, there is always the possibility that the rabbitmq server is temporarily unavailable which would be good to handle a bit more gracefully than just throwing an exception.

Do you think such logic would belong to this library, or should rather be in a different place?

Handling database connections in long running consumers

I apologize if this is not really a amqplib specific question, but i guess this problem is a quite common among users of this library, so i thought it might be an appropriate place to ask.

Some of our amqp consumers create persistent database connections - in our case to a MySQL database. However, when the consumer is idle for a long time, the database will close the connection, and the next message received by the consumer will fail.

Are there any best practices how to tackle this problem when using php consumers?

why writes to error log instead of throwing an exception?

I guess I am confused, by looking at the AMQPReader::read_value(), why does it write to error log instead of throwing an exception? Throwing an exception helps developers catch the error immediately, while writing to error log requires developers to keep an eye on error log? Is there any particular reason?

public function read_value($fieldType)
    {
        $this->bitcount = $this->bits = 0;

        $val = NULL;
        switch($fieldType) {
            case 'S': // Long string
                $val = $this->read_longstr();
                break;
            case 'I': // Signed 32-bit
                $val = $this->read_signed_long();
                break;
            case 'D': // Decimal
                $e = $this->read_octet();
                $n = $this->read_signed_long();
                $val = new AMQPDecimal($n, $e);
                break;
            case 'T': // Timestamp
                $val = $this->read_timestamp();
                break;
            case 'F': // Table
                $val = $this->read_table();
                break;
            case 'A': // Array
                $val = $this->read_array();
                break;
            default:
                // UNKNOWN TYPE
                error_log("Usupported table field type $fieldType");
                break;
        }

        return $val;
    }

Reconsider non_blocking option of Channel::wait()

This option seems to be useless now, because actually blocking read is performed. Now with this option enabled wait() will return after something is received, not only when one of allowed methods is received. But if there is not data in the sockets, wait will not return ever even with this option enabled. IMO it is not an expected behaviour.

This option is introdiced here: https://github.com/videlalvaro/php-amqplib/pull/11/files, but it is not even used in new non_blocking example. Seems like it is refuse from unsuccessful experiment

Notices in PhpAmqpLib/Wire/IO/StreamIO.php during failed fwrite

Hey,

my PHP error log mentioned some notices in a PHP 5.5.1 environment on Ubuntu 12.04:

[15-Aug-2013 09:13:49 Europe/Berlin] PHP Notice:  fwrite(): send of 12 bytes failed with errno=104 Connection reset by peer in /var/application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61
[15-Aug-2013 09:13:49 Europe/Berlin] PHP Notice:  fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /var/application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61

This are only notices and all seems to be working as normal, but i think this has to be fixed. At the moment i do not know any kind of fix for this.
Maybe you got an idea.

Got an error when using php-amqplib with rabbitmq-bundle

my php-amqplib version is v2.1.0

I was running a command that send message to a queue. but I've got this error:

PHP Notice: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /var/www/www.test.com/shared/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61

List consumers

Is there a way to check the list of available consumers from the broker? something like this, but available from within php:

    rabbitmqctl list_consumers

vendor/autoload.php is missing

The vendor/autoload.php is missing on github, it's used in the demo and test.

Could some add this or tell me where I can get this?

Returning result of consumer callback

When I call $channel->wait() it calls consumer callback in such call chain:

return AbstractChannel->dispatch
return AMQPChannel->basic_deliver

'basic_deliver' just calls 'call_user_func', but it doesn't returns a result of that call.

It would be nice to add 'return' statement. Then it would be possible to write:

$callback_result = $channel->wait();

AMQPChannel.close() is hanging when connection is in blocking status

Hi,

RabbitMQ server puts connection into blocking state if memory watermark alarm is true. In this state RMQ blocks publishing and basic_publish() method returns false. But after this if I try to close channel it will be hanging while waiting ''channel.close_ok' method. Passing a timeout value to the wait method called in AMQPChannel.close() solves this problem but I am not sure whether it is a good idea or bad. The timeout which is set in the AMQPStreenConnection is not working.

AMQPConnection can hang in __destruct after a write() failed

I've seen some processes hanging in AMQPConnection->__destruct after a "Broken pipe or closed connection" exception was thrown by AMQPConnection->write.

The backtrace looks like this (from gdb, using zbacktrace from php's .gdbinit):

// hanging here:
[0x7fa4f4e4e618] fread(resource(#865), 1) vendor/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:55
[0x7fa4f4e4e480] PhpAmqpLib\Wire\AMQPReader->rawread(1) vendor/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:94
[0x7fa4f4e4e058] PhpAmqpLib\Wire\AMQPReader->read_octet() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:268 
[0x7fa4f4e4dd00] PhpAmqpLib\Connection\AMQPConnection->wait_frame() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:288 
[0x7fa4f4e4db20] PhpAmqpLib\Connection\AMQPConnection->wait_channel(0) vendor/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:143
[0x7fa4f4e4cc18] PhpAmqpLib\Channel\AbstractChannel->next_frame() vendor/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:240 
[0x7fa4f4e4c8b8] PhpAmqpLib\Channel\AbstractChannel->wait(array(1)[0x2eec498]) vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:338 
[0x7fa4f4e4c770] PhpAmqpLib\Connection\AMQPConnection->close() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:129
[0x7fff9a628f20] PhpAmqpLib\Connection\AMQPConnection->__destruct()

The resource is in open state, and not eof. netstat show that the underlying socket is in ESTABLISHED state on both side.

Quick fix is to disable close_on_destruct, however I'm not sure of the implications of that.

How to acknowledge that messages are consumed?

Hi,

I am playing with the code provided in demo/amqp_publisher.php and I have added "$ch->tx_select();" to the channel and I am sending a lot of messages followed by "$ch->tx_commit();" to make sure the messages are persisted on disk.

Then I use amqp_consumer.php to read all the messages from the queue. After the queue is empty and i restart the rabbitmq server the messages are back in the queue.

What is the proper way to acknowledge that this messages are consumed?

p.s. If a set $ch->tx_commit() in the consumer the queue never even reaches 0 message count.

NOT_IMPLEMENTED - prefetch_size!

When I use method of basic_qos, I get the following error

    ...->basic_qos(1, 1, false);
[PhpAmqpLib\Exception\AMQPProtocolConnectionException]  
  NOT_IMPLEMENTED - prefetch_size!=0 (1)  

Always allow to set a timeout

We ran into a problem with rabbitmq: due to a bug the broker occasionally wouldn't answer to tx.commit and amqpChannel->tx_commit() hangs indefinitly as the tx.commit-ok never arrives.
So we need a timeout for tx_commit and all other methods, which currently do not provide this parameter

Using amqp without composer

How would one go about using this with out composer, i am not using it in a project of mine and dont feel like adding it this close to the end of my project?

Additional AMQP data types

Are there plans to implement additional data types? In AMQPReader it supports 'S', 'I', 'D', 'T', 'F' and outputs an error for any other type. I am experimenting with RabbitMQ and am finding that the reader is unable to parse arrays (type='A'), which are present in messages that are sent to dead-letter exchanges.

I see that some other client libraries implement this, e.g. bodgit/bunny@49f5943

Ipv6 support

I've tried to connect to an ipv6 rabbitmq host and I get:

PhpAmqpLib\Exception\AMQPRuntimeException: Error Connecting to server(0): php_network_getaddresses: getaddrinfo failed: Name or service not known  (uncaught exception)

What does basic_get() return?

The phpdoc is not explicit, and I've scrolled through everything I could find on the web I can't be sure:

  • does it wait for a message, and then always return a message? if so, is there a way to set a timeout for the "wait"?
  • or can it return null? or a null response?

Once I know, I'll submit a PR with improved phodoc.

Thanks

Fatal on AMQPChannel->close() when sending mandatory message and no queue

When I send mandatory message and there is exchange, but there is no queue connected to the exchange I receive a

'Exception' with message 'Expecting AMQP method, received frame type: 2
when closing the channel.
Shouldn`t the exception be thrown on the point of sending the message?

According to the java http://www.rabbitmq.com/api-guide.html#returning there cat be ReturnListener, which will catch the error.
See also: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.return

$ php emit_return_error.php 
 [x] Sent non-mandatory ... done.
 [x] Sent mandatory ...  done.
PHP Fatal error:  Uncaught exception 'Exception' with message 'Expecting AMQP method, received frame type: 2' in ./lib/PhpAmqpLib/Channel/AbstractChannel.php:225
Stack trace:
#0 ./lib/PhpAmqpLib/Channel/AMQPChannel.php(116): PhpAmqpLib\Channel\AbstractChannel->wait(Array)
#1 ./emit_return_error.php(30): PhpAmqpLib\Channel\AMQPChannel->close()
#2 {main}
  thrown in ./lib/PhpAmqpLib/Channel/AbstractChannel.php on line 225

code to reproduce (emit_return_error.php) :

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPConnection;

require_once('autoload.php');

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// declare  exchange but don`t bind any queue
$channel->exchange_declare('hidden_exchange', 'topic');

$msg = new AMQPMessage("Hello World!");

echo " [x] Sent non-mandatory ...";
$channel->basic_publish($msg,
        'hidden_exchange',
        'rkey');
echo " done.\n";

echo " [x] Sent mandatory ... ";
$channel->basic_publish($msg,
        'hidden_exchange',
        'rkey',
        true );
echo " done.\n";

$channel->close();
$connection->close();

Problem width mbstring.internal_encoding and mbstring.func_overload

We have found a problem which causes the client library not work under some circumstances.

If the PHP INI settings below are set the the indicated values, the client library refuses to finish the handshake.

mbstring.internal_encoding => UTF-8
mbstring.func_overload => 2

It's most likely because to implement the binary protocol you take advantage of string functions excessively.
We got around the problem by callign ini_set each time we access the library, something like this:

$original_internal_encoding = ini_get( 'mbstring.internal_encoding' );
ini_set( 'mbstring.internal_encoding', null );

// Call the php-amqp

ini_set( 'mbstring.internal_encoding', $original_internal_encoding );

...but this is quite inconvenient. Do you think this problem could be addressed inside the library?

Thank you!

Differences from the PECL version.

I'm curious on the differences between this library and the PECL extension. It seems like there's a significant API overlap between the two.

It seems like a common pattern for pure PHP libraries to provide a fallback for the compiled versions if they exist. For instance, Zend_Json provides their custom interface but if a native JSON extensions exists, it uses that native library under the hood to increase performance. (I know that Zend Framework is old news, and that JSON is probably a much simpler API.)

I guess I have these questions:

  • Is there something wrong with the PECL version that makes a pure PHP version more compelling (beyond the obvious portability advantage of a pure PHP version)?
  • Are there any important performance differences between the two?
  • Is there something preventing the pure PHP version from falling back to the PECL version if it exists? It seems like a reasonable addition to support both portability and performance.

I'm in a position where I can install the PECL version (and I assume it's faster) but I'll end up having to use the pure PHP version because that is what the Symfony Bundle uses. I'm happy to be told I missed the documentation that addresses this.

Error "Expecting AMQP method, received frame type: 3"

I am using phpamqplib with OldSoundRabbitMqBundle and during start of consumer via CLI ./app/console rabbitmq:consumer -m 50 primary I get an error "Expecting AMQP method, received frame type: 3".

Not sure what is the reason for it to happen, but it looks it happens when query is not empty. Is it bundle specific problem or phpamqplib's ?

Here is the debug output:


< [hex]:
0000  41 4D 51 50 01 01 09 01                            AMQP.... 

waiting for 10,10
waiting for a new frame
> 10,10: Connection.start
Start from server, version: 8.0, properties: capabilities=(), copyright=Copyright (C) 2007-2012 VMware, Inc., information=Licensed under the MPL.  See http://www.rabbitmq.com/, platform=Erlang/OTP, product=RabbitMQ, version=2.8.4, mechanisms: PLAIN, AMQPLAIN, locales: en_US
< [hex]:
0000  01 00 00 00 00 00 76 00  0A 00 0B 00 00 00 38 07   ......v. ......8.
0010  6C 69 62 72 61 72 79 53  00 00 00 13 50 48 50 20   libraryS ....PHP 
0020  53 69 6D 70 6C 65 20 41  4D 51 50 20 6C 69 62 0F   Simple A MQP lib.
0030  6C 69 62 72 61 72 79 5F  76 65 72 73 69 6F 6E 53   library_ versionS
0040  00 00 00 03 30 2E 31 08  41 4D 51 50 4C 41 49 4E   ....0.1. AMQPLAIN
0050  00 00 00 23 05 4C 4F 47  49 4E 53 00 00 00 05 67   ...#.LOG INS....g
0060  75 65 73 74 08 50 41 53  53 57 4F 52 44 53 00 00   uest.PAS SWORDS..
0070  00 05 67 75 65 73 74 05  65 6E 5F 55 53 CE         ..guest. en_US?

< 10,11: Connection.start_ok
waiting for 10,20, 10,30
waiting for a new frame
> 10,30: Connection.tune
< [hex]:
0000  01 00 00 00 00 00 0C 00  0A 00 1F FF FF 00 02 00   ........ ...??...
0010  00 00 00 CE                                        ...?

< 10,31: Connection.tune_ok
< [hex]:
0000  01 00 00 00 00 00 08 00  0A 00 28 01 2F 00 00 CE   ........ ..(./..?

< 10,40: Connection.open
waiting for 10,41, 10,50
waiting for a new frame
> 10,41: Connection.open_ok
Open OK! known_hosts: 
using channel_id: 1
< [hex]:
0000  01 00 01 00 00 00 05 00  14 00 0A 00 CE            ........ ....?

< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000  01 00 01 00 00 00 1A 00  28 00 0A 00 00 07 70 72   ........ (.....pr
0010  69 6D 61 72 79 06 64 69  72 65 63 74 02 00 00 00   imary.di rect....
0020  00 CE                                              .?

< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000  01 00 01 00 00 00 13 00  32 00 0A 00 00 07 70 72   ........ 2.....pr
0010  69 6D 61 72 79 02 00 00  00 00 CE                  imary... ..?

< 50,10: Channel.queue_declare
waiting for 50,11
waiting for a new frame
> 50,11: Channel.queue_declare_ok
< [hex]:
0000  01 00 01 00 00 00 1C 00  32 00 14 00 00 07 70 72   ........ 2.....pr
0010  69 6D 61 72 79 07 70 72  69 6D 61 72 79 00 00 00   imary.pr imary...
0020  00 00 00 CE                                        ...?

< 50,20: Channel.queue_bind
waiting for 50,21
waiting for a new frame
> 50,21: Channel.queue_bind_ok
< [hex]:
0000  01 00 01 00 00 00 2A 00  3C 00 14 00 00 07 70 72   ......*. <.....pr
0010  69 6D 61 72 79 1A 50 48  50 50 52 4F 43 45 53 53   imary.PH PPROCESS
0020  5F 42 6F 62 2E 6C 6F 63  61 6C 5F 32 33 33 38 37   _Bob.loc al_23387
0030  00 CE                                              .?

< 60,20: Channel.basic_consume
waiting for 60,21
waiting for a new frame
> 60,21: Channel.basic_consume_ok
waiting for any method
waiting for a new frame
> 60,60: Channel.basic_deliver
waiting for a new frame
waiting for a new frame
using channel_id: 2
< [hex]:
0000  01 00 02 00 00 00 05 00  14 00 0A 00 CE            ........ ....?

< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000  01 00 02 00 00 00 1A 00  28 00 0A 00 00 07 74 77   ........ (.....tw
0010  69 74 74 65 72 06 64 69  72 65 63 74 02 00 00 00   itter.di rect....
0020  00 CE                                              .?

< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000  01 00 02 00 00 00 10 00  3C 00 28 00 00 07 74 77   ........ <.(...tw
0010  69 74 74 65 72 00 00 CE                            itter..? 

< 60,40: Channel.basic_publish
< [hex]:
0000  02 00 02 00 00 00 1A 00  3C 00 00 00 00 00 00 00   ........ <.......
0010  00 00 32 90 00 0A 74 65  78 74 2F 70 6C 61 69 6E   ..2?..te xt/plain
0020  02 CE                                              .?

< [hex]:
0000  03 00 02 00 00 00 32 7B  22 69 6D 61 67 65 5F 69   ......2{ "image_i
0010  64 22 3A 34 33 35 31 36  2C 22 74 61 73 6B 22 3A   d":43516 ,"task":
0020  22 66 69 6E 64 5F 74 61  67 73 5F 61 6E 64 5F 6D   "find_ta gs_and_m
0030  65 6E 74 69 6F 6E 73 22  7D CE                     entions" }?

using channel_id: 3
< [hex]:
0000  01 00 03 00 00 00 05 00  14 00 0A 00 CE            ........ ....?

< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000  01 00 03 00 00 00 1C 00  28 00 0A 00 00 09 64 65   ........ (.....de
0010  6C 69 63 69 6F 75 73 06  64 69 72 65 63 74 02 00   licious. direct..
0020  00 00 00 CE                                        ...?

< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000  01 00 03 00 00 00 12 00  3C 00 28 00 00 09 64 65   ........ <.(...de
0010  6C 69 63 69 6F 75 73 00  00 CE                     licious. .?

< 60,40: Channel.basic_publish
< [hex]:
0000  02 00 03 00 00 00 1A 00  3C 00 00 00 00 00 00 00   ........ <.......
0010  00 00 32 90 00 0A 74 65  78 74 2F 70 6C 61 69 6E   ..2?..te xt/plain
0020  02 CE                                              .?

< [hex]:
0000  03 00 03 00 00 00 32 7B  22 69 6D 61 67 65 5F 69   ......2{ "image_i
0010  64 22 3A 34 33 35 31 36  2C 22 74 61 73 6B 22 3A   d":43516 ,"task":
0020  22 66 69 6E 64 5F 74 61  67 73 5F 61 6E 64 5F 6D   "find_ta gs_and_m
0030  65 6E 74 69 6F 6E 73 22  7D CE                     entions" }?

< [hex]:
0000  01 00 01 00 00 00 0D 00  3C 00 50 00 00 00 00 00   ........ <.P.....
0010  00 00 01 00 CE                                     ....?

< 60,80: Channel.basic_ack
waiting for any method
waiting for a new frame



  [Exception]                                    
  Expecting AMQP method, received frame type: 3  

How to publish a durable message ?

Reading Python tutos, we can see that both queue and message has to be configured as durable if we don't want to loose messages if RabbitMQ crashes.
http://www.rabbitmq.com/tutorials/tutorial-two-python.html

Python basic_publish has a delivery_mode property

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

which not exists on php-amqplib

public function basic_publish($msg, $exchange="", $routing_key="",
                                  $mandatory=false, $immediate=false,
                                  $ticket=NULL)

Is there a way to publish a durable message with php-amqplib ?

Channel lose connection after having thrown an error : PHP Fatal error: Call to a member function send_channel_method_frame()

Hello,

Whenever a queue_declare fails, the connection is lost.
For example in the above code, the part inside the catch always fails because the connection is closed

<?php

require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Exception\AMQPException;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

try {
    // declare a queue with the passive bit.
    // It must return a 404 code if the queue does not exists or a 406 if it exists with a different settings
    $channel->queue_declare('pretty.queue', true, true);
} catch (AMQPException $e) {
    if ($e->getCode() == 406) {
        // precondition failed, I delete it to recreate it with the correct settings
        $channel->queue_delete('pretty.queue');
        $channel->queue_declare('pretty.queue', false, true);
    } elseif ($e->getCode() == 404) {
        // queue do not exist, I create it
        $channel->queue_declare('pretty.queue', false, true);
    }
    $channel->queue_declare('pretty.queue', true, true);
}

results in :

PHP Fatal error:  Call to a member function send_channel_method_frame() on a non-object in /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php on line 148
PHP Stack trace:
PHP   1. {main}() /tmp/test.php:0
PHP   2. PhpAmqpLib\Channel\AMQPChannel->queue_declare() /tmp/test.php:18
PHP   3. PhpAmqpLib\Channel\AbstractChannel->send_method_frame() /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:373

Fatal error: Call to a member function send_channel_method_frame() on a non-object in /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php on line 148

Call Stack:
    0.0002     246392   1. {main}() /tmp/test.php:0
    0.0125    1122240   2. PhpAmqpLib\Channel\AMQPChannel->queue_declare() /tmp/test.php:18
    0.0126    1125536   3. PhpAmqpLib\Channel\AbstractChannel->send_method_frame() /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:373

The only fix I found is to create a new channel before using it like this :

<?php

require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Exception\AMQPException;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

try {
    // declare a queue with the passive bit.
    // It must return a 404 code if the queue does not exists or a 406 if it exists with a different settings
    $channel->queue_declare('pretty.queue', true, true);
} catch (AMQPException $e) {
    if ($e->getCode() == 406) {
        // precondition failed, I delete it to recreate it with the correct settings
        $channel = $connection->channel();
        $channel->queue_delete('pretty.queue');
        $channel->queue_declare('pretty.queue', false, true);
    } elseif ($e->getCode() == 404) {
        // queue do not exist, I create it
        $channel = $connection->channel();
        $channel->queue_declare('pretty.queue', false, true);
    }
    $channel->queue_declare('pretty.queue', true, true);
}

Add support for publisher confirmations

It's a pretty useful feature, and my initial dig-through the codebase doesn't show me how I can implement this. The basics would be as follows (I think):

<?php
// Add the following to `AMQPChannel`
  function confirm_select($nowait = true, $callback = null):
    '''
    Set this channel to use publisher confirmations.
    '''
    $nowait = $nowait && $this->allow_nowait() && !$callback;

    $this->_msg_id = 0;
    $this->_last_ack_id = 0;
    list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

    $this->send_method_frame(array($class_id, $method_id), $args);

    if (!$nowait) {
      $this->select_callbacks[] = $callback;
      $this->wait(
        $this->waitHelper->get_wait('confirm.select_ok')
      )
    }
  }

  function confirm_select_ok($method_frame) {
    $callback = array_pop($this->select_callbacks);
    $callback();
  }

I think this is sort of what is needed, though I am not quite sure whether the callback stuff is correct. I translated the code from here, if it matters.

I'd be happy to work through this with anyone who has any idea as to how this works!

Exception when running demo

oren@oren-laptop:~/vendor/videlalvaro/php-amqplib/demo$ php amqp_consumer.php
< [hex]:
0000 41 4D 51 50 01 01 09 01 AMQP....

waiting for 10,10
waiting for a new frame

10,10: Connection.start
Start from server, version: 8.0, properties: capabilities=(), copyright=Copyright (C) 2007-2012 VMware, Inc., information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, product=RabbitMQ, version=2.8.7, mechanisms: PLAIN, AMQPLAIN, locales: en_US
< [hex]:
0000 01 00 00 00 00 00 A2 00 0A 00 0B 00 00 00 38 07 ......�. ......8.
0010 6C 69 62 72 61 72 79 53 00 00 00 13 50 48 50 20 libraryS ....PHP
0020 53 69 6D 70 6C 65 20 41 4D 51 50 20 6C 69 62 0F Simple A MQP lib.
0030 6C 69 62 72 61 72 79 5F 76 65 72 73 69 6F 6E 53 library_ versionS
0040 00 00 00 03 30 2E 31 08 41 4D 51 50 4C 41 49 4E ....0.1. AMQPLAIN
0050 00 00 00 4F 05 4C 4F 47 49 4E 53 00 00 00 16 61 ...O.LOG INS....a
0060 70 70 31 30 35 35 30 30 35 31 5F 68 65 72 6F 6B pp105500 51_herok
0070 75 2E 63 6F 6D 08 50 41 53 53 57 4F 52 44 53 00 u.com.PA SSWORDS.
0080 00 00 20 74 66 38 47 41 4E 51 46 76 36 4D 6B 39 .. tf8GA NQFv6Mk9
0090 63 33 32 72 4D 5A 56 6A 6E 75 35 35 5F 48 55 36 c32rMZVj nu55_HU6
00A0 33 42 56 05 65 6E 5F 55 53 CE 3BV.en_U S

< 10,11: Connection.start_ok
waiting for 10,20, 10,30
waiting for a new frame

10,30: Connection.tune
< [hex]:
0000 01 00 00 00 00 00 0C 00 0A 00 1F FF FF 00 02 00 ........ ...��...
0010 00 00 00 CE ...

< 10,31: Connection.tune_ok
< [hex]:
0000 01 00 00 00 00 00 1E 00 0A 00 28 17 2F 61 70 70 ........ ..(./app
0010 31 30 35 35 30 30 35 31 5F 68 65 72 6F 6B 75 2E 10550051 _heroku.
0020 63 6F 6D 00 00 CE com..

< 10,40: Connection.open
waiting for 10,41, 10,50
waiting for a new frame
PHP Fatal error: Uncaught exception 'Exception' with message 'Error reading data. Received 0 instead of expected 1 bytes' in /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:61
Stack trace:
#0 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(94): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#1 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(268): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(288): PhpAmqpLib\Connection\AMQPConnection->wait_frame()
#3 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(143): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(240): PhpAmqpLib\Channel\AbstractChannel->next_frame()
#5 /home/oren/workspac in /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 61

How to avoid deadlocks for failed workers

I don't think there's a mailing list for the library, so i'm misusing the issue tracker for a question...

As far as i understood rabbitmq will send failed messages to newly available workers when a worker dies and doesn't send back an acknowledgment that the message has been processed.

Are there any strategies how to avoid deadlocks which could freeze a whole queue, for example if a worker transcodes a video file and the underlying ffmpeg process fails for a specific file.

Then rabbitmq would send the message to a next worker which would also fail and so on.

Any best practices on that problem?

Warning: fwrite() expects parameter 1 to be resource, null given

Since today i've got a very strange error. We are using this lib together with Symfony 2.
This happens while opening the connection to the RabbitMQ server. This is the part of my controller:

$rabbitmqHost = $this->container->getParameter("rabbitmq_host");
$rabbitmqPort = $this->container->getParameter("rabbitmq_port");
$rabbitmqUser = $this->container->getParameter("rabbitmq_user");
$rabbitmqPassword = $this->container->getParameter("rabbitmq_password");
$rabbitmqVHost = $this->container->getParameter("rabbitmq_vhost");
$rabbitmqExchange = $this->container->getParameter("rabbitmq_exchange");

$this->conn = new AMQPConnection($rabbitmqHost, $rabbitmqPort, $rabbitmqUser, $rabbitmqPassword);
$this->vVhost = $rabbitmqVHost;
$this->exchange = $rabbitmqExchange;

The messaging works without problems. I can send und receive messages from the server, but this message appears in the error log from the webserver:

PHP Fatal error: Uncaught exception 'Symfony\Component\Debug\Exception\ContextErrorException' with message 'Warning: fwrite() expects parameter 1 to be resource, null given in PhpAmqpLib/Wire/IO/StreamIO.php line 61' in PhpAmqpLib/Wire/IO/StreamIO.php:61
Stack trace:
#0 [internal function]: Symfony\Component\Debug\ErrorHandler->handle(2, 'fwrite() expect...', '', 61, Array)
#1 PhpAmqpLib/Wire/IO/StreamIO.php(61): fwrite(NULL, '??????????2????...')
#2 PhpAmqpLib/Connection/AbstractConnection.php(173): PhpAmqpLib\Wire\IO\StreamIO->write('??????????2????...')
#3 PhpAmqpLib/Connection/AbstractConnection.php(258): PhpAmqpLib\Connection\AbstractConnection- in PhpAmqpLib/Wire/IO/StreamIO.php on line 61

Add application_header to message for basic_reject

Is there a way to modify the message properties like application_headers when rejecting a message using basic_reject? I'm trying to add a stack trace to the message before it's put into a dead letter queue for easier debugging.

Right now i've tried something along these lines, but there's no additional properties when retrieving the message in the rabbitmq backend:

<?php
$message->set('application_headers', array('exception' => 'some exception info'));
$message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], false);

nack fail

Hey, I was trying to use nack, and I've got an error:

PHP Notice:  fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /.../vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php on line 164

The sample of my php code:

public function process(AMQPMessage $message)
{
        // ...

        if ($condition) {
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        } else {
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
        }
 }

Do I miss something ?
When I use reject, all is okay.

Change visibility on debug

Can you please change $debug in AbstractChannel to be public, or provide a setter so that debug can be enabled and disabled by callers instead of defining a constant?

Strange performance difference between Linux and Windows

This might probably not be a problem with php-amqlib directly, but I would appreciate if someone else could also test it, so we can pinpoint the problem, inform people and maybe even help to find a solution.
It might be some issue with Virtualbox, PHP itself or RabbitMQ, I have no Idea yet...

So, doing some development and performance testing, I noticed a strange phenomenon .

The throughput of messages in an RPC-Queue on a windows machine was about 650msg/s, while on the two tested Linux Virtual machines it wouldn't go higher than 25msg/s with the same scripts, same PHP and same RabbitMQ-Server(3.1.4 (Windows and Centos), (3.1.5 on Fedora))!!! Erlang is R16B01 on Windows and Centos and R16B02 on Fedora

That said, I must confess that the Linux machines are on a VirtualBox inside my Windows development box.
However, I/O performance on both Linux machines is very good.
The Host CPU is a i2600K with 16GB DDR3 1866Mhz RAM, and the CPU doesn't even reach 10% during highest load of the tests (Full throughput on Windows).

Linuxes get 4GB each, but don't actually use more than 1.5GB

The first Linux where I discovered it, was a fully updated Centos 6.4.
After that I quickly set up a Fedora 19 with a 3.10 kernel, just to be sure, there isn't some problem with the Centos Kernel (polling, etc).

So this should rule out any Linux Kernel related problem. Network buffers are big enough, file descriptors are virtually unused in this case, so I don't think we're yet at the point where we should be doing micro optimizations. These are normal installations and should provide decent throughput, even without doing heavy optimizations to the OS.

Unfortunately at this time I don't have a real Linux machine to test it, in order to exclude the possibility of Virtualbox being the problem.

Does anyone of you guys have a real box with Linux, Rabbit and php-amqlib to do a few tests?

I have included some profiling into AbstractConnection.php and done a 20000 messages loop in the producer. Also, in order to have a clean profiling, I have remarked the fib calculations and just return whatever the producer sent.

I have put the modified AbstractConnection.php, rpc-consumer.php and rpc-producer.php in this gist: https://gist.github.com/frankmayer/6264596, so there's not a lot to prepare for the tests. You might need to change the autoloader path in the producer/consumer though.

So, here's the profiling output of the Producer on Windows and after that, on Linux:

Notice that, in both Windows and Linux, the first wait_frame always takes longer, which is because the first fread() takes longer than the subsequent ones.

But the big difference is that it takes only about 1,5 to 2 ms on windows while it takes around 40ms on Linux, which is a big difference. This amount in every roundtrip is the problem that makes the difference of 650msg/s to 25msg/s throughput.

Producer on Windows:

[.] AbstractConnection::wait_frame() Got response in: 0.0020 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0005 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0020 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30

Producer on Linux:

 [.] AbstractConnection::wait_frame() Got response in: 0.0411 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0404 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0416 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0411 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0402 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0404 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0412 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0414 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0403 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0405 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0402 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0403 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30
 [.] AbstractConnection::wait_frame() Got response in: 0.0412 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
 [.] Got 30

One more note: the throughput on windows echoing this profiling, went down to 450 msg/s while on Linux it stayed at 25msg/s.

When profiling these kind of scripts, it's important to not have the XDebug extension loaded at all, as it will hugely impact performance, even in its disabled state. Read my blog post on this exact matter, here: http://bit.ly/14SaWpp

Thanks for helping and please let me know if you need any additional info.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.