Springboot整合activeMQ之主题,不懂也得懂了吧

  

前言

  

今天和大家分享springboot整合activeMq之主题(主题)——发布/订阅模式,类似微信公众号,我们关注公共就可以收到消息,主题需要消费者先订阅才能收到消息,如果没有消费者订阅,生产者产生的消息就是废消息(发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费)。本次实例支持websocket,消息重发,持久化…
版本信息:SpringBoot2.1.5 activeMq 5.15.10

  

消费者工程

  

消费者工程目录

  

 Springboot整合activeMQ之主题,不懂也得懂了吧

  

<强> pom文件

  
 <代码> & lt; dependency>
  & lt; groupId> org.springframework.boot
  & lt; artifactId> spring-boot-starter
  & lt;/dependency>
  
  & lt; dependency>
  & lt; groupId> org.springframework.boot
  & lt; artifactId> spring-boot-starter-test
  & lt; scope> test
  & lt;/dependency>
  
  & lt; dependency>
  & lt; groupId> org.springframework.boot
  & lt; artifactId> spring-boot-starter-activemq
  & lt;/dependency>
  
  & lt; dependency>
  & lt; groupId> org.springframework.boot
  & lt; artifactId> spring-boot-starter-websocket
  & lt;/dependency> 
  

<强> yml文件配置

  
 <代码>服务器
  端口:8085
  春天:
  activemq:
  broker-url: tcp://localhost: 61616
  用户:管理员
  密码:admin
  jms:
  pub-sub-domain:真
  #自己的主题名字
  myTopic: boot_actviemq_topic  
  

<>强配置类

  
 <代码>包com.example.topic_customer.config;
  
  进口org.apache.activemq.ActiveMQConnectionFactory;
  进口org.apache.activemq.RedeliveryPolicy;
  进口org.apache.activemq.command.ActiveMQTopic;
  进口org.springframework.beans.factory.annotation.Value;
  进口org.springframework.context.annotation.Bean;
  进口org.springframework.context.annotation.Configuration;
  进口org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  进口org.springframework.stereotype.Component;
  进口org.springframework.web.socket.server.standard.ServerEndpointExporter;
  
  进口javax.jms.ConnectionFactory;
  进口javax.jms.Topic;/* *
  * @Date 2019/11/13十
  * @Desc消费者配置类
  */@ configuration
  公开课BeanConfig {
  @ value (" $ {myTopic} ")
  私人字符串myTopic;/* *
  * websocket配置
  *
  * @return
  */@ bean
  公共ServerEndpointExporter ServerEndpointExporter () {
  返回新ServerEndpointExporter ();
  }
  
  @ bean
  公共话题的话题(){
  返回新ActiveMQTopic (myTopic);
  }
  
  公共RedeliveryPolicy RedeliveryPolicy () {
  RedeliveryPolicy RedeliveryPolicy=new RedeliveryPolicy ();//是否在每次尝试重新发送失败后,增长这个等待时间
  redeliveryPolicy.setUseExponentialBackOff(真正的);//重发次数,默认为6次,这里设置为10次,1表示不限次数
  redeliveryPolicy.setMaximumRedeliveries (1);//重发时间间隔,默认为1毫秒,设置为10000毫秒
  redeliveryPolicy.setInitialRedeliveryDelay (10000);//表示没有拖延只有UseExponentialBackOff(真正的)为真时生效//第一次失败后重新发送之前等待10000毫秒,第二次失败再等待10000 * 2毫秒//第三次翻倍10000 * 2 * 2,以此类推
  redeliveryPolicy.setBackOffMultiplier (2);//是否避免消息碰撞
  redeliveryPolicy.setUseCollisionAvoidance(真正的);//设置重发最大拖延时间360000毫秒表示没有拖延只有UseExponentialBackOff(真正的)为真时生效
  redeliveryPolicy.setMaximumRedeliveryDelay (360000);
  返回redeliveryPolicy;
  }
  
  公共ConnectionFactory ConnectionFactory () {
  ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory ();//设置重发属性
  connectionFactory.setRedeliveryPolicy (redeliveryPolicy ());
  返回connectionFactory;
  }/* *
  * JMS队列的监听容器工厂
  */@ bean (name=" jmsTopicListener ")
  公共DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory () {
  DefaultJmsListenerContainerFactory工厂=新DefaultJmsListenerContainerFactory ();
  factory.setConnectionFactory (connectionFactory ());
  factory.setPubSubDomain(真正的);
  factory.setSessionTransacted(真正的);
  factory.setAutoStartup(真正的);//开启持久化订阅
  factory.setSubscriptionDurable(真正的);//重连间隔时间
  factory.setRecoveryInterval (1000 l);
  factory.setClientId (“topic_provider: zb1”);
  返回工厂;
  }
  }

Springboot整合activeMQ之主题,不懂也得懂了吧