考慮到系統使用的廣泛程度與成熟度,在具體舉例時一般會拿Hive和Impala為例,當然在調研的過程中也會涉及到一些其他系統,如Spark SQL,Presto,TAJO等。而對于HAWQ這樣的商業產品和apache drill這樣成熟度還不是很高的開源方案就不做過多了解了。
系統架構Runtime Framework v.s. MPP在SQL on Hadoop系統中,有兩種架構,一種是基于某個運行時框架來構建查詢引擎,典型案例是Hive;另一種是仿照過去關系數據庫的MPP架構。前者現有運行時框架,然后套上SQL層,后者則是從頭打造一個一體化的查詢引擎。有時我們能聽到一種聲音,說后者的架構優于前者,至少在性能上。那么是否果真如此?
一般來說,對于SQL on Hadoop系統很重要的一個評價指標就是:快。后面提到的所有內容也大多是為了查詢速度更快。在Hive逐漸普及之后,就逐漸有了所謂交互式查詢的需求,因為無論是BI系統,還是Ad-Hoc,都不能按照離線那種節奏玩。這時候無論是有實力的大公司(比如Facebook),還是專業的供應商(比如Cloudera),都試圖去解決這個問題。短期可以靠商業方案或者關系數據庫去支撐一下,但是長遠的解決方案就是參考過去的MPP數據庫架構打造一個專門的系統,于是就有了Impala,Presto等等。從任務執行的角度說,這類引擎的任務執行其實跟DAG模型是類似的,當時也有Spark這個DAG模型的計算框架了,但這終究是別人家的孩子,而且往Spark上套SQL又是Hive的那種玩法了。于是在Impala問世之后就強調自己“計算全部在內存中完成”,性能也是各種碾壓當時還只有MR作為計算模型的Hive。那么Hive所代表的“基于已有的計算模型”方式是否真的不行?
不可否認,按照這種方式去比較,那么類MPP模式確實有很多優勢:
DAG v.s. MR:最主要的優勢,中間結果不寫磁盤(除非內存不夠),一氣呵成。流水線計算:上游stage一出結果馬上推送或者拉到下一個stage處理,比如多表join時前兩個表有結果直接給第三個表,不像MR要等兩個表完全join完再給第三個表join。高效的IO:本地查詢沒有多余的消耗,充分利用磁盤。這個后面細說。線程級別的并發:相比之下MR每個task要啟動JVM,本身就有很大延遲,占用資源也多。當然MPP模式也有其劣勢,一個是擴展性不是很高,這在關系數據庫時代就已經有過結論;另一個是容錯性差,對于Impala來說一旦運行過程中出點問題,整個查詢就掛了。
但是,經過不斷的發展,Hive也能跑在DAG框架上了,不僅有Tez,還有Spark。上面提到的一些劣勢,其實大都也可以在計算模型中解決,只不過考慮到計算模型的通用性和本身的設計目標,不會去專門滿足(所以如果從這個角度分類,Impala屬于“專用系統”,Spark則屬于“通用系統”)。在最近Cloudera做的benchmark中,雖然Impala仍然一路領先,但是基于Spark的Spark SQL完全不遜色于Presto,基于Tez的Hive也不算很差,至少在并發模式下能超過Presto,足見MPP模式并不是絕對占上風的。所以這種架構上的區別在我看來并不是制勝的關鍵,至少不是唯一的因素,真正要做到快速查詢,各個方面的細節都要有所把握。后面說的都是這些細節。
核心組件不管是上面提到的那種架構,一個SQL on Hadoop系統一般都會有一些通用的核心組件,這些組件根據設計者的考慮放在不同的節點角色中,在物理上節點都按照Master/Worker的方式去做,如果Master壓力太大,一些本來適合放在Master上的組件可以放到一個輔助Master上。
UI層負責提供用戶輸入查詢的接口。一般有Web/GUI,命令行,編程方式3類。QL層負責把用戶提交的查詢解析成可以運行的執行計劃(比如MR Job)。這部分在后面會專門提到。執行層就是運行具體的Job。一般會有一個Master負責Query的運行管理,比如申請資源,觀察進度等等,同時Master也負責最終聚合局部結果到全局結果。而每個節點上會有相應的Worker做本地計算。IO層提供與存儲層交互的接口。對于HDFS來說,需要根據I/O Format把文件轉換成K/V,Serde再完成K/V到數據行的映射。對于非HDFS存儲來說就需要一些專門的handler/connector。存儲層一般是HDFS,但也有可以查詢NoSQL,或者關系數據庫的。系統另外還需要一個元數據管理服務,管理表結構等。執行計劃編譯流程
從SQL到執行計劃,大致分為5步。
第一步將SQL轉換成抽象語法樹AST。這一步一般都有第三方工具庫可以完成,比如antlr。第二步對AST進行語義分析,比如表是否存在,字段是否存在,SQL語義是否有誤(比如select中被判定為聚合的字段在group by中有沒有出現)。第三步生成邏輯執行計劃,這是一個由邏輯操作符組成的DAG。比如對于Hive來說掃表會產生TableScanOperator,聚合會產生GroupByOperator。對于類MPP系統來說,情況稍微有點不同。邏輯操作符的種類還是差不多,但是會先生成單機版本,然后生成多機版本。多機版本主要是把aggregate,join,還有top n這幾個操作并行化,比如aggregate會分成類似MR那樣的本地aggregate,shuffle和全局aggregate三步。第四步做邏輯執行計劃做優化,這步在下面單獨介紹。第五步把邏輯執行計劃轉換成可以在機器上運行的物理計劃。對于Hive來說,就是MR/Tez Job等;對于Impala來說,就是plan fragment。其他類MPP系統也是類似的概念。物理計劃中的一個計算單元(或者說Job),有“輸入,處理,輸出”三要素組成,而邏輯執行計劃中的operator相對粒度更細,一個邏輯操作符一般處于這三要素之一的角色。下面分別舉兩個例子,直觀的認識下SQL、邏輯計劃、物理計劃之間的關系,具體解釋各個operator的話會比較細碎,就不展開了。
Hive on MR:1 select count(1) from status_updates where ds = ‘2009-08-01′
Presto(引用自美團技術團隊,其中SubPlan就是物理計劃的一個計算單元):
select c1.rank, count(*)from dim.city c1 join dim.city c2 on c1.id = c2.idwhere c1.id > 10 group by c1.rank limit 10; 優化器關于執行計劃的優化,雖然不一定是整個編譯流程中最難的部分,但卻是最有看點的部分,而且目前還在不斷發展中。Spark系之所以放棄Shark另起爐灶做Spark SQL,很大一部分原因是想自己做優化策略,避免受Hive的限制,為此還專門獨立出優化器組件Catalyst(當然Spark SQL目前還是非常新,其未來發展給人不少想象空間)。總之這部分工作可以不斷的創新,優化器越智能,越傻瓜化,用戶就越能解放出來解決業務問題。
早期在Hive中只有一些簡單的規則優化,比如謂詞下推(把過濾條件盡可能的放在table scan之后就完成),操作合并(連續的filter用and合并成一個operator,連續的projection也可以合并)。后來逐漸增加了一些略復雜的規則,比如相同key的join + group by合并為1個MR,還有star schema join。在Hive 0.12引入的相關性優化(correlation optimizer)算是規則優化的一個高峰,他能夠減少數據的重復掃描,具體來說,如果查詢的兩個部分用到了相同的數據,并且各自做group by / join的時候用到了相同的key,這個時候由于數據源和shuffle的key是一樣的,所以可以把原來需要兩個job分別處理的地方合成一個job處理。
比如下面這個SQL:
SELECT sum(l_extendedprice) / 7.0 as avg_yearlyFROM (SELECT l_partkey, l_quantity, l_extendedprice FROM lineitem JOIN part ON (p_partkey=l_partkey) WHERE p_brand=‘Brand#35′ AND p_container = ‘MED PKG’)touterJOIN (SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq FROM lineitem GROUP BY l_partkey) tinnerON (touter.l_partkey = tinnter.lp)WHERE touter.l_quantity < tinner.lq這個查詢中兩次出現lineitem表,group by和兩處join用的都是l_partkey,所以本來兩個子查詢和一個join用到三個job,現在只需要用到一個job就可以完成。
但是,基于規則的優化(RBO)不能解決所有問題。在關系數據庫中早有另一種優化方式,也就是基于代價的優化CBO。CBO通過收集表的數據信息(比如字段的基數,數據分布直方圖等等)來對一些問題作出解答,其中最主要的問題就是確定多表join的順序。CBO通過搜索join順序的所有解空間(表太多的情況下可以用有限深度的貪婪算法),并且算出對應的代價,可以找到最好的順序。這些都已經在關系數據庫中得到了實踐。
目前Hive已經啟動專門的項目,也就是Apache Optiq來做這個事情,而其他系統也沒有做的很好的CBO,所以這塊內容還有很大的進步空間。
執行效率即使有了高效的執行計劃,如果在運行過程本身效率較低,那么再好的執行計劃也會大打折扣。這里主要關注CPU和IO方面的執行效率。
CPU在具體的計算執行過程中,低效的cpu會導致系統的瓶頸落在CPU上,導致IO無法充分利用。在一項針對Impala和Hive的對比時發現,Hive在某些簡單查詢上(TPC-H Query 1)也比Impala慢主要是因為Hive運行時完全處于CPU bound的狀態中,磁盤IO只有20%,而Impala的IO至少在85%。
在SQL on Hadoop中出現CPU bound的主要原因有以下幾種:
大量虛函數調用:這個問題在多處出現,比如對于a + 2 * b之類的表達式計算,解釋器會構造一個expression tree,解釋的過程就是遞歸調用子節點做evaluation的過程。又比如以DAG形式的operator/task在執行的過程中,上游節點會層層調用下游節點來獲取產生的數據。這些都會產生大量的調用。類型裝箱:由于表達式解釋器需要對不同數據類型的變量做解釋,所以在Java中需要把這些本來是primitive的變量包裝成Object,累積起來也消耗不少資源。這算是上面一個問題附帶出來的。branch instruction: 現在的CPU都是有并行流水線的,但是如果出現條件判斷會導致無法并行。這種情況可能出現在判斷數據的類型(是string還是int),或者在判斷某一列是否因為其他字段的過濾條件導致本行不需要被讀取(列存儲情況下)。cache miss:每次處理一行數據的方式導致cpu cache命中率不高。(這么說已經暗示了解決方案)針對上面的問題,目前大多數系統中已經加入了以下兩個解決辦法中至少一個。
一個方法是動態代碼生成,也就是不使用解釋性的統一代碼。比如a + 2 * b這個表達式就會生成對應的執行語言的代碼,而且可以直接用primitive type,而不是用固定的解釋性代碼。具體實現來說,JVM系的如Spark SQL,Presto可以用反射,C++系的Impala則使用了llvm生成中間碼。對于判斷數據類型造成的分支判斷,動態代碼的效果可以消除這些類型判斷,還可以展開循環,可以對比下面這段代碼,左邊是解釋性代碼,右邊是動態生成代碼。
另一個方法是vectorization(向量化),基本思路是放棄每次處理一行的模式,改用每次處理一小批數據(比如1k行),當然前提條件是使用列存儲格式。這樣一來,這一小批連續的數據可以放進cache里面,cpu不僅減少了branch instruction,甚至可以用SIMD加快處理速度。具體的實現參考下面的代碼,對一個long型的字段增加一個常量。通過把數據表示成數組,過濾條件也用selVec裝進數組,形成了很緊湊的循環:
add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec){ if (selVec == null) for (int i = 0; i < vecNum; i++) result[i] = col1[i] + col2[i]; else for (int i = 0; i < vecNum; i++) { int selIdx = selVec[i]; result[selIdx] = col1[selIdx] + col2[selIdx]; }}IO由于SQL on Hadoop存儲數據都是在HDFS上,所以IO層的優化其實大多數都是HDFS的事情,各大查詢引擎則提出需求去進行推動。要做到高效IO,一方面要低延遲,屏蔽不必要的消耗;另一方面要高吞吐,充分利用每一塊磁盤。目前與這方面有關的特性有:
short-circuit local reads:當發現讀取的數據是本地數據時,不走DataNode(因為要走一次socket連接),而是用DFS Client直接讀本地的block replica。HDFS參數是dfs.client.read.shortcircuit和dfs.domain.socket.path。zero copy:避免數據在內核buffer和用戶buffer之間反復copy,在早期的HDFS中已經有這個默認實現。disk-aware scheduling:通過知道每個block所在磁盤,可以在調度cpu資源時讓不同的cpu讀不同的磁盤,避免查詢內和查詢間的IO競爭。HDFS參數是dfs.datanode.hdfs-blocks-metadata.enabled。存儲格式對于分析類型的workload來說,最好的存儲格式自然是列存儲,這已經在關系數據庫時代得到了證明。目前hadoop生態中有兩大列存儲格式,一個是由Hortonworks和Microsoft開發的ORCFile,另一個是由Cloudera和Twitter開發的Parquet。
ORCFile顧名思義,是在RCFile的基礎之上改造的。RCFile雖然號稱列存儲,但是只是“按列存儲”而已,將數據先劃分成row group,然后row group內部按照列進行存儲。這其中沒有列存儲的一些關鍵特性,而這些特性在以前的列式數據庫中(比如我以前用過的Infobright)早已用到。好在ORCFile已經彌補了這些特性,包括:
塊過濾與塊統計:每一列按照固定行數或大小進一步切分,對于切分出來的每一個數據單元,預先計算好這些單元的min/max/sum/count/null值,min/max用于在過濾數據的時候直接跳過數據單元,而所有這些統計值則可以在做聚合操作的時候直接采用,而不必解開這個數據單元做進一步的計算。更高效的編碼方式:RCFile中沒有標注每一列的類型,事實上當知道數據類型時,可以采取特定的編碼方式,本身就能很大程度上進行數據的壓縮。常見的針對列存儲的編碼方式有RLE(大量重復數據),字典(字符串),位圖(數字且基數不大),級差(排序過的數據,比如日志中用戶訪問時間)等等。ORCFile的結構如下圖,數據先按照默認256M分為row group,也叫strip。每個strip配一個index,存放每個數據單元(默認10000行)的min/max值用于過濾;數據按照上面提到的編碼方式序列化成stream,然后再進行snappy或gz壓縮。footer提供讀取stream的位置信息,以及更多的統計值如sum/count等。尾部的file footer和post script提供全局信息,如每個strip的行數,各列數據類型,壓縮參數等。
Parquet的設計原理跟ORC類似,不過它有兩個特點:
通用性:相比ORCFile專門給Hive使用而言,Parquet不僅僅是給Impala使用,還可以給其他查詢工具使用,如Hive、Pig,進一步還能對接avro/thrift/pb等序列化格式。基于Dremel思想的嵌套格式存儲:關系數據庫設計模式中反對存儲復雜格式(違反第一范式),但是現在的大數據計算不僅出現了這種需求(半結構化數據),也能夠高效的實現存儲和查詢效率,在語法上也有相應的支持(各種UDF,Hive的lateral view等)。Google Dremel就在實現層面做出了范例,Parquet則完全仿照了Dremel。對嵌套格式做列存儲的難點在于,存儲時需要標記某個數據對應于哪一個存儲結構,或者說是哪條記錄,所以需要用數據清楚的進行標記。 在Dremel中提出用definition level和repetition level來進行標記。definition level指的是,這條記錄在嵌套結構中所處于第幾層,而repetition level指的是,這條記錄相對上一條記錄,在第幾層重復。比如下圖是一個二級嵌套數組。圖中的e跟f在都屬于第二層的重復記錄(同一個level2),所以f的r值為2,而c跟d則是不同的level2,但屬于同一個level1,所以d的r值為1。對于頂層而言(新的一個嵌套結構),r值就為0。
但是僅僅這樣還不夠。上圖說明了r值的作用,但是還沒有說明d值的作用,因為按照字面解釋,d值對于每一個字段都是可以根據schema得到的,那為什么還要從行記錄級別標記?這是因為記錄中會插入一些null值,這些null值代表著他們“可以存在”但是因為是repeated或者是optional所以沒有值的情況,null值是用來占位的(或者說是“想象”出來的),所以他們的值需要單獨計算。null的d值就是說這個結構往上追溯到哪一層(不包括平級)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一個null在code = en所在的結構里面,那么language不是null(不考慮code,他跟country平級),他就是第二層;又比如country的第二個null在url = http://B 所在的結構里面,那么name不是null(不考慮url,因為他跟本來就是null的language平級),所以就是第一層。
通過這種方式,就對一個樹狀的嵌套格式完成了存儲。在讀取的時候可以通過構造一個狀態機進行遍歷。
有意思的是,雖然parquet支持嵌套格式,但是Impala還沒有來得及像Hive那樣增加array,map,struct等復雜格式,當然這項功能已經被列入roadmap了,相信不久就會出現。
在最近我們做的Impala2.0測試中,順便測試了存儲格式的影響。parquet相比sequencefile在壓縮比上達到1:5,查詢性能也相差5-10倍,足見列存儲一項就給查詢引擎帶來的提升。
資源控制運行時資源調整對于一個MR Job,reduce task的數量一直是需要人為估算的一個麻煩事,基于MR的Hive也只是根據數據源大小粗略的做估計,不考慮具體的Job邏輯。但是在之后的框架中考慮到了這個情況,增加了運行時調整資源分配的功能。Tez中引入了vertex manager,可以根據運行時收集到的數據智能的判斷reduce動作需要的task。類似的功能在TAJO中也有提到,叫progressive query optimization,而且TAJO不僅能做到調整task數量,還能調整join順序。
資源集成在Hadoop已經進入2.x的時代,所有想要得到廣泛應用的SQL on Hadoop系統勢必要能與YARN進行集成。雖然這是一個有利于資源合理利用的好事,但是由于加入了YARN這一層,卻給系統的性能帶來了一定的障礙,因為啟動AppMaster和申請container也會占用不少時間,尤其是前者,而且container的供應如果時斷時續,那么會極大的影響時效性。在Tez和Impala中對這些問題給出了相應的解決辦法:
AppMaster啟動延遲的問題,采取long lived app master,AppMaster啟動后長期駐守,而非像是MR那樣one AM per Job。具體實現時,可以給fair scheduler或capacity scheduler配置的每個隊列配上一個AM池,有一定量的AM為提交給這個隊列的任務服務。container供應的問題,在Tez中采取了container復用的方式,有點像jvm復用,即container用完以后不馬上釋放,等一段時間,實在是沒合適的task來接班了再釋放,這樣不僅減少container斷供的可能,而且可以把上一個task留下的結果cache住給下一個task復用,比如做map join;Impala則采取比較激進的方式,一次性等所有的container分配到位了才開始執行查詢,這種方式也能讓它的流水線式的計算不至于阻塞。其他到這里為止,已經從上到下順了一遍各個層面用到的技術,當然SQL on Hadoop本身就相當復雜,涉及到方方面面,時間精力有限不可能一一去琢磨。比如其他一些具有技術復雜度的功能有:
多數據源查詢:Presto支持從MySQL,Cassandra,甚至Kafka中去讀取數據,這就大大減少了數據整合時間,不需要放到HDFS里才能查詢。Impala和Hive也支持查詢HBase。國內也有類似的工作,如秒針改造Impala使之能查詢postgres。近似查詢:count distinct(基數估計)一直是SQL性能殺手之一,如果能接受一定誤差的話可以采用近似算法。Impala中已經實現了近似算法(ndv),Presto則是請BlinkDB合作完成。兩者都是采用了HyperLogLog Counting。當然,不僅僅是count distinct可以使用近似算法,其他的如取中位數之類的也可以用。結束語盡管現在相關系統已經很多,也經過了幾年的發展,但是目前各家系統仍然在不斷的進行完善,比如:
增加分析函數,復雜數據類型,SQL語法集的擴展。對于已經成形的技術也在不斷的改進,如列存儲還可以增加更多的encoding方式。甚至對于像CBO這樣的領域,開源界拿出來的東西還算是剛剛起步,相比HAWQ中的ORCA這種商業系統提供的優化器還差的很多。畢竟相比已經比較成熟的關系數據庫,分布式環境下需要解決的問題更多,未來一定還會出現很多精彩的技術實踐,讓我們在海量數據中更快更方便的查到想要的數據。