隨著云計算大數據的發展,有越來越多的場景需要借助于實時數據處理技術,為此有很多公司開發了自己的實時處理系統,Facebook就是其中的一員,他們構建的實時數據處理生態系統每秒鐘能夠處理數百GB的數據。本文介紹了Facebook在設計該系統時從易用性、性能、容錯、可伸縮性以及正確性等方面考慮所做的重要設計決策,這些決策和系統如何滿足秒級的延遲需求,以及在構建該系統的過程中Facebook所總結的經驗教訓。
易用性:處理需求有多復雜?SQL是否足夠?是否必須要使用C++或者Java這樣的編程語言?用戶編寫、測試和部署一個新的應用程序需要多長時間? 性能:允許多長時間的延遲,毫秒級,秒級,還是分鐘級?單機或者總體需要多大的吞吐量? 容錯能力:可以容忍哪些類型的錯誤?數據處理或輸出的次數通過什么語義來保證?系統如何存儲和恢復內存狀態? 可伸縮性:數據是否支持分片從而進行并行處理?系統是否能夠容易地隨著數據量的變化進行調整?是否可以重新處理之前的有價值的老數據? 正確性:是否需要ACID特性?作為輸入的所有數據是否都需要被處理并在最終的結果中出現?
以及不同的流處理系統所做的設計決策:
語言范式決定了編寫應用程序的難易程度以及開發者對性能的操控程度?;居腥N選擇:聲明式,函數式以及過程式編程語言。對于Facebook而言,單一的某種語言無法滿足所有的用例,因此他們開發了三種不同的流處理系統。
數據傳輸對流處理系統的容錯性、性能和可伸縮性都有非常大的影響,傳統的數據傳輸方式包括:直接消息傳輸、基于代理的消息傳輸和基于持久化存儲的消息傳輸。Facebook使用Scribe,一種持久化的消息總線,來連接不同的處理節點。
處理語義包括狀態語義(每一個輸入事件最少被計數一次、最多被計數一次還是只被計數一次?)和輸出語義(給定的輸出值在輸出流中最少出現一次、最多出現一次還是只出現一次?)。其中無狀態的處理器只有輸出語義,而有狀態的處理器這兩種語義都有。Facebook對不同的應用通常有不同的狀態和輸出語義需求,因而開發了Puma、Stylus和Swift三個支持不同語義的系統?!?br />
狀態保存機制的實現方式有很多,包括復制副本、本地數據庫持久化、遠程數據庫持久化、上游備份以及全局一致性快照等。Facebook實現了兩種狀態保存機制,其中Puma實現了遠程數據庫存儲,而Stylus則實現了本地和遠程數據庫存儲?!?br />
再處理的方式有三種:僅使用流處理;維護兩個單獨的系統,一個用于流處理,一個用于批處理;開發一個能夠在批處理環境中運行的流處理系統。Facebook采用了一種與Spark Streaming以及Flink都不同的處理方式,他們使用標準的MapReduce框架從Hive中讀取數據并在批處理環境中運行流處理應用程序。Puma應用可以運行在Hive環境中,而Stylus則提供了三種類型的處理器:無狀態的處理器,通用的有狀態的處理器和一個居中的流處理器。
在系統建設方面,Facebook的主要設計目標是秒級的延遲,每秒鐘能夠處理幾百GB的數據,為此他們通過一個持久化消息總線將所有的處理組件連接起來進行數據傳輸,同時也將數據的處理和傳輸解耦,實現容錯、可伸縮、易用性和正確性。整個系統的架構圖如下:
該圖闡述了Facebook實時處理系統的數據流,數據從左側的移動和Web產品中產生,然后被送入Scribe(一個分布式數據傳輸系統),而Puma、Stylus和Swift等實時流處理系統則從Scribe中讀取數據并將處理結果寫入Scribe。Puma、Stylus和Swift可以根據需要通過Scribe連接成一個復雜的DAG(有向無環圖)。
接下來是使用該實時處理系統的一個示例應用,該應用識別一個輸入事件流中的趨勢事件,以5分鐘為單位對這段時間內產生的話題按事件數排序。每個事件包含一個事件類型,一個維度ID(用于獲取事件的維度信息,例如使用的編程語言)和一個文本(用于分類事件主題,例如電影或者嬰兒)。該應用有4個處理節點,每一個都可以并行執行,整體流程圖如下:
在該圖中,Filterer會根據事件類型過濾輸入流,然后將輸出按照維度ID進行分片,這樣下一個節點就能夠并行處理分片數據了。Joiner通過維度ID從一個或者多個外部系統檢索信息,然后根據事件的文本內容對其按照話題進行分類。Scorer記錄著最近一段時間內每一個話題的事件數,同時還會跟蹤這些計數器的長期趨勢。Ranker則計算每N分鐘每一個話題的前K個事件是什么。
最后是Facebook在構建該系統的過程總結的一些經驗教訓:首先,沒有一個單獨的流處理系統能夠適應所有場景,針對不同的點使用不同的系統才能更好地解決問題;其次易用性不僅包括使用,還包括開發、調試、部署、監控和運維等方面;最后,流處理和批處理并不是互斥的,組合使用這兩種系統能夠加速數據的處理速度。