rabbitmq是一款优秀的,基于AMQP(高级消息队列协议)的实现 ,企业级的消息中间件,同时也支持mqtt协议(需要启用插件) 。
AMQP: Advanced Message Queuing Protocol 高级消息队列协议 ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang实现有RabbitMQ等。
MQTT: Message Queuing Telemetry Transport,消息队列遥测传输,是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
其他消息中间件产品 ,有 Kafka、 alibaba RocketMQ 已捐献给apache开源软件基金会 、apache activeMQ 、apollo 等。
开始学习rabbitmq 前,我们先启动rabbitmq 服务器。
rabbitmq AMQP helloword 例子。
消息生产者
package com.tingcream.www.test.rabbitmq.helloworld;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* rabbitmq
* AMQP 消息生产者
*
* direct
*/
public class Producer {
public final static String QUEUE_NAME="rabbitMQ-helloworld";
public static void main(String[] args) throws Exception {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("192.168.9.130");
//指定用户 密码
factory.setUsername("zhangsan");
factory.setPassword("123456");
//指定端口
factory.setPort(AMQP.PROTOCOL.PORT);
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
/*
queueDeclare
第一个参数:队列名称、
第二个参数: 是否持久化(true表示是,队列将在服务器重启时生存)、
第三个参数:是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、
第四个参数: 当所有消费者客户端连接断开时是否自动删除队列、
第五个参数:队列的其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "你好,rabbitmq";
//往队列中发出一条消息
/*
* basicPublish
* 第一个参数: 交换机名称 默认交换机名称 amq.direct
* 第二个参数:队列映射的路由key、
* 第三个参数:消息的其他属性
* 第四个参数:发送信息的主体 message.getBytes()
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息:" + message);
//关闭频道和连接
channel.close();
connection.close();
}
}
消息消费者
package com.tingcream.www.test.rabbitmq.helloworld;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* rabbitmq
* AMQP 消息消费者
*/
public class Customer {
//消息队列名称
private final static String QUEUE_NAME="rabbitMQ-helloworld";
public static void main(String[] args) throws Exception {
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.9.130");
//指定用户 密码
factory.setUsername("lisi");
factory.setPassword("123456");
//指定端口
factory.setPort(AMQP.PROTOCOL.PORT);
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接受到消息:" + message );
}
}
}
分别启动消费者和生产者,看看控制台的输出。


阅读排行


Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1