博客详情

springboot中整合rocketmq (原创)

作者: 朝如青丝暮成雪
发布时间:2020-08-26 16:20:04  文章分类:rocketmq   阅读(1876)  评论(0)

rocketmq是阿里开源的一款优秀消息中间件 (后捐献给了apache软件基金会),具有低延迟,高性能和可靠性,万亿级容量和灵活的可伸缩性。

github项目地址:https://github.com/apache/rocketmq/

官网地址:http://rocketmq.apache.org/


项目中使用mq的应用场景

1、将项目中方法的同步调用改为异步消息通知

2、使用消息通知机制来使程序解耦合

3、对于高并发请求,使用mq中间件将数据进行缓冲、暂存再交给低并发处理能力的程序进行处理

等......



springboot中整合rocketmq


1、pom.xml引入


   <!--rocketmq 引入-->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.0.4</version>
    </dependency>

2、application.yml配置


#rocketmq 配置
rocketmq:
  name-server: 192.168.11.10:9876
  producer:
    group: rocketdemo
    send-message-timeout: 3000 #3秒


3、自动化配置的核心类为org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

properties的属性配置文件对应bean为: org.apache.rocketmq.spring.autoconfigure.RocketMQProperties

配置后上面两步后,会自动向spring容器中注入一个 RocketMQTemplate 的bean


4、springboot测试类,测试发送rocketmq消息


import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

 
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMqTest1 {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;



    /**
     *发送可靠同步消息
     */
    @Test
    public void syncSendTest(){

        /**
         * 发送可靠同步消息 ,可以拿到SendResult 返回数据
         * 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
         * 这种方式应用场景非常广泛,例如重要的右键通知、报名短信通知、营销短信等。
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         * 参数3: 超时时间 毫秒
         */
       // rocketMQTemplate.syncSend("xxx","asdf");
          rocketMQTemplate.syncSend("test-topic-1:myTag","这是一条rmq消息1");
          rocketMQTemplate.syncSend("test-topic-1:myTag","这是一条rmq消息2");

         System.out.println("ok");
    }

}
5、rocketmq的消息监听器



package com.tingcream.rocketdemo.listener;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        consumerGroup = "rocketdemo",
        topic = "test-topic-1",
        selectorExpression="myTag",

        //集群模式(相同topic中一条消息只有一个消费者接收到),广播模式(相同topic中一条消息所有消费者都会接收到)
        messageModel = MessageModel.CLUSTERING,
        consumeMode = ConsumeMode.CONCURRENTLY)// orderly:顺序消费   consurretly:无序消费
public class RmqListener  implements RocketMQListener<String> {


    @Override
    public void onMessage(String message) {
        try {
            //此方法体内一旦发生了异常,消费者会不断重复接收到消息,直到处理成功
            //故注意要把onMessage方法,try catch住并手动处理异常
            System.out.println("接收到rocketmq消息:"+message);
            //int a=10/0;
            //System.out.println(a);

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


注:

1、发送消息也可以直接在rocketmq的控制台进行发送

2、rocketmq支持发送同步可靠消息、异步可靠消息、事务可靠消息,分别需要调用rocketMQTemplate的syncSend、asyncSend、sendOneWay 、sendOneWayOrderly方法即可。

3、rocketmq消息的监听程序中 onMessage方法 一定要手动try catch 处理下异常,否则方法体中一旦发生异常(如因为写的业务逻辑不够严谨导致空指针) ,则会不断重复接收到相同的消息。





关键字:  rocketmq
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号