相关推荐recommended
springboot集成Flink-CDC
作者:mmseoamin日期:2023-12-21

文章目录

  • 一、Flink&Flink CDC官网
  • 二、CDC&Flink CDC介绍
    • 1、 什么是cdc
    • 2、什么是Flink CDC
    • 3、支持的连接器
    • 三、springboot整合Filnk CDC
      • 1、官网示例
      • 2、Maven依赖
        • 1) Flink和Flink CDC版本映射
        • 2)具体maven依赖
        • 3)项目坑点
        • 3、springboot代码示例
          • 1)创建变更监听器
          • 2)自定义数据解析器
          • 3)创建变更对象
          • 4)创建业务处理类
          • 5)运行代码监听mysql CDC事件

            一、Flink&Flink CDC官网

            Flink CDC地址

            Flink官网地址

            二、CDC&Flink CDC介绍

            1、 什么是cdc

            CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。

            2、什么是Flink CDC

            用于 Apache Flink 的 CDC 连接器是一组源连接器,用于®Apache Flink®,使用变更数据捕获 (CDC) 从不同的数据库引入变更。 Apache Flink 的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。因此,它可以充分利用Debezium的能力。查看更多关于什么是Debezium的信息。®

            springboot集成Flink-CDC,在这里插入图片描述,第1张

            3、支持的连接器

            springboot集成Flink-CDC,在这里插入图片描述,第2张

            三、springboot整合Filnk CDC

            1、官网示例

            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
            import com.ververica.cdc.connectors.mysql.source.MySqlSource;
            public class MySqlBinlogSourceExample {
              public static void main(String[] args) throws Exception {
                MySqlSource mySqlSource = MySqlSource.builder()
                        .hostname("yourHostname")
                        .port(yourPort)
                        .databaseList("yourDatabaseName") // set captured database
                        .tableList("yourDatabaseName.yourTableName") // set captured table
                        .username("yourUsername")
                        .password("yourPassword")
                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                        .build();
                
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
                // enable checkpoint
                env.enableCheckpointing(3000);
                
                env
                  .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                  // set 4 parallel source tasks
                  .setParallelism(4)
                  .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
                
                env.execute("Print MySQL Snapshot + Binlog");
              }
            }
            

            2、Maven依赖

            1) Flink和Flink CDC版本映射

            springboot集成Flink-CDC,在这里插入图片描述,第3张

            2)具体maven依赖

            
                    1.8
                    UTF-8
                    UTF-8
                    2.3.12.RELEASE
                    2.17.0
                    3.4.1
                    4.1.0
                    1.16.0
                
                
                    
                        org.springframework.boot
                        spring-boot-starter-web
                        
                            
                                org.springframework.boot
                                spring-boot-starter-logging
                            
                        
                    
                    
                    
                        org.apache.logging.log4j
                        log4j-core
                        ${log4j2.version}
                    
                    
                        org.apache.logging.log4j
                        log4j-api
                        ${log4j2.version}
                    
                    
                        org.apache.logging.log4j
                        log4j-slf4j-impl
                        ${log4j2.version}
                    
                    
                        org.apache.logging.log4j
                        log4j-1.2-api
                        ${log4j2.version}
                    
                    
                    
                        org.springframework.boot
                        spring-boot-starter-log4j2
                    
                    
                    
                        com.github.pagehelper
                        pagehelper-spring-boot-starter
                        1.4.1
                    
                    
                    
                        mysql
                        mysql-connector-java
                        
                        8.0.29
                    
                    
                    
                        com.alibaba
                        druid-spring-boot-starter
                        1.2.4
                    
                    
                    
                        com.alibaba
                        easyexcel
                        3.1.1
                    
                    
                    
                        org.apache.shardingsphere
                        sharding-jdbc-spring-boot-starter
                        
                        4.1.1
                    
                    
                    
                        com.baomidou
                        mybatis-plus-boot-starter
                        ${mybatis-plus.version}
                    
                    
                        com.baomidou
                        mybatis-plus-generator
                        ${mybatis-plus.version}
                    
                    
                        org.apache.velocity
                        velocity-engine-core
                        2.3
                    
                    
                    
                        com.github.pagehelper
                        pagehelper-spring-boot-starter
                        1.4.1
                    
                    
                    
                        com.baomidou
                        dynamic-datasource-spring-boot-starter
                        3.6.1
                    
                    
                    
                        org.springframework.boot
                        spring-boot-starter-validation
                    
                    
                    
                    
                        org.apache.httpcomponents
                        httpclient
                        4.5.14
                    
                    
                    
                        com.alibaba
                        fastjson
                        1.2.76
                    
                    
                    
                        com.aliyun.oss
                        aliyun-sdk-oss
                        3.15.0
                    
                    
                    
                        com.xuxueli
                        xxl-job-core
                        2.4.0
                    
                    
                   
                    
                    
                        org.apache.flink
                        flink-streaming-java
                        ${flink.version}
                        
                        
                    
                    
                    
                        org.apache.flink
                        flink-clients
                        ${flink.version}
                    
                    
                    
                        com.ververica
                        flink-connector-mysql-cdc
                        2.3.0
                         
                        
                            
                                mysql
                                mysql-connector-java
                            
                        
                       
                    
                    
                    
                    
                    
                        org.apache.flink
                        flink-table-api-java
                        1.12.1
                    
            

            3)项目坑点

            • mysql jdbc驱动包版本过低,项目启动时导致 java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;,找不到该方法,必须引入8.0.28及以上版本
            • flink-connector-mysql-cdc 的2.3.0版本依赖中引入的是8.0.25,需要去除掉
              
                      
                          mysql
                          mysql-connector-java
                          
                          8.0.29
                      
              		
                          com.ververica
                          flink-connector-mysql-cdc
                          2.3.0
                           
                          
                              
                                  mysql
                                  mysql-connector-java
                              
                          
              
              • 项目启动报错:java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException,找不到此类,是因为缺少依赖包,引入相关依赖包
                		
                            org.apache.flink
                            flink-table-api-java
                            1.12.1
                        
                
                • Flink的版本和Flink CDC的版本一定要兼容,按照官方给定的版本进行引入

                  springboot集成Flink-CDC,在这里插入图片描述,第4张

                  版本过低会导致:Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation,

                  要解决此问题,你可以按照以下步骤操作:

                  确保使用的数据库用户具有 RELOAD 权限。请登录到 MySQL 数据库,并为用户授予 RELOAD 权限。例如,使用以下命令为用户 your_user 授予 RELOAD 权限:

                  GRANT RELOAD ON *.* TO 'your_user'@'localhost';
                  

                  Flink官方解决上述问题

                  MySQL CDC source使用增量快照算法,避免了数据库锁的使用,因此不需要"RELOAD"权限。

                  从 Flink 1.12 版本开始,Flink 引入了对 MySQL CDC 的集成和支持。在这个版本中,Flink 提供了 flink-connector-mysql-cdc 模块,用于实现基于 MySQL 的 Change Data Capture 功能。

                  在 Flink 1.12 版本中,MySQL CDC 源使用了增量快照算法来捕获数据变更,并且不需要 RELOAD 权限。这种实现方式避免了数据库锁的使用,提供了低延迟的数据变更捕获能力。

                  springboot集成Flink-CDC,在这里插入图片描述,第5张

                  代码中设置can.incremental.snapshot.enabled开启,详细代码见代码示例

                  Configuration config = new Configuration();
                  // 设置增量快照开启为 true
                  config.setBoolean("scan.incremental.snapshot.enabled", true);
                  env.configure(config);
                  

                  3、springboot代码示例

                  1)创建变更监听器

                  创建MysqlEventListener 类实现ApplicationRunner ,项目启动时可以启动mysql监听

                  import com.ververica.cdc.connectors.mysql.source.MySqlSource;
                  import com.ververica.cdc.connectors.mysql.table.StartupOptions;
                  import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
                  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                  import org.apache.flink.configuration.Configuration;
                  import org.apache.flink.streaming.api.datastream.DataStreamSource;
                  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                  import org.springframework.boot.ApplicationArguments;
                  import org.springframework.boot.ApplicationRunner;
                  /**
                   * @Description: mysql变更监听器
                   * @Date: 2023/10/11
                   **/
                  public class MysqlEventListener implements ApplicationRunner {
                      @Override
                      public void run(ApplicationArguments args) throws Exception {
                          MySqlSource mySqlSource = MySqlSource.builder()
                                  .hostname("yourHostname")
                                  .port(3306)
                                  .databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
                                  .tableList("yourDatabaseName.yourTableName") // 设置捕获的表,数据库.表名
                                  .username("yourUsername")
                                  .password("yourPassword")
                                  .deserializer(new MysqlDeserialization()) // 将 SourceRecord 转换为 自定义对象
                                  /**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                                   * latest:只进行增量导入(不读取历史变化)
                                   * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
                                   */
                                  .startupOptions(StartupOptions.latest())
                                  .build();
                          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          Configuration config = new Configuration();
                          // 设置增量快照开启为 true
                          config.setBoolean("scan.incremental.snapshot.enabled", true);
                          env.configure(config);
                          env.setParallelism(1);
                          //        DebeziumSourceFunction dataChangeInfoMySqlSource = buildDataChangeSource();
                          DataStreamSource streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                                  .setParallelism(1);
                          streamSource.addSink(new DataChangeSink());
                          env.execute("mysql-stream-cdc");
                      };
                  }
                  

                  2)自定义数据解析器

                  import com.alibaba.fastjson.JSONObject;
                  import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
                  import io.debezium.data.Envelope;
                  import org.apache.flink.api.common.typeinfo.TypeInformation;
                  import org.apache.flink.util.Collector;
                  import org.apache.kafka.connect.data.Field;
                  import org.apache.kafka.connect.data.Schema;
                  import org.apache.kafka.connect.data.Struct;
                  import org.apache.kafka.connect.source.SourceRecord;
                  import java.util.List;
                  import java.util.Optional;
                  /**
                   * @Description: mysql自定序列化
                   * @Date: 2023/10/11
                   **/
                  public class MysqlDeserialization implements DebeziumDeserializationSchema {
                      public static final String TS_MS = "ts_ms";
                      public static final String BIN_FILE = "file";
                      public static final String POS = "pos";
                      public static final String CREATE = "CREATE";
                      public static final String BEFORE = "before";
                      public static final String AFTER = "after";
                      public static final String SOURCE = "source";
                      public static final String UPDATE = "UPDATE";
                      /**
                       *  反序列化数据,转变为自定义对象DataChangeInfo
                       * @param sourceRecord
                       * @param collector
                       * @throws Exception
                       */
                      @Override
                      public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
                          String topic = sourceRecord.topic();
                          String[] fields = topic.split("\\.");
                          String database = fields[1];
                          String tableName = fields[2];
                          Struct struct = (Struct) sourceRecord.value();
                          final Struct source = struct.getStruct(SOURCE);
                          DataChangeInfo dataChangeInfo = new DataChangeInfo();
                          dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());
                          dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
                          //5.获取操作类型  CREATE UPDATE DELETE
                          Envelope.Operation operation = Envelope.operationFor(sourceRecord);
                          String type = operation.toString().toUpperCase();
                          int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
                          dataChangeInfo.setEventType(eventType);
                          dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
                          dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));
                          dataChangeInfo.setDatabase(database);
                          dataChangeInfo.setTableName(tableName);
                          dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
                          //7.输出数据
                          collector.collect(dataChangeInfo);
                      }
                      /**
                       * 从元数据获取变更前或者变更后的数据
                       * @param value
                       * @param fieldElement
                       * @return
                       */
                      private JSONObject getJsonObject(Struct value, String fieldElement) {
                          Struct element = value.getStruct(fieldElement);
                          JSONObject jsonObject = new JSONObject();
                          if (element != null) {
                              Schema afterSchema = element.schema();
                              List fieldList = afterSchema.fields();
                              for (Field field : fieldList) {
                                  Object afterValue = element.get(field);
                                  jsonObject.put(field.name(), afterValue);
                              }
                          }
                          return jsonObject;
                      }
                      @Override
                      public TypeInformation getProducedType() {
                          return TypeInformation.of(DataChangeInfo.class);
                      }
                  }
                  

                  3)创建变更对象

                  import lombok.Data;
                  /**
                   * @Description: 数据变更对象
                   * @Date: 2023/10/11
                   **/
                  @Data
                  public class DataChangeInfo {
                      /**
                       * 变更前数据
                       */
                      private String beforeData;
                      /**
                       * 变更后数据
                       */
                      private String afterData;
                      /**
                       * 变更类型 1新增 2修改 3删除
                       */
                      private Integer eventType;
                      /**
                       * binlog文件名
                       */
                      private String fileName;
                      /**
                       * binlog当前读取点位
                       */
                      private Integer filePos;
                      /**
                       * 数据库名
                       */
                      private String database;
                      /**
                       * 表名
                       */
                      private String tableName;
                      /**
                       * 变更时间
                       */
                      private Long changeTime;
                  }
                  

                  4)创建业务处理类

                  import lombok.extern.slf4j.Slf4j;
                  import org.apache.flink.streaming.api.functions.sink.SinkFunction;
                  /**
                   * @Description: 数据处理
                   * @Date: 2023/10/11
                   **/
                  @Slf4j
                  public class DataChangeSink implements SinkFunction {
                      @Override
                      public void invoke(String value, Context context) throws Exception {
                          log.info("收到变更原始数据:{}", value);
                          //业务代码
                      }
                  }
                  

                  5)运行代码监听mysql CDC事件

                  项目启动成功

                  springboot集成Flink-CDC,在这里插入图片描述,第6张

                  • 修改mysql数据库数据

                    变更前

                    springboot集成Flink-CDC,在这里插入图片描述,第7张

                    变更后:点击保存

                    springboot集成Flink-CDC,在这里插入图片描述,第8张

                    服务监听结果

                    springboot集成Flink-CDC,在这里插入图片描述,第9张