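Apache SeaTunnel: A Next-Generation, High-Performance, Distributed Tool for Massive Data Integration, from Getting Started to Practice
Author: mmseoamin  Date: 2023-12-25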

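About Apache SeaTunnel

Apache SeaTunnel, formerly known as Waterdrop, was renamed SeaTunnel in October 2021, when it also applied to join the Apache Incubator. It has since shipped more than 40 releases and runs in production at many companies, including J.P. Morgan, ByteDance, Stey, China Mobile, Foxconn, Tencent Cloud, Gridsum, Zhongke Big Data Academy, 360, Shopee, Bilibili, Sina, Sogou, and Vipshop. It is widely used for massive heterogeneous data integration, CDC synchronization, SaaS data integration, and multi-source data processing.

On December 9, 2021, Apache SeaTunnel was accepted into the Apache Incubator by a unanimous vote.

On May 17, 2023, the Apache Board of Directors passed the resolution for Apache SeaTunnel to graduate, ending its 18-month incubation and making Apache SeaTunnel an Apache Top-Level Project.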

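Apache SeaTunnel is a next-generation, high-performance, distributed tool for integrating massive amounts of data. It supports more than a hundred data sources (Database/Cloud/SaaS), handles both real-time CDC and batch synchronization at scale, and can stably and efficiently synchronize trillion-record-scale data.

As an easy-to-use, high-performance data integration platform that supports both real-time streaming and offline batch processing, Apache SeaTunnel offers the following features and advantages: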

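  • Rich, extensible connectors: SeaTunnel provides a connector API that does not depend on any specific execution engine. Connectors (Source, Transform, Sink) built on this API can run on several engines, currently SeaTunnel Engine, Flink, and Spark.
  • Connector plugins: the plugin design lets users easily develop their own connectors and integrate them into the SeaTunnel project. SeaTunnel currently supports more than 100 connectors, and the number keeps growing.
  • Unified batch and streaming: connectors built on the SeaTunnel Connector API work for offline sync, real-time sync, full sync, and incremental sync alike, which greatly reduces the effort of managing data integration jobs.
  • Data consistency: a distributed snapshot algorithm guarantees data consistency.
  • Multiple engines: SeaTunnel uses SeaTunnel Engine for data synchronization by default. It can also run connectors on Flink or Spark to fit a company's existing technology stack, and multiple versions of Spark and Flink are supported.
  • JDBC multiplexing and multi-table log parsing: SeaTunnel can synchronize multiple tables or an entire database over shared JDBC connections, which avoids opening too many JDBC connections. It also reads and parses database logs once for multiple tables or a whole database, so multi-table CDC jobs do not have to read and parse the same log repeatedly.
  • High throughput and low latency: SeaTunnel reads and writes in parallel, delivering stable, reliable, high-throughput, low-latency synchronization.
  • Detailed real-time monitoring: SeaTunnel reports detailed metrics for every step of a sync job, so users can easily see the number of records read and written, data size, QPS, and more.
  • Two ways to develop jobs: coding and canvas design. The SeaTunnel Web project (https://github.com/apache/seatunnel-web) provides visual management of jobs, scheduling, execution, and monitoring.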

    SeaTunnel System Architecture Design

    [Figure: SeaTunnel system architecture diagram]

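    SeaTunnel Installation and Deployment

    SeaTunnel Engine is SeaTunnel's default engine, and the SeaTunnel installation package already includes all of SeaTunnel Engine.

    Getting the SeaTunnel installation package

    1. Download the prebuilt binary package directly

    https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz

    After downloading, upload the package to your server and extract it: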

    # Extract into /opt/module
    tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz -C /opt/module
    
    2. Build the installation package from source
    • Get the source from https://seatunnel.apache.org/download or https://github.com/apache/seatunnel.git
    • Build the package with Maven: ./mvnw -U -T 1C clean install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"checkstyle.skip"=true -D"license.skipAddThirdParty"
    • The package is then in ${Your_code_dir}/seatunnel-dist/target, for example apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz
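When you build from source, the tarball name carries whatever version the source tree has (for instance a -SNAPSHOT suffix), so any script that picks the tarball up afterwards should not hard-code the name. Below is a minimal Python sketch. It assumes only the seatunnel-dist/target layout and the apache-seatunnel-*-bin.tar.gz naming shown above, and find_dist_tarball is a hypothetical helper, not part of SeaTunnel:

```python
from pathlib import Path


def find_dist_tarball(code_dir: str) -> Path:
    """Return the most recently built apache-seatunnel-*-bin.tar.gz.

    Hypothetical helper: it relies only on the seatunnel-dist/target layout
    described above and raises FileNotFoundError if nothing was built.
    """
    target = Path(code_dir) / "seatunnel-dist" / "target"
    candidates = sorted(
        target.glob("apache-seatunnel-*-bin.tar.gz"),
        key=lambda p: p.stat().st_mtime,  # newest build sorts last
    )
    if not candidates:
        raise FileNotFoundError(f"no apache-seatunnel-*-bin.tar.gz under {target}")
    return candidates[-1]
```

For example, find_dist_tarball("/path/to/seatunnel") returns the path you would then extract, just like the prebuilt package above.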

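      Configure SEATUNNEL_HOME

      Point SEATUNNEL_HOME at the extracted directory, for example with export SEATUNNEL_HOME=/opt/module/apache-seatunnel-2.3.3 in a profile script such as /etc/profile.d/seatunnel.sh. Optionally, add $SEATUNNEL_HOME/bin to PATH.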
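Once SEATUNNEL_HOME is set, a quick check that it points at the extracted package (and not, say, at its parent directory) saves confusing errors later. The Python sketch below checks only for the entries this article itself uses (bin/seatunnel.sh, config, connectors, lib). The real package contains more, and check_seatunnel_home is a hypothetical name:

```python
import os
from pathlib import Path
from typing import List, Optional

# Entries referenced in this article; the 2.3.3 package ships more than these.
EXPECTED_ENTRIES = ["bin/seatunnel.sh", "config", "connectors", "lib"]


def check_seatunnel_home(home: Optional[str] = None) -> List[str]:
    """Return the expected entries missing under SEATUNNEL_HOME (empty list means OK)."""
    home = home or os.environ.get("SEATUNNEL_HOME")
    if not home:
        raise RuntimeError("SEATUNNEL_HOME is not set")
    root = Path(home)
    return [entry for entry in EXPECTED_ENTRIES if not (root / entry).exists()]
```

Calling check_seatunnel_home() with no argument reads the environment variable. An empty result means the layout looks as expected.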

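      Install connector plugins

      Starting with 2.2.0-beta, the binary package no longer ships connector dependencies by default, so before the first run you need to install the connectors with the following command. (Alternatively, download the connectors manually from the Apache Maven Repository [https://repo.maven.apache.org/maven2/org/apache/seatunnel/] and move them into the connectors/seatunnel directory.)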

      sh bin/install-plugin.sh
      

      To install a specific connector version, for example 2.3.3, run:

      sh bin/install-plugin.sh 2.3.3
      

      Usually you don't need every connector plugin, so you can list only the ones you need in config/plugin_config. For example, if you only need the connector-console plugin, edit config/plugin_config as follows:

      --seatunnel-connectors--
      connector-console
      --end--
      

      For the example job to work, add the following plugins:

      --seatunnel-connectors--
      connector-fake
      connector-console
      --end--
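plugin_config is just the connector names listed between two marker lines, so it is easy to generate, for example when the same connector set has to be rolled out to several machines. The minimal Python sketch below reproduces the format shown above. render_plugin_config is a hypothetical helper; its output would be written to config/plugin_config before running sh bin/install-plugin.sh:

```python
from typing import Iterable


def render_plugin_config(connectors: Iterable[str]) -> str:
    """Build config/plugin_config content in the marker format shown above."""
    names = []
    for name in connectors:
        name = name.strip()
        if name and name not in names:  # skip blanks and duplicates, keep order
            names.append(name)
    return "\n".join(["--seatunnel-connectors--", *names, "--end--"]) + "\n"
```

For example, Path("config/plugin_config").write_text(render_plugin_config(["connector-fake", "connector-console"])) produces the second listing above.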
      

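      You can find all supported connectors and their corresponding plugin_config names in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.

      To install V2 connector plugins manually, download only the V2 connector plugins you need and put them into ${SEATUNNEL_HOME}/connectors/seatunnel.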
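plugin-mapping.properties ties the plugin names used in job configs (such as FakeSource, Console, Jdbc) to connector artifact names (such as connector-fake). The sketch assumes its entries look like seatunnel.source.FakeSource = connector-fake; check your own copy, because the sample lines in the test are illustrative. Under that assumption, a small Python parser can answer "which artifact do I need for this plugin?". parse_plugin_mapping is a hypothetical helper:

```python
from typing import Dict, Tuple


def parse_plugin_mapping(text: str) -> Dict[Tuple[str, str], str]:
    """Map (plugin type, plugin name) -> connector artifact.

    Assumes lines shaped like 'seatunnel.<type>.<Name> = <artifact>';
    comments, blank lines, and anything else are skipped.
    """
    mapping: Dict[Tuple[str, str], str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, artifact = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        if len(parts) == 3 and parts[0] == "seatunnel":
            mapping[(parts[1], parts[2])] = artifact
    return mapping
```

The values of this mapping are the names that go into config/plugin_config.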

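      Run the example job

      Define a job configuration file

      We can use the official template config/v2.batch.config.template as-is. It defines three blocks: env, source, and sink.

      env: runtime settings such as parallelism, job mode (BATCH or STREAMING), checkpointing, and so on

      source: defines where the data is read from

      sink: defines where the data is written to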
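To make the three blocks concrete, here is a Python sketch that renders a small FakeSource-to-Console batch job with the same env/source/sink shape. It is modeled on config/v2.batch.config.template, not copied from it. The option names (execution.parallelism, job.mode, result_table_name, row.num, schema) are assumptions to check against the template shipped with your version, and render_fake_to_console is a hypothetical helper. It also rejects job modes other than BATCH and STREAMING, so a typo fails before the job is submitted:

```python
VALID_JOB_MODES = {"BATCH", "STREAMING"}


def render_fake_to_console(job_mode: str = "BATCH", parallelism: int = 2, row_num: int = 16) -> str:
    """Render a minimal env/source/sink job config (FakeSource -> Console)."""
    if job_mode not in VALID_JOB_MODES:
        raise ValueError(f"job.mode must be one of {sorted(VALID_JOB_MODES)}, got {job_mode!r}")
    # Doubled braces are literal braces in the f-string.
    return f"""env {{
  execution.parallelism = {parallelism}
  job.mode = "{job_mode}"
}}

source {{
  FakeSource {{
    result_table_name = "fake"
    row.num = {row_num}
    schema = {{
      fields {{
        name = "string"
        age = "int"
      }}
    }}
  }}
}}

sink {{
  Console {{
  }}
}}
"""
```

Writing the result to a file under config/ and passing it to ./bin/seatunnel.sh --config works like the template run.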

      Run:

      ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
      

      I'm using 2.3.3, and running this fails with a missing-class error: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap

      Error output:

      2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,
      2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues
      2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
      2023-11-08 17:47:32,244 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
      	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
      	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
      	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
      Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
      	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
      	at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
      	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
      	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
      	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
      	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
      	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
      	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
      	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
      	at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
      	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
      	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
      	at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
      	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
      	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
      	at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
      	at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
      	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
      Caused by: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
      	at org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan$Builder.<init>(CheckpointPlan.java:66)
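In traces this long, the line that matters is usually the last "Caused by:". Java prints the exception chain from the outermost wrapper inward, so here the root cause is the NoClassDefFoundError, not the CompletionException or CommandExecuteException that wrap it. Below is a small Python helper that pulls that line out of a saved log. It is hypothetical and uses only the standard library:

```python
def root_cause(trace: str) -> str:
    """Return the innermost exception line of a Java stack trace.

    The last 'Caused by:' is the root cause; without any 'Caused by:',
    fall back to the first line that is not a stack frame.
    """
    lines = [ln.strip() for ln in trace.splitlines() if ln.strip()]
    prefix = "Caused by: "
    causes = [ln[len(prefix):] for ln in lines if ln.startswith(prefix)]
    if causes:
        return causes[-1]
    headers = [ln for ln in lines if not ln.startswith("at ")]
    return headers[0] if headers else ""
```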
      

      Download the missing dependency jar

      Download URL: [https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.2/seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar]

      Put the jar into $SEATUNNEL_HOME/lib.
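Before rerunning, it is worth confirming that the jar actually landed in lib/ under the expected name. The Python sketch below uses a glob pattern that simply mirrors the file name downloaded above; find_hadoop_uber_jar is a hypothetical helper. Note that the article pairs the 2.3.2 jar with a 2.3.3 install; a version-matched jar, if one is published, may be the safer choice.

```python
from pathlib import Path
from typing import Optional


def find_hadoop_uber_jar(lib_dir: str) -> Optional[Path]:
    """Return the seatunnel-hadoop3-3.1.4-uber optional jar in lib_dir, or None if absent."""
    # The wildcard stands for the SeaTunnel version in the file name (2.3.2 in this article).
    matches = sorted(Path(lib_dir).glob("seatunnel-hadoop3-3.1.4-uber-*-optional.jar"))
    return matches[-1] if matches else None
```

For example, find_hadoop_uber_jar(os.path.join(os.environ["SEATUNNEL_HOME"], "lib")) returns None if the copy step was missed.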

      Run it again:

      ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
      

      Output:

      2023-11-08 17:55:32,575 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
      2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@774572734674894849) notify finished!
      2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@7e4b2ce6
      2023-11-08 17:55:32,627 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
      2023-11-08 17:55:32,628 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_774572734674894849_1 state from null to FINISHED
      2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}
      2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} complete with state FINISHED
      2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}, state FINISHED
      2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] turn to end state FINISHED.
      2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] end with state FINISHED
      2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
      2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} complete with state FINISHED
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} complete with state FINISHED
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}, state FINISHED
      2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}, state FINISHED
      2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] turn to end state FINISHED.
      2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] turn to end state FINISHED.
      2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] end with state FINISHED
      2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] end with state FINISHED
      2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] end with state FINISHED
      2023-11-08 17:55:33,506 INFO  org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] resource
      2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
      2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
      2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=3, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
      2023-11-08 17:55:33,510 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] turn to end state FINISHED.
      2023-11-08 17:55:33,511 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774572734674894849) end with state FINISHED
      2023-11-08 17:55:33,523 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774572734674894849) end with state FINISHED
      2023-11-08 17:55:33,546 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
      ***********************************************
                 Job Statistic Information
      ***********************************************
      Start Time                : 2023-11-08 17:55:30
      End Time                  : 2023-11-08 17:55:33
      Total Time(s)             :                   2
      Total Read Count          :                  32
      Total Write Count         :                  32
      Total Failed Count        :                   0
      ***********************************************
      2023-11-08 17:55:33,546 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
      2023-11-08 17:55:33,548 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-610439] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
      2023-11-08 17:55:33,548 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-610439] [5.1] Removed connection to endpoint: [localhost]:5801:6f7f4921-7d71-42af-b26a-6f11a7960118, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33602->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 17:55:33.543, lastWriteTime=2023-11-08 17:55:33.523, closedTime=2023-11-08 17:55:33.547, connected server version=5.1}
      2023-11-08 17:55:33,548 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
      2023-11-08 17:55:33,549 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-610439] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=38fbe3e1-876c-48e9-b145-1989888393ab, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699437330660, latest clientAttributes=lastStatisticsCollectionTime=1699437330713,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699437330653,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=15730012160,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5290000000,os.systemLoadAverage=0.86,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994659352,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2424,runtime.usedMemory=34517992, labels=[]}
      2023-11-08 17:55:33,550 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
      2023-11-08 17:55:33,550 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
      2023-11-08 17:55:33,551 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTTING_DOWN
      2023-11-08 17:55:33,553 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-610439] [5.1] Shutdown request of Member [localhost]:5801 - 6f7f4921-7d71-42af-b26a-6f11a7960118 this is handled
      2023-11-08 17:55:33,557 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down connection manager...
      2023-11-08 17:55:33,558 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down node engine...
      2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-610439] [5.1] Destroying node NodeExtension.
      2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Hazelcast Shutdown is completed in 2427 ms.
      2023-11-08 17:55:35,980 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTDOWN
      2023-11-08 17:55:35,980 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
      2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
      2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
      

      If the log output looks like the above, the setup works.
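Rather than eyeballing the log, you can check the "Job Statistic Information" block programmatically, for example in a scheduler wrapper. The Python sketch below assumes the block keeps the "label : value" layout shown above; parse_job_statistics and job_succeeded are hypothetical names. Requiring read == write is only a sensible success test for jobs like these, which have no filtering transform:

```python
import re
from typing import Dict

_STAT_LINE = re.compile(r"^\s*(Total (?:Read|Write|Failed) Count)\s*:\s*(\d+)\s*$")


def parse_job_statistics(log: str) -> Dict[str, int]:
    """Extract the Total Read/Write/Failed counts from SeaTunnel client output."""
    stats: Dict[str, int] = {}
    for line in log.splitlines():
        match = _STAT_LINE.match(line)
        if match:
            stats[match.group(1)] = int(match.group(2))
    return stats


def job_succeeded(log: str) -> bool:
    """True if the statistics block exists, nothing failed, and reads equal writes."""
    stats = parse_job_statistics(log)
    keys = ("Total Read Count", "Total Write Count", "Total Failed Count")
    if not all(key in stats for key in keys):
        return False  # no statistics block: the job most likely did not finish
    return stats["Total Failed Count"] == 0 and stats["Total Read Count"] == stats["Total Write Count"]
```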

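      SeaTunnel example: mysql-to-mysql

      Next, let's test a configuration that uses SeaTunnel to synchronize data from MySQL to MySQL.

      First, SeaTunnel needs the JDBC connector. Either add connector-jdbc to config/plugin_config and install it with install-plugin.sh, or put the connector-jdbc jar directly into $SEATUNNEL_HOME/connectors/seatunnel. The MySQL JDBC driver jar itself must also be available; for SeaTunnel Engine, the Jdbc connector documentation says to place it in $SEATUNNEL_HOME/lib.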

      Create the MySQL databases and tables

      create database test_01;
      create table test_01.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);
      create database test_02;
      create table test_02.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);
      

      Insert data

      insert into test_01.user (username) values ('zhangsan');
      insert into test_01.user (username) values ('lisi');
      

      Create the sync configuration file (saved as config/mysql-to-mysql.conf, the path used by the run command)

      env {
        job.mode = "BATCH"
      }
      # Source database
      source {
        jdbc {
          url = "jdbc:mysql://172.1.1.54:3306/test_01"
          driver = "com.mysql.cj.jdbc.Driver"
          user = "admin"
          password = "xxxxxxx"
          # The source reads through a query; generate_sink_sql/database/table are sink-side options
          query = "select * from test_01.user"
        }
      }
      transform {
      }
      # Target database
      sink {
        jdbc {
          url = "jdbc:mysql://172.1.1.54:3306/test_02"
          driver = "com.mysql.cj.jdbc.Driver"
          user = "admin"
          password = "xxxxxx"
          # Generate the INSERT statement from the target database and table
          generate_sink_sql = true
          database = "test_02"
          table = "user"
        }
      }
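The sink block above relies on generate_sink_sql to build the INSERT statement; the generated statement is visible in the Spark log further down. As I understand the Jdbc sink documentation, that mode needs database and table, and without it you must supply your own query. Below is a rough Python pre-flight check of that rule over a plain dict of options. jdbc_sink_problems is hypothetical, and the rule is a simplification to verify against the Jdbc sink docs for your version:

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("url", "driver")


def jdbc_sink_problems(options: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with a Jdbc sink option dict (empty list = looks OK)."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not options.get(key)]
    if options.get("generate_sink_sql") is True:
        # Generated INSERTs need to know which database and table to target.
        problems += [f"generate_sink_sql needs {key}" for key in ("database", "table") if not options.get(key)]
    elif not options.get("query"):
        problems.append("set either query or generate_sink_sql = true")
    return problems
```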
      

      Run:

      ./bin/seatunnel.sh -e LOCAL -c ./config/mysql-to-mysql.conf
      

      The following output means the sync succeeded:

      2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
      2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
      2023-11-08 20:55:00,470 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774617897816293377), Pipeline: [(1/1)] turn to end state FINISHED.
      2023-11-08 20:55:00,471 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774617897816293377) end with state FINISHED
      2023-11-08 20:55:00,483 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774617897816293377) end with state FINISHED
      2023-11-08 20:55:00,511 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
      ***********************************************
                 Job Statistic Information
      ***********************************************
      Start Time                : 2023-11-08 20:54:58
      End Time                  : 2023-11-08 20:55:00
      Total Time(s)             :                   1
      Total Read Count          :                   2
      Total Write Count         :                   2
      Total Failed Count        :                   0
      ***********************************************
      2023-11-08 20:55:00,511 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
      2023-11-08 20:55:00,514 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-250845] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
      2023-11-08 20:55:00,514 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-250845] [5.1] Removed connection to endpoint: [localhost]:5801:1373b501-f19f-47a2-9ef2-66d14e5a31c0, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33254->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 20:55:00.508, lastWriteTime=2023-11-08 20:55:00.483, closedTime=2023-11-08 20:55:00.512, connected server version=5.1}
      2023-11-08 20:55:00,515 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
      2023-11-08 20:55:00,516 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-250845] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699448098390, latest clientAttributes=lastStatisticsCollectionTime=1699448098443,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699448098383,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=12655243264,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5340000000,os.systemLoadAverage=0.24,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994543624,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2526,runtime.usedMemory=34633720, labels=[]}
      2023-11-08 20:55:00,516 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
      2023-11-08 20:55:00,516 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
      2023-11-08 20:55:00,517 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTTING_DOWN
      2023-11-08 20:55:00,520 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-250845] [5.1] Shutdown request of Member [localhost]:5801 - 1373b501-f19f-47a2-9ef2-66d14e5a31c0 this is handled
      2023-11-08 20:55:00,523 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down connection manager...
      2023-11-08 20:55:00,525 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down node engine...
      2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-250845] [5.1] Destroying node NodeExtension.
      2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Hazelcast Shutdown is completed in 2858 ms.
      2023-11-08 20:55:03,378 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTDOWN
      2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
      2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
      2023-11-08 20:55:03,379 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
      

      Check the sync result

      MySQL [(none)]>
      MySQL [(none)]> select * from test_02.user;
      +--------+----------+
      | userid | username |
      +--------+----------+
      |      1 | zhangsan |
      |      2 | lisi     |
      +--------+----------+
      2 rows in set (0.00 sec)
      

      Integrating SeaTunnel with the Flink and Spark engines

      Edit config/seatunnel-env.sh:

      Set FLINK_HOME to the Flink installation directory.

      Set SPARK_HOME to the Spark installation directory.

      FLINK_HOME=/data/flink-1.14.5
      SPARK_HOME=/data/spark-2.4.6
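In a shell file such as seatunnel-env.sh, an assignment must be written KEY=value with no spaces around =. A line like FLINK_HOME = /data/flink-1.14.5 is instead run as a command named FLINK_HOME. If this edit is scripted, the Python sketch below rewrites (or appends) such assignments in the file's text. set_env_vars is a hypothetical helper, and it handles only simple one-line assignments:

```python
import re
import shlex
from typing import Dict

# Matches an optional 'export', a shell variable name, and '=' (tolerating stray spaces).
_ASSIGN = re.compile(r"^(\s*(?:export\s+)?)([A-Za-z_][A-Za-z0-9_]*)\s*=")


def set_env_vars(script: str, values: Dict[str, str]) -> str:
    """Return script with KEY=value set for each entry in values.

    The first existing assignment of each key is rewritten in place (keeping any
    'export' prefix); keys not found are appended. Values are shell-quoted.
    """
    pending = dict(values)
    out = []
    for line in script.splitlines():
        match = _ASSIGN.match(line)
        if match and match.group(2) in pending:
            prefix, key = match.group(1), match.group(2)
            out.append(f"{prefix}{key}={shlex.quote(pending.pop(key))}")
        else:
            out.append(line)
    out.extend(f"{key}={shlex.quote(value)}" for key, value in pending.items())
    return "\n".join(out) + "\n"
```

Reading config/seatunnel-env.sh, passing its text through set_env_vars with the two paths above, and writing it back yields the corrected lines.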
      

      Start the Flink cluster:

      $FLINK_HOME/bin/start-cluster.sh
      

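      Run the job, again using mysql-to-mysql.conf as the example.

      Note: when syncing MySQL, the MySQL JDBC driver jar must be placed in Flink's lib directory. This works like any other Flink data sync job: all dependency jars have to be made available to Flink.

      Flink here is 1.14.5, so the starter script to use is start-seatunnel-flink-13-connector-v2.sh (the flink-15 script is for Flink 1.15.x/1.16.x). The seatunnel-flink-13-starter.jar in the output below matches this.

      ./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/mysql-to-mysql.conf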
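Which starter script to use depends on the Flink version. As I understand the 2.3.x documentation, start-seatunnel-flink-13-connector-v2.sh covers Flink 1.12 to 1.14, and start-seatunnel-flink-15-connector-v2.sh covers 1.15 to 1.16. The Python sketch below encodes that lookup. The version ranges are assumptions to re-check for your SeaTunnel release, and flink_starter_script is hypothetical:

```python
def flink_starter_script(flink_version: str) -> str:
    """Pick the SeaTunnel Flink starter script for a Flink version such as '1.14.5'."""
    major, minor = (int(part) for part in flink_version.split(".")[:2])
    if major == 1 and 12 <= minor <= 14:
        return "bin/start-seatunnel-flink-13-connector-v2.sh"
    if major == 1 and 15 <= minor <= 16:
        return "bin/start-seatunnel-flink-15-connector-v2.sh"
    # Outside the ranges assumed in this sketch: consult the docs for your release.
    raise ValueError(f"no starter script known for Flink {flink_version}")
```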
      

      Output:

      Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /mnt/apache-seatunnel-2.3.3/starter/seatunnel-flink-13-starter.jar --config ./config/mysql-to-mysql.conf --name SeaTunnel
      SLF4J: Class path contains multiple SLF4J bindings.
      SLF4J: Found binding in [jar:file:/mnt/kmr/flink1/1/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/mnt/kmr/hadoop/1/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
      SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
      Job has been submitted with JobID a643a1990705f817479b1d2880c9f038
      Program execution finished
      Job with JobID a643a1990705f817479b1d2880c9f038 has finished.
      Job Runtime: 2356 ms
      

      Truncate test_02.user and import the data again with Spark:

      MySQL [(none)]> truncate table test_02.user;
      Query OK, 0 rows affected (0.01 sec)
      

      Run the import:

      ./bin/start-seatunnel-spark-2-connector-v2.sh \
      --master local[2] \
      --deploy-mode client \
      --config ./config/mysql-to-mysql.conf
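When another program (a scheduler, a CI job) drives the Spark submission, passing the arguments as a list avoids shell quoting problems; local[2], for instance, contains brackets that a shell may try to glob. The Python sketch below assembles the command above. It assumes the start-seatunnel-spark-2-connector-v2.sh / start-seatunnel-spark-3-connector-v2.sh naming for Spark 2.x and 3.x; spark_submit_argv and as_shell_command are hypothetical helpers:

```python
import shlex
from typing import List


def spark_submit_argv(config: str, master: str = "local[2]",
                      deploy_mode: str = "client", spark_major: int = 2) -> List[str]:
    """Build the argument list for SeaTunnel's Spark starter script."""
    if spark_major not in (2, 3):
        raise ValueError("this sketch only knows the Spark 2 and Spark 3 starters")
    if deploy_mode not in ("client", "cluster"):
        raise ValueError(f"unexpected deploy mode {deploy_mode!r}")
    script = f"./bin/start-seatunnel-spark-{spark_major}-connector-v2.sh"
    return [script, "--master", master, "--deploy-mode", deploy_mode, "--config", config]


def as_shell_command(argv: List[str]) -> str:
    """Render argv as a copy-pasteable, correctly quoted shell command (for logging)."""
    return shlex.join(argv)
```

The list can be passed straight to subprocess.run(argv, cwd=<SEATUNNEL_HOME>, check=True) without going through a shell.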
      

      Output:

      23/11/08 21:19:57 INFO executor.FieldNamedPreparedStatement: PrepareStatement sql is:
      INSERT INTO `test_02`.`user` (`userid`, `username`) VALUES (?, ?)
      23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0, stage 0.0)
      23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Committed partition 0 (task 0, attempt 0, stage 0.0)
      23/11/08 21:19:57 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1148 bytes result sent to driver
      23/11/08 21:19:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1486 ms on localhost (executor driver) (1/1)
      23/11/08 21:19:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
      23/11/08 21:19:57 INFO scheduler.DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:117) finished in 1.568 s
      23/11/08 21:19:57 INFO scheduler.DAGScheduler: Job 0 finished: save at SinkExecuteProcessor.java:117, took 1.610054 s
      23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e is committing.
      23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e committed.
      23/11/08 21:19:57 INFO execution.SparkExecution: Spark Execution started
      23/11/08 21:19:57 INFO spark.SparkContext: Invoking stop() from shutdown hook
      23/11/08 21:19:57 INFO server.AbstractConnector: Stopped Spark@6e8a9c30{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
      23/11/08 21:19:57 INFO ui.SparkUI: Stopped Spark web UI at http://kmr-b55b8d33-gn-0a6e9139-az1-master-1-2.ksc.com:4040
      23/11/08 21:19:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      23/11/08 21:19:57 INFO memory.MemoryStore: MemoryStore cleared
      23/11/08 21:19:57 INFO storage.BlockManager: BlockManager stopped
      23/11/08 21:19:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
      23/11/08 21:19:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
      23/11/08 21:19:57 INFO spark.SparkContext: Successfully stopped SparkContext
      23/11/08 21:19:57 INFO util.ShutdownHookManager: Shutdown hook called
      23/11/08 21:19:57 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6dfc2825-0da5-4e4c-9623-bddcccab0849
      

      Query the table; the data has been imported:

      MySQL [(none)]> select * from test_02.user;
      +--------+----------+
      | userid | username |
      +--------+----------+
      |      1 | zhangsan |
      |      2 | lisi     |
      +--------+----------+