数仓|COUNT DISTINCT数据倾斜优化
什么是数据倾斜
数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被分配到一个分区里,造成了个别task运行的非常慢,从而影响了整个任务的执行效率。
数据倾斜产生的根本原因是少数Worker处理的数据量远远超过其他Worker处理的数据量,因此少数Worker的运行时长远远超过其他Worker的平均运行时长,导致整个任务运行时间超长,造成任务延迟。
数据倾斜的原因
当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。
导致数据倾斜常见的原因主要包括以下几个方面:
group by倾斜
- 场景
group by
的key
分布不均匀。例如,在大促期间,某个店铺的单品PV量达4千万以上,店铺PV量达8千万以上,导致根据商品和店铺的PV量计算IPV时,发生数据倾斜。
- 解决方案
配置下面的参数与SQL语句一起提交。
set hive.groupby.skewindata=true;
数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
join倾斜
- 场景
join on
的key
分布不均匀。
- 解决方案
- 大表join小表
如果join两边的表中有一张是小表,可以将join改为mapjoin来处理。 - 关联key中有大量NULL值
对易产生倾斜的key用单独的逻辑来处理。例如两边表的key中有大量NULL数据会导致倾斜,需要在join前先过滤掉NULL数据或补上随机数,然后再进行join。例如,某张表中,有大量未登录用户的访问记录(user_id为NULL),如果直接和用户表关联的话,会产生倾斜。这时候可以做如下处理:
SELECT *
FROM table_a a
LEFT OUTER JOIN table_b b
ON CASE WHEN a.user_id IS NULL THEN concat('dp_hive',rand() ) ELSE a.user_id END = b.user_id
;
通常情况下,可能倾斜的值不是NULL,而是有意义的数据,这时候就需要对这类数据进行单独处理。
count distinct倾斜
- 场景
特殊值过多,常见于固定的特殊值比较多的场景,和join
中易产生倾斜的key
类似。
- 解决方案
先过滤特殊值,在count
结果的基础上加上特殊值的个数。或根据具体场景进行具体分析。
count distinct数据倾斜优化
DISTINCT的使用场景
- 数据量较小的表
- 数据量较大时,且使用单个DISTINCT,且GROUP BY字段没有严重的数据倾斜
案例
- 案例1
统计商品访问的uv
-- 优化前
SELECT sku_code
,COUNT(DISTINCT user_id) AS vst_uv
FROM dwd_log_vst_di
WHERE ds = '${cur_date}'
GROUP BY sku_code
-- 优化后
SELECT sku_code
,sum(uv) AS vst_uv
FROM (
SELECT sku_code
,user_id
,1 AS uv
FROM dwd_log_vst_di
WHERE ds = '${cur_date}'
GROUP BY sku_code
,user_id
)
GROUP BY sku_code
- 案例2
统计不同分组下每个标签对应的用户数
假设有一张表dwd_user_tag_df用户标签表
-- 优化前
SELECT gid
,COUNT(DISTINCT if(is_tag_a = 1,user_id,null)) AS tag_a_usr_cnt
,COUNT(DISTINCT if(is_tag_b = 1,user_id,null)) AS tag_b_usr_cnt
,COUNT(DISTINCT if(is_tag_c = 1,user_id,null)) AS tag_c_usr_cnt
FROM dwd_user_tag_df
WHERE ds = '${cur_date}'
GROUP BY gid;
-- 优化后
SELECT gid
,SUM(IF(is_a_cnt > 0 ,1,0)) AS a_cnt
,SUM(IF(is_b_cnt > 0 ,1,0)) AS b_cnt
,SUM(IF(is_c_cnt > 0 ,1,0)) AS ab_cnt
FROM (
SELECT gid
,user_id
,SUM(IF( is_tag_a = 1 ,1,0 )) AS is_a_cnt
,SUM(IF( is_tag_b = 1 ,1,0 )) AS is_b_cnt
,SUM(IF( is_tag_c = 1 ,1,0 )) AS is_c_cnt
FROM dwd_user_tag_df
WHERE ds = '${cur_date}'
GROUP BY gid
,user_id
) t
GROUP BY gid
;
总结
当数据量较小时,我们几乎遇不到数据倾斜等影响任务运行效率的问题,但是当数据量非常大时且又存在文中所说的使用场景,如果不对任务进行优化,那么很可能任务产出效率会很低,不能满足业务的需求。本文主要介绍引起数据倾斜的几方面原因,并对每种情况进行了说明,最后给出了count distinct数据倾斜优化的案例,希望对你有所帮助。
公众号:大数据技术与数仓