springCloud使用rabbitMq

一.安装rabbitMq

安装rabbitMq(需要安装延时插件)。

二.rabbitMq 使用

1.导入maven依赖

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

2.在application.yml配置文件中加入RabbitMq配置信息

spring: 
  rabbitmq:
    host: 127.0.0.1
    port: 5672(端口)
    username: super(用户名)
    password: Jmy2019.(密码)
    virtual-host: ecosphere

3.rabbitMq配置类

package com.nuvole.merchant.conf.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * rabbit配置类(声明交换机、队列以及他们的绑定关系)
 *
 */
@Configuration
public class AmqpConfig {

    // 交换机名称(延时队列)
    public static final String DELAYED_EXCHANGE_KEY = "exchange.delayed";
    // 队列名称(成团状态更新)
    public static final String ORDER_GROUP_JOIN_QUEUE_KEY = "order.group.join.delayed";
    //队列路线/绑定关系(成团状态更新)
    public static final String ORDER_GROUP_JOIN_ROUTK = "order.group.join.delayed";

    /**
     * 延时队列交换器
     *
     */
    @Bean
    public CustomExchange testExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_KEY, "x-delayed-message", true, false, args);
    }
    /**
     * 成团状态更新推送队列
     *
     */
    @Bean
    public Queue orderGroupJoinQueue() {
        return new Queue(ORDER_GROUP_JOIN_QUEUE_KEY, true);
    }

    @Bean
    public Binding flashSalePushBinding(CustomExchange delayedExchange, Queue orderGroupJoinQueue) {
        Binding binding=BindingBuilder.bind(orderGroupJoinQueue).to(delayedExchange).with(ORDER_GROUP_JOIN_ROUTK).noargs();
        return binding;
    }
}

4.消息发送确认

package com.nuvole.shop.config.mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 */
@Component
public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }
}

5.发送队列消息的service

package com.nuvole.merchant.service.mq;

/**
 * 发送消息
 *
 */
public interface QueueMessageService {

    /**
     * 发送正常队列消息
     *
     */
    void send(String exchangeKey, String routingKey, Object message);


    /**
     * 发送延时队列消息
     *
     */
    void delayedSend(String exchangeKey, String routingKey, Object message, int msec);

}

6.发送队列消息的service实现类

package com.nuvole.merchant.service.mq;

import com.nuvole.util.IdGenerator;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class QueueMessageServiceImpl implements QueueMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(String exchangeKey, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(IdGenerator.getUUID());
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, message, correlationData);
    }


    @Override
    public void delayedSend(String exchangeKey, String routingKey, Object msg,final int xdelay) {

        rabbitTemplate.convertAndSend(exchangeKey, routingKey, msg, message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay(xdelay);
            return message;
        });
    }

}

7.调用队列

       //msec:距离调用队列的时间(毫秒) param_extend:向队列传递的参数
       int msec = Integer.parseInt(Long.toString(DateUtil.betweenMs(new Date(), outTime)));
       HashMap<String, Object> param_extend = new HashMap<>();
       param_extend.put("id",“123456");
       queueMessageService.delayedSend(AmqpConfig.DELAYED_EXCHANGE_KEY, AmqpConfig.ORDER_GROUP_JOIN_QUEUE_KEY, JSON.toJSONString(param_extend), msec);

8.监听队列方法

package com.nuvole.shop.config.mq;


import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.JSON;
import com.nuvole.shop.domain.StoreGroupGoodsJoin;
import com.nuvole.shop.domain.extend.WechatStoreGroupJoinGoods;
import com.nuvole.shop.mapper.extend.ExtendStoreGroupGoodsJoinMapper;
import com.nuvole.shop.scheduler.StorePushScheduler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * 监听队列消息
 *
 */
@Slf4j
@Component
@EnableRabbit
public class TestReceiver {


    @RabbitListener(queues = AmqpConfig.ORDER_GROUP_JOIN_QUEUE_KEY)
    public void process(String msg, Channel channel, Message message) {
        log.info("======================延时队列开始执行。。。。。。。。。。。。。。");
        log.info(new Date().toString() + ",延时收到了信息 message = " + msg);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
             Map p = JSON.parseObject(msg, HashMap.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
  • 1
    点赞
  • 3
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值