程序员小李的开发路 2023-05-13 17:38

Flink 1.15: reading files from an HDFS directory

I need to use Flink 1.15 to read the Parquet files under an HDFS directory and convert them to HFiles. Is there a suitable working example? Thanks a lot.

No ChatGPT-generated answers please; it needs to actually run. Also, please include the pom dependencies!


7 answers

  • 肩匣与橘 (Rising Star Creator in game development) 2023-05-15 09:47

    Straight to the code. A runnable sketch along the lines below; the Avro schema, HDFS paths, table name and ZooKeeper quorum are placeholders you must adapt, and the HFile step assumes the input arrives sorted by rowkey:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    public class ParquetToHFile {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Avro schema matching the Parquet files; adjust the fields to your data
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                            + "{\"name\":\"rowkey\",\"type\":\"string\"},"
                            + "{\"name\":\"col1\",\"type\":\"string\"}]}");
            // Flink 1.15 reads Parquet under an HDFS directory as Avro GenericRecords
            // via the FileSource API (readTextFile cannot parse Parquet's binary format)
            FileSource<GenericRecord> source = FileSource
                    .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema),
                            new Path("hdfs://your-hdfs-path/"))
                    .build();
            DataStream<GenericRecord> records =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");
            // Turn every non-rowkey field of each record into one HBase cell
            DataStream<Tuple2<ImmutableBytesWritable, Cell>> cells = records.flatMap(
                    new FlatMapFunction<GenericRecord, Tuple2<ImmutableBytesWritable, Cell>>() {
                        @Override
                        public void flatMap(GenericRecord record,
                                            Collector<Tuple2<ImmutableBytesWritable, Cell>> out) {
                            byte[] rowkey = Bytes.toBytes(record.get("rowkey").toString());
                            for (Schema.Field field : record.getSchema().getFields()) {
                                Object value = record.get(field.name());
                                if (value == null || "rowkey".equals(field.name())) {
                                    continue;
                                }
                                KeyValue kv = new KeyValue(rowkey, Bytes.toBytes("cf"),
                                        Bytes.toBytes(field.name()), Bytes.toBytes(value.toString()));
                                out.collect(new Tuple2<>(new ImmutableBytesWritable(rowkey), kv));
                            }
                        }
                    });
            // Configure HFileOutputFormat2 through a Hadoop Job; configureIncrementalLoad
            // needs a live HBase connection to learn the table's region boundaries
            org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3"); // your ZooKeeper quorum
            Job job = Job.getInstance(hbaseConf);
            HFileOutputFormat2.setOutputPath(job,
                    new org.apache.hadoop.fs.Path("/your-hdfs-path/hfile"));
            TableName tableName = TableName.valueOf("your-table-name");
            try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
                 Table table = conn.getTable(tableName);
                 RegionLocator locator = conn.getRegionLocator(tableName)) {
                HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            }
            // HFile writers require cells in rowkey order, hence the single parallel
            // writer; make sure the input files are sorted or sort them upstream
            cells.writeUsingOutputFormat(
                    new HadoopOutputFormat<>(new HFileOutputFormat2(), job))
                    .setParallelism(1);
            env.execute("ParquetToHFile");
        }
    }
    
    

    This reads all Parquet files under an HDFS directory, converts the records into HBase's HFile format, and writes the HFiles back to HDFS.
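    The HFiles are not visible to HBase until they are bulk-loaded into the table. A minimal sketch using the BulkLoadHFiles tool shipped with HBase 2.2+ (older versions expose the same operation as LoadIncrementalHFiles); the table name and output path are the same placeholders as in the job above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
    public class LoadHFiles {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3"); // same quorum as the job above
            // Atomically hands the generated HFiles over to the table's region servers
            BulkLoadHFiles.create(conf)
                    .bulkLoad(TableName.valueOf("your-table-name"),
                            new Path("/your-hdfs-path/hfile"));
        }
    }

    Bulk loading moves the files directly into the regions' directories instead of replaying every Put through the write path, which is the main reason to produce HFiles in the first place.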
    Dependencies for the pom.xml. Note that most Flink 1.15 artifacts no longer carry a Scala suffix, the HBase client libraries live under the org.apache.hbase groupId (there is no org.apache.hadoop:hadoop-hbase artifact), and parquet-avro already pulls in parquet-hadoop, parquet-column and parquet-common transitively:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>${hbase.version}</version>
    </dependency>
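
    The version properties referenced above still have to be defined. A plausible properties block, assuming a Hadoop 3 / HBase 2.4 cluster (the exact versions are assumptions, match them to your environment):

    <properties>
        <flink.version>1.15.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <parquet.version>1.12.2</parquet.version>
        <hadoop.version>3.2.4</hadoop.version>
        <hbase.version>2.4.12</hbase.version>
    </properties>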
    
    
    

