dpdhf02040
2019-02-10 01:26

Go GCP Cloud PubSub not batch publishing messages

I am working on a sample project that takes output from BigQuery and publishes it to Pub/Sub. The query can return more than 100,000 rows. I saw that there are options to batch publish, and I've read in multiple places that 1k messages per batch is ideal. The issue is that I can't get it to batch multiple messages. I think the solution is simple, but I'm missing how to do it.

Here is what I have right now and all it does is publish one message at a time.

func publish(client pubsub.Client, data []byte) (string, error) {
    ctx := context.Background()

    topic := client.Topic("topic-name")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000, // no matter what I put here it still sends one per publish
        // DelayThreshold: 1000 * time.Millisecond,
    }

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
    })

    id, err := result.Get(ctx)
    if err != nil {
        return "", err
    }

    return id, nil
}

And this function is called by:

for _, v := range qr {
    data, err := json.Marshal(v)
    if err != nil {
        // data is nil when marshalling fails, so log the value and the error instead
        log.Printf("Unable to marshal %v: %v", v, err)
        continue
    }
    id, err := publish(*pubsubClient, data)
    if err != nil {
        log.Printf("Unable to publish message %s: %v", data, err)
        continue
    }
    log.Printf("Published message with id: %s", id)
}

Where qr is a slice of structs that contain the data returned from the bigquery query.

Is it the way I call publish that causes each message to be published individually? Is topic.PublishSettings being overwritten on every call, so that it forgets the previous messages? I'm at a loss here.

I saw some of the batch publishing code here: https://github.com/GoogleCloudPlatform/golang-samples/blob/master/pubsub/topics/main.go#L217

But they don't actually call it in their sample, so I can't tell how it should be done.

As a side note, and to further show that it doesn't work: if I set DelayThreshold in topic.PublishSettings to, say, 1 second, it simply publishes one message every second rather than all the messages that should be buffered in memory.

Appreciate the help, thanks.

EDIT #1:

Following kingkupps' comment, I changed the code to this for testing purposes (project and topic names replaced with fake ones):

func QueryAndPublish(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()
    // setting up the pubsub client
    pubsubClient, err := pubsub.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get pubsub client: %v", err)
    }

    // init topic and settings for publishing 1000 messages in batch
    topic := pubsubClient.Topic("fake-topic")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    // bq set up
    bqClient, err := bigquery.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get bq client: %v", err)
    }
    // bq query function call
    qr, err := query(*bqClient)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Got query results, publishing now")

    // marshalling messages to json format
    messages := make([][]byte, len(qr))
    timeToMarshal := time.Now()
    for i, v := range qr {
        data, err := json.Marshal(v)
        if err != nil {
            log.Printf("Unable to marshal %v: %v", v, err)
            continue
        }
        messages[i] = data
    }
    elapsedMarshal := time.Since(timeToMarshal).Nanoseconds() / 1000000
    log.Printf("Took %v ms to marshal %v messages", elapsedMarshal, len(messages))

    // publishing messages
    timeToPublish := time.Now()
    publishCount := 0
    for _, v := range messages {
        // ignore result, err from topic.Publish return, just publish
        topic.Publish(ctx, &pubsub.Message{
            Data: v,
        })
        publishCount++
    }
    elapsedPublish := time.Since(timeToPublish).Nanoseconds() / 1000000
    log.Printf("Took %v ms to publish %v messages", elapsedPublish, publishCount)

    fmt.Fprint(w, "Job completed")
}

With 100,000 messages, the publish calls now finish in roughly 600 ms, but in the background the messages are still published one by one to the Pub/Sub endpoint.

I can see this in both Stackdriver and Wireshark: Stackdriver shows roughly 10-16 messages/second, and Wireshark shows a new connection per message sent.


1 answer

  • dongyan1841 2019-02-11 19:38
    Accepted answer

    This is likely because when you call

    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    you are resetting the publish settings to a zero-initialized struct in which only CountThreshold is set. This sets topic.PublishSettings.ByteThreshold to 0, which means every message is published immediately: you told it to publish once a batch holds at least 0 bytes, and that condition is always met.
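
The zeroing effect of a struct literal can be seen in isolation with plain Go. The sketch below is only an illustration: PublishSettings here is a local stand-in type (not the real pubsub.PublishSettings), and the starting values in illustrativeDefaults are made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

// PublishSettings is a local stand-in for the three batching fields
// discussed in this thread. It is NOT the real pubsub.PublishSettings type.
type PublishSettings struct {
	DelayThreshold time.Duration
	CountThreshold int
	ByteThreshold  int
}

// illustrativeDefaults holds made-up starting values for this sketch only.
var illustrativeDefaults = PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1000000,
}

// reassignLiteral mimics `topic.PublishSettings = pubsub.PublishSettings{...}`:
// every field not named in the literal becomes its zero value.
func reassignLiteral(s PublishSettings) PublishSettings {
	s = PublishSettings{CountThreshold: 1000}
	return s
}

// setField mimics `topic.PublishSettings.CountThreshold = 1000`:
// only the named field changes, the others keep their previous values.
func setField(s PublishSettings) PublishSettings {
	s.CountThreshold = 1000
	return s
}

func main() {
	fmt.Printf("literal reassignment: %+v\n", reassignLiteral(illustrativeDefaults))
	fmt.Printf("field assignment:     %+v\n", setField(illustrativeDefaults))
}
```

After the literal reassignment, ByteThreshold and DelayThreshold are both 0 even though they were never mentioned in the code, which is the situation described above.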

    Instead, you should do the following to set CountThreshold:

    topic.PublishSettings.CountThreshold = 1000

    The same applies to the other fields. They are already initialized to default values, as described in the package documentation. If you want to change them, modify them directly instead of reassigning the entire PublishSettings object.
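
To make the reasoning in this answer concrete, here is a toy model of threshold-based batching. It is a simplified sketch, not the client library's implementation: the settings type and the batchSizes function are hypothetical, time-based flushing is ignored, and a batch is assumed to flush as soon as either threshold is reached.

```go
package main

import "fmt"

// settings is a hypothetical stand-in holding only the two size-based thresholds.
type settings struct {
	CountThreshold int
	ByteThreshold  int
}

// batchSizes feeds n messages of msgBytes bytes each through a toy batcher
// and returns the number of messages in each flushed batch. A batch is
// flushed once its message count reaches CountThreshold or its total size
// reaches ByteThreshold. Delay-based flushing is deliberately left out.
func batchSizes(n, msgBytes int, s settings) []int {
	var sizes []int
	count, size := 0, 0
	for i := 0; i < n; i++ {
		count++
		size += msgBytes
		if count >= s.CountThreshold || size >= s.ByteThreshold {
			sizes = append(sizes, count)
			count, size = 0, 0
		}
	}
	if count > 0 {
		sizes = append(sizes, count) // final partial batch
	}
	return sizes
}

func main() {
	// ByteThreshold left at its zero value, as after a struct-literal reassignment.
	zeroed := settings{CountThreshold: 1000}
	// ByteThreshold kept at a large illustrative value, as after a field assignment.
	kept := settings{CountThreshold: 1000, ByteThreshold: 1000000}

	fmt.Println(len(batchSizes(2500, 100, zeroed))) // 2500 batches of one message each
	fmt.Println(batchSizes(2500, 100, kept))        // [1000 1000 500]
}
```

In this model, a ByteThreshold of 0 makes the flush condition true after every message, so CountThreshold never gets a chance to matter. Keeping a non-zero ByteThreshold lets batches grow to the requested 1000 messages.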
