RabbitMQ快速使用代码手册
本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu
参考文档csdn博客:
(相关资料图)
基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.ymlserver:port: 8021spring:#给项目来个名字application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虚拟host 可以不设置,使用server默认hostvirtual-host: JCcccHost#确认消息已发送到交换机(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true
完善更多信息
spring:rabbitmq:host: localhostport: 5672virtual-host: /username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: trueretry:#发布重试,默认falseenabled: true#重试时间 默认1000msinitial-interval: 1000#重试最大次数 最大3max-attempts: 3#重试最大间隔时间max-interval: 10000#重试的时间隔乘数,比如配2,0第一次等于10s,第二次等于20s,第三次等于40smultiplier: 1listener:\# 默认配置是simpletype: simplesimple:\# 手动ack Acknowledge mode of container. auto noneacknowledge-mode: manual#消费者调用程序线程的最小数量concurrency: 10#消费者最大数量max-concurrency: 10#限制消费者每次只处理一条信息,处理完在继续下一条prefetch: 1#启动时是否默认启动容器auto-startup: true#被拒绝时重新进入队列default-requeue-rejected: true
相关注解说明@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。
\@Componentpublic class PointConsumer {//监听的队列名\@RabbitListener(queues = \"point.to.point\")public void processOne(String name) {System.out.println(\"point.to.point:\" + name);}}
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
\@Component\@RabbitListener(queues = \"consumer_queue\")public class Receiver {\@RabbitHandlerpublic void processMessage1(String message) {System.out.println(message);}\@RabbitHandlerpublic void processMessage2(byte\[\] message) {System.out.println(new String(message));}}
@Payload
可以获取消息中的 body 信息
\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body) {System.out.println(\"body:\"+body);}
@Header,@Headers
可以获得消息中的 headers 信息
\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@Header String token){System.out.println(\"body:\"+body);System.out.println(\"token:\"+token);}\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@HeadersMap\ headers) {System.out.println(\"body:\"+body);System.out.println(\"Headers:\"+headers);}
快速使用配置xml文件\org.springframework.boot\ \spring-boot-starter-amqp\ \
配置exchange、queue注解快速创建版本\@Configurationpublic class RabbitmqConfig {//创建交换机//通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机\@Bean(\"bootExchange\")public Exchange bootExchange() {returnExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();}//创建队列\@Bean(\"bootQueue\")public Queue bootQueue() {return QueueBuilder.durable(\"zx_queue\").build();}/\*\*\* 将队列与交换机绑定\*\* \@param queue\* \@param exchange\* \@return\*/\@Beanpublic Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,\@Qualifier(\"bootExchange\") Exchange exchange) {returnBindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();}}
Directimport org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class DirectRabbitConfig {//队列 起名:TestDirectQueue\@Beanpublic Queue TestDirectQueue() {//durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效//exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable//autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue(\"TestDirectQueue\",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue(\"TestDirectQueue\",true);}//Direct交换机 起名:TestDirectExchange\@BeanDirectExchange TestDirectExchange() {// return new DirectExchange(\"TestDirectExchange\",true,true);return new DirectExchange(\"TestDirectExchange\",true,false);}//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting\@BeanBinding bindingDirect() {returnBindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");}\@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(\"lonelyDirectExchange\");}}
Fanoutimport org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class FanoutRabbitConfig {/\*\*\* 创建三个队列 :fanout.A fanout.B fanout.C\* 将三个队列都绑定在交换机 fanoutExchange 上\* 因为是扇型交换机, 路由键无需配置,配置也不起作用\*/\@Beanpublic Queue queueA() {return new Queue(\"fanout.A\");}\@Beanpublic Queue queueB() {return new Queue(\"fanout.B\");}\@Beanpublic Queue queueC() {return new Queue(\"fanout.C\");}\@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(\"fanoutExchange\");}\@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}\@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}\@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
Topicimport org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class TopicRabbitConfig {//绑定键public final static String man = \"topic.man\";public final static String woman = \"topic.woman\";\@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.man);}\@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.woman);}\@BeanTopicExchange exchange() {return new TopicExchange(\"topicExchange\");}//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man//这样只要是消息携带的路由键是topic.man,才会分发到该队列\@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列\@BeanBinding bindingExchangeMessage2() {returnBindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");}}
生产者发送消息直接发送给队列
//指定消息队列的名字,直接发送消息到消息队列中\@Testpublic void testSimpleQueue() {// 队列名称String queueName = \"simple.queue\";// 消息String message = \"hello, spring amqp!\";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
发送给交换机,然后走不同的模式
////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息\@Testpublic void testSendDirectExchange() {// 交换机名称,有三种类型String exchangeName = \"itcast.direct\";// 消息String message =\"红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!\";// 发送消息,red为队列的key,因此此队列会得到消息rabbitTemplate.convertAndSend(exchangeName, \"red\", message);}
也可以将发送的消息封装到HashMap中然后发送给交换机
import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.HashMap;import java.util.Map;import java.util.UUID;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@RestControllerpublic class SendMessageController {\@AutowiredRabbitTemplate rabbitTemplate;//使用RabbitTemplate,这提供了接收/发送等等方法\@GetMapping(\"/sendDirectMessage\")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = \"test message, hello!\";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-ddHH:mm:ss\"));Map\ map=new HashMap\<\>();map.put(\"messageId\",messageId);map.put(\"messageData\",messageData);map.put(\"createTime\",createTime);//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend(\"TestDirectExchange\",\"TestDirectRouting\", map);return \"ok\";}}
消费者接收消息//使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。\@Componentpublic class MessageListener {\@RabbitListener(queues = \"direct_queue\")public void receive(String id){System.out.println(\"已完成短信发送业务(rabbitmq direct),id:\"+id);}}参数用Map接收也可以\@Component\@RabbitListener(queues = \"TestDirectQueue\")//监听的队列名称TestDirectQueuepublic class DirectReceiver {\@RabbitHandlerpublic void process(Map testMessage) {System.out.println(\"DirectReceiver消费者收到消息 : \" +testMessage.toString());}}
高级特性消息可靠性传递有confirm和return两种
在application.yml中添加以下配置项:
server:port: 8021spring:#给项目来个名字application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虚拟host 可以不设置,使用server默认hostvirtual-host: JCcccHost#确认消息已发送到交换机(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true
有两种配置方法:
写到配置类中
写到工具类或者普通类中,但是这个类得实现那两个接口
写法一编写消息确认回调函数
import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class RabbitConfig {\@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {\@Overridepublic void confirm(CorrelationData correlationData, boolean ack, Stringcause) {System.out.println(\"ConfirmCallback:\"+\"相关数据:\"+correlationData);System.out.println(\"ConfirmCallback: \"+\"确认情况:\"+ack);System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {\@Overridepublic void returnedMessage(Message message, int replyCode, StringreplyText, String exchange, String routingKey) {System.out.println(\"ReturnCallback: \"+\"消息:\"+message);System.out.println(\"ReturnCallback: \"+\"回应码:\"+replyCode);System.out.println(\"ReturnCallback: \"+\"回应信息:\"+replyText);System.out.println(\"ReturnCallback: \"+\"交换机:\"+exchange);System.out.println(\"ReturnCallback: \"+\"路由键:\"+routingKey);}});return rabbitTemplate;}}
写法二\@Component\@Slf4jpublic class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate RabbitTemplate rabbitTemplate;private String finalId = null;private SmsDTO smsDTO = null;/\*\*\* 发布者确认的回调\*\* \@param correlationData 回调的相关数据。\* \@param b ack为真,nack为假\* \@param s 一个可选的原因,用于nack,如果可用,否则为空。\*/\@Overridepublic void confirm(CorrelationData correlationData, boolean b, Strings) {// 消息发送成功,将redis中消息的状态(status)修改为1if (b) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +finalId, \"status\", 1);} else {// 发送失败,放入redis失败集合中,并删除集合数据log.error(\"短信消息投送失败:{}\--\>{}\", correlationData, s);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}}/\*\*\* 发生异常时的消息返回提醒\*\* \@param returnedMessage\*/\@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(\"发生异常,返回消息回调:{}\", returnedMessage);// 发送失败,放入redis失败集合中,并删除集合数据redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}\@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}}
消息确认机制手动确认
yml配置#手动确认 manuallistener:simple:acknowledge-mode: manual
写法一首先在消费者项目中创建MessageListenerConfig
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class MessageListenerConfig {\@Autowiredprivate CachingConnectionFactory connectionFactory;\@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收处理类\@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //RabbitMQ默认是自动确认,这里改为手动确认消息//设置一个队列container.setQueueNames(\"TestDirectQueue\");//如果同时设置多个如下: 前提是队列都是必须已经创建存在的//container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue(\"TestDirectQueue\",true));//container.addQueues(new Queue(\"TestDirectQueue2\",true));//container.addQueues(new Queue(\"TestDirectQueue3\",true));container.setMessageListener(myAckReceiver);return container;}}
然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();System.out.println(\" MyAckReceiver messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"消费的主题消息来自:\"+message.getMessageProperties().getConsumerQueue());channel.basicAck(deliveryTag, true);//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认delivery_tag 小于等于传入值的所有消息//channel.basicReject(deliveryTag,true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:
首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();if(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消费到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"执行TestDirectQueue中的消息的业务处理流程\...\...\");}if(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消费到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"执行fanout.A中的消息的业务处理流程\...\...\");}channel.basicAck(deliveryTag, true);//channel.basicReject(deliveryTag, true);//为true会重新放回队列} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
写法二\@Component\@Slf4jpublic class SendSmsListener {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate SendSmsUtils sendSmsUtils;/\*\*\* 监听发送短信普通队列\* \@param smsDTO\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_QUEUE_NAME)public void sendSmsListener(SmsDTO smsDTO, Message message, Channelchannel) throws IOException {String messageId = message.getMessageProperties().getMessageId();int retryCount = (int)redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +messageId, \"retryCount\");if (retryCount \> 3) {//重试次数大于3,直接放到死信队列log.error(\"短信消息重试超过3次:{}\", messageId);//basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。//该方法reject后,该消费者还是会消费到该条被reject的消息。channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);return;}try {String phoneNum = smsDTO.getPhoneNum();String code = smsDTO.getCode();if(StringUtils.isAnyBlank(phoneNum,code)){throw new RuntimeException(\"sendSmsListener参数为空\");}// 发送消息SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,code);SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();SendStatus sendStatus = sendStatusSet\[0\];if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"sendsuccess\".equals(sendStatus.getMessage())){throw new RuntimeException(\"发送验证码失败\");}//手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info(\"短信发送成功:{}\",smsDTO);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);} catch (Exception e) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}/\*\*\* 监听到发送短信死信队列\* \@param sms\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)public void smsDelayQueueListener(SmsDTO sms, Message message, Channelchannel) throws IOException {try{log.error(\"监听到死信队列消息==\>{}\",sms);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
消费端限流#配置RabbitMQspring:rabbitmq:host: 192.168.126.3port: 5672username: guestpassword: guestvirtual-host: /#开启自动确认 none 手动确认 manuallistener:simple:#消费端限流机制必须开启手动确认acknowledge-mode: manual#消费端最多拉取的消息条数,签收后不满该条数才会继续拉取prefetch: 5
消息存活时间TTL可以设置队列的存活时间,也可以设置具体消息的存活时间
设置队列中所有消息的存活时间
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();
即在创建队列时,设置存活时间
设置某条消息的存活时间
//发送消息,并设置该消息的存活时间
\@Testpublic void testSendMessage(){//1.创建消息属性MessageProperties messageProperties = new MessageProperties();//2.设置存活时间messageProperties.setExpiration(\"10000\");//3.创建消息对象Message message = newMessage(\"sendMessage\...\".getBytes(),messageProperties);//4.发送消息rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);}
若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。
在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准
死信队列死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能
import org.springframework.amqp.core.\*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configuration//Rabbit配置类public class RabbitConfig4 {private final String DEAD_EXCHANGE = \"dead_exchange\";private final String DEAD_QUEUE = \"dead_queue\";private final String NORMAL_EXCHANGE = \"normal_exchange\";private final String NORMAL_QUEUE = \"normal_queue\";//创建死信交换机\@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字topic为通配符模式的交换机.durable(true)//是否持久化,true即存到磁盘,false只在内存上.build();}//创建死信队列\@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE)//队列持久化//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源.build();}//死信交换机绑定死信队列\@Bean//@Qualifier注解,使用名称装配进行使用public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchangeexchange, \@Qualifier(DEAD_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"dead_routing\").noargs();}//创建普通交换机\@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字topic为通配符模式的交换机.durable(true)//是否持久化,true即存到磁盘,false只在内存上.build();}//创建普通队列\@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE)//队列持久化//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源.deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机.deadLetterRoutingKey(\"dead_routing\")//死信队列路由关键字.ttl(10000)//消息存活10s.maxLength(10)//队列最大长度为10.build();}//普通交换机绑定普通队列\@Bean//@Qualifier注解,使用名称装配进行使用public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchangeexchange, \@Qualifier(NORMAL_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"my_routing\").noargs();}}
延迟队列RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。
消费者监听延迟队列
\@Componentpublic class ExpireOrderConsumer {//监听过期订单队列\@RabbitListener(queues = \"expire_queue\")public void listenMessage(String orderId){//模拟处理数据库等业务System.out.println(\"查询\"+orderId+\"号订单的状态,如果已支付无需处理,如果未支付则回退库存\");}}控制层代码\@RestControllerpublic class OrderController {\@Autowiredprivate RabbitTemplate rabbitTemplate;\@RequestMapping(value = \"/place/{orderId}\",method =RequestMethod.GET)public String placeOrder(@PathVariable String orderId){//模拟service层处理System.out.println(\"处理订单数据\...\");//将订单id发送到订单队列rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);return \"下单成功,修改库存\";}}
标签:
抢先读
- 焦点快报!塔瑞斯世界官网公测时间(dota2公测时间)
- 弘扬诚信文化 雁峰区开展“6.14信用记录关爱日”宣传
- 全球通讯!TÜV莱茵与正泰绿色能源板块在德举行合作签约仪式
- 视讯!国际油价料终结周线二连跌,产油国期待中国买需托市
- 智能化社区安全体验馆(智能化社区) 今热点
- 三星 S23 系列系统迎来重大更新!国行版预计下周推送|世界信息
- 焦点播报:剑指智能驾驶 腾势N7憋大招?
- 【焦点热闻】2023年山西注册会计师考试缴费入口已开通
- 魔幻!那个抱了梅西的球迷,穿的鞋登上淘宝热销榜首!网友:一双好鞋,掌控全场…… 百事通
- 世界观速讯丨英国首相称AI是未来最大机会之一,力保英科技中心地位【附人工智能全球竞争力预测】
- 单位扣工资违反劳动法吗
- 拉丁美洲的OmniMLS与LoneWolf合作提供完整的交易套件作为会员福利
- 热门看点:5月份国民经济延续恢复态势
- 当前热讯:动画|宝“藏”朋友圈
- 国家发改委:大力推广“信易贷”模式
- 【当前热闻】79.12亿+5.3万㎡现房!中皋置业摇号竞得亦庄新城X47R1地块
- 搴旗(搴) 速读
- 天天微资讯!北京:16项治疗性辅助生殖技术纳入医保
- 当前热文:有钱人都去买电动车了,降价潮后,豪华燃油车没活路了?
- 延边人民出版社大型辞书《朝鲜语大辞典》首发仪式举行|精选
- 全球新消息丨填权是指什么意思 填权出现后如何应对
- 这次新能源汽车下乡,共有69款车型参与!|每日视点
- 四川省1-5月居民消费价格(CPI)同比上涨0.8%
- 机电安装工程行业市场分析及未来前景研究_天天即时看
- 拉菲尼亚:我将在下个赛季留在巴萨 欧冠是我们的目标 世界热点
- 邹平市黄山街道开展扫黄打非·护苗2023专项行动|天天观点
- 我国经济运行保持恢复态势 重点在六方面发力
- 海南椰岛(600238.SH):未涉及离岛免税业务|天天观热点
- 全球观察:两年以上基层工作经验含两年吗(2年以上基层工作经历什么意思)
- 每日观察!支持科研项目找资金 助力企业机构找项目 深交所打通技术与资本两个市场
- 双下巴抽脂会导致脸部松弛吗
- 三河市气象台更新高温橙色预警【Ⅱ级/严重】【2023-06-16】
- 百隆东方: 目前,公司在新疆没有生产加工基地,
- 当前焦点!商洛发布旅游优惠政策
- 祁阳市农产品监测保护市民“舌尖”安全
- 车辆违停碾压盲道,司机到场得知被贴罚单后竟称:那我不移了
- 重磅|“看中国”22省上线启动!主题策划发布!IPTV数据排行榜发布! 最新
- 当前关注:安阳红旗渠机场飞行程序实地验证试飞成功
- 要闻:做好防暑降温、保障劳动者健康!广州市总工会“送清凉”进基层
- 能链智电于翔:储能技术推动新能源充电服务升级转型-每日讯息
- 重庆巫溪“文旅+科技”融入巴蜀文旅走廊建设
- 夫唱妇随!“国宝”朱鹮从浙江跨省定居江西婺源啦!|天天热资讯
- 百隆东方: 百隆东方关于调整公司2021年第二期股票期权激励计划股票期权行权价格、激励对象名单、期权数量并注销部分已获授但未行权的股票期权的公告
- 南昌一体化政务服务平台再增3项特色应用场景_天天短讯
- 智慧交通多场景加速落地
- RabbitMQ快速使用代码手册
- 环球热文:吉林省2023外贸企业汇率避险及融资需求对接会举办
- 贵阳首美整形医院怎么样 医院真实案例分享
- 张家口:浓情“粽”动员 情暖环卫工 天天速读
- 世界百事通!中国绿发举办共迎亚运倒计时100天骑行活动
- 「公安心向党 护航新征程」国家反诈中心推出《2023版防范电信网络诈骗宣传手册》 每日短讯
- 最新通化市各劳动仲裁委员会地址及咨询电话名单一览
- 天天热门:西峡县城区二中开展“我们的节日∙端午节”主题系列活动
- 瓜叔必发:国足磨合存在问题,缅甸实力较差
- 加速电动化!法拉利动力总成工厂建成在即
- 每日焦点!香港考虑再放宽首置人群按揭成数 或接近零首付
- 上海昇思AI框架&大模型创新中心正式启动 云从科技等首批22家单位入驻
- 【焦点热闻】多省份公布高温津贴发放标准:多地月标准达300元,海南最长发7个月
- 魏牌“双旗舰SUV”亮相粤港澳大湾区车展-天天看点
- 聚焦数字化人才培养,2023微盟616数造零售大赛正式启动|天天看点
- 【国际漫评】这是没有新谣可造了吗?|世界今日讯
- 菜满园、果飘香!竹山这所“袖珍小学”有了“幸福农场” 环球简讯
- 花样滑冰项目单、双人滑训练营在首体进行 打造花滑选手互相学习提高的平台|环球今日讯
- 中国开辟多国玉米进口通道
- 世界观察:我国首条长江高铁隧道开始盾构始发掘进
- 信银理财智慧象合治进取1号年内跌7.66%-全球视讯
- 【三夏进行时】海报|三晋夏收农忙“丰”景
- 全球最资讯丨【三夏进行时】海报|三晋夏收农忙“丰”景
- “时尚中国”摄影大展延边州精选展在延吉举行
- 大连市将设立大连市政府引导母基金,首期规模100亿元 环球通讯
- 中国大巴山(重庆·城口)消夏康养季6月21日启动-当前消息
- 今日热门!英伟达携甜点级好物重磅加码 RTX4060Ti显卡在京东618卖爆
- 全球观点:1-5月中国汽车类零售总额同比小幅增长
- 看热讯:IWG集团签署广东首个管理项目 并首发HQ品牌
- 紫薇被容嬷嬷扎针台词(容嬷嬷扎针是什么梗) 环球消息
- 转型氢能源未见成效,美锦能源投资活动净流出超百亿?|微资讯
- 2023城镇职工医保报销流程是什么
- 世界快消息!中薇金融(00245):倪新光退任执行董事
- 重庆武隆区交通局二级调研员陈华涉嫌严重违纪违法接受审查调查
- 强奸罪追诉时效最长是多久_当前时讯
- 环球观热点:昇思开源社区理事会成立,基于昇思AI框架的全模态大模型“紫东.太初2.0”发布
- 欧洲主要股指集体高开
- 【全球播资讯】中交二航局举办2023世界交通运输大会交通基础设施工业化智能建造平行论坛
- 当健康成生意 主播应避免被流量“绑架”-天天要闻
- “五五购物节”火热进行中,活动丰富、优惠多多
- 理想首款纯电车型W01手绘图 灵感源自鲸鱼/明日公布车型名称
- 当前滚动:甘肃陇西县总工会:“1+17+N”构建新时代职工书屋矩阵
- 赛力斯股价连续上涨 李想称问界M7“打残”理想ONE
- 浙江边检总站开展“护航亚运”海空联合巡航和应急处突演练-资讯
- 每日快看:私人山庄遭网红闯入并渲染成鬼屋 房主发声:丢失9万余元财物,吓得我都不敢回了
- 壁挂炉水压2到3正常吗为什么(壁挂炉水压2到3正常吗)
- 长沙小升初微机派位看成绩吗
- 甬金股份: 浙江甬金金属科技股份有限公司公开发行可转换公司债券受托管理事务报告(2022年度) 观察
- 大货车行驶途中起火自燃 淮安消防民警联合扑救|时快讯
- 快看:尉氏县举行金融反诈知识宣传活动
- 天天快消息!“小学老师被指课堂猥亵女生”续:警方未发现违法行为,不予立案
- 插入word图片不显示(word插入图片显示一条) 观热点
- 2023厦门个人社保缴费标准是多少钱一个月 焦点速看
- 是谁杀死了机械硬盘?不是固态硬盘!_世界实时
- 东吴医院专家 女性不孕不育查什么_全球消息