多线程向flink集群提交任务失败

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Could not upload the program's JAR files to the JobManager.
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
... 14 common frames omitted
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not upload the program's JAR files to the JobManager.
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:410)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 19 common frames omitted
Caused by: java.io.IOException: Could not retrieve the JobManager's blob port.
at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:745)
at org.apache.flink.runtime.jobgraph.JobGraph.uploadUserJars(JobGraph.java:565)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:407)
... 20 common frames omitted
Caused by: java.io.IOException: PUT operation failed: Could not transfer error message
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:512)
at org.apache.flink.runtime.blob.BlobClient.put(BlobClient.java:374)
at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:771)
at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:740)
... 22 common frames omitted
Caused by: java.io.IOException: Could not transfer error message
at org.apache.flink.runtime.blob.BlobClient.readExceptionFromStream(BlobClient.java:799)
at org.apache.flink.runtime.blob.BlobClient.receivePutResponseAndCompare(BlobClient.java:537)
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:508)
... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.ipc.RemoteException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.runtime.blob.BlobClient.readExceptionFromStream(BlobClient.java:795)
... 27 common frames omitted

0

2个回答

自己回复一下 对于flink提交任务时 会将任务对应的jar文件上传至远程主机(如何上传因集群部署方式不同而不同),最终存储到hdfs上,然后taskmanager会去hdfs上下载此文件。
上传文件时,会生成对应的文件名,而文件名是根据jar包的字节码生成的(极端的说,即便jar包对应的源代码中多了一个空格,生成的文件名都不会相同)。
所以,同一个jar会生成同样的文件名,而它又在同样的路径中,这时就会出现多线程对同一文件读写,典型的多线程访问同一资源的问题。这也就是导致上述问题的根源。

0

多线程提交10个任务,其中大部分成功,部分失败。以上是失败的报错信息。我对着源码看了,问题出在向jobclient类中:当blobClient.put(is)时
(其中is为jar对应的inputstream),使用一个与blob server连接的socket向集群提交jar。
提交完成后会用这个socket获取返回数据:final InputStream is = this.socket.getInputStream();,然而这个返回的状态码为1,其含义为:Internal code to identify an erroneous operation.。然后又调用readExceptionFromStream方法解析报错信息,这个解析报错的方法也报错了: java.lang.ClassNotFoundException: org.apache.hadoop.ipc.RemoteException。有没有大神指点一下,给指个方向。感谢!!!

0
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
flink任务提交
flink安装启动后,默认端口为8081。 浏览器启动 http://ip:8081,可以进入可视化界面。可以看到正在运行的任务及状态。 若要提交新的任务,点击“Submit new Job” -- “Add New+”,上传编译好的jar包工程 点击“upload”,等待上传完成:   点击新上传的jar包,可进行提交操作,"submit":   提交后可看到任务运行及...
Flink提交任务至yarn
在flink on yarn模式中,flink yarn-session的两种提交方式 1.公用一个yarn-session 在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。 2.每个job提供一个yarn-session     每次提交都会创建一个新的flink集群,任务之间互相独立,互不影...
flink从安装到提交任务
本操作全部在Windows下环境进行操作,linux环境下更为简单 下载: flink这东西安装要启动local模式还是比较简单的,首先从官网上下载 http://flink.apache.org/downloads.html   我下载的是最新版本1.1.3, 解压flink: 下载完后对其进行解压,我解压所在的目录在D:\flink-1.1.3-bin-hadoop26-sca
Flink运行时之客户端提交作业图-上
客户端提交作业图作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。本篇我们将分析客户端如何提交JobGraph给JobManager。
Flink提交作业的两种方式
抛砖引玉: 在Spark集群提交作业时候可以使用--deploy参数指定client或者cluster方式提交作业到集群,前者是客户端模式,后者是集群模式,两者主要区别就是Driver的运行位置,在客户端模式下,Driver运行在提交作业的客户端机器上负责与集群进行资源申请调度等工作。而集群模式下Driver运行在集群中的某一个节点上负责资源申请以及调度。 一般的,客户端模式适合程序的调试,这
Flink任务提交模式
local模式本地运行,不需要集群环境IDE开发时,local模式方便本地测试standalone需要搭建flink集群提交命令flink run -m artemis-02:6123 -c com.test.WordCount2 ./Flink.jar hdfs://artemis-02:9000/tmp/lvxw/tmp/logs/words hdfs://artemis-02:9000/t...
第一次向线上提交flink任务遇到的一堆坑
flink版本 flink1.7.0_scala2.12.tar.gz 本项目包括内容 1.从kafka消费,结果写入kafka 2.中间有部分数据存入redis 问题描述 开始我没有注意scala版本问题,在pom中flink用的1.7.0,scala用的2.11.0,idea设置的scala环境也是2.11.0 在本地编译运行完全正常,然后使用build artifa...
Flink运行时之客户端提交作业图-下
submitJob方法分析JobClientActor通过向JobManager的Actor发送SubmitJob消息来提交Job,JobManager接收到消息对象之后,构建一个JobInfo对象以封装Job的基本信息
Apache Flink流作业提交流程分析
提交流程调用的关键方法链用户编写的程序逻辑需要提交给Flink才能得到执行。本文来探讨一下客户程序如何提交给Flink。鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行。
Flink 代码方式提交程序到远程集群运行
在学习Flink时候,看到如下方法,可以获取到远程集群上的一个ExecutionEnvironment实例,便尝试使用一下,将本地IDE作业提交到集群运行,代码如下: def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment 代码: pack
flink集群(standalone模式)简单操作
1、关键配置文件flink-conf.yaml ##配置master节点ip jobmanager.rpc.address: 192.168.1.100 ##配置slave节点可用内存,单位MB taskmanager.heap.mb: 25600 ##配置每个节点的可用slot,1 核CPU对应 1 slot ##the number of available CPUs per machi...
Flink集群没有异常信息,但不工作
1 现象:taskmanager和jobManager都有输出消息:Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /tmp/jna6879710861309389413.tmp which might have disabled stack guard. The VM will try to fix the s...
Flink提交运行中常见问题总结
Flink提交运行中常见问题总结 一、问题1 1.问题描述: 提交jar到Flink集群时候出现: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.viewfs.ViewFileSystem could not be instantiated 可...
Standalone模式两种提交任务方式
第一种方式Standalone-client提交任务方式 提交命令-方式1: ./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000 提交命令-方式2:  ./spark-su...
flink命令行参数
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。
flink集群上运行job问题
现象:在web上提交任务后报:org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error while extracting contained jar files. at org.apache.flink.client.program.PackagedProgram.extractContainedL...
Flink提交任务(第一篇)——打包任务程序
文章目录1. 引言2. buildProgram的执行逻辑2.1. 如果没有指定入口类名称,那么就从jar包里找出来2.2. 提取任务jar文件中所有依赖的JAR包2.3. 通过入口类名称来真正加载执行入口类 针对flink-1.7-release 1. 引言 Flink客户端通过命令行的形式提交任务时,即执行./flink run xxxx.jar时,真正执行的run逻辑如下: protecte...
Flink任务提交给Yarn,为何web界面只有一个Taskmanager?
使用yarn-session在yarn集群上启动一个flink集群: ./yarn-session.sh -n 4 -jm 1024 -tm 1024 -s 2 此时,在master:8088的界面看到多了一个application,但是running containers数量为1;【为何不是分配的4】 然后,继续在yarn上提交flink的jar包运行: ./flink run -p...
Flink提交任务(总篇)——执行逻辑整体分析
Flink客户端提交任务执行的逻辑分析 针对Flink1.7-release版本 前言 Flink的源码体系比较庞大,一头扎进去,很容易一头雾水,不知道从哪部分代码看起。但是如果结合我们的业务开发,有针对性地去跟进源码去发现问题,理解源码里的执行细节,效果会更好。 笔者在近期的Flink开发过程中,因为产品的原因,只允许部署Flink standalone模式,出于性能考虑,很有必要对其性能做下测...
对Flink集群进行远程调试
导读:在学习或者使用各个大数据框架的时候,往往想对runtime层次的代码进行调试或者跟踪阅读,但其往往部署在其他机器上,因此需要进行远程调试。本文以Flink为例,介绍如何通过IDEA进行Flink的runtime层次的远程调试。 环境:   Flink1.4.2   IntelliJ IDEA   虚拟机模拟的Flink集群 先对Flink的作业提交过程进行简单介绍(以批处理为例)...
提交Flink的jar包到Yarn上,web界面log报错
在hadoop环境下运行flink的batch程序: ./flink run /home/hadoop/proj/wordcount.jar --input hdfs://mycluster/flinkProjectData/111.txt 一段时间后,flink的web界面上,taskmanager和jobmanager下的logs分别出现error: taskmanager: ER...
Flink开发中遇到的问题及解法
1. 当Source是Kafka的时候,如何设置Source Operator的并发度?如果没有指定,Source Operator的个数与集群中的TaskManager的个数相等。如果手动设置,建议使用的slot个数=Kafka Partition的个数/TaskManager的个数。此时,Slot的个数需大于等于2.因为其中有一个Source Operator。也不建议在一个Slot中启用多线...
flink填坑之jar包另一种打包方式
最近玩flink真的是疯掉了,新手想要打包maven工程又出问题了,找不到terminal,mvn命令不认识,又不存在goal,直接换思路网上找到另一种打包方式
如何向hadoop集群定时提交一个jar作业?
除了使用Hive,Pig来执行一个MapReduce任务,不需要专门的把项目打成jar包,提交执行,或者定时执行,因为Hive,Pig这些开源框架已经,帮我们自动打包上传了。 而有些时候,我们自己手写了一个MapReduce的任务,然后这个任务里面依赖了其他的第三方包,比如说是Lucene的,或者是Solr,ElasticSearch的,那么打包的时候就需要将这些依赖的jar包也一起的打...
从Storm到Flink:大数据处理的开源系统及编程模型(文末福利)
本文节选自CCF大数据教材系列丛书之《大数据处理》,本书由华中科技大学金海教授主编,包括大数据处理基础技术、大数据处理编程与典型应用处理、大数据处理系统与优化三个方面。本...
Spark集群任务提交
1. 集群管理器 Spark当前支持三种集群管理方式 Standalone—Spark自带的一种集群管理方式,易于构建集群。 Apache Mesos—通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用。 Hadoop YARN—Hadoop2中的资源管理器。 Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求
flink部署操作-flink standalone集群安装部署
flink集群安装部署   standalone集群模式   必须依赖 必须的软件 JAVA_HOME配置 flink安装 配置flink 启动flink 添加Jobmanager/taskmanager 实例到集群 个人真实环境实践安装步骤   必须依赖 必须的软件 flink运行在所有类unix环境中,例如:linux、mac、或者cygwin,并且集群由一个master节...
在Flink集群搭建和使用中遇到的坑
一、项目概况 使用Flink测试中间状态设置checkpoint和从checkpoint中恢复。 二、搭建中出现的问题 Flink的集群搭建中需要配置中间状态缓存的路径(项目中使用到的是在hdfs中存储中间状态) 在集群中需要配置的项目是(如果需要中间状态的保存,这个必须的): ##配置使用的web接口,用来访问集群。默认应该也可以 jobmanager.web.address: 192.168....
storm提交集群报错及处理
if (args != null && args.length > 0) {             conf.setNumWorkers(3);             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());           }
Storm集群已经部署、配置完毕,向集群提交任务。
原文:http://www.it610.com/article/1961490.htm启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Storm才能在 任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Supervisors被重 启,运行中的Topologies不会受到...
如何在集群中提交任务
-
Flink集群运行5个小时候后down掉
1 现象:flink集群执行job,运行5个小时后,jobmanager报:2018-03-29 15:25:39,311 INFO  org.apache.flink.runtime.jobmanager.JobManager                - RECEIVED SIGNAL 1: SIGHUP. Shutting down as requested.2018-03-29 15:...
Flink集群的搭建Standalone模式
今天我们来说一下flink,大家可能对flink还不是特别的熟悉,其实它是一个很早的项目,只是在2016年的时候才被大家所注意到,现在已经被很多公司所使用,作为一个后起之秀,或者说流计算的新贵,为什么它能得到大家的认可呢,Flink把批处理当作流处理中的一种特殊情况.在Flink中,所有的数据都看作流.这是一个很好的抽象.再加上它完善的checkpoint机制,使得它对状态的管理非常的细致.所以能...
记一次yarn资源足够但是任务却提交阻塞
直接上图: accepted.png yarn集群资源内存只用了246.50G,一共有515G,cpu用了211个,一共240个.但是这个任务死活提交不上去,此任务需要20个container容器,每个容器只要1个cpu,1G的内存,最多也就20个cpu和内存....... 考虑到yarn任务的state是ACCEPTED,ACCEPTED只...
Eclipse远程提交MapReduce任务到Hadoop集群
一、介绍 以前写完MapReduce任务以后总是打包上传到Hadoop集群,然后通过shell命令去启动任务,然后在各个节点上去查看Log日志文件,后来为了提高开发效率,需要找到通过Ecplise直接将MaprReduce任务直接提交到Hadoop集群中。该章节讲述用户如何从Eclipse的压缩包最终完成Eclipse提价任务给MapReduce集群。 二、详解 1、安装Eclipse,安装
Storm集群提交任务
本人原帖地址:http://www.jianshu.com/p/6783f1ec2da0,欢迎访问留言。 准备工作: 1. 将开发好的jar包上传到服务器package目录下 2. Storm集群已经启动 我们提交一个WordCount任务: 为了方便,我们仍以blogchong老师的代码为例 GitHub项目链接,请读者自行打包并上传到nimbus节点上nimbus服务器进入
flink 的datastream的作业提交问题
我们有一个search的系统,用来从不同的服务器上搜索不同类型的数据(应用程序日志,linux日志,系统日志,client或server日志), search后的数据要提交给Flink Cluster 去执行, 具体的应用模型是 search(Data)+ flink(data streaming)提交给flink集群, 应用场景是用户每search一次就要提交一次flink job ,
flink中akka的使用 以jobClient提交任务为例子
在flink中,集群内部的组件之间通过akka来互相通信,其中采用了akka中的actor模型。 当需要提交一个可用的任务交由jobManager来处理并分配资源时,将会在ClusterClinet中的run()方法中,交由jobClient通过submitJobAndWait()方法去提交相应的jobGraph给jobManager。 public JobExecutionResult r...
远程提交mapreduce到hadoop集群
从13年初,开始搞hadoop的时候,是采用hadoop-eclipse插件来进行开发mapreduce,发现使用这个插件,其实也就是把相关的jar,class文件提交到远程的hadoop集群。 而实际上要部署应用的时候,如果不在远程提交,就得把任务代码打包成JAR,ftp到集群机器上进行执行。当然也可以在一个client机器上部署一套hadoop环境,把任务JAR放在这里,再提...
spark集群的任务提交执行流程
本文转自:https://www.linuxidc.com/Linux/2018-02/150886.htm一、Spark on Standalone1.spark集群启动后,Worker向Master注册信息2.spark-submit命令提交程序后,driver和application也会向Master注册信息3.创建SparkContext对象:主要的对象包含DAGScheduler和Tas...
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 数据库课程设计任务 java培训的任务