Kafka Installation and Usage (Kafka Docs: Quickstart)

1. Install Kafka

Kafka download page: Download

yitian@heron01:~$ cd kafka/
yitian@heron01:~/kafka$ ll
total 54456
drwxrwxr-x  2 yitian yitian     4096 Sep 15 17:30 ./
drwxr-xr-x 37 yitian yitian     4096 Sep 15 17:30 ../
-rwxrw-rw-  1 yitian yitian 55751827 Sep 15 12:36 kafka_2.11-2.0.0.tgz*
yitian@heron01:~/kafka$ tar -zxf kafka_2.11-2.0.0.tgz 
yitian@heron01:~/kafka$ cd kafka_2.11-2.0.0/
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ ll
total 60
drwxr-xr-x 6 yitian yitian  4096 Jul 24 22:19 ./
drwxrwxr-x 3 yitian yitian  4096 Sep 15 17:31 ../
drwxr-xr-x 3 yitian yitian  4096 Jul 24 22:19 bin/
drwxr-xr-x 2 yitian yitian  4096 Jul 24 22:19 config/
drwxr-xr-x 2 yitian yitian  4096 Sep 15 17:31 libs/
-rw-r--r-- 1 yitian yitian 28824 Jul 24 22:17 LICENSE
-rw-r--r-- 1 yitian yitian   336 Jul 24 22:17 NOTICE
drwxr-xr-x 2 yitian yitian  4096 Jul 24 22:19 site-docs/

2. Start the Kafka Server

Kafka uses ZooKeeper, so ZooKeeper must be started before Kafka. If you do not have a ZooKeeper installation, you can use the ZooKeeper bundled with Kafka to start a single-node ZooKeeper instance (which is the approach used here):

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties 

Open a new terminal window and start the Kafka server:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-server-start.sh config/server.properties 

3. Create a Topic

Once Kafka has started successfully, first create a topic named test with the following configuration:

  • partitions: 1
  • replication factor: 1

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

List the topics that have been created:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4. Send Some Messages

Kafka ships with a command line client that reads input from a file or from standard input and sends it to the Kafka cluster as messages. By default, each line is sent as a separate message.

Now run the console producer and type a few messages into it to send to the Kafka server:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message 

5. Start a Consumer

Kafka also provides a command line consumer that receives the messages sent by the producer and prints them to standard output:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

If you run the producer and the consumer in different terminal windows, you will see the messages typed into the producer appear in the consumer's output.

6. Set Up a Multi-Broker Cluster (Local)

So far we have only run a single broker. For Kafka, a single broker is simply a cluster of size one. Below we expand the cluster to three brokers to try this out (still on a single local machine).

First, create a configuration file for each new broker:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ cp config/server.properties config/server-1.properties 
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ cp config/server.properties config/server-2.properties 

Then edit the newly created files with the following settings:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ vim config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ vim config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other’s data.
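The override pattern above (a unique id, a unique port, and a unique log directory per broker) is mechanical, so it can be sketched as a small script. The helper below is hypothetical, not part of Kafka:

```python
# Hypothetical helper illustrating the per-broker override pattern described
# above: each broker on the same machine needs a unique id, port, and log dir.
def broker_overrides(broker_id, base_port=9092):
    """Return the three properties that must differ for each local broker."""
    return {
        "broker.id": str(broker_id),
        "listeners": f"PLAINTEXT://:{base_port + broker_id}",
        "log.dirs": f"/tmp/kafka-logs-{broker_id}",
    }

# Print the override blocks for server-1.properties and server-2.properties.
for i in (1, 2):
    print("\n".join(f"{k}={v}" for k, v in broker_overrides(i).items()))
```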

We already have Zookeeper and our single node started, so we just need to start the two new nodes:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-server-start.sh config/server-1.properties & 
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-server-start.sh config/server-2.properties &

Now create a new topic with a replication factor of three:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

Okay but now that we have a cluster how can we know which broker is doing what? To see that run the “describe topics” command:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

Here is an explanation of the output. The first line gives a summary of all the partitions; each additional line gives information about one partition. Since we have only one partition for this topic, there is only one such line.

  • “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

Note that in my example node 1 is the leader for the only partition of the topic.
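As a side note, the tab-separated describe output shown above is easy to post-process. A hypothetical parser (not part of Kafka) might look like this:

```python
import re

# Sketch: parse one partition line from the `kafka-topics.sh --describe`
# output shown above into its leader/replicas/isr fields.
def parse_partition_line(line):
    # Keys like "Leader" are followed by a colon; values run up to the next tab.
    fields = dict(re.findall(r"(\w[\w-]*):\s*([^\t]+)", line))
    return {
        "partition": int(fields["Partition"]),
        "leader": int(fields["Leader"]),
        "replicas": [int(x) for x in fields["Replicas"].split(",")],
        "isr": [int(x) for x in fields["Isr"].split(",")],
    }

line = "Topic: my-replicated-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 1,2,0"
info = parse_partition_line(line)
```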

We can run the same command on the original topic we created to see where it is:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.

Let’s publish a few messages to our new topic:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^Cyitian@heron01:~/kafka/kafka_2.11-2.0.0$ 

Now let’s consume these messages:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
^CProcessed a total of 2 messages
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ 

Now let’s test out fault-tolerance. Broker 1 was acting as the leader so let’s kill it:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ ps aux | grep server-1.properties
yitian    11584  3.6 16.4 4689304 332620 pts/21 Sl   20:38   0:23 /usr/java/jdk1 ...
kill -9 11584

Leadership has switched to one of the followers and node 1 is no longer in the in-sync replica set:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

But the messages are still available for consumption even though the leader that took the writes originally is down:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
^CProcessed a total of 2 messages
yitian@heron01:~/kafka/kafka_2.11-2.0.0$ 

7. Import/Export Data with Kafka Connect

Writing data from the console and writing it back to the console is a convenient place to start, but you’ll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data.

Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we’ll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.

First, we’ll start by creating some seed data to test with:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ echo -e "foo\nbar" > test.txt

Next, we’ll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file.
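For reference, the two connector configuration files shipped with the Kafka 2.0 distribution contain settings along these lines (verify against the files in your own config/ directory):

```properties
# config/connect-file-source.properties: read lines from test.txt
# and produce each line to the connect-test topic.
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# config/connect-file-sink.properties: consume from connect-test
# and append each message as a line to test.sink.txt.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```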

During startup you’ll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and write them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ more test.sink.txt 
foo
bar

Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
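Each line of the output above is a JSON envelope produced by the JsonConverter configured by default in connect-standalone.properties; the original file contents sit under the payload key. A short sketch of recovering them:

```python
import json

# Sketch: each console-consumer line from the connect-test topic is one JSON
# document; extract the original file lines from the "payload" field.
records = [
    '{"schema":{"type":"string","optional":false},"payload":"foo"}',
    '{"schema":{"type":"string","optional":false},"payload":"bar"}',
]
payloads = [json.loads(r)["payload"] for r in records]
```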

The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ echo Another line>> test.txt

You should see the line appear in the console consumer output and in the sink file.

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

8. Process Data with Kafka Streams

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more. This quickstart example will demonstrate how to run a streaming application coded in this library.

Common Kafka Errors and Solutions

1. The following warnings appear after Kafka starts:

[2018-09-15 17:42:25,203] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-09-15 17:42:28,273] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-09-15 17:42:31,347] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

In addition, the following appears when starting a consumer:

[2018-09-15 17:52:31,936] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22333] Error while fetching metadata with correlation id 48 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-09-15 17:52:32,043] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22333] Error while fetching metadata with correlation id 49 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-09-15 17:52:32,149] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22333] Error while fetching metadata with correlation id 50 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Solution:

Edit the Kafka configuration file:

yitian@heron01:~/kafka/kafka_2.11-2.0.0/config$ vim server.properties 

(The screenshot of the modified server.properties is missing from the original post. Warnings like these are commonly resolved by uncommenting and setting the listeners=PLAINTEXT://:9092 entry, and if needed advertised.listeners, in server.properties so clients can resolve the broker address.)

2. Problem encountered when starting multiple brokers:

yitian@heron01:~/kafka/kafka_2.11-2.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2018-09-15 20:35:19,352] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
 (kafka.admin.TopicCommand$)

Solution:

The cause is that the server for broker 2 had not been started. Start all three broker servers, and the topic can then be created.
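The error reflects a simple invariant: a topic's replication factor cannot exceed the number of live brokers. A hypothetical sketch of the check:

```python
# Sketch (names are hypothetical, not Kafka's API): topic creation is rejected
# when the requested replication factor exceeds the count of live brokers.
def can_create_topic(replication_factor, live_brokers):
    return replication_factor <= len(live_brokers)
```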

References

Source: http://kafka.apache.org/quickstart

https://www.cnblogs.com/hei12138/p/7805475.html

Error handling: https://blog.csdn.net/luozhonghua2014/article/details/80369469