ZKMasterClient.java 11.0 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.server.zk;

import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
21
import cn.escheduler.common.enums.ZKNodeType;
22
import cn.escheduler.common.model.MasterServer;
L
ligang 已提交
23 24 25 26 27 28 29 30
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.ProcessUtils;
leon-baoliang's avatar
leon-baoliang 已提交
31
import org.apache.commons.lang.StringUtils;
L
ligang 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadFactory;


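/**
 *  ZooKeeper client used by the master server.
 *
 *  It registers the master znode, watches the master and worker znodes and
 *  triggers failover when one of them is removed.
 *
 *  Singleton: obtain the instance through getZKMasterClient(ProcessDao).
 */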
public class ZKMasterClient extends AbstractZKClient {

	/**
	 *  logger shared by all methods of this class
	 */
	private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);

	/**
	 *  thread factory (created with the name "Master-Main-Thread") that is passed
	 *  to the PathChildrenCache instances watching the master and worker znodes
	 */
	private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread");

	/**
	 *  master znode value exposed through getMasterZNode(); initially null
	 */
	private String masterZNode = null;

	/**
	 *  server DAO, obtained from DaoFactory in initDao()
	 */
	private ServerDao serverDao = null;
66

L
ligang 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
	/**
	 *  alert DAO, obtained from DaoFactory in initDao(); used to send
	 *  "server stopped" alerts and exposed through getAlertDao()
	 */
	private AlertDao alertDao = null;
	/**
	 *  process DAO used by the failover routines to query and save process and
	 *  task instances; assigned by the constructor, by initDao() and again on
	 *  every getZKMasterClient(ProcessDao) call
	 */
	private ProcessDao processDao;

	/**
	 *  the single ZKMasterClient instance, created lazily by getZKMasterClient(ProcessDao)
	 */
	private static ZKMasterClient zkMasterClient = null;


	/**
	 *  No-argument constructor; performs no initialization and is only
	 *  reachable from inside this class.
	 */
	private ZKMasterClient(){}

	/**
	 *  Stores the given process DAO and immediately runs {@link #init()}.
	 *
	 * @param processDao process DAO assigned to the instance before initialization
	 */
	private ZKMasterClient(ProcessDao processDao){
		this.processDao = processDao;
		this.init();
	}

	/**
	 *  Returns the shared ZKMasterClient, creating (and thereby initializing) it
	 *  on the first call. The method is synchronized on the class, so only one
	 *  thread at a time can create or update the instance.
	 *
	 * @param processDao process DAO to install on the shared instance; it
	 *                   replaces whatever DAO the instance held before
	 * @return the shared ZKMasterClient instance
	 */
	public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){
		ZKMasterClient instance = zkMasterClient;
		if (instance == null) {
			instance = new ZKMasterClient(processDao);
			zkMasterClient = instance;
		}
		// always refresh the DAO, also when the instance already existed
		instance.processDao = processDao;
		return instance;
	}

	/**
	 *  init
	 */
	public void init(){
		// init dao
		this.initDao();

Q
qiaozhanwei 已提交
110 111 112
		InterProcessMutex mutex = null;
		try {
			// create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
Q
qiaozhanwei 已提交
113 114
			String znodeLock = getMasterStartUpLockPath();
			mutex = new InterProcessMutex(zkClient, znodeLock);
Q
qiaozhanwei 已提交
115 116 117 118 119 120 121
			mutex.acquire();

			// init system znode
			this.initSystemZNode();

			// monitor master
			this.listenerMaster();
L
ligang 已提交
122

Q
qiaozhanwei 已提交
123 124
			// monitor worker
			this.listenerWorker();
L
ligang 已提交
125

Q
qiaozhanwei 已提交
126
			// register master
127
			this.registerMaster();
L
ligang 已提交
128

Q
qiaozhanwei 已提交
129 130
			// check if fault tolerance is required,failure and tolerance
			if (getActiveMasterNum() == 1) {
leon-baoliang's avatar
update  
leon-baoliang 已提交
131
				failoverWorker(null, true);
132
				failoverMaster(null);
Q
qiaozhanwei 已提交
133 134 135
			}

		}catch (Exception e){
Q
qiaozhanwei 已提交
136
			logger.error("master start up  exception : " + e.getMessage(),e);
Q
qiaozhanwei 已提交
137
		}finally {
138
			releaseMutex(mutex);
L
ligang 已提交
139 140 141 142
		}
	}


143 144


L
ligang 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
	/**
	 *  Fetches the server, alert and process DAOs from DaoFactory and stores
	 *  them on this instance, replacing any DAO that was set before.
	 */
	public void initDao(){
		serverDao = DaoFactory.getDaoInstance(ServerDao.class);
		alertDao = DaoFactory.getDaoInstance(AlertDao.class);
		processDao = DaoFactory.getDaoInstance(ProcessDao.class);
	}
	/**
	 * Accessor for the alert DAO held by this client.
	 *
	 * @return the alert DAO, or null if initDao() has not run yet
	 */
	public AlertDao getAlertDao() {
		return this.alertDao;
	}

161 162 163



L
ligang 已提交
164 165 166
	/**
	 *  register master znode
	 */
167
	public void registerMaster(){
L
ligang 已提交
168
		try {
169 170 171
		    String serverPath = registerServer(ZKNodeType.MASTER);
		    if(StringUtils.isEmpty(serverPath)){
		    	System.exit(-1);
L
ligang 已提交
172 173 174
			}
		} catch (Exception e) {
			logger.error("register master failure : "  + e.getMessage(),e);
175
			System.exit(-1);
L
ligang 已提交
176 177 178 179
		}
	}


180

L
ligang 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
	/**
	 *  monitor master
	 */
	public void listenerMaster(){
		PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory);

		try {
			masterPc.start();
			masterPc.getListenable().addListener(new PathChildrenCacheListener() {
				@Override
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
					switch (event.getType()) {
						case CHILD_ADDED:
							logger.info("master node added : {}",event.getData().getPath());
							break;
						case CHILD_REMOVED:
							String path = event.getData().getPath();
198 199 200
							String serverHost = getHostByEventDataPath(path);
							if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){
								return;
L
ligang 已提交
201
							}
202
							removeZKNodePath(path, ZKNodeType.MASTER, true);
L
ligang 已提交
203 204 205 206 207 208 209 210 211 212 213
							break;
						case CHILD_UPDATED:
							break;
						default:
							break;
					}
				}
			});
		}catch (Exception e){
			logger.error("monitor master failed : " + e.getMessage(),e);
		}
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
}

	/**
	 * Handles the removal of a master or worker znode.
	 *
	 * While holding the failover lock for the node type, the server is recorded
	 * as dead via handleDeadServer, "server stopped" alerts are sent and,
	 * if requested, failover is run for the host. Exceptions are logged and not
	 * rethrown; the lock is always released.
	 *
	 * @param path       path of the removed znode
	 * @param zkNodeType type of the removed node
	 * @param failover   whether to run failover for the host of the removed node
	 */
	private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
		final String nodeTypeName = zkNodeType.toString();
		logger.info("{} node deleted : {}", nodeTypeName, path);

		InterProcessMutex failoverLock = null;
		try {
			// distributed lock for the failover of this node type
			String lockPath = getFailoverLockPath(zkNodeType);
			failoverLock = new InterProcessMutex(getZkClient(), lockPath);
			failoverLock.acquire();

			String deadHost = getHostByEventDataPath(path);
			// record the dead server
			handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
			// tell the operators
			alertServerDown(deadHost, zkNodeType);
			// take over the work of the dead server
			if (failover) {
				failoverServerWhenDown(deadHost, zkNodeType);
			}
		} catch (Exception e) {
			logger.error("{} server failover failed.", nodeTypeName);
			logger.error("failover exception : " + e.getMessage(),e);
		} finally {
			releaseMutex(failoverLock);
		}
	}
L
ligang 已提交
242

243 244 245 246 247 248 249 250 251 252 253 254 255
	private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
	    if(StringUtils.isEmpty(serverHost)){
	    	return ;
		}
		switch (zkNodeType){
			case MASTER:
				failoverMaster(serverHost);
				break;
			case WORKER:
				failoverWorker(serverHost, true);
			default:
				break;
		}
L
ligang 已提交
256 257
	}

258
	private String getFailoverLockPath(ZKNodeType zkNodeType){
leon-baoliang's avatar
leon-baoliang 已提交
259

260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
		switch (zkNodeType){
			case MASTER:
				return getMasterFailoverLockPath();
			case WORKER:
				return getWorkerFailoverLockPath();
			default:
				return "";
		}
	}

	/**
	 * Sends the "server stopped" alert for a host,
	 * Constants.ESCHEDULER_WARN_TIMES_FAILOVER times in a row.
	 *
	 * @param serverHost host of the server that went down
	 * @param zkNodeType type of the server; its string form is used as the server type in the alert
	 */
	private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
		final String serverType = zkNodeType.toString();
		int sent = 0;
		while (sent < Constants.ESCHEDULER_WARN_TIMES_FAILOVER) {
			alertDao.sendServerStopedAlert(1, serverHost, serverType);
			sent++;
		}
	}
leon-baoliang's avatar
leon-baoliang 已提交
277

L
ligang 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
	/**
	 *  monitor worker
	 */
	public void listenerWorker(){

		PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory);
		try {
			workerPc.start();
			workerPc.getListenable().addListener(new PathChildrenCacheListener() {
				@Override
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
					switch (event.getType()) {
						case CHILD_ADDED:
							logger.info("node added : {}" ,event.getData().getPath());
							break;
						case CHILD_REMOVED:
							String path = event.getData().getPath();
							logger.info("node deleted : {}",event.getData().getPath());
296
							removeZKNodePath(path, ZKNodeType.WORKER, true);
L
ligang 已提交
297 298 299 300 301 302 303 304 305 306 307
							break;
						default:
							break;
					}
				}
			});
		}catch (Exception e){
			logger.error("listener worker failed : " + e.getMessage(),e);
		}
	}

308

L
ligang 已提交
309 310 311 312 313 314 315 316
	/**
	 *  Accessor for the master znode value held by this client.
	 *
	 * @return the current value of the masterZNode field; may be null
	 */
	public String getMasterZNode() {
		return this.masterZNode;
	}

317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
	/**
	 * Decides whether a task instance has to be failed over.
	 *
	 * A task is left alone only when the worker znode of its host exists in
	 * ZooKeeper and the task started after that worker did; in every other
	 * case it needs failover.
	 *
	 * @param taskInstance task instance to examine
	 * @return true if the task needs failover, false otherwise
	 * @throws Exception if the znode existence check fails
	 */
	private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
		boolean workerNodeExists = checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER);
		if (!workerNodeExists) {
			// the worker is gone from zookeeper: the task must be failed over
			return true;
		}
		// the worker is registered: only tasks that started before it need failover
		return !checkTaskAfterWorkerStart(taskInstance);
	}

	/**
	 * check task start after the worker server starts.
	 * @param taskInstance
	 * @return
	 */
	private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
343 344 345
	    if(StringUtils.isEmpty(taskInstance.getHost())){
	    	return false;
		}
346
	    Date workerServerStartDate = null;
347 348 349 350 351 352
	    List<MasterServer> workerServers= getServersList(ZKNodeType.WORKER);
	    for(MasterServer server : workerServers){
	    	if(server.getHost().equals(taskInstance.getHost())){
	    	    workerServerStartDate = server.getCreateTime();
	    	    break;
			}
353 354 355 356 357 358 359 360 361
		}

		if(workerServerStartDate != null){
			return taskInstance.getStartTime().after(workerServerStartDate);
		}else{
			return false;
		}
	}

leon-baoliang's avatar
leon-baoliang 已提交
362 363 364 365
	/**
	 * failover worker tasks
	 * 1. kill yarn job if there are yarn jobs in tasks.
	 * 2. change task state from running to need failover.
366
     * 3. failover all tasks when workerHost is null
leon-baoliang's avatar
leon-baoliang 已提交
367 368
	 * @param workerHost
	 */
369
	private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
leon-baoliang's avatar
leon-baoliang 已提交
370 371 372 373
		logger.info("start worker[{}] failover ...", workerHost);

		List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
		for(TaskInstance taskInstance : needFailoverTaskInstanceList){
374 375 376 377 378 379
			if(needCheckWorkerAlive){
				if(!checkTaskInstanceNeedFailover(taskInstance)){
					continue;
                }
			}

leon-baoliang's avatar
leon-baoliang 已提交
380 381 382 383 384 385
			ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
			if(instance!=null){
				taskInstance.setProcessInstance(instance);
			}
			// only kill yarn job if exists , the local thread has exited
			ProcessUtils.killYarnJob(taskInstance);
386 387 388

			taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
			processDao.saveTaskInstance(taskInstance);
leon-baoliang's avatar
leon-baoliang 已提交
389 390 391 392 393 394 395 396
		}
		logger.info("end worker[{}] failover ...", workerHost);
	}

	/**
	 * failover master tasks
	 * @param masterHost
	 */
397
	private void failoverMaster(String masterHost) {
leon-baoliang's avatar
leon-baoliang 已提交
398 399 400 401 402 403 404 405 406 407 408
		logger.info("start master failover ...");

		List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);

		//updateProcessInstance host is null and insert into command
		for(ProcessInstance processInstance : needFailoverProcessInstanceList){
			processDao.processNeedFailoverProcessInstances(processInstance);
		}

		logger.info("master failover end");
	}
L
ligang 已提交
409 410

}