未验证 提交 79952d92 编写于 作者: K Kerwin 提交者: GitHub

[Feature][SPI] Task (#5996)

上级 1887bde1
......@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.api.enums.Status;
......@@ -30,7 +31,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* check utils
*/
......@@ -43,8 +43,7 @@ public class CheckUtils {
/**
* check username
*
* @param userName
* user name
* @param userName user name
* @return true if user name regex valid,otherwise return false
*/
public static boolean checkUserName(String userName) {
......@@ -165,10 +164,8 @@ public class CheckUtils {
/**
* regex check
*
* @param str
* input string
* @param pattern
* regex pattern
* @param str input string
* @param pattern regex pattern
* @return true if regex pattern is right, otherwise return false
*/
private static boolean regexChecks(String str, Pattern pattern) {
......
......@@ -27,7 +27,8 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum PluginType {
ALERT(1, "alert", true),
REGISTER(2, "registry", false);
REGISTER(2, "register", false),
TASK(3,"task",true);
PluginType(int code, String desc, boolean hasUi) {
this.code = code;
......
......@@ -77,6 +77,7 @@ public class ParamUtils {
scheduleTime);
if (globalParamsMap != null) {
params.putAll(globalParamsMap);
}
......@@ -107,6 +108,7 @@ public class ParamUtils {
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
......@@ -117,6 +119,7 @@ public class ParamUtils {
/**
* format convert
*
* @param paramsMap params map
* @return Map of converted
*/
......@@ -125,23 +128,24 @@ public class ParamUtils {
return null;
}
Map<String,String> map = new HashMap<>();
Map<String, String> map = new HashMap<>();
Iterator<Map.Entry<String, Property>> iter = paramsMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
map.put(en.getKey(),en.getValue().getValue());
map.put(en.getKey(), en.getValue().getValue());
}
return map;
}
/**
* get parameters map
*
* @param definedParams definedParams
* @return parameters map
*/
public static Map<String,Property> getUserDefParamsMap(Map<String,String> definedParams) {
public static Map<String, Property> getUserDefParamsMap(Map<String, String> definedParams) {
if (definedParams != null) {
Map<String,Property> userDefParamsMaps = new HashMap<>();
Map<String, Property> userDefParamsMaps = new HashMap<>();
Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> en = iter.next();
......
package org.apache.dolphinscheduler.server.worker.plugin;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkState;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.classloader.ThreadContextClassLoader;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.plugin.AbstractDolphinPluginManager;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TaskPluginManager extends AbstractDolphinPluginManager {
private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
private final Map<String, TaskChannelFactory> taskChannelFactoryMap = new ConcurrentHashMap<>();
private final Map<String, TaskChannel> taskChannelMap = new ConcurrentHashMap<>();
/**
* k->pluginDefineId v->pluginDefineName
*/
private final Map<Integer, String> pluginDefineMap = new HashMap<>();
private void addTaskChannelFactory(TaskChannelFactory taskChannelFactory) {
requireNonNull(taskChannelFactory, "taskChannelFactory is null");
if (taskChannelFactoryMap.putIfAbsent(taskChannelFactory.getName(), taskChannelFactory) != null) {
throw new IllegalArgumentException(format("Task Plugin '%s' is already registered", taskChannelFactory.getName()));
}
try {
loadTaskChannel(taskChannelFactory.getName());
} catch (Exception e) {
throw new IllegalArgumentException(format("Task Plugin '%s' is can not load .", taskChannelFactory.getName()));
}
}
private void loadTaskChannel(String name) {
requireNonNull(name, "name is null");
TaskChannelFactory taskChannelFactory = taskChannelFactoryMap.get(name);
checkState(taskChannelFactory != null, "Task Plugin {} is not registered", name);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(taskChannelFactory.getClass().getClassLoader())) {
TaskChannel taskChannel = taskChannelFactory.create();
this.taskChannelMap.put(name, taskChannel);
}
logger.info("-- Loaded Task Plugin {} --", name);
}
private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
@Override
public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) {
for (TaskChannelFactory taskChannelFactory : dolphinSchedulerPlugin.getTaskChannelFactorys()) {
logger.info("Registering Task Plugin '{}'", taskChannelFactory.getName());
this.addTaskChannelFactory(taskChannelFactory);
List<PluginParams> params = taskChannelFactory.getParams();
String nameEn = taskChannelFactory.getName();
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(nameEn, PluginType.TASK.getDesc(), paramsJson);
int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
pluginDefineMap.put(id, pluginDefine.getPluginName());
}
}
}
......@@ -118,14 +118,14 @@ public class PythonTask extends AbstractTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
}
catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
if (paramsMap != null) {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
......@@ -140,5 +140,5 @@ public class PythonTask extends AbstractTask {
public AbstractParameters getParameters() {
return pythonParameters;
}
}
......@@ -14,219 +14,228 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz.cron;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.day;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.hour;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.min;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.month;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.week;
import static com.cronutils.model.CronType.QUARTZ;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.*;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import static com.cronutils.model.CronType.QUARTZ;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.*;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
/**
* cron utils
*/
public class CronUtils {
private CronUtils() {
throw new IllegalStateException("CronUtils class");
}
private static final Logger logger = LoggerFactory.getLogger(CronUtils.class);
private static final CronParser QUARTZ_CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
/**
* parse to cron
* @param cronExpression cron expression, never null
* @return Cron instance, corresponding to cron expression received
*/
public static Cron parse2Cron(String cronExpression) {
return QUARTZ_CRON_PARSER.parse(cronExpression);
}
/**
* build a new CronExpression based on the string cronExpression
* @param cronExpression String representation of the cron expression the new object should represent
* @return CronExpression
* @throws ParseException if the string expression cannot be parsed into a valid
*/
public static CronExpression parse2CronExpression(String cronExpression) throws ParseException {
return new CronExpression(cronExpression);
}
/**
* get max cycle
* @param cron cron
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).getCycle();
}
/**
* get min cycle
* @param cron cron
* @return CycleEnum
*/
public static CycleEnum getMiniCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).getMiniCycle();
}
/**
* get max cycle
* @param crontab crontab
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(String crontab) {
return getMaxCycle(parse2Cron(crontab));
}
/**
* gets all scheduled times for a period of time based on not self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @return date list
*/
public static List<Date> getFireDateList(Date startTime, Date endTime, CronExpression cronExpression) {
List<Date> dateList = new ArrayList<>();
while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime)) {
break;
}
dateList.add(startTime);
private CronUtils() {
throw new IllegalStateException("CronUtils class");
}
return dateList;
}
/**
* gets expect scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @param fireTimes fireTimes
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression,int fireTimes) {
List<Date> dateList = new ArrayList<>();
while (fireTimes > 0) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime) || startTime.equals(endTime)) {
break;
}
dateList.add(startTime);
fireTimes--;
private static final Logger logger = LoggerFactory.getLogger(CronUtils.class);
private static final CronParser QUARTZ_CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
/**
* parse to cron
* @param cronExpression cron expression, never null
* @return Cron instance, corresponding to cron expression received
*/
public static Cron parse2Cron(String cronExpression) {
return QUARTZ_CRON_PARSER.parse(cronExpression);
}
return dateList;
}
/**
* gets all scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression) {
List<Date> dateList = new ArrayList<>();
while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime) || startTime.equals(endTime)) {
break;
}
dateList.add(startTime);
/**
* build a new CronExpression based on the string cronExpression
* @param cronExpression String representation of the cron expression the new object should represent
* @return CronExpression
* @throws ParseException if the string expression cannot be parsed into a valid
*/
public static CronExpression parse2CronExpression(String cronExpression) throws ParseException {
return new CronExpression(cronExpression);
}
return dateList;
}
/**
* gets all scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cron cron
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, String cron) {
CronExpression cronExpression = null;
try {
cronExpression = parse2CronExpression(cron);
}catch (ParseException e){
logger.error(e.getMessage(), e);
return Collections.emptyList();
/**
* get max cycle
* @param cron cron
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).getCycle();
}
return getSelfFireDateList(startTime, endTime, cronExpression);
}
/**
* get expiration time
* @param startTime startTime
* @param cycleEnum cycleEnum
* @return date
*/
public static Date getExpirationTime(Date startTime, CycleEnum cycleEnum) {
Date maxExpirationTime = null;
Date startTimeMax = null;
try {
startTimeMax = getEndTime(startTime);
Calendar calendar = Calendar.getInstance();
calendar.setTime(startTime);
switch (cycleEnum) {
case HOUR:
calendar.add(Calendar.HOUR, 1);
break;
case DAY:
calendar.add(Calendar.DATE, 1);
break;
case WEEK:
calendar.add(Calendar.DATE, 1);
break;
case MONTH:
calendar.add(Calendar.DATE, 1);
break;
default:
logger.error("Dependent process definition's cycleEnum is {},not support!!", cycleEnum);
break;
}
maxExpirationTime = calendar.getTime();
} catch (Exception e) {
logger.error(e.getMessage(),e);
/**
* get min cycle
* @param cron cron
* @return CycleEnum
*/
public static CycleEnum getMiniCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).getMiniCycle();
}
/**
* get max cycle
* @param crontab crontab
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(String crontab) {
return getMaxCycle(parse2Cron(crontab));
}
/**
* gets all scheduled times for a period of time based on not self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @return date list
*/
public static List<Date> getFireDateList(Date startTime, Date endTime, CronExpression cronExpression) {
List<Date> dateList = new ArrayList<>();
while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime)) {
break;
}
dateList.add(startTime);
}
return dateList;
}
/**
* gets expect scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @param fireTimes fireTimes
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression,int fireTimes) {
List<Date> dateList = new ArrayList<>();
while (fireTimes > 0) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime) || startTime.equals(endTime)) {
break;
}
dateList.add(startTime);
fireTimes--;
}
return dateList;
}
/**
* gets all scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cronExpression cronExpression
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression) {
List<Date> dateList = new ArrayList<>();
while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime.after(endTime) || startTime.equals(endTime)) {
break;
}
dateList.add(startTime);
}
return dateList;
}
/**
* gets all scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cron cron
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, String cron) {
CronExpression cronExpression = null;
try {
cronExpression = parse2CronExpression(cron);
} catch (ParseException e) {
logger.error(e.getMessage(), e);
return Collections.emptyList();
}
return getSelfFireDateList(startTime, endTime, cronExpression);
}
/**
* get expiration time
* @param startTime startTime
* @param cycleEnum cycleEnum
* @return date
*/
public static Date getExpirationTime(Date startTime, CycleEnum cycleEnum) {
Date maxExpirationTime = null;
Date startTimeMax = null;
try {
startTimeMax = getEndTime(startTime);
Calendar calendar = Calendar.getInstance();
calendar.setTime(startTime);
switch (cycleEnum) {
case HOUR:
calendar.add(Calendar.HOUR, 1);
break;
case DAY:
calendar.add(Calendar.DATE, 1);
break;
case WEEK:
calendar.add(Calendar.DATE, 1);
break;
case MONTH:
calendar.add(Calendar.DATE, 1);
break;
default:
logger.error("Dependent process definition's cycleEnum is {},not support!!", cycleEnum);
break;
}
maxExpirationTime = calendar.getTime();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
return DateUtils.compare(startTimeMax, maxExpirationTime) ? maxExpirationTime : startTimeMax;
}
/**
* get the end time of the day by value of date
* @param date
* @return date
*/
private static Date getEndTime(Date date) {
Calendar end = new GregorianCalendar();
end.setTime(date);
end.set(Calendar.HOUR_OF_DAY,23);
end.set(Calendar.MINUTE,59);
end.set(Calendar.SECOND,59);
end.set(Calendar.MILLISECOND,999);
return end.getTime();
}
return DateUtils.compare(startTimeMax,maxExpirationTime)?maxExpirationTime:startTimeMax;
}
/**
* get the end time of the day by value of date
* @param date
* @return date
*/
private static Date getEndTime(Date date) {
Calendar end = new GregorianCalendar();
end.setTime(date);
end.set(Calendar.HOUR_OF_DAY,23);
end.set(Calendar.MINUTE,59);
end.set(Calendar.SECOND,59);
end.set(Calendar.MILLISECOND,999);
return end.getTime();
}
}
......@@ -35,12 +35,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
......
......@@ -21,6 +21,7 @@ import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.register.RegistryFactory;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
/**
* Dolphinscheduler plugin interface
......@@ -48,4 +49,13 @@ public interface DolphinSchedulerPlugin {
default Iterable<RegistryFactory> getRegisterFactorys() {
return emptyList();
}
/**
* get task plugin factory
* @return registry factory
*/
default Iterable<TaskChannelFactory> getTaskChannelFactorys() {
return emptyList();
}
}
......@@ -17,31 +17,12 @@
package org.apache.dolphinscheduler.spi.alert;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
/**
* Each AlertPlugin need implement this interface
*/
public interface AlertChannelFactory {
/**
* plugin name
* Must be UNIQUE .
* This alert plugin name eg: email , message ...
* Name can often be displayed on the page ui eg : email , message , MR , spark , hive ...
*
* @return this alert plugin name
*/
String getName();
/**
* Returns the configurable parameters that this plugin needs to display on the web ui
*
* @return this alert plugin params
*/
List<PluginParams> getParams();
public interface AlertChannelFactory extends UiChannelFactory {
/**
* The parameters configured in the alert / xxx.properties file will be in the config map
......
package org.apache.dolphinscheduler.spi.common;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
public interface UiChannelFactory {
/**
* plugin name
* Must be UNIQUE .
* This alert plugin name eg: email , message ...
* Name can often be displayed on the page ui eg : email , message , MR , spark , hive ...
*
* @return this alert plugin name
*/
String getName();
/**
* Returns the configurable parameters that this plugin needs to display on the web ui
*
* @return this alert plugin params
*/
List<PluginParams> getParams();
}
......@@ -35,7 +35,7 @@ public class InputParam extends PluginParams {
}
public static Builder newBuilder(String name, String title) {
return new InputParam.Builder(name, title);
return new Builder(name, title);
}
public static class Builder extends PluginParams.Builder {
......
......@@ -35,7 +35,7 @@ public class PasswordParam extends PluginParams {
}
public static Builder newBuilder(String name, String title) {
return new PasswordParam.Builder(name, title);
return new Builder(name, title);
}
public static class Builder extends PluginParams.Builder {
......
......@@ -43,7 +43,7 @@ public class RadioParam extends PluginParams {
}
public static Builder newBuilder(String name, String title) {
return new RadioParam.Builder(name, title);
return new Builder(name, title);
}
public static class Builder extends PluginParams.Builder {
......
......@@ -64,6 +64,11 @@ public class PluginParams {
@JsonProperty(STRING_PLUGIN_PARAM_TITLE)
protected String title;
/**
* prompt information
*/
protected String info;
/**
* default value or value input by user in the page
*/
......@@ -73,6 +78,16 @@ public class PluginParams {
@JsonProperty(STRING_PLUGIN_PARAM_VALIDATE)
protected List<Validate> validateList;
/**
* whether to hide, the default value is false
*/
protected boolean hidden;
/**
* whether to display, the default value is true
*/
protected boolean display;
protected PluginParams(Builder builder) {
requireNonNull(builder, "builder is null");
......@@ -83,6 +98,7 @@ public class PluginParams {
this.name = builder.name;
this.formType = builder.formType.getFormType();
this.title = builder.title;
if (null == builder.props) {
builder.props = new ParamsProps();
}
......@@ -90,6 +106,9 @@ public class PluginParams {
this.props = builder.props;
this.value = builder.value;
this.validateList = builder.validateList;
this.info = builder.info;
this.display = builder.display;
this.hidden = builder.hidden;
}
......@@ -109,8 +128,14 @@ public class PluginParams {
protected Object value;
protected String info;
protected List<Validate> validateList;
protected boolean hidden;
protected boolean display;
public Builder(String name,
FormType formType,
String title) {
......@@ -131,7 +156,10 @@ public class PluginParams {
@JsonProperty("props") ParamsProps props,
@JsonProperty("value") Object value,
@JsonProperty("name") String fieldName,
@JsonProperty("validate") List<Validate> validateList
@JsonProperty("validate") List<Validate> validateList,
@JsonProperty("info") String info,
@JsonProperty("hidden") boolean hidden,
@JsonProperty("display") boolean display
) {
requireNonNull(name, "name is null");
requireNonNull(formType, "formType is null");
......@@ -143,6 +171,9 @@ public class PluginParams {
this.value = value;
this.validateList = validateList;
this.fieldName = fieldName;
this.info = info;
this.hidden = hidden;
this.display = display;
}
public PluginParams build() {
......@@ -177,6 +208,7 @@ public class PluginParams {
public void setValue(Object value) {
this.value = value;
}
}
......@@ -59,7 +59,7 @@ public class Validate {
}
public static Builder newBuilder() {
return new Validate.Builder();
return new Builder();
}
@JsonPOJOBuilder(buildMethodName = "build", withPrefix = "set")
......
package org.apache.dolphinscheduler.spi.task;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* job params related class
*/
public abstract class AbstractParameters implements IParameters {
/**
* local parameters
*/
private List<Property> localParams;
/**
* get local parameters list
* @return Property list
*/
public List<Property> getLocalParams() {
return localParams;
}
public void setLocalParams(List<Property> localParams) {
this.localParams = localParams;
}
/**
* get local parameters map
* @return parameters map
*/
public Map<String, Property> getLocalParametersMap() {
if (localParams != null) {
Map<String, Property> localParametersMaps = new LinkedHashMap<>();
for (Property property : localParams) {
localParametersMaps.put(property.getProp(),property);
}
return localParametersMaps;
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
/**
* executive task
*/
public abstract class AbstractTask {
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
/**
* varPool string
*/
protected String varPool;
/**
* taskExecutionContext
**/
TaskRequest taskRequest;
/**
* log record
*/
protected Logger logger;
/**
* SHELL process pid
*/
protected int processId;
/**
* SHELL result string
*/
protected String resultString;
/**
* other resource manager appId , for example : YARN etc
*/
protected String appIds;
/**
* cancel
*/
protected volatile boolean cancel = false;
/**
* exit code
*/
protected volatile int exitStatusCode = -1;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
protected AbstractTask(TaskRequest taskExecutionContext, Logger logger) {
this.taskRequest = taskExecutionContext;
this.logger = logger;
}
/**
* init task
*/
public void init() {
}
public String getPreScript() {
return null;
}
public void setCommand(String command) throws Exception {
}
/**
* task handle
*
* @throws Exception exception
*/
public abstract void handle() throws Exception;
/**
* cancel application
*
* @param status status
* @throws Exception exception
*/
public void cancelApplication(boolean status) throws Exception {
this.cancel = status;
}
/**
* log handle
*
* @param logs log list
*/
public void logHandle(List<String> logs) {
// note that the "new line" is added here to facilitate log parsing
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
logger.info(" -> {}", String.join("\n\t", logs));
}
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
/**
* get exit status code
*
* @return exit status code
*/
public int getExitStatusCode() {
return exitStatusCode;
}
public void setExitStatusCode(int exitStatusCode) {
this.exitStatusCode = exitStatusCode;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
/**
* get task parameters
*
* @return AbstractParameters
*/
public abstract AbstractParameters getParameters();
/**
* result processing maybe
*/
public void after() {
}
/**
* get exit status according to exitCode
*
* @return exit status
*/
public ExecutionStatus getExitStatus() {
ExecutionStatus status;
switch (getExitStatusCode()) {
case TaskConstants.EXIT_CODE_SUCCESS:
status = ExecutionStatus.SUCCESS;
break;
case TaskConstants.EXIT_CODE_KILL:
status = ExecutionStatus.KILL;
break;
default:
status = ExecutionStatus.FAILURE;
break;
}
return status;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
/**
* parameter of stored procedure
*/
public enum Direct {
/**
* 0 in; 1 out;
*/
IN,OUT
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import java.util.HashMap;
/**
* running status for workflow and task nodes
*/
public enum ExecutionStatus {
/**
* status:
* 0 submit success
* 1 running
* 2 ready pause
* 3 pause
* 4 ready stop
* 5 stop
* 6 failure
* 7 success
* 8 need fault tolerance
* 9 kill
* 10 waiting thread
* 11 waiting depend node complete
* 12 delay execution
* 13 forced success
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
READY_PAUSE(2, "ready pause"),
PAUSE(3, "pause"),
READY_STOP(4, "ready stop"),
STOP(5, "stop"),
FAILURE(6, "failure"),
SUCCESS(7, "success"),
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
ExecutionStatus(int code, String descp) {
this.code = code;
this.descp = descp;
}
private final int code;
private final String descp;
private static final HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
static {
for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
}
}
/**
* status is success
*
* @return status
*/
public boolean typeIsSuccess() {
return this == SUCCESS || this == FORCED_SUCCESS;
}
/**
* status is failure
*
* @return status
*/
public boolean typeIsFailure() {
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**
* status is finished
*
* @return status
*/
public boolean typeIsFinished() {
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsStop();
}
/**
* status is waiting thread
*
* @return status
*/
public boolean typeIsWaitingThread() {
return this == WAITTING_THREAD;
}
/**
* status is pause
*
* @return status
*/
public boolean typeIsPause() {
return this == PAUSE;
}
/**
* status is pause
*
* @return status
*/
public boolean typeIsStop() {
return this == STOP;
}
/**
* status is running
*
* @return status
*/
public boolean typeIsRunning() {
return this == RUNNING_EXECUTION || this == WAITTING_DEPEND || this == DELAY_EXECUTION;
}
/**
* status is cancel
*
* @return status
*/
public boolean typeIsCancel() {
return this == KILL || this == STOP;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
public static ExecutionStatus of(int status) {
if (EXECUTION_STATUS_MAP.containsKey(status)) {
return EXECUTION_STATUS_MAP.get(status);
}
throw new IllegalArgumentException("invalid status : " + status);
}
}
package org.apache.dolphinscheduler.spi.task;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.List;
/**
* job params interface
*/
public interface IParameters {
/**
* check parameters is valid
*
* @return result
*/
boolean checkParameters();
/**
* get project resource files list
*
* @return resource files list
*/
List<ResourceInfo> getResourceFilesList();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.params.base.DataType;
import java.io.Serializable;
import java.util.Objects;
public class Property implements Serializable {
private static final long serialVersionUID = -4045513703397452451L;
/**
* key
*/
private String prop;
/**
* input/output
*/
private Direct direct;
/**
* data type
*/
private DataType type;
/**
* value
*/
private String value;
public Property() {
}
public Property(String prop, Direct direct, DataType type, String value) {
this.prop = prop;
this.direct = direct;
this.type = type;
this.value = value;
}
/**
* getter method
*
* @return the prop
* @see Property#prop
*/
public String getProp() {
return prop;
}
/**
* setter method
*
* @param prop the prop to set
* @see Property#prop
*/
public void setProp(String prop) {
this.prop = prop;
}
/**
* getter method
*
* @return the value
* @see Property#value
*/
public String getValue() {
return value;
}
/**
* setter method
*
* @param value the value to set
* @see Property#value
*/
public void setValue(String value) {
this.value = value;
}
public Direct getDirect() {
return direct;
}
public void setDirect(Direct direct) {
this.direct = direct;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Property property = (Property) o;
return Objects.equals(prop, property.prop)
&& Objects.equals(value, property.value);
}
@Override
public int hashCode() {
return Objects.hash(prop, value);
}
@Override
public String toString() {
return "Property{"
+ "prop='" + prop + '\''
+ ", direct=" + direct
+ ", type=" + type
+ ", value='" + value + '\''
+ '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
/**
* resource info
*/
public class ResourceInfo {
/**
* res the name of the resource that was uploaded
*/
private int id;
private String resourceName;
private String res;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getRes() {
return res;
}
public void setRes(String res) {
this.res = res;
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
}
package org.apache.dolphinscheduler.spi.task;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.slf4j.Logger;
public interface TaskChannel {
void cancelApplication(boolean status);
AbstractTask createTask(TaskRequest taskRequest,Logger logger);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
public interface TaskChannelFactory extends UiChannelFactory {
TaskChannel create();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
public class TaskConstants {
private TaskConstants() {
throw new IllegalStateException("Utility class");
}
public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
/**
* string false
*/
public static final String STRING_FALSE = "false";
/**
* exit code kill
*/
public static final int EXIT_CODE_KILL = 137;
public static final String PID = "pid";
/**
* comma ,
*/
public static final String COMMA = ",";
/**
* sleep time
*/
public static final int SLEEP_TIME_MILLIS = 1000;
/**
* exit code failure
*/
public static final int EXIT_CODE_FAILURE = -1;
/**
* exit code success
*/
public static final int EXIT_CODE_SUCCESS = 0;
public static final String SH = "sh";
/**
* default log cache rows num,output when reach the number
*/
public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
/**
* log flush interval?output when reach the interval
*/
public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000;
/**
* pstree, get pud and sub pid
*/
public static final String PSTREE = "pstree";
public static final String RWXR_XR_X = "rwxr-xr-x";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import java.util.Date;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonFormat;
public class TaskRequest {
/**
* task id
*/
private int taskInstanceId;
/**
* task name
*/
private String taskName;
/**
* task first submit time.
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date firstSubmitTime;
/**
* task start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
/**
* task type
*/
private String taskType;
/**
* host
*/
private String host;
/**
* task execute path
*/
private String executePath;
/**
* log path
*/
private String logPath;
/**
* task json
*/
private String taskJson;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
/**
* process instance id
*/
private int processInstanceId;
/**
* process instance schedule time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date scheduleTime;
/**
* process instance global parameters
*/
private String globalParams;
/**
* execute user id
*/
private int executorId;
/**
* command type if complement
*/
private int cmdTypeIfComplement;
/**
* tenant code
*/
private String tenantCode;
/**
* task queue
*/
private String queue;
/**
* process define id
*/
private int processDefineId;
/**
* project id
*/
private int projectId;
/**
* taskParams
*/
private String taskParams;
/**
* envFile
*/
private String envFile;
/**
* definedParams
*/
private Map<String, String> definedParams;
/**
* task AppId
*/
private String taskAppId;
/**
* task timeout strategy
*/
private int taskTimeoutStrategy;
/**
* task timeout
*/
private int taskTimeout;
/**
* worker group
*/
private String workerGroup;
/**
* delay execution time.
*/
private int delayTime;
/**
* resources full name and tenant code
*/
private Map<String, String> resources;
private Map<String, Property> paramsMap;
public Map<String, String> getResources() {
return resources;
}
public void setResources(Map<String, String> resources) {
this.resources = resources;
}
public Map<String, Property> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, Property> paramsMap) {
this.paramsMap = paramsMap;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getFirstSubmitTime() {
return firstSubmitTime;
}
public void setFirstSubmitTime(Date firstSubmitTime) {
this.firstSubmitTime = firstSubmitTime;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public int getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public int getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
public int getProjectId() {
return projectId;
}
public void setProjectId(int projectId) {
this.projectId = projectId;
}
public String getTaskParams() {
return taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public String getEnvFile() {
return envFile;
}
public void setEnvFile(String envFile) {
this.envFile = envFile;
}
public Map<String, String> getDefinedParams() {
return definedParams;
}
public void setDefinedParams(Map<String, String> definedParams) {
this.definedParams = definedParams;
}
public String getTaskAppId() {
return taskAppId;
}
public void setTaskAppId(String taskAppId) {
this.taskAppId = taskAppId;
}
public int getTaskTimeoutStrategy() {
return taskTimeoutStrategy;
}
public void setTaskTimeoutStrategy(int taskTimeoutStrategy) {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}
public int getTaskTimeout() {
return taskTimeout;
}
public void setTaskTimeout(int taskTimeout) {
this.taskTimeout = taskTimeout;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.SH;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
/**
* abstract command executor
*/
public abstract class AbstractCommandExecutor {
/**
* rules for extracting application ID
*/
protected static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.APPLICATION_REGEX);
protected StringBuilder varPool = new StringBuilder();
/**
* process
*/
private Process process;
/**
* log handler
*/
protected Consumer<List<String>> logHandler;
/**
* logger
*/
protected Logger logger;
/**
* log list
*/
protected List<String> logBuffer;
protected boolean logOutputIsSuccess = false;
/*
* SHELL result string
*/
protected String taskResultString;
/**
* taskRequest
*/
protected TaskRequest taskRequest;
public AbstractCommandExecutor(Consumer<List<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logger = logger;
this.logBuffer = Collections.synchronizedList(new ArrayList<>());
}
public AbstractCommandExecutor(List<String> logBuffer) {
this.logBuffer = logBuffer;
}
/**
* build process
*
* @param commandFile command file
* @throws IOException IO Exception
*/
private void buildProcess(String commandFile) throws IOException {
// setting up user to run commands
List<String> command = new LinkedList<>();
//init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskRequest.getExecutePath()));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
command.add("sudo");
command.add("-u");
command.add(taskRequest.getTenantCode());
command.add(SH);
command.addAll(Collections.emptyList());
command.add(commandFile);
// setting commands
processBuilder.command(command);
process = processBuilder.start();
// print command
printCommand(command);
}
public TaskResponse run(String execCommand) throws IOException, InterruptedException {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
String commandFilePath = buildCommandFilePath();
// create command file if not exists
createCommandFileIfNotExists(execCommand, commandFilePath);
//build process
buildProcess(commandFilePath);
// parse process output
parseProcessOutput(process);
int processId = getProcessId(process);
result.setProcessId(processId);
// cache processId
taskRequest.setProcessId(processId);
boolean updateTaskExecutionContextStatus = TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskRequest);
result.setStatus(TaskRunStatus.FAIL_AND_NEED_KILL);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
// print process id
logger.info("process start, process id is: {}", processId);
// if timeout occurs, exit directly
long remainTime = getRemainTime();
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskRequest.getExecutePath(),
processId
, result.getExitStatusCode());
// if SHELL task exit
if (status) {
// set appIds
List<String> appIds = getAppIds(taskRequest.getLogPath());
result.setAppIds(String.join(TaskConstants.COMMA, appIds));
// SHELL task state
result.setExitStatusCode(process.exitValue());
} else {
logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode());
ProcessUtils.kill(taskRequest);
result.setStatus(TaskRunStatus.FAIL_AND_NEED_KILL);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
return result;
}
public String getVarPool() {
return varPool.toString();
}
/**
* cancel application
*
* @throws Exception exception
*/
public void cancelApplication() throws Exception {
if (process == null) {
return;
}
// clear log
clear();
int processId = getProcessId(process);
logger.info("cancel process: {}", processId);
// kill , waiting for completion
boolean killed = softKill(processId);
if (!killed) {
// hard kill
hardKill(processId);
// destory
process.destroy();
process = null;
}
}
/**
* soft kill
*
* @param processId process id
* @return process is alive
* @throws InterruptedException interrupted exception
*/
private boolean softKill(int processId) {
if (processId != 0 && process.isAlive()) {
try {
// sudo -u user command to run command
String cmd = String.format("kill %d", processId);
cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
logger.info("soft kill task:{}, process id:{}, cmd:{}", taskRequest.getTaskAppId(), processId, cmd);
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
logger.info("kill attempt failed", e);
}
}
return process.isAlive();
}
/**
* hard kill
*
* @param processId process id
*/
private void hardKill(int processId) {
if (processId != 0 && process.isAlive()) {
try {
String cmd = String.format("kill -9 %d", processId);
cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd);
logger.info("hard kill task:{}, process id:{}, cmd:{}", taskRequest.getTaskAppId(), processId, cmd);
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
logger.error("kill attempt failed ", e);
}
}
}
/**
* print command
*
* @param commands process builder
*/
private void printCommand(List<String> commands) {
String cmdStr;
try {
cmdStr = ProcessUtils.buildCommandStr(commands);
logger.info("task run command:\n{}", cmdStr);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* clear
*/
private void clear() {
List<String> markerList = new ArrayList<>();
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
logBuffer.clear();
}
logHandler.accept(markerList);
}
/**
* get the standard output of the process
*
* @param process process
*/
private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId());
ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
getOutputLogService.submit(() -> {
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
logBuffer.add("welcome to use bigdata scheduling system...");
while ((line = inReader.readLine()) != null) {
if (line.startsWith("${setValue(")) {
varPool.append(line, "${setValue(".length(), line.length() - 2);
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
taskResultString = line;
}
}
logOutputIsSuccess = true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
logOutputIsSuccess = true;
}
});
getOutputLogService.shutdown();
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(() -> {
try {
long lastFlushTime = System.currentTimeMillis();
while (logBuffer.size() > 0 || !logOutputIsSuccess) {
if (logBuffer.size() > 0) {
lastFlushTime = flush(lastFlushTime);
} else {
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
clear();
}
});
parseProcessOutputExecutorService.shutdown();
}
public int getProcessId() {
return getProcessId(process);
}
/**
* get app links
*
* @param logPath log path
* @return app id list
*/
private List<String> getAppIds(String logPath) {
List<String> logs = convertFile2List(logPath);
List<String> appIds = new ArrayList<>();
/**
* analysis log?get submited yarn application id
*/
for (String log : logs) {
String appId = findAppId(log);
if (StringUtils.isNotEmpty(appId) && !appIds.contains(appId)) {
logger.info("find app id: {}", appId);
appIds.add(appId);
}
}
return appIds;
}
/**
* convert file to list
*
* @param filename file name
* @return line list
*/
private List<String> convertFile2List(String filename) {
List lineList = new ArrayList<String>(100);
File file = new File(filename);
if (!file.exists()) {
return lineList;
}
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
lineList.add(line);
}
} catch (Exception e) {
logger.error(String.format("read file: %s failed : ", filename), e);
}
return lineList;
}
/**
* find app id
*
* @param line line
* @return appid
*/
private String findAppId(String line) {
Matcher matcher = APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group();
}
return null;
}
/**
* get remain time(s)
*
* @return remain time
*/
private long getRemainTime() {
long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime().getTime()) / 1000;
long remainTime = taskRequest.getTaskTimeout() - usedTime;
if (remainTime < 0) {
throw new RuntimeException("task execution time out");
}
return remainTime;
}
/**
* get process id
*
* @param process process
* @return process id
*/
private int getProcessId(Process process) {
int processId = 0;
try {
Field f = process.getClass().getDeclaredField(TaskConstants.PID);
f.setAccessible(true);
processId = f.getInt(process);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return processId;
}
/**
* when log buffer siz or flush time reach condition , then flush
*
* @param lastFlushTime last flush time
* @return last flush time
*/
private long flush(long lastFlushTime) {
long now = System.currentTimeMillis();
/**
* when log buffer siz or flush time reach condition , then flush
*/
if (logBuffer.size() >= TaskConstants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL) {
lastFlushTime = now;
/** log handle */
logHandler.accept(logBuffer);
logBuffer.clear();
}
return lastFlushTime;
}
protected List<String> commandOptions() {
return Collections.emptyList();
}
protected abstract String buildCommandFilePath();
protected abstract String commandInterpreter();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
public String getTaskResultString() {
return taskResultString;
}
public void setTaskResultString(String taskResultString) {
this.taskResultString = taskResultString;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A base class for running a Unix command.
*
* <code>AbstractShell</code> can be used to run unix commands like <code>du</code> or
* <code>df</code>. It also offers facilities to gate commands by
* time-intervals.
*/
public abstract class AbstractShell {
private static final Logger logger = LoggerFactory.getLogger(AbstractShell.class);
/**
* Time after which the executing script would be timedout
*/
protected long timeOutInterval = 0L;
/**
* If or not script timed out
*/
private AtomicBoolean timedOut;
/**
* refresh interval in msec
*/
private long interval;
/**
* last time the command was performed
*/
private long lastTime;
/**
* env for the command execution
*/
private Map<String, String> environment;
private File dir;
/**
* sub process used to execute the command
*/
private Process process;
private int exitCode;
/**
* If or not script finished executing
*/
private AtomicBoolean completed;
AbstractShell() {
this(0L);
}
/**
* @param interval the minimum duration to wait before re-executing the
* command.
*/
AbstractShell(long interval) {
this.interval = interval;
this.lastTime = (interval < 0) ? 0 : -interval;
}
/**
* set the environment for the command
*
* @param env Mapping of environment variables
*/
protected void setEnvironment(Map<String, String> env) {
this.environment = env;
}
/**
* set the working directory
*
* @param dir The directory where the command would be executed
*/
protected void setWorkingDirectory(File dir) {
this.dir = dir;
}
/**
* check to see if a command needs to be executed and execute if needed
*
* @throws IOException errors
*/
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis()) {
return;
}
// reset for next run
exitCode = 0;
runCommand();
}
/**
* Run a command actual work
*/
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask;
timedOut = new AtomicBoolean(false);
completed = new AtomicBoolean(false);
if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}
process = builder.start();
ProcessContainer.putProcess(process);
if (timeOutInterval > 0) {
timeOutTimer = new Timer();
timeoutTimerTask = new ShellTimeoutTimerTask(
this);
//One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(
new InputStreamReader(process.getErrorStream()));
BufferedReader inReader =
new BufferedReader(
new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder();
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while ((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch (IOException ioe) {
logger.warn("Error reading the error stream", ioe);
}
}
};
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
logger.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try {
errThread.start();
inThread.start();
} catch (IllegalStateException e) {
logger.error(" read error and input streams start error", e);
}
try {
// parse the output
exitCode = process.waitFor();
try {
// make sure that the error and in thread exits
errThread.join();
inThread.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted while reading the error and in stream", ie);
}
completed.compareAndSet(false, true);
//the timeout thread handling
//taken care in finally block
if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
if ((timeOutTimer != null) && !timedOut.get()) {
timeOutTimer.cancel();
}
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
logger.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
}
try {
errReader.close();
} catch (IOException ioe) {
logger.warn("Error while closing the error stream", ioe);
}
ProcessContainer.removeProcess(process);
process.destroy();
lastTime = System.currentTimeMillis();
}
}
/**
* @return an array containing the command name and its parameters
*/
protected abstract String[] getExecString();
/**
* Parse the execution result
*
* @param lines lines
* @throws IOException errors
*/
protected abstract void parseExecResult(BufferedReader lines)
throws IOException;
/**
* get the current sub-process executing the given command
*
* @return process executing the command
*/
public Process getProcess() {
return process;
}
/**
* get the exit code
*
* @return the exit code of the process
*/
public int getExitCode() {
return exitCode;
}
/**
* Set if the command has timed out.
*/
private void setTimedOut() {
this.timedOut.set(true);
}
/**
* Timer which is used to timeout scripts spawned off by shell.
*/
private static class ShellTimeoutTimerTask extends TimerTask {
private AbstractShell shell;
public ShellTimeoutTimerTask(AbstractShell shell) {
this.shell = shell;
}
@Override
public void run() {
Process p = shell.getProcess();
try {
p.exitValue();
} catch (Exception e) {
//Process has not terminated.
//So check if it has completed
//if not just destroy it.
if (p != null && !shell.completed.get()) {
shell.setTimedOut();
p.destroy();
}
}
}
}
/**
* This is an IOException with exit code added.
*/
public static class ExitCodeException extends IOException {
private final int exitCode;
public ExitCodeException(int exitCode, String message) {
super(message);
this.exitCode = exitCode;
}
public int getExitCode() {
return exitCode;
}
}
/**
* process manage container
*/
public static class ProcessContainer extends ConcurrentHashMap<Integer, Process> {
private static final ProcessContainer container = new ProcessContainer();
private ProcessContainer() {
super();
}
public static final ProcessContainer getInstance() {
return container;
}
public static void putProcess(Process process) {
getInstance().put(process.hashCode(), process);
}
public static int processSize() {
return getInstance().size();
}
public static void removeProcess(Process process) {
getInstance().remove(process.hashCode());
}
public static void destroyAllProcess() {
Set<Entry<Integer, Process>> set = getInstance().entrySet();
for (Entry<Integer, Process> entry : set) {
try {
entry.getValue().destroy();
} catch (Exception e) {
logger.error("Destroy All Processes error", e);
}
}
logger.info("close " + set.size() + " executing process tasks");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.slf4j.Logger;
/**
* abstract yarn task
*/
public abstract class AbstractYarnTask extends AbstractTask {
/**
* process task
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* Abstract Yarn Task
*
* @param taskRequest taskRequest
* @param logger logger
*/
public AbstractYarnTask(TaskRequest taskRequest, Logger logger) {
super(taskRequest, logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
logger);
}
@Override
public void handle() throws Exception {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(getCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
setProcessId(response.getProcessId());
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
throw e;
}
}
/**
* cancel application
*
* @param status status
* @throws Exception exception
*/
@Override
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
//todo 交给上层处理
shellCommandExecutor.cancelApplication();
// TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
// if (status && taskInstance != null){
// ProcessUtils.killYarnJob(taskExecutionContext);
// }
}
/**
* create command
*
* @return String
* @throws Exception exception
*/
protected abstract String getCommand();
/**
* set main jar name
*/
protected abstract void setMainJarName();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
public class ArgsUtils {
private ArgsUtils() throws IllegalStateException {
throw new IllegalStateException("Utility class");
}
public static String escape(String arg) {
return arg.replace(" ", "\\ ").replace("\"", "\\\"").replace("'", "\\'");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
/**
* logger utils
*/
public class LoggerUtils {
private static final String APPLICATION_REGEX_NAME = "application_\\d+_\\d+";
private LoggerUtils() {
throw new UnsupportedOperationException("Construct LoggerUtils");
}
/**
* rules for extracting application ID
*/
private static final Pattern APPLICATION_REGEX = Pattern.compile(APPLICATION_REGEX_NAME);
/**
* Task Logger's prefix
*/
public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
/**
* Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* Task Logger Thread's name
*/
public static final String TASK_APPID_LOG_FORMAT = "[taskAppId=";
/**
* build job id
*
* @param affix Task Logger's prefix
* @param processDefId process define id
* @param processInstId process instance id
* @param taskId task id
* @return task id format
*/
public static String buildTaskId(String affix,
int processDefId,
int processInstId,
int taskId) {
// - [taskAppId=TASK_79_4084_15210]
return String.format(" - %s%s-%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix,
processDefId,
processInstId,
taskId);
}
/**
* processing log
* get yarn application id list
*
* @param log log content
* @param logger logger
* @return app id list
*/
public static List<String> getAppIds(String log, Logger logger) {
List<String> appIds = new ArrayList<>();
Matcher matcher = APPLICATION_REGEX.matcher(log);
// analyse logs to get all submit yarn application id
while (matcher.find()) {
String appId = matcher.group();
if (!appIds.contains(appId)) {
logger.info("find app id: {}", appId);
appIds.add(appId);
}
}
return appIds;
}
public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
}
public static void logError(Optional<Logger> optionalLogger
, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
}
public static void logError(Optional<Logger> optionalLogger
, String error, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
}
public static void logInfo(Optional<Logger> optionalLogger
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.StringTokenizer;
public class OSUtils {
private OSUtils() {
throw new IllegalStateException("Utility class");
}
/**
* get sudo command
*
* @param tenantCode tenantCode
* @param command command
* @return result of sudo execute command
*/
public static String getSudoCmd(String tenantCode, String command) {
return StringUtils.isEmpty(tenantCode) ? command : "sudo -u " + tenantCode + " " + command;
}
/**
* whether is macOS
*
* @return true if mac
*/
public static boolean isMacOS() {
return getOSName().startsWith("Mac");
}
/**
* whether is windows
*
* @return true if windows
*/
public static boolean isWindows() {
return getOSName().startsWith("Windows");
}
/**
* Execute the corresponding command of Linux or Windows
*
* @param command command
* @return result of execute command
* @throws IOException errors
*/
public static String exeCmd(String command) throws IOException {
StringTokenizer st = new StringTokenizer(command);
String[] cmdArray = new String[st.countTokens()];
for (int i = 0; st.hasMoreTokens(); i++) {
cmdArray[i] = st.nextToken();
}
return exeShell(cmdArray);
}
/**
* Execute the shell
*
* @param command command
* @return result of execute the shell
* @throws IOException errors
*/
public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command);
}
public static String getOSName() {
return System.getProperty("os.name");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProcessUtils {
private ProcessUtils() {
throw new IllegalStateException("Utility class");
}
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
/**
* Expression of PID recognition in Windows scene
*/
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
private static final String LOCAL_PROCESS_EXEC = "jdk.lang.Process.allowAmbiguousCommands";
/**
* verification cmd bat.
*/
private static final int VERIFICATION_CMD_BAT = 0;
/**
* verification legacy.
*/
private static final int VERIFICATION_LEGACY = 2;
/**
* escape verification.
*/
private static final char[][] ESCAPE_VERIFICATION = {{' ', '\t', '<', '>', '&', '|', '^'}, {' ', '\t', '<', '>'}, {' ', '\t'}};
/**
* verification win32.
*/
private static final int VERIFICATION_WIN32 = 1;
/**
* Lazy Pattern.
*/
private static class LazyPattern {
/**
* Escape-support version:
* "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
*/
private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\"");
}
/**
* build command line characters.
*
* @param commandList command list
* @return command
*/
public static String buildCommandStr(List<String> commandList) {
String cmdstr;
String[] cmd = commandList.toArray(new String[0]);
SecurityManager security = System.getSecurityManager();
boolean allowAmbiguousCommands = isAllowAmbiguousCommands(security);
if (allowAmbiguousCommands) {
String executablePath = new File(cmd[0]).getPath();
if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
executablePath = quoteString(executablePath);
}
cmdstr = createCommandLine(
VERIFICATION_LEGACY, executablePath, cmd);
} else {
String executablePath;
try {
executablePath = getExecutablePath(cmd[0]);
} catch (IllegalArgumentException e) {
StringBuilder join = new StringBuilder();
for (String s : cmd) {
join.append(s).append(' ');
}
cmd = getTokensFromCommand(join.toString());
executablePath = getExecutablePath(cmd[0]);
// Check new executable name once more
if (security != null) {
security.checkExec(executablePath);
}
}
cmdstr = createCommandLine(
isShellFile(executablePath) ? VERIFICATION_CMD_BAT : VERIFICATION_WIN32, quoteString(executablePath), cmd);
}
return cmdstr;
}
/**
* whether is shell file.
*
* @param executablePath executable path
* @return true if endsWith .CMD or .BAT
*/
private static boolean isShellFile(String executablePath) {
String upPath = executablePath.toUpperCase();
return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT"));
}
/**
* create command line.
*
* @param verificationType verification type
* @param executablePath executable path
* @param cmd cmd
* @return command line
*/
private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) {
StringBuilder cmdbuf = new StringBuilder(80);
cmdbuf.append(executablePath);
for (int i = 1; i < cmd.length; ++i) {
cmdbuf.append(' ');
String s = cmd[i];
if (needsEscaping(verificationType, s)) {
cmdbuf.append('"').append(s);
if ((verificationType != VERIFICATION_CMD_BAT) && s.endsWith("\\")) {
cmdbuf.append('\\');
}
cmdbuf.append('"');
} else {
cmdbuf.append(s);
}
}
return cmdbuf.toString();
}
/**
* check is allow ambiguous commands
*
* @param security security manager
* @return allow ambiguous command flag
*/
private static boolean isAllowAmbiguousCommands(SecurityManager security) {
boolean allowAmbiguousCommands = false;
if (security == null) {
allowAmbiguousCommands = true;
String value = System.getProperty(LOCAL_PROCESS_EXEC);
if (value != null) {
allowAmbiguousCommands = !TaskConstants.STRING_FALSE.equalsIgnoreCase(value);
}
}
return allowAmbiguousCommands;
}
/**
* whether needs escaping.
*
* @param verificationType verification type
* @param arg arg
* @return boolean
*/
private static boolean needsEscaping(int verificationType, String arg) {
boolean argIsQuoted = isQuoted((verificationType == VERIFICATION_CMD_BAT), arg, "Argument has embedded quote, use the explicit CMD.EXE call.");
if (!argIsQuoted) {
char[] testEscape = ESCAPE_VERIFICATION[verificationType];
for (char c : testEscape) {
if (arg.indexOf(c) >= 0) {
return true;
}
}
}
return false;
}
/**
* whether is quoted.
*
* @param noQuotesInside no quotes inside
* @param arg arg
* @param errorMessage error message
* @return boolean
*/
private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) {
int lastPos = arg.length() - 1;
if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') {
// The argument has already been quoted.
if (noQuotesInside && arg.indexOf('"', 1) != lastPos) {
// There is ["] inside.
throw new IllegalArgumentException(errorMessage);
}
return true;
}
if (noQuotesInside && arg.indexOf('"') >= 0) {
// There is ["] inside.
throw new IllegalArgumentException(errorMessage);
}
return false;
}
/**
* quote string.
*
* @param arg argument
* @return format arg
*/
private static String quoteString(String arg) {
return '"' + arg + '"';
}
/**
* get executable path.
*
* @param path path
* @return executable path
*/
private static String getExecutablePath(String path) {
boolean pathIsQuoted = isQuoted(true, path, "Executable name has embedded quote, split the arguments");
File fileToRun = new File(pathIsQuoted ? path.substring(1, path.length() - 1) : path);
return fileToRun.getPath();
}
/**
* get tokens from command.
*
* @param command command
* @return token string array
*/
private static String[] getTokensFromCommand(String command) {
ArrayList<String> matchList = new ArrayList<>(8);
Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
while (regexMatcher.find()) {
matchList.add(regexMatcher.group());
}
return matchList.toArray(new String[0]);
}
/**
* kill tasks according to different task types.
*/
public static void kill(TaskRequest request) {
try {
int processId = request.getProcessId();
if (processId == 0) {
logger.error("process kill failed, process id :{}, task id:{}",
processId, request.getTaskInstanceId());
return;
}
String cmd = String.format("kill -9 %s", getPidsStr(processId));
cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
// find log and kill yarn job
// killYarnJob(request);
} catch (Exception e) {
logger.error("kill task failed", e);
}
}
/**
* get pids str.
*
* @param processId process id
* @return pids pid String
* @throws Exception exception
*/
public static String getPidsStr(int processId) throws Exception {
StringBuilder sb = new StringBuilder();
Matcher mat = null;
// pstree pid get sub pids
if (OSUtils.isMacOS()) {
String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
if (null != pids) {
mat = MACPATTERN.matcher(pids);
}
} else {
String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
mat = WINDOWSATTERN.matcher(pids);
}
if (null != mat) {
while (mat.find()) {
sb.append(mat.group(1)).append(" ");
}
}
return sb.toString().trim();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
/**
* shell command executor
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
/**
* For Unix-like, using sh
*/
private static final String SH = "sh";
/**
* For Windows, using cmd.exe
*/
private static final String CMD = "cmd.exe";
/**
* constructor
*
* @param logHandler logHandler
* @param taskRequest taskRequest
* @param logger logger
*/
public ShellCommandExecutor(Consumer<List<String>> logHandler,
TaskRequest taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
public ShellCommandExecutor(List<String> logBuffer) {
super(logBuffer);
}
@Override
protected String buildCommandFilePath() {
// command file
return String.format("%s/%s.%s"
, taskRequest.getExecutePath()
, taskRequest.getTaskAppId()
, OSUtils.isWindows() ? "bat" : "command");
}
/**
* get command type
*
* @return command type
*/
@Override
protected String commandInterpreter() {
return OSUtils.isWindows() ? CMD : SH;
}
/**
* create command file if not exists
*
* @param execCommand exec command
* @param commandFile command file
* @throws IOException io exception
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("tenantCode user:{}, task dir:{}", taskRequest.getTenantCode(),
taskRequest.getTaskAppId());
// create if non existence
if (!Files.exists(Paths.get(commandFile))) {
logger.info("create command file:{}", commandFile);
StringBuilder sb = new StringBuilder();
if (OSUtils.isWindows()) {
sb.append("@echo off\n");
sb.append("cd /d %~dp0\n");
if (taskRequest.getEnvFile() != null) {
sb.append("call ").append(taskRequest.getEnvFile()).append("\n");
}
} else {
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");
if (taskRequest.getEnvFile() != null) {
sb.append("source ").append(taskRequest.getEnvFile()).append("\n");
}
}
sb.append(execCommand);
logger.info("command : {}", sb);
// write data to file
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
* shell command executor.
*
* <code>ShellExecutor</code> should be used in cases where the output
* of the command needs no explicit parsing and where the command, working
* directory and the environment remains unchanged. The output of the command
* is stored as-is and is expected to be small.
*/
public class ShellExecutor extends AbstractShell {
private String[] command;
private StringBuilder output;
public ShellExecutor(String... execString) {
this(execString, null);
}
public ShellExecutor(String[] execString, File dir) {
this(execString, dir, null);
}
public ShellExecutor(String[] execString, File dir,
Map<String, String> env) {
this(execString, dir, env, 0L);
}
/**
* Create a new instance of the ShellExecutor to execute a command.
*
* @param execString The command to execute with arguments
* @param dir If not-null, specifies the directory which should be set
* as the current working directory for the command.
* If null, the current working directory is not modified.
* @param env If not-null, environment of the command will include the
* key-value pairs specified in the map. If null, the current
* environment is not modified.
* @param timeout Specifies the time in milliseconds, after which the
* command will be killed and the status marked as timedout.
* If 0, the command will not be timed out.
*/
public ShellExecutor(String[] execString, File dir,
Map<String, String> env, long timeout) {
command = execString.clone();
if (dir != null) {
setWorkingDirectory(dir);
}
if (env != null) {
setEnvironment(env);
}
timeOutInterval = timeout;
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param cmd shell command to execute.
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(String... cmd) throws IOException {
return execCommand(null, cmd, 0L);
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @param timeout time in milliseconds after which script should be marked timeout
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(Map<String, String> env, String[] cmd,
long timeout) throws IOException {
ShellExecutor exec = new ShellExecutor(cmd, null, env,
timeout);
exec.execute();
return exec.getOutput();
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(Map<String, String> env, String... cmd)
throws IOException {
return execCommand(env, cmd, 0L);
}
/**
* Execute the shell command
*
* @throws IOException errors
*/
public void execute() throws IOException {
this.run();
}
@Override
protected String[] getExecString() {
return command;
}
@Override
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuilder();
char[] buf = new char[1024];
int nRead;
String line = "";
while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
line = new String(buf, 0, nRead);
output.append(line);
}
}
/**
* @return the output of the shell command
*/
public String getOutput() {
return (output == null) ? "" : output.toString();
}
/**
* Returns the commands of this instance.
* Arguments with spaces in are presented with quotes round; other
* arguments are presented raw
*
* @return a string representation of the object
*/
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
String[] args = getExecString();
for (String s : args) {
if (s.indexOf(' ') >= 0) {
builder.append('"').append(s).append('"');
} else {
builder.append(s);
}
builder.append(' ');
}
return builder.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
public class TaskException extends RuntimeException {
private static final long serialVersionUID = 8155449302457294758L;
public TaskException() {
super();
}
public TaskException(String msg, Throwable cause) {
super(msg, cause);
}
public TaskException(String msg) {
super(msg);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TaskExecutionContextCacheManager {
private TaskExecutionContextCacheManager() {
throw new IllegalStateException("Utility class");
}
/**
* taskInstance cache
*/
private static Map<Integer, TaskRequest> taskRequestContextCache = new ConcurrentHashMap<>();
/**
* get taskInstance by taskInstance id
*
* @param taskInstanceId taskInstanceId
* @return taskInstance
*/
public static TaskRequest getByTaskInstanceId(Integer taskInstanceId) {
return taskRequestContextCache.get(taskInstanceId);
}
/**
* cache taskInstance
*
* @param request request
*/
public static void cacheTaskExecutionContext(TaskRequest request) {
taskRequestContextCache.put(request.getTaskInstanceId(), request);
}
/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
public static void removeByTaskInstanceId(Integer taskInstanceId) {
taskRequestContextCache.remove(taskInstanceId);
}
public static boolean updateTaskExecutionContext(TaskRequest request) {
taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
return taskRequestContextCache.containsKey(request.getTaskInstanceId());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
public class TaskResponse {
/**
* varPool string
*/
private String varPool;
/**
* SHELL process pid
*/
private int processId;
/**
* SHELL result string
*/
private String resultString;
/**
* other resource manager appId , for example : YARN etc
*/
private String appIds;
/**
* process
*/
private Process process;
/**
* cancel
*/
private volatile boolean cancel = false;
/**
* exit code
*/
private volatile int exitStatusCode = -1;
private TaskRunStatus status;
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public boolean isCancel() {
return cancel;
}
public void setCancel(boolean cancel) {
this.cancel = cancel;
}
public int getExitStatusCode() {
return exitStatusCode;
}
public void setExitStatusCode(int exitStatusCode) {
this.exitStatusCode = exitStatusCode;
}
public Process getProcess() {
return process;
}
public void setProcess(Process process) {
this.process = process;
}
public TaskRunStatus getStatus() {
return status;
}
public void setStatus(TaskRunStatus status) {
this.status = status;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
public enum TaskRunStatus {
SUCCESS,
FAIL_AND_NEED_KILL,
FAIL;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* thread utils
*/
public class ThreadUtils {
private ThreadUtils() {
throw new IllegalStateException("Utility class");
}
private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
private static final int STACK_DEPTH = 20;
/**
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*
* @param prefix prefix
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix) {
ThreadFactory threadFactory = namedThreadFactory(prefix);
return ((ThreadPoolExecutor) Executors.newCachedThreadPool(threadFactory));
}
/**
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
*
* @param prefix prefix
* @return ThreadFactory
*/
private static ThreadFactory namedThreadFactory(String prefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build();
}
/**
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*
* @param prefix prefix
* @param maxThreadNumber maxThreadNumber
* @param keepAliveSeconds keepAliveSeconds
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix,
int maxThreadNumber,
int keepAliveSeconds) {
ThreadFactory threadFactory = namedThreadFactory(prefix);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
// corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber,
// maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
maxThreadNumber,
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory);
threadPool.allowCoreThreadTimeOut(true);
return threadPool;
}
/**
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*
* @param nThreads nThreads
* @param prefix prefix
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads, String prefix) {
ThreadFactory threadFactory = namedThreadFactory(prefix);
return ((ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads, threadFactory));
}
/**
* Wrapper over newSingleThreadExecutor.
*
* @param threadName threadName
* @return ExecutorService
*/
public static ExecutorService newDaemonSingleThreadExecutor(String threadName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
return Executors.newSingleThreadExecutor(threadFactory);
}
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
/**
* Wrapper over ScheduledThreadPoolExecutor
*
* @param threadName threadName
* @param corePoolSize corePoolSize
* @return ScheduledExecutorService
*/
public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) {
return newThreadScheduledExecutor(threadName, corePoolSize, true);
}
/**
* Wrapper over ScheduledThreadPoolExecutor
*
* @param threadName threadName
* @param corePoolSize corePoolSize
* @param isDaemon isDaemon
* @return ScheduledThreadPoolExecutor
*/
public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(isDaemon)
.setNameFormat(threadName)
.build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true);
return executor;
}
/**
* get thread info
*
* @param t t
* @return thread info
*/
public static ThreadInfo getThreadInfo(Thread t) {
long tid = t.getId();
return threadBean.getThreadInfo(tid, STACK_DEPTH);
}
/**
* Format the given ThreadInfo object as a String.
*
* @param threadInfo threadInfo
* @param indent indent
* @return threadInfo
*/
public static String formatThreadInfo(ThreadInfo threadInfo, String indent) {
StringBuilder sb = new StringBuilder();
appendThreadInfo(sb, threadInfo, indent);
return sb.toString();
}
/**
* Print all of the thread's information and stack traces.
*
* @param sb StringBuilder
* @param info ThreadInfo
* @param indent indent
*/
public static void appendThreadInfo(StringBuilder sb,
ThreadInfo info,
String indent) {
boolean contention = threadBean.isThreadContentionMonitoringEnabled();
if (info == null) {
sb.append(indent).append("Inactive (perhaps exited while monitoring was done)\n");
return;
}
String taskName = getTaskName(info.getThreadId(), info.getThreadName());
sb.append(indent).append("Thread ").append(taskName).append(":\n");
Thread.State state = info.getThreadState();
sb.append(indent).append(" State: ").append(state).append("\n");
sb.append(indent).append(" Blocked count: ").append(info.getBlockedCount()).append("\n");
sb.append(indent).append(" Waited count: ").append(info.getWaitedCount()).append("\n");
if (contention) {
sb.append(indent).append(" Blocked time: ").append(info.getBlockedTime()).append("\n");
sb.append(indent).append(" Waited time: ").append(info.getWaitedTime()).append("\n");
}
if (state == Thread.State.WAITING) {
sb.append(indent).append(" Waiting on ").append(info.getLockName()).append("\n");
} else if (state == Thread.State.BLOCKED) {
sb.append(indent).append(" Blocked on ").append(info.getLockName()).append("\n");
sb.append(indent).append(" Blocked by ").append(
getTaskName(info.getLockOwnerId(), info.getLockOwnerName())).append("\n");
}
sb.append(indent).append(" Stack:").append("\n");
for (StackTraceElement frame : info.getStackTrace()) {
sb.append(indent).append(" ").append(frame.toString()).append("\n");
}
}
/**
* getTaskName
*
* @param id id
* @param name name
* @return task name
*/
private static String getTaskName(long id, String name) {
if (name == null) {
return Long.toString(id);
}
return id + " (" + name + ")";
}
/**
* sleep
*
* @param millis millis
*/
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
package org.apache.dolphinscheduler.task.plugin.api;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TaskTest {
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-flink</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.ArgsUtils;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* flink args utils
*/
public class FlinkArgsUtils {
private FlinkArgsUtils() {
throw new IllegalStateException("Utility class");
}
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
/**
* build args
*
* @param param flink parameters
* @return argument list
*/
public static List<String> buildArgs(FlinkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(FlinkConstants.FLINK_RUN_MODE); //-m
args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
// judge flink version, the parameter -yn has removed from flink 1.10
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager > 0) { //-yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); //-yjm
}
String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);
}
}
}
int parallelism = param.getParallelism();
if (parallelism > 0) {
args.add(FlinkConstants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(FlinkConstants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
public class FlinkConstants {
private FlinkConstants() {
throw new IllegalStateException("Utility class");
}
/**
* flink
*/
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
public class FlinkTask extends AbstractYarnTask {
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
/**
* flink parameters
*/
private FlinkParameters flinkParameters;
private String command;
private TaskRequest flinkRequest;
public FlinkTask(TaskRequest taskRequest, Logger logger) {
super(taskRequest, logger);
this.flinkRequest = taskRequest;
}
@Override
public String getPreScript() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
return String.join(" ", args);
}
@Override
public void setCommand(String command) {
this.command = command;
}
@Override
public void init() {
logger.info("flink task params {}", flinkRequest.getTaskParams());
flinkParameters = JSONUtils.parseObject(flinkRequest.getTaskParams(), FlinkParameters.class);
if (!flinkParameters.checkParameters()) {
throw new TaskException("flink task params is not valid");
}
}
/**
* create command
*
* @return command
*/
@Override
protected String getCommand() {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
//when update resource maybe has error ,也许也可以交给上层去做控制 需要看资源是否可以抽象为共性 目前来讲我认为是可以的
resourceName = mainJar.getResourceName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.slf4j.Logger;
public class FlinkTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public FlinkTask createTask(TaskRequest taskRequest, Logger logger) {
return new FlinkTask(taskRequest, logger);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class FlinkTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new FlinkTaskChannel();
}
@Override
public String getName() {
return "Flink";
}
@Override
public List<PluginParams> getParams() {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
/**
* support program types
*/
public enum ProgramType {
/**
* 0 JAVA,1 SCALA,2 PYTHON
*/
JAVA,
SCALA,
PYTHON
}
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册