Unverified commit b1fb1711 authored by xiangzihao, committed by GitHub

[Feature-6758][Task] Add limit resource usage for tasks base on cgroup (#10373)

Parent 7e39e0d4
......@@ -202,6 +202,7 @@ yarn.resourcemanager.ha.rm.ids||specify the yarn resourcemanager url. if resourc
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false| specify whether in development state
task.resource.limit.state|false|specify whether in resource limit state
### application-api.properties [API-service log config]
......
......@@ -19,6 +19,8 @@ DataX task type for executing DataX programs. For DataX nodes, the worker will e
- **Environment Name**: Configure the environment name in which to run the script.
- **Number of failed retry attempts**: The number of times a failed task is resubmitted.
- **Failed retry interval**: The time interval, in minutes, for resubmitting the task after a failure.
- **CPU quota**: Assign the specified CPU time quota to the executed task. Takes a percentage value. The default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- **Max memory**: Assign the specified maximum memory to the executed task. Takes a value in MB. The default -1 means unlimited. Exceeding this limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- **Delayed execution time**: The time, in minutes, by which task execution is delayed.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- **Custom template**: Customize the content of the DataX node's JSON profile when the default data sources do not meet the requirements.
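The CPU-quota and max-memory semantics described above can be sketched in isolation. This is illustrative code, not part of the patch: it maps the two documented values to the systemd property strings that a cgroup-based limiter would use, with -1 meaning unlimited.

```java
// Sketch (not DolphinScheduler code): map the documented limit values to
// systemd resource-control properties; -1 means unlimited, per the defaults above.
public class ResourceLimitProps {

    // CPU quota is a percentage of one core's time, so 16 cores allow up to 1600%.
    // An empty "CPUQuota=" clears the quota, i.e. no CPU limit.
    public static String cpuQuota(int percent) {
        return percent == -1 ? "CPUQuota=" : String.format("CPUQuota=%d%%", percent);
    }

    // Max memory is given in MB; "infinity" disables the memory cap.
    public static String memoryMax(int megabytes) {
        return megabytes == -1 ? "MemoryMax=infinity" : String.format("MemoryMax=%dM", megabytes);
    }
}
```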
......
......@@ -25,6 +25,8 @@ Click [here](https://docs.conda.io/en/latest/) for more information about `conda
- Worker grouping: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
- Number of failed retry attempts: The number of times a failed task is resubmitted. It supports drop-down selection and manual entry.
- Failed retry interval: The time interval for resubmitting a failed task. It supports drop-down selection and manual entry.
- CPU quota: Assign the specified CPU time quota to the executed task. Takes a percentage value. The default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Max memory: Assign the specified maximum memory to the executed task. Takes a value in MB. The default -1 means unlimited. Exceeding this limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- Conda Env Name: Name of conda environment.
- Input Note Path: Path of input jupyter note template.
......
......@@ -20,6 +20,8 @@ it will generate a temporary python script, and executes the script by the Linux
- Environment Name: Configure the environment name in which to run the script.
- Number of failed retry attempts: The number of times a failed task is resubmitted. It supports drop-down selection and manual entry.
- Failed retry interval: The time interval for resubmitting a failed task. It supports drop-down selection and manual entry.
- CPU quota: Assign the specified CPU time quota to the executed task. Takes a percentage value. The default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Max memory: Assign the specified maximum memory to the executed task. Takes a value in MB. The default -1 means unlimited. Exceeding this limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- Script: Python program developed by the user.
- Resource: Refers to the list of resource files that need to be called in the script, and the files uploaded or created by the resource center-file management.
......
......@@ -19,6 +19,8 @@ Shell task used to create a shell task type and execute a series of shell script
- Environment Name: Configure the environment name in which to run the script.
- Times of failed retry attempts: The number of times a failed task is resubmitted. You can select from the drop-down or fill in a number.
- Failed retry interval: The time interval for resubmitting a failed task. You can select from the drop-down or fill in a number.
- CPU quota: Assign the specified CPU time quota to the executed task. Takes a percentage value. The default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Max memory: Assign the specified maximum memory to the executed task. Takes a value in MB. The default -1 means unlimited. Exceeding this limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task run exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- Script: Shell program developed by users.
- Resource: Refers to the list of resource files called in the script; upload or create the files in the Resource Center file management.
......
......@@ -194,6 +194,7 @@ yarn.resourcemanager.ha.rm.ids||yarn resourcemanager address; if resourcemanag
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep the default value if ResourceManager has HA enabled or is not used; if ResourceManager runs in standalone mode, replace ds1 with the ResourceManager hostname
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|environment variable config file loaded when running scripts [e.g.: JAVA_HOME, HADOOP_HOME, HIVE_HOME ...]
development.state|false|whether in development mode
task.resource.limit.state|false|whether the resource limit mode is enabled
## 5.application-api.properties [API service configuration]
......
......@@ -19,6 +19,8 @@ DataX task type, used to execute DataX programs. For DataX nodes, the worker
- Environment Name: Configure the environment in which the script runs.
- Number of failed retries: The number of times a failed task is resubmitted.
- Failed retry interval: The interval, in minutes, for resubmitting a failed task.
- CPU quota: Assign the specified CPU time quota to the executed task, as a percentage; the default -1 means unlimited. For example, the full load of one CPU core is 100%, and of 16 cores it is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Max memory: Assign the specified maximum memory to the executed task, in MB; the default -1 means unlimited. Exceeding the limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Delayed execution time: The time, in minutes, by which task execution is delayed.
- Timeout alarm: Check timeout alarm and timeout failure; when the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Custom template: Customize the content of the DataX node's JSON config file when the default data sources do not meet the requirements.
......
......@@ -26,7 +26,9 @@
- Worker group: Assign the task to a machine in the worker group; selecting Default picks a worker machine at random.
- Number of failed retries: The number of times a failed task is resubmitted; supports drop-down and manual entry.
- Failed retry interval: The interval for resubmitting a failed task; supports drop-down and manual entry.
- CPU quota: Assign the specified CPU time quota to the executed task, as a percentage; the default -1 means unlimited. For example, the full load of one CPU core is 100%, and of 16 cores it is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Max memory: Assign the specified maximum memory to the executed task, in MB; the default -1 means unlimited. Exceeding the limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check timeout alarm and timeout failure; when the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Predecessor task: Select the predecessor tasks of the current task; they are set as upstream of the current task.
- Conda Env Name: Name of the conda environment.
- Input Note Path: Path of the input jupyter note template.
......
......@@ -20,6 +20,8 @@ Python task type, used to create Python tasks and execute a series of P
- Environment Name: Configure the environment in which the script runs.
- Number of failed retries: The number of times a failed task is resubmitted; supports drop-down and manual entry.
- Failed retry interval: The interval for resubmitting a failed task; supports drop-down and manual entry.
- CPU quota: Assign the specified CPU time quota to the executed task, as a percentage; the default -1 means unlimited. For example, the full load of one CPU core is 100%, and of 16 cores it is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Max memory: Assign the specified maximum memory to the executed task, in MB; the default -1 means unlimited. Exceeding the limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check timeout alarm and timeout failure; when the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Script: The Python program developed by the user.
- Resource: The list of resource files the script needs to call; upload or create the files in Resource Center - File Manage.
......
......@@ -19,6 +19,8 @@ Shell task type, used to create Shell tasks and execute a series of She
- Environment Name: Configure the environment in which the script runs.
- Number of failed retries: The number of times a failed task is resubmitted; supports drop-down and manual entry.
- Failed retry interval: The interval for resubmitting a failed task; supports drop-down and manual entry.
- CPU quota: Assign the specified CPU time quota to the executed task, as a percentage; the default -1 means unlimited. For example, the full load of one CPU core is 100%, and of 16 cores it is 1600%. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Max memory: Assign the specified maximum memory to the executed task, in MB; the default -1 means unlimited. Exceeding the limit triggers an OOM kill, and the task is not retried automatically. This feature is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check timeout alarm and timeout failure; when the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Script: The SHELL program developed by the user.
- Resource: The list of resource files the script needs to call; upload or create the files in Resource Center - File Manage.
......
......@@ -178,6 +178,16 @@ public class TaskNode {
*/
private int delayTime;
/**
* cpu quota
*/
private Integer cpuQuota;
/**
* max memory
*/
private Integer memoryMax;
public String getId() {
return id;
}
......@@ -497,4 +507,20 @@ public class TaskNode {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
public Integer getCpuQuota() {
return cpuQuota == null ? -1 : cpuQuota;
}
public void setCpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
}
public Integer getMemoryMax() {
return memoryMax == null ? -1 : memoryMax;
}
public void setMemoryMax(Integer memoryMax) {
this.memoryMax = memoryMax;
}
}
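The wrapper-typed fields with null-coalescing getters above (`getCpuQuota` returning -1 when `cpuQuota` is null) let task definitions created before these columns existed stay unlimited by default. A standalone sketch of the pattern, with an assumed class name:

```java
// Sketch of the getter pattern in TaskNode/TaskDefinition/TaskInstance:
// a null wrapper value (e.g. a task serialized before the cpu_quota field
// existed) falls back to -1, which means "no limit".
public class Limits {
    private Integer cpuQuota;   // null until explicitly set

    public Integer getCpuQuota() {
        return cpuQuota == null ? -1 : cpuQuota;
    }

    public void setCpuQuota(Integer cpuQuota) {
        this.cpuQuota = cpuQuota;
    }
}
```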
......@@ -95,4 +95,7 @@ alert.rpc.port=50052
zeppelin.rest.url=http://localhost:8080
# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
\ No newline at end of file
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
# Task resource limit state
task.resource.limit.state=false
\ No newline at end of file
......@@ -201,6 +201,16 @@ public class TaskDefinition {
*/
private int taskGroupPriority;
/**
* cpu quota
*/
private Integer cpuQuota;
/**
* max memory
*/
private Integer memoryMax;
public TaskDefinition() {
}
......@@ -457,6 +467,22 @@ public class TaskDefinition {
this.environmentCode = environmentCode;
}
public Integer getCpuQuota() {
return cpuQuota == null ? -1 : cpuQuota;
}
public void setCpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
}
public Integer getMemoryMax() {
return memoryMax == null ? -1 : memoryMax;
}
public void setMemoryMax(Integer memoryMax) {
this.memoryMax = memoryMax;
}
@Override
public boolean equals(Object o) {
if (o == null) {
......@@ -481,7 +507,9 @@ public class TaskDefinition {
|| ("".equals(that.resourceIds) && resourceIds == null))
&& environmentCode == that.environmentCode
&& taskGroupId == that.taskGroupId
&& taskGroupPriority == that.taskGroupPriority;
&& taskGroupPriority == that.taskGroupPriority
&& Objects.equals(cpuQuota, that.cpuQuota)
&& Objects.equals(memoryMax, that.memoryMax);
}
@Override
......@@ -513,6 +541,8 @@ public class TaskDefinition {
+ ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\''
+ ", cpuQuota=" + cpuQuota
+ ", memoryMax=" + memoryMax
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
......
......@@ -70,6 +70,8 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());
}
public int getOperator() {
......
......@@ -279,6 +279,16 @@ public class TaskInstance implements Serializable {
*/
private int taskGroupId;
/**
* cpu quota
*/
private Integer cpuQuota;
/**
* max memory
*/
private Integer memoryMax;
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
......@@ -753,4 +763,20 @@ public class TaskInstance implements Serializable {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
public Integer getCpuQuota() {
return cpuQuota == null ? -1 : cpuQuota;
}
public void setCpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
}
public Integer getMemoryMax() {
return memoryMax == null ? -1 : memoryMax;
}
public void setMemoryMax(Integer memoryMax) {
this.memoryMax = memoryMax;
}
}
......@@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time,task_group_id,task_group_priority
resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max
</sql>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
......@@ -51,7 +51,8 @@
<insert id="batchInsert">
insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time,task_group_id,task_group_priority)
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time,
update_time, task_group_id, task_group_priority, cpu_quota, memory_max)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
......@@ -59,7 +60,8 @@
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority})
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
#{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax})
</foreach>
</insert>
<delete id="deleteByCodeAndVersion">
......
......@@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time, task_group_id,task_group_priority
resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max
</sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
......
......@@ -22,7 +22,7 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, cpu_quota, memory_max
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
......
......@@ -486,6 +486,8 @@ CREATE TABLE t_ds_task_definition
delay_time int(11) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
cpu_quota int(11) DEFAULT '-1' NOT NULL,
memory_max int(11) DEFAULT '-1' NOT NULL,
resource_ids text,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
......@@ -521,6 +523,8 @@ CREATE TABLE t_ds_task_definition_log
operator int(11) DEFAULT NULL,
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
cpu_quota int(11) DEFAULT '-1' NOT NULL,
memory_max int(11) DEFAULT '-1' NOT NULL,
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
......@@ -860,6 +864,8 @@ CREATE TABLE t_ds_task_instance
task_group_id int(11) DEFAULT NULL,
var_pool longtext,
dry_run int NULL DEFAULT 0,
cpu_quota int(11) DEFAULT '-1' NOT NULL,
memory_max int(11) DEFAULT '-1' NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE
);
......
......@@ -487,6 +487,8 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority',
`cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
`memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
......@@ -521,6 +523,8 @@ CREATE TABLE `t_ds_task_definition_log` (
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT 0 COMMENT 'task group priority',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
`memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
......@@ -853,6 +857,8 @@ CREATE TABLE `t_ds_task_instance` (
`var_pool` longtext COMMENT 'var_pool',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
`cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
`memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
......
......@@ -404,6 +404,8 @@ CREATE TABLE t_ds_task_definition (
task_group_id int DEFAULT NULL,
task_group_priority int DEFAULT '0',
resource_ids text ,
cpu_quota int DEFAULT '-1' NOT NULL,
memory_max int DEFAULT '-1' NOT NULL,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
......@@ -441,6 +443,8 @@ CREATE TABLE t_ds_task_definition_log (
task_group_id int DEFAULT NULL,
task_group_priority int DEFAULT '0',
operate_time timestamp DEFAULT NULL ,
cpu_quota int DEFAULT '-1' NOT NULL,
memory_max int DEFAULT '-1' NOT NULL,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
......@@ -759,6 +763,8 @@ CREATE TABLE t_ds_task_instance (
task_group_id int DEFAULT NULL,
var_pool text ,
dry_run int DEFAULT '0' ,
cpu_quota int DEFAULT '-1' NOT NULL,
memory_max int DEFAULT '-1' NOT NULL,
PRIMARY KEY (id),
CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE
) ;
......
......@@ -80,6 +80,15 @@ ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMEN
ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;
-- Add resource limit column
ALTER TABLE `t_ds_task_definition` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `task_group_priority`;
ALTER TABLE `t_ds_task_definition` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `operate_time`;
ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
ALTER TABLE `t_ds_task_instance` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `dry_run`;
ALTER TABLE `t_ds_task_instance` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
--
-- Table structure for table `t_ds_dq_comparison_type`
--
......
......@@ -36,6 +36,15 @@ EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_resources ALTER COLUMN
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_alert ADD COLUMN IF NOT EXISTS sign varchar(40) NOT NULL DEFAULT '''' ';
EXECUTE 'comment on column ' || quote_ident(v_schema) ||'.t_ds_alert.sign is ''sign=sha1(content)''';
-- Add resource limit column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
......
......@@ -85,3 +85,6 @@ aws.access.key.id=accessKey123
aws.secret.access.key=secretKey123
aws.region=us-east-1
aws.endpoint=http://s3:9000
# Task resource limit state
task.resource.limit.state=false
\ No newline at end of file
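The `task.resource.limit.state` switch above gates the whole feature. A hedged sketch of how such a boolean property is typically read, using plain `java.util.Properties` here rather than the project's `PropertyUtils`:

```java
import java.util.Properties;

public class LimitSwitch {
    // Returns true only when the property is explicitly "true"; the
    // fallback mirrors the default task.resource.limit.state=false above.
    public static boolean resourceLimitEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("task.resource.limit.state", "false"));
    }
}
```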
......@@ -63,6 +63,8 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setVarPool(taskInstance.getVarPool());
taskExecutionContext.setDryRun(taskInstance.getDryRun());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUBMITTED_SUCCESS);
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
return this;
}
......
......@@ -1177,6 +1177,10 @@ public class WorkflowExecuteThread {
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
//set task cpu quota and max memory
taskInstance.setCpuQuota(taskNode.getCpuQuota());
taskInstance.setMemoryMax(taskNode.getMemoryMax());
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
......
......@@ -2767,6 +2767,8 @@ public class ProcessServiceImpl implements ProcessService {
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority());
taskNode.setCpuQuota(taskDefinitionLog.getCpuQuota());
taskNode.setMemoryMax(taskDefinitionLog.getMemoryMax());
taskNodeList.add(taskNode);
}
}
......
......@@ -56,6 +56,11 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
......
......@@ -21,8 +21,10 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import java.io.BufferedReader;
import java.io.File;
......@@ -127,9 +129,13 @@ public abstract class AbstractCommandExecutor {
// if sudo.enable=true,setting up user to run commands
if (OSUtils.isSudoEnable()) {
command.add("sudo");
command.add("-u");
command.add(taskRequest.getTenantCode());
if (OSUtils.isLinux() && PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
generateCgroupCommand(command);
} else {
command.add("sudo");
command.add("-u");
command.add(taskRequest.getTenantCode());
}
}
command.add(commandInterpreter());
command.addAll(Collections.emptyList());
......@@ -142,6 +148,39 @@ public abstract class AbstractCommandExecutor {
printCommand(command);
}
/**
* generate systemd command.
* eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryMax=200M --uid=root
* @param command command
*/
private void generateCgroupCommand(List<String> command) {
Integer cpuQuota = taskRequest.getCpuQuota();
Integer memoryMax = taskRequest.getMemoryMax();
command.add("sudo");
command.add("systemd-run");
command.add("-q");
command.add("--scope");
if (cpuQuota == -1) {
command.add("-p");
command.add("CPUQuota=");
} else {
command.add("-p");
command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
}
if (memoryMax == -1) {
command.add("-p");
command.add(String.format("MemoryMax=%s", "infinity"));
} else {
command.add("-p");
command.add(String.format("MemoryMax=%sM", taskRequest.getMemoryMax()));
}
command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
}
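As a concrete check of the method above, here is a self-contained re-implementation that builds the same argument list (the tenant name is made up). Wrapping the task in a transient `systemd-run --scope` unit is what makes the kernel cgroup enforce the CPU and memory caps:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CgroupCommandSketch {
    // Mirrors generateCgroupCommand: sudo systemd-run -q --scope
    //   -p CPUQuota=<n>% -p MemoryMax=<n>M --uid=<tenant>
    // -1 turns into an empty CPUQuota= (quota cleared) / MemoryMax=infinity.
    public static List<String> build(int cpuQuota, int memoryMax, String tenant) {
        List<String> cmd = new ArrayList<>(Arrays.asList("sudo", "systemd-run", "-q", "--scope"));
        cmd.add("-p");
        cmd.add(cpuQuota == -1 ? "CPUQuota=" : String.format("CPUQuota=%d%%", cpuQuota));
        cmd.add("-p");
        cmd.add(memoryMax == -1 ? "MemoryMax=infinity" : String.format("MemoryMax=%dM", memoryMax));
        cmd.add(String.format("--uid=%s", tenant));
        return cmd;
    }
}
```

With `cpuQuota=100`, `memoryMax=200`, and tenant `root`, this reproduces the example in the method's javadoc.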
public TaskResponse run(String execCommand) throws IOException, InterruptedException {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
......
......@@ -239,6 +239,16 @@ public class TaskExecutionContext {
private DataQualityTaskExecutionContext dataQualityTaskExecutionContext;
/**
* cpu quota
*/
private Integer cpuQuota;
/**
* max memory
*/
private Integer memoryMax;
public String getTaskLogName() {
return taskLogName;
}
......@@ -567,6 +577,22 @@ public class TaskExecutionContext {
this.endTime = endTime;
}
public Integer getCpuQuota() {
return cpuQuota;
}
public void setCpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
}
public Integer getMemoryMax() {
return memoryMax;
}
public void setMemoryMax(Integer memoryMax) {
this.memoryMax = memoryMax;
}
public K8sTaskExecutionContext getK8sTaskExecutionContext() {
return k8sTaskExecutionContext;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
public class AbstractCommandExecutorConstants {
private AbstractCommandExecutorConstants() {
throw new IllegalStateException("Utility class");
}
public static final String TASK_RESOURCE_LIMIT_STATE = "task.resource.limit.state";
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.ShellExecutor;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
......@@ -68,6 +69,15 @@ public class OSUtils {
return getOSName().startsWith("Windows");
}
/**
* whether is linux
*
* @return true if linux
*/
public static boolean isLinux() {
return SystemUtils.IS_OS_LINUX;
}
/**
* Execute the corresponding command of Linux or Windows
*
......
......@@ -308,6 +308,8 @@ export default {
task_group_name: 'Task group name',
task_group_queue_priority: 'Priority',
number_of_failed_retries: 'Number of failed retries',
cpu_quota: 'CPU quota',
memory_max: 'Max memory',
times: 'Times',
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
......
......@@ -308,6 +308,8 @@ export default {
task_group_name: '任务组名称',
task_group_queue_priority: '组内优先级',
number_of_failed_retries: '失败重试次数',
cpu_quota: 'CPU配额',
memory_max: '最大内存',
times: '',
failed_retry_interval: '失败重试间隔',
minute: '',
......
......@@ -23,6 +23,7 @@ export { useWorkerGroup } from './use-worker-group'
export { useEnvironmentName } from './use-environment-name'
export { useTaskGroup } from './use-task-group'
export { useFailed } from './use-failed'
export { useResourceLimit } from './use-resource-limit'
export { useDelayTime } from './use-delay-time'
export { useTimeoutAlarm } from './use-timeout-alarm'
export { usePreTasks } from './use-pre-tasks'
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
export function useResourceLimit(): IJsonItem[] {
const { t } = useI18n()
return [
{
type: 'input-number',
field: 'cpuQuota',
name: t('project.node.cpu_quota'),
span: 12,
slots: {
suffix: () => '%'
},
props: {min: -1}
},
{
type: 'input-number',
field: 'memoryMax',
name: t('project.node.memory_max'),
span: 12,
slots: {
suffix: () => 'MB'
},
props: {min: -1}
}
]
}
......@@ -409,7 +409,9 @@ export function formatParams(data: INodeData): {
timeout: data.timeoutFlag ? data.timeout : 0,
timeoutFlag: data.timeoutFlag ? 'OPEN' : 'CLOSE',
timeoutNotifyStrategy: data.timeoutFlag ? timeoutNotifyStrategy : '',
workerGroup: data.workerGroup
workerGroup: data.workerGroup,
cpuQuota: data.cpuQuota ?? -1,
memoryMax: data.memoryMax ?? -1
}
} as {
processDefinitionCode: string
......
......@@ -42,6 +42,8 @@ export function useDataX({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
customConfig: false,
......@@ -77,6 +79,7 @@ export function useDataX({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useDataX(model),
......
......@@ -41,6 +41,8 @@ export function useJupyter({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN']
......@@ -71,6 +73,7 @@ export function useJupyter({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useJupyter(model),
......
......@@ -42,6 +42,8 @@ export function usePython({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
rawScript: '',
......@@ -73,6 +75,7 @@ export function usePython({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useShell(model),
......
......@@ -42,6 +42,8 @@ export function useSeaTunnel({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
deployMode: 'client',
......@@ -77,6 +79,7 @@ export function useSeaTunnel({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useSeaTunnel(model),
......
......@@ -43,6 +43,8 @@ export function useShell({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
rawScript: ''
} as INodeData)
......@@ -72,6 +74,7 @@ export function useShell({
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useShell(model),
......
......@@ -41,6 +41,8 @@ export function useSqoop({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
isCustomTask: false,
......@@ -91,6 +93,7 @@ export function useSqoop({
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useSqoop(model),
......
......@@ -365,6 +365,8 @@ interface INodeData
environmentCode?: number | null
failRetryInterval?: number
failRetryTimes?: number
cpuQuota?: number
memoryMax?: number
flag?: 'YES' | 'NO'
taskGroupId?: number
taskGroupPriority?: number
......