rocketmq是阿里开源的一款优秀消息中间件 (后捐献给了apache软件基金会),具有低延迟,高性能和可靠性,万亿级容量和灵活的可伸缩性。
github项目地址:https://github.com/apache/rocketmq/
官网地址:http://rocketmq.apache.org/
项目中使用mq的应用场景
1、将项目中方法的同步调用改为异步消息通知
2、使用消息通知机制来使程序解耦合
3、对于高并发请求,使用mq中间件将数据进行缓冲、暂存再交给低并发处理能力的程序进行处理
等......
springboot中整合rocketmq
1、pom.xml引入
<!--rocketmq 引入--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
2、application.yml配置
#rocketmq 配置 rocketmq: name-server: 192.168.11.10:9876 producer: group: rocketdemo send-message-timeout: 3000 #3秒
3、自动化配置的核心类为org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
properties的属性配置文件对应bean为: org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
配置后上面两步后,会自动向spring容器中注入一个 RocketMQTemplate 的bean
4、springboot测试类,测试发送rocketmq消息
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RocketMqTest1 { @Autowired private RocketMQTemplate rocketMQTemplate; /** *发送可靠同步消息 */ @Test public void syncSendTest(){ /** * 发送可靠同步消息 ,可以拿到SendResult 返回数据 * 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。 * 这种方式应用场景非常广泛,例如重要的右键通知、报名短信通知、营销短信等。 * 参数1: topic:tag * 参数2: 消息体 可以为一个对象 * 参数3: 超时时间 毫秒 */ // rocketMQTemplate.syncSend("xxx","asdf"); rocketMQTemplate.syncSend("test-topic-1:myTag","这是一条rmq消息1"); rocketMQTemplate.syncSend("test-topic-1:myTag","这是一条rmq消息2"); System.out.println("ok"); } }5、rocketmq的消息监听器
package com.tingcream.rocketdemo.listener; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener( consumerGroup = "rocketdemo", topic = "test-topic-1", selectorExpression="myTag", //集群模式(相同topic中一条消息只有一个消费者接收到),广播模式(相同topic中一条消息所有消费者都会接收到) messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)// orderly:顺序消费 consurretly:无序消费 public class RmqListener implements RocketMQListener<String> { @Override public void onMessage(String message) { try { //此方法体内一旦发生了异常,消费者会不断重复接收到消息,直到处理成功 //故注意要把onMessage方法,try catch住并手动处理异常 System.out.println("接收到rocketmq消息:"+message); //int a=10/0; //System.out.println(a); }catch (Exception e){ e.printStackTrace(); } } }
注:
1、发送消息也可以直接在rocketmq的控制台进行发送
2、rocketmq支持发送同步可靠消息、异步可靠消息、事务可靠消息,分别需要调用rocketMQTemplate的syncSend、asyncSend、sendOneWay 、sendOneWayOrderly方法即可。
3、rocketmq消息的监听程序中 onMessage方法 一定要手动try catch 处理下异常,否则方法体中一旦发生异常(如因为写的业务逻辑不够严谨导致空指针) ,则会不断重复接收到相同的消息。
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1