這才是kafka
文章轉載自:https://www.jianshu.com/p/d3e963ff8b70
作者:鄭杰文,騰訊云存儲,高級后臺工程師
簡介
kafka是一個分布式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。
kafka對外使用topic的概念,生產者往topic里寫消息,消費者從讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數量來進行橫向擴容。單個parition內是保證消息有序。每新寫一條消息,kafka就是在對應的文件append寫,所以性能非常高。kafka的總體數據流是這樣的:
大概用法就是,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進行業務處理。
圖中有兩個topic,topic 0有兩個partition,topic 1有一個partition,三副本備份??梢钥吹絚onsumer gourp 1中的consumer 2沒有分到partition處理,這是有可能出現的,下面會講到。關于broker、topics、partitions的一些元信息用zk來存,監控和路由啥的也都會用到zk。
生產
基本流程是這樣的

創建一條記錄,記錄中一個要指定對應的topic和value,key和partition可選。
先序列化,然后按照topic和partition,放進對應的發送隊列中。kafka produce都是批量請求,會積攢一批,然后一起發送,不是調send()就進行立刻進行網絡發包。
如果partition沒填,那么情況會是這樣的:
1、key有填
按照key進行哈希,相同key去一個partition。(如果擴展了partition的數量那么就不能保證了)
2、key沒填
round-robin來選partition
這些要發往同一個partition的請求按照配置,攢一波,然后由一個單獨的線程一次性發過去。
API
有high level api,替我們把很多事情都干了,offset,路由啥都替我們干了,用以來很簡單。
還有simple api,offset啥的都是要我們自己記錄。
partition
當存在多副本的情況下,會盡量把多個副本,分配到不同的broker上。kafka會為partition選出一個leader,之后所有該partition的請求,實際操作的都是leader,然后再同步到其他的follower。當一個broker歇菜后,所有leader在該broker上的partition都會重新選舉,選出一個leader。(這里不像分布式文件存儲系統那樣會自動進行復制保持副本數)
然后這里就涉及兩個細節:怎么分配partition,怎么選leader。
關于partition的分配,還有leader的選舉,總得有個執行者。在kafka中,這個執行者就叫controller。kafka使用zk在broker中選出一個controller,用于partition分配和leader選舉。
partition的分配
1、將所有Broker(假設共n個Broker)和待分配的Partition排序
2、將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)
3、將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
leader容災
controller會在Zookeeper的/brokers/ids節點上注冊Watch,一旦有broker宕機,它就能知道。當broker宕機后,controller就會給受到影響的partition選出新leader。controller從zk的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應partition的ISR(in-sync replica已同步的副本)列表,選一個出來做leader。
選出leader后,更新zk,然后發送LeaderAndISRRequest給受影響的broker,讓它們改變知道這事。為什么這里不是使用zk通知,而是直接給broker發送rpc請求,我的理解可能是這樣做zk有性能問題吧。
如果ISR列表是空,那么會根據配置,隨便選一個replica做leader,或者干脆這個partition就是歇菜。如果ISR列表的有機器,但是也歇菜了,那么還可以等ISR的機器活過來。
多副本同步
這里的策略,服務端這邊的處理是follower從leader批量拉取數據來同步。但是具體的可靠性,是由生產者來決定的。
生產者生產消息的時候,通過request.required.acks參數來設置數據的可靠性。
在acks=-1的時候,如果ISR少于min.insync.replicas指定的數目,那么就會返回不可用。
這里ISR列表中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。以前還有根據落后多少條消息就踢出ISR,在1.0版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出ISR列表。
從ISA中選出leader后,follower會從把自己日志中上一個高水位后面的記錄去掉,然后去和leader拿新的數據。因為新的leader選出來后,follower上面的數據,可能比新leader多,所以要截取。這里高水位的意思,對于partition和leader,就是所有ISR中都有的最新一條記錄。消費者最多只能讀到高水位;
從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。
也正是由于這個高水位延遲一輪,在一些情況下,kafka會出現丟數據和主備數據不一致的情況,0.11開始,使用leader epoch來代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)
思考:
當acks=-1時
1、是follwers都來fetch就返回成功,還是等follwers第二輪fetch?
2、leader已經寫入本地,但是ISR中有些機器失敗,那么怎么處理呢?
消費
訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那么就會有個別消費者一直空閑。
API
訂閱topic時,可以用正則表達式,如果有新topic匹配上,那能自動訂閱上。
offset的保存
一個消費組消費partition,需要保存offset記錄消費到哪,以前保存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這里zk的性能嚴重影響了消費的速度,而且很容易出現重復消費。
在0.10版本后,kafka把這個offset的保存,從zk總剝離,保存在一個名叫__consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact??偸潜A糇钚碌?span style=";padding: 0px;list-style: none;-webkit-font-smoothing: antialiased">key,其余刪掉。一般情況下,每個key的offset都是緩存在內存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷partition建立緩存,然后查詢返回。
確定consumer group位移信息寫入__consumers_offsets的哪個partition,具體計算公式:
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
思考:
如果正在跑的服務,修改了offsets.topic.num.partitions,那么offset的保存是不是就亂套了?
分配partition--reblance
生產過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用于分配partition。
下面從頂向下,分別闡述一下
1、怎么選coordinator。
2、交互流程。
3、reblance的流程。
選coordinator
1、看offset保存在那個partition
2、該partition leader所在的broker就是被選定的coordinator
這里我們可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一臺機器。
交互流程
把coordinator選出來之后,就是要分配了
整個流程是這樣的:
1、consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,發送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應coordinator的地址。
2、consumer 發送heartbeat請求給coordinator,返回IllegalGeneration的話,就說明consumer的信息是舊的了,需要重新加入進來,進行reblance。返回成功,那么consumer就從上次分配的partition中繼續執行。
reblance流程
1、consumer給coordinator發送JoinGroupRequest請求。
2、這時其他consumer發heartbeat請求過來時,coordinator會告訴他們,要reblance了。
3、其他consumer發送JoinGroupRequest請求。
4、所有記錄在冊的consumer都發了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader。然后回JoinGroupRespone,這會告訴consumer你是follower還是leader,對于leader,還會把follower的信息帶給它,讓它根據這些信息去分配partition
5、consumer向coordinator發送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。
6、coordinator回包,把分配的情況告訴consumer,包括leader。
當partition或者消費者的數量發生變化時,都得進行reblance。
列舉一下會reblance的情況:
1、增加partition
<p style="margin-top: 0px;margin-bottom: 0.8rem;padding: 0px;list-style: none;-webkit-font-smoothing: antialiased