先给答案:
// 批量消费配置: 1批量, 2手动提交 factory.setBatchListener(true); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); // 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时 // 一次poll操作最大获取的记录数量 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500 // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者 // 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500 // 在消费者方法中注入acknowledgment并在执行完业务逻辑后手动调用确认方法 acknowledgment.acknowledge();
某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同一个对象的处理。其实只需要所有表插入完毕后,一次处理该对象即可。
mysql --> canal --> Kafka --> Spring-Kafka消费者 --> 下游接口
优化Spring-Kafka消费者,从单条处理改为多条处理,然后在消费者中合并相同的对象,从而达到一个对象只处理一次(最多两次)的目的。为什么有可能是两次,因为分批的时候很容易将同一个对象的多条消息分到上下两批,这样这个对象将会被处理两次;那为什么不会处理三次?其实也能够制造出来上次的情况,就是分批的大小如果很小就容易出现三次,甚至更多次。所以通常情况我们将分批大小设置得要比表数量多。
举个例子:一个对象的创建有10张表的插入消息,而分批大小设置成4,此时10条消息就被4这个批大小拆分成3批,每批处理一次该对象,那么该对象就会被处理3次。所以分批大小是一个重要的参数,其值的设定通常要大,但再大也不可避免被分成两批的情况。
Spring-kafka从1.1版本开始就支持了批量消费,需要在ContainerFactory中设置batchListener=true
同时设置消费者参数 max.poll.records 来控制一批的最大记录数量,该参数的缺省值为500。
AbstractKafkaListenerContainerFactory类的源码如下:
/** * Set to true if this endpoint should create a batch listener. * @param batchListener true for a batch listener. * @since 1.1 */ public void setBatchListener(Boolean batchListener) { this.batchListener = batchListener; }
可以从javaDoc的@since得知该功能从1.1版本就存在了,所以只要Spring-Kafka的版本高于1.1的都支持批量消费功能。这个参数是搭配@KafkaListener来使用的。单条消费的写法如下:
@KafkaListener(id = "", groupId = "", topics = {}) public void listen(ConsumerRecorddata) { }
以上只能一次处理一条,并不能将多条消息统筹处理,所以使用以下的批量消费的写法:
@KafkaListener(id = "", groupId = "", topics = {}, containerFactory = "") public void listen(List> datas) { }
从以上代码可以看出,批量消费仅仅是将参数的ConsumerRecord类型改为List
这样就行了吗?如果批次较大,对这一批数据的处理时间较长,就容易造成丢数据。场景如下:
1、开启自动提交offset,
2、提交offset的时间间隔为1s,
3、处理这一批数据耗时为2s。
还需要发生以下条件才能导致丢消息:
1、“自动提交offset的时间”到了且成功执行了offset的提交
2、此后几乎同时程序发生了严重的错误导致进程退出(注意此时消费者的代码逻辑并未执行完毕)
那么消息就会被丢失,因为Kafka收到了offset的提交,所以Kafka认为这一批消息已经处理成功了,但程序实际并未处理成功,等到下次启动程序,将从Kafka记录的offset开始消费,“记录的offset”就是发生异常退出前“提交的offset”。所以上次异常退出时刻的那一批消息就被丢失了,不会再被消费到。
举例:
假设消费者这一批消费到的消息offset编码列表为5,6,7。而自动提交offset时候会将7提交给Kafka,表示下一个消息将从8开始消费。但567这3条消息在消费者进程中还没来得及处理完毕就被意外终止了。等到人工处理错误重新启动程序,将从8开始消费,因为Kafka认为567已经处理过了,但实际567并没有成功处理,所以就会丢失567这一批的消息。
在进一步,如何防止消息丢失呢?答案是手动提交offset,同样Spring-Kafka已经提供了支持,其实Spring-Kafka只是对原生Kafka的包装,最核心的还是原生Kafka支持手动提交offset的能力。
上干货,Spring-Kafka中有一个类很有用:AcknowledgingMessageListener,这个类就是为了支持手动ack消息的,也即是手动提交offset,只是Spring将“手动提交offset”这个概念包装成了“确认消息”,里面有一个方法:
/** * Invoked with data from kafka. * @param data the data to be processed. * @param acknowledgment the acknowledgment. */ @Override void onMessage(ConsumerRecorddata, Acknowledgment acknowledgment);
这个类是设计成去实现它从而获取手动提交offset的能力,但我们还可以简化,结合之前的@KafkaListener的方法,我们将Acknowledgment acknowledgment参数放在@KafkaListener的方法中,Spring就能够将Acknowledgment对象传递进来,从而我们可以自己控制何时“确认消息”。当然仅有这步还不够,还需要告诉Spring-Kafka我需要手动提交offset,通过一个简单的设置即可:
@Bean(name = "batch_and_manual_ack_ContainerFactory") public KafkaListenerContainerFactory> batch_and_manual_ack_ContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 批量消费配置: 1批量, 2手动提交 factory.setBatchListener(true); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; }
其中最重要的一句是:.setAckMode(AckMode.MANUAL_IMMEDIATE);,该配置的缺省值是AckMode.BATCH。可以从ContainerProperties类的源码找到缺省值:
/** * The ack mode to use when auto ack (in the configuration properties) is false. *
要想提高吞度量,须要设置一下几个参数:
max.poll.records
一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。
fetch.min.bytes
一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 缺省是1B
fetch.max.wait.ms
一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者,缺省是500。
需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
@Bean(name = "batch_and_manual_ack_ContainerFactory") public KafkaListenerContainerFactory> batch_and_manual_ack_ContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); // 批量消费配置: 1批量, 2手动提交 factory.setBatchListener(true); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory (consumerConfigs()); } public Map consumerConfigs() { Map propsMap = new HashMap (); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); // 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时 // 一次poll操作最大获取的记录数量 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500 // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者 // 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500 return propsMap; } @KafkaListener(groupId = "junit-test-group", containerFactory = "batch_and_manual_ack_ContainerFactory", topics = {"test"}) public void test_batchConsume(List > datas, Acknowledgment acknowledgment) { System.out.println(new Date() + " datas = " + datas.size()); System.out.println(new Date() + " collect = " + datas.stream().map(t -> t.offset()).collect(Collectors.toList())); // 最后一定要提交进度 (用于持久化进度到Kafka) acknowledgment.acknowledge(); }
将以上代码放在某个Spring的类里,然后修改配置即可使用