本文旨在介紹使用機器學習算法,來介紹Apache Spark數據處理引擎。我們一開始會先簡單介紹一下Spark,然后我們將開始實踐一個機器學習的例子。我們將使用Qualitative Bankruptcy數據集,來自UCI機器學習數據倉庫。雖然Spark支持同時Java,Scala,Python和R,在本教程中我們將使用Scala作為編程語言。不用擔心你沒有使用Scala的經驗。練習中的每個代碼段,我們都會詳細解釋一遍。
APACHE SPARK
Apache Spark是一個開源的集群計算框架,用Spark編寫的應用程序可以比Hadoop MapReduce范式的速度高100倍以上。Spark的一個主要的特點,基于內存,運行速度快,不僅如此,復雜應用在Spark系統上運行,也比基于磁盤的MapReduce更有效。Spark還旨在更通用,因此它提供了以下庫:
Spark SQL,處理結構化數據的模塊
MLlib,可擴展的機器學習庫
GraphX,圖和圖的并行計算API
Spark Streaming,可擴展的,可容錯的流式計算程序
正如已經提到的,Spark支持Java,Scala,Python和R編程語言。它還集成了其他大數據工具。特別是,Spark可以運行在Hadoop集群,可以訪問任何數據源,包括Hadoop Cassandra。
Spark核心概念
在一個高的抽象層面,一個Spark的應用程序由一個驅動程序作為入口,在一個集群上運行各種并行操作。驅動程序包含了你的應用程序的main函數,然后將這些應用程序分配給集群成員執行。驅動程序通過SparkContext對象來訪問計算集群。對于交互式的shell應用,SparkContext默認可通過sc變量訪問。
Spark的一個非常重要的概念是RDD–彈性分布式數據集。這是一個不可改變的對象集合。每個RDD會分成多個分區,每個分區可能在不同的群集節點上參與計算。RDD可以包含任何類型的Java,Scala對象,Python或R,包括用戶自定義的類。RDDS的產生有兩種基本方式:通過加載外部數據集或分配對象的集合如,list或set。
在創建了RDDs之后,我們可以對RDDs做2種不同類型的操作:
Transformations - 轉換操作,從一個RDD轉換成另外一個RDD
Actions - 動作操作,通過RDD計算結果
RDDs通過lazy的方式計算 - 即當RDDs碰到Action操作時,才會開始計算。Spark的Transformations操作,都會積累成一條鏈,只有當需要數據的時候,才會執行這些Transformations操作。每一次RDD進行Action操作時,RDD都會重新生成。如果你希望某些中間的計算結果能被其他的Action操作復用,那么你需要調用Spark的RDD.persist()來保存中間數據。
Spark支持多種運行模式,你可以使用交互式的Shell,或者單獨運行一個standalone的Spark程序。不管哪一種方式,你都會有如下的工作流:
輸入數據,用于生成RDD
使用Transformations 操作轉換數據集
讓Spark保存一些中間計算結果,用于復用計算
使用Action操作,讓Spark并行計算。Spark內部會自動優化和運行計算任務。
安裝Apache Spark
為了開始使用Spark,需要先從官網下載。選擇“Pre-built for Hadoop 2.4 and later”版本然后點擊“Direct Download”。如果是Windows用戶,建議將Spark放進名字沒有空格的文件夾中。比如說,將文件解壓到:C:spark。
正如上面所說的,我們將會使用Scala編程語言。進入Spark的安裝路徑,運行如下命令:
// Linux and Mac users
bin/spark-shell
// Windows users
binspark shell
然后你可以在控制臺中看到Scala:
scala>
QUALITATIVE 破產分類
現實生活中的問題是可以用機器學習算法來預測的。我們將試圖解決的,通過一個公司的定性信息,預測該公司是否會破產。數據集可以從UCI機器學習庫https://archive.ics.uci.edu/ml/datasets/qualitative_bankruptcy下載。在Spark的安裝文件夾中,創建一個新的文件夾命名為playground。復制 qualitative_bankruptcy.data.txt文件到這里面。這將是我們的訓練數據。
數據集包含250個實例,其中143個實例為非破產,107個破產實例。
每一個實例數據格式如下:
工業風險
管理風險
財務靈活性
信譽
競爭力
經營風險
這些被稱為定性參數,因為它們不能被表示為一個數字。每一個參數可以取下以下值:
P positive
A average
N negative
數據集的最后一個列是每個實例的分類:B為破產或NB非破產。
鑒于此數據集,我們必須訓練一個模型,它可以用來分類新的數據實例,這是一個典型的分類問題。
解決問題的步驟如下:
從qualitative_bankruptcy.data.txt文件中讀取數據
解析每一個qualitative值,并將其轉換為double型數值。這是我們的分類算法所需要的
將數據集劃分為訓練和測試數據集
使用訓練數據訓練模型
計算測試數據的訓練誤差
SPARK LOGISTIC REGRESSION
我們將用Spark的邏輯回歸算法訓練分類模型。如果你想知道更多邏輯回歸算法的原理,你可以閱讀以下教程http://technobium.com/logistic-regression-using-apache-mahout。
在Spark的Scala Shell中粘貼以下import語句:
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
這將導入所需的庫。
接下來我們將創建一個Scala函數,將數據集中的qualitative數據轉換為Double型數值。鍵入或粘貼以下代碼并回車,在Spark Scala Shell。
def getDoubleValue( input:String ) : Double = {
var result:Double = 0.0
if (input == "P") result = 3.0
if (input == "A") result = 2.0
if (input == "N") result = 1.0
if (input == "NB") result = 1.0
if (input == "B") result = 0.0
return result
}
如果所有的運行都沒有問題,你應該看到這樣的輸出:
getDoubleValue: (input: String)Double
現在,我們可以讀取到qualitative_bankruptcy.data.txt文件中的數據。從Spark的角度來看,這是一個Transformation操作。在這個階段,數據實際上不被讀入內存。如前所述,這是一個lazy的方式執行。實際的讀取操作是由count()引發,這是一個Action操作。
val data = sc.textFile("playground/Qualitative_Bankruptcy.data.txt")
data.count()
用我們val關鍵字聲明一個常量data。它是一個包含輸入數據所有行的RDD。讀操作被SC或sparkcontext上下文變量監聽。count操作應返回以下結果:
res0: Long = 250
現在是時候為邏輯回歸算法準備數據,將字符串轉換為數值型。
val parsedData = data.map{line =
val parts = line.split(",")
LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x =getDoubleValue(x))))
}
在這里,我們聲明了另外一個常量,命名為parsedData。對于data變量中的每一行數據,我們將做以下操作:
使用“,”拆分字符串,并獲得一個向量,命名為parts
創建并返回一個LabeledPoint對象。每個LabeledPoint包含標簽和值的向量。在我們的訓練數據,標簽或類別(破產或非破產)放在最后一列,數組下標0到6。這是我們使用的parts(6)。在保存標簽之前,我們將用getDoubleValue()函數將字符串轉換為Double型。其余的值也被轉換為Double型數值,并保存在一個名為稠密矢量的數據結構。這也是Spark的邏輯回歸算法所需要的數據結構。
Spark支持map()轉換操作,Action動作執行時,第一個執行的就是map()。
我們來看看我們準備好的數據,使用take():
parsedData.take(10)
上面的代碼,告訴Spark從parsedData數組中取出10個樣本,并打印到控制臺。一樣的,take()操作之前,會先執行map()。輸出結果如下:
res5: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((1.0,[3.0,3.0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1.0,2.0,2.0,2.0,1.0]), (1.0,[2.0,2.0,2.0,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,3.0,3.0,3.0]), (1.0,[1.0,1.0,3.0,3.0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.0,3.0,2.0]), (1.0,[3.0,3.0,2.0,3.0,3.0,3.0]), (1.0,[3.0,3.0,3.0,2.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,2.0,3.0,3.0]))
接著我們劃分一下訓練數據和測試數據,將parsedData的60%分為訓練數據,40%分為測試數據。
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val trainingData = splits(0)
val testData = splits(1)
訓練數據和測試數據也可以像上面一樣,使用take()者count()查看。
激動人心的時刻,我們現在開始使用Spark的LogisticRegressioinWithLBFGS()來訓練模型。設置好分類個數,這里是2個(破產和非破產):
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)
當模型訓練完,我們可以使用testData來檢驗一下模型的出錯率。
val labelAndPreds = testData.map { point =
val prediction = model.predict(point.features)
(point.label, prediction)
}
val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testData.count
變量labelAndPreds保存了map()轉換操作,map()將每一個行轉換成二元組。二元組包含了testData的標簽數據(point.label,分類數據)和預測出來的分類數據(prediction)。模型使用point.features作為輸入數據。
最后一行代碼,我們使用filter()轉換操作和count()動作操作來計算模型出錯率。filter()中,保留預測分類和所屬分類不一致的元組。在Scala中_1和_2可以用來訪問元組的第一個元素和第二個元素。最后用預測出錯的數量除以testData訓練集的數量,我們可以得到模型出錯率:
trainErr: Double = 0.20430107526881722
總結
在這個教程中,你已經看到了Apache Spark可以用于機器學習的任務,如logistic regression。雖然這只是非分布式的單機環境的Scala shell demo,但是Spark的真正強大在于分布式下的內存并行處理能力。
在大數據領域,Spark是目前最活躍的開源項目,在過去幾年已迅速獲得關注和發展。在過去的幾年里。采訪了超過2100受訪者,各種各樣的使用情況和環境。
[參考資料]
“Learning Spark” by HoldenKarau, Andy Konwinski, Patrick Wendell and Matei Zaharia, O’Reilly Media 2015
Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression
https://spark.apache.org/docs/1.1.0/mllib-data-types.html
https://archive.ics.uci.edu/ml/datasets/Qualitative_Bankruptcy
https://databricks.com/blog/2015/01/27/big-data-projects-are-hungry-for-simpler-and-more-powerful-tools-survey-validates-apache-spark-is-gaining-developer-traction.html
原文來自:LOGISTIC REGRESSION USING APACHE SPARK