大數(shù)據(jù)工程師面試題 - Spark 調(diào)優(yōu)(七)
?我是大數(shù)據(jù)歐老師,曾在互聯(lián)網(wǎng)某大廠任大數(shù)據(jù)負(fù)責(zé)人,從業(yè)大數(shù)據(jù)領(lǐng)域近 10 年,全網(wǎng)粉絲 5000+,從很多候選人的面試和咨詢中復(fù)盤了大數(shù)據(jù)工程師的面試全流程,如果你有求職大數(shù)據(jù)工程師的計(jì)劃,歡迎找我聊一聊!
某個(gè)task莫名其妙內(nèi)存溢出的情況
這種情況下去定位出問題的代碼就比較容易了。我們建議直接看yarn-client模式下本地log的異常棧,或者是通過YARN查看yarn-cluster模式下的log中的異常棧。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出。然后在那行代碼附近找找,一般也會(huì)有shuffle類算子,此時(shí)很可能就是這個(gè)算子導(dǎo)致了數(shù)據(jù)傾斜。
但是大家要注意的是,不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜。因?yàn)樽约壕帉懙拇a的bug,以及偶然出現(xiàn)的數(shù)據(jù)異常,也可能會(huì)導(dǎo)致內(nèi)存溢出。因此還是要按照上面所講的方法,通過Spark Web UI查看報(bào)錯(cuò)的那個(gè)stage的各個(gè)task的運(yùn)行時(shí)間以及分配的數(shù)據(jù)量,才能確定是否是由于數(shù)據(jù)傾斜才導(dǎo)致了這次內(nèi)存溢出。
查看導(dǎo)致數(shù)據(jù)傾斜的key的數(shù)據(jù)分布情況
知道了數(shù)據(jù)傾斜發(fā)生在哪里之后,通常需要分析一下那個(gè)執(zhí)行了shuffle操作并且導(dǎo)致了數(shù)據(jù)傾斜的RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)。針對(duì)不同的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術(shù)方案來解決。
此時(shí)根據(jù)你執(zhí)行操作的情況不同,可以有很多種查看key分布的方式:
- 如果是Spark SQL中的group by、join語句導(dǎo)致的數(shù)據(jù)傾斜,那么就查詢一下SQL中使用的表的key分布情況。
- 如果是對(duì)Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()。然后對(duì)統(tǒng)計(jì)出來的各個(gè)key出現(xiàn)的次數(shù),collect/take到客戶端打印一下,就可以看到key的分布情況。
舉例來說,對(duì)于上面所說的單詞計(jì)數(shù)程序,如果確定了是stage1的reduceByKey算子導(dǎo)致了數(shù)據(jù)傾斜,那么就應(yīng)該看看進(jìn)行reduceByKey操作的RDD中的key分布情況,在這個(gè)例子中指的就是pairs RDD。如下示例,我們可以先對(duì)pairs采樣10%的樣本數(shù)據(jù),然后使用countByKey算子統(tǒng)計(jì)出每個(gè)key出現(xiàn)的次數(shù),最后在客戶端遍歷和打印樣本數(shù)據(jù)中各個(gè)key的出現(xiàn)次數(shù)。
val sampledPairs = pairs.sample(false, 0.1) val sampledWordCounts = sampledPairs.countByKey() sampledWordCounts.foreach(println(_))
數(shù)據(jù)傾斜的解決方案
解決方案一:使用Hive ETL預(yù)處理數(shù)據(jù)
- 方案適用場(chǎng)景:導(dǎo)致數(shù)據(jù)傾斜的是Hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個(gè)key對(duì)應(yīng)了100萬數(shù)據(jù),其他key才對(duì)應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場(chǎng)景需要頻繁使用Spark對(duì)Hive表執(zhí)行某個(gè)分析操作,那么比較適合使用這種技術(shù)方案。
- 方案實(shí)現(xiàn)思路:此時(shí)可以評(píng)估一下,是否可以通過Hive來進(jìn)行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對(duì)數(shù)據(jù)按照key進(jìn)行聚合,或者是預(yù)先和其他表進(jìn)行join),然后在Spark作業(yè)中針對(duì)的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后的Hive表。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過聚合或join操作了,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。
- 方案實(shí)現(xiàn)原理:這種方案從根源上解決了數(shù)據(jù)傾斜,因?yàn)閺氐妆苊饬嗽赟park中執(zhí)行shuffle類算子,那么肯定就不會(huì)有數(shù)據(jù)傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本。因?yàn)楫吘箶?shù)據(jù)本身就存在分布不均勻的問題,所以Hive ETL中進(jìn)行g(shù)roup by或者join等shuffle操作時(shí),還是會(huì)出現(xiàn)數(shù)據(jù)傾斜,導(dǎo)致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。
方案優(yōu)缺點(diǎn)
- 方案優(yōu)點(diǎn):實(shí)現(xiàn)起來簡(jiǎn)單便捷,效果還非常好,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會(huì)大幅度提升。
- 方案缺點(diǎn):治標(biāo)不治本,Hive ETL中還是會(huì)發(fā)生數(shù)據(jù)傾斜。
- 方案實(shí)踐經(jīng)驗(yàn):在一些Java系統(tǒng)與Spark結(jié)合使用的項(xiàng)目中,會(huì)出現(xiàn)Java代碼頻繁調(diào)用Spark作業(yè)的場(chǎng)景,而且對(duì)Spark作業(yè)的執(zhí)行性能要求很高,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的Hive ETL,每天僅執(zhí)行一次,只有那一次是比較慢的,而之后每次Java調(diào)用Spark作業(yè)時(shí),執(zhí)行速度都會(huì)很快,能夠提供更好的用戶體驗(yàn)。
- 項(xiàng)目實(shí)踐經(jīng)驗(yàn):在美團(tuán)·點(diǎn)評(píng)的交互式用戶行為分析系統(tǒng)中使用了這種方案,該系統(tǒng)主要是允許用戶通過Java Web系統(tǒng)提交數(shù)據(jù)分析統(tǒng)計(jì)任務(wù),后端通過Java提交Spark作業(yè)進(jìn)行數(shù)據(jù)分析統(tǒng)計(jì)。要求Spark作業(yè)速度必須要快,盡量在10分鐘以內(nèi),否則速度太慢,用戶體驗(yàn)會(huì)很差。所以我們將有些Spark作業(yè)的shuffle操作提前到了Hive ETL中,從而讓Spark直接使用預(yù)處理的Hive中間表,盡可能地減少Spark的shuffle操作,大幅度提升了性能,將部分作業(yè)的性能提升了6倍以上。
解決方案二:過濾少數(shù)導(dǎo)致傾斜的key
- 方案適用場(chǎng)景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個(gè),而且對(duì)計(jì)算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對(duì)應(yīng)10條數(shù)據(jù),但是只有一個(gè)key對(duì)應(yīng)了100萬數(shù)據(jù),從而導(dǎo)致了數(shù)據(jù)傾斜。
- 方案實(shí)現(xiàn)思路:如果我們判斷那少數(shù)幾個(gè)數(shù)據(jù)量特別多的key,對(duì)作業(yè)的執(zhí)行和計(jì)算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個(gè)key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對(duì)RDD執(zhí)行filter算子過濾掉這些key。如果需要每次作業(yè)執(zhí)行時(shí),動(dòng)態(tài)判定哪些key的數(shù)據(jù)量最多然后再進(jìn)行過濾,那么可以使用sample算子對(duì)RDD進(jìn)行采樣,然后計(jì)算出每個(gè)key的數(shù)量,取數(shù)據(jù)量最多的key過濾掉即可。
- 方案實(shí)現(xiàn)原理:將導(dǎo)致數(shù)據(jù)傾斜的key給過濾掉之后,這些key就不會(huì)參與計(jì)算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜。
方案優(yōu)缺點(diǎn)
- 方案優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。
- 方案缺點(diǎn):適用場(chǎng)景不多,大多數(shù)情況下,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個(gè)。
- 方案實(shí)踐經(jīng)驗(yàn):在項(xiàng)目中我們也采用過這種方案解決數(shù)據(jù)傾斜。有一次發(fā)現(xiàn)某一天Spark作業(yè)在運(yùn)行的時(shí)候突然OOM了,追查之后發(fā)現(xiàn),是Hive表中的某一個(gè)key在那天數(shù)據(jù)異常,導(dǎo)致數(shù)據(jù)量暴增。因此就采取每次執(zhí)行前先進(jìn)行采樣,計(jì)算出樣本中數(shù)據(jù)量最大的幾個(gè)key之后,直接在程序中將那些key給過濾掉。
解決方案三:提高shuffle操作的并行度
- 方案適用場(chǎng)景:如果我們必須要對(duì)數(shù)據(jù)傾斜迎難而上,那么建議優(yōu)先使用這種方案,因?yàn)檫@是處理數(shù)據(jù)傾斜最簡(jiǎn)單的一種方案。
- 方案實(shí)現(xiàn)思路:在對(duì)RDD執(zhí)行shuffle算子時(shí),給shuffle算子傳入一個(gè)參數(shù),比如reduceByKey(1000),該參數(shù)就設(shè)置了這個(gè)shuffle算子執(zhí)行時(shí)shuffle read task的數(shù)量。對(duì)于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個(gè)參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對(duì)于很多場(chǎng)景來說都有點(diǎn)過小。
- 方案實(shí)現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有5個(gè)key,每個(gè)key對(duì)應(yīng)10條數(shù)據(jù),這5個(gè)key都是分配給一個(gè)task的,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個(gè)task就分配到一個(gè)key,即每個(gè)task就處理10條數(shù)據(jù),那么自然每個(gè)task的執(zhí)行時(shí)間都會(huì)變短了。具體原理如下圖所示。
方案優(yōu)缺點(diǎn)
- 方案優(yōu)點(diǎn):實(shí)現(xiàn)起來比較簡(jiǎn)單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響。
- 方案缺點(diǎn):只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)實(shí)踐經(jīng)驗(yàn)來看,其效果有限。
- 方案實(shí)踐經(jīng)驗(yàn):該方案通常無法徹底解決數(shù)據(jù)傾斜,因?yàn)槿绻霈F(xiàn)一些極端情況,比如某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個(gè)對(duì)應(yīng)著100萬數(shù)據(jù)的key肯定還是會(huì)分配到一個(gè)task中去處理,因此注定還是會(huì)發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時(shí)嘗試使用的第一種手段,嘗試去用嘴簡(jiǎn)單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。
解決方案四:兩階段聚合(局部聚合+全局聚合)
- 方案適用場(chǎng)景:對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進(jìn)行分組聚合時(shí),比較適用這種方案。
- 方案實(shí)現(xiàn)思路:這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù),比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì)變成了(1_hello, 2) (2_hello, 2)。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。
- 方案實(shí)現(xiàn)原理:將原本相同的key通過附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過多的問題。接著去除掉隨機(jī)前綴,再次進(jìn)行全局聚合,就可以得到最終的結(jié)果。具體原理見下圖。
方案優(yōu)缺點(diǎn)
- 方案優(yōu)點(diǎn):對(duì)于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜,效果是非常不錯(cuò)的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜,將Spark作業(yè)的性能提升數(shù)倍以上。
- 方案缺點(diǎn):僅僅適用于聚合類的shuffle操作,適用范圍相對(duì)較窄。如果是join類的shuffle操作,還得用其他的解決方案。
// 第一步,給RDD中的每個(gè)key都打上一個(gè)隨機(jī)前綴。 JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair( new PairFunction<Tuple2<Long,Long>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(10); return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2); } }); // 第二步,對(duì)打上隨機(jī)前綴的key進(jìn)行局部聚合。 JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); // 第三步,去除RDD中每個(gè)key的隨機(jī)前綴。 JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair( new PairFunction<Tuple2<String,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception { long originalKey = Long.valueOf(tuple._1.split("_")[1]); return new Tuple2<Long, Long>(originalKey, tuple._2); } }); // 第四步,對(duì)去除了隨機(jī)前綴的RDD進(jìn)行全局聚合。 JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });#大數(shù)據(jù)##大數(shù)據(jù)工程師##大數(shù)據(jù)知識(shí)體系##大數(shù)據(jù)面試##大數(shù)據(jù)面經(jīng)#
解決職場(chǎng)真實(shí)面試問題,分享同學(xué)真實(shí)成功案例,歡迎訂閱關(guān)注!