SpringCloud Stream详解

目录

为什么需要SpringCloud Stream消息驱动呢?

Binder

案例

消息驱动之生产者

消息驱动之消费者

分组消费与持久化

为什么需要SpringCloud Stream消息驱动呢?

        比如说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic、partitions分区,这些中间件的差异性导致在实际项目开发中给我们造成了一定的困扰,我们如果用了两种消息队列的其中一种,后面的业务需求,如果我想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西都要重新推倒重新编写,因为它跟我们的系统耦合了,这时候SpringSloud Stream出场了!        

        SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!

        SpringCloud Stream由一个中间件中立的核组成,应用通过SpringCloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是发送消息到队列中的)通道与外界交流

        通道通过指定中间件的Binder实现与外部代理连接,业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可!

Binder

       Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder

       通过binder,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码

案例

消息驱动之生产者

1.新建模块cloud-stream-rabbitmq-provider8801

2.pom文件

<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--监控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--热部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.yml文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 112.164.16.82  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5673
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8801.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址

注意:我之前按照上述中的配置来连接ribbitmq时报错了 !!

上述的application.yml中使用了
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.xx
来配置rabbitmq的环境,如果你用的是远程服务器上的rabbitmq,比如我使用的是我自己的阿里云服务器然后在docker容器中运行的rabbitmq,按照上述配置方式的话,启动时会试图连接两次rabbitmq程序,第一次试图连接访问的就是application.yml中配置的地址,此时已经订阅成功了,但是程序还会在之后进行第二次连接,此时访问的地址就是localhost:5673,在我的环境中,我本地没有rabbitmq环境,所以直接报异常,因此,如果是使用自己的服务器来配置,则需要修改配置文件,将rabbitmq的配置信息移动到application.yml中的spring节点下!

异常信息:

看一下是什么原因: 

修改后的yml文件(最终版):

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 112.124.16.82  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
    port: 5673
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型

      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8801.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址

4.主启动类

@SpringBootApplication
public class StreamMQMain8801 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }

}

 5.业务类

  • 新建service.IMessageProvider接口
package service;

public interface IMessageProvider {
    public String send();
}
  • 在service下新建impl.IMessageProviderImpl实现类 
package service.impl;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import service.IMessageProvider;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class)    // 定义消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;  // 消息发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        // MessageBuilder是spring的integration.support.MessageBuilder
        output.send(MessageBuilder.withPayload(serial).build());    
        System.out.println("*******serial: " + serial);
        return null;
    }
}
  • 新建controller.SendMessageController 
package com.IT.springcloud.controller;

import com.IT.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider iMessageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return iMessageProvider.send();
    }

}

消息驱动之消费者

1.新建模块cloud-stream-rabbitmq-consumer8802

2.pom文件与8801相同

3.yml文件

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 112.124.16.82  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
    port: 5673
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型

      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8802.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址

4.主启动类StreamMQMain8802(这里省略代码)

5.新建controller.ReceiveMessageListenerController 

package com.IT.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Controller;


@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) // 监听
    public void input(Message<String> message){
        System.out.println("消费者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
    }

}

6.测试,启动7001,8801,8802

浏览器地址栏输入:localhost:8801/sendMessage 

发送方8801

接收方8802

分组消费与持久化

1.按照8802克隆一个新模块8803

2.将8802/8803实现轮询分组,每次只有一个消费者收到消息,也就是说,8801发出一条消息,只能被8802和8803中的其中一个接收到,不能同时被接收,这样就可以避免重复消费,只需要在8802和8803的yml文件中:bindings/input下设置为同一个分组即可!

bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
          group: ITA # 设置分组

3.测试,现在我们发送两条消息

8801发送方

8802接收方1

8803接收方2

看一下rabbitmq管理界面

  • 8
    点赞
  • 66
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 5
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 5
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

小馒头爱学Java

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值