當下,Spark已經在國內得到了廣泛的認可和支持:2014年,Spark Summit China在北京召開,場面火爆;同年,Spark Meetup在北京、上海、深圳和杭州四個城市舉辦,其中僅北京就成功舉辦了5次,內容更涵蓋Spark Core、Spark Streaming、Spark MLlib、Spark SQL等眾多領域。而作為較早關注和引入Spark的移動互聯網大數據綜合服務公司,TalkingData也積極地參與到國內Spark社區的各種活動,并多次在Meetup中分享公司的Spark使用經驗。本文則主要介紹TalkingData在大數據平臺建設過程中,逐漸引入Spark,并且以Hadoop YARN和Spark為基礎來構建移動大數據平臺的過程。
初識Spark
作為一家在移動互聯網大數據領域創業的公司,時刻關注大數據技術領域的發展和進步是公司技術團隊必做的功課。而在整理Strata 2013公開的講義時,一篇主題為《An Introduction on the Berkeley Data Analytics Stack_BDAS_Featuring Spark,Spark Streaming,and Shark》的教程引起了整個技術團隊的關注和討論,其中Spark基于內存的RDD模型、對機器學習算法的支持、整個技術棧中實時處理和離線處理的統一模型以及Shark都讓人眼前一亮。同時期我們關注的還有Impala,但對比Spark,Impala可以理解為對Hive的升級,而Spark則嘗試圍繞RDD建立一個用于大數據處理的生態系統。對于一家數據量高速增長,業務又是以大數據處理為核心并且在不斷變化的創業公司而言,后者無疑更值得進一步關注和研究。
Spark初探
2013年中期,隨著業務高速發展,越來越多的移動設備側數據被各個不同的業務平臺收集。那么這些數據除了提供不同業務所需要的業務指標,是否還蘊藏著更多的價值?為了更好地挖掘數據潛在價值,我們決定建造自己的數據中心,將各業務平臺的數據匯集到一起,對覆蓋設備的相關數據進行加工、分析和挖掘,從而探索數據的價值。初期數據中心主要功能設置如下所示:
1. 跨市場聚合的安卓應用排名;
2. 基于用戶興趣的應用推薦。
基于當時的技術掌握程度和功能需求,數據中心所采用的技術架構如圖1。
整個系統構建基于Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大數據計算架構。通過日志匯集程序,將不同業務平臺的日志匯集到數據中心,并通過ETL將數據進行格式化處理,儲存到HDFS。其中,排名和推薦算法的實現都采用了MapReduce,系統中只存在離線批量計算,并通過基于Azkaban的調度系統進行離線任務的調度。
第一個版本的數據中心架構基本上是以滿足“最基本的數據利用”這一目的進行設計的。然而,隨著對數據價值探索得逐漸加深,越來越多的實時分析需求被提出。與此同時,更多的機器學習算法也亟需添加,以便支持不同的數據挖掘需求。對于實時數據分析,顯然不能通過“對每個分析需求單獨開發MapReduce任務”來完成,因此引入Hive 是一個簡單而直接的選擇。鑒于傳統的MapReduce模型并不能很好地支持迭代計算,我們需要一個更好的并行計算框架來支持機器學習算法。而這些正是我們一直在密切關注的Spark所擅長的領域——憑借其對迭代計算的友好支持,Spark理所當然地成為了不二之選。2013年9月底,隨著Spark 0.8.0發布,我們決定對最初的架構進行演進,引入Hive作為即時查詢的基礎,同時引入Spark計算框架來支持機器學習類型的計算,并且驗證Spark這個新的計算框架是否能夠全面替代傳統的以MapReduce為基礎的計算框架。圖2為整個系統的架構演變。
在這個架構中,我們將Spark 0.8.1部署在YARN上,通過分Queue,來隔離基于Spark的機器學習任務,計算排名的日常MapReduce任務和基于Hive的即時分析任務。
想要引入Spark,第一步需要做的就是要取得支持我們Hadoop環境的Spark包。我們的Hadoop環境是Cloudera發布的CDH 4.3,默認的Spark發布包并不包含支持CDH 4.3的版本,因此只能自己編譯。Spark官方文檔推薦用Maven進行編譯,可是編譯卻不如想象中順利。各種包依賴由于眾所周知的原因,不能順利地從某些依賴中心庫下載。于是我們采取了最簡單直接的繞開辦法,利用AWS云主機進行編譯。需要注意的是,編譯前一定要遵循文檔的建議,設置:
否則,編譯過程中就會遇到內存溢出的問題。針對CDH 4.3,mvn build的參數為:
在編譯成功所需要的Spark包后,部署和在Hadoop環境中運行Spark則是非常簡單的事情。將編譯好的Spark目錄打包壓縮后,在可以運行Hadoop Client的機器上解壓縮,就可以運行Spark了。想要驗證Spark是否能夠正常在目標Hadoop環境上運行,可以參照Spark的官方文檔,運行example中的SparkPi來驗證:
完成Spark部署之后,剩下的就是開發基于Spark的程序了。雖然Spark支持Java、Python,但最合適開發Spark程序的語言還是Scala。經過一段時間的摸索實踐,我們掌握了Scala語言的函數式編程語言特點后,終于體會了利用Scala開發Spark應用的巨大好處。同樣的功能,用MapReduce幾百行才能實現的計算,在Spark中,Scala通過短短的數十行代碼就能完成。而在運行時,同樣的計算功能,Spark上執行則比MapReduce有數十倍的提高。對于需要迭代的機器學習算法來講,Spark的RDD模型相比MapReduce的優勢則更是明顯,更何況還有基本的MLlib的支持。經過幾個月的實踐,數據挖掘相關工作被完全遷移到Spark,并且在Spark上實現了適合我們數據集的更高效的LR等等算法。
全面擁抱Spark
進入2014年,公司的業務有了長足的發展,對比數據中心平臺建立時,每日處理的數據量亦翻了幾番。每日的排名計算所花的時間越來越長,而基于Hive的即時計算只能支持日尺度的計算,如果到周這個尺度,計算所花的時間已經很難忍受,到月這個尺度則基本上沒辦法完成計算。基于在Spark上的認知和積累,是時候將整個數據中心遷移到Spark上了。
2014年4月,Spark Summit China在北京舉行。抱著學習的目的,我們技術團隊也參加了在中國舉行的這一次Spark盛會。通過這次盛會,我們了解到國內的很多同行已經開始采用Spark來建造自己的大數據平臺,而Spark也變成了在ASF中最為活躍的項目之一。另外,越來越多的大數據相關的產品也逐漸在和Spark相融合或者在向Spark遷移。Spark無疑將會變為一個相比Hadoop MapReduce更好的生態系統。通過這次大會,我們更加堅定了全面擁抱Spark的決心。
基于YARN和Spark,我們開始重新架構數據中心依賴的大數據平臺。整個新的數據平臺應該能夠承載:
1. 準實時的數據匯集和ETL;
2. 支持流式的數據加工;
3. 更高效的離線計算能力;
4. 高速的多維分析能力;
5. 更高效的即時分析能力;
6. 高效的機器學習能力;
7. 統一的數據訪問接口;
8. 統一的數據視圖;
9. 靈活的任務調度.
整個新的架構充分地利用YARN和Spark,并且融合公司的一些技術積累,架構如圖3所示。
在新的架構中,引入了Kafka作為日志匯集的通道。幾個業務系統收集的移動設備側的日志,實時地寫入到Kafka 中,從而方便后續的數據消費。
利用Spark Streaming,可以方便地對Kafka中的數據進行消費處理。在整個架構中,Spark Streaming主要完成了以下工作。
1. 原始日志的保存。將Kafka中的原始日志以JSON格式無損的保存在HDFS中。
2. 數據清洗和轉換,清洗和標準化之后,轉變為Parquet格式,存儲在HDFS中,方便后續的各種數據計算任務。
3. 定義好的流式計算任務,比如基于頻次規則的標簽加工等等,計算結果直接存儲在MongoDB中。
排名計算任務則在Spark上做了重新實現,借力Spark帶來的性能提高,以及Parquet列式存儲帶來的高效數據訪問。同樣的計算任務,在數據量提高到原來3倍的情況下,時間開銷只有原來的1/6。
同時,在利用Spark和Parquet列式存儲帶來的性能提升之外,曾經很難滿足業務需求的即時多維度數據分析終于成為了可能。曾經利用Hive需要小時級別才能完成日尺度的多維度即時分析,在新架構上,只需要2分鐘就能夠順利完成。而周尺度上也不過十分鐘就能夠算出結果。曾經在Hive上無法完成的月尺度多維度分析計算,則在兩個小時內也可以算出結果。另外Spark SQL的逐漸完善也降低了開發的難度。
利用YARN提供的資源管理能力,用于多維度分析,自主研發的Bitmap引擎也被遷移到了YARN上。對于已經確定好的維度,可以預先創建Bitmap索引。而多維度的分析,如果所需要的維度已經預先建立了Bitmap索引,則通過Bitmap引擎由Bitmap計算來實現,從而可以提供實時的多維度的分析能力。
在新的架構中,為了更方便地管理數據,我們引入了基于HCatalog的元數據管理系統,數據的定義、存儲、訪問都通過元數據管理系統,從而實現了數據的統一視圖,方便了數據資產的管理。
YARN只提供了資源的調度能力,在一個大數據平臺,分布式的任務調度系統同樣不可或缺。在新的架構中,我們自行開發了一個支持DAG的分布式任務調度系統,結合YARN提供的資源調度能力,從而實現定時任務、即時任務以及不同任務構成的pipeline。
基于圍繞YARN和Spark的新的架構,一個針對數據業務部門的自服務大數據平臺得以實現,數據業務部門可以方便地利用這個平臺對進行多維度的分析、數據的抽取,以及進行自定義的標簽加工。自服務系統提高了數據利用的能力,同時也大大提高了數據利用的效率。
使用Spark遇到的一些坑
任何新技術的引入都會歷經陌生到熟悉,從最初新技術帶來的驚喜,到后來遇到困難時的一籌莫展和惆悵,再到問題解決后的愉悅,大數據新貴Spark同樣不能免俗。下面就列舉一些我們遇到的坑。
【坑一:跑很大的數據集的時候,會遇到org.apache.spark.SparkException: Error communicating with MapOutputTracker】
這個錯誤報得很隱晦,從錯誤日志看,是Spark集群partition了,但如果觀察物理機器的運行情況,會發現磁盤I/O非常高。進一步分析會發現原因是Spark在處理大數據集時的shuffle過程中生成了太多的臨時文件,造成了操作系統磁盤I/O負載過大。找到原因后,解決起來就很簡單了,設置spark.shuffle.consolidateFiles為true。這個參數在默認的設置中是false的,對于linux的ext4文件系統,建議大家還是默認設置為true吧。Spark官方文檔的描述也建議ext4文件系統設置為true來提高性能。
【坑二:運行時報Fetch failure錯】
在大數據集上,運行Spark程序,在很多情況下會遇到Fetch failure的錯。由于Spark本身設計是容錯的,大部分的Fetch failure會經過重試后通過,因此整個Spark任務會正常跑完,不過由于重試的影響,執行時間會顯著增長。造成Fetch failure的根本原因則不盡相同。從錯誤本身看,是由于任務不能從遠程的節點讀取shuffle的數據,具體原因則需要利用:
查看Spark的運行日志,從而找到造成Fetch failure的根本原因。其中大部分的問題都可以通過合理的參數配置以及對程序進行優化來解決。2014年Spark Summit China上陳超的那個專題,對于如何對Spark性能進行優化,有非常好的建議。
當然,在使用Spark過程中還遇到過其他不同的問題,不過由于Spark本身是開源的,通過源代碼的閱讀,以及借助開源社區的幫助,大部分問題都可以順利解決。
下一步的計劃
Spark在2014年取得了長足的發展,圍繞Spark的大數據生態系統也逐漸的完善。Spark 1.3引入了一個新的DataFrame API,這個新的DataFrame API將會使得Spark對于數據的處理更加友好。同樣出自于AMPLab的分布式緩存系統Tachyon因為其與Spark的良好集成也逐漸引起了人們的注意。鑒于在業務場景中,很多基礎數據是需要被多個不同的Spark任務重復使用,下一步,我們將會在架構中引入Tachyon來作為緩存層。另外,隨著SSD的日益普及,我們后續的計劃是在集群中每臺機器都引入SSD存儲,配置Spark的shuffle的輸出到SSD,利用SSD的高速隨機讀寫能力,進一步提高大數據處理效率。
在機器學習方面,H2O機器學習引擎也和Spark有了良好的集成從而產生了Sparkling-water。相信利用Sparking-water,作為一家創業公司,我們也可以利用深度學習的力量來進一步挖掘數據的價值。
結語
2004年,Google的MapReduce論文揭開了大數據處理的時代,Hadoop的MapReduce在過去接近10年的時間成了大數據處理的代名詞。而Matei Zaharia 2012年關于RDD的一篇論文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”則揭示了大數據處理技術一個新時代的到來。伴隨著新的硬件技術的發展、低延遲大數據處理的廣泛需求以及數據挖掘在大數據領域的日益普及,Spark作為一個嶄新的大數據生態系統,逐漸取代傳統的MapReduce而成為新一代大數據處理技術的熱門。我們過去兩年從MapReduce到Spark架構的演變過程,也基本上代表了相當一部分大數據領域從業者的技術演進的歷程。相信隨著Spark生態的日益完善,會有越來越多的企業將自己的數據處理遷移到Spark上來。而伴隨著越來越多的大數據工程師熟悉和了解Spark,國內的Spark社區也會越來越活躍,Spark作為一個開源的平臺,相信也會有越來越多的華人變成Spark相關項目的Contributor,Spark也會變得越來越成熟和強大。