本文主要是介绍一下kafka基于Producer API的程序设计,使用的kafka版本为2.10-0.8.1.1

1.前言

在写完Kafka集群的安装与配置Kafka的简单介绍这两篇博客之后,从本文开始准备介绍一下Kafka的程序设计部分,大概会分为三篇介绍,第一篇是基于Kafka Producer API的程序设计,也就是本文,第二篇是基于Kafka High Level Consumer API的程序设计,第三篇是基于Kafka Simple Consumer API的程序设计。本文主要是根据kafka提供的官方文档来介绍,希望能给刚接触kafka程序设计的初学者提供一些帮助。

我们知道,kafka的基本架构其实非常简单,但kafka作为管道传输为了保证其强大的功能与稳定的性能,kafka在内部实现上是做了非常多的努力的,这些我会在后续的文章中慢慢讲解。然而对大部分人而言重要的就是如何进行程序设计来实现所需的功能,kafka给我们提供了丰富而简介的API接口,本文的例子是通过Kafka Examples中的例子来讲解如何使用这些API接口来进行程序设计。本文中使用的kafka的版本为kafka_2.10-0.8.1.1,并且使用maven建立工程,需要在pom.xml文件加入如下的依赖包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
</exclusions>
</dependency>

下面具体讲解一下Producer

2.实例分析

Producer是用来把消息发送到Kafka的Broker端,Producer应用非常广泛,在本文中只涉及发送随机消息和本地文件。

2.1.API介绍

实现Producer程序主要会使用到以下三个类:

  • kafka.producer.ProducerConfig:配置Producer,比如定义metadata.bokers.list、partitioner.class等;
  • kafka.javaapi.producer.Producer:最主要的类,用来发送消息等;
  • kafka.producer.KeyedMessage:定义要发送的消息,比如发送到哪个topic的哪个partition等。

2.2.示例分析

借用官方文档0.8.0 Producer Example中给出的样例程序。

Producer的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}

Partitioner的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}

这个样例程序实现的功能非常简单,就是发送多条(参数指定)类似于runtime + “,www.example.com,” + “192.168.2.” + rnd.nextInt(255);的消息。通过这个程序可以看出实现Kafka producer功能主要有以下三点需要注意:

Kafka Properties

这是实现代码的第一步,在代码中定义了一个Properties。这个Properties是通过kafka.producer.ProducerConfig将一些参数传递给Producer,告诉Producer如何找到找到集群,怎么序列化消息和消息如何发给Partition等。

样例程序中的这些参数意义为:

  • metadata.broker.list 定义Producer为每个Partition选作Leader的broker,应至少有两个,而且这两个broker一定要是开启Kafka服务;
  • serializer.class定义message传送给broker时,应该使用什么类型的序列化方式,但是注意这个类型的编码也一定要能够接受KeyMessage对象定义的类型(Java对象在传输前需要进行序列化);
  • partitioner.class决定了这个message应该发送给这个topic的哪个Partition(如果程序中为key指定了一个值但是没有定义一个partitioner.class,kafka就会使用默认的partitioner发送到指定的Partition,如果key没有定义,Producer就会把message发送到随机的Partition);
  • request.required.acks默认是0,可以设置0,1,-1;

程序中可以传入的参数参考Producer API,经常使用参数有 producer.type, batch.size, receive.buffer.bytes, send.buffer.bytesacks等。

使用的方法:

1
2
props.put("producer.type", async);
props.put("batch.num.messages", batch);

Producer object

再定义完PropertiesProducer object之后,下面就是将topic,partition和message传递给KeyedMessage,然后通过producer的send方法将消息发送出去。

1
2
3
4
5
6
7
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
/**
* 数据的处理过程
*/
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);//KeyedMessage<String, String>(topicName, partitionKey, msg)
producer.send(data);

自定义Partition

样例代码中的SimplePartitioner是自定义的,PartitionKey的值通过十进制ip地址小数点最后一位与a_numPartitions取余得到。在自定义Partition时,需要在程序(如:props.put("partitioner.class", "example.producer.SimplePartitioner"))中指定Partitioner的位置。

3.实例程序设计

在解析完样例程序之后,下面我们通过一个实际案例来设计Producer程序。

3.1.实现功能

程序要实现的功能是监控本地一个文件目录,将此目录中文件数据发送到Kafka的Broker端,并且每当发送完一个文件后就删除该文件,然后当有新的文件传进来之后就发送这个文件。

3.2.程序设计

本例我们就不在使用自定义的Partition,而直接由参数传入PartitionKey的值。程序设计的思路:

  1. 监控给定的目录;
  2. 如果目录没有文件,sleep一段时间(sleep的时间需要根据具体的应用来设置),返回第1步;
  3. 当目录中有文件时,将文件中的数据按行发送;
  4. 这个文件发送完,就将该文件删除,继续发送下一个文件,知道目录中文件发送完毕,再返回第1步。

根据这个思路,程序主要实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class producer {
public static void main(String[] args) throws IOException {
if (args.length!=3) {
System.err.println("please input <input> <topic> <partitionKey> ");
System.exit(1);
}
String input = args[0];
String topic = args[1];
String partitionKey=args[2];
Properties props = new Properties();
props.put("metadata.broker.list", "ip1:9092,ip2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
BufferedReader reader=null;
int fileLength = 0;
File file = new File(inputFold);//输入目录由参数给定
if (file.exists()) {
while (true) {//一直监控目录
File[] files = file.listFiles();
if (files.length == 0) {
System.out.println("文件夹是空的!");
try {
Thread.sleep(60000);//sleep1min
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
} else {
for (File file2 : files) {
if (file2.isDirectory()) {
System.out.println("有递归目录:" + file2.getAbsolutePath());
} else {
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(file2.getAbsolutePath())));
} catch (Exception e) {
System.err.println("输入文件错误");
System.exit(2);
}
while (true) {
String line=null;
try {
line = reader.readLine();
} catch (IOException e) {
System.err.println("输入文件错误");
e.printStackTrace();
System.exit(2);
}
if(line==null)
{
break;
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic,partitionKey,line);
producer.send(data);
}
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
file2.delete();
}
}
}
} else {
System.out.println("目录不存在!");
}
producer.close();
}
}

参考: