优化SQL处理join数据倾斜
上篇已经多次提到了数据倾斜,包括已经写过的sort by代替order by,以及group by代替方法,本质上也是为了解决它。join操作更是数据倾斜的重灾区,需要多加注意。
空值或无意义值
这种情况很常见,比如当事实表是日志类数据时,往往会有一些项没有记录到,我们视情况会将它置为nullmapjoin,或者空字符串、-1等。如果缺失的项很多,在做join时这些空值就会非常集中,拖累进度。
因此,若不需要空值数据,就提前写where语句过滤掉。需要保留的话,将空值key用随机方式打散,例如将用户ID为null的记录随机改为负值:
select a.uid,a.event_type,b.nickname,b.age
from (
select
(case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,
event_type from calendar_record_log
where pt_date >= 20190201
) a left outer join (
select uid,nickname,age from user_info where status = 4
) b on a.uid = b.uid;
单独处理倾斜key
这其实是上面处理空值方法的拓展,不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),最后再进行聚合。SQL语句与上面的相仿,不再赘述。
不同数据类型
这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。
举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是( int),新表的是( )。为了兼容旧版记录,新表的也会以字符串形式存储旧版的值,比如'17'。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”型key都分配到一个上。所以要注意类型转换:
select a.uid,a.event_type,b.record_data
from calendar_record_log a
left outer join (
select uid,event_type from calendar_record_log_2
where pt_date = 20190228
) b on a.uid = b.uid and b.event_type = cast(a.event_type as string)
where a.pt_date = 20190228;
build table过大
有时,build table会大到无法直接使用map join的地步,比如全量用户维度表,而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件,削减build table的数据量,再使用map join解决。代价就是需要进行两次join。举个例子:
select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info
from calendar_record_log a
left outer join (
select /*+mapjoin(s)*/ t.uid,t.status,t.extra_info
from (select distinct uid from calendar_record_log where pt_date = 20190228) s
inner join user_info t on s.uid = t.uid
) b on a.uid = b.uid
where a.pt_date = 20190228;
优化
image
调整数
数量与输入文件的split数息息相关,在源码org....lib.input.类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述数是如何确定的。
可见,如果想减少数,就适当调高.min.split.size,split数就减少了。如果想增大数,除了降低.min.split.size之外,也可以调高.map.tasks。
一般来讲,如果输入文件是少量大文件,就减少数;如果输入文件是大量非小文件,就增大数;至于大量小文件的情况,得参考下面“合并小文件”一节的方法处理。
调整数
数量的确定方法比简单得多。使用参数..tasks可以直接设定数量,不像一样是期望值。但如果不设这个参数的话,Hive就会自行推测mapjoin,逻辑如下:
数量与输出文件的数量相关。如果数太多,会产生大量小文件,对HDFS造成压力。如果数太少,每个要处理很多数据,容易拖慢运行时间或者造成OOM。
合并小文件启用压缩
压缩job的中间结果数据和输出数据,可以用少量CPU时间节省很多空间。压缩方式一般选择,效率最高。
要启用中间压缩,需要设定press.为true,同时指定压缩方式.codec为press.。另外,参数.type可以选择对块(BLOCK)还是记录()压缩,BLOCK的压缩率比较高。
输出压缩的配置基本相同,打开press.即可。
JVM重用
在MR job中,默认是每执行一个task就启动一个JVM。如果task非常小而碎,那么JVM启动和关闭的耗时就会很长。可以通过调节参数.job.reuse.jvm.num.tasks来重用。例如将这个参数设成5,那么就代表同一个MR job中顺序执行的5个task可以重复使用一个JVM,减少启动和关闭的开销。但它对不同MR job中的task无效。
并行执行与本地模式严格模式
所谓严格模式,就是强制不允许用户执行3种有风险的语句,一旦执行会直接失败。这3种语句是:
要开启严格模式,需要将参数hive..mode设为。
采用合适的存储格式
在的 table语句中,可以使用 as ...指定表的存储格式。Hive表支持的存储格式有、、、Avro、ORC、等。
存储格式一般需要根据业务进行选择,在我们的实操中,绝大多数表都采用与两种存储格式之一。
是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。虽然它的磁盘开销比较大,查询效率也低,但它更多地是作为跳板来使用。、ORC、等格式的表都不能由文件直接导入数据,必须由来做中转。
和ORC都是旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。我们选择的原因主要是它支持查询引擎,并且我们对、和事务性操作需求很低。
这里就不展开讲它们的细节,可以参考各自的官网:
结束
写了这么多,肯定有遗漏或错误之处,欢迎各位大佬批评指正。
2人点赞