Spark、Hadoop或者HBase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop
当质疑是否需要单独的批处理层时,Kappa 体系结构认为流处理引擎可以作为计算的通用解决方案。 从广义上讲,所有数据计算都可以描述为生产者生产一个数据流,而消费者不断的逐条迭代消费这个流中的记录(比如 Volcano Iterator model)。这个特点使得我们可以在流式处理层通过增加并行性和资源来对有状态的业务数据进行重新处理。这类系统可以依靠有效的检查点(checkpoint) 和大量的状态管理来让流式处理的结果不再只是一个近似值。这个模型被应用于很多的数据摄取任务(ingest pipelines)。尽管如此,虽然批处理层在这个模型中被去掉了,但是在服务层仍然存在两个问题。
如今很多流式处理引擎都支持行级的数据处理,这就要求我们的服务层也需要能够支持行级更新的能力。通常,这类系统并不能对分析类的查询扫描优化到这个地步,除非我们在内存中缓存大量记录(如Memsql)或者有强大的索引支持(如ElasticSearch)。这些系统为了获得数据摄取和扫描的性能往往需要增加成本和牺牲服务的可扩展性。出于这个原因,这类服务系统的数据驻留的能力往往是有限的,从时间上可能30~90天,从总量上来说几个TB的数据就是他们的极限了。对于历史数据的分析又会被重新定向到时延要求不那么高的HDFS上。
数据摄取延迟、扫描性能、计算资源和操作复杂性之间的基本权衡是不可避免的。但是对于能够容忍大约10分钟延迟的工作负载,如果有一种更快速的方式在 HDFS 中摄取和准备数据,则不需要单独的 Speed serving 层。这统一了服务层,并显著降低了系统总体的复杂性和资源使用。
然而,要使HDFS成为统一的服务层,它不仅需要存储变更集的日志(一个日志记录系统),而且还需要支持由有意义的业务度量划分的压缩的、去重复的业务状态。这种类型的统一服务层需要具备以下特点:
被压缩的业务状态变更是无法避免的,即使我们以事件时间(Event time)作为业务分区字段。由于延迟到达的数据以及事件时间和处理时间之间的差异,仍然会导致对许多旧分区进行更新。即使分区字段是处理时间,也可能存在更新的情况,比如出于审计、安全方面的考虑,需要清除数据。
下面让我们来看看 Hudi,这是一个增量数据处理的框架,它解决了我们在前面说的各种问题。简而言之,Hudi (Hadoop Upsert Delete and Incremental)是一种分析和扫描优化的数据存储抽象,可在几分钟之内将变更应用于 HDFS 中的数据集中,并支持多个增量处理系统处理数据。
Hudi 数据集通过自定义的 InputFormat 与当前 Hadoop 生态系统(包括 Apache Hive、Apache Parquet、Presto 和 Apache Spark)集成,使得该框架对最终用户来说是无缝的。
Google 的 DataFlow 模型基于数据管道的延迟和完整性保证来对数据管道进行分类。下图展示了 Uber 的各种用户场景按照这个逻辑进行的划分,以及每种流水通常采用的处理方式:
对于少数真正需要1分钟延迟和具有简单业务度量的展示用例,我们依赖于记录级流处理。对于传统的批处理用例,如机器学习,我们依赖于批处理。对于包含复杂 join 或者重要数据处理的近实时场景,我们基于 Hudi 以及它的增量处理原语来获得两全其美的结果。想要了解更多关于 Hudi 支持的用例,您可以在 Github 上查看相关文档。
Hudi 将数据集组织到 basepath 下的分区目录结构中,类似于传统的 Hive 表。数据集被分成分区,分区是包含该分区的数据文件的目录。每个分区由其相对于 basepath 的 partitionpath 唯一标识。在每个分区中,记录被分布到多个数据文件中。每个数据文件都由唯一的 fileId 和生成该文件的 commit 标识。对于更新,多个数据文件可以共享同一个 fileId,但对应于不同的 commit。
每条记录都由一个记录键(record key)惟一标识,并映射到一个 fileId。一旦记录的第一个版本被写入文件,记录键和 fileId 之间的映射就永久不变。简而言之,fileId 标识一组文件,而这些文件包含所有记录的所有版本数据。
Hudi 的存储引擎由三个不同的部分组成:
Hudi 对 HDFS 的使用模式进行了优化。Compaction 是将数据从写优化格式转换为读优化格式的关键操作。由于 Compaction 操作的基本并行单位是重写单个 fileId,所以 Hudi 确保了所有的数据文件大小和 HDFS 块大小对齐,以平衡压缩并行性、查询扫描并行性和HDFS中的文件总数,Compaction 操作也是可插拔的,可以扩展为合并不频繁更新的老的数据文件已进一步减少文件总数。
Hudi 是一个 Spark 库,以流摄取作业的形式运行,并以小批量的方式摄取数据(通常是一到两分钟)。但是,根据延迟需求和资源协商时间,也可以使用 Apache Oozie 或 Apache Airflow 将摄取的作业作为离线任务运行。
以下是默认配置下 Hudi 的写入路径:
如前所述,Hudi 会尽可能使文件大小与底层块大小保持一致。根据列式压缩的效率和要压缩的分区中的数据量,压缩仍然会创建小型 parquet 文件。因为对分区的插入操作会是以对现有小文件的更新来进行的,所有这些小文件的问题最终会被一次次的迭代不断修正。最终,文件大小会不断增长直到与HDFS块大小一致。
如果一个摄取作业由于间歇性错误而失败,Spark 将重试计算 RDD 并自动解决这个问题。如果失败的次数超过了 Spark 的最大尝试次数,则摄取作业失败,下一次迭代将再次尝试摄取相同的批处理。以下指出两个重要区别:
提交的时间轴支持在 HDFS 中对相同数据进行读取优化的视图和实时视图;这些视图允许客户在数据延迟和查询执行时间之间进行选择。Hudi 为这些视图提供了自定义的 InputFormat,并包含了一个 Hive 注册模块,该模块将这两个视图注册为Hive metastore 表。这两种输入格式都能够识别 fileId 和提交时间,并可以筛选出最新提交的文件。然后,Hudi 会基于这些数据文件生成输入分片供查询使用。InputFormat 详情如下:
这两种 InputFormats
都扩展了MapredParquetInputFormat
和 VectorizedParquetRecordReader
,因此所有针对 parquet 文件的优化依然被保留。通过依赖 hoodie-hadoop-mr 类库,Presto 和 Spark SQL可以对 Hudi 格式 的Hive Metastore 表做到开箱即用。
如前所述,需要在 HDFS 中处理和提供已建模的表,以便 HDFS 成为统一的服务层。构建低延时的数据模型表需要能够链接 HDFS 数据集的增量处理。由于 Hudi 在元数据中维护了每次提交的提交时间以及对应的文件版本,使得我们可以基于起始时间戳和结束时间戳从特定的 Hudi 数据集中提取增量的变更数据集。
这个过程基本上与普通的查询大致相同,只是选取特定时间范围内的文件版本进行读取而不是选最新的,提交时间会最为过滤条件被谓词下推到文件扫描阶段。这个增量结果集也受到文件自动清理的影响,如果某些时间范围内的文件被自动清理掉了,那自然也是不能被访问到了。
这样使得我们可以基于 watermark 做 stream-to-stream joins 和 stream-to-dataset joins 并对存储在 HDFS 中的建模表进行计算和 upsert 操作。
本文翻译自:Hudi: Uber Engineering’s Incremental Processing Framework on Apache Hadoop
本博客文章除特别声明,全部都是原创!
非常好的系列文章,拜读
感谢支持。