數據科學家們早已熟悉的R和Pandas等傳統數據分析框架 雖然提供了直觀易用的API,卻局限于單機,無法覆蓋分布式大數據場景。在Spark 1.3.0以Spark SQL原有的SchemaRDD為藍本,引入了Spark DataFrame API,不僅為Scala、Python、Java三種語言環境提供了形如R和Pandas的API,而且自然而然地繼承了Spark SQL的分布式處理能力。此外,Spark 1.2.0中引入的外部數據源API也得到了進一步的完善,集成了完整的數據寫入支持,從而補全了Spark SQL多數據源互操作的最后一塊拼圖。借小數據分析之力,撼大數據分析之巨石;四兩撥千斤,不亦樂乎!
圖1:飛速增長中的Spark
Spark SQL是Spark的核心組件之一,于2014年4月隨Spark 1.0版一同面世。上圖左側展示了自去年4月份Spark 1.0發布至今開源貢獻者數量的增長情況,基本上呈現了一個線性增長的態勢。右側所展示的每月PR數量的增長情況也同樣迅猛。值得一提的是,在Spark 1.3當中,Spark SQL終于從alpha階段畢業,除了部分developer API以外,所有的公共API都已經穩定,可以放心使用了。
作為Shark的繼任者,Spark SQL的主要功能之一便是訪問現存的Hive數據。在與Hive進行集成的同時,Spark SQL也提供了JDBC/ODBC接口。Tableau、Qlik等第三方工具可以通過該接口接入Spark SQL,借助Spark進行數據處理。
然而,Spark SQL的應用并不局限于SQL。實際上“Spark SQL”這個名字并不恰當。根據Spark官方文檔的定義:Spark SQL是一個用于處理結構化數據的Spark組件——該定義強調的是“結構化數據”,而非“SQL”。新近發布的Spark 1.3更加完整的表達了Spark SQL的愿景:讓開發者用更精簡的代碼處理盡量少的數據,同時讓Spark SQL自動優化執行過程,以達到降低開發成本,提升數據分析執行效率的目的。為此,我們在Spark 1.3中引入了與R和Python Pandas接口類似的DataFrame API,延續了傳統單機數據分析的開發體驗,并將之推廣到了分布式大數據場景。
DataFrame
與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據以外,還掌握數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加友好,門檻更低。由于與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗。
圖2:DataFrame和 RDD的區別
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person`類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。了解了這些信息之后,Spark SQL的查詢優化器就可以進行針對性的優化。舉一個不太恰當的例子,其中的差別有些類似于動態類型的Python與靜態類型的C++之間的區別。后者由于在編譯期有詳盡的類型信息,編譯期就可以編譯出更加有針對性、更加優化的可執行代碼。
外部數據源API
然而對于用戶來說,只有一個結構化的數據抽象還是不夠的。數據往往會以各種各樣的格式存儲在各種各樣的系統之上,而用戶會希望方便地從不同的數據源獲取數據,進行混合處理,再將結果以特定的格式寫回數據源或直接予以某種形式的展現。Spark 1.2引入的外部數據源API正是為了解決這一問題而產生的。Spark SQL外部數據源API的一大優勢在于,可以將查詢中的各種信息下推至數據源處,從而充分利用數據源自身的優化能力來完成列剪枝、過濾條件下推等優化,實現減少IO、提高執行效率的目的。自1.2發布以來,社區內涌現出了多種多樣的外部數據源。下圖是Spark 1.3支持的各種數據源的一個概覽(左側是Spark SQL內置支持的數據源,右側為社區開發者貢獻的數據源)。在外部數據源API的幫助下,DataFrame實際上成為了各種數據格式和存儲系統進行數據交換的中間媒介:在Spark SQL內,來自各處的數據都被加載為DataFrame混合、統一成單一形態,再以之基礎進行數據分析和價值提取。
圖3:DataFrame支持 的各種外部數據源
Spark SQL助力大數據分析
精簡代碼
DataFrame帶來的最明 顯的優點之一就是幫助用戶進一步精簡代碼。下圖展示了分別用Hadoop MR、Python RDD API和Python DataFrame API實現同一業務邏輯的三段代碼片段。顯然Hadoop MR的代碼量最大,而且并不容易看明白業務邏輯到底是什么。Python RDD API的版本精簡了許多,但仍然不容易看出到底是在干什么。Python DataFrame API的版本相較Python RDD API的版本又更精進了一步;更重要的是,凡是略懂SQL的人,都可以一眼看出它在做什么——可見,taFrame API不僅可以令代碼更加精簡,而且顯著提升了可讀性。Spark 1.3提供了Python、Scala、Java三種語言的DataFrame API binding,供用戶按需選用。
圖4:Hadoop MR、Python RDD API、Python DataFrame API代碼示例
除此以外,Spark SQL還針對大數據處理中的一些常見場景和模式提供了一些便利的工具,使得用戶在處理不同項目中重復出現的模式時可以避免編寫重復或高度類似的代碼:
JSON schema自動推導
JSON 是一種可讀性良好的重要結構化數據格式,許多原始數據往往以JSON的形式存在。然而JSON數據的體積卻過于龐大,不利于批量數據分析。因此一個常見的數據處理步驟就是將JSON轉換為ORC、Parquet等高效的列式存儲格式。然而,不同版本的JSON數據往往具有不同的schema(例如新版本的Twitter API返回的數據可能比老版本的API返回的數據多出若干列)。人工合并整個JSON數據集所有記錄的schema是一件十分枯燥繁瑣的任務。Spark SQL在處理JSON數據時可以自動掃描整個數據集,得到所有記錄中出現的數據列的全集,推導出完整的schema。(對于同名但不同類型的列,Spark SQL會嘗試規約出一個公共類型。)
圖5:Spark對不規整JSON數據的處理
上圖展示了Spark SQL對三條不規整的個人信息JSON記錄進行整理和schema推導的過程。第2條記錄跟第1條記錄類似,但多出了一個age字段,第3條與前兩條也很類似,但是身高字段的類型是double而不是int。對此,Spark SQL的JSON數據源作出的處理是,將出現的所有列都納入最終的schema中,對于名稱相同但類型不同的列,取所有類型的公共父類型(例如int和 double的公共父類型為double)。通過這樣的處理,我們最終就得到了右下方的DataFrame。
Hive風格的分區表
Hive 的分區表可以認為是一種簡易索引。分區表的每一個分區的每一個分區列都對應于一級目錄,目錄以<列名>=<列值>的格式命名。Spark 1.3中的Parquet數據源實現了自動分區發現的功能:當數據以Hive分區表的目錄結構存在時,無須Hive metastore中的元數據,Spark SQL也可以自動將之識別為分區表。于是,在處理這張表時,分區剪枝等分區特有的優化也可以得以實施。
提升執行效率
利用DataFrame API,不僅代碼可以更加精簡,更重要的是,執行效率也可以得到提升。下圖對比了用Scala、Python的RDD API和DataFrame API實現的累加一千萬整數對的四段程序的性能對比。可以看到,Python DataFrame API相對于Python RDD API的執行效率有了近五倍的提升。這是因為在DataFrame API實際上僅僅組裝了一段體積小巧的邏輯查詢計劃,Python端只需將查詢計劃發送到JVM端即可,計算任務的大頭都由JVM端負責。在使用 Python RDD API時,Python VM和JVM之間需要進行大量的跨進程數據交換,從而拖慢了Python RDD API的速度。
值得注意的是,不僅Python API有了顯著的性能提升,即便是使用Scala,DataFrame API的版本也要比RDD API快一倍。上述示例的邏輯極為簡單,查詢優化器的作用不明顯,那么為什么會有加速效果呢?RDD API是函數式的,強調不變性,在大部分場景下傾向于創建新對象而不是修改老對象。這一特點雖然帶來了干凈整潔的API,卻也使得Spark應用程序在運行期傾向于創建大量臨時對象,對GC造成壓力。在現有RDD API的基礎之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內的數據創建方式,用復用可變對象的方式來減小對象分配和GC的開銷,但這犧牲了代碼的可讀性,而且要求開發者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark SQL在框架內部已經在各種可能的情況下盡量重用對象,這樣做雖然在內部會打破了不變性,但在將數據返回給用戶時,還會重新轉為不可變數據。利用 DataFrame API進行開發,可以免費地享受到這些優化效果。
減少數據讀取
分析大數據,最快的方法就是 ——忽略它。這里的“忽略”并不是熟視無睹,而是根據查詢條件進行恰當的剪枝。
上文討論分區表時提到的分區剪 枝便是其中一種——當查詢的過濾條件中涉及到分區列時,我們可以根據查詢條件剪掉肯定不包含目標數據的分區目錄,從而減少IO。
對于一些“智能”數據格 式,Spark SQL還可以根據數據文件中附帶的統計信息來進行剪枝。簡單來說,在這類數據格式中,數據是分段保存的,每段數據都帶有最大值、最小值、null值數量等 一些基本的統計信息。當統計信息表名某一數據段肯定不包括符合查詢條件的目標數據時,該數據段就可以直接跳過(例如某整數列a某段的最大值為100,而查詢條件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其余列的數據。
查詢優化
Spark SQL的第三個目標,就是讓查詢優化器幫助我們優化執行效率,解放開發者的生產力,讓新手也可以寫出高效的程序。
圖6:Spark SQL查詢優化引擎
DataFrame的背后是 Spark SQL的全套查詢優化引擎,其整體架構如上圖所示。通過SQL/HiveQl parser或是DataFrame API構造的邏輯執行計劃經過analyzer的分析之后再經優化得到優化執行計劃,接著再轉為物理執行計劃,并最終轉換為RDD DAG在Spark引擎上執行。
圖7:人口數據分析示例
為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之后又做了一次filter操作。如果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾后的較小的結果集,便可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基于關系代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的數據源的特性將過濾條件下推只數據源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用于執行最終的讀取操作的表掃描節點內。
對于普通開發者而言,查詢優化 器的意義在于,即便是經驗并不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。
DataFrame As The New RDD
在Spark 1.3中,DataFrame已經開始替代RDD成為新的數據共享抽象。以下的Spark ML示例搭建了一整套由切詞、詞頻計算、邏輯回歸等多個環節組成的機器學習流水線。該流水線的輸入、各環節間的數據交換,以及流水線的輸出結果,都是以 DataFrame來表示的。
圖8:機器學習流水線
相對于RDD,DataFrame有幾個特點:
包含schema信息,能夠進行針對性的優化。
對用戶有更加 友好、更直觀的API。
與外部數據源 API緊密集成,可以用作多種存儲格式和存儲系統間的數據交換媒介。
作為一個比RDD更加高效的數 據共享抽象,DataFrame使得我們可以更加便捷地搭建一體化的大數據流水線。