本文主要介绍一下Kafka new Consumer的使用,关于new Consumer的基本概念可以参考上一篇博文Apache Kafka 0.9 Consumer Client 介绍【译】,这篇对于Kafka的new Consumer介绍得比较清楚。本文的一部分内容也来自上一篇文章。

Consumer Client

本节主要介绍Kafka从一些topic消费数据的示例。

配置

使用新版的Consumer,需要先在工程中添加kafka-clients依赖,添加的配置信息如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>

初始化与配置

Consumer的创建过程与之前旧的API创建方法一样,一个Consumer必备的最小配置项如下所示:

1
2
3
4
5
6
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // 通过其中的一台broker来找到group的coordinator,并不需要列出所有的broker
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // consumer实例

Consumer的其他配置项可以参考New Consumer Configs,除了上面的这几个配置之外,其他的几个比较常用的配置信息如下表所示

参数 默认值 说明
heartbeat.interval.ms 3000 当使用Kafka的group管理机制时,consumer向coordinator发送心跳的间隔,这个值要比session.timeout.ms小,最好不要超过session.timeout.ms的\frac{1}{3}
session.timeout.ms 30000 当使用Kafka的group管理机制时用于检测到consumer失败的时长,如果在这个时间内没有收到consumer的心跳信息,就认为Consumer失败了
auto.offset.reset latest group首次开始消费数据时的offset,有以下几个值可以选择:earliest、latest、none、anything else.
enable.auto.commit true 设置为true时,Consumer的offset将会被周期性地自动commit
auto.commit.interval.ms 5000 Consumer的offset自动commit时的周期

Consumer Auto Offset Commit

本例使用Kafka的自动commit机制,每隔一段时间(可通过auto.commit.interval.ms来设置)就会自动进行commit offset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true"); // 自动commit
props.put("auto.commit.interval.ms", "1000"); // 自动commit的间隔
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2")); // 可消费多个topic,组成一个list
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

这里有几点需要注意:

  1. 在使用自动commit时,系统是保证at least once,因为offset是在这些messages被应用处理成功后才进行commit的;
  2. subscribe方法需要传入所有topic的列表,一个group所消费的topic是不能动态增加的,但是可以在任何时间改变这个列表,它会把前面的设置覆盖掉;
  3. poll中的参数就是设置一个时长,Consumer在进行拉取数据进行block的最大时间限制;

Consumer Manual Offset Control

要进行手动commit,需要在配置文件中将enable.auto.commit设置为false,来禁止自动commit,本例以手动同步commit为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //关闭自动commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2"));
final int minBatchSize = 10;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
int i = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
i++;
}
if (i >= minBatchSize) {
consumer.commitSync(); //批量完成写入后,手工同步commit offset
}
}
  1. 在本例中,我们调用了commitSync方法,这是同步commit的方式,同时Kafka还提供了commitAsync方法,它们的区别是:使用同步提交时,consumer会进行block知道commit的结果返回,这样的话如果commit失败就可以今早地发现错误,而当使用异步commit时,commit的结果还未返回,Consumer就会开始拉取下一批的数据,但是使用异步commit可以系统的吞吐量,具体使用哪种方式需要开发者自己权衡;
  2. 本例中的实现依然是保证at least once,但是如果每次拉取到数据之后,就进行commit,最后再处理数据,就可以保证at last once。

Consumer Manual Partition Assign

Kafka在进行消费数据时,可以指定消费某个topic的某个partition,这种使用情况比较特殊,并不需要coordinator进行rebalance,也就意味着这种模式虽然需要设置group id,但是它跟前面的group的机制并不一样,它与旧的Consumer中的Simple Consumer相似,这是Kafka在新的Consumer API中对这种情况的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //关闭自动commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
TopicPartition partition0 = new TopicPartition("test", 0);
TopicPartition partition1 = new TopicPartition("test", 2);
consumer.assign(Arrays.asList(partition0, partition1));

final int minBatchSize = 10;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
int i = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
i++;
}
if (i >= minBatchSize) {
consumer.commitSync(); //批量完成写入后,手工sync offset
}
}

注意:

  1. 与前面的subscribe方法一样,在调用assign方法时,需要传入这个Consumer要消费的所有TopicPartition的列表;
  2. 不管对于simple consumer还是consumer group,所有offset的commit都必须经过group coordinator;
  3. 在进行commit时,必须设置一个合适的group.id,避免与其他的group产生冲突。如果一个simple consumer试图使用一个与一个active group相同的id进行commit offset,coordinator将会拒绝这个commit请求,会返回一个CommitFailedException异常,但是,如果一个simple consumer与另一个simple consumer使用同一个id,系统就不会报任何错误。

KafkaStream使用

KafkaStream是在Kafka 0.10.0版中新提出的内容,Kafka官方也说了设计这个feature的原因——为了简单,之前在流处理方面,一般情况下都会使用Kafka作为消息队列,然后再搭建一个流处理环境做流处理,而现在我们可以直接在Kafka中进行流处理,不需要再搭建另外一个环境(加了这个feature之后会使得Kafka变得更加复杂,不过官网说,在使用时我们只需要在工程中添加一个外部依赖包即可使用这个功能)。

配置

需要在pom文件中添加如下依赖,KafkaStream在实际运行时也是依赖这个外部的jar包运行。

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>

初始化与配置

KafkaStream使用的一个基本初始化部分如下所示(代码来自Javadoc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

完整的配置选项如下表所示,也可以参考Streams Configs

名称 描述 类型 默认值
application.id 流处理应用的标识,对同一个应用需要一致,因为它是作为消费的group_id的 string
bootstrap.servers host1:port1,host2:port2 这样的列表,是用来发现所有Kafka节点的种子,因此不需要配上所有的Kafka节点 list
client.id 应用的一个客户端的逻辑名称,设定后可以区分是哪个客户端在请求 string “”
zookeeper.connect zookeeper string “”
key.serde 键的序列化/反序列化类 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
partition.grouper 用于分区组织的类,需要实现PartitionGrouper接口 class org.apache.kafka.streams.processor.DefaultPartitionGrouper
replication.factor 流处理应用会创建change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数 int 1
state.dir 状态仓库的存储路径 string /tmp/kafka-streams
timestamp.extractor 时间戳抽取类,需要实现TimestampExtractor接口 class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
value.serde 值的序列化/反序列化类 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
buffered.records.per.partition 每个分区缓存的最大记录数 int 1000
commit.interval.ms 存储处理器当前位置的间隔毫秒数 long 30000
metric.reporters 用于性能报告的类列表。需要实现MetricReporter接口。JmxReporter会永远开启不需要指定 list []
metric.num.samples 计算性能需要的采样数 int 2
metric.sample.window.ms 性能采样的时间间隔 long 30000
num.standby.replicas 每个任务的后备副本数 int 0
num.stream.threads 执行流处理的线程数 int 1
poll.ms 等待输入的毫秒数 long 100
state.cleanup.delay.ms 一个分区迁移后,在删除状态前等待的毫秒数 long 60000

小示例

这是个将一个topic的事件进行过滤的示例,处理很简单,下面给出了这个例子的完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

/**
* Created by matt on 16/7/22.
*/
public class EventFilter {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.232.70:9091,10.4.232.77:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> source = builder.stream("test");

source.filter(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
return (value.split(",")[3]).equals("food");
}
}).to("food");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);

streams.close();
}
}

本文只是介绍这两个重要feature的使用方法,而KafkaStream并没有深入去讨论,后面会对本文再进行更新,并且还会增加Producer和Consumer使用安全机制的方法。