迁移方案概要设计
Hive SQL迁移到Spark SQL后需满足以下条件:
为了满足以上三个条件, 一个很直观的思路就是使用两个引擎执行用户SQL,然后对比每个引擎的执行结果和资源消耗。
为了不影响用户线上数据,使用两个引擎执行用户SQL有两个可选方案:
下面详细介绍这两个方案:
再部署一套SQL任务执行系统用来使用Spark执行所有的SQL,包括HDFS,HiveServer2&MetaStore和Spark,DataStudio。新部署的系统需要周期性从生产环境同步任务信息,元数据信息和HDFS数据,在这个新部署的系统中把Hive SQL任务改成Spark SQL类型任务,这样一个用户的SQL在原有系统中使用Hive SQL执行,在新部署的系统中使用Spark执行。如下图所示,蓝色的表示需要新部署的子系统。
SQL双跑工具,可以线下使用两个引擎执行用户的SQL,具体流程如下:
经过权衡, 我们决定采用方案二, 因为:
Hive SQL提取包括以下步骤:
HiveHistoryParser的主要功能是:
SQL改写会对上一步生成的每个原始SQL文件执行以下步骤:
SQL双跑步骤如下:
结果对比时会遍历每个回放记录,统计以下指标:
具体流程如下:
分别对Spark和Hive的产出表执行以下SQL,获取表的概要信息
select sum(hash(c1)), sum(hash(c2)), ... sum(hash(cn)) from T;
比较两张表的概要信息:
汇总数据结果,并对回放的SQL分为以下几类:
迁移比较简单, 步骤如下:
如果SQL是“可迁移”或者“经验可迁移”,可以执行迁移,其它的任务需要排查,这部分是最耗时耗力的,迁移过程中大部分时间都是在调查和修复这些问题。修复之后再执行从头开始,提取最新任务的SQL,然后SQL改写和双跑,结果对比,满足迁移条件则说明修复了问题,可以迁移,否则继续排查,因此迁移过程是一个循环往复的过程,直到SQL满足迁移条件,整体过程如下图所示:
在迁移的过程中我们发现了很多两种引擎不同的地方,主要包括语法差异,UDF差异,功能差异和性能差异。
有些Hive SQL使用Spark SQL执行在语法分析阶段就会出错,有些语法差异我们在内部版本已经修复,目前正在反馈社区,正在和社区讨论,还有一些目前没有修复。
1.1 用例设计
1.2 未修复
在排查数据不一致的SQL过程中,我们发现有些是因为输入数据的顺序不同造成的, 这些差异逻辑上是正确的,而有些是UDF对异常值的处理方式不一致造成的,还有需要注意的是UDF执行环境不同造成的结果差异。
这些因为输入数据的顺序不同造成的结果差异逻辑上是一致的,对业务无影响,因此在迁移过程中可以忽略这些差异,这类差异的SQL任务属于经验可迁移。
2.1.1 collect_set
假设数据表如下:
执行如下SQL:
select c1, collect_set(c2) c2_set from T group by c1;
执行结果:
差异说明:
collect_set执行结果的顺序取决于记录被扫描的顺序,Spark SQL执行过程中是多个任务并发执行的,因此记录被读取的顺序是无法保证的.
2.1.2 collect_list
假设数据表如下:
执行如下SQL:
select c1, collect_list(c2) c2_list from T group by c1;
执行结果:
差异说明:
collect_list执行结果的顺序取决于记录被扫描的顺序,Spark SQL执行过程中是多个任务并发执行的,因此记录被读取的顺序是无法保证的。
2.1.3 row_number
假设数据表如下:
执行如下SQL:
select row_number() over (partition by c1 order by c1) row_id, c1, c2 from T;
执行结果:
差异说明:
执行row_number时,在一个分区内部,可以保证order by字段是有序的,对于非分区非order by字段的顺序是没有保证的。
2.1.4 map类型字段读写
数据表建表语句:
create table T (c1 map<string, string>);
假设数据表如下:
执行如下SQL:
select * from T;
执行结果:
差异说明:
Map类型是无序的,同一份数据,在query时显示的各个key的顺序会有变化。
2.1.5 sum(double/float)
假设数据表如下:
执行如下SQL:
select sum(d) from T;
执行结果:
差异说明:
这是由float/double类型的表示方式决定的,浮点数不能表示所有的实数,在执行运算过程中会有精度丢失,对于几个浮点数,执行加法时的顺序不同,结果有时就会不同。
2.1.6 顺序差异解决方案
由以上UDF造成的差异可以忽略,相关任务如果在资源方面也有节省,那么最终的状态是经验可迁移状态,符合迁移条件。
下面几个日期/时间相关函数,当有异常输入是Spark SQL会返回NULL,而Hive SQL会返回一个非NULL值。
2.2.1 datediff
对于异常日期,比如0000-00-00执行datediff两者会存在差异。
2.2.2 unix_timestamp
对于24点Spark认为是非法的返回NULL,而Hive任务是正常的,下表时执行unix_timestamp(concat('2020-06-01', ' 24:00:00'))时的差异。
2.2.3 to_date
当月或者日是00时Hive仍然会返回一个日期,但是Spark会返回NULL。
2.2.4 date_sub
当月或者日是00时Hive仍然会返回一个日期,但是Spark会返回NULL。
2.2.5 date_add
当月或者日是00时Hive仍然会返回一个日期,但是Spark会返回NULL。
2.2.6 非顺序差异解决方案
这些差异是是因为对异常UDF参数的处理逻辑不同造成的,虽然Spark SQL返回NULL更合理,但是现有的Hive SQL任务用户适应了这种处理逻辑,所以为了不影响现有SQL任务,我们对这类UDF做了兼容处理,用户可以通过配置来决定使用Hive内置函数还是Spark的内置UDF。
2.3.1 差异说明
基于MapReduce的Hive SQL一个Task会启动一个进程,进程中的主线程负责数据处理, 因此在Hive SQL中UDF只会在单程中执行。
而Spark 一个Executor可能会启动多个Task,如下图所示。因此在Spark SQL中自定义UDF时需要考虑线程安全问题。
2.3.2 差异解决方案
下面是一个非线程安全的示例,UDF内部共享静态变量,在执行UDF时会读写这个静态变量。
解决方案也比较简单,一种是加锁,如下图所示:
另一种是取消静态成员,如下图所示:
Hive SQL可以通过设置以下配置合并小文件,MR Job结束后,判断生成文件的平均大小,如果小于阀值,就再启动一个Job来合并文件。
set hive.merge.mapredfiles=true; set hive.merge.mapfiles=true; set hive.merge.smallfiles.avgsize=268435456;
目前Spark SQL不支持小文件合并,在迁移过程中,我们经常发现Spark SQL生成的文件数多于Hive SQL,为此我们参考Hive SQL的实现在Spark SQL中引入了小文件合并功能。
在InsertIntoHiveTable 中判断如果开启小文件合并,并且文件的平均大小低于阈值则执行合并,合并之后再执行loadTable或者loadPartition操作。
Hive SQL任务是DataStudio通过beeline -f执行的,客户端只负责发送SQL语句给HS2,已经获取执行结果,因此是非常轻量的。而Spark SQL只支持Client模式,Driver在Client进程中,因此Client模式执行Spark SQL时,有时会占用很多的资源,DataStudio无法感知Spark Driver的资源开销,所以在DataStudio层面会带来以下问题:
所以我们开发了Spark SQL支持Cluster模式,该模式只支持非交互式方式执行SQL,包括spark-sql -e和spark-sql -f,不支持交互式模式。
迁移过程中我们发现大部分任务的分区条件包括concat, concat_ws, substr等UDF, HiveServer2会调用MetaStore的getPartitionsByExpr方法返回符合分区条件的有效分区,避免无效的扫描, 但是Spark SQL的分区剪裁只支持由Attribute和Literal组成key/value结构的谓词条件,这一方面导致无法有效分区剪裁,会查询所有分区的数据, 造成读取大量无效数据,另一方面查询所有分区的元数据,导致MetaStore对MySQL查询压力激增,导致mysql进程把cpu打满。我们在社区版本的基础上迭代支持了多种场景的分区联合剪裁,目前能够覆盖生产任务90%以上的场景。
concat/concat_ws联合剪裁场景
concat_ws/concat = Int concat_ws/concat = String concat_ws/concat between and concat_ws/concat in concat_ws/concat and partition='part' concat_ws/concat between and partition='part' concat_ws/concat in and partition='part'
substr 联合剪裁场景
substr(dt, 1, 7) = substr('2020-06-30', 1, 7) substr = String substr(dt, 1, 6) between '202003' and '202006' substr(dt, 1, 6) in
concat/concat_ws&substr组合场景
substr and concat/concat_ws concat/concat_ws(substr, substr, ...)
目前已经反馈社区,正在讨论中,具体可参考[SPARK-33707][SQL] Support multiple types of function partition pruning on hive metastore
经过6个多月的团队的努力,我们迁移了1万多个Hive SQL任务到Spark SQL,在迁移过程中,随着spark SQL任务的增加,SQL任务的执行时间在逐渐减少,从最初的1000+秒下降到600+秒如下图所示:
迁移后Spark SQL任务占比85%,SQL任务运行时间节省40%,计算资源节省21%,内存资源节省49%,迁移的收益是非常大的。
迁移之后Spark已经成为SQL任务的主流引擎,但是还有大量的shell类型任务使用Hive执行SQL,所以后续我们会迁移shell类型任务,把shell中的Hive SQL迁移到Spark SQL。
在生产环境中,有些shuffle 比较中的任务经常会因为shuffle fetch重试甚至失败,我们想优化Spark External Shuffle Service。
社区推出Spark 3.x也半年多了,在功能和性能上有很大提升,所以我们也想和社区保持同步,升级Spark到3.x版本。
本文原文 Hive SQL迁移Spark SQL在滴滴的实践
本博客文章除特别声明,全部都是原创!