【Spring Boot】集成Kafka实现消息发送和订阅
作者:mmseoamin日期:2023-12-05

文章目录

  • 一,新建Spring Boot
    • 1,Maven配置
    • 2,无法识别为SpringBoot项目
    • 3,无效的源发行版
    • 4,无法访问SpringApplication
    • 5,运行直接Finish
    • 6,服务运行成功
    • 二,安装启动Kafka
      • 1,下载
      • 2,配置
      • 3,启动
      • 4,其他命令
      • 三,生产消费消息
        • 1,加入依赖
        • 2,yam配置文件
        • 3,报错enabled mechanisms are []
        • 4,生产者生产消息
        • 5,订阅和消费消息
        • 6,接口
        • 7,测试结果
        • 四,参考博文

          一,新建Spring Boot

          最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第1张

          注意Type选Maven,java选8,其他默认

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第2张

          1,Maven配置

          点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第3张

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第4张

          2,无法识别为SpringBoot项目

          在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第5张

          把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第6张

          3,无效的源发行版

          接下来运行项目报错:java: 无效的源发行版: 14

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第7张

          修改pom.xml中java.version值为8,原来是17

          	
                  17
              
          

          4,无法访问SpringApplication

          继续运行,继续报错【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第8张

          降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

          5,运行直接Finish

          继续运行,没报错,服务直接Finished

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第9张

          需要添加web依赖

           		
                      org.springframework.boot
                      spring-boot-starter-web
                  
          

          6,服务运行成功

          终于,一个空的spring boot项目成功跑起来了,喜极而泣

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第10张

          二,安装启动Kafka

          1,下载

          官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第11张

          2,配置

          解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1

          修改配置文件

          (1) server.properties

          broker.id=1
          log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka
          

          (2) zookeeper.properties

          dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper
          

          3,启动

          先启动zookeeper

          bin\windows\zookeeper-server-start.bat config\zookeeper.properties	
          

          再启动kafka

          bin\windows\kafka-server-start.bat config\server.properties
          

          停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

          4,其他命令

          1,查看topic列表

          bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
          

          2,查看topic具体信息

          bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test
          

          3,创建topic

          bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
          

          三,生产消费消息

          1,加入依赖

           		
                      org.springframework.kafka
                      spring-kafka
                  
          

          2,yam配置文件

          application.yaml

          spring:
            profiles:
              active: dev
          

          application-dev.yaml

          server:
            port: 8082
            servlet:
              context-path: /test-kafka
          spring:
            cache:
              type: ehcache
              config: classpath:ehcache.xml
            jpa:
              database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
            kafka:
              bootstrap-servers: 127.0.0.1:9092
              consumer:
                group-id: kafka-demo-kafka-group
                key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
                value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
              producer:
                key-serializer: org.apache.kafka.common.serialization.StringSerializer 
                value-serializer: org.apache.kafka.common.serialization.StringSerializer 
                retries: 10
          

          3,报错enabled mechanisms are []

          Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第12张

          这个错误我本地测试下来是因为没把账号密码配置这块注释掉

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第13张

          4,生产者生产消息

          @Slf4j
          @Component
          public class KafkaProducer {
              @Autowired
              private KafkaTemplate kafkaTemplate;
              public String sendMessage(String content) {
                  String topic = "test_topic";
                  kafkaTemplate.send(topic, content).addCallback(success -> {
                      String topic = success.getRecordMetadata().topic();
                      int partition = success.getRecordMetadata().partition();
                      long offset = success.getRecordMetadata().offset();
                      log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);
                  }, failure -> {
                      log.info("发送失败:{}",failure.getMessage());
                  });
                  return "发送成功";
              }
          }
          

          5,订阅和消费消息

          一,订阅主题

          1,获取消费者

          import org.apache.kafka.clients.consumer.ConsumerConfig;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.stereotype.Component;
          import java.util.Properties;
          /**
           * kafka消费者配置
           * @author liuxunming
           */
          @Configuration
          @Component
          public class KafkaConfig {
              @Value("${spring.kafka.bootstrap-servers}")
              private String bootstrapServers;
              @Value("${spring.kafka.consumer.group-id}")
              private String groupId;
              @Value("${spring.kafka.consumer.key-deserializer}")
              private String keyDeserializer;
              @Value("${spring.kafka.consumer.value-deserializer}")
              private String valueDeserializer;
              public KafkaConsumer createConsumer() {
                  Properties props = new Properties();
                  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
                  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
                  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                  KafkaConsumer consumer = new KafkaConsumer<>(props);
                  return consumer;
              }
          }
          

          2,订阅topic

           		KafkaConsumer consumer = kafkaConfig.createConsumer();
                  consumer.subscribe(Collections.singleton("traffic"));
          

          3,拉取消息

           ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
           for (ConsumerRecord record : records) {
          		String key = record.key();
          		String value = record.value();
          		log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
          }
          

          4,消费位移,释放资源

          // 提交消费位移
          consumer.commitSync();
          // 关闭消费者以释放资源
          consumer.close();
          

          二,点对点模式

          @Slf4j
          @Component
          public class KafkaConsumer {
              @KafkaListener(topics = {"test_topic"})
              public void handlerMsg(String content) {
                  log.info("接收到消息:消息值:{} ",content);
              }
          }
          

          6,接口

          @Slf4j
          @RestController
          public class KafkaController {
              @Autowired
              private KafkaProducer kafkaProducer;
              @PostMapping("/sendMessage")
              public String sendMessage(@RequestParam String content) {
                  kafkaProducer.sendMessage(content);
                  return "ok";
              }
          }
          

          7,测试结果

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第14张

          接收到消息

          【Spring Boot】集成Kafka实现消息发送和订阅,在这里插入图片描述,第15张

          四,参考博文

          1. 解决IDEA无法识别SpringBoot项目
          2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
          3. Kafka的下载安装以及使用
          4. Kafka消息消费流程详解
          5. Kafka之Consumer使用与基本原理