实战 | 将Apache Hudi数据集写入阿里云OSS

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

实战 | 将Apache Hudi数据集写入阿里云OSS

leesf   2020-04-25 我要评论
### 1. 引入 云上对象存储的廉价让不少公司将其作为主要的存储方案,而Hudi作为数据湖解决方案,支持对象存储也是必不可少。之前AWS EMR已经内置集成Hudi,也意味着可以在S3上无缝使用Hudi。当然国内用户可能更多使用阿里云OSS作为云上存储方案,那么如果用户想基于OSS构建数据湖,那么Hudi是否支持呢?随着Hudi社区主分支已经合并了支持OSS的PR,现在只需要基于master分支build版本即可,或者等待下一个版本释出便可直接使用,经过简单的配置便可将数据写入OSS。 ### 2. 配置 #### 2.1 pom依赖 需要额外添加的主要pom依赖如下 ```xml org.apache.hadoop hadoop-aliyun 3.2.1 com.aliyun.oss aliyun-sdk-oss 3.8.1 ``` #### 2.2 core-site.xml配置 若需访问OSS,需要修改core-site.xml,关键配置如下 ```xml fs.defaultFS oss://bucketname/ fs.oss.endpoint oss-endpoint-address Aliyun OSS endpoint to connect to. fs.oss.accessKeyId oss_key Aliyun access key ID fs.oss.accessKeySecret oss-secret Aliyun access key secret fs.oss.impl org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem ``` ### 3. 源码 示例源码如下 ```java import org.apache.hudi.QuickstartUtils.*; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.io.IOException; import java.util.List; import static org.apache.hudi.QuickstartUtils.convertToStringList; import static org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs; import static org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME; import static org.apache.spark.sql.SaveMode.Overwrite; public class OssHudiDemo { public static void main(String[] args) throws IOException { SparkSession spark = SparkSession.builder().appName("Hoodie Datasource test") .master("local[2]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.io.compression.codec", "snappy") .config("spark.sql.hive.convertMetastoreParquet", "false") .getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); String tableName = "hudi_trips_cow"; String basePath = "/tmp/hudi_trips_cow"; DataGenerator dataGen = new DataGenerator(); List inserts = convertToStringList(dataGen.generateInserts(10)); Dataset df = spark.read().json(jsc.parallelize(inserts, 2)); df.write().format("org.apache.hudi"). options(getQuickstartWriteConfigs()). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath); Dataset roViewDF = spark.read().format("org.apache.hudi").load(basePath + "/*/*/*"); roViewDF.registerTempTable("hudi_ro_table"); spark.sql("select * from hudi_ro_table").show(false); spark.stop(); } } ``` 即先写入OSS,下图可以看到OSS的Bucket中已经成功写入了数据,然后再通过spark查询写入的结果。 ![](https://img2020.cnblogs.com/blog/616953/202004/616953-20200425145746705-629205626.png) 部分查询结果如下 ```xml |20200421205942 |20200421205942_2_10 |6fd496f8-ebee-4f67-8f86-783ff3fed3ab|asia/india/chennai |1f71bed9-833b-4fca-8b4b-4cd014bdf88a-0_2-22-30_20200421205942.parquet|0.40613510977307 |0.5644092139040959 |driver-213|0.798706304941517 |0.02698359227182834|17.851135255091155|asia/india/chennai |rider-213|0.0|6fd496f8-ebee-4f67-8f86-783ff3fed3ab| ``` 所有源代码已经上传至[https://github.com/leesf/oss-hudi-demo](https://github.com/leesf/oss-hudi-demo) ### 4. 最后 本篇文章很简单,只用作展示如何通过Hudi将数据写入OSS。当数据写入OSS后,便可打通阿里云上几乎所有产品,这使得基于阿里云技术栈进行数据湖分析将变得非常简单,比如使用DLA(Data Lake Analytics),对标AWS的Athena,对Hudi数据集进行分析查询,一体化的流程会让分析变得异常简单。

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们