9.3.2 使用 KafkaTemplate 发送消息

在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, 
                Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, 
                Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
                         K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
                   Long timestamp, K key, V data);

注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。

再者 send()sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:

  • 发送消息的 topic(send() 方法必要的参数)
  • 写入 topic 的分区(可选)
  • 发送记录的键(可选)
  • 时间戳(可选;默认为 System.currentTimeMillis()
  • payload(必须)

topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send()sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。

对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。

使用 KafkaTemplate 及其 send() 方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。

程序清单 8.8 使用 KafkaTemplate 发送订单

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import tacos.TacoOrder;

@Service
public class KafkaOrderMessagingService
                  implements OrderMessagingService {

  private KafkaTemplate<String, TacoOrder> kafkaTemplate;

  @Autowired
  public KafkaOrderMessagingService(
        KafkaTemplate<String, TacoOrder> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @Override
  public void sendOrder(TacoOrder order) {
    kafkaTemplate.send("tacocloud.orders.topic", order);
  }
}

在 OrderMessagingService 的这个实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法向名为 tacocloud.orders.topic 的主题发送 Order。代码中除了使用 "Kafka" 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。而且,就像其他实现一样,对于 OrderMessagingService,可以将其注入 OrderApicController 当通过 /api/orders 下单时,可以通过 Kafka 发送消息。

在我们创建消息接收器的 Kafka 实现之前,您需要一个控制台来查看发送的内容。Kafka 有几种可用的管理控制台,包括 Offset Explorer ( https://www.kafkatool.com/ ) 以及 Confluent’s Apache Kafka UI ( https://www.confluent.io/product/confluent-platform/gui-driven-management-and-monitoring/ ).

如果设置了默认主题,可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: tacocloud.orders.topic

然后,在 sendOrder() 方法中,可以调用 sendDefault() 而不是 send(),并且不指定主题名称:

@Override
public void sendOrder(TacoOrder order) {
  kafkaTemplate.sendDefault(order);
}

现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。

results matching ""

    No results matching ""