diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java index 55c0f9ffbf601f0dd0e068d70cee49e76a03a2a2..94d95b3c26de46ca2bd521578956d29ae37ba620 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java @@ -20,7 +20,9 @@ package org.apache.dolphinscheduler.alert.utils; * constants */ public class Constants { - + private Constants() { + throw new IllegalStateException("Constants class"); + } /** * alert properties path */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index 29a16447e18f4afcf870f985c8ea6cd0d8c90874..6438e206f8ed1e1bcb55e4b95a810276e1b8398b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -119,7 +119,7 @@ public class ResourcesService extends BaseService { putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); return result; } - if (file.getSize() > Constants.maxFileSize) { + if (file.getSize() > Constants.MAX_FILE_SIZE) { logger.error("file size is too large: {}", file.getOriginalFilename()); putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index 7099378b1d3fedc6bc051c76c3d127ce254a3e62..c5c702404da2f46c4e76e09f986a2bb3aec47497 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -35,7 +35,9 @@ import java.util.regex.Pattern; */ public class CheckUtils { - + private CheckUtils() { + throw new IllegalStateException("CheckUtils class"); + } /** * check username * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index b0b4319fb4d6d3957f93ec77c20dd0af71723e74..24a0ed31d65b54b91cd27416ab9fcd8f9f524567 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -17,8 +17,26 @@ package org.apache.dolphinscheduler.api.utils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; +import org.apache.dolphinscheduler.common.task.http.HttpParameters; +import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; +import org.apache.dolphinscheduler.common.task.python.PythonParameters; +import org.apache.dolphinscheduler.common.task.shell.ShellParameters; +import org.apache.dolphinscheduler.common.task.spark.SparkParameters; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -78,6 +96,14 @@ public class CheckUtilsTest { } + @Test + public void testCheckOtherParams() { + assertFalse(CheckUtils.checkOtherParams(null)); + assertFalse(CheckUtils.checkOtherParams("")); + assertTrue(CheckUtils.checkOtherParams("xxx")); + assertFalse(CheckUtils.checkOtherParams("{}")); + assertFalse(CheckUtils.checkOtherParams("{\"key1\":111}")); + } /** * check passwd */ @@ -106,5 +132,90 @@ public class CheckUtilsTest { assertTrue(CheckUtils.checkPhone("17362537263")); } + @Test + public void testCheckTaskNodeParameters() { + + assertFalse(CheckUtils.checkTaskNodeParameters(null,null)); + assertFalse(CheckUtils.checkTaskNodeParameters(null,"unKnown")); + assertFalse(CheckUtils.checkTaskNodeParameters("unKnown","unKnown")); + assertFalse(CheckUtils.checkTaskNodeParameters("unKnown",null)); + + // sub SubProcessParameters + SubProcessParameters subProcessParameters = new SubProcessParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(subProcessParameters), TaskType.SUB_PROCESS.toString())); + subProcessParameters.setProcessDefinitionId(1234); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(subProcessParameters), TaskType.SUB_PROCESS.toString())); + + // ShellParameters + ShellParameters shellParameters = new ShellParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString())); + shellParameters.setRawScript(""); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString())); + shellParameters.setRawScript("sss"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString())); + + // ProcedureParameters + ProcedureParameters procedureParameters = new ProcedureParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(procedureParameters), TaskType.PROCEDURE.toString())); + procedureParameters.setDatasource(1); + procedureParameters.setType("xx"); + procedureParameters.setMethod("yy"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(procedureParameters), TaskType.PROCEDURE.toString())); + + // SqlParameters + SqlParameters sqlParameters = new SqlParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString())); + sqlParameters.setDatasource(1); + sqlParameters.setType("xx"); + sqlParameters.setSql("yy"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString())); + + // MapreduceParameters + MapreduceParameters mapreduceParameters = new MapreduceParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); + mapreduceParameters.setMainJar(new ResourceInfo()); + mapreduceParameters.setProgramType(ProgramType.JAVA); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); + + // SparkParameters + SparkParameters sparkParameters = new SparkParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sparkParameters), TaskType.SPARK.toString())); + sparkParameters.setMainJar(new ResourceInfo()); + sparkParameters.setProgramType(ProgramType.SCALA); + sparkParameters.setSparkVersion("1.1.1"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sparkParameters), TaskType.SPARK.toString())); + + // PythonParameters + PythonParameters pythonParameters = new PythonParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(pythonParameters), TaskType.PYTHON.toString())); + pythonParameters.setRawScript("ss"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(pythonParameters), TaskType.PYTHON.toString())); + + // DependentParameters + DependentParameters dependentParameters = new DependentParameters(); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dependentParameters), TaskType.DEPENDENT.toString())); + + // FlinkParameters + FlinkParameters flinkParameters = new FlinkParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(flinkParameters), TaskType.FLINK.toString())); + flinkParameters.setMainJar(new ResourceInfo()); + flinkParameters.setProgramType(ProgramType.JAVA); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(flinkParameters), TaskType.FLINK.toString())); + + // HTTP + HttpParameters httpParameters = new HttpParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(httpParameters), TaskType.HTTP.toString())); + httpParameters.setUrl("httpUrl"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(httpParameters), TaskType.HTTP.toString())); + + // DataxParameters + DataxParameters dataxParameters = new DataxParameters(); + assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString())); + dataxParameters.setDataSource(111); + dataxParameters.setDataTarget(333); + dataxParameters.setSql("sql"); + dataxParameters.setTargetTable("tar"); + assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString())); + } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 9cb1a5821e2e9fcb035bc453e6045399ded37111..73125f49266521c17cbcf6f7a8fe83b1562157db 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -25,7 +25,9 @@ import java.util.regex.Pattern; * Constants */ public final class Constants { - + private Constants() { + throw new IllegalStateException("Constants class"); + } /** * common properties path */ @@ -124,49 +126,41 @@ public final class Constants { /** * MasterServer directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters"; /** * WorkerServer directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers"; /** * all servers directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "/dead-servers"; /** * MasterServer lock directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters"; /** * WorkerServer lock directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "/lock/workers"; /** * MasterServer failover directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; /** * WorkerServer failover directory registered in zookeeper */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; /** * MasterServer startup failover runing and fault tolerance process */ - //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; /** @@ -354,87 +348,87 @@ public final class Constants { /** * heartbeat threads number */ - public static final int defaulWorkerHeartbeatThreadNum = 1; + public static final int DEFAUL_WORKER_HEARTBEAT_THREAD_NUM = 1; /** * heartbeat interval */ - public static final int defaultWorkerHeartbeatInterval = 60; + public static final int DEFAULT_WORKER_HEARTBEAT_INTERVAL = 60; /** * worker fetch task number */ - public static final int defaultWorkerFetchTaskNum = 1; + public static final int DEFAULT_WORKER_FETCH_TASK_NUM = 1; /** * worker execute threads number */ - public static final int defaultWorkerExecThreadNum = 10; + public static final int DEFAULT_WORKER_EXEC_THREAD_NUM = 10; /** * master cpu load */ - public static final int defaultMasterCpuLoad = Runtime.getRuntime().availableProcessors() * 2; + public static final int DEFAULT_MASTER_CPU_LOAD = Runtime.getRuntime().availableProcessors() * 2; /** * master reserved memory */ - public static final double defaultMasterReservedMemory = OSUtils.totalMemorySize() / 10; + public static final double DEFAULT_MASTER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10; /** * worker cpu load */ - public static final int defaultWorkerCpuLoad = Runtime.getRuntime().availableProcessors() * 2; + public static final int DEFAULT_WORKER_CPU_LOAD = Runtime.getRuntime().availableProcessors() * 2; /** * worker reserved memory */ - public static final double defaultWorkerReservedMemory = OSUtils.totalMemorySize() / 10; + public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10; /** * master execute threads number */ - public static final int defaultMasterExecThreadNum = 100; + public static final int DEFAULT_MASTER_EXEC_THREAD_NUM = 100; /** * default master concurrent task execute num */ - public static final int defaultMasterTaskExecNum = 20; + public static final int DEFAULT_MASTER_TASK_EXEC_NUM = 20; /** * default log cache rows num,output when reach the number */ - public static final int defaultLogRowsNum = 4 * 16; + public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16; /** * log flush interval,output when reach the interval */ - public static final int defaultLogFlushInterval = 1000; + public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000; /** * default master heartbeat thread number */ - public static final int defaulMasterHeartbeatThreadNum = 1; + public static final int DEFAULT_MASTER_HEARTBEAT_THREAD_NUM = 1; /** * default master heartbeat interval */ - public static final int defaultMasterHeartbeatInterval = 60; + public static final int DEFAULT_MASTER_HEARTBEAT_INTERVAL = 60; /** * default master commit retry times */ - public static final int defaultMasterCommitRetryTimes = 5; + public static final int DEFAULT_MASTER_COMMIT_RETRY_TIMES = 5; /** * default master commit retry interval */ - public static final int defaultMasterCommitRetryInterval = 3000; + public static final int DEFAULT_MASTER_COMMIT_RETRY_INTERVAL = 3000; /** * time unit secong to minutes @@ -474,9 +468,9 @@ public final class Constants { public static final String THREAD_NAME_MASTER_SERVER = "Master-Server"; public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server"; - public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; + public static final String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; - public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; + public static final String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; /** @@ -874,7 +868,7 @@ public final class Constants { public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; - public static final String FLINK_detach = "-d"; + public static final String FLINK_DETACH = "-d"; public static final String FLINK_MAIN_CLASS = "-c"; @@ -989,7 +983,7 @@ public final class Constants { * session timeout */ public static final int SESSION_TIME_OUT = 7200; - public static final int maxFileSize = 1024 * 1024 * 1024; + public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024; public static final String UDF = "UDF"; public static final String CLASS = "class"; public static final String RECEIVERS = "receivers"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java index fe76497ff87f913afd00338dd59f19b10b2e27e5..48550c31cc7113fab9ae58844ce2a4ec8bede5e0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java @@ -38,7 +38,7 @@ public class ClickHouseDataSource extends BaseDataSource { @Override public String getJdbcUrl() { String jdbcUrl = getAddress(); - if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) { + if (jdbcUrl.lastIndexOf('/') != (jdbcUrl.length() - 1)) { jdbcUrl += "/"; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java index 21e3ce224840909342c42f95ca6c98f0c4cad7f0..c7784de8ddc997d056c651ac40df1c3e2d9eb624 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java @@ -38,7 +38,7 @@ public class SubProcessParameters extends AbstractParameters { @Override public boolean checkParameters() { - return this.processDefinitionId != 0; + return this.processDefinitionId != null && this.processDefinitionId != 0; } @Override diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java index 9c02111c3695c33710378ced75051e423cc9821d..22c58640ccd314f6bbbe6d050fa2009560a997a8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java @@ -37,6 +37,9 @@ import java.util.*; */ public class CollectionUtils { + private CollectionUtils() { + throw new IllegalStateException("CollectionUtils class"); + } /** * Returns a new {@link Collection} containing a minus a subset of * b. Only the elements of b that satisfy the predicate @@ -139,26 +142,6 @@ public class CollectionUtils { cardinalityB = CollectionUtils.getCardinalityMap(b); } - /** - * Returns the maximum frequency of an object. - * - * @param obj the object - * @return the maximum frequency of the object - */ - private int max(final Object obj) { - return Math.max(freqA(obj), freqB(obj)); - } - - /** - * Returns the minimum frequency of an object. - * - * @param obj the object - * @return the minimum frequency of the object - */ - private int min(final Object obj) { - return Math.min(freqA(obj), freqB(obj)); - } - /** * Returns the frequency of this object in collection A. * @@ -225,7 +208,7 @@ public class CollectionUtils { if (a.size() != b.size()) { return false; } - final CardinalityHelper helper = new CardinalityHelper(a, b); + final CardinalityHelper helper = new CardinalityHelper<>(a, b); if (helper.cardinalityA.size() != helper.cardinalityB.size()) { return false; } @@ -250,7 +233,7 @@ public class CollectionUtils { * @return the populated cardinality map */ public static Map getCardinalityMap(final Iterable coll) { - final Map count = new HashMap(); + final Map count = new HashMap<>(); for (final O obj : coll) { count.put(obj, count.getOrDefault(obj, 0) + 1); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java index 842c74edc09d5b176c58c5dfb817ac0ac682062d..b4b89bfe2664c205a7e0646c17bc6bfe184ce456 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java @@ -20,8 +20,6 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; @@ -29,8 +27,9 @@ import java.io.File; * common utils */ public class CommonUtils { - - private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class); + private CommonUtils() { + throw new IllegalStateException("CommonUtils class"); + } /** * @return get the path of system environment variables diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index 0c061fc0ba1e5d272124e7e07f92204431c37aa4..c8d33e3ef2b2c15886058e189fd7506e647b05d0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -370,25 +370,14 @@ public class OSUtils { double systemCpuLoad; double systemReservedMemory; - if(isMaster){ - systemCpuLoad = conf.getDouble(Constants.MASTER_MAX_CPULOAD_AVG, Constants.defaultMasterCpuLoad); - systemReservedMemory = conf.getDouble(Constants.MASTER_RESERVED_MEMORY, Constants.defaultMasterReservedMemory); + if(Boolean.TRUE.equals(isMaster)){ + systemCpuLoad = conf.getDouble(Constants.MASTER_MAX_CPULOAD_AVG, Constants.DEFAULT_MASTER_CPU_LOAD); + systemReservedMemory = conf.getDouble(Constants.MASTER_RESERVED_MEMORY, Constants.DEFAULT_MASTER_RESERVED_MEMORY); }else{ - systemCpuLoad = conf.getDouble(Constants.WORKER_MAX_CPULOAD_AVG, Constants.defaultWorkerCpuLoad); - systemReservedMemory = conf.getDouble(Constants.WORKER_RESERVED_MEMORY, Constants.defaultWorkerReservedMemory); - } - - // judging usage - double loadAverage = OSUtils.loadAverage(); - // - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); - - if(loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory){ - logger.warn("load or availablePhysicalMemorySize(G) is too high, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage); - return false; - }else{ - return true; + systemCpuLoad = conf.getDouble(Constants.WORKER_MAX_CPULOAD_AVG, Constants.DEFAULT_WORKER_CPU_LOAD); + systemReservedMemory = conf.getDouble(Constants.WORKER_RESERVED_MEMORY, Constants.DEFAULT_WORKER_RESERVED_MEMORY); } + return checkResource(systemCpuLoad,systemReservedMemory); } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java index 7321879ab80d3ad15cd031ce41ca7ee6739d7235..99685265e688cddd4b67b65da27f82423b3d859f 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java @@ -76,11 +76,11 @@ public class CollectionUtilsTest { a = CollectionUtils.stringToMap("a=b;c=d", null); Assert.assertTrue(a.isEmpty()); a = CollectionUtils.stringToMap("a=b;c=d;e=f", ";"); - Assert.assertEquals(a.size(), 3); + Assert.assertEquals(3, a.size()); a = CollectionUtils.stringToMap("a;b=f", ";"); Assert.assertTrue(a.isEmpty()); a = CollectionUtils.stringToMap("a=b;c=d;e=f;", ";", "test"); - Assert.assertEquals(a.size(), 3); + Assert.assertEquals(3, a.size()); Assert.assertNotNull(a.get("testa")); } @@ -91,14 +91,14 @@ public class CollectionUtilsTest { originList.add(1); originList.add(2); List> ret = CollectionUtils.getListByExclusion(originList, null); - Assert.assertEquals(ret.size(), 2); + Assert.assertEquals(2, ret.size()); ret = CollectionUtils.getListByExclusion(originList, new HashSet<>()); - Assert.assertEquals(ret.size(), 2); + Assert.assertEquals(2, ret.size()); Assert.assertFalse(ret.get(0).isEmpty()); Set exclusion = new HashSet<>(); exclusion.add(Constants.CLASS); ret = CollectionUtils.getListByExclusion(originList, exclusion); - Assert.assertEquals(ret.size(), 2); + Assert.assertEquals(2, ret.size()); Assert.assertTrue(ret.get(0).isEmpty()); } @@ -108,5 +108,38 @@ public class CollectionUtilsTest { Assert.assertFalse(CollectionUtils.isNotEmpty(list)); Assert.assertFalse(CollectionUtils.isNotEmpty(null)); } + @Test + public void isEmpty(){ + List list = new ArrayList<>(); + Assert.assertTrue(CollectionUtils.isEmpty(list)); + Assert.assertTrue(CollectionUtils.isEmpty(null)); + list.add(1); + Assert.assertFalse(CollectionUtils.isEmpty(list)); + } + @Test + public void isEqualCollection() { + List a = new ArrayList<>(); + a.add(1); + List b = new ArrayList<>(); + b.add(1); + Assert.assertTrue(CollectionUtils.isEqualCollection(a,b)); + b.add(2); + Assert.assertFalse(CollectionUtils.isEqualCollection(a,b)); + } + @Test + public void getCardinalityMap(){ + List a = new ArrayList<>(); + a.add(1); + a.add(2); + a.add(2); + a.add(3); + a.add(3); + a.add(3); + Map cardinalityMap = CollectionUtils.getCardinalityMap(a); + Assert.assertEquals(3, cardinalityMap.size()); + Assert.assertEquals(1, cardinalityMap.get(1).intValue()); + Assert.assertEquals(2, cardinalityMap.get(2).intValue()); + Assert.assertEquals(3, cardinalityMap.get(3).intValue()); + } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java index f38b9b4c3bf00a99438a9c5565f9db5f4528140a..42c9958810c91378b48ba6836c93ff833ee5d64c 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,24 +29,58 @@ import java.net.UnknownHostException; */ public class CommonUtilsTest { private static final Logger logger = LoggerFactory.getLogger(CommonUtilsTest.class); + @Test + public void getSystemEnvPath() { + logger.info(CommonUtils.getSystemEnvPath()); + Assert.assertTrue(true); + } + @Test + public void getQueueImplValue(){ + logger.info(CommonUtils.getQueueImplValue()); + Assert.assertTrue(true); + } + @Test + public void isDevelopMode() { + logger.info("develop mode: {}",CommonUtils.isDevelopMode()); + Assert.assertTrue(true); + } + @Test + public void getKerberosStartupState(){ + logger.info("kerberos startup state: {}",CommonUtils.getKerberosStartupState()); + Assert.assertTrue(true); + } + @Test + public void loadKerberosConf(){ + try { + CommonUtils.loadKerberosConf(); + Assert.assertTrue(true); + } catch (Exception e) { + Assert.fail("load Kerberos Conf failed"); + } + } + @Test public void getHdfsDataBasePath() { logger.info(HadoopUtils.getHdfsDataBasePath()); + Assert.assertTrue(true); } @Test public void getDownloadFilename() { logger.info(FileUtils.getDownloadFilename("a.txt")); + Assert.assertTrue(true); } @Test public void getUploadFilename() { logger.info(FileUtils.getUploadFilename("1234", "a.txt")); + Assert.assertTrue(true); } @Test public void getHdfsDir() { logger.info(HadoopUtils.getHdfsResDir("1234")); + Assert.assertTrue(true); } @Test @@ -57,5 +92,6 @@ public class CommonUtilsTest { } catch (UnknownHostException e) { e.printStackTrace(); } + Assert.assertTrue(true); } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index 391fb594f00d1ff1195460bcb485dcb5153b7f0f..5b23847ba3a410abbfd3606bff7c8084779568dd 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -16,10 +16,16 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.yetus.audience.InterfaceAudience; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.List; public class OSUtilsTest { @@ -31,4 +37,81 @@ public class OSUtilsTest { Assert.assertNotEquals("System user list should not be empty", userList.size(), 0); logger.info("OS user list : {}", userList.toString()); } + @Test + public void testOSMetric(){ + + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); + Assert.assertTrue(availablePhysicalMemorySize > 0.0f); + double totalMemorySize = OSUtils.totalMemorySize(); + Assert.assertTrue(totalMemorySize > 0.0f); + double loadAverage = OSUtils.loadAverage(); + logger.info("loadAverage {}", loadAverage); + double memoryUsage = OSUtils.memoryUsage(); + Assert.assertTrue(memoryUsage > 0.0f); + double cpuUsage = OSUtils.cpuUsage(); + Assert.assertTrue(cpuUsage > 0.0f); + } + @Test + public void getGroup() { + if(OSUtils.isMacOS() || !OSUtils.isWindows()){ + try { + String group = OSUtils.getGroup(); + Assert.assertNotNull(group); + } catch (IOException e) { + Assert.fail("get group failed " + e.getMessage()); + } + } + } + @Test + public void exeCmd() { + if(OSUtils.isMacOS() || !OSUtils.isWindows()){ + try { + String result = OSUtils.exeCmd("echo helloWorld"); + Assert.assertEquals("helloWorld\n",result); + } catch (IOException e) { + Assert.fail("exeCmd " + e.getMessage()); + } + } + } + @Test + public void getProcessID(){ + int processId = OSUtils.getProcessID(); + Assert.assertNotEquals(0, processId); + } + @Test + public void getHost(){ + String host = OSUtils.getHost(); + Assert.assertNotNull(host); + Assert.assertNotEquals("", host); + } + @Test + public void checkResource(){ + boolean resource = OSUtils.checkResource(100,0); + Assert.assertTrue(resource); + resource = OSUtils.checkResource(0,Double.MAX_VALUE); + Assert.assertFalse(resource); + + Configuration configuration = new PropertiesConfiguration(); + + configuration.setProperty(Constants.MASTER_MAX_CPULOAD_AVG,100); + configuration.setProperty(Constants.MASTER_RESERVED_MEMORY,0); + resource = OSUtils.checkResource(configuration,true); + Assert.assertTrue(resource); + + configuration.setProperty(Constants.MASTER_MAX_CPULOAD_AVG,0); + configuration.setProperty(Constants.MASTER_RESERVED_MEMORY,Double.MAX_VALUE); + resource = OSUtils.checkResource(configuration,true); + Assert.assertFalse(resource); + + configuration.setProperty(Constants.WORKER_MAX_CPULOAD_AVG,100); + configuration.setProperty(Constants.WORKER_RESERVED_MEMORY,0); + resource = OSUtils.checkResource(configuration,false); + Assert.assertTrue(resource); + + configuration.setProperty(Constants.WORKER_MAX_CPULOAD_AVG,0); + configuration.setProperty(Constants.WORKER_RESERVED_MEMORY,Double.MAX_VALUE); + resource = OSUtils.checkResource(configuration,false); + Assert.assertFalse(resource); + + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java index 6fdc233455cb73e2a0f90eebb2749985ade9b9dc..a3bc6a015005a2ab391b32903dfd6f8f953f610f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java @@ -20,8 +20,6 @@ import com.alibaba.druid.pool.DruidDataSource; import com.baomidou.mybatisplus.core.MybatisConfiguration; import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor; import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.dolphinscheduler.common.Constants; import org.apache.ibatis.mapping.Environment; import org.apache.ibatis.session.SqlSession; @@ -31,8 +29,6 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; import org.mybatis.spring.SqlSessionTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Service; import javax.sql.DataSource; @@ -117,7 +113,6 @@ public class ConnectionFactory extends SpringConnectionFactory{ sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums"); sqlSessionFactory = sqlSessionFactoryBean.getObject(); - return sqlSessionFactory; } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 18b643513f2a13624a4c1bdf43e2f437a2929914..c358cab3f3f95f9b4547cb71e116d383da3774b1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -16,13 +16,10 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.CommandCount; -import com.baomidou.mybatisplus.core.conditions.Wrapper; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.toolkit.Constants; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; import java.util.Date; import java.util.List; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java index 8a9087a33c3e7971e311261f0e3798351a127588..2ad029064e92ead69c311e98bb3faa6150b9297f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java @@ -38,7 +38,9 @@ import static org.apache.dolphinscheduler.dao.utils.cron.CycleFactory.*; * cron utils */ public class CronUtils { - + private CronUtils() { + throw new IllegalStateException("CronUtils class"); + } private static final Logger logger = LoggerFactory.getLogger(CronUtils.class); @@ -169,7 +171,7 @@ public class CronUtils { cronExpression = parse2CronExpression(cron); }catch (ParseException e){ logger.error(e.getMessage(), e); - return Collections.EMPTY_LIST; + return Collections.emptyList(); } return getSelfFireDateList(startTime, endTime, cronExpression); } @@ -202,7 +204,7 @@ public class CronUtils { calendar.add(Calendar.DATE, 1); break; default: - logger.error("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name()); + logger.error("Dependent process definition's cycleEnum is {},not support!!", cycleEnum); break; } maxExpirationTime = calendar.getTime(); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java index 10906b42a3d7d9b1f46535e0cba88da22f7f6f5e..b2f52566fcaf2a6b6910a8db3de7d4d9422e6c31 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java @@ -25,7 +25,9 @@ import com.cronutils.model.field.expression.QuestionMark; * Crontab Cycle Tool Factory */ public class CycleFactory { - + private CycleFactory() { + throw new IllegalStateException("CycleFactory class"); + } /** * min * @param cron cron diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java index 1135cf20f52e640b7bb5c82dc722b91573dfec96..5ecc6620ddfa42de9e682ab954a3fee8c6f4253b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java @@ -61,7 +61,7 @@ public class CronUtilsTest { String cronAsString = cron.asString(); // 0 */5 * * * ? * Every five minutes(once every 5 minutes) - Assert.assertEquals(cronAsString, "0 */5 * * * ? *"); + Assert.assertEquals("0 */5 * * * ? *", cronAsString); } @@ -74,12 +74,12 @@ public class CronUtilsTest { String strCrontab = "0 1 2 3 * ? *"; Cron depCron = CronUtils.parse2Cron(strCrontab); - Assert.assertEquals(depCron.retrieve(CronFieldName.SECOND).getExpression().asString(), "0"); - Assert.assertEquals(depCron.retrieve(CronFieldName.MINUTE).getExpression().asString(), "1"); - Assert.assertEquals(depCron.retrieve(CronFieldName.HOUR).getExpression().asString(), "2"); - Assert.assertEquals(depCron.retrieve(CronFieldName.DAY_OF_MONTH).getExpression().asString(), "3"); - Assert.assertEquals(depCron.retrieve(CronFieldName.MONTH).getExpression().asString(), "*"); - Assert.assertEquals(depCron.retrieve(CronFieldName.YEAR).getExpression().asString(), "*"); + Assert.assertEquals("0", depCron.retrieve(CronFieldName.SECOND).getExpression().asString()); + Assert.assertEquals("1", depCron.retrieve(CronFieldName.MINUTE).getExpression().asString()); + Assert.assertEquals("2", depCron.retrieve(CronFieldName.HOUR).getExpression().asString()); + Assert.assertEquals("3", depCron.retrieve(CronFieldName.DAY_OF_MONTH).getExpression().asString()); + Assert.assertEquals("*", depCron.retrieve(CronFieldName.MONTH).getExpression().asString()); + Assert.assertEquals("*", depCron.retrieve(CronFieldName.YEAR).getExpression().asString()); } /** @@ -89,13 +89,13 @@ public class CronUtilsTest { @Test public void testScheduleType() throws ParseException { CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *")); - Assert.assertEquals(cycleEnum.name(), "MINUTE"); + Assert.assertEquals("MINUTE", cycleEnum.name()); CycleEnum cycleEnum2 = CronUtils.getMaxCycle("0 * * * * ? *"); - Assert.assertEquals(cycleEnum2.name(), "MINUTE"); + Assert.assertEquals("MINUTE", cycleEnum2.name()); CycleEnum cycleEnum3 = CronUtils.getMiniCycle(CronUtils.parse2Cron("0 * * * * ? *")); - Assert.assertEquals(cycleEnum3.name(), "MINUTE"); + Assert.assertEquals("MINUTE", cycleEnum3.name()); } /** @@ -164,6 +164,7 @@ public class CronUtilsTest { logger.info("can't get scheduleType"); } } + Assert.assertTrue(true); } @Test diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java index 57b002e18d3cd257908efd83e58f689da46a7660..5ba2936aaf364d15e4fa9e107c7dfd724e32678d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java @@ -32,6 +32,6 @@ public class ConnectionFactoryTest { @Test public void testConnection()throws Exception{ Connection connection = ConnectionFactory.getDataSource().getPooledConnection().getConnection(); - Assert.assertEquals(connection != null , true); + Assert.assertTrue(connection != null); } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 0647b9450ba3a763ca2c49ffa2706b3ac7e85d86..ab4ba5c8ab5cb8c7307cbeab52fcb6cce2f676c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -111,7 +111,7 @@ public class MasterServer implements IStoppable { masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); + heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM); // heartbeat thread implement Runnable heartBeatThread = heartBeatThread(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index ab34ddfc2b610bc3e410371be0ac213b44dcfb28..4c33ef8db25c2862c8692a32011cfbdc5c3ce658 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -87,7 +87,7 @@ public class FlinkArgsUtils { args.add(taskManagerMemory); } - args.add(Constants.FLINK_detach); //-d + args.add(Constants.FLINK_DETACH); //-d } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d270880408cfd4233549d55f6fc947b324707d88..99d418f048179de35b58aa8e22205a8947d62e1d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -149,7 +149,7 @@ public class WorkerServer implements IStoppable { this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.defaulWorkerHeartbeatThreadNum); + heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM); // heartbeat thread implement Runnable heartBeatThread = heartBeatThread(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 04098215dd9fe5d7e7fdf758187b0672e9750170..f1c01aff36d43729a9505572044ef844fde39394 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -535,7 +535,7 @@ public abstract class AbstractCommandExecutor { /** * when log buffer siz or flush time reach condition , then flush */ - if (logBuffer.size() >= Constants.defaultLogRowsNum || now - lastFlushTime > Constants.defaultLogFlushInterval) { + if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer);