Kafka的简单Producer和Consumer实现

在kafka单节点运行环境下,尝试使用java创建Kafka的Producer和Consumer进行测试,具体的代码环境如下:

  • OS:Ubuntu 16.4
  • Kafka:2.11_2.0.0
  • Zookeeper:使用Kafka中自带的Zookeeper进行启动
  • JDK: 1.8

项目使用maven,其中pom.xml的相关内容如下:

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <!--<scope>test</scope>-->
        </dependency>

kafka单节点环境的安装和启动见http://www.zhangyitian.cn/blog/kafka%E7%9A%84%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8%EF%BC%88kafka-doc-quickstart%EF%BC%89/

简单Producer的实现

简单Producer的代码如下:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {

    private static Properties kafkaProps;

    private static void initKafka() {
        kafkaProps = new Properties();
        // broker url
        kafkaProps.put("bootstrap.servers", "localhost:9092"); //,192.168.216.139:9092,192.168.216.140:9092
        // request need to validate
        kafkaProps.put("acks", "all");
        // request failed to try
        kafkaProps.put("retries", 0);
        // memory cache size
        kafkaProps.put("batch.size", 16384);
        //
        kafkaProps.put("linger.ms", 1);
        kafkaProps.put("buffer.memory", 33554432);
        // define the way of key and value serializer
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    }

    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        producer.close();
    }
}

代码中需要注意的是:

bootstrap.servers的配置项,在默认kafka的单节点配置时,不能使用IP,而是使用localhost进行连接,否则会连接异常。

此处对代码中用到的几个参数进行解释:

bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;

acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。

retries:生产者发送失败后,重试的次数 batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。

linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。

batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。

buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。

key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

简单Consumer的实现

简单Consumer的代码如下:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class TestConsumer {

    private static Properties kafkaProps = new Properties();

    private static void kafkaInit() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        // group id for each consumer
        kafkaProps.put("group.id", "test");
        // if value legal, auto add offset
        kafkaProps.put("enable.auto.commit", "true");
        // set how long time to udpate the offset value
        kafkaProps.put("auto.commit.interval.ms", "1000");
        // set session response time
        kafkaProps.put("session.timeout.ms", "30000");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        kafkaInit();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
        System.out.println("Subscribed to topic:" + "my-topic");

        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // ?
            for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value for the consumer records
                System.out.printf("Offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

。。。

运行情况

Producer的成功运行后,部分输出如下:

可以看到,在producer的initKafka的相关配置项的值出现在ProducerConfig values中。

image

Consumer成功运行后,可以看到在producer中send的相关key和value值,在consumer的输出中出现:

image

关于上述参数的解释以及方法的简单使用,参考自:https://blog.csdn.net/cjf_wei/article/details/77920435

遇到的问题

在producer运行时,出现如下错误:

image

在提示的参考URL页面中,可以找到相关问题的说明:

image

具体的解决方法为,修改pom.xml文件:在pom.xml文件中加入slf4j的相关引用,并将slf4j-log4j12引用中<scope>test</scope>部分去掉,具体原因未查,但可以解决问题。

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <!--<scope>test</scope>-->
        </dependency>

解决参考:https://blog.csdn.net/beitiandijun/article/details/40510591

修改完成后,重新运行producer程序,可以正常运行。

资料参考