专栏/RabbitMQ分布式事务介绍及案例部署

RabbitMQ分布式事务介绍及案例部署

2020年09月09日 08:31--浏览 · --喜欢 · --评论
粉丝:14文章:6

当两系统进行消息传递时,各系统之间的数据需要保证一致性,而这时可以采用消息中间件达到分布式事务的目的。


事务例图


 实现方案:

1.保证消息生产者可以将消息推送到broker(RabbitMQ中间件),可以使用消息确认机制(Confirm模式),发送失败的消息进行补发。

2.保证消息消费者可以成功消费,可以采取RabiitMQ的手动ACK机制,由消费者控制消息的重发/清除/丢弃。

3.消息重发机制,当消费者处理失败,需要MQ再次重发给消费者。

4.当重试次数过多、消息内容格式错误等情况,通过线上预警机制通知运维人员。

案例演示:

依赖:

<dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-amqp</artifactId>

        </dependency>

yaml:

 rabbitmq:

    addresses: 192.168.119.11    #中间件服务器IP

    port: 5672                 #端口

    username: guest            #RabbitMQ用户

    password: guest            #RabiitMQ密码

    publisher-confirms: true    #消息确认机制

    publisher-returns: true     #开启发送失败退回

    template:

      mandatory: true    #保证监听有效

    listener:

      simple:

        acknowledge-mode: manual    #手动ACK机制

        concurrency: 1               #消费者最小数

        max-concurrency: 10          #消费者最大处

        retry:

          enabled: true              ##支持重试/重发

RabbitMQ配置类:

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig  {
   public static String exchange = "e1";
   public static String Queue = "q1";
   public static String Binding = "r1";
   public static final String routing = "r";
   /*声明队列*/
   @Bean
   public Queue getQueue(){
       return new Queue(RabbitMqConfig.Queue);
   }
   /*声明交换机*/
   @Bean
   public DirectExchange getExchange(){
       return new DirectExchange(RabbitMqConfig.exchange);
   }
   /*绑定路由*/
   @Bean
   public Binding bindingExchange(@Qualifier("getQueue")Queue queue, @Qualifier("getExchange")DirectExchange directExchange){
       return BindingBuilder.bind(queue).to(directExchange).with(routing);
   }

   @Bean
   public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
       return new RabbitTemplate(connectionFactory);
   }
}

servie层:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
public class ProductService {
   @Autowired
   private SenderService ss;

   /**
    * @param str 接收Controller层传递的参数
    * 因为数据库原因,这里不进行真正的插入数据,为了保证事务,需要创建
    * 一个消息专属表,用来记录消息的状态,默认消息状态0为未发送,当消
    * 息进行confirm校验时,将消息的状态0置为1,表示消息已经成功到达
    * broker,这里有两次插入操作,一次为业务需要插入的数据,一次为将
    * 消息信息插入到消息表里。
    */
   public void add(String str){
       System.out.println("模拟业务数据插入数据库操作");
       System.out.println("模拟消息信息插入数据库操作");
       ss.send(str);
   }
}

service层:

import com.sunshop.config.RabbitMqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SenderService implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
   private final Logger log= LoggerFactory.getLogger(SenderService.class);

   @Autowired
   private RabbitTemplate rt;

   /**
    * @param str 消息内容
    */
   public void send(String str){
       String[] split = str.split("-");
       System.out.println("消息内容"+split[0]);
       //当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。
       this.rt.setMandatory(true);
       //Confirm校验
       this.rt.setConfirmCallback(this);
       //失败回退
       this.rt.setReturnCallback(this);
       CorrelationData correlationData = new CorrelationData(split[1]);
       this.rt.convertAndSend(RabbitMqConfig.exchange,RabbitMqConfig.routing,
               split[0],correlationData);
   }

   @Override
   public void confirm(CorrelationData correlationData, boolean b, String s) {
       String id = correlationData.getId();
       if(b){
           System.out.println("通过id更改数据库里的消息发送状态:"+id);
       }
       else {
           System.out.println("失败处理");
       }
   }

   @Override
   public void returnedMessage(Message message, int i, String s, String s1, String s2) {
       System.out.println("return message:"+new String(message.getBody())+"-"+i+"-"
       +s+"-"+s1+"-"+s2);
   }
}

消费者:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Date;

@Component
@Transactional
public class Consumer {
   @RabbitListener(queues = "q1")     //监听消息队列
   public void process(Channel c, Message m){
       try {
           System.out.println("消费者:"+new String(m.getBody())+"消费时间"+new Date());
           //状态和订单号进行幂等性判断防止应用中途挂掉或异常,MQ没有收到ACK确认导致重发消息数据库重复添加
           System.out.println("查询数据库里是否有数据,如果没有则插入数据。");
           //手动ACK,告诉MQ消息已经消费,可以在队列中删除
           c.basicAck(m.getMessageProperties().getDeliveryTag(),false);
       } catch (IOException e) {
           //失败操作,可以统计同一ID操作失败的次数,达到上限,通知人工干预
           e.printStackTrace();
       }
   }
}

投诉或建议