Hadoop(大數(shù)據(jù)分析領(lǐng)域無可爭(zhēng)辯的王者)專注于批處理。這種模型對(duì)許多情形(比如為網(wǎng)頁建立索引)已經(jīng)足夠,但還存在其他一些使用模型,它們需要來自高度動(dòng)態(tài)的來源的實(shí)時(shí)信息。為了解決這個(gè)問題,就得借助 Nathan Marz 推出的 Storm(現(xiàn)在在 Twitter 中稱為 BackType)。Storm 不處理靜態(tài)數(shù)據(jù),但它處理預(yù)計(jì)會(huì)連續(xù)的流數(shù)據(jù)。考慮到 Twitter 用戶每天生成 1.4 億條推文 (tweet),那么就很容易看到此技術(shù)的巨大用途。
但 Storm 不只是一個(gè)傳統(tǒng)的大數(shù)據(jù)分析系統(tǒng):它是復(fù)雜事件處理 (CEP) 系統(tǒng)的一個(gè)示例。CEP 系統(tǒng)通常分類為計(jì)算和面向檢測(cè),其中每個(gè)系統(tǒng)都可通過用戶定義的算法在 Storm 中實(shí)現(xiàn)。舉例而言,CEP 可用于識(shí)別事件洪流中有意義的事件,然后實(shí)時(shí)地處理這些事件。
Nathan Marz 提供了在 Twitter 中使用 Storm 的大量示例。一個(gè)最有趣的示例是生成趨勢(shì)信息。Twitter 從海量的推文中提取所浮現(xiàn)的趨勢(shì),并在本地和國(guó)家級(jí)別維護(hù)它們。這意味著當(dāng)一個(gè)案例開始浮現(xiàn)時(shí),Twitter 的趨勢(shì)主題算法就會(huì)實(shí)時(shí)識(shí)別該主題。這種實(shí)時(shí)算法在 Storm 中實(shí)現(xiàn)為 Twitter 數(shù)據(jù)的一種連續(xù)分析。
什么是 “大數(shù)據(jù)”?
大數(shù)據(jù) 指的是海量無法通過傳統(tǒng)方式管理的數(shù)據(jù)。互聯(lián)網(wǎng)范圍的數(shù)據(jù)正在推動(dòng)能夠處理這類新數(shù)據(jù)的新架構(gòu)和應(yīng)用程序的創(chuàng)建。這些架構(gòu)高度可擴(kuò)展,且能夠跨無限多的服務(wù)器并行、高效地處理數(shù)據(jù)。
Storm 與傳統(tǒng)的大數(shù)據(jù)
Storm 與其他大數(shù)據(jù)解決方案的不同之處在于它的處理方式。Hadoop 在本質(zhì)上是一個(gè)批處理系統(tǒng)。數(shù)據(jù)被引入 Hadoop 文件系統(tǒng) (HDFS) 并分發(fā)到各個(gè)節(jié)點(diǎn)進(jìn)行處理。當(dāng)處理完成時(shí),結(jié)果數(shù)據(jù)返回到 HDFS 供始發(fā)者使用。Storm 支持創(chuàng)建拓?fù)浣Y(jié)構(gòu)來轉(zhuǎn)換沒有終點(diǎn)的數(shù)據(jù)流。不同于 Hadoop 作業(yè),這些轉(zhuǎn)換從不停止,它們會(huì)持續(xù)處理到達(dá)的數(shù)據(jù)。
大數(shù)據(jù)實(shí)現(xiàn)
Hadoop 的核心是使用 Java 語言編寫的,但支持使用各種語言編寫的數(shù)據(jù)分析應(yīng)用程序。最新的應(yīng)用程序的實(shí)現(xiàn)采用了更加深?yuàn)W的路線,以充分利用現(xiàn)代語言和它們的特性。例如,位于伯克利的加利福尼亞大學(xué) (UC) 的 Spark 是使用 Scala 語言實(shí)現(xiàn)的,而 Twitter Storm 是使用 Clojure(發(fā)音同 closure)語言實(shí)現(xiàn)的。
Clojure 是 Lisp 語言的一種現(xiàn)代方言。類似于 Lisp,Clojure 支持一種功能性編程風(fēng)格,但 Clojure 還引入了一些特性來簡(jiǎn)化多線程編程(一種對(duì)創(chuàng)建 Storm 很有用的特性)。Clojure 是一種基于虛擬機(jī) (VM) 的語言,在 Java 虛擬機(jī)上運(yùn)行。但是,盡管 Storm 是使用 Clojure 語言開發(fā)的,您仍然可以在 Storm 中使用幾乎任何語言編寫應(yīng)用程序。所需的只是一個(gè)連接到 Storm 的架構(gòu)的適配器。已存在針對(duì) Scala、JRuby、Perl 和 PHP 的適配器,但是還有支持流式傳輸?shù)?Storm 拓?fù)浣Y(jié)構(gòu)中的結(jié)構(gòu)化查詢語言適配器。
Storm 的關(guān)鍵屬性
Storm 實(shí)現(xiàn)的一些特征決定了它的性能和可靠性的。Storm 使用 ZeroMQ 傳送消息,這就消除了中間的排隊(duì)過程,使得消息能夠直接在任務(wù)自身之間流動(dòng)。在消息的背后,是一種用于序列化和反序列化 Storm 的原語類型的自動(dòng)化且高效的機(jī)制。
Storm 的一個(gè)最有趣的地方是它注重容錯(cuò)和管理。Storm 實(shí)現(xiàn)了有保障的消息處理,所以每個(gè)元組都會(huì)通過該拓?fù)浣Y(jié)構(gòu)進(jìn)行全面處理;如果發(fā)現(xiàn)一個(gè)元組還未處理,它會(huì)自動(dòng)從噴嘴處重放。Storm 還實(shí)現(xiàn)了任務(wù)級(jí)的故障檢測(cè),在一個(gè)任務(wù)發(fā)生故障時(shí),消息會(huì)自動(dòng)重新分配以快速重新開始處理。Storm 包含比 Hadoop 更智能的處理管理,流程會(huì)由監(jiān)管員來進(jìn)行管理,以確保資源得到充分使用。
Storm 模型
Storm 實(shí)現(xiàn)了一種數(shù)據(jù)流模型,其中數(shù)據(jù)持續(xù)地流經(jīng)一個(gè)轉(zhuǎn)換實(shí)體網(wǎng)絡(luò)(參見 圖 1)。一個(gè)數(shù)據(jù)流的抽象稱為一個(gè)流,這是一個(gè)無限的元組序列。元組就像一種使用一些附加的序列化代碼來表示標(biāo)準(zhǔn)數(shù)據(jù)類型(比如整數(shù)、浮點(diǎn)和字節(jié)數(shù)組)或用戶定義類型的結(jié)構(gòu)。每個(gè)流由一個(gè)惟一 ID 定義,這個(gè) ID 可用于構(gòu)建數(shù)據(jù)源和接收器 (sink) 的拓?fù)浣Y(jié)構(gòu)。流起源于噴嘴,噴嘴將數(shù)據(jù)從外部來源流入 Storm 拓?fù)浣Y(jié)構(gòu)中。
圖 1. 一個(gè)普通的 Storm 拓?fù)浣Y(jié)構(gòu)的概念性架構(gòu)
接收器(或提供轉(zhuǎn)換的實(shí)體)稱為螺栓。螺栓實(shí)現(xiàn)了一個(gè)流上的單一轉(zhuǎn)換和一個(gè) Storm 拓?fù)浣Y(jié)構(gòu)中的所有處理。螺栓既可實(shí)現(xiàn) MapReduce 之類的傳統(tǒng)功能,也可實(shí)現(xiàn)更復(fù)雜的操作(單步功能),比如過濾、聚合或與數(shù)據(jù)庫(kù)等外部實(shí)體通信。典型的 Storm 拓?fù)浣Y(jié)構(gòu)會(huì)實(shí)現(xiàn)多個(gè)轉(zhuǎn)換,因此需要多個(gè)具有獨(dú)立元組流的螺栓。噴嘴和螺栓都實(shí)現(xiàn)為 Linux 系統(tǒng)中的一個(gè)或多個(gè)任務(wù)。
可使用 Storm 為詞頻輕松地實(shí)現(xiàn) MapReduce 功能。如 圖 2 中所示,噴嘴生成文本數(shù)據(jù)流,螺栓實(shí)現(xiàn) Map 功能(令牌化一個(gè)流的各個(gè)單詞)。來自 “map” 螺栓的流然后流入一個(gè)實(shí)現(xiàn) Reduce 功能的螺栓中(以將單詞聚合到總數(shù)中)。
圖 2. MapReduce 功能的簡(jiǎn)單 Storm 拓?fù)浣Y(jié)構(gòu)
請(qǐng)注意,螺栓可將數(shù)據(jù)傳輸?shù)蕉鄠€(gè)螺栓,也可接受來自多個(gè)來源的數(shù)據(jù)。Storm 擁有流分組 的概念,流分組實(shí)現(xiàn)了混排 (shuffling)(隨機(jī)但均等地將元組分發(fā)到螺栓)或字段分組(根據(jù)流的字段進(jìn)行流分區(qū))。還存在其他流分組,包括生成者使用自己的內(nèi)部邏輯路由元組的能力。
但是,Storm 架構(gòu)中一個(gè)最有趣的特性是有保障的消息處理。Storm 可保證一個(gè)噴嘴發(fā)射出的每個(gè)元組都會(huì)處理;如果它在超時(shí)時(shí)間內(nèi)沒有處理,Storm 會(huì)從該噴嘴重放該元組。此功能需要一些聰明的技巧來在拓?fù)浣Y(jié)構(gòu)中跟蹤元素,也是 Storm 的重要的附加價(jià)值之一。
除了支持可靠的消息傳送外,Storm 還使用 ZeroMQ 最大化消息傳送性能(刪除中間排隊(duì),實(shí)現(xiàn)消息在任務(wù)間的直接傳送)。ZeroMQ 合并了擁塞檢測(cè)并調(diào)整了它的通信,以優(yōu)化可用的帶寬。
Storm 示例演示
現(xiàn)在讓我們通過實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 MapReduce 拓?fù)浣Y(jié)構(gòu)的代碼(參見 清單 1),看一下 Storm 示例。這個(gè)示例使用了來自 Nathan 的 Storm 入門工具包(可從 GitHub 獲取)(參見 參考資料 獲取鏈接)的巧妙設(shè)計(jì)的字?jǐn)?shù)示例。此示例演示了 圖 2 中所示的拓?fù)浣Y(jié)構(gòu),它實(shí)現(xiàn)了一個(gè)包含一個(gè)螺栓的 map 轉(zhuǎn)換和包含一個(gè)螺栓的 reduce 轉(zhuǎn)換。
清單 1. 為圖 2 中的 Storm 構(gòu)建一個(gè)拓?fù)浣Y(jié)構(gòu)
01 TopologyBuilder builder = new TopologyBuilder();
02
03 builder.setSpout("spout", new RandomSentenceSpout(), 5);
04
05 builder.setBolt("map", new SplitSentence(), 4)
06 .shuffleGrouping("spout");
07
08 builder.setBolt("reduce", new WordCount(), 8)
09 .fieldsGrouping("map", new Fields("word"));
10
11 Config conf = new Config();
12 conf.setDebug(true);
13
14 LocalCluster cluster = new LocalCluster();
15 cluster.submitTopology("word-count", conf, builder.createTopology());
16
17 Thread.sleep(10000);
18
19 cluster.shutdown();
清單 1(添加了行號(hào)以供引用)首先使用 TopologyBuilder 聲明一個(gè)新拓?fù)浣Y(jié)構(gòu)。接下來在第 3 行,定義了一個(gè)噴嘴(名為 spout),該噴嘴包含一個(gè) RandomSentenceSpout。RandomSentenceSpout 類(也就是 nextTuple 方法)發(fā)出 5 個(gè)隨機(jī)句子的其中一個(gè)作為它的數(shù)據(jù)。setSpout 方法末尾的 5 參數(shù)是一個(gè)并行性提示(或要為此活動(dòng)創(chuàng)建的任務(wù)數(shù))。
在第 5 和 6 行。我定義了第一個(gè)螺栓(或算法轉(zhuǎn)換實(shí)體),在本例中為 map(或 split)螺栓。這個(gè)螺栓使用 SplitSentence 令牌化輸入流并將其作為輸出的各個(gè)單詞發(fā)出。請(qǐng)注意,第 6 行使用了 shuffleGrouping,它定義了對(duì)此螺栓(在本例中為 “spout”)的輸入訂閱,還將流分組定義為混排。這種混排分組意味著來自噴嘴的輸入將混排 或隨機(jī)分發(fā)給此螺栓中的任務(wù)(該螺栓已提示具有 4 任務(wù)并行性)。
在第 8 和 9 行,我定義了最后一個(gè)螺栓,這個(gè)螺栓實(shí)際上用于 reduce 元素,使用該元素的輸入作為 map 螺栓。WordCount 方法實(shí)現(xiàn)了必要的字?jǐn)?shù)統(tǒng)計(jì)行為(將相似的單詞分組到一起,以維護(hù)總數(shù)),但不是混排的,所以它的輸出是一致的。如果有多個(gè)任務(wù)在實(shí)現(xiàn) reduce 行為,那么您最終會(huì)得到分段的計(jì)數(shù),而不是總數(shù)。
第 11 和 12 行創(chuàng)建和定義了一個(gè)配置對(duì)象并啟用了 Debug 模式。Config 類包含大量配置可能性(參見 參考資料,獲取有關(guān) Storm 類樹的更多信息的鏈接)。
第 14 和 15 行創(chuàng)建了本地集群(在本例中,用于定義本地模式的用途)。我定義了我的本地集群、配置對(duì)象和拓?fù)浣Y(jié)構(gòu)的名稱(可通過builder 類的 createTopology 元素獲取)。
最后,在第 17 行,Storm 休眠一段時(shí)間,然后在第 19 行關(guān)閉集群。請(qǐng)記住,Storm 是一個(gè)持續(xù)運(yùn)行的操作系統(tǒng),所以任務(wù)可存在相當(dāng)長(zhǎng)時(shí)間,不斷處理它們訂閱的流上的新元組。
您可在 Storm 入門工具包中了解這個(gè)非常簡(jiǎn)單的實(shí)現(xiàn)的更多信息,包括噴嘴和螺栓的細(xì)節(jié)。
使用 Storm
Nathan Marz 編寫了一組簡(jiǎn)單易懂的文檔,詳細(xì)介紹了如何安裝 Storm 來執(zhí)行集群模式和本地模式的操作。本地模式無需一個(gè)龐大的節(jié)點(diǎn)集群,即可使用 Storm。如果需要在一個(gè)集群中使用 Storm 但缺乏節(jié)點(diǎn),也可在 Amazon Elastic Compute Cloud (EC2) 中實(shí)現(xiàn)一個(gè) Storm 集群。請(qǐng)參見 參考資料 獲取每個(gè) Storm 模式(本地、集群和 Amazon EC2)的參考信息。
其他開源的大數(shù)據(jù)解決方案
自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個(gè)使用原始 MapReduce 范式(或擁有該范式的質(zhì)量)的解決方案。Google 對(duì) MapReduce 的最初應(yīng)用是建立萬維網(wǎng)的索引。盡管此應(yīng)用程序仍然很流行,但這個(gè)簡(jiǎn)單模型解決的問題也正在增多。
表 1 提供了一個(gè)可用開源大數(shù)據(jù)解決方案的列表,包括傳統(tǒng)的批處理和流式處理應(yīng)用程序。在將 Storm 引入開源之前將近一年的時(shí)間里,Yahoo! 的 S4 分布式流計(jì)算平臺(tái)已向 Apache 開源。S4 于 2010 年 10 月發(fā)布,它提供了一個(gè)高性能計(jì)算 (HPC) 平臺(tái),向應(yīng)用程序開發(fā)人員隱藏了并行處理的復(fù)雜性。S4 實(shí)現(xiàn)了一個(gè)可擴(kuò)展的、分散化的集群架構(gòu),并納入了部分容錯(cuò)功能。
表 1. 開源大數(shù)據(jù)解決方案