当需要处理的任务非常多时,单台机器的处理效率较低。
rabbitmq可以将消息(任务)分发给多个消费者。注意,这里exchange(称为交换机或路由器,exchange是rabbitmq中非常重要的概念,后面会详细介绍)类型仍然是direct,这说明同一条消息只会被投递给其中一个消费者,而不会被多个消费者重复消费。
如果有多个消费者同时监听同一个队列,则队列中的消息会在这些消费者之间轮询派发。
生产者,产生消息(任务)
package com.tingcream.www.test.rabbitmq.directTask; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * Producer for the direct task-queue example.
 *
 * <p>Publishes ten text messages to the queue {@code direct_task_queue} through
 * the default exchange so that the work can be shared between several consumers.
 *
 * @author jelly
 * @date 2017-09-23 21:32:59
 */
public class NewTask {

    /** Queue name; it doubles as the routing key on the default ("") exchange. */
    private static final String TASK_QUEUE_NAME = "direct_task_queue";

    /**
     * Connects to the broker, declares the task queue and publishes the messages.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or a broker operation fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");
        // Credentials of the publishing user.
        factory.setUsername("zhangsan");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Second argument (durable) is true, matching the declaration made by the consumers.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            // Publish the tasks.
            for (int i = 0; i < 10; i++) {
                String message = "Hello RabbitMQ" + i;
                // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8",
                // so the platform default charset must not be used here.
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("NewTask send : " + message);
            }
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing failed;
            // closing the connection also closes any channel still open on it.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者A
package com.tingcream.www.test.rabbitmq.directTask;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Consumer A of the direct task-queue example.
 *
 * <p>Subscribes to {@code direct_task_queue} with manual acknowledgements and a
 * prefetch count of one, and simulates two seconds of work per message.
 */
public class WorkA {

    /** Queue shared by the producer and all workers. */
    private static final String TASK_QUEUE_NAME = "direct_task_queue";

    /**
     * Connects to the broker and registers the consumer. The connection is left
     * open on purpose so that deliveries keep arriving after this method returns.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or a broker operation fails
     */
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");
        // Credentials of the consuming user.
        factory.setUsername("lisi");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // durable = true: the queue definition is persisted on the broker's disk.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("WorkerA Waiting for messages");

        // Prefetch count: the broker hands this consumer at most one unacknowledged message at a time.
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerA Received :" + message);

                boolean succeeded = false;
                try {
                    doWork(message);
                    succeeded = true;
                } catch (RuntimeException e) {
                    System.err.println("WorkerA Failed :" + message + " (" + e + ")");
                } finally {
                    System.out.println("WorkerA Done");
                }

                long deliveryTag = envelope.getDeliveryTag();
                if (succeeded) {
                    // Acknowledge only work that actually completed.
                    channel.basicAck(deliveryTag, false);
                } else {
                    // Hand the message back to the queue instead of aborting the channel and
                    // then acking on it, which both lost the failure and killed the consumer.
                    // NOTE(review): a message that always fails will be redelivered forever;
                    // consider a dead-letter exchange for real workloads.
                    channel.basicNack(deliveryTag, false, true);
                }
            }
        };

        boolean autoAck = false;
        // Manual acknowledgement: a message is removed only after the explicit ack above.
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }

    /**
     * Simulates processing of one task by sleeping for two seconds.
     *
     * @param task the message text (not inspected by this demo)
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(2000); // pause for 2 seconds
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
消费者B
package com.tingcream.www.test.rabbitmq.directTask;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Consumer B of the direct task-queue example.
 *
 * <p>Subscribes to {@code direct_task_queue} with manual acknowledgements and a
 * prefetch count of one, and simulates two seconds of work per message.
 */
public class WorkB {

    /** Queue shared by the producer and all workers. */
    private static final String TASK_QUEUE_NAME = "direct_task_queue";

    /**
     * Connects to the broker and registers the consumer. The connection is left
     * open on purpose so that deliveries keep arriving after this method returns.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or a broker operation fails
     */
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");
        // Credentials of the consuming user.
        factory.setUsername("lisi");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // durable = true: the queue definition is persisted on the broker's disk.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("WorkerB Waiting for messages");

        // Prefetch count: the broker hands this consumer at most one unacknowledged message at a time.
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerB Received :" + message);

                boolean succeeded = false;
                try {
                    doWork(message);
                    succeeded = true;
                } catch (RuntimeException e) {
                    System.err.println("WorkerB Failed :" + message + " (" + e + ")");
                } finally {
                    System.out.println("WorkerB Done");
                }

                long deliveryTag = envelope.getDeliveryTag();
                if (succeeded) {
                    // Acknowledge only work that actually completed.
                    channel.basicAck(deliveryTag, false);
                } else {
                    // Hand the message back to the queue instead of aborting the channel and
                    // then acking on it, which both lost the failure and killed the consumer.
                    // NOTE(review): a message that always fails will be redelivered forever;
                    // consider a dead-letter exchange for real workloads.
                    channel.basicNack(deliveryTag, false, true);
                }
            }
        };

        boolean autoAck = false;
        // Manual acknowledgement: a message is removed only after the explicit ack above.
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }

    /**
     * Simulates processing of one task by sleeping for two seconds.
     *
     * @param task the message text (not inspected by this demo)
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(2000); // pause for 2 seconds
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
依次启动消费者A、消费者B 和生产者,查看控制台输出如下。
生产者:
NewTask send : Hello RabbitMQ0
NewTask send : Hello RabbitMQ1
NewTask send : Hello RabbitMQ2
NewTask send : Hello RabbitMQ3
NewTask send : Hello RabbitMQ4
NewTask send : Hello RabbitMQ5
NewTask send : Hello RabbitMQ6
NewTask send : Hello RabbitMQ7
NewTask send : Hello RabbitMQ8
NewTask send : Hello RabbitMQ9
消费者A:
WorkerA Received :Hello RabbitMQ1
WorkerA Done
WorkerA Received :Hello RabbitMQ3
WorkerA Done
WorkerA Received :Hello RabbitMQ5
WorkerA Done
WorkerA Received :Hello RabbitMQ7
WorkerA Done
WorkerA Received :Hello RabbitMQ9
WorkerA Done
消费者B:
WorkerB Received :Hello RabbitMQ0
WorkerB Done
WorkerB Received :Hello RabbitMQ2
WorkerB Done
WorkerB Received :Hello RabbitMQ4
WorkerB Done
WorkerB Received :Hello RabbitMQ6
WorkerB Done
WorkerB Received :Hello RabbitMQ8
WorkerB Done


阅读排行


Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1