Kafka环境配置流程

一、Docker配置Kafka环境

0、拉取镜像

docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper

1、创建子网

首先在docker中创建子网

docker network create --driver bridge --subnet 192.168.1.0/16 --gateway 192.168.1.0 mynet
解析:
--driver bridge 表示使用桥接模式
--subnet 192.168.1.0/16 表示子网ip 可以分配 192.168.1.2 到 192.168.255.255
--gateway 192.168.1.0 表示网关
mynet 表示网络名

docker network inspect mynet命令可以查看自己创建的子网信息

2、创建zookeeper容器

docker run -d --name zookeeper -p 2181:2181 --network kafka_local -t wurstmeister/zookeeper

指定刚刚创建的子网

3、创建kafka容器

docker run -d --name kafka_1 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.tanhuiri.cn:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --network kafka_local -t wurstmeister/kafka

docker run -d --name kafka_2 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.tanhuiri.cn:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 --network kafka_local -t wurstmeister/kafka

docker run -d --name kafka_3 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.tanhuiri.cn:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 --network kafka_local -t wurstmeister/kafka

指定zookeeper的内网id,同时KAFKA_ADVERTISED_LISTENERS需要指定为本机的公网ip,否则外网无法访问

docker run -d --name kafka-manager -p 9000:9000 --network kafka_local --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager

4、使用Docker Compose创建Kafka环境

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: "zookeeper"
    volumes:
      - ./data:/data
    ports:
      - 2181:2181
       
  kafka:
    image: wurstmeister/kafka
    container_name: "kafka"
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: host.tanhuiri.cn
      KAFKA_MESSAGE_MAX_BYTES: 2000000
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./kafka-logs:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
 
  kafka-manager:
    image: sheepkiller/kafka-manager
    container_name: "kafka-manager"
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181

二、Linux搭建Kafka单机环境

0、下载Zookeeper和Kafka安装包

http://mirror.bit.edu.cn/apache/zookeeper/下载Zookeeper的最新稳定版安装包,需要注意在官网上有两个tar.gz文件,从版本3.5.5开始,带有bin名称的包才是我们想要的下载可以直接使用的里面有编译后的二进制的包,而之前的普通的tar.gz的包里面是只是源码包,无法直接使用。

我这里下载的是apache-zookeeper-3.6.3-bin.tar.gz 版本。

在Kafka官网下载Kafka安装包http://kafka.apache.org/downloads

我下载的是kafka_2.12-2.7.0.tgz,其中2.12表示Scala的版本(服务器要有对应Scala环境),2.7.0是Kafka的版本号。

下载后在服务器上的/opt文件夹下分别创建Zookeeper和Kafka的目录

cd /opt
mkdir zookeeper
mkdir kafka

然后将安装包上传至服务器的对应目录。

1、安装Zookeeper

首先解压安装包

cd /opt/zookeeper
tar -xvzf apache-zookeeper-3.6.3-bin.tar.gz
cd apache-zookeeper-3.6.3-bin.tar.gz

创建配置文件zoo.cfg

cd conf
cp zoo_sample.cfg zoo.cfg

修改配置文件

# data目录改成zookeeper文件夹下的data文件夹(可选)
mkdir /opt/zookeeper/apache-zookeeper-3.6.3-bin.tar.gz/data
vi zoo.cfg
dataDir=/tmp/zookeeper

启动zookeeper

/opt/zookeeper/apache-zookeeper-3.6.3-bin.tar.gz/bin/zkServer.sh start

执行此命令后,你将收到以下响应
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

停止zookeeper

/opt/zookeeper/apache-zookeeper-3.6.3-bin.tar.gz/bin/zkServer.sh stop

启动cli

/opt/zookeeper/apache-zookeeper-3.6.3-bin.tar.gz/bin/zkCli.sh

响应结果如下
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

2、安装Kafka

首先需要按照上面的步骤启动zookeeper

cd /opt/kafka
tar -xvzf kafka_2.12-2.7.0.tgz
cd kafka_2.12-2.7.0
mkdir logs

// 修改server.properties配置文件
vi config/server.properties
	# broker的全局唯一编号,不能重复
	broker.id=0
	# 监听
	listeners=PLAINTEXT://0.0.0.0:9092
	# 日志目录
	log.dirs=/opt/kafka/kafka_2.12-2.7.0/logs
	# 配置zookeeper的连接(如果是不是本机,那么需要设置为具体ip)
	zookeeper.connect=localhost:2181
	# 配置kafka的公网或局域网监听地址
	advertised.listeners=PLAINTEXT://www.yourhost.com:9092

3、启动Kafka

启动kafka

/bin/kafka-server-start.sh /config/server.properties

使用jps命令可以查看是否在运行

jps
13826 QuorumPeerMain
20041 Kafka
22170 Jps
2877 metro-backend-1.0-SNAPSHOT.jar

4、测试Kafka

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
参数说明: 
–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 
–replication-factor:指定副本数量 
–partitions:指定分区数量 
–topic:主题名称

创建一个消费者:

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test

此时,命令行会进入运行状态,界面就卡住不动了,然后再新建一个窗口,进入kafka目录,继续创建一个生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

同样这个时候命令行进入运行状态,我们输入一个字符串:Hello Kafka!此时打开刚才创建消费者的窗口,会看到刚才的输入消息被消费了

Q.E.D.


悟已往之不谏,知来者之可追