Kafka 入门教程之二

Kafka 生产者

生产者消息发送流程

生产者消息发送原理

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 Sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,Sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

生产者重要参数列表

生产者异步发送 API

普通的异步发送

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的异步发送

回调方法会在 Producer 收到 ack 时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exceptionnull,则说明消息发送成功,如果 Exception 不为 null,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。

  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息(带回调函数)
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0

生产者同步发送 API

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02

普通的同步发送

同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack 应答。由于 send() 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,只需调用 Future 对象的 get() 方法即可实现同步发送。

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i)).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息(带回调函数)
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 2
topic: test, partition: 0
topic: test, partition: 1
topic: test, partition: 2

生产者分区

生产者分区分区的优点

  • 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据
  • 便于合理使用存储资源,每个 Partition 在一个 Broker 上存储,可以把海量的数据按照分区切割成一块一块的数据并存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果

生产者发送消息的分区策略

默认的分区器类是 DefaultPartitioner,部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {

......

}

通过 KafkaProducer 类的 send() 方法发送消息时,需要指定 ProducerRecord 对象作为参数,ProducerRecord 类的构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProducerRecord<K, V> {

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
......
}

public ProducerRecord(String topic, K key, V value) {
......
}

public ProducerRecord(String topic, V value) {
......
}

调用 ProducerRecord 类不同的构造方法时,有以下几种分区策略:

  • 在指明 partition 的情况下,直接将指明的值作为 partition 值。例如:partition=0,那么数据会被写入分区 0。

  • 在没有指明 partition 值,但有指定 key 的情况下,将 key 的 Hash 值与 topicpartition 数进行取余来得到 partition 值。例如:key 的 Hash 值是 5,topicpartition 数是 2,那么 key 对应的 value 会被写入 1 号分区。

  • 在既没有指明 partition 值,又没有指定 key 的情况下,Kafka 会采用 Sticky Partition 黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者 linger.ms 设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。

自定义生产者的分区器

开发人员可以根据业务需求自定义分区器,只需要实现 Partitioner 接口即可。

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-03

  • 自定义分区器类,实现 Partitioner 接口,并重写 partition() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {

/**
* 返回消息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息内容
String msgValue = value.toString();

// 定义分区号
int partition;

if (msgValue.contains("order")) {
partition = 0;
} else {
partition = 1;
}
// 返回分区号
return partition;
}

/**
* 关闭资源
*/
@Override
public void close() {

}

/**
* 配置信息
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

}

}
  • 在生产者的配置中添加分区器参数,以此来指定自定义分区器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 异步发送
*/
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定自定义分区器
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Partition : " + metadata.partition());
}
});
}
// 关闭资源
producer.close();
}

}

生产者最佳实践

生产者如何提高吞吐量

参数优化

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

  • batch.size:批次大小,默认 16k
  • linger.ms:等待时间,默认 0ms,修改为 5-100ms
  • compression.type:压缩方式,默认是 none,修改过为 snappy
  • RecordAccumulator:缓冲区(双端队列)大小,默认是 32m,修改为 64m
参数说明

示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-04

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 等待时间(默认0ms)
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// 批次大小(默认16K)
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
// 压缩方式(默认none)
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 缓冲区大小(默认32M)
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}