I am working on a sample project that takes output from BigQuery and publishes it to Pub/Sub. The query can return more than 100,000 rows. I saw there are options to batch publish, and I've read in multiple places that 1,000 messages per batch is ideal. The problem is that I can't get it to batch multiple messages. I think the solution is simple, but I'm missing how to do it.
Here is what I have right now; all it does is publish one message at a time.
func publish(client pubsub.Client, data []byte) (string, error) {
	ctx := context.Background()
	topic := client.Topic("topic-name")
	topic.PublishSettings = pubsub.PublishSettings{
		// ByteThreshold: 5000,
		CountThreshold: 1000, // no matter what I put here it still sends one per publish
		// DelayThreshold: 1000 * time.Millisecond,
	}
	result := topic.Publish(ctx, &pubsub.Message{
		Data: data,
	})
	id, err := result.Get(ctx)
	if err != nil {
		return "", err
	}
	return id, nil
}
And this function is called by:
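for _, v := range qr {
	data, err := json.Marshal(v)
	if err != nil {
		// data is nil when marshalling fails, so log the value and the error instead
		log.Printf("Unable to marshal %v: %v", v, err)
		continue
	}
	id, err := publish(*pubsubClient, data)
	if err != nil {
		log.Printf("Unable to publish message %s: %v", data, err)
		continue // don't log an empty id as published
	}
	log.Printf("Published message with id: %s", id)
}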
Where qr is a slice of structs that contain the data returned from the BigQuery query.
Now, is it because of how I am calling publish that each message gets published on its own? Is topic.PublishSettings being overwritten on every call, so that it forgets the previous messages? I'm at a loss here.
I saw some of the batch publishing code here: https://github.com/GoogleCloudPlatform/golang-samples/blob/master/pubsub/topics/main.go#L217
But they don't actually call it in their sample, so I can't tell how it should be done.
Side note, and to further prove my point that it doesn't work: if I set the DelayThreshold in topic.PublishSettings to, say, 1 second, it simply publishes one message every second, not all the messages that are supposed to be in memory.
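To make that observation concrete, here is a toy model of a buffer with a count threshold. It uses only the standard library and is not the real Pub/Sub client: the `batcher` type and its `publish`, `flush`, and `wait` methods are hypothetical names made up for illustration. The one assumption it encodes is that while the caller is blocked waiting on a result nothing new gets queued, so the only thing a delay timer can flush is the single pending message.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for a publisher's internal buffer.
// It only models a count threshold; it is not the Pub/Sub client.
type batcher struct {
	countThreshold int
	pending        []string
	batchSizes     []int // size of every batch "sent" so far
}

// publish queues a message and flushes once countThreshold messages are pending.
func (b *batcher) publish(msg string) {
	b.pending = append(b.pending, msg)
	if len(b.pending) >= b.countThreshold {
		b.flush()
	}
}

// flush sends whatever is pending as one batch.
func (b *batcher) flush() {
	if len(b.pending) == 0 {
		return
	}
	b.batchSizes = append(b.batchSizes, len(b.pending))
	b.pending = b.pending[:0]
}

// wait models blocking on one publish result. While the caller is blocked it
// queues nothing new, so in this model the delay timer fires and whatever is
// pending goes out as its own batch.
func (b *batcher) wait() { b.flush() }

// publishAndWaitEach mirrors the loop in the question: wait after every publish.
func publishAndWaitEach(n, threshold int) []int {
	b := &batcher{countThreshold: threshold}
	for i := 0; i < n; i++ {
		b.publish(fmt.Sprintf("msg-%d", i))
		b.wait()
	}
	return b.batchSizes
}

// publishThenWait queues everything first and waits once at the end.
func publishThenWait(n, threshold int) []int {
	b := &batcher{countThreshold: threshold}
	for i := 0; i < n; i++ {
		b.publish(fmt.Sprintf("msg-%d", i))
	}
	b.wait()
	return b.batchSizes
}

func main() {
	fmt.Println(len(publishAndWaitEach(2500, 1000))) // 2500 (every batch holds one message)
	fmt.Println(publishThenWait(2500, 1000))         // [1000 1000 500]
}
```

If this model is a fair picture of the client, the count threshold can never be reached in my loop, because each call waits on its result before the next message is queued.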
Appreciate the help, thanks.
EDIT #1:
So going with kingkupps' comment, I switched the code to this for testing purposes (project and topic names changed from the real ones):
func QueryAndPublish(w http.ResponseWriter, r *http.Request) {
	ctx := context.Background()
	// setting up the pubsub client
	pubsubClient, err := pubsub.NewClient(ctx, "fake-project-id")
	if err != nil {
		log.Fatalf("Unable to get pubsub client: %v", err)
	}
	// init topic and settings for publishing 1000 messages in batch
	topic := pubsubClient.Topic("fake-topic")
	topic.PublishSettings = pubsub.PublishSettings{
		// ByteThreshold: 5000,
		CountThreshold: 1000,
		// DelayThreshold: 1000 * time.Millisecond,
	}
	// bq set up
	bqClient, err := bigquery.NewClient(ctx, "fake-project-id")
	if err != nil {
		log.Fatalf("Unable to get bq client: %v", err)
	}
	// bq query function call
	qr, err := query(*bqClient)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Got query results, publishing now")
	// marshalling messages to json format
	messages := make([][]byte, len(qr))
	timeToMarshal := time.Now()
	for i, v := range qr {
		data, err := json.Marshal(v)
		if err != nil {
			log.Printf("Unable to marshal %s", data)
			continue
		}
		messages[i] = data
	}
	elapsedMarshal := time.Since(timeToMarshal).Nanoseconds() / 1000000
	log.Printf("Took %v ms to marshal %v messages", elapsedMarshal, len(messages))
	// publishing messages
	timeToPublish := time.Now()
	publishCount := 0
	for _, v := range messages {
		// ignore result, err from topic.Publish return, just publish
		topic.Publish(ctx, &pubsub.Message{
			Data: v,
		})
		publishCount++
	}
	elapsedPublish := time.Since(timeToPublish).Nanoseconds() / 1000000
	log.Printf("Took %v ms to publish %v messages", elapsedPublish, publishCount)
	fmt.Fprint(w, "Job completed")
}
With 100,000 messages, the publish calls now finish in roughly 600 ms, but in the background the messages are still being published one by one to the Pub/Sub endpoint.
I can see this in both Stackdriver and Wireshark: Stackdriver shows roughly 10-16 messages per second, and Wireshark shows a new connection for each message sent.
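One thing the test code above does not do is keep the publish results or wait for the queued messages before the handler returns, so the 600 ms only measures queueing. Below is a rough sketch of the pattern I think I should be aiming for: keep every result, flush, and only then wait. It is a standard-library model, not the real client. The `publisher` and `result` types and the `publishAll` helper are hypothetical; as far as I understand, the corresponding calls in the real library would be `PublishResult.Get` and `Topic.Stop`. The model has no delay timer, so `stop` is what flushes the last partial batch.

```go
package main

import "fmt"

// result is a hypothetical stand-in for a publish result.
type result struct {
	ready chan struct{} // closed once the batch carrying this message is sent
}

// get blocks until the message has been sent.
func (r *result) get() { <-r.ready }

// publisher is a hypothetical batching client: publish only queues, and a
// background goroutine sends a batch whenever batchSize messages are pending.
type publisher struct {
	batchSize int
	queue     chan *result
	done      chan struct{}
	batches   int // written by run, read only after done is closed
}

func newPublisher(batchSize int) *publisher {
	p := &publisher{
		batchSize: batchSize,
		queue:     make(chan *result),
		done:      make(chan struct{}),
	}
	go p.run()
	return p
}

// run collects queued messages and "sends" them in batches.
func (p *publisher) run() {
	defer close(p.done)
	var pending []*result
	send := func() {
		for _, r := range pending {
			close(r.ready)
		}
		pending = pending[:0]
		p.batches++
	}
	for r := range p.queue {
		pending = append(pending, r)
		if len(pending) == p.batchSize {
			send()
		}
	}
	if len(pending) > 0 {
		send() // flush the partial last batch on stop
	}
}

// publish queues one message and returns immediately with its result.
func (p *publisher) publish() *result {
	r := &result{ready: make(chan struct{})}
	p.queue <- r
	return r
}

// stop flushes anything still pending and waits for the sender to finish.
func (p *publisher) stop() {
	close(p.queue)
	<-p.done
}

// publishAll queues n messages, keeps every result, flushes, and only then
// waits on the results. It returns the number of batches that were sent.
func publishAll(n, batchSize int) int {
	p := newPublisher(batchSize)
	results := make([]*result, 0, n)
	for i := 0; i < n; i++ {
		results = append(results, p.publish())
	}
	p.stop()
	for _, r := range results {
		r.get() // returns at once here, since stop has already flushed
	}
	return p.batches
}

func main() {
	fmt.Println(publishAll(2500, 1000)) // 3
}
```

In this model 2,500 messages go out in three batches, and the function does not return until every result is ready, which is the behavior I expected from the real client.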