目录

  1. 什么是数据倾斜
  2. Hadoop框架的特性
  3. 主要表现
  4. 容易数据倾斜的情况
  5. 产生数据清洗的原因
  6. 业务场景
    1. 空值产生的数据倾斜
    2. 不同数据类型关联产生数据倾斜
    3. 大小表关联查询产生数据倾斜

一、什么是数据倾斜

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

二、Hadoop框架的特性

  1. 不怕数据大,怕数据倾斜
  2. jobs数比较多的作业运行效率相对比较低,如子查询比较多
  3. sum、count、max、min等聚合函数,通常不会有数据倾斜问题

三、主要表现

任务进度长时间维持在99%或者100%的附近,查看任务监控页面,发现只有少量reduce子任务未完成,因为其处理的数据量和其他的reduce差异多大.单一reduce处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长.

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

四、容易数据倾斜的情况

关键词 情形 后果
join 其中一个表较小,但是key集中 分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或空值过多 这些空值都由一个reduce处理,非常慢
group by 维度过小,某值的数量过多 处理某值的reduce非常耗时
count distinct 某特殊值过多 处理次特殊值的reduce耗时
  1. group by不和聚集函数搭配使用的时候
  2. count(distinct),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组的,按distinct字段排序
  3. 小表关联超大表join

五、产生数据倾斜的原因

  1. key分布不均匀
  2. 业务数据本身的特性
  3. 建表考虑不周全
  4. 某些HQL语句本身就存在数据倾斜

六、业务场景

1. 空值产生的数据倾斜

场景说明

在日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id相关联,就会陪到数据倾斜的问题

解决方案

方案一: user_id为空的不参与关联

select * from log a join user b on a.user_id is not null and a.user_id = b.user_id
union all
select * from log c where c.user_id is null;

解决方案2: 赋予空值新的key值

select * from log a left outer join user b on
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id;

总结

方法2比方法1效率更好,不但IO少了,而且作业数也少了; 方案1中,log表读了两次,jobs肯定是2,而方案2是1.这个优化适合无效id(比如-99, ", null)产生的数据倾斜,把空值的key变成一个字符串加上一个随机数,就能把造成数据倾斜的数据分布到不同的reduce上解决数据倾斜的问题.

改变之处:使本身为null的所有记录不会拥挤在同一个reduce Task了,会由于由替代的随机字符串,而分散到了多个reduce Task中,由于null值关联不上,处理后并不影响最终结果.

2. 不同数据类型关联产生数据倾斜

场景说明

用户表中user_id字段为int,log表中user_id为既有string也有int的类型,当按照两个表的user_id进行join操作的时候,默认的hash操作会按照int类型的id进行分配,这样就会导致所有的string类型的id就被分到同一个reducer当中

解决方案

select * from user a left outer join log b on b.user_id = cast(a.user_id as string)

3. 大小表关联查询产生数据倾斜

注意: 使用map join解决小表关联大表造成的数据倾斜问题,这个方案使用的频率很高.

map join 概念: 将其中做连接的小表(全量数据)分发到所有Map Task端进行join,从而避免了reduce Task,前提要求是内存足以装下该全量数据

 Hive的数据倾斜 随笔

以大表a和小表b为例,所有的maptask节点都装载小表b的所有数据,然后大表a的一个数据块数据,比如说是a1去跟b全量数据做链接,就省去了reduce做汇总的过程.所以相对来说,在内存允许的条件下使用map join比直接使用MapReduce效率还高些,当然这只限于做join查询的时候.

在hive中,直接提供了能够在HQL语句指定该次查询使用map join,map join的用法是在查询/子查询的SELECT 关键字后面添加 /+ MAPJOIN(tablelist)/ 提示优化器转化为map join(早期的Hive版本的优化器是不能自动优化map join的).其中tablelist可以是一个表,或以都好连接的表的列表.tablelist中的表将会读入内存,通常应该是将小表写在这里.

MapJoin具体用法:

select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movied;

在Hive0.11版本以后会自动开启map join优化,由两个参数控制:

set hive.auto.convert.join=true; //设置MapJoin优化自动开启 set hive.mapjoin.smalltable.filesize=25000000; //设置小表不超过多大时开启mapjoin优化

如果是大大表关联呢?那就大事化小,小事化了.把大表切分成小表,然后分别map join.

那么如果小表不大不小,那该如何处理呢?

使用map join解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这是就需要特别的处理.

例如: 日志表和用户表做连接

select * from log a left outer join users b on a.user_id = b.user_id;

users表由600w+的记录,把users分发到所有的map上也是个不小的开销,而且map join不支持这么大的小表.如果用普通的join,又会碰到数据倾斜的问题.

改进方案:

select /* +mapjoin(x) */* from log a
left outer join (
 select /* +mapjoin(c)*/ d.*
 form (select distinct user_id from log ) c.join users d on c.user_id = d.user_id) x
 on a.user_id = x.user_id; 

假如,log里user_id有上百万个,这就又回到原来map join问题.所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等.所以这个放啊能解决很多情景下的数据倾斜问题.

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄