ELK + Kafka + Filebeat
When Filebeat was only collecting logs from a single server (the Kafka server itself), everything worked:
kafka-topics.sh --bootstrap-server kafka:9092 --list
topic-kafka-controller-log
topic-kafka-messages-log
topic-kafka-run-log
topic-kafka-state-change-log
Logstash reported no errors, and the Kibana web UI showed a separate index for each topic.
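The Filebeat side is not shown here; a minimal sketch of the kind of filebeat.yml that produces one topic per log type, as above (the paths, the custom field name and the broker address are illustrative assumptions, not taken from the actual setup):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/messages
    # custom field used below to build the Kafka topic name
    fields:
      log_topic: kafka-messages-log

output.kafka:
  hosts: ["192.168.1.179:9092"]
  # topic name is "topic-" plus the per-input log_topic field
  topic: "topic-%{[fields.log_topic]}"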
After adding another server that also ships its logs with Filebeat, Kafka now has these topics:
kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
topic-httpd-access-log
topic-httpd-messages-log
topic-httpd_error-log
topic-kafka-controller-log
topic-kafka-messages-log
topic-kafka-run-log
topic-kafka-state-change-log
topic-mysql-messages-log
However, only some of the topics in Kafka are read by Logstash: only the topics whose names contain "kafka" make it into Logstash, while the httpd topics are never read.
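To rule out the broker side, an httpd topic can be checked directly with the standard console consumer (broker address and topic name as in the listings above):

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic topic-httpd-access-log --from-beginning --max-messages 5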
In Logstash, when I change
topics_pattern => "topic-*"
to topics_pattern => "topic-httpd-access-log"
the output shows:
[2024-04-20T16:14:51,762][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-1, groupId=logstash.*] Notifying assignor about the new Assignment(partitions=[])
[2024-04-20T16:14:51,762][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-1, groupId=logstash.*] Adding newly assigned partitions:
[2024-04-20T16:14:51,803][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-0, groupId=logstash.*] Setting offset for partition topic-httpd-access-log-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 0 rack: null)], epoch=0}}
The Logstash configuration is as follows:
input {
  kafka {
    bootstrap_servers => "192.168.1.179:9092"
    topics_pattern => "topic.*"
    consumer_threads => 5
    decorate_events => true
    codec => plain { charset => "UTF-8" }
    auto_offset_reset => "latest"
    group_id => "logstash1"
  }
}
filter {
  # To convert the timestamp to local time, uncomment and adapt the Ruby code below
  # Note: this code may need adjusting to fit your exact needs
  #ruby {
  #  code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"
  #}
  mutate {
    remove_field => ["beat"]
  }
  grok {
    match => {
      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)"
    }
    overwrite => ["message"]
    tag_on_failure => ["_grokparsefailure"]
  }
if "[@metadata][kafka][topic]" {
mutate {
add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
}
}
}
output {
  # Only send to Elasticsearch when the topic_name field exists
  if [topic_name] {
    elasticsearch {
      hosts => ["192.168.1.177:9200"]
      # Build the index name from the topic_name field
      index => "%{topic_name}-%{+YYYY.MM.dd}"
    }
  }
  stdout {
    codec => rubydebug
  }
}
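The partitions actually assigned to the Logstash consumer group can also be checked on the broker with the standard consumer-groups tool (group id taken from the config above; output columns vary by Kafka version):

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group logstash1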