• <th id="fvgbn"><video id="fvgbn"></video></th>
    <code id="fvgbn"><nobr id="fvgbn"><track id="fvgbn"></track></nobr></code>

      歡迎訪問合肥市大數據資產運營有限公司網站!
      0551-65909059
      聯系電話:
      當前位置:首頁>>新聞中心>>行業動態 >>這才是kafka
      今天是: 2021年09月29日   【農歷:八月廿三】  星期三
      這才是kafka

      這才是kafka

      文章轉載自:https://www.jianshu.com/p/d3e963ff8b70

      作者:鄭杰文,騰訊云存儲,高級后臺工程師

      簡介

      kafka是一個分布式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。

      kafka對外使用topic的概念,生產者往topic里寫消息,消費者從讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數量來進行橫向擴容。單個parition內是保證消息有序。每新寫一條消息,kafka就是在對應的文件append寫,所以性能非常高。kafka的總體數據流是這樣的:


      大概用法就是,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進行業務處理。
      圖中有兩個topic,topic 0有兩個partition,topic 1有一個partition,三副本備份??梢钥吹絚onsumer gourp 1中的consumer 2沒有分到partition處理,這是有可能出現的,下面會講到。關于broker、topics、partitions的一些元信息用zk來存,監控和路由啥的也都會用到zk。

      生產

      基本流程是這樣的


      創建一條記錄,記錄中一個要指定對應的topic和value,key和partition可選。 先序列化,然后按照topic和partition,放進對應的發送隊列中。kafka produce都是批量請求,會積攢一批,然后一起發送,不是調send()就進行立刻進行網絡發包。
      如果partition沒填,那么情況會是這樣的:

      1、key有填
      按照key進行哈希,相同key去一個partition。(如果擴展了partition的數量那么就不能保證了)

      2、key沒填
      round-robin來選partition

      這些要發往同一個partition的請求按照配置,攢一波,然后由一個單獨的線程一次性發過去。

      API

      有high level api,替我們把很多事情都干了,offset,路由啥都替我們干了,用以來很簡單。
      還有simple api,offset啥的都是要我們自己記錄。

      partition

      當存在多副本的情況下,會盡量把多個副本,分配到不同的broker上。kafka會為partition選出一個leader,之后所有該partition的請求,實際操作的都是leader,然后再同步到其他的follower。當一個broker歇菜后,所有leader在該broker上的partition都會重新選舉,選出一個leader。(這里不像分布式文件存儲系統那樣會自動進行復制保持副本數)

      然后這里就涉及兩個細節:怎么分配partition,怎么選leader。

      關于partition的分配,還有leader的選舉,總得有個執行者。在kafka中,這個執行者就叫controller。kafka使用zkbroker中選出一個controller,用于partition分配和leader選舉。

      partition的分配

      1、將所有Broker(假設共nBroker)和待分配的Partition排序

      2、將第iPartition分配到第(i mod n)個Broker上 (這個就是leader

      3、將第iPartition的第jReplica分配到第((i + j) mode n)個Broker

      leader容災

      controller會在Zookeeper/brokers/ids節點上注冊Watch,一旦有broker宕機,它就能知道。當broker宕機后,controller就會給受到影響的partition選出新leader。controllerzk/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應partitionISRin-sync replica已同步的副本)列表,選一個出來做leader。
      選出leader后,更新zk,然后發送LeaderAndISRRequest給受影響的broker,讓它們改變知道這事。為什么這里不是使用zk通知,而是直接給broker發送rpc請求,我的理解可能是這樣做zk有性能問題吧。

      如果ISR列表是空,那么會根據配置,隨便選一個replicaleader,或者干脆這個partition就是歇菜。如果ISR列表的有機器,但是也歇菜了,那么還可以等ISR的機器活過來。

      多副本同步

      這里的策略,服務端這邊的處理是followerleader批量拉取數據來同步。但是具體的可靠性,是由生產者來決定的。
      生產者生產消息的時候,通過request.required.acks參數來設置數據的可靠性。

      acks=-1的時候,如果ISR少于min.insync.replicas指定的數目,那么就會返回不可用。

      這里ISR列表中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。以前還有根據落后多少條消息就踢出ISR,在1.0版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出ISR列表。

      ISA中選出leader后,follower會從把自己日志中上一個高水位后面的記錄去掉,然后去和leader拿新的數據。因為新的leader選出來后,follower上面的數據,可能比新leader多,所以要截取。這里高水位的意思,對于partitionleader,就是所有ISR中都有的最新一條記錄。消費者最多只能讀到高水位;

      leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的brokerfetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。

      也正是由于這個高水位延遲一輪,在一些情況下,kafka會出現丟數據和主備數據不一致的情況,0.11開始,使用leader epoch來代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection

      思考:
      acks=-1

      1、是follwers都來fetch就返回成功,還是等follwers第二輪fetch?

      2、leader已經寫入本地,但是ISR中有些機器失敗,那么怎么處理呢?

      消費

      訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那么就會有個別消費者一直空閑。


      API

      訂閱topic時,可以用正則表達式,如果有新topic匹配上,那能自動訂閱上。

      offset的保存

      一個消費組消費partition,需要保存offset記錄消費到哪,以前保存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這里zk的性能嚴重影響了消費的速度,而且很容易出現重復消費。
      0.10版本后,kafka把這個offset的保存,從zk總剝離,保存在一個名叫__consumeroffsets topictopic中。寫進消息的keygroupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact??偸潜A糇钚碌?span style=";padding: 0px;list-style: none;-webkit-font-smoothing: antialiased">key
      ,其余刪掉。一般情況下,每個keyoffset都是緩存在內存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷partition建立緩存,然后查詢返回。

      確定consumer group位移信息寫入__consumers_offsets的哪個partition,具體計算公式:

      __consumers_offsets partition =
                 Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
      //groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。

      思考:
      如果正在跑的服務,修改了offsets.topic.num.partitions,那么offset的保存是不是就亂套了?

      分配partition--reblance

      生產過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用于分配partition。
      下面從頂向下,分別闡述一下

      1、怎么選coordinator。

      2、交互流程。

      3、reblance的流程。

      coordinator

      1、看offset保存在那個partition

      2、該partition leader所在的broker就是被選定的coordinator

      這里我們可以看到,consumer groupcoordinator,和保存consumer group offsetpartition leader是同一臺機器。

      交互流程

      coordinator選出來之后,就是要分配了
      整個流程是這樣的:

      1、consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,發送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應coordinator的地址。

      2、consumer 發送heartbeat請求給coordinator,返回IllegalGeneration的話,就說明consumer的信息是舊的了,需要重新加入進來,進行reblance。返回成功,那么consumer就從上次分配的partition中繼續執行。

      reblance流程

      1、consumercoordinator發送JoinGroupRequest請求。

      2、這時其他consumerheartbeat請求過來時,coordinator會告訴他們,要reblance了。

      3、其他consumer發送JoinGroupRequest請求。

      4、所有記錄在冊的consumer都發了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader。然后回JoinGroupRespone,這會告訴consumer你是follower還是leader,對于leader,還會把follower的信息帶給它,讓它根據這些信息去分配partition

      5、consumercoordinator發送SyncGroupRequest,其中leaderSyncGroupRequest會包含分配的情況。
      6
      、coordinator回包,把分配的情況告訴consumer,包括leader。

      partition或者消費者的數量發生變化時,都得進行reblance。
      列舉一下會reblance的情況:

      1、增加partition

      <p style="margin-top: 0px;margin-bottom: 0.8rem;padding: 0px;list-style: none;-webkit-font-smoothing: antialiased

      亚洲av无码国产在丝袜线观看_国产sm调教折磨视频失禁_老少交欧美另类_超在线视频