本文針對海量數據處理過程中的處理速度、存儲空間、容錯性、訪問時間等方面存在的問題,通過對Google MapReduce編程模型的原理、執行流程等進行分析研究,介紹5種主要的MapReduce實現框架:Hadoop MapReduce、Apache Spark、Phoenix、Disco、Mars,以期對MapReduce編程模型在行業內的使用前景有一個較全面的認識。
MapReduce介紹
海量數據的處理對服務器CPU、I/O的吞吐都是嚴峻的考驗,傳統的技術架構和僅靠單臺計算機基于串行的方式越來越不能適應當前海量數據處理的要求。只有將這些計算進行并行化處理,通過提取出處理過程中存在的可并行工作的分量,用分布式模型來實現這些并行分量的并行執行過程。
MapReduce是由谷歌推出的一個編程模型,是一個能處理和生成超大規模數據集的算法模型,該架構能夠在大量普通配置的計算機上實現并行化處理。
MapReduce編程模型結合用戶實現的Map和Reduce函數。用戶自定義的Map函數處理一個輸入的基于key/value pair的集合,輸出中間基于key/value pair的集合,MapReduce庫把中間所有具有相同key值的value值集合在一起后傳遞給Reduce函數,用戶自定義的Reduce函數合并所有具有相同key值的value值,形成一個較小value值的集合。一般地,一個典型的MapReduce程序的執行流程如圖1所示。
MapReduce執行過程主要包括:
將輸入的海量數據切片分給不同的機器處理;
執行Map任務的Worker將輸入數據解析成key/value pair,用戶定義的Map函數把輸入的key/value pair轉成中間形式的key/value pair;
按照key值對中間形式的key/value進行排序、聚合;
把不同的key值和相應的value集分配給不同的機器,完成Reduce運算;
任務成功完成后,MapReduce的輸出存放在R個輸出文件中,一般情況下,這R個輸出文件不需要合并成一個文件,而是作為另外一個MapReduce的輸入,或者在另一個可處理多個分割文件的分布式應用中使用。
MapReduce主要框架介紹
Hadoop MapReduce是一個在計算機集群上分布式處理海量數據集的軟件框架,包括一個JobTracker和一定數量的TaskTracker。用戶將MapReduce作業發送給Jobtracker所在集群的其他機器上分割工作,集群中其他機器執行Tasktracker的Map或Reduce任務。
Spark是一個基于內存計算的開源的集群計算系統,目的是讓數據分析更加快速。Spark非常小巧玲瓏,由加州伯克利大學AMP實驗室的Matei為主的小團隊所開發。使用的語言是Scala,項目的核心core部分的代碼只有63個Scala文件,非常短小精悍。Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。
Phoenix作為斯坦福大學EE382a課程的一類項目,由斯坦福大學計算機系統實驗室開發。Phoenix對MapReduce的實現原則和最初由Google實現的MapReduce基本相同。不同的是,它在集群中以實現共享內存系統為目的,共享內存能最小化由任務派生和數據間的通信所造成的間接成本。Phoenix可編程多核芯片或共享內存多核處理器(SMPs和ccNUMAs),用于數據密集型任務處理。
Disco是由Nokia研究中心開發的,基于MapReduce的分布式數據處理框架,核心部分由Erlang語言開發,外部編程接口為Python語言。Disco是一個開放源代碼的大規模數據分析平臺,支持大數據集的并行計算,能運行在不可靠的集群計算機上。Disco可部署在集群和多核計算機上,還可部署在Amazon EC2 上。
Mars是香港科技大學與微軟、新浪合作開發的基于GPU的MapReduce框架。目前已經包含字符串匹配、矩陣乘法、倒排索引、字詞統計、網頁訪問排名、網頁訪問計數、相似性評估和K均值等8項應用,能夠在32位與64位的Linux平臺上運行。
針對5種框架的特點筆者進行了如下分類。
編程語言
Hadoop MapReduce: Hadoop采用Java開發,所以能很好地支持Java語言編寫的MapReduce作業,如果采用C/C++或其他語言編寫MapReduce作業,需要用到Hadoop Streaming或Hadoop Pipes工具;
Spark:Spark 是在 Scala 語言中實現的,它將 Scala 用作其應用程序框架。與 Hadoop 不同,Spark 和 Scala 能夠緊密集成,其中的 Scala 可以像操作本地集合對象一樣輕松地操作分布式數據集;
Phoenix:采用全C++編寫,總代碼量不超過1萬行,提供C和C++的應用程序接口;
Disco:核心部分采用并發性能很高的Erlang語言開發,其外部編程接口為易于編程的Python語言;
Mars:采用C++編寫,提供C、C++的應用程序編程接口,支持最新的CUDA SDK。
構建平臺
Hadoop MapReduce:需要首先架構基于Hadoop的集群系統,通過HDFS完成運算的數據存儲工作;
Spark:可以的單獨運行,也可以與Hadoop框架完整結合;
Phoenix:獨立運行,不需要提前部署集群,運行時系統的實現是建立在PThread之上的,也可方便地移植到其他共享內存線程庫上;
Disco:整個Disco平臺由分布式存儲系統DDFS和MapReduce框架組成,DDFS與計算框架高度耦合,通過監控各個節點上的磁盤使用情況進行負載均衡;
Mars:運行時為Map或Reduce任務初始化大量的GPU線程,并為每個線程自動分配少量的key/value對來運行任務。
功能特點
Hadoop MapReduce:計算能力非常強,適合超大數據集的應用程序,但是由于系統開銷等原因,處理小規模數據的速度不一定比串行程序快,并且本身集群的穩定性不高;
Spark:在保證容錯的前提下,用內存來承載工作集,內存的存取速度快于磁盤多個數量級,從而可以極大提升性能;
Phoenix:利用共享內存緩沖區實現通信,從而避免了因數據復制產生的開銷,但Phoenix也存在不能自動執行迭代計算、沒有高效的錯誤發現機制等不足;
Disco:由一個Master服務器和一系列Worker節點組成,Master和Worker之間采用基于輪詢的通信機制,通過HTTP的方式傳輸數據。輪詢的時間間隔不好確定,若時間間隔設置不當,會顯著降低程序的執行性能;
Mars:由于GPU線程不支持運行時動態調度,所以給每個GPU線程分配的任務是固定的,若輸入數據劃分布均勻,將導致Map或Reduce階段的負載不均衡,使得整個系統性能急劇降低。同時由于GPU不支持運行時在設備內存中分配空間,需要預先在設備內存中分配好輸入數據和輸出數據的存放空間,但是Map和Reduce階段輸出數據大小是未知的,并且當多個GPU線程同時向共享輸出區域中寫數據時,易造成寫沖突。
五類實現框架對海量文本數據的統計實驗
單詞計數(WordCount)是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版“Hello World”。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數。
WordCount的實現步驟:
1、將文件拆分成splits,由于測試用的文件較小,所以每個文件為一個split,并將文件按行分割形成對,如圖2-1所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所占的字符數(Windows和Linux環境會不同)。
2、將分割好的對交給用戶定義的map方法進行處理,生成新的對。
3、得到map方法輸出的對后,Mapper會將它們按照key值進行排序,并執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。
4、Reducer先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,得到新的對,并作為WordCount的輸出結果。
本次實驗的硬件資源基于x86服務器1臺,配置內存為32GB DDR3,E5 CPU/12核,GPU。實驗數據樣本為10M/50M/100M/500M/1000M的文本文件五個,我們使用Hadoop MapReduce、Spark、Phoenix、Disco、Mars等MapReduce框架分別運行文本分析程序,基于結果一致的前提下統計出運行時間、運行時CPU占有率、運行時內存占有率等數據,并采用這些數據繪制成柱狀圖。
Hadoop MapReduce的運行時間最長,原因是Hadoop生態環境包含內容過多,所以每次任務啟動時首先需要加載所需資源包,然后緩慢地發起任務,并且由于本身是用性能較差的Java語言編寫的,所以導致整體計算時間長、性能差。Phoenix由于采用匯編和C語言編寫,內核很小,運行時所用資源很少,所以整個測試過程耗時也較少。Mars由于必須在GPU上運行,本身GPU由于價格因素,導致不太可能在實際應用場景里推廣,所以Phoenix的性價比是最高的。需要時長從高到低分別是Hadoop MapReduce、Disco、Spark、Phoenix、Mars。
Hadoop MapReduce、Disco這兩個框架需要占用的CPU資源在1000M文本處理時基本到達最大飽和度(大于90%),Apache Spark的CPU使用率沒有完全伴隨著文本文件增大而大幅上漲,Phoenix和Mars基本控制在性價比較高的范圍內。
Mars和Phoenix使用的內存在數據量較小時是最少的,Apache Spark為隨著數據量增大而大幅增加,在數據量最大時它對內存的消耗是最小的。Hadoop MapReduce和Disco都需要占用較多的內存。
從上面的測試結果我們得出,如果用戶只需要處理海量的文本文件,不需要考慮存儲、二次數據挖掘等,采用Phoenix是最大性價比的選擇。如果應用程序需要處理的數據量非常大,并且客戶希望計算出的數據可以被存儲和二次計算或數據挖掘,那Hadoop MapReduce較好,因為整個Hadoop生態圈龐大,支持很好。Apache Spark由于架構層面設計不同,所以對于CPU、內存的使用率一直保持較低狀態,它未來可以用于海量視頻分析用途。
五類實現框架結合視頻人臉分析的實驗
安防行業的并行測試實驗大多是基于智能視頻分析技術基礎之上的。智能視頻分析技術是一種基于人工智能的識別模式。它綜合了各種高科技研究成果,主要借助智能視頻分析技術的處理方法,在結合一些硬件設施,對某些對象(比如人員、車輛等)進行研究和處理,形成一種核心算法。
在本次測試中,我們針對的是人臉特征抓取實驗,即通過對一段指定錄像分析,提取出錄像中所有出現的人臉圖片的過程。對錄像中出現的人臉圖片分析過程大致上可以分為三個階段:取流、解碼及分析、提取物發送。碼流分析提取服務即我們本次實驗所需要的三個階段。
主計算節點把錄像文件讀入到內存中,將碼流分割為若干個子塊分發給從計算節點。由于我們采用的是MapReduce框架,所以程序會自動分為若干個線程執行,每個線程對應一個Map,每個Map都會執行解碼、分析、結果輸出三個步驟。
我們在x86機器上進行了本次實驗,實驗數據是一個2.66GB大小的包含1092個人臉的錄像文件,錄像分辨率為1080P。我們通過分別采用不同的MapReduce框架來運行程序,對程序運行結果進行匹配,5個框架的運行結果完全一致,即抓取出1092個人臉圖片。我們對程序運行時間、運行過程中CPU使用率、運行過程中內存使用率做了統計并生成柱狀圖供參考。本次實驗過程中所使用的人臉檢測算法是筆者公司圖像處理與智能分析部門自主研發的算法。
Mars和Phoenix框架處理錄像所需時間最短,運行智能分析程序時CPU使用率對于所有框架基本上都達到最大飽和度(90%以上)。由于所做的實驗是對碼流進行分析,碼流本身需要占用較大的內存空間,解碼、分析等處理過程也許要占用內存用于存放中間結果,所以內存基本上也達到最大使用飽和度(90%以上)。綜上所述,CPU和內存的使用率在本類實驗過程中不需要過多考慮,最主要的對比點是運行時間。考慮到Mars必須基于GPU運行,并且GPU的價格較高,所以Phoenix的性價比更高。Hadoop MapReduce雖然處理時間最長,但是它具有強大的生態環境,利于對處理結果數據進行保存和數據挖掘,所以對于大型公司來說它依然是很好的選擇。Apache Spark雖然在本次實驗中沒有太多亮點,但是從各類大數據學術會議上得到的反饋較好,它基于內存方式的運算模式可以幫助處理海量數據,未來一定可以在智能分析領域有很大的作為。
結語
現實世界很多實例都可用MapReduce編程模型來表示,MapReduce作為一個通用可擴展的、高容錯性的并行處理模型,可有效地處理海量數據,不斷地從中分析挖掘出有價值的信息。MapReduce封裝了并行處理、負載均衡、容錯、數據本化等技術難點細節。通過本文的兩例測試用例可以證明MapReduce 適用于海量文本分析、海量視頻智能分析等安防行業密切相關的應用場景,諸如行為分析、車牌識別、人臉抓拍、客流統計等智能化技術的應用,尤其是對海量視頻執行高并發處理,可以很好地在平安城市、智慧城市等大型安防項目中落地,為公安機關治安管理、案件偵破等提供有力的技術支持。