提交 e4df4131 编写于 作者: B break60

Merge branch 'dev-1.3.0' of https://github.com/apache/incubator-dolphinscheduler into dev-1.3.0

......@@ -21,12 +21,6 @@
<description>alert type is EMAIL/SMS</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>alert.template</name>
<value>html</value>
<description>alter msg template, default is html template</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mail.protocol</name>
<value>SMTP</value>
......
......@@ -203,265 +203,4 @@
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.mapper-locations</name>
<value>classpath*:/org.apache.dolphinscheduler.dao.mapper/*.xml</value>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.typeEnumsPackage</name>
<value>org.apache.dolphinscheduler.*.enums</value>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.typeAliasesPackage</name>
<value>org.apache.dolphinscheduler.dao.entity</value>
<description>
Entity scan, where multiple packages are separated by a comma or semicolon
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.id-type</name>
<value>AUTO</value>
<value-attributes>
<type>value-list</type>
<entries>
<entry>
<value>AUTO</value>
<label>AUTO</label>
</entry>
<entry>
<value>INPUT</value>
<label>INPUT</label>
</entry>
<entry>
<value>ID_WORKER</value>
<label>ID_WORKER</label>
</entry>
<entry>
<value>UUID</value>
<label>UUID</label>
</entry>
</entries>
<selection-cardinality>1</selection-cardinality>
</value-attributes>
<description>
Primary key type AUTO:" database ID AUTO ",
INPUT:" user INPUT ID",
ID_WORKER:" global unique ID (numeric type unique ID)",
UUID:" global unique ID UUID";
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.field-strategy</name>
<value>NOT_NULL</value>
<value-attributes>
<type>value-list</type>
<entries>
<entry>
<value>IGNORED</value>
<label>IGNORED</label>
</entry>
<entry>
<value>NOT_NULL</value>
<label>NOT_NULL</label>
</entry>
<entry>
<value>NOT_EMPTY</value>
<label>NOT_EMPTY</label>
</entry>
</entries>
<selection-cardinality>1</selection-cardinality>
</value-attributes>
<description>
Field policy IGNORED:" ignore judgment ",
NOT_NULL:" not NULL judgment "),
NOT_EMPTY:" not NULL judgment"
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.column-underline</name>
<value>true</value>
<value-attributes>
<type>boolean</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.logic-delete-value</name>
<value>1</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.logic-not-delete-value</name>
<value>0</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.global-config.db-config.banner</name>
<value>true</value>
<value-attributes>
<type>boolean</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.configuration.map-underscore-to-camel-case</name>
<value>true</value>
<value-attributes>
<type>boolean</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.configuration.cache-enabled</name>
<value>false</value>
<value-attributes>
<type>boolean</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.configuration.call-setters-on-nulls</name>
<value>true</value>
<value-attributes>
<type>boolean</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>mybatis-plus.configuration.jdbc-type-for-null</name>
<value>null</value>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.exec.threads</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.exec.task.num</name>
<value>20</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.heartbeat.interval</name>
<value>10</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.task.commit.retryTimes</name>
<value>5</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.task.commit.interval</name>
<value>1000</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.max.cpuload.avg</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>master.reserved.memory</name>
<value>0.1</value>
<value-attributes>
<type>float</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.exec.threads</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.heartbeat.interval</name>
<value>10</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.fetch.task.num</name>
<value>3</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.max.cpuload.avg</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.reserved.memory</name>
<value>0.1</value>
<value-attributes>
<type>float</type>
</value-attributes>
<description></description>
<on-ambari-upgrade add="true"/>
</property>
</configuration>
\ No newline at end of file
......@@ -33,15 +33,6 @@
<description>worker heartbeat interval</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.fetch.task.num</name>
<value>3</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description>submit the number of tasks at a time</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.max.cpuload.avg</name>
<value>100</value>
......
......@@ -15,14 +15,6 @@
~ limitations under the License.
-->
<configuration>
<property>
<name>dolphinscheduler.queue.impl</name>
<value>zookeeper</value>
<description>
Task queue implementation, default "zookeeper"
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>zookeeper.dolphinscheduler.root</name>
<value>/dolphinscheduler</value>
......
......@@ -114,10 +114,6 @@ else:
dolphin_common_map_tmp = config['configurations']['dolphin-common']
data_basedir_path = dolphin_common_map_tmp['data.basedir.path']
process_exec_basepath = data_basedir_path + '/exec'
data_download_basedir_path = data_basedir_path + '/download'
dolphin_common_map['process.exec.basepath'] = process_exec_basepath
dolphin_common_map['data.download.basedir.path'] = data_download_basedir_path
dolphin_common_map['dolphinscheduler.env.path'] = dolphin_env_path
dolphin_common_map.update(config['configurations']['dolphin-common'])
......
......@@ -124,6 +124,11 @@ public class TaskNode {
*/
private String workerGroup;
/**
* worker group id
*/
private Integer workerGroupId;
/**
* task time out
......@@ -341,4 +346,12 @@ public class TaskNode {
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}
public Integer getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
}
......@@ -33,47 +33,12 @@ public class ShellExecutorTest {
@Test
public void execCommand() throws InterruptedException {
ThreadPoolExecutors executors = ThreadPoolExecutors.getInstance();
CountDownLatch latch = new CountDownLatch(200);
executors.execute(new Runnable() {
@Override
public void run() {
try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("groups");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0,5));
Thread.sleep(100l);
latch.countDown();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("whoami");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result2:" + res);
Thread.sleep(100l);
latch.countDown();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
latch.await();
try {
String res = ShellExecutor.execCommand("groups");
logger.info("thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0, 5));
} catch (Exception e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
......@@ -102,4 +102,14 @@ public class ProcessData {
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
@Override
public String toString() {
return "ProcessData{" +
"tasks=" + tasks +
", globalParams=" + globalParams +
", timeout=" + timeout +
", tenantId=" + tenantId +
'}';
}
}
......@@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date;
import java.util.List;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
public class ProcessDefinitionDao {
public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class);
/**
* queryAllProcessDefinition
* @param conn jdbc connection
* @return ProcessDefinition Json List
*/
public Map<Integer,String> queryAllProcessDefinition(Connection conn){
Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()){
Integer id = rs.getInt(1);
String processDefinitionJson = rs.getString(2);
processDefinitionJsonMap.put(id,processDefinitionJson);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitionJsonMap;
}
/**
* updateProcessDefinitionJson
* @param conn jdbc connection
* @param processDefinitionJsonMap processDefinitionJsonMap
*/
public void updateProcessDefinitionJson(Connection conn,Map<Integer,String> processDefinitionJsonMap){
String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?";
try {
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()){
try(PreparedStatement pstmt= conn.prepareStatement(sql)) {
pstmt.setString(1,entry.getValue());
pstmt.setInt(2,entry.getKey());
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
......@@ -16,14 +16,12 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,6 +32,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.*;
public abstract class UpgradeDao extends AbstractBaseDao {
......@@ -44,6 +43,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
protected static final DataSource dataSource = getDataSource();
private static final DbType dbType = getCurrentDbType();
@Override
protected void init() {
......@@ -119,6 +119,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
// Execute the dolphinscheduler DML, it can be rolled back
runInitDML(initSqlPath);
}
/**
......@@ -256,6 +257,43 @@ public abstract class UpgradeDao extends AbstractBaseDao {
upgradeDolphinSchedulerDML(schemaDir);
updateProcessDefinitionJsonWorkerGroup();
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
protected void updateProcessDefinitionJsonWorkerGroup(){
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class);
List<TaskNode> tasks = processData.getTasks();
for (TaskNode taskNode : tasks){
Integer workerGroupId = taskNode.getWorkerGroupId();
if (workerGroupId == -1){
taskNode.setWorkerGroup("default");
}else {
taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId));
}
}
replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData));
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json workergroup error",e);
}
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class WorkerGroupDao {
public static final Logger logger = LoggerFactory.getLogger(WorkerGroupDao.class);
/**
* query all old worker group
* @param conn jdbc connection
* @return old worker group Map
*/
public Map<Integer,String> queryAllOldWorkerGroup(Connection conn){
Map<Integer,String> workerGroupMap = new HashMap<>();
String sql = String.format("select id,name from t_ds_worker_group");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()){
int id = rs.getInt(1);
String name = rs.getString(2);
workerGroupMap.put(id,name);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return workerGroupMap;
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper">
<select id="queryAllWorkerGroup" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
order by update_time desc
</select>
<select id="queryWorkerGroupByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
where name = #{name}
</select>
<select id="queryListPaging" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
where 1 = 1
<if test="searchVal != null and searchVal != ''">
and name like concat('%', #{searchVal}, '%')
</if>
order by update_time desc
</select>
</mapper>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class ProcessDefinitionDaoTest {
final DataSource dataSource = getDataSource();
final ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
@Test
public void testQueryAllProcessDefinition() throws Exception{
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
assertThat(processDefinitionJsonMap.size(),greaterThanOrEqualTo(0));
}
@Test
public void testUpdateProcessDefinitionJson() throws Exception{
Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
processDefinitionJsonMap.put(1,"test");
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),processDefinitionJsonMap);
}
@Test(expected = Exception.class)
public void testQueryAllProcessDefinitionException() throws Exception{
processDefinitionDao.queryAllProcessDefinition(null);
}
@Test(expected = Exception.class)
public void testUpdateProcessDefinitionJsonException() throws Exception{
processDefinitionDao.updateProcessDefinitionJson(null,null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class UpgradeDaoTest {
PostgresqlUpgradeDao postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance();
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{
postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class WokrerGrouopDaoTest {
protected final DataSource dataSource = getDataSource();
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
Map<Integer, String> workerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
assertThat(workerGroupMap.size(),greaterThanOrEqualTo(0));
}
@Test(expected = Exception.class)
public void testQueryQueryAllOldWorkerGroupException() throws Exception{
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
workerGroupDao.queryAllOldWorkerGroup(null);
}
}
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
......@@ -107,6 +108,10 @@ public class TaskPriorityQueueConsumer extends Thread{
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
failedDispatchTasks.clear();
for(int i = 0; i < fetchTaskNum; i++){
if(taskPriorityQueue.size() <= 0){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// if not task , blocking here
String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
......@@ -115,8 +120,8 @@ public class TaskPriorityQueueConsumer extends Thread{
failedDispatchTasks.add(taskPriorityInfo);
}
}
for(String taskPriorityInfo: failedDispatchTasks){
taskPriorityQueue.put(taskPriorityInfo);
for(String dispatchFailedTask : failedDispatchTasks){
taskPriorityQueue.put(dispatchFailedTask);
}
}catch (Exception e){
logger.error("dispatcher task error",e);
......
......@@ -829,7 +829,6 @@
<include>**/dao/mapper/ProjectMapperTest.java</include>
<include>**/dao/mapper/ProjectUserMapperTest.java</include>
<include>**/dao/mapper/QueueMapperTest.java</include>
<!--<include>**/dao/mapper/ResourceMapperTest.java</include>-->
<include>**/dao/mapper/ResourceUserMapperTest.java</include>
<include>**/dao/mapper/ScheduleMapperTest.java</include>
<include>**/dao/mapper/SessionMapperTest.java</include>
......@@ -841,6 +840,9 @@
<include>**/dao/mapper/UserMapperTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
<include>**/dao/upgrade/ProcessDefinitionDaoTest.java</include>
<include>**/dao/upgrade/WokrerGrouopDaoTest.java</include>
<include>**/dao/upgrade/UpgradeDaoTest.java</include>
<include>**/plugin/model/AlertDataTest.java</include>
<include>**/plugin/model/AlertInfoTest.java</include>
<include>**/plugin/utils/PropertyUtilsTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册