开启Kafka

先进入kafka的安装目录

1
bin/kafka-server-start.sh config/server.properties

运行程序

有以下两种方式来运行程序:

1
2
java -Djava.ext.dirs=/opt/kafka_2.10-0.8.1.1/libs -jar testProducer.jar  
java -cp traffic-distribution.jar producer.producer /home/matt/test/ topicName ...

第一种是jar包中没有打入相关依赖包的情况;
第二种是jar宝中已经包含相关依赖包。

列出所有的topic

列出当前集群上的所有topic:

1
bin/kafka-topics.sh --list --zookeeper serverIP:serverPort

注:serverIP为zookeeper所在机器的IP,serverPort为对应的端口。

删除topic

删除topic的命令:

1
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand -topic test -zookeeper serverIP:serverPort

这种删除方式只是删除了topic,但是该topic中Partition的数据依然存在,等数据过期之后就会自动删除。(也可以手动删除/tmp/kafka-logs/下的partition信息)

kafka集群设置

主要是配置kafka的config的server.properties文件,后面我写一篇kafka安装配置的blog。

查看topic的信息

1
bin/kafka-topics.sh --describe --zookeeper serverIP:serverPort --topic test

删除group

进入zookeeper的安装目录,如/opt/cloudera/parcels/CDH/bin/

1
2
3
sh zookeeper-client
ls /consumers
rmr /consumers/groupName

开启topic

建立topic的几种方式:

1
2
bin/kafka-topics.sh --zookeeper serverIP:serverPort --create --topic test --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper serverIP:serverPort --create --topic test --replica-assignment 29,29

第一种是一般的方式,建立的topic的partition为1,replication为1;
第二种是直接指定partition所在机器。

KafkaOffsetMonitor

使用KafkaOffsetMonitor对kafka集群进行监控。

KafkaOffsetMonitor的jar包下载,源码地址

运行命令:

1
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk serverIP1,serverIP2 --port 8080 --refresh 10.seconds --retain 2.days

参数的意思是:

  • ZK the ZooKeeper hosts
  • port on what port will the app be available
  • refresh how often should the app refresh and store a point in the DB
  • retain how long should points be kept in the DB
  • dbName where to store the history (default ‘offsetapp’)