Spring Cloud Stream
本文参考文章 Spring Cloud Learning 以及 Spring Cloud Stream 来学习Spring Cloud Stream.
Spring Cloud Stream是用来为微服务应用构建消息驱动能力的框架。它为消息中间件产品提供了个性化的自动化配置实现,引入了发布-订阅、消费组以及消息分区这三个核心概念。Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。它有效简化开发人员使用消息中间件的复杂度,从而更关注于核心业务逻辑。
目前,Spring Cloud stream只支持RabbitMQ和kafka。
我们使用Spring Cloud Stream结合RabbitMQ来创建project stream-rabbitmq来实践。
引入依赖:build.gradle
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
上面依赖“spring-cloud-stream-binder-rabbit”等价于“spring-cloud-starter-stream-rabbit”,Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。
创建rabbitMQ消费者类:SinkReceiver
@EnableBinding(Sink.class)
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payLoad){
logger.info("Received: " + payLoad);
}
}
@EnableBinding
注解用来指定一个或多个定义了@Input
或@Output
注解的接口,以此实现对消息通道(Channel)的绑定。
知识点插入==========>
Spring Cloud Stream提供了对输入、输出及输入输出组合接口绑定的默认是实现。 Sink类默认绑定输入消息通道的接口;Source类默认绑定输出消息通道的接口;Processor类默认绑定输入、输出消息通道的接口。我们也可以自己通过@Input
和@Output
注解来定义绑定消息通道的接口。当绑定多个接口时,@EnableBinding(value = {Sink.class, Source.class})。
下面是默认的接口类。
public interface Sink {
//Input channel name.
String INPUT = "input";
//@return input channel. Binding to specified channel and return the channel
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
public interface Processor extends Source, Sink {}
<=================
@StreamListener
注解用来将被修饰方法注册为消息中间件上数据流的事件监听器,属性指定监听的消息通道名。
下面是自动创建的主程序,启动主程序。
@SpringBootApplication
public class StreamRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqApplication.class, args);
}
}
启动主程序后,从log发现创建queue input.anonymous.F7egCTmeR4OP0WbMlQG94w来绑定到input channel并创建到localhost:5672的连接。
Registering MessageChannel input
Registering MessageChannel nullChannel
Registering MessageChannel errorChannel
Registering MessageHandler errorLogger
declaring queue for inbound: input.anonymous.F7egCTmeR4OP0WbMlQG94w, bound to: input
Attempting to connect to: [localhost:5672]
前提:你已经安装RabbitMQ并已经启动。安装RabbitMQ参见文章RaibbitMQ,启动RabbitMQ使用下面的命令。
//查看RabbitMQ的状态
service rabbitmq-server status #Active: active (running) 说明处于运行状态
service rabbitmq-server start # 启动
service rabbitmq-server stop # 停止
service rabbitmq-server restart # 重启
访问http://localhost:15672并以guest作为用户名和密码登录进入RabbitMQ控制台。 从RabbitMQ控制台可以发现该queue并且创建exchange input来绑定该queue。
我们进入该queue并publish message来测试。
从程序log文件中可发现,消息被接收到。
到目前为止,我们并没有配置RabbitMQ的信息,而是使用Spring Boot对RabbitMQ的默认配置。当然,也可以使用Spring Boot支持的各种方式来修改配置。
上面使用RabbitMQ控制台来发送消息,我们也可以编写sender来发送消息进行测试。
SinkReceiverTests:
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {
@Autowired
private SinkSender sinkSender;
@Test
public void sinkSenderTester(){
sinkSender.output().send(MessageBuilder.withPayload("Test Message From Stream-RabbitMQ").build());
}
public interface SinkSender {
String OUTPUT = "input";
@Output(SinkSender.OUTPUT)
MessageChannel output();
}
}
这里定义SinkSender来绑定通道“input”作为输出通道,与SinkReceiver形成生产者-消费者关系。在执行主程序后,执行上面的test程序。SinkReceiver会接收到上面的消息。
Binder
下图是Spring Cloud Stream应用模型的结构图。App与消息中间件通过Binder来绑定,binder起到隔离的作用,开发者不需要关心消息中间件的实现细节,只需使用binder提供的概念去实现。
发布-订阅模式
Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式。当消息发送到消息中间件,消息通过共享的Topic主题进行广播,订阅该Topic的消息消费者会接收到消息。Topic是Spring Cloud Stream的抽象概念,对应RabbitMQ中的exchange及kafka中的topic。
上例中,程序启动时,自动生成RabbitMQ的exchange“input”,并将生成的匿名queue绑定到该exchange。由于binder的隔离作用,我们没有感知它们的存在,只知道自己指向Binder
的输入或是输出通道。若多个消费者绑定到input topic,这消息会被传播给每个消费者。
相对于点对点的消息通信来说,Spring Cloud Stream降低了生产者与消费者之间的耦合,只需绑定到特定的输入通道或输出通道即可。
当我们启动stream-rabbitmq程序2次来生成2个instance时会发现input exchange绑定到2个queue。此时在input exchange上publish message会被2个queue同时接收到。
消费组
默认情况下,生产者发出消息到绑定的通道,该消息会被复制成多个副本被绑定到通道中的所有消息消费者处理。但有时候,我们只希望消息被消费一次。消费组用来解决该问题,消息只会被消费组中的一个消息消费者使用。
在消费者端,使用spring.cloud.stream.bindings.<channelName>.group属性来为应用程序指定组名并使用input.destination来指定主题名。这样,消息只会被同一个组中的一个消费者使用,或者A,或者B。
spring.cloud.stream.bindings.input.group=eureka-consumer-input-group
当我们重新启动stream-rabbitmq2次时,发现input exchange绑定到group eureka-consumer-input-group,而且group下存在2个channel。
此时,在input exchange发送消息,消息会随机分配到组里的任意一个channel处理。
消息分区
消费组无法控制消息被哪个实例消费。但有些业务场景,需要具有相同特征的消息被同一个实例消费,如监控服务中的统计业务。Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展。
为了实现消息分区,在消费者端添加如下配置:
spring.cloud.stream.bindings.input.consumer.partitioned=true //开启消息分区功能
spring.cloud.stream.instanceCount=2 //指定消费者实例总数
spring.cloud.stream.instanceIndex=0 //当前实例的索引号,范围为0到instanceCount-1
当我们重新启动stream-rabbitmq 2次时,发现input exchange绑定到eureka-consumer-input-group-0,而且group下存在2个channel。
同时,在消息生产者端添加如下配置:
//分区键的表达式规则
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.producer.partitionCount=2 //消息分区的数量
知识点插入===========>
本文使用一个RabbitMQ实例,但为了实现高并发和高可用,我们可以使用多个RabbitMQ。这时,需要实现多个RabbitMQ instance之间的负载均衡。
负载均衡(Load balance)是一种计算机网络技术,在多个计算机资源间分配负载,以达到最佳资源使用、最大化吞吐率、最小响应时间以及避免过载的目的。
负载均衡通常分为软件负载均衡和硬件负载均衡两种。软件负载均衡可使用HAProxy,LVS,硬件负载均衡可使用F5。
负载均衡算法:轮询法,随机法,ip-hash法,加权轮询法,加权随机法,最小连接数法。
<==================