Kafka API使用

本节演示如何通过Java代码创建Kafka生产者、消费者和拦截器,依赖使用Kafka-clients。因为我安装的Kafka版本为2.4.1,所以依赖的版本也用2.4.1,避免兼容问题。新增一个maven项目,引入:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

生产者API

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

简单发送数据

创建ProducerDemo类,编写一个简单的生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ProducerDemo {

public static void main(String[] args) {
// 1. 生产者配置
Properties properties = new Properties();
// 指定kafka地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 指定ack等级
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 指定重试次数,即生产者发送数据后没有收到ack应答时的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 指定批次大小 16k = 16 * 1024
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 指定等待时间,单位毫秒
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 指定RecordAccumulator缓冲区大小 32m = 32 * 1024 * 1024
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 指定k-v序列化规则
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. 准备数据
ProducerRecord<String, String> record = new ProducerRecord<>("test", "mesage");
// 4. 发送数据(不带回调)
producer.send(record);
// 5. 关闭连接
producer.close();
}
}

无论是创建生产者还是消费者,我们都要指定配置。生产者配置可以使用常量类ProducerConfig指定,里面包含了生产者所有可用配置,并且包含对应的解释:

QQ20200402-103628@2x

上面代码创建了一个kv都是String类型的生产者。配置了Kafka的地址,ack等级为all(在Kafka生产者一节中介绍过了,不赘述),重试次数为3。并且指定了生产者发送数据的批次为16k,等待时间为1ms。也就是说,要么发送的数据量达到了16k,要么等待时间超过了1ms才会真正地把数据发往Kafka。

此外,我们还指定RecordAccumulator缓冲区大小,kv序列化规则采用StringSerializer

Kafka消息使用KafkaProducer对象表示,他有6个重载构造器,这在Kafka生产者介绍过了,这里我们指定了主题为test,消息内容为message。

运行这段代码之前,我们先启动zk和kafka:

1
2
3
4
5
6
# 启动zk
sh bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka broker
sh bin/kafka-server-start.sh config/server.properties
# 启动kafka消费者
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

然后运行上面的程序,观察消费者输出:

QQ20200402-110459@2x

带回调函数

KafkaProducersend方法可以制定回调函数。回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

改造上面的send方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
// 3. 准备数据
ProducerRecord<String, String> record = new ProducerRecord<>("test", "callback-test");
// 4. 发送数据(带回调)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
// 回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
String result = String.format("消息发送成功,主题Topic: %s,分区Partition: %s,偏移量Offset: %s",
metadata.topic(), metadata.partition(), metadata.offset());
System.out.println(result);
} else {
System.out.println("消息发送失败");
exception.printStackTrace();
}

});
...

运行该方法,控制台输出如下:

1
消息发送成功,主题Topic: test,分区Partition: 0,偏移量Offset: 4

消费者:

QQ20200402-111752@2x

同步发送

如前面所说,Kafka 的 Producer 发送消息采用的是异步发送的方式,KafkaProducersend方法返回Future对象,所以我们可以手动调用Future对象的get方法实现同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 3. 准备数据
ProducerRecord<String, String> record = new ProducerRecord<>("test", "test-sync");
// 4. 发送数据(带回调)
Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
if (exception == null) {
// 回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
String result = String.format("消息发送成功,主题Topic: %s,分区Partition: %s,偏移量Offset: %s",
metadata.topic(), metadata.partition(), metadata.offset());
System.out.println(result);
} else {
System.out.println("消息发送失败");
exception.printStackTrace();
}

});
future.get();

自定义分区器

自定义分区器只需要要实现Partitioner接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
return 0;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

partition方法中,我们可以根据主题topic,key值,序列化后的key值,value值,序列化后的value值,Kafka集群cluster信息,来个性化分区规则。

要使用自定义分区器,可通过配置类指定:

1
2
// 指定自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

消费者API

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

自动提交offset

创建一个消费者,自动提交offset,演示消息消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ConsumerDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 消费者配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交offset的时间,单位ms,即1秒钟提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 指定k-v反序列化规则
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 指定消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// 2. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("test"));
while (true){
// 拉取数据,指定轮询时间为1秒
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.toString());
}
TimeUnit.SECONDS.sleep(1);
}
}
}

启动消费者,然后再使用生产者发送mrbird消息到test主题,消费者控制台输出如下:

1
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1585811850841, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = mrbird)

手动提交offset

虽然自动提交 offset 十分便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交 offset 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ConsumerDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 消费者配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 关闭自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 指定k-v反序列化规则
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 指定消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// 2. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("test"));
while (true) {
// 拉取数据,指定轮询时间为1秒
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.toString());
}
// 同步提交
consumer.commitSync();
TimeUnit.SECONDS.sleep(1);
}
}
}

commitSync()也可以指定超时时间。

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。更多的情况下,会选用异步提交 offset 的方式。

以下为异步提交 offset 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ConsumerDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 消费者配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 关闭自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 提交offset的时间,单位ms,即1秒钟提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 指定k-v反序列化规则
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 指定消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// 2. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("test"));
while (true){
// 拉取数据,指定轮询时间为1秒
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.toString());
}
// 异步提交
consumer.commitAsync();
TimeUnit.SECONDS.sleep(1);
}
}
}

commitAsync()可以指定回调:

1
2
3
4
5
6
7
8
consumer.commitAsync((offsets, exception) -> {
if (exception == null) {
System.out.println("提交offset: " + offsets + "成功");
} else {
System.out.println("提交失败");
exception.printStackTrace();
}
});

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

拦截器

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor按顺序形成一个拦截器链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

QQ20200402-154559@2x

  • configure(configs):获取配置信息和初始化数据时调用;
  • onSend(ProducerRecord):在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好不要修改消息所属的 topic 和分区,否则会影响目标分区的计算;

  • onAcknowledgement(RecordMetadata, Exception):在返回ack时,或者发送失败时调用该方法。

  • close:关闭 interceptor,主要用于执行一些资源清理工作。

下面举个自定义拦截器的例子:定义一个拦截器链,包含两个拦截器MessageFormatInterceptorCountInterceptor,分别用于消息加工和统计消息发送成功和失败的笔数。

定义MessageFormatInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MessageFormatInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + " - " + record.value());
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

定义CountInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CountInterceptor implements ProducerInterceptor<String, String> {

private int errorCounter = 0;
private int successCounter = 0;

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}

@Override
public void close() {
System.out.println("成功笔数: " + successCounter);
System.out.println("失败笔数: " + errorCounter);
}

@Override
public void configure(Map<String, ?> configs) {

}
}

其实这两个功能完全可以由一个过滤器来完成,这里仅仅是为了演示过滤器链。

在生产者配置中指定这个过滤器链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ProducerDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

// 1. 生产者配置
Properties properties = new Properties();
// 指定kafka地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 指定ack等级
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 指定重试次数,即生产者发送数据后没有收到ack应答时的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 指定批次大小 16k = 16 * 1024
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 指定等待时间,单位毫秒
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 指定RecordAccumulator缓冲区大小 32m = 32 * 1024 * 1024
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 指定k-v序列化规则
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 指定过滤器链
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("cc.mrbird.kafka.MessageFormatInterceptor", "cc.mrbird.kafka.CountInterceptor"));

// 2. 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. 准备数据
ProducerRecord<String, String> record = new ProducerRecord<>("test", "kafka");
// 4. 发送数据
producer.send(record);
// 5. 关闭连接
producer.close();
}
}

运行程序,控制台输出:

1
2
成功笔数: 1
失败笔数: 0

消费者:

QQ20200402-160619@2x

「尚硅谷大数据技术之 Kafka」 学习笔记

请作者喝瓶肥宅水🥤

0