2014年的大數據領域,Apache Spark(以下簡稱Spark)無疑最受矚目。Spark,出自名門伯克利AMPLab之手,目前由商業公司Databricks保駕護航。自2014年3月份躋身Apache頂級項目(TLP),Spark已然成為ASF最活躍的項目之一,得到了業內廣泛的支持——2014年12月發布的Spark 1.2版本包含了來自172位Contributor貢獻的1000多個commits。而在2014一整年中,Spark共發布了大小9個版本(包含5月底發布具有里程碑意義的1.0版本),其社區活躍度可見一斑。值得一提的是,2014年11月,Databricks基于AWS完成了一個Daytona Gray類別的Sort Benchmark,并創造了該測試的新紀錄。本文將概括性地總結Spark在2014年的發展。
Spark 2014,星星之火已成燎原之勢
首先,Spark會議及相關交流。目前,世界范圍內最權威的Spark領域會議無疑是Spark Summit,已于2013年與2014年連續成功舉辦兩屆,來自全球各地的工程師們與會分享了各自的Spark使用案例。鑒于目前Spark的火爆態勢,Spark Summit將在2015年分Spark Summit East與Spark Summit West兩次舉行。著眼國內,首屆中國Spark技術峰會(Spark Summit China)于2014年4月在北京舉辦,據統計,全國各大互聯網公司幾乎都出席了會議。因此,大家可以期待下今年的Spark Summit China又會帶來怎樣的驚喜。除去這樣比較大型的會議,Spark Meetup也不定期地在全球各地舉行,截止本文寫作時,已有來自13個不同國家的33個城市舉辦過Spark Meetup,國內目前已經舉辦Spark Meetup的城市有四個,分別是北京、杭州、上海和深圳。除了線下交流,線上也會組織一些公開課,供那些不方便到線下交流的朋友參加。由此可以看出,2014年關于Spark的交流活動非常頻繁,這對推動Spark發展是大有裨益的。
其次,在2014年,各大廠商相繼宣布與Databricks進行合作。其中,Cloudera早在2013年底即宣布將在其發行版中添加Spark,而后又有更多的企業加入進來,如Datastax、MapR、Pivotal及Hortonworks等。由此可見,Spark已得到了眾多大數據企業的認可,而這些企業也確實將自己的產品與Spark進行了緊密的集成。譬如Datastax將Cassandra與Spark進行了集成,使得Spark可以操作Cassandra內的數據,又譬如ElasticSearch也和Spark進行了集成,更多這方面的動作可參考Spark Summit 2014中提到的相關內容。
此外,Spark在2014年也吸引了更多企業的落地使用。國外比較知名的有Yahoo! 、eBay、Twitter、Amazon、SAP、Tableau及MicroStrategy等;同時,值得高興的是,在Spark落地實踐上,國內企業也不遑多讓,淘寶、騰訊、百度、小米、京東、唯品會、愛奇藝、搜狐、七牛、華為及亞信等知名企業都進行了生產環境使用,從而也促成了越來越多的華人工程師為Spark提交代碼,特別是Spark SQL這個組件,甚至有一半左右的Contributor都是華人工程師。各大知名企業的使用,大幅度提升了整個業界使用Spark的興趣和信心,我們有理由相信,在2015年,使用Spark的企業數量必會是井噴式的爆發。與此同時,已經出現了一批基于Spark做應用的創業公司,而其中有不少發展得相當不錯,如Adatao和TupleJump。
隨著市場上對Spark工程師需求的日益加強,Databricks也適時地推出了Spark開發者認證計劃,第一次線下測試已經于2014年11月在西班牙巴塞羅那舉行。截止到本文寫作時(2015年1月),Spark開發者認證還不支持線上測試,但線上測試平臺不久后就會上線。
基于Spark持續健康發展的生態系統,越來越多的企業和機構在Spark上面開發應用和擴展庫。隨著這些庫的增長,Databricks在2014年圣誕節前夕上線了一個類似pip的功能來跟蹤這些庫的網站:http://spark-packages.org,目前已經有一些庫入駐Spark Packages,其中有幾個相當不錯,比如:dibbhatt/kafka-spark-consumer、spark-jobserver/spark-jobserver和mengxr/spark-als。
Spark 2014,解析眾人拾柴下的技術演進
如圖1所示,可以看出Spark包含了批處理、流處理、圖處理、機器學習、即席查詢與關系查詢等功能,這就意味著我們只需要一個框架就可以滿足各種使用場景的需求。如果放在以前,我們可能需要為每個功能都準備一套框架,譬如采用Hadoop MapReduce來做批處理和采用Storm來做流式處理,這樣做帶來的結果是我們必須分別針對兩套計算框架編寫不同的業務代碼,而編寫出的業務代碼也幾乎無法重用;另一方面,為了使系統穩定,我們還得額外投入人力去深入理解Hadoop MapReduce及Storm的原理,這將造成很大的人力開銷。當采用Spark后,我們只需要去理解Spark即可,另一個吸引人的地方在于Spark批處理與流計算的業務代碼幾乎可以完全重用,這也就意味著我們只需要編寫一份邏輯代碼就可以分別運行批處理與流計算。最后,Spark可以無縫使用存儲在HDFS上的數據,無需任何數據遷移動作。
圖1 Spark Stack
同時,由于現存系統必須要與以HDFS為代表的分布式文件系統進行數據共享和交換,由此造成的IO開銷大幅度地降低了計算效率;除此之外,反復的序列化與反序列化也是不可忽略的開銷。鑒于此,Spark中抽象出了RDD的概念,并基于RDD定義了一系列豐富的算子,MapReduce只是其中一個非常小的子集,與此同時,RDD也可以被緩存在內存中,從而迭代計算可以充分地享受內存計算所帶來的加速效果。與MapReduce基于進程的計算模型不一樣,Spark基于的是多線程模型,這也意味著Spark的任務調度延遲可以控制在亞秒級,當任務特別多的時候,這么做可以大幅度降低整體調度時間,并且為基于macro batch的流式計算打下基礎。Spark的另一個特色是基于DAG的任務調度與優化,Spark不需要像MapReduce一樣為每一步操作都去調度一個作業,相反,Spark豐富的算子可以更自然地以DAG形式表達運算。同時,在Spark中,每個stage內部是有pipeline優化的,所以即使我們不使用內存緩存數據,Spark的執行效率也要比Hadoop高。最后Spark基于RDD的lineage信息來容錯,由于RDD是不可變的,Spark并不需要記錄中間狀態,當RDD的某些partition丟失時,Spark可以利用RDD的lineage信息來進行并行的恢復,不過當lineage較長時,還是推薦用戶適時checkpoint,從而減少恢復時間。
以下我們沿著2014年各主要版本的發布軌跡簡單總結下Spark及各個組件(Spark Streaming、MLlib、GraphX及Spark SQL)在新功能及穩定性上做出的努力。
Spark 0.9.x
2014年2月初,Databricks發布了Spark的第一個版本0.9.0,這一版本帶來的最直接的變化是將Scala從2.9.x升級到了2.10。由于Scala在那時并沒有做到二進制向下兼容,所以大家不得不使用Scala2.10重新編譯業務代碼,這也算是個插曲吧。
這個版本最大的貢獻應該是加入了配置系統,即SparkConf。在這之前,各種屬性參數都直接作為Master的參數傳進去,而有了SparkConf后,Master就不需要管這些了,各種參數在SparkConf中配置完成后,將SparkConf傳給Master即可,這在測試中是非常有用的。另外在提交任務時,允許把Driver程序放到集群中的某臺服務器上運行,以前只能放在集群外的服務器上運行。
Spark Streaming終于在這個版本“自信”地結束了alpha版本,并且加入了HA模式,現在大家知道,其實那時的HA并不能保證數據不丟失,這一點到1.2的時候我們再談。在Spark Streaming跳出alpha的同時,新增加了alpha組件GraphX,GraphX是一個分布式圖計算框架,在這個版本中提供了一些標準算法,如PageRank、connected components、 strongly connected components與triangle counting等等,但穩定性還有待加強。MLlib在這個版本中增加了常用的樸素貝葉斯算法,不過更引人注意的是,MLlib終于也開始支持Python API了(需要NumPy的支持)。
社區分別于4月份與7月份發布了兩個maintena-nce版本:0.9.1與0.9.2,修復了一些Bug,無新的feature加入,不過0.9.1倒是Spark成為Apache頂級項目后的第一個發布。
Spark 1.0.x
用“千呼萬喚始出來”形容Spark1.0一點都不為過,作為一個里程碑式的發布,Spark社區也是非常謹慎,在發布了多個RC版本后,終于在5月底正式發布了1.0版本。這個版本有110多位Contributor,歷經4個月的共同努力,而1.0版本也毫無懸念地成為了Spark誕生以來最大的一次發布。作為1.x的開端版本,Spark社區也對API在以后所有1.x版本上的兼容性做了保證。另一方面,Spark 1.0的Java API開始支持Java 8的lambda表達式,這多少讓一些必須用Java來寫Spark程序的用戶得到了不小的便利。
萬眾矚目的Spark SQL終于在這個版本中亮相,盡管只是alpha版本,但全球各地的Spark用戶們已經迫不及待開始嘗試,這一勢頭至今仍在延續,Spark SQL現在是Spark中最活躍的組件,沒有之一。提到Spark SQL,不得不提Shark,Databricks在Spark Summit 2014上宣布Shark已經完成了其學術使命,且Shark的整體設計架構對Hive的依賴性太強,難以支持其長遠發展,所以決定終止Shark開發,全面轉向Spark SQL。Spark SQL支持以SQL的形式來操作結構化數據,并且也支持使用HiveContext來操作Hive中的數據。在這個方面,業內對SQL on Hadoop的超強需求決定了Spark SQL必將長期處于快速發展的態勢。值得一提的是,Hive社區也推出了一個Hive on Spark的項目——將Hive的執行引擎換成Spark。不過從目標上看,Hive on Spark更注重于針對Hive徹底地向下兼容性,而Spark SQL更注重于Spark與其他組件的互操作和多元化數據處理。
MLlib方面也有一個較大的進步,1.0開始終于支持稀疏矩陣了,這對MLlib的使用者來說絕對是一個讓人歡欣鼓舞的特性。在算法方面,MLlib也增加了決策樹、SVD及PCA等。Spark Streaming與GraphX的性能在這個版本中都得到了增強。
此外,Spark提供了一個新的提交任務的工具,稱為spark-submit,無論是運行在Standalone模式,還是運行在YARN上,都可以使用這個工具提交任務。從這一點上說,Spark統一了提交任務的入口。
最后,社區在7月和8月份分別發布了1.0.1與1.0.2兩個maintenance版本。
Spark 1.1.x
Spark 1.1.0在9月如期而至。此版本加入了sort-based的shuffle實現,之前hash-based的shuffle需要為每個reducer都打開一個文件,導致的結果是大量的buffer開銷與低效的I/O,而最新sort-based的shuffle實現能很好地解決上述問題,當shuffle數據量特別大的時候,sort-based的shuffle優勢尤其明顯。需要指出的是,和MapReduce針對KV排序不一樣,sort-based是按照partition序號進行排序的,在partition內部并不排序。但是1.1中默認的shuffle方式還是基于hash的,到1.2中才會把sort-based作為默認的shuffle方式。
Spark SQL在這個版本里加入了不少新特性。最值得關注的是加入了JDBC Server的功能,這意味著用戶可以只寫JDBC代碼就可以享受Spark SQL的各種功能。
MLlib引入了一個用于完成抽樣、相關性、估計、測試等任務的統計庫。之前呼聲很高的特征抽取工具Word2Vec和TF-IDF也被加進了此版本。除了增加一些新的算法之外,MLlib性能在這一版本中得也到了較大的提升。比起MLlib,GraphX在這一版并無特別大的改變。
Spark Streaming在這一版本的數據源中加入了對Amazon Kinesis的支持,只不過國內用戶對這個數據源支持的興趣不是很大,對于國外用戶的意義更多一些。不過在這個版本中,Spark Streaming改變了從Flume取得數據的方式,之前是Flume push數據到executor/worker中,但在這種模式下,當executor/worker掛掉后,Flume便無法再正常地push數據。所以現在把push改成了pull,這意味著即使某個receiver掛掉后,也能保證在其他worker上新啟動的receiver也能繼續正常地接收數據。另一個重要的改進是加入了限流的功能,譬如之前Spark Streaming在讀取Kafka中topic數據時經常會發生OOM,而加入限流后,OOM基本不再發生。Spark Streaming與MLlib的結合是另一個不得不提的全新特性,利用Streaming的實時性在線訓練模型,但當下只是一個比較初級的實現。
在11月底發布的maintenance版本1.1.1中修復了一個較大的問題,之前在使用外部數據結構時(ExternalAppendOnlyMap與ExternalSorter)會產生大量非常小的中間文件,這不但會造成“too many open files”的異常,也會極大地影響性能,1.1.1版本對其進行了修復。
Spark 1.2.0
12月中旬發布了1.2,不得不說Spark社區在控制發布進度工作上做得很贊。在此版本中,首當其沖的就是把sort-based shuffle設置成了默認的shuffle策略。另一方面,在數據傳輸量非常大的情況下,connection manager終于換成Netty-based的實現了,以前的實現非常慢的原因是每次都要從磁盤讀到內核態,再到用戶態,再回到內核態進入網卡,現在用zero-copy來實現,效率高了很多。
對于Spark Streaming說,終于也算是個小小的里程碑,開始支持fully H/A模式。以前當driver掛掉的時候,可能會丟失掉一小部分數據?,F在加上了一層WAL(Write Ahead Log),每次receiver收到數據后都會存在HDFS上,這樣即使driver掛掉,當它重啟起來后,還是可以接著處理。同時大家也需要注意 unreliable receivers和reliable receivers的區別,只有用戶使用reliable receivers才能保證數據零丟失。
MLlib最大變動是引入了新的pipeline API,可以更加便捷地搭建機器學習相關的全套流水線,其中還包括了以Spark SQL SchemaRDD為基礎的dataset API。
GraphX結束alpha正式發布,同時提供了stable API,這意味著用戶不需要擔心現有代碼以后會因API的變化而改動了。此外,新的核心API aggregateMessages也替代掉了mapReduceTriplet,大家要注意這個變動。
Spark SQL最重要的特性毫無疑問應該屬于external data source,此API讓開發者可以更容易地開發出對接外部數據源的spark connector,統一用SQL操作所有數據源,同時也可以push predicates to data source,譬如你要從HBase取數據后做一些篩選,一般我們需要把數據從HBase全取出來后在Spark引擎中篩選,現在可以把這個步驟推到data source端,讓用戶在取數據的時候就可以篩選。另一個值得一提的是現在cacheTable和原生的cache已經統一了語義,并且性能和穩定性也有顯著提升,不但內存表支持predicates pushdown,可以基于統計信息跳過批量數據,而且建內存buffer時分段建立,因此在cache較大的表時也不再會OOM。
由于篇幅原因,以上我們簡單總結了Spark在2014年的各個版本中比較重要的特性,但有一個功能的增強始終貫穿其中——YARN,由于目前很多公司都把不同的計算框架跑在YARN上,所以Spark對YARN的支持肯定會越來越好,事實上Spark確實在這方面做了很多工作。
結語
2014年對Spark是非常重要的一年,不僅因為發布了里程碑式的1.0版本,更重要的是通過整個社區的努力,Spark變得越來越穩定與高效,也正在被越來越多的企業采用。在2015年,隨著社區不斷的努力,相信Spark一定會達到一個新的高度,在更多的企業中扮演更重要的角色。
感謝來自Databricks公司的Reynold Xin和連城給本文review,并提供寶貴建議。