yuxr0798 asked on 2016.05.05 11:49

Storm + Kafka: Storm keeps re-parsing the same record, and sometimes it gets no data at all
protected StormTopology buildTopology() {

// ConsumerConfig.

    ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

// storm.kafka.KafkaSpout

    TridentTopology tridentTopology = new TridentTopology();
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, topic);
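    // Used together with startOffsetTime. Defaults to false; once startOffsetTime is set, this must be set to true.
    spoutConf.forceFromStart = true;
    // -2: start from the beginning of the Kafka log; -1: start from the latest offset; 0: none, start from the offset stored in ZooKeeper.
    spoutConf.startOffsetTime = -1L;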
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);

    TransactionalTridentKafkaSpout opaqueTridentKafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

    Stream opTrdtSptStream = tridentTopology.newStream("opTrdtSptStream", opaqueTridentKafkaSpout);
    opTrdtSptStream.shuffle()
            .each(new Fields("str"), new Utils.FilterLogInfo())
            .each(new Fields("str"), new LogFilterUtil(), new Fields("analyzeResult"))
            .each(new Fields("analyzeResult"), new PropertyAnalyzeUtils(), new Fields("autoItem"))
            .each(new Fields("autoItem"), new AnsjIllegalWordsUtil(), new Fields("ansjIllegalWordsResult"))
            .each(new Fields("ansjIllegalWordsResult"), new AnsjKeyWordsUtil(), new Fields("ansjKeyWordsResult"))
            .each(new Fields("ansjKeyWordsResult"), new AnsjPicturesUtil(), new Fields("ansjPicturesResult"));
    return tridentTopology.build();
}

1 answer

qq_34890612   2016.05.05 12:50