首发于微服务

Spring Cloud Stream

本文参考文章 Spring Cloud Learning 以及 Spring Cloud Stream 来学习Spring Cloud Stream.

Spring Cloud Stream是用来为微服务应用构建消息驱动能力的框架。它为消息中间件产品提供了个性化的自动化配置实现,引入了发布-订阅消费组以及消息分区这三个核心概念。Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。它有效简化开发人员使用消息中间件的复杂度,从而更关注于核心业务逻辑。

目前,Spring Cloud stream只支持RabbitMQ和kafka。

我们使用Spring Cloud Stream结合RabbitMQ来创建project stream-rabbitmq来实践。

引入依赖:build.gradle

	implementation 'org.springframework.boot:spring-boot-starter-amqp'
	implementation 'org.springframework.cloud:spring-cloud-stream'
	implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'

上面依赖“spring-cloud-stream-binder-rabbit”等价于“spring-cloud-starter-stream-rabbit”,Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。

创建rabbitMQ消费者类:SinkReceiver

@EnableBinding(Sink.class)
public class SinkReceiver {
  private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

  @StreamListener(Sink.INPUT)
  public void receive(Object payLoad){
    logger.info("Received: " + payLoad);
  }
}

@EnableBinding注解用来指定一个或多个定义了@Input@Output注解的接口,以此实现对消息通道(Channel)的绑定。

知识点插入==========>

Spring Cloud Stream提供了对输入、输出及输入输出组合接口绑定的默认是实现。 Sink类默认绑定输入消息通道的接口;Source类默认绑定输出消息通道的接口;Processor类默认绑定输入、输出消息通道的接口。我们也可以自己通过@Input@Output注解来定义绑定消息通道的接口。当绑定多个接口时,@EnableBinding(value = {Sink.class, Source.class})。

下面是默认的接口类。

public interface Sink {
	//Input channel name.
	String INPUT = "input";

	//@return input channel. Binding to specified channel and return the channel
	@Input(Sink.INPUT)
	SubscribableChannel input();
}

public interface Source {
	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();
}

public interface Processor extends Source, Sink {}

<=================

@StreamListener注解用来将被修饰方法注册为消息中间件上数据流的事件监听器,属性指定监听的消息通道名。

下面是自动创建的主程序,启动主程序。

@SpringBootApplication
public class StreamRabbitmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(StreamRabbitmqApplication.class, args);
	}

}

启动主程序后,从log发现创建queue input.anonymous.F7egCTmeR4OP0WbMlQG94w来绑定到input channel并创建到localhost:5672的连接。

Registering MessageChannel input
Registering MessageChannel nullChannel
Registering MessageChannel errorChannel
Registering MessageHandler errorLogger
declaring queue for inbound: input.anonymous.F7egCTmeR4OP0WbMlQG94w, bound to: input
Attempting to connect to: [localhost:5672]

前提:你已经安装RabbitMQ并已经启动。安装RabbitMQ参见文章RaibbitMQ,启动RabbitMQ使用下面的命令。

//查看RabbitMQ的状态
service rabbitmq-server status   #Active: active (running) 说明处于运行状态

service rabbitmq-server start    # 启动
service rabbitmq-server stop     # 停止
service rabbitmq-server restart  # 重启 

访问http://localhost:15672并以guest作为用户名和密码登录进入RabbitMQ控制台。 从RabbitMQ控制台可以发现该queue并且创建exchange input来绑定该queue。

我们进入该queue并publish message来测试。

从程序log文件中可发现,消息被接收到。

到目前为止,我们并没有配置RabbitMQ的信息,而是使用Spring Boot对RabbitMQ的默认配置。当然,也可以使用Spring Boot支持的各种方式来修改配置。

上面使用RabbitMQ控制台来发送消息,我们也可以编写sender来发送消息进行测试。

SinkReceiverTests:

@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {
  @Autowired
  private SinkSender sinkSender;

  @Test
  public void sinkSenderTester(){
    sinkSender.output().send(MessageBuilder.withPayload("Test Message From Stream-RabbitMQ").build());
  }

  public interface SinkSender {
    String OUTPUT = "input";

    @Output(SinkSender.OUTPUT)
    MessageChannel output();
  }
}

这里定义SinkSender来绑定通道“input”作为输出通道,与SinkReceiver形成生产者-消费者关系。在执行主程序后,执行上面的test程序。SinkReceiver会接收到上面的消息。

Binder

下图是Spring Cloud Stream应用模型的结构图。App与消息中间件通过Binder来绑定,binder起到隔离的作用,开发者不需要关心消息中间件的实现细节,只需使用binder提供的概念去实现。

发布-订阅模式

Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式。当消息发送到消息中间件,消息通过共享的Topic主题进行广播,订阅该Topic的消息消费者会接收到消息。Topic是Spring Cloud Stream的抽象概念,对应RabbitMQ中的exchange及kafka中的topic。

上例中,程序启动时,自动生成RabbitMQ的exchange“input”,并将生成的匿名queue绑定到该exchange。由于binder的隔离作用,我们没有感知它们的存在,只知道自己指向Binder的输入或是输出通道。若多个消费者绑定到input topic,这消息会被传播给每个消费者。

相对于点对点的消息通信来说,Spring Cloud Stream降低了生产者与消费者之间的耦合,只需绑定到特定的输入通道或输出通道即可。

当我们启动stream-rabbitmq程序2次来生成2个instance时会发现input exchange绑定到2个queue。此时在input exchange上publish message会被2个queue同时接收到。


消费组

默认情况下,生产者发出消息到绑定的通道,该消息会被复制成多个副本被绑定到通道中的所有消息消费者处理。但有时候,我们只希望消息被消费一次。消费组用来解决该问题,消息只会被消费组中的一个消息消费者使用。

在消费者端,使用spring.cloud.stream.bindings.<channelName>.group属性来为应用程序指定组名并使用input.destination来指定主题名。这样,消息只会被同一个组中的一个消费者使用,或者A,或者B。

spring.cloud.stream.bindings.input.group=eureka-consumer-input-group

当我们重新启动stream-rabbitmq2次时,发现input exchange绑定到group eureka-consumer-input-group,而且group下存在2个channel。

此时,在input exchange发送消息,消息会随机分配到组里的任意一个channel处理。

消息分区

消费组无法控制消息被哪个实例消费。但有些业务场景,需要具有相同特征的消息被同一个实例消费,如监控服务中的统计业务。Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展。

为了实现消息分区,在消费者端添加如下配置:

spring.cloud.stream.bindings.input.consumer.partitioned=true  //开启消息分区功能
spring.cloud.stream.instanceCount=2 //指定消费者实例总数
spring.cloud.stream.instanceIndex=0 //当前实例的索引号,范围为0到instanceCount-1

当我们重新启动stream-rabbitmq 2次时,发现input exchange绑定到eureka-consumer-input-group-0,而且group下存在2个channel。

同时,在消息生产者端添加如下配置:

//分区键的表达式规则
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.producer.partitionCount=2 //消息分区的数量

知识点插入===========>

本文使用一个RabbitMQ实例,但为了实现高并发和高可用,我们可以使用多个RabbitMQ。这时,需要实现多个RabbitMQ instance之间的负载均衡。

负载均衡(Load balance)是一种计算机网络技术,在多个计算机资源间分配负载,以达到最佳资源使用、最大化吞吐率、最小响应时间以及避免过载的目的。

负载均衡通常分为软件负载均衡和硬件负载均衡两种。软件负载均衡可使用HAProxy,LVS,硬件负载均衡可使用F5。

负载均衡算法:轮询法,随机法,ip-hash法,加权轮询法,加权随机法,最小连接数法。

<==================

编辑于 2020-08-19 13:44