2014年的大數(shù)據(jù)領(lǐng)域,Apache Spark(以下簡稱Spark)無疑最受矚目。Spark,出自名門伯克利AMPLab之手,目前由商業(yè)公司Databricks保駕護航。自2014 年3月份躋身Apache頂級項目(TLP),Spark已然成為ASF最活躍的項目之一,得到了業(yè)內(nèi)廣泛的支持——2014年12月發(fā)布的Spark 1.2版本包含了來自172位Contributor貢獻的1000多個commits。而在2014一整年中,Spark共發(fā)布了大小9個版本(包含5 月底發(fā)布具有里程碑意義的1.0版本),其社區(qū)活躍度可見一斑。值得一提的是,2014年11月,Databricks基于AWS完成了一個 Daytona Gray類別的Sort Benchmark,并創(chuàng)造了該測試的新紀錄。本文將概括性地總結(jié)Spark在2014年的發(fā)展。
Spark 2014,星星之火已成燎原之勢
首先,Spark會議及相關(guān)交流。目前,世界范圍內(nèi)最權(quán)威的Spark領(lǐng)域會議無疑是Spark Summit,已于2013年與2014年連續(xù)成功舉辦兩屆,來自全球各地的工程師們與會分享了各自的Spark使用案例。鑒于目前Spark的火爆態(tài) 勢,Spark Summit將在2015年分Spark Summit East與Spark Summit West兩次舉行。著眼國內(nèi),首屆中國Spark技術(shù)峰會(Spark Summit China)于2014年4月在北京舉辦,據(jù)統(tǒng)計,全國各大互聯(lián)網(wǎng)公司幾乎都出席了會議。因此,大家可以期待下今年的Spark Summit China又會帶來怎樣的驚喜。除去這樣比較大型的會議,Spark Meetup也不定期地在全球各地舉行,截止本文寫作時,已有來自13個不同國家的33個城市舉辦過Spark Meetup,國內(nèi)目前已經(jīng)舉辦Spark Meetup的城市有四個,分別是北京、杭州、上海和深圳。除了線下交流,線上也會組織一些公開課,供那些不方便到線下交流的朋友參加。由此可以看 出,2014年關(guān)于Spark的交流活動非常頻繁,這對推動Spark發(fā)展是大有裨益的。
其次,在2014年,各大廠商相繼宣布與Databricks進行合作。其中,Cloudera早在2013年底即宣布將在其發(fā)行版中添加 Spark,而后又有更多的企業(yè)加入進來,如Datastax、MapR、Pivotal及Hortonworks等。由此可見,Spark已得到了眾多 大數(shù)據(jù)企業(yè)的認可,而這些企業(yè)也確實將自己的產(chǎn)品與Spark進行了緊密的集成。譬如Datastax將Cassandra與Spark進行了集成,使得 Spark可以操作Cassandra內(nèi)的數(shù)據(jù),又譬如ElasticSearch也和Spark進行了集成,更多這方面的動作可參考Spark Summit 2014中提到的相關(guān)內(nèi)容。
此外,Spark在2014年也吸引了更多企業(yè)的落地使用。國外比較知名的有Yahoo! 、eBay、Twitter、Amazon、SAP、Tableau及MicroStrategy等;同時,值得高興的是,在Spark落地實踐上,國內(nèi) 企業(yè)也不遑多讓,淘寶、騰訊、百度、小米、京東、唯品會、愛奇藝、搜狐、七牛、華為及亞信等知名企業(yè)都進行了生產(chǎn)環(huán)境使用,從而也促成了越來越多的華人工 程師為Spark提交代碼,特別是Spark SQL這個組件,甚至有一半左右的Contributor都是華人工程師。各大知名企業(yè)的使用,大幅度提升了整個業(yè)界使用Spark的興趣和信心,我們有 理由相信,在2015年,使用Spark的企業(yè)數(shù)量必會是井噴式的爆發(fā)。與此同時,已經(jīng)出現(xiàn)了一批基于Spark做應(yīng)用的創(chuàng)業(yè)公司,而其中有不少發(fā)展得相 當(dāng)不錯,如Adatao和TupleJump。
隨著市場上對Spark工程師需求的日益加強,Databricks也適時地推出了Spark開發(fā)者認證計劃,第一次線下測試已經(jīng)于2014年11 月在西班牙巴塞羅那舉行。截止到本文寫作時(2015年1月),Spark開發(fā)者認證還不支持線上測試,但線上測試平臺不久后就會上線。
基于Spark持續(xù)健康發(fā)展的生態(tài)系統(tǒng),越來越多的企業(yè)和機構(gòu)在Spark上面開發(fā)應(yīng)用和擴展庫。隨著這些庫的增長,Databricks在 2014年圣誕節(jié)前夕上線了一個類似pip的功能來跟蹤這些庫的網(wǎng)站:http://spark-packages.org,目前已經(jīng)有一些庫入駐 Spark Packages,其中有幾個相當(dāng)不錯,比如:dibbhatt/kafka-spark-consumer、spark-jobserver /spark-jobserver和mengxr/spark-als。
Spark 2014,解析眾人拾柴下的技術(shù)演進
如圖1所示,可以看出Spark包含了批處理、流處理、圖處理、機器學(xué)習(xí)、即席查詢與關(guān)系查詢等功能,這就意味著我們只需要一個框架就可以滿足各種 使用場景的需求。如果放在以前,我們可能需要為每個功能都準(zhǔn)備一套框架,譬如采用Hadoop MapReduce來做批處理和采用Storm來做流式處理,這樣做帶來的結(jié)果是我們必須分別針對兩套計算框架編寫不同的業(yè)務(wù)代碼,而編寫出的業(yè)務(wù)代碼也 幾乎無法重用;另一方面,為了使系統(tǒng)穩(wěn)定,我們還得額外投入人力去深入理解Hadoop MapReduce及Storm的原理,這將造成很大的人力開銷。當(dāng)采用Spark后,我們只需要去理解Spark即可,另一個吸引人的地方在于 Spark批處理與流計算的業(yè)務(wù)代碼幾乎可以完全重用,這也就意味著我們只需要編寫一份邏輯代碼就可以分別運行批處理與流計算。最后,Spark可以無縫 使用存儲在HDFS上的數(shù)據(jù),無需任何數(shù)據(jù)遷移動作。
圖1 Spark Stack
同時,由于現(xiàn)存系統(tǒng)必須要與以HDFS為代表的分布式文件系統(tǒng)進行數(shù)據(jù)共享和交換,由此造成的IO開銷大幅度地降低了計算效率;除此之外,反復(fù)的序 列化與反序列化也是不可忽略的開銷。鑒于此,Spark中抽象出了RDD的概念,并基于RDD定義了一系列豐富的算子,MapReduce只是其中一個非 常小的子集,與此同時,RDD也可以被緩存在內(nèi)存中,從而迭代計算可以充分地享受內(nèi)存計算所帶來的加速效果。與MapReduce基于進程的計算模型不一 樣,Spark基于的是多線程模型,這也意味著Spark的任務(wù)調(diào)度延遲可以控制在亞秒級,當(dāng)任務(wù)特別多的時候,這么做可以大幅度降低整體調(diào)度時間,并且 為基于macro batch的流式計算打下基礎(chǔ)。Spark的另一個特色是基于DAG的任務(wù)調(diào)度與優(yōu)化,Spark不需要像MapReduce一樣為每一步操作都去調(diào)度一 個作業(yè),相反,Spark豐富的算子可以更自然地以DAG形式表達運算。同時,在Spark中,每個stage內(nèi)部是有pipeline優(yōu)化的,所以即使 我們不使用內(nèi)存緩存數(shù)據(jù),Spark的執(zhí)行效率也要比Hadoop高。最后Spark基于RDD的lineage信息來容錯,由于RDD是不可變 的,Spark并不需要記錄中間狀態(tài),當(dāng)RDD的某些partition丟失時,Spark可以利用RDD的lineage信息來進行并行的恢復(fù),不過當(dāng) lineage較長時,還是推薦用戶適時checkpoint,從而減少恢復(fù)時間。
以下我們沿著2014年各主要版本的發(fā)布軌跡簡單總結(jié)下Spark及各個組件(Spark Streaming、MLlib、GraphX及Spark SQL)在新功能及穩(wěn)定性上做出的努力。
Spark 0.9.x
2014年2月初,Databricks發(fā)布了Spark的第一個版本0.9.0,這一版本帶來的最直接的變化是將Scala從2.9.x升級到了 2.10。由于Scala在那時并沒有做到二進制向下兼容,所以大家不得不使用Scala2.10重新編譯業(yè)務(wù)代碼,這也算是個插曲吧。
這個版本最大的貢獻應(yīng)該是加入了配置系統(tǒng),即SparkConf。在這之前,各種屬性參數(shù)都直接作為Master的參數(shù)傳進去,而有了 SparkConf后,Master就不需要管這些了,各種參數(shù)在SparkConf中配置完成后,將SparkConf傳給Master即可,這在測試 中是非常有用的。另外在提交任務(wù)時,允許把Driver程序放到集群中的某臺服務(wù)器上運行,以前只能放在集群外的服務(wù)器上運行。
Spark Streaming終于在這個版本“自信”地結(jié)束了alpha版本,并且加入了HA模式,現(xiàn)在大家知道,其實那時的HA并不能保證數(shù)據(jù)不丟失,這一點到 1.2的時候我們再談。在Spark Streaming跳出alpha的同時,新增加了alpha組件GraphX,GraphX是一個分布式圖計算框架,在這個版本中提供了一些標(biāo)準(zhǔn)算法, 如PageRank、connected components、 strongly connected components與triangle counting等等,但穩(wěn)定性還有待加強。MLlib在這個版本中增加了常用的樸素貝葉斯算法,不過更引人注意的是,MLlib終于也開始支持 Python API了(需要NumPy的支持)。
社區(qū)分別于4月份與7月份發(fā)布了兩個maintena-nce版本:0.9.1與0.9.2,修復(fù)了一些Bug,無新的feature加入,不過0.9.1倒是Spark成為Apache頂級項目后的第一個發(fā)布。
Spark 1.0.x
用“千呼萬喚始出來”形容Spark1.0一點都不為過,作為一個里程碑式的發(fā)布,Spark社區(qū)也是非常謹慎,在發(fā)布了多個RC版本后,終于在5 月底正式發(fā)布了1.0版本。這個版本有110多位Contributor,歷經(jīng)4個月的共同努力,而1.0版本也毫無懸念地成為了Spark誕生以來最大 的一次發(fā)布。作為1.x的開端版本,Spark社區(qū)也對API在以后所有1.x版本上的兼容性做了保證。另一方面,Spark 1.0的Java API開始支持Java 8的lambda表達式,這多少讓一些必須用Java來寫Spark程序的用戶得到了不小的便利。
萬眾矚目的Spark SQL終于在這個版本中亮相,盡管只是alpha版本,但全球各地的Spark用戶們已經(jīng)迫不及待開始嘗試,這一勢頭至今仍在延續(xù),Spark SQL現(xiàn)在是Spark中最活躍的組件,沒有之一。提到Spark SQL,不得不提Shark,Databricks在Spark Summit 2014上宣布Shark已經(jīng)完成了其學(xué)術(shù)使命,且Shark的整體設(shè)計架構(gòu)對Hive的依賴性太強,難以支持其長遠發(fā)展,所以決定終止Shark開發(fā), 全面轉(zhuǎn)向Spark SQL。Spark SQL支持以SQL的形式來操作結(jié)構(gòu)化數(shù)據(jù),并且也支持使用HiveContext來操作Hive中的數(shù)據(jù)。在這個方面,業(yè)內(nèi)對SQL on Hadoop的超強需求決定了Spark SQL必將長期處于快速發(fā)展的態(tài)勢。值得一提的是,Hive社區(qū)也推出了一個Hive on Spark的項目——將Hive的執(zhí)行引擎換成Spark。不過從目標(biāo)上看,Hive on Spark更注重于針對Hive徹底地向下兼容性,而Spark SQL更注重于Spark與其他組件的互操作和多元化數(shù)據(jù)處理。
MLlib方面也有一個較大的進步,1.0開始終于支持稀疏矩陣了,這對MLlib的使用者來說絕對是一個讓人歡欣鼓舞的特性。在算法方面,MLlib也增加了決策樹、SVD及PCA等。Spark Streaming與GraphX的性能在這個版本中都得到了增強。
此外,Spark提供了一個新的提交任務(wù)的工具,稱為spark-submit,無論是運行在Standalone模式,還是運行在YARN上,都可以使用這個工具提交任務(wù)。從這一點上說,Spark統(tǒng)一了提交任務(wù)的入口。
最后,社區(qū)在7月和8月份分別發(fā)布了1.0.1與1.0.2兩個maintenance版本。
Spark 1.1.x
Spark 1.1.0在9月如期而至。此版本加入了sort-based的shuffle實現(xiàn),之前hash-based的shuffle需要為每個reducer 都打開一個文件,導(dǎo)致的結(jié)果是大量的buffer開銷與低效的I/O,而最新sort-based的shuffle實現(xiàn)能很好地解決上述問題,當(dāng) shuffle數(shù)據(jù)量特別大的時候,sort-based的shuffle優(yōu)勢尤其明顯。需要指出的是,和MapReduce針對KV排序不一 樣,sort-based是按照partition序號進行排序的,在partition內(nèi)部并不排序。但是1.1中默認的shuffle方式還是基于 hash的,到1.2中才會把sort-based作為默認的shuffle方式。
Spark SQL在這個版本里加入了不少新特性。最值得關(guān)注的是加入了JDBC Server的功能,這意味著用戶可以只寫JDBC代碼就可以享受Spark SQL的各種功能。
MLlib引入了一個用于完成抽樣、相關(guān)性、估計、測試等任務(wù)的統(tǒng)計庫。之前呼聲很高的特征抽取工具Word2Vec和TF-IDF也被加進了此版 本。除了增加一些新的算法之外,MLlib性能在這一版本中得也到了較大的提升。比起MLlib,GraphX在這一版并無特別大的改變。
Spark Streaming在這一版本的數(shù)據(jù)源中加入了對Amazon Kinesis的支持,只不過國內(nèi)用戶對這個數(shù)據(jù)源支持的興趣不是很大,對于國外用戶的意義更多一些。不過在這個版本中,Spark Streaming改變了從Flume取得數(shù)據(jù)的方式,之前是Flume push數(shù)據(jù)到executor/worker中,但在這種模式下,當(dāng)executor/worker掛掉后,F(xiàn)lume便無法再正常地push數(shù)據(jù)。所 以現(xiàn)在把push改成了pull,這意味著即使某個receiver掛掉后,也能保證在其他worker上新啟動的receiver也能繼續(xù)正常地接收數(shù) 據(jù)。另一個重要的改進是加入了限流的功能,譬如之前Spark Streaming在讀取Kafka中topic數(shù)據(jù)時經(jīng)常會發(fā)生OOM,而加入限流后,OOM基本不再發(fā)生。Spark Streaming與MLlib的結(jié)合是另一個不得不提的全新特性,利用Streaming的實時性在線訓(xùn)練模型,但當(dāng)下只是一個比較初級的實現(xiàn)。
在11月底發(fā)布的maintenance版本1.1.1中修復(fù)了一個較大的問題,之前在使用外部數(shù)據(jù)結(jié)構(gòu)時(ExternalAppendOnlyMap與ExternalSorter)會產(chǎn)生大量非常小的中間文件,這不但會造成“too many open files”的異常,也會極大地影響性能,1.1.1版本對其進行了修復(fù)。
Spark 1.2.0
12月中旬發(fā)布了1.2,不得不說Spark社區(qū)在控制發(fā)布進度工作上做得很贊。在此版本中,首當(dāng)其沖的就是把sort-based shuffle設(shè)置成了默認的shuffle策略。另一方面,在數(shù)據(jù)傳輸量非常大的情況下,connection manager終于換成Netty-based的實現(xiàn)了,以前的實現(xiàn)非常慢的原因是每次都要從磁盤讀到內(nèi)核態(tài),再到用戶態(tài),再回到內(nèi)核態(tài)進入網(wǎng)卡,現(xiàn)在用zero-copy來實現(xiàn),效率高了很多。
對于Spark Streaming說,終于也算是個小小的里程碑,開始支持fully H/A模式。以前當(dāng)driver掛掉的時候,可能會丟失掉一小部分數(shù)據(jù)。現(xiàn)在加上了一層WAL(Write Ahead Log),每次receiver收到數(shù)據(jù)后都會存在HDFS上,這樣即使driver掛掉,當(dāng)它重啟起來后,還是可以接著處理。同時大家也需要注意 unreliable receivers和reliable receivers的區(qū)別,只有用戶使用reliable receivers才能保證數(shù)據(jù)零丟失。
MLlib最大變動是引入了新的pipeline API,可以更加便捷地搭建機器學(xué)習(xí)相關(guān)的全套流水線,其中還包括了以Spark SQL SchemaRDD為基礎(chǔ)的dataset API。
GraphX結(jié)束alpha正式發(fā)布,同時提供了stable API,這意味著用戶不需要擔(dān)心現(xiàn)有代碼以后會因API的變化而改動了。此外,新的核心API aggregateMessages也替代掉了mapReduceTriplet,大家要注意這個變動。
Spark SQL最重要的特性毫無疑問應(yīng)該屬于external data source,此API讓開發(fā)者可以更容易地開發(fā)出對接外部數(shù)據(jù)源的spark connector,統(tǒng)一用SQL操作所有數(shù)據(jù)源,同時也可以push predicates to data source,譬如你要從HBase取數(shù)據(jù)后做一些篩選,一般我們需要把數(shù)據(jù)從HBase全取出來后在Spark引擎中篩選,現(xiàn)在可以把這個步驟推到 data source端,讓用戶在取數(shù)據(jù)的時候就可以篩選。另一個值得一提的是現(xiàn)在cacheTable和原生的cache已經(jīng)統(tǒng)一了語義,并且性能和穩(wěn)定性也有 顯著提升,不但內(nèi)存表支持predicates pushdown,可以基于統(tǒng)計信息跳過批量數(shù)據(jù),而且建內(nèi)存buffer時分段建立,因此在cache較大的表時也不再會OOM。
由于篇幅原因,以上我們簡單總結(jié)了Spark在2014年的各個版本中比較重要的特性,但有一個功能的增強始終貫穿其中——YARN,由于目前很多 公司都把不同的計算框架跑在YARN上,所以Spark對YARN的支持肯定會越來越好,事實上Spark確實在這方面做了很多工作。
結(jié)語
2014年對Spark是非常重要的一年,不僅因為發(fā)布了里程碑式的1.0版本,更重要的是通過整個社區(qū)的努力,Spark變得越來越穩(wěn)定與高效, 也正在被越來越多的企業(yè)采用。在2015年,隨著社區(qū)不斷的努力,相信Spark一定會達到一個新的高度,在更多的企業(yè)中扮演更重要的角色。