Kafka安装和常用命令使用介绍
kafka 安装
安装快速入门 -> 官方文档
安装步骤
-
这里使用了二进制包版本 v3.1.0 - kafka_2.13-3.1.0.tgz。下载后直接解压可用:
$ tar -xzvf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
-
启动zookeeper
kafka使用zk 做集群管理,如broker的注册感知等。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
查看zookeeper 启动是否成功(再开一个终端窗口)
$ cat config/zookeeper.properties
# 可以看到zk对外客户端连接的端口是 2181
$ lsof -i :2181
- 启动kafka broker服务
这里启动之前修改下 server.properties 的参数(本地搭建)
advertised.listeners=PLAINTEXT://localhost:9092
启动broker:
$ bin/kafka-server-start.sh config/server.properties
服务默认启动9092端口
$ lsof -i :9092
zookeeper上查看注册上来的信息
$ bin/zookeeper-shell.sh localhost:2181 ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
常用命令使用
kafka bin自带很多命令方便使用,比如创建topic、查看topic、查看消费者组、生产消息、消费消息等等
1.创建一个topic
$ bin/kafka-topics.sh --create --topic my-first-topic --partitions 2 --bootstrap-server localhost:9092
Created topic my-first-topic.
--partitions 2
指定分区数是2
--bootstrap-server localhost:9092
表示指定要连上这个启动的kafka broker服务。即使多机部署,指定某一个broker服务即可。
2.查看topic 情况
- 查看zookeeper上的注册信息
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics/my-first-topic/partitions
[0, 1]
- kafka 命令查看
$ bin/kafka-topics.sh --describe --topic my-first-topic --bootstrap-server localhost:9092
Topic: my-first-topic TopicId: sF-k4P7lQuO25ZeHziXD3Q PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: my-first-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my-first-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
- kafka 存储查看
kafka 消息存储目录
$ grep log.dirs config/server.properties
log.dirs=/tmp/kafka-logs
$ ls /tmp/kafka-logs
可以看到新建的topic对应的两个分区 my-first-topic-0 my-first-topic-1
3.生产者发消息
$ bin/kafka-console-producer.sh --topic my-first-topic --bootstrap-server localhost:9092
>news1
>news2
日志目录
消息落盘到日志
$ cd /tmp/kafka-logs/my-first-topic-0
4.消费者消费消息
$ bin/kafka-console-consumer.sh --topic my-first-topic --from-beginning --bootstrap-server localhost:9092 --consumer-property group.id=my-first-group
news1
news2
--consumer-property group.id=my-first-group
表示把这个消费者加入到这个消费者组里。
此时查看 zk 的topic,会多一个消费位移的topic: __consumer_offsets
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
[__consumer_offsets, my-first-topic]
#默认消费位移topic 有50个分区
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics/__consumer_offsets/partitions
5.查看消费者组信息
- 列出所有消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
my-first-group
- 查询指定消费者组信息
查询指定消费者组信息,包括该消费者组:
- 订阅了哪几个topic
- 订阅的每个topic下各个分区的消费进度(生产者最新写入的进度,当前消费到的进度,以及延迟落后的进度)
- 消费者信息(CONSUMER-ID、Host、CLIENT-ID)
这个命令可以查看消费者组下各消费者消费情况。
另外这里又创建了一个新的topic:my-second-topic,并同样用这个消费者组 my-first-group 订阅这个topic。当把消费者关闭后, CONSUMER-ID、HOST、LIENT-ID 则显示 - 空。
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-group
GROUP | TOPIC | PARTITION | CURRENT-OFFSET(当前消费进度) | LOG-END-OFFSET(生产者最新写入进度) | LAG (延迟) | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|---|
my-first-group | my-first-topic | 0 | 19 | 22 | 3 | console-consumer-820274b5-c7a9-4602-a57f-feef4e742e35 | /127.0.0.1 | console-consumer |
my-first-group | my-first-topic | 1 | 10 | 10 | 0 | console-consumer-10053fe1-3e3f-44ee-a5b7-773a248bada9 | /127.0.0.1 | console-consumer |
my-first-group | my-second-topic | 0 | 0 | 5 | 5 | - | - | - |
my-first-group | my-second-topic | 1 | 0 | 8 | 8 | - | - | - |
6.重置消费位移
kafka支持消费位移重置,比如可以让消费者跳过一些消息,或者重复消费某些消息。重置支持:
-
指定offset
指定 offset 更有技术方面的考虑,因为不同分区有不同offset,因此更多会指定某个分区的 offset。
比如,针对消费者组是my-first-group,topic 是 my-second-topic,分区是 1 的消费位移重置,要重置 offset 是 6。
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-offset 6 --topic my-second-topic:1 --group my-first-group --execute
GROUP TOPIC PARTITION NEW-OFFSET
my-first-group my-second-topic 1 6
-
指定发生的时间
指定发生时间,更有业务方面的考虑。让kafka 自己去根据指定的时间找到对应的消息的 offset 来偏移。因为指定了时间,一般就不会再指定重置哪个分区了,是针对整个topic的。实际使用中会更多去用它。
比如,针对消费者组是my-first-group,topic 是 my-second-topic,重置消费位移到 2022-06-22T10:00:00.000 时间之后的消息位移。
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2022-06-22T10:00:00.000 --topic my-second-topic --group my-first-group --execute
-
以及以上两种的相对偏移量
比如 –to-earliest、–to-latest、–shift-by n、–by-duration 等
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-latest --topic my-second-topic --group my-first-group
如果不加参数 –execute,表示空跑一下 不去真正执行,避免不小心的操作。
总结
以上,介绍了kafka 单机的安装 以及 常用的基本命令使用。
-
安装:主要是 启动zookeeper,再启动 broker。注意配置文件的修改。
-
常用命令:这在 bin 目录下有很多顾名思义的脚本命令。如果不记得参数,可以 –help 查看。