大數(shù)據(jù)時代給傳統(tǒng)數(shù)據(jù)倉庫架構(gòu)帶來了一系列的沖擊和挑戰(zhàn),僅從源數(shù)據(jù)采集和存儲層面,就讓倉儲的構(gòu)建者不得不重新認(rèn)真地思考:數(shù)據(jù)在哪里?這個問題的答案改寫了企業(yè)數(shù)據(jù)倉庫對數(shù)據(jù)源的既有定義,同時也傳遞了兩方面的焦慮,一是數(shù)據(jù)規(guī)模急速增長,現(xiàn)有的單節(jié)點或者共享磁盤架構(gòu)能否適應(yīng)海量數(shù)據(jù)的存儲;二是數(shù)據(jù)結(jié)構(gòu)復(fù)雜多樣,現(xiàn)有的基于結(jié)構(gòu)化數(shù)據(jù)為主體的存儲方案能否兼容無模式的非結(jié)構(gòu)化數(shù)據(jù)。
面對企業(yè)大數(shù)據(jù)的挑戰(zhàn),用友華表作為一家提供商業(yè)分析產(chǎn)品的供應(yīng)商,在技術(shù)上我們將如何解決大數(shù)據(jù)的問題呢?目前面對大數(shù)據(jù)給現(xiàn)有倉庫存儲架構(gòu)帶來的量的沖擊和數(shù)據(jù)種類增加的挑戰(zhàn),不同的公司會選擇不同的技術(shù)路線,我們最初試圖通過一個大而全的存儲架構(gòu)來解決海量數(shù)據(jù)和多種數(shù)據(jù)類型的問題。但結(jié)過一段時間反復(fù)研究,我們認(rèn)為大而全的存儲架構(gòu)不是解決大數(shù)據(jù)的最佳方案,我們目前決定采用的技術(shù)路線是讓不同種類的數(shù)據(jù)存儲在最適合他們的存儲系統(tǒng)里,然后再將不同的數(shù)據(jù)類型進行融合,企業(yè)在融合的數(shù)據(jù)基礎(chǔ)上做商業(yè)分析。
本文我將從用友華表的技術(shù)思路、存儲方案、存儲之后的數(shù)據(jù)如何融合三個層次來闡述我們?nèi)绾螒?yīng)對大數(shù)據(jù)的挑戰(zhàn)。
分而治之 三面突圍
第一,有“容”乃大。“容”,即有足夠的容量來存儲數(shù)據(jù)。對于大規(guī)模數(shù)據(jù),我們將采用分而治之的思想,構(gòu)建分布式存儲系統(tǒng),并且做到易擴展。保證系統(tǒng)可以方便的增加節(jié)點,當(dāng)企業(yè)的數(shù)據(jù)快速增加時,可以使數(shù)據(jù)分布始終保持在平衡狀態(tài);
第二,有“榮”乃大。即兼用多種存儲引擎。大數(shù)據(jù)因結(jié)構(gòu)復(fù)雜多樣使得數(shù)據(jù)倉庫要采集的源數(shù)據(jù)種類無比“繁榮”,因此新的倉儲架構(gòu)也要改變目前以結(jié)構(gòu)化為主體的單一存儲方案的現(xiàn)狀,針對每種數(shù)據(jù)的存儲特點選擇最合適的解決方案:對非結(jié)構(gòu)化數(shù)據(jù)采用分布式文件系統(tǒng)進行存儲,對結(jié)構(gòu)松散無模式的半結(jié)構(gòu)化數(shù)據(jù)采用面向文檔的分布式key/value存儲引擎,對海量的結(jié)構(gòu)化數(shù)據(jù)采用shared-nothing的分布式并行數(shù)據(jù)庫系統(tǒng)存儲;
第三,有“融”乃大。如上所述可以兼用多種分布式存儲引擎來解決“容”和“榮”的挑戰(zhàn),但企業(yè)存儲多元化數(shù)據(jù)的一個重要目標(biāo)是集成分析,而多種類型數(shù)據(jù)孤立存儲對后續(xù)的集成分析會帶來極大不便。因此我們還需要構(gòu)建分布式數(shù)據(jù)庫系統(tǒng)和分布式文件系統(tǒng)之間的連接器,使得非結(jié)構(gòu)化數(shù)據(jù)在處理成結(jié)構(gòu)化信息后,能方便的和分布式數(shù)據(jù)庫中的關(guān)系型數(shù)據(jù)快速融通,保證大數(shù)據(jù)分析的敏捷性。
存儲方案各不同
上面提到針對大數(shù)據(jù)規(guī)模大、種類多的特點,我們可以采用“容”、“榮”的方案,兼用多種分布式存儲引擎分而治之。那么我們就拿非結(jié)構(gòu)化、半結(jié)構(gòu)化和結(jié)構(gòu)化這三大類數(shù)據(jù)的存儲方案分別舉例說明,以便讓大家更清楚的了解到不同類型的海量數(shù)據(jù)通常都是通過哪些方式來進行存儲的。由于談到的都是業(yè)界普遍使用的開源或商業(yè)方案,因此不做深入討論。
首先,適合存儲海量非結(jié)構(gòu)化數(shù)據(jù)的分布式文件系統(tǒng)。
HDFS(Hadoop Distributed File System),是鼎鼎大名的開源項目Hadoop的家族成員,是谷歌文件系統(tǒng)GFS(Google File System)的開源實現(xiàn)。HDFS將大規(guī)模數(shù)據(jù)分割為多個64兆字節(jié)的數(shù)據(jù)塊,存儲在多個數(shù)據(jù)節(jié)點組成的分布式集群中,隨著數(shù)據(jù)規(guī)模的不斷增長,只需要在集群中增加更多的數(shù)據(jù)節(jié)點即可,因此具有很強的可擴展性;同時每個數(shù)據(jù)塊會在不同的節(jié)點中存儲3個副本,因此具有高容錯性;因為數(shù)據(jù)是分布式存儲的,因此可以提供高吞吐量的數(shù)據(jù)訪問能力,在海量數(shù)據(jù)批處理方面有很強的性能表現(xiàn)。
其次,適合存儲海量無模式的半結(jié)構(gòu)化數(shù)據(jù)的分布式Key/Value存儲引擎。
HBase(Hadoop Database),也是開源項目Hadoop的家族成員,是谷歌大表Bigtable的開源實現(xiàn)。HBase是一個高可靠性、高性能、面向列、可伸縮的分布式存儲系統(tǒng),它不同于一般的有模式的關(guān)系型數(shù)據(jù)庫,HBase存儲的數(shù)據(jù)表是無模式的,特別適合結(jié)構(gòu)復(fù)雜多樣的半結(jié)構(gòu)化數(shù)據(jù)存儲。此外,HBase利用HDFS作為其文件存儲系統(tǒng),利用MapReduce技術(shù)來處理HBase中的海量數(shù)據(jù)。
第三,適合存儲海量結(jié)構(gòu)化數(shù)據(jù)的分布式并行數(shù)據(jù)庫系統(tǒng)。
Greenplum是基于PostgreSQL開發(fā)的一款MPP(海量并行處理)架構(gòu)的、shared-nothing無共享的分布式并行數(shù)據(jù)庫系統(tǒng)。采用Master/Slave架構(gòu),Master只存儲元數(shù)據(jù),真正的用戶數(shù)據(jù)被散列存儲在多臺Slave服務(wù)器上,并且所有的數(shù)據(jù)都在其它Slave節(jié)點上存有副本,從而提高了系統(tǒng)可用性。
Greenplum最核心的技術(shù)就是,大表數(shù)據(jù)分片存儲,可以應(yīng)對海量數(shù)據(jù);基于大表的查詢語句在經(jīng)過Master分析后可以分片發(fā)送到Slave節(jié)點進行并行運行,所有節(jié)點將中間結(jié)果返回給Master節(jié)點,由Master進行匯總后返回給客戶端,大大提高了SQL的運行速度。
“三融合一”——Xnet數(shù)據(jù)交換網(wǎng)絡(luò)
各種復(fù)雜而大量的數(shù)據(jù)猶如一張立體的大網(wǎng),三類數(shù)據(jù)是網(wǎng)里三種不同的結(jié)點,前面提到的三類分布式存儲引擎可以將不同的結(jié)點有序的安排在網(wǎng)上,并且每種相同的結(jié)點都可以直接用線相互連接起來。但此時只是三個孤立的面,就如同三類數(shù)據(jù)間存在的孤島。若要把這三個面也相互連接起來,形成一張可以從點到面,從點到點,從面到面的大網(wǎng),則需要構(gòu)建一個方便、快速的數(shù)據(jù)交換組件,它是一個連接器,可以實現(xiàn)“三融合一”,滿足大數(shù)據(jù)存儲有“融”乃大的特性。
下面先介紹一下數(shù)據(jù)交換網(wǎng)絡(luò)Xnet(Exchange Net)的一些基本構(gòu)思,它是一個可以完成分布式文件系統(tǒng)和分布式數(shù)據(jù)庫之間海量數(shù)據(jù)快速交換的組件。
上圖僅是一個簡化的邏輯圖,在實際的物理部署中,HDFS集群和并行數(shù)據(jù)庫集群共用一個服務(wù)器集群,即在服務(wù)器集群的每個節(jié)點上既有HDFS數(shù)據(jù)節(jié)點也有并行數(shù)據(jù)庫的數(shù)據(jù)庫單實例。處于中間融通兩方數(shù)據(jù)的藍色部分就是我們本節(jié)要探討的分布式、可并行運行的高速連接器Xnet。
Xnet實際就是運行在Hadoop集群上的一系列Mapreduce任務(wù),它要完成從HDFS讀取源數(shù)據(jù)、處理中間結(jié)果集、最后寫入分布式數(shù)據(jù)庫的若干作業(yè),這些作業(yè)對調(diào)用者而言是完全透明的,僅需要配置簡單的業(yè)務(wù)信息,調(diào)用Xnet就可自動完成:
源數(shù)據(jù)文件:需要同步至分布式數(shù)據(jù)庫的HDFS文件
文件頭:源數(shù)據(jù)的列分隔符、以及與目標(biāo)表列名的映射
目標(biāo)表:要寫入的數(shù)據(jù)庫表
過濾條件:選擇寫入的數(shù)據(jù)行條件
散列鍵:根據(jù)散列鍵的值和哈希算法確定數(shù)據(jù)要寫入分布式數(shù)據(jù)庫的節(jié)點,對數(shù)據(jù)進行分片保證數(shù)據(jù)均衡分布
Xnet組件的主要功能設(shè)計如下:
第一, 拆分列表。將源數(shù)據(jù)文件符合過濾條件的面向行的記錄拆分為多個列表,并存入如下的目錄結(jié)構(gòu)中:
一級目錄為表名,二級目錄為列名,這樣方便后續(xù)的基于列的數(shù)據(jù)傳輸、裝載以及基于列的統(tǒng)計信息采集,其Mapreduce過程可以模擬如下:
Map階段:
a、源數(shù)據(jù)文件被splitable接口分割為多個數(shù)據(jù)段,對每個段Jobtracker會啟動一個Mapper檢索每一行記錄,根據(jù)Xnet的配置信息得到符合過濾條件的記錄
b、根據(jù)Xnet配置信息得到每條記錄的散列鍵,結(jié)合哈希算法計算出該記錄的散列值HK
c、根據(jù)Mapper處理的數(shù)據(jù)段號和當(dāng)前處理的行號產(chǎn)生一個行標(biāo)識RID
d、Mapper結(jié)合Xnet的配置信息對當(dāng)前記錄的每一列都產(chǎn)生輸出,數(shù)據(jù)格式為HK:(Cn;RID;Cv),其中HK為散列值,Cn為列名,RID為行標(biāo)識,Cv為列值
Reduce階段:
a、Reducer遠程讀取Mapper產(chǎn)生的中間數(shù)據(jù)集,通過實現(xiàn)定制化的MultipleOutputFormat接口并根據(jù)記錄中Cn即列名,將記錄寫入如上的樹結(jié)構(gòu)目錄中,數(shù)據(jù)格式為HK:(RID;Cv)。
第二,散列列表。將拆分得到的列表數(shù)據(jù)文件根據(jù)每條記錄的散列值HK匯聚到相應(yīng)的數(shù)據(jù)庫實例節(jié)點,進行排序等操作,并行加載入分布式數(shù)據(jù)庫中,其Mapreduce過程模擬如下:
Map階段:
a、Mapper從對應(yīng)的二級目錄結(jié)構(gòu)中讀取每個列表的數(shù)據(jù)文件
Reduce階段:
a、Mapper產(chǎn)生的中間數(shù)據(jù)集根據(jù)散列值HK進行Partition匯聚到不同的Reducer進程
b、Reducer對數(shù)據(jù)集按照列值即Cv進行排序操作,并通過實現(xiàn)定制化的MultipleOutputFormat接口將數(shù)據(jù)通過分布式數(shù)據(jù)庫的裝載接口寫入相應(yīng)的數(shù)據(jù)庫實例中
第三,統(tǒng)計信息。該功能主要是以單位列表為對象進行全量或者抽樣計算,產(chǎn)生列級統(tǒng)計信息,利用Xnet的數(shù)據(jù)交換過程進行統(tǒng)計信息采集,減輕分布式數(shù)據(jù)庫后續(xù)分析表的相關(guān)運算。這些統(tǒng)計信息有助于分布式數(shù)據(jù)庫的查詢引擎做出最合理的執(zhí)行計劃,提高用戶的數(shù)據(jù)分析效率。
我們將統(tǒng)計信息采集的操作實現(xiàn)為獨立的算子,嵌入到Xnet的數(shù)據(jù)交換過程中,保證代碼的獨立性,可以方便的控制何時進行統(tǒng)計信息的采集。主要算子描述如下:
StatisticsGather
簡單的聚集統(tǒng)計采集,如空值數(shù)、記錄數(shù),最大最小值等
HistogramGather
等高直方圖、常用值統(tǒng)計等
SampleGather
數(shù)據(jù)采樣算子,在采樣統(tǒng)計情景下,將命中的記錄傳給其它兩個算子進行統(tǒng)計
下面通過舉例看看統(tǒng)計信息采集的處理過程。
首先,準(zhǔn)備階段。Mapreduce過程要對輸入的數(shù)據(jù)進行分割split操作,在統(tǒng)計信息的采集過程中不用對所有的splits都進行統(tǒng)計分析,每次采集任務(wù)只選擇指定數(shù)量的split進行。這些數(shù)量值需要在統(tǒng)計信息任務(wù)里進行配置,如采樣的splits數(shù)量num_sample_splits,采樣的記錄數(shù)num_sample_records。
采樣的選擇方法:
如果num_sample_splits大于splits總數(shù),那么所有splits都需要進行采樣,每個split采樣的記錄數(shù)為num_sample_records / total_splits
如果num_sample_splits小于splits總數(shù),計算splits挑選的間隔sample_step = total_splits / num_sample_splits,按照sample_step的間隔從splits列表中選出
num_sample_splits個split,每個split需要采樣的任務(wù)數(shù)為num_sample_records / num_sample_splits
其次,采樣算子。在海量數(shù)據(jù)中,對每個字段統(tǒng)計一個精確的直方圖信息的代價太大,而且也沒有意義。因此采用采樣的手段來統(tǒng)計每個字段的直方圖信息比較可取,下面描述兩種采樣方式,如果能清楚知道每個分割文件的記錄數(shù)目那么直接采用采樣算法Algorithm S或者Algorithm R是最好的,具體采樣算法這里不做討論。
順序讀取:SampleGather接收需要處理的記錄,如果達到配置的采樣數(shù)目則跳過不做任何操作;如果沒有達到則傳遞給HistogramGather進行統(tǒng)計分析
估值讀取:SampleGather統(tǒng)計前n條記錄的平均長度,根據(jù)平均長度和當(dāng)前分割文件的總長度,估算當(dāng)前包含的總記錄數(shù)。有了總記錄數(shù)N就可進行采樣算法如Algorithm S或者Algorithm R,將采樣命中的記錄傳遞給HistogramGather進行統(tǒng)計分析
第三,MR流程。在此模擬統(tǒng)計信息采集的Mapreduce過程,包括字段記錄總數(shù)、空值記錄比例、常用值統(tǒng)計和等高直方圖統(tǒng)計。
Map階段:
a、Mapper從文本文件中解析得到的一行記錄,并傳遞給StatisticsGather和SampleGather
b、StatisticsGather的處理:統(tǒng)計記錄數(shù)num_all加1,如果該字段同時為空則num_null加1
c、SampleGather進行采樣后,將采樣的記錄傳遞給HistogramGather進行直方圖統(tǒng)計(在Map階段直方圖統(tǒng)計包括常用值統(tǒng)計和等高直方圖統(tǒng)計)
d、常用值統(tǒng)計:將該字段放入HashTable進行去重操作,統(tǒng)計該字段的重復(fù)類;HashTable中按照值出現(xiàn)的次數(shù)排序,并且保證HashTable的大小在指定的范圍內(nèi),比如250個Key
e、等高直方圖統(tǒng)計:直接將所有采樣到的所有記錄輸出到Reduce階段,因為等高直方圖需要一個全局的采樣統(tǒng)計
Map階段輸出:
a、StatisticsGather輸出:該split中包含的總記錄數(shù)num_all,該字段的空值數(shù)量num_null
b、HistogramGather輸出:常用值(MCV)統(tǒng)計輸出,HashTable中key的數(shù)目,key的列表,以及每個key在split中占有的數(shù)量;等高直方圖統(tǒng)計輸出,采樣得到的所有記錄
Reduce階段:
a、StatisticsGather輸出:累加每個Map輸出的num_all,得到總記錄數(shù);累加每個Map輸出的字段的空值數(shù)量,并與總記錄數(shù)相除,得到空值比例
b、HistogramGather的常用值統(tǒng)計輸出:利用Mapper和Reducer的Sort機制,對常用值出現(xiàn)次數(shù)進行累加統(tǒng)計。基于常用值統(tǒng)計可以進一步產(chǎn)生更多的統(tǒng)計信息,如遍歷常用值的HashTable,統(tǒng)計出現(xiàn)次數(shù)大于1的鍵值,如果該值為0,則說明該字段具有唯一性。還可以利用常用值輸出計算該列的離散度等,這里不再贅述。
c、HistogramGather的等高直方圖統(tǒng)計輸出:利用Mapper和Reducer的Sort機制,將每個Mapper輸出的采樣記錄進行排序。根據(jù)配置的直方圖箱個數(shù),將這些值分到不同的bin中,記錄每個bin的起始值和結(jié)束值。輸出等高的直方圖。(如下圖所示,得到等高直方圖的例子)
上圖中的直方圖(根據(jù)紅線分割bin)可以記錄為:0 - B;1-C;2 - C;3 - E;