本文源码:
GitHub·点这里| |
GitEE·点这里
1架构图片
2、角色分类
(1),代理
RocketMQ的核心,接收生产商发过来的消息,处理消费者的消费消息请求,消息的持久化存储,服务端过滤功能等。
(2),命名服务器
消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息。类似微服务中注册中心的服务注册,发现,下线,上线的概念。
热备份:
NamServer可以部署多个,相互之间独立,其他角色同时向多个命名服务器机器上报状态信息。
心跳机制:
命名服务器中的经纪人,主题等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话,命名服务器会认为某个机器出故障不可用。
(3),生产商
消息的生成者,最常用的生产商类就是DefaultMQProducer。
(4),消费者
消息的消费者,常用消费者类
DefaultMQPushConsumer
收到消息后自动调用传入的处理方法来处理,实时性高
DefaultMQPullConsumer
用户自主控制,灵活性更高。
二、代码实现案例
2、配置文件
<前>
<代码> rocketmq:
#生产者配置
制作人:
isOnOff:>
3、生产者配置
<前>
<代码类=" lang-java ">/* *
* RocketMQ生产者配置
*/@ configuration
公开课ProducerConfig {
私有静态最终记录器日志=LoggerFactory.getLogger (ProducerConfig.class);
@ value (" $ {rocketmq.producer.groupName} ")
私人字符串groupName;
@ value (" $ {rocketmq.producer.namesrvAddr} ")
私人字符串namesrvAddr;
@ value (" $ {rocketmq.producer.maxMessageSize} ")
私人整数maxMessageSize;
@ value (" $ {rocketmq.producer.sendMsgTimeout} ")
私人整数sendMsgTimeout;
@ value (" $ {rocketmq.producer.retryTimesWhenSendFailed} ")
私人整数retryTimesWhenSendFailed;
@ bean
公共DefaultMQProducer getRocketMQProducer () {
DefaultMQProducer生产商;
制片人=new DefaultMQProducer (this.groupName);
producer.setNamesrvAddr (this.namesrvAddr);//如果需要同一个jvm中不同的制片人往不同的mq集群,发送消息,需要设置不同的都
如果(this.maxMessageSize !=null) {
producer.setMaxMessageSize (this.maxMessageSize);
}
如果(this.sendMsgTimeout !=null) {
producer.setSendMsgTimeout (this.sendMsgTimeout);
}//如果发送消息失败,设置重试次数,默认为2次
如果(this.retryTimesWhenSendFailed !=null) {
producer.setRetryTimesWhenSendFailed (this.retryTimesWhenSendFailed);
}
尝试{
producer.start ();
}捕捉(MQClientException e) {
e.printStackTrace ();
}
返回生产商;
}
}
代码>
/* *
* RocketMQ消费者配置
*/@ configuration
公开课ConsumerConfig {
私有静态最终记录器日志=LoggerFactory.getLogger (ConsumerConfig.class);
@ value (" $ {rocketmq.consumer.namesrvAddr} ")
私人字符串namesrvAddr;
@ value (" $ {rocketmq.consumer.groupName} ")
私人字符串groupName;
@ value (" $ {rocketmq.consumer.consumeThreadMin} ")
私人int consumeThreadMin;
@ value (" $ {rocketmq.consumer.consumeThreadMax} ")
私人int consumeThreadMax;
@ value (" $ {rocketmq.consumer.topics} ")
私人主题字符串;
@ value (" $ {rocketmq.consumer.consumeMessageBatchMaxSize} ")
私人int consumeMessageBatchMaxSize;
@
私人RocketMsgListener msgListener;
@ bean
公共DefaultMQPushConsumer getRocketMQConsumer () {
DefaultMQPushConsumer消费=new DefaultMQPushConsumer (groupName);
consumer.setNamesrvAddr (namesrvAddr);
consumer.setConsumeThreadMin (consumeThreadMin);
consumer.setConsumeThreadMax (consumeThreadMax);
consumer.registerMessageListener (msgListener);
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize (consumeMessageBatchMaxSize);
尝试{
String [] topicTagsArr=topics.split (“;”);
(字符串topicTags: topicTagsArr) {
String [] topicTag=topicTags.split (“~”);
consumer.subscribe (topicTag [0], topicTag [1]);
}
consumer.start ();
}捕捉(MQClientException e) {
e.printStackTrace ();
}
返回消费者;
}
}