14.2.4 双向发送消息
到目前为止,在我们看到的所有通信模型中,客户端发送一个请求,而服务器响应为零、一或多个。在 请求/流
模型中,服务器能够将多个响应返回到客户端,但客户端仍仅限于发送单个请求。但是为什么只有服务器能发送多次数据呢?为什么客户不能发送多个请求呢?
这就是 通道
模型的用武之地。在 通道
模式下,客户端可以将多个请求流式传输到服务器,服务器也可以将多个请求流式传输回客户端。双方双向对话,它是 RSocket 中最灵活的通信模型,尽管也是最复杂的。
为了演示如何在 Spring 中使用 RSocket 通道通信,让我们创建一个计算账单上的小费、接收一个 Flux 请求并以 Flux 响应的服务。首先,我们需要定义表示请求和响应数据的模型对象。清单 14.8 中所示的类表示客户端发出,服务器接收的数据模型类。
清单 14.8 表示入站小费请求的模型。
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityIn {
private BigDecimal billTotal;
private int percent;
}
GratuityIn 包含计算小费所需的两项基本信息:账单总数和百分比。
清单 14.9 中的 GratuityOut 类表示响应,如下所示,包含账单总数以及包含计算出的小费金额。
清单 14.9 表示出站小费响应的模型。
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityOut {
private BigDecimal billTotal;
private int percent;
private BigDecimal gratuity;
}
清单 14.10 中的 GratuityController 处理小费请求,看起来非常像我们在本章之前写的控制器。
清单 14.10 一个 RSocket 控制器,它在一个通道上接收并返回多条消息。
package rsocket;
import java.math.BigDecimal;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class GratuityController {
@MessageMapping("gratuity")
public Flux<GratuityOut> calculate(Flux<GratuityIn> gratuityInFlux) {
return gratuityInFlux
.doOnNext(in -> log.info("Calculating gratuity: " + in))
.map(in -> {
double percentAsDecimal = in.getPercent() / 100.0;
BigDecimal gratuity = in.getBillTotal()
.multiply(BigDecimal.valueOf(percentAsDecimal));
return new GratuityOut(in.getBillTotal(), in.getPercent(), gratuity);
});
}
}
然而,有一个显著的区别:它不仅返回 Flux,而且还接受 Flux 作为输入。与 请求/流
模型一样,返回的 Flux 使控制器能够将多个值流式传输到客户端。但是 Flux 参数是 通道
模式与 请求/流
模型主要区别。输入的 Flux 参数,允许控制器处理来自客户端的请求流。
通道
模型的客户端与 请求/流
模型的客户端仅在以下方面不同:它向服务器发送 Flux<GratuityIn>
而不是 Mono<GratuityIn>
,如清单 14.11 所示。
清单 14.11 通过通道发送和接收多条消息的客户端。
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
Flux<GratuityIn> gratuityInFlux =
Flux.fromArray(new GratuityIn[] {
new GratuityIn(new BigDecimal(35.50), 18),
new GratuityIn(new BigDecimal(10.00), 15),
new GratuityIn(new BigDecimal(23.25), 20),
new GratuityIn(new BigDecimal(52.75), 18),
new GratuityIn(new BigDecimal(80.00), 15)
})
.delayElements(Duration.ofSeconds(1));
tcp
.route("gratuity")
.data(gratuityInFlux)
.retrieveFlux(GratuityOut.class)
.subscribe(out ->
log.info(out.getPercent() + "% gratuity on "
+ out.getBillTotal() + " is "
+ out.getGratuity()));
在本例中,Flux<GratuityIn>
是使用 fromArray()
方法静态创建的,但可以从任何数据源创建的 Flux。很可能是从响应式数据 Repository (我们将在下一章了解更多信息)。
您可能已经观察到,服务器接受和返回的响应类型,用于确定支持的 RSocket 通信模式。表 14.1 总结了服务器的输入/输出类型与 RSocket 通信模型。
表14.1 支持的 RSocket 模型由 handler 方法的参数和返回类型决定。
RSocket 通信模式 | Handler 参数类型 | Handler 返回类型 |
---|---|---|
请求/响应 | Mono | Mono |
请求/流 | Mono | Flux |
即发即忘 | Mono | Mono |
通道 | Flux | Flux |
您可能想知道服务器是否可以接受并返回一个 Mono。简言之,这不行。尽管您可以想象传入一个 Flux,处理多个请求后以 Mono<Void>
进行回应,这 `即发即忘 模型也可以混搭,但没有该场景的 RSocket 模型。因此,它不受支持。