省流:推荐使用带回调方法的异步发送

发后即忘

这种模式只管往broker里发数据,但是broker收没收到它不管。
通常情况下,这种发后即忘(fire-and-forget)的方式并不会出现问题,但也有意外情况:

  1. 遇到可重试异常,如果发送消息到broker时抛出异常,且是可重试异常,就会去重试执行方法。org.apache.kafka.clients.producer.internals.Sender类中有如下方法:

    1
    2
    3
    4
    5
    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    return batch.attempts() < this.retries &&
    ((response.error.exception() instanceof RetriableException) ||
    (transactionManager != null && transactionManager.canRetry(response, batch)));
    }

    允许重试需要满足两个条件:

  2. 重试次数少于参数retries指定的值;

  3. 异常是RetriableException类型或者TransactionManager允许重试;

同步发送

严格上来说,Kafka并没有同步发送机制。因为发送消息的send方法本身是异步的。
但是Kafka也有提供一些机制让我们做到同步发送。
具体怎么做到同步发送呢?

  • send方法是有返回值的,返回值是Future类型,这是一个接口,可以提供异步的执行结果。我们可以使用Future的get方法去等待异步结束。这样就把异步改成了同步。
1
2
3
4
5
6
7
8
9
10
11
12
try {
// RecordMetadata包含了成功发送到 Kafka Broker 的消息的元数据信息
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.printf("Send Record(key=%s value=%s) " +
"meta(Partition=%d, offset=%d) \n",
record.key(), record.value(),
metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

  • 如果遇到可重试异常,在超出最大重试次数之前如果能解决,我们的代码就不会捕获到异常,否则就会爆出异常需要捕获
  • 如果是不可重试异常,是肯定要做异常捕获的。做好异常捕获记录详细日志信息,日后排查问题也方便

同步发送的可靠性比较高,如果发生异常,可以捕获并处理异常;并不会像 “发后即忘” 的方式直接造成消息丢失。但是,同步发送的方式性能会比较差,需要阻塞等待一条消息发送成功之后才会发送下一条消息。kafka的批消息发送也就成摆设了。

异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null ) {
System.out.printf("Send Record(key=%s value=%s) " +
"meta(Partition=%d, offset=%d) \n",
record.key(), record.value(),
metadata.partition(), metadata.offset());
}
if (exception != null) {
exception.printStackTrace();
}
}
});

异步发送消息,比前两种方法多了一个回调函数的参数
不需要 try/catch,因为异常会在回调函数中处理。
回调方法中的 onCompletion() 方法中的两个参数是互斥的。消息发送成功时,metadata 不为 null 而 exception 为 null,反之则反之。
回调方法的真正优势在于发送数据失败时,可以在数据发送失败时采取一些措施,如抛出异常、记录错误或将失败的数据写入到其他地方进行分析。很显然,使用带回调函数的异步发送方式既高效、又优雅,通常在生产环境中都建议使用这种方式发送消息。

:::warning
注意:回调函数是在生产者的主线程中执行的。必须要确保回调方法的执行速度足够快,绝对不能在回调方法中产生阻塞,否则会减慢生产者的运行速度。
:::