未验证 提交 c956185e 编写于 作者: Z zwZjut 提交者: GitHub

[Feature] 2.0.2-prepare bug fix of Pressure tests #7511 (#7540)

* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>

* cherry-pick 05aef279 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>

* cherry-pick 8ebe0606 and handle conflicts

* cherry-pick 1f184440 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808)
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: NKirs <acm_master@163.com>

* add default constructor (#6780)
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>

* to #7108 (#7109)

* to #7511

* to #7511

* to #7511

* to #7511
Co-authored-by: Nhonghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: NKirs <acm_master@163.com>
上级 1657ce31
......@@ -38,6 +38,9 @@ server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javasc
# max http post size
server.jetty.max-http-form-post-size=5000000
# max http header size
server.max-http-header-size=81920
# messages encoding
spring.messages.encoding=UTF-8
......
......@@ -44,3 +44,7 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG}
# master reserved memory: the master server schedules only while system available memory is higher than this value. default value 0.3, the unit is G
master.reserved.memory=${MASTER_RESERVED_MEMORY}
# master failover interval minutes
master.failover.interval=${MASTER_FAILOVER_INTERVAL}
# master kill yarn job when handle failover
master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
\ No newline at end of file
......@@ -32,7 +32,8 @@
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.makeThreadsDaemons = true
#org.quartz.threadPool.threadCount = 25
org.quartz.threadPool.threadCount = ${ORG_QUARTZ_THREADPOOL_THREADCOUNT}
org.quartz.scheduler.batchTriggerAcquisitionMaxCount = ${ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT}
#org.quartz.threadPool.threadPriority = 5
#============================================================================
......
......@@ -17,3 +17,4 @@
registry.plugin.name=${REGISTRY_PLUGIN_NAME}
registry.servers=${REGISTRY_SERVERS}
session.timeout.ms=${SESSION_TIMEOUT_MS}
\ No newline at end of file
......@@ -41,3 +41,6 @@ worker.groups=${WORKER_GROUPS}
# alert server listen host
alert.listen.host=${ALERT_LISTEN_HOST}
# interval, in seconds, at which the worker retries reporting task status
worker.retry.report.task.statues.interval=${WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL}
\ No newline at end of file
......@@ -53,6 +53,10 @@ externalDatabase:
## If not exists external zookeeper, by default, Dolphinscheduler's zookeeper will use it.
zookeeper:
enabled: true
tickTime: 3000
maxSessionTimeout: 60000
initLimit: 300
maxClientCnxns: 2000
fourlwCommandsWhitelist: "srvr,ruok,wchs,cons"
persistence:
enabled: false
......@@ -158,6 +162,10 @@ master:
MASTER_TASK_COMMIT_INTERVAL: "1000"
MASTER_MAX_CPULOAD_AVG: "-1"
MASTER_RESERVED_MEMORY: "0.3"
MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
SESSION_TIMEOUT_MS: 60000
## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
......@@ -225,6 +233,8 @@ worker:
WORKER_MAX_CPULOAD_AVG: "-1"
WORKER_RESERVED_MEMORY: "0.3"
WORKER_GROUPS: "default"
SESSION_TIMEOUT_MS: 60000
WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL: 600
## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
......
......@@ -53,7 +53,7 @@ public final class Constants {
public static final String ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK = "org.quartz.jobStore.acquireTriggersWithinLock";
public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = "org.quartz.jobStore.dataSource";
public static final String ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = "org.quartz.dataSource.myDs.connectionProvider.class";
public static final String ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT = "org.quartz.scheduler.batchTriggerAcquisitionMaxCount";
/**
* quartz config default value
*/
......@@ -66,6 +66,7 @@ public final class Constants {
public static final String QUARTZ_INSTANCENAME = "DolphinScheduler";
public static final String QUARTZ_INSTANCEID = "AUTO";
public static final String QUARTZ_ACQUIRETRIGGERSWITHINLOCK = "true";
public static final String QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT = "100";
/**
* common properties path
......
......@@ -244,6 +244,12 @@ public class ProcessInstance {
*/
private int dryRun;
/**
* re-start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date restartTime;
public ProcessInstance() {
}
......@@ -516,6 +522,14 @@ public class ProcessInstance {
this.dryRun = dryRun;
}
/**
 * get the re-start time of this process instance
 *
 * @return the stored restart time (may be null when it was never set)
 */
public Date getRestartTime() {
return restartTime;
}
/**
 * set the re-start time of this process instance
 *
 * @param restartTime restart time to store
 */
public void setRestartTime(Date restartTime) {
this.restartTime = restartTime;
}
/**
* add command to history
*
......@@ -684,6 +698,10 @@ public class ProcessInstance {
+ ", dryRun='"
+ dryRun
+ '\''
+ '}'
+ ", restartTime='"
+ restartTime
+ '\''
+ '}';
}
......
......@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process instance map mapper interface
*/
......
......@@ -43,6 +43,14 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
*/
ProcessInstance queryDetailById(@Param("processId") int processId);
/**
 * query the distinct hosts of process instances whose state is one of the given states
 *
 * @param stateArray state values to match; bound to the mapper statement as "states"
 * @return distinct host values of the matching process instances
 */
List<String> queryNeedFailoverProcessInstanceHost(@Param("states") int[] stateArray);
/**
* query process instance by host and stateArray
*
......
......@@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime
);
/**
 * update the host and submit time of the task instance with the given id
 *
 * @param id task instance id
 * @param host host to store
 * @param submitTime submit time to store
 * @return presumably the number of rows updated (usual MyBatis update convention) - TODO confirm
 */
int updateHostAndSubmitTimeById(@Param("id") int id, @Param("host") String host, @Param("submitTime") Date submitTime);
}
......@@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run, restart_time
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
......@@ -45,7 +45,14 @@
</foreach>
order by id asc
</select>
<!-- Lists the distinct host values of t_ds_process_instance rows whose state is one of "states".
     NOTE(review): an empty "states" array would presumably leave "state in" without a value list; confirm callers never pass one. -->
<select id="queryNeedFailoverProcessInstanceHost" resultType="String">
select distinct host
from t_ds_process_instance
where state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
<select id="queryTopNProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
......@@ -93,7 +100,7 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run, instance.restart_time
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
......
......@@ -162,4 +162,10 @@
</if>
order by instance.start_time desc
</select>
<!-- Sets host and submit_time on the t_ds_task_instance row with the given id. -->
<update id="updateHostAndSubmitTimeById">
update t_ds_task_instance
set host = #{host},
submit_time = #{submitTime}
where id = #{id}
</update>
</mapper>
......@@ -601,6 +601,7 @@ CREATE TABLE t_ds_process_instance
tenant_id int(11) NOT NULL DEFAULT '-1',
var_pool longtext,
dry_run int NULL DEFAULT 0,
restart_time datetime DEFAULT NULL,
PRIMARY KEY (id)
);
......
......@@ -596,6 +596,7 @@ CREATE TABLE `t_ds_process_instance` (
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId',
`restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`,`end_time`) USING BTREE
......
......@@ -501,6 +501,7 @@ CREATE TABLE t_ds_process_instance (
var_pool text ,
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
restart_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
......
2.0.1
\ No newline at end of file
2.0.2
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));

-- Add t_ds_process_instance.restart_time only when it is missing.
-- MySQL has no "ADD COLUMN IF NOT EXISTS" (that is MariaDB syntax), so the existence check
-- is done against information_schema inside a throw-away stored procedure.
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
        WHERE TABLE_NAME='t_ds_process_instance'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='restart_time')
    THEN
        ALTER TABLE t_ds_process_instance ADD COLUMN `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time';
    END IF;
END;

d//

delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_restart_time;
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
-- Creates (or replaces) public.dolphin_update_metadata(), which adds the restart_time column
-- to t_ds_process_instance in the current schema if that column does not exist yet.
-- Returns 'Success!' when the ALTER succeeds; on any error the error text (SQLERRM) is
-- returned as the function result instead of being raised.
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema varchar;
BEGIN
---get schema name
v_schema =current_schema();
--- schema name is quoted with quote_ident before being concatenated into the dynamic statement
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
--- errors are not propagated: the message is handed back to the caller as the return value
return SQLERRM;
END;
$BODY$;
select dolphin_update_metadata();
d//
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
\ No newline at end of file
......@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master;
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
......@@ -31,8 +33,9 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
......@@ -51,8 +54,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
/**
* master server
*/
......@@ -105,6 +106,9 @@ public class MasterServer implements IStoppable {
@Autowired
private EventExecuteService eventExecuteService;
@Autowired
private FailoverExecuteThread failoverExecuteThread;
@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
......@@ -145,8 +149,8 @@ public class MasterServer implements IStoppable {
// self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps);
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
this.masterRegistryClient.start();
this.eventExecuteService.init(this.processInstanceExecMaps);
this.eventExecuteService.start();
......@@ -155,6 +159,8 @@ public class MasterServer implements IStoppable {
this.masterSchedulerService.start();
this.failoverExecuteThread.start();
// start QuartzExecutors
// what system should do if exception
try {
......@@ -217,8 +223,17 @@ public class MasterServer implements IStoppable {
}
// close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("springApplicationContext close");
} catch (Exception e) {
logger.error("master server stop exception ", e);
} finally {
try {
// thread sleep 60 seconds for quietly stop
Thread.sleep(60000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
System.exit(1);
}
}
......
......@@ -63,6 +63,12 @@ public class MasterConfig {
@Value("${master.cache.process.definition:true}")
private boolean masterCacheProcessDefinition;
@Value("${master.failover.interval:10}")
private int failoverInterval;
/**
 * whether yarn jobs are killed when a task instance is failed over.
 * The primary key matches the one written in the master properties template
 * (master.kill.yarn.job.when.handle.failover); the previously used key
 * (master.kill.yarn.job.when.handle.fail.over) is kept as a fallback so existing
 * configurations still work. Defaults to true when neither key is set.
 */
@Value("${master.kill.yarn.job.when.handle.failover:${master.kill.yarn.job.when.handle.fail.over:true}}")
private boolean masterKillYarnJobWhenHandleFailOver;
public int getListenPort() {
return listenPort;
}
......@@ -162,4 +168,19 @@ public class MasterConfig {
this.masterCacheProcessDefinition = masterCacheProcessDefinition;
}
/**
 * get the master failover interval
 *
 * @return value injected from master.failover.interval (10 when the property is not set)
 */
public int getFailoverInterval() {
return failoverInterval;
}
/**
 * set the master failover interval
 *
 * @param failoverInterval failover interval to store
 */
public void setFailoverInterval(int failoverInterval) {
this.failoverInterval = failoverInterval;
}
/**
 * get the flag that controls killing yarn jobs during failover
 *
 * @return true if yarn jobs should be killed when a task instance is failed over
 */
public boolean getMasterKillYarnJobWhenHandleFailOver() {
return masterKillYarnJobWhenHandleFailOver;
}
/**
 * set the flag that controls killing yarn jobs during failover
 *
 * @param masterKillYarnJobWhenHandleFailOver true to kill yarn jobs when a task instance is failed over
 */
public void setMasterKillYarnJobWhenHandleFailOver(boolean masterKillYarnJobWhenHandleFailOver) {
this.masterKillYarnJobWhenHandleFailOver = masterKillYarnJobWhenHandleFailOver;
}
}
\ No newline at end of file
......@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
......@@ -136,8 +137,13 @@ public class TaskPriorityQueueConsumer extends Thread {
} else {
result = dispatcher.dispatch(executionContext);
}
if (result) {
processService.updateHostAndSubmitTimeById(taskPriority.getTaskId(), executionContext.getHost().getAddress(), new Date());
}
} catch (ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage(),e);
logger.error("ExecuteException dispatch error: {}", e.getMessage(), e);
} catch (Throwable t) {
logger.error("dispatch error: {}", t, t);
}
return result;
}
......
......@@ -110,6 +110,7 @@ public class TaskResponseService {
public void addResponse(TaskResponseEvent taskResponseEvent) {
try {
eventQueue.put(taskResponseEvent);
logger.debug("eventQueue size:{}", eventQueue.size());
} catch (InterruptedException e) {
logger.error("put task : {} error :{}", taskResponseEvent, e);
Thread.currentThread().interrupt();
......@@ -155,36 +156,49 @@ public class TaskResponseService {
try {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status,
boolean result = processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} catch (Exception e) {
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}
break;
case RESULT:
try {
boolean result = true;
if (taskInstance != null) {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
if (!result) {
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} else {
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
......
......@@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Collections;
......@@ -127,16 +128,16 @@ public class MasterRegistryClient {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// self tolerant
if (registryClient.getActiveMasterNum() == 1) {
removeNodePath(null, NodeType.MASTER, true);
removeNodePath(null, NodeType.WORKER, true);
}
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
this.registryClient.getStoppable().stop("master start up exception");
} finally {
registryClient.releaseLock(nodeLock);
try {
registryClient.releaseLock(nodeLock);
} catch (Exception e) {
logger.error("release lock error", e);
}
}
}
......@@ -150,18 +151,57 @@ public class MasterRegistryClient {
}
/**
* remove zookeeper node path
* remove master node path
*
* @param path zookeeper node path
* @param nodeType zookeeper node type
* @param path node path
* @param nodeType node type
* @param failover is failover
*/
public void removeNodePath(String path, NodeType nodeType, boolean failover) {
public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
logger.info("{} node deleted : {}", nodeType, path);
String failoverPath = getFailoverLockPath(nodeType);
if (StringUtils.isEmpty(path)) {
logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
return;
}
String serverHost = registryClient.getHostByEventDataPath(path);
if (StringUtils.isEmpty(serverHost)) {
logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
return;
}
String failoverPath = getFailoverLockPath(nodeType, serverHost);
try {
registryClient.getLock(failoverPath);
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
} finally {
registryClient.releaseLock(failoverPath);
}
}
/**
* remove worker node path
*
* @param path node path
* @param nodeType node type
* @param failover is failover
*/
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
logger.info("{} node deleted : {}", nodeType, path);
try {
String serverHost = null;
if (!StringUtils.isEmpty(path)) {
serverHost = registryClient.getHostByEventDataPath(path);
......@@ -169,18 +209,18 @@ public class MasterRegistryClient {
logger.error("server down error: unknown path: {}", path);
return;
}
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed.", nodeType);
logger.error("failover exception ", e);
} finally {
registryClient.releaseLock(failoverPath);
logger.error("{} server failover failed", nodeType, e);
}
}
......@@ -209,12 +249,12 @@ public class MasterRegistryClient {
* @param nodeType zookeeper node type
* @return fail over lock path
*/
private String getFailoverLockPath(NodeType nodeType) {
public String getFailoverLockPath(NodeType nodeType, String host) {
switch (nodeType) {
case MASTER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
default:
return "";
}
......@@ -226,7 +266,11 @@ public class MasterRegistryClient {
* @param taskInstance task instance
* @return true if task instance need fail over
*/
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
// first submit: host is null
// dispatch succeed: host is not null && submit_time is null
// ACK || RESULT from worker: host is not null && start_time is not null
boolean taskNeedFailover = true;
......@@ -234,14 +278,15 @@ public class MasterRegistryClient {
if (taskInstance.getHost() == null) {
return false;
}
// if the worker node exists in zookeeper, we must check the task starts after the worker
if (registryClient.checkNodeExists(taskInstance.getHost(), NodeType.WORKER)) {
//if task start after worker starts, there is no need to failover the task.
if (checkTaskAfterWorkerStart(taskInstance)) {
taskNeedFailover = false;
}
// host is not null and submit time is null, master will retry
if (taskInstance.getSubmitTime() == null) {
return false;
}
//if task start after worker starts, there is no need to failover the task.
if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
taskNeedFailover = false;
}
return taskNeedFailover;
}
......@@ -269,6 +314,54 @@ public class MasterRegistryClient {
return false;
}
/**
 * decide whether a task began on its worker after that worker came up.
 * The task's start time is used, falling back to its submit time when no start time is set.
 *
 * @param workerServers worker servers in which the task's host is looked up
 * @param taskInstance task instance
 * @return true if the task time is after the startup time of the worker matching the task's host;
 *         false if the host is empty or no matching worker startup time is found
 */
private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
    if (StringUtils.isEmpty(taskInstance.getHost())) {
        return false;
    }
    // prefer the start time; a task that has not started yet only has a submit time
    Date taskTime = taskInstance.getStartTime();
    if (taskTime == null) {
        taskTime = taskInstance.getSubmitTime();
    }
    Date workerStartup = getServerStartupTime(workerServers, taskInstance.getHost());
    return workerStartup != null && taskTime.after(workerStartup);
}
/**
 * find the create time of the server whose "host:port" address equals the given host
 *
 * @param servers candidate servers, may be null or empty
 * @param host address to match
 * @return create time of the first matching server, or null if the list is empty or nothing matches
 */
private Date getServerStartupTime(List<Server> servers, String host) {
    if (CollectionUtils.isEmpty(servers)) {
        return null;
    }
    for (Server candidate : servers) {
        String address = candidate.getHost() + Constants.COLON + candidate.getPort();
        if (host.equals(address)) {
            // first match wins
            return candidate.getCreateTime();
        }
    }
    return null;
}
/**
 * look up the startup time of a server of the given node type in the registry's server list
 *
 * @param nodeType node type whose server list is fetched from the registry client
 * @param host address to match
 * @return startup time of the matching server, or null if host is empty or no server matches
 */
private Date getServerStartupTime(NodeType nodeType, String host) {
    // an empty host can never match, so the registry is not queried at all
    return StringUtils.isEmpty(host)
            ? null
            : getServerStartupTime(registryClient.getServerList(nodeType), host);
}
/**
* failover worker tasks
* <p>
......@@ -279,10 +372,13 @@ public class MasterRegistryClient {
* @param workerHost worker host
*/
private void failoverWorker(String workerHost) {
if (StringUtils.isEmpty(workerHost)) {
return;
}
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
......@@ -300,11 +396,17 @@ public class MasterRegistryClient {
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}
// only failover the task owned myself if worker down.
if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}
......@@ -316,11 +418,15 @@ public class MasterRegistryClient {
*
* @param masterHost master host
*/
private void failoverMaster(String masterHost) {
public void failoverMaster(String masterHost) {
if (StringUtils.isEmpty(masterHost)) {
return;
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
......@@ -330,6 +436,11 @@ public class MasterRegistryClient {
continue;
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
logger.info("failover process instance id: {}", processInstance.getId());
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
......@@ -337,6 +448,12 @@ public class MasterRegistryClient {
if (Constants.NULL.equals(taskInstance.getHost())) {
continue;
}
if (taskInstance.getState().typeIsFinished()) {
continue;
}
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
......@@ -347,6 +464,13 @@ public class MasterRegistryClient {
logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}
/**
* failover task instance
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*/
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("failover task instance error, taskInstance is null");
......@@ -359,24 +483,23 @@ public class MasterRegistryClient {
return;
}
if (!checkTaskInstanceNeedFailover(taskInstance)) {
return;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()) {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId());
if (workflowExecuteThreadNotify == null) {
logger.info("workflowExecuteThreadNotify is null, just return, task id:{},process id:{}", taskInstance.getId(), processInstance.getId());
return;
}
StateEvent stateEvent = new StateEvent();
......
......@@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
logger.info("master node added : {}", path);
break;
case REMOVE:
masterRegistryClient.removeNodePath(path, NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
break;
default:
break;
......@@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
break;
case REMOVE:
logger.info("worker node deleted : {}", path);
masterRegistryClient.removeNodePath(path, NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
break;
default:
break;
......
......@@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
......@@ -387,7 +388,8 @@ public class ServerNodeManager implements InitializingBean {
workerGroup = workerGroup.toLowerCase();
Set<String> nodes = workerGroupNodes.get(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {
return Collections.unmodifiableSet(nodes);
// avoid ConcurrentModificationException
return Collections.unmodifiableSet(nodes.stream().collect(Collectors.toSet()));
}
return nodes;
} finally {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Background thread that repeatedly asks {@link ProcessService} for the hosts
 * returned by {@code queryNeedFailoverProcessInstanceHost()} and, for each of
 * them, runs {@link MasterRegistryClient#failoverMaster(String)} while holding
 * that host's failover lock in the registry.
 *
 * <p>The loop runs as long as {@link Stopper#isRunning()} is true and sleeps
 * after every pass, including passes that found no hosts or failed.
 */
@Service
public class FailoverExecuteThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);

    @Autowired
    private MasterRegistryClient masterRegistryClient;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private MasterConfig masterConfig;

    /**
     * process service
     */
    @Autowired
    private ProcessService processService;

    /**
     * Names the thread before starting it.
     */
    @Override
    public synchronized void start() {
        super.setName("FailoverExecuteThread");
        super.start();
    }

    /**
     * Runs one failover pass per iteration until the server is stopping.
     * Any exception raised during a pass is logged and the loop continues.
     */
    @Override
    public void run() {
        while (Stopper.isRunning()) {
            logger.info("failover execute started");
            try {
                List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
                if (CollectionUtils.isEmpty(hosts)) {
                    // nothing to do this round; the finally block still sleeps
                    continue;
                }
                for (String host : hosts) {
                    failoverHost(host);
                }
            } catch (Exception e) {
                logger.error("failover execute error", e);
            } finally {
                // NOTE(review): presumably getFailoverInterval() is in minutes (the
                // master.failover.interval property is documented as minutes) and
                // SLEEP_TIME_MILLIS is one second - confirm against Constants.
                ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
            }
        }
    }

    /**
     * Runs master failover for a single host under that host's failover lock.
     * A failure is logged and does not prevent the remaining hosts from being processed.
     *
     * @param host host whose process instances should be failed over
     */
    private void failoverHost(String host) {
        String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
        boolean lockAcquired = false;
        try {
            registryClient.getLock(failoverPath);
            lockAcquired = true;
            masterRegistryClient.failoverMaster(host);
        } catch (Exception e) {
            logger.error("{} server failover failed, host:{}", NodeType.MASTER, host, e);
        } finally {
            // only release what was actually acquired: if getLock threw, there is
            // no lock held by this thread for failoverPath
            if (lockAcquired) {
                registryClient.releaseLock(failoverPath);
            }
        }
    }
}
\ No newline at end of file
......@@ -234,6 +234,7 @@ public class MasterSchedulerService extends Thread {
if (ServerNodeManager.MASTER_SIZE == 0) {
return null;
}
logger.debug("master size:{}",ServerNodeManager.MASTER_SIZE);
List<Command> commandList = processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber);
if (commandList.size() == 0) {
return null;
......
......@@ -409,9 +409,10 @@ public class WorkflowExecuteThread implements Runnable {
private boolean checkStateEvent(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}",
logger.error("mismatch process instance id: {}, state event:{}, task instance id:{}",
this.processInstance.getId(),
stateEvent.toString());
stateEvent.toString(),
stateEvent.getTaskInstanceId());
return false;
}
return true;
......@@ -482,6 +483,7 @@ public class WorkflowExecuteThread implements Runnable {
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processService.saveProcessInstance(processInstance);
this.taskInstanceHashMap.clear();
......@@ -876,11 +878,11 @@ public class WorkflowExecuteThread implements Runnable {
}
if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
logger.info("task {} has already run success", task.getName());
logger.info("task {} has already run success, task id:{}", task.getName(), task.getId());
continue;
}
if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
logger.info("task {} stopped, the state is {}, task id:{}", task.getName(), task.getState(), task.getId());
} else {
addTaskToStandByList(task);
}
......@@ -1167,13 +1169,13 @@ public class WorkflowExecuteThread implements Runnable {
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName());
logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId());
try {
if (!readyToSubmitTaskQueue.contains(taskInstance)) {
readyToSubmitTaskQueue.put(taskInstance);
}
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e);
logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
}
}
......@@ -1253,7 +1255,7 @@ public class WorkflowExecuteThread implements Runnable {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
logger.info("task name: {} has been forced success, put it into complete task list and stop retrying, task id:{}", task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskList.put(Long.toString(task.getTaskCode()), task);
submitPostNode(Long.toString(task.getTaskCode()));
......
......@@ -70,8 +70,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
dispatchTask(taskInstance, processInstance);
return true;
return dispatchTask(taskInstance, processInstance);
}
@Override
......@@ -127,7 +126,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
logger.info("master submit success, task id:{}, task name:{}, process id:{}",
taskInstance.getId(), taskInstance.getName(), taskInstance.getProcessInstanceId());
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
......
......@@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRI
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
......@@ -29,12 +28,19 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.*;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -45,9 +51,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
import java.util.Set;
/**
* worker server
*/
......@@ -146,14 +149,14 @@ public class WorkerServer implements IStoppable {
try {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("worker registry error", e);
throw new RuntimeException(e);
}
// solve dead lock
logger.info(org.apache.dolphinscheduler.spi.utils.PropertyUtils.dumpProperties());
// task execute manager
this.workerManagerThread.start();
......@@ -194,8 +197,17 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
this.springApplicationContext.close();
logger.info("springApplicationContext close");
} catch (Exception e) {
logger.error("worker server stop exception ", e);
} finally {
try {
// thread sleep 60 seconds for quietly stop
Thread.sleep(60000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
System.exit(1);
}
}
......
......@@ -65,6 +65,17 @@ public class WorkerConfig {
@Value("${task.plugin.binding:}")
private String taskPluginBinding;
@Value("${worker.retry.report.task.statues.interval:10}")
private int retryReportTaskStatusInterval;
/**
 * Returns the retry-report interval bound from the
 * {@code worker.retry.report.task.statues.interval} property (default 10).
 * NOTE(review): the unit appears to be seconds, since the retry thread
 * multiplies this value by 1000 before sleeping - confirm.
 *
 * @return the configured retry-report interval
 */
public int getRetryReportTaskStatusInterval() {
return retryReportTaskStatusInterval;
}
/**
 * Overrides the retry-report interval held by this config object.
 *
 * @param retryReportTaskStatusInterval new interval value, stored as given
 */
public void setRetryReportTaskStatusInterval(int retryReportTaskStatusInterval) {
this.retryReportTaskStatusInterval = retryReportTaskStatusInterval;
}
public int getListenPort() {
return listenPort;
}
......
......@@ -17,17 +17,21 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* db task ack processor
*/
......@@ -50,6 +54,7 @@ public class DBTaskAckProcessor implements NettyRequestProcessor {
if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
logger.debug("removeAckCache: taskinstance id:{}", taskAckCommand.getTaskInstanceId());
}
}
......
......@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -25,11 +24,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* db task response processor
*/
......@@ -51,6 +53,7 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor {
if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
logger.debug("removeResponseCache: taskinstance id:{}", taskResponseCommand.getTaskInstanceId());
}
}
......
......@@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......@@ -104,6 +107,14 @@ public class WorkerRegistryClient {
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
}
while (!this.checkNodeExists()) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
this.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
registryClient.addConnectionStateListener(this::handleConnectionState);
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getWorkerMaxCpuloadAvg(),
workerConfig.getWorkerReservedMemory(),
......@@ -119,6 +130,32 @@ public class WorkerRegistryClient {
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
}
/**
 * Reacts to a change of the registry connection state.
 * <ul>
 *   <li>CONNECTED: only logged.</li>
 *   <li>SUSPENDED / DISCONNECTED: logged, then the registered stoppable is asked to stop.</li>
 *   <li>RECONNECTED: logged, then every worker path is persisted again as an ephemeral node.</li>
 *   <li>any other state: ignored.</li>
 * </ul>
 *
 * @param state new connection state reported by the registry
 */
public void handleConnectionState(ConnectionState state) {
    switch (state) {
        case CONNECTED:
            logger.info("registry connection state is {}", state);
            return;
        case SUSPENDED:
            stopSelfOnConnectionLoss(state, "registry connection state is SUSPENDED, stop myself");
            return;
        case RECONNECTED:
            logger.info("registry connection state is {}, clean the node info", state);
            recreateWorkerNodesAfterReconnect();
            return;
        case DISCONNECTED:
            stopSelfOnConnectionLoss(state, "registry connection state is DISCONNECTED, stop myself");
            return;
        default:
            // no action for any other state
    }
}

/**
 * Logs the connection loss and asks the registered stoppable to stop with the given cause.
 */
private void stopSelfOnConnectionLoss(ConnectionState state, String cause) {
    logger.info("registry connection state is {}, ready to stop myself", state);
    registryClient.getStoppable().stop(cause);
}

/**
 * Persists every worker registry path again as an ephemeral node with an empty value.
 */
private void recreateWorkerNodesAfterReconnect() {
    final String workerAddress = NetUtils.getAddr(workerConfig.getListenPort());
    for (String zkPath : getWorkerZkPaths()) {
        registryClient.persistEphemeral(zkPath, "");
        logger.info("worker node : {} reconnect to ZK {} successfully", workerAddress, zkPath);
    }
}
/**
* remove registry info
*/
......@@ -177,4 +214,11 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
/**
 * Asks the registry client whether a WORKER node exists for this host.
 * A positive answer is also logged.
 *
 * @return the registry client's answer
 */
public boolean checkNodeExists() {
    if (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
        return false;
    }
    logger.info("check worker, node exist success, host:{}", NetUtils.getHost());
    return true;
}
}
......@@ -18,18 +18,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Retry Report Task Status Thread
*/
......@@ -38,10 +40,8 @@ public class RetryReportTaskStatusThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
/**
* every 5 minutes
*/
private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
@Autowired
WorkerConfig workerConfig;
/**
* task callback service
......@@ -68,7 +68,7 @@ public class RetryReportTaskStatusThread implements Runnable {
while (Stopper.isRunning()){
// sleep for the configured retry-report interval before each retry pass
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
ThreadUtils.sleep(workerConfig.getRetryReportTaskStatusInterval() * 1000);
try {
if (!responceCache.getAckCache().isEmpty()){
......
......@@ -114,9 +114,9 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true);
//Cannot mock static methods
masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true);
}
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
......@@ -27,8 +28,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -579,6 +578,7 @@ public class ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
......@@ -775,6 +775,7 @@ public class ProcessService {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
processInstance.setRestartTime(new Date());
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes();
switch (commandType) {
......@@ -844,6 +845,7 @@ public class ProcessService {
updateTaskInstance(taskInstance);
}
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processInstance.setRunTimes(runTime + 1);
initComplementDataParam(processDefinition, processInstance, cmdParam);
......@@ -1015,6 +1017,7 @@ public class ProcessService {
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
logger.debug("update task instance, task instance id:{}", taskInstance.getId());
}
/**
......@@ -1317,9 +1320,6 @@ public class ProcessService {
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
if (taskInstance.getFirstSubmitTime() == null) {
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
}
......@@ -1435,6 +1435,11 @@ public class ProcessService {
}
}
/**
 * Updates the host and submit time of a task instance through the task instance mapper.
 *
 * @param id   task instance id
 * @param host host to store
 * @param date submit time to store
 * @return true when the mapper reports a positive count
 */
public boolean updateHostAndSubmitTimeById(int id, String host, Date date) {
    return taskInstanceMapper.updateHostAndSubmitTimeById(id, host, date) > 0;
}
/**
* insert task instance
*
......@@ -1454,6 +1459,7 @@ public class ProcessService {
*/
public boolean updateTaskInstance(TaskInstance taskInstance) {
    int count = taskInstanceMapper.updateById(taskInstance);
    // log message previously read "state;{}" - use ':' like the other key:value pairs
    logger.debug("updateTaskInstance, task instance id:{}, state:{}", taskInstance.getId(), taskInstance.getState());
    // success means the mapper reported a positive count
    return count > 0;
}
......@@ -1691,8 +1697,9 @@ public class ProcessService {
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
* @return the result of saving the task instance
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
String logPath,
int taskInstId) {
......@@ -1701,7 +1708,7 @@ public class ProcessService {
taskInstance.setHost(host);
taskInstance.setExecutePath(executePath);
taskInstance.setLogPath(logPath);
saveTaskInstance(taskInstance);
return saveTaskInstance(taskInstance);
}
/**
......@@ -1721,8 +1728,9 @@ public class ProcessService {
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
* @return the result of saving the task instance
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
int processId,
String appIds,
......@@ -1734,7 +1742,7 @@ public class ProcessService {
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
changeOutParam(taskInstance);
saveTaskInstance(taskInstance);
return saveTaskInstance(taskInstance);
}
/**
......@@ -1819,6 +1827,10 @@ public class ProcessService {
return processInstanceMapper.queryByHostAndStatus(host, stateArray);
}
/**
 * Queries the process instance mapper for hosts that need failover,
 * passing this service's {@code stateArray} as the state filter.
 *
 * @return the host list returned by the mapper
 */
public List<String> queryNeedFailoverProcessInstanceHost() {
    final List<String> hostsNeedingFailover =
            processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
    return hostsNeedingFailover;
}
/**
* process need failover process instance
*
......
......@@ -17,39 +17,6 @@
package org.apache.dolphinscheduler.service.quartz;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
......@@ -61,6 +28,7 @@ import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_I
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
......@@ -70,6 +38,7 @@ import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
......@@ -91,6 +60,42 @@ import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* single Quartz executors instance
*/
......@@ -170,6 +175,8 @@ public class QuartzExecutors {
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, HikariConnectionProvider.class.getName()));
properties.setProperty(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT,
conf.getString(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT, QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
......
......@@ -32,7 +32,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
/**
* queue size
*/
private static final Integer QUEUE_MAX_SIZE = 3000;
private static final Integer QUEUE_MAX_SIZE = 10000;
/**
* queue
......
......@@ -330,7 +330,7 @@ public class RegistryClient {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
logger.info("{} server {} deleted from zk dead server path:{} success", serverType, host,server);
}
}
}
......
......@@ -181,4 +181,7 @@ public class PropertyUtils {
properties.setProperty(key, value);
}
/**
 * Returns the string form of the whole loaded properties object.
 * NOTE(review): the result contains every key/value pair, which may include
 * sensitive settings if any are loaded - confirm before logging it.
 *
 * @return {@code properties.toString()}
 */
public static String dumpProperties() {
return properties.toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册