選單

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

導語 | Controller作為Apache Kafka的核心元件,本文將從背景、原理以及原始碼與監控等方面來深入剖析Kafka Controller,希望帶領大家去了解Controller在整個Kafka叢集中的作用。

一、背景

Controller,是Apache Kafka的核心元件非常重要。它的主要作用是在Apache Zookeeper的幫助下管理和協調控制整個Kafka叢集。

在整個Kafka叢集中,如果Controller故障異常,有可能會影響到生產和消費。所以,我們需要對其狀態、選舉、日誌等做全面的監控。

二、Controller是什麼

Controller,是Apache Kafka的核心元件。它的主要作用是在Apache Zookeeper的幫助下管理和協調控制整個Kafka叢集。

叢集中的任意一臺Broker都能充當Controller的角色,但是,在整個叢集執行過程中,只能有一個Broker成為Controller。也就是說,

每個正常執行的Kafka叢集,在任何時刻都有且只有一個Controller

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

三、Controller儲存的資料

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

其中比較重要的資料有:

所有主題資訊。包括具體的分割槽資訊,比如領導者副本是誰,ISR集合中有哪些副本等。

所有Broker資訊。包括當前都有哪些執行中的Broker,哪些正在關閉中的Broker等。

所有涉及運維任務的分割槽。包括當前正在進行Preferred領導者選舉以及分割槽重分配的分割槽列表。

這些資料其實在ZooKeeper中也儲存了一份。每當控制器初始化時,它都會從ZooKeeper上讀取對應的元資料並填充到自己的快取中。

而Broker上元資料的更新都是由Controller通知完成的,Broker並不從Zookeeper獲取元資料資訊。

四、Controller職責

Controller職責大致分為

5

種:

主題管理,分割槽重分配,Preferred leader選舉,叢集成員管理(Broker上下線),資料服務(向其他Broker提供資料服務)

它們分別是:

UpdateMetadataRequest:更新元資料請求。topic分割槽狀態經常會發生變更(比如leader重新選舉了或副本集合變化了等)。由於當前clients只能與分割槽的leader Broker進行互動,那麼一旦發生變更,controller會將最新的元資料廣播給所有存活的Broker。具體方式就是給所有Broker傳送UpdateMetadataRequest請求。

CreateTopics: 建立topic請求。當前不管是透過API方式、指令碼方式抑或是CreateTopics請求方式來建立topic,做法幾乎都是在Zookeeper的/brokers/topics下建立znode來觸發建立邏輯,而controller會監聽該path下的變更來執行真正的“建立topic”邏輯。

DeleteTopics:刪除topic請求。和CreateTopics類似,也是透過建立Zookeeper下的/admin/delete_topics/節點來觸發刪除topic,controller執行真正的邏輯。

分割槽重分配:即kafka-reassign-partitions指令碼做的事情。同樣是與Zookeeper結合使用,指令碼寫入/admin/reassign_partitions節點來觸 發,controller負責按照方案分配分割槽。

Preferred leader分配:preferred leader選舉當前有兩種觸發方式:自動觸發(auto。leader。rebalance。enable=true)和kafka-preferred-replica-election指令碼觸發。兩者“玩法”相同,向Zookeeper的/admin/preferred_replica_election寫資料,controller提取資料執行preferred leader分配。

分割槽擴充套件:即增加topic分割槽數。標準做法也是透過kafka-reassign-partitions指令碼完成,不過使用者可直接往Zookeeper中寫資料來實現,比如直接把新增分割槽的副本集合寫入到/brokers/topics/下,然後controller會為你自動地選出leader並增加分割槽。

叢集擴充套件:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工作。

broker崩潰:同樣地,controller透過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可立即感知併為受影響分割槽選舉新的leader。

ControlledShutdown:broker除了崩潰,還能“優雅”地退出。broker一旦自行終止,controller會接收到一個 ControlledShudownRequest請求,然後controller會妥善處理該請求並執行各種收尾工作。

Controller leader選舉:controller必然要提供自己的leader選舉以防這個全域性唯一的元件崩潰宕機導致服務中斷。這個功能也是透過 Zookeeper的幫助實現的。

原始碼位置可以看後面段落9原始碼的說明。

五、Broker如何成為Controller

和解決可能的腦裂問題

(一)Broker如何成為Controller

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

最先在Zookeeper上建立臨時節點/controller成功的Broker就是Controller。

原始碼路徑(Kafka2。2):

Kafka#main->KafkaServerStartable#startup()->KafkaServer#startup()->KafkaController#startup()->eventManager。put(Startup)->elect()-> zkClient。registerControllerAndIncrementControllerEpoch

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

Controller重度依賴Zookeeper,依賴zookeepr儲存元資料,依賴zookeeper進行服務發現。Controller大量使用Watch功能實現對叢集的協調管理。

當broker節點因故障離開Kafka叢集時,broker中存在的leader分割槽將不可用(因為客戶端只對leader分割槽進行讀寫)

為了最大限度地減少停機時間,需要快速找到替代的領導分割槽。Controller可以從zookeeper watch獲取通知資訊。Zookeeper給了客戶端監聽znode變化的能力,也就是所謂的watch通知功能。一旦znode節點建立、刪除、子節點數量發生變化,或者znode中儲存的資料本身發生變化,Zookeeper會透過節點變化處理程式顯式通知客戶端。

當Broker宕機或主動關閉時,Broker與Zookeeper的會話結束,znode會被自動刪除。同樣的,Zookeeper的watch機制把這個變化推送給Controller,讓Controller知道有Broker down或者up,這樣Controller就可以進行後續的協調操作。

Controller將收到通知並對其採取行動,以確定Broker上的哪些分割槽將成為Leader partition。然後,它會通知每個相關的Broker,或者Broker上的topic partition將成為leader partition,或者LeaderAndIsrRequest從新的leader分割槽複製資料。

(二)如何避免Controller出現裂腦

如果Controller所在的Broker故障,Kafka叢集必須有新的Controller,否則叢集將無法正常工作。這兒存在一個問題。很難確定Broker是宕機還是隻是暫時的故障。但是,為了使叢集正常執行,必須選擇新的Controller。如果之前更換的Controller又正常了,不知道自己已經更換了,那麼叢集中就會出現兩個Controller。

其實這種情況是很容易發生的。例如,

由於垃圾回收(GC),一個Controller被認為是死的,並選擇了一個新的控制器。在GC的情況下,在原Controller眼裡沒有任何變化,Broker甚至不知道自己已經被暫停了。因此,它將繼續充當當前Controller,這在分散式系統中很常見,稱為裂腦

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

現在,叢集中有兩個Controller,可能會一起發出相互衝突的事件,這會導致腦裂。可能會導致嚴重的不一致。所以需要一種方法來區分誰是叢集的最新Controller。

Kafka是透過使用epoch number來處理,epoch number只是一個單調遞增的數。第一次選擇控制器時,epoch number值為1。如果再次選擇新控制器,epoch number為2,依次單調遞增。

每個新選擇的Controller透過zookeeper的條件遞增操作獲得一個新的更大的epoch number。當其他Broker知道當前的epoch number時,如果他們從Controller收到包含舊(較小)epoch number的訊息,則它們將被忽略。即Broker根據最大的epoch number來區分最新的Controller。

epoch number記錄在Zookeepr的一個永久節點controller_epoch。

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

上圖中,Broker3向Broker1下發命令:將Broker1上的partitionA做為leader,訊息的epoch number值為1,同時Broker2也向Broker1傳送同樣的命令。不同的是,訊息的epoch number值為2,此時broker1只監聽broker2的命令(由於其epoch號大),而會忽略broker3的命令,以免發生腦裂。

六、Controller在版本上的改進

在Kafka2.2之前

網路處理模型:Kafka Server在啟動時會初始化SocketServer、KafkaApis和KafkaRequestHandlerPool物件,這也是Server網路處理模型的主要組成部分。Kafka Server的網路處理模型也是基於Java NIO機制實現的,實現模式與Reactor模式類似。

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

如上圖,所有請求共享一個requestQueue佇列。

問題:

當前Broker對入站請求型別不做任何優先順序處理。

不論是PRODUCE請求、FETCH請求還是Controller類的請求。對Controller傳送的訊息非常不公平,因為這個類請求應該優先順序更高。

這就可能造成一個問題:即clients傳送的資料類請求積壓導致controller推遲了管理類請求的處理。設想這樣的場景。假設controller向broker廣播了leader發生變更。於是新leader開始接收clients端請求,而同時老leader所在的broker由於出現了資料類請求的積壓使得它一直忙於處理這些請求而無法處理controller發來的LeaderAndIsrRequest請求,因此這是就會出現“雙主”的情況——也就是所謂的腦裂。

在Kafka 2.2

將控制器傳送的請求與普通資料類請求分開處理,原始碼SocketServer。scala#startup()->KafkaServer。scala。

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

在0。11版本上也做了大的改進,會在後面段落8中說明。

七、Controller的監控

在整個叢集執行過程中,只能有一個Broker成為Controller。所以要監控Controller的數量以及Controller的變更史。

可以用Kafka的JMXTool,進行輕量級的監控。

記錄Controller變更歷:

監控效果:

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

透過JMXTool,還可以拉取Kafka的其他指標進行監控。

例如:

under_replicated_partitions有非同步副本監控。

OfflinePartitionsCount分割槽丟失leader監控。

ZooKeeper_SessionState Broker與Zookeeper斷開連線監控。

MessagesInPerSec,進入Broker消費數量監控。

ISR擴縮容率等。

監控可以有很多方式,這樣做主要是簡單方便,不需要依賴太多監控系統,同時監控程式可以快速部署到海外或者合作伙伴機房。

八、關於Controller的架構改進

Kafka中的一臺Broker充當Controller的角色,此臺Broker不僅對生產者消費者提供服務,還要協調整個叢集的管理工作。如果使用0。11版本之前的Kafka而且分割槽很多時,建議將幾臺機器配置為只能成為Controller(當然這裡需要修改原始碼,編譯)。

0.11版本之前

同步操作Zookeeper使用同步的API,效能差。當Broker宕機,大量主題分割槽發生變更時,自動恢復時間長。Controller是一個分割槽一個分割槽進行寫入的,對於分割槽數很多的叢集來說,這無疑是個巨大的效能瓶頸。

0.11 版本

非同步操作Zookeeper使用async API,寫入提升了10倍。

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

如果機器效能較好,可以將Zookeeper和Controller部署在相同的機器。Kafka對Zookeeper寫請求比較少。

注意

:消費方式有基於Zookeeper消費和基於Broker訊息。基於Zookeeper消費,就是將消費位移提交到Zookeeper上,這種方式對Zookeeper有大量寫操作。不要將Zookeeper和其他機器共用。

Zookeeper官網上有對讀寫佔比的壓測說明:

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

九、Controller的原始碼

原始碼(基於kafka 2。2)的內容較多:

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

10分鐘帶你玩轉Kafka基於Controller的領導選舉!

(一)Controller啟動流程【主要看寫的原始碼註釋】

(二)Controller選舉流程【主要看寫的原始碼註釋】

(三)成為Controller後的初始化工作【主要看寫的原始碼註釋】

(四)從KafkaController類看Controller的主要工作【主要看寫的原始碼註釋】

(五)其他原始碼部分

Controller還有幾個重要部分的原始碼:

Controller 傳送模型NetWork

ControllerChannelManager

Controller-Partition狀態機

Controller-Replica狀態機

Controller-分割槽副本重分配(PartitionReassignment)與Preferred leader副本選舉

Controller-Broker的上線與下線

Controller-LeaderAndIsr請求

Topic 的新建/擴容/刪除

由於程式碼和註釋比較多,在此略過。

參考資料:

1。Kafka運維填坑

2。Matt‘s Blog

3。What is Kafka’s controller broker

4。ZooKeeper:A Distributed Coordination Service for Distributed Applications

作者簡介

袁吉

騰訊運營規劃工程師

騰訊運營規劃工程師,目前負責騰訊遊戲萬億級實時數倉、BG資料中臺的運營工作。有豐富的訊息中介軟體,分散式大資料處理引擎的運營管理經驗。