前面给大家讲过MapReduce的数据倾斜解决方案以及优化,今天就给大家说下Spark的数据倾斜解决方案。
简单来说数据倾斜就是数据的 key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面,即数据分布不均。如下图所示:
以 WordCount 为例,它的 map 阶段形成 (“xxxx”,1)的形式,然后在 reduce 阶段进行 value 相加,得出 “xxxx” 出现的次数。若进行 WordCount 的文本有100G,其中 80G 全部是 “xxxx”, 剩下 20G 是其余单词,那就会形成 80G 的数据量交给一个 reduce 进行相加,其余 20G 根据 key 不同分散到不同reduce 进行相加的情况。如此就造成了数据倾斜,临床反应就是 reduce 跑到99%然后一直在原地等着 那 80G 的 reduce 跑完。
为什么会出现数据倾斜
在执行shuffle操作的时候,是按照key,来进行values的数据的输出、拉取和聚合的。同一个key的values,一定是分配到一个reduce task进行处理的。多个key对应的values,比如一共是90万。可能某个key对应了88万数据,被分配到一个task上去面去执行。另外两个task,可能各分配到了1万数据,可能是数百个key,对应的1万条数据。这样就会出现数据倾斜问题。
想象一下,出现数据倾斜以后的运行的情况。
其中两个task,各分配到了1万数据,可能同时在10分钟内都运行完了。另外一个task有88万条,88 * 10 = 880分钟 = 14.5个小时。
大家看,本来另外两个task很快就运行完毕了(10分钟),但是由于一个拖后腿的家伙,第三个task,要14.5个小时才能运行完,就导致整个spark作业,也得14.5个小时才能运行完。这样就拖累了整个spark作业运行的时间,这样就很不划算了。
出现数据倾斜的现象
Spark数据倾斜,有两种表现:
-
你的大部分的task,都执行的特别特别快,(你要用client模式,standalone client,yarn client,本地机器一执行spark-submit脚本,就会开始打印log),task175 finished,剩下几个task,执行的特别特别慢,前面的task,一般1s可以执行完5个,最后发现1000个task,998,999 task,要执行1个小时,2个小时才能执行完一个task。出现以上现象,就表明出现数据倾斜了。
这样还算好的,因为虽然运行非常慢,但是至少还能跑完。 -
另一种情况是,运行的时候,其他task都执行完了,也没什么特别的问题,但是有的task,就是会突然间报了一个OOM,JVM Out Of Memory,内存溢出了,task failed,task lost,resubmitting task。反复执行几次都到了某个task就是跑不通,最后就挂掉。
某个task就直接OOM,那么基本上也是因为数据倾斜了,task分配的数量实在是太大了!所以内存放不下,然后你的task每处理一条数据,还要创建大量的对象,内存爆掉了。
这样也表明出现数据倾斜了。
这种就不太好了,因为你的程序如果不去解决数据倾斜的问题,压根儿就跑不出来。
作业都跑不完,还谈什么性能调优这些东西?
定位导致数据倾斜的代码
根据log去定位
出现数据倾斜的原因,基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。因为某个或者某些key对应的数据,远远的高于其他的key。
这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子:
distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。
在 Spark Web UI 上深入看一下当前这个 stage 各个 task 分配的数据量,从而
进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。
看log
log一般会报是在你的哪一行代码,导致了OOM异常。或者看log,看看是执行到了第几个stage。spark代码,是怎么划分成一个一个的stage的。哪一个stage生成的task特别慢,就能够自己用肉眼去对你的spark代码进行stage的划分,就能够通过stage定位到你的代码,到底哪里发生了数据倾斜。
常见数据倾斜解决方案
数据倾斜解决方案,一般来说。聚合源数据和过滤导致倾斜的key,这两个方案是最直接、最有效、最简单的解决数据倾斜问题的方案。
如果碰到了数据倾斜的问题。上来就先考虑这两个方案看能不能做,如果能做的话,就尽量使用这两个方法。
有效、简单、直接才是最好的,彻底根除了数据倾斜的问题。
1.聚合源数据
一些聚合的操作,比如groupByKey、reduceByKey,groupByKey说白了就是拿到每个key对应的values。reduceByKey说白了就是对每个key对应的values执行一定的计算。
这些操作,比如groupByKey和reduceByKey,包括之前说的join。都是在spark作业中执行的。
spark作业的数据来源,通常是哪里呢?90%的情况下,数据来源都是hive表(hdfs,大数据分布式存储系统)。
hdfs上存储的大数据。hive表中的数据通常是怎么出来的呢?有了spark以后,hive比较适合做什么事情?hive就是适合做离线的,晚上凌晨跑的,ETL(extract transform load,数据的采集、清洗、导入),hive sql,去做这些事情,从而去形成一个完整的hive中的数据仓库。说白了,数据仓库,就是一堆表。
spark作业的源表,hive表,通常情况下来说,也是通过某些hive etl生成的。hive etl可能是晚上凌晨在那儿跑。今天跑昨天的数据。
数据倾斜,某个key对应的80万数据,某些key对应几百条,某些key对应几十条。现在咱们直接在生成hive表的hive etl中对数据进行聚合。比如按key来分组,将key对应的所有的values全部用一种特殊的格式拼接到一个字符串里面去,比如“key=xx, value:id=01|user_id=1|category_id=001”。
对key进行group,在spark中,拿到key=xx,values。hive etl中,直接对key进行了聚合。那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串进行map操作,进行你需要的操作即可。
spark中,可能对这个操作,就不需要执行shffule操作了,也就根本不可能导致数据倾斜。
或者是对每个key在hive etl中进行聚合,对所有values聚合一下,不一定是拼接起来,可能是直接进行计算。reduceByKey计算函数应用在hive etl中,从而得到每个key的values。
聚合源数据方案第二种做法是:
你可能没有办法对每个key聚合出来一条数据。那么也可以做一个妥协,对每个key对应的数据,10万条。有好几个粒度,比如10万条里面包含了几个城市、几天、几个地区的数据,现在放粗粒度。直接就按照城市粒度,做一下聚合,几个城市,几天、几个地区粒度的数据,都给聚合起来。比如说:
city_id date area_id
select … from … group by city_id
尽量去聚合,减少每个key对应的数量,也许聚合到比较粗的粒度之后,原先有10万数据量的key,现在只有1万数据量。减轻数据倾斜的现象和问题。
2.对数据进行 ETL 预处理
-
hive 中文件大小不均匀,有的大有的小。spark 在读取大文件时会对大文件按。照 block 快进行切分,小文件不会切分。
如果不进行预处理,那么小文件处理速度快,大文件处理慢、资源没有得到充分利用,可以先对 hive 数据进行清洗、去重、重新分区等操作来将原本不均匀的数据重新均匀的存放在多个文件中。以简化后面依赖此数据源的任务。 -
hive 中 key 分布不均匀,可以将 shuffle 类操作在先进行处理。处理完毕之后,spark 应用不必进行重复的 shuffle,直接用处理后的结果就可以。
在频繁调用 spark 作业并且有实效要求的场景中,如果今天作业要用到昨天数据的聚合数据,可以每天进行一次预处理,将数据聚合好,从而保证今天作业的实效要求
3.过滤少数导致倾斜的 key
- 倾斜 key 没有业务有意义,比如存在很多 key 是‘-’(‘-’在我们系统代表空)的记录,那么久可以直接 filter 掉来解决’-‘带来的数据倾斜。
- 倾斜 key 是有意义的,那么就需要单独拎出来进行单独处理。
- 如果你能够接受某些数据在spark作业中直接就摒弃掉不使用。比如说,总共有100万个key。只有2个key是数据量达到10万的。其他所有的key,对应的数量都是几十万。
这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key。
那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在你的spark作业中,自然就不会发生数据倾斜了。
4.提高 shuffle 操作reduce的并行度
当前面提到过的最简单的两个解决数据倾斜问题的方案都不适合的时候,再考虑这个方案。
将reduce task的数量变多,就可以让每个reduce task分配到更少的数据量。这样的话也许就可以缓解甚至是基本解决掉数据倾斜的问题。
使用场景:
同一个 task 被分配了多个倾斜的 key。试图增加 shuffle read task的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task处理比原来更少的数据。实现起来比较简单,可以有效缓解和减轻数据倾斜的影响,原理如下图:
具体操作是将 shuffle 算子,比如 groupByKey、countByKey、reduceByKey。在调用的时候传入进去一个参数,一个数字。那个数字就代表了那个 shuffle 操作的 reduce 端的并行度。那么在进行 shuffle 操作的时候,就会对应着创建指定数量的 reduce task。
比如说,原本某个task分配数据特别多,直接OOM,内存溢出了,程序没法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操作,给它传入一个并行度数字,这样的话,原先那个task分配到的数据,肯定会变少。就至少可以避免OOM的情况,程序至少是可以跑的。
提升shuffle reduce并行度的缺陷
治标不治本的意思,因为它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。原理没有改变,只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。
实际生产环境中的经验:
1、如果最理想的情况下,提升并行度以后,减轻了数据倾斜的问题,或者甚至可以让数据倾斜的现象忽略不计,那么就最好。就不用做其他的数据倾斜解决方案了。
2、不太理想的情况下,比如之前某个task运行特别慢,要5个小时,现在稍微快了一点,变成了4个小时。或者是原先运行到某个task,直接OOM,现在至少不会OOM了,但是那个task运行特别慢,要5个小时才能跑完。
那么,如果出现第二种情况的话,各位,就立即放弃这种解决方案,开始去尝试和选择后面的几种方案。
5.两阶段聚合(局部聚合+全局聚合)
使用场景:
对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。如果是join类的shuffle操作,还得用其他的解决方案。
实现方式:
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
对groupByKey、reduceByKey造成的数据倾斜,有比较好的效果。如下图所示:
6.将 reduce join 转为 map join
使用场景:
如果两个RDD要进行join,其中一个RDD是比较小的。比如一个RDD是100万数据,一个RDD是1万数据。(一个RDD是1亿数据,一个RDD是100万数据)。
其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据以后,就会在每个executor的block manager中都保存一份。要确保你的内存足够存放那个小RDD中的数据。
这种方式下,根本不会发生shuffle操作,肯定也不会发生数据倾斜。从根本上杜绝了join操作可能导致的数据倾斜的问题。
对于join中有数据倾斜的情况,大家尽量第一时间先考虑这种方式,效果非常好。
不适合的情况:
两个RDD都比较大,那么这个时候,你去将其中一个RDD做成broadcast,就很笨拙了。很可能导致内存不足。最终导致内存溢出,程序挂掉。
而且其中某些key(或者是某个key),还发生了数据倾斜。此时可以采用最后两种方式。
对于join这种操作,不光是考虑数据倾斜的问题。即使是没有数据倾斜问题,也完全可以优先考虑,用我们讲的这种高级的reduce join转map join的技术,不要用普通的join,去通过shuffle,进行数据的join。
完全可以通过简单的map,使用map join的方式,牺牲一点内存资源。在可行的情况下,优先这么使用。
不走shuffle,直接走map,是不是性能也会高很多?这是肯定的。
7.sample采样倾斜key单独进行join
-
实现思路
将发生数据倾斜的key,单独拉出来,放到一个RDD中去。就用这个原本会倾斜的key RDD跟其他RDD单独去join一下,这个时候key对应的数据可能就会分散到多个task中去进行join操作。
就不至于说是,这个key跟之前其他的key混合在一个RDD中时,肯定是会导致一个key对应的所有数据都到一个task中去,就会导致数据倾斜。 -
使用场景:
适用于 join 类操作中,由于相同 key 过大占内存,不能使用第 5 个方案,但倾斜 key 的种数不是很多的场景。 -
实现方式:
第一步:对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个 key。
第二步:将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个 RDD。
第三步:接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,不会导致倾斜的大部分 key 也形成另外一个 RDD。
第四步:再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进 行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join了。
第五步:而另外两个普通的 RDD 就照常 join 即可。最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。
具体原理见下图:
什么时候不适用呢?
如果一个RDD中,导致数据倾斜的key特别多。那么此时,最好还是不要这样了。还是使用我们最后一个方案,终极的join数据倾斜的解决方案。
就是说,咱们单拉出来了一个或者少数几个可能会产生数据倾斜的key,然后还可以进行更加优化的一个操作。
对于那个key,从另外一个要join的表中,也过滤出来一份数据,比如可能就只有一条数据。userid2infoRDD,一个userid key,就对应一条数据。
然后呢,采取对那个只有一条数据的RDD,进行flatMap操作,打上100个随机数,作为前缀,返回100条数据。
单独拉出来的可能产生数据倾斜的RDD,给每一条数据,都打上一个100以内的随机数,作为前缀。
再去进行join,是不是性能就更好了。肯定可以将数据进行打散,去进行join。join完以后,可以执行map操作,去将之前打上的随机数给去掉,然后再和另外一个普通RDD join以后的结果进行union操作。
8.使用随机前缀和扩容 RDD 进行 join
使用场景:
如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜。
当采用随机数和扩容表进行join解决数据倾斜的时候,就代表着,你的之前的数据倾斜的解决方案,都没法使用。
这个方案是没办法彻底解决数据倾斜的,更多的,是一种对数据倾斜的缓解。
实现方式:
同上面的第 7 个方案,不同的是它不需要对原先 rdd 进行倾斜 key 过滤。将原来 rdd形成含倾斜 key 的 rdd,与不含倾斜 key 的 rdd。直接对整个原本的 rdd 的 key 一边进行加随机数,另一边进行相应倍数的扩容。而这一种方案是针对有大量倾斜key 的情况,没法将部分 key 拆分出来进行单独处理,因此只能对整个 RDD 进行数据扩容,对内存资源要求很高。
步骤:
1、选择一个RDD,要用flatMap,进行扩容,将每条数据,映射为多条数据,每个映射出来的数据,都带了一个n以内的随机数,通常来说会选择10。
2、将另外一个RDD,做普通的map映射操作,每条数据都打上一个10以内的随机数。
3、最后将两个处理后的RDD进行join操作。
弊端:
1、因为你的两个RDD都很大,所以你没有办法去将某一个RDD扩的特别大,一般咱们就是10倍。
2、如果就是10倍的话,那么数据倾斜问题的确是只能说是缓解和减轻,不能说彻底解决。
总结:
sample采样倾斜key并单独进行join
将key,从另外一个RDD中过滤出的数据,可能只有一条或者几条,此时,咱们可以任意进行扩容,扩成1000倍。
将从第一个RDD中拆分出来的那个倾斜key RDD,打上1000以内的一个随机数。这种情况下,还可以配合上,提升shuffle reduce并行度,join(rdd, 1000)。通常情况下,效果还是非常不错的。
打散成100份,甚至1000份,2000份,去进行join,那么就肯定没有数据倾斜的问题了吧。
转载:https://blog.csdn.net/zp17834994071/article/details/107893589