SpringBoot2高级应用(01):整合RocketMQ,实现请求异步处理

  

本文源码:   GitHub·点这里| |   GitEE·点这里

  

     1架构图片

  

   SpringBoot2高级应用(01):整合RocketMQ,实现请求异步处理”>
  <h3 id=      2、角色分类   

(1),代理

  

RocketMQ的核心,接收生产商发过来的消息,处理消费者的消费消息请求,消息的持久化存储,服务端过滤功能等。

  

(2),命名服务器

  

消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息。类似微服务中注册中心的服务注册,发现,下线,上线的概念。

  

热备份:   
NamServer可以部署多个,相互之间独立,其他角色同时向多个命名服务器机器上报状态信息。   

心跳机制:   
命名服务器中的经纪人,主题等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话,命名服务器会认为某个机器出故障不可用。   

(3),生产商

  

消息的生成者,最常用的生产商类就是DefaultMQProducer。

  

(4),消费者

  

消息的消费者,常用消费者类   
DefaultMQPushConsumer   
收到消息后自动调用传入的处理方法来处理,实时性高   
DefaultMQPullConsumer   
用户自主控制,灵活性更高。   

     二、代码实现案例

  

     2、配置文件

  <前>   <代码> rocketmq:   #生产者配置   制作人:   isOnOff:>      3、生产者配置   <前>   <代码类=" lang-java ">/* *   * RocketMQ生产者配置   */@ configuration   公开课ProducerConfig {   私有静态最终记录器日志=LoggerFactory.getLogger (ProducerConfig.class);   @ value (" $ {rocketmq.producer.groupName} ")   私人字符串groupName;   @ value (" $ {rocketmq.producer.namesrvAddr} ")   私人字符串namesrvAddr;   @ value (" $ {rocketmq.producer.maxMessageSize} ")   私人整数maxMessageSize;   @ value (" $ {rocketmq.producer.sendMsgTimeout} ")   私人整数sendMsgTimeout;   @ value (" $ {rocketmq.producer.retryTimesWhenSendFailed} ")   私人整数retryTimesWhenSendFailed;   @ bean   公共DefaultMQProducer getRocketMQProducer () {   DefaultMQProducer生产商;   制片人=new DefaultMQProducer (this.groupName);   producer.setNamesrvAddr (this.namesrvAddr);//如果需要同一个jvm中不同的制片人往不同的mq集群,发送消息,需要设置不同的都   如果(this.maxMessageSize !=null) {   producer.setMaxMessageSize (this.maxMessageSize);   }   如果(this.sendMsgTimeout !=null) {   producer.setSendMsgTimeout (this.sendMsgTimeout);   }//如果发送消息失败,设置重试次数,默认为2次   如果(this.retryTimesWhenSendFailed !=null) {   producer.setRetryTimesWhenSendFailed (this.retryTimesWhenSendFailed);   }   尝试{   producer.start ();   }捕捉(MQClientException e) {   e.printStackTrace ();   }   返回生产商;   }   }      

/* *   * RocketMQ消费者配置   */@ configuration   公开课ConsumerConfig {   私有静态最终记录器日志=LoggerFactory.getLogger (ConsumerConfig.class);   @ value (" $ {rocketmq.consumer.namesrvAddr} ")   私人字符串namesrvAddr;   @ value (" $ {rocketmq.consumer.groupName} ")   私人字符串groupName;   @ value (" $ {rocketmq.consumer.consumeThreadMin} ")   私人int consumeThreadMin;   @ value (" $ {rocketmq.consumer.consumeThreadMax} ")   私人int consumeThreadMax;   @ value (" $ {rocketmq.consumer.topics} ")   私人主题字符串;   @ value (" $ {rocketmq.consumer.consumeMessageBatchMaxSize} ")   私人int consumeMessageBatchMaxSize;   @   私人RocketMsgListener msgListener;   @ bean   公共DefaultMQPushConsumer getRocketMQConsumer () {   DefaultMQPushConsumer消费=new DefaultMQPushConsumer (groupName);   consumer.setNamesrvAddr (namesrvAddr);   consumer.setConsumeThreadMin (consumeThreadMin);   consumer.setConsumeThreadMax (consumeThreadMax);   consumer.registerMessageListener (msgListener);   consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);   consumer.setConsumeMessageBatchMaxSize (consumeMessageBatchMaxSize);   尝试{   String [] topicTagsArr=topics.split (“;”);   (字符串topicTags: topicTagsArr) {   String [] topicTag=topicTags.split (“~”);   consumer.subscribe (topicTag [0], topicTag [1]);   }   consumer.start ();   }捕捉(MQClientException e) {   e.printStackTrace ();   }   返回消费者;   }   }   

SpringBoot2高级应用(01):整合RocketMQ,实现请求异步处理