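AMQP (Advanced Message Queuing Protocol) in fact defines four types of exchange: direct, fanout, topic, and headers.
A topic exchange lets consumers subscribe to topics by using wildcards. A routing key on a topic exchange is normally a series of identifiers separated by dots.
* matches exactly one identifier.
# matches zero or more identifiers.
For example, "audit.#" matches "audit.irs.corporate", whereas "audit.*" matches "audit.irs" but not "audit.irs.corporate".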
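The two wildcard rules above can be sketched in plain Java without a broker. This is only an illustrative, word-by-word matcher, not RabbitMQ's own implementation; the class name TopicPatternSketch and its methods are hypothetical, and it assumes routing keys consist of non-empty words separated by dots.

```java
/** Illustrative sketch of topic wildcard matching; not RabbitMQ's internal algorithm. */
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero words (skip it) or one more word (stay on '#')
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // words left in the pattern but none left in the key
        }
        // '*' accepts any single word; anything else must be equal literally
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate")); // true
        System.out.println(matches("audit.*", "audit.irs.corporate")); // false
        System.out.println(matches("audit.*", "audit.irs"));           // true
        System.out.println(matches("audit.#", "audit"));               // true
    }
}
```

The last call shows the "zero identifiers" case: "#" can match nothing at all, so "audit.#" also accepts the bare key "audit".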
The sample code is as follows.
Message producer:
package com.tingcream.www.test.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Topic producer.
 * Topic routing is a form of pattern (fuzzy) matching:
 * "*" (star) stands for exactly one word;
 * "#" (hash) stands for zero or more words.
 */
public class TopicSender {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.9.130");
            // Set the username and password
            factory.setUsername("zhangsan");
            factory.setPassword("123456");
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Declare a topic (pattern-matching) exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Routing keys of the messages to send
            String[] routingKeys = new String[]{
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "quick.brown.fox",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };
            // Send one message per routing key
            for (String routingKey : routingKeys) {
                String message = "Message from routing key " + routingKey + "!";
                // Encode as UTF-8 explicitly, because the consumers decode as UTF-8
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("TopicSender sent message " + routingKey + ":" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close each resource once, channel first; either may be null if setup failed
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
Message consumer A:
package com.tingcream.www.test.rabbitmq.topic;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicReceiverA {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");
        // Set the username and password
        factory.setUsername("lisi");
        factory.setPassword("123456");
        // factory.setPort(AMQP.PROTOCOL.PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare a topic (pattern-matching) exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Server-named, exclusive, auto-delete queue
        String queueName = channel.queueDeclare().getQueue();
        // Binding keys (routing patterns)
        String[] routingKeys = new String[]{"*.orange.*"};
        // Bind the queue to the exchange once per binding key
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("Bound exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("TopicReceiverA Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("TopicReceiverA received message " + envelope.getRoutingKey() + ":" + message);
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(queueName, true, consumer);
    }
}
Message consumer B:
package com.tingcream.www.test.rabbitmq.topic;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicReceiverB {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");
        // Set the username and password
        factory.setUsername("lisi");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare a topic (pattern-matching) exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Server-named, exclusive, auto-delete queue
        String queueName = channel.queueDeclare().getQueue();
        // Binding keys (routing patterns)
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
        // Bind the queue to the exchange once per binding key
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("Bound exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("TopicReceiverB Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("TopicReceiverB received message " + envelope.getRoutingKey() + ":" + message);
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(queueName, true, consumer);
    }
}
Start consumer A, consumer B, and the producer, in that order. The console output is as follows:
TopicSender:
TopicSender sent message quick.orange.rabbit:Message from routing key quick.orange.rabbit!
TopicSender sent message lazy.orange.elephant:Message from routing key lazy.orange.elephant!
TopicSender sent message quick.orange.fox:Message from routing key quick.orange.fox!
TopicSender sent message lazy.brown.fox:Message from routing key lazy.brown.fox!
TopicSender sent message quick.brown.fox:Message from routing key quick.brown.fox!
TopicSender sent message quick.orange.male.rabbit:Message from routing key quick.orange.male.rabbit!
TopicSender sent message lazy.orange.male.rabbit:Message from routing key lazy.orange.male.rabbit!
TopicReceiverA:
Bound exchange:topic_logs, queue:amq.gen-xFQbtZcDKv6uDcmn8hHzSQ, BindRoutingKey:*.orange.*
TopicReceiverA Waiting for messages
TopicReceiverA received message quick.orange.rabbit:Message from routing key quick.orange.rabbit!
TopicReceiverA received message lazy.orange.elephant:Message from routing key lazy.orange.elephant!
TopicReceiverA received message quick.orange.fox:Message from routing key quick.orange.fox!
TopicReceiverB:
Bound exchange:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:*.*.rabbit
Bound exchange:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:lazy.#
TopicReceiverB Waiting for messages
TopicReceiverB received message quick.orange.rabbit:Message from routing key quick.orange.rabbit!
TopicReceiverB received message lazy.orange.elephant:Message from routing key lazy.orange.elephant!
TopicReceiverB received message lazy.brown.fox:Message from routing key lazy.brown.fox!
TopicReceiverB received message lazy.orange.male.rabbit:Message from routing key lazy.orange.male.rabbit!
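To see why each receiver got exactly these messages, the bindings can be replayed offline. The sketch below is an approximation under stated assumptions, not how the broker works internally: it translates each binding key into a regular expression (with a "." appended to the routing key so that every word ends in a dot) and lists which receiver would get each of the seven routing keys. The class name DeliveryTableSketch and its methods are hypothetical, and words are assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Offline replay of the demo bindings; an approximation, not the broker's algorithm. */
final class DeliveryTableSketch {

    /** Translates a binding key into a regex over the routing key with a trailing '.' appended. */
    static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (word.equals("*")) {
                regex.append("[^.]+\\.");                        // exactly one word
            } else if (word.equals("#")) {
                regex.append("(?:[^.]+\\.)*");                   // zero or more words
            } else {
                regex.append(Pattern.quote(word)).append("\\."); // literal word
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String bindingKey, String routingKey) {
        return toRegex(bindingKey).matcher(routingKey + ".").matches();
    }

    /** Names of the receivers whose bindings match the routing key. */
    static List<String> receiversFor(String routingKey) {
        List<String> receivers = new ArrayList<>();
        if (matches("*.orange.*", routingKey)) {
            receivers.add("A");
        }
        // Receiver B binds one queue with two keys; a message is queued once even if both match
        if (matches("*.*.rabbit", routingKey) || matches("lazy.#", routingKey)) {
            receivers.add("B");
        }
        return receivers;
    }

    public static void main(String[] args) {
        String[] routingKeys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "quick.brown.fox",
            "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : routingKeys) {
            System.out.println(key + " -> " + receiversFor(key));
        }
    }
}
```

Under these assumptions, "quick.brown.fox" and "quick.orange.male.rabbit" map to an empty list: the first matches no binding at all, and the second has four words, so neither "*.orange.*" nor "*.*.rabbit" applies. That agrees with the logs above, where neither receiver prints those two keys.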
Log in to the RabbitMQ management console to inspect the topic_logs exchange (type: topic).


Copyright © 叮叮声的奶酪. All rights reserved.