Spring Boot + Kafka Demo


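Below is a minimal runnable Spring Boot + Kafka demo that produces and consumes Kafka messages. It is intended as a reference for readers who are just starting to learn Kafka.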

Tech stack

Spring Boot 3.x
Spring for Apache Kafka
Kafka 3.x (running Kafka locally with Docker is recommended)

Quickly set up a local Kafka environment (Docker recommended)

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.192:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./kafka-data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: 192.168.1.192:9092
    depends_on:
      - kafka

Note: replace every occurrence of 192.168.1.192 in the configuration with the IP address of your own machine.
Start command:

docker-compose up -d

Open http://localhost:8080 in a browser to access the Kafka UI.
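
If the Spring Boot application later cannot connect, one common cause is that the address set in KAFKA_ADVERTISED_LISTENERS is not reachable from the host. The following is a minimal sketch, using only the JDK, that parses a host:port string and tries to open a TCP connection to it. The class name BrokerCheck and its method names are hypothetical and not part of the original demo. A successful TCP connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the original demo): checks that a
// bootstrap address such as "192.168.1.192:9092" accepts TCP connections.
final class BrokerCheck {

    // Splits "host:port" into an unresolved socket address.
    static InetSocketAddress parse(String bootstrapServer) {
        int idx = bootstrapServer.lastIndexOf(':');
        if (idx <= 0 || idx == bootstrapServer.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got " + bootstrapServer);
        }
        String host = bootstrapServer.substring(0, idx);
        int port = Integer.parseInt(bootstrapServer.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within timeoutMillis.
    static boolean isReachable(String bootstrapServer, int timeoutMillis) {
        InetSocketAddress target = parse(bootstrapServer);
        try (Socket socket = new Socket()) {
            // Build a resolving address here; connect() rejects unresolved ones.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, BrokerCheck.isReachable("192.168.1.192:9092", 2000) should return true once the containers are up and the IP has been replaced with your own.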

Project structure

springboot-kafka-demo/
├── src/
│   └── main/
│       ├── java/com/example/kafka/
│       │   ├── KafkaDemoApplication.java
│       │   ├── KafkaProducerService.java
│       │   ├── KafkaConsumerService.java
│       │   └── KafkaController.java
│       └── resources/
│           ├── templates/
│           │   └── index.html
│           └── application.yml
└── pom.xml

Spring Boot configuration file application.yml

# Spring Boot application server port
server:
  port: 8081  # Port the application listens on after startup

spring:
  kafka:
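    # Kafka broker addresses (multiple brokers can be listed, separated by commas)
    bootstrap-servers: 192.168.1.192:9092

    consumer:
      # ID of the consumer group this Kafka consumer belongs to
      group-id: demo-group

      # Auto offset reset policy
      # earliest: if there is no committed offset, start consuming from the earliest data in the partition
      # latest: the default; start consuming from the newest data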
      auto-offset-reset: earliest

      # Key deserializer: converts the received key from bytes to a String
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # Value deserializer: converts the received value from bytes to a String
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    producer:
      # Key serializer: converts the key to bytes before sending it to Kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      # Value serializer: converts the value to bytes before sending it to Kafka
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    listener:
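      # Whether to throw an exception if a listened-to topic does not exist
      # false: startup does not fail when the topic is missing (recommended for development or test environments)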
      missing-topics-fatal: false

# Logging configuration
logging:
  level:
    root: info  # Default log level is info; can be changed to debug, warn, error, etc.
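
The auto-offset-reset setting only comes into play when the consumer group has no committed offset for a partition. As a rough illustration, the plain-Java sketch below models that decision. OffsetResetModel and its parameters are hypothetical names used for explanation; this is not Kafka client code, and it ignores the case where a committed offset has fallen out of range.

```java
// Simplified model of how a consumer picks its starting offset.
// This is an explanatory sketch, not Kafka client code.
final class OffsetResetModel {

    // committedOffset: last offset committed by the group, or null if none.
    // earliestOffset / latestOffset: current bounds of the partition log.
    // policy: "earliest", "latest" or "none", as in auto-offset-reset.
    static long startingOffset(Long committedOffset, long earliestOffset,
                               long latestOffset, String policy) {
        if (committedOffset != null) {
            // A committed offset always wins; the policy is not consulted.
            return committedOffset;
        }
        switch (policy) {
            case "earliest":
                return earliestOffset;
            case "latest":
                return latestOffset;
            case "none":
                // With "none", a missing offset is reported as an error.
                throw new IllegalStateException("No committed offset and policy is none");
            default:
                throw new IllegalArgumentException("Unsupported policy: " + policy);
        }
    }
}
```

With earliest, as configured above, a brand-new demo-group also reads messages that were sent before the application first started. With latest it would only see messages sent afterwards.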

Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.iocoder.boot</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-demo</name>
    <description>kafka-demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Application entry point KafkaDemoApplication.java

package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

}

Producer KafkaProducerService.java

package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
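
The send method above is fire-and-forget: it returns before the broker has acknowledged the message. In Spring for Apache Kafka 3.x, KafkaTemplate.send returns a CompletableFuture, so the outcome can be observed with the usual future callbacks. The sketch below shows the pattern with plain JDK types so that it runs without a broker. AsyncSendSketch and the sender parameter are hypothetical stand-ins and are not part of the original demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Sketch of reacting to an asynchronous send result. The sender function is a
// stand-in for the Kafka template so the example runs without a broker.
final class AsyncSendSketch {

    // Sends a message through the supplied sender and returns a future that
    // completes with a human-readable status line instead of failing.
    static CompletableFuture<String> sendWithStatus(
            BiFunction<String, String, CompletableFuture<?>> sender,
            String topic, String message) {
        return sender.apply(topic, message)
                .handle((result, error) -> error == null
                        ? "sent to " + topic
                        : "failed to send to " + topic + ": " + error.getMessage());
    }
}
```

In KafkaProducerService, the stand-in would be replaced by the template itself, for example (t, m) -> kafkaTemplate.send(t, m), and the status line could be logged rather than returned.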

Consumer KafkaConsumerService.java

package com.example.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(id = "kafka-demo-id", topics = "demo-topic", groupId = "demo-group")
    public void listen(String message) {
        System.out.println("Received Kafka message: " + message);
    }
}

Message-sending controller KafkaController.java

package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;

@Controller
public class KafkaController {

    @Autowired
    private KafkaProducerService producerService;

    @GetMapping("/")
    public String index() {
        return "index";
    }

    @PostMapping("/send")
    public String sendMessage(@RequestParam String topic,
                              @RequestParam String message,
                              Model model) {
        producerService.send(topic, message);
        model.addAttribute("msg", "Message sent!");
        return "index";
    }
}

Message-sending page template index.html

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Kafka Web Demo</title>
</head>
<body>
<h2>Send a Kafka Message</h2>
<form action="/send" method="post">
    <label>Topic:</label><br/>
    <input type="text" name="topic" value="demo-topic"/><br/><br/>
    <label>Message:</label><br/>
    <textarea name="message" rows="4" cols="50"></textarea><br/><br/>
    <input type="submit" value="Send"/>
</form>

<p th:if="${msg}" th:text="${msg}"></p>
</body>
</html>

Testing

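After the project starts successfully, open http://localhost:8081 in a browser and send a message. The received message is printed in the application console.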
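
The same test can be driven without a browser. The sketch below posts the two form fields to the /send endpoint with the JDK's built-in HTTP client (Java 11 or later). SendMessageClient and its method names are hypothetical and not part of the original demo, and the base URL assumes the port configured in application.yml.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical command-line alternative to the browser form.
final class SendMessageClient {

    // Encodes the two form fields as application/x-www-form-urlencoded.
    static String formBody(String topic, String message) {
        return "topic=" + URLEncoder.encode(topic, StandardCharsets.UTF_8)
                + "&message=" + URLEncoder.encode(message, StandardCharsets.UTF_8);
    }

    // Posts the form to the demo's /send endpoint and returns the HTTP status code.
    static int send(String baseUrl, String topic, String message) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/send"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(topic, message)))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With the application running, SendMessageClient.send("http://localhost:8081", "demo-topic", "hello kafka") should return 200, and the consumer should print the message to the console shortly afterwards.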


Author: 一剑潇
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit 一剑潇 as the source when reposting.