学习 Kafka(五):消费者

依赖

生产者和消费者使用相同的客户端 JAR,修改 pom.xml 文件添加依赖:

<dependency>  
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>  

Consumer

创建一个 Consumer 对象:

Properties properties = new Properties();  
properties.put("bootstrap.servers", "192.168.1.106:9092");  
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
properties.put("group.id", "tiger");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  

配置说明:

bootstrap.servers Kafka Broker 列表

key.deserializer Key 的反序列化类,与生产者 key.serializer 保持匹配

value.deserializer Value 的反序列化类,与生产者 value.serializer 保持匹配

group.id 消费组标识

相同消费组内的消费者,将会从主题(topic)不同分区(partition)接收消息

利用分组的特点,可以实现传统消息队列的“队列”模型(多个消费者在同一个消费组)和“发布订阅”模型(多个消费者在不同的消费组)

消费者配置列表

订阅主题

consumer.subscribe(Collections.singletonList("test"));

Thread mainThread = Thread.currentThread();  
Runtime.getRuntime().addShutdownHook(new Thread(() -> {  
    consumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

try {  
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(500);

        for (ConsumerRecord record: records) {
            System.out.println("consumer fetch");
            System.out.println(record.value());
        }
    }
}
catch (WakeupException ignore) {}  
finally {  
    consumer.close();
}

使用 consumer.subscribe 以列表的方式或正则表达式方式,订阅多个主题

使用 consumer.poll 获取记录

ConsumerRecords 是封装了消费者 poll 获取的多条记录,并实现了 java.lang.Iterable 接口,可以 for 循环进行遍历

consumer.wakeup 是一个线程安全的方法,会让消费者在 poll 消息过程中产生一个 WakeupException 异常,从而优雅地中断 while(true) 循环