Springboot——kafka(二十五)

使用 idea 创建一个 maven 项目

kafka 安装

详见

java

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.qn</groupId>
<artifactId>springboot</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>

<!--参数-->
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
</dependencies>

</project>

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 8089

spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: myGroup
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

消息体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.qn.kafka;

import java.util.Date;

public class Message {
private long id;
private String msg;
private Date sendTime;

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

public Date getSendTime() {
return sendTime;
}

public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.qn.kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

private Gson gson = new GsonBuilder().create();

public void send() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
kafkaTemplate.send("test", gson.toJson(message));
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.qn.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("接收信息"+message);
}
}
}

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.qn;

import com.qn.kafka.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i < 3; i++) {
sender.send();
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

测试

直接运行启动类:
springboot实战之kafka_2019-12-05-14-54-28.png

参考资料

Spring Boot 集成 Kafka

文章目录
  1. 1. 使用 idea 创建一个 maven 项目
  2. 2. kafka 安装
  3. 3. java
    1. 3.1. 依赖
    2. 3.2. 配置
    3. 3.3. 消息体
    4. 3.4. 生产者
    5. 3.5. 消费者
    6. 3.6. 启动类
  4. 4. 测试
  5. 5. 参考资料
|