后備任務調度策略
問題描述
通過上一章的系統評估和性能分析,我們看到了由于部分落后機器而導致的任務延遲增大問題。在Map或者是Reduce階段,位于某臺機器上的某個子任務,由于各種原因而導致其顯著慢于同一個任務同種類型的其他子任務,從而導致整個任務的延遲顯著增大。
由此系統評估和分析我們可以知道,本系統中的任務延遲存在此性能問題。接下來我們針對此問題改善系統的單任務延遲,并由此說明系統評估在其中起的重要作用。
對于短任務的用戶來說,系統的單任務延遲時間是一個重要的衡量因素。因為短任務的用戶都希望系統能夠快速地對他們提交的任務做出回答,比如在日志分析、系統監控、商務數據分析等等應用中,這樣的任務是很常見的。Google的MapReduce也是主要用于短任務,根據MapReduce的論文,Google在2007年9月中提交的MapReduce任務的平均延遲時間是395秒。延遲時間對于SQL-like的系統和任務來說也很重要,比較在這些系統之上的語言如Pig、SCOPE等等,用它們描述的計算任務基本上都希望有好的延遲。同時,對于EC2這樣的應用來說,單任務的延遲就顯得更加重要,因為EC2用戶的付費是按時間來計算的,如果使用的系統對單任務有更好的延遲,那無疑能夠有顯著的經濟效應。
所以,使用Backup的調度策略,對于落后任務投機地使用后備任務,能夠極大地提交系統的單任務延遲時間,對于短任務來說,尤為如此。而對于較長的任務,后備任務的使用的效果就相對比較小。我們這一章的工作中,也將主要考慮改善短任務的延遲時間。
相關工作
MapReduce
Google的論文中說明了他們在工作中這個問題的狀況。他們觀察到在任務的執行最后過程中,經常有一臺機器在運行這最后的幾個Map或Reduce任務時花費異常長的時間。他們也分析了可能的原因:這臺機器的硬盤狀況不好,可能會導致它的讀寫出現異常;或者是由于調度使得同一臺機器的負載過重而使得原先執行的任務變慢;或者是任務程序中的bug問題等等。通常這樣的落后機器出現的概率是1/100。
他們也使用了一些機制來啟動后備任務。他們的策略是這樣的,當MapReduce的任務快結束時,他們對于還在運行的子任務啟用后備任務,原子任務或者后備任務中的其中一個結束就標記這個子任務結束。他們同時采取了一些措施使得后備任務不會占用系統資源過多。根據文章的實驗,對于sort來說,啟用后備任務后,性能提高了44%。
Hadoop
對于Hadoop2中的實現,針對此問題,分析代碼細節如下。
如果一臺機器空閑的時候,Hadoop將選擇一個任務在其之上運行。選擇的方式如下:首先,失敗的任務優先最高執行,這是Hadoop的錯誤偵察機制,為了找到由于bug或者其他原因不斷失敗的任務并最終放棄;其次,沒有運行過的任務根據調度策略調度到此空閑機器上執行;最后,如果前兩種情況都沒有發生,Hadoop選擇一個正在運行的任務投機執行,而判斷的策略將在下面闡述。
為了挑選出需要投機執行的后備任務,Hadoop使用了一個進程分數的值來進行判斷。進程分數是一個在0到1之間的數值,它是由幾部分組成的。
對于Map任務來說,進程分數是該任務輸入數據占整個任務輸入數據的比值。而對于Reduce任務來說,這個值的獲取分三個階段考慮:
1.copy階段,當Reduce任務正在從Map任務拷貝數據的時候該值計入進程分數,值為copy數據占整個Map數據的比例。
2.sort階段,當Map輸出的數據根據key來排序的時候該值計入進程分數,值為sort數據占合并數據的比例。
3.reduce階段,當用戶定義的reduce函數正在對Map數據進行reduce操作時該值計入進程分數,值為reduce數據占此階段數據的比例。
然后,Hadoop對每種任務根據它們的平均進程分數定義了一個閾值。Hadoop簡單地把閾值設為平均分數減0.2。對于落后者的判斷,Hadoop發現如果一個任務的進程分數低于此類任務的閾值,并且已經運行了至少一分鐘,則這個任務被標記為一個落后任務,需要執行后備任務以加快任務的延遲。
最后,Hadoop使用一個FIFO的隊伍來進行任務的調度。
異構環境中后備任務調度
在OSDI 08的這篇文章中[ Matei Zaharia Andy Konwinski Anthony D. Joseph Randy Katz Ion Stoica. Improving MapReduce Performance in Heterogeneous Environments, proceedings of OSDI, 2008.],作者關注了在異構的環境下后備任務的調度。在日常的應用中,如Amazon的EC2這樣的按需式計算服務,或者是小型機構的機房中,異構的機群和環境都是很常見的。而Hadoop的調度基本上是基于同構的環境來設計的。作者分析了Hadoop的設計并給異構的環境中設計了新的調度策略。最后在200臺機器的EC2異構機群中,提高了2倍的性能。
實現細節
首先,我們考慮了Tplatform的實際情況:主要用于本實驗室小組的搜索引擎的研究信息處理,應用程序的數據基本上是與機群規模匹配的數據集合,而任務集合基本上是搜索引擎相關的如網頁消重、網頁的實體分析、數據的挖掘等等。而機群的環境都是同構的,所以我們考慮的是怎么在同構的環境在針對這些中短任務進行后備任務策略的優化。如下我們將細致討論實現細節。
整體框架
我們從高層邏輯來闡述實現的細節。首先需要在Master端解決落后者的判定問題,對于一個正在Worker端執行的子任務,其是否是落后者,需要Master綜合各方面信息分析后根據判定策略給出決策;其次,一旦該子任務被判定為落后者,具體的處理措施分析在Master和Worker端進行,需要保證Master端記錄任務的數據結構的一致性和效率,以免Master變成整個系統的性能瓶頸,此外還需要保證Worker端的后備任務不會過多成為完全的冗余,避免系統由于后備任務策略的開銷反而低效。
設計系統在執行此策略的流程如下:
(1)如果有一個Worker的機器出于空閑狀態,它向Master匯報為沒有任務在其上執行,并且Master也沒有分配新的任務給它,此時Master啟動后備任務的策略,決定是否在此機器上執行后備任務。
(2)Master根據落后者的判定策略,選擇一個合適的后備任務在該Worker上執行,或者Master根據策略認為當前系統不需要后備任務,這可能是因為沒有落后者或者是系統認為此時的系統負載不合適執行過多的后備任務?傊,此處將根據策略決定是否執行,原則是能夠改善系統的性能同時保證開銷不會過大。
(3)系統進行處理,分別在Master端和Worker端做出反應,直至完成。
我們在下面的小節中詳細闡述這三個步驟中的細節。
落后者判定策略
落后者的判定是這個系統優化中最重要的部分,如果系統誤將非落后任務判定為落后任務,那將造成不必要的開銷最終導致性能變差;如果系統沒有識別出落后者,那潛在的性能優化空間并沒有得到填充,那優化并沒有得到充分發揮。而如何決策使得上述兩個方面的折衷能夠得到同時滿足,一方面是根據最終系統運行的實際應用程序而定的,另一方面是根據系統的實際狀況而決定的。根據上述分析,我們的應用場景和系統是這樣的:在同構的系統環境下面對中短任務的計算。
首先,我們分析導致落后者出現的原因和表現行為。落后者可能因為多種原因出現,在Google的MapReduce論文中已經有所提及。無論這些原因是什么,落后者出現后,它們的表現行為都是落后于正常子任務,根據短板效應導致最后整個任務需要等待這個最慢的子任務才能算結束,從而增大了延遲。所以對于落后者的判定,要做的就是把它們和普通的正常子任務區分開來,知道哪些子任務的正常的,哪些是落后的。
然后,對于子任務耗時的信息和完成進度和來說,我們可以分別在Master和Worker端獲取這些信息。
最后,我們在Master匯總這兩方面的信息,確定性地給出判定的結果。
Master端:
我們有一個假設,那就是對于同一個任務的同種類型的子任務(Map或者Reduce,Transfer子任務會在Map做完后開始,我們不考慮對Transfer子任務投機地進行后備執行)可能會在相同的時間內完成。此假設認為1.機群環境同構;2.數據基本上均勻分布;3.計算時間和IO時間不會因為數據的值而發生劇烈的變化。如果這一系列假設不成立時,就有可能出現落后者問題。我們將在下面根據實驗說明在測試的基準程序集合中,這樣的應用程序負載下此假設是成立的,偶爾會因為各種原因使得假設失敗,這些情況就是落后者,需要做后備任務投機執行以改善延遲。
所以,我們將根據系統監控和程序的概要分析記錄下任務的Map子任務和Reduce子任務的平均完成時間,如果一個正在執行的子任務比同任務同類型的其他子任務顯著地慢,那它可被判定為落后者。
Worker端:
對于正在Worker端執行的子任務來說,Worker具有知道該子任務進行到什么地步的能力。我們用進度分數來描述Worker正在執行的任務處于什么狀態,進度分數是一個從0到1的連續值。
在Map階段,由于TFS的默認設置是一個chunk大小為64M,所以默認的Map輸入數據大小為64M。我們根據Mapper讀入的數據大小來確定進度分數。
在Reduce階段,需要先進行一個Sort的過程,因為Sort的完整性,我們簡化了此過程中進度分數記錄。也就是說,在Sort的過程中,Worker認為Sort還沒有完成,進度為0。整個Reduce階段的進度分數由是否Sort完成和reduce進度兩部分組成,這兩部分的權值我們進行了簡單的權重分配。Sort階段在大數據時會進行多路歸并的外排,而reduce階段基本上是IO占主要的時間。同時,我們根據實驗的經驗設置兩部分權重比例為1:1。最后根據權重計算出整個Reduce階段目前的進度分數。
在Worker端記錄下各階段的進度分數后,在本地由文件管道傳遞給和Master端通信的心跳進程,再由此進程通過心跳把進度和任務的相關情況捎帶傳給Master。
綜合:
對于需要判斷的正在執行的子任務來說,一方面,Master通過記錄的以前執行的同類型子任務的歷史信息可以知道平均耗時,一方面Master通過Worker每次心跳傳來的實時進度可以知道此子任務進行到什么進度。如果該子任務已經顯著超過平均耗時水平或者根據進度明顯慢于同類型任務,那即可判定該子任務為落后者。
系統處理過程
從兩方面來描述這一過程中系統的處理。
首先是Master端,Master對一個子任務進行是否為落后者的判定后,需要修改Master端的數據結構,以進行處理。
如果不是落后者,那Master不作處理。
如果落后者判定成功,Master修改數據結構以記錄原始的子任務和新的后備子任務。初始化后備子任務的數據結構,在和指定的Worker發送消息時將此新命令發出。
在命令發出之后,Master還需要處理此子任務的結束。如果原始子任務和后備子任務其中一個完成,Master即認為此子任務結束,并發停止執行的命令給還未結束的Worker,以免浪費資源。
然后是Worker端的處理。在這里的實現中,Worker對后備任務這一策略是透明的,如果Master發命令給Worker要求做一個任務,原始任務和后備任務在Worker看來是一樣的。
數據結構細節
同樣地,我們從Master和Worker兩方面來描述數據結構的實現細節。
對于Master,在保證不規模修改實現接口的情況下,進行了如下的實現。
TaskInfo結構:TaskInfo是記錄子任務信息的數據結構,在之前的實現上添加了兩個域,用以標明和區分后備子任務。
/// if it is backup task: MAP/REDUCE only
bool isBackup;
/// for backup task: original task id
int32_t originalTaskId;
一個是標明此任務是否為后備任務,一個是記錄原始任務的ID。
在TaskManager中添加兩個輔助的數據結構,用來在添加后備任務以及判定完成情況時處理。
/// task status map: record status of map/reduce tasks with each job, task is completed when either backup task or orginal task completed
std::map<int32_t, std::map<int32_t, bool> > m_jobid2tasksIsCompleted;
/// backup task map: one task only have a single backup task at one time
std::map<int32_t, bool> m_taskid2backing;
第一個map是記錄各個job中的task是否完成,如果有后備子任務,只要原始任務和后備子任務中其中一個完成就算此任務完成。
第二個map是記錄各個任務是否有后備任務,在這里使用map是因為后備任務的查看和處理過程中需要經?丛既蝿盏臓顟B,所以使用map避免Master端大規模的掃描任務隊列,成為性能瓶頸。此外,這里的一對一映射保證了一個原始子任務最多只有一個后備子任務,這是為了防止造成多個后備任務出現而造成開銷太大,或者是后備子任務再次成為落后者引起級聯反饋的效果后浪費系統的資源。
后備任務策略評估實驗
機群配置和任務準備
我們的機群配置如下。
我們在后備任務策略的評估實驗中使用了一臺Master、十四臺Worker組成的MapReduce系統集群。所有的機器都是Dell 2850服務器,每臺機器配置為2顆Intel Xeon處理器,2GB內存,6個7200 rpm SCSI硬盤組成一個RAID-0的邏輯卷。這些機器存放在兩個機架中,各有一臺Dell 2748 1Gbps交換機,機器通過一個1Gbps的全雙工以太網卡與交換機相連接,兩個機架通過一個Cisco千兆路由器鏈接。
我們實驗使用的是PennySort程序來進行評估。生成了50M的Record,一共是4.8G大小的數據。
任務耗時趨同性分析
我們首先分析在5.3.2節的設計中做出的假設在我們的環境和工作負載下是否合理。
我們的假設是,對于同一個任務的同種類型的任務基本上會在相同的時間內完成。
我們對與選定的基準程序集合的任務集合的耗時情況進行分析如下表,可以看到它們耗時的標準差和均值的比例并不高,說明這些任務基本上是在相同的時間內完成的。
系統優化方向
根據我們對系統的分析和評估,以及我們在Tplatform平臺的日常使用中的經驗,除了已經實現的后備任務策略,我們針對分析得出的其他系統優化方向,進行分析和探討。
網絡傳輸問題
在MapReduce的模型和體系結構的實現中,需要進行網絡的傳輸任務,也就是在reduce階段需要把map生成的數據傳到reduce對應的機器上。在這個階段很多應用程序可能生成大量的中間數據,而經過我們的之前的分析,網絡會成為MapReduce系統的性能瓶頸。所以,如果優化網絡的傳輸,減少不必要的中間數據,也是一個直接和實際的系統優化問題。
在網絡傳輸的問題上,可以有不同的研究方向:
如何優化機群的網絡傳輸,考慮在此環境下機器的路由和網絡層的優化問題。已經有一些研究工作在這個方向上進行,如DCell[ Chuanxiong Guo etc., DCell: A Scalable and Fault-Tolerant Network Structure for Data Centers, proceedings of SIGCOMM, 2008.]、Fat Tree[ Al-Fares, Loukissas, Vahdat, A Scalable, Commodity Data Center Network Architecture, proceedings of SIGCOMM, 2008.]。
如何通過優化負載和任務等不同粒度上的調度,來優化系統的網絡傳輸。
如何有效地利用應用程序的數據特征,使得系統可以充分利用數據的時間和空間局部性,從而減少產生的中間數據,最后達到優化網絡傳輸問題的目的。
此外,還有其他方向和角度來考慮這個問題。從我們的日常使用經驗來看,網絡通常是比硬盤I/O更容易成為瓶頸。
增加用戶和系統的交互
在MapReduce的體系框架下,系統對用戶掩蓋了很多系統細節,同時簡單的計算模型也使得大量的并行細節對用戶來說并不透明。Google對于MapReduce的此設計目的在于掩蓋細節,使得用戶只是簡單的實現Map函數和Reduce函數,就可以進行大規模的數據處理。但是我們發現,在實際使用中過于簡單的模型和不透明也會帶來性能問題。用戶對系統的不了解可能造成重復計算或者浪費用戶的時間,用戶白白等待無效的計算等等情況。
所以此方向我們需要研究的是,哪些系統實現細節是有必要對用戶掩蓋的,哪些系統實現細節如果用戶知道能夠使得一個專家級的用戶更好地控制系統和對應用程序進行優化。同時,我們還需要研究的是,在應用程序層面上來看,哪些信息如果讓系統知道,能夠更好更高效地執行應用程序;此外,為了更好地讓系統了解應用程序,系統應該提供什么樣的接口或者配置讓用戶方便地和系統進行交互。
總之,我們認為,系統和用戶不應該是孤立的,系統對用戶也不是完全透明的,同時系統對用戶的應用程序也不是一無所知的。系統應該多了解用戶的行為和應用程序的特點,同時用戶也需要更了解系統。用戶和系統之間的交互應該增加。
從數據庫領域看系統性能的其他提升空間
關于MapReduce和分布式數據庫到底有什么不同,是目前人們爭論的一個焦點[ Andrew Pavlo. Erik Paulson. Alexander Rasin etc., A Comparison of Approaches to Large-Scale Data Analysis, proceedings of SIGMOD, 2009.]。數據庫通過很多年的發展對數據的存儲和計算,以及用戶使用的語言等等都做了大量的研究并發展了很多成熟的技術。但是在MapReduce這樣類似的“云計算”環境下,數據庫的技術是否在MapReduce系統的研究中可以參考和借鑒,哪些可以參考和借鑒,什么樣的任務是分布式數據庫難以勝任的,什么樣的任務是MapReduce難以勝任的,他們兩種體系的計算引擎的本質區別到底是什么?這些都是亟待解決的問題,也是人們關心和爭論的焦點。
我們針對其中的一些問題,可以進行研究。
比如索引的使用,在分布式數據庫中是很正常和成熟的技術。MapReduce的系統中是不支持索引的,對于一些任務來說,如果使用MapReduce的框架來進行處理,將是比較低效的21。但是如何在MapReduce這樣的體系下使用索引還是一個需要研究的問題。
系統易用性
我們通過日常的使用發現,MapReduce程序的編寫還是過于底層,通常一些簡單的任務如日志分析等等需要花費比較長的時間來編寫。對記錄層級的數據進行直接處理和使用文件系統作為底層存儲也會對易用性造成一些問題,現在有一些高層語言來處理這些問題,如Pig Latin3等,但是系統的易用性和語言的問題仍然是一個需要不斷研究的問題。
總結
我們介紹了我們的Tplatform的設計特點,包括TFS和MapReduce,并探討了由于我們設計的不同導致系統的性能優化和設計折衷。
然后我們分析了系統的評估目標,包括從單任務延遲、總機器時間、平均結束時間、加速比、公平性、故障恢復穩定性等多個方面來考察系統的性能和其他各方面表現。
同時,我們并設計了一系列的基準程序和數據,從MapReduce的系統體系結構出發,考慮不同的程序和數據如計算密集型、I/O密集型、網絡密集型等來衡量MapReduce或者類似系統的上述評估目標。
為了達到分析和評估系統的目的,我們在系統中設計了性能的監控和程序概要分析框架,用來收集系統的相關表現信息。通過對收集的數據進行分析,我們可以得到實驗的結果,并對系統進行分析和給出改善意見。我們還在實驗中分析了程序概要分析框架的開銷,實驗結果可以看到開銷不大。
我們設計了一系列的實驗來對我們的Tplatform進行評估,分析系統中的實際情況。我們發現網絡常常成為MapReduce和類似系統的瓶頸,落后者對系統的延遲有很大的影響,系統對短任務的調度并不公平等等問題。
通過實驗和分析,我們發現了當前系統的這些問題,然后我們選取了落后者的問題進行改進。我們針對此問題實現了后備任務策略,落后者會顯著地造成延遲增大的性能問題。我們的模擬實驗表明,我們的后備任務能夠有效地改善這一問題。
最后我們總結了在分析和日常使用中發現的問題,并提出了一系列的未來工作方向。