了解如何使用Java和Spark MLlib開發一種算法,該算法能夠根據700萬條記錄的數據集檢測欺詐行為。
在這篇文章中,我們將使用Spark MLlib開發Java中的算法。完整的工作代碼可以從GitHub下載??梢栽诓皇褂蒙钊氲腏ava知識(使用配置文件)的情況下,用幾種不同的配置和實驗來運行代碼。
在之前的文章中,我們使用Octave實現了相同的異常檢測算法。我們從七百萬個篩選出了500,000條記錄(僅限于TRANSFER類型),以便調查和了解可用數據。此外,還繪制了幾張圖表來顯示數據和異常(欺詐)的樣子。由于Octave加載了內存中的所有數據,因此它對大數據有限制。出于這個原因,我們將使用Spark在700萬個更大的數據集上運行異常檢測。
高斯分布
本節簡要介紹如何使用高斯函數進行異常檢測。高斯密度函數具有鐘形曲線形狀,如下所示:
大部分數據的常規數據往往處于鐘形曲線的中心,而邊緣的異常更為罕見。與此同時,我們可以看到邊緣上的點與中心點(接近0.4)相比具有更低的函數值(或者概率小于0.1)。
在這個例子之后,我們可以說每一個具有低于0.05的概率密度函數的例子都是異常的。當然,我們可以根據需要來控制閾值。大的值意味著更多的異常被標記,其中大部分可能不是異常。另一方面,小的值意味著我們可能錯過異常,因為算法變得更加寬容。
上面的例子是一維的,數據只有一個特征。實際上,我們有更多的功能和維度的數據。為了將數據繪制到圖中,我們使用主成分分析(PCA)將數據的維數減少到二維(2D)甚至三維(3D)。以下是兩個維度的示例:
注意正常的數據在第一個和第二個圓的中間趨于一起,異常在第三個圓的邊緣。圖上的圓圈表示高斯鐘形曲線如何在數據之間分布(通常,它將在3D中為鐘形,但為了簡單明了,以2D表示)。
為了在鐘形圖中的某個位置上舉一個例子,我們需要計算兩個分量:μ(均值)和σ2(方差)。一旦計算了均值和方差,我們可以應用一個相當簡單的公式來得到新的例子的密度概率。如果概率低于某個特定值(σ),我們將其標記為異常;否則,這是正常的。在我以前的文章中查找關于開發的細節。
Spark和MLlib
本節提供Spark和MLlib的簡要說明。
Spark
Apache Spark是一個集群計算框架。 Spark幫助我們在群集中的不同節點上并行執行作業,然后將這些結果合并成一個結果/響應。它將我們的數據集合轉換為分布在集群節點(稱為RDD(彈性分布式數據集))的元素集合。例如,在一個Java程序中,我們可以將一個集合轉換成一個能夠并行操作的RDD,如下所示:
并行集合被分割成分區,Spark的每個分區執行一個任務,所以我們希望每個CPU有兩到四個分區。我們可以通過用sc.parallelize(collection,partitionNumber)定義另一個參數來控制Spark創建的分區數量。除了來自應用程序的集合之外,Spark還能夠轉換來自Hadoop支持的存儲源(包括本地文件系統,HDFS,Cassandra,HBase和Amazon S3)的數據。
將數據轉換為RDD后,我們可以在集群節點上執行兩種并行操作。轉換操作將RDD集合作為輸入,并返回一個新的RDD集合,如映射和操作,它們采用RDD并返回單個結果,如reduce、count等。不管類型如何,操作都是惰性的,類似于Java 8在定義時不運行,而是在請求時運行。因此,可以在請求時多次計算一個操作,為了避免這種情況,保存在內存或緩存中。
MLlib
Spark支持Java、Scala、Python和R API。它還支持一套豐富的高級工具,包括用于SQL和結構化數據處理的Spark SQL,用于機器學習的MLlib,用于圖形處理的GraphX以及Spark Streaming。
MLlib是Spark的機器學習(ML)庫。它提供了幾個現成的ML工具,如:
ML算法
·分類
·回歸
·聚類
·協作過濾
Featurization
·特征提取
·轉型
·降維
·選擇
公用事業
·線性代數
·統計
·數據處理
數據準備
我們需要為算法執行準備數據。以下是數據的樣子:
我們需要把所有東西都轉換成數字。 幸運的是,大部分數據都是數字,只有nameOrig和nameDest以C,D或M這樣的字符開始。我們簡單地用1代替C,用2代替D,用3代替M。同樣,我們將字符從chars轉換為數字 如下所示:
所有的準備工作都是通過使用Spark轉換操作映射的Java代碼完成的:
之后,文件應該是這樣的:
由于較大的文件大小和GitHub文件大小限制,數據不在代碼中提供。你可以從這里下載文件(https://www.kaggle.com/ntnu-testimon/paysim1),將其重命名為allData.csv(將常量FILE_NAME更改為其他名稱),并將其復制到文件夾data /中。
執行算法
讓我們一步一步看看我們如何執行異常檢測算法。
1.從所有數據(七百萬)中,我們需要隨機選擇一個百分比進行訓練、交叉驗證和測試數據。 隨機挑選數據集的常規和欺詐性數據的代碼如下所示:
我們運行這個代碼兩次以獲得訓練和交叉驗證數據。 剩下的是測試數據。 稍后我們會看到幾個百分比的選擇。
2.接下來,將需要μ(均值)和σ2(方差)的計算,因為它們對于獲得新例子的概率至關重要。 代碼如下所示:
3.如前所述,一旦我們使用高斯公式,具有均值和方差,就可以計算出概率值。 根據概率值,我們決定它是一個異常還是一個正常的例子。 將該值與某個閾值(ε)進行比較。如果它較低,那么我們將其標記為異常,如果更大,則將其標記為常規。 選擇epsilon是至關重要的,因為具有小的價值會導致算法標記大量的虛假欺詐。 另一方面,大的價值,我們又會錯過欺詐。 所以使用精確的交叉驗證數據和召回選擇最佳的epsilon。
4.現在,我們準備在測試數據上評估我們的算法(我們也對交叉驗證數據做了可選的評估)?! ?img alt="使用Java和Spark MLlib開發一種算法檢測欺詐行為,可行嗎?" src="http://p1.pstatp.com/large/5688000119acf7efc978" />
在執行算法之前,需要下載數據(因為GitHub的文件大小限制而沒有打包),解壓縮,然后把allData.csv復制粘貼到文件夾data / allData.csv。 文件位置是可配置的,以及文件名稱。 該算法可以通過配置文件config / algorithm.properties中的數據和各種選項進行測試,如下所示:
配置更改后,應用程序可以在Java IDE或Maven上運行:
根據機器和配置,應用程序可能需要一些時間(對我來說,這需要兩分鐘)完成。 此外,計算機可能會凍結,因為在某個時刻,Spark將CPU占用率降至100%。 此外,應用程序使用大量的內存(2-3 GB)。 可以看到打印在控制臺上的結果或通過查看文件夾out /; 會有一個生成的文件* .txt與輸出。 該算法是基于隨機性的,所以你可以將其配置為運行多次,并且每次執行期望一個文件。
實驗和結果
從這次的實驗中,發現欺詐只適用于兩種類型:TRANSFER和CASH_OUT。 在以前的文章中詳細調查了TRANSFER。 我們取得了很高的比例:99.7%。
當僅為CASH_OUT類型運行而不跳過任何列/功能時,我們得到的結果很差:
我們只能找到約14%的這種欺詐行為。 以前,我們可以通過使特征看起來像高斯鐘形曲線來改善很多,但不幸的是,這次并不是這樣。
我們可以做的是看看功能,看看是否可以添加或跳過一些功能,因為功能cam=n引起混亂和噪音,而不是好處。查看數據源,我們有以下描述的欺詐,這可以幫助:
當大量的資金變現時,可能是欺詐。 慢慢地,我們開始刪除不需要的功能,通過刪除功能[1,2,3,7,8]或類型找到了很好的結果。 當兌現時,資金被占用的賬戶比目的地更重要,因為賬戶可能已經有錢了,而且看起來很正常,但是一個空的來源賬戶可能表示欺詐行為。 當離開目標帳戶名稱,它可能有助于欺詐性帳戶名稱。 結果如下所示:
這是一個巨大的進步。我們能夠通過將所有類型組合在一起,從14%提高到82.77%。 此外,它不會帶來不同的跳過功能的更好結果(隨意嘗試,因為并非所有這些都被探索)。 我只能通過跳過數量(2)得到一些結果,但這仍然不令人滿意,因為大量的非欺詐活動被標記了(1,040,950)。
在這種情況下,為每種類型都運行算法可能會更好。 當一個可能的交易完成時,我們會反對它的類型。 通過這種方式,可以更好地檢測到欺詐行為,因為轉賬有99.7%的利率,現金流有87%。 但是,對于現金流,我們可以說這個比率并不令人滿意,也許還有其他的方法值得嘗試,但這必須首先被調查,而且通常,直覺是錯誤的,還花費了很多時間。 由于隱私問題在金融應用中獲取更多數據是困難的,所以寧愿在這里應用不同的算法。 當現金流的數據被繪制時,我們得到如下的視圖:
該圖顯示問題在于大多數欺詐行為被包含在正常數據的中心,并且該算法努力檢測它們。不過,我相信還有其他方法可以混合使用甚至能添加更多的功能。
Java流與Spark
我們可以配置算法(請參閱屬性runsWith)在Spark或Java 8 Streams上運行以處理數據。如果要在集群上的多個遠程節點上運行代碼并將結果集合到請求的機器上,則Spark是一個很好的框架。在本文中,算法在本地執行,Spark將本地資源(如CPU數量)視為目標群集資源。另一方面,Java 8流很容易提供與collection.stram()。parallel()(當然,在本地運行的機器上)的并行性。因此,作為實驗的一部分,Java 8流在一臺機器上與Spark進行了比較。
結果表明,Java 8流在本地速度更快,即使不是太多。 Java = 111,927秒,Spark = 128,117秒。所以基本上,在運行所有數據時,流速要快16-25秒。請注意,每個人的電腦結果可能會有所不同。
由于Spark針對分布式計算進行了優化,與Java Streams相比,它在分區、任務等方面存在一些開銷,只需要考慮本地機器,并且可以在那里進行優化。無論如何,都可以看到數據量在本地增加的差距。
對于少量的數據,Java 8 流更適合,但是對于大量的數據,Spark的縮放比例更好。也許值得嘗試在AWS上運行的群集上配置Spark,而不是在本地。有關更多詳細信息,請參閱處理相同精確算法的兩個Java實現的代碼,但具有不重要的小差異:Fraud Detection Algorithm Java Stream和Fraud Detection Algorithm Spark。