zero_open 2024-04-20 16:25 采纳率: 77.8%
浏览 12
已结题

elk+kafka+filebeat,增加第二个filebeat收集日志时报错

elk+kafka+filebeat
filebeat 只收集一台服务器kafka服务器的日志时没出现问题

kafka-topics.sh --bootstrap-server kafka:9092 --list
topic-kafka-controller-log
topic-kafka-messages-log
topic-kafka-run-log

topic-kafka-state-change-log
logstash没有报错,kibana web页面可以看到根据不同topic生成的不同索引

增加了一台服务使用filebeat收集日志kafka生成的topic

kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
topic-httpd-access-log
topic-httpd-messages-log
topic-httpd_error-log
topic-kafka-controller-log
topic-kafka-messages-log
topic-kafka-run-log
topic-kafka-state-change-log
topic-mysql-messages-log

但是
**kafka,收集到的topic只有一部分的topic能被logstash读取
只有带kafka的能被读取到logstash
httpd的没有被读取
在logstash中 **
当把

topics_pattern => "topic-*"
改为 topics_pattern =>"topic-httpd-access-log"

输出内容显示

[2024-04-20T16:14:51,762][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-1, groupId=logstash.*] Notifying assignor about the new Assignment(partitions=[])
[2024-04-20T16:14:51,762][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-1, groupId=logstash.*] Adding newly assigned partitions: 
[2024-04-20T16:14:51,803][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f1cbbbf4a97a41c1915d254d3250270c181e1bfa9098a0a76b8bb8f6a88bfece] [Consumer clientId=logstash-0, groupId=logstash.*] Setting offset for partition topic-httpd-access-log-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 0 rack: null)], epoch=0}}

logstash的配置如下

input {
  kafka {
    bootstrap_servers => "192.168.1.179:9092"
    topics_pattern => "topic.*"
    consumer_threads => 5
    decorate_events => true
    codec => plain { charset => "UTF-8" }
    auto_offset_reset => "latest"
    group_id => "logstash1"
  }
}

filter {
  # 如果需要转换时间戳为本地时间,请取消注释并正确编写Ruby代码
  # 注意:下面的代码可能需要调整,确保它符合您的具体需求
  #ruby {
  #  code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"
  #}

  mutate {
    remove_field => ["beat"]
  }

  grok {
    match => {
      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)"
    }
 overwrite => ["message"]
    tag_on_failure => ["_grokparsefailure"]
  }
  if "[@metadata][kafka][topic]" {
    mutate {
      add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
    }
  }
}

output {
  # 确保topic_name字段存在才发送到Elasticsearch
  if [topic_name] {
    elasticsearch {
      hosts => ["192.168.1.177:9200"]
      # 使用topic_name字段来构建索引名
      index => "%{topic_name}-%{+YYYY.MM.dd}"
    }
  }

  stdout {
    codec => rubydebug
  }
}


  • 写回答

4条回答 默认 最新

  • zero_open 2024-04-21 15:14
    关注

    不是logstash配置端的问题,是filebeat配置文件的问题,当存在多个filebeat实例的时候,

    需要在filebeat配置文件中增加一条参数
    filebeat.name: filebeat-apache:
    #Filebeat实例的名称,这有助于在日志和监控中识别它。每个单独的filebeat实例的名称必须不同

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(3条)

报告相同问题?

问题事件

  • 系统已结题 4月29日
  • 已采纳回答 4月21日
  • 创建了问题 4月20日