在上一篇文章中,我們講了Spark大數據處理的可擴展性和負載均衡,今天要講的是更為重點的容錯處理,這涉及到Spark的應用場景和RDD的設計來源。
Spark的應用場景
Spark主要針對兩種場景:
機器學習,數據挖掘,圖應用中常用的迭代算法(每一次迭代對數據執行相似的函數)
交互式數據挖掘工具(用戶反復查詢一個數據子集)
Spark在spark-submit外,還提供了spark-shell,它就是專門用來做交互數據挖掘的工具
MapReduce等框架并不明確支持迭代中間結果/數據子集的共享,所以需要將數據輸出到磁盤,然后在每次查詢時重新加載,這帶來較大的開銷。
既然反復寫磁盤和從磁盤加載數據使得性能下降,那就把數據放到內存中,這就是Spark基于內存的彈性分布式數據集(RDD)的出發點。
自動容錯
MapReduce是容錯性非常好的系統。處理一步就放到磁盤,再處理一步又放到磁盤,一旦哪一步有問題,重做就好了,真可謂是一步一個腳印。Spark為了上述場景下的性能,把數據放在內存中,那整個系統的容錯就成了最困難的地方。
一般來說,分布式數據集的容錯性有兩種方式:即數據檢查點和記錄數據的更新。由于面向的是大規模數據分析,數據檢查點操作成本很高:需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源(在內存中復制數據可以減少需要緩存的數據量,而存儲到磁盤則會拖慢應用程序)。所以選擇記錄更新的方式。但是,如果更新太多,那么記錄更新成本也不低。因此,RDD只支持讀操作,并且只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列轉換記錄下來(即Lineage),以便恢復丟失的分區。
雖然只支持粗粒度轉換限制了編程模型,但是RDD仍然可以很好地適用于很多應用,特別是支持數據并行的批量分析應用,包括數據挖掘、機器學習、圖算法等,因為這些程序通常都會在很多記錄上執行相同的操作。
RDD抽象
RDD是只讀的、分區記錄的集合。RDD只能基于在穩定物理存儲中的數據集和其他已有的RDD上執行確定性操作來創建。這些確定性操作稱之為轉換,如map、filter、groupBy、join(轉換不是程開發人員在RDD上執行的操作)。
RDD含有如何從其他RDD計算出本RDD的相關信息(即Lineage),據此可以從物理存儲的數據計算出相應的RDD分區。
在需要反復使用的某個數據集時,使用RDD的持久化,即persist,這個持久化優先是放在內存中的。
再來看看WordCount
說了這么多,我們依然拿WordCount來說說,幫忙小伙伴們理解,還沒有看本系列前兩篇文章的童鞋抓緊去看看哈。
val file = "hdfs://127.0.0.1:9000/file.txt"
val lines = sc.textFile(file)
val words = lines.flatMap(line => line.split("\s+"))
val partialCountMap = words
.mapPartitions(convertWordsInPartitionToWordCountMap)
val wordCount = distCountMap.reduce(mergeMaps)
WordCount一共涉及到三個RDD,用于承載文本行的lines,用于承載單詞的words,用于承載每個文件塊上部分單詞計數的 partialCountMap。Lineage關系:partialCountMap的父RDD為words,words的父RDD為lines,如下圖:
有了Lineage和RDD的只讀特性,就可以輕松完成容錯了。
如果words在slave1上的一個分區出問題了,那么我們只需要加載slave1上對應的文件塊,并重新計算其lines對應的分區,進而計算得到words的這個分區。
圖中每個slave中只畫了一個文件塊,實際上可能有多個文件塊。一定要注意的是哪個分區出問題了,只會重算這一個分區,也就只會重新加載這個分區關聯的文件塊。
上面討論的是窄依賴的情況,如果像groupBy這種轉換,一個RDD分區需要依賴父RDD的多個分區,那么一個分區掛了,就需要計算父RDD中的多個分區。
分布式系統的三個問題:可擴展性,負載均衡,容錯處理,都解決了吧。
不知道看到這里的小伙伴,心里是否有個疑問,既然RDD的API只支持粗粒度的轉換,它真的能夠支持這么多千奇百怪的應用場景嗎?下一篇,我們一起看RDD的API,以及它對其它大數據處理框架能夠處理的應用場景的等效解決方案。