小鱼-简简单单 2016-05-05 03:49
浏览 4084

strom+kafka,strom会一直重复解析同条数据,有时解析不到数据

protected StormTopology buildTopology() {

// ConsumerConfig.

    ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

// storm.kafka.KafkaSpout

    TridentTopology tridentTopology = new TridentTopology();
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, topic);
    spoutConf.forceFromStart = true;// 和startOffsetTime,一起用,默认情况下,为false,一旦startOffsetTime被设置,就要置为true
    spoutConf.startOffsetTime = -1L;// -2 从kafka头开始 -1 是从最新的开始 0 =无 从ZK开始
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);

    TransactionalTridentKafkaSpout opaqueTridentKafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

    Stream opTrdtSptStream = tridentTopology.newStream("opTrdtSptStream", opaqueTridentKafkaSpout);
    opTrdtSptStream.shuffle().each(new Fields("str"), new Utils.FilterLogInfo()).each(new Fields("str"), new
            LogFilterUtil(), new Fields("analyzeResult")).each(new Fields("analyzeResult"),new PropertyAnalyzeUtils
            (),new Fields("autoItem")).each(new Fields("autoItem"), new AnsjIllegalWordsUtil
                    (), new Fields("ansjIllegalWordsResult")).each(new Fields("ansjIllegalWordsResult"), new
            AnsjKeyWordsUtil(), new Fields("ansjKeyWordsResult")).each(new Fields("ansjKeyWordsResult"), new
            AnsjPicturesUtil(), new Fields("ansjPicturesResult"));
    return tridentTopology.build();
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 HFSS 中的 H 场图与 MATLAB 中绘制的 B1 场 部分对应不上
    • ¥15 如何在scanpy上做差异基因和通路富集?
    • ¥20 关于#硬件工程#的问题,请各位专家解答!
    • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
    • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
    • ¥30 截图中的mathematics程序转换成matlab
    • ¥15 动力学代码报错,维度不匹配
    • ¥15 Power query添加列问题
    • ¥50 Kubernetes&Fission&Eleasticsearch
    • ¥15 報錯:Person is not mapped,如何解決?