学习 Kafka(四):生产者

依赖

新建一个 Maven 工程,修改 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>  

Producer

创建一个 Producer 对象:

Properties properties = new Properties();  
        properties.put("bootstrap.servers", "broker0:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

配置说明:

bootstrap.servers Kafka Broker 列表,生产环境上,至少配置两个 Kafka Broker

key.serializer Key 的序列化类

value.serializer Value 的序列化类

Kafka 客户端提供了 Integer、String 等常用的序列化类,在 org.apache.kafka.common.serialization 包下

Kafka 客户端支持自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer<T> 接口即可

序列化器的作用是将对象序列化为字节数据进行传输

👆为必配项,其它配置项参考:生产者配置列表

同步/异步发送

ProducerRecord 封装了一条记录,包含了记录主要的四个属性:

  • topic(必须)主题
  • partition 发送到主题的哪个分区
  • key
  • value(必须)

关于 Kafka 分区,希望可以在接下来的博文中详细介绍

同步发送:

try {  
    RecordMetadata metadata = producer.send(record).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

异步发送:

producer.send(record, (metadata, e) -> {  
    //TODO
});

问题

异常信息:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxx: xxx ms has passed since batch creation plus linger time  

原因:Producer 无法和 Kafka Broker 创建连接,连接超时

检查 Kafka Broker conf/server.properties 配置文件的 listernersadvertised.listeners 配置

listeners 是 Kafka Broker 服务监听的地址,修改为:PLAINTEXT://0.0.0.0:9092

advertised.listeners 是 Producer 和 Consumer 连接的主机和端口地址,修改为:PLAINTEXT://<主机名>:9092

客户端机器的 hosts 文件添加主机名的 IP 映射,并重启 Kafka Broker