未验证 提交 229c5549 编写于 作者: J JieguangZhou 提交者: GitHub

[feature][task] Add Kubeflow task plugin for MLOps scenario (#12843)

上级 019e7475
......@@ -213,6 +213,10 @@ export default {
title: 'AWS Datasync',
link: '/en-us/docs/dev/user_doc/guide/task/datasync.html',
},
{
title: 'Kubeflow',
link: '/en-us/docs/dev/user_doc/guide/task/kubeflow.html',
},
],
},
{
......@@ -869,6 +873,10 @@ export default {
title: 'AWS Datasync',
link: '/zh-cn/docs/dev/user_doc/guide/task/datasync.html',
},
{
title: 'Kubeflow',
link: '/zh-cn/docs/dev/user_doc/guide/task/kubeflow.html',
},
],
},
{
......
# Kubeflow Node
## Overview
[Kubeflow](https://www.kubeflow.org) task type is used to create tasks on Kubeflow.
The backend mainly uses the `kubectl` command to create kubeflow tasks, and continues to monitor the resource status on Kubeflow until the task is completed.
Now it mainly supports creating kubeflow tasks using yaml files. If you need to publish `kubeflow pipeline` tasks, you can use the [python task type](./python.md).
## Create Task
- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
- Drag <img src="../../../../img/tasks/icons/kubeflow.png" width="15"/> from the toolbar to the canvas.
## Task Example
The task plugin picture is as follows
![kubeflow](../../../../img/tasks/demo/kubeflow.png)
### First, introduce some general parameters of DolphinScheduler
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
### Here are some specific parameters for the Kubeflow plugin
- **Namespace**:The namespace parameter of the cluster
- **yamlContent**:CRD YAML file content
## Environment Configuration
**Configure Kubernetes environment**
Reference [Cluster Management and Namespace Management](../security.md).
Only the required fields need to be filled in, and the others do not need to be filled in. The resource management depends on the YAML file definition in the specific Job.
**kubectl**
Install [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/), and make sure `kubectl` can submit tasks to kubeflow normally.
# Kubeflow
## 综述
[Kubeflow](https://www.kubeflow.org) 任务类型,用于在Kubeflow上创建任务。
后台主要使用 `kubectl` 命令来创建kubeflow任务, 并持续Kubeflow上资源状态直至任务完成。
目前主要支持通过使用yaml文件来创建kubeflow任务。 如果需要发布`kubeflow pipeline`任务可以使用[python任务类型](./python.md)
## 创建任务
- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
- 拖动工具栏的 <img src="../../../../img/tasks/icons/kubeflow.png" width="15"/> 任务节点到画板中。
## 任务样例
组件图示如下:
![kubeflow](../../../../img/tasks/demo/kubeflow.png)
### 首先介绍一些DS通用参数
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
### Kubeflow组件独有的参数
- **Namespace**:集群命名空间参数
- **yamlContent**:CRD YAML文件内容
## 环境配置
**配置Kubernetes环境**
参考[集群管理和命名空间管理](../security.md)
只需填写必填项即可,其他无需填写,资源管理依赖于具体Job中的YAML文件定义。
**kubectl**
安装[kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/),并确保`kubectl`能正常提交任务到kubeflow。
......@@ -51,8 +51,9 @@ task:
- 'DVC'
- 'SAGEMAKER'
- 'PYTORCH'
- 'KUBEFLOW'
other:
- 'PIGEON'
- 'ZEPPELIN'
- 'CHUNJUN'
- 'DATASYNC'
\ No newline at end of file
- 'DATASYNC'
......@@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLAS
import static org.apache.dolphinscheduler.common.constants.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
......@@ -325,7 +325,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
}
K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
if (TASK_TYPE_K8S.equalsIgnoreCase(taskInstance.getTaskType())) {
if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}
......
......@@ -230,6 +230,12 @@
<artifactId>dolphinscheduler-task-datasync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-kubeflow</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.plugin.task.api;
import java.time.Duration;
import java.util.Set;
import com.google.common.collect.Sets;
public class TaskConstants {
......@@ -433,6 +436,8 @@ public class TaskConstants {
public static final String TASK_TYPE_K8S = "K8S";
public static final Set<String> TASK_TYPE_SET_K8S = Sets.newHashSet("K8S", "KUBEFLOW");
public static final String TASK_TYPE_BLOCKING = "BLOCKING";
public static final String TASK_TYPE_STREAM = "STREAM";
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-task-kubeflow</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-api</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
public class KubeflowHelper {
protected final Logger logger =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
private final String clusterConfigPath;
private int messageIndex = 0;
public KubeflowHelper(String clusterConfigPath) {
this.clusterConfigPath = clusterConfigPath;
}
public String buildSubmitCommand(String yamlFilePATH) {
List<String> args = new ArrayList<>();
args.add(String.format(COMMAND.SET_CONFIG, clusterConfigPath));
args.add(String.format(COMMAND.APPLY, yamlFilePATH));
return String.join("\n", args);
}
public String buildGetCommand(String yamlFilePATH) {
List<String> args = new ArrayList<>();
args.add(String.format(COMMAND.SET_CONFIG, clusterConfigPath));
args.add(String.format(COMMAND.GET, yamlFilePATH));
return String.join("\n", args);
}
public String buildDeleteCommand(String yamlFilePATH) {
List<String> args = new ArrayList<>();
args.add(String.format(COMMAND.SET_CONFIG, clusterConfigPath));
args.add(String.format(COMMAND.DELETE, yamlFilePATH));
return String.join("\n", args);
}
public String parseGetMessage(String message) {
JsonNode data = JSONUtils.parseObject(message);
if (!data.has("status")) {
return "";
}
JsonNode status = data.get("status");
if (status.has("conditions")) {
JsonNode conditions = status.get("conditions");
for (int x = messageIndex; x < conditions.size(); x = x + 1) {
JsonNode condition = conditions.get(x);
String stepMessage = condition.toString();
logger.info(stepMessage);
}
messageIndex = conditions.size();
}
String phase;
if (status.has("phase")) {
phase = status.get("phase").asText();
} else {
phase = "";
}
return phase;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class ApplicationIds {
boolean isAlreadySubmitted;
}
public static class STATUS {
public static final HashSet<String> SUCCESS_SET = Sets.newHashSet("Succeeded", "Available", "Bound");
public static final HashSet<String> FAILED_SET = Sets.newHashSet("Failed");
}
public static class CONSTANTS {
public static final int TRACK_INTERVAL = 3000;
public static final String YAML_FILE_PATH = "kubeflow.yaml";
public static final String CLUSTER_CONFIG_PATH = ".cluster.yaml";
}
public static class COMMAND {
public static final String SET_CONFIG = "export KUBECONFIG=%s";
public static final String APPLY = "kubectl apply -f %s";
public static final String GET = "kubectl get -f %s -o json";
public static final String DELETE = "kubectl delete -f %s";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.commons.lang3.StringUtils;
import lombok.Data;
@Data
public class KubeflowParameters extends AbstractParameters {
private String yamlContent;
private String clusterYAML;
public boolean checkParameters() {
return StringUtils.isNotEmpty(yamlContent);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class KubeflowTask extends AbstractRemoteTask {
private final TaskExecutionContext taskExecutionContext;
protected KubeflowHelper kubeflowHelper;
private KubeflowParameters kubeflowParameters;
private Path clusterYAMLPath;
private Path yamlPath;
public KubeflowTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() throws TaskException {
logger.info("Kubeflow task params {}", taskExecutionContext.getTaskParams());
kubeflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), KubeflowParameters.class);
kubeflowParameters.setClusterYAML(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml());
if (!kubeflowParameters.checkParameters()) {
throw new TaskException("Kubeflow task params is not valid");
}
writeFiles();
kubeflowHelper = new KubeflowHelper(clusterYAMLPath.toString());
}
@Override
public void submitApplication() throws TaskException {
String command = kubeflowHelper.buildSubmitCommand(yamlPath.toString());
logger.info("Kubeflow task submit command: \n{}", command);
String message = runCommand(command);
logger.info("Kubeflow task submit result: \n{}", message);
KubeflowHelper.ApplicationIds applicationIds = new KubeflowHelper.ApplicationIds();
applicationIds.setAlreadySubmitted(true);
setAppIds(JSONUtils.toJsonString(applicationIds));
}
/**
* keep checking application status
*
* @throws TaskException
*/
@Override
public void trackApplicationStatus() throws TaskException {
String command = kubeflowHelper.buildGetCommand(yamlPath.toString());
logger.info("Kubeflow task get command: \n{}", command);
do {
ThreadUtils.sleep(KubeflowHelper.CONSTANTS.TRACK_INTERVAL);
String message = runCommand(command);
String phase = kubeflowHelper.parseGetMessage(message);
if (KubeflowHelper.STATUS.FAILED_SET.contains(phase)) {
exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
logger.info("Kubeflow task get Failed result: \n{}", message);
break;
} else if (KubeflowHelper.STATUS.SUCCESS_SET.contains(phase)) {
exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
logger.info("Kubeflow task get Succeeded result: \n{}", message);
break;
}
} while (true);
}
@Override
public void cancelApplication() throws TaskException {
String command = kubeflowHelper.buildDeleteCommand(yamlPath.toString());
logger.info("Kubeflow task delete command: \n{}", command);
String message = runCommand(command);
logger.info("Kubeflow task delete result: \n{}", message);
}
protected String runCommand(String command) {
try {
exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
return OSUtils.exeShell(new String[]{"sh", "-c", command});
} catch (Exception e) {
exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
throw new TaskException("Kubeflow task submit command failed", e);
}
}
@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
public void writeFiles() {
String yamlContent = kubeflowParameters.getYamlContent();
String clusterYAML = kubeflowParameters.getClusterYAML();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
yamlContent = ParameterUtils.convertParameterPlaceholders(yamlContent, ParamUtils.convert(paramsMap));
yamlPath = Paths.get(taskExecutionContext.getExecutePath(), KubeflowHelper.CONSTANTS.YAML_FILE_PATH);
clusterYAMLPath =
Paths.get(taskExecutionContext.getExecutePath(), KubeflowHelper.CONSTANTS.CLUSTER_CONFIG_PATH);
logger.info("Kubeflow task yaml content: \n{}", yamlContent);
try {
Files.write(yamlPath, yamlContent.getBytes(), StandardOpenOption.CREATE);
Files.write(clusterYAMLPath, clusterYAML.getBytes(), StandardOpenOption.CREATE);
} catch (IOException e) {
throw new TaskException("Kubeflow task write yaml file failed", e);
}
}
@Override
public KubeflowParameters getParameters() {
return kubeflowParameters;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
public class KubeflowTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public KubeflowTask createTask(TaskExecutionContext taskRequest) {
return new KubeflowTask(taskRequest);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), KubeflowParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.Collections;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class KubeflowTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new KubeflowTaskChannel();
}
@Override
public String getName() {
return "KUBEFLOW";
}
@Override
public List<PluginParams> getParams() {
return Collections.emptyList();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class KubeflowHelperTest {
private String clusterConfigPath = "/tmp/dolphinscheduler/.kube/config";
private KubeflowHelper kubeflowHelper;
@BeforeEach
public void init() {
kubeflowHelper = new KubeflowHelper(clusterConfigPath);
}
@Test
public void testBuildSubmitCommand() {
String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
String command = kubeflowHelper.buildSubmitCommand(yamlFilePATH);
String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
String.format("kubectl apply -f %s", yamlFilePATH);
Assertions.assertEquals(expectCommand, command);
}
@Test
public void testBuildGetCommand() {
String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
String command = kubeflowHelper.buildGetCommand(yamlFilePATH);
String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
String.format("kubectl get -f %s -o json", yamlFilePATH);
Assertions.assertEquals(expectCommand, command);
}
@Test
public void testBuildDeleteCommand() {
String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
String command = kubeflowHelper.buildDeleteCommand(yamlFilePATH);
String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
String.format("kubectl delete -f %s", yamlFilePATH);
Assertions.assertEquals(expectCommand, command);
}
@Test
public void testParseGetMessage() {
String message = "{\n" +
" \"apiVersion\": \"kubeflow.org/v1\",\n" +
" \"kind\": \"PyTorchJob\",\n" +
" \"status\": {\n" +
" \"conditions\": [\n" +
" {\n" +
" \"key\": \"value\"\n" +
" },\n" +
" {\n" +
" \"key\": \"value\"\n" +
" }\n" +
" ],\n" +
" \"phase\": \"Succeeded\"\n" +
" }\n" +
"}\n";
Assertions.assertEquals("Succeeded", kubeflowHelper.parseGetMessage(message));
String messageError1 = "{\n" +
" \"apiVersion\": \"kubeflow.org/v1\",\n" +
" \"kind\": \"PyTorchJob\"\n" +
"}\n";
Assertions.assertDoesNotThrow(() -> kubeflowHelper.parseGetMessage(messageError1));
String messageError2 = "{\n" +
" \"apiVersion\": \"kubeflow.org/v1\",\n" +
" \"kind\": \"PyTorchJob\",\n" +
" \"status\": {\n" +
" \"phase\": \"Failed\"\n" +
" }\n" +
"}\n";
Assertions.assertEquals("Failed", kubeflowHelper.parseGetMessage(messageError2));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class KubeflowTaskTest {
public static String clusterConfigName = "clusterConfigYAML.yaml";
public static String jobConfigName = "jobConfigYAML.yaml";
public static String readFile(String fileName) throws IOException {
String path = KubeflowHelperTest.class.getClassLoader().getResource(fileName).getPath();
String content = Files.lines(Paths.get(path), StandardCharsets.UTF_8)
.collect(Collectors.joining(System.lineSeparator()));
return content;
}
@Test
public void testInit() throws IOException {
KubeflowParameters kubeflowParameters = createKubeflowParameters();
KubeflowTask task = createTask(kubeflowParameters);
Assertions.assertEquals(kubeflowParameters.getClusterYAML(), task.getParameters().getClusterYAML());
Assertions.assertEquals(kubeflowParameters.getYamlContent(), task.getParameters().getYamlContent());
KubeflowParameters kubeflowParametersError2 = new KubeflowParameters();
kubeflowParameters.setYamlContent(readFile(clusterConfigName));
Assertions.assertThrows(TaskException.class, () -> {
createTask(kubeflowParametersError2);
});
}
@Test
public void TestSubmit() throws IOException {
KubeflowParameters kubeflowParameters = createKubeflowParameters();
KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("test_result");
task.submitApplication();
Assertions.assertNotEquals(task.getAppIds(), null);
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
}
@Test
public void TestTrack() throws IOException {
KubeflowParameters kubeflowParameters = createKubeflowParameters();
TaskExecutionContext taskExecutionContext = createTaskExecutionContext(kubeflowParameters);
TestTask task = Mockito.spy(new TestTask(taskExecutionContext));
Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("track_result");
task.init();
KubeflowHelper kubeflowHelper = Mockito.mock(KubeflowHelper.class);
Mockito.when(kubeflowHelper.buildGetCommand(Mockito.anyString())).thenReturn("");
task.setKubeflowHelper(kubeflowHelper);
Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("Succeeded");
task.trackApplicationStatus();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("Failed");
task.trackApplicationStatus();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_FAILURE);
Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("", "Succeeded");
task.trackApplicationStatus();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
}
@Test
public void TestCancel() throws IOException {
KubeflowParameters kubeflowParameters = createKubeflowParameters();
KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("delete_result");
task.cancelApplication();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
}
public KubeflowTask createTask(KubeflowParameters kubeflowParameters) {
TaskExecutionContext taskExecutionContext = createTaskExecutionContext(kubeflowParameters);
KubeflowTask kubeflowTask = new KubeflowTask(taskExecutionContext);
kubeflowTask.init();
return kubeflowTask;
}
public TaskExecutionContext createTaskExecutionContext(KubeflowParameters kubeflowParameters) {
String parameters = JSONUtils.toJsonString(kubeflowParameters);
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp/dolphinscheduler/kubeflow");
File file = new File("/tmp/dolphinscheduler/kubeflow");
if (!file.exists()) {
file.mkdirs();
}
Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
.thenReturn(kubeflowParameters.getClusterYAML());
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
public KubeflowParameters createKubeflowParameters() throws IOException {
KubeflowParameters kubeflowParameters = new KubeflowParameters();
kubeflowParameters.setClusterYAML(readFile(clusterConfigName));
kubeflowParameters.setYamlContent(readFile(jobConfigName));
return kubeflowParameters;
}
public static class TestTask extends KubeflowTask {
public TestTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
}
public void setKubeflowHelper(KubeflowHelper kubeflowHelper) {
this.kubeflowHelper = kubeflowHelper;
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: 123
server: https://192.168.2.90:40787
name: kind-kubeflow
contexts:
- context:
cluster: kind-kubeflow
user: kind-kubeflow
name: kind-kubeflow
current-context: kind-kubeflow
kind: Config
preferences: {}
users:
- name: kind-kubeflow
user:
client-certificate-data: 123
client-key-data: 123
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: "kubeflow.org/v1"
kind: TFJob
metadata:
name: test-tfjob
namespace: kubeflow-user-example-com
spec:
tfReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
spec:
containers:
- name: tensorflow
image: repo/mnist_with_summaries:v1
command:
- "python"
- "mnist_with_summaries.py"
......@@ -64,6 +64,7 @@
<module>dolphinscheduler-task-hivecli</module>
<module>dolphinscheduler-task-dms</module>
<module>dolphinscheduler-task-datasync</module>
<module>dolphinscheduler-task-kubeflow</module>
</modules>
<dependencyManagement>
......
......@@ -141,6 +141,10 @@ export const TASK_TYPES_MAP = {
DATASYNC: {
alias: 'DATASYNC',
helperLinkDisable: true
},
KUBEFLOW: {
alias: 'KUBEFLOW',
helperLinkDisable: true
}
} as {
[key in TaskType]: {
......
......@@ -54,6 +54,7 @@ type TaskType =
| 'HIVECLI'
| 'DMS'
| 'DATASYNC'
| 'KUBEFLOW'
type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
type DependentResultType = {
......
......@@ -82,3 +82,4 @@ export { usePytorch } from './use-pytorch'
export { useHiveCli } from './use-hive-cli'
export { useDms } from './use-dms'
export { useDatasync } from './use-datasync'
export { useKubeflow } from './use-kubeflow'
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { IJsonItem } from '../types'
import { useCustomParams, useNamespace } from '.'
export function useKubeflow(model: { [field: string]: any }): IJsonItem[] {
return [
useNamespace(),
{
type: 'editor',
field: 'yamlContent',
name: 'yamlContent',
props: {
language: 'yaml'
},
validate: {
trigger: ['input', 'trigger'],
required: true,
message: 'requestJson'
}
},
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}
......@@ -456,6 +456,11 @@ export function formatParams(data: INodeData): {
taskParams.cloudWatchLogGroupArn = data.cloudWatchLogGroupArn
}
if (data.taskType === 'KUBEFLOW') {
taskParams.yamlContent = data.yamlContent
taskParams.namespace = data.namespace
}
let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) {
if (data.timeoutNotifyStrategy.length === 1) {
......
......@@ -47,7 +47,9 @@ import { useChunjun } from './use-chunjun'
import { usePytorch } from './use-pytorch'
import { useHiveCli } from './use-hive-cli'
import { useDms } from './use-dms'
import {useDatasync} from "./use-datasync";
import { useDatasync } from './use-datasync'
import { useKubeflow } from './use-kubeflow'
export default {
SHELL: useShell,
......@@ -82,5 +84,6 @@ export default {
PYTORCH: usePytorch,
HIVECLI: useHiveCli,
DMS: useDms,
DATASYNC: useDatasync
DATASYNC: useDatasync,
KUBEFLOW: useKubeflow
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'
export function useKubeflow({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'KUBEFLOW',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN']
} as INodeData)
return {
json: [
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useKubeflow(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}
......@@ -395,6 +395,7 @@ interface ITaskParams {
sourceLocationArn?: string
name?: string
cloudWatchLogGroupArn?: string
yamlContent?: string
}
interface INodeData
......
......@@ -48,6 +48,7 @@ export type TaskType =
| 'HIVECLI'
| 'DMS'
| 'DATASYNC'
| 'KUBEFLOW'
export type TaskExecuteType = 'STREAM' | 'BATCH'
......@@ -170,6 +171,10 @@ export const TASK_TYPES_MAP = {
DATASYNC: {
alias: 'DATASYNC',
helperLinkDisable: true
},
KUBEFLOW: {
alias: 'KUBEFLOW',
helperLinkDisable: true
}
} as {
[key in TaskType]: {
......
......@@ -198,6 +198,9 @@ $bgLight: #ffffff;
&.icon-datasync {
background-image: url('/images/task-icons/datasync_hover.png');
}
&.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow_hover.png');
}
}
&:hover {
......@@ -299,6 +302,9 @@ $bgLight: #ffffff;
&.icon-datasync {
background-image: url('/images/task-icons/datasync.png');
}
&.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow.png');
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册