1. 实时同步任务的创建
实时同步一个 Group 只能配置一个数据流, manager 配置数据流审批之后,直接提交任务到 flink 集群。
1.1 实时同步任务创建
1.1.1 配置基本信息
填写 groupId,责任人,并且选择是否是整库迁移,然后保存 http://ip/inlong/manager/api/group/save
POST,body
{
"inlongGroupId": "data_sync_test",
"inCharges": "admin",
"sinkMultipleEnable": false,
"mqType": "NONE",
"inlongGroupMode": 1
}
// org.apache.inlong.manager.web.controller.InlongGroupController
1.1.2 配置数据流
创建一个和 groupId 相同的 streamId,保存 stream 信息 http://ip/inlong/manager/api/stream/save
POST, body
{
"inlongGroupId": "data_sync_test",
"inlongStreamId": "data_sync_test",
"sinkMultipleEnable": false
}
// org.apache.inlong.manager.web.controller.InlongStreamController
配置 source 基本信息, http://ip/inlong/manager/api/source/save
POST
{
"inlongGroupId": "data_sync_test",
"inlongStreamId": "data_sync_test",
"sourceType": "PULSAR",
"sourceName": "ds_test",
"pulsarTenant": "ds_tenant",
"namespace": "ds_namespace",
"adminUrl": "http://pulsar:8080",
"serviceUrl": "pulsar://pulsar:6650",
"topic": "ds_topic",
"serializationType": "CSV",
"dataEncoding": "UTF-8",
"wrapType": "RAW"
}
// org.apache.inlong.manager.web.controller.StreamSourceController
配置 source 字段 http://ip/inlong/manager/api/stream/update
POST
{
"fieldList": [
{
"fieldName": "id",
"fieldType": "int",
"fieldComment": "id"
}
],
"inlongGroupId": "data_sync_test",
"inlongStreamId": "data_sync_test",
"version": 3
}
配置 sink,http://ip/inlong/manager/api/sink/save
POST
{
"inlongGroupId": "data_sync_test",
"inlongStreamId": "data_sync_test",
"sinkType": "MYSQL",
"sinkName": "ds_sink",
"databaseName": "ds_test_db",
"tableName": "ds_test_table",
"primaryKey": "id",
"dataNodeName": "5c98718d-a51b-41c2-9122-a54b43fb3fe4",
"sinkMultipleFormat": "canal",
"sinkMultipleEnable": true,
"enableCreateResource": 0,
"properties": {}
}
配置 sink 字段 http://ip/inlong/manager/api/sink/update
POST
{
"inlongGroupId": "data_sync_test",
"inlongStreamId": "data_sync_test",
"id": 91,
"sinkType": "MYSQL",
"version": 1,
"sinkFieldList": [
{
"sourceFieldName": "id",
"sourceFieldType": "int",
"fieldName": "id",
"fieldType": "tinyint"
}
]
}
1.1.3 提交任务
配置完成之后,将同步任务提交审批, http://ip/inlong/manager/api/group/startProcess/data_sync_test
POST, manager 开始处理任务(一个任务对应于一个 InLong Group)信息。
首先,将 group 的状态置为 TO_BE_APPROVAL;
提交审批对应于一个 ApplyGroupProcess 的表单,然后创建一个 ApplyGroupProcess 对应的 WorkflowDefinition。
- ApplyGroupProcess:
- GroupInfo
- Stream list
- WorkflowDefinition:
- Element 处理链:startEvent -> UserTask -> endEvent
- startEvent:
- UserTask: 对应于管理员的审批,当 Group 审批通过时回调处理 UserTask
- formCLass:InLongGroupApproveForm,当审批通过时,会有 InLongGroupApproveForm 的表单生成
- approvers:审批人列表
- nameToListenerMap: <ListenerName, Listener>
- afterApprovedTaskListener : AfterApprovedTaskListener
- listerners: <ProcessEvent., Listener>
- TaskEvent.APPROVE: AfterApprovedTaskListener
- EndEvetn:
- nameToListenerMap: <ListenerName, Listener>
- cancelApplyProcessListener : CancelApplyProcessListener
- rejectApplyProcessListener : RejectApplyProcessListener
- approveApplyProcessListener : ApproveApplyProcessListener
- listerners: <ProcessEvent., Listener>
- ProcessEvent.CANCEL: CancelApplyProcessListener
- ProcessEvent.REJECT: RejectApplyProcessListener
- ProcessEvent.COMPLETE: ApproveApplyProcessListener
- Element 处理链:startEvent -> UserTask -> endEvent
对 workFlowProcess 进行处理,处理通过 ELementProcessor 来完成,ElementProcessor 的实现逻辑和 Element 实现对应,每个 ElementProcessor watch 对应的 Element 类型。
处理 StartEvent,通过 StartEventProcessor 完成:
- 保存 workfowProcess 信息到 DB,WorkflowProcessEntity:name(APPLY_GROUP_PROCESS)、displayName(Apply Group)、type(Apply Group)、groupId: 、Status:(PROCESSING)、fromData(ApplyGroupProcessForm form 的 json 格式)
- WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START
处理 UserTask,WorkflowContext 设置当前 Element 为 UserTask,通过 UserTaskProcessor.create 继续执行后续逻辑:
- 获取审批人信息,aprovers,未审批人生成 WorkflowTaskEntity
- 生成 workflowTask 信息并保存到 DB,WorkflowTaskEntity:type(UserTask.class.getSimpleName)、processId(WorkflowProcessEntity.getId)、processName(APPLY_GROUP_PROCESS)、displayName(Apply Group)、name( ut_admin)、displayName(SystemAdmin)、approvers(approves)、formclass(InlongGroupApproveForm.class)、status(PENDING)
- 更新 Workflowcontext 的 actionContext: task 设置为 UserTask、action 设置为 COMPLETE、taskEntity 设置为 WorkflowTaskEntity
提交任务审批的流程到此结束,后续的处理依赖于审批的审批情况,接下来看审批的流程。
1.2 实时同步任务审批http://ip/inlong/manager/api/workflow/approve/959
POST
{
"remark": null,
"form": {
"groupApproveInfo": {
"remark": null,
"inlongGroupId": "data_sync_test",
"inCharges": "admin",
"sinkMultipleEnable": false,
"description": "",
"mqType": "NONE"
},
"formName": "InlongGroupApproveForm"
}
}
// o.a.i.manager.web.controller.WorkflowController
先构造 WorflowContext:
-
ApplyGroupProcess:
- GroupInfo
- Stream list
-
WorkflowDefinition:
- Element 处理链:startEvent -> UserTask -> endEvent
- startEvent:
- UserTask: 对应于管理员的审批,当 Group 审批通过时回调处理 UserTask
- formCLass:InLongGroupApproveForm,当审批通过时,会有 InLongGroupApproveForm 的表单生成
- approvers:审批人列表
- nameToListenerMap: <ListenerName, Listener>
- afterApprovedTaskListener : AfterApprovedTaskListener
- listerners: <ProcessEvent., Listener>
- TaskEvent.APPROVE: AfterApprovedTaskListener
- EndEvetn:
- nameToListenerMap: <ListenerName, Listener>
- cancelApplyProcessListener : CancelApplyProcessListener
- rejectApplyProcessListener : RejectApplyProcessListener
- approveApplyProcessListener : ApproveApplyProcessListener
- listerners: <ProcessEvent., Listener>
- ProcessEvent.CANCEL: CancelApplyProcessListener
- ProcessEvent.REJECT: RejectApplyProcessListener
- ProcessEvent.COMPLETE: ApproveApplyProcessListener
- Element 处理链:startEvent -> UserTask -> endEvent
-
processEntity:workflowProcessEntity
-
task:UserTask
-
ActionContext
- action:approve
- taskEntity:WorkflowTaskEntity
- Task: userTask
- taskForm:InlongGroupApproveForm
- transfertoUsers:null
- remark: null
WorflowContext 构建完成之后,开始处理 UserTask,修改 workflowContext 的 currentElement 为 UserTask,然后使用 UserTaskProcessor 处理
- 检查 operator 支持的 action 是否匹配
- 检查 taskEntity 状态是否为 pending
- 检查审批人
- 更新 WorkflowTaskEntity,并更新到 DB: status(TaskStatus.APPROVED)、operator、remark(null)、taskForm(InlongGroupApproveForm,包含 approveRequest 和 streamList 信息)
- 回调 UserTaskProcessor 的 listener 来处理 (AfterApprovedTaskListener.listen)
- 修改 Group状态为 APPROVE_PASSED
- 将 Group 中 每个 stream 状态修改为 CONFIG_ING
- 将 stream 中每个 sink 状态 CONFIG_ING
至此,UserTask(主要处理管理员审批)处理完毕;
接下来处理 WorkFlowProcess 中的最后一个 Element, endEvent
- 修改 WorkflowProcessEntity 状态为 ProcessStatus.COMPLETED 并保存到 DB
- 至此 Group 的 操作基本都已经完成,开始处理 stream,回调 workflowProcess 的 COMPLETE listener,即 ApproveApplyProcessListener 处理,生成新的表单 GroupResourceProcessForm(CREATE_GROUP_RESOURCE),然后开始后续的处理,GroupResourceProcessForm 包含:
- InlongGroupInfo
- StreamInfo list
接下来开始处理 GroupResourceProcessForm 的表单。
1.3 GroupResourceProcessForm 处理
同样地,GroupResourceProcessForm 表单的处理也需要先生成 WorkFlowProgress,然后和 ProcessForm 一起构造出 WorkflowContext:
- GroupResourceProcessForm
- InlongGroupInfo
- StreamInfo list
- INIT
- workflowProcess:由 CreateGroupWorkflowDefinition 定义
- Element 处理链:startEvent -> ServiceTask(Init MQ) -> ServiceTask(Init sort) -> EndEvent
- startEvent
- ServiceTask(Init MQ)
- name(InitMQ)、displatyName(Group-InitMQ)
- serviceTaskType: ServiceTaskType.INIT_MQ
- listenerFactor: GroupTaskListenerFactory
- ServiceTask(Init Sort):
- name(InitSort)、displatyName(Group-InitSort)
- serviceTaskType: ServiceTaskType.INIT_SORT
- listenerFactor: GroupTaskListenerFactory
- EndEvent
- nameToListenerMap: <ListenerName, Listener>
- InitGroupListener: InitGroupListener
- initGroupCompleteListener: InitGroupCompleteListener
- initGroupFailedListener : InitGroupFailedListener
- listeners: <ProcessEvent., Listener>
- ProcessEvent.CREATE: InitGroupListener
- ProcessEvent.COMPLETE: InitGroupCompleteListener
- ProcessEvent.FAIL: InitGroupFailedListener
- Element 处理链:startEvent -> ServiceTask(Init MQ) -> ServiceTask(Init sort) -> EndEvent
处理 StartEvent(StartEventProcessor.create),WorkflowContext 设置 currentElement 为 startEvent,
- 构建 workfowProcess 信息并保存到 DB,WorkflowProcessEntity:name(CREATE_GROUP_RESOURCE)、displayName(Create Group)、type(Create Group)、groupId、Status(PROCESSING)、fromData(GroupResourceProcessForm form 的 json 格式)
WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START
- 回调 InitGroupListener(CREATE),将 Group 状态设置为 CONFIG_ING
处理 InitMQTask
-
WorkflowContext 设置 currentElement 为 initMQTask
-
构建 WorkflowTaskEntity(processId+serviceName) 并且保存到 DB:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_GROUP_RESOURCE)、processDisplayName(Create Group)、name( InitMQ)、dispalyName(Group-InitMQ)、Approvers、status: PENDING)
-
修改 workflowContext 的 actionContext:Task(InitMQTask)、Action( Complete)、taskEntity ( WorkflowTaskEntity)
-
为 InitMQTask 初始化 listener:
- nameToListenerMap:<ListenerName, Listener>
- queueResourceListener:QueueResourceListener
- listeners:<ProcessEvent., Listener>
- TaskEvent.COMPLETE :QueueResourceListener
- nameToListenerMap:<ListenerName, Listener>
-
回调 listener,TaskEvent.COMPLETE 事件,即回调 QueueResourceListener
- 如果是数据同步类型的 stream,为每个 Stream 创建 StreamResourceProcessForm 表单
- groupInfo
- streamInfo list
- INIT
- 然后处理 StreamResourceProcessForm,StreamResourceProcessForm 的处理参考 1.4 章节。
- 如果是数据同步类型的 stream,为每个 Stream 创建 StreamResourceProcessForm 表单
处理 InitSortTask, WorkflowContext 设置 currentElement 为 InitSortTask
-
WorkflowContext 设置 currentElement 为 InitSortTask
-
构建 WorkflowTaskEntity 并且保存: type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_GROUP_RESOURCE)、processDisplayName(Create Group)、name( InitSort)、dispalyName(Group-InitSort)、Approvers、status: PENDING)
-
修改 workflowContext 的 actionContext:Task(InitSortTask)、Action( Complete)、taskEntity ( WorkflowTaskEntity)
-
配置 listener,GroupTaskListenerFactory.get(workflowcontext, Init_Sort),
- nameToListenerMap:<ListenerName, Listener>
- sortConfigListener:SortConfigListener
- startupSortListener: StartupSortListener
- listeners: <ProcessEvent., Listener>
- TaskEvent.COMPLETE: SortConfigListener
- TaskEvent.COMPLETE : StartupSortListener
- nameToListenerMap:<ListenerName, Listener>
-
回调 listener,TaskEvent.COMPLETE 事件,即回调 QueueResourceListener
- SortConfigListener:
- 将 Group 状态设置为 CONFIG_ING
- 为每个 stream 创建 dataflow(GroupInfo)
- 获取 StreamSOurce 信息,<InLongStreamId, List
> - 获取 transformer 信息:<InLongStreamId, List
> - 获取 source 和 transform 中的 field 信息<nodeId-fieldName, StreamField>
- 将 InlongStreamInfo 中的属性拷贝到每个 StreamSink 中,并且添加 audit 信息
- 使用 inlongStreamInfo 中的 fileldList 更新 streamSource 的 filedList
- 创建 relation 信息,如果有 transform ,则有两个 relation,source:transform 和 transform:sink; 如果没有 transform,则只有一个 relation,source:sink
- 为 source 增加 audit 信息
- 为 source、transform、sink 创建对应的 E-T-L nodes
- 将 streamId/nodes 和 relations 封装为 StreamInfo
- 将 groupId 和 streamInfo 封装成为 GroupInfo,表示一个 dataflow
- 获取 StreamSOurce 信息,<InLongStreamId, List
- 将 dataflow 信息添加到 InlongStreamInfo 的 ext 字段中(InlongStreamExtInfo)
- gropuId、streamId
- InlongConstants.DATAFLOW:dataflow
- StartupSortListener: 为每个 stream 启动 Flink 任务
- SortConfigListener:
1.4 StreamResourceProcessForm 处理
StreamResourceProcessForm 的处理也是先创建 WorkflowContext:
-
StreamResourceProcessForm
- groupInfo
- streamInfo list
- INIT
-
WorkflowProcss (CREATE_STREAM_RESOURCE),由 CreateStreamWorkflowDefinition 定义:
- name(CREATE_STREAM_RESOURCE)、 type( Create Stream)、displayName(Create Stream)、formClass(StreamResourceProcessForm)
- Element 处理链:startEvent -> ServiceTask(Init MQ) -> ServiceTask(Init Sink)->ServiceTask(Init sort) -> ServiceTask(Init Source)-> EndEvent
- initMQTask:ServiceTask
- name (InitMQ)、displayName(Stream-InitMQ)
- serviceTaskType: INIT_MQ
- listernerFactor: StreamTaskListenerFactory
- initSinkTask: ServiceTask
- name ( InitSink)、displayName(Stream-InitSink)
- serviceTaskType: INIT_SINK
- listernerFactor: StreamTaskListenerFactory
- initSortTask: ServiceTask
- name :(InitSort)、displayName(Stream-InittSort)
- serviceTaskType: INIT_SORT
- listernerFactor: StreamTaskListenerFactory
- initSourceTask: ServiceTask
- name ( InitSource)、displayName(Stream-InitSource)
- serviceTaskType: INIT_SOURCE
- listernerFactor: StreamTaskListenerFactory
- EndEvent
- initMQTask:ServiceTask
- nameToListenerMap: <ListenerName, Listener>
- initStreamListener: InitStreamListener
- initStreamCompleteListener: InitStreamCompleteListener
- initStreamFailedListener : initStreamFailedListener
- listeners: <ProcessEvent., Listener>
- ProcessEvent.CREATE: InitStreamListener
- ProcessEvent.COMPLETE: InitStreamCompleteListener
- ProcessEvent.FAIL: initStreamFailedListener
处理 StartEvent(StartEventProcessor.create),WorkflowContext 设置 currentElement 为 startEvent,
-
构建 workfowProcess 信息并保存到 DB,WorkflowProcessEntity:ame(CREATE_STREAM_RESOURCE)、displayName(Create Stream)、type(Create Stream)、groupId、Status(PROCESSING)、fromData(StreamResourceProcessForm 的 json 格式)
-
WorkflowContext 设置 processEntity 为 WorkflowProcessEntity,修改 actionContext 为 START,回调 InitStreamListener
-
更新 stream 状态为 CONFIG_ING
处理 initMQTask,WorkflowContext 设置 currentElement 为 initMQTask
- 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitMQ)、dispalyName(Stream-InitMQ)、status(PENDING)
- 修改 workflowContext 的 actionContext:Task(InitMQTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
- 配置 listener,只有 standmode才需要,直接跳过
处理 initSinkTask,WorkflowContext 设置 currentElement 为 initSinkTask
- 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName:(Create Stream)、name(InitSink)、dispalyName(Stream-InitSink)、status(PENDING)
- 修改 workflowContext 的 actionContext::Task(InitSinkTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
- 配置 listener,StreamTaskListenerFactory(workflowcontext, Init_Sink), StreamSinkResourceListener, listerner 监听 COMPLETE event
- 回调 StreamSinkResourceListener,根据 ENABLE_CREATE_RESOURCE 的配置来决定是否创建 sink 资源(比如 mysql 创建 sink table)
处理 initSortTask,WorkflowContext 设置 currentElement 为 initSortTask
- 构建 WorkflowTaskEntity 并且保存:type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitSort)、dispalyName(Stream-InitSort)、status(PENDING)
- 修改 workflowContext 的 actionContext:Task(InitSortTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
- 配置 listener, StreamSortConfigListener,监听 TaskEvent.COMPLETE
- 回调 StreamSortConfigListener,isStream = true, 直接返回
处理 initSourceTask,WorkflowContext 设置 currentElement 为 initSourceTask
- 构建 WorkflowTaskEntity 并且保存: type(ServcieTask)、processId(WorkflowProcessEntity.id)、processName(CREATE_STREAM_RESOURCE)、processDisplayName(Create Stream)、name(InitSource)、dispalyName(Stream-InitSource)、status(PENDING)
- 修改 workflowContext 的 actionContext:Task(InitSourceTask)、Action(Complete)、taskEntity (WorkflowTaskEntity)
- 配置 listener,StreamTaskListenerFactory(workflowcontext, Init_Source), listerer 为空
处理 endEvent,
- 修改 WorkflowProcessEntity 状态为 Completed
- 回调 InitStreamCompleteListener
- 更新 stream 状态为 CONFIG_SUCCESSFUL
- 如果是数据同步,更新 source 状态为 SOURCE_NORMAL
2. 实时同步任务启动
除了上述的状态处理之外,还有 plugin 的处理,InLong 针对不同的环境(开源、云、内网)使用不同的 plugin 来执行不同的逻辑。
以开源为例,审批通过之后,会回调 StartupSortListener
处理。
首先会判断是否是 GroupResourceProcessForm,如果是,则继续执行,否则抛出异常。
校验 sink 是否为 0,为 0 则抛出异常,否则继续;
然后遍历所有的 Stream,为每个 Stream 创建 flink job。
2.1 构建 FlinkInfo
- 生成 flink job name:InLong-Sort--
- 获取 sort.url,设置到 endpoint 中
- 设置 streamInfo
2.2 构建 FlinkService
从配置文件 $INLONG_HOME/inlong-manager/plugins/flink-sort-plugin.properties
中读取 flink 相关信息:
# Flink 版本 , 支持 [1.13|1.15]
flink.version=1.15
# Flink Flink REST 服务器 ip 地址
flink.rest.address=127.0.0.1
# Flink Flink REST 服务器端口
flink.rest.port=8081
# Flink jobmanager 端口
flink.jobmanager.port=6123
# Flink savepoint 目录
flink.savepoint.directory=file:///data/inlong-sort/savepoints
# Flink 并发度
flink.parallelism=1
# flink stop request drain
flink.drain=false
使用这些信息构建 flink 的 Configuration,使用 configuration 构建 FlinkClientService。
2.3 构建 FlinkOperation
使用 FlinkService 构建 FlinkOperation。
查找 inlong-sort 对应的 flink jar 目录,处理dataflow 中的每个节点,并且超找对应的 connector
创建 flink 任务本地保存目录。
构建一个 IntegrationTaskRunner,主要逻辑是通过 flink service 提交 flink 作业,构建 PackagedProgram,然后结合 configuration 生成 jobGraph,PackagedProgram 中包含:
- flink configuration
- sort 入口类: org.apache.inlong.sort.Entrance
- Sort jar 包
- connector jar 目录
- argument:jobName、localConfPath、checkpintInterval、auditProxy 地址
生成 jobGraph 之后,为 jobGraph 增加 connectorJars 配置。
然后获取 flink 的 RestClusterClient,提交jobGraph。