14.2.2 处理 请求/流
消息
并非所有的交互都是一个请求和一个响应的交互。例如,在股票报价场景中,请求给定股票的报价流可能很有用。在 请求/响应
模型中,客户端需要重复轮询当前股价。但是在 请求/流
模型中,客户机只需要询问一次股票价格,然后订阅定期更新流。
为了说明 请求/流
模型,让我们实现股票报价的服务器和客户端。首先,我们需要定义一个可以携带股票报价信息的对象。清单 14.4 中的 StockQuote 类用于此目的。
清单 14.4 表示股票报价的模型类。
package rsocket;
import java.math.BigDecimal;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class StockQuote {
private String symbol;
private BigDecimal price;
private Instant timestamp;
}
如您所见,股票报价带有股票号、价格和表示价格的时间戳。为了简洁起见,我们使用 Lombok 来帮助构造和访问方法。
现在,让我们编写一个控制器来处理股票报价请求。您会发现清单 14.5 中的 StockQuoteController 与上一节的 GreetingController 有些相似。
清单 14.5 一个用于流式传输股票报价的 RSocket 控制器。
package rsocket;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
@Controller
public class StockQuoteController {
@MessageMapping("stock/{symbol}")
public Flux<StockQuote> getStockPrice(
@DestinationVariable("symbol") String symbol) {
return Flux
.interval(Duration.ofSeconds(1))
.map(i -> {
BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
return new StockQuote(symbol, price, Instant.now());
});
}
}
这里,getStockPrice()
法处理“stock/{symbol}”路由上的传入请求,接受带有 @DestinationVariable 注解的路由中的股票号码。为了简单起见,价格不是查找实际股票价格,而是随机值计算的(这可能不会准确地模拟某些实际股票的波动性)。
然而,最值得注意的是 getStockPrice()
返回一个 Flux<StockQuote>
,而不是 Mono<StockQuote>
。这给了 Spring 一个提示,该处理方法支持 请求/流
模型。在内部,Flux 按每秒一次的间隔来持续创。该 Flux 被映射到另一个生成随机股票报价的 Flux 上。简单地说,getStockPrice()
方法处理的单个请求将返回多个值,每秒钟一次。
请求/流
模型的客户端与 请求/响应
模型的客户端差别不大。唯一的关键区别在于,不是对请求程序调用 retrieveMono()
,而是
应该调用 retreiveFlux()
。股票报价服务的客户端可能如下所示:
String stockSymbol = "XYZ";
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
.route("stock/{symbol}", stockSymbol)
.retrieveFlux(StockQuote.class)
.doOnNext(stockQuote -> {
log.info(
"Price of " + stockQuote.getSymbol() +
" : " + stockQuote.getPrice() +
" (at " + stockQuote.getTimestamp() + ")");
})
.subscribe();
现在,我们已经了解了如何创建 RSocket 服务器和客户端来处理单个和多个响应。但是,如果服务器没有要发送的响应,或者客户端没有响应,该怎么办呢?让我们看看如何使用 即发即忘
沟通模式。