更新时间:2023-10-18 来源:黑马程序员 浏览量:
Kafka的ACK机制是指生产者发送消息到Kafka代理并接收确认的方式。ACK机制有三种不同级别,用于控制生产者在消息发送后接收确认时的可靠性。这些级别分别是:
这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。
这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。
这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。
下面是使用Java语言演示如何配置不同的ACK机制:
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 配置 ACKs // acks=0:不等待确认 // acks=1:等待分区领导者确认 // acks=all:等待所有副本确认 props.put("acks", "all"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("消息发送成功,偏移量:" + metadata.offset()); } else { System.err.println("消息发送失败: " + exception.getMessage()); } } }); producer.close(); } }
在上面的示例中,我们配置了ACKs为 "all",这意味着生产者将等待所有副本的确认,以确保消息的可靠性。根据实际需求,我们可以将acks的值设置为"0"或"1"以实现不同级别的可靠性。