dpdhf02040 2019-02-10 01:26
Viewed 96 times
Accepted

Go GCP Cloud Pub/Sub not publishing messages in batches

I am working on a sample project that takes output from BigQuery and publishes it to Pub/Sub. The query can return more than 100,000 rows. I saw there are options to batch publish, and I've read in multiple places that 1k messages per batch is ideal. The issue I am running into is that, for the life of me, I can't get it to batch multiple messages. I think the solution is simple, but I'm missing how to do it.

Here is what I have right now and all it does is publish one message at a time.

func publish(client pubsub.Client, data []byte) (string, error) {
    ctx := context.Background()

    topic := client.Topic("topic-name")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000, // no matter what I put here it still sends one per publish
        // DelayThreshold: 1000 * time.Millisecond,
    }

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
    })

    id, err := result.Get(ctx)
    if err != nil {
        return "", err
    }

    return id, nil
}

And this function is called by:

for _, v := range qr {
        data, err := json.Marshal(v)
        if err != nil {
            log.Printf("Unable to marshal %s", data)
            continue
        }
        id, err := publish(*pubsubClient, data)
        if err != nil {
            log.Printf("Unable to publish message: %s", data)
        }
        log.Printf("Published message with id: %s", id)
    }

Here, qr is a slice of structs containing the data returned from the BigQuery query.

Now, is it because of how I am calling publish that each message gets published individually, with topic.PublishSettings being overwritten on every call so it forgets the previous messages? I'm at a loss here.

I saw some of the batch publishing code here: https://github.com/GoogleCloudPlatform/golang-samples/blob/master/pubsub/topics/main.go#L217

But they don't actually call it in their sample, so I can't tell how it should be done.

As a side note, and to show further that it isn't batching: if I set DelayThreshold in topic.PublishSettings to, say, 1 second, it simply publishes one message per second, not all the messages that should be held in memory.

Appreciate the help, thanks.

EDIT #1:

Following kingkupps' comment, I changed the code to the following for testing purposes (project and topic names replaced with fake ones):

func QueryAndPublish(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()
    // setting up the pubsub client
    pubsubClient, err := pubsub.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get pubsub client: %v", err)
    }

    // init topic and settings for publishing 1000 messages in batch
    topic := pubsubClient.Topic("fake-topic")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    // bq set up
    bqClient, err := bigquery.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get bq client: %v", err)
    }
    // bq query function call
    qr, err := query(*bqClient)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Got query results, publishing now")

    // marshalling messages to json format
    messages := make([][]byte, len(qr))
    timeToMarshal := time.Now()
    for i, v := range qr {
        data, err := json.Marshal(v)
        if err != nil {
            log.Printf("Unable to marshal %s", data)
            continue
        }
        messages[i] = data
    }
    elapsedMarshal := time.Since(timeToMarshal).Nanoseconds() / 1000000
    log.Printf("Took %v ms to marshal %v messages", elapsedMarshal, len(messages))

    // publishing messages
    timeToPublish := time.Now()
    publishCount := 0
    for _, v := range messages {
        // ignore result, err from topic.Publish return, just publish
        topic.Publish(ctx, &pubsub.Message{
            Data: v,
        })
        publishCount++
    }
    elapsedPublish := time.Since(timeToPublish).Nanoseconds() / 1000000
    log.Printf("Took %v ms to publish %v messages", elapsedPublish, publishCount)

    fmt.Fprint(w, "Job completed")
}

With this change, when the message count is 100,000 the loop of Publish calls finishes in roughly 600 ms, but in the background the messages are still being sent one by one to the Pub/Sub endpoint.

I can see this in both Stackdriver and Wireshark: Stackdriver shows roughly 10-16 messages/second, and Wireshark shows a new connection opened for each message sent.
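
For reference, here is a rough sketch (reusing the same ctx, topic, and messages variables from the code above) of how the PublishResults could be collected and waited on, so the measured time reflects actual delivery instead of just handing messages to the client's internal buffer:

var results []*pubsub.PublishResult
for _, v := range messages {
    results = append(results, topic.Publish(ctx, &pubsub.Message{
        Data: v,
    }))
}
// result.Get blocks until the message has actually been sent (or has failed).
for i, r := range results {
    if _, err := r.Get(ctx); err != nil {
        log.Printf("Failed to publish message %d: %v", i, err)
    }
}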


1 Answer

  • dongyan1841 2019-02-11 19:38

    This is likely because when you call

    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    you are resetting the publish settings to a zero-initialized struct. This sets topic.PublishSettings.ByteThreshold to 0, which means every message is published immediately: you told it to wait until it has 0 bytes, and it always has 0 bytes.

    Instead, you should do the following to set CountThreshold:

    topic.PublishSettings.CountThreshold = 1000

    The same applies to the other fields: they are already initialized to default values as described here; if you want to change them, modify them directly instead of reassigning the entire PublishSettings object.
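
    For example (the topic name and the 100 ms delay below are only placeholders), the setup from the question could become:

    topic := pubsubClient.Topic("fake-topic")
    // Override only the thresholds you care about; client.Topic() has
    // already filled in the library defaults for the remaining fields.
    topic.PublishSettings.CountThreshold = 1000
    topic.PublishSettings.DelayThreshold = 100 * time.Millisecond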

    (Accepted answer)
