Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
操作系统:CentOS-7.x
kafka版本:kafka_2.12-3.4.0
1、关闭selinux
sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
setenforce 0
2、防火墙设置
CentOS-7.x默认使用的是firewall作为防火墙,这里改为iptables防火墙。
kafka默认使用tcp9092端口号
2.1、关闭firewall:
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
systemctl mask firewalld
systemctl stop firewalld
yum remove firewalld
2.2、安装iptables防火墙
yum install iptables-services #安装
vi /etc/sysconfig/iptables #编辑防火墙配置文件
# sample configuration for iptables service
# you can edit this manually or use system-config-firewall
# please do not ask us to add additional ports/services to this default configuration
*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
-A INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT
-A INPUT -p icmp -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 9092 -j ACCEPT
-A INPUT -j REJECT --reject-with icmp-host-prohibited
-A FORWARD -j REJECT --reject-with icmp-host-prohibited
COMMIT
:wq! #保存退出
systemctl restart iptables.service #最后重启防火墙使配置生效
systemctl enable iptables.service #设置防火墙开机启动
/usr/libexec/iptables/iptables.init restart #重启防火墙
3、添加hosts解析
vi /etc/hosts #编辑配置文件
192.168.21.128 Kafka001
:wq! #保存退出
4、安装jdk(kafka依赖软件)
下载openjdk:https://adoptium.net/zh-CN/temurin/releases
上传OpenJDK8U-jdk_x64_linux_hotspot_8u362b09.tar.gz到/usr/local/src目录下
安装JDK
#创建jdk安装路径
mkdir -p /data/server/java
#解压
tar -zxvf /usr/local/src/OpenJDK8U-jdk_x64_linux_hotspot_8u362b09.tar.gz -C /data/server/java
#进入安装目录
cd /data/server/java/jdk8u362-b09
#查看版本信息
./bin/java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_362-b09)
OpenJDK 64-Bit Server VM (Temurin)(build 25.362-b09, mixed mode)
#设置环境变量
vi /etc/profile #添加下面的信息
#set java environment
JAVA_HOME=/data/server/java/jdk8u362-b09
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
:wq! #保存退出
#让修改立即生效
source /etc/profile
#查看版本信息
java -version
[root@localhost jdk8u362-b09]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_362-b09)
OpenJDK 64-Bit Server VM (Temurin)(build 25.362-b09, mixed mode)
#其他版本的jdk安装方法相同
5、安装kafka
5.1、下载地址
https://kafka.apache.org/downloads
https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
上传到/usr/local/src目录
5.1创建kafka安装目录
mkdir -p /data/server/kafka
cd /usr/local/src
#解压到安装目录
tar zxvf kafka_2.12-3.4.0.tgz -C /data/server/kafka --strip-components 1
5.2配置环境变量
vi /etc/profile
#set kafka environment
export KAFKA_HOME=/data/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
:wq! #保存退出
source /etc/profile #使配置立即生效
5.3配置zookeeper(kafka依赖软件)
#创建zookeeper数据目录
mkdir -p /data/server/kafka/zookeeper
#创建zookeeper日志目录
mkdir -p /data/server/kafka/log/zookeeper
##进入kafka配置目录
cd /data/server/kafka/config
cp zookeeper.properties zookeeper.properties.bak #备份
vi zookeeper.properties #编辑修改
dataDir=/data/server/kafka/zookeeper #zookeeper目录
dataLogDir= /data/server/kafka/log/zookeeper #zookeeper日志目录
clientPort=2181
maxClientCnxns=10000
tickTime=2000
initLimit=10
syncLimit=5
#metricsProvider.exportJvmInfo=true
4lw.commands.whitelist=*
#添加zookeeper节点
#服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
server.1=192.168.21.128:2888:3888
#server.2=192.168.21.129:2888:3888
#server.3=192.168.21.130:2888:3888
:wq! #保存退出
#启动zookeeper(zookeeper要在kafka前启动)
cd /data/server/kafka/bin
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties # -daemon:以后台方式启动
#查看进程
ps -ef |grep zookeeper
#测试端口
telnet 192.168.21.128 2181
5.4配置kafka
cp /data/server/kafka/config/server.properties /data/server/kafka/config/server.properties.bak
vi /data/server/kafka/config/server.properties #kafka配置文件
broker.id=1 # 唯一标识在集群中的ID,每个节点的broker.id值唯一,1,2,3
listeners=PLAINTEXT://0.0.0.0:9092 #broker 服务器要监听的地址及端口,默认监听端口9092
num.network.threads=3 #处理网络请求的最大线程数
num.io.threads=8 #处理I/O请求的线程数
advertised.listeners=PLAINTEXT://192.168.21.128:9092 #暴露对外访问的地址 这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址
log.dirs=/data/server/kafka/kafka-logs
zookeeper.connect=192.168.21.128:2181
delete.topic.enable=true #允许通过kafka命令行就可以直接删除topic
auto.create.topics.enable=false #关闭自动创建topic
log.retention.hours=7 #默认日志保留时间7天(168小时)
:wq! #保存退出
#启动kafka
cd /data/server/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties # -daemon:以后台方式启动
#创建topics
./kafka-topics.sh --create --topic hostScreen --bootstrap-server 192.168.21.128:9092 --replication-factor 1 --partitions 1
#生产测试
./kafka-console-producer.sh --broker-list 192.168.21.128:9092 --topic hostScreen
#消费测试
./kafka-console-consumer.sh --bootstrap-server 192.168.21.128:9092 --topic hostScreen --from-beginning
#查看主题
./kafka-topics.sh --list --bootstrap-server 192.168.21.128:9092
#查看所有消费者组
./kafka-consumer-groups.sh --bootstrap-server 192.168.21.128:9092 --list
5.5、设置zookeeper和kafka开机启动
vi /etc/rc.d/rc.local
/bin/sh /data/server/kafka/bin/zookeeper-server-start.sh -daemon /data/server/kafka/config/zookeeper.properties
/bin/sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties
:wq! #保存退出
#默认/etc/rc.local没有执行权限,需要手动添加执行权限
chmod +x /etc/rc.d/rc.local
sh /data/server/kafka/bin/zookeeper-server-start.sh -daemon /data/server/kafka/config/zookeeper.properties #启动zookeeper
sh /data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties #启动kafka
#关闭kafka
cd /data/server/kafka/bin
./kafka-server-stop.sh
#关闭zookeeper
cd /data/server/kafka/bin
./zookeeper-server-stop.sh
6、相关概念
Broker:kafka节点,多个broker组成kafka集群。
Topic:即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。
Producer:即消息生产者,向Broker发送消息的客户端。
Consumer:即消息消费者,从Broker消费(接收)消息的客户端。
ConsumerGroup:即消费者组,消费者隶属于消费者组,同一个分区的消息可以被多个消费者消费,但是同一个消费者组中只能有一个消费者可以消费。
Partition:即分区,每个Topic下都至少有一个分区,分区内部的消息是有序的。
至此,Linux系统下kafka单节点安装部署完成。