Unverified commit bd1be6cf, authored by Kerwin, committed by GitHub

[Task] Added Apache SeaTunnel 2.x task node (#10480)

Parent b4eee7ed
......@@ -141,6 +141,10 @@ export default {
title: 'Switch',
link: '/en-us/docs/dev/user_doc/guide/task/switch.html',
},
{
title: 'SeaTunnel',
link: '/en-us/docs/dev/user_doc/guide/task/seatunnel.html',
},
{
title: 'Amazon EMR',
link: '/en-us/docs/dev/user_doc/guide/task/emr.html',
......@@ -529,6 +533,10 @@ export default {
title: 'Switch',
link: '/zh-cn/docs/dev/user_doc/guide/task/switch.html',
},
{
title: 'SeaTunnel',
link: '/zh-cn/docs/dev/user_doc/guide/task/seatunnel.html',
},
{
title: 'Amazon EMR',
link: '/zh-cn/docs/dev/user_doc/guide/task/emr.html',
......
# Apache SeaTunnel
## Overview
The `SeaTunnel` task type is used to create and execute `SeaTunnel` tasks. When the worker executes this task, it parses the config file with the `start-seatunnel-spark.sh` or `start-seatunnel-flink.sh` command.
Click [here](https://seatunnel.apache.org/) for more information about `Apache SeaTunnel`.
## Create Task
- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/seatunnel.png" width="15"/> task node from the toolbar to the drawing board.
## Task Parameter
- Node name: The name of the task node. Node names must be unique within a workflow definition.
- Run flag: Indicates whether this node can be scheduled normally. If the node does not need to be executed, turn on the prohibition switch.
- Descriptive information: Describes the function of the node.
- Task priority: When the number of worker threads is insufficient, tasks are executed in order of priority from high to low. Tasks with the same priority are executed on a first-in, first-out basis.
- Worker grouping: The task is assigned to a machine in the selected worker group. If Default is selected, a worker machine is chosen at random.
- Environment Name: The environment in which the script runs.
- Number of failed retry attempts: The number of times a failed task is resubmitted.
- Failed retry interval: The interval, in minutes, between resubmissions of a failed task.
- Cpu quota: Assigns the specified CPU time quota to the executed task, as a percentage. The default of -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Max memory: Assigns the specified maximum memory to the executed task, in MB. The default of -1 means unlimited. A task that exceeds this limit is killed by the OOM killer and is not retried automatically. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Delayed execution time: The time, in minutes, by which the execution of the task is delayed.
- Timeout alarm: Check the timeout alarm and timeout failure options. When the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Engine: Supports FLINK and SPARK
  - FLINK
    - Run mode: Supports the `run` and `run-application` modes
    - Option parameters: Additional parameters for the Flink engine, such as `-m yarn-cluster -ynm seatunnel`
  - SPARK
    - Deployment mode: Specifies the deployment mode: `cluster`, `client`, or `local`
    - Master: Specifies the `Master` type: `yarn`, `local`, `spark`, or `mesos`. For `spark` and `mesos`, the `Master` service address must also be specified, for example: 127.0.0.1:7077
  > Click [here](https://seatunnel.apache.org/docs/2.1.2/command/usage) for more information on the usage of the `Apache SeaTunnel command`
- Custom Configuration: Supports either a custom configuration or a configuration file selected from the Resource Center
  > Click [here](https://seatunnel.apache.org/docs/2.1.2/concept/config) for more information about the `Apache SeaTunnel config` file
  - Script: The custom configuration entered on the task node. It consists of four parts: `env`, `source`, `transform`, and `sink`
  - Resource file: A configuration file from the Resource Center referenced by the task node. Only one configuration file can be referenced.
- Predecessor task: Selecting a predecessor task for the current task will set the selected predecessor task as upstream of the current task.
## Task Example
This sample demonstrates using the Flink engine to read data from a Fake source and print to the console.
### Configuring the SeaTunnel environment in DolphinScheduler
If you want to use the SeaTunnel task type in the production environment, you need to configure the required environment first. The configuration file is as follows: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
![seatunnel_task01](../../../../img/tasks/demo/seatunnel_task01.png)
### Configuring SeaTunnel Task Node
According to the above parameter description, configure the required content.
![seatunnel_task02](../../../../img/tasks/demo/seatunnel_task02.png)
### Config example
```Config
env {
  execution.parallelism = 1
}
source {
  FakeSource {
    result_table_name = "fake"
    field_name = "name,age"
  }
}
transform {
  sql {
    sql = "select name,age from fake"
  }
}
sink {
  ConsoleSink {}
}
```
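For orientation, the command line that the worker would assemble for a Flink task like this one can be sketched as below. This is a minimal, non-authoritative sketch based on the `EngineEnum`, `Constants` and `RunModeEnum` values in this change; the class name `FlinkCommandSketch`, the execute path `/tmp/exec` and the task app id `1_2` are hypothetical values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the plugin: mirrors how the options are joined with spaces.
class FlinkCommandSketch {

    static String buildCommand(String executePath, String taskAppId, String runMode, String others) {
        List<String> args = new ArrayList<>();
        // Start script of the Flink engine, as declared in EngineEnum.FLINK
        args.add("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh");
        // The custom config is written to <executePath>/seatunnel_<taskAppId>.conf
        args.add("--config");
        args.add(String.format("%s/seatunnel_%s.conf", executePath, taskAppId));
        // Run mode: "run" or "run-application"
        args.add("--run-mode " + runMode);
        // Optional engine parameters, e.g. "-m yarn-cluster -ynm seatunnel"
        if (others != null && !others.trim().isEmpty()) {
            args.add(others);
        }
        return String.join(" ", args);
    }

    public static void main(String[] ignored) {
        System.out.println(buildCommand("/tmp/exec", "1_2", "run", "-m yarn-cluster -ynm seatunnel"));
    }
}
```

The shell resolves `${SEATUNNEL_HOME}` when the command runs, which is why the environment must be configured in `dolphinscheduler_env.sh` first.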
# Apache SeaTunnel
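## Overview
The `SeaTunnel` task type is used to create and execute `SeaTunnel` tasks. When the worker executes this task, it parses the config file with the `start-seatunnel-spark.sh` or `start-seatunnel-flink.sh` command.
Click [here](https://seatunnel.apache.org/) for more information about `Apache SeaTunnel`.
## Create Task
- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/seatunnel.png" width="15"/> task node from the toolbar to the drawing board.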
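## Task Parameter
- Node name: The name of the task node. Node names must be unique within a workflow definition.
- Run flag: Indicates whether this node can be scheduled normally. If the node does not need to be executed, turn on the prohibition switch.
- Description: Describes the function of the node.
- Task priority: When the number of worker threads is insufficient, tasks are executed in order of priority from high to low. Tasks with the same priority are executed on a first-in, first-out basis.
- Worker grouping: The task is assigned to a machine in the selected worker group. If Default is selected, a worker machine is chosen at random.
- Environment Name: The environment in which the script runs.
- Number of failed retry attempts: The number of times a failed task is resubmitted.
- Failed retry interval: The interval, in minutes, between resubmissions of a failed task.
- Cpu quota: Assigns the specified CPU time quota to the executed task, as a percentage. The default of -1 means unlimited. For example, the full CPU load of one core is 100%, and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Max memory: Assigns the specified maximum memory to the executed task, in MB. The default of -1 means unlimited. A task that exceeds this limit is killed by the OOM killer and is not retried automatically. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md).
- Delayed execution time: The time, in minutes, by which the execution of the task is delayed.
- Timeout alarm: Check the timeout alarm and timeout failure options. When the task exceeds the "timeout period", an alarm email is sent and the task execution fails.
- Engine: Supports FLINK and SPARK
  - FLINK
    - Run mode: Supports the `run` and `run-application` modes
    - Option parameters: Additional parameters for the Flink engine itself, such as `-m yarn-cluster -ynm seatunnel`
  - SPARK
    - Deployment mode: Specifies the deployment mode: `cluster`, `client`, or `local`
    - Master: Specifies the `Master` type: `yarn`, `local`, `spark`, or `mesos`. For `spark` and `mesos`, the `Master` service address must also be specified, for example: 127.0.0.1:7077
  > Click [here](https://seatunnel.apache.org/docs/2.1.2/command/usage) for more information on the usage of the `Apache SeaTunnel command`
- Custom Configuration: Supports either a custom configuration or a configuration file selected from the Resource Center
  > Click [here](https://seatunnel.apache.org/docs/2.1.2/concept/config) for more information about the `Apache SeaTunnel config` file
  - Script: The custom configuration entered on the task node. It consists of four parts: `env`, `source`, `transform`, and `sink`
  - Resource file: A configuration file from the Resource Center referenced by the task node. Only one configuration file can be referenced.
- Predecessor task: Selecting a predecessor task for the current task sets the selected task as upstream of the current task.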
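## Task Example
This example demonstrates using the Flink engine to read data from a Fake source and print it to the console.
### Configuring the SeaTunnel environment in DolphinScheduler
To use the SeaTunnel task type in a production environment, configure the required environment first. The configuration file is: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`
![seatunnel_task01](../../../../img/tasks/demo/seatunnel_task01.png)
### Configuring the SeaTunnel Task Node
Configure the required content according to the parameter descriptions above.
![seatunnel_task02](../../../../img/tasks/demo/seatunnel_task02.png)
### Config example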
```Config
env {
  execution.parallelism = 1
}
source {
  FakeSource {
    result_table_name = "fake"
    field_name = "name,age"
  }
}
transform {
  sql {
    sql = "select name,age from fake"
  }
}
sink {
  ConsoleSink {}
}
```
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
public class Constants {
private Constants() {
throw new IllegalStateException("Utility class");
}
public static final String CONFIG_OPTIONS = "--config";
public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode";
public static final String MASTER_OPTIONS = "--master";
public static final String QUEUE_OPTIONS = "--queue";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
public enum DeployModeEnum {
cluster("cluster"),
client("client"),
local("client");
private String command;
DeployModeEnum(String command) {
this.command = command;
}
public String getCommand() {
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
public enum EngineEnum {
FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
private String command;
EngineEnum(String command) {
this.command = command;
}
public String getCommand() {
return command;
}
}
......@@ -19,14 +19,30 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkParameters;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "engine")
@JsonSubTypes({
@JsonSubTypes.Type(value = SeatunnelFlinkParameters.class, name = "FLINK"),
@JsonSubTypes.Type(value = SeatunnelSparkParameters.class, name = "SPARK")
})
public class SeatunnelParameters extends AbstractParameters {
/**
* shell script
*/
private EngineEnum engine;
private Boolean useCustom;
private String rawScript;
/**
......@@ -34,6 +50,22 @@ public class SeatunnelParameters extends AbstractParameters {
*/
private List<ResourceInfo> resourceList;
public EngineEnum getEngine() {
return engine;
}
public void setEngine(EngineEnum engine) {
this.engine = engine;
}
public Boolean getUseCustom() {
return useCustom;
}
public void setUseCustom(Boolean useCustom) {
this.useCustom = useCustom;
}
public String getRawScript() {
return rawScript;
}
......@@ -52,7 +84,9 @@ public class SeatunnelParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
return rawScript != null && !rawScript.isEmpty();
return Objects.nonNull(engine)
&& ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
|| (BooleanUtils.isFalse(useCustom) && CollectionUtils.isNotEmpty(resourceList) && resourceList.size() == 1));
}
@Override
......
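The new validation rule in `checkParameters()` can be read as a small truth table: an engine must be set, and then either a custom config is used with a non-blank script, or exactly one resource file is referenced. The sketch below restates that rule with plain JDK types so it can be tried in isolation. The class `CheckParametersSketch` and its parameter types are hypothetical, and `trim().isEmpty()` is used as an approximation of `StringUtils.isNotBlank`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone restatement of the rule; not the plugin class itself.
class CheckParametersSketch {

    static boolean isValid(String engine, Boolean useCustom, String rawScript, List<String> resources) {
        if (engine == null) {
            return false;
        }
        // Custom mode: the script must contain something other than whitespace
        boolean customOk = Boolean.TRUE.equals(useCustom)
                && rawScript != null && !rawScript.trim().isEmpty();
        // Resource mode: exactly one config file may be referenced
        boolean resourceOk = Boolean.FALSE.equals(useCustom)
                && resources != null && resources.size() == 1;
        // A null useCustom satisfies neither branch, so the parameters are rejected
        return customOk || resourceOk;
    }

    public static void main(String[] ignored) {
        System.out.println(isValid("FLINK", true, "env {}", Collections.emptyList()));
        System.out.println(isValid("SPARK", false, null, Arrays.asList("a.conf", "b.conf")));
    }
}
```

Note that a `null` value for `useCustom` fails both branches, so task definitions that omit the flag would be treated as invalid under this rule.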
......@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
......@@ -28,21 +28,20 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* seatunnel task
......@@ -62,7 +61,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
protected final TaskExecutionContext taskExecutionContext;
/**
* constructor
......@@ -80,12 +79,9 @@ public class SeatunnelTask extends AbstractTaskExecutor {
@Override
public void init() {
logger.info("seatunnel task params {}", taskExecutionContext.getTaskParams());
seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelParameters.class);
logger.info("SeaTunnel task params {}", taskExecutionContext.getTaskParams());
if (!seatunnelParameters.checkParameters()) {
throw new RuntimeException("seatunnel task params is not valid");
throw new RuntimeException("SeaTunnel task params is not valid");
}
}
......@@ -100,7 +96,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("seatunnel task error", e);
logger.error("SeaTunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
......@@ -112,43 +108,61 @@ public class SeatunnelTask extends AbstractTaskExecutor {
shellCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
Path path = new File(fileName).toPath();
List<String> args = new ArrayList<>();
args.add(seatunnelParameters.getEngine().getCommand());
args.addAll(buildOptions());
String command = String.join(" ", args);
logger.info("SeaTunnel task command: {}", command);
if (Files.exists(path)) {
return fileName;
return command;
}
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
seatunnelParameters.setRawScript(script);
protected List<String> buildOptions() throws Exception {
List<String> args = new ArrayList<>();
if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
args.add(CONFIG_OPTIONS);
args.add(buildCustomConfigCommand());
} else {
seatunnelParameters.getResourceList().forEach(resourceInfo -> {
args.add(CONFIG_OPTIONS);
// TODO Currently resourceName looks like `/xxx.sh`; the leading `/` is stripped below and this needs to be optimized
args.add(resourceInfo.getResourceName().substring(1));
});
}
return args;
}
logger.info("raw script : {}", seatunnelParameters.getRawScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
protected String buildCustomConfigCommand() throws Exception {
String config = buildCustomConfigContent();
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(config, filePath);
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
return filePath;
}
if (SystemUtils.IS_OS_WINDOWS) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
private String buildCustomConfigContent() {
logger.info("raw custom config content : {}", seatunnelParameters.getRawScript());
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
return script;
}
private String buildConfigFilePath() {
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
}
Files.write(path, seatunnelParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
if (!Files.exists(Paths.get(scriptFile))) {
logger.info("generate script file:{}", scriptFile);
return fileName;
// write data to file
FileUtils.writeStringToFile(new File(scriptFile), script, StandardCharsets.UTF_8);
}
}
@Override
......@@ -161,4 +175,8 @@ public class SeatunnelTask extends AbstractTaskExecutor {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
public void setSeatunnelParameters(SeatunnelParameters seatunnelParameters) {
this.seatunnelParameters = seatunnelParameters;
}
}
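The custom-config path of the task (normalize line endings, derive the file name from the task app id, write the file only if it does not exist yet) can be sketched with the JDK alone. This is an illustrative sketch under stated assumptions: the class `ConfigFileSketch` and the method `writeConfigIfAbsent` are hypothetical, placeholder substitution via `parseScript` is left out, and `java.nio.file.Files.write` stands in for the commons-io `FileUtils.writeStringToFile` call used in the change.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that mirrors the file handling of the custom config.
class ConfigFileSketch {

    static Path writeConfigIfAbsent(Path executeDir, String taskAppId, String rawConfig) throws IOException {
        // Windows line endings from the web editor are normalized to "\n"
        String normalized = rawConfig.replace("\r\n", "\n");
        // Same naming scheme as buildConfigFilePath(): seatunnel_<taskAppId>.conf
        Path configFile = executeDir.resolve("seatunnel_" + taskAppId + ".conf");
        // An existing file is left untouched, so repeated calls keep the first content
        if (!Files.exists(configFile)) {
            Files.write(configFile, normalized.getBytes(StandardCharsets.UTF_8));
        }
        return configFile;
    }

    public static void main(String[] ignored) throws IOException {
        Path dir = Files.createTempDirectory("seatunnel-sketch");
        System.out.println(writeConfigIfAbsent(dir, "1_2", "env {\r\n}\r\n"));
    }
}
```

Because the file is only written when it is absent, a second call with different content does not overwrite the first config, which matches the "create if not exists" naming of the original method.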
......@@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkTask;
import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkTask;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
public class SeatunnelTaskChannel implements TaskChannel {
......@@ -33,7 +35,13 @@ public class SeatunnelTaskChannel implements TaskChannel {
@Override
public SeatunnelTask createTask(TaskExecutionContext taskRequest) {
return new SeatunnelTask(taskRequest);
SeatunnelParameters seatunnelParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class);
if (EngineEnum.FLINK == seatunnelParameters.getEngine()) {
return new SeatunnelFlinkTask(taskRequest);
} else if (EngineEnum.SPARK == seatunnelParameters.getEngine()) {
return new SeatunnelSparkTask(taskRequest);
}
throw new IllegalArgumentException("Unsupported engine type:" + seatunnelParameters.getEngine());
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel.flink;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
public class SeatunnelFlinkParameters extends SeatunnelParameters {
private RunModeEnum runMode;
private String others;
public static enum RunModeEnum {
RUN("--run-mode run"),
RUN_APPLICATION("--run-mode run-application");
private String command;
RunModeEnum(String command) {
this.command = command;
}
public String getCommand() {
return command;
}
}
public RunModeEnum getRunMode() {
return runMode;
}
public void setRunMode(RunModeEnum runMode) {
this.runMode = runMode;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Objects;
public class SeatunnelFlinkTask extends SeatunnelTask {
private SeatunnelFlinkParameters seatunnelParameters;
public SeatunnelFlinkTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
}
@Override
public void init() {
seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelFlinkParameters.class);
setSeatunnelParameters(seatunnelParameters);
super.init();
}
@Override
public List<String> buildOptions() throws Exception {
List<String> args = super.buildOptions();
args.add(Objects.isNull(seatunnelParameters.getRunMode()) ? SeatunnelFlinkParameters.RunModeEnum.RUN.getCommand() : seatunnelParameters.getRunMode().getCommand());
if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) {
args.add(seatunnelParameters.getOthers());
}
return args;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel.spark;
import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
public class SeatunnelSparkParameters extends SeatunnelParameters {
private DeployModeEnum deployMode;
private MasterTypeEnum master;
private String masterUrl;
private String queue;
@Override
public boolean checkParameters() {
// Local mode needs no master; spark:// and mesos:// masters additionally need a master URL.
return super.checkParameters()
&& Objects.nonNull(deployMode)
&& (DeployModeEnum.local == deployMode || Objects.nonNull(master))
&& (DeployModeEnum.local == deployMode
|| (MasterTypeEnum.SPARK != master && MasterTypeEnum.MESOS != master)
|| StringUtils.isNotBlank(masterUrl));
}
public static enum MasterTypeEnum {
YARN("yarn"),
LOCAL("local"),
SPARK("spark://"),
MESOS("mesos://");
private String command;
MasterTypeEnum(String command) {
this.command = command;
}
public String getCommand() {
return command;
}
}
public DeployModeEnum getDeployMode() {
return deployMode;
}
public void setDeployMode(DeployModeEnum deployMode) {
this.deployMode = deployMode;
}
public MasterTypeEnum getMaster() {
return master;
}
public void setMaster(MasterTypeEnum master) {
this.master = master;
}
public String getMasterUrl() {
return masterUrl;
}
public void setMasterUrl(String masterUrl) {
this.masterUrl = masterUrl;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel.spark;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.DEPLOY_MODE_OPTIONS;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.MASTER_OPTIONS;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.QUEUE_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask;
import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkParameters.MasterTypeEnum;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
public class SeatunnelSparkTask extends SeatunnelTask {
private SeatunnelSparkParameters seatunnelParameters;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public SeatunnelSparkTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
}
@Override
public void init() {
seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelSparkParameters.class);
setSeatunnelParameters(seatunnelParameters);
super.init();
}
@Override
public List<String> buildOptions() throws Exception {
List<String> args = super.buildOptions();
args.add(DEPLOY_MODE_OPTIONS);
args.add(seatunnelParameters.getDeployMode().getCommand());
MasterTypeEnum master = DeployModeEnum.local == seatunnelParameters.getDeployMode() ? MasterTypeEnum.LOCAL : seatunnelParameters.getMaster();
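args.add(MASTER_OPTIONS);
if (MasterTypeEnum.SPARK.equals(master) || MasterTypeEnum.MESOS.equals(master)) {
// The command is later joined with spaces, so the scheme prefix and the address must form a single argument
args.add(master.getCommand() + seatunnelParameters.getMasterUrl());
} else {
args.add(master.getCommand());
}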
if (StringUtils.isNotBlank(seatunnelParameters.getQueue())) {
args.add(QUEUE_OPTIONS);
args.add(seatunnelParameters.getQueue());
}
return args;
}
}
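The Spark-specific options can be sketched independently of the plugin classes. The sketch below assumes that the scheme prefix (`spark://` or `mesos://`) and the user-supplied address are meant to form one `--master` argument, and that `local` deploy mode maps to `--deploy-mode client --master local` as in `DeployModeEnum`. The class `SparkOptionsSketch` and its string-typed parameters are hypothetical simplifications of the enums in this change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; strings stand in for DeployModeEnum and MasterTypeEnum.
class SparkOptionsSketch {

    static List<String> buildOptions(String deployMode, String masterPrefix, String masterUrl, String queue) {
        List<String> args = new ArrayList<>();
        boolean local = "local".equals(deployMode);
        // DeployModeEnum.local carries the command value "client"
        args.add("--deploy-mode");
        args.add(local ? "client" : deployMode);
        // Local deploy mode always runs against the "local" master
        String master = local ? "local" : masterPrefix;
        args.add("--master");
        if ("spark://".equals(master) || "mesos://".equals(master)) {
            // Prefix and address are concatenated into one argument
            args.add(master + masterUrl);
        } else {
            args.add(master);
        }
        // The queue is optional
        if (queue != null && !queue.trim().isEmpty()) {
            args.add("--queue");
            args.add(queue);
        }
        return args;
    }

    public static void main(String[] ignored) {
        System.out.println(String.join(" ", buildOptions("cluster", "spark://", "127.0.0.1:7077", null)));
    }
}
```

Keeping prefix and address in a single list element matters because the final command is produced with `String.join(" ", args)`; two separate elements would end up separated by a space.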
......@@ -352,6 +352,7 @@ export default {
init_script_tips: 'Please enter initialization script',
resources: 'Resources',
resources_tips: 'Please select resources',
resources_limit_tips: 'Please select again, resource limit:',
non_resources_tips: 'Please delete all non-existent resources',
useless_resources_tips: 'Unauthorized or deleted resources',
custom_parameters: 'Custom Parameters',
......@@ -707,6 +708,10 @@ export default {
please_enter_threshold_number_is_needed:
'Please enter threshold number is needed',
please_enter_comparison_title: 'please select comparison title',
custom_config: 'Custom Config',
engine: 'engine',
engine_tips: 'Please select engine',
run_mode: 'Run Mode',
dinky_address: 'Dinky address',
dinky_address_tips: 'Please enter the url of your dinky',
dinky_task_id: 'Dinky task id',
......
......@@ -348,6 +348,7 @@ export default {
init_script_tips: '请输入初始化脚本',
resources: '资源',
resources_tips: '请选择资源',
resources_limit_tips: '请重新选择,资源个数限制:',
no_resources_tips: '请删除所有未授权或已删除资源',
useless_resources_tips: '未授权或已删除资源',
custom_parameters: '自定义参数',
......@@ -693,6 +694,10 @@ export default {
please_enter_column_only_single_column_is_supported: '请选择源表检测列',
please_enter_threshold_number_is_needed: '请输入阈值',
please_enter_comparison_title: '请选择期望值类型',
custom_config: '自定义配置',
engine: '引擎',
engine_tips: '请选择引擎',
run_mode: '运行模式',
dinky_address: 'dinky 地址',
dinky_address_tips: '请输入 Dinky 地址',
dinky_task_id: 'dinky 作业ID',
......
......@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ref, watchEffect } from 'vue'
import { Ref, ref, watchEffect } from 'vue'
import { useI18n } from 'vue-i18n'
import type { IJsonItem, IOption } from '../types'
export function useDeployMode(
span = 24,
span: number | Ref<number> = 24,
showClient = ref(true),
showCluster = ref(true)
): IJsonItem {
......@@ -44,7 +44,7 @@ export function useDeployMode(
field: 'deployMode',
name: t('project.node.deploy_mode'),
options: deployModeOptions,
span
span: span
}
}
......
......@@ -15,14 +15,18 @@
* limitations under the License.
*/
import { ref, onMounted } from 'vue'
import { ref, onMounted, Ref } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceList } from '@/service/modules/resources'
import { useTaskNodeStore } from '@/store/project/task-node'
import utils from '@/utils'
import type { IJsonItem, IResource } from '../types'
export function useResources(): IJsonItem {
export function useResources(
span: number | Ref<number> = 24,
required = false,
limit: number | Ref<number> = -1
): IJsonItem {
const { t } = useI18n()
const resourcesOptions = ref([] as IResource[])
......@@ -52,6 +56,7 @@ export function useResources(): IJsonItem {
type: 'tree-select',
field: 'resourceList',
name: t('project.node.resources'),
span: span,
options: resourcesOptions,
props: {
multiple: true,
......@@ -63,6 +68,21 @@ export function useResources(): IJsonItem {
keyField: 'id',
labelField: 'name',
loading: resourcesLoading
},
validate: {
trigger: ['input', 'blur'],
required: required,
validator(validate: any, value: IResource[]) {
if (required) {
if (!value) {
return new Error(t('project.node.resources_tips'))
}
if (limit > 0 && value.length > limit) {
return new Error(t('project.node.resources_limit_tips') + limit)
}
}
}
}
}
}
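Review note: `limit` is declared as `number | Ref<number>`, so the comparison `limit > 0 && value.length > limit` only behaves as intended when a plain number is passed. The sketch below shows one way the check could unwrap the value first. `RefLike`, `unwrap`, and `checkResources` are hypothetical names introduced here, with `RefLike` standing in for Vue's `Ref` so the example has no dependencies. Unlike the hunk above, the sketch also treats an empty selection as missing; that is an assumption, not what the commit does.

```typescript
// Hypothetical stand-in for Vue's Ref<number>, so the sketch needs no imports.
interface RefLike {
  value: number
}

// Return the plain number whether a raw number or a ref-like wrapper was passed.
function unwrap(input: number | RefLike): number {
  return typeof input === 'number' ? input : input.value
}

interface ResourceLike {
  id: number
}

// Returns an error message, or undefined when the selection is acceptable.
function checkResources(
  value: ResourceLike[] | null | undefined,
  required: boolean,
  limit: number | RefLike = -1
): string | undefined {
  if (!required) return undefined
  // Assumption: an empty array counts as "nothing selected".
  if (!value || value.length === 0) return 'resources are required'
  const max = unwrap(limit)
  // A non-positive limit means "no upper bound", as in the original default of -1.
  if (max > 0 && value.length > max) {
    return `at most ${max} resource(s) allowed`
  }
  return undefined
}
```

In real component code, Vue's own `unref` would likely replace the hypothetical `unwrap` helper.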
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { watch, computed } from 'vue'
import { computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { useDeployMode, useResources, useCustomParams } from '.'
import type { IJsonItem } from '../types'
......@@ -22,80 +22,64 @@ import type { IJsonItem } from '../types'
export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const masterTypeOptions = [
{
label: 'yarn',
value: 'yarn'
},
{
label: 'local',
value: 'local'
},
{
label: 'spark://',
value: 'spark://'
},
{
label: 'mesos://',
value: 'mesos://'
}
]
const queueOptions = [
{
label: 'default',
value: 'default'
}
]
const masterSpan = computed(() => (model.deployMode === 'local' ? 0 : 12))
const queueSpan = computed(() =>
model.deployMode === 'local' || model.master != 'yarn' ? 0 : 12
const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24))
const flinkSpan = computed(() => (model.engine === 'FLINK' ? 24 : 0))
const deployModeSpan = computed(() => (model.engine === 'SPARK' ? 24 : 0))
const masterSpan = computed(() =>
model.engine === 'SPARK' && model.deployMode !== 'local' ? 12 : 0
)
const masterUrlSpan = computed(() =>
model.deployMode === 'local' ||
(model.master != 'spark://' && model.master != 'mesos://')
? 0
: 12
model.engine === 'SPARK' &&
model.deployMode !== 'local' &&
(model.master === 'SPARK' || model.master === 'MESOS')
? 12
: 0
)
const queueSpan = computed(() =>
model.engine === 'SPARK' &&
model.deployMode !== 'local' &&
model.master === 'YARN'
? 24
: 0
)
const baseScript = 'sh ${WATERDROP_HOME}/bin/start-waterdrop.sh'
const parseRawScript = () => {
if (model.rawScript) {
model.rawScript.split('\n').forEach((script: string) => {
const params = script.replace(baseScript, '').split('--')
params?.forEach((param: string) => {
const pair = param.split(' ')
if (pair && pair.length >= 2) {
if (pair[0] === 'master') {
const prefix = pair[1].substring(0, 8)
if (pair[1] && (prefix === 'mesos://' || prefix === 'spark://')) {
model.master = prefix
model.masterUrl = pair[1].substring(8, pair[1].length)
} else {
model.master = pair[1]
}
} else if (pair[0] === 'deploy-mode') {
model.deployMode = pair[1]
} else if (pair[0] === 'queue') {
model.queue = pair[1]
}
}
})
})
}
return [
{
type: 'select',
field: 'engine',
span: 12,
name: t('project.node.engine'),
options: ENGINE,
validate: {
trigger: ['input', 'blur'],
required: true,
message: t('project.node.engine_tips')
}
},
watch(
() => model.rawScript,
() => {
parseRawScript()
// SeaTunnel flink parameter
{
type: 'select',
field: 'runMode',
name: t('project.node.run_mode'),
options: FLINK_RUN_MODE,
value: model.runMode,
span: flinkSpan
},
{
type: 'input',
field: 'others',
name: t('project.node.option_parameters'),
span: flinkSpan,
props: {
type: 'textarea',
placeholder: t('project.node.option_parameters_tips')
}
)
},
return [
useDeployMode(),
// SeaTunnel spark parameter
useDeployMode(deployModeSpan),
{
type: 'select',
field: 'master',
......@@ -112,17 +96,88 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
span: masterUrlSpan,
props: {
placeholder: t('project.node.sea_tunnel_master_url_tips')
},
validate: {
trigger: ['input', 'blur'],
required: masterUrlSpan.value !== 0,
validator(validate: any, value: string) {
if (masterUrlSpan.value !== 0 && !value) {
return new Error(t('project.node.sea_tunnel_master_url_tips'))
}
}
}
},
{
type: 'select',
type: 'input',
field: 'queue',
name: t('project.node.sea_tunnel_queue'),
options: queueOptions,
value: model.queue,
span: queueSpan
},
useResources(),
// SeaTunnel config parameter
{
type: 'switch',
field: 'useCustom',
name: t('project.node.custom_config')
},
{
type: 'editor',
field: 'rawScript',
name: t('project.node.script'),
span: configEditorSpan,
validate: {
trigger: ['input', 'blur'],
required: model.useCustom,
validator(validate: any, value: string) {
if (model.useCustom && !value) {
return new Error(t('project.node.script_tips'))
}
}
}
},
useResources(resourceEditorSpan, true, 1),
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
export const ENGINE = [
{
label: 'SPARK',
value: 'SPARK'
},
{
label: 'FLINK',
value: 'FLINK'
}
]
export const FLINK_RUN_MODE = [
{
label: 'run',
value: 'RUN'
},
{
label: 'run-application',
value: 'RUN_APPLICATION'
}
]
export const masterTypeOptions = [
{
label: 'yarn',
value: 'YARN'
},
{
label: 'local',
value: 'LOCAL'
},
{
label: 'spark://',
value: 'SPARK'
},
{
label: 'mesos://',
value: 'MESOS'
}
]
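Review note: the `computed` spans in `useSeaTunnel` decide which form fields are visible (a span of 0 hides a field). As a reading aid, the same rules can be sketched as one pure function. `SeaTunnelModel`, `FieldSpans`, and `fieldSpans` are hypothetical names, and the listed `deployMode` values are an assumption based on the comparisons in the diff.

```typescript
type Engine = 'SPARK' | 'FLINK'

interface SeaTunnelModel {
  engine: Engine
  deployMode: string // assumed values: 'client' | 'cluster' | 'local'
  master: string // 'YARN' | 'LOCAL' | 'SPARK' | 'MESOS'
  useCustom: boolean
}

interface FieldSpans {
  flink: number
  deployMode: number
  master: number
  masterUrl: number
  queue: number
  configEditor: number
  resourceEditor: number
}

// Compute the grid span of each field; 0 means the field is hidden.
function fieldSpans(model: SeaTunnelModel): FieldSpans {
  const isSpark = model.engine === 'SPARK'
  const sparkNonLocal = isSpark && model.deployMode !== 'local'
  const needsUrl = model.master === 'SPARK' || model.master === 'MESOS'
  return {
    flink: model.engine === 'FLINK' ? 24 : 0,
    deployMode: isSpark ? 24 : 0,
    master: sparkNonLocal ? 12 : 0,
    masterUrl: sparkNonLocal && needsUrl ? 12 : 0,
    queue: sparkNonLocal && model.master === 'YARN' ? 24 : 0,
    // The inline editor and the resource picker are mutually exclusive.
    configEditor: model.useCustom ? 24 : 0,
    resourceEditor: model.useCustom ? 0 : 24
  }
}
```

With the defaults from this commit (`engine: 'FLINK'`, `useCustom: true`), only the Flink fields and the config editor would be shown.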
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
import { find, omit, cloneDeep } from 'lodash'
import { omit, cloneDeep } from 'lodash'
import type {
INodeData,
ITaskData,
......@@ -199,12 +199,23 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'SEATUNNEL') {
if (data.deployMode === 'local') {
data.master = 'local'
data.masterUrl = ''
data.deployMode = 'client'
taskParams.engine = data.engine
taskParams.useCustom = data.useCustom
taskParams.rawScript = data.rawScript
switch (data.engine) {
case 'FLINK':
taskParams.runMode = data.runMode
taskParams.others = data.others
break
case 'SPARK':
taskParams.deployMode = data.deployMode
taskParams.master = data.master
taskParams.masterUrl = data.masterUrl
taskParams.queue = data.queue
break
default:
break
}
buildRawScript(data)
}
if (data.taskType === 'SWITCH') {
......@@ -623,49 +634,3 @@ export function formatModel(data: ITaskData) {
}
return params
}
const buildRawScript = (model: INodeData) => {
const baseScript = 'sh ${WATERDROP_HOME}/bin/start-waterdrop.sh'
if (!model.resourceList) return
let master = model.master
let masterUrl = model?.masterUrl ? model?.masterUrl : ''
let deployMode = model.deployMode
const queue = model.queue
if (model.deployMode === 'local') {
master = 'local'
masterUrl = ''
deployMode = 'client'
}
if (master === 'yarn' || master === 'local') {
masterUrl = ''
}
let localParams = ''
model?.localParams?.forEach((param: any) => {
localParams = localParams + ' --variable ' + param.prop + '=' + param.value
})
let rawScript = ''
model.resourceList?.forEach((id: number) => {
const item = find(model.resourceFiles, { id: id })
rawScript =
rawScript +
baseScript +
' --master ' +
master +
masterUrl +
' --deploy-mode ' +
deployMode +
' --queue ' +
queue
if (item && item.fullName) {
rawScript = rawScript + ' --config ' + item.fullName
}
rawScript = rawScript + localParams + ' \n'
})
model.rawScript = rawScript ? rawScript : ''
}
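Review note: with `buildRawScript` removed, `formatParams` no longer assembles a shell command on the client. It only forwards the fields relevant to the selected engine. A minimal sketch of that selection as a standalone function follows. `SeaTunnelNodeData` and `pickSeaTunnelParams` are hypothetical names, and the real code mutates `taskParams` rather than returning a new object.

```typescript
interface SeaTunnelNodeData {
  engine?: string
  useCustom?: boolean
  rawScript?: string
  runMode?: string
  others?: string
  deployMode?: string
  master?: string
  masterUrl?: string
  queue?: string
}

// Keep the shared fields, then add only the ones that belong to the engine.
function pickSeaTunnelParams(data: SeaTunnelNodeData): Record<string, unknown> {
  const common = {
    engine: data.engine,
    useCustom: data.useCustom,
    rawScript: data.rawScript
  }
  switch (data.engine) {
    case 'FLINK':
      return { ...common, runMode: data.runMode, others: data.others }
    case 'SPARK':
      return {
        ...common,
        deployMode: data.deployMode,
        master: data.master,
        masterUrl: data.masterUrl,
        queue: data.queue
      }
    default:
      return common
  }
}
```

One consequence of this shape is that stale Spark values left in the form model are not sent when the engine is `FLINK`, and vice versa.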
......@@ -46,12 +46,36 @@ export function useSeaTunnel({
memoryMax: -1,
delayTime: 0,
timeout: 30,
engine: 'FLINK',
runMode: 'RUN',
useCustom: true,
deployMode: 'client',
queue: 'default',
master: 'yarn',
master: 'YARN',
masterUrl: '',
resourceFiles: [],
timeoutNotifyStrategy: ['WARN']
timeoutNotifyStrategy: ['WARN'],
rawScript:
'env {\n' +
' execution.parallelism = 1\n' +
'}\n' +
'\n' +
'source {\n' +
' FakeSourceStream {\n' +
' result_table_name = "fake"\n' +
' field_name = "name,age"\n' +
' }\n' +
'}\n' +
'\n' +
'transform {\n' +
' sql {\n' +
' sql = "select name,age from fake"\n' +
' }\n' +
'}\n' +
'\n' +
'sink {\n' +
' ConsoleSink {}\n' +
'}'
} as INodeData)
let extra: IJsonItem[] = []
......
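Review note: the default `rawScript` is built by concatenating string literals with explicit `\n`. A possible alternative, shown here only as a sketch, keeps the config as an array of lines and joins them, which makes the nesting easier to read. `defaultConfigLines` and `defaultRawScript` are hypothetical names, and the indentation widths are a choice made for this example.

```typescript
// Each entry is one line of the default SeaTunnel config from the diff.
const defaultConfigLines: string[] = [
  'env {',
  '  execution.parallelism = 1',
  '}',
  '',
  'source {',
  '  FakeSourceStream {',
  '    result_table_name = "fake"',
  '    field_name = "name,age"',
  '  }',
  '}',
  '',
  'transform {',
  '  sql {',
  '    sql = "select name,age from fake"',
  '  }',
  '}',
  '',
  'sink {',
  '  ConsoleSink {}',
  '}'
]

// Join without a trailing newline, matching the concatenated literal.
const defaultRawScript: string = defaultConfigLines.join('\n')
```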
......@@ -272,6 +272,7 @@ interface ITaskParams {
sourceParams?: string
queue?: string
master?: string
masterUrl?: string
switchResult?: ISwitchResult
dependTaskList?: IDependTask[]
nextNode?: number
......@@ -341,6 +342,8 @@ interface ITaskParams {
zk?: string
zkPath?: string
executeMode?: string
useCustom?: boolean
runMode?: string
dvcTaskType?: string
dvcRepository?: string
dvcVersion?: string
......@@ -397,7 +400,6 @@ interface INodeData
timeoutSetting?: boolean
isCustomTask?: boolean
method?: string
masterUrl?: string
resourceFiles?: { id: number; fullName: string }[] | null
relation?: RelationType
definition?: object
......
......@@ -43,5 +43,6 @@ export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$PATH
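Review note: `${SEATUNNEL_HOME:-/opt/soft/seatunnel}` falls back to the default when the variable is unset or empty. The sketch below mirrors that rule and shows how a start script path could be derived per engine. The script names come from the task documentation in this commit. `seaTunnelHome` and `startScript` are hypothetical helpers, and placing the scripts under `bin/` is an assumption based on the `PATH` entry above.

```typescript
type Env = Record<string, string | undefined>

// Mirrors shell `${SEATUNNEL_HOME:-/opt/soft/seatunnel}`:
// the default applies when the variable is unset or empty.
function seaTunnelHome(env: Env): string {
  const value = env['SEATUNNEL_HOME']
  return value !== undefined && value !== '' ? value : '/opt/soft/seatunnel'
}

// Build the path of the engine-specific start script (assumed to live in bin/).
function startScript(env: Env, engine: 'SPARK' | 'FLINK'): string {
  const script =
    engine === 'SPARK'
      ? 'start-seatunnel-spark.sh'
      : 'start-seatunnel-flink.sh'
  return `${seaTunnelHome(env)}/bin/${script}`
}
```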