相关推荐recommended
SpringBoot+Redis stream实现消息队列
作者:mmseoamin日期:2023-12-14

目录

一、前言

二、下载Redis及引入Redis依赖

三、配置消费者及消费组

四、配置Redis及初始化stream、消费组、消费者


一、前言

相较于 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等重量级的消息队列中间件,Redis在需求量小的情况下,也可以作为消息中间件来使用。Redis作为消息队列使用,常见的有List、发布/订阅模型以及在Redis5以后出现的Stream。Stream相较于前两种,最大的优点就是可以持久化。

二、下载Redis及引入Redis依赖

下载Redis5以上的客户端,win版下载地址

pom中引入redis依赖


<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

三、配置消费者及消费组

application.yml中配置stream key,消费组和消费者可配置多个。

redis:
  mq:
    streams:
      # key名称
      - name:  RARSP:REPORT:READ:VS
        groups:
          # 消费组名称
          - name: VS_GROUP
            # 消费者名称
            consumers: VS-CONSUMER-A,VS-CONSUMER-B
      # key2
      - name: RARSP:REPORT:READ:BLC
        groups:
          - name: BLC_GROUP
            consumers: BLC-CONSUMER-A,BLC-CONSUMER-B
      # key3
      - name: RARSP:REPORT:READ:HD
        groups:
          - name: HD_GROUP
            consumers: HD-CONSUMER-A,HD-CONSUMER-B
     

自定义三个实体类RedisMqGroup、RedisMqStream、RedisMq,对应application.yml中的配置

/**
 * A single consumer group entry of a stream, mirroring the {@code groups}
 * items of the {@code application.yml} configuration.
 *
 * <p>Plain mutable bean: it only stores the group name and the names of the
 * consumers that belong to the group.
 */
public class RedisMqGroup {

    /** Names of the consumers configured for this group. */
    private String[] consumers;

    /** Name of the consumer group. */
    private String name;

    /**
     * @return the consumer names exactly as stored (the array is not copied)
     */
    public String[] getConsumers() {
        return this.consumers;
    }

    /**
     * @param consumers the consumer names to store (the array is not copied)
     */
    public void setConsumers(String[] consumers) {
        this.consumers = consumers;
    }

    /**
     * @return the consumer group name, or {@code null} if it was never set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the consumer group name
     */
    public void setName(String name) {
        this.name = name;
    }
}
public class RedisMqStream {
    public    String name;
    public   List groups;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public List getGroups() {
        return groups;
    }
    public void setGroups(List groups) {
        this.groups = groups;
    }
}
/**
 * Root of the {@code redis.mq} configuration properties; holds the list of
 * configured streams.
 */
@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
public class RedisMq {
    /**
     * Streams declared under {@code redis.mq.streams}. The element type is
     * spelled out (instead of a raw {@code List}) so that elements can be
     * used as {@link RedisMqStream} without casts. Kept public so existing
     * direct field access keeps compiling.
     */
    public List<RedisMqStream> streams;

    /**
     * @return the configured streams, as stored
     */
    public List<RedisMqStream> getStreams() {
        return streams;
    }

    /**
     * @param streams the configured streams
     */
    public void setStreams(List<RedisMqStream> streams) {
        this.streams = streams;
    }
}

四、配置Redis及初始化stream、消费组、消费者

/**
 * Redis wiring for the stream based message queue: configures the template
 * serializers, makes sure every configured stream and consumer group exists,
 * and starts one listener container per configured stream.
 */
@Slf4j
@Configuration
public class RedisConfiguration {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisMq redisMq;
    // The listener is a Spring component; inject the managed instance instead
    // of calling its constructor so that its own dependencies are populated.
    @Resource
    private ReportReadMqListener reportReadMqListener;

    /**
     * Configures the serializers of the injected template so that keys and
     * values are not stored as unreadable (garbled) data.
     *
     * @return the injected template with its serializers replaced
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplateInit() {
        // key serialization
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // value serialization
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        // hash value serialization
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        // hash key serialization
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    /**
     * Creates and starts one listener container per configured stream and
     * subscribes {@link ReportReadMqListener} to it with manual acknowledgement.
     *
     * <p>Only the first group of each stream and the first consumer of that
     * group are subscribed; further groups/consumers in the configuration are
     * not used here.
     *
     * @param factory connection factory used by the listener containers
     * @return the subscriptions that were created, one per configured stream;
     *         empty when no stream is configured
     */
    @Bean
    public List<Subscription> subscription(RedisConnectionFactory factory) {
        List<RedisMqStream> streams = redisMq.getStreams();
        if (streams == null || streams.isEmpty()) {
            log.warn("no stream configured under redis.mq.streams, no stream listener started");
            return new ArrayList<>();
        }
        List<Subscription> resultList = new ArrayList<>(streams.size());
        AtomicInteger index = new AtomicInteger(1);
        // Each subscription runs a long-running polling task on the executor,
        // i.e. it holds one thread for as long as it is active. With a pool
        // smaller than the number of subscriptions the surplus polling tasks
        // would sit in the queue forever, so size the pool for all of them.
        int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), streams.size());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                // maximum number of messages fetched per poll
                .batchSize(5)
                .executor(executor)
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        for (RedisMqStream redisMqStream : streams) {
            String streamName = redisMqStream.getName();
            List<RedisMqGroup> groups = redisMqStream.getGroups();
            RedisMqGroup redisMqGroup = groups.get(0);
            initStream(streamName, redisMqGroup.getName());
            var listenerContainer = StreamMessageListenerContainer.create(factory, options);
            // Manual acknowledgement: the listener has to ack each message itself.
            // Use listenerContainer.receiveAutoAck(...) with the same arguments
            // for automatic acknowledgement instead.
            Subscription subscription = listenerContainer.receive(
                    Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()),
                    reportReadMqListener);
            resultList.add(subscription);
            listenerContainer.start();
        }
        return resultList;
    }

    /**
     * Creates the stream and its consumer group when the stream key does not
     * exist yet.
     *
     * <p>NOTE(review): when the key already exists nothing is done, so a
     * stream that exists without the given group is left without it.
     *
     * @param key   stream key
     * @param group consumer group name
     */
    private void initStream(String key, String group) {
        if (redisStreamUtil.hasKey(key)) {
            return;
        }
        Map<String, String> map = new HashMap<>(1);
        map.put("field", "value");
        // create the stream by adding a placeholder entry
        String result = redisStreamUtil.addMap(key, map);
        // create the consumer group
        redisStreamUtil.createGroup(key, group);
        // remove the placeholder entry again
        redisStreamUtil.del(key, result);
        log.info("stream:{}-group:{} initialize success",key,group);
    }
}

Redis工具类

 
@Component
public class RedisStreamUtil {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 创建消费组
     * @param key stream-key值
     * @param group 消费组
     * @return java.lang.String
     */
    public String createGroup(String key, String group){
        return stringRedisTemplate.opsForStream().createGroup(key, group);
    }
    /**
     * 获取消费者信息
     * @param key stream-key值
     * @param group 消费组
     * @return org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group){
        return stringRedisTemplate.opsForStream().consumers(key, group);
    }
    /**
     * 添加Map消息
     * @param key stream对应的key
     * @param value 消息数据
     * @return
     */
    public String addMap(String key, Map value){
        return stringRedisTemplate.opsForStream().add(key, value).getValue();
    }
    /**
     * 读取消息
     * @param: key
     * @return java.util.List>
     */
    public List> read(String key){
        return stringRedisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }
    /**
     * 确认消费
     * @param key
     * @param group
     * @param recordIds
     * @return java.lang.Long
     */
    public Long ack(String key, String group, String... recordIds){
        return stringRedisTemplate.opsForStream().acknowledge(key, group, recordIds);
    }
    /**
     * 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
     * @param: key
     * @param: recordIds
     * @return java.lang.Long
     */
    public Long del(String key, String... recordIds){
        return stringRedisTemplate.opsForStream().delete(key, recordIds);
    }
    /**
     * 判断是否存在key
     * @param key
     * @return
     */
    public boolean hasKey(String key){
        Boolean aBoolean = stringRedisTemplate.hasKey(key);
        return aBoolean==null?false:aBoolean;
    }
}

五、生产消息、消费消息

生产消息代码

// Build the message as field/value pairs; the map is typed so that only
// string fields and values can be put into it.
Map<String, String> message = new HashMap<>(2);
message.put("body","消息主题" );
message.put("sendTime", "消息发送时间");
// Key of the target stream; must match one of the stream names configured in application.yml.
String streamKey = "";
redisStreamUtil.addMap(streamKey, message);

消费消息

@Slf4j
@Component
public class ReportReadMqListener implements StreamListener> {
    @Override
    public void onMessage(MapRecord message) {
          // stream的key值
          String streamKey = message.getStream();
          //消息ID
          RecordId recordId = message.getId();
          //消息内容
          Map msg = message.getValue(); 
          //TODO 处理逻辑
         
          //逻辑处理完成后,ack消息,删除消息,group为消费组名称
          redisStreamUtil.ack(streamKey,group,recordId.getValue());
          redisStreamUtil.del(streamKey,recordId.getValue());
    }
    
}