# AliCloudRocketMQDemo **Repository Path**: dadeity/ali-cloud-rocket-mqdemo ## Basic Information - **Project Name**: AliCloudRocketMQDemo - **Description**: 存放研究阿里云消息队列MQ RocketMQ的整合代码,在官方实例代码的基础上进行改造 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-01-15 - **Last Updated**: 2025-01-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 阿里云消息队列MQ(现在MQ指的是RocketMQ)和Spring Boot的整合 前言: 开源版本Rocket和商业版本的RocketMQ有些不同,研究的是商业版本的RocketMQ,阿里云的官方文档,感觉有点乱。看不咋明白,网上虽然有教程,大都还是有点缺少,有时候会突然跳了步骤,抹去了一些细节。 #### 前置步骤 **阿里云MQ开通及子Access账号的权限的生成** 阿里云MQ开通 ~~~ 开通阿里云MQ(现在叫阿里云RocketMQ)百度的教程够用,不多记录,需要的参考该地址http://mtw.so/5Q5nHp,进行开通。PS:页面由于开发人员一直在更新,教程的页面不一定和现有页面完全一样,所以不要死脑筋。 ~~~ 子Access账号 阿里云可以为账号,创建两个字段,用于你身份的验证,下图中可以进入申请子账户 ![image-20211103143554371](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103143554371.png) 跳出提示,选择`开始使用子用户AccessKey` ![image-20211103143728935](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103143728935.png) 点击 创建用户 ![image-20211103144038053](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103144038053.png) ![image-20211103144205700](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103144205700.png) 点击确定,会要你验证手机,输入验证码即可 创建完以后会给你两个字段的值,一个是`AccessKey ID`和`AccessKey Secret`一定要及时**妥善保存**,虽然可以重新创建 ![image-20211103144539923](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103144539923.png) **ps:这里别忘了给账户赋予MQ的权限,不然无法进行消息的订阅和发送** 如何设置权限? ![image-20211103144907801](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103144907801.png) ![image-20211103145204332](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103145204332.png) 点击添加权限,添加以下权限 ![image-20211103145226492](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103145226492.png) ##### Topic和Group的创建(在阿里云控制台页面进行) 首先创建实例,点击`创建实例` ![image-20211103151514954](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103151514954.png) ![image-20211103151801021](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103151801021.png) 点击确定 ![image-20211103151839645](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103151839645.png) 按提示创建Group和Topic 即可,然后将Group和Topic的名称,填入到`application.properties`对应字段中 nameSrvAddr的获取,在创建好Group和Topic后,从这进入到接入点的获取页面 ![image-20211103153043593](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103153043593.png) ![image-20211103153019978](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103153019978.png) 接入点有两个,分别对应了不同的接入方式。TCP和HTTP,我这里用的TCP协议的接入方式 这里只能获取到公网的接入地址,没有内网 # 开始开发 #### SpringBoot整合阿里云RocketMQ(普通消息为例) Maven工程 POM文件依赖 ~~~xml org.springframework.boot spring-boot-starter-web com.aliyun.openservices ons-client 1.8.4.Final org.projectlombok lombok 1.18.16 org.springframework.boot spring-boot-starter-test test ~~~ application.properties ~~~ #启动测试之前请替换如下 XXX 为您的配置,从阿里云MQ里获取,具体获取方式,看下前置步骤 rocketmq.accessKey=xxx rocketmq.secretKey=xxx rocketmq.nameSrvAddr=xxx rocketmq.topic=TpMQTest rocketmq.groupId=GID_MQTEST rocketmq.tag=* rocketmq.orderTopic=XXX rocketmq.orderGroupId=XXX rocketmq.orderTag=* ~~~ 配置类,用于读取application.properties中相应字段的值 ~~~java import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import java.util.Properties; @Configuration @ConfigurationProperties(prefix = "rocketmq") public class MqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String orderTopic; private String orderGroupId; private String orderTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getOrderTopic() { return orderTopic; } public void setOrderTopic(String orderTopic) { this.orderTopic = orderTopic; } public String getOrderGroupId() { return orderGroupId; } public void setOrderGroupId(String orderGroupId) { this.orderGroupId = orderGroupId; } public String getOrderTag() { return orderTag; } public void setOrderTag(String orderTag) { this.orderTag = orderTag; } } ~~~ 消费者的注册类 消费者的build,主要目的是将配置文件里的配置设置到ConsumerBean中,使其在Spring启动时,一同启动。 ~~~java import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.aliyun.openservices.springboot.example.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Properties; //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 @Configuration public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅关系 Map subscriptionTable = new HashMap(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.setExpression(mqConfig.getTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } } ~~~ 注册完成以后,开启监听,在消息队列有消息时就会进行消费 @Component这个注解,阿里云官方的Demo,并没有出现,导致一直消费者消费不到消息。后来加上以后就能正常消费消息了 ~~~java import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Component @Slf4j public class DemoMessageListener implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { log.info("Receive: " + message); try { //do something.. //Action.CommitMessag 进行消息的确认 return Action.CommitMessage; } catch (Exception e) { //消费失败 return Action.ReconsumeLater; } } } ~~~ 生产者注册类 ~~~java import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.aliyun.openservices.springboot.example.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ProducerClient { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } } ~~~ 生产者生产消息工具类 ~~~java import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.aliyun.openservices.springboot.example.config.MqConfig; import org.springframework.stereotype.Component; /** * @description:

RocketMessageProducer rocketMQ消息生产者

* @author: LiRen **/ @Component public class RocketMessageProducer { private static ProducerBean producer; private static MqConfig mqConfig; public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) { this.producer = producer; this.mqConfig = mqConfig; } /** * @Description:

生产 普通 消息

* @author: LiRen */ public static void producerMsg(String tag, String key, String body) { Message msg = new Message(mqConfig.getTopic(), tag, key, body.getBytes()); long time = System.currentTimeMillis(); try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println(time + " Send mq message success.Topic is:" + msg.getTopic() + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey() + " msgId is:" + sendResult.getMessageId()); } catch (ONSClientException e) { e.printStackTrace(); System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic()); // TODO 发送失败 } } } ~~~ WEB接口,测试Controller类 ~~~java /** * ClassName: ProducerController
* Description:
* date: 2021/11/3 11:05
* * @author Hesion
* @version * @since JDK 1.8 */ import com.aliyun.openservices.springboot.example.normal.RocketMessageProducer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; /** * 测试生产者 * @author: hesion * @create: 2021-11-03 11:05 **/ @RestController public class ProducerController { /** * rocketmq demo */ @RequestMapping(value = {"/useRocketMQ"}, method = RequestMethod.GET) public String useRocketMQ() { RocketMessageProducer.producerMsg("RocketProdTagTest","RocketProdKeyTest","RocketProdBodyTest"); return "请求成功!"; } } ~~~ ![image-20211103160219978](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103160219978.png) ![image-20211103160333530](https://gitee.com/HesionBlack/typoraimg/raw/master/img//image-20211103160333530.png) [Gitee代码地址](https://gitee.com/HesionBlack/ali-cloud-rocket-mqdemo.git)