14.2.4 双向发送消息

到目前为止,在我们看到的所有通信模型中,客户端发送一个请求,而服务器响应为零、一或多个。在 请求/流 模型中,服务器能够将多个响应返回到客户端,但客户端仍仅限于发送单个请求。但是为什么只有服务器能发送多次数据呢?为什么客户不能发送多个请求呢?

这就是 通道 模型的用武之地。在 通道 模式下,客户端可以将多个请求流式传输到服务器,服务器也可以将多个请求流式传输回客户端。双方双向对话,它是 RSocket 中最灵活的通信模型,尽管也是最复杂的。

为了演示如何在 Spring 中使用 RSocket 通道通信,让我们创建一个计算账单上的小费、接收一个 Flux 请求并以 Flux 响应的服务。首先,我们需要定义表示请求和响应数据的模型对象。清单 14.8 中所示的类表示客户端发出,服务器接收的数据模型类。

清单 14.8 表示入站小费请求的模型。

package rsocket;

import java.math.BigDecimal;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class GratuityIn {

  private BigDecimal billTotal;
  private int percent;

}

GratuityIn 包含计算小费所需的两项基本信息:账单总数和百分比。

清单 14.9 中的 GratuityOut 类表示响应,如下所示,包含账单总数以及包含计算出的小费金额。

清单 14.9 表示出站小费响应的模型。

package rsocket;

import java.math.BigDecimal;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class GratuityOut {

  private BigDecimal billTotal;
  private int percent;
  private BigDecimal gratuity;

}

清单 14.10 中的 GratuityController 处理小费请求,看起来非常像我们在本章之前写的控制器。

清单 14.10 一个 RSocket 控制器,它在一个通道上接收并返回多条消息。

package rsocket;
import java.math.BigDecimal;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Controller
@Slf4j
public class GratuityController {

  @MessageMapping("gratuity")
  public Flux<GratuityOut> calculate(Flux<GratuityIn> gratuityInFlux) {
    return gratuityInFlux
        .doOnNext(in -> log.info("Calculating gratuity: " + in))
        .map(in -> {
          double percentAsDecimal = in.getPercent() / 100.0;
          BigDecimal gratuity = in.getBillTotal()
            .multiply(BigDecimal.valueOf(percentAsDecimal));
          return new GratuityOut(in.getBillTotal(), in.getPercent(), gratuity);
    });
  }

}

然而,有一个显著的区别:它不仅返回 Flux,而且还接受 Flux 作为输入。与 请求/流 模型一样,返回的 Flux 使控制器能够将多个值流式传输到客户端。但是 Flux 参数是 通道 模式与 请求/流 模型主要区别。输入的 Flux 参数,允许控制器处理来自客户端的请求流。

通道 模型的客户端与 请求/流 模型的客户端仅在以下方面不同:它向服务器发送 Flux<GratuityIn> 而不是 Mono<GratuityIn>,如清单 14.11 所示。

清单 14.11 通过通道发送和接收多条消息的客户端。

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);

Flux<GratuityIn> gratuityInFlux =
    Flux.fromArray(new GratuityIn[] {
        new GratuityIn(new BigDecimal(35.50), 18),
        new GratuityIn(new BigDecimal(10.00), 15),
        new GratuityIn(new BigDecimal(23.25), 20),
        new GratuityIn(new BigDecimal(52.75), 18),
        new GratuityIn(new BigDecimal(80.00), 15)
    })
    .delayElements(Duration.ofSeconds(1));

    tcp
      .route("gratuity")
      .data(gratuityInFlux)
      .retrieveFlux(GratuityOut.class)
      .subscribe(out ->
        log.info(out.getPercent() + "% gratuity on "
            + out.getBillTotal() + " is "
            + out.getGratuity()));

在本例中,Flux<GratuityIn> 是使用 fromArray() 方法静态创建的,但可以从任何数据源创建的 Flux。很可能是从响应式数据 Repository (我们将在下一章了解更多信息)。

您可能已经观察到,服务器接受和返回的响应类型,用于确定支持的 RSocket 通信模式。表 14.1 总结了服务器的输入/输出类型与 RSocket 通信模型。

表14.1 支持的 RSocket 模型由 handler 方法的参数和返回类型决定。

RSocket 通信模式 Handler 参数类型 Handler 返回类型
请求/响应 Mono Mono
请求/流 Mono Flux
即发即忘 Mono Mono
通道 Flux Flux

您可能想知道服务器是否可以接受并返回一个 Mono。简言之,这不行。尽管您可以想象传入一个 Flux,处理多个请求后以 Mono<Void> 进行回应,这 `即发即忘 模型也可以混搭,但没有该场景的 RSocket 模型。因此,它不受支持。

results matching ""

    No results matching ""