12.1.2 编写响应式 Controller
您可能还记得,在第 7 章中,您为 Taco Cloud 的 REST API 创建了一些 controller。这些 controller 具有处理请求的方法,这些方法根据域类型(如 Order 和 Taco)或域类型的集合,处理输入和输出。提醒一下,请考虑您在第 7 章中写过的 DesignTacoController 中的以下片段:
@RestController
@RequestMapping(path="/api/tacos",
produces="application/json")
@CrossOrigin(origins="*")
public class TacoController {
...
@GetMapping(params="recent")
public Iterable<Taco> recentTacos() {
PageRequest page = PageRequest.of(
0, 12, Sort.by("createdAt").descending());
return tacoRepo.findAll(page).getContent();
}
...
}
如前所述,recentTacos()
controller 处理 /design/recent
的 HTTP GET 请求,以返回最近创建的 tacos 的列表。更具体地说,它返回一个 Iterable 类型的 Taco。这主要是因为这是从 respository 的 findAll()
方法返回的,或者更准确地说,是从 findAll()
返回的页面对象的 getContent()
方法返回的。
这很好,但是 Iterable 不是一个响应式的。您将不能对它应用任何响应式操作,也不能让框架利用它作为响应式类型在多个线程上分割任何工作。您想要的是 recentTacos()
返回一个 Flux<Taco>
。
这里有一个简单但有点有限的选项,就是重写 recentTacos()
将 Iterable 转换为 Flux。而且,当您使用它时,可以去掉分页代码,并用调用 take()
来替换它:
@GetMapping(params="recent")
public Flux<Taco> recentTacos() {
return Flux.fromIterable(tacoRepo.findAll()).take(12);
}
使用 Flux.fromIterable()
,可以将 Iterable<Taco>
转换为 Flux<Taco>
。现在您正在使用一个 Flux,可以使用take()
操作将返回的 Flux 限制为最多 12 个 Taco 对象。不仅代码简单,它还处理一个响应式 Flux,而不是一个简单的 Iterable。
迄今为止,编写响应式代码是一个成功的举措。但是,如果 repository 提供了一个可以开始使用的 Flux,那就更好了,这样就不需要进行转换。如果是这样的话,那么 recentTacos()
可以写成如下:
@GetMapping(params="recent")
public Flux<Taco> recentTacos() {
return tacoRepo.findAll().take(12);
}
那就更好了!理想情况下,一个响应式 cotroller 将是一个端到端的响应式栈的顶端,包括 controller、repository、database 和任何可能位于两者之间的 serviec。这种端到端的响应式栈如图 12.3 所示:
图 12.3 为了最大限度地发挥响应式 web 框架的优势,它应该是完整的端到端响应式堆栈的一部分。
这样的端到端的栈要求 repository 被写入以返回一个 Flux,而不是一个Iterable。在下一章中,我们将探讨如何编写响应式 repostitory,但下面我们将看一看响应式 TacoRepository 可能是什么样子:
package tacos.data;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import tacos.Taco;
public interface TacoRepository
extends ReactiveCrudRepository<Taco, Long> {
}
然而,在这一点上,最重要的是,除了使用 Flux 而不是 Iterable 以及如何获得 Flux 外,定义响应式 WebFlux controller 的编程模型与非响应式 Spring MVC controller 没有什么不同。两者都用 @RestController 和类级别的 @RequestMapping 进行了注解。它们都有请求处理函数,在方法级别用 @GetMapping 进行注解。真正的问题是处理程序方法返回什么类型。
另一个要做的重要观察是,尽管从 repository 中获得了一个 Flux<Taco>
,但您可以在不调用 subscribe()
的情况下返回它。实际上,框架将为您调用 subscribe()
。这意味着当处理对 /api/tacos?recent
的请求时,recentTacos()
方法将被调用,并在从数据库中获取数据之前返回!
返回单个值
作为另一个例子,请考虑 DesignTacoController 中的 tacoById() 方法,如第 6 章中所述:
@GetMapping("/{id}")
public Taco tacoById(@PathVariable("id") Long id) {
Optional<Taco> optTaco = tacoRepo.findById(id);
if (optTaco.isPresent()) {
return optTaco.get();
}
return null;
}
在这里,这个方法处理 /design/{id}
的 GET 请求并返回一个 Taco 对象。因为 repository 的 findById()
返回一个 Optional,所以还必须编写一些笨拙的代码来处理这个问题。但是假设 findById()
返回 Mono<Taco>
而不是 Optional<Taco>
。在这种情况下,可以重写 controller 的 tacoById()
,如下所示:
@GetMapping("/{id}")
public Mono<Taco> tacoById(@PathVariable("id") Long id) {
return tacoRepo.findById(id);
}
哇!这就简单多了。然而,更重要的是,通过返回 Mono<Taco>
而不是 Taco,可以使 Spring WebFlux 以一种被动的方式处理响应。因此,您的API将更好地响应大的负载。
使用 RxJava 类型
值得指出的是,虽然在使用 Spring WebFlux 时,像 Flux 和 Mono 这样的 Reactor 类型是一个自然的选择,但是您也可以选择使用像 Observable 和 Single 这样的 RxJava 类型。例如,假设 DesignTacoController 和后端 repository 之间有一个 service,它处理 RxJava 类型。在这种情况下,recentTacos()
方法的编写方式如下:
@GetMapping(params = "recent")
public Observable<Taco> recentTacos() {
return tacoService.getRecentTacos();
}
类似地,可以编写 tacoById()
方法来处理 RxJava 的 Single 元素,而不是 Mono:
@GetMapping("/{id}")
public Single<Taco> tacoById(@PathVariable("id") Long id) {
return tacoService.lookupTaco(id);
}
此外,Spring WebFlux controller 方法还可以返回 RxJava 的 Completable,这相当于 Reactor 中的 Mono<Void>
。WebFlux 还可以返回一个 Flowable,作为 Observable 或 Reactor 的 Flux 的替代。
响应式地处理输入
到目前为止,我们只关心控制器方法返回的响应式类型。但是使用 Spring WebFlux,您还可以接受 Mono 或 Flux 作为处理程序方法的输入。请考虑 DesignTacoController 中 postTaco()
的原始实现:
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
public Taco postTaco(@RequestBody Taco taco) {
return tacoRepo.save(taco);
}
正如最初编写的,postTaco()
不仅返回一个简单的 Taco 对象,而且还接受一个绑定到请求主体内容的 Taco 对象。这意味着在请求有效负载完全解析并用于实例化 Taco 对象之前,无法调用 postTaco()
。这也意味着 postTaco()
在对 repository 的 save()
方法的阻塞调用,在返回之前无法返回。简言之,请求被阻塞了两次:当它进入 postTaco()
时,然后在 postTaco()
内部被再次阻塞。但是,通过对 postTaco()
应用一点响应式编码,可以使其成为一种完全无阻塞的请求处理方法:
@PostMapping(consumes = "application/json")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
return tacoRepo.saveAll(tacoMono).next();
}
在这里,postTaco()
接受 Mono<Taco>
并调用 repository 的 saveAll()
方法,正如您将在下一章中看到的,该方法接受 Reactive Streams Publisher 的任何实现,包括 Mono 或 Flux。saveAll()
方法返回一个 Flux<Taco>
,但是因为是从 Mono 开始的,所以 Flux 最多会发布一个 Taco。因此,您可以调用 next()
来获取将从 postTaco()
返回的 Mono<Taco>
。
通过接受 Mono<Taco>
作为输入,可以立即调用该方法,而无需等待 Taco 从请求体被解析。由于 repository 也是被动的,它将接受一个 Mono 并立即返回一个 Flux<Taco>
,从中调用 next()
并返回 Mono<Taco>
。所有这些都是在处理请求之前完成的!
或者,您也可以像这样实现 postTaco()
:
@PostMapping(consumes = "application/json")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
return tacoMono.flatMap(tacoRepo::save);
}
这种方法将事情翻转过来,使 tacoMono 成为行为的驱动者。tacoMono 中包含的 Taco 通过 flatMap()
传递给 repository 的 save()
方法,导致返回一个新的 Mono<Taco>
。
任何一种方法都有效,postTaco()
可能还有其他几种写法,选择对您最容易理解的方法就好。
Spring WebFlux 是 Spring MVC 的一个极好的替代品,它提供了使用与 Spring MVC 相同的开发模型编写响应式 web 应用程序的选项。不过,Spring 还有另一个新的窍门。让我们看看如何使用 Spring 的新函数式编程风格创建响应式 API。