dpdhf02040 2019-02-10 01:26
Views: 96
Accepted

Go GCP Cloud Pub/Sub not batch publishing messages

I am working on a sample project that takes output from BigQuery and publishes it to Pub/Sub. The row output from BigQuery could be >100,000. I saw there are options to batch publish, and I've read in multiple places that 1k messages per batch is ideal. The issue I am running into is that for the life of me I can't get it to batch multiple messages. I think the solution is simple, but I'm missing how to do it.

Here is what I have right now and all it does is publish one message at a time.

func publish(client pubsub.Client, data []byte) (string, error) {
    ctx := context.Background()

    topic := client.Topic("topic-name")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000, // no matter what I put here it still sends one per publish
        // DelayThreshold: 1000 * time.Millisecond,
    }

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
    })

    id, err := result.Get(ctx)
    if err != nil {
        return "", err
    }

    return id, nil
}

And this function is called by:

for _, v := range qr {
    data, err := json.Marshal(v)
    if err != nil {
        log.Printf("Unable to marshal %s", data)
        continue
    }
    id, err := publish(*pubsubClient, data)
    if err != nil {
        log.Printf("Unable to publish message: %s", data)
    }
    log.Printf("Published message with id: %s", id)
}

Where qr is a slice of structs that contain the data returned from the bigquery query.

Now, is it due to how I am calling the publish function that each message gets published individually? Are the topic.PublishSettings being overwritten on each call, so that it forgets the previous messages? I'm at a loss here.

I saw some of the batch publishing code here: https://github.com/GoogleCloudPlatform/golang-samples/blob/master/pubsub/topics/main.go#L217

But they don't actually call it in their sample, so I can't tell how it should be done.

Side note, and to prove my point further that it doesn't work: if I set the DelayThreshold in topic.PublishSettings to, say, 1 second, it simply publishes one message every second, not all the messages that are supposed to be held in memory.

Appreciate the help, thanks.

EDIT #1:

So, going with kingkupps' comment, I switched the code to this for testing purposes (project and topic names replaced with fake ones):

func QueryAndPublish(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()
    // setting up the pubsub client
    pubsubClient, err := pubsub.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get pubsub client: %v", err)
    }

    // init topic and settings for publishing 1000 messages in batch
    topic := pubsubClient.Topic("fake-topic")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    // bq set up
    bqClient, err := bigquery.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get bq client: %v", err)
    }
    // bq query function call
    qr, err := query(*bqClient)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Got query results, publishing now")

    // marshalling messages to json format
    messages := make([][]byte, len(qr))
    timeToMarshal := time.Now()
    for i, v := range qr {
        data, err := json.Marshal(v)
        if err != nil {
            log.Printf("Unable to marshal %s", data)
            continue
        }
        messages[i] = data
    }
    elapsedMarshal := time.Since(timeToMarshal).Nanoseconds() / 1000000
    log.Printf("Took %v ms to marshal %v messages", elapsedMarshal, len(messages))

    // publishing messages
    timeToPublish := time.Now()
    publishCount := 0
    for _, v := range messages {
        // ignore result, err from topic.Publish return, just publish
        topic.Publish(ctx, &pubsub.Message{
            Data: v,
        })
        publishCount++
    }
    elapsedPublish := time.Since(timeToPublish).Nanoseconds() / 1000000
    log.Printf("Took %v ms to publish %v messages", elapsedPublish, publishCount)

    fmt.Fprint(w, "Job completed")
}

What happens now is that when my message count is 100,000, it finishes the Publish calls in roughly 600 ms, but in the background it is still publishing one by one to the Pub/Sub endpoint.

I can see this in both Stackdriver and Wireshark: my messages/second in Stackdriver is roughly 10-16/second, and Wireshark shows a new connection per message sent.


1 Answer

  • dongyan1841 2019-02-11 19:38

    This is likely because when you call

    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    you are resetting the publish settings to a zero-initialized struct. This sets topic.PublishSettings.ByteThreshold to 0, which means all messages will be published immediately: you told it to wait until it has 0 bytes, and it always has at least 0 bytes.
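    To see why, note that a Go composite literal zeroes every field it does not name; there is no partial update. A minimal sketch with a local struct that mirrors the relevant fields (the real pubsub.PublishSettings lives in cloud.google.com/go/pubsub and has more fields):

```go
package main

import (
	"fmt"
	"time"
)

// Settings mirrors the relevant fields of pubsub.PublishSettings
// for illustration only.
type Settings struct {
	DelayThreshold time.Duration
	CountThreshold int
	ByteThreshold  int
}

func main() {
	// Naming only CountThreshold in the literal zeroes the
	// other fields; any previous defaults are lost.
	s := Settings{CountThreshold: 1000}
	fmt.Println(s.DelayThreshold) // 0s
	fmt.Println(s.ByteThreshold)  // 0
}
```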

    Instead, you should do the following to set CountThreshold:

    topic.PublishSettings.CountThreshold = 1000

    The same applies to the other fields. They are already initialized to default values as described here; if you want to change them, modify them directly instead of reassigning the entire PublishSettings object.
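    A sketch of how the publishing loop from the question might look with this fix, assuming the same fake topic name and client from the question; only the named fields change, the rest keep the library defaults, and the PublishResults are resolved after all Publish calls are queued so the client can fill batches:

```go
// Sketch only: "fake-topic", pubsubClient, ctx, and messages come
// from the question; error handling is abbreviated.
topic := pubsubClient.Topic("fake-topic")
topic.PublishSettings.CountThreshold = 1000
topic.PublishSettings.DelayThreshold = 100 * time.Millisecond

var results []*pubsub.PublishResult
for _, v := range messages {
	// Publish is non-blocking; it buffers the message for batching.
	results = append(results, topic.Publish(ctx, &pubsub.Message{Data: v}))
}

// Block on the results only after everything is queued, so batches
// can fill up to CountThreshold before being sent.
for _, r := range results {
	if _, err := r.Get(ctx); err != nil {
		log.Printf("Unable to publish message: %v", err)
	}
}
topic.Stop() // flushes and waits for remaining buffered messages
```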

