下面是一个Spring Boot + Kafka 的最小可运行 Demo,用于生产和消费 Kafka 消息,适合刚开始学习 Kafka 的你参考
技术栈
Spring Boot 3.x
Spring for Apache Kafka
Kafka 3.x(建议用 Docker 本地启动 Kafka)
快速搭建 Kafka 本地环境(推荐 Docker)
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.192:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- ./kafka-data:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: 192.168.1.192:9092
depends_on:
- kafka
注:配置中192.168.1.192改为自己的电脑ip
启动命令:
docker-compose up -d
浏览器输入http://localhost:8080 即可访问kafka ui界面
项目结构
springboot-kafka-demo/
├── src/
│ └── main/
│ ├── java/com/example/kafka/
│ │ ├── KafkaDemoApplication.java
│ │ ├── KafkaProducerService.java
│ │ ├── KafkaConsumerService.java
│ │ └── KafkaController.java
│ └── resources/
│ ├── templates
│ │ ├── index.html
│ └── application.yml
└── pom.xml
springboot配置文件 application.yml
# Spring Boot 应用服务端口
server:
port: 8081 # 应用启动后监听的端口号
spring:
kafka:
# Kafka broker 地址(可配置多个,用逗号分隔)
bootstrap-servers: 192.168.1.192:9092
consumer:
# Kafka 消费者所属的消费组 ID
group-id: demo-group
# 设置自动偏移量重置策略
# earliest:如果没有已提交的偏移量,则从分区的最早数据开始消费
# latest:默认值,从最新的数据开始消费
auto-offset-reset: earliest
# Key 反序列化器,将接收到的 key 从字节转换为字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化器,将接收到的 value 从字节转换为字符串
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# Key 序列化器,用于将 key 转换为字节以发送到 Kafka
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 序列化器,用于将 value 转换为字节以发送到 Kafka
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 若监听的 topic 不存在,是否抛出异常
# false:不会因为 topic 不存在而启动失败(推荐开发或测试环境使用)
missing-topics-fatal: false
# 日志配置
logging:
level:
root: info # 设置默认日志级别为 info,可改为 debug、warn、error 等
maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.iocoder.boot</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-demo</name>
<description>kafka-demo</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
启动类 KafkaDemoApplication.java
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
生产者 KafkaProducerService.java
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
消费者 KafkaConsumerService.java
package com.example.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(id="kafka-demo-id",topics = "demo-topic", groupId = "demo-group")
public void listen(String message) {
System.out.println("收到 Kafka 消息: " + message);
}
}
发送消息控制类 KafkaController.java
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;
@Controller
public class KafkaController {
@Autowired
private KafkaProducerService producerService;
@GetMapping("/")
public String index() {
return "index";
}
@PostMapping("/send")
public String sendMessage(@RequestParam String topic,
@RequestParam String message,
Model model) {
producerService.send(topic, message);
model.addAttribute("msg", "消息已发送!");
return "index";
}
}
发送消息控制类 index.html
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Kafka Web Demo</title>
</head>
<body>
<h2>Kafka 消息发送</h2>
<form action="/send" method="post">
<label>Topic:</label><br/>
<input type="text" name="topic" value="demo-topic"/><br/><br/>
<label>Message:</label><br/>
<textarea name="message" rows="4" cols="50"></textarea><br/><br/>
<input type="submit" value="发送"/>
</form>
<p th:if="${msg}" th:text="${msg}"></p>
</body>
</html>
测试
项目启动成功后,浏览器访问http://localhost:8081,发送消息,在控制台可以查看到接收到的消息