在前面介绍的消息队列中,都没有涉及路由选择的问题,因为消费者监听的队列名称与生产者发布的队列名称是相同的,路由key等于队列名称,因而也是相同的。
如果消费者不关心队列名称(这时队列名称是匿名或随机的),而只关心路由key,就需要用到路由key的绑定。关键代码就是下面这行:
channel.queueBind(queue,EXCHANGE_NAME,routingKey);//channel 中 绑定 queue和路由key
示例代码如下:
日志生产者
package com.tingcream.www.test.rabbitmq.directRouting;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 日志生产者
* routing logs
* @author jelly
*/
public class RoutingLogSender {
private static final String EXCHANGE_NAME = "direct_routing_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.9.130");
//指定用户 密码
factory.setUsername("zhangsan");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");//注意是direct
//发送信息
for (String routingKey:routingKeys){
String message = "这是一条消息 " ;
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("发送: "+routingKey +":" + message);
}
channel.close();
connection.close();
}
}
日志消费者A
package com.tingcream.www.test.rabbitmq.directRouting;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* direct 消息。
* 消费者A
* 客户端根据路由key 筛选 需要接收处理的消息
*/
public class RoutingLogReceiverA {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_routing_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.9.130");
//指定用户 密码
factory.setUsername("lisi");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//获取匿名队列名称
String queueName=channel.queueDeclare().getQueue();
//根据路由关键字进行绑定
for (String routingKey:routingKeys){
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);//通道 绑定 queue
System.out.println("绑定 :"+EXCHANGE_NAME+"," +
" queue:"+queueName+", BindRoutingKey:" + routingKey);
}
System.out.println("RoutingLogReceiverA Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到 :" + envelope.getRoutingKey() + ":" + message );
}
};
channel.basicConsume(queueName, true, consumer);
}
}
日志消费者B
package com.tingcream.www.test.rabbitmq.directRouting;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* direct 消息。
* 消费者B
* 客户端根据路由key 筛选 需要接收处理的消息
*/
public class RoutingLogReceiverB {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_routing_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"error"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.9.130");
//指定用户 密码
factory.setUsername("lisi");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意这里的交换机类型为direct 类型
//获取匿名队列名称
String queue=channel.queueDeclare().getQueue();
//根据路由关键字进行绑定
for (String routingKey:routingKeys){
channel.queueBind(queue,EXCHANGE_NAME,routingKey);//通道 绑定 queue
System.out.println("绑定:"+EXCHANGE_NAME+"," +
" queue:"+queue+", BindRoutingKey:" + routingKey);
}
System.out.println("RoutingLogReceiverB Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到 :" + envelope.getRoutingKey() + ":" + message );
}
};
channel.basicConsume(queue, true, consumer);
}
}
分别启动消费者A、消费者B 和生产者 。


阅读排行


Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1