提交 64d538e1 编写于 作者: T Technoboy-

updates

上级 d6ea202e
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
package org.apache.dolphinscheduler.remote.entity;
import java.io.Serializable;
import java.util.Date;
......@@ -23,7 +23,7 @@ import java.util.Date;
/**
* master/worker task transport
*/
public class TaskInfo implements Serializable{
public class TaskExecutionContext implements Serializable{
/**
* task instance id
......@@ -229,7 +229,7 @@ public class TaskInfo implements Serializable{
@Override
public String toString() {
return "TaskInfo{" +
return "TaskExecutionContext{" +
"taskId=" + taskId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import java.util.Objects;
public class Host {
private String address;
private String ip;
private int port;
public Host() {
}
public Host(String ip, int port) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
this.address = ip + ":" + port;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
this.address = ip + ":" + port;
}
public static Host of(String address){
String[] parts = address.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException(String.format("Address : %s illegal.", address));
}
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
return host;
}
@Override
public String toString() {
return "Host{" +
"address='" + address + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Host host = (Host) o;
return Objects.equals(getAddress(), host.getAddress());
}
@Override
public int hashCode() {
return Objects.hash(getAddress());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
public interface HostManager {
Host select(TaskExecutionContext context);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@Service
public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
@Autowired
private RoundRobinSelector<Host> selector;
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
@Override
public Host select(TaskExecutionContext context){
Host host = new Host();
Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
return selector.select(candidateHosts);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
import java.util.Random;
public class RandomSelector<T> implements Selector<T> {
private final Random random = new Random();
@Override
public T select(final Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
if (source.size() == 1) {
return (T) source.toArray()[0];
}
int size = source.size();
int randomIndex = random.nextInt(size);
return (T) source.toArray()[randomIndex];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinSelector<T> implements Selector<T> {
private final AtomicInteger index = new AtomicInteger(0);
@Override
public T select(Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
if (source.size() == 1) {
return (T)source.toArray()[0];
}
int size = source.size();
return (T) source.toArray()[index.getAndIncrement() % size];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
public interface Selector<T> {
T select(Collection<T> source);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* master registry
*/
public class MasterRegistry {
private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
/**
* zookeeper registry center
*/
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* port
*/
private final int port;
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
}
/**
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if(newState == ConnectionState.LOST){
logger.error("master : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("master : {} connection SUSPENDED ", address);
}
}
});
logger.info("master node : {} registry to ZK successfully.", address);
}
/**
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
logger.info("worker node : {} unRegistry to ZK.", address);
}
/**
* get worker path
* @return
*/
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
}
/**
* get local address
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
}
......@@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
......@@ -137,7 +137,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
try {
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
taskRequestCommand.convert2Command(), 2000);
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class);
......@@ -156,6 +156,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
/**
* set task instance relation
*
......@@ -203,25 +204,25 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance
* @return taskInfo
*/
private TaskInfo convertToTaskInfo(TaskInstance taskInstance){
TaskInfo taskInfo = new TaskInfo();
taskInfo.setTaskId(taskInstance.getId());
taskInfo.setTaskName(taskInstance.getName());
taskInfo.setStartTime(taskInstance.getStartTime());
taskInfo.setTaskType(taskInstance.getTaskType());
taskInfo.setExecutePath(getExecLocalPath(taskInstance));
taskInfo.setTaskJson(taskInstance.getTaskJson());
taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId());
taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
taskInfo.setQueue(taskInstance.getProcessInstance().getQueue());
taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId());
taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId());
return taskInfo;
private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance));
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId());
taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue());
taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId());
taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId());
return taskExecutionContext;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service
public abstract class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
private final Lock masterLock = new ReentrantLock();
private final Lock workerLock = new ReentrantLock();
private final Set<String> workerNodes = new HashSet<>();
private final Set<String> masterNodes = new HashSet<>();
@Autowired
private ZookeeperRegistryCenter registryCenter;
@Override
public void afterPropertiesSet() throws Exception {
load();
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
}
private void load(){
Set<String> schedulerNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(schedulerNodes);
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(workersNodes);
}
class WorkerNodeListener extends AbstractListener {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(registryCenter.isWorkerPath(path)){
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("worker node : {} added.", path);
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("worker node : {} down.", path);
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
}
} catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage());
} catch (Exception ex) {
logger.error("WorkerListener capture data change and get data failed", ex);
}
}
}
}
class MasterNodeListener extends AbstractListener {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if (registryCenter.isMasterPath(path)) {
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("master node : {} added.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("master node : {} down.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes);
}
} catch (Exception ex) {
logger.error("MasterNodeListener capture data change and get data failed.", ex);
}
}
}
}
public Set<String> getMasterNodes() {
masterLock.lock();
try {
return Collections.unmodifiableSet(masterNodes);
} finally {
masterLock.unlock();
}
}
private void syncMasterNodes(Set<String> nodes){
masterLock.lock();
try {
masterNodes.clear();
masterNodes.addAll(nodes);
} finally {
masterLock.unlock();
}
}
private void syncWorkerNodes(Set<String> nodes){
workerLock.lock();
try {
workerNodes.clear();
workerNodes.addAll(nodes);
} finally {
workerLock.unlock();
}
}
public Set<String> getWorkerNodes(){
workerLock.lock();
try {
return Collections.unmodifiableSet(workerNodes);
} finally {
workerLock.unlock();
}
}
public void close(){
registryCenter.close();
}
}
......@@ -22,6 +22,9 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
......@@ -76,6 +79,28 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return WORKER_PATH;
}
public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH);
return new HashSet<>(masters);
}
public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH);
return new HashSet<>(workers);
}
public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH);
}
public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH);
}
public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key);
}
public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator;
}
......
......@@ -19,18 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......@@ -40,7 +35,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
/**
......@@ -87,23 +81,23 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class);
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
// local execute path
String execLocalPath = getExecLocalPath(taskInfo);
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath);
try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode());
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskInfo.getTaskId(),
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
workerExecService.submit(new TaskScheduleThread(taskInfo,
workerExecService.submit(new TaskScheduleThread(taskExecutionContext,
processService, taskCallbackService));
}
......@@ -111,13 +105,13 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
/**
* get execute local path
*
* @param taskInfo taskInfo
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskInfo taskInfo){
return FileUtils.getProcessExecDir(taskInfo.getProjectId(),
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId());
private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId());
}
}
......@@ -71,7 +71,7 @@ public class WorkerRegistry {
}
}
});
logger.info("scheduler node : {} registry to ZK successfully.", address);
logger.info("worker node : {} registry to ZK successfully.", address);
}
/**
......
......@@ -31,11 +31,9 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
......@@ -63,7 +61,7 @@ public class TaskScheduleThread implements Runnable {
/**
* task instance
*/
private TaskInfo taskInfo;
private TaskExecutionContext taskExecutionContext;
/**
* process service
......@@ -83,67 +81,67 @@ public class TaskScheduleThread implements Runnable {
/**
* constructor
*
* @param taskInfo taskInfo
* @param taskExecutionContext taskExecutionContext
* @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService
*/
public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService;
this.taskInfo = taskInfo;
this.taskExecutionContext = taskExecutionContext;
this.taskInstanceCallbackService = taskInstanceCallbackService;
}
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType());
taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand);
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand);
logger.info("script path : {}", taskInfo.getExecutePath());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class);
TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
downloadResource(
taskInfo.getExecutePath(),
taskExecutionContext.getExecutePath(),
resourceFiles,
logger);
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInfo.getExecutePath(),
taskInfo.getScheduleTime(),
taskInfo.getTaskName(),
taskInfo.getTaskType(),
taskInfo.getTaskId(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(),
taskExecutionContext.getTaskId(),
CommonUtils.getSystemEnvPath(),
taskInfo.getTenantCode(),
taskInfo.getQueue(),
taskInfo.getStartTime(),
taskExecutionContext.getTenantCode(),
taskExecutionContext.getQueue(),
taskExecutionContext.getStartTime(),
getGlobalParamsMap(),
null,
CommandType.of(taskInfo.getCmdTypeIfComplement()));
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()));
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId()));
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId()));
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId()));
task = TaskManager.newTask(taskInfo.getTaskType(),
task = TaskManager.newTask(taskExecutionContext.getTaskType(),
taskProps,
taskLogger);
......@@ -159,14 +157,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand);
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
}
}
......@@ -178,7 +176,7 @@ public class TaskScheduleThread implements Runnable {
Map<String,String> globalParamsMap = new HashMap<>(16);
// global params string
String globalParamsStr = taskInfo.getGlobalParams();
String globalParamsStr = taskExecutionContext.getGlobalParams();
if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
......@@ -199,7 +197,7 @@ public class TaskScheduleThread implements Runnable {
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskInfo.getExecutePath());
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
......@@ -215,15 +213,15 @@ public class TaskScheduleThread implements Runnable {
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log";
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log";
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskId() + ".log";
}
/**
......@@ -329,7 +327,7 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInfo.getExecutorId();
int userId = taskExecutionContext.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public abstract class AbstractListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
}
protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
}
......@@ -20,6 +20,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -32,7 +33,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
TreeCache treeCache;
private TreeCache treeCache;
/**
* register a unified listener of /${dsRoot},
*/
......@@ -72,6 +73,10 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return treeCache;
}
public void addListener(TreeCacheListener listener){
this.treeCache.getListenable().addListener(listener);
}
@Override
public void close() {
treeCache.close();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册