Unverified commit 03e29b93, authored by: Q qiaozhanwei, committed by: GitHub

1, ZookeeperRegister uses the common.properties zookeeperRoot path; 2, api start...

1, ZookeeperRegister uses the common.properties zookeeperRoot path; 2, api start excludes org.apache.dolphinscheduler.server.* (#2307)

* 1, master persists tasks
2, extract the master and worker communication model

* 1, master persists tasks
2, extract the master and worker communication model

* 1, master persists tasks
2, extract the master and worker communication model

* add license

* fix javadoc errors

* TaskExecutionContext create modify

* fix buildAckCommand not setting taskInstanceId

* fix javadoc errors

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager to receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatch task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null, need to load from db

* taskInstanceCache is null, need to load from db

* taskInstanceCache is null, need to load from db

* 1, worker TaskProps replaced with TaskExecutionContext
2, Master kill Task, KillTaskProcessor modified

* worker remove db

* ShellTask modify

* master persists processId and appIds

* master persists processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* fix javadoc errors

* remove Chinese log

* add @Override to executeDirectly method

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host uses host:port format

* host uses host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task priority refactor

* remove ITaskQueue

* task priority refactor

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance sets TaskExecutionContext host, logPath, executePath

* cancelTaskInstance sets TaskExecutionContext host, logPath, executePath

* encapsulate the parameters required by SQLTask

* 1, encapsulate the parameters required by SQLTask
2, SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency; MR, Spark tasks etc. need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modified to query available worker groups

* 1, get worker group from zk modified
2, fix SpringConnectionFactory repeated loading

* master and worker register ip using OSUtils.getHost()

* ProcessInstance host set to ip:port format

* worker fault tolerance modify

* Constants and .env modify

* fix master fault tolerance bug

* add UT to pom.xml

* timing online modified

* when the taskResponse reaches the db faster than the taskAck, the task state will be wrong;
add an async queue and a new thread to resolve this problem (a sketch of this approach follows below)
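
A minimal sketch of the approach described in the item above, assuming a single consumer thread drains a blocking queue so that ack and response events are written to the database in arrival order. The class and method names here are illustrative, not the actual DolphinScheduler implementation:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Illustrative sketch: serialize task state events through one consumer
 * thread so a response cannot be persisted before its ack.
 */
public class TaskResponseServiceSketch {

    /** pending state-change events, in arrival order */
    private final BlockingQueue<TaskStateEvent> eventQueue = new LinkedBlockingQueue<>();

    /** producers (e.g. netty processors) only enqueue; they never touch the db */
    public void addEvent(TaskStateEvent event) {
        eventQueue.add(event);
    }

    /** started once at server bootstrap */
    public void start() {
        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TaskStateEvent event = eventQueue.take(); // blocks when empty
                    persist(event);                           // single writer => ordered db updates
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "task-state-event-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    private void persist(TaskStateEvent event) {
        // write the ack or response to the task instance record here
    }

    /** placeholder event type for this sketch */
    public static class TaskStateEvent { }
}
```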

* TaskExecutionContext set host

* 1, TaskManager refactor
2, api start load server, dolphinscheduler-daemon.sh modified

* 1, TaskManager refactor
2, api start load server, dolphinscheduler-daemon.sh modified

* add UT in pom.xml

* revert dolphinscheduler-daemon.sh

* ZookeeperRegister uses the common.properties zookeeperRoot path

* api start excludes org.apache.dolphinscheduler.server.*

* ZookeeperRegister uses the common.properties zookeeperRoot path

* 1, api start load server filter
2, SHELL task exitStatusCode modified

* fix javadoc errors

* fix javadoc errors

* remove todo

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>

Parent: ace907e7
......@@ -21,11 +21,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@SpringBootApplication
@ServletComponentScan
@ComponentScan("org.apache.dolphinscheduler")
@ComponentScan(basePackages = {"org.apache.dolphinscheduler"},
excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX,
pattern = "org.apache.dolphinscheduler.server.*"))
public class ApiApplicationServer extends SpringBootServletInitializer {
public static void main(String[] args) {
......
......@@ -281,7 +281,7 @@ public class TaskNode {
/**
* get task time out parameter
* @return
* @return task time out parameter
*/
public TaskTimeoutParameter getTaskTimeoutParameter() {
if(StringUtils.isNotEmpty(this.getTimeout())){
......
......@@ -40,7 +40,7 @@ public abstract class AbstractParameters implements IParameters {
/**
* get local parameters list
* @return
* @return Property list
*/
public List<Property> getLocalParams() {
return localParams;
......@@ -52,7 +52,7 @@ public abstract class AbstractParameters implements IParameters {
/**
* get local parameters map
* @return
* @return parameters map
*/
public Map<String,Property> getLocalParametersMap() {
if (localParams != null) {
......
......@@ -25,7 +25,7 @@ public interface IParameters {
/**
* check parameters is valid
*
* @return
* @return result
*/
boolean checkParameters();
......
......@@ -71,7 +71,7 @@ public class ThreadPoolExecutors {
* Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread.
* If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached,
* the task is handled by the current RejectedExecutionHandler.
* @param event
* @param event event
*/
public void execute(final Runnable event) {
Executor executor = getExecutor();
......
......@@ -33,10 +33,11 @@ public class ThreadUtils {
private static final int STACK_DEPTH = 20;
/**
Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
* @param prefix
* @return
*
* @param prefix prefix
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix){
ThreadFactory threadFactory = namedThreadFactory(prefix);
......@@ -45,8 +46,8 @@ public class ThreadUtils {
/**
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
* @param prefix
* @return
* @param prefix prefix
* @return ThreadFactory
*/
private static ThreadFactory namedThreadFactory(String prefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build();
......@@ -56,10 +57,10 @@ public class ThreadUtils {
/**
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
* @param prefix
* @param maxThreadNumber
* @param keepAliveSeconds
* @return
* @param prefix prefix
* @param maxThreadNumber maxThreadNumber
* @param keepAliveSeconds keepAliveSeconds
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix ,
int maxThreadNumber,
......@@ -82,9 +83,9 @@ public class ThreadUtils {
/**
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
* @param nThreads
* @param prefix
* @return
* @param nThreads nThreads
* @param prefix prefix
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads , String prefix){
ThreadFactory threadFactory = namedThreadFactory(prefix);
......@@ -93,8 +94,8 @@ public class ThreadUtils {
/**
* Wrapper over newSingleThreadExecutor.
* @param threadName
* @return
* @param threadName threadName
* @return ExecutorService
*/
public static ExecutorService newDaemonSingleThreadExecutor(String threadName){
ThreadFactory threadFactory = new ThreadFactoryBuilder()
......@@ -106,22 +107,23 @@ public class ThreadUtils {
/**
* Wrapper over newDaemonFixedThreadExecutor.
* @param threadName
* @param threadsNum
* @return
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName,int threadsNum){
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
return Executors.newFixedThreadPool(threadsNum,threadFactory);
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
/**
* Wrapper over ScheduledThreadPoolExecutor
* @param corePoolSize
* @return
* @param threadName threadName
* @param corePoolSize corePoolSize
* @return ScheduledExecutorService
*/
public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
......@@ -136,6 +138,11 @@ public class ThreadUtils {
}
/**
* get thread info
* @param t t
* @return ThreadInfo
*/
public static ThreadInfo getThreadInfo(Thread t) {
long tid = t.getId();
return threadBean.getThreadInfo(tid, STACK_DEPTH);
......@@ -144,7 +151,9 @@ public class ThreadUtils {
/**
* Format the given ThreadInfo object as a String.
* @param indent a prefix for each line, used for nested indentation
* @param threadInfo threadInfo
* @param indent indent
* @return threadInfo
*/
public static String formatThreadInfo(ThreadInfo threadInfo, String indent) {
StringBuilder sb = new StringBuilder();
......@@ -156,9 +165,9 @@ public class ThreadUtils {
/**
* Print all of the thread's information and stack traces.
*
* @param sb
* @param info
* @param indent
* @param sb StringBuilder
* @param info ThreadInfo
* @param indent indent
*/
public static void appendThreadInfo(StringBuilder sb,
ThreadInfo info,
......@@ -193,6 +202,12 @@ public class ThreadUtils {
}
}
/**
* getTaskName
* @param id id
* @param name name
* @return task name
*/
private static String getTaskName(long id, String name) {
if (name == null) {
return Long.toString(id);
......
......@@ -49,7 +49,7 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH,"/dolphinscheduler");
public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
private static volatile HadoopUtils instance = new HadoopUtils();
private static volatile Configuration configuration;
......@@ -380,11 +380,8 @@ public class HadoopUtils implements Closeable {
}
/**
*
* haddop resourcemanager enabled or not
*
* @return true if haddop resourcemanager enabled
* @throws IOException errors
* hadoop resourcemanager enabled or not
* @return result
*/
public boolean isYarnEnabled() {
return yarnEnabled;
......@@ -429,7 +426,7 @@ public class HadoopUtils implements Closeable {
}
/**
*
* get data hdfs path
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
......
......@@ -416,6 +416,8 @@ public class OSUtils {
/**
* check memory and cpu usage
* @param systemCpuLoad systemCpuLoad
* @param systemReservedMemory systemReservedMemory
* @return check memory and cpu usage
*/
public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory){
......
......@@ -34,10 +34,9 @@ public final class Preconditions {
* Ensures that the given object reference is not null.
* Upon violation, a {@code NullPointerException} with no message is thrown.
*
* @param reference The object reference
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
* @param reference reference
* @param <T> T
* @return T
*/
public static <T> T checkNotNull(T reference) {
if (reference == null) {
......@@ -49,12 +48,10 @@ public final class Preconditions {
/**
* Ensures that the given object reference is not null.
* Upon violation, a {@code NullPointerException} with the given message is thrown.
*
* @param reference The object reference
* @param errorMessage The message for the {@code NullPointerException} that is thrown if the check fails.
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
* @param reference reference
* @param errorMessage errorMessage
* @param <T> T
* @return T
*/
public static <T> T checkNotNull(T reference, String errorMessage) {
if (reference == null) {
......@@ -78,9 +75,8 @@ public final class Preconditions {
* @param errorMessageArgs The arguments for the error message, to be inserted into the
* message template for the {@code %s} placeholders.
*
* @param <T>
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
*/
public static <T> T checkNotNull(T reference,
String errorMessageTemplate,
......
......@@ -89,45 +89,6 @@ public class ResInfo {
}
/**
* get heart beat info
* @param now now
* @return heart beat info
*/
public static String getHeartBeatInfo(Date now){
return buildHeartbeatForZKInfo(OSUtils.getHost(),
OSUtils.getProcessID(),
OSUtils.cpuUsage(),
OSUtils.memoryUsage(),
OSUtils.loadAverage(),
DateUtils.dateToString(now),
DateUtils.dateToString(now));
}
/**
* build heartbeat info for zk
* @param host host
* @param port port
* @param cpuUsage cpu usage
* @param memoryUsage memory usage
* @param loadAverage load average
* @param createTime create time
* @param lastHeartbeatTime last heartbeat time
* @return heartbeat info
*/
public static String buildHeartbeatForZKInfo(String host , int port ,
double cpuUsage , double memoryUsage,double loadAverage,
String createTime,String lastHeartbeatTime){
return host + Constants.COMMA + port + Constants.COMMA
+ cpuUsage + Constants.COMMA
+ memoryUsage + Constants.COMMA
+ loadAverage + Constants.COMMA
+ createTime + Constants.COMMA
+ lastHeartbeatTime;
}
/**
* parse heartbeat info for zk
* @param heartBeatInfo heartbeat info
......
......@@ -27,9 +27,9 @@ public class DependentDateUtils {
/**
* get last day interval list
* @param businessDate
* @param hourNumber
* @return
* @param businessDate businessDate
* @param hourNumber hourNumber
* @return DateInterval list
*/
public static List<DateInterval> getLastHoursInterval(Date businessDate, int hourNumber){
List<DateInterval> dateIntervals = new ArrayList<>();
......@@ -44,8 +44,8 @@ public class DependentDateUtils {
/**
* get today day interval list
* @param businessDate
* @return
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getTodayInterval(Date businessDate){
......@@ -59,9 +59,9 @@ public class DependentDateUtils {
/**
* get last day interval list
* @param businessDate
* @param someDay
* @return
* @param businessDate businessDate
* @param someDay someDay
* @return DateInterval list
*/
public static List<DateInterval> getLastDayInterval(Date businessDate, int someDay){
......@@ -78,8 +78,8 @@ public class DependentDateUtils {
/**
* get interval between this month first day and businessDate
* @param businessDate
* @return
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getThisMonthInterval(Date businessDate) {
Date firstDay = DateUtils.getFirstDayOfMonth(businessDate);
......@@ -88,8 +88,8 @@ public class DependentDateUtils {
/**
* get interval between last month first day and last day
* @param businessDate
* @return
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getLastMonthInterval(Date businessDate) {
......@@ -102,11 +102,12 @@ public class DependentDateUtils {
/**
* get interval on first/last day of the last month
* @param businessDate
* @param isBeginDay
* @return
* @param businessDate businessDate
* @param isBeginDay isBeginDay
* @return DateInterval list
*/
public static List<DateInterval> getLastMonthBeginInterval(Date businessDate, boolean isBeginDay) {
public static List<DateInterval> getLastMonthBeginInterval(Date businessDate,
boolean isBeginDay) {
Date firstDayThisMonth = DateUtils.getFirstDayOfMonth(businessDate);
Date lastDay = DateUtils.getSomeDay(firstDayThisMonth, -1);
......@@ -120,8 +121,8 @@ public class DependentDateUtils {
/**
* get interval between monday to businessDate of this week
* @param businessDate
* @return
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getThisWeekInterval(Date businessDate) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
......@@ -131,8 +132,8 @@ public class DependentDateUtils {
/**
* get interval between monday to sunday of last week
* default set monday the first day of week
* @param businessDate
* @return
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getLastWeekInterval(Date businessDate) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
......@@ -144,9 +145,9 @@ public class DependentDateUtils {
/**
* get interval on the day of last week
* default set monday the first day of week
* @param businessDate
* @param businessDate businessDate
* @param dayOfWeek monday:1,tuesday:2,wednesday:3,thursday:4,friday:5,saturday:6,sunday:7
* @return
* @return DateInterval list
*/
public static List<DateInterval> getLastWeekOneDayInterval(Date businessDate, int dayOfWeek) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
......@@ -156,6 +157,12 @@ public class DependentDateUtils {
return getDateIntervalListBetweenTwoDates(destDay, destDay);
}
/**
* get date interval list between two dates
* @param firstDay firstDay
* @param lastDay lastDay
* @return DateInterval list
*/
public static List<DateInterval> getDateIntervalListBetweenTwoDates(Date firstDay, Date lastDay) {
List<DateInterval> dateIntervals = new ArrayList<>();
while(!firstDay.after(lastDay)){
......
......@@ -37,17 +37,20 @@ public class PlaceholderUtils {
* The suffix of the position to be replaced
*/
public static final String placeholderSuffix = "}";
/**
* Replaces all placeholders of format {@code ${name}} with the value returned
* from the supplied {@link PropertyPlaceholderHelper.PlaceholderResolver}.
*
* @param value the value containing the placeholders to be replaced
* @param paramsMap placeholder data dictionary
* @param value the value containing the placeholders to be replaced
* @param paramsMap placeholder data dictionary
* @param ignoreUnresolvablePlaceholders ignoreUnresolvablePlaceholders
* @return the supplied value with placeholders replaced inline
*/
public static String replacePlaceholders(String value, Map<String, String> paramsMap, boolean ignoreUnresolvablePlaceholders) {
public static String replacePlaceholders(String value,
Map<String, String> paramsMap,
boolean ignoreUnresolvablePlaceholders) {
//replacement tool, parameter key will be replaced by value,if can't match , will throw an exception
PropertyPlaceholderHelper strictHelper = getPropertyPlaceholderHelper(false);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Date;
import org.apache.dolphinscheduler.common.model.Server;
public class ResInfoTest {
@Test
public void testGetHeartBeatInfo() {
String info = ResInfo.getHeartBeatInfo(new Date());
Assert.assertEquals(7, info.split(",").length);
}
@Test
public void testParseHeartbeatForZKInfo() {
//normal info
String info = ResInfo.getHeartBeatInfo(new Date());
Server s = ResInfo.parseHeartbeatForZKInfo(info);
Assert.assertNotNull(s);
Assert.assertNotNull(s.getResInfo());
//null param
s = ResInfo.parseHeartbeatForZKInfo(null);
Assert.assertNull(s);
}
}
......@@ -12,7 +12,6 @@
<artifactId>dolphinscheduler-remote</artifactId>
<name>dolphinscheduler-remote</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
......
......@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
......@@ -103,7 +102,7 @@ public class TaskUpdateQueueConsumer extends Thread{
/**
* TODO dispatch task
* dispatch task
*
* @param taskInstanceId taskInstanceId
* @return result
......
......@@ -152,7 +152,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* TODO dispatcht task
* dispatcht task
* @param taskInstance taskInstance
* @return whether submit task success
*/
......
......@@ -405,7 +405,7 @@ public class MasterExecThread implements Runnable {
}
/**
* TODO submit task to execute
* submit task to execute
* @param taskInstance task instance
* @return TaskInstance
*/
......@@ -910,7 +910,7 @@ public class MasterExecThread implements Runnable {
logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(), task.getState().toString());
//TODO node success , post node submit
// node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
......
......@@ -85,7 +85,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
private Boolean alreadyKilled = false;
/**
* TODO submit task instance and wait complete
* submit task instance and wait complete
*
* @return true is task quit is true
*/
......@@ -108,7 +108,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
/**
* TODO polling db
* polling db
*
* wait task quit
* @return true if task quit success
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -35,33 +36,36 @@ public class ZookeeperRegistryCenter implements InitializingBean {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* namespace
*/
public static final String NAMESPACE = "/dolphinscheduler";
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
@Autowired
private ZookeeperConfig zookeeperConfig;
/**
* nodes namespace
*/
public static final String NODES = NAMESPACE + "/nodes";
public String NODES;
/**
* master path
*/
public static final String MASTER_PATH = NODES + "/master";
public String MASTER_PATH;
/**
* worker path
*/
public static final String WORKER_PATH = NODES + "/worker";
public String WORKER_PATH;
public static final String EMPTY = "";
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
public final String EMPTY = "";
@Override
public void afterPropertiesSet() throws Exception {
NODES = zookeeperConfig.getDsRoot() + "/nodes";
MASTER_PATH = NODES + "/master";
WORKER_PATH = NODES + "/worker";
init();
}
......
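
The hunk above drops the hard-coded /dolphinscheduler namespace constants and builds the nodes/master/worker paths from a configured root once the bean has been initialized. Below is a minimal sketch of how such a root could be bound from a properties file with Spring; the property key, default value, and class name are assumptions for illustration, not the project's actual ZookeeperConfig:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Illustrative sketch of a config bean exposing the zookeeper root path,
 * so registry paths come from configuration instead of a constant.
 */
@Component
public class ZookeeperRootConfigSketch {

    // hypothetical property key; the real key is defined in the project's properties files
    @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
    private String dsRoot;

    public String getDsRoot() {
        return dsRoot;
    }

    /** derive the registry paths the same way afterPropertiesSet() does in the hunk above */
    public String getMasterPath() {
        return dsRoot + "/nodes/master";
    }

    public String getWorkerPath() {
        return dsRoot + "/nodes/worker";
    }
}
```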
......@@ -178,16 +178,20 @@ public abstract class AbstractCommandExecutor {
List<String> appIds = getAppIds(taskExecutionContext.getLogPath());
result.setAppIds(String.join(Constants.COMMA, appIds));
// SHELL task state
result.setExitStatusCode(process.exitValue());
// if yarn task , yarn state is final state
result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
if (process.exitValue() == 0){
result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
}
} else {
logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode());
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
// SHELL task state
result.setExitStatusCode(process.exitValue());
return result;
}
......
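
Because the hunk above interleaves the removed and added lines without diff markers, the resulting exit-status logic is hard to read at a glance. The following is a consolidated sketch of what the new code appears to do; the constants and helper names mirror the hunk, while the surrounding class is illustrative scaffolding:

```java
/**
 * Illustrative sketch of the exit-status handling after this change:
 * take the shell exit value first, and let the YARN application's final
 * state override it only when the shell wrapper itself exited with 0.
 */
public class ExitStatusSketch {

    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_FAILURE = -1;

    int resolveExitStatus(int processExitValue, boolean yarnApplicationSucceeded) {
        // SHELL task state: start from the shell process exit value
        int exitStatusCode = processExitValue;

        // for yarn tasks the application's final state is authoritative,
        // but only if the launching shell reported success
        if (processExitValue == 0) {
            exitStatusCode = yarnApplicationSucceeded ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE;
        }
        return exitStatusCode;
    }
}
```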
......@@ -54,7 +54,6 @@ public class MasterRegistryTest {
public void testRegistry() throws InterruptedException {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
Assert.assertEquals(ZookeeperRegistryCenter.MASTER_PATH, masterPath);
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
......
......@@ -56,7 +56,6 @@ public class WorkerRegistryTest {
public void testRegistry() throws InterruptedException {
workerRegistry.registry();
String workerPath = zookeeperRegistryCenter.getWorkerPath();
Assert.assertEquals(ZookeeperRegistryCenter.WORKER_PATH, workerPath);
Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort());
TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
......
......@@ -167,7 +167,6 @@ public class DataxTaskTest {
@Test
public void testHandle()
throws Exception {
//TODO Test goes here...
}
/**
......
......@@ -20,7 +20,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.quartz.*;
......@@ -37,7 +36,7 @@ import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
......@@ -72,17 +71,15 @@ public class QuartzExecutors {
*/
private static Configuration conf;
static {
private QuartzExecutors() {
try {
conf = new PropertiesConfiguration(Constants.QUARTZ_PROPERTIES_PATH);
conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.warn("not loaded quartz configuration file, will used default value",e);
}
}
private QuartzExecutors() {
}
/**
* thread safe and performance promote
* @return instance of Quartz Executors
......@@ -112,27 +109,27 @@ public class QuartzExecutors {
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
Properties properties = new Properties();
String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME);
if (dataSourceDriverClass.contains(ORG_POSTGRESQL_DRIVER)){
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
} else {
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
}
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, Constants.QUARTZ_INSTANCENAME));
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, Constants.QUARTZ_INSTANCEID));
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT, Constants.QUARTZ_THREADCOUNT));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY, Constants.QUARTZ_THREADPRIORITY));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX, Constants.QUARTZ_TABLE_PREFIX));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, Constants.QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, Constants.QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE, Constants.QUARTZ_DATASOURCE));
properties.setProperty(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
......@@ -308,7 +305,7 @@ public class QuartzExecutors {
*/
public static String buildJobName(int processId) {
StringBuilder sb = new StringBuilder(30);
sb.append(Constants.QUARTZ_JOB_PRIFIX).append(Constants.UNDERLINE).append(processId);
sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
return sb.toString();
}
......@@ -319,7 +316,7 @@ public class QuartzExecutors {
*/
public static String buildJobGroupName(int projectId) {
StringBuilder sb = new StringBuilder(30);
sb.append(Constants.QUARTZ_JOB_GROUP_PRIFIX).append(Constants.UNDERLINE).append(projectId);
sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
return sb.toString();
}
......@@ -333,9 +330,9 @@ public class QuartzExecutors {
*/
public static Map<String, Object> buildDataMap(int projectId, int scheduleId, Schedule schedule) {
Map<String, Object> dataMap = new HashMap<>(3);
dataMap.put(Constants.PROJECT_ID, projectId);
dataMap.put(Constants.SCHEDULE_ID, scheduleId);
dataMap.put(Constants.SCHEDULE, JSONUtils.toJson(schedule));
dataMap.put(PROJECT_ID, projectId);
dataMap.put(SCHEDULE_ID, scheduleId);
dataMap.put(SCHEDULE, JSONUtils.toJson(schedule));
return dataMap;
}
......
......@@ -32,13 +32,10 @@ import static org.apache.dolphinscheduler.common.Constants.*;
*/
@Service
public class TaskUpdateQueueImpl implements TaskUpdateQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueImpl.class);
/**
* queue size
*/
private static final Integer QUEUE_MAX_SIZE = 100;
private static final Integer QUEUE_MAX_SIZE = 3000;
/**
* queue
......@@ -53,12 +50,6 @@ public class TaskUpdateQueueImpl implements TaskUpdateQueue {
*/
@Override
public void put(String taskPriorityInfo) throws Exception {
if (QUEUE_MAX_SIZE.equals(queue.size())){
//TODO need persist db , then load from db to queue when queue size is zero
logger.error("queue is full...");
return;
}
queue.put(taskPriorityInfo);
}
......
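
The hunk above removes the size check that silently discarded task priority entries once the in-memory queue was considered full, and raises QUEUE_MAX_SIZE from 100 to 3000. The underlying queue type is not visible in this hunk, so the following is only a sketch of the general pattern it moves toward: give the queue a real capacity and let put() block the producer instead of dropping work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Illustrative sketch: rather than checking queue.size() and dropping
 * entries at a soft limit, bound the queue itself so put() blocks the
 * producer until a consumer frees a slot.
 */
public class BoundedTaskQueueSketch {

    // hypothetical capacity, echoing the QUEUE_MAX_SIZE value in the hunk above
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(3000);

    public void put(String taskPriorityInfo) throws InterruptedException {
        queue.put(taskPriorityInfo); // blocks when full, never discards the entry
    }

    public String take() throws InterruptedException {
        return queue.take(); // blocks when empty
    }
}
```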
......@@ -20,6 +20,7 @@
#============================================================================
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
#org.quartz.scheduler.instanceName = DolphinScheduler
#org.quartz.scheduler.instanceId = AUTO
#org.quartz.scheduler.makeSchedulerThreadDaemon = true
......