RabbitMQ快速使用代码手册

博客 分享
0 291
张三
张三 2023-06-16 18:10:44
悬赏:0 积分 收藏

RabbitMQ快速使用代码手册

本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu

参考文档

csdn博客:

基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml

server:

port: 8021

spring:

#给项目来个名字

application:

name: rabbitmq-provider

#配置rabbitMq 服务器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虚拟host 可以不设置,使用server默认host

virtual-host: JCcccHost

#确认消息已发送到交换机(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#确认消息已发送到队列(Queue)

publisher-returns: true

完善更多信息

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /

username: guest

password: guest

publisher-confirm-type: correlated

publisher-returns: true

template:

mandatory: true

retry:

#发布重试,默认false

enabled: true

#重试时间 默认1000ms

initial-interval: 1000

#重试最大次数 最大3

max-attempts: 3

#重试最大间隔时间

max-interval: 10000

#重试的时间隔乘数,比如配2,0
第一次等于10s,第二次等于20s,第三次等于40s

multiplier: 1

listener:

\# 默认配置是simple

type: simple

simple:

\# 手动ack Acknowledge mode of container. auto none

acknowledge-mode: manual

#消费者调用程序线程的最小数量

concurrency: 10

#消费者最大数量

max-concurrency: 10

#限制消费者每次只处理一条信息,处理完在继续下一条

prefetch: 1

#启动时是否默认启动容器

auto-startup: true

#被拒绝时重新进入队列

default-requeue-rejected: true

相关注解说明

@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
里面的消息。

@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。

\@Component

public class PointConsumer {

//监听的队列名

\@RabbitListener(queues = \"point.to.point\")

public void processOne(String name) {

System.out.println(\"point.to.point:\" + name);

}

}

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。

\@Component

\@RabbitListener(queues = \"consumer_queue\")

public class Receiver {

\@RabbitHandler

public void processMessage1(String message) {

System.out.println(message);

}

\@RabbitHandler

public void processMessage2(byte\[\] message) {

System.out.println(new String(message));

}

}

@Payload

可以获取消息中的 body 信息

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body) {

System.out.println(\"body:\"+body);

}

@Header,@Headers

可以获得消息中的 headers 信息

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Header String token)
{

System.out.println(\"body:\"+body);

System.out.println(\"token:\"+token);

}

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Headers
Map\<String,Object\> headers) {

System.out.println(\"body:\"+body);

System.out.println(\"Headers:\"+headers);

}

快速使用

配置xml文件

<dependency\>

\<groupId\>org.springframework.boot\</groupId\>

\<artifactId\>spring-boot-starter-amqp\</artifactId\>

\</dependency\>

配置exchange、queue

注解快速创建版本

\@Configuration

public class RabbitmqConfig {

//创建交换机

//通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机

\@Bean(\"bootExchange\")

public Exchange bootExchange() {

return
ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();

}

//创建队列

\@Bean(\"bootQueue\")

public Queue bootQueue() {

return QueueBuilder.durable(\"zx_queue\").build();

}

/\*\*

\* 将队列与交换机绑定

\*

\* \@param queue

\* \@param exchange

\* \@return

\*/

\@Bean

public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,
\@Qualifier(\"bootExchange\") Exchange exchange) {

return
BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();

}

}

Direct

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class DirectRabbitConfig {

//队列 起名:TestDirectQueue

\@Bean

public Queue TestDirectQueue() {

//
durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效

//
exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable

//
autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

// return new Queue(\"TestDirectQueue\",true,true,false);

//一般设置一下队列的持久化就好,其余两个就是默认false

return new Queue(\"TestDirectQueue\",true);

}

//Direct交换机 起名:TestDirectExchange

\@Bean

DirectExchange TestDirectExchange() {

// return new DirectExchange(\"TestDirectExchange\",true,true);

return new DirectExchange(\"TestDirectExchange\",true,false);

}

//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting

\@Bean

Binding bindingDirect() {

return
BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");

}

\@Bean

DirectExchange lonelyDirectExchange() {

return new DirectExchange(\"lonelyDirectExchange\");

}

}

Fanout

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class FanoutRabbitConfig {

/\*\*

\* 创建三个队列 :fanout.A fanout.B fanout.C

\* 将三个队列都绑定在交换机 fanoutExchange 上

\* 因为是扇型交换机, 路由键无需配置,配置也不起作用

\*/

\@Bean

public Queue queueA() {

return new Queue(\"fanout.A\");

}

\@Bean

public Queue queueB() {

return new Queue(\"fanout.B\");

}

\@Bean

public Queue queueC() {

return new Queue(\"fanout.C\");

}

\@Bean

FanoutExchange fanoutExchange() {

return new FanoutExchange(\"fanoutExchange\");

}

\@Bean

Binding bindingExchangeA() {

return BindingBuilder.bind(queueA()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeB() {

return BindingBuilder.bind(queueB()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeC() {

return BindingBuilder.bind(queueC()).to(fanoutExchange());

}

}

Topic

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class TopicRabbitConfig {

//绑定键

public final static String man = \"topic.man\";

public final static String woman = \"topic.woman\";

\@Bean

public Queue firstQueue() {

return new Queue(TopicRabbitConfig.man);

}

\@Bean

public Queue secondQueue() {

return new Queue(TopicRabbitConfig.woman);

}

\@Bean

TopicExchange exchange() {

return new TopicExchange(\"topicExchange\");

}

//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man

//这样只要是消息携带的路由键是topic.man,才会分发到该队列

\@Bean

Binding bindingExchangeMessage() {

return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);

}

//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#

// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列

\@Bean

Binding bindingExchangeMessage2() {

return
BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");

}

}

生产者发送消息

直接发送给队列

//指定消息队列的名字,直接发送消息到消息队列中

\@Test

public void testSimpleQueue() {

// 队列名称

String queueName = \"simple.queue\";

// 消息

String message = \"hello, spring amqp!\";

// 发送消息

rabbitTemplate.convertAndSend(queueName, message);

}

发送给交换机,然后走不同的模式

////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息

\@Test

public void testSendDirectExchange() {

// 交换机名称,有三种类型

String exchangeName = \"itcast.direct\";

// 消息

String message =
\"红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!\";

// 发送消息,red为队列的key,因此此队列会得到消息

rabbitTemplate.convertAndSend(exchangeName, \"red\", message);

}

也可以将发送的消息封装到HashMap中然后发送给交换机

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

import java.time.format.DateTimeFormatter;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@RestController

public class SendMessageController {

\@Autowired

RabbitTemplate rabbitTemplate;
//使用RabbitTemplate,这提供了接收/发送等等方法

\@GetMapping(\"/sendDirectMessage\")

public String sendDirectMessage() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = \"test message, hello!\";

String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd
HH:mm:ss\"));

Map\<String,Object\> map=new HashMap\<\>();

map.put(\"messageId\",messageId);

map.put(\"messageData\",messageData);

map.put(\"createTime\",createTime);

//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange

rabbitTemplate.convertAndSend(\"TestDirectExchange\",
\"TestDirectRouting\", map);

return \"ok\";

}

}

消费者接收消息

//使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。

\@Component

public class MessageListener {

\@RabbitListener(queues = \"direct_queue\")

public void receive(String id){

System.out.println(\"已完成短信发送业务(rabbitmq direct),id:\"+id);

}

}

参数用Map接收也可以

\@Component

\@RabbitListener(queues = \"TestDirectQueue\")//监听的队列名称
TestDirectQueue

public class DirectReceiver {

\@RabbitHandler

public void process(Map testMessage) {

System.out.println(\"DirectReceiver消费者收到消息 : \" +
testMessage.toString());

}

}

高级特性

消息可靠性传递

有confirm和return两种

在application.yml中添加以下配置项:

server:

port: 8021

spring:

#给项目来个名字

application:

name: rabbitmq-provider

#配置rabbitMq 服务器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虚拟host 可以不设置,使用server默认host

virtual-host: JCcccHost

#确认消息已发送到交换机(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#确认消息已发送到队列(Queue)

publisher-returns: true

有两种配置方法:

写到配置类中

写到工具类或者普通类中,但是这个类得实现那两个接口

写法一

编写消息确认回调函数

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class RabbitConfig {

\@Bean

public RabbitTemplate createRabbitTemplate(ConnectionFactory
connectionFactory){

RabbitTemplate rabbitTemplate = new RabbitTemplate();

rabbitTemplate.setConnectionFactory(connectionFactory);

//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

\@Override

public void confirm(CorrelationData correlationData, boolean ack, String
cause) {

System.out.println(\"ConfirmCallback:
\"+\"相关数据:\"+correlationData);

System.out.println(\"ConfirmCallback: \"+\"确认情况:\"+ack);

System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);

}

});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

\@Override

public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {

System.out.println(\"ReturnCallback: \"+\"消息:\"+message);

System.out.println(\"ReturnCallback: \"+\"回应码:\"+replyCode);

System.out.println(\"ReturnCallback: \"+\"回应信息:\"+replyText);

System.out.println(\"ReturnCallback: \"+\"交换机:\"+exchange);

System.out.println(\"ReturnCallback: \"+\"路由键:\"+routingKey);

}

});

return rabbitTemplate;

}

}

写法二

\@Component

\@Slf4j

public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private RabbitTemplate rabbitTemplate;

private String finalId = null;

private SmsDTO smsDTO = null;

/\*\*

\* 发布者确认的回调

\*

\* \@param correlationData 回调的相关数据。

\* \@param b ack为真,nack为假

\* \@param s 一个可选的原因,用于nack,如果可用,否则为空。

\*/

\@Override

public void confirm(CorrelationData correlationData, boolean b, String
s) {

// 消息发送成功,将redis中消息的状态(status)修改为1

if (b) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
finalId, \"status\", 1);

} else {

// 发送失败,放入redis失败集合中,并删除集合数据

log.error(\"短信消息投送失败:{}\--\>{}\", correlationData, s);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

}

/\*\*

\* 发生异常时的消息返回提醒

\*

\* \@param returnedMessage

\*/

\@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

log.error(\"发生异常,返回消息回调:{}\", returnedMessage);

// 发送失败,放入redis失败集合中,并删除集合数据

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

\@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

}

消息确认机制

手动确认

yml配置

#手动确认 manual

listener:

simple:

acknowledge-mode: manual

写法一

首先在消费者项目中创建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.core.Queue;

import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class MessageListenerConfig {

\@Autowired

private CachingConnectionFactory connectionFactory;

\@Autowired

private MyAckReceiver myAckReceiver;//消息接收处理类

\@Bean

public SimpleMessageListenerContainer simpleMessageListenerContainer() {

SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer(connectionFactory);

container.setConcurrentConsumers(1);

container.setMaxConcurrentConsumers(1);

container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
RabbitMQ默认是自动确认,这里改为手动确认消息

//设置一个队列

container.setQueueNames(\"TestDirectQueue\");

//如果同时设置多个如下: 前提是队列都是必须已经创建存在的

//
container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");

//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues

//container.setQueues(new Queue(\"TestDirectQueue\",true));

//container.addQueues(new Queue(\"TestDirectQueue2\",true));

//container.addQueues(new Queue(\"TestDirectQueue3\",true));

container.setMessageListener(myAckReceiver);

return container;

}

}

然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = msgMap.get(\"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

System.out.println(\" MyAckReceiver messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"消费的主题消息来自:\"+message.getMessageProperties().getConsumerQueue());

channel.basicAck(deliveryTag, true);
//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认
delivery_tag 小于等于传入值的所有消息

//channel.basicReject(deliveryTag,
true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:

首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = msgMap.get(\"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

if
(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"消息成功消费到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"执行TestDirectQueue中的消息的业务处理流程\...\...\");

}

if
(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"消息成功消费到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"执行fanout.A中的消息的业务处理流程\...\...\");

}

channel.basicAck(deliveryTag, true);

//channel.basicReject(deliveryTag, true);//为true会重新放回队列

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

写法二

\@Component

\@Slf4j

public class SendSmsListener {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private SendSmsUtils sendSmsUtils;

/\*\*

\* 监听发送短信普通队列

\* \@param smsDTO

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_QUEUE_NAME)

public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
channel) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

int retryCount = (int)
redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
messageId, \"retryCount\");

if (retryCount \> 3) {

//重试次数大于3,直接放到死信队列

log.error(\"短信消息重试超过3次:{}\", messageId);

//basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。

//该方法reject后,该消费者还是会消费到该条被reject的消息。

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

return;

}

try {

String phoneNum = smsDTO.getPhoneNum();

String code = smsDTO.getCode();

if(StringUtils.isAnyBlank(phoneNum,code)){

throw new RuntimeException(\"sendSmsListener参数为空\");

}

// 发送消息

SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
code);

SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();

SendStatus sendStatus = sendStatusSet\[0\];

if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send
success\".equals(sendStatus.getMessage())){

throw new RuntimeException(\"发送验证码失败\");

}

//手动确认消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

log.info(\"短信发送成功:{}\",smsDTO);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

} catch (Exception e) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

/\*\*

\* 监听到发送短信死信队列

\* \@param sms

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)

public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
channel) throws IOException {

try{

log.error(\"监听到死信队列消息==\>{}\",sms);

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}catch (Exception e){

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

}

消费端限流

#配置RabbitMQ

spring:

rabbitmq:

host: 192.168.126.3

port: 5672

username: guest

password: guest

virtual-host: /

#开启自动确认 none 手动确认 manual

listener:

simple:

#消费端限流机制必须开启手动确认

acknowledge-mode: manual

#消费端最多拉取的消息条数,签收后不满该条数才会继续拉取

prefetch: 5

消息存活时间TTL

可以设置队列的存活时间,也可以设置具体消息的存活时间

设置队列中所有消息的存活时间

return QueueBuilder

.durable(QUEUE_NAME)//队列持久化

.ttl(10000)//设置队列的所有消息存活10s

.build();

即在创建队列时,设置存活时间

设置某条消息的存活时间

//发送消息,并设置该消息的存活时间

\@Test

public void testSendMessage()

{

//1.创建消息属性

MessageProperties messageProperties = new MessageProperties();

//2.设置存活时间

messageProperties.setExpiration(\"10000\");

//3.创建消息对象

Message message = new
Message(\"sendMessage\...\".getBytes(),messageProperties);

//4.发送消息

rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);

}

若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。

在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准

死信队列

死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能

import org.springframework.amqp.core.\*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration//Rabbit配置类

public class RabbitConfig4 {

private final String DEAD_EXCHANGE = \"dead_exchange\";

private final String DEAD_QUEUE = \"dead_queue\";

private final String NORMAL_EXCHANGE = \"normal_exchange\";

private final String NORMAL_QUEUE = \"normal_queue\";

//创建死信交换机

\@Bean(DEAD_EXCHANGE)

public Exchange deadExchange()

{

return ExchangeBuilder

.topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字
topic为通配符模式的交换机

.durable(true)//是否持久化,true即存到磁盘,false只在内存上

.build();

}

//创建死信队列

\@Bean(DEAD_QUEUE)

public Queue deadQueue()

{

return QueueBuilder

.durable(DEAD_QUEUE)//队列持久化

//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源

.build();

}

//死信交换机绑定死信队列

\@Bean

//@Qualifier注解,使用名称装配进行使用

public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
exchange, \@Qualifier(DEAD_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"dead_routing\")

.noargs();

}

//创建普通交换机

\@Bean(NORMAL_EXCHANGE)

public Exchange normalExchange()

{

return ExchangeBuilder

.topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字
topic为通配符模式的交换机

.durable(true)//是否持久化,true即存到磁盘,false只在内存上

.build();

}

//创建普通队列

\@Bean(NORMAL_QUEUE)

public Queue normalQueue()

{

return QueueBuilder

.durable(NORMAL_QUEUE)//队列持久化

//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源

.deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机

.deadLetterRoutingKey(\"dead_routing\")//死信队列路由关键字

.ttl(10000)//消息存活10s

.maxLength(10)//队列最大长度为10

.build();

}

//普通交换机绑定普通队列

\@Bean

//@Qualifier注解,使用名称装配进行使用

public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"my_routing\")

.noargs();

}

}

延迟队列

RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能

即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。

消费者监听延迟队列

\@Component

public class ExpireOrderConsumer {

//监听过期订单队列

\@RabbitListener(queues = \"expire_queue\")

public void listenMessage(String orderId)

{

//模拟处理数据库等业务

System.out.println(\"查询\"+orderId+\"号订单的状态,如果已支付无需处理,如果未支付则回退库存\");

}

}

控制层代码

\@RestController

public class OrderController {

\@Autowired

private RabbitTemplate rabbitTemplate;

\@RequestMapping(value = \"/place/{orderId}\",method =
RequestMethod.GET)

public String placeOrder(@PathVariable String orderId)

{

//模拟service层处理

System.out.println(\"处理订单数据\...\");

//将订单id发送到订单队列

rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);

return \"下单成功,修改库存\";

}

}
posted @ 2023-06-16 17:48  ChangesWorlds  阅读(0)  评论(0编辑  收藏  举报
回帖
    张三

    张三 (王者 段位)

    921 积分 (2)粉丝 (41)源码

     

    温馨提示

    亦奇源码

    最新会员