docker——Kafka的安装使用

安装

下载镜像

docker pull wurstmeister/zookeeper

docker pull wurstmeister/kafka

启动镜像

docker run -d –name zookeeper -p 2181 -t wurstmeister/zookeeper

docker run -d –name kafka –publish 9092:9092 –link zookeeper –env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 –env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 –env KAFKA_ADVERTISED_PORT=9092 –volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest

查看启动镜像

docker ps

docker使用之Kafka的安装使用_2019-12-04-16-04-31.png

测试消息发送

进入容器

docker exec -it 83156456fb03 /bin/bash

进入 kafka 默认目录

cd /opt/kafka_2.12-2.3.0/

创建 topic

bin/kafka-topics.sh –create –zookeeper zookeeper:2181 –replication-factor 1 –partitions 1 –topic test

发送消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

docker使用之Kafka的安装使用_2019-12-04-16-11-42.png

接受消息

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

docker使用之Kafka的安装使用_2019-12-04-16-13-40.png

java 生产者与消费者

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.qn.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

public class KafKaProducer {
public static void main(String[] args) {
// TODO Auto-generated method stub

Properties props = new Properties();

//broker地址
props.put("bootstrap.servers", "localhost:9092");

//请求时候需要验证
props.put("acks", "all");

//请求失败时候需要重试
props.put("retries", 0);

//内存缓存区大小
props.put("buffer.memory", 33554432);

//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i =0;i<100;i++){
// 生产一条消息的时间有点长
producer.send(new ProducerRecord<>("test", "测试用例"+Integer.toString(i)));
}

System.out.println("Message sent successfully");
producer.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.qn.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafKaConsumer {
@SuppressWarnings({ "deprecation", "resource" })
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092"); // "localhost:9092"
//每个消费者分配独立的组号
props.put("group.id", "test1");

//如果value合法,则自动提交偏移量
props.put("enable.auto.commit", "true");

//设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");

//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put("session.timeout.ms", "30000");

//
//props.put("auto.offset.reset", "earliest");

props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
System.out.println("建立消费者");
consumer.subscribe(Collections.singletonList("test")); //核心函数1:订阅topic
System.out.println("订阅成功");
//消费轮询
try{
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100000);
for(ConsumerRecord<String,String> record : records) {
System.out.println("取出数据:"+ record.value());
System.out.printf("offset = %d ,key = %s, value = %s%n",record.offset(),record.key(),record.value());
}
}
}finally{
consumer.close();
}
}
}

参考资料

使用 docker 安装 kafka

Kafka 的使用和错误解决

文章目录
  1. 1. 安装
    1. 1.1. 下载镜像
    2. 1.2. 启动镜像
    3. 1.3. 查看启动镜像
  2. 2. 测试消息发送
    1. 2.1. 进入容器
    2. 2.2. 进入 kafka 默认目录
    3. 2.3. 创建 topic
    4. 2.4. 发送消息
    5. 2.5. 接受消息
  3. 3. java 生产者与消费者
    1. 3.1. 生产者
    2. 3.2. 消费者
  4. 4. 参考资料
|