导读:本文将探讨数据湖实时数仓相关联的内容。很荣幸请到京东科技实时数仓建设负责人陈伟强老师,他同时也直接承担一些数据产品研制项目的支持,将分享实时数仓在落地和演化过程中的一些问题和解决方案。
实时数仓是一个解决方案,从期望看是离线数仓,甚至数仓本身的超集;湖仓一体的方案还是数仓方案。
我们要讨论的实时数仓是什么?因为实时数仓在很多用户心中的定位不是完全一致的,它跟已经存在了二三十年的数据仓库、离线数据仓库是没有可比性的。
首先通过了解资料,可以很明确地知道实时数仓能做什么、不能做什么、怎么做的。
其次实时数仓包含一个服务层,里面有很多组件,但是它对于体系内其他岗位是不一样。因为实时数仓和下游数据产品之间的关联比离线数仓要紧密一些。服务层和数据产品,比如像实时库、产品库,很多时候是一体的。
所以实时数仓的建设受用户的影响会更大,比如用户对于实时数仓的定位和认知、对实时数仓部门的评价等等,后文中还会对这一点展开说明。
这是京东科技之前使用比较久的一个实时数仓方案,整体上是 Lambda 方案。建设过程中,是以给离线数仓打补丁的方式来推进的。
数据链路分为两条,从底向上看,离线数据是从数据库批量抽取的,实时侧增量的数据主要是依靠 binlog 和前后端日志。
数据层主要是数据存储,一部分是离线数据和实时数据互相独立,有明细的,有汇总的。另一部分是离线和实时数据混合的。这两种方案的差别主要在查询服务和数据目录上。
对于流批完全独立的数据,绝大多数都是通过在数据目录中登记元数据信息,在查询服务里边通过时间轴拼接使用。比如我们会监听离线计算任务的通知,根据离线,拼接上一天或者两天的实时数据,这样上层应用就可以直接访问这个视图。汇总数据中类似于立方体的数据也是这样使用。不过涉及到人数这种不可加值的数据,我们现在的处理还不是很精细,还是用明细数据来计算的。
对于流批结合这部分数据,即紫色的部分,现在主要是通过离线数据的初始化+实时更新的逻辑来实现,这部分数据其实更适合以 binlog 作为数据源的数据,比如应用数据、一些业务系统、汇总数据等,主要是做数据快照、超大的时间跨度的累计计算等等。
这个方案里面,数据表和查询服务还有数据目录是紧密绑定的,将一张表直接注册到查询服务里,离线的任务、调度的任务、Flink 的任务,甚至接口的查询语句都是实时配置好的,此外,Flink 任务也能够最终靠配置化生成。复杂一点的逻辑就是手动开发,再加上手动注册。实时库的话,用 CK 、Redis 比较多,MySQL 用得比较少一些。查询服务包括一些 SQL 化的工作,比如 KV 的查询,会用 calcite 改成 SQL 的 join,但不是特别多。
在这个设计里边最重要的事情就是整个服务层都是实时数仓负责的,用户不太能看到底层逻辑,比如实时服务这个接口,对外伪装成了 CK 的 JDBC,用户直接调用JDBC 服务,但是底层用 spring 拦截了,再去做一些其他的事情,比如视图改造等。其他的还有一些 CK 不具备的功能,例如管理、缓存等一体化的服务也是在这层做的。
优点是,在实时数仓初始建设的时候,离线数仓和实时数仓绝大多数都是完全独立的,能够最终靠迭代的方式构建起来,基本上不用引入新的技术栈。比如 CK 当做一个独立的实时库,把服务层都包含在实时数仓内了,它的改造、故障处理链路都比较短。
缺点主要有两方面,首先就是继承了 Lambda 架构的问题。第二就是最开始提到的关于实时数仓定位模糊的问题,因为服务层都被圈在里面了。
指标部分,整个实时数仓有两种方案,一种是推的方案,一种是业务方自己来拉取的方案。
在出口端:有数据库事务的问题、旁路系统自身的问题、黑白名单的问题、依赖数据产品等。
在汇总层:有维度渐变处理复杂、结果存储问题、离线结果差异、持仓等快照问题复杂等。
在明细层:有关联、打宽高成本、归档与否两难、业务集成复杂度高、定制比例太高等。
其他方面:有自身元数据治理的债、和离线元数据脱离、预发环境构造难、安全及沙盒问题等。
首先是用户对数据及问题处理的时效性的双重要求。在调用实时指标的时候,用户总是期待数据是最新的,因为知道了用的是 Redis,那数据时效、查询响应时效肯定应该是一样的。并且很多用户会同时要求两条,第一个就是日志里面每一条数据变化都要反映出来,第二条就是数据库如果有事务问题,要帮我处理好。这两条同时做到并不容易。
其次用户对产品的惯性思维,对实时数仓的错误定位产生了错误的预期。有时候我们为了照顾功能性,可能还要做一些 Redis 内的立方体等,很多人在做离线实时数仓的角度也都碰见过。业务方对数仓的了解不完全,会习惯性地跟之前的产品做对比。当采用了这个方案之后,很多部分是用组件结合起来的,遇上问题后,业务方可能会基于自己的了解去质疑问题的难度在哪里。汇总层里面第一条提到维度渐变问题,这样的一个问题在离线数仓有一整套的理论来实现,比如 SCD1 型、SCD2 型,我们在实时计算的时候,也做了一些标准的逻辑,比如代码中针对增删改、可加制、不可加制的处理都做了不同的标准方案。但是用户经常会疑惑为什么业务数据稍微变更,指标数据就会出错。而数据同学难以解释清楚。我们已限制了数据库不允许删除了,也不能限制用户不能直接用枚举,必须要使用外键,即便是这条限制了,像雪花维度这种更复杂的情况,也没有很好的方法确定规则,并且这种维度的问题本来也是属于仓端应该解决的。所以这种实时指标问题的原因,就是它缺乏持久化的能力。因此使用户对实时数仓的定位是错误的,错误的定位自然就会产生错误的预期,也就导致了大家对实时数仓的负面评价,这并不是一个容易解决的问题。
前面讨论的是数据推送方遇到的问题,那是否把持久化问题解决了就可以了呢?事实上这个情况可能更复杂。前几年大家一直都在疯狂的上 CK、上 Doris,我们也在做类似于 ISM 结构、物化视图等等,这些特性和 Hive 的差异很大,它对很多场景都是有帮助的,我们也使用 CK 做了很久的服务层,但是总体来讲,将其当作一个数仓来用,并不太够格。
包括实时库的问题、中台职责边界的问题、链路过长的问题等等,说到底是用户预期的问题。
首先这里边有 CK 自己的问题,也有因为使用 CK 引入的其它问题。CK 和数据天然的亲近会模糊中台职责边界,又因为数据时效问题,产品侧的需求和用户评价也会对实时数仓有影响。
展开来说,比如用 CK 作为服务层最大的问题是实时数仓和数据产品的边界问题被有意识地模糊了,它作为一个数据库,天然和数据产品是很亲近的,但是要解决数据时效的问题,还有早期解决使用门槛的问题,因为它和 MySQL 的差异实在太大了,所以只能把它圈到实时数仓的范围里来。这样就需要去承接一些产品侧的需求,接受产品侧的评价,在这种情况下实时库的方案做得如何,就完全取决于设计者的预期。并且数据产品还有它的最终用户,用户一旦遇到了问题,响应慢、数据不准等等问题就会通过产品经理直接传导到实时数仓。
举个例子,CK 为了能够更好的保证数据时效问题,肯定要接入实时数据源,那么这个实时数据模型的开发、质量的监控,全部都要从开始端负责,因为中间没产品去缓冲。还有在写入的时候,用 Flink 或者用别的去写 CK,肯定是要调整的,CK 对于写,无论是并发还是写入效率,写放大都不是很好。如人群包这种位图运算,只做 sharding key 的调整还不够,还要调整分片。还有其它一些问题,整个集群和应用之间形成了紧绑定的关系,SQL 逻辑也都会绑定。CK 本来在扩容的时候就很难,这样问题就会被严重放大,CK 就变成了一个专用的数据库,因此后期我们基本上会严格限制共享集群接到其它生产环境下。
总结一下旧方案的问题。主要是两个方面,第一个就是 Lambda 方案的问题;第二是评价标准不由设计者来决定。
从技术层面来看,在流批结合上的投入成本占比太高。前文中提到,数据层的设计和查询服务的设计都是未解决流批数据的融合,在这种情况下产品的架构就很复杂,有时候应对数据产品需求会力不从心。CK 仅仅只有数据的功能肯定是不行的,我们对于 CK 服务的包装越来越复杂,一个大的数据产品甚至把 seatunnel,还有产品内部的一些中台的组件等等都囊括在内,方案越做越复杂。
在用户使用 CK 过程中,因为之前很多人都是有 Hive 经验的,大家在用 Hive 的时候都是自己出库,自己设计使用逻辑,也会在使用的过程中产生很多非必要的问题。
业务方自己开发服务层导致业务流失。在服务层的问题被别的业务方的技术同学了解到之后,他们就自己直接去做了一个服务层,自己构建了一个业务侧的小型实时数仓。因为针对一个特殊的场景,做一个优秀点的技术指标,这本身不是问题。后来我们做实时数仓的时候就遇到了其它一些问题,因此与业务方约定,实时数仓给出的数据不要用来结算。
数据时效准确性与客户期望还有一定的差距。对于离线和实时的不同源的数据,业务方期望保证完全一致,而目前只能做到 1/1000 至 5/1000,已经很难,与客户期望还有一定距离。
由于上述诸多问题,再加上业界正积极推动实时数仓,所以我们也在技术中台的帮助下去进行了数据湖的改造。
首先,数据层逻辑变了。原来流批混合的那部分数据,现在是用数据湖替代的,数据源换成了 binlog。针对数据同源的问题,现在有了平台层方案的图文图的支持,之前的一些查询服务,比如基于 CK 或 Redis 的各种包装服务,现在慢慢向平台层统一查询服务和统一数据目录迁移。
第二是实时库分层,仓端和产品端适当区分,仓端减少定制。实时库这边自从它的时效相对于 Hive 做了一个显著的提升之后,一部分用户慢慢的开始转为使用统一查询接口。原来使用实时库既承接数仓职能,又承接数据产品后端职能,以后就逐渐分开了。现在能适当地做一些相对公共方的业务逻辑,还可以放在 CK 等一些其他数据库中。
最开始是把仓端的集市层先往数据库上填,然后用定时的出库操作,把数据写到数据产品的产品库中。刚开始的想法最简单,就希望通过这个方案重新划定一下数据中台和数据产品的界限,以前实时库就是一个技术方案,它分饰两个角色,如果业务方可接受 10 分钟级别的数据时效的话,这个方案是比较有效的。
有效的点包括:首先,因为初始改造用的是数据仓库的一个公共层和湖表的 upsert 来构建集市层,这样的一个过程是相当可控的,而且需要改造的地方也只有两个,第一个就是把原先离线侧的集市层直接换成一个湖表,这个针对于某个特殊的业务方做起来其实非常容易。第二是对湖表接上实时的数据,因为数据湖现在对于事务天然支持,之前一次性的,还有多次循环校正等逻辑,现在都可以简化掉了。现在我们在第一版方案中与调度相关的内容基本上就仅限于数据出库了。
这个方案我们一开始在做预览的时候做得很快,一个星期做了个预览版,但是又大概花了一个多月才让任务稳定运行起来。
这个实际上的意思就是我们在一开始数据入湖的时候遇到的一些问题,首先就是对于数据湖这个技术组件的使用,一开始我们在 merge on read 和 copy on write 之间反复横跳,然后调索引,调产品性能,又折腾了一个版本,之后因为 10 分钟的时效问题,对业务方来说还是挺重要的一个系统,我们从始至终在调 MOR 的一些参数,把中台元数据的服务这个 TP 99 打爆了一次,因为 MOR 写出来的文件都要跟中台源数据有接口产生服务,半夜把人家的这个报警电话打爆了。
因为业务方还要数据有多个版本,说白了他希望近似有一个归档,或是说查询历史这样一个功能,但是我们走的是数据推送的路子,所以相当于把整个数据湖的 snapshot 隔离有时间轴回溯,大多数都废掉了。这个是我们给第一家业务方做的一个方案。
第二次迭代,后退了一步,让实时库和产品库再合一,相当于还是咱们提供实时库给业务方,但让集市层和实时库紧贴在一起,比如让实时库直接去访问集市层,这样就等于业务方只一定要通过 JDBC 去访问我们的实时库,省去了集市层和实时库之间的数据推送过程。这样时效的问题也非常容易解决,两个离线计算的步骤和两个实时计算的步骤被缩短成了一个步骤。
最早时候,我们的 Presto 不支持库表查询,HDFS 和 Hive 的 Metastore 有强管控,CK 访问不了,所以我们就把集市层搬到了 OSS 上。因为集市层不在 Hive 中,因此花了很多时间来解决这一个问题。Flink 写 OSS 因为版本的问题又调整了很久,我们内部又有三套 OSS 系统,还要处理 CK 和 Hudi 的识别问题。目前来看 23.8 还可以,像单表的分区,还有分布式表也都能做到。但是目前 CK 还不支持Hudi replace commit,所以 clustering 这个属性用得不太好,现在采用的是 cow 加上 upsert 的方案。
第三轮迭代继续向前推进。实时库暂时没有改动,回头去解决数据同源以及底层数据一致性的问题。如果我们把贴源层实现数据湖的迁移,那么 Lambda 架构在实时、准实时的需求上就可以解放了,这是目前我们 H1 投入最大的一个工作,整个目标除了把这个方案做成一个标准化方案之外,就是实现规模化的数据库迁移方案,因为贴源层这个数量是很大的。
最后一版中,我们一边与数据产品紧密结合,去做产品相关的方案迭代,另一方面技术中台的改造基本也都到位,比如管道、表管理工具、数据治理工具、产品平台这些基本能力都具备。最简化的一个方案里仓端只有公共层、集市层,数据产品直接通过 JDBC 来查询。目前还没有在生产环境中使用。
首先从技术上看,湖仓一体的方案,对于解决 Lambda 方案复杂度的问题帮助巨大,产研交付周期降低了很多。在引入数据湖之前,一个数据产品或一次功能迭代,只是跟实时数仓相关的(不包含别的的研发)、设计实时数仓的这部分的工作,设计的具体方案就占到 15% 左右。因我们需要在数据失效、查询、响应时间,还有租赁成本等方面跟业务方达成一致。现在方案简化了,我们与业务方的沟通也更顺畅,方案可以非常快速地定下来。
交付周期降低后,我们也有更多的时间去还以前的技术债,去改善公共层的设计。
从业务上看,很多准实时的需求,可以用数据湖的方案来交付。数据中台的边界相对清晰了很多,实时库和产品库的定位很清楚,沟通成本也随之降低,相应的职责划分事故响应协调冗余度也就比较大了。
总体上,证实了流批一致、流批结合、流批一体三步走的演化策略是比较有效的。最初两边完全游离,数据也不一样,这时就必须要求流和批的计算结果是一致的。而流批结合,比如实时任务可以复用离线的一些数据和结果,能够最终靠定时数据修复等,提高数据的准确性。进入流批一体模式,引入了数据湖方案。
第一个是时效性问题,与数据湖和 Flink 自身相关,目前我们的技术中台主推 Hudi 的方案,在跟业务方交付的时候,以十分钟作为一个周期其实是比较难的,而且这个 10 分钟还不是基于原生的 Hudi 方案去做的,是 Hudi 的各种 payload 已经做了优化,比如局部更新、微批聚合、历史累积等场景都已经做过优化,还有大表的关联、多级的聚合等等,技术中台还在持续优化中。
第二个问题是公共层的改造难度非常大。多层模型的优化、账目类数据快照,可靠的改造实践,我们现在还在摸索中。集市层面,原来的实时计算工具,也有一部分转到了 Hudi 的自定义开发上来,用于优化和灵活的交付。
第三个问题与贴源层的改造相关,可能日志的数据改造进度是最快的,但是 MySQL 的数据还没有大规模推进 CDC 这个方案,还有过程中 Flink、Hudi 各类的编码调参也不可免,以及一些技术中台正在解决的內目录的优化、状态存储的优化等等。
第二类是一些非技术的问题,数据湖升级的代价是非常大的。截至目前,大部分跟数据湖相关的工作不能离开程序员,程序员边啃数据湖的代码边工作。
第三个是迭代成本。我们既要做建设,又要做交付,所以要有一些成文的标准方案。目前为止,我们内部的标准方案,无论标准化程度有多高,大多数都要经历多次迭代。一开始,我们大家都认为引入数据湖后,通过 SQL、统一的数据目录等可以大幅度减少各种定制化的方案。但随着交付的需求慢慢的变多,开始有点回退到老版本时面向交付去设计的具体方案的状态,需要仔细考虑的因素太多,很多方案只是针对一两类的特殊场景。
那么我们对数据湖的期望值在哪里?我认为要把用户对实时数仓的期望值和实时数仓的边界放在一起考虑。
首先就是实时数仓本身,流批一体仍需要持续优化。像现在 Paimon 为代表的底层方案,还有把存储数据目录优化调度放在一起的一体化的智能服务等等。目前直接往湖里写,我们的中台工具管道,元数据建表写入和查询,这整个其实就是技术类的中台。如果对湖的期望比较多,或者因为交付节奏的问题,就需要一个高层次的服务中台。如果数据湖有一个一体化的方案,那么优化一定也是很明显的,否则离线数仓的整体迁移是十分艰难的。
功能上,我们大家都希望实现一个小的功能——未提交读,因为现在最重要的一点还是数据时效性不够高,并且不支持脏读,这个对我们的一些场景来说不是很严重,但是若能够允许脏读来提升数据时效的话,也是值得考虑的。比如现在 CK 把数据再写入到最终的磁盘存储之后,在内存里面那份数据也能够允许对外读,这个功能也是很重要的,Hudi 中间的缓存有没有可能对外直接放出来,同样是需要思考的一个问题。
另外一点是数据湖和实时库一体化,前文中提到,湖仓一体给老版本的方案带来的提升很明显。但是实时库的场景只是被压缩了,一些准实时类的场景不需要实时库,并不意味着用户不需要实时库了。现在很多业务方提出要求,想要一个超级大的MySQL,把湖仓实时库的所有优点结合在一起,并且把缺点全都剔除掉,因此希望之后有更好的设计。
最后是数仓的公共层改造,刚才在介绍整个数仓的迭代节奏时,其实没有把公共层的改造放进去。这里面有个问题,我们该把公共层的一部分业务逻辑放在哪?我们都知道离线数仓很重要的特性是稳定性,而数仓稳定性靠的就是 DWD、DWS 这些结构代码来实现的,这些体现是最直接的。引入数据湖之后,数仓除了要保留稳定性之外,还应该要考虑快速演进的问题。
毕竟在 Iceberg、Hudi 这样一些表格型的数据库的组件出现之前,真正数据库的概念里面数据的快速迭代其实是很重要的。就以埋点数据为例,常见的 json 的扩展字段,可以用视图、物化视图,也就是用多层表多层构建的方案来做,也可优先考虑在整个数据湖的服务内部加一个前后置的处理层,类似于之前关系库里面触发器的概念。这个我们之前在 CK 里面也做过。在 CK 中,如果把物化视图当做触发器来做,其实也可以类似于实现,但是对于一些账目类的数据不是很好做,账目类数据比如一些较为稳定的台账类型,采用仓端思路去做逐层的构建没问题,但是对于活动、优惠券等等,就不能非常依赖于固定的逻辑,所以就需要快速的开发迭代。我认为就是要用一个类似于以前触发器的逻辑,那这个方案其实在 Paimon 中效率会更高一些,因为它是一个近似有序的结构,但是仍可能不足以解决,所以后面会考虑在新的计算工具里面引入一个这样的功能层。
最后做一下总结,我们通过引入湖仓一体的技术,让实时数仓方案整体上了一个台阶。最直接的结果就是减少了实时数仓的状态和用户的期望之间的差距,当然一部分是通过调低用户的预期,提高他们对于实时数仓的认识,或者通过给他们提供一个可见的准时的方案来实现。但是从用户的角度来看,仍有很大的进步空间。如果我们从数据产品,而不是从大数据架构来看,怎么让数据和技术能快速地迭代,还有非常长的路要走。