yigerendengqingtian 2015-11-17 02:25
1196 views
Closed

Spark computing MongoDB data: not sure if it's an environment problem or a code problem. Still a beginner, please help!

I'm using Spark to process data stored in MongoDB, but most runs never produce a result, and I can't work out what the error messages below mean. Once or twice a run did finish with a result. This is my first time touching this stack, so any help is appreciated.

The main code is as follows:

    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster(SPARK_PATH);
    sparkConf.setAppName("Logs_Collect");
    // Ship the application as a jar, otherwise the executors throw ClassNotFoundException
    String[] jars = { "F:\\bigdata.jar" };
    sparkConf.setJars(jars);
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration config = new Configuration();
    config.set("mongo.input.uri", MONGODB_URL + ".log");
    config.set("mongo.output.uri", MONGODB_URL + ".testcollect");
    Date start = DateUtil.getLastNDay(dateRange);
    Date end = DateUtil.getLastNDay(0);
    // Read the data from MongoDB
    JavaPairRDD<Object, BSONObject> mongoRDD = ctx.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class,
            BSONObject.class);

    JavaPairRDD<Object, BSONObject> mongoRDD2 = mongoRDD
            .filter(new Function<Tuple2<Object, BSONObject>, Boolean>() {
                @Override
                public Boolean call(Tuple2<Object, BSONObject> arg0) throws Exception {
                    // Keep only documents whose "time" field falls between start and end
                    Date time = (Date) arg0._2.get("time");
                    return time.after(start) && time.before(end);
                }
            });

    // Key each document by its (host, content) pair so identical entries group together
    JavaPairRDD<Map<String, Object>, BSONObject> mongoRDD3 = mongoRDD2
            .mapToPair(new PairFunction<Tuple2<Object, BSONObject>, Map<String, Object>, BSONObject>() {
                @Override
                public Tuple2<Map<String, Object>, BSONObject> call(Tuple2<Object, BSONObject> arg0)
                        throws Exception {
                    Map<String, Object> k = new HashMap<String, Object>();
                    k.put("host", arg0._2.get("host"));
                    k.put("content", arg0._2.get("content"));
                    return new Tuple2<Map<String, Object>, BSONObject>(k, arg0._2);
                }
            });

    // Replace each value with a count of 1...
    JavaPairRDD<Map<String, Object>, Integer> mongoRDD4 = mongoRDD3
            .mapToPair(new PairFunction<Tuple2<Map<String, Object>, BSONObject>, Map<String, Object>, Integer>() {
                @Override
                public Tuple2<Map<String, Object>, Integer> call(Tuple2<Map<String, Object>, BSONObject> arg0)
                        throws Exception {
                    return new Tuple2<Map<String, Object>, Integer>(arg0._1, 1);
                }
            });

    // ...then sum the counts per key and collect the totals back to the driver
    JavaPairRDD<Map<String, Object>, Integer> mongoRDD5 = mongoRDD4
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    Map<Map<String, Object>, Integer> map2 = mongoRDD5.collectAsMap();
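
One thing worth noting about the pipeline above: it ships every document in the collection to Spark and only then filters by date. mongo-hadoop can push that predicate into MongoDB itself through the `mongo.input.query` property, so the executors never have to deserialize out-of-range documents. A minimal sketch, assuming this connector version honors `mongo.input.query` and accepts extended-JSON `$date` values (both points to verify against the mongo-hadoop release in use):

    // Sketch only: set this before creating mongoRDD, alongside mongo.input.uri.
    // "$date" takes milliseconds since the epoch in MongoDB extended JSON.
    String query = String.format(
            "{ \"time\" : { \"$gte\" : { \"$date\" : %d }, \"$lt\" : { \"$date\" : %d } } }",
            start.getTime(), end.getTime());
    config.set("mongo.input.query", query);

If the pushdown works, the mongoRDD2 filter step above becomes redundant.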

Console output from the run (the earlier, error-free part is omitted):

[INFO] com.mongodb.hadoop.splitter.MongoCollectionSplitter - Created split: min={ "_id" : { "$oid" : "563dc85a002e25dc6bfd59cd"}}, max= { "_id" : { "$oid" : "563dc85b002e25dc6bfd7b1b"}}
[INFO] com.mongodb.hadoop.splitter.MongoCollectionSplitter - Created split: min={ "_id" : { "$oid" : "563dc85b002e25dc6bfd7b1b"}}, max= null
[Stage 0:> (0 + 4) / 79][DEBUG] org.spark-project.jetty.http.HttpParser - filled 173/173
[DEBUG] org.spark-project.jetty.server.Server - REQUEST /jars/bigdata.jar on BlockingHttpConnection@3190b6f6,g=HttpGenerator{s=0,h=-1,b=-1,c=-1},p=HttpParser{s=-5,l=10,c=0},r=1
[DEBUG] org.spark-project.jetty.server.Server - RESPONSE /jars/bigdata.jar 200 handled=true
[WARN] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, slave02): java.io.IOException: java.lang.ArrayIndexOutOfBoundsException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1141)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException
at java.lang.System.arraycopy(Native Method)
at org.bson.BasicBSONDecoder$BSONInput._need(BasicBSONDecoder.java:404)
at org.bson.BasicBSONDecoder$BSONInput.read(BasicBSONDecoder.java:452)
at org.bson.BasicBSONDecoder$BSONInput.readCStr(BasicBSONDecoder.java:492)
at org.bson.BasicBSONDecoder.decodeElement(BasicBSONDecoder.java:197)
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:153)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:121)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
... 24 more
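
For what it's worth, the trace shows the failure happening while an executor deserializes the MongoInputSplit itself (MongoInputSplit.readFields → BasicBSONDecoder), before any document is even read. One plausible cause of that kind of corruption, though only a guess, is version skew: the mongo-java-driver / mongo-hadoop classes on the driver differing from the ones packed into F:\bigdata.jar and shipped to the executors. A small diagnostic sketch to check which jar supplies org.bson on each side (the executor's line lands in the executor's stdout log, not this console):

    // Where does org.bson load from on the driver?
    System.out.println("driver BSON jar: " + org.bson.BasicBSONDecoder.class
            .getProtectionDomain().getCodeSource().getLocation());
    // And on an executor? Check the executor's stdout log for this line.
    ctx.parallelize(java.util.Arrays.asList(1))
            .foreach(new org.apache.spark.api.java.function.VoidFunction<Integer>() {
                @Override
                public void call(Integer i) throws Exception {
                    System.out.println("executor BSON jar: " + org.bson.BasicBSONDecoder.class
                            .getProtectionDomain().getCodeSource().getLocation());
                }
            });

If the two locations point at different jar versions, aligning them would be the first thing to try.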
