当两系统进行消息传递时,各系统之间的数据需要保证一致性,而这时可以采用消息中间件达到分布式事务的目的。
实现方案:
1.保证消息生产者可以将消息推送到broker(RabbitMQ中间件),可以使用消息确认机制(Confirm模式),发送失败的消息进行补发。
2.保证消息消费者可以成功消费,可以采取RabiitMQ的手动ACK机制,由消费者控制消息的重发/清除/丢弃。
3.消息重发机制,当消费者处理失败,需要MQ再次重发给消费者。
4.当重试次数过多、消息内容格式错误等情况,通过线上预警机制通知运维人员。
案例演示:
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml:
rabbitmq:
addresses: 192.168.119.11 #中间件服务器IP
port: 5672 #端口
username: guest #RabbitMQ用户
password: guest #RabiitMQ密码
publisher-confirms: true #消息确认机制
publisher-returns: true #开启发送失败退回
template:
mandatory: true #保证监听有效
listener:
simple:
acknowledge-mode: manual #手动ACK机制
concurrency: 1 #消费者最小数
max-concurrency: 10 #消费者最大处
retry:
enabled: true ##支持重试/重发
RabbitMQ配置类:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
public static String exchange = "e1";
public static String Queue = "q1";
public static String Binding = "r1";
public static final String routing = "r";
/*声明队列*/
@Bean
public Queue getQueue(){
return new Queue(RabbitMqConfig.Queue);
}
/*声明交换机*/
@Bean
public DirectExchange getExchange(){
return new DirectExchange(RabbitMqConfig.exchange);
}
/*绑定路由*/
@Bean
public Binding bindingExchange(@Qualifier("getQueue")Queue queue, @Qualifier("getExchange")DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with(routing);
}
@Bean
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
servie层:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional
public class ProductService {
@Autowired
private SenderService ss;
/**
* @param str 接收Controller层传递的参数
* 因为数据库原因,这里不进行真正的插入数据,为了保证事务,需要创建
* 一个消息专属表,用来记录消息的状态,默认消息状态0为未发送,当消
* 息进行confirm校验时,将消息的状态0置为1,表示消息已经成功到达
* broker,这里有两次插入操作,一次为业务需要插入的数据,一次为将
* 消息信息插入到消息表里。
*/
public void add(String str){
System.out.println("模拟业务数据插入数据库操作");
System.out.println("模拟消息信息插入数据库操作");
ss.send(str);
}
}
service层:
import com.sunshop.config.RabbitMqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SenderService implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
private final Logger log= LoggerFactory.getLogger(SenderService.class);
@Autowired
private RabbitTemplate rt;
/**
* @param str 消息内容
*/
public void send(String str){
String[] split = str.split("-");
System.out.println("消息内容"+split[0]);
//当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。
this.rt.setMandatory(true);
//Confirm校验
this.rt.setConfirmCallback(this);
//失败回退
this.rt.setReturnCallback(this);
CorrelationData correlationData = new CorrelationData(split[1]);
this.rt.convertAndSend(RabbitMqConfig.exchange,RabbitMqConfig.routing,
split[0],correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData.getId();
if(b){
System.out.println("通过id更改数据库里的消息发送状态:"+id);
}
else {
System.out.println("失败处理");
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("return message:"+new String(message.getBody())+"-"+i+"-"
+s+"-"+s1+"-"+s2);
}
}
消费者:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Date;
@Component
@Transactional
public class Consumer {
@RabbitListener(queues = "q1") //监听消息队列
public void process(Channel c, Message m){
try {
System.out.println("消费者:"+new String(m.getBody())+"消费时间"+new Date());
//状态和订单号进行幂等性判断防止应用中途挂掉或异常,MQ没有收到ACK确认导致重发消息数据库重复添加
System.out.println("查询数据库里是否有数据,如果没有则插入数据。");
//手动ACK,告诉MQ消息已经消费,可以在队列中删除
c.basicAck(m.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
//失败操作,可以统计同一ID操作失败的次数,达到上限,通知人工干预
e.printStackTrace();
}
}
}