@Component
class KafkaField {

  /** Spark streaming context injected by Spring; it is started and then stopped by [[fieldTop20]]. */
  @Autowired
  @transient
  var streamingContext: StreamingContext = _

  /**
   * Subscribes to the Kafka topics named by `task`, runs the stream for at most
   * 1000 ms, and collects up to the first 20 record values of every micro-batch
   * processed in that window. Each collected value is also printed to stdout.
   *
   * NOTE(review): `ssc.stop()` is called with its defaults, which also stops the
   * underlying SparkContext, and a stopped StreamingContext cannot be started
   * again; as written this method can only run once per injected context — confirm
   * this is intended.
   *
   * @param task supplies the comma-separated topic list via `getTopic`
   * @return the collected record values, in the order they were taken
   */
  def fieldTop20(task: Task): util.ArrayList[String] = {
    val ssc = streamingContext
    val topicsSet = task.getTopic.split(",").toSet

    // TODO(review): the original argument list of this Map was lost (the line read
    // `MapString, Object`, which does not compile). The deserializers match the
    // [String, String] stream types below; "bootstrap.servers" and "group.id" are
    // placeholders and must be replaced with the real values before deploying.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "kafka-field"
    )

    // Create the direct stream; the two String type parameters are the types of the
    // Kafka record key and value.
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      // Location strategy.
      LocationStrategies.PreferConsistent,
      // Consumer strategy: subscribe to a fixed set of topics.
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Keep only the record values (no splitting or counting happens here).
    val lines = messages.map(_.value)
    val list = new util.ArrayList[String]()
    lines.foreachRDD { rdd =>
      // take(20) returns at most 20 values of this batch as a local array.
      rdd.take(20).foreach { value =>
        println(value)
        // The previous `list.add(value) + "\t"` built a discarded String from the
        // Boolean result of add; only the raw value was ever stored, which is kept here.
        list.add(value)
      }
    }

    ssc.start()
    // Let the stream run for at most 1000 ms before shutting it down.
    ssc.awaitTerminationOrTimeout(1000)
    ssc.stop()
    list
  }
}