RabbitMQ中的交换机有direct、fanout、topic、headers四种类型,本文重点介绍前三种的使用案例。
项目准备
1、项目中pom.xml引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、bootstrap.yml配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /yourvhost
    username: youruser
    password: yourpassword
=========================================
一、direct交换机
向direct交换机发送消息时必须指定routingKey(路由键);队列需要通过routingKey绑定到交换机,交换机只会把消息投递给绑定键与消息routingKey完全一致的队列,消费者程序监听这些队列即可收到消息。
DirectExchangeConfig 配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the direct-exchange example.
 *
 * <p>Declares one direct exchange and two durable queues, and binds both queues
 * to the exchange using the same routing key ({@link #DIRECT_ROUTING_KEY}).
 */
@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_EXCHANGE = "directExchange";
    public static final String DIRECT_QUEUE1 = "directQueue1";
    public static final String DIRECT_QUEUE2 = "directQueue2";
    public static final String DIRECT_ROUTING_KEY = "direct";

    /** Direct exchange registered under the name {@link #DIRECT_EXCHANGE}. */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /** Queue 1, declared durable. */
    @Bean
    public Queue directQueue1() {
        return durableQueue(DIRECT_QUEUE1);
    }

    /** Queue 2, declared durable. */
    @Bean
    public Queue directQueue2() {
        return durableQueue(DIRECT_QUEUE2);
    }

    /** Binds queue 1 to the direct exchange under {@link #DIRECT_ROUTING_KEY}. */
    @Bean
    public Binding bindingDirectExchange1() {
        return bindWithDirectKey(directQueue1());
    }

    /** Binds queue 2 to the direct exchange under {@link #DIRECT_ROUTING_KEY}. */
    @Bean
    public Binding bindingDirectExchange2() {
        return bindWithDirectKey(directQueue2());
    }

    // Builds a queue with the durable flag set to true.
    private Queue durableQueue(String queueName) {
        return new Queue(queueName, true);
    }

    // Binds the given queue to the direct exchange using the shared routing key.
    private Binding bindWithDirectKey(Queue queue) {
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY);
    }
}
DirectExController.java 类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a test message to the direct exchange.
 */
@Slf4j
@RestController
@RequestMapping("/directEx")
public class DirectExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a fixed test message to the direct exchange.
     *
     * <p>The message is published with {@code DirectExchangeConfig.DIRECT_ROUTING_KEY},
     * so it is delivered to the queues bound to the exchange with that key.
     *
     * @return the success response produced by {@code Resp.success()}
     */
    @TokenIgnore
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Resp send() {
        final String exchange = DirectExchangeConfig.DIRECT_EXCHANGE;
        final String routingKey = DirectExchangeConfig.DIRECT_ROUTING_KEY;
        final String payload = "发送一条测试消息:direct";

        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        return Resp.success();
    }
}
DirectQueueListener.java 监听器类
import com.riskmage.rmccloud.ins.rabbitmq.config.DirectExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Message listener for the two queues bound to the direct exchange.
 *
 * <p>Queue names are taken from {@link DirectExchangeConfig} so the listener
 * cannot drift from the names used when the queues are declared.
 */
@Slf4j
@Component
public class DirectQueueListener {

    /**
     * Logs every text message received from {@code directQueue1}.
     *
     * @param message the message body
     */
    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE1)
    public void process1(String message) {
        log.info("directQueue1接收到消息: {}", message);
    }

    /**
     * Logs every text message received from {@code directQueue2}.
     *
     * @param message the message body
     */
    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2)
    public void process2(String message) {
        log.info("directQueue2接收到消息: {}", message);
    }
}
监听的方法中除了可以接收普通字符串作为消息体,也支持Map、普通Java POJO类(实现Serializable接口)作为消息体。
@Payload 用于指定消息体对象;@Header 可以接收指定的消息头(AMQP消息头,而非HTTP请求头);也可以通过MessageProperties参数拿到消息的全部附加属性(含所有自定义消息头)等。Channel表示当前的连接通道,可用于手动回复ACK(需要将监听容器的acknowledge-mode配置为manual)。
/**
 * Consumes a map-shaped message from {@code directQueue1} and acknowledges it explicitly.
 *
 * <p>On success the delivery is acked. If anything in the handler throws, the delivery
 * is negatively acknowledged instead of being left unacknowledged on the channel.
 *
 * <p>NOTE(review): explicit ack/nack presumes the listener container runs in manual
 * acknowledge mode; that setting is not visible here — confirm it in the configuration.
 *
 * @param message           the message body converted to a map
 * @param channel           the channel the delivery arrived on, used for ack/nack
 * @param deliveryTag       the delivery tag of this message
 * @param messageProperties the message properties, used here to read the custom header {@code key1}
 */
@RabbitHandler
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE1)
public void onMessage(@Payload Map<String, Object> message, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
        MessageProperties messageProperties) {
    try {
        String str = JSON.toJSONString(message);
        log.info("directQueue1接收到消息对象:{}", str);
        log.info("deliveryTag:{}", deliveryTag);
        Object value1 = messageProperties.getHeader("key1");
        log.info("获取到自定义请求头key1的值: {}", value1);
        // Processing finished successfully: acknowledge this single delivery.
        log.info("消息成功处理完成 ,回复ack");
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        try {
            // requeue=false: do not redeliver a message that just failed, which would
            // otherwise loop; the broker dead-letters it if a DLX is configured.
            channel.basicNack(deliveryTag, false, false);
        } catch (Exception nackFailure) {
            // The channel may already be unusable (e.g. the ack itself failed); log only.
            log.error(nackFailure.getMessage(), nackFailure);
        }
    }
}
二、fanout交换机
fanout交换机为扇形模式交换机 ,消息会广播到所有绑定的队列上,此时不需要指定routingKey。
FanoutExchangeConfig配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the fanout-exchange example.
 *
 * <p>A fanout exchange delivers each message to every queue bound to it, so the
 * bindings declared here carry no routing key.
 */
@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_EXCHANGE = "fanoutExchange";
    public static final String FANOUT_QUEUE1 = "fanoutQueue1";
    public static final String FANOUT_QUEUE2 = "fanoutQueue2";

    /** The fanout exchange (durable = true, autoDelete = false). */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(FANOUT_EXCHANGE, durable, autoDelete);
    }

    /** Fanout queue 1, declared durable. */
    @Bean
    public Queue fanoutQueue1() {
        return durableQueue(FANOUT_QUEUE1);
    }

    /** Fanout queue 2, declared durable. */
    @Bean
    public Queue fanoutQueue2() {
        return durableQueue(FANOUT_QUEUE2);
    }

    /** Binds fanout queue 1 to the fanout exchange. */
    @Bean
    public Binding bindingFanoutExchange1() {
        return bindToFanout(fanoutQueue1());
    }

    /** Binds fanout queue 2 to the fanout exchange. */
    @Bean
    public Binding bindingFanoutExchange2() {
        return bindToFanout(fanoutQueue2());
    }

    // Builds a queue with the durable flag set to true.
    private Queue durableQueue(String queueName) {
        return new Queue(queueName, true);
    }

    // Binds the given queue to the fanout exchange (no routing key).
    private Binding bindToFanout(Queue queue) {
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
FanoutExController类
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.riskmage.rmcbase.Resp;
import com.riskmage.rmcbase.mq.QMessage;
import com.riskmage.rmccloud.ins.common.annotation.TokenIgnore;
import com.riskmage.rmccloud.ins.rabbitmq.config.FanoutExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a {@code QMessage} to the fanout exchange.
 */
@Slf4j
@RestController
@RequestMapping("/fanoutEx")
public class FanoutExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds a demo message and publishes it to the fanout exchange.
     *
     * <p>The routing key argument is {@code null}: a fanout exchange delivers the
     * message to all bound queues, so no routing key is specified.
     *
     * @return a success response carrying a confirmation text
     */
    @TokenIgnore
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Resp send() {
        // Arguments are evaluated left to right: id, timestamp, then payload.
        final QMessage outgoing = new QMessage(
                IdUtil.fastSimpleUUID(),
                DateUtil.now(),
                new DemoPO("张三", 23));

        rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, outgoing);
        return Resp.successMsg("fanout消息发送成功!!");
    }
}
QMessage需要实现序列化接口
/**
 * Serializable envelope for messages sent through the queue: an id, a send
 * timestamp (kept as text) and an arbitrary payload.
 *
 * <p>Because the class is serialized with Java serialization, {@code data} must
 * itself be serializable at runtime for the message to be written successfully.
 */
public class QMessage implements Serializable {

    /**
     * Explicit serial version so that producer and consumer builds agree on the
     * stream format instead of relying on the compiler-derived default.
     * NOTE(review): instances serialized before this field existed carry the old
     * computed UID; drain such messages before deploying if any are in flight.
     */
    private static final long serialVersionUID = 1L;

    private String messageId;
    private String sendTime;
    private Object data;

    /** Creates an empty message; all fields are {@code null} until set. */
    public QMessage() {
    }

    /**
     * Creates a fully populated message.
     *
     * @param messageId identifier of the message
     * @param sendTime  send time as text
     * @param data      payload object
     */
    public QMessage(String messageId, String sendTime, Object data) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.data = data;
    }

    /** @return the message identifier, or {@code null} if not set */
    public String getMessageId() {
        return this.messageId;
    }

    /** @param messageId the message identifier */
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    /** @return the send time as text, or {@code null} if not set */
    public String getSendTime() {
        return this.sendTime;
    }

    /** @param sendTime the send time as text */
    public void setSendTime(String sendTime) {
        this.sendTime = sendTime;
    }

    /** @return the payload object, or {@code null} if not set */
    public Object getData() {
        return this.data;
    }

    /** @param data the payload object */
    public void setData(Object data) {
        this.data = data;
    }
}
FanoutQueueListener监听类
import com.alibaba.fastjson.JSON;
import com.riskmage.rmcbase.mq.QMessage;
import com.riskmage.rmccloud.ins.rabbitmq.config.FanoutExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Message listener for the two queues bound to the fanout exchange.
 * Each handler serializes the received {@code QMessage} to JSON and logs it.
 */
@Slf4j
@Component
public class FanoutQueueListener {

    /**
     * Logs every message received from {@code FanoutExchangeConfig.FANOUT_QUEUE1}.
     *
     * @param message the received message payload
     */
    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE1)
    public void process1(@Payload QMessage message) {
        String str = JSON.toJSONString(message);
        log.info("收到来自fanoutQueue1的消息: {}", str);
    }

    /**
     * Logs every message received from {@code FanoutExchangeConfig.FANOUT_QUEUE2}.
     *
     * @param message the received message payload
     */
    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)
    public void process2(@Payload QMessage message) {
        String str = JSON.toJSONString(message);
        log.info("收到来自fanoutQueue2的消息: {}", str);
    }
}
http://localhost:8074/fanoutEx/send 发送请求试下。
三、topic交换机
topic交换机与fanout交换机类似,同样可以把一条消息投递给多个队列(一对多)。但是推送消息时需要指定routingKey(由点号“.”分隔的若干单词组成,例如dev.goods),队列绑定的路由关键字支持 * 和 # 两种通配符匹配。
(星号 *) 用来表示一个单词 (必须出现的)
(井号 #) 用来表示任意数量(零个或多个)单词
TopicExchangeConfig类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the topic-exchange example.
 *
 * <p>Binding patterns may contain wildcards:
 * <ul>
 *   <li>{@code *} stands for exactly one word (it must be present);</li>
 *   <li>{@code #} stands for any number of words (zero or more).</li>
 * </ul>
 */
@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE1 = "topicQueue1";
    public static final String TOPIC_QUEUE2 = "topicQueue2";
    public static final String TOPIC_QUEUE3 = "topicQueue3";
    public static final String TOPIC_EXCHANGE = "topicExchange";

    /** The topic exchange (durable = true, autoDelete = false). */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(TOPIC_EXCHANGE, durable, autoDelete);
    }

    /** Topic queue 1, declared durable. */
    @Bean
    public Queue topicQueue1() {
        return durableQueue(TOPIC_QUEUE1);
    }

    /** Topic queue 2, declared durable. */
    @Bean
    public Queue topicQueue2() {
        return durableQueue(TOPIC_QUEUE2);
    }

    /** Topic queue 3, declared durable. */
    @Bean
    public Queue topicQueue3() {
        return durableQueue(TOPIC_QUEUE3);
    }

    /** Binds queue 1 with the pattern {@code dev.#}. */
    @Bean
    public Binding bindingTopicExchange1() {
        return bindWithPattern(topicQueue1(), topicExchange(), "dev.#");
    }

    /** Binds queue 2 with the pattern {@code test.#}. */
    @Bean
    public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {
        return bindWithPattern(topicQueue2, topicExchange, "test.#");
    }

    /** Binds queue 3 with the pattern {@code #}. */
    @Bean
    public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {
        return bindWithPattern(topicQueue3, topicExchange, "#");
    }

    // Builds a queue with the durable flag set to true.
    private static Queue durableQueue(String queueName) {
        return new Queue(queueName, true);
    }

    // Binds a queue to a topic exchange under the given routing pattern.
    private static Binding bindWithPattern(Queue queue, TopicExchange exchange, String pattern) {
        return BindingBuilder.bind(queue).to(exchange).with(pattern);
    }
}
TopicExController类
import com.riskmage.rmcbase.Resp;
import com.riskmage.rmccloud.ins.common.annotation.TokenIgnore;
import com.riskmage.rmccloud.ins.rabbitmq.config.TopicExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a text message to the topic exchange using a
 * caller-supplied routing key.
 */
@Slf4j
@RestController
@RequestMapping("/topicEx")
public class TopicExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a test message to the topic exchange.
     *
     * @param routingKey routing key to publish with, taken from the request
     *                   (e.g. {@code dev.goods}); it is also echoed in the message text
     * @return a success response carrying a confirmation text
     */
    @TokenIgnore
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Resp send(String routingKey) {
        final String exchange = TopicExchangeConfig.TOPIC_EXCHANGE;
        final String payload = "这是一条topic测试消息,消息routingkey为" + routingKey;

        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        return Resp.successMsg("topic消息发送成功!!");
    }
}
TopicQueueListener监听类
import com.riskmage.rmccloud.ins.rabbitmq.config.TopicExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Message listener for the three queues bound to the topic exchange.
 * Each handler logs the text message it receives together with the queue name.
 */
@Slf4j
@Component
public class TopicQueueListener {

    /**
     * Logs every message received from {@code TopicExchangeConfig.TOPIC_QUEUE1}.
     *
     * @param message the message body
     */
    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE1)
    public void process1(String message) {
        log.info("topicQueue1收到消息 : {}", message);
    }

    /**
     * Logs every message received from {@code TopicExchangeConfig.TOPIC_QUEUE2}.
     *
     * @param message the message body
     */
    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)
    public void process2(String message) {
        log.info("topicQueue2收到消息 : {}", message);
    }

    /**
     * Logs every message received from {@code TopicExchangeConfig.TOPIC_QUEUE3}.
     *
     * @param message the message body
     */
    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)
    public void process3(String message) {
        log.info("topicQueue3收到消息 : {}", message);
    }
}
发送请求推送消息 试下。
http://localhost:8074/topicEx/send?routingKey=dev.goods
http://localhost:8074/topicEx/send?routingKey=test.goods


阅读排行


Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1