一些大數據客戶想分析新數據以對特定事件作出響應,他們可能已經定義好管道來執行批處理操作,這些管道是由 AWS Data Pipeline精心協調安排的。事件觸發管道的示例之一就是當數據分析師在一收到數據就必須對其進行分析時,以便他們可以立刻向合作伙伴作出相應。在這種情況下調度不是最優的解決方案,主要問題是如何在任意時間使用依賴于調度程序的Data Pipeline調度數據處理過程。
這里有一個解決方案。首先,創建一個簡單的管道,使用來自 Amazon S3的數據對管道進行測試,然后添加一個 Amazon SNS主題,使其在管道完成時通知客戶,以便數據分析師能夠查看處理結果。最后,創建一個 AWS Lambda函數,使其在新數據被成功提交到S3桶中時激活Data Pipeline,在此過程中,不用管理任何調度活動。該篇帖子將會向你展示如何實現這一過程。
在Data Pipeline活動可被調度時,客戶可以定義先決條件。這些先決條件可以看到數據是否存在于S3中,然后進行資源分配。但是,在Data Pipeline需要隨時被激活時,使用Lambda是一種很好的途徑。
克隆管道以備后用
在這種場景下,客戶的管道已經通過一些預定的活動被激活,但是想要能夠調用相同的管道以對某個特別事件,如提交新數據到S3桶中,作出響應。客戶已經開發了一個達到Finished狀態的“模板”管道。
重新發起該管道的一種方法是在S3中使用管道定義來保存JSON文件,使用它創建一個新管道。一些客戶在S3中對相同管道以多個版本的形式存儲,但是又想克隆和重新使用最近剛剛執行的那個管道版本。從已完成管道中獲取管道定義并創建一個克隆管道,這是可以滿足這種要求的簡單方法。這種方法依賴于最近被執行的管道,不需要客戶保存來自S3的管道版本注冊表,也不需要追蹤最近被執行的版本。
即使客戶想在S3中保留這樣的一個管道注冊表,他們可能也想使用Lambda API即時從一個既存的管道中獲取一個管道定義。他們可能有復雜的事件驅動工作流程,在這些流程中,他們需要克隆已完成的管道,重新運行它們,然后刪除克隆的管道。這就是為什么首先檢測處于Finished狀態的管道是如此重要了。
在本篇帖子中,我會向你展示如何完成這樣即時的管道克隆。在Data Pipeline中沒有直接克隆API,所以你可以進行幾次API調用完成這一過程。我也提供了代碼,使你能夠刪除已完成的過時的克隆管道。
三步式工作流程
創建一個簡單管道用于測試。
創建一個SNS通知,在管道完成時通知分析師。
創建一個Lambda函數,在新數據被提交到S3桶中時激活管道
第一步:創建一個簡單管道。
打開 AWS Data Pipeline控制臺。
如果在該域中還沒有創建管道,控制臺屏幕將會展示介紹性的信息。在這種情況下,選擇Get started now。如果在該域中你已經創建過管道了,控制臺將會顯示你在該域中創建的所有管道。在這種情況下,選擇Create new pipeline。
輸入名稱和描述信息。
選擇一個Elastic MapReduce (EMR)模板,然后選擇Run once on pipeline activation。
在Step字段中,輸入如下信息:/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output,s3://example-bucket/wordcount/output/#{@scheduledStartTime},-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate
你可以調整Amazon EMR集群節點的數量,選擇分發方式。想要獲取管道創建的更多信息,參見 Getting Started with AWS Data Pipeline。
第二步:創建一個SNS主題
想要創建一個SNS主題,執行以下步驟:
在瀏覽器的一個新頁簽中,打開 Amazon SNS console(Amazon SNS控制臺)。
選擇Create topic。
在Topic name字段中,輸入主題名稱。
選擇Create topic。選擇新主題,然后選擇主題ARN。Topic Details頁面出現
拷貝主題ARN用于下一個任務。
為該主題創建訂閱任務,提供你的電子郵件地址。AWS會發送電子郵件來確認你的訂閱結果。
想要在管道中配置主題通知動作,執行以下步驟
在 AWS Data Pipeline控制臺,在Architect窗口中打開你的管道。
在右側窗格中,選擇Others。
在DefaultAction1下,執行如下步驟:
輸入通知的名稱(如MyEMRJobNotice)
在Type字段中,選擇SnsAlarm。
在Subject字段中,輸入事由行。
在Topic Arn字段中,輸入主題的ARN。
在Message字段中,輸入消息內容。
Leave Role set to the default value.Role保留其默認值。
保存并激活管道,確保它能成功執行。
第三步:創建一個Lambda函數
在Lambda控制臺中,選擇Create a Lambda function。你可以選擇一個藍圖或者只是跳過第一步,繼續進行Step 2: Configure function(第二步:配置函數),在該步驟中,你提供一個函數名稱(如LambdaDP)和一條描述信息,選擇Node.js作為Runtime字段的值。
測試管道已經完成。目前仍不支持重新運行已完成的管道。要想重新運行一個已完成管道,從模板中克隆該管道,Lambda會觸發一個新管道。每一次清除老的克隆管道時,你將需要Lambda來創建一個新克隆管道。下面是幫助實現新管道克隆的一些函數。在Lambda控制臺中,使用Code entry type和Edit code inline字段,以下面的代碼開始:
console.log('Loading function'); var AWS = require('aws-sdk'); exports.handler
= function(event, context) { var Data Pipeline = new AWS.Data Pipeline();
var pipeline2delete ='None'; var pipeline ='df-02….T'; ………. }
定義管道ID,為克隆管道ID創建一個變量,比如pipeline2delete。然后,添加一個函數,執行下面的代碼,檢查前面的運行過程中遺留下來的既存克隆管道:
//Iterate over the list of pipelines and check if the pipeline clone already
exists Data Pipeline.listPipelines(paramsall, function(err, data) { if
(err) {console.log(err, err.stack); // an error occurred} else {console.log(data);
// successful response for (var i in data.pipelineIdList){ if (data.pipelineIdList[i].name
=='myLambdaSample') { pipeline2delete = data.pipelineIdList[i].id; console.log('Pipeline
clone id to delete: '+ pipeline2delete); };
如果前面的運行過程中遺留下來的已完成克隆管道已經被識別出來,你必須在該循環中調用刪除函數。下面展示了實現調用的示例代碼:
var paramsd = {pipelineId: pipeline2delete /* required */};
Data Pipeline.deletePipeline(paramsd, function(err, data) {
if (err) {console.log(err, err.stack); // an error occurred}
else console.log('Old clone deleted '+ pipeline2delete + 'Create new clone now');
});
最后,你需要進行三次API調用,從原來的Data Pipeline模板中創建一個新的克隆。下面是你可以使用的API:
getPipelineDefinition (for the finished pipeline)
createPipeline
putPipelineDefinition (from #1)
下面是這三次調用的示例:
1、使用管道定義創建下一個克隆:
var params = {pipelineId: pipeline};
Data Pipeline.getPipelineDefinition(params, function(err, definition) {
if (err) console.log(err, err.stack); // an error occurred
else {
var params = {
name: 'myLambdaSample', /* required */
uniqueId: 'myLambdaSample' /* required */
};
2、使用來自定義對象的克隆定義:
Data Pipeline.createPipeline(params, function(err, pipelineIdObject) {
if (err) console.log(err, err.stack); // an error occurred
else { //new pipeline created with id=pipelineIdObject.pipelineId
console.log(pipelineIdObject); // successful response
//Create and activate pipeline
var params = {
pipelineId: pipelineIdObject.pipelineId,
pipelineObjects: definition.pipelineObjects//(you can add parameter objects and values)
3、使用來自getPipelineDefinition API結果的定義:
Data Pipeline.putPipelineDefinition(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
Data Pipeline.activatePipeline(pipelineIdObject, function(err, data) { //Activate the pipeline finally
if (err) console.log(err, err.stack);
else console.log(data);
});
}
});
}});
}});
現在你具備了Lambda函數所需的所有函數調用過程。你也可以執行下面的步驟將這些調用過程打包成一個獨立的函數:
輸入Handler字段的值作為函數(LambdaDP.index)的名稱。
Role。該選項可以使你訪問像S3和Data Pipeline這樣的資源。
保留Memory和Timeout的默認值。
選擇Next,檢查函數,選擇Create function。
在Event source字段中,選擇S3。
提供管道所使用的桶的名稱。
在Event type字段,選擇Put。在新文件被提交到桶中時,該選項會激活管道。
保存管道,上傳一個數據文件到S3桶中。
檢查Data Pipeline控制臺,確保新管道已經創建完畢并已被激活(管道完成后,你應該能收到一條SNS通知消息)。