您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Kafka 0.10 Java使用者未从主题读取消息

Kafka 0.10 Java使用者未从主题读取消息

您对commitSync()的调用将确认最后一个poll()中批处理中的所有消息,而不是在处理消息时确认每个消息,这是我认为您要尝试的。

从文档中

“以上示例使用commitSync将所有收到的记录标记为已提交。在某些情况下,您可能希望通过显式指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交偏移量。

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

注意:提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,在调用commitSync(offsets)时,应在最后处理的消息的偏移量上添加一个。”

java 2022/1/1 18:34:10 有518人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶