1. 实时同步任务的创建

实时同步一个 Group 只能配置一个数据流, manager 配置数据流审批之后,直接提交任务到 flink 集群。

1.1 实时同步任务创建

1.1.1 配置基本信息

填写 groupId,责任人,并且选择是否是整库迁移,然后保存 http://ip/inlong/manager/api/group/save POST,body

 {
 "inlongGroupId": "data_sync_test",
  "inCharges": "admin",
  "sinkMultipleEnable": false,
  "mqType": "NONE",
  "inlongGroupMode": 1
}
// org.apache.inlong.manager.web.controller.InlongGroupController
1.1.2 配置数据流

创建一个和 groupId 相同的 streamId,保存 stream 信息 http://ip/inlong/manager/api/stream/save POST, body

{
 "inlongGroupId": "data_sync_test",
  "inlongStreamId": "data_sync_test",
 "sinkMultipleEnable": false
}
// org.apache.inlong.manager.web.controller.InlongStreamController

配置 source 基本信息, http://ip/inlong/manager/api/source/save POST

{
 "inlongGroupId": "data_sync_test",
  "inlongStreamId": "data_sync_test",
 "sourceType": "PULSAR",
  "sourceName": "ds_test",
 "pulsarTenant": "ds_tenant",
  "namespace": "ds_namespace",
  "adminUrl": "http://pulsar:8080",
  "serviceUrl": "pulsar://pulsar:6650",
  "topic": "ds_topic",
  "serializationType": "CSV",
  "dataEncoding": "UTF-8",
  "wrapType": "RAW"
}
// org.apache.inlong.manager.web.controller.StreamSourceController

配置 source 字段 http://ip/inlong/manager/api/stream/update POST

{
  "fieldList": [
    {
     "fieldName": "id",
      "fieldType": "int",
     "fieldComment": "id"
    }
 ],
  "inlongGroupId": "data_sync_test",
  "inlongStreamId": "data_sync_test",
  "version": 3
}

配置 sink,http://ip/inlong/manager/api/sink/save POST

{
  "inlongGroupId": "data_sync_test",
  "inlongStreamId": "data_sync_test",
  "sinkType": "MYSQL",
  "sinkName": "ds_sink",
 "databaseName": "ds_test_db",
  "tableName": "ds_test_table",
 "primaryKey": "id",
  "dataNodeName": "5c98718d-a51b-41c2-9122-a54b43fb3fe4",
 "sinkMultipleFormat": "canal",
  "sinkMultipleEnable": true,
  "enableCreateResource": 0,
  "properties": {}
}

配置 sink 字段 http://ip/inlong/manager/api/sink/update POST

{
  "inlongGroupId": "data_sync_test",
  "inlongStreamId": "data_sync_test",
  "id": 91,
  "sinkType": "MYSQL",
  "version": 1,
  "sinkFieldList": [
   {
      "sourceFieldName": "id",
     "sourceFieldType": "int",
      "fieldName": "id",
     "fieldType": "tinyint"
    }
  ]
}
1.1.3 提交任务

配置完成之后,将同步任务提交审批, http://ip/inlong/manager/api/group/startProcess/data_sync_test POST, manager 开始处理任务(一个任务对应于一个 InLong Group)信息。

首先,将 group 的状态置为 TO_BE_APPROVAL;

提交审批对应于一个 ApplyGroupProcess 的表单,然后创建一个 ApplyGroupProcess 对应的 WorkflowDefinition。

  • ApplyGroupProcess
    • GroupInfo
    • Stream list
  • WorkflowDefinition
    • Element 处理链:startEvent -> UserTask -> endEvent
      • startEvent:
      • UserTask: 对应于管理员的审批,当 Group 审批通过时回调处理 UserTask
        • formCLass:InLongGroupApproveForm,当审批通过时,会有 InLongGroupApproveForm 的表单生成
        • approvers:审批人列表
        • nameToListenerMap: <ListenerName, Listener>
          • afterApprovedTaskListener : AfterApprovedTaskListener
        • listerners: <ProcessEvent., Listener>
          • TaskEvent.APPROVE: AfterApprovedTaskListener
      • EndEvetn:
    • nameToListenerMap: <ListenerName, Listener>
      • cancelApplyProcessListener : CancelApplyProcessListener
      • rejectApplyProcessListener : RejectApplyProcessListener
      • approveApplyProcessListener : ApproveApplyProcessListener
    • listerners: <ProcessEvent., Listener>
      • ProcessEvent.CANCEL: CancelApplyProcessListener
      • ProcessEvent.REJECT: RejectApplyProcessListener
      • ProcessEvent.COMPLETE: ApproveApplyProcessListener

对 workFlowProcess 进行处理,处理通过 ELementProcessor 来完成,ElementProcessor 的实现逻辑和 Element 实现对应,每个 ElementProcessor watch 对应的 Element 类型。
处理 StartEvent,通过 StartEventProcessor 完成:

  • 保存 workfowProcess 信息到 DB,WorkflowProcessEntity:name(APPLY_GROUP_PROCESS)、displayName(Apply Group)、type(Apply Group)、groupId: 、Status:(PROCESSING)、fromData(ApplyGroupProcessForm form 的 json 格式)
  • WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START

处理 UserTask,WorkflowContext 设置当前 Element 为 UserTask,通过 UserTaskProcessor.create 继续执行后续逻辑:

  • 获取审批人信息,aprovers,未审批人生成 WorkflowTaskEntity
  • 生成 workflowTask 信息并保存到 DB,WorkflowTaskEntity:type(UserTask.class.getSimpleName)、processId(WorkflowProcessEntity.getId)、processName(APPLY_GROUP_PROCESS)、displayName(Apply Group)、name( ut_admin)、displayName(SystemAdmin)、approvers(approves)、formclass(InlongGroupApproveForm.class)、status(PENDING)
  • 更新 Workflowcontext 的 actionContext: task 设置为 UserTask、action 设置为 COMPLETE、taskEntity 设置为 WorkflowTaskEntity

提交任务审批的流程到此结束,后续的处理依赖于审批的审批情况,接下来看审批的流程。

1.2 实时同步任务审批http://ip/inlong/manager/api/workflow/approve/959 POST

{
  "remark": null,
  "form": {
    "groupApproveInfo": {
      "remark": null,
      "inlongGroupId": "data_sync_test",
      "inCharges": "admin",
      "sinkMultipleEnable": false,
      "description": "",
      "mqType": "NONE"
    },
    "formName": "InlongGroupApproveForm"
  }
}
// o.a.i.manager.web.controller.WorkflowController

先构造 WorflowContext:

  • ApplyGroupProcess

    • GroupInfo
    • Stream list
  • WorkflowDefinition

    • Element 处理链:startEvent -> UserTask -> endEvent
      • startEvent:
      • UserTask: 对应于管理员的审批,当 Group 审批通过时回调处理 UserTask
        • formCLass:InLongGroupApproveForm,当审批通过时,会有 InLongGroupApproveForm 的表单生成
        • approvers:审批人列表
        • nameToListenerMap: <ListenerName, Listener>
          • afterApprovedTaskListener : AfterApprovedTaskListener
        • listerners: <ProcessEvent., Listener>
          • TaskEvent.APPROVE: AfterApprovedTaskListener
      • EndEvetn:
    • nameToListenerMap: <ListenerName, Listener>
      • cancelApplyProcessListener : CancelApplyProcessListener
      • rejectApplyProcessListener : RejectApplyProcessListener
      • approveApplyProcessListener : ApproveApplyProcessListener
    • listerners: <ProcessEvent., Listener>
      • ProcessEvent.CANCEL: CancelApplyProcessListener
      • ProcessEvent.REJECT: RejectApplyProcessListener
      • ProcessEvent.COMPLETE: ApproveApplyProcessListener
  • processEntity:workflowProcessEntity

  • task:UserTask

  • ActionContext

    • action:approve
    • taskEntity:WorkflowTaskEntity
    • Task: userTask
    • taskForm:InlongGroupApproveForm
    • transfertoUsers:null
    • remark: null

WorflowContext 构建完成之后,开始处理 UserTask,修改 workflowContext 的 currentElement 为 UserTask,然后使用 UserTaskProcessor 处理

  • 检查 operator 支持的 action 是否匹配
  • 检查 taskEntity 状态是否为 pending
  • 检查审批人
  • 更新 WorkflowTaskEntity,并更新到 DB: status(TaskStatus.APPROVED)、operator、remark(null)、taskForm(InlongGroupApproveForm,包含 approveRequest 和 streamList 信息)
  • 回调 UserTaskProcessor 的 listener 来处理 (AfterApprovedTaskListener.listen)
    • 修改 Group状态为 APPROVE_PASSED
    • 将 Group 中 每个 stream 状态修改为 CONFIG_ING
    • 将 stream 中每个 sink 状态 CONFIG_ING

至此,UserTask(主要处理管理员审批)处理完毕;

接下来处理 WorkFlowProcess 中的最后一个 Element, endEvent

  • 修改 WorkflowProcessEntity 状态为 ProcessStatus.COMPLETED 并保存到 DB
  • 至此 Group 的 操作基本都已经完成,开始处理 stream,回调 workflowProcess 的 COMPLETE listener,即 ApproveApplyProcessListener 处理,生成新的表单 GroupResourceProcessForm(CREATE_GROUP_RESOURCE),然后开始后续的处理,GroupResourceProcessForm 包含:
    • InlongGroupInfo
    • StreamInfo list

接下来开始处理 GroupResourceProcessForm 的表单。

1.3 GroupResourceProcessForm 处理

同样地,GroupResourceProcessForm 表单的处理也需要先生成 WorkFlowProgress,然后和 ProcessForm 一起构造出 WorkflowContext:

  • GroupResourceProcessForm
    • InlongGroupInfo
    • StreamInfo list
    • INIT
  • workflowProcess:由 CreateGroupWorkflowDefinition 定义
    • Element 处理链:startEvent -> ServiceTask(Init MQ) -> ServiceTask(Init sort) -> EndEvent
      • startEvent
      • ServiceTask(Init MQ)
        • name(InitMQ)、displatyName(Group-InitMQ)
        • serviceTaskType: ServiceTaskType.INIT_MQ
        • listenerFactor: GroupTaskListenerFactory
      • ServiceTask(Init Sort):
        • name(InitSort)、displatyName(Group-InitSort)
        • serviceTaskType: ServiceTaskType.INIT_SORT
        • listenerFactor: GroupTaskListenerFactory
      • EndEvent
    • nameToListenerMap: <ListenerName, Listener>
      • InitGroupListener: InitGroupListener
      • initGroupCompleteListener: InitGroupCompleteListener
      • initGroupFailedListener : InitGroupFailedListener
    • listeners: <ProcessEvent., Listener>
      • ProcessEvent.CREATE: InitGroupListener
      • ProcessEvent.COMPLETE: InitGroupCompleteListener
      • ProcessEvent.FAIL: InitGroupFailedListener

处理 StartEvent(StartEventProcessor.create),WorkflowContext 设置 currentElement 为 startEvent,

  • 构建 workfowProcess 信息并保存到 DB,WorkflowProcessEntity:name(CREATE_GROUP_RESOURCE)、displayName(Create Group)、type(Create Group)、groupId、Status(PROCESSING)、fromData(GroupResourceProcessForm form 的 json 格式)

WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START

  • 回调 InitGroupListener(CREATE),将 Group 状态设置为 CONFIG_ING

处理 InitMQTask

  • WorkflowContext 设置 currentElement 为 initMQTask

  • 构建 WorkflowTaskEntity(processId+serviceName) 并且保存到 DB:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_GROUP_RESOURCE)、processDisplayName(Create Group)、name( InitMQ)、dispalyName(Group-InitMQ)、Approvers、status: PENDING)

  • 修改 workflowContext 的 actionContext:Task(InitMQTask)、Action( Complete)、taskEntity ( WorkflowTaskEntity)

  • 为 InitMQTask 初始化 listener:

    • nameToListenerMap:<ListenerName, Listener>
      • queueResourceListener:QueueResourceListener
    • listeners:<ProcessEvent., Listener>
      • TaskEvent.COMPLETE :QueueResourceListener
  • 回调 listener,TaskEvent.COMPLETE 事件,即回调 QueueResourceListener

    • 如果是数据同步类型的 stream,为每个 Stream 创建 StreamResourceProcessForm 表单
      • groupInfo
      • streamInfo list
      • INIT
    • 然后处理 StreamResourceProcessForm,StreamResourceProcessForm 的处理参考 1.4 章节。

处理 InitSortTask, WorkflowContext 设置 currentElement 为 InitSortTask

  • WorkflowContext 设置 currentElement 为 InitSortTask

  • 构建 WorkflowTaskEntity 并且保存: type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_GROUP_RESOURCE)、processDisplayName(Create Group)、name( InitSort)、dispalyName(Group-InitSort)、Approvers、status: PENDING)

  • 修改 workflowContext 的 actionContext:Task(InitSortTask)、Action( Complete)、taskEntity ( WorkflowTaskEntity)

  • 配置 listener,GroupTaskListenerFactory.get(workflowcontext, Init_Sort),

    • nameToListenerMap:<ListenerName, Listener>
      • sortConfigListener:SortConfigListener
      • startupSortListener: StartupSortListener
    • listeners: <ProcessEvent., Listener>
      • TaskEvent.COMPLETE: SortConfigListener
      • TaskEvent.COMPLETE : StartupSortListener
  • 回调 listener,TaskEvent.COMPLETE 事件,即回调 QueueResourceListener

    • SortConfigListener:
      • 将 Group 状态设置为 CONFIG_ING
      • 为每个 stream 创建 dataflow(GroupInfo)
        • 获取 StreamSOurce 信息,<InLongStreamId, List>
        • 获取 transformer 信息:<InLongStreamId, List>
        • 获取 source 和 transform 中的 field 信息<nodeId-fieldName, StreamField>
        • 将 InlongStreamInfo 中的属性拷贝到每个 StreamSink 中,并且添加 audit 信息
        • 使用 inlongStreamInfo 中的 fileldList 更新 streamSource 的 filedList
        • 创建 relation 信息,如果有 transform ,则有两个 relation,source:transform 和 transform:sink; 如果没有 transform,则只有一个 relation,source:sink
        • 为 source 增加 audit 信息
        • 为 source、transform、sink 创建对应的 E-T-L nodes
        • 将 streamId/nodes 和 relations 封装为 StreamInfo
        • 将 groupId 和 streamInfo 封装成为 GroupInfo,表示一个 dataflow
      • 将 dataflow 信息添加到 InlongStreamInfo 的 ext 字段中(InlongStreamExtInfo)
        • gropuId、streamId
        • InlongConstants.DATAFLOW:dataflow
    • StartupSortListener: 为每个 stream 启动 Flink 任务

1.4 StreamResourceProcessForm 处理

StreamResourceProcessForm 的处理也是先创建 WorkflowContext:

  • StreamResourceProcessForm

    • groupInfo
    • streamInfo list
    • INIT
  • WorkflowProcss (CREATE_STREAM_RESOURCE),由 CreateStreamWorkflowDefinition 定义:

    • name(CREATE_STREAM_RESOURCE)、 type( Create Stream)、displayName(Create Stream)、formClass(StreamResourceProcessForm)
    • Element 处理链:startEvent -> ServiceTask(Init MQ) -> ServiceTask(Init Sink)->ServiceTask(Init sort) -> ServiceTask(Init Source)-> EndEvent
      • initMQTask:ServiceTask
        • name (InitMQ)、displayName(Stream-InitMQ)
        • serviceTaskType: INIT_MQ
        • listernerFactor: StreamTaskListenerFactory
      • initSinkTask: ServiceTask
        • name ( InitSink)、displayName(Stream-InitSink)
        • serviceTaskType: INIT_SINK
        • listernerFactor: StreamTaskListenerFactory
      • initSortTask: ServiceTask
        • name :(InitSort)、displayName(Stream-InittSort)
        • serviceTaskType: INIT_SORT
        • listernerFactor: StreamTaskListenerFactory
      • initSourceTask: ServiceTask
        • name ( InitSource)、displayName(Stream-InitSource)
        • serviceTaskType: INIT_SOURCE
        • listernerFactor: StreamTaskListenerFactory
      • EndEvent
    • nameToListenerMap: <ListenerName, Listener>
      • initStreamListener: InitStreamListener
      • initStreamCompleteListener: InitStreamCompleteListener
      • initStreamFailedListener : initStreamFailedListener
    • listeners: <ProcessEvent., Listener>
      • ProcessEvent.CREATE: InitStreamListener
      • ProcessEvent.COMPLETE: InitStreamCompleteListener
      • ProcessEvent.FAIL: initStreamFailedListener

处理 StartEvent(StartEventProcessor.create),WorkflowContext 设置 currentElement 为 startEvent,

  • 构建 workfowProcess 信息并保存到 DB,WorkflowProcessEntity:ame(CREATE_STREAM_RESOURCE)、displayName(Create Stream)、type(Create Stream)、groupId、Status(PROCESSING)、fromData(StreamResourceProcessForm 的 json 格式)

  • WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START,回调 InitStreamListener

  • 更新 stream 状态为 CONFIG_ING

处理 initMQTask,WorkflowContext 设置 currentElement 为 initMQTask

  • 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitMQ)、dispalyName(Stream-InitMQ)、status(PENDING)
  • 修改 workflowContext 的 actionContext:Task(InitMQTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
  • 配置 listener,只有 standmode才需要,直接跳过

处理 initSinkTask,WorkflowContext 设置 currentElement 为 initSinkTask

  • 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName:(Create Stream)、name(InitSink)、dispalyName(Stream-InitSink)、status(PENDING)
  • 修改 workflowContext 的 actionContext::Task(InitSinkTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
  • 配置 listener,StreamTaskListenerFactory(workflowcontext, Init_Sink), StreamSinkResourceListener, listerner 监听 COMPLETE event
  • 回调 StreamSinkResourceListener,根据 ENABLE_CREATE_RESOURCE 的配置来决定是否创建 sink 资源(比如 mysql 创建 sink table)

处理 initSortTask,WorkflowContext 设置 currentElement 为 initSortTask

  • 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitSort)、dispalyName(Stream-InitSort)、status(PENDING)
  • 修改 workflowContext 的 actionContext:Task(InitSortTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
  • 配置 listener, StreamSortConfigListener,监听 TaskEvent.COMPLETE
  • 回调 StreamSortConfigListener,isStream = true, 直接返回

处理 initSourceTask,WorkflowContext 设置 currentElement 为 initSourceTask

  • 构建 WorkflowTaskEntity 并且保存: type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitSource)、dispalyName(Stream-InitSource)、status(PENDING)
  • 修改 workflowContext 的 actionContext:Task(InitSourceTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
  • 配置 listener,StreamTaskListenerFactory(workflowcontext, Init_Source), listerer 为空

处理 endEvent

  • 修改 WorkflowProcessEntity 状态为 Completed
  • 回调 InitStreamCompleteListener
    • 更新 stream 状态为 CONFIG_SUCCESSFUL
    • 如果是数据同步,更新 source 状态为 SOURCE_NORMAL

2. 实时同步任务启动

除了上述的状态处理之外,还有 plugin 的处理,InLong 针对不同的环境(开源、云、内网)使用不同的 plugin 来执行不同的逻辑。

以开源为例,审批通过之后,会回调 StartupSortListener 处理。

首先会判断是否是 GroupResourceProcessForm,如果是,则继续执行,否则抛出异常。

校验 sink 是否为 0,为 0 则抛出异常,否则继续;

然后遍历所有的 Stream,为每个 Stream 创建 flink job。

2.1 构建 FlinkInfo

  • 生成 flink job name:InLong-Sort--
  • 获取 sort.url,设置到 endpoint 中
  • 设置 streamInfo

2.2 构建 FlinkService

从配置文件 $INLONG_HOME/inlong-manager/plugins/flink-sort-plugin.properties 中读取 flink 相关信息:

# Flink 版本 , 支持 [1.13|1.15]
flink.version=1.15
# Flink Flink REST 服务器 ip 地址
flink.rest.address=127.0.0.1
# Flink Flink REST 服务器端口
flink.rest.port=8081
# Flink jobmanager 端口
flink.jobmanager.port=6123
# Flink savepoint 目录
flink.savepoint.directory=file:///data/inlong-sort/savepoints
# Flink 并发度
flink.parallelism=1
# flink stop request drain
flink.drain=false

使用这些信息构建 flink 的 Configuration,使用 configuration 构建 FlinkClientService。

2.3 构建 FlinkOperation

使用 FlinkService 构建 FlinkOperation。

查找 inlong-sort 对应的 flink jar 目录,处理dataflow 中的每个节点,并且超找对应的 connector

创建 flink 任务本地保存目录。

构建一个 IntegrationTaskRunner,主要逻辑是通过 flink service 提交 flink 作业,构建 PackagedProgram,然后结合 configuration 生成 jobGraph,PackagedProgram 中包含:

  • flink configuration
  • sort 入口类: org.apache.inlong.sort.Entrance
  • Sort jar 包
  • connector jar 目录
  • argument:jobName、localConfPath、checkpintInterval、auditProxy 地址

生成 jobGraph 之后,为 jobGraph 增加 connectorJars 配置。

然后获取 flink 的 RestClusterClient,提交jobGraph。