在使用HBase中的BulkLoad方式导入数据的时候出现了问题,通过查找发现HFileOutputFormat2 not found,但是运行其他的bulkload程序时仍然正常,下面是我出现的问题:
21/05/25 10:45:56 INFO mapreduce.JobSubmitter: number of splits:1
21/05/25 10:45:56 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
21/05/25 10:45:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1621909750051_0002
21/05/25 10:45:56 INFO impl.YarnClientImpl: Submitted application application_1621909750051_0002
21/05/25 10:45:56 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1621909750051_0002/
21/05/25 10:45:56 INFO mapreduce.Job: Running job: job_1621909750051_0002
21/05/25 10:46:01 INFO mapreduce.Job: Job job_1621909750051_0002 running in uber mode : false
21/05/25 10:46:01 INFO mapreduce.Job: map 0% reduce 0%
21/05/25 10:46:01 INFO mapreduce.Job: Job job_1621909750051_0002 failed with state FAILED due to: Application application_1621909750051_0002 failed 2 times due to AM Container for appattempt_1621909750051_0002_000002 exited with exitCode: 1
For more detailed output, check application tracking page:http://master:8088/cluster/app/application_1621909750051_0002Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1621909750051_0002_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
at org.apache.hadoop.util.Shell.run(Shell.java:482)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
21/05/25 10:46:01 INFO mapreduce.Job: Counters: 0
----
然后这是我的java程序:
package edu.tju.weather.sea;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
/**
* 输入的数据内容为一个map处理一个文件,一个文件不能进行分割
*/
public class WaveDataInput {
private static final Log LOG = LogFactory.getLog(WaveDataInput.class);
/**
* 自定义的RecordReader,完成的任务是每次读取一整个文件的内容
*/
public static class WholeReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();
private boolean isProcessed = false;
private FSDataInputStream fsDataInputStream;
private byte[] buffer;
private FileSplit fileSplit;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 获得切片的位置
this.fileSplit = (FileSplit) split;
// 获取文件的Path
Path path = fileSplit.getPath();
// 需要传一个conf,我们在任务中,所有的信息在taskAttemptContext都有
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
fsDataInputStream = fileSystem.open(path); // 获取文件的输入流
}
/**
* 尝试读取keyValue,如果读取到了那就返回true,如果没有读取到就返回false
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!isProcessed) {
// 设置key为path的路径
String keyName = fileSplit.getPath().toString();
key.set(keyName);
// 一次将文件读取完毕
buffer = new byte[(int) fileSplit.getLength()];
fsDataInputStream.read(buffer);
value.set(buffer, 0, buffer.length);
isProcessed = true;
return true;
} else {
return false;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return isProcessed ? 0 : 1;
}
@Override
public void close() throws IOException {
if (fsDataInputStream != null) {
IOUtils.closeStream(fsDataInputStream);
}
}
}
/**
* 自定义的读取文件文件
*/
public static class customFileInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// 设置为不分割文件
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new WholeReader();
}
}
/**
* 设置自定义的map
*/
public static class WaveMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String[] text = value.toString().split("\n");
/**
* 数据类型为:
* 标志符 层数 纬度 经度 时区 水深 样本平均时间 项目名称
* 2 2 335024N 0782848W 0000 9.8 3600 NDBC
*
* 数据行:
* 标志符 时间 深度 水平流速 水平流向
* 3 20160101000800 0.8 6.60 297.0
* 3 20160101000800 9.8 2.90 297.0
*/
// 设置全局变量经度 纬度
float lon = 0.0f; // 经度
float lat = 0.0f; // 纬度
int level = 0;
int region = 0;
for (String line : text) {
String ele[] = line.split(" ");
if (ele.length < 3) return;
if ("2".equals(ele[0].trim())) {
if(ele.length < 4) continue;
// 配置行的内容
level = Integer.parseInt(ele[1]);
// 纬度和经度的写入
lat = getLonLat(ele[2]);
lon = getLonLat(ele[3]);
} else if ("3".equals(ele[0].trim())) {
if(ele.length < 5) continue;
// 设置一个rowkey
region = (ele[1].hashCode())%100;
String timestap = System.nanoTime() + "";
String stamp = timestap.substring(timestap.length() - 9, timestap.length() - 2);
String rowkey = (String.format("%02d", region)) + ele[1] +stamp;
Put put = new Put(Bytes.toBytes(rowkey));
ImmutableBytesWritable putRowkey = new ImmutableBytesWritable(rowkey.getBytes());
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("time"), Bytes.toBytes(ele[1]));
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("lat"), Bytes.toBytes(lat));
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("lon"), Bytes.toBytes(lon));
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("depth"), Bytes.toBytes(Float.parseFloat(ele[2])));
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("wave_speed"), Bytes.toBytes(Float.parseFloat(ele[3])));
put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("wave_dir"), Bytes.toBytes(Float.parseFloat(ele[4])));
// 多次写入
ImmutableBytesWritable outkey = new ImmutableBytesWritable(rowkey.getBytes());
context.write(outkey, put);
}
}
}
}
public static void main(String[] args) throws Exception {
if(args.length < 3){
System.err.println("输入三个参数....");
return;
}
// 设置全局变量,设置Hadoop操作的用户名
System.setProperty("HADOOP_USER_NAME", "611");
System.out.println("程序开始启动。。。");
long start = System.currentTimeMillis();
String inputPath = "hdfs://192.168.1.237:9000/dataset/gtspp/gtspp4_in202012.txt";
String outputPath = "hdfs://192.168.1.237:9000/dataset/out3";
String HbaseTable = "sea:element";
inputPath = args[0];
outputPath = args[1];
HbaseTable = args[2];
Configuration conf = new Configuration();
conf.set("fs.defaultFs", "hdfs://192.168.1.237:9000");
conf.set("hbase.zookeeper.quorum", "master,slave1");
conf.set("hbase.master", "master:6000");
conf.set("hbase.zookeeper.property.clientPort", "2183");
// 设置跨平台提交任务
conf.set("mapreduce.app-submission.cross-platform", "true");
// 设置生成jar包的位置
conf.set("mapred.jar", "D:\\MyFile\\实验室项目\\2021大数据项目\\out\\artifacts\\wave\\wave.jar");
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
// 连接到HBase表中
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
HTable table = (HTable) conn.getTable(TableName.valueOf(HbaseTable));
final Path OutputPath=new Path(outputPath);
// 删除out目录
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.1.237:9000"), conf);
if(fs.exists(OutputPath))
fs.delete(OutputPath, true);
Job job = Job.getInstance(conf, "wave-load");
job.setMapperClass(WaveMapper.class);
job.setJarByClass(WaveDataInput.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
// 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// 等待job作业的完成,从HFile文件中生成HBase表格
if(job.waitForCompletion(true)){
LoadIncrementalHFiles Loader=new LoadIncrementalHFiles(conf);
Loader.doBulkLoad(OutputPath, admin, table, conn.getRegionLocator(TableName.valueOf(HbaseTable)));
}
System.out.printf("程序运行完成,运行时间为:%fs 。。。。\n", (System.currentTimeMillis() - start)/1000.0);
LOG.info(String.format("程序运行完成,运行时间为:%fs 。。。。\n", (System.currentTimeMillis() - start)/1000.0));
}
public static float getLonLat(String str){
int flag = 1;
if(str.contains("N") || str.contains("E")){
flag = 1;
}else if(str.contains("W") || str.contains("S")){
flag = -1;
}else {
return 0;
}
float value =Float.parseFloat(str.substring(0, str.length() - 1))/10000 ;
return value * flag;
}
}
最后是http://master:8088/cluster/app/application_1621909750051_0002中显示的错误:
2021-05-25 10:46:00,634 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1621909750051_0002_000002
2021-05-25 10:46:00,824 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2021-05-25 10:46:00,825 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 2 cluster_timestamp: 1621909750051 } attemptId: 2 } keyId: -1575443230)
2021-05-25 10:46:01,055 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2021-05-25 10:46:01,057 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null
2021-05-25 10:46:01,094 INFO [main] org.apache.hadoop.service.AbstractService: Service org.apache.hadoop.mapreduce.v2.app.MRAppMaster failed in state INITED; cause: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:518)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:498)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.callWithJobClassLoader(MRAppMaster.java:1593)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.createOutputCommitter(MRAppMaster.java:498)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:284)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$5.run(MRAppMaster.java:1551)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1548)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1481)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2267)
at org.apache.hadoop.mapreduce.task.JobContextImpl.getOutputFormatClass(JobContextImpl.java:222)
at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:514)
... 11 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2171)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2265)
... 13 more
请求各位大神帮忙看看是什么问题!!!