From b1fb17119b7e929792933040e26c5013313a43f7 Mon Sep 17 00:00:00 2001
From: xiangzihao <460888207@qq.com>
Date: Wed, 8 Jun 2022 19:42:05 +0800
Subject: [PATCH] [Feature-6758][Task] Add limit resource usage for tasks base
on cgroup (#10373)
---
docs/docs/en/architecture/configuration.md | 1 +
docs/docs/en/guide/task/datax.md | 2 +
docs/docs/en/guide/task/jupyter.md | 2 +
docs/docs/en/guide/task/python.md | 2 +
docs/docs/en/guide/task/shell.md | 2 +
docs/docs/zh/architecture/configuration.md | 1 +
docs/docs/zh/guide/task/datax.md | 2 +
docs/docs/zh/guide/task/jupyter.md | 4 +-
docs/docs/zh/guide/task/python.md | 2 +
docs/docs/zh/guide/task/shell.md | 2 +
.../common/model/TaskNode.java | 26 +++++++++++
.../src/main/resources/common.properties | 5 ++-
.../dao/entity/TaskDefinition.java | 32 ++++++++++++-
.../dao/entity/TaskDefinitionLog.java | 2 +
.../dao/entity/TaskInstance.java | 26 +++++++++++
.../dao/mapper/TaskDefinitionLogMapper.xml | 8 ++--
.../dao/mapper/TaskDefinitionMapper.xml | 2 +-
.../dao/mapper/TaskInstanceMapper.xml | 2 +-
.../resources/sql/dolphinscheduler_h2.sql | 6 +++
.../resources/sql/dolphinscheduler_mysql.sql | 6 +++
.../sql/dolphinscheduler_postgresql.sql | 6 +++
.../mysql/dolphinscheduler_ddl.sql | 9 ++++
.../postgresql/dolphinscheduler_ddl.sql | 9 ++++
.../docker/file-manage/common.properties | 3 ++
.../builder/TaskExecutionContextBuilder.java | 2 +
.../master/runner/WorkflowExecuteThread.java | 4 ++
.../service/process/ProcessServiceImpl.java | 2 +
.../dolphinscheduler-task-api/pom.xml | 5 +++
.../task/api/AbstractCommandExecutor.java | 45 +++++++++++++++++--
.../plugin/task/api/TaskExecutionContext.java | 26 +++++++++++
.../AbstractCommandExecutorConstants.java | 27 +++++++++++
.../plugin/task/api/utils/OSUtils.java | 10 +++++
.../src/locales/en_US/project.ts | 2 +
.../src/locales/zh_CN/project.ts | 2 +
.../task/components/node/fields/index.ts | 1 +
.../node/fields/use-resource-limit.ts | 45 +++++++++++++++++++
.../task/components/node/format-data.ts | 4 +-
.../task/components/node/tasks/use-datax.ts | 3 ++
.../task/components/node/tasks/use-jupyter.ts | 3 ++
.../task/components/node/tasks/use-python.ts | 3 ++
.../components/node/tasks/use-sea-tunnel.ts | 3 ++
.../task/components/node/tasks/use-shell.ts | 3 ++
.../task/components/node/tasks/use-sqoop.ts | 3 ++
.../projects/task/components/node/types.ts | 2 +
44 files changed, 345 insertions(+), 12 deletions(-)
create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/AbstractCommandExecutorConstants.java
create mode 100644 dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resource-limit.ts
diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index 37718319b..604884f20 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -202,6 +202,7 @@ yarn.resourcemanager.ha.rm.ids||specify the yarn resourcemanager url. if resourc
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false| specify whether in development state
+task.resource.limit.state|false|specify whether to enable resource limits (CPU quota and max memory) for tasks
### application-api.properties [API-service log config]
diff --git a/docs/docs/en/guide/task/datax.md b/docs/docs/en/guide/task/datax.md
index d53781775..a57656688 100644
--- a/docs/docs/en/guide/task/datax.md
+++ b/docs/docs/en/guide/task/datax.md
@@ -19,6 +19,8 @@ DataX task type for executing DataX programs. For DataX nodes, the worker will e
- **Environment Name**: Configure the environment name in which to run the script.
- **Number of failed retry attempts**: The number of times the task failed to be resubmitted.
- **Failed retry interval**: The time, in cents, interval for resubmitting the task after a failed task.
+- **Cpu quota**: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
+- **Max memory**: Assign the specified maximum memory to the task executed. If the task exceeds this limit, it will be killed by the OOM killer and will not be retried automatically. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- **Delayed execution time**: The time, in cents, that a task is delayed in execution.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- **Custom template**: Custom the content of the DataX node's json profile when the default data source provided does not meet the required requirements.
diff --git a/docs/docs/en/guide/task/jupyter.md b/docs/docs/en/guide/task/jupyter.md
index 92d614d65..94208bbe1 100644
--- a/docs/docs/en/guide/task/jupyter.md
+++ b/docs/docs/en/guide/task/jupyter.md
@@ -25,6 +25,8 @@ Click [here](https://docs.conda.io/en/latest/) for more information about `conda
- Worker grouping: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
- Number of failed retry attempts: The failure task resubmitting times. It supports drop-down and hand-filling.
- Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
+- Max memory: Assign the specified maximum memory to the task executed. If the task exceeds this limit, it will be killed by the OOM killer and will not be retried automatically. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Conda Env Name: Name of conda environment.
- Input Note Path: Path of input jupyter note template.
diff --git a/docs/docs/en/guide/task/python.md b/docs/docs/en/guide/task/python.md
index 6d0a37669..c54a13c6e 100644
--- a/docs/docs/en/guide/task/python.md
+++ b/docs/docs/en/guide/task/python.md
@@ -20,6 +20,8 @@ it will generate a temporary python script, and executes the script by the Linux
- Environment Name: Configure the environment name in which to run the script.
- Number of failed retry attempts: The failure task resubmitting times. It supports drop-down and hand-filling.
- Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
+- Max memory: Assign the specified maximum memory to the task executed. If the task exceeds this limit, it will be killed by the OOM killer and will not be retried automatically. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Script: Python program developed by the user.
- Resource: Refers to the list of resource files that need to be called in the script, and the files uploaded or created by the resource center-file management.
diff --git a/docs/docs/en/guide/task/shell.md b/docs/docs/en/guide/task/shell.md
index e397df7b0..3119cfae1 100644
--- a/docs/docs/en/guide/task/shell.md
+++ b/docs/docs/en/guide/task/shell.md
@@ -19,6 +19,8 @@ Shell task used to create a shell task type and execute a series of shell script
- Environment Name: Configure the environment name in which run the script.
- Times of failed retry attempts: The number of times the task failed to resubmit. You can select from drop-down or fill-in a number.
- Failed retry interval: The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
+- Max memory: Assign the specified maximum memory to the task executed. If the task exceeds this limit, it will be killed by the OOM killer and will not be retried automatically. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Timeout alarm: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
- Script: Shell program developed by users.
- Resource: Refers to the list of resource files that called in the script, and upload or create files by the Resource Center file management.
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index 94cfa57c9..1ea4cc5ad 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -194,6 +194,7 @@ yarn.resourcemanager.ha.rm.ids||yarn resourcemanager 地址, 如果resourcemanag
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false|是否处于开发模式
+task.resource.limit.state|false|是否启用资源限制模式
## 5.application-api.properties [API服务配置]
diff --git a/docs/docs/zh/guide/task/datax.md b/docs/docs/zh/guide/task/datax.md
index 7f9248e9d..49340db72 100644
--- a/docs/docs/zh/guide/task/datax.md
+++ b/docs/docs/zh/guide/task/datax.md
@@ -19,6 +19,8 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数。
- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 延时执行时间:任务延迟执行的时间,以分为单位。
- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
- 自定义模板:当默认提供的数据源不满足所需要求的时,可自定义 datax 节点的 json 配置文件内容。
diff --git a/docs/docs/zh/guide/task/jupyter.md b/docs/docs/zh/guide/task/jupyter.md
index bcde5122d..1372c843b 100644
--- a/docs/docs/zh/guide/task/jupyter.md
+++ b/docs/docs/zh/guide/task/jupyter.md
@@ -26,7 +26,9 @@
- Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
-- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Conda Env Name: Conda环境名称。
- Input Note Path: 输入的jupyter note模板路径。
diff --git a/docs/docs/zh/guide/task/python.md b/docs/docs/zh/guide/task/python.md
index d7e2b9561..c389618cd 100644
--- a/docs/docs/zh/guide/task/python.md
+++ b/docs/docs/zh/guide/task/python.md
@@ -20,6 +20,8 @@ Python 任务类型,用于创建 Python 类型的任务并执行一系列的 P
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 脚本:用户开发的PYTHON程序。
- 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
diff --git a/docs/docs/zh/guide/task/shell.md b/docs/docs/zh/guide/task/shell.md
index 79c7e4fab..d50a3d04d 100644
--- a/docs/docs/zh/guide/task/shell.md
+++ b/docs/docs/zh/guide/task/shell.md
@@ -19,6 +19,8 @@ Shell 任务类型,用于创建 Shell 类型的任务并执行一系列的 She
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 脚本:用户开发的 SHELL 程序。
- 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index a9601a6fe..6999ac3ba 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -178,6 +178,16 @@ public class TaskNode {
*/
private int delayTime;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public String getId() {
return id;
}
@@ -497,4 +507,20 @@ public class TaskNode {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
+
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
}
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 76c98d78e..13c7d3a47 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -95,4 +95,7 @@ alert.rpc.port=50052
zeppelin.rest.url=http://localhost:8080
# set path of conda.sh
-conda.path=/opt/anaconda3/etc/profile.d/conda.sh
\ No newline at end of file
+conda.path=/opt/anaconda3/etc/profile.d/conda.sh
+
+# Task resource limit state
+task.resource.limit.state=false
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 0206719a3..c90876449 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -201,6 +201,16 @@ public class TaskDefinition {
*/
private int taskGroupPriority;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public TaskDefinition() {
}
@@ -457,6 +467,22 @@ public class TaskDefinition {
this.environmentCode = environmentCode;
}
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null) {
@@ -481,7 +507,9 @@ public class TaskDefinition {
|| ("".equals(that.resourceIds) && resourceIds == null))
&& environmentCode == that.environmentCode
&& taskGroupId == that.taskGroupId
- && taskGroupPriority == that.taskGroupPriority;
+ && taskGroupPriority == that.taskGroupPriority
+ && Objects.equals(cpuQuota, that.cpuQuota)
+ && Objects.equals(memoryMax, that.memoryMax);
}
@Override
@@ -513,6 +541,8 @@ public class TaskDefinition {
+ ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\''
+ + ", cpuQuota=" + cpuQuota
+ + ", memoryMax=" + memoryMax
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index 7301148e3..f070fdb16 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -70,6 +70,8 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
+ this.setCpuQuota(taskDefinition.getCpuQuota());
+ this.setMemoryMax(taskDefinition.getMemoryMax());
}
public int getOperator() {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 4ec1dfde2..fc66bfd4c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -279,6 +279,16 @@ public class TaskInstance implements Serializable {
*/
private int taskGroupId;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@@ -753,4 +763,20 @@ public class TaskInstance implements Serializable {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
+
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index 14314d0fd..16dc698d2 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -21,7 +21,7 @@
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
- resource_ids, operator, operate_time, create_time, update_time,task_group_id,task_group_priority
+ resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max