10.2.5 拆分器

有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter (如图 10.6) 将为分割并处理这些消息。

图 10.6 拆分器将消息分解为两个或多个可由单独的子流。

Splitter 在很多情况下都很有用,但是有两个基本用例可以使用 Splitter:

  • 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
  • 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。

当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个 POJO 即可,该 POJO 提取传入的有效负载的各个部分,并将它们作为集合的元素返回。

例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的 OrderSplitter 将完成这项工作:

public class OrderSplitter {
  public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
    ArrayList<Object> parts = new ArrayList<>();
    parts.add(po.getBillingInfo());
    parts.add(po.getLineItems());
    return parts;
  }
}

然后,可以使用 @Splitter 注解将 OrderSplitter bean 声明为集成流的一部分,如下所示:

@Bean
@Splitter(inputChannel="poChannel",
      outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
  return new OrderSplitter();
}

在这里,购买订单到达名为 poChannel 的通道,并被 OrderSplitter 分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为 splitOrderChannel 的通道。在流的这一点上,可以声明一个 PayloadTypeRouter 来将账单信息和项目,并路由到它们自己的子流:

@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
  PayloadTypeRouter router = new PayloadTypeRouter();
  router.setChannelMapping(
      BillingInfo.class.getName(), "billingInfoChannel");
  router.setChannelMapping(
      List.class.getName(), "lineItemsChannel");
  return router;
}

顾名思义,PayloadTypeRouter 根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为 BillingInfo 的消息路由到一个名为 billingInfoChannel 的通道进行进一步处理。至于项目信息,它们在 java.util.List 集合包中;因此,可以将 List 类型的有效负载映射到名为 lineItemsChannel 的通道中。

按照目前的情况,流分为两个子流:一个是 BillingInfo 对象流,另一个是 List<LineItem> 流。但是,如果想进一步分割它,而不是处理 LineItem 列表,而是分别处理每个 LineItem,该怎么办呢?要将列表拆分为多个消息(每个行项对应一条消息),只需编写一个方法(而不是 bean),该方法使用 @Splitter 进行注解,并返回 LineItems 集合,可能类似如下:

@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
  return lineItems;
}

当携带 List<LineItem> 的有效负载的消息到达名为 lineItemsChannel 的通道时,它将传递到 lineItemSplitter() 方法。根据 Splitter 的规则,该方法必须返回要 Splitter 的项的集合。在本例中,已经有了 LineItems 的集合,因此只需直接返回该集合。因此,集合中的每个 LineItem 都以其自己的消息形式发布到名为 lineItemChannel 的通道。

如果您想使用 Java DSL 来声明相同的 Splitter/Router 配置,您可以调用 split()route()

return IntegrationFlows
  ...
    .split(orderSplitter())
    .<Object, String> route(
      p -> {
        if (p.getClass().isAssignableFrom(BillingInfo.class)) {
          return "BILLING_INFO";
        } else {
          return "LINE_ITEMS";
        }
      }, mapping -> mapping
        .subFlowMapping("BILLING_INFO", sf -> sf
          .<BillingInfo> handle((billingInfo, h) -> {
          ...
        }))
        .subFlowMapping("LINE_ITEMS", sf -> sf
          .split()
          .<LineItem> handle((lineItem, h) -> {
           ...
        }))
      )
    .get();

流定义的 DSL 形式当然更简洁,如果不是说更难理解的话。跟随通过将 lambda 提取到方法,可以稍微清理一下这一点。例如 以下三种方法可用于替换流定义中使用的 lambda:

private String route(Object p) {
  return p.getClass().isAssignableFrom(BillingInfo.class)
    ? "BILLING_INFO"
    : "LINE_ITEMS";
}
private BillingInfo handleBillingInfo(
    BillingInfo billingInfo, MessageHeaders h) {
  // ...
}
private LineItem handleLineItems(
    LineItem lineItem, MessageHeaders h) {
  // ...
}

然后,可以使用如下方法引用重写集成流:

return IntegrationFlows
  ...
    .split()
    .route(
      this::route,
      mapping -> mapping
        .subFlowMapping("BILLING_INFO", sf -> sf
          .<BillingInfo> handle(this::handleBillingInfo))
        .subFlowMapping("LINE_ITEMS", sf -> sf
          .split()
          .<LineItem> handle(this::handleLineItems)));

无论哪种方式,它都使用与 Java 配置示例相同的 OrderSplitter 来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。

results matching ""

    No results matching ""