一、簡介
Apache Kafka 是一個分散式的流處理平臺(分散式的基於釋出/訂閱模式的訊息佇列【Message Queue】)
流處理平臺有以下3個特性:
可以讓你釋出和訂閱流式的記錄。這一方面與訊息佇列或者企業訊息系統類似。
可以儲存流式的記錄,並且有較好的容錯性。
可以在流式記錄產生時就進行處理。
1。1 訊息佇列的兩種模式
1。1。1 點對點模式
生產者將訊息傳送到queue中,然後消費者從queue中取出並且消費訊息。訊息被消費以後,queue中不再儲存,所以消費者不可能消費到已經被消費的訊息。Queue支援存在多個消費者,但是對一個訊息而言,只能被一個消費者消費。
1。1。2 釋出/訂閱模式
生產者將訊息釋出到topic中,同時可以有多個消費者訂閱該訊息。和點對點方式不同,釋出到topic的訊息會被所有訂閱者消費。
1。2 Kafka 適合什麼樣的場景
它可以用於兩大類別的應用:
構造實時流資料管道,它可以在系統或應用之間可靠地獲取資料。(相當於message queue)。
構建實時流式應用程式,對這些流資料進行轉換或者影響。(就是流處理,透過kafka stream topic和topic之間內部進行變化)。
為了理解Kafka是如何做到以上所說的功能,從下面開始,我們將深入探索Kafka的特性。
首先是一些概念:
Kafka作為一個叢集,執行在一臺或者多臺伺服器上。
Kafka 透過 topic 對儲存的流資料進行分類。
每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
1。3 主題和分割槽
Kafka的訊息透過主題(Topic)進行分類,就好比是資料庫的表,或者是檔案系統裡的資料夾。主題可以被分為若干個分割槽(Partition),一個分割槽就是一個提交日誌。訊息以追加的方式寫入分割槽,然後以先進先出的順序讀取。注意,由於一個主題一般包含幾個分割槽,因此無法在整個主題範圍內保證訊息的順序,但可以保證訊息在單個分割槽內的順序。主題是邏輯上的概念,在物理上,一個主題是橫跨多個伺服器的。
Kafka 叢集保留所有釋出的記錄(無論他們是否已被消費),並透過一個可配置的引數——保留期限來控制(可以同時配置時間和訊息大小,以較小的那個為準)。
舉個例子, 如果保留策略設定為2天,一條記錄釋出後兩天內,可以隨時被消費,兩天過後這條記錄會被拋棄並釋放磁碟空間。
有時候我們需要增加分割槽的數量,比如為了擴充套件主題的容量、降低單個分割槽的吞吐量或者要在單個消費者組內執行更多的消費者(因為一個分割槽只能由消費者組裡的一個消費者讀取)。從消費者的角度來看,基於鍵的主題新增分割槽是很困難的,因為分割槽數量改變,鍵到分割槽的對映也會變化,所以對於基於鍵的主題來說,建議在一開始就設定好分割槽,避免以後對其進行調整。
(注意:不能減少分割槽的數量,因為如果刪除了分割槽,分割槽裡面的資料也一併刪除了,導致資料不一致。如果一定要減少分割槽的數量,只能刪除topic重建)
1。4 生產者和消費者
生產者(釋出者)
建立訊息,一般情況下,一個訊息會被髮布到一個特定的主題上。生產者在預設情況下把訊息均衡的分佈到主題的所有分割槽上,而並不關心特定訊息會被寫入哪個分割槽。不過,生產者也可以把訊息直接寫到指定的分割槽。這通常透過訊息鍵和分割槽器來實現,分割槽器為鍵生成一個雜湊值,並將其對映到指定的分割槽上。生產者也可以自定義分割槽器,根據不同的業務規則將訊息對映到分割槽。
消費者(訂閱者)
讀取訊息,消費者可以訂閱一個或者多個主題,並按照訊息生成的順序讀取它們。消費者透過檢查訊息的偏移量來區分已經讀取過的訊息。偏移量是一種元資料,它是一個不斷遞增的整數值,在建立訊息時,kafka會把它新增到訊息裡。在給定的分割槽裡,每個訊息的偏移量都是唯一的。消費者把每個分割槽最後讀取的訊息偏移量儲存在zookeeper或者kafka上,如果消費者關閉或者重啟,它的讀取狀態不會丟失。
消費者是消費者組的一部分,也就是說,會有一個或者多個消費共同讀取一個主題。
消費者組保證每個分割槽只能被同一個組內的一個消費者使用。如果一個消費者失效,群組裡的其他消費者可以接管失效消費者的工作。
1。5 broker和叢集
broker:一個獨立的kafka伺服器被稱為broker。broker接收來自生產者的訊息,為訊息設定偏移量,並提交訊息到磁碟儲存。broker為消費者提供服務,對讀取分割槽的請求作出相應,返回已經提交到磁碟上的訊息。
叢集:交給同一個zookeeper叢集來管理的broker節點就組成了kafka的叢集。
broker是叢集的組成部分,每個叢集都有一個broker同時充當叢集控制器的角色。控制器負責管理工作,包括將分割槽分配給broker和監控broker。在broker中,一個分割槽從屬於一個broker,該broker被稱為分割槽的首領。一個分割槽可以分配給多個broker(Topic設定了多個副本的時候),這時會發生分割槽複製。如下圖:
broker如何處理請求:
broker會在它所監聽的每個埠上執行一個Acceptor執行緒,這個執行緒會建立一個連線並把它交給Processor執行緒去處理。Processor執行緒(也叫網路執行緒)的數量是可配的,Processor執行緒負責從客戶端獲取請求資訊,把它們放進請求佇列,然後從響應佇列獲取響應資訊,併發送給客戶端。如下圖所示:
生產請求和獲取請求都必須傳送給分割槽的首領副本(分割槽Leader)。
如果broker收到一個針對特定分割槽的請求,而該分割槽的首領在另外一個broker上,那麼傳送請求的客戶端會收到一個“非分割槽首領”的錯誤響應。Kafka客戶端要自己負責把生產請求和獲取請求傳送到正確的broker上。
客戶端如何知道該往哪裡傳送請求呢?客戶端使用了另外一種請求型別——元資料請求。這種請求包含了客戶端感興趣的主題列表,伺服器的響應訊息裡指明瞭這些主題所包含的分割槽、每個分割槽都有哪些副本,以及哪個副本是首領。元資料請求可以發給任意一個broker,因為所有的broker都快取了這些資訊。客戶端快取這些元資料,並且會定時從broker請求重新整理這些資訊。此外如果客戶端收到“非首領”錯誤,它會在嘗試重新發送請求之前,先重新整理元資料。
1。6 Kafka 基礎架構
二、Kafka架構深入
2。1 Kafka工作流程及檔案儲存機制
2。1。1 工作流程
Kafka中訊息是以topic進行分類的,生產者生產訊息,消費者消費訊息,都是面向topic的。
Topic是邏輯上的概念,而partition(分割槽)是物理上的概念,每個partition對應於一個log檔案,該log檔案中儲存的就是producer生產的資料。Producer生產的資料會被不斷追加到該log檔案末端,且每條資料都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到哪個offset,以便出錯恢復時,從上次的位置繼續消費。
2。1。2 檔案儲存機制
索引檔案和日誌檔案命名規則:每個 LogSegment 都有一個基準偏移量,用來表示當前 LogSegment 中第一條訊息的 offset。偏移量是一個 64位的長整形數,固定是20位數字,長度未達到,用 0 進行填補。如下圖所示:
查詢message的流程(比如要查詢offset為170417的message):
首先用二分查詢確定它是在哪個Segment檔案中,其中0000000000000000000。index為最開始的檔案,第二個檔案為0000000000000170410。index(起始偏移為170410+1 = 170411),而第三個檔案為0000000000000239430。index(起始偏移為239430+1 = 239431)。所以這個offset = 170417就落在第二個檔案中。其他後續檔案可以依此類推,以起始偏移量命名並排列這些檔案,然後根據二分查詢法就可以快速定位到具體檔案位置。
用該offset減去索引檔案的編號,即170417 - 170410 = 7,也用二分查詢法找到索引檔案中等於或者小於7的最大的那個編號。可以看出我們能夠找到[4,476]這組資料,476即offset=170410 + 4 = 170414的訊息在log檔案中的偏移量。
開啟資料檔案(0000000000000170410。log),從位置為476的那個地方開始順序掃描直到找到offset為170417的那條Message。
2。1。3 資料過期機制
2。2 Kafka生產者
2。2。1 分割槽策略
為什麼要分割槽
多Partition分散式儲存,利於叢集資料的均衡。
併發讀寫,加快讀寫速度。
加快資料恢復的速率:當某臺機器掛了,每個Topic僅需恢復一部分的資料,多機器併發。
分割槽的原則
指明partition的情況下,使用指定的partition;
沒有指明partition,但是有key的情況下,將key的hash值與topic的partition數進行取餘得到partition值;
既沒有指定partition,也沒有key的情況下,第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與topic可用的partition數取餘得到partition值,也就是常說的round-robin演算法。
2。2。2 資料可靠性保證
kafka提供了哪些方面的保證
kafka可以保證分割槽訊息的順序。如果使用同一個生產者往同一個分割槽寫入訊息,而且訊息B在訊息A之後寫入,那麼kafka可以保證訊息B的偏移量比訊息A的偏移量大,而且消費者會先讀取到訊息A再讀取訊息B。
只有當訊息被寫入分割槽的所有副本時,它才被認為是“已提交”的。生產者可以選擇接收不同型別的確認,比如在訊息被完全提交時的確認、在訊息被寫入分割槽首領時的確認,或者在訊息被髮送到網路時的確認。
只要還有一個副本是活躍的,那麼已經提交的資訊就不會丟失。
消費者只能讀取到已經提交的訊息。
複製
Kafka的複製機制和分割槽的多副本架構是kafka可靠性保證的核心。把訊息寫入多個副本可以使kafka在發生奔潰時仍能保證訊息的永續性。
kafka的topic被分成多個分割槽,分割槽是基本的資料塊。每個分割槽可以有多個副本,其中一個是首領。所有事件都是發給首領副本,或者直接從首領副本讀取事件。其他副本只需要與首領副本保持同步,並及時複製最新的事件。
與zookeeper之間有一個活躍的會話,也就是說,它在過去的6s(可配置)內向zookeeper傳送過心跳。
在過去的10s(可配置)內從首領那裡獲取過最新的資料。
影響Kafka訊息儲存可靠性的配置
ack應答機制
對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒有必要等ISR中的follower全部接收成功。所以Kafka提供了三種可靠性級別,使用者可以根據對可靠性和延遲的要求進行權衡。acks:
0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒寫入磁碟就已經返回,當broker故障時可能丟失資料;
1: producer等待leader的ack,partition的leader落盤成功後返回ack,如果在follower同步成功之前leader故障,那麼將會丟失資料;
-1(all):producer等待broker的ack,partition的leader和ISR裡的follower全部落盤成功後才返回ack。但是如果在follower同步完成後,broker傳送ack之前,leader發生故障,那麼會造成重複資料。(極端情況下也有可能丟資料:ISR中只有一個Leader時,相當於1的情況)。
消費一致性保證
(1)follower故障
follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會讀取本地磁碟記錄的上次的HW,並將log檔案高於HW的部分擷取掉,從HW開始向leader進行同步。
等該follower的LEO大於等於該Partition的HW,即follower追上leader之後,就可以重新加入ISR了。
(2)leader故障
leader發生故障後,會從ISR中選出一個新的leader,之後為了保證多個副本之間的資料一致性,其餘的follower會先將各自的log檔案高於HW的部分截掉,然後從新的leader同步資料。
注意:這隻能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。
2。2。3 訊息傳送流程
Kafka 的producer 傳送訊息採用的是非同步傳送的方式。在訊息傳送過程中,涉及到了兩個執行緒——main執行緒和sender執行緒,以及一個執行緒共享變數——RecordAccumulator。main執行緒將訊息傳送給RecordAccumulator,sender執行緒不斷從RecordAccumulator中拉取訊息傳送到Kafka broker。
為了提高效率,訊息被分批次寫入kafka。批次就是一組訊息,這些訊息屬於同一個主題和分割槽。(如果每一個訊息都單獨穿行於網路,會導致大量的網路開銷,把訊息分成批次傳輸可以減少網路開銷。不過要在時間延遲和吞吐量之間做出權衡:批次越大,單位時間內處理的訊息就越多,單個訊息的傳輸時間就越長)。批次資料會被壓縮,這樣可以提升資料的傳輸和儲存能力,但要做更多的計算處理。
相關引數:
2。3 Kafka消費者
2。3。1 消費方式
consumer採用pull(拉)的模式從broker中讀取資料。
push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。它的目標是儘可能以最快的速度傳遞訊息,但是這樣容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式可以根據consumer的消費能力以適當的速率消費訊息。
pull模式的不足之處是,如果kafka沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,kafka的消費者在消費資料時會傳入一個時長引數timeout,如果當前沒有資料可消費,consumer會等待一段時間後再返回。
2。3。2 分割槽分配策略
一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定哪個partition由哪個consumer來消費。Kafka提供了3種消費者分割槽分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor介面用於使用者定義實現分割槽分配演算法,以實現Consumer之間的分割槽分配。消費組的成員訂閱它們感興趣的Topic並將這種訂閱關係傳遞給作為訂閱組協調者的Broker。協調者選擇其中的一個消費者來執行這個消費組的分割槽分配並將分配結果轉發給消費組內所有的消費者。Kafka預設採用RangeAssignor的分配演算法。
2。3。2。1 RangeAssignor
RangeAssignor對每個Topic進行獨立的分割槽分配。對於每一個Topic,首先對分割槽按照分割槽ID進行排序,然後訂閱這個Topic的消費組的消費者再進行排序,之後儘量均衡的將分割槽分配給消費者。這裡只能是儘量均衡,因為分割槽數可能無法被消費者數量整除,那麼有一些消費者就會多分配到一些分割槽。分配示意圖如下:
分割槽分配的演算法如下:
這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來越嚴重,比如上圖中4個分割槽3個消費者的場景,C0會多分配一個分割槽。如果此時再訂閱一個分割槽數為4的Topic,那麼C0又會比C1、C2多分配一個分割槽,這樣C0總共就比C1、C2多分配兩個分割槽了,而且隨著Topic的增加,這個情況會越來越嚴重。分配結果:
訂閱2個Topic,每個Topic4個分割槽,共3個Consumer
C0:
[T0P0,T0P1,T1P0,T1P1]
C1:
[T0P2,T1P2]
C2:
[T0P3,T1P3]
2。3。2。2 RoundRobinAssignor
RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分割槽及所有消費者進行排序後儘量均衡的分配(RangeAssignor是針對單個Topic的分割槽進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那麼分配結果是儘量均衡的(消費者之間分配到的分割槽數的差值不會超過1)。如果訂閱的Topic列表是不同的,那麼分配結果是不保證“儘量均衡”的,因為某些消費者不參與一些Topic的分配。
以上兩個topic的情況,相比於之前RangeAssignor的分配策略,可以使分割槽分配的更均衡。不過考慮這種情況,假設有三個消費者分別為C0、C1、C2,有3個Topic T0、T1、T2,分別擁有1、2、3個分割槽,並且C0訂閱T0,C1訂閱T0和T1,C2訂閱T0、T1、T2,那麼RoundRobinAssignor的分配結果如下:
看上去分配已經儘量的保證均衡了,不過可以發現C2承擔了4個分割槽的消費而C1訂閱了T1,是不是把T1P1交給C1消費能更加的均衡呢?
2。3。2。3 StickyAssignor
StickyAssignor分割槽分配演算法,目的是在執行一次新的分配時,能在上一次分配的結果的基礎上,儘量少的調整分割槽分配的變動,節省因分割槽分配變化帶來的開銷。Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標有兩點:
分割槽的分配儘量的均衡。
每一次重分配的結果儘量與上一次分配結果保持一致。
當這兩個目標發生衝突時,優先保證第一個目標。第一個目標是每個分配演算法都儘量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。
StickyAssignor演算法比較複雜,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:
有4個Topic:T0、T1、T2、T3,每個Topic有2個分割槽。
有3個Consumer:C0、C1、C2,所有Consumer都訂閱了這4個分割槽。
上面紅色的箭頭代表的是有變動的分割槽分配,可以看出,StickyAssignor的分配策略,變動較小。
2。3。3 offset的維護
2。3。4 kafka高效讀寫資料(瞭解)
順序寫磁碟
Kafka 的 producer生產資料,要寫入到log檔案中,寫的過程是一直追加到檔案末端,為順序寫。資料表明,同樣的磁碟,順序寫能到600M/s,而隨機寫只有100K/s。這與磁碟的機械結構有關,順序寫之所以快,是因為其省去了大量磁頭定址的時間。
零複製技術
零複製主要的任務就是避免CPU將資料從一塊儲存複製到另外一塊儲存,主要就是利用各種零複製技術,避免讓CPU做大量的資料複製任務,減少不必要的複製,或者讓別的元件來做這一類簡單的資料傳輸任務,讓CPU解脫出來專注於別的任務。這樣就可以讓系統資源的利用更加有效。
最近面試BAT,整理一份面試資料《
Java面試BATJ通關手冊
》,覆蓋了Java核心技術、JVM、Java併發、SSM、微服務、資料庫、資料結構等等。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)