实时技术架构演进
我们之前的方案是以Apache Storm引擎进行点对点的数据处理,这种方式在业务需求快速增长的阶段,可以快速的满足实时报表的需求。但是随着业务的不断发展、数据量逐渐增加以及需求逐渐多样化,弊端随之产生。例如灵活性差、数据一致性无法满足、开发效率较低、资源成本增加等。
为解决之前架构出现的问题,我们首先进行了架构升级,将storm引擎替换为Apache Flink,用以实现高吞吐、exactly once的处理语义。同时根据搜索数据的特点,将实时数据进行分层处理,构建出PV流明细层、SKU流明细层和AB实验流明细层,期望基于不同明细层的实时流,构建上层的实时OLAP层。
OLAP层的技术选型,需要满足以下几点:
通过对比目前业界广泛使用的支持实时导入的OLAP引擎,我们在druid、ES、clickhouse和doris之间做了横向比较:
通过对比开源的几款实时OLAP引擎,我们发现doris和clickhouse能够满足我们的需求,但是clickhouse的并发度太低是个潜在的风险,而且clickhouse的数据导入没有事务支持,无法实现exactly once语义,对标准sql的支持也是有限的。
最终,我们选定doris作为聚合层,用于实时OLAP分析。对于流量数据,使用聚合模型建表;对于订单行,我们将聚合模型换成Uniq模型,保证同一个订单最终只会存储一条记录,从而达到订单行精准去重的目的。在flink处理时,我们也将之前的任务拆解,将反复加工的逻辑封装,每一次处理都生成新的topic流,明细层细分了不同粒度的实时流。新方案如下:
目前的技术架构中,flink的任务是非常轻的,state状态非常小,并没有使用KeyedState自定义状态,而OperatorState中只包含kafka的offset信息,这样保证任务的运行开销很小,稳定性大大提升。同时基于生产的数据明细层,我们直接使用了doris来充当聚合层的功能,将原本可以在flink中实现的窗口计算,下沉到doris中完成。利用doris的routine load消费实时数据,虽然数据在导入前是明细粒度,但是基于聚合模型,导入后自动进行异步聚合。而聚合度的高低,完全根据维度的个数与维度的基数决定。通过在base表上建立rollup,在导入时双写或多写并进行预聚合操作,这有点类似于物化视图的功能,可以将数据进行高度的汇总,以提升查询性能。
在明细层采用kafka直接对接到doris,还有一个好处就是这种方式天然的支持数据回溯。数据回溯简单说就是当遇到实时数据的乱序问题时,可以将“迟到”的数据进行重新计算,更新之前的结果。这是因为我们导入的是明细数据,延迟的数据无论何时到达都可以被写入到表中,而查询接口只需要再次进行查询即可获得最新的计算结果。最终方案的数据流图如下:
实时数据处理的技术实现,需要在低延迟、准确性和成本之间进行取舍。我们采用doris作为实时仓库的聚合层,其实也是在多方面进行了取舍。例如:
以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对实时数据大规模、无界、乱序等特点,实时流计算的选型,最终考虑的就是如何取舍。
上文提到我们在doris中建立了不同粒度的聚合模型,包括PV粒度、SKU粒度以及AB实验粒度。我们这里以每日生产数据量最大的曝光AB实验模型为例,阐述在doris中如何支持大促期间每日新增百亿条记录的查询的。
AB实验的效果监控,业务上需要10分钟、30分钟、60分钟以及全天累计等四个时间段,同时需要根据渠道、平台和一二三级品类等维度进行下钻分析,观测的指标则包含曝光PV、UV、曝光SKU件次、点击PV、点击UV等基础指标,以及CTR等衍生指标。
在数据建模阶段,我们将曝光实时数据建立聚合模型,其中K空间包含日期字段、分钟粒度的时间字段、渠道、平台、一二三级品类等,V空间则包含上述的指标列,其中UV和PV进行HLL近似计算,而SKU件次则采用SUM函数,每到来一条记录则加1。由于AB实验数据都是以AB实验位作为过滤条件,因此将实验位字段设置为分桶字段,查询时能够快速定位tablet分片。值得注意的是,HLL的近似度在目前PV和UV的基数下,实际情况误差在0.8%左右,符合预期。
目前doris的集群共30+台BE,存储采用的是支持NVMe协议的SSD硬盘。AB实验曝光topic的分区数为40+,每日新增百亿条数据。在数据导入阶段,我们主要针对导入任务的三个参数进行优化:最大时间间隔、最大数据量以及最大记录数。当这3个指标中任何一个达到设置的阈值时,任务都会触发导入操作。为了更好的了解任务每次触发的条件,达到10分钟消费6亿条记录的压测目标,我们通过间隔采样的方法,每隔3分钟采样一次任务的情况,获取Statistic信息中的receivedBytes、cimmittedTaskNum、loadedRows以及taskExecuteTimeMs数值。通过对上述数值在前后2个时间段的差值计算,确定每个任务触发的条件,并调整参数,以在吞吐和延迟之间进行平衡,最终达到压测的要求。
为了实现快速的多维数据查询,基于base表建立了不同的rollup,同时每个rollup的字段顺序,也要遵循过滤的字段尽可能放到前面的原则,充分利用前缀索引的特性。这里并不是rollup越多越好,因为每个rollup都会有相应的物理存储,每增加一个rollup,在写入时就会增加一份IO。最终我们在此表上建立了2个rollup,在要求的响应时间内尽可能多的满足查询需求。
京东搜索是在今年5月份引入doris的,第一个应用的上线到现在已经运行半年时间。目前集群版本是0.11.33,规模是30+台BE,线上同时运行着10+个routine load任务,每日新增数据条数在200亿+,已经成为京东体量最大的doris用户。从结果看,用doris替换flink的窗口计算,既可以提高开发效率,适应维度的变化,同时也可以降低计算资源,用doris充当实时数据仓库的聚合层,并提供统一的接口服务,保证了数据的一致性和安全性。
我们在使用中也遇到了查询相关的、任务调度相关的bug,也在推动京东OLAP平台升级到0.12版本。接下来待版本升级后,我们计划使用bitmap功能来支持UV等指标的精准去重操作,并将推荐实时业务应用doris实现。除此之外,为了完善实时数仓的分层结构,为更多业务提供数据输入,我们也计划使用适当的flink窗口开发聚合层的实时流,增加数据的丰富度和完整度。
作者:李哲,京东搜推数据开发工程师,曾就职于美团点评,主要从事离线数据开发、流计算开发以及OLAP多维查询引擎的应用开发。
本博客文章除特别声明,全部都是原创!