DataWorks MCP Server

Official

CreateDIJob

Create and configure data integration and synchronization tasks for various data sources such as Hologres, Kafka, and MySQL, enabling real-time or batch data migration and transformation.

Instructions

Creates a Data Integration synchronization job. *This tool has an associated MCP Resource; see CreateDIJob (MCP Resource) for more examples of how to use it.
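The sketch below shows roughly how a client might invoke this tool. It builds an MCP tools/call request as a JSON-RPC 2.0 message for CreateDIJob. The request fills in the required fields plus Name and JobType. The data source names, resource group name and CU value are hypothetical placeholders. The initialize handshake and the transport (stdio or HTTP) are left out.

```python
import json

# Hypothetical values: data source names, resource group and CU count are
# placeholders, not real resources.
arguments = {
    "Name": "mysql_to_holo_demo",
    "SourceDataSourceType": "MySQL",
    "SourceDataSourceSettings": [{"DataSourceName": "my_mysql_source"}],
    "DestinationDataSourceType": "Hologres",
    "DestinationDataSourceSettings": [{"DataSourceName": "my_holo_dest"}],
    "JobType": "DatabaseRealtimeMigration",
    "MigrationType": "FullAndRealtimeIncremental",
    "ResourceSettings": {
        "RealtimeResourceSettings": {
            "ResourceGroupIdentifier": "my_di_resource_group",
            "RequestedCu": 2,
        }
    },
    "TableMappings": [
        {
            "SourceObjectSelectionRules": [
                {"ObjectType": "Database", "Action": "Include",
                 "ExpressionType": "Exact", "Expression": "biz_db"},
                {"ObjectType": "Table", "Action": "Include",
                 "ExpressionType": "Regex", "Expression": ".*"},
            ]
        }
    ],
}


def build_tools_call(request_id: int, tool_name: str, args: dict) -> str:
    """Frame an MCP tools/call request as a JSON-RPC 2.0 message."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": args},
    }
    return json.dumps(message, ensure_ascii=False)


request = build_tools_call(1, "CreateDIJob", arguments)
print(request)
```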

Input Schema

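Name | Required | Description
Description | No | Description of the job.
DestinationDataSourceSettings | Yes | List of destination data source settings.
DestinationDataSourceType | Yes | Destination data source type. Enum values: Hologres, OSS-HDFS, OSS, MaxCompute, LogHub, StarRocks, DataHub, AnalyticDB_For_MySQL, Kafka, Hive.
JobName | No | Deprecated; use the Name field instead.
JobSettings | No | Job-level settings. These include the DDL handling policy, the column data type mapping between source and destination, and job runtime parameters.
JobType | No | Job type. Options: DatabaseRealtimeMigration (whole-database real-time) streams multiple tables from multiple source databases and supports full only, incremental only, or full + incremental. DatabaseOfflineMigration (whole-database offline) batch-synchronizes multiple tables from multiple source databases and supports full only, incremental only, or full + incremental. SingleTableRealtimeMigration (single-table real-time) streams a single source table.
MigrationType | Yes | Synchronization type. Enum values: FullAndRealtimeIncremental (full + real-time incremental; whole-database real-time), RealtimeIncremental (real-time incremental; single-table real-time), Full (full; whole-database offline), OfflineIncremental (offline incremental; whole-database offline), FullAndOfflineIncremental (full + offline incremental; whole-database offline).
Name | No | Job name.
ProjectId | No | ID of the DataWorks workspace.
ResourceSettings | Yes | Resource settings.
SourceDataSourceSettings | Yes | List of source data source settings.
SourceDataSourceType | Yes | Source data source type. Enum values: PolarDB, MySQL, Kafka, LogHub, Hologres, Oracle, OceanBase, MongoDB, RedShift, Hive, SQLServer, Doris, ClickHouse.
TableMappings | Yes | List of synchronization object mappings. Each element pairs a list of source object selection rules with the list of transformation rules applied to the objects those rules select. Example: [{"SourceObjectSelectionRules":[{"ObjectType":"Database","Action":"Include","ExpressionType":"Exact","Expression":"biz_db"},{"ObjectType":"Schema","Action":"Include","ExpressionType":"Exact","Expression":"s1"},{"ObjectType":"Table","Action":"Include","ExpressionType":"Exact","Expression":"table1"}],"TransformationRuleNames":[{"RuleName":"my_database_rename_rule","RuleActionType":"Rename","RuleTargetType":"Schema"}]}]
TransformationRules | No | List of synchronization object transformation rule definitions. Example: [{"RuleName":"my_database_rename_rule","RuleActionType":"Rename","RuleTargetType":"Schema","RuleExpression":"{\"expression\":\"${srcDatasourceName}_${srcDatabaseName}\"}"}]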
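RuleExpression is a JSON string nested inside the JSON arguments. That makes the escaping in the TransformationRules example easy to get wrong. The Python sketch below builds the rename rule with json.dumps instead of hand-escaping quotes. It then extracts the fields that identify the rule inside a TableMappings entry and previews the renamed value locally. The preview assumes the ${...} variables are replaced as plain text, which happens to match the placeholder syntax of Python's string.Template. DataWorks performs the real substitution, and the data source and database names used here are hypothetical.

```python
import json
from string import Template

# RuleExpression must be a JSON *string*, so serialise the inner object with
# json.dumps rather than nesting it or escaping quotes by hand.
rename_rule = {
    "RuleName": "my_database_rename_rule",
    "RuleActionType": "Rename",
    "RuleTargetType": "Schema",
    "RuleExpression": json.dumps(
        {"expression": "${srcDatasourceName}_${srcDatabaseName}"}
    ),
}

# Fields that identify the rule when a TableMappings entry refers to it
# (the mapping-level rule schema carries no RuleExpression).
rule_ref = {k: rename_rule[k] for k in ("RuleName", "RuleActionType", "RuleTargetType")}


def preview_rename(rule: dict, variables: dict) -> str:
    """Locally preview a Rename expression.

    Assumption: ${var} placeholders are replaced as plain text. This only
    illustrates the idea; DataWorks does the real substitution.
    """
    expression = json.loads(rule["RuleExpression"])["expression"]
    return Template(expression).substitute(variables)


# Hypothetical source names, used only for the preview.
print(json.dumps([rename_rule]))
print(preview_rename(rename_rule, {"srcDatasourceName": "my_mysql_source",
                                   "srcDatabaseName": "biz_db"}))
```

With these placeholder names, the last line prints my_mysql_source_biz_db.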

Input Schema (JSON Schema)

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "Description": { "description": "Description of the job.", "type": "string" },
    "DestinationDataSourceSettings": {
      "description": "List of destination data source settings.",
      "items": {
        "additionalProperties": false,
        "properties": {
          "DataSourceName": { "description": "Data source name.", "type": "string" }
        },
        "type": "object"
      },
      "type": "array"
    },
    "DestinationDataSourceType": { "description": "Destination data source type. Enum values: Hologres, OSS-HDFS, OSS, MaxCompute, LogHub, StarRocks, DataHub, AnalyticDB_For_MySQL, Kafka, Hive.", "type": "string" },
    "JobName": { "description": "Deprecated; use the Name field instead.", "type": "string" },
    "JobSettings": {
      "additionalProperties": false,
      "description": "Job-level settings, including the DDL handling policy, the column data type mapping between source and destination, and job runtime parameters.",
      "properties": {
        "ChannelSettings": {
          "description": "Channel-specific settings that apply special configuration to particular channels. Currently supported: Holo2Holo (Hologres to Hologres) and Holo2Kafka (Hologres to Kafka). 1. Holo2Kafka. Example: {\"destinationChannelSettings\":{\"kafkaClientProperties\":[{\"key\":\"linger.ms\",\"value\":\"100\"}],\"keyColumns\":[\"col3\"],\"writeMode\":\"canal\"}} - kafkaClientProperties: Kafka producer parameters used when writing to Kafka. - keyColumns: the columns whose values are written to Kafka. - writeMode: Kafka write format; json and canal are currently supported. 2. Holo2Holo. Example: {\"destinationChannelSettings\":{\"conflictMode\":\"replace\",\"dynamicColumnAction\":\"replay\",\"writeMode\":\"replay\"}} - conflictMode: conflict handling when writing to Hologres: replace (overwrite) or ignore. - writeMode: how rows are written to Hologres: replay or insert. - dynamicColumnAction: how dynamic columns are written to Hologres: replay, insert, or ignore.",
          "type": "string"
        },
        "ColumnDataTypeSettings": {
          "description": "Array of column type mappings. Example: \"ColumnDataTypeSettings\":[{\"SourceDataType\":\"Bigint\",\"DestinationDataType\":\"Text\"}]",
          "items": {
            "additionalProperties": false,
            "properties": {
              "DestinationDataType": { "description": "Destination type, such as bigint, boolean, string, text, datetime, timestamp, decimal, or binary. Available types vary by data source type.", "type": "string" },
              "SourceDataType": { "description": "Source type, such as bigint, boolean, string, text, datetime, timestamp, decimal, or binary. Available types vary by data source type.", "type": "string" }
            },
            "type": "object"
          },
          "type": "array"
        },
        "CycleScheduleSettings": {
          "additionalProperties": false,
          "description": "Periodic scheduling settings.",
          "properties": {
            "CycleMigrationType": { "description": "Synchronization type to run on a schedule. Valid values: Full (full), OfflineIncremental (offline incremental).", "type": "string" },
            "ScheduleParameters": { "description": "Scheduling parameters.", "type": "string" }
          },
          "type": "object"
        },
        "DdlHandlingSettings": {
          "description": "Array of DDL handling settings. Example: \"DDLHandlingSettings\":[{\"Type\":\"Insert\",\"Action\":\"Normal\"}]",
          "items": {
            "additionalProperties": false,
            "properties": {
              "Action": { "description": "Handling action. Enum values: Ignore (ignore), Critical (raise an error), Normal (process normally).", "type": "string" },
              "Type": { "description": "DDL type. Enum values: RenameColumn (rename a column), ModifyColumn (modify a column), CreateTable (create a table), TruncateTable (truncate a table), DropTable (drop a table), DropColumn (drop a column), AddColumn (add a column).", "type": "string" }
            },
            "type": "object"
          },
          "type": "array"
        },
        "RuntimeSettings": {
          "description": "Runtime settings.",
          "items": {
            "additionalProperties": false,
            "properties": {
              "Name": { "description": "Setting name. Enum values: src.offline.datasource.max.connection (maximum number of source connections for offline batch jobs), dst.offline.truncate (whether to truncate the destination table), runtime.offline.speed.limit.enable (whether to enable throttling for offline batch jobs), runtime.offline.concurrent (concurrency of offline batch synchronization jobs), runtime.enable.auto.create.schema (whether to create schemas automatically on the destination), runtime.realtime.concurrent (concurrency of real-time jobs), runtime.realtime.failover.minute.dataxcdc (wait time before restarting after a failover, in minutes), runtime.realtime.failover.times.dataxcdc (number of restart attempts after a failover).", "type": "string" },
              "Value": { "description": "Setting value.", "type": "string" }
            },
            "type": "object"
          },
          "type": "array"
        }
      },
      "type": "object"
    },
    "JobType": { "description": "Job type. Options: DatabaseRealtimeMigration (whole-database real-time) streams multiple tables from multiple source databases and supports full only, incremental only, or full + incremental. DatabaseOfflineMigration (whole-database offline) batch-synchronizes multiple tables from multiple source databases and supports full only, incremental only, or full + incremental. SingleTableRealtimeMigration (single-table real-time) streams a single source table.", "type": "string" },
    "MigrationType": { "description": "Synchronization type. Enum values: FullAndRealtimeIncremental (full + real-time incremental; whole-database real-time), RealtimeIncremental (real-time incremental; single-table real-time), Full (full; whole-database offline), OfflineIncremental (offline incremental; whole-database offline), FullAndOfflineIncremental (full + offline incremental; whole-database offline).", "type": "string" },
    "Name": { "description": "Job name.", "type": "string" },
    "ProjectId": { "description": "ID of the DataWorks workspace." },
    "ResourceSettings": {
      "additionalProperties": false,
      "description": "Resource settings.",
      "properties": {
        "OfflineResourceSettings": {
          "additionalProperties": false,
          "description": "Resources for offline synchronization.",
          "properties": {
            "RequestedCu": { "description": "CUs of the Data Integration resource group used for offline synchronization." },
            "ResourceGroupIdentifier": { "description": "Name of the Data Integration resource group used for offline synchronization.", "type": "string" }
          },
          "type": "object"
        },
        "RealtimeResourceSettings": {
          "additionalProperties": false,
          "description": "Resources for real-time synchronization.",
          "properties": {
            "RequestedCu": { "description": "CUs of the Data Integration resource group used for real-time synchronization." },
            "ResourceGroupIdentifier": { "description": "Name of the Data Integration resource group used for real-time synchronization.", "type": "string" }
          },
          "type": "object"
        },
        "ScheduleResourceSettings": {
          "additionalProperties": false,
          "description": "Scheduling resources.",
          "properties": {
            "RequestedCu": { "description": "CUs of the scheduling resource group used by offline synchronization jobs." },
            "ResourceGroupIdentifier": { "description": "Name of the scheduling resource group used by offline synchronization jobs.", "type": "string" }
          },
          "type": "object"
        }
      },
      "type": "object"
    },
    "SourceDataSourceSettings": {
      "description": "List of source data source settings.",
      "items": {
        "additionalProperties": false,
        "properties": {
          "DataSourceName": { "description": "Data source name.", "type": "string" },
          "DataSourceProperties": {
            "additionalProperties": false,
            "description": "Data source properties.",
            "properties": {
              "Encoding": { "description": "Database encoding.", "type": "string" },
              "Timezone": { "description": "Time zone.", "type": "string" }
            },
            "type": "object"
          }
        },
        "type": "object"
      },
      "type": "array"
    },
    "SourceDataSourceType": { "description": "Source data source type. Enum values: PolarDB, MySQL, Kafka, LogHub, Hologres, Oracle, OceanBase, MongoDB, RedShift, Hive, SQLServer, Doris, ClickHouse.", "type": "string" },
    "TableMappings": {
      "description": "List of synchronization object mappings. Each element pairs a list of source object selection rules with the list of transformation rules applied to the objects those rules select. Example: [{\"SourceObjectSelectionRules\":[{\"ObjectType\":\"Database\",\"Action\":\"Include\",\"ExpressionType\":\"Exact\",\"Expression\":\"biz_db\"},{\"ObjectType\":\"Schema\",\"Action\":\"Include\",\"ExpressionType\":\"Exact\",\"Expression\":\"s1\"},{\"ObjectType\":\"Table\",\"Action\":\"Include\",\"ExpressionType\":\"Exact\",\"Expression\":\"table1\"}],\"TransformationRuleNames\":[{\"RuleName\":\"my_database_rename_rule\",\"RuleActionType\":\"Rename\",\"RuleTargetType\":\"Schema\"}]}]",
      "items": {
        "additionalProperties": false,
        "properties": {
          "SourceObjectSelectionRules": {
            "description": "Each rule selects a set of source objects to synchronize; multiple rules together select a table.",
            "items": {
              "additionalProperties": false,
              "properties": {
                "Action": { "description": "Selection action. Valid values: Include, Exclude.", "type": "string" },
                "Expression": { "description": "Expression.", "type": "string" },
                "ExpressionType": { "description": "Expression type. Valid values: Exact, Regex.", "type": "string" },
                "ObjectType": { "description": "Object type. Enum values: Table (table), Schema (schema), Database (database).", "type": "string" }
              },
              "type": "object"
            },
            "type": "array"
          },
          "TransformationRules": {
            "description": "List of synchronization object transformation rule definitions; each element is one transformation rule.",
            "items": {
              "additionalProperties": false,
              "properties": {
                "RuleActionType": { "description": "Action type. Enum values: DefinePrimaryKey (define the primary key), Rename (rename), AddColumn (add a column), HandleDml (DML handling), DefineIncrementalCondition (define the incremental condition), DefineCycleScheduleSettings (define periodic scheduling settings), DefineRuntimeSettings (define advanced configuration parameters), DefinePartitionKey (define the partition column).", "type": "string" },
                "RuleName": { "description": "Rule name; must be unique within a given combination of action type and target type.", "type": "string" },
                "RuleTargetType": { "description": "Target type the action applies to. Enum values: Table (table), Schema (schema), Database (database).", "type": "string" }
              },
              "type": "object"
            },
            "type": "array"
          }
        },
        "type": "object"
      },
      "type": "array"
    },
    "TransformationRules": {
      "description": "List of synchronization object transformation rule definitions. Example: [{\"RuleName\":\"my_database_rename_rule\",\"RuleActionType\":\"Rename\",\"RuleTargetType\":\"Schema\",\"RuleExpression\":\"{\\\"expression\\\":\\\"${srcDatasourceName}_${srcDatabaseName}\\\"}\"}]",
      "items": {
        "additionalProperties": false,
        "properties": {
          "RuleActionType": { "description": "Action type. Enum values: DefinePrimaryKey (define the primary key), Rename (rename), AddColumn (add a column), HandleDml (DML handling), DefineIncrementalCondition (define the incremental condition), DefineCycleScheduleSettings (define periodic scheduling settings), DefinePartitionKey (define the partition column).", "type": "string" },
          "RuleExpression": { "description": "Rule expression, as a JSON string. 1. Rename rule (Rename). Example: {\"expression\":\"${srcDatasourceName}_${srcDatabaseName}_0922\"} - expression: the rename expression. Supported variables: ${srcDatasourceName} (source data source name), ${srcDatabaseName} (source database name), ${srcTableName} (source table name). 2. Add-column rule (AddColumn). Example: {\"columns\":[{\"columnName\":\"my_add_column\",\"columnValueType\":\"Constant\",\"columnValue\":\"123\"}]} - If not specified, the default is to add no columns and copy nothing. - columnName: name of the added column. - columnValueType: value type of the added column: Constant or Variable. - columnValue: value of the added column. When columnValueType=Constant, the value is a custom constant of type String. When columnValueType=Variable, the value is a built-in variable. Built-in variables: EXECUTE_TIME (execution time, Long), DB_NAME_SRC (source database name, String), DATASOURCE_NAME_SRC (source data source name, String), TABLE_NAME_SRC (source table name, String), DB_NAME_DEST (destination database name, String), DATASOURCE_NAME_DEST (destination data source name, String), TABLE_NAME_DEST (destination table name, String), DB_NAME_SRC_TRANSED (database name after transformation, String). 3. Primary key column names of the destination table (DefinePrimaryKey). Example: {\"columns\":[\"ukcolumn1\",\"ukcolumn2\"]} - If not specified, the source primary key columns are used. - If the destination is an existing table, Data Integration does not modify its structure. If a specified primary key column is not among the destination columns, the job reports an error at startup. - If the destination table is created automatically, Data Integration creates it with the defined primary key columns. If a specified primary key column is not among the destination columns, the job reports an error at startup. 4. DML handling rule (HandleDml). Example: {\"dmlPolicies\":[{\"dmlType\":\"Delete\",\"dmlAction\":\"Filter\",\"filterCondition\":\"id > 1\"}]} - If not specified, Insert, Update, and Delete are all handled as Normal. - dmlType: DML operation type: Insert, Update, or Delete. - dmlAction: DML handling policy: Normal (process normally), Ignore (ignore), Filter (process conditionally; used when dmlType=Update/Delete), LogicalDelete (logical delete). - filterCondition: DML filter condition, used when dmlAction=Filter. 5. Incremental condition (DefineIncrementalCondition). Example: {\"where\":\"id > 0\"} - Specifies the incremental filter condition. 6. Periodic scheduling parameters (DefineCycleScheduleSettings). Example: {\"cronExpress\":\" * * * * * *\", \"cycleType\":\"1\"} - Specifies the scheduling parameters of the periodic job. 7. Partition key (DefinePartitionKey). Example: {\"columns\":[\"id\"]} - Specifies the partition key.", "type": "string" },
          "RuleName": { "description": "Rule name; rule names must be unique among rules with the same action type and target type.", "type": "string" },
          "RuleTargetType": { "description": "Target type the action applies to. Enum values: Table (table), Schema (schema), Database (database).", "type": "string" }
        },
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "DestinationDataSourceType",
    "SourceDataSourceType",
    "MigrationType",
    "SourceDataSourceSettings",
    "DestinationDataSourceSettings",
    "ResourceSettings",
    "TableMappings"
  ],
  "type": "object"
}
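The schema's "required" list and the per-value notes can also be checked before the call. The sketch below is a hypothetical client-side lint and is not part of the tool. It reports two kinds of errors: missing required fields, and duplicate rule names within one RuleActionType/RuleTargetType pair. A MigrationType whose parenthetical note names a different job type is only flagged as a warning. The reason is that the JobType description (for example, "full only, incremental only, or full + incremental" for whole-database real-time jobs) suggests the server may accept combinations that the MigrationType notes do not list.

```python
from collections import Counter

# Copied from the "required" list of the input schema.
REQUIRED_FIELDS = [
    "DestinationDataSourceType", "SourceDataSourceType", "MigrationType",
    "SourceDataSourceSettings", "DestinationDataSourceSettings",
    "ResourceSettings", "TableMappings",
]

# Assumed grouping, taken from the parenthetical notes in the MigrationType
# description; used for warnings only.
MIGRATION_TYPES_BY_JOB_TYPE = {
    "DatabaseRealtimeMigration": {"FullAndRealtimeIncremental"},
    "SingleTableRealtimeMigration": {"RealtimeIncremental"},
    "DatabaseOfflineMigration": {"Full", "OfflineIncremental", "FullAndOfflineIncremental"},
}


def lint_create_di_job(args: dict) -> tuple[list[str], list[str]]:
    """Hypothetical client-side check; returns (errors, warnings)."""
    errors = [f"missing required field: {f}" for f in REQUIRED_FIELDS if f not in args]
    warnings = []

    job_type, migration_type = args.get("JobType"), args.get("MigrationType")
    expected = MIGRATION_TYPES_BY_JOB_TYPE.get(job_type)
    if expected is not None and migration_type is not None and migration_type not in expected:
        warnings.append(f"MigrationType {migration_type} is not listed for JobType {job_type}")

    # Rule names must be unique per (RuleActionType, RuleTargetType) pair.
    seen = Counter(
        (r.get("RuleActionType"), r.get("RuleTargetType"), r.get("RuleName"))
        for r in args.get("TransformationRules", [])
    )
    for (action, target, name), count in seen.items():
        if count > 1:
            errors.append(f"duplicate RuleName {name} for {action}/{target}")
    return errors, warnings


# Deliberately incomplete input to exercise every check.
errors, warnings = lint_create_di_job({
    "SourceDataSourceType": "MySQL",
    "DestinationDataSourceType": "Hologres",
    "JobType": "DatabaseOfflineMigration",
    "MigrationType": "RealtimeIncremental",
    "TransformationRules": [
        {"RuleName": "r1", "RuleActionType": "Rename", "RuleTargetType": "Schema"},
        {"RuleName": "r1", "RuleActionType": "Rename", "RuleTargetType": "Schema"},
    ],
})
print(errors)
print(warnings)
```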

