My configuration:
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=spooldir
agent.sources.s1.spoolDir=/tmp/logs/tomcat2kafka
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Kafka sink settings
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
agent.sinks.k1.brokerList=222.30.194.254:9092
# Kafka topic
agent.sinks.k1.topic=kafkatest2
# Serialization
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
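
(For reference: starting with Flume 1.7, the brokerList and topic properties of KafkaSink were deprecated in favor of kafka.bootstrap.servers and kafka.topic, and producer settings are passed through with the kafka.producer.* prefix. A sketch of the same sink under the newer names, assuming Flume 1.7+, which would match the new-producer stack trace below:)

agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Flume 1.7+ names for the deprecated brokerList/topic properties
agent.sinks.k1.kafka.bootstrap.servers=222.30.194.254:9092
agent.sinks.k1.kafka.topic=kafkatest2
# Pass-through producer settings use the kafka.producer.* prefix
agent.sinks.k1.kafka.producer.acks=1
agent.sinks.k1.channel=c1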
Error message:
[ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events
org.apache.kafka.common.errors.InterruptException: Flush interrupted.
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
... 4 more
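
(Note: KafkaProducer.flush() in the trace blocks until all buffered records are acknowledged, so if the broker at 222.30.194.254:9092 is unreachable from the Flume host, or advertises a different address, the sink thread can sit in flush() until Flume interrupts it, which would produce exactly this InterruptException. A quick reachability check from the Flume host, using the broker address and topic from the config above, would be the stock console producer:)

bin/kafka-console-producer.sh --broker-list 222.30.194.254:9092 --topic kafkatest2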
I genuinely can't find an answer to this anywhere online and I'm stuck. Points on offer for any help.