LittleEighteen 2024-08-12 16:43 采纳率: 0%
浏览 7

flink使用ES报错,如何解决?(语言-java)

flink中处理数据的时候,需要去ES里面获取一下数据,但是使用下面的代码报错client.performRequest(request)会报错,错误信息是thread waiting for the response was interrupted

  /**
     * 获取维保计划列表
     */
    public static ArrayList<Maintenance> getMaintenance() throws Exception {
        // 创建RestClientBuilder
        RestClientBuilder builder = RestClient.builder(new HttpHost(PropertyUtils.elasticsearchHost, Integer.valueOf(PropertyUtils.elasticsearchPort), "http"));
        // 获取RestClient实例
        RestClient client = builder.build();
        //创建一个搜索请求
        Request request = new Request("GET", PropertyUtils.elasticsearchName);
        request.addParameter("pretty", "true");
        // 执行搜索请求
        // 执行请求
        Response response = client.performRequest(request);
        ArrayList<Maintenance> maintenances = jointMaintenance(response);
        // 关闭RestClient实例
        client.close();
        return maintenances;

    }

  /**
     * 加载维保计划
     */
    public static void loadMaintenance() throws Exception {
   
        ArrayList<Maintenance> list = ElasticsearchUtils.getMaintenance();
   
        System.out.println("开始加载维保计划!!!");
        log.info("开始加载维保计划!!!");
        for (Maintenance maintenance : list) {
            loadMaintenance(maintenance);
        }
        System.out.println("维保计划加载完成!!!");
        log.info("维保计划加载完成!!!");
    }

但是不使用flink,直接main方法的时候就没有这个错误。目前的解决方式是创建一个线程去处理这个

  public static void main(String[] args) {
        try {
            ArrayList<Maintenance> maintenance = getMaintenance();
            System.out.println(maintenance);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


 @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 加载物模型-加载物模型与设备关系---项目启动就加载
        try {

            LoadModelFlink.loadModel();
            RuleEngineUtils.loadRuleEngine();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        MaintenanceUtils.loadMaintenance();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            MaintenanceTask.maintenanceTask();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

有什么其他的解决办法吗

  • 写回答

1条回答 默认 最新

  • 关注

    下午好🌅🌅🌅
    本答案参考ChatGPT-3.5

    这个问题可能是由于Request类内部有一个线程等待请求响应被中断造成的。在这种情况下,你可以尝试以下几种方法来解决问题:

    1. 使用非阻塞I/O流:使用Java 9引入的非阻塞I/O流(如AsyncSocket或SocketChannel)可以避免这个问题。通过这些流,你的代码可以在不阻塞主线程的情况下执行其他任务。

    2. 将请求发送到不同的线程:你可以将请求发送到不同的线程,每个线程处理一部分请求。这样,当某个线程遇到问题时,其他线程就可以继续处理请求。

    3. 使用FutureExecutorServiceFuture表示异步操作的结果,而ExecutorService则用于管理线程池。你可以使用FutureExecutorService将请求封装起来,并确保所有请求都按照预期的方式工作。

    下面是修改后的代码示例:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.elasticsearch离线ElasticsearchSource;
    import org.apache.flink.streaming.connectors.elasticsearch.WriteMode;
    import org.apache.flink.streaming.connectors.elasticsearch.index.IndexResponse;
    import org.apache.flink.streaming.connectors.elasticsearch.index.mapper.RequestMappingMapper;
    import org.apache.flink.streaming.connectors.elasticsearch.index.query.Query;
    import org.apache.flink.streaming.connectors.elasticsearch.index.write.WriteToEsTask;
    import org.apache.flink.streaming.connectors.elasticsearch.mapreduce.SlaveToMapReduceConnection;
    
    public class FlinkGetMaintenance {
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 创建RestClientBuilder
            RestClientBuilder builder = new RestClientBuilder(new HttpHost(PropertyUtils.elasticsearchHost, Integer.valueOf(PropertyUtils.elasticsearchPort), "http"));
    
            // 获取RestClient实例
            RestClient client = builder.build();
    
            // 创建一个搜索请求
            Request request = new Request("GET", PropertyUtils.elasticsearchName);
    
            // 设置Query参数
            Query query = new Query.Builder().setField("product_name").build();
    
            // 设置Write模式
            WriteMode writeMode = WriteMode.OFF;
    
            // 调用WriteToEsTask执行查询
            WriteToEsTask task = new WriteToEsTask(client, EnvironmentEdgeManager.get EdgeId(), indexResponse -> {
                Query response = indexResponse.getMatchQuery(query);
                return tuple(new Tuple2<>(response徵数, response文字描述));
            }, writeMode);
    
            // 执行任务
            task.execute();
    
            // 关闭RestClient实例
            client.close();
        }
    }
    

    在这个例子中,我们首先创建了一个RestClientBuilder来构建HttpClient,然后从这个HttpClient中获取一个RestClient实例。然后,我们创建了一个搜索请求,设置了一个查询参数和写入模式。最后,我们调用了WriteToEsTask来执行查询,然后开始执行任务。

    请注意,你需要根据实际情况修改PropertyUtils.elasticsearchHost, PropertyUtils.elasticsearchPort, PropertyUtils.elasticsearchName等属性。

    评论

报告相同问题?

问题事件

  • 修改了问题 8月12日
  • 创建了问题 8月12日

悬赏问题

  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见
  • ¥15 一共有五道问题关于整数幂的运算还有房间号码 还有网络密码的解答?(语言-python)
  • ¥20 sentry如何捕获上传Android ndk 崩溃
  • ¥15 在做logistic回归模型限制性立方条图时候,不能出完整图的困难
  • ¥15 G0系列单片机HAL库中景园gc9307液晶驱动芯片无法使用硬件SPI+DMA驱动,如何解决?