您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 > 

02 Kafka訂閱java實現(java工程kafka傳遞自定義對象,消費端獲取到的是null)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-06-02 17:48:19【】0人已围观

简介差錯校驗的;屬性:占用2個字節,這個字段的最低3位記錄了當前batch中消息的壓縮方式,現在主要有GZIP、LZ4和Snappy三種。第4位記錄了時間戳的類型,第5和6位記錄了新版本引入的事務類型和控

差錯校驗的;

屬性:占用2個字節,這個字段的最低3位記錄了當前batch中消息的壓縮方式,現在主要有GZIP、LZ4和Snappy三種。第4位記錄了時間戳的類型,第5和6位記錄了新版本引入的事務類型和控制類型;

最大位移增量:最新的消息的位移相對于第一條消息的唯一增量;

起始時間戳:占用8個字節,記錄了batch中第一條消息的時間戳;

最大時間戳:占用8個字節,記錄了batch中最新的一條消息的時間戳;

PID、procer epoch和起始序列號:這三個參數主要是為了實現事務和冪等性而使用的,其中PID和procer epoch用于確定當前procer是否合法,而起始序列號則主要用于進行消息的冪等校驗;

消息個數:占用4個字節,記錄當前batch中所有消息的個數;

通過上面的介紹可以看出,每個batch的頭部數據中占用的字節數固定為61個字節,可變部分主要是與具體的消息有關,下面我們來看一下batch中每條消息的格式:

這里的消息的頭部數據就與batch的大不相同,可以看到,其大部分數據的長度都是可變的。既然是可變的,這里我們需要強調兩個問題:

1、對于數字的存儲,kafka采用的是Zig-Zag的存儲方式,也即負數并不會使用補碼的方式進行編碼,而是將其轉換為對應的正整數,比如-1映射為1、1映射為2、-2映射為3、2映射為4,關系圖如下所示:

通過圖可以看出,在對數據反編碼的時候,我們只需要將對應的整數轉換成其原始值即可;

2、在使用Zig-Zag編碼方式的時候,每個字節最大為128,而其中一半要存儲正數,一半要存儲負數,還有一個0,也就是說每個字節能夠表示的最大整數為64,此時如果有大于64的數字,kafka就會使用多個字節進行存儲。

而這多個字節的表征方式是通過將每個字節的最大位作為保留位來實現的,如果最高位為1,則表示需要與后續字節共同表征目標數字,如果最高位為0,則表示當前位即可表示目標數字。

kafka使用這種編碼方式的優點在于,大部分的數據增量都是非常小的數字,因此使用一個字節即可保存,這比直接使用原始類型的數據要節約大概七倍的內存。

對于上面的每條消息的格式,除了消息key和value相關的字段,其還有屬性字段和header,屬性字段的主要作用是存儲當前消息key和value的壓縮方式,而header則供給用戶進行添加一些動態的屬性,從而實現一些定制化的工作。

通過對kafka消息日志的存儲格式我們可以看出,其使用batch的方式將一些公共信息進行提取,從而保證其只需要存儲一份,雖然看起來每個batch的頭部信息比較多,但其平攤到每條消息上之后使用的字節更少了;

在消息層面,kafka使用了數據增量的方式和Zig-Zag編碼方式對數據進行的壓縮,從而極大地減少其占用的字節數。總體而言,這種存儲方式極大的減少了kafka占用的磁盤空間大小。

2. 日志存儲方式

在使用kafka時,消息都是推送到某個topic中,然后由procer計算當前消息會發送到哪個partition,在partition中,kafka會為每條消息設置一個偏移量,也就是說,如果要唯一定位一條消息,使用<topic, partition, offset>三元組即可。

基于kafka的架構模式,其會將各個分區平均分配到每個broker中,也就是說每個broker會被分配用來提供一個或多個分區的日志存儲服務。在broker服務器上,kafka的日志也是按照partition進行存儲的,其會在指定的日志存儲目錄中為每個topic的partition分別創建一個目錄,目錄中存儲的就是這些分區的日志數據,而目錄的名稱則會以<topic-patition>的格式進行創建。如下是kafka日志的存儲目錄示意圖:

這里我們需要注意的是,圖中對于分區日志的存儲,當前broker只會存儲分配給其的分區的日志,比如圖中的買粉絲nnect-status就只有分區1和分區4的目錄,而沒有分區2和分區3的目錄,這是因為這些分區被分配在了集群的其他節點上。

在每個分區日志目錄中,存在有三種類型的日志文件,即后綴分別為log、index和timeindex的文件。其中log文件就是真正存儲消息日志的文件,index文件存儲的是消息的位移索引數據,而timeindex文件則存儲的是時間索引數據。

如下圖所示為一個分區的消息日志數據:

從圖中可以看出,每種類型的日志文件都是分段的,這里關于分段的規則主要有如下幾點需要說明:

在為日志進行分段時,每個文件的文件名都是以該段中第一條消息的位移的偏移量來命名的;

kafka會在每個log文件的大小達到1G的時候關閉該文件,而新開一個文件進行數據的寫入。可以看到,圖中除了最新的log文件外,其余的log文件的大小都是1G;

對于index文件和timeindex文件,在每個log文件進行分段之后,這兩個索引文件也會進行分段,這也就是它們的文件名與log文件一致的原因;

kafka日志的留存時間默認是7天,也就是說,kafka會刪除存儲時間超過7天的日志,但是對于某些文件,其部分日志存儲時間未達到7天,部分達到了7天,此時還是會保留該文件,直至其所有的消息都超過留存時間;

3. 索引文件

kafka主要有兩種類型的索引文件:位移索引文件和時間戳索引文件。位移索引文件中存儲的是消息的位移與該位移所對應的消息的物理地址;時間戳索引文件中則存儲的是消息的時間戳與該消息的位移值。

也就是說,如果需要通過時間戳查詢消息記錄,那么其首先會通過時間戳索引文件查詢該時間戳對應的位移值,然后通過位移值在位移索引文件中查詢消息具體的物理地址。關于位移索引文件,這里有兩點需要說明:

1、由于kafka消息都是以batch的形式進行存儲,因而索引文件中索引元素的最小單元是batch,也就是說,通過位移索引文件能夠定位到消息所在的batch,而沒法定位到消息在batch中的具體位置,查找消息的時候,還需要進一步對batch進行遍歷;

2、位移索引文件中記錄的位移值并不是消息真正的位移值,而是該位移相對于該位移索引文件的起始位移的偏移量,通過這種方式能夠極大的減小位移索引文件的大小。

如下圖所示為一個位移索引文件的格式示意圖:

如下則是具體的位移索引文件的示例:

關于時間戳索引文件,由于時間戳的變化比位移的變化幅度要大一些,其即使采用了增量的方式存儲時間戳索引,但也沒法有效地使用Zig-Zag方式對數據進行編碼,因而時間戳索引文件是直接存儲的消息的時間戳數據,

但是對于時間戳索引文件中存儲的位移數據,由于其變化幅度不大,因而其還是使用相對位移的方式進行的存儲,并且這種存儲方式也可以直接映射到位移索引文件中而無需進行計算。如下圖所示為時間戳索引文件的格式圖:

如下則是時間戳索引文件的一個存儲示例:

可以看到,如果需要通過時間戳來定位消息,就需要首先在時間戳索引文件中定位到具體的位移,然后通過位移在位移索引文件中定位到消息的具體物理地址。

4. 日志壓縮

所謂的日志壓縮功能,其主要是針對這樣的場景的,比如對某個用戶的郵箱數據進行修改,其總共修改了三次,修改過程如下:

email=john@gmail.買粉絲

email=john@yahoo.買粉絲.買粉絲

email=john@163.買粉絲

在這么進行修改之后,很明顯,我們主要需要關心的是最后一次修改,因為其是最終數據記錄,但是如果我們按順序處理上述消息,則需要處理三次消息。

kafka的日志壓縮就是為了解決這個問題而存在的,對于使用相同key的消息,其會只保留最新的一條消息的記錄,而中間過程的消息都會被kafka cleaner給清理掉。

但是需要注意的是,kafka并不會清理當前處于活躍狀態的日志文件中的消息記錄。所謂當前處于活躍狀態的日志文件,也就是當前正在寫入數據的日志文件。如下圖所示為一個kafka進行日志壓縮的示例圖:

圖中K1的數據有V1、V3和V4,經過壓縮之后只有V4保留了下來,K2的數據則有V2、V6和V10,壓縮之后也只有V10保留了下來;同理可推斷其他的Key的數據。

另外需要注意的是,kafka開啟日志壓縮使用的是log.cleanup.policy,其默認值為delete,也即我們正常使用的策略,如果將其設置為買粉絲paction,則開啟了日志壓縮策略,但是需要注意的是,開啟了日志壓縮策略并不代表kafka會清理歷史數據,只有將log.cleaner.enable設置為true才會定時清理歷史數據。

在kafka中,其本身也在使用日志壓縮策略,主要體現在kafka消息的偏移量存儲。在舊版本中,kafka將每個買粉絲nsumer分組當前消費的偏移量信息保存在zookeeper中,但是由于zookeeper是一款分布式協調工具,其對于讀操作具有非常高的性能,但是對于寫操作性能比較低,而買粉絲nsumer的位移提交動作是非常頻繁的,這勢必會導致zookeeper成為kafka消息消費的瓶頸。

因而在最新版本中,kafka將分組消費的位移數據存儲在了一個特殊的topic中,即__買粉絲nsumer_offsets,由于每個分組group的位移信息都會提交到該topic,因而kafka默認為其設置了非常多的分區,也即50個分區。

另外,買粉絲nsumer在提交位移時,使用的key為groupId+topic+partition,而值則為當前提交的位移,也就是說,對于每一個分組所消費的topic的partition,其都只會保留最新的位移。如果買粉絲nsumer需要讀取位移,那么只需要按照上述格式組裝key,然后在該topic中讀取最新的消息數據即可。

為什么會出現cannot be cast to java.lang.String

很明顯是類型轉換錯誤。即Integer 類型不能轉成String類型。 

解決方案:

1.直接使用tosting的方式 

String str = entry.value().toString(); 

2.使用String類的靜態方法valueOf()

String str = String.valueOf(entry.value());

3. String orderNo = ((String[])request.getAttribute("orderNo"))[0]; 

4. 將錯誤中的(String)強制轉換類型修改為    object.toString()

擴展資

很赞哦!(6)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款的名片

职业:程序员,设计师

现居:河南三门峡卢氏县

工作室:小组

Email:[email protected]