更新时间:2023-10-18 来源:黑马程序员 浏览量:

Kafka的ACK机制是指生产者发送消息到Kafka代理并接收确认的方式。ACK机制有三种不同级别,用于控制生产者在消息发送后接收确认时的可靠性。这些级别分别是:
这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。
这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。
这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。

下面是使用Java语言演示如何配置不同的ACK机制:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置 ACKs
// acks=0:不等待确认
// acks=1:等待分区领导者确认
// acks=all:等待所有副本确认
props.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功,偏移量:" + metadata.offset());
} else {
System.err.println("消息发送失败: " + exception.getMessage());
}
}
});
producer.close();
}
}在上面的示例中,我们配置了ACKs为 "all",这意味着生产者将等待所有副本的确认,以确保消息的可靠性。根据实际需求,我们可以将acks的值设置为"0"或"1"以实现不同级别的可靠性。