...
 
Commits (5)

- [feature][task plugin] Add new task plugin for apache linkis (#12693), 2022-11-17T10:14:36+08:00, Assert <42203474+shangeyao@users.noreply.github.com>
  https://gitcode.net/apache/dolphinscheduler/-/commit/2dbc79693e99f7a10d8bddeef691d9321dc5f10c
- [Bug] [Alert] Ignore alert not write info to db (#12867), 2022-11-17T14:14:12+08:00, 旺阳 <qingwli@cisco.com>
  * add alert not match return info
  https://gitcode.net/apache/dolphinscheduler/-/commit/d02991a2e6332609e6d745d769f5bebbe17ed78f
- [improve] Source skip check (#12900), 2022-11-17T14:23:12+08:00, insist777 <84278047+insist777@users.noreply.github.com>
  * Source skip check
  https://gitcode.net/apache/dolphinscheduler/-/commit/528f45acc5a899d96dbe7caca1a05f8174648e2f
- [Improvement][Task Plugin] Improvement Kubeflow task plugin (#12928), 2022-11-17T15:59:44+08:00, JieguangZhou <jieguang_zhou@163.com>
  * add example check phase in status:conditions
  https://gitcode.net/apache/dolphinscheduler/-/commit/7c90bf01bcd19949f7e3c3a92f7169093ef3700c
- Fix execute shell task exception no dolphinscheduler_env.sh file execute perm..., 2022-11-17T21:43:35+08:00, Kerwin <37063904+zhuangchong@users.noreply.github.com>
  https://gitcode.net/apache/dolphinscheduler/-/commit/401fb4edd4760abbad5f569446bb421c6f096da1
......@@ -217,6 +217,10 @@ export default {
title: 'Kubeflow',
link: '/en-us/docs/dev/user_doc/guide/task/kubeflow.html',
},
{
title: 'Apache Linkis',
link: '/en-us/docs/dev/user_doc/guide/task/linkis.html',
},
],
},
{
......@@ -877,6 +881,10 @@ export default {
title: 'Kubeflow',
link: '/zh-cn/docs/dev/user_doc/guide/task/kubeflow.html',
},
{
title: 'Apache Linkis',
link: '/zh-cn/docs/dev/user_doc/guide/task/linkis.html',
},
],
},
{
......
......@@ -26,7 +26,31 @@ The task plugin picture is as follows
### Parameters specific to the Kubeflow plugin
- **Namespace**: the namespace parameter of the cluster
- **yamlContent**: CRD YAML file content, for example:
```yaml
apiVersion: "kubeflow.org/v1"
kind: TFJob
metadata:
name: tfjob-simple
namespace: kubeflow-user-example-com
spec:
tfReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
command:
- "python"
- "/var/tf_mnist/mnist_with_summaries.py"
```
## Environment Configuration
......
# Apache Linkis
## Overview
`Linkis` task type for creating and executing `Linkis` tasks. When the worker executes this task, it parses the shell parameters and runs them via the `linkis-cli` command line.
Click [here](https://linkis.apache.org/) for more information about `Apache Linkis`.
## Create Task
- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/linkis.png" width="15"/> task node from the toolbar onto the canvas.
## Task Parameter
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix]&#40;appendix.md#default-task-parameters&#41; `Default Task Parameters` section for default parameters.)
- Please refer to the `Default Task Parameters` section of the [DolphinScheduler Task Parameters Appendix](appendix.md) for default parameters.
- Please refer to the `Linkis Support Parameters` section of [Linkis-Cli Task Parameters](https://linkis.apache.org/zh-CN/docs/latest/user-guide/linkiscli-manual) for the parameters Linkis supports.
## Task Example
This example demonstrates using the Spark engine to execute a SQL script.
### Configuring the Linkis environment in DolphinScheduler
If you want to use the Linkis task type in a production environment, you first need to configure the required environment. The configuration file is: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
![linkis_task01](../../../../img/tasks/demo/linkis_task01.png)
### Configuring Linkis Task Node
According to the above parameter description, configure the required content.
![linkis_task02](../../../../img/tasks/demo/linkis_task02.png)
### Configuration example
```shell
sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;" -submitUser hadoop -proxyUser hadoop
```
### Attention
- There is no need to enter `sh ./bin/linkis-cli` in the configuration field; it is pre-configured.
- Submission is asynchronous by default, so you do not need to add the `--async` parameter.
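Putting the notes above together: the worker prepends a pre-configured CLI prefix and the async flag, then appends the parameters entered in the configuration field. A minimal, hypothetical sketch of that assembly (the prefix and flag values are taken from the plugin's `Constants`: `${LINKIS_HOME}/bin/linkis-cli` and `--async true`; the class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not the plugin's actual code): the final
// linkis-cli command is a pre-configured prefix plus the task-form parameters.
public class LinkisCommandDemo {

    static final String CLI_PREFIX = "${LINKIS_HOME}/bin/linkis-cli";
    static final String ASYNC_OPTION = "--async true";

    static String buildCommand(String taskParams) {
        List<String> args = new ArrayList<>();
        args.add(CLI_PREFIX);   // pre-configured, so not entered in the form
        args.add(ASYNC_OPTION); // default submission mode, so not entered either
        args.add(taskParams);   // only this part comes from the configuration field
        return String.join(" ", args);
    }

    public static void main(String[] args) {
        // The parameters from the configuration example above.
        System.out.println(buildCommand(
                "-engineType spark-2.4.3 -codeType sql -code \"select count(*) from testdb.test;\" -submitUser hadoop -proxyUser hadoop"));
    }
}
```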
......@@ -24,7 +24,31 @@
### Parameters specific to the Kubeflow plugin
- **Namespace**: the namespace parameter of the cluster
- **yamlContent**: CRD YAML file content, for example:
```yaml
apiVersion: "kubeflow.org/v1"
kind: TFJob
metadata:
name: tfjob-simple
namespace: kubeflow-user-example-com
spec:
tfReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
command:
- "python"
- "/var/tf_mnist/mnist_with_summaries.py"
```
## Environment Configuration
......
# Apache Linkis
## Overview
The `Linkis` task type is used to create and execute `Linkis` tasks. When the worker executes this task, it runs the command line through `linkis-cli`.
Click [here](https://linkis.apache.org/) for more information about `Apache Linkis`.
## Create Task
- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page;
- Drag the <img src="../../../../img/tasks/icons/linkis.png" width="15"/> task node from the toolbar onto the canvas.
## Task Parameter
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix]&#40;appendix.md#默认任务参数&#41; `Default Task Parameters` section for default parameters.)
- Please refer to the `Default Task Parameters` section of the [DolphinScheduler Task Parameters Appendix](appendix.md) for default parameters.
- Please refer to the `Supported Parameters` section of the [linkis-cli task parameters](https://linkis.apache.org/zh-CN/docs/latest/user-guide/linkiscli-manual) documentation for the parameters Linkis supports.
## Task Example
This example demonstrates submitting a SQL script for execution with the Spark engine.
### Configuring the Linkis environment in DolphinScheduler
If you want to use the Linkis task type in a production environment, you first need to configure the required environment. The configuration file is: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`
![linkis_task01](../../../../img/tasks/demo/linkis_task01.png)
### Configuring the Linkis task node
Configure the required content according to the parameter descriptions above.
![linkis_task02](../../../../img/tasks/demo/linkis_task02.png)
### Configuration example
```shell
sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;" -submitUser hadoop -proxyUser hadoop
```
### Notes
- There is no need to enter `sh ./bin/linkis-cli` in the configuration field; it is pre-configured.
- Submission is asynchronous by default, so you do not need to add the `--async` parameter.
......@@ -255,10 +255,13 @@ public final class AlertSenderService extends Thread {
}
if (!sendWarning) {
String message = String.format(
"Alert Plugin %s send ignore warning type not match: plugin warning type is %s, alert data warning type is %s",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
logger.info(
"Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
return null;
return new AlertResult("false", message);
}
AlertInfo alertInfo = AlertInfo.builder()
......
......@@ -119,13 +119,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
// check connect
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(datasourceParam);
Result<Object> isConnection = checkConnection(datasourceParam.getType(), connectionParam);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
// build datasource
DataSource dataSource = new DataSource();
......@@ -202,11 +196,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
connectionParam.setPassword(oldParams.path(Constants.PASSWORD).asText());
}
Result<Object> isConnection = checkConnection(dataSource.getType(), connectionParam);
if (isConnection.isFailed()) {
return isConnection;
}
Date now = new Date();
dataSource.setName(dataSourceParam.getName().trim());
......
......@@ -57,3 +57,4 @@ task:
- 'ZEPPELIN'
- 'CHUNJUN'
- 'DATASYNC'
- 'LINKIS'
......@@ -128,19 +128,12 @@ public class DataSourceServiceTest {
try (
MockedStatic<DataSourceClientProvider> mockedStaticDataSourceClientProvider =
Mockito.mockStatic(DataSourceClientProvider.class)) {
// DATASOURCE_CONNECT_FAILED
DataSourceClientProvider clientProvider = Mockito.mock(DataSourceClientProvider.class);
mockedStaticDataSourceClientProvider.when(DataSourceClientProvider::getInstance).thenReturn(clientProvider);
Mockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
Mockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(null);
Result connectFailedResult = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assertions.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(),
connectFailedResult.getCode().intValue());
// SUCCESS
Connection connection = Mockito.mock(Connection.class);
Mockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
Result success = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assertions.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
}
......@@ -204,15 +197,9 @@ public class DataSourceServiceTest {
DataSourceClientProvider clientProvider = Mockito.mock(DataSourceClientProvider.class);
mockedStaticDataSourceClientProvider.when(DataSourceClientProvider::getInstance).thenReturn(clientProvider);
Mockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(null);
Mockito.when(dataSourceMapper.queryDataSourceByName(postgreSqlDatasourceParam.getName())).thenReturn(null);
Result connectFailed =
dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assertions.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), connectFailed.getCode().intValue());
// SUCCESS
Connection connection = Mockito.mock(Connection.class);
Mockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assertions.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
}
......
......@@ -236,6 +236,11 @@
<artifactId>dolphinscheduler-task-kubeflow</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-linkis</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
......@@ -75,6 +77,7 @@ public class KubeflowHelper {
}
JsonNode status = data.get("status");
String lastConditionType = "";
if (status.has("conditions")) {
JsonNode conditions = status.get("conditions");
for (int x = messageIndex; x < conditions.size(); x = x + 1) {
......@@ -83,10 +86,14 @@ public class KubeflowHelper {
logger.info(stepMessage);
}
messageIndex = conditions.size();
JsonNode lastCondition = conditions.get(conditions.size() - 1);
lastConditionType = lastCondition.has("type") ? lastCondition.get("type").asText() : "";
}
String phase;
if (status.has("phase")) {
phase = status.get("phase").asText();
} else if (StringUtils.isNotEmpty(lastConditionType)) {
phase = lastConditionType;
} else {
phase = "";
}
......
......@@ -109,6 +109,7 @@ public class KubeflowTask extends AbstractRemoteTask {
logger.info("Kubeflow task delete command: \n{}", command);
String message = runCommand(command);
logger.info("Kubeflow task delete result: \n{}", message);
exitStatusCode = TaskConstants.EXIT_CODE_KILL;
}
protected String runCommand(String command) {
......
......@@ -105,7 +105,7 @@ public class KubeflowTaskTest {
KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("delete_result");
task.cancelApplication();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_KILL);
}
public KubeflowTask createTask(KubeflowParameters kubeflowParameters) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-task-linkis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
public class Constants {
private Constants() {
throw new IllegalStateException("Utility class");
}
public static final String SHELL_CLI_OPTIONS = "${LINKIS_HOME}/bin/linkis-cli";
public static final String KILL_OPTIONS = "--kill";
public static final String STATUS_OPTIONS = "--status";
public static final String ASYNC_OPTIONS = "--async true";
public static final String SPACE = " ";
public static final String LINKIS_TASK_ID_REGEX = "\"taskID\": \"\\d+";
public static final String LINKIS_STATUS_REGEX = "\"status\": \"\\w+";
}
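The two regex constants above are consumed later in this changeset by `LinkisTask#findTaskId` and `LinkisTask#findStatus`, which strip the 11-character prefix (`"taskID": "` / `"status": "`) from the match. A self-contained sketch of that extraction (the class name is hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone demo: how the two regexes in Constants pull the
// task id and status out of linkis-cli output.
public class LinkisIdExtractDemo {

    // Same patterns as Constants.LINKIS_TASK_ID_REGEX / LINKIS_STATUS_REGEX.
    static final Pattern TASK_ID = Pattern.compile("\"taskID\": \"\\d+");
    static final Pattern STATUS = Pattern.compile("\"status\": \"\\w+");

    static String findTaskId(String line) {
        Matcher m = TASK_ID.matcher(line);
        // A match looks like `"taskID": "12345`; dropping the 11-character
        // prefix `"taskID": "` leaves only the digits.
        return m.find() ? m.group().substring(11) : null;
    }

    static String findStatus(String line) {
        Matcher m = STATUS.matcher(line);
        return m.find() ? m.group().substring(11) : null;
    }

    public static void main(String[] args) {
        String out = "{\"taskID\": \"12345\", \"status\": \"Succeed\"}";
        System.out.println(findTaskId(out) + " " + findStatus(out)); // prints "12345 Succeed"
    }
}
```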
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.commons.lang3.StringUtils;
public enum LinkisJobStatus {
UNSUBMITTED("Unsubmitted", 0),
SUBMITTING("Submitting", 1),
INITED("Inited", 2),
WAIT_FOR_RETRY("WaitForRetry", 3),
SCHEDULED("Scheduled", 4),
RUNNING("Running", 5),
SUCCEED("Succeed", 6),
FAILED("Failed", 7),
CANCELLED("Cancelled", 8),
TIMEOUT("Timeout", 9),
UNKNOWN("Unknown", 10),
SHUTTINGDOWN("Shuttingdown", 11);
private String name;
private int id;
LinkisJobStatus(String name, int id) {
this.name = name;
this.id = id;
}
public static LinkisJobStatus convertFromJobStatusString(String status) {
if (StringUtils.isNotBlank(status)) {
if (LinkisJobStatus.INITED.name().equalsIgnoreCase(status))
return LinkisJobStatus.INITED;
else if (LinkisJobStatus.WAIT_FOR_RETRY.name().equalsIgnoreCase(status))
return LinkisJobStatus.WAIT_FOR_RETRY;
else if (LinkisJobStatus.SCHEDULED.name().equalsIgnoreCase(status))
return LinkisJobStatus.SCHEDULED;
else if (LinkisJobStatus.RUNNING.name().equalsIgnoreCase(status))
return LinkisJobStatus.RUNNING;
else if (LinkisJobStatus.SUCCEED.name().equalsIgnoreCase(status))
return LinkisJobStatus.SUCCEED;
else if (LinkisJobStatus.FAILED.name().equalsIgnoreCase(status))
return LinkisJobStatus.FAILED;
else if (LinkisJobStatus.CANCELLED.name().equalsIgnoreCase(status))
return LinkisJobStatus.CANCELLED;
else if (LinkisJobStatus.TIMEOUT.name().equalsIgnoreCase(status))
return LinkisJobStatus.TIMEOUT;
else
return LinkisJobStatus.UNKNOWN;
} else {
return LinkisJobStatus.UNKNOWN;
}
}
}
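The if/else chain in `convertFromJobStatusString` amounts to case-insensitive name matching with an `UNKNOWN` fallback. A condensed, self-contained sketch of that same idea (hypothetical names; the real enum also carries states such as `WAIT_FOR_RETRY` and `SHUTTINGDOWN` that are omitted here):

```java
// Hypothetical sketch, not the plugin's actual code: case-insensitive
// status-string parsing with an UNKNOWN fallback, as in LinkisJobStatus.
public class LinkisStatusDemo {

    enum Status { INITED, SCHEDULED, RUNNING, SUCCEED, FAILED, CANCELLED, TIMEOUT, UNKNOWN }

    static Status parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Status.UNKNOWN; // blank input maps to UNKNOWN, as in the original
        }
        for (Status s : Status.values()) {
            if (s.name().equalsIgnoreCase(raw.trim())) {
                return s;
            }
        }
        return Status.UNKNOWN; // unrecognized states also map to UNKNOWN
    }

    public static void main(String[] args) {
        System.out.println(parse("succeed")); // prints "SUCCEED"
        System.out.println(parse("bogus"));   // prints "UNKNOWN"
    }
}
```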
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@Data
public class LinkisParameters extends AbstractParameters {
private Boolean useCustom;
private List<Param> paramScript;
private String rawScript;
@Getter
@Setter
public static class Param {
private String props;
private String value;
}
@Override
public boolean checkParameters() {
// guard against a null param list to avoid an NPE when useCustom is false
return (BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
|| (BooleanUtils.isFalse(useCustom) && paramScript != null && !paramScript.isEmpty());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* linkis task
*/
public class LinkisTask extends AbstractRemoteTask {
/**
* linkis parameters
*/
private LinkisParameters linkisParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
protected final TaskExecutionContext taskExecutionContext;
private String taskId;
protected static final Pattern LINKIS_TASK_ID_REGEX = Pattern.compile(Constants.LINKIS_TASK_ID_REGEX);
protected static final Pattern LINKIS_STATUS_REGEX = Pattern.compile(Constants.LINKIS_STATUS_REGEX);
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public LinkisTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
logger);
}
@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
public void init() {
logger.info("Linkis task params {}", taskExecutionContext.getTaskParams());
linkisParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), LinkisParameters.class);
if (linkisParameters == null || !linkisParameters.checkParameters()) {
throw new RuntimeException("Linkis task params is not valid");
}
}
@Override
public void submitApplication() throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
linkisParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
logger.error("Linkis task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute Linkis task failed", e);
}
}
@Override
public void trackApplicationStatus() throws TaskException {
initTaskId();
try {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.STATUS_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) {
case FAILED:
setExitStatusCode(EXIT_CODE_FAILURE);
break;
case SUCCEED:
setExitStatusCode(EXIT_CODE_SUCCESS);
break;
case CANCELLED:
setExitStatusCode(EXIT_CODE_KILL);
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
throw new TaskException("track linkis status error", e);
}
}
@Override
public void cancelApplication() throws TaskException {
// cancel process
initTaskId();
try {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.KILL_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
shellCommandExecutor.run(command);
setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
throw new TaskException("cancel linkis task error", e);
}
}
private String buildCommand() {
List<String> args = new ArrayList<>();
args.addAll(buildOptions());
String command = String.join(Constants.SPACE, args);
logger.info("Linkis task command: {}", command);
return command;
}
protected List<String> buildOptions() {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.ASYNC_OPTIONS);
if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) {
args.add(buildCustomConfigContent());
} else {
args.add(buildParamConfigContent());
}
return args;
}
private String buildCustomConfigContent() {
logger.info("raw custom config content : {}", linkisParameters.getRawScript());
String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
return script;
}
private String buildParamConfigContent() {
logger.info("raw param config content : {}", linkisParameters.getParamScript());
String script = "";
List<LinkisParameters.Param> paramList = linkisParameters.getParamScript();
for (LinkisParameters.Param param : paramList) {
script = script.concat(param.getProps())
.concat(Constants.SPACE)
.concat(param.getValue());
}
script = parseScript(script);
return script;
}
private void initTaskId() {
if (taskId == null) {
if (StringUtils.isNotEmpty(getAppIds())) {
taskId = getAppIds();
}
}
if (taskId == null) {
throw new TaskException("linkis task id is null");
}
}
protected String findTaskId(String line) {
Matcher matcher = LINKIS_TASK_ID_REGEX.matcher(line);
if (matcher.find()) {
String str = matcher.group();
return str.substring(11);
}
return null;
}
protected String findStatus(String line) {
Matcher matcher = LINKIS_STATUS_REGEX.matcher(line);
if (matcher.find()) {
String str = matcher.group();
return str.substring(11);
}
return null;
}
@Override
public AbstractParameters getParameters() {
return linkisParameters;
}
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
public void setLinkisParameters(LinkisParameters linkisParameters) {
this.linkisParameters = linkisParameters;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
public class LinkisTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public LinkisTask createTask(TaskExecutionContext taskRequest) {
return new LinkisTask(taskRequest);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), LinkisParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class LinkisTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new LinkisTaskChannel();
}
@Override
public String getName() {
return "LINKIS";
}
@Override
public List<PluginParams> getParams() {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class LinkisTaskTest {
@Test
public void testBuildLinkisExecuteCommand() throws Exception {
Assertions.assertEquals("sh ./bin/shell-cli -engineType spark-2.4.3",
testBuildRunCommandLine(testBuildLinkisParameters()));
}
private LinkisParameters testBuildLinkisParameters() {
LinkisParameters linkisParameters = new LinkisParameters();
List<LinkisParameters.Param> testParamList = new ArrayList<>();
LinkisParameters.Param testParam = new LinkisParameters.Param();
testParam.setProps("-engineType");
testParam.setValue("spark-2.4.3");
testParamList.add(testParam);
linkisParameters.setUseCustom(false);
linkisParameters.setParamScript(testParamList);
return linkisParameters;
}
private static String testBuildRunCommandLine(LinkisParameters linkisParameters) {
List<String> args = new ArrayList<>();
String script = "";
List<LinkisParameters.Param> paramList = linkisParameters.getParamScript();
for (LinkisParameters.Param param : paramList) {
script = script.concat(param.getProps())
.concat(Constants.SPACE)
.concat(param.getValue());
}
args.add("sh ./bin/shell-cli");
args.add(script);
return String.join(Constants.SPACE, args);
}
}
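The command assembly exercised by `LinkisTaskTest` above can be sketched standalone: option/value pairs are joined with spaces and appended to the Linkis `shell-cli` launcher. This is an illustrative sketch only — the class and method names below are hypothetical and not part of the plugin's API.

```java
import java.util.ArrayList;
import java.util.List;

public class LinkisCommandSketch {

    // Join each (prop, value) pair with a space and append the result
    // to the shell-cli launcher, mirroring what the test expects.
    static String buildCommand(List<String[]> params) {
        StringBuilder script = new StringBuilder();
        for (String[] p : params) {
            if (script.length() > 0) {
                script.append(' ');
            }
            // each param contributes "<prop> <value>", e.g. "-engineType spark-2.4.3"
            script.append(p[0]).append(' ').append(p[1]);
        }
        return "sh ./bin/shell-cli " + script;
    }

    public static void main(String[] args) {
        List<String[]> params = new ArrayList<>();
        params.add(new String[] {"-engineType", "spark-2.4.3"});
        System.out.println(buildCommand(params));
    }
}
```

Compared with the test's `testBuildRunCommandLine`, the only difference is that the sketch inserts the separating space between pairs explicitly instead of relying on a trailing-space concatenation pattern.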
......@@ -65,6 +65,7 @@
<module>dolphinscheduler-task-dms</module>
<module>dolphinscheduler-task-datasync</module>
<module>dolphinscheduler-task-kubeflow</module>
<module>dolphinscheduler-task-linkis</module>
</modules>
<dependencyManagement>
......
......@@ -145,6 +145,10 @@ export const TASK_TYPES_MAP = {
KUBEFLOW: {
alias: 'KUBEFLOW',
helperLinkDisable: true
},
LINKIS: {
alias: 'LINKIS',
helperLinkDisable: true
}
} as {
[key in TaskType]: {
......
......@@ -55,6 +55,7 @@ type TaskType =
| 'DMS'
| 'DATASYNC'
| 'KUBEFLOW'
| 'LINKIS'
type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
type DependentResultType = {
......
......@@ -83,3 +83,4 @@ export { useHiveCli } from './use-hive-cli'
export { useDms } from './use-dms'
export { useDatasync } from './use-datasync'
export { useKubeflow } from './use-kubeflow'
export { useLinkis } from './use-linkis'
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'
export function useLinkis(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
const parmaEditorSpan = computed(() => (model.useCustom ? 0 : 24))
return [
{
type: 'switch',
field: 'useCustom',
name: t('project.node.custom_config')
},
{
type: 'custom-parameters',
field: 'paramScript',
name: t('project.node.option_parameters'),
span: parmaEditorSpan,
children: [
{
type: 'input',
field: 'prop',
span: 10,
props: {
placeholder: t('project.node.prop_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.prop_tips'))
}
const sameItems = model.localParams.filter(
(item: { prop: string }) => item.prop === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 10,
props: {
placeholder: t('project.node.value_tips'),
maxLength: 256
}
}
]
},
{
type: 'editor',
field: 'rawScript',
name: t('project.node.script'),
span: configEditorSpan,
validate: {
trigger: ['input', 'blur'],
required: model.useCustom,
validator(validate: any, value: string) {
if (model.useCustom && !value) {
return new Error(t('project.node.script_tips'))
}
}
}
},
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
......@@ -460,6 +460,12 @@ export function formatParams(data: INodeData): {
taskParams.yamlContent = data.yamlContent
taskParams.namespace = data.namespace
}
if (data.taskType === 'LINKIS') {
taskParams.useCustom = data.useCustom
taskParams.paramScript = data.paramScript
taskParams.rawScript = data.rawScript
}
let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) {
......
......@@ -49,7 +49,7 @@ import { useHiveCli } from './use-hive-cli'
import { useDms } from './use-dms'
import { useDatasync } from './use-datasync'
import { useKubeflow } from './use-kubeflow'
import { useLinkis } from './use-linkis'
export default {
SHELL: useShell,
......@@ -85,5 +85,6 @@ export default {
HIVECLI: useHiveCli,
DMS: useDms,
DATASYNC: useDatasync,
-  KUBEFLOW: useKubeflow
+  KUBEFLOW: useKubeflow,
+  LINKIS: useLinkis
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useLinkis({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'LINKIS',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN'],
useCustom: false,
paramScript: [
{
prop: '',
value: ''
},
],
rawScript: ''
} as INodeData)
return {
json: [
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useLinkis(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}
......@@ -396,6 +396,7 @@ interface ITaskParams {
name?: string
cloudWatchLogGroupArn?: string
yamlContent?: string
paramScript?: ILocalParam[]
}
interface INodeData
......
......@@ -49,6 +49,7 @@ export type TaskType =
| 'DMS'
| 'DATASYNC'
| 'KUBEFLOW'
| 'LINKIS'
export type TaskExecuteType = 'STREAM' | 'BATCH'
......@@ -175,6 +176,10 @@ export const TASK_TYPES_MAP = {
KUBEFLOW: {
alias: 'KUBEFLOW',
helperLinkDisable: true
},
LINKIS: {
alias: 'LINKIS',
helperLinkDisable: true
}
} as {
[key in TaskType]: {
......
......@@ -196,10 +196,13 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/dms.png');
}
&.icon-datasync {
-    background-image: url('/images/task-icons/datasync_hover.png');
+    background-image: url('/images/task-icons/datasync.png');
}
&.icon-linkis {
background-image: url('/images/task-icons/linkis.png');
}
&.icon-kubeflow {
-    background-image: url('/images/task-icons/kubeflow_hover.png');
+    background-image: url('/images/task-icons/kubeflow.png');
}
}
......@@ -300,10 +303,13 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/dms_hover.png');
}
&.icon-datasync {
-    background-image: url('/images/task-icons/datasync.png');
+    background-image: url('/images/task-icons/datasync_hover.png');
}
&.icon-linkis {
background-image: url('/images/task-icons/linkis_hover.png');
}
&.icon-kubeflow {
-    background-image: url('/images/task-icons/kubeflow.png');
+    background-image: url('/images/task-icons/kubeflow_hover.png');
}
}
}
......
......@@ -21,7 +21,6 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf
export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
......
......@@ -31,6 +31,7 @@ export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export LINKIS_HOME=${LINKIS_HOME:-/opt/soft/linkis}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
......