Abstract
MapReduce is becoming an important parallel programming paradigm for processing Internet scale data. It is widely used to process jobs such as searching, analyzing, and mining on large scale structured and semi-structured data. It is still a problem for the emerging MapReduce-like systems to analyze and eva luate systematically and efficiently.
This paper discussed the issues in performance eva luation for MapReduce runtime system. We designed and chose a series of representative programs and data as benchmark. And then we implement profiling in our homemade MapReduce system which named Tplatform. We did the eva luation experiment for finding the bottleneck of the system. Through the experiment, we found some performance problems such as scheduling and stragglers etc. We implemented backup tasks for improving the problems caused by stragglers. Our simulation results reveal that we improve the performance efficiently.
引言
MapReduce正在成為人們在海量數據上進行并行計算的重要編程模型,比如為大規模的網頁做索引、在海量的數據中進行挖掘、龐大的科學計算任務等等。
人們開始關注在普通計算機上實現大規模的并行計算以提供各種服務,Google則無疑是這方面的先驅者。Google使用MapReduce作為日常計算的引擎,將每天處理20PB的數據[ Dean, J. and Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. proceedings of OSDI, 2004.]存在底層的存儲系統如GFS6、BigTable7中。很多重要的搜索引擎服務,如索引、網頁排序、網頁消重與去噪、用戶日志分析、用戶行為預測等等,都可以使用MapReduce的框架來加快程序員在進行相關的處理。
此外,MapReduce也是一個如今很受歡迎的并行計算模型。MapReduce良好的可擴展性使得并行處理變得很容易,人們可以很方便地把MapReduce部署到大規模的廉價機群上使用。它的開源實現版本Hadoop[ Hadoop. The hadoop project. http://lucene.apache.org/hadoop/, 2006.]也得到了廣泛的應用。如今很多公司如Yahoo!、FaceBook、Amazon、New York Times,以及部分研究機構和大學如CMU、Cornell等等都開始使用Hadoop進行研究和開發。
為了更好和方便地讓程序員使用MapReduce或者類似的并行處理計算框架如Map/Reduce/Merge6,人們在其上架設了一系列的編譯系統,并通過高層的語言把計算任務映射為底層的MapReduce任務。這方面的工作如Yahoo! 在Hadoop上實現的Pig[ C. Olston, B. Reed, U. Srivastava, R. Kumar and A. Tomkins. Pig Latin: A Not-So-Foreign Language for Data Processing. proceedings of SIGMOD, 2008.]、Google實現的Sawzall[ Pike, R., Dorward, S., Griesemer, R., and Quinlan, S. Interpreting the data: Parallel analysis with sawzall. Scientific Programming 13, 4 (2005), 277–298.]等等。
類似系統的開發和研究也層出不窮,如微軟有自己的Dryad5/SCOPE7/DryadLINQ[ Isard, M., Budiu, M., Yu, Y., Birrell, A., and Fetterly, D. Dryad: distributed data-parallel programs from sequential building blocks. proceedings of the ACM SIGOPS, 2007]系列系統。擁有這樣的處理能力無疑成為一個互聯網公司的核心競爭力,可以預見在未來的一段時間里面,還有類似的很多系統和研究出現。
人們在使用Hadoop或者類似的其他并行處理計算框架及其上層語言時,眾多的使用者對底層大規模并行處理計算框架有自己的需求。比如大學或研究機構使用此類框架進行科學計算時,系統的工作負載可能是偏向計算密集型,人們也關心系統對于計算任務的延遲反應;而大型因特網公司如Google、Yahoo!、Microsoft Live Search等的數據中心中,有若干程序員在同時提交計算任務,程序員不但關心計算任務的延遲,還關心整個中心中負載的調度公平性;而對于此類系統的開發和研究人員來說,他們關心系統的吞吐量、系統中各機器的狀態和使用情況等等。所以考慮此類并行處理計算框架特別是MapReduce系統的各項系統指標,并確定評估的程序和方法,對評估類似系統、基于用戶希望的系統設計折衷進行系統之間的比較、改進系統等等有很重要的意義。在這個基礎上如Berkeley也有一些系統測試的工作如分析網絡的性能X-Trace[ R Fonseca, G Porter, RH Katz, S Shenker, I Stoica. X-trace: A pervasive network tracing framework. proceedings of NSDI , 2007.],以及對MapReduce系統和數據庫系統性能評估的討論15。
我們基于MapReduce實現了自己的并行處理計算框架,并在其之上進行了系統的測試和評估。我們提出了測試程序和數據,并基于此在系統中實現了監控和程序性能概要分析框架。通過測試和評估實驗,我們總結了系統的性能指標和觀察到的問題。我們針對其中的單機落后問題,實現并驗證了后備任務策略,并基于此改進系統性能。最后,我們總結并給出了其他工作方向。
論文的剩余部分按如下方式進行組織。第二章對MapReduce的模型和體系結構進行概述,而第三章列出了需要評估的系統目標和我們設計的基準程序和數據集合。為了分析和評估系統,我們在第四章闡述了系統監控框架和程序概要分析的設計和實現細節。之后我們在第五章中列出了實驗結果和給出了實驗的分析,并在針對其中的落后者問題實現了后備任務策略,在第六章中詳細闡述了后備任務策略的實現和實驗評估。我們在第七章中對系統可能的優化方向進行了展望并在第八章中進行了總結,最后是致謝。
MapReduce框架
在這一章里面,我們將簡單介紹MapReduce框架的模型和我們的系統實現。
MapReduce模型介紹
Google的研究人員受到函數式編程語言(functional language)的啟發,在總結大量的大規模分布式處理程序共同特征的基礎上,提出了MapReduce并行程序框架。
MapReduce是一大類大規模并行數據處理程序的抽象。這類計算的輸入是一個(鍵,值)對的集合,輸出也是一個(鍵,值)對的集合。用戶只需要提供兩個操作map和reduce的實現,MapReduce運行時庫就可以自動把用戶程序并行化。
用戶提供Map函數的實現,它接收一個輸入對,產生一組中間結果對。MapReduce庫會把具有相同鍵的所有中間結果對聚合到一起,把他們傳給Reduce函數。用戶提供的Reduce函數,接收中間結果的一個鍵和具有此鍵的一組值,處理這些值,產生若干個(鍵,值)對做為輸出。它們的一般形式如下:[ 楊志豐,GFS與MapReduce的實現研究及其應用,北京大學碩士論文,2008.]
Map (k1, v1) -> list (k2, v2)
Reduce (k2, list (v2)) -> list (v2)
MapReduce模型的最大好處是簡便性,用戶只需要提供這兩個接口就可以處理大規模的數據,而不需要太多分布式計算的實現細節。
系統實現
MapReduce的實時運行主要是為并行化和并發執行服務的。為了盡可能的并行化和擴展系統,MapReduce把輸入的數據分割到多個機器上。中間數據的傳輸和序列化處理等由系統來控制。分割的數據由多個Reduce來處理。這兩個步驟中Map任務和Reduce都可以同時執行,且它們都具有良好的可擴展性,也即可以方便地增加機器增加并發度。
而在系統實現的層面上,系統需要決定底層的各個細節如數據單元的大小、中間數據的處理、內存的緩存多大、排序的方式、各個任務的調度、機器的失敗和容錯處理等等。系統自動的把這些細節都掩蓋,所以對程序員來說,他只需要知道這個編程模型并編寫MapReduce的程序即可。
Google的論文中描述了他們在分布式機群系統上對MapReduce的實現。系統把輸入數據劃分為M份數據片,這些輸入數據片可以在不同的機器上并發的被Map函數處理。所有的中間結果對使用一個分區函數(partitioning function)分為R份。然后,對于每個分區,通過排序把具有相同鍵的所有(鍵,值)對聚合到一起,用Reduce函數處理,最后產生R個輸出文件。R的值和分區函數可以由用戶指定,系統默認的分區函數是hash(key) mod R1。
Google的MapReduce實現是構建在GFS之上的,所有的MapReduce程序的輸入和輸出都是存儲在GFS中的文件。由于GFS中的數據都有多個副本,當執行MapReduce的機群和運行GFS的機群是同一個時,MapReduce庫的調度模塊會盡量把map任務分配到存儲數據的機器上本地運行,這樣可以避免輸入數據的網絡傳輸,極大的提高性能。此外,用戶可以指定函數用來把原始輸入數據轉換為map函數的輸入,用戶也可以指定函數用來把reduce的輸出結果序列化為輸出數據。體系結構圖1如下:
數據的流圖[ Colby Ranger, Ramanan Raghuraman, A. P. G. B. C. K. eva luating mapreduce for multi-core and multiprocessor systems. proceedings of HPCA, 2007]如下,Worker分別執行本地的任務,可能是Map任務、Transfer任務和Reduce任務。整個過程由Master控制和協調調度。
Tplatform的實現
我們實現的類似平臺是Tplatform。我們自己也實現了一個自制的MapReduce,建立在GFS類似的分布式文件系統TFS上[ Zhifeng Yang, Qichen Tu, K. F, L. Z, R. C, B.P. Performance gain with variable chunk size in gfs-like file systems. Journal of Computational Information System 3(2008), 1077-1084]。TFS在設計上與GFS的不同之處在于對底層Chunk大小的設置7。 與Google提供運行時庫然后通過一個二進制程序的多個副本扮演不同角色的方式不同,我們的實現提供的是一個執行MapReduce作業的服務,用戶把編寫好的實現指定接口的動態鏈接庫用系統提供的API提交上來,MapReduce系統就會自動調度和運行相關的任務。服務由一臺主控(Master)機器和若干臺工作機(Worker)組成,Master負責把用戶提交的作業(Job)切分為若干個任務,然后調度他們在各臺工作機上執行。相比提供運行時庫由用戶編譯為一個程序的方式,這樣做的好處是,系統的改進升級對用戶是不可見的。如果系統的實現改變了,只要MapReduce API不改變,用戶無需改變代碼甚至不需要重新編譯生成動態鏈接庫就可以執行MapReduce作業,這給我們未來系統的升級優化帶來了極大的便利。不僅如此,在Google的原始實現中,如果同一個機群有多個作業在同時運行,因為作業由主控程序負責調度但一個作業的主控程序是不知道另一個作業的存在的,所以多個作業之間可能產生資源的互相搶占。而在我們的系統中,一個機群只有一個主控程序,主控程序可以綜合各個作業的情況對所有任務整體進行調度。
這里需要詳細說明我們任務設計的細節。
我們把Worker需要做的任務分成三個類型:Map、Transfer、Reduce,我們把傳輸任務從原來的Reduce中抽離出來并作為一個可以由Worker單獨調度執行的任務。在這里我們對此設計有如下的分析。
在原來的Map、Reduce任務的執行流程和設計下,對于Map執行完生成的中間數據,是由Reduce來到Map機器上通過遠程調用取得。這些有可能出現的場景是很多Reducer同時來一臺Map機器上進行取數據操作,造成Map機器對硬盤的隨機寫,而隨機寫對性能的影響是很大的,這樣的數據傳輸模型可以稱之為“拉”。而我們把傳輸任務獨立開來,由Master調度控制,可以控制Mapper傳輸的時間,同時Reducer在同時接到多個傳輸任務的數據時可以做緩存,避免隨機寫的出現。
此外,我們在Worker端通過心跳線程和Master通信,在執行分配的任務時用Exec方式啟動一個新的進程來執行具體的Map和Reduce任務。而傳輸任務使用啟動線程用Socket進行傳輸。
我們在此基礎上,實現了MapReduce的系統,我們的設計在實現上有很多和Google不同之處,也不同于開源的Hadoop2。在完成原型的開發和測試后,針對性能和系統的評估成為了我們亟待解決的關鍵問題。我們由此開始系統地對MapReduce和類似相關系統進行分析和評估,我們相信對于MapReduce和類似系統的研究工作的下一步將是對此類系統的優化。
所以對當前系統的分析和評估成為關鍵,找到系統使用中的瓶頸所在,針對用戶需求的目標進行改進,都是實際應用中的重要問題。我們在Tplatform的實現基礎上,開發了一系列的基準程序,細致地分析了系統中可能出現的問題。
我們說明和分析表中的數值。
首先,Map和Reduce的任務的選擇度都是1,因為對于PennySort來說,Map做的是把數據簡單地讀入,然后進行傳輸和分割,而對Reduce來說,進行完數據的排序后也只需要把數據簡單地輸出,所以選擇度都是1.
然后,對于傳輸的方式,按記錄的生成原則,可以均稱地進行hash分割。中間數據比初始讀入的數據反而小是因為很多數據Map任務做完后可以在本地直接進行Reduce,利用的數據的空間數據性,所以傳輸數據變小。
最后Reduce任務需要進行排序,系統實現使用快排,復雜度為nO(logn).
系統監控和程序概要分析
更好地理解和監控云計算的基礎設施系統如MapReduce是一個煩人且亟待解決的問題。現有的實現都是比較簡單地記錄系統的相關性能信息,而且并沒有太多關于在此類系統中如何監控和評估的工作。但是在我們的開發和使用過程中,我們發現了系統的性能概要分析很重要,或者說通過更好地理解底層系統,能夠更好地改善和優化現有的系統。例如如下的幾個場景中,我們將說明這一點:
數據中心中的一個程序員向系統提交了一個用高層語言如Pig Latin描述的任務后,他/她可能想知道他的任務做到什么程度。從性能概要分析的角度來考慮任務監控這個問題,任務在多個機器上的性能分布很重要。這樣可以知道任務中最耗時的函數,從來讓程序員可以針對此考慮改進自己的程序,或者在系統對任務的編譯中進行優化。
失效在數據中心里面是正常的1。MapReduce這樣的系統對用戶掩蓋機器的失效,如果機器發生宕機,系統將處理并調度計算重執行;而對于計算任務的失效,處理方式是重新執行,如果多次失效超過一定次數,將放棄執行。這是因為在數據中心中, 很有可能是用戶提交的任務的程序中存在BUG,或者是數據有不滿足格式而導致無法讀入等等。對于需要進行長任務處理的工作來說,在現有系統的實現下,可能是一件極消耗用戶程序員精力的事情。可能的情形是,執行了很久到快結束的時候由于BUG或者存儲的問題導致失敗而最終放棄。 而實時的監控和交互可以部分地解決這個問題,讓用戶及時地知道系統里面發生的情況,對于系統無法做出判斷的事情(程序有錯),交給用戶去解決不失為一個可行的方案。
分布式系統中的一個很重要的措施就是要保證負載均衡,這對于并行計算的框架來說,同樣意義重大。在計算的過程中記錄性能信息和進行監控,可以通知用戶或者系統。通過重新的調度或者其他手段使得負載盡可能均衡。
總之,通過監控和程序的性能概要分析,我們可以讓系統和用戶之間有更多交互。同時給出的數據可以幫助用以評估系統,提供給不同的人如用戶或者系統開發人員分析。
實現細節
我們需要記錄一個子任務的運行時性能概要信息,通過以下的數據結構來實現。
struct ProfileInfo
{
// for map task
int mapFanIn;
int mapFanOut;
int mapRecordNumber;
int localCombineFanIn;
int localCombineFanOut;
int localCombineRecordNumber;
// for reduce task
int reduceFanIn;
int reduceFanOut;
int reduceRecordNumber
// for transfer task
int transferIO;
int transferRecordNumber;
// cost time, by seconds
int taskCostTime;
};
對于Map階段,分別記錄扇入扇出的數據大小、map的記錄個數;以及做localcombine的扇入扇出、記錄個數;對于Reduce階段,記錄扇入扇出的數據大小、reduce的記錄個數;還有傳輸任務的傳輸數據量;最后是各個任務的花費時間。
通過在Worker端執行任務后記錄下任務的性能概要情況,然后通過文件管道傳遞給Worker的心跳進程,然后通過心跳捎帶給Master以供分析。
進行捎帶處理的心跳使用rpc實現,具體實現如下。
先使用ICE的slice描述rpc的接口。
/**
* report to the master the task is successfully completed.
*
* @param taskID
* @param profileInfo, send the profileInfo piggybackly
*/
idempotent void completeTask(Address workerAddress, int taskID, ProfileInfo taskProfile);