前言
今天和大家分享springboot整合activeMq之主题(主题)——发布/订阅模式,类似微信公众号,我们关注公共就可以收到消息,主题需要消费者先订阅才能收到消息,如果没有消费者订阅,生产者产生的消息就是废消息(发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费)。本次实例支持websocket,消息重发,持久化…
版本信息:SpringBoot2.1.5 activeMq 5.15.10
消费者工程
消费者工程目录
<强> pom文件强>
<代码> & lt; dependency> & lt; groupId> org.springframework.boot & lt; artifactId> spring-boot-starter & lt;/dependency> & lt; dependency> & lt; groupId> org.springframework.boot & lt; artifactId> spring-boot-starter-test & lt; scope> test & lt;/dependency> & lt; dependency> & lt; groupId> org.springframework.boot & lt; artifactId> spring-boot-starter-activemq & lt;/dependency> & lt; dependency> & lt; groupId> org.springframework.boot & lt; artifactId> spring-boot-starter-websocket & lt;/dependency>代码>
<强> yml文件配置强>
<代码>服务器 端口:8085 春天: activemq: broker-url: tcp://localhost: 61616 用户:管理员 密码:admin jms: pub-sub-domain:真 #自己的主题名字 myTopic: boot_actviemq_topic 代码>
<>强配置类强>
<代码>包com.example.topic_customer.config; 进口org.apache.activemq.ActiveMQConnectionFactory; 进口org.apache.activemq.RedeliveryPolicy; 进口org.apache.activemq.command.ActiveMQTopic; 进口org.springframework.beans.factory.annotation.Value; 进口org.springframework.context.annotation.Bean; 进口org.springframework.context.annotation.Configuration; 进口org.springframework.jms.config.DefaultJmsListenerContainerFactory; 进口org.springframework.stereotype.Component; 进口org.springframework.web.socket.server.standard.ServerEndpointExporter; 进口javax.jms.ConnectionFactory; 进口javax.jms.Topic;/* * * @Date 2019/11/13十 * @Desc消费者配置类 */@ configuration 公开课BeanConfig { @ value (" $ {myTopic} ") 私人字符串myTopic;/* * * websocket配置 * * @return */@ bean 公共ServerEndpointExporter ServerEndpointExporter () { 返回新ServerEndpointExporter (); } @ bean 公共话题的话题(){ 返回新ActiveMQTopic (myTopic); } 公共RedeliveryPolicy RedeliveryPolicy () { RedeliveryPolicy RedeliveryPolicy=new RedeliveryPolicy ();//是否在每次尝试重新发送失败后,增长这个等待时间 redeliveryPolicy.setUseExponentialBackOff(真正的);//重发次数,默认为6次,这里设置为10次,1表示不限次数 redeliveryPolicy.setMaximumRedeliveries (1);//重发时间间隔,默认为1毫秒,设置为10000毫秒 redeliveryPolicy.setInitialRedeliveryDelay (10000);//表示没有拖延只有UseExponentialBackOff(真正的)为真时生效//第一次失败后重新发送之前等待10000毫秒,第二次失败再等待10000 * 2毫秒//第三次翻倍10000 * 2 * 2,以此类推 redeliveryPolicy.setBackOffMultiplier (2);//是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(真正的);//设置重发最大拖延时间360000毫秒表示没有拖延只有UseExponentialBackOff(真正的)为真时生效 redeliveryPolicy.setMaximumRedeliveryDelay (360000); 返回redeliveryPolicy; } 公共ConnectionFactory ConnectionFactory () { ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory ();//设置重发属性 connectionFactory.setRedeliveryPolicy (redeliveryPolicy ()); 返回connectionFactory; }/* * * JMS队列的监听容器工厂 */@ bean (name=" jmsTopicListener ") 公共DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory () { DefaultJmsListenerContainerFactory工厂=新DefaultJmsListenerContainerFactory (); factory.setConnectionFactory (connectionFactory ()); factory.setPubSubDomain(真正的); factory.setSessionTransacted(真正的); factory.setAutoStartup(真正的);//开启持久化订阅 factory.setSubscriptionDurable(真正的);//重连间隔时间 factory.setRecoveryInterval (1000 l); factory.setClientId (“topic_provider: zb1”); 返回工厂; } }Springboot整合activeMQ之主题,不懂也得懂了吧