愁容骑士z 2019-01-13 18:45 采纳率: 0%
浏览 1277

用spark提供的java API写的程序怎么远程提交到集群上运行。

小弟最近在做一个机器学习平台,想通过前台选择数据源、算法、参数之类的东西,由后台程序提交到spark集群上调用sparkML库来跑出结果,然后把结果返回之后在前台渲染出效果。实验室之前有搭spark集群,这两天看了一下java提交任务上去spark集群的东西,似乎都是要先把东西打jar包,再传服务器通过spark-submit,这样跟需求就不符了,恳求各位使用java调用过spark的大侠答疑解惑。委实是之前没用过这方面的使用经验。之前有找过一些代码如下。

public class TestUtil {


    public static void main(String[] args){


        System.setProperty("user.name", "root");
        SparkConf conf = new SparkConf().setAppName("Spark Java API 学习")
                .setMaster("spark://211.87.227.79:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> users = sc.textFile("hdfs://211.87.227.79:8020/input/wordcount.txt");
        System.out.println(users.first());
    }

}

看了spark的UI这个任务确实也提交上去了,但是idea的控制台一直重复地报这一段

图片说明

sparkUI如图。

图片说明

CSDN没币了没法悬赏。要是有大侠可以解决,可以有偿,留联系方式就行。

  • 写回答

1条回答 默认 最新

  • 你知我知皆知 2024-08-07 17:22
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    你已经正确地设置了JavaSparkContext并执行了一个简单的MapReduce作业。如果你想要将你的代码与Spark集群连接起来,你需要实现一个接口或类,该类能够处理Spark的API,并且能够发送和接收Spark作业。

    以下是一个基本的例子,展示了如何创建一个新的SparkContext实例:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class SparkJob {
        private static final String SCHEMA_FILE_PATH = "/path/to/schema.json";
        private static final String DATA_FILE_PATH = "/path/to/data.csv";
    
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf()
                .setAppName("MyApp")
                .setMaster("local[*]")
                .set("spark.sql.warehouse.dir", "/tmp/spark-warehouse");
    
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
    
            // Read the data from a CSV file
            JavaPairRDD<String, Integer> data = sc.textFile(DATA_FILE_PATH)
                .mapToPair(line -> line.split(","))
                .mapToPair(row -> new Tuple2<>(row[0], Integer.parseInt(row[1])));
    
            // Write the result to a JSON file
            data.saveAsTextFile(SCHEMA_FILE_PATH);
    
            sc.close();
        }
    }
    

    在这个例子中,我们首先设置了一个SparkConf对象,其中包含了一些默认的配置信息。然后,我们创建了一个JavaSparkContext实例,并读取了CSV文件的数据。最后,我们将数据写入JSON格式的文件。

    如果你想让这个作业能够在Spark集群上运行,你需要实现一个方法,该方法接受一个SparkSession对象作为输入,并返回一个结果集。例如,你可以这样做:

    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;
    import java.util.Arrays;
    
    public class SparkJob {
    
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf()
                .setAppName("MyApp")
                .setMaster("local[*]")
                .set("spark.sql.warehouse.dir", "/tmp/spark-warehouse");
    
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
    
            // Read the data from a CSV file
            JavaPairRDD<String, Integer> data = sc.textFile("/path/to/data.csv")
                .mapToPair(line -> line.split(","))
                .mapToPair(row -> new Tuple2<>(row[0], Integer.parseInt(row[1])));
    
            // Write the result to a JSON file
            Function<Tuple2<String, Integer>, String> mapper = (pair) -> pair._1 + "," + pair._2;
    
            data.map(mapper).saveAsTextFile("/path/to/output.txt");
    
            sc.close();
        }
    }
    

    在这个例子中,我们创建了一个函数mapper,它接受一个Tuple2<String, Integer>作为输入,并将其转换为字符串形式的键值对。然后,我们使用saveAsTextFile方法将这些键值对保存到文本文件中。

    注意:在实际的应用中,你可能需要根据自己的需求修改这个代码。例如,你可能需要添加更多的逻辑(如错误处理),或者使用更复杂的数据结构(如RDD)。此外,你还需要考虑如何在不同的Spark版本之间迁移代码。

    评论

报告相同问题?