大數據的分布式調度是在進行數據ETL過程中起到了總體的承上啟下的角色,整個數據的生產、交付、消費都會貫穿其中,本文從調度、分布式調度的特征展開,再對大數據調度個性化特征的一些闡述,由滿足大數據使用的架構和業務場景的需求上娓娓道來,從實踐的角度分享如何打造一個高可用、高效率、靈活性的大數據調度平臺。
調度
從上個世紀50年代起,調度問題的研究就受到數學、運籌學、工程技術學等領域科學的重視[1],人們主要從數學的角度來研究調度問題,調度問題也同樣被定義為”分配一組資源來執行一組任務”,以獲得生產任務執行時間或成本的最優[2]。調度在計算機任務的實現可以依賴操作系統的定時任務進行觸發(例如Linux系統的Crontab),主要針對單任務機制的觸發,調度最基本的需要能夠按時或者按照事件進行觸發(At-least-once),如果任務不符合預期,還需要在應用端進行重試,最大可能保證任務被按時執行,并且成功執行,同時不能多次執行(Exactly once);但是在業務場景能保證可重復執行、一致性操作情況下對于爭取能正常調度執行多次執行也是不可或缺的,比如給商戶進行1min前的例行結算,如果結算是按照30min的時間窗口查找未結算的商戶,那么就會容忍30min延遲,并且多次被執行也不會給商戶多結算,因為在結算付款和重置是否結算標志位可以設計成原子性操作。所以在調度上能夠做到按時、正確的執行,在業務方設計為了保證最終一致性也有一些架構上的取舍。
如果應用場景有上下游的協作,或者在任務執行會存在不同的宿主機來完成,或者為了保證任務高可用場景,就需要引入分布式調度的架構。
分布式調度
分布式調度是在單機的基礎上發展起來,在綜合考慮高可用、高效率、分布式協作的背景下逐步演進的調度方式,從單點調度到分布式協作是一個質變的過程,這個過程涉及到許多在單機并不存在的特征,下面針對重點展開聊下:
圖1 分布式調度組件化分解圖
2.1 調度器去中心化&高可用
涉及到分布式調度的協作,就需要有調度中心節點,同時要保證高可用的目的就需要調度中心節點是多節點發布,主備的方式去單點依賴。
2.2 宿主選擇
分布式調度在任務執行階段,可以在目標宿主中進行全部執行、N選M(N>=M>=1)的選擇,宿主機具備相同類型任務互備的機制,在MPP(Massively Parallel Processor)架構中尤為常見,把大任務分而治之快速完成。也存在場景(比如外賣給商戶結算)為了一致性和準確性只能由一臺主機進行執行,并且需要成功執行。
被動選擇策略:宿主的被動選擇機制一般可以隨機或者按照順序選擇策略,也可以按照當前宿主機進行的任務執行數量的方式進行常規的調度分配。當然,也可以進行高級的操作,參照宿主機的處理能力(吞吐量和響應時間)、資源使用情況(CPU、Memory、Disk I/O、Net I/O等)進行反饋機制的動態分配。后者需要有集中節點存儲當前宿主機的處理能力、資源情況,便于在決策選擇中提供參照。
主動選擇策略:宿主的主動選擇具備更加豐富的選舉策略,任務在下達到具體算子時,會比較明確的定義出當前任務需要由多少個宿主參與執行,通過zookeeper的分布式鎖來實現鎖的搶占機制,搶占成功則執行,否則放棄。這種選舉策略讓宿主機得到了更多的參與,降低了對調度器的依賴。這種主動選擇的方式,避免被動選擇因不具備執行條件被選中,在執行的能力在時間上的損耗。
2.3 任務故障轉移
調度任務的從任務級別job到transformer、operator,整個鏈條都存在具體局部失敗的情況,調度器需要在原目標宿主機重試和失敗后轉移到其他備宿主機的功能,最大力度的保證任務被成功執行。
2.4 執行算子抽象
以往單機任務的調度可以比較靈活的執行多樣的任務,可以是腳本、Webservice調用、HDFS Client命令行等,但是對于分布式協作需要接收外部命令運行,這就需要算子通過標準的數據通訊協議對外提供調用服務,常規的WebService、RPC(thrift/protocol buffer)等協議在跨語言通訊上具有較為廣泛的應用。所以具體執行單元可以是具體任務的抽象,例如提供了Rest API方式,調用的URL和參數都是執行方填入,最大程度上支撐了靈活性;數據庫操作算子可以包含數據庫驗證信息、具體執行的SQL等。執行算子抽象后,滿足規范和靈活性,靈活是一個雙刃劍,可以最大限度的滿足用戶需求,但也會導致大數據層面無法很細粒度的去感知數據的表、字段數據的完成情況,對數據生產無法更加精細粒度的產出交付。
2.5 彈性擴展
任務具體執行的宿主機需要在調度層面滿足彈性的擴展,擴展最主要的需要是滿足高可用和任務隨著水平擴展進行分攤壓力。在集群目標宿主機選擇時,一般目標集合可以指定具體IP-List,也可以是一個BNS(百度機器的NameServer服務)。IP-List方式設置比較簡單直觀,但是存在每次調整依賴變更調度系統服務,變更之后還需要進行刷新宿主機的情況。而通過BNS服務比較簡單,同時和線上服務發布部署進行結合,不存在延遲部署和刷新,推薦通過BNS的方式介入。
2.6 觸發機制
常規觸發是按照執行間隔或者具體時間的Crontab語法,開始時間,截止時間參數完成,但是在分布式調度任務中,最重要的就是完成協作,所以如果要進階的話,就是依賴觸發的機制。這種就很好的形成了上下游依賴觸發,是分布式協作的關鍵步驟。從最初的任務節點按照常規觸發,下游節點形成依賴鏈條,這里如果在高級進階的話,就是依賴的某個/某些頻次觸發,比如每小時的12分鐘開始被執行,下游可以選擇具體的2:12 ,4:12進行觸發,而非每個整點12分都被調用。這三種方式目前在外賣的大數據平臺都有不同場景訴求,架構設計在3個需求上都有靈活的交付。
2.7 堵塞機制
對于相同任務的不同時間的運行實例,會存在前面的實例還沒有正常結束的情況,這種在高頻次調用,第三方依賴故障延遲等情況下會出現,如果繼續調用會造成調用鏈條惡化,所以防止這種情況,堵塞機制會提供三種模式:常規例行(默認模式)、丟棄后續、丟棄前例。后面2種方案都需要提供容錯重放機制,這個場景比較類似1.1章節提到的結算案例。
2.8 圖形化進展查看
調度可以根據調用鏈條和不同事件頻次的實例,通過樹狀圖形化的方式查看執行的進度情況,例如可以查看job中transformer、算子的運行機器狀況、狀態和具體的實時執行日志。圖形化是根據調用的觸發機制分析出來的一個鏈條,是在煩冗復雜的調用關系中找到清晰脈絡的數據直觀表達的方式,是調度中常規的展示方式。在進階中可以查看相應的參數傳遞,并發算子的執行進度條,預估完成周期等。
2.9 報警
通過郵件或者短信的方式對不符合預期返回標識的進行中止,同時通過郵件或者短信等方式對預先設置的用戶或者用戶組發出警告。報警觸發的機制可以在宿主機單臺時候觸發,也可以在一定占比的宿主機在一定的時間窗口超過了閾值,觸發報警。同時也要支持報警的屏蔽,用在進行運維或者升級部署、運維接管的情況。
上面是很多常規調度擁有的一些特征,這些是在分布式場景下的延伸需求,從單點簡單的邏輯到多節點的協作統籌在工程層面無疑增加了額外輔助,這些都是在業務演進中逐步完善起來,而高可用、高效率是在分布式環境下做出的改變。
大數據分布式調度
大數據分布式調度,在上面通用調度的基礎上又進行了具體跟數據特征相匹配的改良。主要是從數據的流程層面進行梳理,用來解釋數據的上下游、血緣關系的問題,具體又有哪些特征是針對大數據的呢?
3.1 數據扇入扇出
大數據的存儲和檢索方案很多,因大數據特征之一就是多樣性,為了滿足多樣的業務場景會有不同的引擎或者存儲選擇,在多樣化解決方案的同時,造成了數據之間進行交換變得復雜,引擎之間的數據存取規則都有個性化的支持,比如Hbase的數據到Mysql和ElasticSearch(以下簡稱ES),涉及到Hbase的讀取和后續后面兩者的數據存入,這種對于Hbase就是一對二的數據扇出,但是在數據在Hbase中通過Get或者Scan方式獲取后,要插入數據需要了解后面2者的存儲結構,甚至是索引結構。所以類似這種跨引擎(或者跨版本,不同API)的方式,為了保持通用,需要進行需求的抽象,在外賣平臺針對數據的交換定義了一套開放式SQL,這個框架對數據引擎的存和取分別作了抽象,在不同的目標引擎中有具體的實現,所以就有一些約定的規范。
圖2 開放式SQL扇入扇出流程圖
主鍵:數據必須存在業務主鍵或者聯合主鍵,目的是為了保證數據在聚合或者更新的時候有依據。主鍵在Nosql的引擎中作為RowKey,在關系數據庫中作為主鍵,在ES中作為主鍵key。對于Kudu來講也是主鍵,針對數據的upsert就可以有依據的進行更新或者插入。
數據列:數據列的變更會稍微復雜,如果在關系數據庫中會涉及到增加、變更列,但是在Hbase、ES中基本不需要主動擴展列,只需要對數據變更就可以了。
分區字段:對于事實表數據,在大數據量的情況下,為了檢索效率和數據存放最優,一般會提供分區和桶的策略,針對Hive、Impala、GreenPlum的引擎會額外增加分區字段,分區可以是一級到多級,一般業務場景下第一分區為日期,根據實際業務需求可以變更更細粒度或者其他業務字段。在一般Mysql、Postgresql、Hbase這種引擎中不需要單獨增加分區字段。
數據更新范圍:大數據的數據交換,一般為了提高效率會進行多批次的并發處理,這就需要在一批次的數據進行分割,一般情況下會按照單一字段的進行截取,字段的類型以時間戳(create_time、update_time)居多,也可以根據主鍵的key排序后分批次獲取,在源數據引擎允許的情況下,按照多批次的并發query可以做到很好的數據獲取,把串行的操作截斷成多段的并發;這種在同一個任務多時間批次的情況下也很重要,每個批次會界定本批次設計數據更新的范圍。數據更新范圍使用前一般會獲取本次更新的數據量,可以根據原目標引擎單個批次的最優性能計算出offset。
多步驟過程:多步驟顧名思義就是數據的準備不是一蹴而就的,例如在3個Mysql庫、Postgresql、Oracle中獲取員工信息,而員工編號是統一的,最終數據在DB2中匯聚在一起,最基礎的步驟是三份數據匯入到Oracle中,這就涉及到前面通過key做數據的Merge,這里會涉及到數據的插入和更新,但是如果有key存在并且不同數據源目標數據列清楚的情況下,三份數據早到和晚到場景都沒有太大區別。第二步驟則根據匯總完的數據分析出一個過濾場景下的聚合信息,這步驟的場景作為計算數據源,再次進行數據的扇出插入結果。第三步驟可以把第一步的臨時結果進行刪除。所以在多步驟的場景下數據是分步驟完成了匯聚、聚合和刪除。
更新類型:百度外賣大數據實踐的開放式SQL場景有Insert(大批明細場景)、Update(數據后續更新)、Insert Once(聚合結果插入)、Insert Temp(臨時結果緩存)、Delete(善后處理場景),在這些組合操作類型的場景下,需要在是線上增加一個執行優先級的信息,如果區分優先級會按照從前到后的步驟執行,如果沒有設定則可以并發操作。
黑盒暴露操作:黑盒操作是在通過開放式SQL的存取原則情況下,對無法按照約定規范操作的情況下實行的一種妥協方式,目的有兩個:一方面要把黑盒對數據依賴過程必須對外暴漏,這樣是為了后期梳理數據血緣關系提供素材;另一方面通過黑盒來滿足數據處理的靈活性,比如對json負責xpath的選擇,集中緩存優化方案;黑盒雖然通過規范暴露了依賴源數據,但是也造成了對外不好解釋數據的處理過程,同時這種黑盒一般針對表或者多個字段,精細化程度不夠。
開放式SQL是大數據在做數據ETL的一個規范標準,目的在數據的交換和流動是通過配置的范式來完成,并非是通過硬編碼或者單純組件化的方式。編碼更多的是要提供豐富的解析函數,更優秀的中間大結果集的Cache和復用。開放式SQL提供了數據從哪里來,到哪里去的哲學問題,同時也可以進行對外闡述對數據做何種操作,這是在為后期數據血緣關系提供最基礎的指導,在發展過程中,百度外賣大數據平臺也經歷了如下的不同階段。
圖3 分布式調度的演進過程
3.2 協作參數一致性
調度策略除了有之前提到的上下游關系外,在大數據場景下還需保證數據處理的統籌協作,更為重要的是精細參數的上傳下達。上下游使用系統默認的參數Key定義,也可以自定義Key的參數;系統參數比如說起止時間戳、機器IP、執行任務實例等。對于全局系統默認的Key,由調度系統進行賦值。
參數的作用域有本地化和全局2種方式,本地化可以設定參數的Key:Value,相同Key的全局不會被覆蓋,本地的優先級高于全局;而全局的變量是由上游產生并且進行流轉;調度本身規定了不同算子在參數接收方面的追加、解析、編碼規范,比如在Shell命令和WebService中追加參數有較大區別。
參數除了作用域還有是否被傳遞的屬性,上游的參數可以有針對性的對下游輸出,同樣,如果算子接收到上游參數可以選擇修改值,但是這種傳遞是不被修改。
3.3 數據質量實時Check
數據生產在交付之前一般會對數據進行校驗,由于大數據生產的過程比較冗長,如果在后期輸出數據再進行質量校驗,往往發現問題比較滯后。所以在數據的階段性交付過程就可以對數據進行核驗,可以比較早的對數據的問題進行干預,保證數據交付的可靠及時性。
Check算子:針對數據的校驗特點,設計了專門算子提供質量保證。數據核驗的方式一般有2種:跟自身歷史比較、跟其他數據源進行比較。前者只需要對目標數據源進行選擇相應的SQL或者標準API來獲取當前生產窗口的數據,然后才去同比、環比、滑動窗口的均值、左右邊界等方式,時間粒度可以靈活到天、小時、分鐘。如果跟其他數據源進行比較則需要對源和目標分別進行描述,可以進行嚴格相等、區間、浮動率等方式比較,應用的場景以數據交換較多。除了數據比較之外,還提供關鍵性字段類型、精度、寬度的比較,以及對空置率、重復率、區分度的統計報表產出,比較直觀的查看數據的稀疏和分布。
整體和抽樣:針對于其他數據源進行比較的方式,常規的是通過宏觀的字段抽樣的Count方式條數比較,也可以通過對數據類型的Sum、Avg的比較,這里需要注意不同引擎的存儲精度略有區別,盡量選擇整形字段;除此之外也會增加對明細數據抽樣的全列的字段比較,這種比較容易發現字段值的缺失,類型變更等問題。
這里需要說明的是,如果沒有配置Check算子,則認為數據生產完就可以進行交付;如果數據的樹狀結構中有Check算子,則認為在下一個Check算子之間的所有數據生產節點都默認數據可以交付。這樣默認操作是因為數據的校驗不一定要面面俱到,否則也會帶來時間上的損耗,一般情況下我們認為只需要在關鍵性節點進行核驗就可以了。校驗失敗通過告警的方式中止數據ETL過程,后續可以重試或者人工方式介入處理。
3.4 數據血緣關系
人生哲學解釋:血緣關系分析是大數據調度與其他調度之間的區分度較大特征之一,主要解決大數據的“人生哲學問題”:我是誰,從哪里來,到哪里去。而這一切的基礎是開放式SQL對數據存取的規范,之后依賴對開放式SQL的解析來完成血緣關系分析,主要包含數據的上游依賴關系和下游的被依賴關系,這2個是通常被涉及到的,除此之外還包含第三個特征:計算邏輯或者口徑對外的輸出,鑒于大數據在進行計算和挖掘之后數據會被推送到不同的業務場景使用,會造成相同口徑指標不同的計算結果,當被提及計算邏輯時,研發同學也無所適從,經常需要追根溯源對代碼和過程進行回訪,進而導致無益消耗的增加。
所以計算邏輯輸出也是常規和減少人力梳理成本的重要特點。
開放式SQL可以對外解釋,數據從哪里來,到哪里去的邏輯問題,也會涉及到具體SQL或者API層面的計算口徑,但是這里需要提到之前的【黑盒暴露】和研發專注開發ETL的豐富function,黑盒是無法解釋計算邏輯的,但是function卻可以給出入參、出參的說明,讓特征三的提供成本最低。
血緣關系分析的手法一方面依賴SQL屬主引擎的語法解析,例如Mysql可以使用Alibaba druid、JSqlparser,GreenPlum、Postgresql可以借助JSqlparser,Impala則需要通過impala-frontend進行語法分析,分析的結果在外賣大數據平臺需要精確到單個字段依賴上游的哪些庫表、字段;越是精細越是精細在進行大數據回溯的時候就越有針對性,同時也越有利于效率的提高。
在進行大數據回溯的時候越有針對性和利于效率的提高。
針對非SQL方式,例如Hbase、ElasticSearch數據源的依賴,也會同樣被映射成不同的文檔/表,具體的列簇中的列,source中的key。
總之,數據可解釋是血緣關系存在的價值,血緣關系同樣和開放式SQL都在ETL的演進中具有里程碑的意義。
3.5 基于表的Transformer演進
在大數據調度中,對用戶最直觀的展示是某個表是否可以被交付,或者更為精確查看表中的字段哪些具備了可以被交付?這樣做是為了讓下游數據更好的有選擇性的、細粒度的依賴觸發動作。所以在大數據調度中會區分出三類角色,從粗粒度到細粒度分別是:Job、Transformer、operator。
圖4 三者協作示例
下面解釋下三者的分工和協作:
任務(Job):Job的主要作用是進行數據相關性的統籌,簡單來講是針對表之間、多種數據源之間進行協作的一個統籌,是一個最大粒度的過程,具體調度的實例化過程都是以Job作為入口,其他2個角色都不具備實例化的能力。這里會區分出同樣有數據之間依賴,但是并不一定在一個執行頻次上的任務,可以采取配置不同的job依賴關系。
轉換(Transformer):一個轉換就代表一個表,單獨把表拿出來,是因為在大數據的交付過程,表是一個完整的符號,不如庫的粒度大,也不像字段太精細無法對外完整表述。
算子(operator):算子是調度的最細粒度,不可分割。算子的分類根據應用會擴展很多,有控制類型算子,例如啟停算子、分發算子、Check算子等。也會有針對數據操作進行封裝的功能性算子,比如獲取hdfs數據推送到mysql,Ftp到對象存儲等;針對大數據調度的功能性算子是針對單個字段或者幾個字段的產生,這個完全依賴于數據產生的難易程度和組合回溯的相關性,最終由開放式SQL進行配置,例如其中的一行則認為是對一個算子的功能進行的描述,select字段中的數據獲取可以是多個,同樣對應的insert中也可以是多個;大數據調度在完成開發之后,后期的更多運維精力就在算子的豐富。算子的實現會考慮到前面提到的靈活和通用的選擇。
3.6 基于字段精細化回溯
字段級別的回溯,主要依賴2+1的方式完成,前面的2是指血緣關系+可更新目標引擎;通過開放式SQL可以梳理出數據的血緣關系,便于分析出整個鏈條中可以上下游依賴的點和并發的點。另外的1是指在調度的圖形化界面中,可以針對一個具體實例化的Job選擇需要回溯的transformer或者某些算子。
同樣,根據上圖4中的流程,我們走一個具體的實例。圖中標識的黑色0/6代表的是開放式SQL中黑盒的部分,這部分對數據來說無法解釋的生產過程;三個標識圖形2代表的是Check算子,其他圓角方形顏色相同代表有上下游血緣關系依賴,例如7會依賴上游的1。下面我們了解下幾個場景的回溯:
回溯1:在這種情況下算子1/2/3/4/6會被進行回溯,而算子0和5則不會被執行到,同樣因為1后面有緊鄰的check算子2,則1執行完,算子7不會馬上被并發執行,因為有一個黑色的算子6。但是在算子2執行成功之后,如果能暴露出算子6的依賴和產出關系,算子7就可以被執行,不需要等待算子3/4/6的執行完成。所以節約了一定的時間。其他場景也是類似
回溯Transformer2,這種場景算子7和算子9會同時觸發執行,同樣,如果算子9在完成的情況下,下游transformer3中的11不會被執行,因為是非首節點,但是在算子7執行完成之后,算子13和算子10都會被同時調起。
可更新目標引擎是指非SQL On Hadoop的文件解決方案,類似GreenPlum、Hbase、ES都是可以被實時更新。這里不詳細展開。
3.7 信號燈
信號燈在大數據分布式調度中作為一個消息中間件,主要作用是生產者(Producer)在數據生產結束、數據質量核驗通過等過程對外釋放信號,這里面包含具體的庫表、字段和本批次的數據范圍等信息,消費者(Consumer)可以根據需要監聽不同的表主題,來完成后續的操作。通過信號燈的方式,可以很好的對數據下游依賴解耦合,同時信號燈也可以被應用在數據集市中庫表、字段的數據完成情況標識,可以讓用戶進行查看,免去了數據是否可用,是否交付的交互。
總結
大數據分布式調度的應用場景和ETL的定義過程、數據引擎和業務場景的需求有著至關重要的關聯,分布式調度的過程是通過場景化驅動逐步完善的過程,百度外賣大數據的調度V2.0是滿足了通用的調度之后,發現存在的數據解釋和細粒度更新延遲等問題之后,開啟了逐步迭代完善過程,后期也期待我們的系統開源的一天。
引用
徐俊剛, 戴國忠, 王宏安. 生產調度理論和方法研究綜述[J]. 計算機研究與發展, 2004, 41(2):257-267.
KNMcKay,VCSWiers1Unifyingthetheoryandpracticeofproductionscheduling1JournalofManufacturingSystem,1999,18(4):241~255
作者:梁福坤,百度外賣大數據首席架構師