今天主要是以一個數據分析者的角度來與大家分享如何使用spark進行大數據分析。
我將分以下4部分為大家進行介紹。首先介紹spark的相關背景,包括基本概念以及spark與hadoop的關系。接下來介紹如何使用spark RDD進行數據分析。之后分享spark與大數據分析的關系,以及spark在大數據分析中所起到的作用。最后,為大家分享一下我與四位小伙伴基于去年的SODA開放的交通數據做的案例:大型活動大規模人群的檢測與疏散。
spark是一個快速易用的大規模數據計算框架,具有速度快、易使用、功能全的特點,并且可以與Hadoop很好地集成。
那么我們什么時候需要使用spark呢?首先,當我們需要處理的數據量超過了單機尺度(比如我們的計算機有4GB的內存,而我們需要處理100GB以上的數據)這時我們可以選擇spark集群進行計算。有時我們可能需要處理的數據量并不大,但是計算很復雜,需要大量的時間,這時我們也可以選擇利用spark集群強大的計算資源,并行化地計算。
spark可以提供了豐富的數據處理操作,包括在線的流式數據處理、離線的批量數據處理、即席查詢、機器學習。
spark也提供了多種編程API接口,供具有不同開發經驗的數據分析者使用。
spark與Hadoop是什么關系呢? Hadoop有兩個核心模塊,分布式存儲模塊HDFS和分布式計算模塊Mapreduce。spark本身并沒有提供分布式文件系統,因此spark的分析大多依賴于Hadoop的分布式文件系統HDFS。另一方面,Hadoop的Mapreduce與spark都可以進行數據計算,而相比于Mapreduce,spark的速度更快并且提供的功能更加豐富。
下面來介紹如何使用spark RDD進行編程。
首先介紹一下spark RDD,spark建立在統一抽象的RDD之上,RDD指的是一個只讀的可分區的分布式數據集。可以將它的全部或部分緩存在內存中,供多次計算重用。而且RDD提供了多種友好的操作函數供數據分析者做數據處理。
spark為什么會在迭代計算中比hadoop快很多呢?Hadoop進行迭代數據處理時,需要把數據從HDFS中讀出,分析,寫回到HDFS中,再讀出、分析、寫回。在此過程中進行了大量的磁盤I/O操作,消耗了大量的時間。而spark可以將數據一次性地從HDFS讀到內存中,并進行多次計算,因而減少了大量的開銷。
通過spark RDD進行編程可以理解為利用RDD提供的算子、結合實際需求,設計一個數據處理的pipeline,將原始數據轉換成我們需要得到的數據。RDD算子分為transformation和action,transformation是得到一個新的RDD,并且不會執行計算,直到遇到action算子的時候計算才會被觸發。
這是一些常用的spark RDD算子。
下面來介紹如何使用spark RDD進行數據處理。總結起來可以分為以下三步:1.根據我們的目標定義好輸入和輸出數據的格式,并比較兩者之間的差異;2.明確輸入輸出后我們根據RDD本身提供的算子以及自己定義的函數來設計pipeline;3.選擇一種API編程實現。
我們以詞頻統計為例進行說明。我們希望對一段非結構化文本做詞頻統計,即統計一段文本中每個單詞出現的次數,并將單詞按照字母ASCII順序升序排列。首先定義好我們的輸入與輸出數據格式,輸入數據是一段介紹spark的文本,輸出是逗號分隔的詞頻統計。
第二步設計算子pipeline,首先將數據從HDFS中讀取,通過flatMap算子、map算子和reduceByKey算子統計出每個單詞出現的頻次,通過sortByKey算子將單詞升序排列,再通過一個map算子轉化成我們需要的目標格式,最后通過save算子將處理好的結果寫回到HDFS中。
這是我們進行詞頻統計任務中使用的算子,包括4個transformation算子和一個action算子。
第三步我們來進行編程實現,在這里我們選擇python進行編程。我們看到原本很復雜的pipeline,spark只需要短短的幾行代碼就可以實現,可見spark的強大功能以及對數據分析者提供的友好接口。
下面和大家介紹spark與大數據分析的關系。
數據分析一般需要進行兩次創造。首先是第一次創造,即從整體上進行產品設計,找到一個好的應用問題,并思考問題是否有意義,數據源是否可靠,現有數據源可以解決該問題嗎,是否需要其他數據源。在整體設計完成之后我們進行第二次創造,即在細節上通過技術實現,這個過程是一個不斷迭代往復的過程。總結起來,數據分析,首先要找到正確的問題,然后再正確地分析數據。當然兩者并非完全獨立,比如對數據的基本統計往往會幫助我們不斷深入地理解數據,進而發現問題。
下面介紹數據流與應用問題之間的關系,以及不同的數據分析工具在其中所起到的作用。在明確了應用問題,選擇好了數據源之后,我們首先將原始數據轉化為中間數據。原始數據往往量巨大(幾百GB、TB級別),并且多是未經清洗的非結構化數據,因此我們需要用HDFS進行存儲,使用大數據分析工具spark進行清洗壓縮編碼,得到結構化的中間數據,我們以后大部分的分析都可以基于中間數據進行。中間數據往往會比原始數據量小(幾十GB),但單機仍然難以處理,因此也需要存儲到HDFS中,使用spark/Hive進行進一步的處理,得到小數據。小數據大多是一些統計結果、提取的特征等等,數據量也相對較小(幾MB至幾GB),我們可以通過python、R語言等工具在單機上進行建模、分析,并將分析結果進行可視化,可以選擇R語言、python繪制靜態的統計圖,也可以選擇echarts、D3等工具進行交互展示。通過這些可視化的結果發現insight進而解決實際問題。
在大數據快速發展的今天,有多種多樣的大數據分析工具應運而生,我們為什么要選擇spark作為我們的大數據分析工具?相比于其他分析工具,spark具有哪些優勢?ETL、機器學習、即席查詢是大數據分析中非常重要的操作。已經有了一些大數據工具為此提供了解決方案,例如hadoop mapreduce解決大數據ETL、mahout解決大數據機器學習、hive解決大數據即席查詢。然而這給數據分析者帶來了不便,對于每一種大數據操作,都要學習一種新的技術,這帶來了很大的學習成本。
那么我們會設想,會不會有一種工具,將常用的大數據分析功能統一起來呢?
spark經過近年來的飛速發展,已經做到“one stack to rule them all”,通過RDD將三者統一在了一起。數據分析者可以通過spark core大數據ETL,通過spark Mllib進行大數據機器學習,通過spark SQL進行大數據即席查詢。
因此,數據分析者只需掌握spark一種工具,即可實現絕大多數的大數據分析功能。
最后,我來與大家分享一下我與其他4位小伙伴(上海交通大學的張宏倫、李鐸、楊皓天,同濟大學的金建棟)使用去年SODA的開放交通數據進行案例分析的一些結果:大型活動大規模人群的檢測與疏散。
上海經常會舉辦大型活動,例如大型演唱會、足球賽等。這些大型活動會聚集大量的人群,有時會因為人數過多產生安全隱患,例如2015年新年上海外灘的踩踏事件。這些活動舉辦的時間地點不固定,也難以得知全部活動的信息,如果活動臨時更改時間地點,也難以實時得到新的信息。
這給政府帶來了公共安全的隱患。對于參加活動的人,在活動結束時,往往地鐵已經停運,面對黑車的漫天要價,會面臨回家難的問題。而現在市場上已經出現了一些專用巴士公司,他們希望尋找更多的客源創造更多的利潤。然而三者之間聯系脆弱、信息孤立。
我們希望以開放數據為基礎,利用spark大數據分析技術,使用算法模型,通過交通數據識別出大型活動并提供疏散建議。為政府解決社會問題,為活動參加者解決回家難的問題,同時為專用巴士公司提供更多客源,創造更多利潤。
我們選取了公交卡刷卡數據、出租車運行數據、地鐵運行數據以及浦東公交車實時數據、氣象數據。其中使用最多的是一卡通乘客刷卡數據,包含了2015年4月上海市的所有公交卡刷卡記錄,涵蓋用戶1000萬以上,交易記錄2億4千萬條以上。
如此多的數據量單機難以處理,因此我們選擇spark作為數據處理工具。這是我們的整體架構,首先根據我們的目標進行數據集擴充,包括從非常票務網、大麥網等票務網站爬取的各大活動的信息。之后進行數據預處理工作,包括數據去噪、數據融合等。之后進行數據分析挖掘,包括時序分析、空間挖掘、個體行為建模等,并將分析結果可視化。
我們對多種交通工具的每天出行時間分布進行了統計,可以看到地鐵和公交車有著明顯的早晚高峰,而出租車除了午夜時間一天的乘客數量較為平均。
我們對每天的交通總流量進行了分析,發現交通流量穩定,并以周為單位呈周期規律,而工作日的總流量要高于休息日。觀察一周的總流量,周一到周四的交通流量基本相同,周五流量要略高于周一至周四,而周六流量要低于工作日,周日的流量為一周最低。
在分析完每天的交通總流量之后,我們分析了一天中各個時段的流量。選取了周一到周四工作日中的2天(一個晴天、一個雨天),工作日周五和工作日周六。我們發現兩個周一到周四工作日的流量曲線幾乎重合,因此我們可以推測,周一到周四的工作日不僅總流量穩定,而且各個時段的交通總流量穩定,且早晚高峰顯著。而觀察周五的流量,我們發現在大約10:00之前,流量曲線幾乎與周一到周四的流量重合,而10:00以后幾乎每個時段流量都會比平時高出一些,這解釋了為什么周五的總流量會高于周一到周四。而周六的流量沒有早晚高峰,但在空閑時段(如中午)流量要高于工作日。
我們分析了一個月以來地鐵乘客的公交卡刷卡次數分布。上海乘坐一次地鐵,進出需要刷卡2次,因此正常情況下,乘客的刷卡次數一定是偶數。從分布圖中我們也可以觀察到這一點,然而我們也發現也有一些乘客的刷卡次數呈奇數,這可能是設備故障或乘客逃票行為導致。另外,一個月來乘客的交易次數呈重尾分布,而且一個月中出行2次的乘客最多。
在分析了宏觀上的流量之后,我們來分析個體的行為。我們用模序(motif)來對個體的行為進行抽象,即用有向圖表示用戶一天的軌跡。比如第二幅圖中,乘客一天中先從站點1出發去2,再從站點2出發返回1,這是典型的通勤行為。我們發現乘客絕大多數的行為可以使用以上10種模序描述,因此絕大多數的乘客行為是規律的。我們也關注模序的變化,因為模序的變化暗示著行為的異常。比如某天大量用戶的模序發生變化且都去一個共同的地點,那么他們很可能去參加同一場大型活動。
下面我們研究大型活動與交通流量的關系。這是中華藝術宮地鐵站幾天的客流量。平時情況下,客流量較少。4.18號晚中華藝術宮附近場館舉辦了一場演唱會,可以看到這一天在活動開始前與結束后客流量大大增加,遠高于平時,且出現了兩個尖峰。因此大型活動確實對交通流量造成了較為顯著的影響,我們通過交通數據來識別大型活動是可行的。
下面是我們使用spark技術,通過模型做出的大型活動識別結果,做圖顏色表示地鐵,例如藍色代表8號線,小長方形表示地鐵站點。右圖表示一個月中哪一天算法檢測出了大型活動,白色表示沒有檢測到,紅色表示檢測到。右側兩條曲線分別表示當日的客流量與歷史平均的客流量。
最后,我們基于虹口足球場4月11日晚(一場足球賽)的交通數據進行了控制性模擬實驗。我們發現,在未采取控制前,需要歷史很長時間才可以完成疏散,而當使用巴士協助疏散之后,所花時間大大減少,這也降低了風險。同時,我們發現調配巴士數量越多、載客量越大疏散越快,但也有可能造成巴士資源浪費、造成損失,因此存在使得盈利最大和疏散最快的最優點,可以通過最優化模型得到。至此,我們以開放數據為基礎,利用spark大數據技術和算法模型,對乘客解決了活動結束回家難問題,對專用巴士提供了更多客源增加其收益,同時幫助政府減少了公共安全風險。
最后從一個數據分析者的角度,總結一下個人對數據分析的理解。我們首先要根據實際需求找到應用問題,數據是我們的研究基礎,spark/hadoop等技術是我們的分析工具,算法模型是我們的理論方法,而數據可視化是一種呈現信息的手段。
作者: 科賽網 汪夢夢 鄧以勒