Try subscribing ActiveMQ(Apollo) using Go-Stomp, but I am having read timeout error. My app should be alive 24 hours per day to process incoming message.
Question :
- Is there a way to keep the subcription although there is no more message exist in the queue? Trying to put ConnOpt.HeartBeat also does not seems to work
- Why after the readTimeout, it seems that I still accept one more message ?
Below is my steps :
- I put 1000 messages for testing in the inputQueue
- Run a subscriber, code provided below
- Subscriber finished reading 1000 messages After 2-3 seconds, saw error " 2016/10/07 17:12:44 Subscription 1: /queue/hflc-in: ERROR message:read timeout".
- Put another 1000 messages, but it seems the subscription is already down, therefore no message is not being processed
My Code :
var(
serverAddr = flag.String("server", "10.92.10.10:61613", "STOMP server endpoint")
messageCount = flag.Int("count", 10, "Number of messages to send/receive")
inputQ = flag.String("inputq", "/queue/hflc-in", "Input queue")
)
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login("userid", "userpassword"),
stomp.ConnOpt.Host("mybroker"),
stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}
func main() {
flag.Parse()
jobschan := make(chan bean.Request, 10)
//my init setup
go getInput(1, jobschan)
}
func getInput(id int, jobschan chan bean.Request) {
conn, err := stomp.Dial("tcp", *serverAddr, options...)
if err != nil {
println("cannot connect to server", err.Error())
return
}
fmt.Printf("Connected %v
", id)
sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
if err != nil {
println("cannot subscribe to", *inputQ, err.Error())
return
}
fmt.Printf("Subscribed %v
", id)
var messageCount int
for {
msg := <-sub.C
//expectedText := fmt.Sprintf("Message #%d", i)
if msg != nil {
actualText := string(msg.Body)
var req bean.Request
if actualText != "SHUTDOWN" {
messageCount = messageCount + 1
var err2 = easyjson.Unmarshal([]byte(actualText), &req)
if err2 != nil {
log.Error("Unable unmarshall", zap.Error(err))
println("message body %v", msg.Body) // what is [0/0]0x0 ?
} else {
fmt.Printf("Subscriber %v received message, count %v
", id, messageCount)
jobschan <- req
}
} else {
logchan <- "got some issue"
}
}
}
}
Error :
2016/10/07 17:12:44 Subscription 1: /queue/hflc-in: ERROR message:read timeout
[E] 2016-10-07T09:12:44Z Unable unmarshall
message body %v [0/0]0x0