I'm trying to get the following setup working:
- a producer that sends one message and waits for an ack containing a result (actually the JSON result of an operation)
- a consumer that checks for pending messages every 5 seconds, handles all of them in one go, acknowledges all of them in one go, then waits another 5 seconds (infinite loop).
Here are the 30 lines of my stompproducer.php:
<?php
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
$queue = '/aaaa';
$msg = 'bar';
if (count($argv)<3) {
echo $argv[0]." [msg] [nb to send]\n";
exit(1);
}
$msg = (string)$argv[1];
$to_send = intval($argv[2]);
try {
$stomp = new Stomp('tcp://localhost:61613');
while ($to_send-- > 0) { // post-decrement: sends exactly the requested number of messages
msg("Sending...");
$result = $stomp->send(
$queue,
$msg." ". date("Y-m-d H:i:s"),
array('receipt' => 'message-123')
);
echo 'result='.var_export($result,true)."\n";
msg("Done.");
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
Here are the 30 lines of my stompconsumer.php:
<?php
$queue = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
try {
$stomp = new Stomp('tcp://localhost:61613');
$stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
$stomp->setReadTimeout(0, 10000);
while (true) {
$frames_read=array();
while ($stomp->hasFrame()) {
$frame = $stomp->readFrame();
if ($frame != null) {
array_push($frames_read, $frame);
}
if (count($frames_read)==40) {
break;
}
}
msg("Number of frames read: ".count($frames_read));
msg("Pause...");
// microtime(true) is in seconds, $_waitTimer and usleep() are in microseconds
$e=(int)($_waitTimer-(microtime(true)-$_timeLastAsk)*1000000);
if ($e>0) {
usleep($e);
}
if (count($frames_read)>0) {
msg("Ack now...");
foreach ($frames_read as $frame) {
$stomp->ack($frame);
}
}
$_timeLastAsk = microtime(true);
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
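On the 5-second pacing in the consumer loop: microtime(true) returns seconds as a float, while usleep() expects an integer number of microseconds, so the elapsed time has to be converted before it is subtracted from the interval. A minimal sketch of that calculation, where the helper name remainingWaitMicros is made up for illustration and is not part of the Stomp extension:

```php
<?php
// Hypothetical helper: microseconds left before the next poll is due.
// $lastPoll and $now are microtime(true) values, i.e. seconds as floats.
function remainingWaitMicros($intervalMicros, $lastPoll, $now)
{
    // Convert the elapsed time from seconds to microseconds first.
    $elapsedMicros = ($now - $lastPoll) * 1000000;
    // Never return a negative value: usleep() needs a non-negative integer.
    return max(0, (int) round($intervalMicros - $elapsedMicros));
}

// 1.5 s already spent out of a 5 s interval: 3.5 s (3500000 us) left.
echo remainingWaitMicros(5000000, 100.0, 101.5), "\n";
// 6 s already spent: the next poll is overdue, so no sleep at all.
echo remainingWaitMicros(5000000, 100.0, 106.0), "\n";
```

In the loop, the result would be passed straight to usleep(), with microtime(true) taken right before the call.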
I can't manage to write a synchronous producer, i.e. a producer that waits for the consumer's ack. If you run the samples above, you'll see that the producer sends all its messages instantly and then quits, with every call to $stomp->send() returning true (as if "ok").
I still haven't found any good examples, or good documentation with a simple blocking sample.
What should I do to make my producer block until the consumer sends its ack?
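One direction that might give this blocking behaviour is a request/reply round trip instead of relying on the ack. As far as I understand, the receipt header only makes send() wait for the broker's RECEIPT frame, not for anything the consumer does, which would explain the immediate true results. The sketch below is untested against a real broker. It assumes that ActiveMQ passes a reply-to header through to the consumer and accepts /temp-queue/ destinations over STOMP. The helper name sendAndWaitForReply, the reply queue name and the 30-second timeout are made up for illustration.

```php
<?php
// Hypothetical helper: send one request, then block until a reply frame
// arrives or the read timeout expires. $stomp is expected to be a connected
// Stomp object that has already subscribed to $replyQueue.
function sendAndWaitForReply($stomp, $queue, $body, $replyQueue, $timeoutSeconds = 30)
{
    // 'reply-to' tells the consumer where to send its result
    // (assumption: the broker forwards this header unchanged or mapped).
    $stomp->send($queue, $body, array('reply-to' => $replyQueue));

    // readFrame() waits up to the read timeout and returns false
    // when no frame arrived in time.
    $stomp->setReadTimeout($timeoutSeconds);
    $reply = $stomp->readFrame();
    if ($reply === false) {
        throw new RuntimeException("No reply within $timeoutSeconds seconds");
    }
    // The consumer is assumed to answer with a JSON body.
    return json_decode($reply->body, true);
}

// Producer side (assumed names):
//   $stomp = new Stomp('tcp://localhost:61613');
//   $stomp->subscribe('/temp-queue/replies');
//   $result = sendAndWaitForReply($stomp, '/aaaa', 'bar', '/temp-queue/replies');
//
// Consumer side, after handling $frame:
//   $stomp->send($frame->headers['reply-to'], json_encode(array('ok' => true)));
```

Because the consumer only polls every 5 seconds, the timeout has to be comfortably longer than that interval. With several producers sharing one reply queue, a correlation header would also be needed to match replies to requests.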
NB: I've read all the documentation here and the Stomp PHP questions on Stack Overflow here and here.