消息中间件
消息中间件即MQ,其具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能 。是目前主流的异步RPC 实现手段之一,常见的消息中间件有:ActiveMQ、RabbitMQ,Kafka,RocketMQ等
MQ的组成
-
Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
-
Broker:消息服务器,作为server提供消息核心服务
-
Producer:消息生产者,负责生产消息传输给broker
-
Consumer:消息消费者,负责从broker获取消息并进行处理
-
Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
-
Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
MQ技术维度
- api发送与接收
- 高可用性
- 集群和容错配置
- 持久化
- 延时发送、定时投递
- 签收机制
MQ的作用
-
解耦:由于MQ的介入,避免了系统/模块之间的直接调用
-
存储:有些情况下处理数据的过程会失败,造成数据丢失,可使用消息中间件进行数据持久化;
-
扩展性:消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
-
削峰: 在访问量剧增的情况下,程序不会因为突发的超负荷请求而崩溃。
-
可恢复性: 当系统一部分组件失效时,不会影响到整个系,消息中间件降低了进程间的耦合度,所以即使 个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
-
顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持 定程度上的顺序性。
-
缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过 个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速 该缓冲层有助于控制和优化数据流经过系统的速度。
-
异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理
MQ的应用场景如下:
###
ActiveMQ
安装与启动
无论在windows或Linux系统下,ActiveMQ的安装都很简单
- 在官网下载对应系统的MQ压缩包:http://activemq.apache.org/activemq-5159-release.html
- 在合适的路径解压
- 在bin目录下执行activemq.bat,即可启动MQ;其默认后台端口为61616
- MQ控制台默认地址:http://localhost:8161/admin ,默认用户名与密码:admin/admin
SpringBoot集成ActiveMQ
-
引入相关jar包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
-
在配置文件中配置MQ连接
spring.activemq.broker-url=tcp://127.0.0.1:61616 spring.activemq.user=admin spring.activemq.password=admin #false:Queue true:Topic spring.jms.pub-sub-domain=false #定义队列名称 myqueue=waston_queue
-
自定义配置类,配置MQ的队列
package cn.waston.test_mq.config; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.jms.Queue; @Component @EnableJms public class ConfigBean { @Value("${myqueue}") private String myQueue; @Bean public Queue queue(){ return new ActiveMQQueue(myQueue); } }
-
创建消息生产者,并调用生产者的方法
@Component public class QueueProduce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; public void produceMsg(){ jmsMessagingTemplate.convertAndSend(queue,"hello world"); } }
-
创建消息消费者(新建一个模块或项目)
@Component public class QueueConsumer { // 监听指定Queue的消息 @JmsListener(destination = "${myqueue}") public void receive(TextMessage textMessage) throws JMSException{ System.out.println("消费者收到消息:" + textMessage.getText()); } }
-
启动消费者模块,即可在控制台看到之前生产者向Queue中放入的消息;同时可以在ActiveMQ的控制台看该Queue里的消息被消费了
-
发布订阅Topic的使用
-
Topic发布者模块
配置文件:
server.port=8082 spring.activemq.broker-url=tcp://127.0.0.1:61616 spring.activemq.user=admin spring.activemq.password=admin #false:Queue true:Topic spring.jms.pub-sub-domain=true #定义Topic名称 myTopic=waston_topic
配置类:
@Component public class ConfigBean { @Value("${myTopic}") private String topicName; @Bean public Topic topic(){ return new ActiveMQTopic(topicName); } }
发送者类:
这里使用到了定时任务
@Component public class TopicProduce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Topic topic; @Scheduled(fixedRate = 3000) public void produceTopic(){ jmsMessagingTemplate.convertAndSend(topic, "hello topic"); } }
-
两个Topic订阅者模块
配置文件与发送者相同(端口号不同)
订阅者类:
@Component public class TopicConsumer { @JmsListener(destination = "${myTopic}") public void receive(TextMessage text) throws JMSException{ System.out.println("消费者收到Topic的消息:" + text.getText()); } }
-
先启动两个订阅者,再启动发送者;在两个订阅者的控制台都可以看见发送者所发送的消息;在ActiveMQ也可以看到消息的变化
-