dongzhang5787 2019-09-09 12:07

Go SDK for Apache Beam: singleton side input "Singleton for int ill-defined"

Using the Go SDK for Apache Beam, I'm trying to create a view of a PCollection using a side input.

But I'm getting this weird error:

Failed to execute job: on ctx=      making side input 0:
singleton side input Singleton for int ill-defined
exit status 1

Here is the code I'm using:

// A PCollection of key/value pairs
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
        return r.DoRecommend, 1
    }, col)

// A PCollection of ints (demo)
pcollInts := beam.CreateList(s, [3]int{
        1, 2, 3,
})

// A PCollection of key/value pairs
summed := stats.SumPerKey(s, pairedWithOne)

// Here is where I'd like to use my side input.
mapped := beam.ParDo(s, func(k string, v int, side int, emit func(ratio models.RecommendRatio)) {
        var ratio = models.RecommendRatio{
            DoRecommend: k,
            NumVotes:    v,
        }

        emit(ratio)
    }, summed, beam.SideInput{Input: pcollInts})

I found this example on git:

// Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. These SideInput along with
// the DoFn parameter form express styles of accessing PCollection computed by
// earlier pipeline operations, passed in to the ParDo transform using SideInput
// options, and their contents accessible to each of the DoFn operations. For
// example:
//
//     words := ...
//     cutoff := ...  // Singleton PCollection<int>
//     smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
//           if len(word) < cutoff {
//                emit(word)
//           }
//     }, words, beam.SideInput{Input: cutoff})
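The GoDoc example labels its side input a "Singleton PCollection<int>". A plain-Go model of that rule (not Beam's own code) is sketched below, assuming that a side input declared as a plain `int` must come from a PCollection holding exactly one element. The helper `singleton` is hypothetical and exists only for this illustration. Under that assumption, a three-element collection like `pcollInts` would be rejected, while a one-element collection would be accepted.

```go
package main

import "fmt"

// singleton is a hypothetical helper, not part of Beam. It models the
// assumed rule that a value-typed side input needs exactly one element.
func singleton(elems []int) (int, error) {
	if len(elems) != 1 {
		return 0, fmt.Errorf("singleton side input ill-defined: got %d elements, want 1", len(elems))
	}
	return elems[0], nil
}

func main() {
	// Three elements, like pcollInts in the question.
	if _, err := singleton([]int{1, 2, 3}); err != nil {
		fmt.Println("rejected:", err)
	}
	// One element, like the result of a global sum.
	if v, err := singleton([]int{6}); err == nil {
		fmt.Println("accepted:", v)
	}
}
```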

Update: It seems the Impulse(scope) function plays a role here, but I cannot figure out what. From the GoDoc:

Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.

The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.

In case it helps, here are my structs:

type Review struct {
    Date        time.Time `csv:"date" json:"date"`
    DoRecommend string    `csv:"doRecommend" json:"doRecommend"`
    NumHelpful  int       `csv:"numHelpful" json:"numHelpful"`
    Rating      int       `csv:"rating" json:"rating"`
    Text        string    `csv:"text" json:"text"`
    Title       string    `csv:"title" json:"title"`
    Username    string    `csv:"username" json:"username"`
}

type RecommendRatio struct {
    DoRecommend string `json:"doRecommend"`
    NumVotes    int    `json:"numVotes"`
}

Any solution for this?

thanks

1 answer

  • doujiayao8433 2019-09-09 18:17

    Update:

    This can be simplified by removing the beam.Impulse() function (I think the wrong type caused the trouble here). Here summed and counted are built exactly as in the old version further down:

    mapped := beam.ParDo(s,
            func(k string, v int,
                sideCounted int,
                emit func(ratio models.RecommendRatio)) {
    
                p := percent.PercentOf(v, sideCounted)
    
                emit(models.RecommendRatio{
                    DoRecommend: k,
                    NumVotes:    v,
                    Percent:     p,
                })
    
            }, summed,
            beam.SideInput{Input: counted})
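
To make the arithmetic concrete, here is a plain-Go sketch of what this pipeline computes, with no Beam involved. The map stands in for the per-key sums and the running total for the global count that is passed as the side input. `percentOf` is a hypothetical stand-in for `percent.PercentOf`, assumed here to return part as a percentage of total; `ratios` is likewise a name made up for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// RecommendRatio mirrors the struct emitted in the answer, including the
// Percent field that the answer sets.
type RecommendRatio struct {
	DoRecommend string
	NumVotes    int
	Percent     float64
}

// percentOf is a hypothetical stand-in for percent.PercentOf. It is assumed
// to return part as a percentage of total.
func percentOf(part, total int) float64 {
	if total == 0 {
		return 0
	}
	return float64(part) * 100 / float64(total)
}

// ratios models the pipeline without Beam: count votes per key, count all
// votes, then map every key to a RecommendRatio using the global count.
func ratios(votes []string) []RecommendRatio {
	summed := map[string]int{} // per-key counts
	counted := 0               // global count (the single side value)
	for _, v := range votes {
		summed[v]++
		counted++
	}
	out := make([]RecommendRatio, 0, len(summed))
	for k, n := range summed {
		out = append(out, RecommendRatio{
			DoRecommend: k,
			NumVotes:    n,
			Percent:     percentOf(n, counted),
		})
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Slice(out, func(i, j int) bool { return out[i].DoRecommend < out[j].DoRecommend })
	return out
}

func main() {
	for _, r := range ratios([]string{"true", "true", "false", "true"}) {
		fmt.Printf("%s: %d votes (%.1f%%)\n", r.DoRecommend, r.NumVotes, r.Percent)
	}
}
```

Each output record needs only its own key, its own count, and the one global total, which is why a single-value side input is enough here.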
    

    Old: It seems I've found a solution, though it may be just a workaround; I'd welcome a quick review and suggestions for improvement. (I believe the function isn't idempotent: if it is executed more than once across multiple workers, append() will duplicate entries...)

    The overall idea is to create a singleton PCollection of []uint8 (that is, []byte) with the beam.Impulse(scope) function and pass all the "real" data as side inputs.

        // Pair each recommendation value with one -> PColl<KV<string, int>>
        pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
            return r.DoRecommend, 1
        }, col)
    
        // Sum num occurrences of a recommendation k/v pair
        summed := stats.SumPerKey(s, pairedWithOne)
    
        // Drop keys for the later global count
        droppedKey := beam.DropKey(s, pairedWithOne)
    
        // Count globally the number of recommendation values -> PColl<int>
        counted := stats.Sum(s, droppedKey)
    
        // Map to a struct with percentage per ratio
        mapped := beam.ParDo(s,
            func(_ []uint8,
                sideSummed func(k *string, v *int) bool,
                sideCounted int,
                emit func(ratio []models.RecommendRatio)) {
    
                var k string
                var v int
                var ratios []models.RecommendRatio
    
    
                for sideSummed(&k, &v) {
                    p := percent.PercentOf(v, sideCounted)
    
                    ratio := models.RecommendRatio{
                        DoRecommend: k,
                        NumVotes:    v,
                        Percent:     p,
                    }
    
                    ratios = append(ratios, ratio)
                }
    
                emit(ratios)
    
            }, beam.Impulse(s),
            beam.SideInput{Input: summed},
            beam.SideInput{Input: counted})
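
On the append() worry above, a plain-Go sketch of the iterator pattern may help. It uses no Beam; `iterOver`, `collect`, and `kv` are hypothetical names made up for this illustration. The accumulator slice is declared inside the function body, so at the Go level every call starts from an empty slice and a second call rebuilds the result rather than extending the first. Whether a runner re-executes work, and what it does with the output of a retry, is outside what this sketch shows.

```go
package main

import "fmt"

// kv is a key/value pair used to feed the iterator in this sketch.
type kv struct {
	k string
	v int
}

// iterOver returns a function with the same shape as the iterator side
// input in the answer: it fills k and v and reports whether a pair was read.
// It is a hypothetical helper, not part of Beam.
func iterOver(pairs []kv) func(k *string, v *int) bool {
	i := 0
	return func(k *string, v *int) bool {
		if i >= len(pairs) {
			return false
		}
		*k, *v = pairs[i].k, pairs[i].v
		i++
		return true
	}
}

// collect drains the iterator into a slice declared inside the function,
// the same way ratios is declared inside the function in the answer.
func collect(next func(k *string, v *int) bool) []string {
	var k string
	var v int
	var out []string // fresh (nil) on every call
	for next(&k, &v) {
		out = append(out, fmt.Sprintf("%s=%d", k, v))
	}
	return out
}

func main() {
	pairs := []kv{{"true", 3}, {"false", 1}}
	fmt.Println(collect(iterOver(pairs))) // first run
	fmt.Println(collect(iterOver(pairs))) // second run, same length
}
```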
    
    This answer was selected by the asker as the best answer.