SparkSQL是Apache Spark最廣泛使用的一個組件,它提供了非常友好的接口來分布式處理結構化數據,在很多應用領域都有成功的生產實踐,但是在超大規模集群和數據集上,Spark SQL仍然遇到不少易用性和可擴展性的挑戰。為了應對這些挑戰,英特爾大數據技術團隊和百度大數據基礎架構部工程師在Spark 社區版本的基礎上,改進并實現了自適應執行引擎。本文首先討論Spark SQL在大規模數據集上遇到的挑戰,然后介紹自適應執行的背景和基本架構,以及自適應執行如何應對Spark SQL這些問題,最后我們將比較自適應執行和現有的社區版本Spark SQL在100 TB 規模TPC-DS基準測試碰到的挑戰和性能差異,以及自適應執行在Baidu Big SQL平臺的使用情況。
挑戰1:關于shuffle partition數
在Spark SQL中, shufflepartition數可以通過參數spark.sql.shuffle.partition來設置,默認值是200。這個參數決定了SQL作業每個reduce階段任務數量,對整個查詢性能有很大影響。假設一個查詢運行前申請了E個Executor,每個Executor包含C個core(并發執行線程數),那么該作業在運行時可以并行執行的任務數就等于E x C個,或者說該作業的并發數是E x C。假設shuffle partition個數為P,除了map stage的任務數和原始數據的文件數量以及大小相關,后續的每個reduce stage的任務數都是P。由于Spark作業調度是搶占式的,E x C個并發任務執行單元會搶占執行P個任務,“能者多勞”,直至所有任務完成,則進入到下一個Stage。但這個過程中,如果有任務因為處理數據量過大(例如:數據傾斜導致大量數據被劃分到同一個reducer partition)或者其它原因造成該任務執行時間過長,一方面會導致整個stage執行時間變長,另一方面E x C個并發執行單元大部分可能都處于空閑等待狀態,集群資源整體利用率急劇下降。
那么spark.sql.shuffle.partition參數究竟是多少比較合適?如果設置過小,分配給每一個reduce任務處理的數據量就越多,在內存大小有限的情況下,不得不溢寫(spill)到計算節點本地磁盤上。Spill會導致額外的磁盤讀寫,影響整個SQL查詢的性能,更差的情況還可能導致嚴重的GC問題甚至是OOM。相反,如果shuffle partition設置過大。第一,每一個reduce任務處理的數據量很小并且很快結束,進而導致Spark任務調度負擔變大。第二,每一個mapper任務必須把自己的shuffle輸出數據分成P個hash bucket,即確定數據屬于哪一個reduce partition,當shuffle partition數量太多時,hash bucket里數據量會很小,在作業并發數很大時,reduce任務shuffle拉取數據會造成一定程度的隨機小數據讀操作,當使用機械硬盤作為shuffle數據臨時存取的時候性能下降會更加明顯。最后,當最后一個stage保存數據時會寫出P個文件,也可能會造成HDFS文件系統中大量的小文件。
從上,shuffle partition的設置既不能太小也不能太大。為了達到最佳的性能,往往需要經多次試驗才能確定某個SQL查詢最佳的shuffle partition值。然而在生產環境中,往往SQL以定時作業的方式處理不同時間段的數據,數據量大小可能變化很大,我們也無法為每一個SQL查詢去做耗時的人工調優,這也意味這些SQL作業很難以最佳的性能方式運行。
Shuffle partition的另外一個問題是,同一個shuffle partition數設置將應用到所有的stage。Spark在執行一個SQL作業時,會劃分成多個stage。通常情況下,每個stage的數據分布和大小可能都不太一樣,全局的shuffle partition設置最多只能對某個或者某些stage最優,沒有辦法做到全局所有的stage設置最優。
這一系列關于shufflepartition的性能和易用性挑戰,促使我們思考新的方法:我們能否根據運行時獲取的shuffle數據量信息,例如數據塊大小,記錄行數等等,自動為每一個stage設置合適的shuffle partition值?
挑戰2:Spark SQL最佳執行計劃
Spark SQL在執行SQL之前,會將SQL或者Dataset程序解析成邏輯計劃,然后經歷一系列的優化,最后確定一個可執行的物理計劃。最終選擇的物理計劃的不同對性能有很大的影響。如何選擇最佳的執行計劃,這便是Spark SQL的Catalyst優化器的核心工作。Catalyst早期主要是基于規則的優化器(RBO),在Spark 2.2中又加入了基于代價的優化(CBO)。目前執行計劃的確定是在計劃階段,一旦確認以后便不再改變。然而在運行期間,當我們獲取到更多運行時信息時,我們將有可能得到一個更佳的執行計劃。
以join操作為例,在Spark中最常見的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin屬于map side join,其原理是當其中一張表存儲空間大小小于broadcast閾值時,Spark選擇將這張小表廣播到每一個Executor上,然后在map階段,每一個mapper讀取大表的一個分片,并且和整張小表進行join,整個過程中避免了把大表的數據在集群中進行shuffle。而SortMergeJoin在map階段2張數據表都按相同的分區方式進行shuffle寫,reduce階段每個reducer將兩張表屬于對應partition的數據拉取到同一個任務中做join。RBO根據數據的大小,盡可能把join操作優化成BroadcastHashJoin。Spark中使用參數spark.sql.autoBroadcastJoinThreshold來控制選擇BroadcastHashJoin的閾值,默認是10MB。然而對于復雜的SQL查詢,它可能使用中間結果來作為join的輸入,在計劃階段,Spark并不能精確地知道join中兩表的大小或者會錯誤地估計它們的大小,以致于錯失了使用BroadcastHashJoin策略來優化join執行的機會。但是在運行時,通過從shuffle寫得到的信息,我們可以動態地選用BroadcastHashJoin。以下是一個例子,join一邊的輸入大小只有600K,但Spark仍然規劃成SortMergeJoin。
圖1
這促使我們思考第二個問題:我們能否通過運行時收集到的信息,來動態地調整執行計劃?
挑戰3:數據傾斜
數據傾斜是常見的導致Spark SQL性能變差的問題。數據傾斜是指某一個partition的數據量遠遠大于其它partition的數據,導致個別任務的運行時間遠遠大于其它任務,因此拖累了整個SQL的運行時間。在實際SQL作業中,數據傾斜很常見,join key對應的hash bucket總是會出現記錄數不太平均的情況,在極端情況下,相同join key對應的記錄數特別多,大量的數據必然被分到同一個partition因而造成數據嚴重傾斜。如圖2,可以看到大部分任務3秒左右就完成了,而最慢的任務卻花了4分鐘,它處理的數據量卻是其它任務的若干倍。
圖2
目前,處理join時數據傾斜的一些常見手段有: (1)增加shuffle partition數量,期望原本分在同一個partition中的數據可以被分散到多個partition中,但是對于同key的數據沒有作用。(2)調大BroadcastHashJoin的閾值,在某些場景下可以把SortMergeJoin轉化成BroadcastHashJoin而避免shuffle產生的數據傾斜。(3)手動過濾傾斜的key,并且對這些數據加入隨機的前綴,在另一張表中這些key對應的數據也相應的膨脹處理,然后再做join。綜上,這些手段都有各自的局限性并且涉及很多的人為處理。基于此,我們思考了第三個問題:Spark能否在運行時自動地處理join中的數據傾斜?
自適應執行背景和簡介
早在2015年,Spark社區就提出了自適應執行的基本想法,在Spark的DAGScheduler中增加了提交單個map stage的接口,并且在實現運行時調整shuffle partition數量上做了嘗試。但目前該實現有一定的局限性,在某些場景下會引入更多的shuffle,即更多的stage,對于三表在同一個stage中做join等情況也無法很好的處理。所以該功能一直處于實驗階段,配置參數也沒有在官方文檔中提及。
基于這些社區的工作,英特爾大數據技術團隊對自適應執行做了重新的設計,實現了一個更為靈活的自適性執行框架。在這個框架下面,我們可以添加額外的規則,來實現更多的功能。目前,已實現的特性包括:自動設置shuffle partition數,動態調整執行計劃,動態處理數據傾斜等等。
自適應執行架構
在Spark SQL中,當Spark確定最后的物理執行計劃后,根據每一個operator對RDD的轉換定義,它會生成一個RDD的DAG圖。之后Spark基于DAG圖靜態劃分stage并且提交執行,所以一旦執行計劃確定后,在運行階段無法再更新。自適應執行的基本思路是在執行計劃中事先劃分好stage,然后按stage提交執行,在運行時收集當前stage的shuffle統計信息,以此來優化下一個stage的執行計劃,然后再提交執行后續的stage。
圖3
從圖3中我們可以看出自適應執行的工作方法,首先以Exchange節點作為分界將執行計劃這棵樹劃分成多個QueryStage(Exchange節點在Spark SQL中代表shuffle)。每一個QueryStage都是一棵獨立的子樹,也是一個獨立的執行單元。在加入QueryStage的同時,我們也加入一個QueryStageInput的葉子節點,作為父親QueryStage的輸入。例如對于圖中兩表join的執行計劃來說我們會創建3個QueryStage。最后一個QueryStage中的執行計劃是join本身,它有2個QueryStageInput代表它的輸入,分別指向2個孩子的QueryStage。在執行QueryStage時,我們首先提交它的孩子stage,并且收集這些stage運行時的信息。當這些孩子stage運行完畢后,我們可以知道它們的大小等信息,以此來判斷QueryStage中的計劃是否可以優化更新。例如當我們獲知某一張表的大小是5M,它小于broadcast的閾值時,我們可以將SortMergeJoin轉化成BroadcastHashJoin來優化當前的執行計劃。我們也可以根據孩子stage產生的shuffle數據量,來動態地調整該stage的reducer個數。在完成一系列的優化處理后,最終我們為該QueryStage生成RDD的DAG圖,并且提交給DAG Scheduler來執行。
自動設置reducer個數
假設我們設置的shufflepartition個數為5,在map stage結束之后,我們知道每一個partition的大小分別是70MB,30MB,20MB,10MB和50MB。假設我們設置每一個reducer處理的目標數據量是64MB,那么在運行時,我們可以實際使用3個reducer。第一個reducer處理partition 0 (70MB),第二個reducer處理連續的partition 1 到3,共60MB,第三個reducer處理partition 4 (50MB),如圖4所示。
圖4
在自適應執行的框架中,因為每個QueryStage都知道自己所有的孩子stage,因此在調整reducer個數時,可以考慮到所有的stage輸入。另外,我們也可以將記錄條數作為一個reducer處理的目標值。因為shuffle的數據往往都是經過壓縮的,有時partition的數據量并不大,但解壓后記錄條數確遠遠大于其它partition,造成數據不均。所以同時考慮數據大小和記錄條數可以更好地決定reducer的個數。
動態調整執行計劃
目前我們支持在運行時動態調整join的策略,在滿足條件的情況下,即一張表小于Broadcast閾值,可以將SortMergeJoin轉化成BroadcastHashJoin。由于SortMergeJoin和BroadcastHashJoin輸出的partition情況并不相同,隨意轉換可能在下一個stage引入額外的shuffle操作。因此我們在動態調整join策略時,遵循一個規則,即在不引入額外shuffle的前提下才進行轉換。
將SortMergeJoin轉化成BroadcastHashJoin有哪些好處呢?因為數據已經shuffle寫到磁盤上,我們仍然需要shuffle讀取這些數據。我們可以看看圖5的例子,假設A表和B表join,map階段2張表各有2個map任務,并且shuffle partition個數為5。如果做SortMergeJoin,在reduce階段需要啟動5個reducer,每個reducer通過網絡shuffle讀取屬于自己的數據。然而,當我們在運行時發現B表可以broadcast,并且將其轉換成BroadcastHashJoin之后,我們只需要啟動2個reducer,每一個reducer讀取一個mapper的整個shuffle output文件。當我們調度這2個reducer任務時,可以優先將其調度在運行mapper的Executor上,因此整個shuffle讀變成了本地讀取,沒有數據通過網絡傳輸。并且讀取一個文件這樣的順序讀,相比原先shuffle時隨機的小文件讀,效率也更勝一籌。另外,SortMergeJoin過程中往往會出現不同程度的數據傾斜問題,拖慢整體的運行時間。而轉換成BroadcastHashJoin后,數據量一般比較均勻,也就避免了傾斜,我們可以在下文實驗結果中看到更具體的信息。
圖5
動態處理數據傾斜
在自適應執行的框架下,我們可以在運行時很容易地檢測出有數據傾斜的partition。當執行某個stage時,我們收集該stage每個mapper 的shuffle數據大小和記錄條數。如果某一個partition的數據量或者記錄條數超過中位數的N倍,并且大于某個預先配置的閾值,我們就認為這是一個數據傾斜的partition,需要進行特殊的處理。
圖6
假設我們A表和B表做inner join,并且A表中第0個partition是一個傾斜的partition。一般情況下,A表和B表中partition 0的數據都會shuffle到同一個reducer中進行處理,由于這個reducer需要通過網絡拉取大量的數據并且進行處理,它會成為一個最慢的任務拖慢整體的性能。在自適應執行框架下,一旦我們發現A表的partition 0發生傾斜,我們隨后使用N個任務去處理該partition。每個任務只讀取若干個mapper的shuffle 輸出文件,然后讀取B表partition 0的數據做join。最后,我們將N個任務join的結果通過Union操作合并起來。為了實現這樣的處理,我們對shuffle read的接口也做了改變,允許它只讀取部分mapper中某一個partition的數據。在這樣的處理中,B表的partition 0會被讀取N次,雖然這增加了一定的額外代價,但是通過N個任務處理傾斜數據帶來的收益仍然大于這樣的代價。如果B表中partition 0也發生傾斜,對于inner join來說我們也可以將B表的partition 0分成若干塊,分別與A表的partition 0進行join,最終union起來。但對于其它的join類型例如Left Semi Join我們暫時不支持將B表的partition 0拆分。
自適應執行和Spark SQL在100TB上的性能比較
我們使用99臺機器搭建了一個集群,使用Spark2.2在TPC-DS 100TB的數據集進行了實驗,比較原版Spark和自適應執行的性能。以下是集群的詳細信息:
圖7
實驗結果顯示,在自適應執行模式下,103條SQL中有92條都得到了明顯的性能提升,其中47條SQL的性能提升超過10%,最大的性能提升達到了3.8倍,并且沒有出現性能下降的情況。另外在原版Spark中,有5條SQL因為OOM等原因無法順利運行,在自適應模式下我們也對這些問題做了優化,使得103條SQL在TPC-DS 100TB數據集上全部成功運行。以下是具體的性能提升比例和性能提升最明顯的幾條SQL。
圖8
圖9
通過仔細分析了這些性能提升的SQL,我們可以看到自適應執行帶來的好處。首先是自動設置reducer個數,原版Spark使用10976作為shuffle partition數,在自適應執行時,以下SQL的reducer個數自動調整為1064和1079,可以明顯看到執行時間上也提升了很多。這正是因為減少了調度的負擔和任務啟動的時間,以及減少了磁盤IO請求。
在運行時動態調整執行計劃,將SortMergeJoin轉化成BroadcastHashJoin在某些SQL中也帶來了很大的提升。例如在以下的例子中,原本使用SortMergeJoin因為數據傾斜等問題花費了2.5分鐘。在自適應執行時,因為其中一張表的大小只有2.5k所以在運行時轉化成了BroadcastHashJoin,執行時間縮短為10秒。
100 TB的挑戰及優化
成功運行TPC-DS 100 TB數據集中的所有SQL,對于Apache Spark來說也是一大挑戰。雖然SparkSQL官方表示支持TPC-DS所有的SQL,但這是基于小數據集。在100TB這個量級上,Spark暴露出了一些問題導致有些SQL執行效率不高,甚至無法順利執行。在做實驗的過程中,我們在自適應執行框架的基礎上,對Spark也做了其它的優化改進,來確保所有SQL在100TB數據集上可以成功運行。以下是一些典型的問題。
統計map端輸出數據時driver單點瓶頸的優化(SPARK-22537)
在每個map任務結束后,會有一個表示每個partition大小的數據結構(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回給driver。而在自適應執行中,當一次shuffle的map stage結束后,driver會聚合每個mapper給出的partition大小信息,得到在各個partition上所有mapper輸出的數據總大小。該統計由單線程完成,如果mapper的數量是M,shuffle partition的數量為S,那么統計的時間復雜度在O(M x S) ~ O (M x S x log(M x S)) 之間,當CompressedMapStatus被使用時,復雜度為這個區間的下限,當HighlyCompressedMapStatus被使用時,空間有所節省,時間會更長,在幾乎所有的partition數據都為空時,復雜度會接近該區間的上限。
在M x S增大時,我們會遇到driver上的單點瓶頸,一個明顯的表現是UI上map stage和reduce stage之間的停頓。為了解決這個單點瓶頸,我們將任務盡量均勻地劃分給多個線程,線程之間不相交地為scala Array中的不同元素賦聚合值。
在這項優化中,新的spark.shuffle.mapOutput.parallelAggregationThreshold(簡稱threshold)被引入,用于配置使用多線程聚合的閾值,聚合的并行度由JVM中可用core數和M * S / threshold + 1中的小值決定。
Shuffle讀取連續partition時的優化 (SPARK-9853)
在自適應執行的模式下,一個reducer可能會從一個mapoutput文件中讀取諾干個連續的數據塊。目前的實現中,它需要拆分成許多獨立的getBlockData調用,每次調用分別從硬盤讀取一小塊數據,這樣就需要很多的磁盤IO。我們對這樣的場景做了優化,使得Spark可以一次性地把這些連續數據塊都讀上來,這樣就大大減少了磁盤的IO。在小的基準測試程序中,我們發現shuffle read的性能可以提升3倍。
BroadcastHashJoin中避免不必要的partition讀的優化
自適應執行可以為現有的operator提供更多優化的可能。在SortMergeJoin中有一個基本的設計:每個reducetask會先讀取左表中的記錄,如果左表的 partition為空,則右表中的數據我們無需關注(對于非anti join的情況),這樣的設計在左表有一些partition為空時可以節省不必要的右表讀取,在SortMergeJoin中這樣的實現很自然。
BroadcastHashJoin中不存在按照join key分區的過程,所以缺失了這項優化。然而在自適應執行的一些情況中,利用stage間的精確統計信息,我們可以找回這項優化:如果SortMergeJoin在運行時被轉換成了BroadcastHashJoin,且我們能得到各個partition key對應partition的精確大小,則新轉換成的BroadcastHashJoin將被告知:無需去讀那些小表中為空的partition,因為不會join出任何結果。
Baidu真實產品線試用情況
我們將自適應執行優化應用在Baidu內部基于Spark SQL的即席查詢服務BaiduBig SQL之上,做了進一步的落地驗證,通過選取單日全天真實用戶查詢,按照原有執行順序回放重跑和分析,得到如下幾點結論:
對于秒級的簡單查詢,自適應版本的性能提升并不明顯,這主要是因為它們的瓶頸和主要耗時集中在了IO上面,而這不是自適應執行的優化點。
按照查詢復雜度維度考量測試結果發現:查詢中迭代次數越多,多表join場景越復雜的情況下自適應執行效果越好。我們簡單按照group by, sort, join, 子查詢等操作個數來將查詢分類,如上關鍵詞大于3的查詢有明顯的性能提升,優化比從50%~200%不等,主要優化點來源于shuffle的動態并發數調整及join優化。
從業務使用角度來分析,前文所述SortMergeJoin轉BroadcastHashJoin的優化在Big SQL場景中命中了多種典型的業務SQL模板,試考慮如下計算需求:用戶期望從兩張不同維度的計費信息中撈取感興趣的user列表在兩個維度的整體計費。收入信息原表大小在百T級別,用戶列表只包含對應用戶的元信息,大小在10M以內。兩張計費信息表字段基本一致,所以我們將兩張表與用戶列表做inner join后union做進一步分析,SQL表達如下:
對應的原版Spark執行計劃如下:
圖14
針對于此類用戶場景,可以全部命中自適應執行的join優化邏輯,執行過程中多次SortMergeJoin轉為BroadcastHashJoin,減少了中間內存消耗及多輪sort,得到了近200%的性能提升。
結合上述3點,下一步自適應執行在Baidu內部的優化落地工作將進一步集中在大數據量、復雜查詢的例行批量作業之上,并考慮與用戶查詢復雜度關聯進行動態的開關控制。對于數千臺的大規模集群上運行的復雜查詢,自適應執行可以動態調整計算過程中的并行度,可以幫助大幅提升集群的資源利用率。另外,自適應執行可以獲取到多輪stage之間更完整的統計信息,下一步我們也考慮將對應數據及Strategy接口開放給Baidu Spark平臺上層用戶,針對特殊作業進行進一步的定制化Strategy策略編寫。
總結
隨著Spark SQL廣泛的使用以及業務規模的不斷增長,在大規模數據集上遇到的易用性和性能方面的挑戰將日益明顯。本文討論了三個典型的問題,包括調整shuffle partition數量,選擇最佳執行計劃和數據傾斜。這些問題在現有的框架下并不容易解決,而自適應執行可以很好地應對這些問題。我們介紹了自適應執行的基本架構以及解決這些問題的具體方法。最后我們在TPC-DS 100TB數據集上驗證了自適應執行的優勢,相比較原版Spark SQL,103個SQL查詢中,90%的查詢都得到了明顯的性能提升,最大的提升達到3.8倍,并且原先失敗的5個查詢在自適應執行下也順利完成。我們在百度的Big SQL平臺也做了進一步的驗證,對于復雜的真實查詢可以達到2倍的性能提升。總之,自適應執行解決了Spark SQL在大數據規模上遇到的很多挑戰,并且很大程度上改善了Spark SQL的易用性和性能,提高了超大集群中多租戶多并發作業情況下集群的資源利用率。將來,我們考慮在自適應執行的框架之下,提供更多運行時可以優化的策略,并且將我們的工作貢獻回饋給社區,也希望有更多的朋友可以參與進來,將其進一步完善。