一、基本概念
1、概念
Spring Cloud Stream是构建消息驱动微服务的框架。
2、解决问题
消息中间件太多了,RabbitMQ
、kafka
、ActiveMQ
,RocketMQ
等。导致学习使用成本过大。
3、作用
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。但Stream只支持RabbitMQ
、kafka
。
4、工作原理
应用程序通过 inputs 和 outputs 来与 Spring Cloud Stream 中的 binder 对象交互。而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。
Binder
- Input:消费者
- Output:生产者
5、工作流程
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可以理解为参照对象是Spring Cloud Stream 自身,从Stream发布消息就是输出,接受消息就是输入
6、常用的API和注解
二、模块构建
1、消息的生产者
(1)pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--如果使用其他消息中间件,更换这个依赖即可-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
(2)application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置,会爆红(不管)
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
(3)主启动类无需其他配置
(4)ServiceImpl
接口自己写
//不再试用@Service了
@EnableBinding(Source.class)//定义消息的推送管道,Source是源
public class MessageProviderImpl implements MessageProdvider {
@Resource
private MessageChannel output;//消息发送管道
@Override
public String send() {
//流水号
String serial = UUID.randomUUID().toString();
String value = "表头"+serial;
Message<String> build = MessageBuilder
.withPayload(serial)//设置消息内容
.setHeader("partitionKey",value)//设置表头
.build();//构建
output.send(build);//发送消息
System.out.println(value);
//因为构建了message对象,吧他传给source,然后在给MQ,所以没用到返回值
return null;
}
}
(5)controller
@RestController
public class SendMessageController {
@Resource
private MessageProdvider messageProdvider;
@GetMapping(value = "/sendMessage")
public String sendMessage(){
return messageProdvider.send();
}
}
(6)测试
访问http://localhost:8801/sendMessage
即可在http://localhost:15672/观察到消息
2、消息消费者
(1)application.yml
pom和主启动类与生产者相同
只需将output改为input,其他就是端口、微服务名、instance-id自己改喽。
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
(2)controller
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String servicePort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1,收到" +
message.getPayload() + message.getHeaders()+"\t"+servicePort);
}
}
(3)测试
当访问http://localhost:8801/sendMessage时
8802输出
消费者1,收到b3736ba1-c2bf-4df1-b75b-b3e6dd141f52 8802
三、消息重复消费问题(重点)
同组微服务不会出现重复消费问题,不设置分组的话默认不会出现同组的情况。
可以通过application.yml配置分组
spring:
cloud:
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: group1 #设置分组,解决重复消费问题
四、消息持久化(重点)
当8802未启动时,8801生产了几条消息:
- 当8802没有设定分组时,8802启动后,不会消费以存在的消息
- 当8802有设定分组时,8802启动后,自动获取未消费的消息
总结:
也就是,当我们没有配置分组时,会出现消息漏消费的问题
而配置分组后,我们可以自动获取未消费的数据