Java消息中间件

学习这个的契机是在做练手项目的时候用到了兔子(RabbitMQ),它遵循 AMQP 协议,属于消息中间件实现的一种,既然这样就来看看什么是消息中间件。
正好看到慕课网有相关的课程就顺便学习下吧,为什么使用消息中间件?,比较常用的是 RabbitMQ、RocketMQ、ActiveMQ、Kafka。
解耦、异步、横向扩展、安全可靠、顺序保证,这些还不够么,回想 Rabbit 官网的那几幅图吧
如果忘记了就再看看吧:飞机

关于JMS

Java消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

JSM 的组成有:

  • JMS提供者
    连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
  • JMS客户
    生产或消费消息的基于Java的应用程序或对象。
  • JMS生产者
    创建并发送消息的JMS客户。
  • JMS消费者
    接收消息的JMS客户。
  • JMS消息
    包括可以在JMS客户之间传递的数据的对象
  • JMS队列
    一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。
  • JMS主题

    一种支持发送消息给多个订阅者的机制。

Java 消息服务应用程序结构支持两种模型:

  1. 点对点或队列模型
    包含生产者和消费者,队列中的消息只能被一个消费者消费,消费者可以随时消费消息
    每一个连接都依次平均分担消息队列中的消息(即使一个应用建立了两个连接)
  2. 发布/订阅模型
    包括发布者和订阅者,主题中的消息会被所有的订阅者消费,消费者不能消费在订阅之前的消息
    每一个连接都会收到主题中完整的消息

关于架构等详细信息可参考 wiki :
https://zh.wikipedia.org/wiki/Java%E6%B6%88%E6%81%AF%E6%9C%8D%E5%8A%A1

中间件&AMQP

中间件(英语:Middleware),是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务、面向服务的体系结构等中应用比较广泛。
如数据库、Apache的Tomcat,IBM公司的WebSphere,BEA公司的WebLogic应用服务器,东方通公司的Tong系列中间件,以及Kingdee公司的等都属于中间件。
或者简单说就是:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给用户带来价值的软件统称为中间件


高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。
作为线路层协议,而不是 API(例如 JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用。

注意:JMS 是规范(针对 Java),AMQP 是协议

然后来张图来比较,JMS 和 AMQP:

然后是市面上常见的一些 MQ 方案:
MQ.png

ActiveMQ使用

导入依赖就不用多说了,下面的 Java 代码中的使用(以主题订阅模式为例),先来消息发布者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final String URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {
// 1.创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建连接
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话
// 第一个参数是是否在事务中,第二个是自动提交
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标(只需要修改这里)
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

// 7.创建消息/发送消息
TextMessage message = session.createTextMessage("testMessage");
producer.send(message);

// 8.关闭连接
connection.close();
}

然后是消息订阅者,在此模式下订阅之前发的消息是没法接收到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final String URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {
// 1.创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建连接
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话
// 第一个参数是是否在事务中,第二个是自动提交
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);

// 7.创建监听器(异步监听)
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println(msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

// 9.关闭连接(监听器异步,应该在程序退出时关闭)
// connection.close();
}

并且接收消息的过程是异步的,所以不要马上 close;
完整的代码见 GitHub

关于集群

集群总的来说就是为了实现高可用和负载均衡,以 ActiveMQ 为例,集群不只是在服务器端配置,客户端也需要支持;在ActiveMQ 中提供了失效转移的支持,URL 类似于:
failover:(tcp://127.0.0.1:61617,tcp://192.168.1.11:61617)?randomize=true
在服务器方面,想要实现负载均衡就要保证服务器之间的消息同步,采用的是 “网络连接器”的方式,用于服务器的透传消息,分为静态连接器和动态连接器(用网址来代替写死的 IP 地址)。
对于高可用服务器的方案一般有两种:

  • 共享存储集群
    采用集群共享一份持久化数据(使用 NAS 或者 JDBC),服务器获取排它锁来独占资源的方式,当此服务器宕机时会释放(配合客户的的失效转移机制)备用服务器会获取锁成为新的 Master
  • 基于复制的 LeveIDB Store
    至少需要三台来保证稳定性,还用到了 ZooKeeper(ZK 本身也需要三台来保证自己的稳定性)
    也就是 Master 由 ZK 来选举,然后通过 ZK 来实现各服务器之间的消息同步

他们能达到高可用,但是实现不了负载均衡,想要同时实现就需要进行一些改造。
还有一些其他的问题解决方案:
实现每个系统消费各自的消息可以使用 ActiveMQ 提供的虚拟主题功能;
解决消息发送的一致性问题可以使用 JMS 中的 XA 系列接口;
解决幂等性的问题,方案和上面一样,使用本地事务或者内存日志

JMS 中的 XA 协议常用于分布式事务,因为效率较低所以不太使用,或者还可以使用本地事务、内存日志解决(都要配合消息补偿机制)
解决这些问题一般分段考虑比较好。

幂等性就是指处理一次和多次的消息最终的效果是一样的。
HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用

为了解决代码过于复杂和复用,可以使用“基于消息机制的事件总线”,简单说 EDA (事件驱动架构)就是:有事你叫我,没事别烦我,这样一般就需要先在事件总线上注册,事件总线一般还需要包含消息提供者(各种 MQ 的实现)。
可以尝试面向服务的架构

Spring集成

主要涉及的有:
ConnectionFactory:用于管理连接的连接工厂;Spring 提供了两个具体的类,SingleConnectionFactory 和 CachingConnectionFactory ,后者是带有缓存功能的。
JmsTemplate:用于发送和接收消息的模板类,它是线程安全的
MessageListerner:消息监听器
使用时记得引入 Spring-jms 依赖,具体的相关依赖有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- Java JMS 原生API -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
</dependency>
<!-- spring-jms API -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- active-mq核心包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>

接下来就是在 Spring 的配置文件中进行配置 Bean 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!-- 配置连接ActiveMQ的ConnectionFactory -->
<bean id="amqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616"/>

<!--为了提高效率,配置一个连接池-->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="amqConnectionFactory"
p:sessionCacheSize="10"/>

<!--配置队列-->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${queue.name}"/>
</bean>

<!--配置主题-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="${topic.name}"/>
</bean>

<!-- **************配置消息生产者************* -->
<!--点对点模型-->
<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<!--消息持久-->
<property name="deliveryPersistent" value="true"/>
<property name="defaultDestination" ref="destination"/>
<property name="pubSubDomain" value="false"/>
</bean>

<!--发布/订阅模型-->
<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<!--消息持久-->
<property name="deliveryPersistent" value="true"/>
<!--目的地-->
<property name="defaultDestination" ref="destinationTopic"/>
<!--订阅模型 -->
<property name="pubSubDomain" value="true"/>
</bean>

<!-- **************配置消息消费者************* -->
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="com.bfchengnuo.Filter.QueueMessageListener" />

<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>

消费者和生产者的配置最好是分开来放,可以抽取相同的配置到独立的文件再利用 include 导入
Java 代码的使用,也分为两个角色,写在一起了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 接收消息,记得配置到 IOC 容器中
public class MyQueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("MyQueueMessageListener收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

// 发送消息 Destination 已注入
public void sendMessage(final String msg){
// String destination = jmsTemplate.getDefaultDestinationName();
System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息--->"+msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}

实际使用时记得把它们给分开,更改模式只需要在 Destination 注入的时候选择合适的模式即可,其他的地方不需要修改

更多内容:
https://my.oschina.net/thinwonton/blog/889805
http://www.cnblogs.com/jaycekon/p/ActiveMq.html

SB集成

SpringBoot 必定是主流,所以与 SB 的集成必定要了解,并且会更加的简单。首先增加依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

然后在主配置文件中进行一些配置,这里只写几个最基本的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

# 消息模式 true:广播(Topic),false:队列(Queue),默认时false
#spring.jms.pub-sub-domain=true

# 其他可选配置
#是否启用内存模式(也就是不安装MQ,也可以使用MQ功能)
spring.activemq.in-memory=false
#ActiveMQ连接池是否启用
spring.activemq.pool.enabled=true
#ActiveMQ连接池最大连接数
spring.activemq.pool.max-connections=5
#ActiveMQ连接池连接空闲时间,默认为30秒
spring.activemq.pool.idle-timeout=30000

之后就可以使用 SB 提供的 JmsMessagingTemplate 进行简单的 MQ 相关操作了,不放心的话可以手动开启 @EnableJms。
下面是一些简单的测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

/**
* 消息发送/生产
*/
@Test
public void testQueueMsg(){
// 创建名称为zyQueue的队列
// 广播模式使用 ActiveMQTopic 对象,此时所有监听者都能获得每一个生产的对象
Queue queue = new ActiveMQQueue("zyQueue");
// 向队列发送消息
jmsMessagingTemplate.convertAndSend(queue,"这是一个队列消息!");
}
}


/**
* 消息消费方
*/
@Component
public class Consumer {
private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");

/**
* destination 目标地址即队列
*/
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}

/**
* 多个消费者,默认会平均消息消费(轮训)
*/
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("q2-接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
}

那么,如果我们想要同时支持两种模式呢,单从配置已经没法改了,这时候需要我们自定义工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@SpringBootConfiguration
public class ActiveMqConfig {

@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置消息模型为队列
factory.setPubSubDomain(false);
return factory;
}

@Bean("topicListenerFactory")
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置消息模型为队列
factory.setPubSubDomain(true);
return factory;
}

/**
* 之后我们使用的时候指定对应的 containerFactory 即可
*/
@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
public void receiveMessage(String text){
System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}

/**
* 设置存入mq的数据格式为json
*/
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}

上面的配置列举了一些常见的配置,不涉及集群,如果需要 SC 集成,可以参考这篇文章
关于 JmsTemplate 的性能问题参考:使用Spring/Spring Boot集成JMS的陷阱

RabbitMQ

基于 AMQP 所以功能是绝对够用,这里就只说与 SB 的集成,也是主流了。
步骤也都是类似的,加依赖,写配置:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

主要的配置:

1
2
3
4
5
6
7
8
# rabbitmq配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /

下面就是简单的一些代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@Configuration
public class RabbitConfig {

/**
* 普通队列模式
* 定义demoQueue队列
*/
@Bean
public Queue demoString() {
return new Queue("demoQueue");
}

//=================== fanout广播模式 ====================

@Bean
public Queue fanoutA() {
return new Queue("fanout.a");
}
@Bean
public Queue fanoutB() {
return new Queue("fanout.b");
}
@Bean
public Queue fanoutC() {
return new Queue("fanout.c");
}

/**
* 定义个fanout交换器
*/
@Bean
FanoutExchange fanoutExchange() {
// 定义一个名为 fanoutExchange 的 fanout 交换器
return new FanoutExchange("fanoutExchange");
}

/**
* 将定义的fanoutA队列与fanoutExchange交换机绑定
*/
@Bean
public Binding bindingExchangeWithA() {
return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
}
/**
* 将定义的fanoutB队列与fanoutExchange交换机绑定
*/
@Bean
public Binding bindingExchangeWithB() {
return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
}
/**
* 将定义的fanoutC队列与fanoutExchange交换机绑定
*/
@Bean
public Binding bindingExchangeWithC() {
return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
}

//=================== topic主题模式 ====================

@Bean
public Queue topiocA() {
return new Queue("topic.a");
}
@Bean
public Queue topicB() {
return new Queue("topic.b");
}
@Bean
public Queue topicC() {
return new Queue("topic.c");
}

/**
* 定义个topic交换器
*/
@Bean
TopicExchange topicExchange() {
// 定义一个名为fanoutExchange的fanout交换器
return new TopicExchange("topicExchange");

// 另一种定义
// durable(true) 表面重启之后交换机还在
// return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
/**
* 将定义的topicA队列与topicExchange交换机绑定
*/
@Bean
public Binding bindingTopicExchangeWithA() {
// return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg");
}
/**
* 将定义的topicB队列与topicExchange交换机绑定
*/
@Bean
public Binding bindingTopicExchangeWithB() {
return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#");
}
/**
* 将定义的topicC队列与topicExchange交换机绑定
*/
@Bean
public Binding bindingTopicExchangeWithC() {
return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z");
}
}

/**
* 消息生产者
*/
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;

public void sendDemoQueue() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
System.out.println("[demoQueue] send msg: " + dateString);
// 第一个参数为刚刚定义的队列名称
this.rabbitTemplate.convertAndSend("demoQueue", dateString);

// 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey 我们不用管空着就可以,第三个是你要发送的消息
this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString);
// 这条信息将会被 topic.a topic.b接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString);
// 这条信息将会被topic.b接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString);
// 这条信息将会被topic.b、topic.c接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString);
}
}

@Component
@RabbitListener(queues = "demoQueue")
public class DemoQueueConsumer {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String msg) {
System.out.println("[demoQueue] recieved message: " + msg);
}
}

@Component
@RabbitListener(queues = "fanout.a")
public class FanoutAConsumer {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String msg) {
System.out.println("[fanout.a] recieved message: " + msg);
}
}

@Component
@RabbitListener(queues = "topic.b")
public class TopicBConsumer {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String msg) {
System.out.println("[topic.b] recieved message:" + msg);
}
}

/**
* 另一种消费模式
*/
@Component
public class ReceiveHandler {
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receiveEmail(String msg, Message message, Channel channel){
System.out.println(msg);
}

@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receiveSms(String msg, Message message, Channel channel){
System.out.println(msg);
}
}

以上,都是最基本的使用,不涉及集群相关,RocketMQ 也是类似,使用的是 RocketMQTemplate。

喜欢就请我吃包辣条吧!

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~