消息中间件

消息中间件即MQ,其具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能 。是目前主流的异步RPC 实现手段之一,常见的消息中间件有:ActiveMQRabbitMQKafkaRocketMQ

MQ的组成

  • Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

  • Broker:消息服务器,作为server提供消息核心服务

  • Producer:消息生产者,负责生产消息传输给broker

  • Consumer:消息消费者,负责从broker获取消息并进行处理

  • Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播

  • Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收

    mq的组成

MQ技术维度

  • api发送与接收
  • 高可用性
  • 集群和容错配置
  • 持久化
  • 延时发送、定时投递
  • 签收机制

MQ的作用

  • 解耦:由于MQ的介入,避免了系统/模块之间的直接调用

  • 存储:有些情况下处理数据的过程会失败,造成数据丢失,可使用消息中间件进行数据持久化;

  • 扩展性:消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。

  • 削峰: 在访问量剧增的情况下,程序不会因为突发的超负荷请求而崩溃。

  • 可恢复性: 当系统一部分组件失效时,不会影响到整个系,消息中间件降低了进程间的耦合度,所以即使 个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。

  • 顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持 定程度上的顺序性。

  • 缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过 个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速 该缓冲层有助于控制和优化数据流经过系统的速度。

  • 异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理

    MQ的应用场景如下

    mq应用场景

###

ActiveMQ

安装与启动

无论在windows或Linux系统下,ActiveMQ的安装都很简单

  1. 在官网下载对应系统的MQ压缩包:http://activemq.apache.org/activemq-5159-release.html
  2. 在合适的路径解压
  3. 在bin目录下执行activemq.bat,即可启动MQ;其默认后台端口为61616
  4. MQ控制台默认地址:http://localhost:8161/admin ,默认用户名与密码:admin/admin

SpringBoot集成ActiveMQ

  1. 引入相关jar包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    
  2. 在配置文件中配置MQ连接

    spring.activemq.broker-url=tcp://127.0.0.1:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    #false:Queue  true:Topic
    spring.jms.pub-sub-domain=false 
    #定义队列名称
    myqueue=waston_queue
    
  3. 自定义配置类,配置MQ的队列

    package cn.waston.test_mq.config;
       
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    import javax.jms.Queue;
       
    @Component
    @EnableJms
    public class ConfigBean {
        @Value("${myqueue}")
        private String myQueue;
       
        @Bean
        public Queue queue(){
            return new ActiveMQQueue(myQueue);
        }
    }
    
  4. 创建消息生产者,并调用生产者的方法

    @Component
    public class QueueProduce {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
       
        @Autowired
        private Queue queue;
       
        public void produceMsg(){
            jmsMessagingTemplate.convertAndSend(queue,"hello world");
        }
    }
    
  5. 创建消息消费者(新建一个模块或项目)

    @Component
    public class QueueConsumer {
        // 监听指定Queue的消息
        @JmsListener(destination = "${myqueue}")
        public void receive(TextMessage textMessage) throws JMSException{
            System.out.println("消费者收到消息:" + textMessage.getText());
        }
    }
    
  6. 启动消费者模块,即可在控制台看到之前生产者向Queue中放入的消息;同时可以在ActiveMQ的控制台看该Queue里的消息被消费了

  7. 发布订阅Topic的使用

    1. Topic发布者模块

      配置文件:

      server.port=8082
      spring.activemq.broker-url=tcp://127.0.0.1:61616
      spring.activemq.user=admin
      spring.activemq.password=admin
      #false:Queue  true:Topic
      spring.jms.pub-sub-domain=true
      #定义Topic名称
      myTopic=waston_topic
      

      配置类:

      @Component
      public class ConfigBean {
          @Value("${myTopic}")
          private String topicName;
            
          @Bean
          public Topic topic(){
              return new ActiveMQTopic(topicName);
          }
      }
      

      发送者类:

      这里使用到了定时任务

      @Component
      public class TopicProduce {
          @Autowired
          private JmsMessagingTemplate jmsMessagingTemplate;
          @Autowired
          private Topic topic;
          @Scheduled(fixedRate = 3000)
          public void produceTopic(){
              jmsMessagingTemplate.convertAndSend(topic, "hello topic");
          }
      }
      
    2. 两个Topic订阅者模块

      配置文件与发送者相同(端口号不同)

      订阅者类:

      @Component
      public class TopicConsumer {
          @JmsListener(destination = "${myTopic}")
          public void receive(TextMessage text) throws JMSException{
              System.out.println("消费者收到Topic的消息:" + text.getText());
          }
      }
      
    3. 先启动两个订阅者,再启动发送者;在两个订阅者的控制台都可以看见发送者所发送的消息;在ActiveMQ也可以看到消息的变化