安装
下载镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
启动镜像
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
查看启动镜像
docker ps
测试消息发送
进入容器(下面的容器 ID 以 docker ps 的实际输出为准,也可以直接使用容器名 kafka)
docker exec -it 83156456fb03 /bin/bash
进入 kafka 默认目录
cd /opt/kafka_2.12-2.3.0/
创建 topic
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Java 生产者与消费者
生产者
package com.qn.kafka;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Producer;
import java.util.Properties;
/**
 * Minimal Kafka producer demo: publishes 100 string messages to the {@code test} topic
 * through the broker at {@code localhost:9092}.
 */
public class KafKaProducer {

    /**
     * Builds the producer configuration, sends the messages {@code "测试用例0"} through
     * {@code "测试用例99"} to the {@code test} topic, then prints a completion message.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when send() throws,
        // instead of only on the happy path.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("test", "测试用例" + i));
            }
            // send() only enqueues the record; flush() blocks until the buffered
            // records have been handed to the broker (or have failed), so the
            // message below is not printed while records are still pending.
            producer.flush();
            System.out.println("Message sent successfully");
        }
    }
}
|
消费者
package com.qn.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections; import java.util.Properties;
/**
 * Minimal Kafka consumer demo: subscribes to the {@code test} topic on the broker at
 * {@code localhost:9092} as group {@code test1} and prints every record it receives.
 */
public class KafKaConsumer {

    /**
     * Builds the consumer configuration, subscribes to the {@code test} topic and polls
     * in an endless loop, printing the value, offset and key of each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources also covers subscribe(): the consumer is closed
        // even if an exception is thrown before the poll loop is reached.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("建立消费者");
            consumer.subscribe(Collections.singletonList("test"));
            System.out.println("订阅成功");

            // The loop never exits normally; the process is expected to be
            // stopped externally or to terminate through an exception.
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload;
                // the timeout stays at 100000 ms.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(100000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("取出数据:"+ record.value());
                    System.out.printf("offset = %d ,key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
|
参考资料
使用 docker 安装 kafka
Kafka 的使用和错误解决