考慮到系統(tǒng)使用的廣泛程度與成熟度,在具體舉例時一般會拿Hive和Impala為例,當然在調(diào)研的過程中也會涉及到一些其他系統(tǒng),如Spark SQL,Presto,TAJO等。而對于HAWQ這樣的商業(yè)產(chǎn)品和apache drill這樣成熟度還不是很高的開源方案就不做過多了解了。
系統(tǒng)架構(gòu)Runtime Framework v.s. MPP在SQL on Hadoop系統(tǒng)中,有兩種架構(gòu),一種是基于某個運行時框架來構(gòu)建查詢引擎,典型案例是Hive;另一種是仿照過去關(guān)系數(shù)據(jù)庫的MPP架構(gòu)。前者現(xiàn)有運行時框架,然后套上SQL層,后者則是從頭打造一個一體化的查詢引擎。有時我們能聽到一種聲音,說后者的架構(gòu)優(yōu)于前者,至少在性能上。那么是否果真如此?
一般來說,對于SQL on Hadoop系統(tǒng)很重要的一個評價指標就是:快。后面提到的所有內(nèi)容也大多是為了查詢速度更快。在Hive逐漸普及之后,就逐漸有了所謂交互式查詢的需求,因為無論是BI系統(tǒng),還是Ad-Hoc,都不能按照離線那種節(jié)奏玩。這時候無論是有實力的大公司(比如Facebook),還是專業(yè)的供應(yīng)商(比如Cloudera),都試圖去解決這個問題。短期可以靠商業(yè)方案或者關(guān)系數(shù)據(jù)庫去支撐一下,但是長遠的解決方案就是參考過去的MPP數(shù)據(jù)庫架構(gòu)打造一個專門的系統(tǒng),于是就有了Impala,Presto等等。從任務(wù)執(zhí)行的角度說,這類引擎的任務(wù)執(zhí)行其實跟DAG模型是類似的,當時也有Spark這個DAG模型的計算框架了,但這終究是別人家的孩子,而且往Spark上套SQL又是Hive的那種玩法了。于是在Impala問世之后就強調(diào)自己“計算全部在內(nèi)存中完成”,性能也是各種碾壓當時還只有MR作為計算模型的Hive。那么Hive所代表的“基于已有的計算模型”方式是否真的不行?
不可否認,按照這種方式去比較,那么類MPP模式確實有很多優(yōu)勢:
DAG v.s. MR:最主要的優(yōu)勢,中間結(jié)果不寫磁盤(除非內(nèi)存不夠),一氣呵成。流水線計算:上游stage一出結(jié)果馬上推送或者拉到下一個stage處理,比如多表join時前兩個表有結(jié)果直接給第三個表,不像MR要等兩個表完全join完再給第三個表join。高效的IO:本地查詢沒有多余的消耗,充分利用磁盤。這個后面細說。線程級別的并發(fā):相比之下MR每個task要啟動JVM,本身就有很大延遲,占用資源也多。當然MPP模式也有其劣勢,一個是擴展性不是很高,這在關(guān)系數(shù)據(jù)庫時代就已經(jīng)有過結(jié)論;另一個是容錯性差,對于Impala來說一旦運行過程中出點問題,整個查詢就掛了。
但是,經(jīng)過不斷的發(fā)展,Hive也能跑在DAG框架上了,不僅有Tez,還有Spark。上面提到的一些劣勢,其實大都也可以在計算模型中解決,只不過考慮到計算模型的通用性和本身的設(shè)計目標,不會去專門滿足(所以如果從這個角度分類,Impala屬于“專用系統(tǒng)”,Spark則屬于“通用系統(tǒng)”)。在最近Cloudera做的benchmark中,雖然Impala仍然一路領(lǐng)先,但是基于Spark的Spark SQL完全不遜色于Presto,基于Tez的Hive也不算很差,至少在并發(fā)模式下能超過Presto,足見MPP模式并不是絕對占上風的。所以這種架構(gòu)上的區(qū)別在我看來并不是制勝的關(guān)鍵,至少不是唯一的因素,真正要做到快速查詢,各個方面的細節(jié)都要有所把握。后面說的都是這些細節(jié)。
核心組件不管是上面提到的那種架構(gòu),一個SQL on Hadoop系統(tǒng)一般都會有一些通用的核心組件,這些組件根據(jù)設(shè)計者的考慮放在不同的節(jié)點角色中,在物理上節(jié)點都按照Master/Worker的方式去做,如果Master壓力太大,一些本來適合放在Master上的組件可以放到一個輔助Master上。
UI層負責提供用戶輸入查詢的接口。一般有Web/GUI,命令行,編程方式3類。QL層負責把用戶提交的查詢解析成可以運行的執(zhí)行計劃(比如MR Job)。這部分在后面會專門提到。執(zhí)行層就是運行具體的Job。一般會有一個Master負責Query的運行管理,比如申請資源,觀察進度等等,同時Master也負責最終聚合局部結(jié)果到全局結(jié)果。而每個節(jié)點上會有相應(yīng)的Worker做本地計算。IO層提供與存儲層交互的接口。對于HDFS來說,需要根據(jù)I/O Format把文件轉(zhuǎn)換成K/V,Serde再完成K/V到數(shù)據(jù)行的映射。對于非HDFS存儲來說就需要一些專門的handler/connector。存儲層一般是HDFS,但也有可以查詢NoSQL,或者關(guān)系數(shù)據(jù)庫的。系統(tǒng)另外還需要一個元數(shù)據(jù)管理服務(wù),管理表結(jié)構(gòu)等。執(zhí)行計劃編譯流程
從SQL到執(zhí)行計劃,大致分為5步。
第一步將SQL轉(zhuǎn)換成抽象語法樹AST。這一步一般都有第三方工具庫可以完成,比如antlr。第二步對AST進行語義分析,比如表是否存在,字段是否存在,SQL語義是否有誤(比如select中被判定為聚合的字段在group by中有沒有出現(xiàn))。第三步生成邏輯執(zhí)行計劃,這是一個由邏輯操作符組成的DAG。比如對于Hive來說掃表會產(chǎn)生TableScanOperator,聚合會產(chǎn)生GroupByOperator。對于類MPP系統(tǒng)來說,情況稍微有點不同。邏輯操作符的種類還是差不多,但是會先生成單機版本,然后生成多機版本。多機版本主要是把aggregate,join,還有top n這幾個操作并行化,比如aggregate會分成類似MR那樣的本地aggregate,shuffle和全局aggregate三步。第四步做邏輯執(zhí)行計劃做優(yōu)化,這步在下面單獨介紹。第五步把邏輯執(zhí)行計劃轉(zhuǎn)換成可以在機器上運行的物理計劃。對于Hive來說,就是MR/Tez Job等;對于Impala來說,就是plan fragment。其他類MPP系統(tǒng)也是類似的概念。物理計劃中的一個計算單元(或者說Job),有“輸入,處理,輸出”三要素組成,而邏輯執(zhí)行計劃中的operator相對粒度更細,一個邏輯操作符一般處于這三要素之一的角色。下面分別舉兩個例子,直觀的認識下SQL、邏輯計劃、物理計劃之間的關(guān)系,具體解釋各個operator的話會比較細碎,就不展開了。
Hive on MR:1 select count(1) from status_updates where ds = ‘2009-08-01′
Presto(引用自美團技術(shù)團隊,其中SubPlan就是物理計劃的一個計算單元):
select c1.rank, count(*)from dim.city c1 join dim.city c2 on c1.id = c2.idwhere c1.id > 10 group by c1.rank limit 10; 優(yōu)化器關(guān)于執(zhí)行計劃的優(yōu)化,雖然不一定是整個編譯流程中最難的部分,但卻是最有看點的部分,而且目前還在不斷發(fā)展中。Spark系之所以放棄Shark另起爐灶做Spark SQL,很大一部分原因是想自己做優(yōu)化策略,避免受Hive的限制,為此還專門獨立出優(yōu)化器組件Catalyst(當然Spark SQL目前還是非常新,其未來發(fā)展給人不少想象空間)。總之這部分工作可以不斷的創(chuàng)新,優(yōu)化器越智能,越傻瓜化,用戶就越能解放出來解決業(yè)務(wù)問題。
早期在Hive中只有一些簡單的規(guī)則優(yōu)化,比如謂詞下推(把過濾條件盡可能的放在table scan之后就完成),操作合并(連續(xù)的filter用and合并成一個operator,連續(xù)的projection也可以合并)。后來逐漸增加了一些略復(fù)雜的規(guī)則,比如相同key的join + group by合并為1個MR,還有star schema join。在Hive 0.12引入的相關(guān)性優(yōu)化(correlation optimizer)算是規(guī)則優(yōu)化的一個高峰,他能夠減少數(shù)據(jù)的重復(fù)掃描,具體來說,如果查詢的兩個部分用到了相同的數(shù)據(jù),并且各自做group by / join的時候用到了相同的key,這個時候由于數(shù)據(jù)源和shuffle的key是一樣的,所以可以把原來需要兩個job分別處理的地方合成一個job處理。
比如下面這個SQL:
SELECT sum(l_extendedprice) / 7.0 as avg_yearlyFROM (SELECT l_partkey, l_quantity, l_extendedprice FROM lineitem JOIN part ON (p_partkey=l_partkey) WHERE p_brand=‘Brand#35′ AND p_container = ‘MED PKG’)touterJOIN (SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq FROM lineitem GROUP BY l_partkey) tinnerON (touter.l_partkey = tinnter.lp)WHERE touter.l_quantity < tinner.lq這個查詢中兩次出現(xiàn)lineitem表,group by和兩處join用的都是l_partkey,所以本來兩個子查詢和一個join用到三個job,現(xiàn)在只需要用到一個job就可以完成。
但是,基于規(guī)則的優(yōu)化(RBO)不能解決所有問題。在關(guān)系數(shù)據(jù)庫中早有另一種優(yōu)化方式,也就是基于代價的優(yōu)化CBO。CBO通過收集表的數(shù)據(jù)信息(比如字段的基數(shù),數(shù)據(jù)分布直方圖等等)來對一些問題作出解答,其中最主要的問題就是確定多表join的順序。CBO通過搜索join順序的所有解空間(表太多的情況下可以用有限深度的貪婪算法),并且算出對應(yīng)的代價,可以找到最好的順序。這些都已經(jīng)在關(guān)系數(shù)據(jù)庫中得到了實踐。
目前Hive已經(jīng)啟動專門的項目,也就是Apache Optiq來做這個事情,而其他系統(tǒng)也沒有做的很好的CBO,所以這塊內(nèi)容還有很大的進步空間。
執(zhí)行效率即使有了高效的執(zhí)行計劃,如果在運行過程本身效率較低,那么再好的執(zhí)行計劃也會大打折扣。這里主要關(guān)注CPU和IO方面的執(zhí)行效率。
CPU在具體的計算執(zhí)行過程中,低效的cpu會導(dǎo)致系統(tǒng)的瓶頸落在CPU上,導(dǎo)致IO無法充分利用。在一項針對Impala和Hive的對比時發(fā)現(xiàn),Hive在某些簡單查詢上(TPC-H Query 1)也比Impala慢主要是因為Hive運行時完全處于CPU bound的狀態(tài)中,磁盤IO只有20%,而Impala的IO至少在85%。
在SQL on Hadoop中出現(xiàn)CPU bound的主要原因有以下幾種:
大量虛函數(shù)調(diào)用:這個問題在多處出現(xiàn),比如對于a + 2 * b之類的表達式計算,解釋器會構(gòu)造一個expression tree,解釋的過程就是遞歸調(diào)用子節(jié)點做evaluation的過程。又比如以DAG形式的operator/task在執(zhí)行的過程中,上游節(jié)點會層層調(diào)用下游節(jié)點來獲取產(chǎn)生的數(shù)據(jù)。這些都會產(chǎn)生大量的調(diào)用。類型裝箱:由于表達式解釋器需要對不同數(shù)據(jù)類型的變量做解釋,所以在Java中需要把這些本來是primitive的變量包裝成Object,累積起來也消耗不少資源。這算是上面一個問題附帶出來的。branch instruction: 現(xiàn)在的CPU都是有并行流水線的,但是如果出現(xiàn)條件判斷會導(dǎo)致無法并行。這種情況可能出現(xiàn)在判斷數(shù)據(jù)的類型(是string還是int),或者在判斷某一列是否因為其他字段的過濾條件導(dǎo)致本行不需要被讀取(列存儲情況下)。cache miss:每次處理一行數(shù)據(jù)的方式導(dǎo)致cpu cache命中率不高。(這么說已經(jīng)暗示了解決方案)針對上面的問題,目前大多數(shù)系統(tǒng)中已經(jīng)加入了以下兩個解決辦法中至少一個。
一個方法是動態(tài)代碼生成,也就是不使用解釋性的統(tǒng)一代碼。比如a + 2 * b這個表達式就會生成對應(yīng)的執(zhí)行語言的代碼,而且可以直接用primitive type,而不是用固定的解釋性代碼。具體實現(xiàn)來說,JVM系的如Spark SQL,Presto可以用反射,C++系的Impala則使用了llvm生成中間碼。對于判斷數(shù)據(jù)類型造成的分支判斷,動態(tài)代碼的效果可以消除這些類型判斷,還可以展開循環(huán),可以對比下面這段代碼,左邊是解釋性代碼,右邊是動態(tài)生成代碼。
另一個方法是vectorization(向量化),基本思路是放棄每次處理一行的模式,改用每次處理一小批數(shù)據(jù)(比如1k行),當然前提條件是使用列存儲格式。這樣一來,這一小批連續(xù)的數(shù)據(jù)可以放進cache里面,cpu不僅減少了branch instruction,甚至可以用SIMD加快處理速度。具體的實現(xiàn)參考下面的代碼,對一個long型的字段增加一個常量。通過把數(shù)據(jù)表示成數(shù)組,過濾條件也用selVec裝進數(shù)組,形成了很緊湊的循環(huán):
add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec){ if (selVec == null) for (int i = 0; i < vecNum; i++) result[i] = col1[i] + col2[i]; else for (int i = 0; i < vecNum; i++) { int selIdx = selVec[i]; result[selIdx] = col1[selIdx] + col2[selIdx]; }}IO由于SQL on Hadoop存儲數(shù)據(jù)都是在HDFS上,所以IO層的優(yōu)化其實大多數(shù)都是HDFS的事情,各大查詢引擎則提出需求去進行推動。要做到高效IO,一方面要低延遲,屏蔽不必要的消耗;另一方面要高吞吐,充分利用每一塊磁盤。目前與這方面有關(guān)的特性有:
short-circuit local reads:當發(fā)現(xiàn)讀取的數(shù)據(jù)是本地數(shù)據(jù)時,不走DataNode(因為要走一次socket連接),而是用DFS Client直接讀本地的block replica。HDFS參數(shù)是dfs.client.read.shortcircuit和dfs.domain.socket.path。zero copy:避免數(shù)據(jù)在內(nèi)核buffer和用戶buffer之間反復(fù)copy,在早期的HDFS中已經(jīng)有這個默認實現(xiàn)。disk-aware scheduling:通過知道每個block所在磁盤,可以在調(diào)度cpu資源時讓不同的cpu讀不同的磁盤,避免查詢內(nèi)和查詢間的IO競爭。HDFS參數(shù)是dfs.datanode.hdfs-blocks-metadata.enabled。存儲格式對于分析類型的workload來說,最好的存儲格式自然是列存儲,這已經(jīng)在關(guān)系數(shù)據(jù)庫時代得到了證明。目前hadoop生態(tài)中有兩大列存儲格式,一個是由Hortonworks和Microsoft開發(fā)的ORCFile,另一個是由Cloudera和Twitter開發(fā)的Parquet。
ORCFile顧名思義,是在RCFile的基礎(chǔ)之上改造的。RCFile雖然號稱列存儲,但是只是“按列存儲”而已,將數(shù)據(jù)先劃分成row group,然后row group內(nèi)部按照列進行存儲。這其中沒有列存儲的一些關(guān)鍵特性,而這些特性在以前的列式數(shù)據(jù)庫中(比如我以前用過的Infobright)早已用到。好在ORCFile已經(jīng)彌補了這些特性,包括:
塊過濾與塊統(tǒng)計:每一列按照固定行數(shù)或大小進一步切分,對于切分出來的每一個數(shù)據(jù)單元,預(yù)先計算好這些單元的min/max/sum/count/null值,min/max用于在過濾數(shù)據(jù)的時候直接跳過數(shù)據(jù)單元,而所有這些統(tǒng)計值則可以在做聚合操作的時候直接采用,而不必解開這個數(shù)據(jù)單元做進一步的計算。更高效的編碼方式:RCFile中沒有標注每一列的類型,事實上當知道數(shù)據(jù)類型時,可以采取特定的編碼方式,本身就能很大程度上進行數(shù)據(jù)的壓縮。常見的針對列存儲的編碼方式有RLE(大量重復(fù)數(shù)據(jù)),字典(字符串),位圖(數(shù)字且基數(shù)不大),級差(排序過的數(shù)據(jù),比如日志中用戶訪問時間)等等。ORCFile的結(jié)構(gòu)如下圖,數(shù)據(jù)先按照默認256M分為row group,也叫strip。每個strip配一個index,存放每個數(shù)據(jù)單元(默認10000行)的min/max值用于過濾;數(shù)據(jù)按照上面提到的編碼方式序列化成stream,然后再進行snappy或gz壓縮。footer提供讀取stream的位置信息,以及更多的統(tǒng)計值如sum/count等。尾部的file footer和post script提供全局信息,如每個strip的行數(shù),各列數(shù)據(jù)類型,壓縮參數(shù)等。
Parquet的設(shè)計原理跟ORC類似,不過它有兩個特點:
通用性:相比ORCFile專門給Hive使用而言,Parquet不僅僅是給Impala使用,還可以給其他查詢工具使用,如Hive、Pig,進一步還能對接avro/thrift/pb等序列化格式。基于Dremel思想的嵌套格式存儲:關(guān)系數(shù)據(jù)庫設(shè)計模式中反對存儲復(fù)雜格式(違反第一范式),但是現(xiàn)在的大數(shù)據(jù)計算不僅出現(xiàn)了這種需求(半結(jié)構(gòu)化數(shù)據(jù)),也能夠高效的實現(xiàn)存儲和查詢效率,在語法上也有相應(yīng)的支持(各種UDF,Hive的lateral view等)。Google Dremel就在實現(xiàn)層面做出了范例,Parquet則完全仿照了Dremel。對嵌套格式做列存儲的難點在于,存儲時需要標記某個數(shù)據(jù)對應(yīng)于哪一個存儲結(jié)構(gòu),或者說是哪條記錄,所以需要用數(shù)據(jù)清楚的進行標記。 在Dremel中提出用definition level和repetition level來進行標記。definition level指的是,這條記錄在嵌套結(jié)構(gòu)中所處于第幾層,而repetition level指的是,這條記錄相對上一條記錄,在第幾層重復(fù)。比如下圖是一個二級嵌套數(shù)組。圖中的e跟f在都屬于第二層的重復(fù)記錄(同一個level2),所以f的r值為2,而c跟d則是不同的level2,但屬于同一個level1,所以d的r值為1。對于頂層而言(新的一個嵌套結(jié)構(gòu)),r值就為0。
但是僅僅這樣還不夠。上圖說明了r值的作用,但是還沒有說明d值的作用,因為按照字面解釋,d值對于每一個字段都是可以根據(jù)schema得到的,那為什么還要從行記錄級別標記?這是因為記錄中會插入一些null值,這些null值代表著他們“可以存在”但是因為是repeated或者是optional所以沒有值的情況,null值是用來占位的(或者說是“想象”出來的),所以他們的值需要單獨計算。null的d值就是說這個結(jié)構(gòu)往上追溯到哪一層(不包括平級)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一個null在code = en所在的結(jié)構(gòu)里面,那么language不是null(不考慮code,他跟country平級),他就是第二層;又比如country的第二個null在url = http://B 所在的結(jié)構(gòu)里面,那么name不是null(不考慮url,因為他跟本來就是null的language平級),所以就是第一層。
通過這種方式,就對一個樹狀的嵌套格式完成了存儲。在讀取的時候可以通過構(gòu)造一個狀態(tài)機進行遍歷。
有意思的是,雖然parquet支持嵌套格式,但是Impala還沒有來得及像Hive那樣增加array,map,struct等復(fù)雜格式,當然這項功能已經(jīng)被列入roadmap了,相信不久就會出現(xiàn)。
在最近我們做的Impala2.0測試中,順便測試了存儲格式的影響。parquet相比sequencefile在壓縮比上達到1:5,查詢性能也相差5-10倍,足見列存儲一項就給查詢引擎帶來的提升。
資源控制運行時資源調(diào)整對于一個MR Job,reduce task的數(shù)量一直是需要人為估算的一個麻煩事,基于MR的Hive也只是根據(jù)數(shù)據(jù)源大小粗略的做估計,不考慮具體的Job邏輯。但是在之后的框架中考慮到了這個情況,增加了運行時調(diào)整資源分配的功能。Tez中引入了vertex manager,可以根據(jù)運行時收集到的數(shù)據(jù)智能的判斷reduce動作需要的task。類似的功能在TAJO中也有提到,叫progressive query optimization,而且TAJO不僅能做到調(diào)整task數(shù)量,還能調(diào)整join順序。
資源集成在Hadoop已經(jīng)進入2.x的時代,所有想要得到廣泛應(yīng)用的SQL on Hadoop系統(tǒng)勢必要能與YARN進行集成。雖然這是一個有利于資源合理利用的好事,但是由于加入了YARN這一層,卻給系統(tǒng)的性能帶來了一定的障礙,因為啟動AppMaster和申請container也會占用不少時間,尤其是前者,而且container的供應(yīng)如果時斷時續(xù),那么會極大的影響時效性。在Tez和Impala中對這些問題給出了相應(yīng)的解決辦法:
AppMaster啟動延遲的問題,采取long lived app master,AppMaster啟動后長期駐守,而非像是MR那樣one AM per Job。具體實現(xiàn)時,可以給fair scheduler或capacity scheduler配置的每個隊列配上一個AM池,有一定量的AM為提交給這個隊列的任務(wù)服務(wù)。container供應(yīng)的問題,在Tez中采取了container復(fù)用的方式,有點像jvm復(fù)用,即container用完以后不馬上釋放,等一段時間,實在是沒合適的task來接班了再釋放,這樣不僅減少container斷供的可能,而且可以把上一個task留下的結(jié)果cache住給下一個task復(fù)用,比如做map join;Impala則采取比較激進的方式,一次性等所有的container分配到位了才開始執(zhí)行查詢,這種方式也能讓它的流水線式的計算不至于阻塞。其他到這里為止,已經(jīng)從上到下順了一遍各個層面用到的技術(shù),當然SQL on Hadoop本身就相當復(fù)雜,涉及到方方面面,時間精力有限不可能一一去琢磨。比如其他一些具有技術(shù)復(fù)雜度的功能有:
多數(shù)據(jù)源查詢:Presto支持從MySQL,Cassandra,甚至Kafka中去讀取數(shù)據(jù),這就大大減少了數(shù)據(jù)整合時間,不需要放到HDFS里才能查詢。Impala和Hive也支持查詢HBase。國內(nèi)也有類似的工作,如秒針改造Impala使之能查詢postgres。近似查詢:count distinct(基數(shù)估計)一直是SQL性能殺手之一,如果能接受一定誤差的話可以采用近似算法。Impala中已經(jīng)實現(xiàn)了近似算法(ndv),Presto則是請BlinkDB合作完成。兩者都是采用了HyperLogLog Counting。當然,不僅僅是count distinct可以使用近似算法,其他的如取中位數(shù)之類的也可以用。結(jié)束語盡管現(xiàn)在相關(guān)系統(tǒng)已經(jīng)很多,也經(jīng)過了幾年的發(fā)展,但是目前各家系統(tǒng)仍然在不斷的進行完善,比如:
增加分析函數(shù),復(fù)雜數(shù)據(jù)類型,SQL語法集的擴展。對于已經(jīng)成形的技術(shù)也在不斷的改進,如列存儲還可以增加更多的encoding方式。甚至對于像CBO這樣的領(lǐng)域,開源界拿出來的東西還算是剛剛起步,相比HAWQ中的ORCA這種商業(yè)系統(tǒng)提供的優(yōu)化器還差的很多。畢竟相比已經(jīng)比較成熟的關(guān)系數(shù)據(jù)庫,分布式環(huán)境下需要解決的問題更多,未來一定還會出現(xiàn)很多精彩的技術(shù)實踐,讓我們在海量數(shù)據(jù)中更快更方便的查到想要的數(shù)據(jù)。