Loading... # 消息中间件之Apache Kafka(三) ## 基础API ### 依赖 ``` <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.10</version> </dependency> ``` ### Topic管理 创建KafkaAdminClient ``` Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(props); ``` 创建Topic ``` CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(new NewTopic("topic02", 3, (short) 3))); ``` 创建一个Topic,名称是topic02,3个分区,副本因子3。该方法是异步执行的,若要同步等待结果,执行: ``` createResult.all().get(); ``` 查看Topic列表 ``` ListTopicsResult topicsResult = adminClient.listTopics(); Set<String> names = topicsResult.names().get(); for (String name : names) { System.out.println(name); } ``` 删除Topic ``` DeleteTopicsResult deleteResult = adminClient.deleteTopics(Arrays.asList("topic02", "topic03")); deleteResult.all().get(); ``` 查看Topic详细信息 ``` DescribeTopicsResult dtr = adminClient.describeTopics(Collections.singleton("topic01")); Map<String, TopicDescription> topicDescriptionMap = dtr.all().get(); for (Map.Entry<String, TopicDescription> entru : topicDescriptionMap.entrySet()) { System.out.println(entru.getKey() + "\t" + entru.getValue()); } ``` ### 生产消费实现 生产者 ``` public static void main(String[] args) { // 创建KafkaProducer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i); // 发送消息给服务器 producer.send(record); } // 关闭生产者 producer.close(); } ``` 消费者 ``` public static void main(String[] args) { // 创建KafkaProducer Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅相关的Topic consumer.subscribe(Pattern.compile("^topic.*")); // 遍历消息队列 while (true) { // 每隔1s去抓取一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (!records.isEmpty()) { // 从队列中取到了数据 for (ConsumerRecord<String, String> record : records) { // 消息属于哪个Topic String topic = record.topic(); // 消息属于哪个Partition int partition = record.partition(); // 分区的消费偏移量 long offset = record.offset(); // 消息的key String key = record.key(); // 消息的value String value = record.value(); // 消息发送的时间 long timestamp = record.timestamp(); System.out.println(topic + "\t" + partition + "\t" + offset + "\t" + key + "\t" + value + "\t" + timestamp); } } } } ``` Kafka默认消费策略是Partition均分给一个组内的消费者,如果消费者数量多于分区数量,那么多出来的消费者会在正在消费的消费者宕机之后顶上来。Kakfa也提供了手动指定Topic分区的方式,但是会失去组管理的特性。 ``` Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic01", 0)); consumer.assign(partitions); ``` 将`subscribe()`方法替换为`assign()`方法,传入一个TopicPartition列表,上面代码表示,消费Topic01的第0个分区。Kafka由offset决定消费位置,消费端可以手动指定offset来自定义从哪里开始消费: ``` consumer.seekToBeginning(partitions); // 从头开始消费 ``` ``` consumer.seek(new TopicPartition("topic01", 0), 1); // 从第1个位置消费 ``` ### 生产者分区策略 生产者可以指定partitioner.class来指定分区策略,默认是DefaultPartitioner,该策略在设置了key得到时候,执行key的hash负载,分配到不同的分区。没有设置key的时候,采用round-ribbon,轮询策略,按顺序选择分区。也可以手动实现分区策略。 ``` props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UserDefinePartitioner.class.getName()); ``` UserDefinePartitioner实现 ``` public class UserDefinePartitioner implements Partitioner { private AtomicInteger counter = new AtomicInteger(); /** * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return 分区号 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取所有Topic的所有分区 List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int numPartitions = partitionInfos.size(); if (keyBytes == null) { int andIncrement = counter.getAndIncrement(); // andIncrement & Integer.MAX_VALUE 因为Integer.MAX_VALUE首位是0 如果andIncrement是负数 将被转成正数 return (andIncrement & Integer.MAX_VALUE) % numPartitions; } else { int hash = Arrays.hashCode(keyBytes); hash = (hash ^ (hash >>> 16)) & Integer.MAX_VALUE; return hash % numPartitions; } } @Override public void close() { System.out.println("close"); } @Override public void configure(Map<String, ?> map) { System.out.println("configure"); } } ``` ### Kafka序列化 Kafka数据传输的是字节,所以发送端需要序列化,消费端需要反序列化,在上面的列子中,使用的是`StringSerializer`和`StringDeserializer`。Kafka给我们提供了基本类型的序列化与反序列化,例如:`IntegerSerializer`和`IntegerDeserializer`提供可int类型的序列化与反序列化,在我们传送对象的时候,也可以自己实现序列化与反序列化。 序列化实现 ``` public class UserDefineSerializer implements Serializer<Object> { @Override public byte[] serialize(String topic, Object data) { return SerializationUtils.serialize((Serializable) data); } @Override public void configure(Map<String, ?> configs, boolean isKey) { System.out.println("configure"); } @Override public void close() { System.out.println("close"); } } ``` 反序列化实现 ``` public class UserDefineDeserializer implements Deserializer<Object> { @Override public Object deserialize(String topic, byte[] data) { return SerializationUtils.deserialize(data); } @Override public void configure(Map<String, ?> configs, boolean isKey) { System.out.println("configure"); } @Override public void close() { System.out.println("close"); } } ``` ### 拦截器 Producer可以通过interceptor.class设置拦截器,可以在拦截器中控制每条消息的生命周期,在发送之前和之后添加逻辑。 添加拦截器 ``` props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, UserDefineProducerInterceptor.class.getName()); ``` UserDefineProducerInterceptor实现 ``` public class UserDefineProducerInterceptor implements ProducerInterceptor<String, Object> { /** * 发送之前 * @param record * @return */ @Override public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) { return new ProducerRecord<>(record.topic(),record.key(),record.value()+"--Syaro"); } /** * 发送成功ack * * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("metadata: " + metadata + ", exception: " + exception); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } } ``` ## 高级API ### offset Kafka消费者默认对于未订阅的Topic,也就是系统并没有存储该消费者的消费分区的记录信息的时候,默认首次的消费策略是:latest。通过`auto.offset.reset`来配置。 - earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 - latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 - none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常 ``` props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); ``` Kafka消费者在消费数据的时候,默认会定期的提交消费偏移量,这样就可以保证所有消息至少可以被消费者消费1次,用户可以通过以下两个参数配置: - 是否开启自动提交,默认: enable.auto.commit=true - 多少毫秒自动提交一次,默认: auto.commit.intervar.ms=5000 ``` props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); ``` 如果用户需要自己管理offset的提交,可以关闭自动提交。关闭之后`auto.commit.intervar.ms`配置将会失效,因为没有自动提交,自然就没有多少毫秒后提交了。所有消费的偏移量都不会提交,也就是说,消费记录不会保存,下一次来消费,依然会消费到这些数据,因此需要手动提交偏移量。注意,用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是Kafka消费者下一次抓取数据得到位置。 ``` while (true) { // 每隔1s去抓取一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (!records.isEmpty()) { // 从队列中取到了数据 // 记录分区的消费元数据信息 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (ConsumerRecord<String, String> record : records) { // 消息属于哪个Topic String topic = record.topic(); // 消息属于哪个Partition int partition = record.partition(); // 分区的消费偏移量 long offset = record.offset(); // 消息的key String key = record.key(); // 消息的value String value = record.value(); // 消息发送的时间 long timestamp = record.timestamp(); // 记录消费分区的偏移量数据,一定在提交的时候偏移量信息offset+1 offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1)); // 提交消费者偏移量 consumer.commitAsync(offsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { System.out.println("offsets: " + offsets + "\texception: " + exception); } }); System.out.println(topic + "\t" + partition + "\t" + offset + "\t" + key + "\t" + value + "\t" + timestamp); } } } ``` ### ACK和Retries Kafka生产者在发送完一个消息后,要求Broker在规定的时间内ACK应答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。默认acks=1。 - acks=1: Leader会将Record写到其本地日志中,但不会等待Follower确认就做出响应。在这种情况下,如果Leader在收到记录,但Follower还没复制之前,Leader宕机了,记录将会丢失。 - acks=0: 生产者根本不会等待服务器的任何确认。该记录将立即添加到socket缓冲区中,并视为已发送。在这种情况下,不能保证服务器已收到记录。 - acks=all: 这意味着Leader将等待所有副本确认记录。这保证了只要有一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。等效于acks=-1。 ``` props.put(ProducerConfig.ACKS_CONFIG, "all"); ``` 如果生产者在规定时间内,并没有得到Kafka的Leader的Ack应答,可以开启retries机制。 - request.timeout.ms = 30000 默认 - retries = 2147483647 默认 `request.timeout.ms`表示多长时间超时,`retries`表示超时重试的次数,不包含第一次发送。 ``` props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1); props.put(ProducerConfig.RETRIES_CONFIG, 3); ``` 注意,超时重试可能Broker并没有宕机,它已经将数据记录日志,但是由于网络原因,它没来得及给Producer应答,造成Producer重新发送数据,Broker就会保留多个相同的数据,造成重复消费。所以需要做**幂等**处理。 ### 幂等 HTTP/1.1中对幂等性的定义是: 一次和多次请求某一个资源,对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。 Kafka在0.11.0.0版本增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证生产者发送的消息不会丢失,也不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复需要两点: 1. **唯一标识**: 要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中的订单号。 2. **记录下已处理过的请求标识**: 光有唯一标识还不够,还需要记录下哪些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝处理。 幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Topic中一次。在初始期间,Kafka会给生产者生成一个唯一的ID称为Producer ID或PID。 - PID: 每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。 - Sequence Numbler: 对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。 Broker端在缓存中保存了seq number,对于接收的每条消息,如果其序号比Broker缓存中序号正好大1则接受它,否则将其丢弃。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。 - enable.idempotence=false 默认 注意,在使用幂等性的时候,要求必须设置retries大于等于1,和acks=all。 ``` props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3); // 如果发送不成功就阻塞 默认5 开启幂等性必须小于等于5 设置为1 将严格保证发送顺序 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); ``` ### 事务控制 Kafka的幂等性,只能保证一条记录在一个分区的原子性。但是如果要保证多条记录(多分区)之间的完整性,这个时候就需要开启Kafka的事务操作了。 在Kafka0.11.0.0除了引入幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为**生产者事务Only**和**消费者&生产者事务**。一般来说默认消费者消费消息的隔离级别是read_uncommitted,这有可能读取到事务失败的数据,所以在开启生产者事务之后,需要用户设置消费者的事务隔离级别。 - isolation.level=read_uncommitted 默认 该选项有两个值read_committed和read_uncommitted,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed。 开启生产者事务的时候,只需指定transaction.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求transaction.id的取值必须是唯一的,同一时刻只能有一个transaction.id存在,其它的将会被关闭。 #### 生产者事务Only 生产者 ``` public class KafkaProducerTransactionProducerOnly { public static void main(String[] args) { KafkaProducer<String, String> producer = buildKafkaProducer(); //初始化事务 producer.initTransactions(); try { // 开启事务 producer.beginTransaction(); for (int i = 0; i < 10; i++) { // if (i == 8) { // int j = 10 / 0; // } ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "transaction" + i, "rightdata" + i); producer.send(record); producer.flush(); } producer.commitTransaction(); } catch (Exception e) { System.out.println("出现错误: " + e.getMessage()); // 终止事务 producer.abortTransaction(); }finally { // 关闭生产者 producer.close(); } } public static KafkaProducer<String, String> buildKafkaProducer() { // 创建KafkaProducer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 必须配置事务ID 必须是唯一的 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id" + UUID.randomUUID().toString()); // 配置Kafka批处理大小 默认16384 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024); // 等待5ms 如果batch中的数据不足 1024 props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 配置Kafka重试机制和幂等性 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200000); return new KafkaProducer<>(props); } } ``` 消费者添加: ``` props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); ``` #### 消费者&生产者事务 比如这样一个场景,生产者向TopicA中发送数据,消费者收到数据,处理之后放入TopicB,然后有另一个消费者去TopicB中消费数据,也就是第一个消费者相当于一个中转站。当生产者向中转站发送失败时,要求中转站与消费者都不能消费到数据。当中转站写入TopicB失败时,要求生产者也回滚。 生产者 ``` public class KafkaProducerTransactionProducerOnly { public static void main(String[] args) { KafkaProducer<String, String> producer = buildKafkaProducer(); //初始化事务 producer.initTransactions(); try { // 开启事务 producer.beginTransaction(); for (int i = 0; i < 10; i++) { // if (i == 8) { // int j = 10 / 0; // } ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "transaction" + i, "rightdata" + i); producer.send(record); producer.flush(); } producer.commitTransaction(); } catch (Exception e) { System.out.println("出现错误: " + e.getMessage()); // 终止事务 producer.abortTransaction(); }finally { // 关闭生产者 producer.close(); } } public static KafkaProducer<String, String> buildKafkaProducer() { // 创建KafkaProducer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 必须配置事务ID 必须是唯一的 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id" + UUID.randomUUID().toString()); // 配置Kafka批处理大小 默认16384 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024); // 等待5ms 如果batch中的数据不足 1024 props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 配置Kafka重试机制和幂等性 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200000); return new KafkaProducer<>(props); } } ``` 中转站 ``` public class KafkaProducerTransactionProducerAndConsumer { public static void main(String[] args) { KafkaProducer<String, String> producer = buildKafkaProducer(); KafkaConsumer<String, String> consumer = buildKafkaConsumer("g1"); //初始化事务 producer.initTransactions(); consumer.subscribe(Collections.singleton("topic01")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (!records.isEmpty()) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); // 开启事务 producer.beginTransaction(); try { // 迭代数据 进行业务处理 for (ConsumerRecord<String, String> record : records) { // 存储元数据 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); ProducerRecord<String, String> pRecord = new ProducerRecord<>("topic02", record.key(), record.value() + " Syaro"); producer.send(pRecord); } // 提交事务 producer.sendOffsetsToTransaction(offsets,"g1");//提交消费者偏移量 producer.commitTransaction(); } catch (Exception e) { System.err.println("错误了: " + e.getMessage()); // 终止事务 producer.abortTransaction(); } } } } public static KafkaProducer<String, String> buildKafkaProducer() { // 创建KafkaProducer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 必须配置事务ID 必须是唯一的 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id" + UUID.randomUUID().toString()); // 配置Kafka批处理大小 默认16384 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024); // 等待5ms 如果batch中的数据不足 1024 props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 配置Kafka重试机制和幂等性 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200000); return new KafkaProducer<>(props); } public static KafkaConsumer<String, String> buildKafkaConsumer(String groupId) { // 创建KafkaProducer Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092,kafka02:9092,kafka03:9092"); // key反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 设置消费者的消费事务隔离级别 read_committed 默认 read_uncommitted props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 必须关闭消费者端的 offset自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return new KafkaConsumer<>(props); } } ``` 中转站关闭自动偏移量提交,由中转站写入TopicB成功后,提交偏移量,这样如果失败了,下一次依然会消费这些数据,相当于对生产者发送的数据进行了回滚。 消费者中设置: ``` props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); ``` 这样一套完整的消费者&生产者事务就实现了,任何环节异常,都会导致全局事务回滚。 Last modification:July 10th, 2020 at 01:14 am © 允许规范转载
想想你的文章写的特别好https://www.237fa.com/