本文是以 RocketMQ 为例讲解,点击此处了解SpringBoot整合RocketMQ
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。
我们都知道市面上有很多消息中间件,Sping Cloud Stream 为了可以集成各种各样的中间件,它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和output是channel,Binder则是消息中间件和通道之间的桥梁
通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。
Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.
4.0.0 org.example RocketMQDemo 1.0-SNAPSHOT 17 17 UTF-8 2.6.11 2021.0.4 2021.0.4.0 org.springframework.boot spring-boot-starter-web org.projectlombok lombok true com.alibaba fastjson 1.2.83 com.alibaba.cloud spring-cloud-starter-stream-rocketmq org.apache.rocketmq rocketmq-client 4.9.4 org.apache.rocketmq rocketmq-remoting 4.9.4 org.springframework.boot spring-boot-dependencies ${spring.boot.version} pom import org.springframework.cloud spring-cloud-dependencies ${spring.cloud.version} pom import com.alibaba.cloud spring-cloud-alibaba-dependencies ${spring.cloud.alibaba} pom import
@Data @ToString @NoArgsConstructor @AllArgsConstructor public class UserEntity { private String name;//账号 private String pass;//密码 }
虽然在 SpringCloudStream 3.x 版本后是可以看到 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,但是不妨碍我们测试学习
package cn.mq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface MyChannel { String INPUT = "test-input"; String OUTPUT = "test-output"; /** * 这两个通道可能定义在两个不同的通道里面,这里为了方便放在同一个项目中演示 */ // 收(订阅频道/消息消费者) @Input(INPUT) SubscribableChannel input(); // 发(消息生产者) @Output(OUTPUT) MessageChannel output(); }
此处可以使用我们自定义的通道,也可以使用原装的 Sink.class
package cn.mq; import cn.entity.UserEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; @Slf4j //@EnableBinding(Sink.class) @EnableBinding(MyChannel.class) public class ReceiveMQ { @StreamListener(MyChannel.INPUT) public void receive(UserEntity entity){ log.info("收到消费消息:{}",entity.toString()); } }
默认情况下,如果消费者是一个集群,此时,一条消息会被多次消费。通过消息分组,我们可以解决这个问题。
添加如下配置分组,放入组 g1:
spring.cloud.stream.bindings.test-input.group=g1 spring.cloud.stream.bindings.test-output.group=g1
package cn.controller; import cn.entity.UserEntity; import cn.mq.MyChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController public class MQController { @Autowired private MyChannel myChannel; @GetMapping("/test") public void test(){ UserEntity userEntity = new UserEntity("hello", "pass"); boolean send = myChannel.output().send(MessageBuilder.withPayload(userEntity).build()); log.info("发送消息:{},结果:{}",userEntity.toString(),send); } }
其中,MessageBuilder是Spring Integration中用于创建消息的工具类。以下是createMessage, fromMessage和withPayload方法的区别:
例如:Message
例如:Message
例如:Message 总的来说,这三个方法提供了灵活的方式来创建和修改消息,你可以根据具体的需求来选择使用哪一个。 由于 SpringCloudStream 3.x 版本后是 可以看到 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解。后续的版本更新中会逐渐替换成函数式的方式实现。 既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢? 通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。 不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:1.3.4 配置文件
spring:
application:
name: rokcet-mq-demo
cloud:
stream:
bindings: # 配置消息通道的信息
test-input: # 自定义消费 通道
destination: test-optic
group: test
binder: rocketmq
test-output: # 自定义发送 通道
destination: test-optic
group: test
binder: rocketmq
rocketmq:
binder:
name-server: ip:port
group: test #此处定义整体消费者组名字
1.4 Stream 3.x 之后操作
1.4.1 Stream 3.x 之后讲解
myTopic-in-0
myTopic-out-0
注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致
package cn.mq; import cn.entity.UserEntity; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import java.util.function.Consumer; @Configuration public class ReceiveMQ { @Bean public Consumer> myTopicC(){ return (data)->{ UserEntity user = data.getPayload(); MessageHeaders headers = data.getHeaders(); System.out.println("myTopicC 接收一条记录:" + user); System.out.println("getHeaders headerFor:" + headers.get("for")); }; } }
package cn.mq; import cn.entity.UserEntity; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import java.util.function.Supplier; @Configuration public class SendMQ { Integer i = 1; @Bean public Supplier> myTopicP() { return () -> { UserEntity entity = new UserEntity(); entity.setPass(i++ + ""); entity.setName(Thread.currentThread().getName()); System.out.println("myTopicP 发送一条记录:" + entity); return MessageBuilder .withPayload(entity) .build(); }; } }
这种方式定义 suppelier 会 默认1000ms 发送一次记录
可以修改:spring.cloud.stream.poller:fixedDelay: 延迟毫秒值
通过 StreamBridge 触发
package cn.controller; import cn.entity.UserEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController public class MQController { @Autowired private StreamBridge streamBridge; @GetMapping("/test") public void sendMsg() { UserEntity entity = new UserEntity("hello","world"); System.out.println("sendMsg 发送一条记录:" + entity); streamBridge .send( "myTopicP-out-0", MessageBuilder.withPayload(entity) .setHeader("for", "这是一个请求头~") .build()); } }
spring: cloud: stream: rocketmq: binder: name-server: localhost:9876 # -------------- 分割线 --------------- function: # 组装和绑定 # 手动时把 myTopicP 去掉 definition: myTopicC;myTopicP bindings: myTopicC-in-0: destination: my-topic group: test myTopicP-out-0: destination: my-topic
Function< String,String > 范型中有两个参数 :一个入参,一个出参,所以在Stream中可以用来作于一个消息中转站来使用。相当于 top-1 接受到消息 但是我不想处理 我对其数据进行一次处理 发送到 top-2 通道,交给top-2 进行数据的最终处理。
采用手动触发示例,在上面改造测试:
@Bean public ConsumertestFunctionQ(){ return (data)->{ System.out.println("testFunctionQ 消息中转后接收一条记录:" + data); }; } @Bean public Function testFunction() { return value -> { System.out.println("中转 testFunction: " + value); value.setPass(value.getPass().toUpperCase()); value.setName(value.getName().toUpperCase()); return value; }; }
配置文件:
spring: application: name: rokcet-mq-demo cloud: stream: bindings: myTopicP-out-0: destination: test-topic testFunction-in-0: destination: test-topic group: my_input_group testFunction-out-0: destination: test-topic-Q testFunctionQ-in-0: destination: test-topic-Q group: my_input_group-Q rocketmq: binder: name-server: localhost:9876 group: test function: definition: testFunction;testFunctionQ
spring.cloud.function.definition 是一个配置属性,用于指定 Spring Cloud Function 应用程序中的函数定义。
这个属性的值是一个以 逗号分隔(如果用逗号分隔有顺序问题,还是最好用分号分隔)的字符串,表示要使用的函数、消费者(Consumer)或生产者(Supplier)的名称。
在 Spring Cloud Stream 中,这个属性用于将函数、消费者或生产者与消息队列(如 RabbitMQ、Kafka 等)进行绑定。当指定为 Supplier 时,它将作为消息队列的生产者,负责生成并发送消息;当指定为 Consumer 时,它将作为消息队列的消费者,负责接收并处理消息。
例如,假设有一个名为 process 的函数,你可以通过以下配置将其作为消费者与消息队列进行绑定:
spring.cloud.function.definition=process
这样,process 函数将作为消息队列的消费者,接收并处理来自队列的消息。同样,可以将 Supplier 与消息队列进行绑定,作为生产者生成并发送消息。
spring.cloud.stream.binders和spring.cloud.stream.bindings都是Spring Cloud Stream的配置属性,但它们的用途是不同的。
例如,如果使用的是 RabbitMQ,你需要在这里配置 RabbitMQ 的主机名、端口、用户名和密码等信息。可以配置多个binder,每个binder对应一个消息中间件。
简单来说,spring.cloud.stream.binders是用来配置消息中间件的,而spring.cloud.stream.bindings是用来配置消息通道的。
spring: cloud: stream: # 如果你项目里只对接一个中间件,那么不用定义binders # 当系统要定义多个不同消息中间件的时候,使用binders定义 binders: my-rabbit: type: rabbit # 消息中间件类型 environment: # 连接信息 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 添加coupon - Producer addCoupon-out-0: destination: request-coupon-topic content-type: application/json binder: my-rabbit # 添加coupon - Consumer addCoupon-in-0: destination: request-coupon-topic content-type: application/json # 消费组,同一个组内只能被消费一次 group: add-coupon-group binder: my-rabbit
在Spring Cloud Stream中,发送者(Producer)不需要分组,只有消费者(Consumer)需要分组。
分组的主要目的是为了实现消息的广播或者分区。当多个消费者在同一个组中时,消息会被任何一个消费者消费,但不会被同一组的所有消费者消费,这就实现了消息的负载均衡。如果每个消费者有自己的组,那么每个消费者都会收到一份消息的拷贝,这就实现了消息的广播。
spring.cloud.stream.rocketmq.binder.group 是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。
spring.cloud.stream.bindings.通道名字.group 是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。
总的来说,这两个属性都是用于设置消费组名的,但是作用范围不同,一个是全局的,一个是针对具体通道的。
报错:Property 'group' is required - producerGroup
这时候就需要在 spring.cloud.stream.rocketmq.binder.group属性中设置值,就不会报错了
spring.cloud.stream.bindings是Spring Cloud Stream的核心配置属性,用于定义消息通道的绑定和配置。
spring.cloud.stream.rocketmq.bindings是Spring Cloud Stream与RocketMQ集成时的配置属性,用于定义RocketMQ消息通道的绑定和配置。
具体区别如下:
上一篇:MySQL中的IF语句使用