2Driver HA
由于流計算系統(tǒng)是長期運(yùn)行、且不斷有數(shù)據(jù)流入,因此其Spark守護(hù)進(jìn)程(Driver)的可靠性至關(guān)重要,它決定了Streaming程序能否一直正確地運(yùn)行下去。
Driver實現(xiàn)HA的解決方案就是將元數(shù)據(jù)持久化,以便重啟后的狀態(tài)恢復(fù)。如圖一所示,Driver持久化的元數(shù)據(jù)包括:
Block元數(shù)據(jù)(圖1中的綠色箭頭):Receiver從網(wǎng)絡(luò)上接收到的數(shù)據(jù),組裝成Block后產(chǎn)生的Block元數(shù)據(jù);
Checkpoint數(shù)據(jù)(圖1中的橙色箭頭):包括配置項、DStream操作、未完成的Batch狀態(tài)、和生成的RDD數(shù)據(jù)等;
Driver失敗重啟后:

恢復(fù)計算(圖2中的橙色箭頭):使用Checkpoint數(shù)據(jù)重啟driver,重新構(gòu)造上下文并重啟接收器。
恢復(fù)元數(shù)據(jù)塊(圖2中的綠色箭頭):恢復(fù)Block元數(shù)據(jù)。

恢復(fù)未完成的作業(yè)(圖2中的紅色箭頭):使用恢復(fù)出來的元數(shù)據(jù),再次產(chǎn)生RDD和對應(yīng)的job,然后提交到Spark集群執(zhí)行。
通過如上的數(shù)據(jù)備份和恢復(fù)機(jī)制,Driver實現(xiàn)了故障后重啟、依然能恢復(fù)Streaming任務(wù)而不丟失數(shù)據(jù),因此提供了系統(tǒng)級的數(shù)據(jù)高可靠。
可靠的上下游IO系統(tǒng)
流計算主要通過網(wǎng)絡(luò)socket通信來實現(xiàn)與外部IO系統(tǒng)的數(shù)據(jù)交互。由于網(wǎng)絡(luò)通信的不可靠特點(diǎn),發(fā)送端與接收端需要通過一定的協(xié)議來保證數(shù)據(jù)包的接收確認(rèn)和失敗重發(fā)機(jī)制。
不是所有的IO系統(tǒng)都支持重發(fā),這至少需要實現(xiàn)數(shù)據(jù)流的持久化,同時還要實現(xiàn)高吞吐和低時延。在SparkStreaming官方支持的data source里面,能同時滿足這些要求的只有Kafka,因此在最近的SparkStreaming release里面,也是把Kafka當(dāng)成推薦的外部數(shù)據(jù)系統(tǒng)。
除了把Kafka當(dāng)成輸入數(shù)據(jù)源(inbound data source)之外,通常也將其作為輸出數(shù)據(jù)源(outbound data source)。所有的實時系統(tǒng)都通過Kafka這個MQ來做數(shù)據(jù)的訂閱和分發(fā),從而實現(xiàn)流數(shù)據(jù)生產(chǎn)者和消費(fèi)者的解耦。
一個典型的企業(yè)大數(shù)據(jù)中心數(shù)據(jù)流向視圖如圖3所示:

除了從源頭保證數(shù)據(jù)可重發(fā)之外,Kafka更是流數(shù)據(jù)Exact Once語義的重要保障。Kafka提供了一套低級API,使得client可以訪問topic數(shù)據(jù)流的同時也能訪問其元數(shù)據(jù)。SparkStreaming每個接收的任務(wù)都可以從指定的Kafka topic、partition和offset去獲取數(shù)據(jù)流,各個任務(wù)的數(shù)據(jù)邊界很清晰,任務(wù)失敗后可以重新去接收這部分?jǐn)?shù)據(jù)而不會產(chǎn)生“重疊的”數(shù)據(jù),因而保證了流數(shù)據(jù)“有且僅處理一次”。
可靠的接收器
在Spark 1.3版本之前,SparkStreaming是通過啟動專用的Receiver任務(wù)來完成從Kafka集群的數(shù)據(jù)流拉取。
Receiver任務(wù)啟動后,會使用Kafka的高級API來創(chuàng)建topicMessageStreams對象,并逐條讀取數(shù)據(jù)流緩存,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務(wù)。
由于Receiver任務(wù)存在宕機(jī)風(fēng)險,因此Spark提供了一個高級的可靠接收器-ReliableKafkaReceiver類型來實現(xiàn)可靠的數(shù)據(jù)收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批數(shù)據(jù)持久化到磁盤后,更新topic-partition的offset信息,再去接收下一批Kafka數(shù)據(jù)。萬一Receiver失敗,重啟后還能從WAL里面恢復(fù)出已接收的數(shù)據(jù),從而避免了Receiver節(jié)點(diǎn)宕機(jī)造成的數(shù)據(jù)丟失(以下代碼刪除了細(xì)枝末節(jié)的邏輯):

啟用WAL后,雖然Receiver的數(shù)據(jù)可靠性風(fēng)險降低了,但卻由于磁盤持久化帶來的開銷,系統(tǒng)整體吞吐率會明顯下降。因此,最新發(fā)布的Spark 1.3版本,SparkStreaming增加了使用Direct API的方式來實現(xiàn)Kafka數(shù)據(jù)源的訪問。
引入了Direct API后,SparkStreaming不再啟動常駐的Receiver接收任務(wù),而是直接分配給每個Batch及RDD最新的topic partition offset。job啟動運(yùn)行后Executor使用Kafka的simple consumer API去獲取那一段offset的數(shù)據(jù)。
這樣做的好處不僅避免了Receiver宕機(jī)帶來數(shù)據(jù)可靠性的風(fēng)險,也由于避免使用ZooKeeper做offset跟蹤,而實現(xiàn)了數(shù)據(jù)的精確一次性(以下代碼刪除了細(xì)枝末節(jié)的邏輯):

預(yù)寫日志 Write Ahead Log
Spark 1.2開始提供預(yù)寫日志能力,用于Receiver數(shù)據(jù)及Driver元數(shù)據(jù)的持久化和故障恢復(fù)。WAL之所以能提供持久化能力,是因為它利用了可靠的HDFS做數(shù)據(jù)存儲。
SparkStreaming預(yù)寫日志機(jī)制的核心API包括:
管理WAL文件的WriteAheadLogManager
讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader
基于WAL的RDD:WriteAheadLogBackedBlockRDD
基于WAL的Partition:WriteAheadLogBackedBlockRDDPartition
以上核心API在數(shù)據(jù)接收和恢復(fù)階段的交互示意圖如圖4所示。

從WriteAheadLogWriter的源碼里可以清楚看到,每次寫入一塊數(shù)據(jù)buffer到HDFS后都會調(diào)用flush方法去強(qiáng)制刷入磁盤,然后才去取下一塊數(shù)據(jù)。因此receiver接收的數(shù)據(jù)是可以保證持久化到磁盤了,因而做到較好的數(shù)據(jù)可靠性。

結(jié)束語
得益于Kafka這類可靠的data source以及自身的checkpoint/WAL等機(jī)制,SparkStreaming的數(shù)據(jù)可靠性得到了很好的保證,數(shù)據(jù)能保證“至少一次”(at least once)被處理。但由于其outbound端的一致性實現(xiàn)還未完善,因此Exact once語義仍然不能端到端保證。SparkStreaming社區(qū)已經(jīng)在跟進(jìn)這個特性的實現(xiàn)(SPARK-4122),預(yù)計很快將合入trunk發(fā)布。