Apache Hudi使用是怎么样的

技术Apache Hudi使用是怎么样的Apache Hudi使用是怎么样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。]数据实时处理和实时的数据实时分为

阿帕奇胡迪是如何工作的?我相信很多没有经验的人对此无能为力。为此,本文总结了问题产生的原因及解决方法。希望你能通过这篇文章解决这个问题。]

00-1010实时可分为数据的实时处理和实时即席分析,这就需要对数据进行实时处理,并应立即获得相应的结果。Flink和Spark Streaming用于实时数据的实时处理,需要实时处理和快速处理。数据不实时、处理不及时的场景就是我们的仓库T 1数据。

本文讨论的阿帕奇胡迪,对应的场景是实时数据,而不是实时处理。它旨在将Mysql中的时间近乎实时地映射到Hive等大数据平台。

数据实时处理和实时的数据

传统的离线数据仓库,通常数据为T ^ 1,不能满足当天的数据分析需求,而流式计算一般基于窗口,且窗口逻辑相对固定。作者工作的公司有一种特殊的需求。业务分析熟悉现有交易数据库的数据结构,希望会有大量的即席分析,包括当天的实时数据。他们曾经直接通过基于Mysql从库的Sql做相应的分析计算。但是很多时候你会遇到以下障碍。

当数据量大,分析逻辑复杂时,Mysql从数据库下载需要很长时间。

一些跨库分析无法实现。

因此,出现了一些弥合OLTP和OLAP之间差距的技术框架,通常是TiDB。它可以支持OLTP和OLAP。而Apache胡迪和Apache Kudu则相当于OLTP和OLAP技术之间的现有桥梁。它们可以将数据存储在现有的OLTP数据结构中,支持CRUD,并提供与现有OLAP框架(如Hive、Impala)的集成,实现OLAP分析。

Apache Kudu,需要单独部署集群。阿帕奇胡迪不需要它。它可以利用现有的大数据集群如HDFS存储数据文件,然后利用Hive做数据分析。相对来说更适合资源受限的环境# #阿帕奇胡迪入门。

业务场景和技术选型

胡迪提供了支持CRUD操作的胡迪表的概念。基于这个特性,我们可以将Mysql Binlog数据重放到胡迪表,然后基于Hive对胡迪表进行查询和分析。数据流结构如下Apache  Hudi使用是怎么样的

使用Aapche Hudi整体思路

胡迪数据文件可以存储在操作系统的文件系统或分布式文件系统HDFS中。HDFS通常用于存储,以便随后分析性能和数据可靠性。就HDFS存储而言,胡迪表的存储文件可以分为两类。

Apache  Hudi使用是怎么样的

与_partition_key相关的路径是按分区存储的实际数据文件。当然可以指定分区的路径键。我在这里使用了_partition_key。帽衫由于CRUD的分散性,每个操作都会生成一个文件。当小文件越来越多的时候,会严重影响HDFS的表现。胡迪设计了一种文件合并机制。那个。hoodie文件夹存储与相应文件合并操作相关的日志文件。

Hudi表数据结构

胡迪真实数据文件以拼花文件格式存储Apache  Hudi使用是怎么样的

数据文件

随着时间的推移,胡迪在时间线上调用了一系列CRUD操作。时间轴中的操作称为即时。即时包含以下信息

实例化记录这个操作是数据提交、文件合并还是文件清理。

瞬间时间此操作发生的时间。

状态操作的状态,已启动(已请求)、正在进行(正在进行)或已完成(已完成)

那个。帽衫文件夹存储相应操作的状态记录Apache  Hudi使用是怎么样的

.hoodie文件

胡迪需要能够唯一地识别记录,以便实现数据的CRUD。胡迪会的

把数据集中的唯一字段(record key ) + 数据所在分区 (partitionPath) 联合起来当做数据的唯一键

COW和MOR

基于上述基础概念之上,Hudi提供了两类表格式COW和MOR。他们会在数据的写入和查询性能上有一些不同

Copy On Write Table

简称COW。顾名思义,他是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据。正在读数据的请求,读取的是是近的完整副本,这类似Mysql 的MVCC的思想。

Apache Hudi使用是怎么样的

上图中,每一个颜色都包含了截至到其所在时间的所有数据。老的数据副本在超过一定的个数限制后,将被删除。这种类型的表,没有compact instant,因为写入时相当于已经compact了。

  • 优点 读取时,只读取对应分区的一个数据文件即可,较为高效

  • 缺点 数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后

Merge On Read Table

简称MOR。新插入的数据存储在delta log 中。定期再将delta log合并进行parquet数据文件。读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。当然,MOR表也可以像COW表一样,忽略delta log,只读取最近的完整数据文件。下图演示了MOR的两种数据读写方式 Apache Hudi使用是怎么样的

  • 优点 由于写入数据先写delta log,且delta log较小,所以写入成本较低

  • 缺点 需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log 和 老数据文件合并

基于hudi的代码实现

我在github上放置了基于Hudi的封装实现,对应的源码地址为 https://github.com/wanqiufeng/hudi-learn。

binlog数据写入Hudi表
  • binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为

参数名 含义 是否必填 默认值
--base-save-path hudi表存放在HDFS的基础路径,比如hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name 指定处理的Mysql库名
--mapping-mysql-table-name 指定处理的Mysql表名
--store-table-name 指定Hudi的表名 默认会根据--mapping-mysql-db-name和--mapping-mysql-table-name自动生成。假设--mapping-mysql-db-name 为crm,--mapping-mysql-table-name为order。那么最终的hudi表名为crm__order
--real-save-path 指定hudi表最终存储的hdfs路径 默认根据--base-save-path和--store-table-name自动生成,生成格式为'--base-save-path'+'/'+'--store-table-name' ,推荐默认
--primary-key 指定同步的mysql表中能唯一标识记录的字段名 默认id
--partition-key 指定mysql表中可以用于分区的时间字段,字段必须是timestamp 或dateime类型
--precombine-key 最终用于配置hudi的hoodie.datasource.write.precombine.field 默认id
--kafka-server 指定Kafka 集群地址
--kafka-topic 指定消费kafka的队列
--kafka-group 指定消费kafka的group 默认在存储表名前加'hudi'前缀,比如'hudi_crm__order'
--duration-seconds 由于本程序使用Spark streaming开发,这里指定Spark streaming微批的时长 默认10秒

一个使用的demo如下

/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
	--name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
	--num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
	--conf spark.core.connection.ack.wait.timeout=300 \
	--conf spark.locality.wait=100 \
	--conf spark.streaming.backpressure.enabled=true \
	--conf spark.streaming.receiver.maxRate=500 \
	--conf spark.streaming.kafka.maxRatePerPartition=200 \
	--conf spark.ui.retainedJobs=10 \
	--conf spark.ui.retainedStages=10 \
	--conf spark.ui.retainedTasks=10 \
	--conf spark.worker.ui.retainedExecutors=10 \
	--conf spark.worker.ui.retainedDrivers=10 \
	--conf spark.sql.ui.retainedExecutions=10 \
	--conf spark.yarn.submit.waitAppCompletion=false \
	--conf spark.yarn.maxAppAttempts=4 \
	--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
	--conf spark.yarn.max.executor.failures=20 \
	--conf spark.yarn.executor.failuresValidityInterval=1h \
	--conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
历史数据同步以及表元数据同步至hive

history_import_and_meta_sync 分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作

同步历史数据至hudi表

这里采用的思路是

  • 将mysql全量数据通过注入sqoop等工具,导入到hive表。

  • 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表

HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为

参数名 含义 是否必填 默认值
--base-save-path hudi表存放在HDFS的基础路径,比如hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name 指定处理的Mysql库名
--mapping-mysql-table-name 指定处理的Mysql表名
--store-table-name 指定Hudi的表名 默认会根据--mapping-mysql-db-name和--mapping-mysql-table-name自动生成。假设--mapping-mysql-db-name 为crm,--mapping-mysql-table-name为order。那么最终的hudi表名为crm__order
--real-save-path 指定hudi表最终存储的hdfs路径 默认根据--base-save-path和--store-table-name自动生成,生成格式为'--base-save-path'+'/'+'--store-table-name' ,推荐默认
--primary-key 指定同步的hive历史表中能唯一标识记录的字段名 默认id
--partition-key 指定hive历史表中可以用于分区的时间字段,字段必须是timestamp 或dateime类型
--precombine-key 最终用于配置hudi的hoodie.datasource.write.precombine.field 默认id
--sync-hive-db-name 全量历史数据所在hive的库名
--sync-hive-table-name 全量历史数据所在hive的表名
--hive-base-path hive的所有数据文件存放地址,需要参看具体的hive配置 /user/hive/warehouse
--hive-site-path hive-site.xml配置文件所在的地址
--tmp-data-path 程序执行过程中临时文件存放路径。一般默认路径是/tmp。有可能出现/tmp所在磁盘太小,而导致历史程序执行失败的情况。当出现该情况时,可以通过该参数自定义执行路径 默认操作系统临时目录

一个程序执行demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
同步hudi表结构至hive meta

需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下

参数名 含义 是否必填 默认值
--hive-db-name 指定hudi表同步至哪个hive数据库
--hive-table-name 指定hudi表同步至哪个hive表
--hive-jdbc-url 指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000
--hive-user-name 指定hive meta的链接用户名 默认hive
--hive-pwd 指定hive meta的链接密码 默认hive
--hudi-table-path 指定hudi表所在hdfs的文件路径
--hive-site-path 指定hive的hive-site.xml路径

一个程序执行demo

java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑

hive相关配置

有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

spark streaming的一些调优

由于binlog写入Hudi表的是基于Spark streaming实现的,这里给出了一些spark 和spark streaming层面的配置,它能使整个程序工作更稳定

配置 含义
spark.streaming.backpressure.enabled=true 启动背压,该配置能使Spark Streaming消费速率,基于上一次的消费情况,进行调整,避免程序崩溃
spark.ui.retainedJobs=10 <br> spark.ui.retainedStages=10 <br> spark.ui.retainedTasks=10 <br>spark.worker.ui.retainedExecutors=10 <br>spark.worker.ui.retainedDrivers=10 <br>spark.sql.ui.retainedExecutions=10 默认情况下,spark 会在driver中存储一些spark 程序执行过程中各stage和task的历史信息,当driver内存过小时,可能使driver崩溃,通过上述参数,调节这些历史数据存储的条数,从而减小对内层使用
spark.yarn.maxAppAttempts=4 配置当driver崩溃后,尝试重启的次数
spark.yarn.am.attemptFailuresValidityInterval=1h 假若driver执行一周才崩溃一次,那我们更希望每次都能重启,而上述配置在累计到重启4次后,driver就再也不会被重启,该配置则用于重置maxAppAttempts的时间间隔
spark.yarn.max.executor.failures=20 executor执行也可能失败,失败后集群会自动分配新的executor, 该配置用于配置允许executor失败的次数,超过次数后程序会报(reason: Max number of executor failures (400) reached),并退出
spark.yarn.executor.failuresValidityInterval=1h 指定executor失败重分配次数重置的时间间隔
spark.task.maxFailures=8 允许任务执行失败的次数

未来改进

  • 支持无分区,或非日期分区表。目前只支持日期分区表

  • 多数据类型支持,目前为了程序的稳定性,会将Mysql中的字段全部以String类型存储至Hudi

看完上述内容,你们掌握Apache Hudi使用是怎么样的的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/112499.html

(0)

相关推荐

  • Centos下安装mysql命令怎么写

    技术Centos下安装mysql命令怎么写这篇文章主要为大家展示了“Centos下安装mysql命令怎么写”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Centos下安装my

    攻略 2021年11月15日
  • 如何实现web微前端沙箱

    技术如何实现web微前端沙箱这篇文章主要讲解了“如何实现web微前端沙箱”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何实现web微前端沙箱”吧!背景应用沙箱可能是微前

    攻略 2021年11月15日
  • 英语阅读理解练习,英语阅读理解改怎么练习

    技术英语阅读理解练习,英语阅读理解改怎么练习重视句子理解就没有问题英语阅读理解练习,但是要注意这里绝对不是咬文嚼字扣每句话具体什么意思,应该是注重句子间的联系,及文章的主题思想,但凡阅读一定会有你难以理解的单词或句子,一

    生活 2021年10月21日
  • pl什么意思,JM们,我到底PL了没啊?

    技术pl什么意思,JM们,我到底PL了没啊?我10月份做的造影,当时是备孕十一个月都没有好孕,去医院做了所有检查,除了造影,以前听说造影痛苦一直不敢做,医生也不建议我做,说是备孕时间短,后来等所有检查结果出来一切都没问题

    生活 2021年10月27日
  • 如何使用QGIS绘制铁路轨迹geojson

    技术如何使用QGIS绘制铁路轨迹geojson这期内容当中小编将会给大家带来有关如何使用QGIS绘制铁路轨迹geojson,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1. 下载并安装

    攻略 2021年11月10日
  • Go语言中go build命令怎么用

    技术Go语言中go build命令怎么用这篇文章主要介绍了Go语言中go build命令怎么用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。go buil

    攻略 2021年11月21日