10.2.5 拆分器
有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter (如图 10.6) 将为分割并处理这些消息。
图 10.6 拆分器将消息分解为两个或多个可由单独的子流。
Splitter 在很多情况下都很有用,但是有两个基本用例可以使用 Splitter:
- 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
- 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。
当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个 POJO 即可,该 POJO 提取传入的有效负载的各个部分,并将它们作为集合的元素返回。
例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的 OrderSplitter 将完成这项工作:
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
然后,可以使用 @Splitter
注解将 OrderSplitter bean 声明为集成流的一部分,如下所示:
@Bean
@Splitter(inputChannel="poChannel",
outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
在这里,购买订单到达名为 poChannel 的通道,并被 OrderSplitter 分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为 splitOrderChannel 的通道。在流的这一点上,可以声明一个 PayloadTypeRouter 来将账单信息和项目,并路由到它们自己的子流:
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(
List.class.getName(), "lineItemsChannel");
return router;
}
顾名思义,PayloadTypeRouter 根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为 BillingInfo 的消息路由到一个名为 billingInfoChannel 的通道进行进一步处理。至于项目信息,它们在 java.util.List 集合包中;因此,可以将 List 类型的有效负载映射到名为 lineItemsChannel 的通道中。
按照目前的情况,流分为两个子流:一个是 BillingInfo 对象流,另一个是 List<LineItem>
流。但是,如果想进一步分割它,而不是处理 LineItem 列表,而是分别处理每个 LineItem,该怎么办呢?要将列表拆分为多个消息(每个行项对应一条消息),只需编写一个方法(而不是 bean),该方法使用 @Splitter 进行注解,并返回 LineItems 集合,可能类似如下:
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}
当携带 List<LineItem>
的有效负载的消息到达名为 lineItemsChannel 的通道时,它将传递到 lineItemSplitter()
方法。根据 Splitter 的规则,该方法必须返回要 Splitter 的项的集合。在本例中,已经有了 LineItems 的集合,因此只需直接返回该集合。因此,集合中的每个 LineItem 都以其自己的消息形式发布到名为 lineItemChannel 的通道。
如果您想使用 Java DSL 来声明相同的 Splitter/Router 配置,您可以调用 split()
和 route()
:
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(
p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo> handle((billingInfo, h) -> {
...
}))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem> handle((lineItem, h) -> {
...
}))
)
.get();
流定义的 DSL 形式当然更简洁,如果不是说更难理解的话。跟随通过将 lambda 提取到方法,可以稍微清理一下这一点。例如 以下三种方法可用于替换流定义中使用的 lambda:
private String route(Object p) {
return p.getClass().isAssignableFrom(BillingInfo.class)
? "BILLING_INFO"
: "LINE_ITEMS";
}
private BillingInfo handleBillingInfo(
BillingInfo billingInfo, MessageHeaders h) {
// ...
}
private LineItem handleLineItems(
LineItem lineItem, MessageHeaders h) {
// ...
}
然后,可以使用如下方法引用重写集成流:
return IntegrationFlows
...
.split()
.route(
this::route,
mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo> handle(this::handleBillingInfo))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem> handle(this::handleLineItems)));
无论哪种方式,它都使用与 Java 配置示例相同的 OrderSplitter 来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。