/ 城寒 / Kafka - Basic Usage

Kafka - Basic Usage

2018-01-30 posted in [各种]

Kafka 基本使用

安装

# 1. 配置java环境
vim ~/bash_profile
## 添加
JAVA_HOME=$HOME/JDK_PATH
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
######
source ~/bash_profile
 
# 解压kafka并修改配置文件
vim config/server.properties
vim config/zookeeper.properties
# 内容见文末
 
# 修改kafka JVM配置
vim bin/kafka-server-start.sh
## 修改 JVM 配置,比如
export KAFKA_HEAP_OPTS="-Xms16g -Xmx16g -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
##############
 
# 逐个节点启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 逐个节点启动kafka
bin/kafka-server-start.sh -daemon config/server.properties

基本命令

Kafka 版本:kafka_2.10-0.10.2.1 不同版本命令风格会有不同

# KAFKA-SHELL
# 消费数据
./kafka-console-consumer.sh \
--bootstrap-server kafka-host-1:9092,kafka-host-2:9092,kafka-host-3:9092 \
--topic TOPIP_NAME
# 生产数据
./kafka-console-producer.sh \
--broker-list kafka-host-1:9092,kafka-host-2:9092,kafka-host-3:9092 \
--topic TOPIP_NAME
# 查看总体
./kafka-topics.sh \
--zookeeper kafka-host-1:2181,kafka-host-2:2181,kafka-host-3:2181 --list
# 查看详细
./kafka-topics.sh \
--zookeeper kafka-host-1:2181,kafka-host-2:2181,kafka-host-3:2181 \
--topic TOPIP_NAME --describe
# 查看分片
./kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka-host-1:9092,kafka-host-2:9092,kafka-host-3:9092 \
--topic TOPIP_NAME --time -1 \
--partitions 0
# 添加分片
./kafka-topics.sh \
--zookeeper kafka-host-1:2181,kafka-host-2:2181,kafka-host-3:2181 \
--topic TOPIP_NAME --alter --partitions 3
# kafka 对数据的分片规则是: partition_index = (key.hashCode) % partition_num
# 所以没有key的数据,都会发到0分区上,添加分区也不起作用

Kafka 添加备份和重启

需要增加吞吐量时,需要增加partition,为了保证稳定性需要添加replica 前者会增加内存消耗(修改KAFKA_HEAP_OPTS)和不稳定性,同时破坏整体消息的有序 后者会大量增加占用空间,降低速度

添加备份

# 添加备份Replicas
# 先用查看详细的命令查看每个partition的位置即replica的情况,再写json进行配置
# 保存如下格式的json到文件partitions-to-move.json
{"version":1,
"partitions":[{"topic":"test","partition":0,"replicas":[0, 1]},
              {"topic":"test","partition":1,"replicas":[1, 2]},
              {"topic":"test","partition":2,"replicas":[2, 0]},
              {"topic":"test","partition":3,"replicas":[0, 1]},
              {"topic":"test","partition":4,"replicas":[1, 2]},
              {"topic":"test","partition":5,"replicas":[2, 0]}]
}

执行:

./kafka-reassign-partitions.sh --zookeeper kafka-host-1:2181,kafka-host-2:2181,kafka-host-3:2181 \
--reassignment-json-file partitions-to-move.json --execute

重启

对于每个在使用中的topic,如果每个partition在别的broker上都有replicas,则可以进行平滑重启,对consumer和producer无影响

# 直接
kill -15 kafka-pid
# 然后启动
bin/kafka-server-start.sh config/server.properties >/dev/null &
# 可以在kafka-server-start.sh 中修改JVM配置
export KAFKA_HEAP_OPTS="-Xms16g -Xmx16g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
# 可以在server.properties中修改kafka配置,添加threads数量

配置

zookeeper.properties

# the directory where the snapshot is stored.
dataDir=/data/zookeeper-snapshot
## 在各个节点的这个文件夹下,创建 myId 文件,内容为一个数字,对应server编号 0,1,2

# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=200
tickTime=200
initLimit=5
syncLimit=2
server.0=ip1:2888:3888
server.1=ip2:2888:3888
server.2=ip3:2888:3888

server.properties

# 内容很多,主要介绍几个

# 适当增加,提高性能
num.network.threads=6
num.io.threads=16

# 默认保存时间
log.retention.hours=168

# 可以配置用来控制topic总的文件大小,可以针对每个topic单独添加设置
# topic总大小 = partition数量 * log.retention.bytes
log.retention.bytes=1073741824