producer详解及源码解析
1、Producer消息发送的基本步骤
- (0) 创建 KafkaProducer 对象,此对象接收 Properties 类型的参数,我们配置了 bootstrap.servers、key.serializer、value.serializer 三个参数。
- (1) 接着创建了一个 ProducerRecord 对象,创建此对象时,我们传入了 topic、消息的 key 和消息的 value 等参数。然后我们使用(0)步骤中创建的 KafkaProducer 对象的 send 方法将消息发送了出去。上面两个步骤是我们开发的 Producer 程序所完成的工作。消息发送出去之后,接下来发生了什么呢?
- (2) 消息发送出去之后,首先,KafkaProducer 对象会对消息的 key 和 value 进行序列化,序列化后的数据才可以通过网络传输。使用的序列化类就是我们配置的 key.serializer 和 value.serializer 两个参数的值所指向的类。
- (3) 接着,消息会发送到 partitioner,partitioner 负责将消息发送到 topic 的某个 partition。如果我们在创建 ProducerRecord 对象时声明了分区,那么 partitioner 会直接返回声明的分区。如果没有声明分区,partitioner 会选一个分区,通常会基于消息的 key 值做分区选择。一旦分区选定,producer 就知道了:消息要发送到哪个 topic 的哪个分区。
- (4) 接着,producer 会把发送到相同 topic,相同 partition 的消息进行打包,形成 Batch ,后续消息,如果有相同的 topic 和 partition,都会添加到相应的 Batch 中。producer 会启动一个独立的线程,将这些打包的消息批量发送到对应的 kafka broker。
- (5) 当 broker 收到消息,会发回一个响应。
- (6) 如果消息成功写入 Kafka,会响应一个 RecordMetadata 对象到 Java Producer 程序,其中包括 topic、partition 和 offset 等信息。如果消息没有成功写 入 Kafka,将会响应一个错误 error。
- (7) 当 producer 收到一个错误响应,producer 会尝试重发消息,当尝试次数达到配置的值时,仍未发送成功,此时会返回一个错误到 Java Producer 程序。
2、Properties参数详解
- bootstrap.servers
1 | props.put("bootstrap.servers", "132.232.14.247:9092"); |
- key.deserializer
1 | props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); |
- value.deserializer
1 | props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); |
- KafkaProducer
1 | // 创建一个kafkaProdecer对象,传入上面创建的Properties对象 |
3、源码解析
- ProducerRecord构造方法
1 | # ProducerRecord提供了四个构造方法,我们可以指定 |
- send方法–一路进入dosend方法
- 序列化key和value
- 序列化key和value
- 构建partition并且添加到batch中
此时使用的分区器是默认的分区器
- 发送消息到kafka