学习这个的契机是在做练手项目的时候用到了兔子(RabbitMQ),它遵循 AMQP 协议,属于消息中间件实现的一种,既然这样就来看看什么是消息中间件。
正好看到慕课网有相关的课程就顺便学习下吧,为什么使用消息中间件?,比较常用的是 RabbitMQ、RocketMQ、ActiveMQ、Kafka。
解耦、异步、横向扩展、安全可靠、顺序保证,这些还不够么,回想 Rabbit 官网的那几幅图吧
如果忘记了就再看看吧:飞机
关于JMS
Java消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。
JSM 的组成有:
- JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
- JMS客户
生产或消费消息的基于Java的应用程序或对象。
- JMS生产者
创建并发送消息的JMS客户。
- JMS消费者
接收消息的JMS客户。
- JMS消息
包括可以在JMS客户之间传递的数据的对象
- JMS队列
一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题
一种支持发送消息给多个订阅者的机制。
Java 消息服务应用程序结构支持两种模型:
- 点对点或队列模型
包含生产者和消费者,队列中的消息只能被一个消费者消费,消费者可以随时消费消息
每一个连接都依次平均分担消息队列中的消息(即使一个应用建立了两个连接) - 发布/订阅模型
包括发布者和订阅者,主题中的消息会被所有的订阅者消费,消费者不能消费在订阅之前的消息
每一个连接都会收到主题中完整的消息
关于架构等详细信息可参考 wiki :
https://zh.wikipedia.org/wiki/Java%E6%B6%88%E6%81%AF%E6%9C%8D%E5%8A%A1
中间件&AMQP
中间件(英语:Middleware),是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务、面向服务的体系结构等中应用比较广泛。
如数据库、Apache的Tomcat,IBM公司的WebSphere,BEA公司的WebLogic应用服务器,东方通公司的Tong系列中间件,以及Kingdee公司的等都属于中间件。
或者简单说就是:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给用户带来价值的软件统称为中间件
高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。
作为线路层协议,而不是 API(例如 JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用。
注意:JMS 是规范(针对 Java),AMQP 是协议
然后来张图来比较,JMS 和 AMQP:
然后是市面上常见的一些 MQ 方案:
ActiveMQ使用
导入依赖就不用多说了,下面的 Java 代码中的使用(以主题订阅模式为例),先来消息发布者:
1 | private static final String URL = "tcp://127.0.0.1:61616"; |
然后是消息订阅者,在此模式下订阅之前发的消息是没法接收到的:
1 | private static final String URL = "tcp://127.0.0.1:61616"; |
并且接收消息的过程是异步的,所以不要马上 close;
完整的代码见 GitHub
关于集群
集群总的来说就是为了实现高可用和负载均衡,以 ActiveMQ 为例,集群不只是在服务器端配置,客户端也需要支持;在ActiveMQ 中提供了失效转移的支持,URL 类似于:failover:(tcp://127.0.0.1:61617,tcp://192.168.1.11:61617)?randomize=true
在服务器方面,想要实现负载均衡就要保证服务器之间的消息同步,采用的是 “网络连接器”的方式,用于服务器的透传消息,分为静态连接器和动态连接器(用网址来代替写死的 IP 地址)。
对于高可用服务器的方案一般有两种:
- 共享存储集群
采用集群共享一份持久化数据(使用 NAS 或者 JDBC),服务器获取排它锁来独占资源的方式,当此服务器宕机时会释放(配合客户的的失效转移机制)备用服务器会获取锁成为新的 Master - 基于复制的 LeveIDB Store
至少需要三台来保证稳定性,还用到了 ZooKeeper(ZK 本身也需要三台来保证自己的稳定性)
也就是 Master 由 ZK 来选举,然后通过 ZK 来实现各服务器之间的消息同步
他们能达到高可用,但是实现不了负载均衡,想要同时实现就需要进行一些改造。
还有一些其他的问题解决方案:
实现每个系统消费各自的消息可以使用 ActiveMQ 提供的虚拟主题功能;
解决消息发送的一致性问题可以使用 JMS 中的 XA 系列接口;
解决幂等性的问题,方案和上面一样,使用本地事务或者内存日志
JMS 中的 XA 协议常用于分布式事务,因为效率较低所以不太使用,或者还可以使用本地事务、内存日志解决(都要配合消息补偿机制)
解决这些问题一般分段考虑比较好。幂等性就是指处理一次和多次的消息最终的效果是一样的。
HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用
为了解决代码过于复杂和复用,可以使用“基于消息机制的事件总线”,简单说 EDA (事件驱动架构)就是:有事你叫我,没事别烦我,这样一般就需要先在事件总线上注册,事件总线一般还需要包含消息提供者(各种 MQ 的实现)。
可以尝试面向服务的架构
Spring集成
主要涉及的有:
ConnectionFactory:用于管理连接的连接工厂;Spring 提供了两个具体的类,SingleConnectionFactory 和 CachingConnectionFactory ,后者是带有缓存功能的。
JmsTemplate:用于发送和接收消息的模板类,它是线程安全的
MessageListerner:消息监听器
使用时记得引入 Spring-jms 依赖,具体的相关依赖有:
1 | <!-- Java JMS 原生API --> |
接下来就是在 Spring 的配置文件中进行配置 Bean 了:
1 | <!-- 配置连接ActiveMQ的ConnectionFactory --> |
消费者和生产者的配置最好是分开来放,可以抽取相同的配置到独立的文件再利用 include 导入
Java 代码的使用,也分为两个角色,写在一起了:
1 | // 接收消息,记得配置到 IOC 容器中 |
实际使用时记得把它们给分开,更改模式只需要在 Destination 注入的时候选择合适的模式即可,其他的地方不需要修改
更多内容:
https://my.oschina.net/thinwonton/blog/889805
http://www.cnblogs.com/jaycekon/p/ActiveMq.html
SB集成
SpringBoot 必定是主流,所以与 SB 的集成必定要了解,并且会更加的简单。首先增加依赖:
1 | <dependency> |
然后在主配置文件中进行一些配置,这里只写几个最基本的:
1 | spring.activemq.broker-url=tcp://localhost:61616 |
之后就可以使用 SB 提供的 JmsMessagingTemplate 进行简单的 MQ 相关操作了,不放心的话可以手动开启 @EnableJms。
下面是一些简单的测试用例:
1 | (SpringRunner.class) |
那么,如果我们想要同时支持两种模式呢,单从配置已经没法改了,这时候需要我们自定义工厂:
1 |
|
上面的配置列举了一些常见的配置,不涉及集群,如果需要 SC 集成,可以参考这篇文章
关于 JmsTemplate 的性能问题参考:使用Spring/Spring Boot集成JMS的陷阱
RabbitMQ
基于 AMQP 所以功能是绝对够用,这里就只说与 SB 的集成,也是主流了。
步骤也都是类似的,加依赖,写配置:
1 | <dependency> |
主要的配置:
1 | # rabbitmq配置 |
下面就是简单的一些代码示例:
1 |
|
以上,都是最基本的使用,不涉及集群相关,RocketMQ 也是类似,使用的是 RocketMQTemplate。
评论框加载失败,无法访问 Disqus
你可能需要魔法上网~~