Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
操作系统:CentOS-7.x
kafka版本:kafka_2.12-3.4.0
三台服务器ip地址:
192.168.21.100,192.168.21.101,192.168.21.128
1、关闭selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
setenforce 0
2、关闭防火墙
kafka默认使用tcp9092端口号,建议在内网环境下部署kafka集群,关闭服务器防火墙。
CentOS-7.x默认使用的是firewall作为防火墙,关闭
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
systemctl mask firewalld
systemctl stop firewalld
yum remove firewalld
3、安装jdk
kafka依赖Java,需要安装java环境,我们使用Java 1.8版本
Linux系统下安装Java JDK:https://www.osyunwei.com/archives/12872.html
4、安装配置zookeeper
kafka依赖zookeeper,需要先安装部署,并且优先于kafka启动
也可以使用kafka自带的zookeeper,这里我们使用自定义安装的zookeeper
Linux系统下部署ZooKeeper集群:https://www.osyunwei.com/archives/13465.html
5、添加hosts解析
vi /etc/hosts #编辑配置文件
192.168.21.100 kafka01
192.168.21.101 kafka02
192.168.21.128 kafka03
:wq! #保存退出
6、安装kafka
6.1、下载地址
https://kafka.apache.org/downloads
https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
上传到/usr/local/src目录
6.1创建kafka安装目录
mkdir -p /data/server/kafka
mkdir -p /data/server/kafka/kafka-logs
cd /usr/local/src
#解压到安装目录
tar zxvf kafka_2.12-3.4.0.tgz -C /data/server/kafka --strip-components 1
6.2配置环境变量
vi /etc/profile
#set kafka environment
export KAFKA_HOME=/data/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
:wq! #保存退出
source /etc/profile #使配置立即生效
6.3配置kafka
cp /data/server/kafka/config/server.properties /data/server/kafka/config/server.properties.bak
vi /data/server/kafka/config/server.properties #kafka配置文件
broker.id=1 # 唯一标识在集群中的ID,每个节点的broker.id值唯一,1,2,3(192.168.21.100是1,192.168.21.101是2,192.168.21.128是3),最小是0,最大1000
listeners=PLAINTEXT://0.0.0.0:6667 #broker 服务器要监听的地址及端口,默认监听端口9092,这里改为6667,是kafka真正bind的地址
num.network.threads=32 #处理网络请求的最大线程数
num.io.threads=32 #处理I/O请求的线程数
advertised.listeners=PLAINTEXT://192.168.21.100:6667 #各自填写每个节点ip地址,暴露对外访问的地址,这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址,如果没有设置,会用listeners地址
log.dirs=/data/server/kafka/kafka-logs
zookeeper.connect=192.168.21.100:2181,192.168.21.100:2181,192.168.21.128:2181
offsets.topic.replication.factor=3 #默认主题__consumer_offsets的副本数量,默认值是1,集群环境下修改为节点数量3,否则挂掉一个节点整个集群就不可用了
num.partitions=3 #全局默认分区,集群环境下修改为节点数量3
default.replication.factor=3 #全局默认副本,集群环境下修改为节点数量3
delete.topic.enable=true #允许通过kafka命令行就可以直接删除topic
auto.create.topics.enable=false #关闭自动创建topic
log.retention.hours=7 #默认日志保留时间7天(168小时)
buffer.memory=2684354560
batch.size=524288
max.request.size=5242880
linger.ms=50
:wq! #保存退出
6.4启动kafka
zookeeper集群一定要在Kafka启动之前启动,因为kafka的节点需要向zookeeper注册,三台机器分别启动kafka
cd /data/server/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties #以后台方式启动
验证 ,有Kafka了证明启动没有问题
jps
jps | grep Kafka
ps -ef |grep kafka
6.5创建topics
只在1台主节点操作
./kafka-topics.sh --create --topic hostScreen --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3
./kafka-topics.sh --create --topic inspection --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3
./kafka-topics.sh --create --topic inspection.hardware.host --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3
./kafka-topics.sh --create --topic inspection.hardware.log --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3
./kafka-topics.sh --create --topic network --bootstrap-server localhost:6667 --replication-factor 3 --partitions 3
6.6查看主题
cd /data/server/kafka/bin
./kafka-topics.sh --list --bootstrap-server localhost:6667
6.7生产测试
cd /data/server/kafka/bin
./kafka-console-producer.sh --broker-list localhost:6667 --topic hostScreen
>>>
>test
>kafka-tets
6.8消费测试
cd /data/server/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:6667 --topic hostScreen --from-beginning
test
kafka-tets
#查看所有消费者组
cd /data/server/kafka/bin
./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --list
6.9查看topics信息
./kafka-topics.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka:6667 --topic inspection --describe
./kafka-topics.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka:6667 --topic inspection.hardware.log --describe
7、设置kafka开机启动
vi /etc/rc.d/rc.local
/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties
:wq! #保存退出
#设置普通用户启动,需要设置kafka目录所有者为myuser
su - myuser -c "/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties"
#默认/etc/rc.local没有执行权限,需要手动添加执行权限
chmod +x /etc/rc.d/rc.local
sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties #启动kafka
#关闭kafka
cd /data/server/kafka/bin
./kafka-server-stop.sh
ps -ef |grep kafka
#扩容分区
cd /data/server/kafka/bin
1、topic分区扩容,分区数只能增加不能减少
#扩容inspection.hardware.host这个topic
./kafka-topics.sh --bootstrap-server kafka01:6667kafka02:6667,kafka03:6667 --topic inspection.hardware.host -alter --partitions 3
2、根据topic的分区情况自行修改 partitions-topic.json 文件配置
#partitions-topic.json文件要放在执行副本搬迁命令的目录下
vi partitions-topic.json
{
"partitions":
[
{
"topic": "inspection.hardware.host",
"partition": 0,
"replicas": [2,3,1]
},
{
"topic": "inspection.hardware.host",
"partition": 1,
"replicas": [3,2,1]
},
{
"topic": "inspection.hardware.host",
"partition": 2,
"replicas": [1,2,3]
}
],
"version":1
}
:wq! #保存退出
3、执行副本搬迁
#partitions-topic.json文件要放在执行副本搬迁命令的目录下
./kafka-reassign-partitions.sh --bootstrap-server kafka01:6667,kafka02:6667,kafka03:6667 --reassignment-json-file partitions-topic.json --execute
4、查看迁移情况
../bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file partitions-topic.json --verify
说明:
kafka-reassign-partitions.sh工具来重新分布分区。该工具有三种使用模式:
generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
execute模式,根据指定的reassign plan重新分配Partition
verify模式,验证重新分配Partition是否成功
#相关概念
Broker:kafka节点,多个broker组成kafka集群。
Topic:即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。
Producer:即消息生产者,向Broker发送消息的客户端。
Consumer:即消息消费者,从Broker消费(接收)消息的客户端。
ConsumerGroup:即消费者组,消费者隶属于消费者组,同一个分区的消息可以被多个消费者消费,但是同一个消费者组中只能有一个消费者可以消费。
Partition:即分区,每个Topic下都至少有一个分区,分区内部的消息是有序的。
#结束进程
ps -aux | grep -E 'kafka' | grep -v grep |awk '{print $2}' |sudo xargs kill -s 9
#Kafka架构
#kafka配置参数
至此,Linux系统下kafka集群安装部署完成。