《物化列:字节为解决 Spark 嵌套列查询性能低下的优化》。在字节超过 98% 的 ETL 作业是用 Spark SQL 进行的。Parquet 是数据仓库的默认文件格式,Parquet 向量化读取默认也是启用的通过 spark.sql.parquet.enableVectorizedReader 参数启用。

Spark 是如何读取 Parquet 文件的

首先我们来回滚一下 Parquet 文件的格式。下面是从 Parquet 官方网站拷贝过来的 Parquet 文件格式。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

从上图可以看出 Parquet 格式的文件包含多个 Row Group 和一个 Footer。Parquet 文件里面包含了许多有用的信息,比如统计信息(Statistics),Spark 和 Parquet 可以使用 Footer 和 Row Group 里面的统计信息来进行过滤下推。比如某个 Row Group 的 id 列最大值为 10,当我们查询 id > 20 的时候,就可以利用统计信息过滤掉这个 Row Group。

由于 Parquet 格式的特点,所以在 Parquet 中做列裁减(column pruning)是非常容易实现的;同时 Parquet 里面对数据的压缩也是非常容易做的。
接下来我们来看下 Spark SQL 中读取 Parquet 文件的过程。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

整个过程的入口点是 DataSourceScanExecution,然后是 ParquetFileFormat,最后是 VectorizedParquetReactorReader。VectorizedParquetReactorReader 是读取 Parquet 文件非常重要的类,VectorizedParquetReactorReader 可以通过把算子下推转换成 ParquetFilters 来过滤掉一些无用的 Row Group。

同时,这个类为每个目标列构建 column Readers,而这些 column Readers 是一起以批量的形式读取数据的。比如我们想读取三列的数据,VectorizedParquetReactorReader 会为我们构造三个 column Readers。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

假设我们的查询 SQL 为 select * fromtable_name where date = '***' and category = 'test'。在这个例子中,date 这列是分区列,而 category 是 predicate column。

上图中表格里面假设是 Parquet 文件里面每个 Row Group 分布情况。由于 RowGroup1 中 category 的最小值为 a1,最大值为 z1,所以在执行上面的 SQL 查询时需要把这个 Row Group 的数据读出来,因为 category = 'test' 包含在其中。同理,RowGroup2、RowGroup3 也需要读出来。所以在这种情况下 Spark 需要把 Parquet 文件中的三个 Row Group 都读出来,因为这三个 Row Group 的统计信息是过滤不了数据的。

在一些生产环境下,Parquet filter pushdown 不能很好的工作,因为 Parquet 中的 predicate columns 是乱序的。所以如果对一些比较常用的过滤列进行排序是非常有用的,以此来减少数据读取的 IO。

下面我们来看下另外一个例子。假设我们的查询 SQL 为 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

在这个例子中,RowGroup1 是被过滤下推过滤了;col3 是被列裁剪过滤了。所以在向量化读取的时候 col1 和 col2 是被读取的。在很多情况下,如果 col2=‘test’ 过滤的数据非常高,那么 col1 的大部分数据都是不必要读取的。所以在查询时首先通过 filter column(col2=‘test’)来读取和过滤数据,然后再读取其他列是非常值得的做法。

字节跳动是优化 Parquet Filter Pushdown 和 Parquet Reader 的

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

首先字节的同学发现 Parquet 文件中 RowGroup 的统计信息是不能简单过滤数据的,所以他们希望提高 Parquet 统计信息的准确性。对于用户而言,他们的目标也是很明确的:低负载;所以过滤整个数据是非常昂贵的,甚至是不可能的。所以只能对某些列进行排序,但是新增的排序不能要求用户对现有的 ETL 做任何修改。

基于上面的原因,字节开发出 LocalSort 功能,也就是在 InsertIntoHiveTable 节点之前添加一个 SortExec 节点。通过这个特性,Spark 可以对 Parquet 中的某些列进行排序,以此来提高 Parquet 文件中统计信息的准确性。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

上图是添加了 LocalSort 功能之后 Spark 的执行计划对比,可以看到,InsertIntoHiveTable 之前添加了 SortExec 节点。

那么问题来了,哪些列应该排序?字节的策略是分析历史的查询,选择那些在过滤条件中经常使用的列进行排序。同时也可以通过在表的属性里配置排序列,Spark SQL 会自动读取这些属性。所有的排序过程都是自动的,不需要用户做任何改变,从而满足用户的需求。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

有了 LocalSort 之后,RowGroup 的信息就是准确的。比如上面 RowGroup1 中 category 最小值为 a1;最大值为 g1。根据上面的准确统计信息,在执行 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’ 查询时,只需要读取 RowGroup2。这就使得我们可以读取更少的数据,由于相同的数据存放在一起,所以进一步导致 Parquet 文件的存储大小也变小了。而这些好处仅仅需要付出 5% 的代价,所以还是很值得的。

上面就是 Parquet filter pushdown 的优化细节。下面我们来看下 Parquet Reader 相关的优化。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

我们发现 Spark 读取了很多无用的数据,所以想尽可能的读取上一点的数据。为了解决这个问题,字节的同学开发了名为 Prewhere 的功能,这个思想来自 ClickHouse。在 Prewhere 中,首先批量读取过滤的列,如果过滤的列没有匹配,那么其他列直接跳过。那么字节他们是如何实现的呢?

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

字节团队他们把 Parquet reader 拆分成两个 Reader:FilterReader 和 NonFilterReader。FilterReader 用于过滤列;而 NonFilterReader 用于其他列的读取。

整个过程如下:首先使用 FilterReader 来批量读取数据,然后对读取的数据使用过滤条件,如果这些数据没有符合过滤条件,则继续使用 FilterReader 读取数据。如果有数据符合过滤条件,则使用 NonFilterReader 批量的读取需要的数据同时跳过那些无用的数据,最后读取出来的数据会和 FilterReader 读取的数据进行 union 操作。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

经过这个操作,一些场景下可以直接跳过 Parquet page,甚至是直接跳过读取 Parquet RowGroup。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

在字节,生产环境下支持的过滤列类型有 ByteType、ShortType、IntegerType、 LongType、FloatType、 DoubleType 以及 StringType。支持的类型有 > 、>=、<、<=、=、in、isnull 以及 isnotnull。通过这个特点 Parquet 文件的读取性能在很多场景下提升了 13%。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的?】(https://www.iteblog.com/archives/9912.html)
喜欢 (4)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 为啥不考虑用orc 格式呢 有什么说法吗 大神

    SuperMe2020-12-22 10:29 回复
  2. 应该是 Parquet 在 spark 里面的优话比较多吧。orc 应该也可以做类似的优化。

    w3970907702020-12-25 13:22 回复