qq_25651469 2020-03-11 21:03

Importing SQL data into Elasticsearch via Logstash, but ES ends up with only one row of data — can anyone take a look?

ES mapping file:

{
  "mappings": {
    "_doc": { 
      "properties": {
        "@timestamp" : {
          "type" : "date"
        },
        "orderNo": {
          "type": "keyword"
        },
        "comSubCode": {
          "type": "keyword"
        }
      }
    }
  }
}

SQL:

SELECT
        order_main.order_no AS orderNo,
        order_main.com_sub_code AS comSubCode
FROM
        order_main

The shell script that runs the import:

#!/bin/sh 
source ./gloable.sh



indexFile="my_index_company_order_report"
index="my_index_company_order_report$SUFFIX"
index_type="_doc"
index_data="my_index_company_order_report_data$SUFFIX"
document_id=%{id}

sed -i 's/$SUFFIX/'$SUFFIX'/g' $indexFile.json

# Delete the index
curl -XDELETE 'http://'$ES_IP':'$ES_PORT'/'$index'/'
# Create an empty index from the specified mapping file
curl  -H "Content-Type: application/json" -XPUT  'http://'$ES_IP':'$ES_PORT'/'$index'' -d @''$indexFile''.json


# Import the data
${LOGSTASH_BIN_DIR}/logstash -e ' input {
    jdbc {
        jdbc_driver_library =>"./mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class =>"com.mysql.jdbc.Driver"
        jdbc_connection_string =>"'$JDBC_COMPANY_ORDER'"
        jdbc_user =>"'$JDBC_USER'"
        jdbc_password =>"'$JDBC_PWD'"
        lowercase_column_names => "false"
        statement_filepath => "./'$indexFile'.sql"
    }
}
output {
    elasticsearch {
        action => "index"
        hosts =>["'$ES_IP':'$ES_PORT'"]
        index =>"'$index'"
        document_type =>"'$index_type'"
        document_id => "'$document_id'" 
    }
}' --path.data ${LOGSTASH_DATA_DIR}/$index_data

Console output:

[deployer@cloudplatform-xian-yh29 create_es_index]$ ./my_index_company_order_report.sh
{"acknowledged":true}{"acknowledged":true,"shards_acknowledged":true,"index":"my_index_company_order_report"}Sending Logstash logs to /home/aiuap_jc/ELK6.5.3/logstash-6.5.3/logs which is now configured via log4j2.properties
[2020-03-11T20:46:27,206][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/aiuap_jc/ELK6.5.3/logstash-6.5.3/data/my_index_company_order_report_data/queue"}
[2020-03-11T20:46:27,213][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/aiuap_jc/ELK6.5.3/logstash-6.5.3/data/my_index_company_order_report_data/dead_letter_queue"}
[2020-03-11T20:46:27,635][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-11T20:46:27,647][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.5.3"}
[2020-03-11T20:46:27,683][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"da56bc0e-c6a7-4dd2-a6fc-3118bfe46fd1", :path=>"/home/aiuap_jc/ELK6.5.3/logstash-6.5.3/data/my_index_company_order_report_data/uuid"}
[2020-03-11T20:46:30,547][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch action=>"index", index=>"my_index_company_order_report", id=>"116914b44c341514850d4765364e8c26d59056377e90ff8c080aa5b0960c6cea", document_id=>"%{id}", hosts=>[//10.236.6.52:9200], document_type=>"company_order_doc", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_552f8ab0-92d5-43fd-8fb6-534dea21eaaa", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
[2020-03-11T20:46:30,604][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2020-03-11T20:46:31,104][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.236.6.52:9200/]}}
[2020-03-11T20:46:31,318][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.236.6.52:9200/"}
[2020-03-11T20:46:31,381][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2020-03-11T20:46:31,385][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2020-03-11T20:46:31,414][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.236.6.52:9200"]}
[2020-03-11T20:46:31,434][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2020-03-11T20:46:31,464][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2020-03-11T20:46:31,704][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xd856ba9 run>"}
[2020-03-11T20:46:31,754][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-03-11T20:46:32,045][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-03-11T20:46:33,034][INFO ][logstash.inputs.jdbc     ] (0.010293s) SELECT
        order_main.order_no AS orderNo,
        order_main.com_sub_code AS comSubCode
FROM
        order_main

[2020-03-11T20:46:34,427][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0xd856ba9 run>"}

The console shows no errors, but querying ES returns only one document:

C:\Users\xiaolei>curl -X GET "http://10.236.6.52:9200/_sql" -H "Content-Type: application/json" -d "select * from my_index_company_order_report"
{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"my_index_company_order_report","_type":"company_order_doc","_id":"%{id}","_score":1.0,"_source":{"comSubCode":"sxmall_2019","orderNo":"4027","@timestamp":"2020-03-11T12:46:33.153Z","@version":"1"}}]}}

The database actually contains many rows. Could someone please help figure this out? Waiting online.


1 answer

  • 晨光0703 2020-08-26 01:47

    The Logstash script is wrong here:
    document_id => "'$document_id'"
    This expands to document_id => "%{id}", but the SQL statement never selects a column named id, so the field reference cannot be resolved. ES therefore receives the literal string %{id} as the _id (note "_id":"%{id}" in the query result above). Since that _id is identical for every row, ES treats each row as the same document, and every insert overwrites the previous one — leaving a single document. Point the reference at a field that actually exists in each event; see the sketch below.
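
    A minimal sketch of the fix, assuming order_main has a unique primary-key column (here called id — a hypothetical name; substitute the real column). First add it to the SELECT so the field exists in every event:

    SELECT
            order_main.id AS id,
            order_main.order_no AS orderNo,
            order_main.com_sub_code AS comSubCode
    FROM
            order_main

    With that column in the result set, the existing document_id=%{id} in the shell script resolves to a distinct _id per row, and the output block can stay as it is:

    output {
        elasticsearch {
            action => "index"
            hosts => ["'$ES_IP':'$ES_PORT'"]
            index => "'$index'"
            document_type => "'$index_type'"
            document_id => "%{id}"
        }
    }

    Alternatively, if orderNo is already unique, document_id => "%{orderNo}" works without touching the SQL — the script already sets lowercase_column_names => "false", so the camelCase alias is preserved in the event.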

