downloads | documentation | faq | getting help | mailing lists | licenses | wiki | reporting bugs | php.net sites | conferences | my php.net

search for in the

AMQPQueue::declare> <AMQPQueue::__construct
[edit] Last updated: Fri, 17 May 2013

view this page in

AMQPQueue::consume

(PECL amqp >= Unknown)

AMQPQueue::consumeConsume mensajes de una cola

Descripción

public void AMQPQueue::consume ( callable $callback [, int $flags = AMQP_NOPARAM ] )

El bloqueo de la función que va a recuperar el mensaje siguiente de la cola cuando esté disponible y hacerlo pasar a la llamada de retorno.

Parámetros

callback

Una función de llamada de retorno para que el mensaje de consumo será pasada. La función debe aceptar como mínimo un parámetro, un objeto AMQPEnvelope, y un segundo parámetro opcional AMQPQueue de que el mensaje fue consumido.

The AMQPQueue::consume() will not return the processing thread back to the PHP script until the callback function returns FALSE.

flags

Una máscara de bits de cualquiera de las flags: AMQP_NOACK.

Errores/Excepciones

Lanza una AMQPChannelException si el canal no está abierto.

Lanza una AMQPConnectionException si se perdió la conexión con el corredor.

Valores devueltos

Ejemplos

Ejemplo #1 Ejemplo de AMQPQueue::consume()

<?php

/* Crea una conección utilizando todas las credenciales por defecto: */
$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

/* Crea un objeto de la cola */
$queue = new AMQPQueue($channel);

// Declara la cola
$queue->declare('myqueue');

$i 0;
function 
processMessage($envelope$queue) {
   global 
$i;
   echo 
"Message $i: " $envelope->getBody() . "\n";
   
$i++;
   if (
$i 10) {
       
// Bail after 10 messages
       
return false;
    }
}

// Consume los mensajes en la cola
$queue->consume("processMessage");

?>



AMQPQueue::declare> <AMQPQueue::__construct
[edit] Last updated: Fri, 17 May 2013
 
add a note add a note User Contributed Notes AMQPQueue::consume - [5 notes]
up
1
pinepain at gmail dot com
4 months ago
Be careful using this function with non-zero amqp.timeout (you may check at AMQPConnection::getTimeout), because it looks like timeout value says how long to wait for a new message from broker before die in way like

Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Resource temporarily unavailable' in /path/to/your/file.php:12
Stack trace:
#0  /path/to/your/file.php(12): AMQPQueue->consume(Object(Closure), 128)
#1 {main}
  thrown in /path/to/your/file.php on line 12

As for notes about blocking, system resources greediness and so and so, you can investigate how it works by looking in  amqp_queue.c  for read_message_from_channel C function declaration and PHP_METHOD(amqp_queue_class, consume) method declaration. For me it works perfectly without any uncommon resources usage or I/O performance degradation under the load of 10k 64b message per second with delivery time for less than 0.001 sec.

OS: FreeBSD *** 8.2-RELEASE FreeBSD 8.2-RELEASE #0: Sat Mar ****** 2011     root@*****:****  amd64
PHP: PHP Version => 5.3.10, Suhosin Patch 0.9.10, Zend Engine v2.3.0

php AMQP extnsion:

amqp

Version => 1.0.9
Revision => $Revision: 327551 $
Compiled => Dec 2* 2012 @ *****
AMQP protocol version => 0-9-1
librabbitmq version => 0.2.0

Directive => Local Value => Master Value
amqp.auto_ack => 0 => 0
amqp.host => localhost => localhost
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.timeout => 0 => 0
amqp.vhost => / => /

AMQP broker: RabbitMQ 3.0.1, Erlang R14B04

Definitely, such loop will block main thread, but due to single-thread PHP nature it's completely normal behavior. To exit this consumption loop your callback function or method (i prefer to use closures, btw) should return FALSE.

The benefit of this function is that you don't have manually iteration for all messages, and what is more important, if there is no unprocessed messages in queue it will wait for such for you.

So you have just to run you consumer (one or many) and optionally time to time check whether they still alive just for reason if you are not sure about callbacks or memory-limit-critical stuff
up
0
liuxiangchao at ometworks dot com
3 months ago
To consume ALL messages stored DURABLE exchanges, you will need to  set channel's prefetch size parameter to 0:

<?php $channel->setPrefetchCount(0); ?>
up
0
Laurent
8 months ago
Be careful using consume() function on AMQP. It will catch all Exception and fall down in infinite loop (message will not be marked as readed and reput in queue)
up
0
peter dot colclough at toolstation dot com
1 year ago
Using AMQP_consume, against a RabbitMQ server, actually stuffs memory. It will work in a loop, or on a constant recall, so long as your exchange/queue and messages are set to durable. However, it will alo make the system unusable within a couple of minutes.

Using get(), all is fine. I think this may be a bugette in teh PHP access code... ff to take a look.
up
0
hlopetz at gmail dot com
2 years ago
you shouldn't use AMQPQueue::consume() if you have to get _all_ incoming messages. you'll get only "max" number of messages and the queue will be destroyed then.

for my amqp.so v0.2.2 this weird behavior is true.

use AMQPQueue::get() and use "count" param instead.

 
show source | credits | sitemap | contact | advertising | mirror sites