如今微服務架構討論的如火如荼。但在企業架構里除了大量的OLTP交易外,還存在海量的批處理交易。在諸如銀行的金融機構中,每天有3-4萬筆的批處理作業需要處理。針對OLTP,業界有大量的開源框架、優秀的架構設計給予支撐;但批處理領域的框架確鳳毛麟角。是時候和我們一起來了解下批處理的世界哪些優秀的框架和設計了,今天我將以Spring Batch為例,和大家一起探秘批處理的世界。
如今微服務架構討論的如火如荼。但在企業架構里除了大量的OLTP交易外,還存在海量的批處理交易。在諸如銀行的金融機構中,每天有3-4萬筆的批處理作業需要處理。針對OLTP,業界有大量的開源框架、優秀的架構設計給予支撐;但批處理領域的框架確鳳毛麟角。是時候和我們一起來了解下批處理的世界哪些優秀的框架和設計了,今天我將以Spring Batch為例,和大家一起探秘批處理的世界。
初識批處理典型場景探秘領域模型及關鍵架構實現作業健壯性與擴展性批處理框架的不足與增強批處理典型業務場景對賬是典型的批處理業務處理場景,各個金融機構的往來業務和跨主機系統的業務都會涉及到對賬的過程,如大小額支付、銀聯交易、人行往來、現金管理、POS業務、ATM業務、證券公司資金賬戶、證券公司與證券結算公司。
下面是某行網銀的部分日終跑批實例場景需求。
涉及到的需求點包括:
批量的每個單元都需要錯誤處理和回退;每個單元在不同平臺中運行;需要有分支選擇;每個單元需要監控和獲取單元處理日志;提供多種觸發規則,按日期,日歷,周期觸發;除此之外典型的批處理適用于如下的業務場景:定期提交批處理任務(日終處理)并行批處理:并行處理任務企業消息驅動處理大規模的并行處理手動或定時重啟按順序處理依賴的任務(可擴展為工作流驅動的批處理)部分處理:忽略記錄(例如在回滾時)完整的批處理事務與OLTP類型交易不同,批處理作業兩個典型特征是批量執行與自動執行(需要無人值守):前者能夠處理大批量數據的導入、導出和業務邏輯計算;后者無需人工干預,能夠自動化執行批量任務。
在關注其基本功能之外,還需要關注如下的幾點:
健壯性:不會因為無效數據或錯誤數據導致程序崩潰;可靠性:通過跟蹤、監控、日志及相關的處理策略(重試、跳過、重啟)實現批作業的可靠執行;擴展性:通過并發或者并行技術實現應用的縱向和橫向擴展,滿足海量數據處理的性能需求;苦于業界真的缺少比較好的批處理框架,Spring Batch是業界目前為數不多的優秀批處理框架(Java語言開發),SpringSource和Accenture(埃森哲)共同貢獻了智慧。
Accenture在批處理架構上有著豐富的工業級別的經驗,貢獻了之前專用的批處理體系框架(這些框架歷經數十年研發和使用,為Spring Batch提供了大量的參考經驗)。
SpringSource則有著深刻的技術認知和Spring框架編程模型,同時借鑒了JCL(Job Control Language)和COBOL的語言特性。2013年JSR-352將批處理納入規范體系,并被包含在了JEE7之中。這意味著,所有的JEE7應用服務器都會有批處理的能力,目前第一個實現此規范的應用服務器是Glassfish 4。當然也可以在Java SE中使用。
但最為關鍵的一點是:JSR-352規范大量借鑒了Spring Batch框架的設計思路,從上圖中的核心模型和概念中可以看出究竟,核心的概念模型完全一致。完整的JSR-252規范可以從https://jcp.org/aboutJava/communityprocess/final/jsr352/index.html下載。
通過Spring Batch框架可以構建出輕量級的健壯的并行處理應用,支持事務、并發、流程、監控、縱向和橫向擴展,提供統一的接口管理和任務管理。
框架提供了諸如以下的核心能力,讓大家更關注在業務處理上。更是提供了如下的豐富能力:
明確分離批處理的執行環境和應用將通用核心的服務以接口形式提供提供“開箱即用” 的簡單的默認的核心執行接口提供Spring框架中配置、自定義、和擴展服務所有默認實現的核心服務能夠容易的被擴展與替換,不會影響基礎層提供一個簡單的部署模式,使用Maven進行編譯批處理關鍵領域模型及關鍵架構先來個Hello World示例,一個典型的批處理作業。
典型的一個作業分為3部分:作業讀、作業處理、作業寫,也是典型的三步式架構。整個批處理框架基本上圍繞Read、Process、Writer來處理。除此之外,框架提供了作業調度器、作業倉庫(用以存放Job的元數據信息,支持內存、DB兩種模式)。
完整的領域概念模型參加下圖:
Job Launcher(作業調度器)是Spring Batch框架基礎設施層提供的運行Job的能力。通過給定的Job名稱和作Job Parameters,可以通過Job Launcher執行Job。
通過Job Launcher可以在Java程序中調用批處理任務,也可以在通過命令行或者其它框架(如定時調度框架Quartz)中調用批處理任務。
Job Repository來存儲Job執行期的元數據(這里的元數據是指Job Instance、Job Execution、Job Parameters、Step Execution、Execution Context等數據),并提供兩種默認實現。
一種是存放在內存中;另一種將元數據存放在數據庫中。通過將元數據存放在數據庫中,可以隨時監控批處理Job的執行狀態。Job執行結果是成功還是失敗,并且使得在Job失敗的情況下重新啟動Job成為可能。Step表示作業中的一個完整步驟,一個Job可以有一個或者多個Step組成。
批處理框架運行期的模型也非常簡單:
Job Instance(作業實例)是一個運行期的概念,Job每執行一次都會涉及到一個Job Instance。
Job Instance來源可能有兩種:一種是根據設置的Job Parameters從Job Repository(作業倉庫)中獲取一個;如果根據Job Parameters從Job Repository沒有獲取Job Instance,則新創建一個新的Job Instance。
Job Execution表示Job執行的句柄,一次Job的執行可能成功也可能失敗。只有Job執行成功后,對應的Job Instance才會被完成。因此在Job執行失敗的情況下,會有一個Job Instance對應多個Job Execution的場景發生。
總結下批處理的典型概念模型,其設計非常精簡的十個概念,完整支撐了整個框架。
Job提供的核心能力包括作業的抽象與繼承,類似面向對象中的概念。對于執行異常的作業,提供重啟的能力。
框架在Job層面,同樣提供了作業編排的概念,包括順序、條件、并行作業編排。
在一個Job中配置多個Step。不同的Step間可以順序執行,也可以按照不同的條件有選擇的執行(條件通常使用Step的退出狀態決定),通過next元素或者decision元素來定義跳轉規則;
為了提高多個Step的執行效率,框架提供了Step并行執行的能力(使用split進行聲明,通常該情況下需要Step之間沒有任何的依賴關系,否則容易引起業務上的錯誤)。Step包含了一個實際運行的批處理任務中的所有必需的信息,其實現可以是非常簡單的業務實現,也可以是非常復雜的業務處理,Step的復雜程度通常是業務決定的。
每個Step由ItemReader、ItemProcessor、ItemWriter組成,當然根據不同的業務需求,ItemProcessor可以做適當的精簡。同時框架提供了大量的ItemReader、ItemWriter的實現,提供了對FlatFile、XML、Json、DataBase、Message等多種數據類型的支持。
框架還為Step提供了重啟、事務、重啟次數、并發數;以及提交間隔、異常跳過、重試、完成策略等能力。基于Step的靈活配置,可以完成常見的業務功能需求。其中三步走(Read、Processor、Writer)是批處理中的經典抽象。
作為面向批的處理,在Step層提供了多次讀、處理,一次提交的能力。
在Chunk的操作中,可以通過屬性commit-interval設置read多少條記錄后進行一次提交。通過設置commit-interval的間隔值,減少提交頻次,降低資源使用率。Step的每一次提交作為一個完整的事務存在。默認采用Spring提供的聲明式事務管理模式,事務編排非常方便。如下是一個聲明事務的示例:
框架對于事務的支持能力包括:
Chunk支持事務管理,通過commit-interval設置每次提交的記錄數;支持對每個Tasklet設置細粒度的事務配置:隔離界別、傳播行為、超時;支持rollback和no rollback,通過skippable-exception-classes和no-rollback-exception-classes進行支撐;支持JMS Queue的事務級別配置;另外,在框架資深的模型抽象方面,Spring Batch也做了極為精簡的抽象。
僅僅使用六張業務表存儲了所有的元數據信息(包括Job、Step的實例,上下文,執行器信息,為后續的監控、重啟、重試、狀態恢復等提供了可能)。
BATCH_JOB_INSTANCE:作業實例表,用于存放Job的實例信息BATCH_JOB_EXECUTION_PARAMS:作業參數表,用于存放每個Job執行時候的參數信息,該參數實際對應Job實例的。BATCH_JOB_EXECUTION:作業執行器表,用于存放當前作業的執行信息,比如創建時間,執行開始時間,執行結束時間,執行的那個Job實例,執行狀態等。BATCH_JOB_EXECUTION_CONTEXT:作業執行上下文表,用于存放作業執行器上下文的信息。BATCH_STEP_EXECUTION:作業步執行器表,用于存放每個Step執行器的信息,比如作業步開始執行時間,執行完成時間,執行狀態,讀寫次數,跳過次數等信息。BATCH_STEP_EXECUTION_CONTEXT:作業步執行上下文表,用于存放每個作業步上下文的信息。實現作業的健壯性與擴展性
批處理要求Job必須有較強的健壯性,通常Job是批量處理數據、無人值守的,這要求在Job執行期間能夠應對各種發生的異常、錯誤,并對Job執行進行有效的跟蹤。
一個健壯的Job通常需要具備如下的幾個特性:
1. 容錯性
在Job執行期間非致命的異常,Job執行框架應能夠進行有效的容錯處理,而不是讓整個Job執行失敗;通常只有致命的、導致業務不正確的異常才可以終止Job的執行。
2. 可追蹤性
Job執行期間任何發生錯誤的地方都需要進行有效的記錄,方便后期對錯誤點進行有效的處理。例如在Job執行期間任何被忽略處理的記錄行需要被有效的記錄下來,應用程序維護人員可以針對被忽略的記錄后續做有效的處理。
3. 可重啟性
Job執行期間如果因為異常導致失敗,應該能夠在失敗的點重新啟動Job;而不是從頭開始重新執行Job。
框架提供了支持上面所有能力的特性,包括Skip(跳過記錄處理)、Retry(重試給定的操作)、Restart(從錯誤點開始重新啟動失敗的Job):
Skip,在對數據處理期間,如果數據的某幾條的格式不能滿足要求,可以通過Skip跳過該行記錄的處理,讓Processor能夠順利的處理其余的記錄行。
Retry,將給定的操作進行多次重試,在某些情況下操作因為短暫的異常導致執行失敗,如網絡連接異常、并發處理異常等,可以通過重試的方式避免單次的失敗,下次執行操作時候網絡恢復正常,不再有并發的異常,這樣通過重試的能力可以有效的避免這類短暫的異常。
Restart,在Job執行失敗后,可以通過重啟功能來繼續完成Job的執行。在重啟時候,批處理框架允許在上次執行失敗的點重新啟動Job,而不是從頭開始執行,這樣可以大幅提高Job執行的效率。
對于擴展性,框架提供的擴展能力包括如下的四種模式 :
Multithreaded Step 多線程執行一個Step;Parallel Step 通過多線程并行執行多個Step;Remote Chunking 在遠端節點上執行分布式Chunk操作;Partitioning Step 對數據進行分區,并分開執行;我們先來看第一種的實現Multithreaded Step:
批處理框架在Job執行時默認使用單個線程完成任務的執行,同時框架提供了線程池的支持(Multithreaded Step模式),可以在Step執行時候進行并行處理,這里的并行是指同一個Step使用線程池進行執行,同一個Step被并行的執行。使用tasklet的屬性task-executor可以非常容易的將普通的Step變成多線程Step。
Multithreaded Step的實現示例:
需要注意的是Spring Batch框架提供的大部分的ItemReader、ItemWriter等操作都是線程不安全的。
可以通過擴展的方式顯現線程安全的Step。
下面為大家展示一個擴展的實現:
需求:針對數據表的批量處理,實現線程安全的Step,并且支持重啟能力,即在執行失敗點可以記錄批處理的狀態。
對于示例中的數據庫讀取組件JdbcCursorItemReader,在設計數據庫表時,在表中增加一個字段Flag,用于標識當前的記錄是否已經讀取并處理成功,如果處理成功則標識Flag=true,等下次重新讀取的時候,對于已經成功讀取且處理成功的記錄直接跳過處理。
Multithreaded Step(多線程步)提供了多個線程執行一個Step的能力,但這種場景在實際的業務中使用的并不是非常多。
更多的業務場景是Job中不同的Step沒有明確的先后順序,可以在執行期并行的執行。
Parallel Step:提供單個節點橫向擴展的能力
使用場景:Step A、Step B兩個作業步由不同的線程執行,兩者均執行完畢后,Step C才會被執行。
框架提供了并行Step的能力。可以通過Split元素來定義并行的作業流,并制定使用的線程池。
Parallel Step模式的執行效果如下:
每個作業步并行處理不同的記錄,示例中三個作業步,處理同一張表中的不同數據。
并行Step提供了在一個節點上橫向處理,但隨著作業處理量的增加,有可能一臺節點無法滿足Job的處理,此時我們可以采用遠程Step的方式將多個機器節點組合起來完成一個Job的處理。
Remote Chunking:遠程Step技術本質上是將對Item讀、寫的處理邏輯進行分離;通常情況下讀的邏輯放在一個節點進行操作,將寫操作分發到另外的節點執行。
遠程分塊是一個把step進行技術分割的工作,不需要對處理數據的結構有明確了解。
任何輸入源能夠使用單進程讀取并在動態分割后作為"塊"發送給遠程的工作進程。
遠程進程實現了監聽者模式,反饋請求、處理數據最終將處理結果異步返回。請求和返回之間的傳輸會被確保在發送者和單個消費者之間。
在Master節點,作業步負責讀取數據,并將讀取的數據通過遠程技術發送到指定的遠端節點上,進行處理,處理完畢后Master負責回收Remote端執行的情況。
在Spring Batch框架中通過兩個核心的接口來完成遠程Step的任務,分別是ChunkProvider與ChunkProcessor。
ChunkProvider:根據給定的ItemReader操作產生批量的Chunk操作;ChunkProcessor:負責獲取ChunkProvider產生的Chunk操作,執行具體的寫邏輯;Spring Batch中對遠程Step沒有默認的實現,但我們可以借助SI或者AMQP實現來實現遠程通訊能力。
基于SI實現Remote Chunking模式的示例:
Step本地節點負責讀取數據,并通過MessagingGateway將請求發送到遠程Step上;遠程Step提供了隊列的監聽器,當請求隊列中有消息時候獲取請求信息并交給ChunkHander負責處理。
接下來我們看下最后一種分區模式;Partitioning Step:分區模式需要對數據的結構有一定的了解,如主鍵的范圍、待處理的文件的名字等。
這種模式的優點在于分區中每一個元素的處理器都能夠像一個普通Spring Batch任務的單步一樣運行,也不必去實現任何特殊的或是新的模式,來讓他們能夠更容易配置與測試。
通過分區可以實現以下的優點:
分區實現了更細粒度的擴展;基于分區可以實現高性能的數據切分;分區比遠程通常具有更高的擴展性;分區后的處理邏輯,支持本地與遠程兩種模式;分區作業典型的可以分成兩個處理階段,數據分區、分區處理;數據分區:根據特殊的規則(例如:根據文件名稱,數據的唯一性標識,或者哈希算法)將數據進行合理的數據切片,為不同的切片生成數據執行上下文Execution Context、作業步執行器Step Execution。可以通過接口Partitioner生成自定義的分區邏輯,Spring Batch批處理框架默認實現了對多文件的實現org.springframework.batch.core.partition.support.MultiResourcePartitioner;也可以自行擴展接口Partitioner來實現自定義的分區邏輯。
分區處理:通過數據分區后,不同的數據已經被分配到不同的作業步執行器中,接下來需要交給分區處理器進行作業,分區處理器可以本地執行也可以遠程執行被劃分的作業。接口PartitionHandler定義了分區處理的邏輯,Spring Batch批處理框架默認實現了本地多線程的分區處理org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;也可以自行擴展接口PartitionHandler來實現自定義的分區處理邏輯。
Spring Batch框架提供了對文件分區的支持,實現類org.springframework.batch.core.partition.support.MultiResourcePartitioner提供了對文件分區的默認支持,根據文件名將不同的文件處理進行分區,提升處理的速度和效率,適合有大量小文件需要處理的場景。
示例展示了將不同文件分配到不同的作業步中,使用MultiResourcePartitioner進行分區,意味著每個文件會被分配到一個不同的分區中。如果有其它的分區規則,可以通過實現接口Partitioner來進行自定義的擴展。有興趣的TX,可以自己實現基于數據庫的分區能力哦。
總結一下,批處理框架在擴展性上提供了4中不同能力,每種都是各自的使用場景,我們可以根據實際的業務需要進行選擇。
批處理框架的不足與增強
Spring Batch批處理框架雖然提供了4種不同的監控方式,但從目前的使用情況來看,都不是非常的友好。
通過DB直接查看,對于管理人員來講,真的不忍直視;通過API實現自定義的查詢,這是程序員的天堂,確實運維人員的地獄;提供了Web控制臺,進行Job的監控和操作,目前提供的功能太裸露,無法直接用于生產;提供JMX查詢方式,對于非開發人員太不友好;但在企業級應用中面對批量數據處理,僅僅提供批處理框架僅能滿足批處理作業的快速開發、執行能力。
企業需要統一的批處理平臺來處理復雜的企業批處理應用,批處理平臺需要解決作業的統一調度、批處理作業的集中管理和管控、批處理作業的統一監控等能力。
那完美的解決方案是什么呢?
企業級批處理平臺需要在Spring Batch批處理框架的基礎上,集成調度框架,通過調度框架可以將任務按照企業的需求進行任務的定期執行;豐富目前Spring Batch Admin(Spring Batch的管理監控平臺,目前能力比較薄弱)框架,提供對Job的統一管理功能,增強Job作業的監控、預警等能力;通過與企業的組織機構、權限管理、認證系統進行合理的集成,增強平臺對Job作業的權限控制、安全管理能力。
由于時間關系,今天的分享就到這里,很多內容未能展開討論。歡迎大家在實際業務中使用Spring Batch框架。