WebFlux基础

响应式(或者叫反应式)的异步非阻塞编程模式,大概率是未来的主流,函数式编程是基础(所以要求 Java8+),可以看作是观察者模式(或者说生产者消费者模式)的延伸。
Spring 5 中最重要改动是把反应式编程的思想应用到了框架的各个方面,Spring 5 的反应式编程以 Reactor 库为基础,之前我其实已经过了把瘾(然后发现忘的差不多了,所以来复盘),毕竟 Spring5 也已经出来很久了,更不要说 RxJava,不过也仅仅是尝鲜,这一篇整理下相关基础知识入个门。
开发人员可以使用 WebFlux 创建高性能的 Web 应用和客户端(包括其中的 HTTP、服务器推送事件和 WebSocket 支持)。
在 SpringBoot2 中也跟进了 WebFlux 的支持,默认使用 Netty,也可以切换 Servlet3.0+ 的容器。
反应式编程主要解决的是吞吐量的问题(或者说内存、线程压力),而不是速度问题,一定要搞清楚这一点,意味着使用相同的资源可以处理更加多的请求。
随着网络应用和微服务的不断发展,高并发、高吞吐量的特性越来越吸引人,Node、Go 显示出了强劲的竞争力,Java 需要更新自己来适应潮流,拥有了 WebFlux (或者说 Reactive Programming)就有了一战的底气。

比较尴尬的是例如 JDBC 等配套设施还没有太多的跟进,所以目前来看用的还不算多,不过这些问题迟早会解决。

WebFlux 简介

WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。
该模块中包含了对反应式 HTTP、服务器推送事件和 WebSocket 的客户端和服务器端的支持。对于开发人员来说,比较重要的是服务器端的开发。
在服务器端,WebFlux 支持两种不同的编程模型:

  1. Spring MVC 中使用的基于 Java 注解的方式;
  2. 基于 Java 8 的 lambda 表达式的函数式编程模型。

这两种编程模型只是在代码编写方式上存在不同。它们运行在同样的反应式底层架构之上,因此在运行时是相同的

WebFlux 需要底层提供运行时的支持,WebFlux 可以运行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他异步运行时环境,如 Netty 和 Undertow。

对标 JSR-315 和 JSR-340,分别对应 Servlet 规范的 3.0 和 3.1
3.0 提供了异步化;而 3.1 提供了非阻塞。

最方便的构建 WebFlux 应用的方式是使用 SpringBoot 的初始化器,选择 Reactive Web 依赖。

Reactor简介

Reactor 是一个基础库,可用它构建时效性流式数据应用,或者有低延迟容错性要求的微/纳/皮级服务。
简单说,Reactor 是一个轻量级 JVM 基础库,帮助你的服务或应用高效,异步地传递消息。
Reactor 仅仅致力于解决异步和函数调用问题。和 Spring 天然无缝整合(毕竟 Reactor 框架是 Pivotal 公司开发的,实现了 Reactive Programming 思想)。

“高效”是指什么?

  • 消息从 A 传递到 B 时,产生很少的内存垃圾,甚至不产生。
  • 解决消费者处理消息的效率低于生产者时带来的溢出问题。
  • 尽可能提供非阻塞异步流

PS:Spring 5 其最大的意义就是能将反应式编程技术(它就是常见的观察者模式的一种延伸)的普及向前推进一大步。而作为在背后支持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。

从经验可知(主要是 rage 和 drunk 的推特),异步编程很难,而像 JVM 这类提供众多可选参数的平台则尤其困难。
Reactor 旨在帮助大多数用例真正非阻塞地运行。提供的 API 比 JDK 的 JUC 库低级原语更高效。Reactor 提供了下列功能的替代函数 (并建议不使用 JDK 原生语句):

  • 阻塞等待: 如 Future.get()
  • 不安全的数据访问: 如 ReentrantLock.lock()
  • 异常冒泡: 如 try…catch…finally
  • 同步阻塞: 如 synchronized{}
  • Wrapper 分配(GC 压力): 如 new Wrapper(event)

当消息传递效率成为系统性能瓶颈的时候(10k msg/s,100k msg/s,1M…),非阻塞机制就显得尤为重要。例如看下面的一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ExecutorService  threadPool = Executors.newFixedThreadPool(8);
final List<T> batches = new ArrayList<T>();

Callable<T> t = new Callable<T>() { // *1
public T run() {
synchronized(batches) { // *2
T result = callDatabase(msg); // *3
batches.add(result);
return result;
}
}
};

Future<T> f = threadPool.submit(t); // *4
T result = f.get() // *5

注释中标注的几点:

  1. Callable 分配:可能导致 GC 压力。
  2. 同步过程强制每个线程执行停:检查操作。
  3. 消息的消费可能比生产慢。
  4. 使用线程池(ThreadPool)将任务传递给目标线程:通过 FutureTask 方式肯定会产生 GC 压力。
  5. 阻塞直至 callDatabase() 回调。

在这个简单的例子中,存在的显著问题有:

  • 分配对象可能产生 GC 压力,特别是当任务运行时间过长。 每次 GC 暂停都会影响全局性能。
  • 默认的队列是无界的,任务会因为数据库调用而堆积。 积压虽然不会直接导致内存泄漏,但会带来严重副作用:GC 暂停时要扫描更多的对象;有丢失重要数据位的风险;等等 … 典型链式队列节点分配时会产生大量内存压力。
  • 阻塞回调容易产生恶性循环。 阻塞回调会降低消息生产者的效率。在实践中,任务提交后需要等待结果返回,此时流式过程几乎演变为同步的了。 会话过程抛出的任何带数据存储的异常都会以不受控的方式被传递给生产者,否定了任何通常在线程边界附近可用的容错性。

要实现完全非阻塞是很难办到的,尤其是在有着类似微服务架构这样时髦绰号的分布式系统的世界里。因此 Reactor 做了部分妥协,尝试利用最优的可用模式,使开发者觉得他们是在写异步纳米服务,而不是什么数学论文。
到了某个阶段,延迟是每一个系统到都要面对的实实在在的问题。为此:

Reactor 提供的框架可以帮助减轻应用中由延迟产生的副作用,只需要增加一点点开销:

  • 使用了一些聪明的结构,通过启动预分配策略解决运行时分配问题
  • 通过确定信息传递主结构的边界,避免任务的无限堆叠;
  • 采用主流的响应与事件驱动构架模式,提供包含反馈在内的非阻塞端对端流
  • 引入新的 Reactive Streams 标准,拒绝超过当前容量请求,从而保证限制结构的有效性;
  • 在 IPC 上也使用了类似理念,提供对流控制友好的非阻塞 IO 驱动
  • 开放了帮助开发者们以零副作用方式组织他们代码的函数接口,借助这些函数来处理容错性和线程安全。

为实现异步目标,响应式技术和 Reactor 模块该如何搭配:

  • Spring XD + Reactor-Net (Core/Stream): 使用 Reactor 作为 Sink/Source IO 驱动。
  • Grails | Spring + Reactor-Stream (Core): 用 Stream 和 Promise 做后台处理。
  • Spring Data + Reactor-Bus (Core): 发射数据库事件 (保存/删除/…)。
  • Spring Integration Java DSL + Reactor Stream (Core): Spring 集成的微批量信息通道。
  • RxJavaReactiveStreams + RxJava + Reactor-Core: 融合富结构与高效异步 IO 处理
  • RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream): 用 RxJava 做数据输入,异步 IO 驱动做传输。

Reactor 核心含有如下特性:

  • 通用 IO & 函数式类型,一些 Java 8 接口的反向移植函数,提供者,消费者,谓词,双向消费者,双向函数
  • 元组
  • 资源池、暂停器、定时器
  • 缓冲器,编解码和少量预定义的编解码器
  • 环境上下文
  • 调度者约定和几个预定义调度者
  • 预定义响应式数据流处理者

Reactor-核心自身可替代其它消息传递机制,完成时序任务调度,或者帮你将代码组织为函数块,实现 Java 8 的反向移植接口。这种拆分便于同其他的响应式库配合使用,而没耐心的开发者也不用再去费劲弄懂环形缓冲区了。

反应式编程

反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区中比较流行的是 RxJava 和 RxJava 2。Spring5 中使用的是另外一个新的反应式编程库 Reactor。

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。
其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。
在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

反应式编程来源于数据流和变化的传播,举个例子:比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。
反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。


在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式:每次由调用者通过 next()方法来获取序列中的下一个值。
使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式:当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。
在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。


反应式流中第一个重要概念是负压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。
负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request() 方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request() 方法调用。这实际上变成了推拉结合的模式。

Flux和Mono

Flux 和 Mono 是 Reactor 中的两个基本概念(Java9 中也看到了类似的对象,可以理解为 Reactor = JDK8 Stream + JDK9 Reactive Stream)。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:

  • 正常的包含元素的消息
  • 序列结束的消息
  • 序列出错的消息

当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()onError() 会被调用。
Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long> 对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表 0 到 1 个元素的发布者。
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表 0 到 N 个元素的发表者。
  • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。
主要接口有:Publisher、Subscriber、Subcription。

直接消费的 Mono 或 Flux 的方式就是调用 subscribe 方法。如果在 Web Flux 接口中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出工作。

异步并不代表并行,如果需要并行,使用 zip 方法完成。使用反应式,任何环节都需避免阻塞。对于客户端是透明的。

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

使用Reactor

创建 Flux,Reactor 提供了一系列的静态方法来创建 Flux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
Flux.just("Hello", "World").subscribe(System.out::println);

// (还有 fromIterable 和 fromStream)可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

// 创建一个不包含任何元素,只发布结束消息的序列。
// 此外还有 error 和 never
Flux.empty().subscribe(System.out::println);

// 创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
Flux.range(1, 10).subscribe(System.out::println);

// 创建一个包含了从 0 开始递增的 Long 对象的序列。
// 其中包含的元素按照指定的间隔来发布。
// 除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

// 与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
Flux.intervalMillis(1000).subscribe(System.out::println);

上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate()create() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// generate 方式
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);

final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);

// create 方式
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);

Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error() 和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法。

1
2
3
4
5
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
// 从一个 Optional 对象或可能为 null 的对象中创建 Mono。
// 只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。
例如 buffer 和 bufferTimeout 这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。
还有 filter 、take、reduce 和 reduceWith、merge 和 mergeSequential、flatMap 和 flatMapSequential、消息处理、调度器相关的方法,这方面其实有很多内容,但是没细看,估计短时间内接触不到,有个印象等用的时候知道有这么个东西然后再查 API 好了。

使用WebFlux

使用 WebFlux 与 Spring MVC 的不同在于,WebFlux 所使用的类型是与反应式编程相关的 Flux 和 Mono 等,而不是简单的对象。对于简单的 Hello World 示例来说,这两者之间并没有什么太大的差别。对于复杂的应用来说,反应式编程和负压的优势会体现出来,可以带来整体的性能的提升。

吞吐量为何会大幅提升?因为使用 WebFlux 后,容器的线程不会被阻塞,只会给业务代码一个回调函数(asyncContext.complete()),业务代码处理完了再通知我!这样就可以使用少量的线程处理更加高的请求,从而实现高吞吐量(结合负压不会造成过高的处理压力)。

类中的方法都以 Flux 或 Mono 对象作为返回值,这也是 WebFlux 应用的特征。Flux 类型的参数表示的是有多个对象需要处理。可以使用 doOnNext() 来对其中的每个对象进行处理。
除了服务器端实现之外,WebFlux 也提供了反应式客户端,可以访问 HTTP、SSE 和 WebSocket 服务器端。分别对应:Web 的 HTTP、SSE、WebSocket,这里不再多说。

这里不贴代码了,参考我 Github 的这个模块。

服务器推送事件

服务器推送事件(Server-Sent Events,SSE)允许服务器端不断地推送数据到客户端
相对于 WebSocket 而言,服务器推送事件只支持服务器端到客户端的单向数据传递。虽然功能较弱,但优势在于 SSE 在已有的 HTTP 协议上使用简单易懂的文本格式来表示传输的数据
作为 W3C 的推荐规范,SSE 在浏览器端的支持也比较广泛,除了 IE 之外的其他浏览器都提供了支持。在 IE 上也可以使用 polyfill 库来提供支持。
在服务器端来说,SSE 是一个不断产生新数据的流,非常适合于用反应式流来表示。在 WebFlux 中创建 SSE 的服务器端是非常简单的。只需要返回的对象的类型是 Flux<ServerSentEvent>,就会被自动按照 SSE 规范要求的格式来发送响应,或者指定 MediaType。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/sse")
public class SseController {

@GetMapping("/randomNumbers")
public Flux<ServerSentEvent<Integer>> randomNumbers() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> Tuples.of(seq, ThreadLocalRandom.current().nextInt()))
.map(data -> ServerSentEvent.<Integer>builder()
.event("random")
.id(Long.toString(data.getT1()))
.data(data.getT2())
.build());
}

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamAll() {
return userRepository.findAll();
}
}

SseController 是一个使用 SSE 的控制器的示例。其中的方法 randomNumbers() 表示的是每隔一秒产生一个随机数的 SSE 端点。我们可以使用类 ServerSentEvent.Builder 来创建 ServerSentEvent 对象。这里我们指定了事件名称 random,以及每个事件的标识符和数据。事件的标识符是一个递增的整数,而数据则是产生的随机数。

PS:我记得在我写的 SB2.x 的初尝试那篇文章中关于这个有个小例子。

WebSocket

WebSocket 支持客户端与服务器端的双向通讯。当客户端与服务器端之间的交互方式比较复杂时,可以使用 WebSocket。
WebSocket 在主流的浏览器上都得到了支持。WebFlux 也对创建 WebSocket 服务器端提供了支持。在服务器端,我们需要实现接口 org.springframework.web.reactive.socket.WebSocketHandler 来处理 WebSocket 通讯。接口 WebSocketHandler 的方法 handle 的参数是接口 WebSocketSession 的对象,可以用来获取客户端信息、接送消息和发送消息。

1
2
3
4
5
6
7
8
9
@Component
public class EchoHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(final WebSocketSession session) {
return session.send(
session.receive()
.map(msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText())));
}
}

EchoHandler 对于每个接收的消息,会发送一个添加了 “ECHO -> “ 前缀的响应消息。WebSocketSession 的 receive 方法的返回值是一个 Flux<WebSocketMessage> 对象,表示的是接收到的消息流。而 send 方法的参数是一个 Publisher<WebSocketMessage> 对象,表示要发送的消息流。在 handle 方法,使用 map 操作对 receive 方法得到的 Flux<WebSocketMessage> 中包含的消息继续处理,然后直接由 send 方法来发送。

在创建了 WebSocket 的处理器 EchoHandler 之后,下一步需要把它注册到 WebFlux 中。我们首先需要创建一个类 WebSocketHandlerAdapter 的对象,该对象负责把 WebSocketHandler 关联到 WebFlux 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class WebSocketConfiguration {
@Autowired
@Bean
public HandlerMapping webSocketMapping(final EchoHandler echoHandler) {
final Map<String, WebSocketHandler> map = new HashMap<>(1);
map.put("/echo", echoHandler);

final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
mapping.setUrlMap(map);
return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

其中的 HandlerMapping 类型的 bean 把 EchoHandler 映射到路径 /echo

参考代码:Github

函数式编程模型

WebFlux 还支持基于 lambda 表达式的函数式编程模型。与基于 Java 注解的编程模型相比,函数式编程模型的抽象层次更低,代码编写更灵活,可以满足一些对动态性要求更高的场景。不过在编写时的代码复杂度也较高,学习曲线也较陡。开发人员可以根据实际的需要来选择合适的编程模型。目前 Spring Boot 不支持在一个应用中同时使用两种不同的编程模式。
在函数式编程模型中,每个请求是由一个函数来处理的, 通过接口 org.springframework.web.reactive.function.server.HandlerFunction 来表示。
HandlerFunction 是一个函数式接口,其中只有一个方法 Mono<T extends ServerResponse> handle(ServerRequest request),因此可以用 labmda 表达式来实现该接口。
接口 ServerRequest 表示的是一个 HTTP 请求。通过该接口可以获取到请求的相关信息,如请求路径、HTTP 头、查询参数和请求内容等。方法 handle 的返回值是一个 Mono<T extends ServerResponse> 对象。
接口 ServerResponse 用来表示 HTTP 响应。ServerResponse 中包含了很多静态方法来创建不同 HTTP 状态码的响应对象。
下面是一个简单的计算器实现来展示函数式编程模型的用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class CalculatorHandler {
public Mono<ServerResponse> add(final ServerRequest request) {
return calculate(request, (v1, v2) -> v1 + v2);
}

public Mono<ServerResponse> subtract(final ServerRequest request) {
return calculate(request, (v1, v2) -> v1 - v2);
}

public Mono<ServerResponse> multiply(final ServerRequest request) {
return calculate(request, (v1, v2) -> v1 * v2);
}

public Mono<ServerResponse> divide(final ServerRequest request) {
return calculate(request, (v1, v2) -> v1 / v2);
}

private Mono<ServerResponse> calculate(final ServerRequest request,
final BiFunction<Integer, Integer, Integer> calculateFunc) {
final Tuple2<Integer, Integer> operands = extractOperands(request);
return ServerResponse
.ok()
.body(Mono.just(calculateFunc.apply(operands.getT1(), operands.getT2())), Integer.class);
}

private Tuple2<Integer, Integer> extractOperands(final ServerRequest request) {
return Tuples.of(parseOperand(request, "v1"), parseOperand(request, "v2"));
}

private int parseOperand(final ServerRequest request, final String param) {
try {
return Integer.parseInt(request.queryParam(param).orElse("0"));
} catch (final NumberFormatException e) {
return 0;
}
}
}

上述代码给出了处理不同请求的类 CalculatorHandler,其中包含的方法 add、subtract、multiply 和 divide 都是接口 HandlerFunction 的实现。这些方法分别对应加、减、乘、除四种运算。每种运算都是从 HTTP 请求中获取到两个作为操作数的整数,再把运算的结果返回。
在创建了处理请求的 HandlerFunction 之后,下一步是为这些 HandlerFunction 提供路由信息,也就是这些 HandlerFunction 被调用的条件。这是通过函数式接口 org.springframework.web.reactive.function.server.RouterFunction 来完成的。接口 RouterFunction 的方法 Mono<HandlerFunction<T extends ServerResponse>> route(ServerRequest request) 对每个 ServerRequest,都返回对应的 0 个或 1 个 HandlerFunction 对象,以 Mono<HandlerFunction> 来表示。

当找到对应的 HandlerFunction 时,该 HandlerFunction 被调用来处理该 ServerRequest,并把得到的 ServerResponse 返回。在使用 WebFlux 的 Spring Boot 应用中,只需要创建 RouterFunction 类型的 bean,就会被自动注册来处理请求并调用相应的 HandlerFunction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class Config {
@Bean
@Autowired
public RouterFunction<ServerResponse> routerFunction(final CalculatorHandler calculatorHandler) {
return RouterFunctions.route(
RequestPredicates.path("/calculator"),
request -> request.queryParam("operator").map(operator ->
Mono.justOrEmpty(ReflectionUtils.findMethod(
CalculatorHandler.class,
operator,
ServerRequest.class))
.flatMap(method -> (Mono<ServerResponse>) ReflectionUtils.invokeMethod(method, calculatorHandler, request))
.switchIfEmpty(ServerResponse.badRequest().build())
.onErrorResume(ex -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build()))
.orElse(ServerResponse.badRequest().build()));
}
}

上面的代码是相关的配置类 Config。方法 RouterFunctions.route 用来根据 Predicate 是否匹配来确定 HandlerFunction 是否被应用。RequestPredicates 中包含了很多静态方法来创建常用的基于不同匹配规则的 Predicate。如 RequestPredicates.path 用来根据 HTTP 请求的路径来进行匹配。此处我们检查请求的路径是 /calculator

使用 ServerRequest 的 queryParam 方法来获取到查询参数 operator 的值,然后通过反射 API 在类 CalculatorHandler 中找到与查询参数 operator 的值名称相同的方法来确定要调用的 HandlerFunction 的实现,最后调用查找到的方法来处理该请求。如果找不到查询参数 operator 或是 operator 的值不在识别的列表中,服务器端返回 400 错误;如果反射 API 的方法调用中出现错误,服务器端返回 500 错误。

其他

响应式数据流作为一种新的数据流规范应用于 Java 9 及其后续版本,并被多个供应商和技术企业采纳,这一规范的定位非常清晰,旨在提供同/异步数据序列流式控制机制,并在 JVM 上首先推广。该规范由 4 个 Java 接口,1 个 TCK 和一些样例组成。


响应式扩展,就是通常所说的 Rx,是一组定义良好的函数式 API,大规模扩展了观察者模式
Rx 模式支持响应式数据序列处理,主要的设计要点有:

  • 使用回调链分离时间/延迟:仅当数据可用时才会回调
  • 分离线程模型:用 Observable / Stream 来处理同步或异步
  • 控制错误链/终止:数据载荷信号以及错误与完成信号都传递给回调链
  • 解决各种预定义 API 中多重分散-聚合和构造问题

JVM 中响应式扩展的标准实现是 RxJava。它提供了强大的函数式 API,并将原始微软库中几乎全部的概念移植了过来。

响应式数据流和响应式扩展算是最近比较新的技术了,因为牵扯到异步非阻塞技术比较难理解,但是从 Spring5 的方向来看,这是未来,至于如何学习,我还在摸索那条路比较好。


Netty 示例相关参考:Github

参考

https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html
https://www.jianshu.com/p/7ee89f70dfe5

喜欢就请我吃包辣条吧!

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~