secowo 2021-05-25 11:26

HFileOutputFormat2 not found when using HBase bulkLoad

I ran into a problem while importing data into HBase with BulkLoad. Digging into the logs I found an HFileOutputFormat2 not found error, yet my other bulkload programs still run normally. Here is the failure I get:

 

21/05/25 10:45:56 INFO mapreduce.JobSubmitter: number of splits:1
21/05/25 10:45:56 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
21/05/25 10:45:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1621909750051_0002
21/05/25 10:45:56 INFO impl.YarnClientImpl: Submitted application application_1621909750051_0002
21/05/25 10:45:56 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1621909750051_0002/
21/05/25 10:45:56 INFO mapreduce.Job: Running job: job_1621909750051_0002
21/05/25 10:46:01 INFO mapreduce.Job: Job job_1621909750051_0002 running in uber mode : false
21/05/25 10:46:01 INFO mapreduce.Job:  map 0% reduce 0%
21/05/25 10:46:01 INFO mapreduce.Job: Job job_1621909750051_0002 failed with state FAILED due to: Application application_1621909750051_0002 failed 2 times due to AM Container for appattempt_1621909750051_0002_000002 exited with  exitCode: 1
For more detailed output, check application tracking page:http://master:8088/cluster/app/application_1621909750051_0002Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1621909750051_0002_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
21/05/25 10:46:01 INFO mapreduce.Job: Counters: 0

----

And here is my Java program:

package edu.tju.weather.sea;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

/**
 * One map task processes one whole input file; files must not be split.
 */
public class WaveDataInput {
    private static final Log LOG = LogFactory.getLog(WaveDataInput.class);

    /**
     * Custom RecordReader that reads the entire content of a file in one go.
     */
    public static class WholeReader extends RecordReader<Text, Text> {

        private Text key = new Text();
        private Text value = new Text();
        private boolean isProcessed = false;
        private FSDataInputStream fsDataInputStream;
        private byte[] buffer;
        private FileSplit fileSplit;


        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // remember the split so we can later read the whole file it covers
            this.fileSplit = (FileSplit) split;
            // path of the file backing this split
            Path path = fileSplit.getPath();
            // the TaskAttemptContext already carries the job Configuration needed to open the FileSystem
            FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
            fsDataInputStream = fileSystem.open(path);  // open an input stream on the file
        }

        /**
         * Try to read the next key/value pair: returns true if a pair was produced, false otherwise.
         *
         * @return true if a key/value pair was read
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!isProcessed) {
                // use the file's path as the key
                String keyName = fileSplit.getPath().toString();
                key.set(keyName);
                // read the whole file in one go; readFully() guarantees the buffer is completely filled
                buffer = new byte[(int) fileSplit.getLength()];
                fsDataInputStream.readFully(buffer);
                value.set(buffer, 0, buffer.length);

                isProcessed = true;
                return true;
            } else {
                return false;
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // progress is 1.0 once the single record has been emitted
            return isProcessed ? 1 : 0;
        }

        @Override
        public void close() throws IOException {
            if (fsDataInputStream != null) {
                IOUtils.closeStream(fsDataInputStream);
            }
        }
    }

    /**
     * Custom FileInputFormat that hands each file, unsplit, to a single record reader.
     */
    public static class customFileInputFormat extends FileInputFormat<Text, Text> {

        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // never split the input files
            return false;
        }

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new WholeReader();
        }
    }


    /**
     * Custom Mapper: parses one whole file and emits a Put per data line, keyed by rowkey.
     */
    public static class WaveMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] text = value.toString().split("\n");

            /**
             * Input format.
             * Header line: flag, number of levels, latitude, longitude, time zone, water depth,
             * sample averaging time, project name:
             * 2     2 335024N 0782848W  0000     9.8  3600 NDBC
             *
             * Data lines: flag, time, depth, horizontal current speed, horizontal current direction:
             * 3 20160101000800     0.8       6.60    297.0
             * 3 20160101000800     9.8       2.90    297.0
             */
            // longitude/latitude are taken from the header line and reused for every data line
            float lon = 0.0f;   // longitude
            float lat = 0.0f;   // latitude
            int level = 0;
            int region = 0;

            for (String line : text) {
                // fields are separated by variable-width runs of whitespace
                String[] ele = line.trim().split("\\s+");
                if (ele.length < 3) continue;   // skip short/blank lines instead of aborting the whole file

                if ("2".equals(ele[0].trim())) {
                    if(ele.length < 4) continue;
                    // 配置行的内容
                    level = Integer.parseInt(ele[1]);
                    // 纬度和经度的写入
                    lat = getLonLat(ele[2]);
                    lon = getLonLat(ele[3]);

                } else if ("3".equals(ele[0].trim())) {
                    if(ele.length < 5) continue;
                    // 设置一个rowkey
                    region = (ele[1].hashCode())%100;
                    String timestap = System.nanoTime() + "";
                    String stamp = timestap.substring(timestap.length() - 9, timestap.length() - 2);
                    String rowkey = (String.format("%02d", region)) + ele[1] +stamp;

                    Put put = new Put(Bytes.toBytes(rowkey));
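                    // every column value below is stored with HBase's Bytes.toBytes binary encoding;
                    // a float becomes its raw 4-byte IEEE 754 representation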
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("time"), Bytes.toBytes(ele[1]));
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("lat"), Bytes.toBytes(lat));
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("lon"), Bytes.toBytes(lon));
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("depth"), Bytes.toBytes(Float.parseFloat(ele[2])));
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("wave_speed"), Bytes.toBytes(Float.parseFloat(ele[3])));
                    put.addColumn(Bytes.toBytes("element"), Bytes.toBytes("wave_dir"), Bytes.toBytes(Float.parseFloat(ele[4])));

                    // emit one (rowkey, Put) pair per data line
                    ImmutableBytesWritable outkey = new ImmutableBytesWritable(rowkey.getBytes());
                    context.write(outkey, put);
                }
            }


        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Please supply three arguments: <inputPath> <outputPath> <hbaseTable>");
            return;
        }
        // run HDFS/YARN operations as this user
        System.setProperty("HADOOP_USER_NAME", "611");
        System.out.println("Starting the program...");
        long start = System.currentTimeMillis();

        String inputPath = "hdfs://192.168.1.237:9000/dataset/gtspp/gtspp4_in202012.txt";
        String outputPath = "hdfs://192.168.1.237:9000/dataset/out3";
        String HbaseTable = "sea:element";

        inputPath = args[0];
        outputPath = args[1];
        HbaseTable = args[2];

        Configuration conf = new Configuration();
        conf.set("fs.defaultFs", "hdfs://192.168.1.237:9000");
        conf.set("hbase.zookeeper.quorum", "master,slave1");
        conf.set("hbase.master", "master:6000");
        conf.set("hbase.zookeeper.property.clientPort", "2183");

        // 设置跨平台提交任务
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // 设置生成jar包的位置
        conf.set("mapred.jar", "D:\\MyFile\\实验室项目\\2021大数据项目\\out\\artifacts\\wave\\wave.jar");
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

        // connect to HBase and look up the target table
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTable table = (HTable) conn.getTable(TableName.valueOf(HbaseTable));

        final Path OutputPath = new Path(outputPath);
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.1.237:9000"), conf);
        if (fs.exists(OutputPath))
            fs.delete(OutputPath, true);

        Job job = Job.getInstance(conf, "wave-load");
        job.setMapperClass(WaveMapper.class);

        job.setJarByClass(WaveDataInput.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // the mapper expects one whole file per record, so use the custom input format;
        // TextInputFormat would feed it LongWritable keys and fail with a ClassCastException
        job.setInputFormatClass(customFileInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // wait for the job to finish, then bulk-load the generated HFiles into the HBase table
        if(job.waitForCompletion(true)){
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(OutputPath, admin, table, conn.getRegionLocator(TableName.valueOf(HbaseTable)));
        }

        System.out.printf("程序运行完成,运行时间为:%fs 。。。。\n", (System.currentTimeMillis() - start)/1000.0);
        LOG.info(String.format("程序运行完成,运行时间为:%fs 。。。。\n", (System.currentTimeMillis() - start)/1000.0));

    }

    /**
     * Parses a coordinate such as "335024N" or "0782848W": the trailing hemisphere letter gives
     * the sign (N/E positive, S/W negative) and the remaining digits are divided by 10000.
     */
    public static float getLonLat(String str){
        int flag = 1;
        if(str.contains("N") || str.contains("E")){
            flag = 1;
        }else if(str.contains("W") || str.contains("S")){
            flag = -1;
        }else {
            return 0;
        }

        float value =Float.parseFloat(str.substring(0, str.length() - 1))/10000 ;
        return value * flag;
    }
}

 

Finally, this is the error shown on http://master:8088/cluster/app/application_1621909750051_0002:

2021-05-25 10:46:00,634 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1621909750051_0002_000002
2021-05-25 10:46:00,824 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2021-05-25 10:46:00,825 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 2 cluster_timestamp: 1621909750051 } attemptId: 2 } keyId: -1575443230)
2021-05-25 10:46:01,055 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2021-05-25 10:46:01,057 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null
2021-05-25 10:46:01,094 INFO [main] org.apache.hadoop.service.AbstractService: Service org.apache.hadoop.mapreduce.v2.app.MRAppMaster failed in state INITED; cause: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:518)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:498)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.callWithJobClassLoader(MRAppMaster.java:1593)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.createOutputCommitter(MRAppMaster.java:498)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:284)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$5.run(MRAppMaster.java:1551)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1548)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1481)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2267)
    at org.apache.hadoop.mapreduce.task.JobContextImpl.getOutputFormatClass(JobContextImpl.java:222)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$2.call(MRAppMaster.java:514)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2171)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2265)
    ... 13 more

Could anyone help me figure out what the problem is?


1 answer

  • 江湖侠客 2023-02-17 23:57

    The "HFileOutputFormat2 not found" error is usually caused by a missing HBase dependency jar or an incompatible HBase version. It typically shows up when an application written against HBase 2.x does not ship the matching HBase 2.x libraries, or ships the wrong versions of them.
    You mention that your other BulkLoad programs run fine, which suggests they were built and packaged against a specific HBase version; the failing one is probably missing the corresponding dependency jars or was built against an incompatible HBase version.
    To fix it, check your application's dependencies and make sure they match the HBase version running on the cluster. If you build with Maven, check that the HBase dependencies in your pom.xml are correct and match the HBase version you are actually running; if you add jars by hand, verify they are the right ones for that version.
    Also, if you are on HBase 2.x, make sure you use the matching API and classes: for BulkLoad you should use org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 from HBase 2.x rather than the older HFileOutputFormat.
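    For reference, the sketch below (written against the standard HBase MapReduce client API, not your exact code; "regionLocator" is just an illustrative local variable) shows the two usual ways to make org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 and the other HBase classes visible to the MRAppMaster and the tasks, instead of only to your IDE:

    // extra import needed in the driver:
    //   import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

    // Option 1: let HBase configure the whole bulk-load job. configureIncrementalLoad()
    // sets HFileOutputFormat2 as the output format, wires up the sorting reducer and the
    // TotalOrderPartitioner, and adds the HBase jars it needs to the job's distributed cache.
    RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf(HbaseTable));
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

    // Option 2: keep the explicit setOutputFormatClass(HFileOutputFormat2.class) call and
    // just ship the dependency jars with the job.
    TableMapReduceUtil.addDependencyJars(job);

    If you would rather fix it outside the code, putting the cluster's HBase jars on the Hadoop classpath before submitting (for example export HADOOP_CLASSPATH=$(hbase classpath), or copying the HBase client/server jars into Hadoop's lib directory) usually has the same effect.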

    This answer was accepted by the asker.

