RabbitMQ 事务

RabbitMQ消息的事务机制

在使用RabbitMQ的时候,可以通过消息持久化操作类解决因为服务器的异常崩溃导致的消息丢失。默认情况下生产者是不知道消息有没有到达broker的。

RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案
  • 通过将channel设置成confirm模式来实现

★AMQP事务机制控制

  txSelect()用于将当前channel设置成rransaction模式。

  txCommit()用于提交事务

  txRollback()用于回滚事务

1.创建一个生产者Send.java

这个发送的代码中有故意的错误来测试事务的回滚

package com.ckfuture.tx.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 事务消息生产者
 */
public class Send {

    //定义队列名称
    private final static String QUEUE_NAME = "tx";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //连接工厂的地址
        factory.setHost("localhost");
        int i=1/0;

        Connection connection=null;
        Channel channel=null;
        try {
            //连接工厂创建连接
            connection = factory.newConnection();
            //创建信道
            channel = connection.createChannel();
            //开启事务
            channel.txSelect();
            //绑定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            //提交事务
            channel.txCommit();
            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();
            //回滚事务
            channel.txRollback();
        }
    }
}

2.创建一个消费者Recv01.java

package com.ckfuture.tx.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 事务消息消费者
 */
public class Recv {

    private final static String QUEUE_NAME = "tx";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

运行结果:

 一般在生产中很少使用AMQP事务机制,因为会降低RabbitMQ的消息吞吐量和性能。

★confirm确认模式

优点:异步

实现方式:

普通确认方式:发一条确认一条,串行confirm。

批量确认方式:发一批确认一批

异步确认方式:提供一个回调方法。

confirm确认模式分为: 同步确认模式和异步确认模式

1.同步模式

也会降低RabbitMQ的性能,很少用到。

①创建一个生产者Send.java

package com.ckfuture.confirm.sync.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 确认模式-同步-消息生产者
 */
public class Send {

    //定义队列名称
    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //连接工厂的地址
        factory.setHost("localhost");

        Connection connection=null;
        Channel channel=null;
        try {
            //连接工厂创建连接
            connection = factory.newConnection();
            //创建信道
            channel = connection.createChannel();
            //开启确认模式
            channel.confirmSelect();
            //绑定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            //普通confirm确认模式
            if(channel.waitForConfirms()){
                System.out.println("消息发送成功!");
            }

            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();

        }
    }
}

 

 //批量confirm模式,只要有一条未确认,直接抛异常
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送成功!");

②创建一个消费者Recv.java

package com.ckfuture.confirm.sync.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 确认模式-同步-消息消费者
 */
public class Recv {

    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

③运行结果:

 

 2.异步模式

 

posted @ 2021-02-16 21:53  创客未来  阅读(400)  评论(0编辑  收藏  举报