14.2.2 处理 请求/流 消息

并非所有的交互都是一个请求和一个响应的交互。例如,在股票报价场景中,请求给定股票的报价流可能很有用。在 请求/响应 模型中,客户端需要重复轮询当前股价。但是在 请求/流模型中,客户机只需要询问一次股票价格,然后订阅定期更新流。

为了说明 请求/流 模型,让我们实现股票报价的服务器和客户端。首先,我们需要定义一个可以携带股票报价信息的对象。清单 14.4 中的 StockQuote 类用于此目的。

清单 14.4 表示股票报价的模型类。

package rsocket;
import java.math.BigDecimal;
import java.time.Instant;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class StockQuote {
    private String symbol;
    private BigDecimal price;
    private Instant timestamp;
}

如您所见,股票报价带有股票号、价格和表示价格的时间戳。为了简洁起见,我们使用 Lombok 来帮助构造和访问方法。

现在,让我们编写一个控制器来处理股票报价请求。您会发现清单 14.5 中的 StockQuoteController 与上一节的 GreetingController 有些相似。

清单 14.5 一个用于流式传输股票报价的 RSocket 控制器。

package rsocket;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {

  @MessageMapping("stock/{symbol}")
  public Flux<StockQuote> getStockPrice(
      @DestinationVariable("symbol") String symbol) {
    return Flux
      .interval(Duration.ofSeconds(1))
      .map(i -> {
        BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
      return new StockQuote(symbol, price, Instant.now());
    });
  }
}

这里,getStockPrice() 法处理“stock/{symbol}”路由上的传入请求,接受带有 @DestinationVariable 注解的路由中的股票号码。为了简单起见,价格不是查找实际股票价格,而是随机值计算的(这可能不会准确地模拟某些实际股票的波动性)。

然而,最值得注意的是 getStockPrice() 返回一个 Flux<StockQuote>,而不是 Mono<StockQuote>。这给了 Spring 一个提示,该处理方法支持 请求/流 模型。在内部,Flux 按每秒一次的间隔来持续创。该 Flux 被映射到另一个生成随机股票报价的 Flux 上。简单地说,getStockPrice() 方法处理的单个请求将返回多个值,每秒钟一次。

请求/流 模型的客户端与 请求/响应 模型的客户端差别不大。唯一的关键区别在于,不是对请求程序调用 retrieveMono(),而是 应该调用 retreiveFlux()。股票报价服务的客户端可能如下所示:

String stockSymbol = "XYZ";

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
  .route("stock/{symbol}", stockSymbol)
  .retrieveFlux(StockQuote.class)
  .doOnNext(stockQuote -> {
    log.info(
        "Price of " + stockQuote.getSymbol() +
        " : " + stockQuote.getPrice() +
        " (at " + stockQuote.getTimestamp() + ")");
  })
  .subscribe();

现在,我们已经了解了如何创建 RSocket 服务器和客户端来处理单个和多个响应。但是,如果服务器没有要发送的响应,或者客户端没有响应,该怎么办呢?让我们看看如何使用 即发即忘 沟通模式。

results matching ""

    No results matching ""