import java.io.IOException;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple3;
public class Test {
public static void main(String[] args) throws IOException {
// 创建一个配置类SparkConf,然后创建一个SparkContext
SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
JavaSparkContext jsc = new JavaSparkContext(conf);
String url = "E://";
JavaRDD<String> data = null;
Configuration config = new Configuration();
FileSystem hoursHDFS = FileSystem.get(URI.create(url),config);
FileStatus[] hoursFile = hoursHDFS.listStatus(new Path(url));
Path[] paths = FileUtil.stat2Paths(hoursFile);
for (int i = 0; i < paths.length; i++) {
Path path = paths[i];
// 读取原文件数据,每一行记录转成RDD里面的一个元素
data = jsc.textFile(path.toString());
String newFile = "E://"+Long.valueOf(new Date().getTime())+".txt";
FileSystem newHDFS = FileSystem.get(URI.create(newFile),config);
// 将每条记录的每列切割出来,生成一个Tuple
JavaRDD<Tuple3<String, String, Integer>> person = data
.map(new Function<String, Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = -2381522520231963249L;
public Tuple3<String, String, Integer> call(String s)
throws Exception {
// 按逗号分割一行数据
String[] tokens = s.split(",");
// 将分割后的三个元素组成一个三元Tuple
Tuple3<String, String, Integer> person = new Tuple3<String, String, Integer>(
tokens[0], tokens[1], Integer
.parseInt(tokens[2]));
return person;
}
});
FSDataOutputStream os = newHDFS.create(new Path(newFile),true);
// 遍历数据写到新文件中
for (Tuple3<String,String, Integer> d : person.collect()) {
StringBuffer sb = new StringBuffer();
sb.append(d._1() + "," + d._2()+","+d._3());
os.write(sb.toString().getBytes("UTF-8"));
}
}
// 将每条记录的每列切割出来,生成一个Tuple
jsc.stop();
jsc.close();
}
}
首先是代码。
目前功能已实现到读取文件夹的文件写到一个新文件上。
但是想以10个文件读取一次写到一个新文件上。没啥思路。
因为我写代码对于算法逻辑最是想不通。
求大神指教,另外代码是用spark处理的。在读文件时也是在hdfs里去读取。目前代码是写的本地路径