37款手游基于Flink CDC + Hudi Lake仓库集成解决方案实践
Flink CDC基础知识介绍
Hudi基础知识介绍
37个手游业务痛点及技术方案选择
37 综合手游护仓介绍
Flink CDC + Hudi实践
总结
一、Flink-CDC 2.0
Flink CDC Connectors 是Apache Flink 的源端连接器。目前2.0版本支持从MySQL和Postgres数据源获取数据。 2.1版本社区肯定会支持Oracle和MongoDB数据源。
Fink CDC 2.0的核心特性主要实现了以下三个非常重要的功能:
整个过程没有锁,不会存在需要锁数据库带来的风险;多并行,全量数据的读取阶段支持水平扩展,亿级大表可以通过增加并行度来加速;断点续传传输支持全阶段检查点。即使任务由于某种原因退出,也可以通过保存的检查点恢复任务,实现断点续传数据。 Flink CDC 2.0 详细讲解核心改进
二、Hudi
Apache Hudi 目前被业界描述为围绕数据库核心构建的流数据湖平台。
由于Hudi具有良好的Upsert能力,且0.10 Master支持Flink版本最高至1.13.x,因此我们选择使用Flink + Hudi为37款手游的业务场景提供分钟级的Upsert数据分析和查询能力。
三、37 手游的业务痛点和技术方案选型
1.老架构和业务痛点
1.1 数据的实时性不够
日志数据使用sqoop每隔30分钟将前60分钟的数据同步到Hive;数据库数据使用sqoop每隔60分钟将全天的数据同步到Hive;数据库数据每天使用sqoop同步前60天的数据到Hive。 1.2 业务代码逻辑复杂,难以维护
目前37手游的很多业务开发都遵循MySQL+PHP的开发模式。代码逻辑复杂,难以维护。对于相同的代码逻辑,流处理往往需要开发一份代码,批处理则需要开发另一份代码。不能重复使用。 1.3 经常刷新历史数据
经常刷新历史数据,保证数据一致性。 1.4 Schema 频繁变更
由于业务需要,经常需要添加表字段。 1.5 Hive版本低
当前使用的Hive版本为1.x版本,版本升级困难;不支持更新插入;不支持行级删除。由于37手游的业务场景,数据的插入和删除是很常见的需求。因此,基于Hive数据仓库的架构不足以满足业务需求。
2、技术选型
选择同步工具时考虑了Canal 和Maxwell。但Canal只适合增量数据同步,需要部署,维护相对繁重。虽然Maxwell像Canal一样相对轻量级,但它需要与Kafka等消息队列一起使用。相比之下,Flink CDC 可以通过配置Flink Connector 基于Flink-SQL 来使用。它非常轻量级,完美契合基于Flink-SQL的流批一体化架构。
在存储引擎选择方面,目前最流行的数据湖产品有:Apache Hudi、Apache Iceberg 和DeltaLake,在我们的场景中,它们各有优缺点。最终选择Hudi作为湖仓,基于Hudi对上下游生态的开放性、支持全局索引、支持Flink 1.13版本、兼容Hive版本(Iceberg不支持Hive1.x版本)。集成的流批处理集成存储引擎。
针对上述业务痛点和选型比较,我们最终的方案是:使用Flink1.13.2作为计算引擎,依托Flink提供的统一的流批API,基于Flink-SQL实现流批一体化,而Flink-CDC 2.0作为ODS层的数据同步工具和Hudi-0.10 Master作为存储引擎的湖和仓库,解决维护两套代码的业务痛点。
四、新架构与湖仓一体
37手游的湖仓一体化解决方案是37手游流批一体化架构的一部分。通过湖与仓的融合,流与批的融合,可以实现准实时场景下的同数据源、同计算引擎、同存储、同计算口径。数据的时效性可以达到分钟级,能够很好地满足业务准实时数据仓库的需求。下面是架构图:
MySQL数据通过Flink CDC进入Kafka。之所以先将数据录入Kafka而不是直接录入Hudi,是为了让多个实时任务能够复用来自MySQL的数据,避免多个任务通过Flink CDC连接MySQL表和Binlog,影响性能MySQL 库的。
通过CDC进入Kafka的数据除了落入离线数仓的ODS层之外,还会沿着实时数仓的链路,从ODS-DWD-DWS-OLAP数据库,最终被使用通过报告等数据服务。实时数仓各层结果数据将准实时落入离线数仓。这样程序一次开发即可,指标口径统一,数据统一。
从架构图中可以看到有一个数据修正的步骤(重新运行历史数据)。这样做的原因是考虑到前一天实时任务的计算结果可能存在口径调整或者错误,导致重新运行历史。数据的情况。
Kafka中存储的数据是有过期时间的,历史数据不会存储太久。长时间重跑的历史数据无法从Kafka中获取。此外,如果再次将大量历史数据推送到Kafka,并通过实时计算链路对历史数据进行修正,可能会影响当天的实时运行。因此,重新运行的历史数据将通过数据校正步骤进行处理。
总体来说,37手游的数据仓库是Lambda和Kappa的混合架构。流批一体化数据仓库的每条数据链路都有数据质量验证流程。第二天对前一天的数据进行核对。如果前一天实时计算的数据没有异常,则无需对数据进行修正。 Kappa 结构就足够了。
五、Flink CDC 2.0 + Kafka + Hudi 0.10 实践
1.环境准备
Flink 1.13.2./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar(修改Master分支的Hudi Flink版本为1.13.2然后构建)./lib/hadoop -mapreduce- client-core-2.7.3.jar (解决Hudi ClassNotFoundException)./lib/flink-sql-connector-mysql-cdc-2.0.0.jar./lib/flink-format-changelog-json- 2.0.0 .jar./lib/flink-sql-connector-kafka_2.11-1.13.2.jarsource MySQL-CDC表定义:
创建表sy_ payment_cdc ( ID BIGINT, PRIMARY KEY(ID) NOT ENFORCED) with( 'connector'='mysql-cdc', 'hostname'='', 'port'='', 'username'='' , '密码'='', '数据库名称'='', '表名称'='', 'connect.timeout'='60s', 'scan.incremental.snapshot.chunk.size'='100000 ', '服务器ID'='5401-5416');值得注意的是,scan.incremental.snapshot.chunk.size参数需要根据实际情况进行配置。如果表数据量不大,使用默认值即可。
Sink端Kafka+Hudi COW表定义:
创建表sy_ payment_cdc2kafka ( ID BIGINT, PRIMARY KEY(ID) NOT ENFORCED) with ( 'connector'='kafka', 'topic'='', 'scan.startup.mode'='latest-offset', ' property.bootstrap.servers'='', 'properties.group.id'='', 'key.format'='', 'key.fields'='', 'format'='changelog-json');创建表sy_ payment2Hudi ( ID BIGINT, PRIMARY KEY(ID) NOT ENFORCED)PARTITIONED BY (YMD)WITH ( 'connector'='Hudi', 'path'='hdfs:///data/Hudi/m37_mpay_tj/sy_ payment' , 'table.type'='COPY_ON_WRITE', 'partition.default_name'='YMD', 'write.insert.drop.duplicates'='true', 'write.bulk_insert.shuffle_by_partition'='false', '写入。 bulk_insert.sort_by_partition'='false','write.precombine.field'='MTIME','write.tasks'='16','write.bucket_assign.tasks'='16','write.task.max。大小'='', 'write.merge.max_memory'='');将历史数据导入Hudi,可以选择离线bulk_insert方式入湖,然后通过Load Index Bootstrap加载数据,然后接收增量数据。使用bulk_insert方法入湖的数据的唯一性取决于源数据本身,同时还需要保证接收增量数据时数据不丢失。
这里我们选择一种更简单的方式来调整任务资源,将历史数据放入湖中。依靠Flink的检查点机制,无论是CDC 2.0导入Kafka时,还是Kafka导入Hudi时,都可以通过指定检查点来重新启动任务,而不会丢失数据。
我们可以在配置CDC 2.0进入Kafka和Kafka进入Hudi任务时增加内存并配置多个并行度,以加快历史数据入湖速度。历史数据全部入湖后,我们就可以相应减少入湖任务的内存配置,并将CDC进入Kafka的并行度设置为1,因为增量阶段的CDC并行度单一,然后指定检查点重启任务。
根据上表定义的参数配置,配置16度并行度。当Flink TaskManager内存大小为50G时,单表15亿条历史数据录入Hudi COW表实际需要10个小时,单表9亿条数据录入Hudi COW表需要10个小时。实际用时为6小时。当然,这个耗时很大一部分是由于COW写放大的特性,在数据量大的upsert模式下需要更多的时间。
目前,我们的集群由200多台机器组成,在线流计算任务总数超过200个,总数据量接近2PB。
如果集群资源非常有限,可以根据实际情况调整Hudi表和Flink任务的内存配置,也可以配置Hudi的限流参数write.rate.limit,让历史数据慢慢入湖。
在Flink CDC 1.x 之前的版本中,由于全量快照阶段是单并行读取,当时对于亿级以上的表全量快照读取阶段耗时较长,检查点会失败且无法保证数据的断点续传。
因此,进入Hudi时,我们首先启动一个CDC 1.x程序,将从此刻开始的增量数据写入Kafka,然后启动另一个sqoop程序,将当前数据全部拉取到Hive,然后通过Flink读取Hive数据。编写Hudi,最后将Kafka的增量数据消费回Hudi。由于Kafka和Hive的数据相交,因此数据不会丢失,而且Hudi的upsert能力保证了数据的唯一性。
但该方法的链接太长,操作困难。现在CDC 2.0在全快照阶段支持多重并行和检查点能力,这确实大大降低了架构的复杂度。
2. 数据对比
由于生产环境使用Hive1.x,Hudi不支持1.x的数据同步,因此通过创建Hive外部表进行查询。如果是Hive2.x以上,请参考Hive同步章节;创建Hive外部表+预创建分区;将Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar 添加到auxlib 文件夹。创建外部表M37_mpay_tj.`ods_SY_FAYENT_F_D_B_EXT` (`_hoodie_commit_time` String,` _Hoodie_Commit_seqno` String, `_hoodie_record_k_k Ey` String, `_Hoodie_partition_Path` String,`_hoodie_file_name` String, `ID` BIGINT,) 分区方式(` dt`字符串)行格式SERDE'org.apache.hadoop.Hive.ql.io.parquet.serde.ParquetHiveSerDe'存储为输入格式'org.apache.Hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.Hive.ql。 io .parquet.MapredParquetOutputFormat'LOCATION 'hdfs:///data/Hudi/m37_mpay_tj/sy_ payment'最后查询Hudi数据(以Hive外部表的形式)与原来sqoop同步的Hive数据对比得到:
总数一致;按天分组统计的数量一致;按天分组统计的数量是一致的。
六、总结
与传统数据仓库架构相比,湖仓一体化、流批一体化架构主要有以下优势:
Hudi提供Upsert能力,解决频繁Upsert/Delete的痛点;提供分钟级数据,比传统数据仓库更加及时;基于Flink-SQL实现流批一体化,代码维护成本低;数据来自同一来源、同一计算引擎、同一存储、同一计算口径;选择Flink CDC作为数据同步工具,节省sqoop的维护成本。最后,针对频繁添加表字段的痛点,我们希望在同步下游系统时能够自动添加该字段。目前还没有完美的解决方案。我们希望Flink CDC 社区能够在后续版本中提供对Schema Evolution 的支持。
参考
[1] MySQL CDC 文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
[2] Hudi Flink 解答问题:https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#
用户评论
这个游戏更新得太酷了!使用Flink和CDC,感觉整个游戏体验都变得超级流畅。
有12位网友表示赞同!
我真的很喜欢他们采用的Hudi湖仓一体化方案,这让玩家能即时看到游戏中的变化。
有17位网友表示赞同!
从技术角度看,这个游戏背后的Flink CDC + Hudi 实践很让人佩服,提高了数据处理效率。
有17位网友表示赞同!
对于非IT玩家来说可能有些复杂,但这样的技术选择确实增强了游戏背后的数据管理能力。
有11位网友表示赞同!
这款游戏在技术层面上的创新令人印象深刻,Flink和Hudi的应用使游戏内部通信更加无缝。
有7位网友表示赞同!
能够看到团队在后端使用现代化技术,感到相当自豪。这游戏真的很先进!
有15位网友表示赞同!
Flink CDC让实时更新成为可能,玩起来比传统的系统要快多了。
有5位网友表示赞同!
Hudi湖仓一体方案使得数据的管理和同步非常高效,提升了游戏的整体稳定性。
有7位网友表示赞同!
这款游戏的设计考虑了用户体验和底层架构的有效整合,很厉害的一点是采用了Flink和Hudi。
有11位网友表示赞同!
技术上采用这样的先进解方案,在某种程度上也是对玩家来说的一种福利,提高了服务质量。
有17位网友表示赞同!
从开发者角度来说,这种选择展现了游戏行业的前沿动态。对于游戏玩家,则意味着更好的服务体验。
有19位网友表示赞同!
Flink CDC帮助实现了即时反馈的机制,让玩家在游戏中的行为和系统能更快地同步起来。
有19位网友表示赞同!
这样的技术实践对后台数据处理有着显著提升,使得复杂事件流都能很好地管理。
有15位网友表示赞同!
Hudi湖仓一体方案使数据仓库和数据库能协同工作,提高了整体性能和效率。
有17位网友表示赞同!
能够看到Flink和Hudi在一款热门游戏里被应用到细节中,真的非常令人感到兴奋和自豪。
有19位网友表示赞同!
对于那些熟悉技术的人来说,这是一款技术驱动的精品游戏,后台有强大的技术团队支持。
有10位网友表示赞同!
这款游戏不仅好玩,背后的架构和技术选择也值得关注,Flink CDC+Hudi使得游戏运行顺畅无比。
有14位网友表示赞同!
Flink CDC带来的是即时的数据感知和反应能力,在此之上打造的游戏更加吸引用户。
有19位网友表示赞同!
Hudi湖仓一体方案在数据处理上提供了强大支持,为玩家呈现了一个更为稳定和高效的在线体验。
有11位网友表示赞同!
从Flink CDC到Hudi的应用,背后的技术创新让这款游戏充满活力,增强了用户的参与度与满意度。
有9位网友表示赞同!