首发于数据仓库
数仓|COUNT DISTINCT数据倾斜优化

数仓|COUNT DISTINCT数据倾斜优化

什么是数据倾斜

数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被分配到一个分区里,造成了个别task运行的非常慢,从而影响了整个任务的执行效率。

数据倾斜产生的根本原因是少数Worker处理的数据量远远超过其他Worker处理的数据量,因此少数Worker的运行时长远远超过其他Worker的平均运行时长,导致整个任务运行时间超长,造成任务延迟。

数据倾斜的原因

当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。

导致数据倾斜常见的原因主要包括以下几个方面:

group by倾斜

  • 场景

group bykey分布不均匀。例如,在大促期间,某个店铺的单品PV量达4千万以上,店铺PV量达8千万以上,导致根据商品和店铺的PV量计算IPV时,发生数据倾斜。

  • 解决方案

配置下面的参数与SQL语句一起提交。

set hive.groupby.skewindata=true;

数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

join倾斜

  • 场景

join onkey分布不均匀。

  • 解决方案

  • 大表join小表
    如果join两边的表中有一张是小表,可以将join改为mapjoin来处理。

  • 关联key中有大量NULL值
    对易产生倾斜的key用单独的逻辑来处理。例如两边表的key中有大量NULL数据会导致倾斜,需要在join前先过滤掉NULL数据或补上随机数,然后再进行join。例如,某张表中,有大量未登录用户的访问记录(user_id为NULL),如果直接和用户表关联的话,会产生倾斜。这时候可以做如下处理:
    SELECT *
    FROM table_a a
    LEFT OUTER JOIN table_b b
    ON CASE WHEN a.user_id IS NULL THEN concat('dp_hive',rand() ) ELSE a.user_id END = b.user_id
    ;

    通常情况下,可能倾斜的值不是NULL,而是有意义的数据,这时候就需要对这类数据进行单独处理。

count distinct倾斜

  • 场景

特殊值过多,常见于固定的特殊值比较多的场景,和join中易产生倾斜的key类似。

  • 解决方案

先过滤特殊值,在count结果的基础上加上特殊值的个数。或根据具体场景进行具体分析。

count distinct数据倾斜优化

DISTINCT的使用场景

  • 数据量较小的表
  • 数据量较大时,且使用单个DISTINCT,且GROUP BY字段没有严重的数据倾斜

案例

  • 案例1

统计商品访问的uv

-- 优化前
SELECT  sku_code
        ,COUNT(DISTINCT user_id) AS vst_uv
FROM    dwd_log_vst_di
WHERE   ds = '${cur_date}'
GROUP BY sku_code

-- 优化后
SELECT  sku_code
        ,sum(uv) AS vst_uv
FROM    (
            SELECT  sku_code
                    ,user_id
                    ,1 AS uv
            FROM    dwd_log_vst_di
            WHERE   ds = '${cur_date}'
            GROUP BY sku_code
                     ,user_id
        ) 
GROUP BY sku_code

  • 案例2

统计不同分组下每个标签对应的用户数

假设有一张表dwd_user_tag_df用户标签表



 -- 优化前
SELECT  gid
        ,COUNT(DISTINCT if(is_tag_a = 1,user_id,null)) AS tag_a_usr_cnt
        ,COUNT(DISTINCT if(is_tag_b = 1,user_id,null)) AS tag_b_usr_cnt
        ,COUNT(DISTINCT if(is_tag_c = 1,user_id,null)) AS tag_c_usr_cnt
FROM dwd_user_tag_df
WHERE ds = '${cur_date}'
GROUP BY gid;

-- 优化后

SELECT  gid
        ,SUM(IF(is_a_cnt > 0 ,1,0))  AS a_cnt
        ,SUM(IF(is_b_cnt > 0 ,1,0))  AS b_cnt
        ,SUM(IF(is_c_cnt > 0 ,1,0))  AS ab_cnt
FROM    (
            SELECT  gid
                    ,user_id
                    ,SUM(IF( is_tag_a = 1 ,1,0 )) AS is_a_cnt
                    ,SUM(IF( is_tag_b = 1 ,1,0 )) AS is_b_cnt
                    ,SUM(IF( is_tag_c = 1 ,1,0 )) AS is_c_cnt
            FROM    dwd_user_tag_df
            WHERE   ds = '${cur_date}'
            GROUP BY gid
                     ,user_id
        ) t
GROUP BY gid
;

总结

当数据量较小时,我们几乎遇不到数据倾斜等影响任务运行效率的问题,但是当数据量非常大时且又存在文中所说的使用场景,如果不对任务进行优化,那么很可能任务产出效率会很低,不能满足业务的需求。本文主要介绍引起数据倾斜的几方面原因,并对每种情况进行了说明,最后给出了count distinct数据倾斜优化的案例,希望对你有所帮助。

Hive常见的分析函数

数仓规范|使SQL更易于阅读的几个小技巧

数仓面试|四个在工作后才知道的SQL密技

公众号:大数据技术与数仓

编辑于 2021-08-17 22:09