关于hbase(双节点集群)提供的subject_score数据集使用hbase map reduce编写代码并打成jar包将subject_score数据导入hbase的subject表中
可转文字
可转文字
关于hbase(双节点集群)提供的subject_score数据集使用hbase map reduce编写代码并打成jar包将subject_score数据导入hbase的subject表中
引自免费微信小程序:皆我百晓生
要将CSV文件subject_score.csv
中的数据导入到HBase的subject
表中,可以使用MapReduce作业。首先,确保Hadoop和HBase环境已正确配置,并且CSV文件已经上传到HDFS。
在HBase Shell中创建一个名为subject
的表,包含info
列族。
hbase(main):001:0> create 'subject', 'info'
使用Java编写一个MapReduce程序,读取HDFS上的CSV文件,并将数据写入HBase表中。
Mapper类应该解析CSV行,并将学号作为row key,科目和分数作为列名和值。
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class SubjectScoreMapper extends TableMapper<ImmutableBytesWritable, Put> {
private final static ImmutableBytesWritable row = new ImmutableBytesWritable();
private final static Text family = new Text("info");
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
if (parts.length == 3) {
String studentId = parts[0];
String subject = parts[1];
String score = parts[2];
Put put = new Put(Bytes.toBytes(studentId));
put.addColumn(family.getBytes(), Bytes.toBytes(subject), Bytes.toBytes(score));
row.set(Bytes.toBytes(studentId));
context.write(row, put);
}
}
}
Reducer在这个场景中不是必需的,因为我们可以直接从Mapper输出写入HBase。但是,如果你需要进行一些汇总操作,你可以添加一个Reducer类。
这个类用于设置作业参数,包括输入输出路径、Mapper和Reducer类(如果有的话),以及HBase表的信息。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class SubjectScoreJob {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "Subject Score Import");
job.setJarByClass(SubjectScoreJob.class);
job.setMapperClass(SubjectScoreMapper.class);
// job.setReducerClass(YourReducerClass.class); // 如果有Reducer
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(TableOutputFormat.class);
TableOutputFormat.configureIncrementalLoad(job, "subject", null);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译上述Java类,并将它们打包成一个JAR文件。然后,使用Hadoop命令行工具提交MapReduce作业。
hadoop jar your-jar-file.jar SubjectScoreJob /path/to/subject_score.csv
这将启动MapReduce作业,从CSV文件读取数据,并将其写入HBase的subject
表中。确保替换your-jar-file.jar
和/path/to/subject_score.csv
为实际的JAR文件名和CSV文件的HDFS路径。