2014年,Spark開源生態系統得到了大幅增長,已成為大數據領域最人氣的開源項目之一,活躍在Hortonworks、IBM、 Cloudera、MapR和Pivotal等眾多知名大數據公司,更擁有Spark SQL、Spark Streaming、MLlib、GraphX等多個相關項目。同時值得一提的是,Spark貢獻者中有一半左右的中國人。
短短四年時間,Spark不僅發展為Apache基金會的頂級開源項目,更通過其高性能內存計算及其豐富的生態快速贏得幾乎所有大數據處理用戶。 2015年1月10日,一場基于Spark的高性能應用實踐盛宴由Databricks軟件工程師連城、百度高級工程師甄鵬、百度架構師孫垚光、百度美國 研發中心高級架構師劉少山四位專家聯手打造。
Databricks軟件工程師連城——Spark SQL 1.2的提升和新特性
談及Spark SQL 1.2的提升和新特性,連城主要總結了4個方面——External data source API(外部數據源API)、列式內存存儲加強(Enhanced in-memory columnar storage)、Parquet支持加強(Enhanced Parquet support)和Hive支持加強(Enhanced Hive support)。
External data source API
連城表示,因為在處理很多外部數據源中出現的擴展問題,Spark在1.2版本發布了External data source API。通過External data source API,Spark將不同的外部數據源抽象成一個關系表格,從而實現更貼近無縫的操作。
External data source API在支持了多種如JSON、Avro、CSV等簡單格式的同時,還實現了Parquet、ORC等的智能支持;同時,通過這個API,開發者還可以使用JDBC將HBase這樣的外部系統對接到Spark中。
連城表示,在1.2版本之前,開發者其實已經實現了各種各樣外部數據源的支持,因此,對比更原生的支持一些外部數據源,External data source API的意義更在于針對相應數據源進行的特殊優化,主要包括Column pruning(列剪枝)和Pushing predicates to datasources(將predicates貼近數據源)兩個方面:
Column pruning。 主要包括縱橫的兩種剪枝。在列剪枝中,Column pruning可以完全忽視無需處理的字段,從而顯著地減少IO。同時,在某些條件查詢中,基于Parquet、ORC等智能格式寫入時記錄的統計信息 (比如最大值、最小值等),掃描可以跳過大段的數據,從而省略了大量的磁盤掃描負載。
Pushing predicates to datasources。 在更復雜的SQL查詢中,讓過濾條件維度盡可能的接近數據源,從而減少磁盤和網絡IO,最終提高整體端到端的性能。
使用External data source API之前
使用External data source API之后
搭載了如Parquet和ORC這樣的智能格式
連城表示,在Spark 1.2版本中,External data source API并沒有實現預期中的功能,在Roadmap中,First class分片支持(First class partitioning support with partition pruning)、Data sink(insertion)API、將Hive作為外部數據源等。
Enhanced in-memory columnar storage
連城表示,不管Shark,還是Spark,內存緩存表的支持都是非常重要的一個特性。他表示,雖然在1.1和之前版本中的列式內存表的性能已然 不錯,但是還會出現一些問題:第一,大數據量下緩存超大體積表時(雖然不推薦,但不缺現實用例),會出現OOM等問題;第二,在列式存儲中,像 Parquet、ORC這種收集統計信息然后通過這些信息做partition skipping等操作在之前版本中并沒有完全實現。這些問題在1.2版本中都得到了解決,本節,連城主要介紹了語義統一、緩存實體化、基于緩存共享的查 詢計劃、Cache大表時的OOM問題、表格統計(Table statistics)等方面。
緩存實體化。 SQLContext.cacheTable(“tbl”)默認使用eager模式,緩存實體化將自動進行,不會再等到表被使用或觸發時,避免手動做 “SELECT COUNT(*) FROM src;”。同時,新增了“CACHE [LAZY] TABLE tbl [AS SELECT …]”這樣的DML。
語義統一。 早期時候,SchemaRDD.cache()和SQLContext.cacheTable(“tbl”)這兩個語義是不同的。其 中,SQLContext.cacheTable會去建立一些列式存儲格式相關優化,而SchemaRDD.cache()卻以一行一個對象的模式進行。 在1.2版本中,這兩個操作已被統一,同時各種cache操作都將得到一個統一的內存表。
基于緩存共享的查詢計劃。 兩個得到相同結果的cache語句將共享同一份緩存數據。
避免Cache大表時的OOM問題。 優化內存表的建立和訪問,減少開銷,進一步提升性能;在緩存大表時,引入batched column buffer builder,將每一列切成多個batch,從而避免了OOM。
表格統計。 Table statistics,類似Parquet、ORC使用的技術,在1.2版本中主要實現了Predicate pushdown(實現更快的表格掃描)和Auto broadcast join(實現更快的表格join)。
最后,連城還詳細介紹了一些關于加強Parquet和Hive支持的實現,以及Spark未來的一些工作。
百度基礎架構部高級工程師甄鵬——Spark在百度開放云BMR中的實戰分享
百度分布式計算團隊從2011年開始持續關注Spark,并于2014年將Spark正式引入百度分布式計算生態系統中,在國內率先面向開發者及 企業用戶推出了支持Spark并兼容開源接口的大數據處理產品BMR(Baidu MapReduce)。在甄鵬的分享中,我們主要了解了百度Spark 應用現狀、百度開放云BMR和Spark On BMR三個方面的內容。
Spark在百度
甄鵬表示,當前百度的Spark集群由上千臺物理主機(數萬Cores,上百TBMemory)組成,日提交App在數百,已應用于鳳巢、大搜索、直達號、百度大數據等業務。之以選擇Spark,甄鵬總結了三個原因:快速高效、API 友好易用和組件豐富。
快速高效。 首先,Spark使用了線程池模式,任務調度效率很高;其次,Spark可以最大限度地利用內存,多輪迭代任務執行效率高。
API友好易用。 這主要基于兩個方面:第一,Spark支持多門編程語言,可以滿足不同語言背景的人使用;第二,Spark的表達能力非常豐富,并且封裝了大量常用操作。
組件豐富。 Spark生態圈當下已比較完善,在官方組件涵蓋SQL、圖計算、機器學習和實時計算的同時,還有著很多第三方開發的優秀組件,足以應對日常的數據處理需求。
百度開放云BMR
在BMR介紹中,甄鵬表示,雖然BMR被稱為Baidu MapReduce,但是這個名稱已經不能完全表示出這個平臺:BMR是百度開放云的數據分析服務產品,基于百度多年大數據處理分析經驗,面向企業和開發 者提供按需部署的Hadoop&Spark集群計算服務,讓客戶具備海量數據分析和挖掘能力,從而提升業務競爭力。
如圖所示,BMR基于BCC(百度云服務器),建立在HDFS和BOS(百度對象存儲)分布式存儲之上,其處理引擎包含了MapReduce和 Spark,同時還使用了HBase數據庫。在此之上,系統集成了Pig、Hive、SQL、Streaming、GraphX、MLLib等專有服務。 在系統的最上層,BMR提供了一個基于Web的控制臺,以及一個API形式的SDK。
在圖片的最右邊,Scheduler在BMR中起到了管理作用,使用它開發者可以編寫比較復雜的作業流。
Spark On BMR
類似于通常的云服務,BMR中的Spark同樣隨用隨起,集群空閑即銷毀,幫助用戶節省預算。此外,集群創建可以在3到5分鐘內完成,包含了完整的Spark+HDFS+YARN堆棧。同時,BMR也提供Long Running模式,并有多種套餐可選。