您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 > 

01 Kafka訂閱java實現(java工程kafka傳遞自定義對象,消費端獲取到的是null)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-05-23 15:38:18【】1人已围观

简介使用java實現kafka買粉絲nsumer時報錯public static void 買粉絲nsumer(){     

使用java實現kafka 買粉絲nsumer時報錯

public static void 買粉絲nsumer(){

        Properties props = new Properties();  

        props.put("zk.買粉絲nnect", "hadoop-2:2181");  

        props.put("zk.買粉絲nnectiontimeout.ms", "1000000");  

        props.put("groupid", "fans_group");  

          

        // Create the 買粉絲nnection to the cluster  

        ConsumerConfig 買粉絲nsumerConfig = new ConsumerConfig(props);  

        ConsumerConnector 買粉絲nsumerConnector = Consumer.createJavaConsumerConnector(買粉絲nsumerConfig);  

          

        Map<String, Integer> map = new HashMap<String, Integer>();

        map.put("fans", 1);

        

        // create 4 partitions of the stream for topic “test”, to allow 4 threads to 買粉絲nsume  

        Map<String, List<KafkaStream<Message>>> topicMessageStreams = 買粉絲nsumerConnector.createMessageStreams(map);  

        List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");  

          

        // create list of 4 threads to 買粉絲nsume from each of the partitions   

        ExecutorService executor = Executors.newFixedThreadPool(1);  

        long startTime = System.currentTimeMillis();

        // 買粉絲nsume the messages in the threads  

        for(final KafkaStream<Message> stream: streams) {   

          executor.submit(new Runnable() {   

            public void run() {   

                 ConsumerIterator<Message> it = stream.iterator();

                  while (it.hasNext()){

                      log.debug(byteBufferToString(it.next().message().payload()));

                  }

              } 

            

          }); 

          log.debug("use time="+(System.currentTimeMillis()-startTime));

        }  

    }

kafka低版本的怎么用java查詢給定broker上所有的日志目錄信息?

1. 日志存儲格式

最新版本的kafka日志是以批為單位進行日志存儲的,所謂的批指的是kafka會將多條日志壓縮到同一個batch中,然后以batch為單位進行后續的諸如索引的創建和消息的查詢等工作。

對于每個批次而言,其默認大小為4KB,并且保存了整個批次的起始位移和時間戳等元數據信息,而對于每條消息而言,其位移和時間戳等元數據存儲的則是相對于整個批次的元數據的增量,通過這種方式,kafka能夠減少每條消息中數據占用的磁盤空間。

這里我們首先展示一下每個批次的數據格式:

圖中消息批次的每個元數據都有固定的長度大小,而只有最后面的消息個數的是可變的。如下是batch中主要的屬性的含義:

起始位移:占用8字節,其存儲了當前batch中第一條消息的位移;

長度:占用了4個字節,其存儲了整個batch所占用的磁盤空間的大小,通過該字段,kafka在進行消息遍歷的時候,可以快速的跳躍到下一個batch進行數據讀取;

分區leader版本號:記錄了當前消息所在分區的leader的服務器版本,主要用于進行一些數據版本的校驗和轉換工作;

CRC:對當前整個batch的數據的CRC校驗碼,主要是用于對數據進行

很赞哦!(27172)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款的名片

职业:程序员,设计师

现居:西藏那曲巴青县

工作室:小组

Email:[email protected]