未验证 提交 5b0347e8 编写于 作者: A aiwenmo 提交者: GitHub

[Task] Add Dinky task to better support the development and execution of FlinkSQL (#10640)

上级 182b9c9b
......@@ -169,6 +169,10 @@ export default {
title: 'DVC',
link: '/en-us/docs/dev/user_doc/guide/task/dvc.html',
},
{
title: 'Dinky',
link: '/en-us/docs/dev/user_doc/guide/task/dinky.html',
},
],
},
{
......@@ -553,6 +557,10 @@ export default {
title: 'DVC',
link: '/zh-cn/docs/dev/user_doc/guide/task/dvc.html',
},
{
title: 'Dinky',
link: '/zh-cn/docs/dev/user_doc/guide/task/dinky.html',
},
],
},
{
......
# Dinky
## Overview
Use `Dinky Task` to create a dinky-type task and support one-stop development, debugging, operation and maintenance of FlinkSql, Flink jar and SQL. When the worker executes `Dinky Task`,
it will call `Dinky API` to trigger dinky task. Click [here](http://www.dlink.top/) for details about `Dinky`.
## Create Task
- Click Project Management-Project Name-Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag <img src="../../../../img/tasks/icons/dinky.png" width="15"/> from the toolbar to the canvas.
## Task Parameter
| **Parameter** | **Description** |
| ------- | ---------- |
| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
| Description | Describes the function of this node. |
| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
| Task group name | The group in Resources, if not configured, it will not be used. |
| Environment Name | Configure the environment in which to run the script. |
| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Dinky Address | The url for a dinky server. |
| Dinky Task ID | The unique task id for a dinky task. |
| Online Task | Specify whether the current dinky job is online. If yes, the submitted job can only be submitted successfully when it is published and there is no corresponding Flink job instance running. |
## Task Example
### Dinky Task Example
This example illustrates how to create a dinky task node.
![demo-dinky](../../../../img/tasks/demo/dinky.png)
![demo-get-dinky-task-id](../../../../img/tasks/demo/dinky_task_id.png)
# Dinky
## Overview
`Dinky`任务类型,用于创建并执行`Dinky`类型任务以支撑一站式的开发、调试、运维 FlinkSQL、Flink Jar、SQL。worker 执行该任务的时候,会通过`Dinky API`触发`Dinky 的作业`
点击[这里](http://www.dlink.top/) 获取更多关于`Dinky`的信息。
## Create Task
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
- 工具栏中拖动 <img src="../../../../img/tasks/icons/dinky.png" width="15"/> 到画板中,即可完成创建。
## Task Parameter
| **参数** | **描述** |
|-------------|--------------------------------------------------------------------|
| 任务名称 | 设置任务的名称。一个工作流定义中的节点名称是唯一的。 |
| 运行标志 | 标识这个节点是否可以正常调度。如果不需要执行,可以打开禁止执行开关。 |
| 描述 | 描述该节点的功能。 |
| 任务优先级 | worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。 |
| Worker 分组 | 任务分配给 worker 组的机器机执行,选择 Default,会随机选择一台 worker 机执行。 |
| 任务组名 | 资源中的组,如果未配置,将不会使用。 |
| 环境名称 | 配置运行脚本的环境。 |
| 失败重试次数 | 任务失败重新提交的次数,支持下拉和手填。 |
| 失败重试间隔 | 任务失败重新提交任务的时间间隔,支持下拉和手填。 |
| 超时告警 | 勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败. |
| Dinky 地址 | Dinky 服务的 url。 |
| Dinky 任务 ID | Dinky 作业对应的唯一ID。 |
| 上线作业 | 指定当前 Dinky 作业是否上线,如果是,则该被提交的作业只能处于已发布且当前无对应的 Flink Job 实例在运行才可提交成功。 |
## Task Example
### Dinky Task Example
这个示例展示了如何创建 Dinky 任务节点:
![demo-dinky](../../../../img/tasks/demo/dinky.png)
![demo-get-dinky-task-id](../../../../img/tasks/demo/dinky_task_id.png)
......@@ -177,6 +177,12 @@
<artifactId>dolphinscheduler-task-dvc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-dinky</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>dolphinscheduler-task-dinky</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
public class DinkyParameters extends AbstractParameters {
/**
* parameters for dinky Open API
*
* @see <a href="http://www.dlink.top/docs/administrator_guide/studio/openapi">Dinky_Open_API</a>
*/
private String address;
private String taskId;
private boolean online = false;
@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(this.address) && StringUtils.isNotEmpty(this.taskId);
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return Collections.emptyList();
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public boolean isOnline() {
return online;
}
public void setOnline(boolean online) {
this.online = online;
}
@Override
public String toString() {
return "DinkyParameters{" +
"address='" + address + '\'' +
", taskId='" + taskId + '\'' +
", online='" + online + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
public class DinkyTask extends AbstractTaskExecutor {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* dinky parameters
*/
private DinkyParameters dinkyParameters;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
protected DinkyTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("dinky task params:{}", taskParams);
this.dinkyParameters = JSONUtils.parseObject(taskParams, DinkyParameters.class);
if (this.dinkyParameters == null || !this.dinkyParameters.checkParameters()) {
throw new DinkyTaskException("dinky task params is not valid");
}
}
@Override
public void handle() throws Exception {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
if (isOnline) {
// Online dinky task, and only one job is allowed to execute
result = onlineTask(address, taskId);
} else {
// Submit dinky task
result = submitTask(address, taskId);
}
if (checkResult(result)) {
boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResult(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
}
}
/**
* map dinky task status to exitStatusCode
*
* @param status dinky job status
* @return exitStatusCode
*/
private int mapStatusToExitCode(boolean status) {
if (status) {
return TaskConstants.EXIT_CODE_SUCCESS;
} else {
return TaskConstants.EXIT_CODE_FAILURE;
}
}
private boolean checkResult(JsonNode result) {
if (result instanceof MissingNode || result == null) {
errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
return false;
} else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) {
errorHandle(result.get("msg"));
return false;
}
return true;
}
private void errorHandle(Object msg) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("dinky task submit failed with error: {}", msg);
}
@Override
public AbstractParameters getParameters() {
return dinkyParameters;
}
@Override
public void cancelApplication(boolean status) throws Exception {
super.cancelApplication(status);
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}",
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
cancelTask(address, taskId);
logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}",
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
}
private JsonNode submitTask(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params));
}
private JsonNode onlineTask(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params));
}
private JsonNode cancelTask(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId);
params.put(DinkyTaskConstants.PARAM_SAVEPOINT_TYPE, DinkyTaskConstants.SAVEPOINT_CANCEL);
return parse(sendJsonStr(address + DinkyTaskConstants.SAVEPOINT_TASK, JSONUtils.toJsonString(params)));
}
private JsonNode getJobInstanceInfo(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_JOB_INSTANCE_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.GET_JOB_INFO, params));
}
private JsonNode parse(String res) {
ObjectMapper mapper = new ObjectMapper();
JsonNode result = null;
try {
result = mapper.readTree(res);
} catch (JsonProcessingException e) {
logger.error("dinky task submit failed with error", e);
}
return result;
}
private String doGet(String url, Map<String, String> params) {
String result = "";
HttpClient httpClient = HttpClientBuilder.create().build();
HttpGet httpGet = null;
try {
URIBuilder uriBuilder = new URIBuilder(url);
if (null != params && !params.isEmpty()) {
for (Map.Entry<String, String> entry : params.entrySet()) {
uriBuilder.addParameter(entry.getKey(), entry.getValue());
}
}
URI uri = uriBuilder.build();
httpGet = new HttpGet(uri);
logger.info("access url: {}", uri);
HttpResponse response = httpClient.execute(httpGet);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
result = EntityUtils.toString(response.getEntity());
logger.info("dinky task succeed with results: {}", result);
} else {
logger.error("dinky task terminated,response: {}", response);
}
} catch (IllegalArgumentException ie) {
logger.error("dinky task terminated: {}", ie.getMessage());
} catch (Exception e) {
logger.error("dinky task terminated: ", e);
} finally {
if (null != httpGet) {
httpGet.releaseConnection();
}
}
return result;
}
private String sendJsonStr(String url, String params) {
String result = "";
HttpClient httpClient = HttpClientBuilder.create().build();
HttpPost httpPost = new HttpPost(url);
try {
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
if (StringUtils.isNotBlank(params)) {
httpPost.setEntity(new StringEntity(params, StandardCharsets.UTF_8));
}
HttpResponse response = httpClient.execute(httpPost);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
result = EntityUtils.toString(response.getEntity());
logger.info("dinky task succeed with results: {}", result);
} else {
logger.error("dinky task terminated,response: {}", response);
}
} catch (IllegalArgumentException ie) {
logger.error("dinky task terminated: {}", ie.getMessage());
} catch (Exception he) {
logger.error("dinky task terminated: ", he);
}
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
public class DinkyTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
// nothing to do
}
@Override
public AbstractTask createTask(TaskExecutionContext taskRequest) {
return new DinkyTask(taskRequest);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), DinkyParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.ArrayList;
import java.util.List;
@AutoService(TaskChannelFactory.class)
public class DinkyTaskChannelFactory implements TaskChannelFactory {
@Override
public String getName() {
return "DINKY";
}
@Override
public List<PluginParams> getParams() {
return new ArrayList<>();
}
@Override
public TaskChannel create() {
return new DinkyTaskChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
/**
* Custom DinkyTaskConstants
*/
public class DinkyTaskConstants {
private DinkyTaskConstants() {
throw new IllegalStateException("Utility class");
}
private static final String API_ROUTE = "/openapi/";
public static final String SUBMIT_TASK = API_ROUTE + "submitTask";
public static final String ONLINE_TASK = API_ROUTE + "onLineTask";
public static final String SAVEPOINT_TASK = API_ROUTE + "savepointTask";
public static final String GET_JOB_INFO = API_ROUTE + "getJobInstance";
public static final int API_ERROR = 1;
public static final String API_VERSION_ERROR_TIPS = "Please check that the dinky version is greater than or equal to 0.6.5";
public static final String API_RESULT_DATAS = "datas";
public static final String SAVEPOINT_CANCEL = "cancel";
public static final String PARAM_TASK_ID = "id";
public static final String PARAM_JSON_TASK_ID = "taskId";
public static final String PARAM_SAVEPOINT_TYPE = "type";
public static final String PARAM_JOB_INSTANCE_ID = "id";
public static final String STATUS_FINISHED = "FINISHED";
public static final String STATUS_CANCELED = "CANCELED";
public static final String STATUS_FAILED = "FAILED";
public static final String STATUS_UNKNOWN = "UNKNOWN";
public static final long SLEEP_MILLIS = 3000;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.dinky;
/**
* Custom DinkyTaskException
*/
public class DinkyTaskException extends RuntimeException {
public DinkyTaskException() {
super();
}
public DinkyTaskException(String message) {
super(message);
}
public DinkyTaskException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -56,5 +56,6 @@
<module>dolphinscheduler-task-mlflow</module>
<module>dolphinscheduler-task-openmldb</module>
<module>dolphinscheduler-task-dvc</module>
<module>dolphinscheduler-task-dinky</module>
</modules>
</project>
\ No newline at end of file
......@@ -706,6 +706,11 @@ export default {
'Please select column, only single column is supported',
please_enter_threshold_number_is_needed:
'Please enter threshold number is needed',
please_enter_comparison_title: 'please select comparison title'
please_enter_comparison_title: 'please select comparison title',
dinky_address: 'Dinky address',
dinky_address_tips: 'Please enter the url of your dinky',
dinky_task_id: 'Dinky task id',
dinky_task_id_tips: 'Please enter the task id of your dinky',
dinky_online: 'Online task'
}
}
......@@ -692,6 +692,11 @@ export default {
please_enter_filter_expression: '请输入源表过滤条件',
please_enter_column_only_single_column_is_supported: '请选择源表检测列',
please_enter_threshold_number_is_needed: '请输入阈值',
please_enter_comparison_title: '请选择期望值类型'
please_enter_comparison_title: '请选择期望值类型',
dinky_address: 'dinky 地址',
dinky_address_tips: '请输入 Dinky 地址',
dinky_task_id: 'dinky 作业ID',
dinky_task_id_tips: '请输入作业 ID',
dinky_online: '是否上线作业'
}
}
......@@ -71,3 +71,4 @@ export { useMlflowProjects } from './use-mlflow-projects'
export { useMlflowModels } from './use-mlflow-models'
export { useOpenmldb } from './use-openmldb'
export { useDvc } from './use-dvc'
export { useDinky } from './use-dinky'
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'
export function useDinky(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
return [
{
type: 'input',
field: 'address',
name: t('project.node.dinky_address'),
props: {
placeholder: t('project.node.dinky_address_tips')
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(_validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dinky_address_tips'))
}
}
}
},
{
type: 'input',
field: 'taskId',
name: t('project.node.dinky_task_id'),
props: {
placeholder: t('project.node.dinky_task_id_tips')
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(_validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dinky_task_id_tips'))
}
}
}
},
{
type: 'switch',
field: 'online',
name: t('project.node.dinky_online')
},
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}
......@@ -360,7 +360,6 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'DVC') {
taskParams.dvcTaskType = data.dvcTaskType
taskParams.dvcRepository = data.dvcRepository
taskParams.dvcVersion = data.dvcVersion
......@@ -370,6 +369,12 @@ export function formatParams(data: INodeData): {
taskParams.dvcStoreUrl = data.dvcStoreUrl
}
if (data.taskType === 'DINKY') {
taskParams.address = data.address
taskParams.taskId = data.taskId
taskParams.online = data.online
}
if (data.taskType === 'OPENMLDB') {
taskParams.zk = data.zk
taskParams.zkPath = data.zkPath
......
......@@ -39,6 +39,7 @@ import { useJupyter } from './use-jupyter'
import { useMlflow } from './use-mlflow'
import { useOpenmldb } from './use-openmldb'
import { useDvc } from './use-dvc'
import { useDinky } from './use-dinky'
export default {
SHELL: useShell,
......@@ -64,5 +65,6 @@ export default {
JUPYTER: useJupyter,
MLFLOW: useMlflow,
OPENMLDB: useOpenmldb,
DVC: useDvc
DVC: useDvc,
DINKY: useDinky
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'
export function useDinky({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'DINKY',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN']
} as INodeData)
let extra: IJsonItem[] = []
if (from === 1) {
extra = [
Fields.useTaskType(model, readonly),
Fields.useProcessName({
model,
projectCode,
isCreate: !data?.id,
from,
processName: data?.processName
})
]
}
return {
json: [
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useDinky(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}
......@@ -348,6 +348,9 @@ interface ITaskParams {
dvcMessage?: string
dvcLoadSaveDataPath?: string
dvcStoreUrl?: string
address?: string
taskId?: string
online?: boolean
}
interface INodeData
......
......@@ -39,6 +39,7 @@ export type TaskType =
| 'MLFLOW'
| 'OPENMLDB'
| 'DVC'
| 'DINKY'
export const TASK_TYPES_MAP = {
SHELL: {
......@@ -123,5 +124,9 @@ export const TASK_TYPES_MAP = {
DVC: {
alias: 'DVC',
helperLinkDisable: true
},
DINKY: {
alias: 'DINKY',
helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
......@@ -173,6 +173,9 @@ $bgLight: #ffffff;
&.icon-dvc {
background-image: url('/images/task-icons/dvc.png');
}
&.icon-dinky {
background-image: url('/images/task-icons/dinky.png');
}
}
&:hover {
......@@ -249,6 +252,9 @@ $bgLight: #ffffff;
&.icon-dvc {
background-image: url('/images/task-icons/dvc_hover.png');
}
&.icon-dinky {
background-image: url('/images/task-icons/dinky_hover.png');
}
}
}
}
......
......@@ -133,6 +133,11 @@ export default defineComponent({
color: '#8c8c8f',
image: `${import.meta.env.BASE_URL}images/task-icons/seatunnel.png`
},
{
taskType: 'DINKY',
color: '#d69f5b',
image: `${import.meta.env.BASE_URL}images/task-icons/dinky.png`
},
{ taskType: 'DAG', color: '#bbdde9' }
])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册