doujie1908 2015-01-03 18:28

ActiveMQ and PHP Stomp: synchronous producer sample

I'm trying to get this principle working:

  • a producer that sends one message and waits for an ack that contains some result (actually the JSON result of an operation)
  • a consumer that checks all pending messages every 5 seconds, handles all of them in one go, acknowledges all of them in one go, then waits another 5 seconds (infinite loop).

Here are the 30 lines of my stompproducer.php:

<?php

function msg($txt)
{
    echo date('H:i:s > ').$txt."\n";
}

$queue  = '/aaaa';
$msg    = 'bar';
if (count($argv)<3) {
    echo $argv[0]." [msg] [nb to send]\n";
    exit(1);
}
$msg     = (string)$argv[1];
$to_send = intval($argv[2]);

try {
    $stomp = new Stomp('tcp://localhost:61613');
    // Send exactly $to_send messages
    while ($to_send-- > 0) {
        msg("Sending...");
        $result = $stomp->send(
            $queue,
            $msg." ". date("Y-m-d H:i:s"),
            array('receipt' => 'message-123')
        );
        echo 'result='.var_export($result,true)."\n";
        msg("Done.");
    }
} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

Here are the 30 lines of my stompconsumer.php:

<?php

$queue  = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);

function msg($txt)
{
    echo date('H:i:s > ').$txt."\n";
}

try {
    $stomp = new Stomp('tcp://localhost:61613');
    $stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
    $stomp->setReadTimeout(0, 10000);
    while (true) {
        $frames_read=array();
        while ($stomp->hasFrame()) {
            $frame = $stomp->readFrame();
            if ($frame != null) {
                array_push($frames_read, $frame);
            }
            if (count($frames_read)==40) {
                break;
            }
        }
        msg("Number of frames read: ".count($frames_read));
        msg("Pause...");
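        // $_waitTimer is in microseconds, microtime(true) is in seconds: convert before subtracting
        $e=$_waitTimer-(microtime(true)-$_timeLastAsk)*1000000;
        if ($e>0) {
            usleep((int)$e);
        }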
        if (count($frames_read)>0) {
            msg("Ack now...");
            foreach ($frames_read as $frame) {
                $stomp->ack($frame);
            }
        }
        $_timeLastAsk = microtime(true);
    }
} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

I can't manage to write a synchronous producer, i.e. a producer that waits for the consumer's ack. If you run the samples above, you'll see that the producer sends all messages instantly and then quits, with every call to $stomp->send() returning true. I still haven't found a good example, or documentation with a simple blocking sample.

What should I do to make my producer block until the consumer sends its ack?

NB: I've read all the documentation here and the Stomp PHP questions on Stack Overflow here and here.


  • dtgvl48608 2016-03-10 13:09

    The first thing that comes to mind: take a look at this ActiveMQ page on message redelivery and DLQ handling:

    http://activemq.apache.org/message-redelivery-and-dlq-handling.html

    Another workaround I can think of:

    On the producer side:
    1. Change your producer to send persistent messages.
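
    A minimal sketch of that producer-side change, assuming ActiveMQ's STOMP header conventions (persistent, and expires as an absolute time in milliseconds since the Unix epoch). The helper name persistentHeaders and the 60-second lifetime are hypothetical and not part of the original code.

```php
<?php
// Sketch only: header names follow ActiveMQ's STOMP conventions; the helper
// name and the lifetime passed to it are assumptions made for illustration.

/**
 * Build STOMP headers for a persistent message that expires after $ttlSeconds.
 * 'expires' is an absolute timestamp in milliseconds since the Unix epoch.
 * $now (seconds, as returned by microtime(true)) can be injected for testing.
 */
function persistentHeaders(int $ttlSeconds, ?float $now = null): array
{
    $now = $now ?? microtime(true);
    return array(
        'persistent' => 'true',
        'expires'    => (string) (int) round(($now + $ttlSeconds) * 1000),
    );
}

// Usage (requires the Stomp extension and a running broker):
// $stomp = new Stomp('tcp://localhost:61613');
// $stomp->send($queue, $msg, persistentHeaders(60));
```

    The headers are built in a separate function so they can be checked without a broker connection.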

    On the consumer side, use a timer:
    1. Read messages/frames until the queue is empty or a maximum cap is reached.
    2. Send the packed list of messages in a cURL request, then empty the list.
    3. Sleep for 5 seconds.
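
    The consumer-side steps could look roughly like the sketch below. The function names drainFrames and forwardBatch, the cap of 40, and the endpoint URL are hypothetical; only the Stomp and cURL calls themselves come from the PHP extensions.

```php
<?php
// Sketch only: function names, the batch cap, and the endpoint URL are
// assumptions made for illustration.

/**
 * Read pending frames until none is waiting or $maxBatch is reached.
 * $stomp is any object exposing hasFrame() and readFrame(), such as Stomp.
 */
function drainFrames($stomp, int $maxBatch): array
{
    $frames = array();
    while (count($frames) < $maxBatch && $stomp->hasFrame()) {
        $frame = $stomp->readFrame();
        if ($frame) {            // skip a false/empty result
            $frames[] = $frame;
        }
    }
    return $frames;
}

/** POST the bodies of a batch as one JSON array; returns true on HTTP 2xx. */
function forwardBatch(array $frames, string $url): bool
{
    $bodies = array_map(function ($f) { return $f->body; }, $frames);
    $ch = curl_init($url);
    curl_setopt_array($ch, array(
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => json_encode($bodies),
        CURLOPT_HTTPHEADER     => array('Content-Type: application/json'),
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT        => 5,
    ));
    curl_exec($ch);
    $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    return $status >= 200 && $status < 300;
}

// Usage (requires the Stomp and cURL extensions and a running broker):
// $stomp = new Stomp('tcp://localhost:61613');
// $stomp->subscribe($queue);
// $stomp->setReadTimeout(0, 10000);
// while (true) {
//     $batch = drainFrames($stomp, 40);
//     if ($batch) {
//         forwardBatch($batch, 'http://localhost/handle-batch');
//     }
//     sleep(5);
// }
```

    Keeping the read loop separate from the HTTP call makes it possible to exercise the batching logic with a stand-in object instead of a live connection.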

    You definitely need to test this further, but it should work. Once the process wakes up, you should be able to read all the messages queued in the meantime.

    Things to consider:
    - Persistent messages will need an expiration time.
    - You'll need ACK on the consumer side to update the status of messages already handled. Use ack=client so that you can acknowledge all the messages you have processed.
    - It's easier if you don't have to wait for your cURL request to respond.
    - Out of the box, sending an ACK from the consumer (server side) back to the producer is not supported.
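
    As a rough illustration of the ack=client point, the sketch below subscribes in client-acknowledge mode and acknowledges a handled batch. The helper names clientAckHeaders and ackBatch are hypothetical. ActiveMQ is generally described as treating client-mode ACKs as cumulative, but the sketch acknowledges every frame individually so that it does not depend on that behaviour.

```php
<?php
// Sketch only: the helper names are assumptions made for illustration.

/** Subscription headers asking the broker to wait for explicit ACK frames. */
function clientAckHeaders(int $prefetch): array
{
    return array(
        'ack'                   => 'client',
        'activemq.prefetchSize' => (string) $prefetch,
    );
}

/**
 * Acknowledge every frame of a handled batch.
 * $stomp is any object exposing ack(), such as Stomp.
 * Returns the number of ACKs that the client reported as sent.
 */
function ackBatch($stomp, array $frames): int
{
    $acked = 0;
    foreach ($frames as $frame) {
        if ($stomp->ack($frame)) {
            $acked++;
        }
    }
    return $acked;
}

// Usage (requires the Stomp extension and a running broker):
// $stomp = new Stomp('tcp://localhost:61613');
// $stomp->subscribe($queue, clientAckHeaders(40));
// ... read and handle a batch of frames into $frames_read ...
// ackBatch($stomp, $frames_read);
```

    Note that these ACKs only tell the broker that the consumer is done with the messages; they are not relayed to the producer.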

    Best of luck

    The asker selected this answer as the best answer.