精品国产一级在线观看,国产成人综合久久精品亚洲,免费一级欧美大片在线观看

當前位置:大數據業界動態 → 正文

從WordCount看Spark大數據處理的核心機制

責任編輯:editor005 |來源:企業網D1Net  2015-06-02 14:06:38 本文摘自:優雅程序員

大數據處理肯定是分布式的了,那就面臨著幾個核心問題:可擴展性,負載均衡,容錯處理。Spark是如何處理這些問題的呢?接著上一篇的“動手寫WordCount”,今天要做的就是透過這個大數據界的HelloWorld來看看Spark隱藏了哪些魔法。

請各位看官,帶著分布式的問題往下看。

分布式架構

大數據時代,單機裝下PB級的數據,然后在可接受的時間內處理完,不可能,所以一定是分布式的。

分布式存儲

HDFS(Hadoop Distributed File System)是最常見的,和Spark配合的分布式存儲系統。HDFS的存儲結構如下圖

每個文件被分成固定大小的塊,而塊作為最小的存儲單位放到眾多服務器上。那一旦存某個塊的機器掛了,不是整個文件就洗白了嗎?HDFS當然不會這么傻,文件的每個塊都有備份,默認情況下一個塊會存3份,分到不同的服務器。這樣一來,除非某個塊涉及的三臺服務器全掛,否則不用擔心。在合理分布3個塊的情況下,三臺服務器全掛的可能性比中500萬還低。下面是/file.txt有三個文件塊的情況。

NN是Name Node,存儲文件塊放在哪兒等元信息。DN是Data Node,用來存放具體的文件塊。

分布式處理

有一類系統數據是分布式存儲,但是處理卻集中在一起。比如Mysql分庫分表存數據,然后在某個服務器上,挨個獲取所有庫所有表的數據進行處理,這種系統的本質還是“數據分發到計算邏輯側”,它的性能瓶頸就在于做數據處理的那臺服務器。

而分布式處理的核心觀念在于“把計算邏輯分發到數據側”,有兩大優點:

計算邏輯分發明顯比數據分發節省網絡帶寬,而網絡帶寬是分布式系統中最寶貴的資源

計算邏輯在數據側執行,消除了集中式處理中計算邏輯側的性能瓶頸

Spark + HDFS的運行架構如下:

Driver是程序開始運行的地方,也是總控,它把計算邏輯(閉包的實例)發送到有數據塊的Slave上去執行,結果再收回去匯總。

是不是看出來了?

數據更多了,加機器唄,機器多了磁盤多,磁盤多了存的多。

跑的慢了,加機器唄,機器多了磁盤多,并行加載起來,數據吐吞量大。機器多了,內存CPU也多,并行處理起來,數據吞吐量大。

提示: 分布式處理系統會把計算邏輯分發到數據側,極大提高系統的水平擴展性。

WordCount運行機制

講了一堆理論知識,為了讓各位看官透徹理解,也為Spark程序算法優化打下堅實的基礎,我們拿WordCount來舉例說明,順便說說負載均衡。

額。。。還沒看“動手寫WordCount”的兄弟姐妹們,建議先去看看。

數據位置感知

下面是WordCount的業務邏輯代碼:

val file = "hdfs://127.0.0.1:9000/file.txt"

val lines = sc.textFile(file)

val words = lines.flatMap(line => line.split("\s+"))

val wordCount = words.countByValue()

lines是Spark的RDD,它包含了在哪些機器上有file文件的塊,信息是從HDFS來的。每文件塊映射到RDD上就是一個分區,對的,沒看錯。如果一個文件塊128MB,那么HDFS上一個1GB大小的文件就有8個文件塊,由這個文件創建的RDD就會有8個分區。

之前說了,在HDFS上每個文件塊默認會有3份,那RDD的分區選擇了那一份呢?對滴,根據負載選擇服務器負載最低的那一份。負載自動均衡了吧。

計算邏輯分發

有了這些信息,我們就知道把后續的計算邏輯該分發到哪兒去。

首先,我們得說清楚什么是計算邏輯,各位看官們想一下,類方法里面的代碼是如何運行的。充分必要條件:方法代碼 + 類實例(對象)的狀態。似成相識吧,程序 = 算法 + 數據。算法在代碼中,數據在對象的狀態中。

Spark要分發計算邏輯,也是分了兩部分。

第一部分是代碼。為什么spark-submit執行一開始,總是一堆jar包被分發,原因就在這兒。

第二部分是類實例。類在哪兒?作為RDD各API參數的閉包。

val words = lines.flatMap(line => line.split("\s+"))

flatMap的參數 **_.split("s+")** 是閉包,閉包是引用了外部自由變量的函數,在Scala中是由匿名類實現的。更多信息,請小伙伴們GFSOSO哈。

上面的一行代碼中,Spark要分發的實例就是 **_.split("s+")** 的實例。

val wordCount = words.countByValue()

實際上RDD的API countByValue 也有需要分發的閉包實例,只是都在Spark的源碼中,讓一碼給大家整理到明面上來哈。

val wordCount = words

.mapPartitions(convertWordsInPartitionToWordCountMap)

.reduce(mergeMaps)

前面我們提到了RDD的分區,mapPartitions會方法中的邏輯放到RDD的每個分區上執行,注意是遠程在Slave上執行的哈。而reduce是在把每個分區的結果拿到Driver后,對結果進行兩兩合并,最終得到結果。

WordCount分布式運行原理

先仔細看圖,相信不用下面的解釋,各位看官也能看懂了。(上面的圖是張巨高清的圖,手機上看不清,建議轉發文章到郵箱,然后到電腦上看,看懂這張圖,就真的把WordCount分布式運行的機制搞懂了。)

對于WordCount而言,分布式在每個Slave的每個分區上,統計本分區內的單詞計數,生成一個Map,然后將它傳回給Driver,再由Driver兩兩合并來自各個分區的所有Map,形成最終的單詞計數。

今天我們不僅說清楚了WordCount背后的分布式運行機制,而且解釋了Spark的水平擴展能力,以及負載均衡。

下一篇將透過WordCount來看重中之重的容錯處理,這涉及到Spark的應用場景與RDD的設計來源,可以毫不夸張地說,這才是Spark的精髓。

提示匯總

分布式處理系統會把計算邏輯分發到數據側,極大提高系統的水平擴展性。

關鍵字:Spark元信息位置感知

本文摘自:優雅程序員

x 從WordCount看Spark大數據處理的核心機制 掃一掃
分享本文到朋友圈
當前位置:大數據業界動態 → 正文

從WordCount看Spark大數據處理的核心機制

責任編輯:editor005 |來源:企業網D1Net  2015-06-02 14:06:38 本文摘自:優雅程序員

大數據處理肯定是分布式的了,那就面臨著幾個核心問題:可擴展性,負載均衡,容錯處理。Spark是如何處理這些問題的呢?接著上一篇的“動手寫WordCount”,今天要做的就是透過這個大數據界的HelloWorld來看看Spark隱藏了哪些魔法。

請各位看官,帶著分布式的問題往下看。

分布式架構

大數據時代,單機裝下PB級的數據,然后在可接受的時間內處理完,不可能,所以一定是分布式的。

分布式存儲

HDFS(Hadoop Distributed File System)是最常見的,和Spark配合的分布式存儲系統。HDFS的存儲結構如下圖

每個文件被分成固定大小的塊,而塊作為最小的存儲單位放到眾多服務器上。那一旦存某個塊的機器掛了,不是整個文件就洗白了嗎?HDFS當然不會這么傻,文件的每個塊都有備份,默認情況下一個塊會存3份,分到不同的服務器。這樣一來,除非某個塊涉及的三臺服務器全掛,否則不用擔心。在合理分布3個塊的情況下,三臺服務器全掛的可能性比中500萬還低。下面是/file.txt有三個文件塊的情況。

NN是Name Node,存儲文件塊放在哪兒等元信息。DN是Data Node,用來存放具體的文件塊。

分布式處理

有一類系統數據是分布式存儲,但是處理卻集中在一起。比如Mysql分庫分表存數據,然后在某個服務器上,挨個獲取所有庫所有表的數據進行處理,這種系統的本質還是“數據分發到計算邏輯側”,它的性能瓶頸就在于做數據處理的那臺服務器。

而分布式處理的核心觀念在于“把計算邏輯分發到數據側”,有兩大優點:

計算邏輯分發明顯比數據分發節省網絡帶寬,而網絡帶寬是分布式系統中最寶貴的資源

計算邏輯在數據側執行,消除了集中式處理中計算邏輯側的性能瓶頸

Spark + HDFS的運行架構如下:

Driver是程序開始運行的地方,也是總控,它把計算邏輯(閉包的實例)發送到有數據塊的Slave上去執行,結果再收回去匯總。

是不是看出來了?

數據更多了,加機器唄,機器多了磁盤多,磁盤多了存的多。

跑的慢了,加機器唄,機器多了磁盤多,并行加載起來,數據吐吞量大。機器多了,內存CPU也多,并行處理起來,數據吞吐量大。

提示: 分布式處理系統會把計算邏輯分發到數據側,極大提高系統的水平擴展性。

WordCount運行機制

講了一堆理論知識,為了讓各位看官透徹理解,也為Spark程序算法優化打下堅實的基礎,我們拿WordCount來舉例說明,順便說說負載均衡。

額。。。還沒看“動手寫WordCount”的兄弟姐妹們,建議先去看看。

數據位置感知

下面是WordCount的業務邏輯代碼:

val file = "hdfs://127.0.0.1:9000/file.txt"

val lines = sc.textFile(file)

val words = lines.flatMap(line => line.split("\s+"))

val wordCount = words.countByValue()

lines是Spark的RDD,它包含了在哪些機器上有file文件的塊,信息是從HDFS來的。每文件塊映射到RDD上就是一個分區,對的,沒看錯。如果一個文件塊128MB,那么HDFS上一個1GB大小的文件就有8個文件塊,由這個文件創建的RDD就會有8個分區。

之前說了,在HDFS上每個文件塊默認會有3份,那RDD的分區選擇了那一份呢?對滴,根據負載選擇服務器負載最低的那一份。負載自動均衡了吧。

計算邏輯分發

有了這些信息,我們就知道把后續的計算邏輯該分發到哪兒去。

首先,我們得說清楚什么是計算邏輯,各位看官們想一下,類方法里面的代碼是如何運行的。充分必要條件:方法代碼 + 類實例(對象)的狀態。似成相識吧,程序 = 算法 + 數據。算法在代碼中,數據在對象的狀態中。

Spark要分發計算邏輯,也是分了兩部分。

第一部分是代碼。為什么spark-submit執行一開始,總是一堆jar包被分發,原因就在這兒。

第二部分是類實例。類在哪兒?作為RDD各API參數的閉包。

val words = lines.flatMap(line => line.split("\s+"))

flatMap的參數 **_.split("s+")** 是閉包,閉包是引用了外部自由變量的函數,在Scala中是由匿名類實現的。更多信息,請小伙伴們GFSOSO哈。

上面的一行代碼中,Spark要分發的實例就是 **_.split("s+")** 的實例。

val wordCount = words.countByValue()

實際上RDD的API countByValue 也有需要分發的閉包實例,只是都在Spark的源碼中,讓一碼給大家整理到明面上來哈。

val wordCount = words

.mapPartitions(convertWordsInPartitionToWordCountMap)

.reduce(mergeMaps)

前面我們提到了RDD的分區,mapPartitions會方法中的邏輯放到RDD的每個分區上執行,注意是遠程在Slave上執行的哈。而reduce是在把每個分區的結果拿到Driver后,對結果進行兩兩合并,最終得到結果。

WordCount分布式運行原理

先仔細看圖,相信不用下面的解釋,各位看官也能看懂了。(上面的圖是張巨高清的圖,手機上看不清,建議轉發文章到郵箱,然后到電腦上看,看懂這張圖,就真的把WordCount分布式運行的機制搞懂了。)

對于WordCount而言,分布式在每個Slave的每個分區上,統計本分區內的單詞計數,生成一個Map,然后將它傳回給Driver,再由Driver兩兩合并來自各個分區的所有Map,形成最終的單詞計數。

今天我們不僅說清楚了WordCount背后的分布式運行機制,而且解釋了Spark的水平擴展能力,以及負載均衡。

下一篇將透過WordCount來看重中之重的容錯處理,這涉及到Spark的應用場景與RDD的設計來源,可以毫不夸張地說,這才是Spark的精髓。

提示匯總

分布式處理系統會把計算邏輯分發到數據側,極大提高系統的水平擴展性。

關鍵字:Spark元信息位置感知

本文摘自:優雅程序員

電子周刊
回到頂部

關于我們聯系我們版權聲明隱私條款廣告服務友情鏈接投稿中心招賢納士

企業網版權所有 ©2010-2024 京ICP備09108050號-6 京公網安備 11010502049343號

^
  • <menuitem id="jw4sk"></menuitem>

    1. <form id="jw4sk"><tbody id="jw4sk"><dfn id="jw4sk"></dfn></tbody></form>
      主站蜘蛛池模板: 桓台县| 吉安市| 缙云县| 凤台县| 同江市| 涟水县| 改则县| 五家渠市| 长治市| 馆陶县| 县级市| 防城港市| 白山市| 兴仁县| 海伦市| 吉首市| 正定县| 扶绥县| 额敏县| 绍兴县| 乳山市| 怀化市| 惠东县| 钟山县| 湖北省| 平顶山市| 高碑店市| 微山县| 长岭县| 民和| 定边县| 翁源县| 宁波市| 噶尔县| 台州市| 左贡县| 耿马| 永善县| 安平县| 苏尼特左旗| 原平市|