ZKMasterClient.java 10.2 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.zk;
L
ligang 已提交
18

T
Tboy 已提交
19 20
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
21
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
T
Tboy 已提交
22
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
Q
qiaozhanwei 已提交
23 24 25
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
26
import org.apache.dolphinscheduler.common.model.Server;
Q
qiaozhanwei 已提交
27
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
28
import org.apache.dolphinscheduler.common.utils.OSUtils;
Q
qiaozhanwei 已提交
29 30
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
31
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
Q
qiaozhanwei 已提交
32
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
Q
qiaozhanwei 已提交
33
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
34 35
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
L
ligang 已提交
36 37
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
38 39
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
L
ligang 已提交
40 41 42 43

import java.util.Date;
import java.util.List;

Q
qiaozhanwei 已提交
44 45
import static org.apache.dolphinscheduler.common.Constants.*;

L
ligang 已提交
46 47 48 49 50 51

/**
 *  zookeeper master client
 *
 *  single instance
 */
52
@Component
L
ligang 已提交
53 54
public class ZKMasterClient extends AbstractZKClient {

55 56 57
	/**
	 * logger
	 */
L
ligang 已提交
58 59 60
	private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);

	/**
61
	 *  process service
L
ligang 已提交
62
	 */
63
	@Autowired
64
	private ProcessService processService;
L
ligang 已提交
65

66
	public void start() {
L
ligang 已提交
67

Q
qiaozhanwei 已提交
68 69
		InterProcessMutex mutex = null;
		try {
70
			// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
Q
qiaozhanwei 已提交
71
			String znodeLock = getMasterStartUpLockPath();
T
Tboy 已提交
72
			mutex = new InterProcessMutex(getZkClient(), znodeLock);
Q
qiaozhanwei 已提交
73 74 75 76 77
			mutex.acquire();

			// init system znode
			this.initSystemZNode();

Q
qiaozhanwei 已提交
78 79 80 81 82 83 84
			while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){
				ThreadUtils.sleep(SLEEP_TIME_MILLIS);
			}


			// self tolerant
			if (getActiveMasterNum() == 1) {
leon-baoliang's avatar
update  
leon-baoliang 已提交
85
				failoverWorker(null, true);
86
				failoverMaster(null);
Q
qiaozhanwei 已提交
87 88 89
			}

		}catch (Exception e){
Q
qiaozhanwei 已提交
90
			logger.error("master start up exception",e);
Q
qiaozhanwei 已提交
91
		}finally {
92
			releaseMutex(mutex);
L
ligang 已提交
93 94 95
		}
	}

T
Tboy 已提交
96 97 98
	@Override
	public void close(){
		super.close();
L
ligang 已提交
99 100 101
	}

	/**
D
DK.Pino 已提交
102 103 104 105
	 * handle path events that this class cares about
	 * @param client   zkClient
	 * @param event	   path event
	 * @param path     zk path
L
ligang 已提交
106
	 */
D
DK.Pino 已提交
107 108
	@Override
	protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
Q
qiaozhanwei 已提交
109 110
		//monitor master
		if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
D
DK.Pino 已提交
111
			handleMasterEvent(event,path);
Q
qiaozhanwei 已提交
112 113
		}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
			//monitor worker
D
DK.Pino 已提交
114 115 116
			handleWorkerEvent(event,path);
		}
	}
117

118 119 120 121 122 123 124
	/**
	 * remove zookeeper node path
	 *
	 * @param path			zookeeper node path
	 * @param zkNodeType	zookeeper node type
	 * @param failover		is failover
	 */
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
	private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
		logger.info("{} node deleted : {}", zkNodeType.toString(), path);
		InterProcessMutex mutex = null;
		try {
			String failoverPath = getFailoverLockPath(zkNodeType);
			// create a distributed lock
			mutex = new InterProcessMutex(getZkClient(), failoverPath);
			mutex.acquire();

			String serverHost = getHostByEventDataPath(path);
			// handle dead server
			handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
			//failover server
			if(failover){
				failoverServerWhenDown(serverHost, zkNodeType);
			}
		}catch (Exception e){
			logger.error("{} server failover failed.", zkNodeType.toString());
143
			logger.error("failover exception ",e);
144 145 146 147 148
		}
		finally {
			releaseMutex(mutex);
		}
	}
L
ligang 已提交
149

150 151 152 153 154 155 156
	/**
	 * failover server when server down
	 *
	 * @param serverHost	server host
	 * @param zkNodeType	zookeeper node type
	 * @throws Exception	exception
	 */
157
	private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
Q
qiaozhanwei 已提交
158
		if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){
159
			return ;
160 161 162 163 164 165 166 167 168 169
		}
		switch (zkNodeType){
			case MASTER:
				failoverMaster(serverHost);
				break;
			case WORKER:
				failoverWorker(serverHost, true);
			default:
				break;
		}
L
ligang 已提交
170 171
	}

172 173 174 175 176 177
	/**
	 * get failover lock path
	 *
	 * @param zkNodeType zookeeper node type
	 * @return fail over lock path
	 */
178
	private String getFailoverLockPath(ZKNodeType zkNodeType){
leon-baoliang's avatar
leon-baoliang 已提交
179

180 181 182 183 184 185 186 187 188 189
		switch (zkNodeType){
			case MASTER:
				return getMasterFailoverLockPath();
			case WORKER:
				return getWorkerFailoverLockPath();
			default:
				return "";
		}
	}

L
ligang 已提交
190
	/**
D
DK.Pino 已提交
191
	 * monitor master
Q
qiaozhanwei 已提交
192 193
	 * @param event event
	 * @param path path
L
ligang 已提交
194
	 */
D
DK.Pino 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
	public void handleMasterEvent(TreeCacheEvent event, String path){
		switch (event.getType()) {
			case NODE_ADDED:
				logger.info("master node added : {}", path);
				break;
			case NODE_REMOVED:
				removeZKNodePath(path, ZKNodeType.MASTER, true);
				break;
			default:
				break;
		}
	}

	/**
	 * monitor worker
Q
qiaozhanwei 已提交
210 211
	 * @param event event
	 * @param path path
D
DK.Pino 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224
	 */
	public void handleWorkerEvent(TreeCacheEvent event, String path){
		switch (event.getType()) {
			case NODE_ADDED:
				logger.info("worker node added : {}", path);
				break;
			case NODE_REMOVED:
				logger.info("worker node deleted : {}", path);
				removeZKNodePath(path, ZKNodeType.WORKER, true);
				break;
			default:
				break;
		}
L
ligang 已提交
225 226
	}

227 228
	/**
	 * task needs failover if task start before worker starts
229
	 *
230 231
	 * @param taskInstance task instance
	 * @return true if task instance need fail over
232 233 234 235 236
	 */
	private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {

		boolean taskNeedFailover = true;

237 238 239 240 241
		//now no host will execute this task instance,so no need to failover the task
		if(taskInstance.getHost() == null){
			return false;
		}

242
		// if the worker node exists in zookeeper, we must check the task starts after the worker
243 244 245 246
		if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
			//if task start after worker starts, there is no need to failover the task.
			if(checkTaskAfterWorkerStart(taskInstance)){
				taskNeedFailover = false;
247 248 249 250 251 252 253
			}
		}
		return taskNeedFailover;
	}

	/**
	 * check task start after the worker server starts.
254 255 256
	 *
	 * @param taskInstance task instance
	 * @return true if task instance start time after worker server start date
257 258
	 */
	private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
259 260
		if(StringUtils.isEmpty(taskInstance.getHost())){
			return false;
261
		}
262 263 264
		Date workerServerStartDate = null;
		List<Server> workerServers = getServersList(ZKNodeType.WORKER);
		for(Server workerServer : workerServers){
265
			if(workerServer.getHost().equals(taskInstance.getHost())){
266 267
				workerServerStartDate = workerServer.getCreateTime();
				break;
268
			}
269 270 271 272 273 274 275 276 277
		}

		if(workerServerStartDate != null){
			return taskInstance.getStartTime().after(workerServerStartDate);
		}else{
			return false;
		}
	}

leon-baoliang's avatar
leon-baoliang 已提交
278 279
	/**
	 * failover worker tasks
280
	 *
leon-baoliang's avatar
leon-baoliang 已提交
281 282
	 * 1. kill yarn job if there are yarn jobs in tasks.
	 * 2. change task state from running to need failover.
283
	 * 3. failover all tasks when workerHost is null
284 285 286 287 288 289 290 291 292 293 294 295
	 * @param workerHost worker host
	 */

	/**
	 * failover worker tasks
	 *
	 * 1. kill yarn job if there are yarn jobs in tasks.
	 * 2. change task state from running to need failover.
	 * 3. failover all tasks when workerHost is null
	 * @param workerHost			worker host
	 * @param needCheckWorkerAlive	need check worker alive
	 * @throws Exception			exception
leon-baoliang's avatar
leon-baoliang 已提交
296
	 */
297
	private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
leon-baoliang's avatar
leon-baoliang 已提交
298 299
		logger.info("start worker[{}] failover ...", workerHost);

300
		List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
leon-baoliang's avatar
leon-baoliang 已提交
301
		for(TaskInstance taskInstance : needFailoverTaskInstanceList){
302 303 304
			if(needCheckWorkerAlive){
				if(!checkTaskInstanceNeedFailover(taskInstance)){
					continue;
305
				}
306 307
			}

308 309 310
			ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
			if(processInstance != null){
				taskInstance.setProcessInstance(processInstance);
leon-baoliang's avatar
leon-baoliang 已提交
311
			}
312 313 314 315 316

			TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
					.buildTaskInstanceRelatedInfo(taskInstance)
					.buildProcessInstanceRelatedInfo(processInstance)
					.create();
leon-baoliang's avatar
leon-baoliang 已提交
317
			// only kill yarn job if exists , the local thread has exited
318
			ProcessUtils.killYarnJob(taskExecutionContext);
319 320

			taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
321
			processService.saveTaskInstance(taskInstance);
leon-baoliang's avatar
leon-baoliang 已提交
322 323 324 325 326 327
		}
		logger.info("end worker[{}] failover ...", workerHost);
	}

	/**
	 * failover master tasks
328 329
	 *
	 * @param masterHost master host
leon-baoliang's avatar
leon-baoliang 已提交
330
	 */
331
	private void failoverMaster(String masterHost) {
leon-baoliang's avatar
leon-baoliang 已提交
332 333
		logger.info("start master failover ...");

334
		List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
leon-baoliang's avatar
leon-baoliang 已提交
335 336 337

		//updateProcessInstance host is null and insert into command
		for(ProcessInstance processInstance : needFailoverProcessInstanceList){
338
			processService.processNeedFailoverProcessInstances(processInstance);
leon-baoliang's avatar
leon-baoliang 已提交
339 340 341 342
		}

		logger.info("master failover end");
	}
L
ligang 已提交
343

T
Tboy 已提交
344
	public InterProcessMutex blockAcquireMutex() throws Exception {
345 346 347
		InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
		mutex.acquire();
		return mutex;
T
Tboy 已提交
348 349
	}

L
ligang 已提交
350
}