本期概覽:
ReceiverTracker架構設計
消息循環系統
python 筆記、ReceiverTracker具體的實現
Spark Streaming作為Spark Core基礎 架構之上的一個應用程序,其中的ReceiverTracker接收到數據之后,具體該怎么進行數據處理呢?
為了弄清楚這個問題,首先,我們打開源碼
找到ReceiverSupervisorImpl這個類
Apache Spark。
從源碼中可以看出,寫數據是通過ReceivedBlockHandler的對象receivedBlockHandler寫的。寫的過程中有倆種方式,一種是基于WAL方式進行容錯寫。一種是直接寫(相對不安全)。如下圖所示
?
github記筆記?然后存儲數據完成后并報告給Driver,以便Driver對元數據進行存儲,如下所示
用于匯報給Driver的消息類、如下圖所示
上圖談到了Record,要注意到,一般專業的描述處理的數據的大小的時候,應該用多少條記錄來描述更科學,一般說數據規模達到多少多少百億條記錄,而不是說數據規模達到多少PB的數據規模,這樣不是很科學,因為記錄可能有很多字段,比如說,1PB的數據,5個字段,和5PB的數據1個字段是差不多的。所以1PB的數據規模未必比5PB的數據規模體現出一個大數據引擎的數據處理能力。也比如說,有些數據是視頻或者音頻。更不適合說多少個PB來描述規模大小。
上圖說明ReceiverSupervisorImpl中有ReceiverTracker的通信體,能進行與ReceiverTracker的通信
并且ReceiverSupervisorImpl將數據的元數據信息匯報給ReceiverTracker
于是,我們進入ReceiverTracker這個類,這個類是整個流處理數據管理的中心。
?
ReceiverTracker中有endpoint通信體,這個通信體接收來自ReceiverSuperVisorImpl的元數據的數據匯報。
?
?
接下來,我們再進入ReceiverTracker本身,從整體上認識ReceiverTracker。
記錄Receiver的三種狀態,分別為非活躍狀態,正在執行調度任務狀態,活躍狀態
密封關鍵字,說明所有的子類都密封在這里,方便管理
/**
?* This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
?*/
這個消息用來告知為receiver啟動一個job,?ReceiverTracker有很多這樣的case class用于通信。
private[streaming] case class RestartReceiver(receiver: Receiver[_])
? extends ReceiverTrackerLocalMessage
再比如此類相同的消息
/**
?* This message will trigger ReceiverTrackerEndpoint to send stop signals to all registered
?* receivers.
?*/
private[streaming] case object StopAllReceivers extends ReceiverTrackerLocalMessage
注意:param skipReceiverLaunch Do not launch the receiver. This is useful for testing.,如下圖
簡單的來說,ReceiverTracker可以簡單的說包括Receiver的數據的啟動接收,管理,回收三個過程。
?
事先來個預告,我們將把Streaming流處理的所有的代碼一行行的過濾,講整個streaming通過一滴水看世界。
?
所有的輸入流都會交給grapx對象,因為該對象會將所有的待調度的數據統一調度。
內部還有一個成員叫做ReceiverBlockTracker
ListenerBus非常的重要,后續我們會重點分析ListenerBus的源代碼,它在監控層面起著重要的作用。
在這里,可以看出ReceiverTracker的狀態有如下的4種狀態,分別為
初始化,開始,正在停止中,停止了。
接收到ReceiverSuperVisorImpl遠程發送過來的消息之后進行處理的過程在此。
這也是今天的重點之一。
先寫日志后再進行下一步操作,這里是出于容錯的原因考慮的。
注意:這里如果指定了checkpoint目錄的話,才會使得isWriteAheadLogEnabled為true.
ReceivedBlockTrackerLogEvent其實就是元數據信息。
用一個HashMap結構將Stream 與 BlockQueue中的Block一一對應,可謂是真的巧妙到了極點。
?
再回到我們的消息通信層面。
回復對方,告知對方,addBlock成功。并且保存有數據的元數據信息。
ReceivedBlockTracker類的主要的任務在于將Block分配給沒有分配Block的Stream batch。
這是具體分配Block給batch的代碼。
這里說明具體的分配是以batch time為單位分配的.
再次看看消息通信體。
這里說啟動所有的Receiver.
啟動所有的receiver
這樣,整個數據接收的環節就打通了。
最后做點補充:
該階段是CleanupOldBlocks階段,此時將發送消息給ReceiverSuperVisorImpl,從而讓它執行cleanUpOldBlocks方法。
/** Update a receiver's maximum ingestion rate */
最后stopAllReceivers,結束了。
?