目录
一、前言
二、下载Redis及引入Redis依赖
三、配置消费者及消费组
四、配置Redis及初始化stream、消费组、消费者
相较于 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaQ 等重量级的消息队列中间件,Redis在消息量不大的情况下,也可以作为消息中间件来使用。Redis作为消息队列使用,常见的方式有List、发布/订阅模型以及Redis 5.0引入的Stream。Stream相较于前两种,最大的优点是消息可以持久化,并且支持消费组和消息确认(ACK)机制。
下载Redis5以上的客户端,win版下载地址
pom中引入redis依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
application.yml中配置stream key,消费组和消费者可配置多个。
redis:
  mq:
    streams:
      # Stream key name
      - name: RARSP:REPORT:READ:VS
        groups:
          # Consumer group name
          - name: VS_GROUP
            # Consumer names (comma separated)
            consumers: VS-CONSUMER-A,VS-CONSUMER-B
      # key2
      - name: RARSP:REPORT:READ:BLC
        groups:
          - name: BLC_GROUP
            consumers: BLC-CONSUMER-A,BLC-CONSUMER-B
      # key3
      - name: RARSP:REPORT:READ:HD
        groups:
          - name: HD_GROUP
            consumers: HD-CONSUMER-A,HD-CONSUMER-B
自定义三个实体类RedisMqGroup、RedisMqStream、RedisMq,对应application.yml中的配置
/**
 * One consumer group entry of a stream, as configured under
 * {@code redis.mq.streams[*].groups} in application.yml.
 */
public class RedisMqGroup {

    /** Consumer group name. */
    private String name;

    /** Names of the consumers that belong to this group. */
    private String[] consumers;

    /**
     * @param name the consumer group name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the consumer group name, or {@code null} if it was never set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param consumers the consumer names to store (the array is kept as-is, not copied)
     */
    public void setConsumers(String[] consumers) {
        this.consumers = consumers;
    }

    /**
     * @return the stored consumer names array (the same instance that was set)
     */
    public String[] getConsumers() {
        return this.consumers;
    }
}
public class RedisMqStream { public String name; public Listgroups; public String getName() { return name; } public void setName(String name) { this.name = name; } public List getGroups() { return groups; } public void setGroups(List groups) { this.groups = groups; } }
@EnableConfigurationProperties @Configuration @ConfigurationProperties(prefix = "redis.mq") public class RedisMq { public Liststreams; public List getStreams() { return streams; } public void setStreams(List streams) { this.streams = streams; } }
@Slf4j @Configuration public class RedisConfiguration { @Resource private RedisTemplate redisTemplate; @Resource private RedisStreamUtil redisStreamUtil; @Resource private RedisMq redisMq; /** * 处理乱码 * @return */ @Bean public RedisTemplate redisTemplateInit() { // key序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); //val实例化 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); //value hashmap序列化 redisTemplate.setHashValueSerializer(new StringRedisSerializer()); //key haspmap序列化 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); return redisTemplate; } @Bean public Listsubscription(RedisConnectionFactory factory){ List resultList = new ArrayList<>(); AtomicInteger index = new AtomicInteger(1); int processors = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), r -> { Thread thread = new Thread(r); thread.setName("async-stream-consumer-" + index.getAndIncrement()); thread.setDaemon(true); return thread; }); StreamMessageListenerContainer.StreamMessageListenerContainerOptions > options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() // 一次最多获取多少条消息 .batchSize(5) .executor(executor) .pollTimeout(Duration.ofSeconds(1)) // .errorHandler() .build(); for (RedisMqStream redisMqStream :redisMq.getStreams()) { String streamName = redisMqStream.getName(); RedisMqGroup redisMqGroup = redisMqStream.getGroups().get(0); initStream(streamName,redisMqGroup.getName()); var listenerContainer = StreamMessageListenerContainer.create(factory,options); // 手动ask消息 Subscription subscription = listenerContainer.receive(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]), StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener()); // 自动ask消息 /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), 
redisMqGroup.getConsumers()[0]), StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/ resultList.add(subscription); listenerContainer.start(); } return resultList; } private void initStream(String key, String group){ boolean hasKey = redisStreamUtil.hasKey(key); if(!hasKey){ Map map = new HashMap<>(1); map.put("field","value"); //创建主题 String result = redisStreamUtil.addMap(key, map); //创建消费组 redisStreamUtil.createGroup(key,group); //将初始化的值删除掉 redisStreamUtil.del(key,result); log.info("stream:{}-group:{} initialize success",key,group); } } }
Redis工具类
@Component public class RedisStreamUtil { @Resource private StringRedisTemplate stringRedisTemplate; /** * 创建消费组 * @param key stream-key值 * @param group 消费组 * @return java.lang.String */ public String createGroup(String key, String group){ return stringRedisTemplate.opsForStream().createGroup(key, group); } /** * 获取消费者信息 * @param key stream-key值 * @param group 消费组 * @return org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers */ public StreamInfo.XInfoConsumers queryConsumers(String key, String group){ return stringRedisTemplate.opsForStream().consumers(key, group); } /** * 添加Map消息 * @param key stream对应的key * @param value 消息数据 * @return */ public String addMap(String key, Mapvalue){ return stringRedisTemplate.opsForStream().add(key, value).getValue(); } /** * 读取消息 * @param: key * @return java.util.List > */ public List > read(String key){ return stringRedisTemplate.opsForStream().read(StreamOffset.fromStart(key)); } /** * 确认消费 * @param key * @param group * @param recordIds * @return java.lang.Long */ public Long ack(String key, String group, String... recordIds){ return stringRedisTemplate.opsForStream().acknowledge(key, group, recordIds); } /** * 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁 * @param: key * @param: recordIds * @return java.lang.Long */ public Long del(String key, String... recordIds){ return stringRedisTemplate.opsForStream().delete(key, recordIds); } /** * 判断是否存在key * @param key * @return */ public boolean hasKey(String key){ Boolean aBoolean = stringRedisTemplate.hasKey(key); return aBoolean==null?false:aBoolean; } }
五、生产消息、消费消息
生产消息代码
// Build the message as string field/value pairs.
Map<String, String> message = new HashMap<>(2);
message.put("body", "消息主题");
message.put("sendTime", "消息发送时间");
// Stream key; must match one of the stream names configured in application.yml.
String streamKey = "";
redisStreamUtil.addMap(streamKey, message);
消费消息
@Slf4j @Component public class ReportReadMqListener implements StreamListener> { @Override public void onMessage(MapRecord message) { // stream的key值 String streamKey = message.getStream(); //消息ID RecordId recordId = message.getId(); //消息内容 Map msg = message.getValue(); //TODO 处理逻辑 //逻辑处理完成后,ack消息,删除消息,group为消费组名称 redisStreamUtil.ack(streamKey,group,recordId.getValue()); redisStreamUtil.del(streamKey,recordId.getValue()); } }