9.3.2 使用 KafkaTemplate 发送消息
在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);
注意到的第一件事是没有 convertAndSend()
方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send()
方法都在做 convertAndSend()
的工作。
再者 send()
和 sendDefault()
的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:
- 发送消息的 topic(
send()
方法必要的参数) - 写入 topic 的分区(可选)
- 发送记录的键(可选)
- 时间戳(可选;默认为
System.currentTimeMillis()
) - payload(必须)
topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send()
和 sendDefault()
的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。
对于 send()
方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。
使用 KafkaTemplate 及其 send()
方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。
程序清单 8.8 使用 KafkaTemplate 发送订单
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import tacos.TacoOrder;
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, TacoOrder> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, TacoOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
在 OrderMessagingService 的这个实现中,sendOrder()
方法使用注入的 KafkaTemplate 的 send()
方法向名为 tacocloud.orders.topic 的主题发送 Order。代码中除了使用 "Kafka" 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。而且,就像其他实现一样,对于 OrderMessagingService,可以将其注入 OrderApicController 当通过 /api/orders
下单时,可以通过 Kafka 发送消息。
在我们创建消息接收器的 Kafka 实现之前,您需要一个控制台来查看发送的内容。Kafka 有几种可用的管理控制台,包括 Offset Explorer ( https://www.kafkatool.com/ ) 以及 Confluent’s Apache Kafka UI ( https://www.confluent.io/product/confluent-platform/gui-driven-management-and-monitoring/ ).
如果设置了默认主题,可以稍微简化 sendOrder()
方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: tacocloud.orders.topic
然后,在 sendOrder()
方法中,可以调用 sendDefault()
而不是 send()
,并且不指定主题名称:
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.sendDefault(order);
}
现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。