相关推荐recommended
kafka原理五之springboot 集成批量消费
作者:mmseoamin日期:2023-12-14

目录

前言

一、新建一个maven工程,添加kafka依赖

二、yaml配置文件

三、消息消费

手动提交非批量消费

  String 类型接入

使用注解方式获取消息头、消息体

手动提交批量消费

ConsumerRecord类接收

String类接收

使用注解方式获取消息头、消息体,则也是使用 List 来接收:

并发消费 

配置类方式

四、Kafka参数调优

一、Consumer参数说明

二、Kafka消息积压、消费能力不足怎么解决?

三、Kafka消费者如何进行流控?



前言

由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。


一、新建一个maven工程,添加kafka依赖


		org.springframework.kafka
		spring-kafka
         2.5.4.RELEASE

二、yaml配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9002
    producer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    consumer:
      group-id: test-consumer-group
      # 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset
      # earliest:表示自动重置到 partition 的最小 offset
      # latest:默认为 latest,表示自动重置到 partition 的最大 offset
      # none:不自动进行 offset 重置,抛
      auto-offset-reset: latest
      # 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offset
      enable-auto-commit: false
      ## 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000
      auto-commit-interval: 2000
      max-poll-records: 30
      heartbeat-interval: 3000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间
        session.timeout.ms: 60000
        # 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一
        heartbeat.interval.ms: 3000
        # 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者
        max.poll.interval.ms: 300000
        request.timeout.ms: 600000
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 2
      type: batch
      max-poll-records: 50
      #当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,
      #spring-kafka提供了通过ackMode的值表示不同的手动提交方式
      #手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      # 消费者监听的topic不存在时,项目会报错,设置为false
      missing-topics-fatal: false

三、消息消费

手动提交非批量消费

  •   String 类型接入
     @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")
        public void onMessage(String message, Consumer consumer) {
            System.out.println("接收到的消息:" + message);
            consumer.commitSync();
        }
    • 使用注解方式获取消息头、消息体
               /**
               * 处理消息
               */
              @KafkaListener(topics = "test-topic", groupId = "test-consumer-group")
              public void onMessage(@Payload String message,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                    @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
                                    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                                    Acknowledgment ack) {
                  try {
                      
                      ack.acknowledge();
                      log.info("Consumer>>>>>>>>>>>>>end");
                  } catch (Exception e) {
                      log.error("Consumer.onMessage#error . message={}", message, e);
                      throw new BizException("事件消息消费失败", e);
                  }
              } 

      手动提交批量消费

      想要批量消费,首先要开启批量消费,通过listener.type属性设置为batch即可开启,看下代码吧:

      spring:
        kafka:
          consumer:
            group-id: test-consumer-group
            bootstrap-servers: 127.0.0.1:9092
            max-poll-records: 50 # 一次 poll 最多返回的记录数
          listener:
            type: batch # 开启批量消费

      如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是50,并不是说如果没有达到50条消息,我们就一直等待。而是说一次poll最多返回的记录数为50

      • ConsumerRecord类接收
            /**
             * kafka的批量消费监听器
             */
            @KafkaListener(topics = "test-topic", groupId = "test-consumer-group")
            public void onMessage(List> records, Consumer consumer) {
                try {
                    log.info("Consumer.batch#size={}", records == null ? 0 : records.size());
                    if (CollectionUtil.isEmpty(records)) {
                        //分别是commitSync(同步提交)和commitAsync(异步提交)
                        consumer.commitSync();
                        return;
                    }
                    for (ConsumerRecord record : records) {
                        String message = record.value();
                        if (StringUtils.isBlank(message)) {
                            continue;
                        }
                       //处理业务数据
                       //doBuiness();
                    }
                    consumer.commitSync();
                    log.info("Consumer>>>>>>>>>>>>>end");
                } catch (Exception e) {
                    log.error("Consumer.onMessage#error .", e);
                    throw new BizException("事件消息消费失败", e);
                }
            }
        • String类接收
           @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")
              public void onMessage(List message, Consumer consumer) {
                  System.out.println("接收到的消息:" + message);
                  consumer.commitSync();
              }
          • 使用注解方式获取消息头、消息体,则也是使用 List 来接收:
            @Component
            public class KafkaConsumer {
                // 消费监听
                @KafkaListener(topics = {"test-topic"})
                public void listen2(@Payload List data,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) List topics,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,
                                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys,
                                    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List tss) {
                    System.out.println("收到"+ data.size() + "条消息:");
                    System.out.println(data);
                    System.out.println(topics);
                    System.out.println(partitions);
                    System.out.println(keys);
                    System.out.println(tss);
                }
            }

            并发消费 

            再来看下并发消费,为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态

            spring:
              kafka:
                consumer:
                  group-id: test-consumer-group
                  bootstrap-servers: 127.0.0.1:9092
                  max-poll-records: 50 # 一次 poll 最多返回的记录数
                listener:
                  type: batch # 开启批量监听
                  concurrency: 3 # 设置并发数

            我们设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition

            配置类方式

            通过自定义配置类的方式也是可以的,但是相对yml配置来说还是有点麻烦的

            /**
             * 消费者配置
             */
            @Configuration
            public class KafkaConsumerConfig {
             
                /**
                 * 消费者配置
                 * @return
                 */
                public Map consumerConfigs(){
                    Map props = new HashMap<>();
                    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");
                    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
                    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    return props;
                }
             
                @Bean
                public KafkaListenerContainerFactory> batchFactory() {
                    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
                    //并发数量
                    factory.setConcurrency(3);
                    //开启批量监听
                    factory.setBatchListener(true);
                    return factory;
                }
            }

            同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

            @KafkaListener(topics = {"test-topic"},containerFactory = "batchFactory")
            public void consumer(List message){
            	System.out.println("接收到的消息:" + message);
            }

            四、Kafka参数调优

            一、Consumer参数说明

            1、enable.auto.commit

            该属性指定了消费者是否自动提交偏移量,默认值是true。

            为了尽量避免出现重复数据(假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费)和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。

            如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。

            2、auto.commit.interval.ms

            自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

            3、手动提交:commitSync/commitAsync

            手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

            相同点:都会将本次poll的一批数据最大的偏移量提交。

            不同点:commitSync会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败,导致重复消费。

            4、max.poll.records

            Consumer每次调用poll()时取到的records的最大数。

            二、Kafka消息积压、消费能力不足怎么解决?

            如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,同时相应的增加消费者实例,消费者数=分区数(二者缺一不可)。

            如果是下游的数据处理不及时,则可以提高每批次拉取的数量,通过max.poll.records这个参数可以调整。

            单个消费者实例的消费能力提升,可以用多线程/线程池的方式并发消费提高单机的消费能力。

            三、Kafka消费者如何进行流控?

            将自动提交改成手动提交(enable.auto.commit=false),每次消费完再手动异步提交offset,之后消费者再去Broker拉取新消息,这样可以做到按照消费能力拉取消息,减轻消费者的压力。