未验证 提交 29f1123d 编写于 作者: Q qiaozhanwei 提交者: GitHub

[Fix-3616][Server] when worker akc/response master exception , async retry (#3748)

上级 7486ad01
......@@ -58,7 +58,9 @@ jobs:
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
sudo dpkg -i google-chrome*.deb
sudo apt-get install -f -y
wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip
google-chrome -version
googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE)
wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip
unzip chromedriver_linux64.zip
sudo mv -f chromedriver /usr/local/share/chromedriver
sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver
......@@ -66,9 +68,7 @@ jobs:
run: cd ./e2e && mvn -B clean test
- name: Collect logs
if: failure()
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v2
with:
name: dslogs
path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data
path: ${{ github.workspace }}/docker/docker-swarm/dolphinscheduler-logs
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum Event {
ACK,
RESULT;
}
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG;}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* db task ack
*/
DB_TASK_ACK,
/**
* db task response
*/
DB_TASK_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* db task ack request command
*/
public class DBTaskAckCommand implements Serializable {
private int taskInstanceId;
private int status;
public DBTaskAckCommand(int status,int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.DB_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "DBTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* db task final result response command
*/
public class DBTaskResponseCommand implements Serializable {
private int taskInstanceId;
private int status;
public DBTaskResponseCommand(int status,int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.DB_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "DBTaskResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
'}';
}
}
......@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
......@@ -34,11 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task ack processor
......@@ -57,16 +52,9 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
......@@ -92,19 +80,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
taskAckCommand.getTaskInstanceId(),
channel);
taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}
}
......@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
......@@ -33,11 +30,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task response processor
......@@ -56,15 +51,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
......@@ -83,25 +72,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
responseCommand.getTaskInstanceId(),
channel);
taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}
......
......@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
......@@ -76,7 +78,18 @@ public class TaskResponseEvent {
*/
private Event event;
public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
/**
* channel
*/
private Channel channel;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
String workerAddress,
String executePath,
String logPath,
int taskInstanceId,
Channel channel){
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setStartTime(startTime);
......@@ -85,10 +98,16 @@ public class TaskResponseEvent {
event.setLogPath(logPath);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.ACK);
event.setChannel(channel);
return event;
}
public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
public static TaskResponseEvent newResult(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstanceId,
Channel channel){
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
......@@ -96,6 +115,7 @@ public class TaskResponseEvent {
event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT);
event.setChannel(channel);
return event;
}
......@@ -179,8 +199,11 @@ public class TaskResponseEvent {
this.event = event;
}
public enum Event{
ACK,
RESULT;
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
}
......@@ -17,7 +17,13 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -121,23 +127,48 @@ public class TaskResponseService {
* @param taskResponseEvent taskResponseEvent
*/
private void persist(TaskResponseEvent taskResponseEvent){
TaskResponseEvent.Event event = taskResponseEvent.getEvent();
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();
switch (event){
case ACK:
processService.changeTaskState(taskResponseEvent.getState(),
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null){
processService.changeTaskState(taskResponseEvent.getState(),
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}catch (Exception e){
logger.error("worker ack master error",e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1);
channel.writeAndFlush(taskAckCommand.convert2Command());
}
break;
case RESULT:
processService.changeTaskState(taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId());
try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null){
processService.changeTaskState(taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
}catch (Exception e){
logger.error("worker response master error",e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
......
......@@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -70,6 +73,9 @@ public class WorkerServer {
@Autowired
private SpringApplicationContext springApplicationContext;
@Autowired
private RetryReportTaskStatusThread retryReportTaskStatusThread;
/**
* worker server startup
*
......@@ -95,11 +101,16 @@ public class WorkerServer {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
// worker registry
this.workerRegistry.registry();
// retry report task status
this.retryReportTaskStatusThread.start();
/**
* register hooks, which are called before the process exits
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Responce Cache : cache worker send master result
*/
public class ResponceCache {
private static final ResponceCache instance = new ResponceCache();
private ResponceCache(){}
public static ResponceCache get(){
return instance;
}
private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
/**
* cache response
* @param taskInstanceId taskInstanceId
* @param command command
* @param event event ACK/RESULT
*/
public void cache(Integer taskInstanceId, Command command, Event event){
switch (event){
case ACK:
ackCache.put(taskInstanceId,command);
break;
case RESULT:
responseCache.put(taskInstanceId,command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
}
/**
* remove ack cache
* @param taskInstanceId taskInstanceId
*/
public void removeAckCache(Integer taskInstanceId){
ackCache.remove(taskInstanceId);
}
/**
* remove reponse cache
* @param taskInstanceId taskInstanceId
*/
public void removeResponseCache(Integer taskInstanceId){
responseCache.remove(taskInstanceId);
}
/**
* getAckCache
* @return getAckCache
*/
public Map<Integer,Command> getAckCache(){
return ackCache;
}
/**
* getResponseCache
* @return getResponseCache
*/
public Map<Integer,Command> getResponseCache(){
return responseCache;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* db task ack processor
*/
public class DBTaskAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
command.getBody(), DBTaskAckCommand.class);
if (taskAckCommand == null){
return;
}
if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* db task response processor
*/
public class DBTaskResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
String.format("invalid command type : %s", command.getType()));
DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
command.getBody(), DBTaskResponseCommand.class);
if (taskResponseCommand == null){
return;
}
if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
}
}
}
......@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Random;
/**
* callback channel
*/
......@@ -50,6 +52,12 @@ public class NettyRemoteChannel {
this.opaque = opaque;
}
public NettyRemoteChannel(Channel channel) {
this.channel = channel;
this.host = ChannelUtils.toAddress(channel);
this.opaque = -1;
}
public Channel getChannel() {
return channel;
}
......
......@@ -86,20 +86,19 @@ public class TaskCallbackService {
* @return callback channel
*/
private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if(nettyRemoteChannel == null){
throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
}
if(nettyRemoteChannel.isActive()){
return nettyRemoteChannel;
}
Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
if(nettyRemoteChannel != null){
if(nettyRemoteChannel.isActive()){
return nettyRemoteChannel;
}
newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
}
}
logger.warn("original master : {} for task : {} is not reachable, random select master",
nettyRemoteChannel.getHost(),
taskInstanceId);
Set<String> masterNodes = null;
int ntries = 0;
while (Stopper.isRunning()) {
......@@ -119,7 +118,7 @@ public class TaskCallbackService {
for (String masterNode : masterNodes) {
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
if (newChannel != null) {
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
return getRemoteChannel(newChannel,taskInstanceId);
}
}
masterNodes = null;
......@@ -141,6 +140,12 @@ public class TaskCallbackService {
return remoteChannel;
}
private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
}
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
......
......@@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
......@@ -36,6 +37,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
......@@ -101,12 +103,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
try {
this.doAck(taskExecutionContext);
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
this.doAck(taskExecutionContext);
}
this.doAck(taskExecutionContext);
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
......@@ -115,6 +112,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Retry Report Task Status Thread
*/
@Component
public class RetryReportTaskStatusThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
/**
* every 5 minutes
*/
private static long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public void start(){
Thread thread = new Thread(this,"RetryReportTaskStatusThread");
thread.start();
}
public RetryReportTaskStatusThread(){
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
}
/**
* retry ack/response
*/
@Override
public void run() {
ResponceCache responceCache = ResponceCache.get();
while (Stopper.isRunning()){
try {
if (!responceCache.getAckCache().isEmpty()){
Map<Integer,Command> ackCache = responceCache.getAckCache();
for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command ackCommand = entry.getValue();
taskCallbackService.sendAck(taskInstanceId,ackCommand);
}
}
if (!responceCache.getResponseCache().isEmpty()){
Map<Integer,Command> responseCache = responceCache.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.sendAck(taskInstanceId,responseCommand);
}
}
}catch (Exception e){
logger.warn("retry report task status error", e);
}
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME);
}
}
}
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
......@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
......@@ -144,13 +146,10 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
} finally {
try {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
}
......
......@@ -44,7 +44,7 @@ public class TaskResponseServiceTest {
@Test
public void testAdd(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
"", "", "", 1);
"", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
try {
......@@ -58,7 +58,7 @@ public class TaskResponseServiceTest {
@Test
public void testStop(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
"", "", "", 1);
"", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
taskResponseService.stop();
Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册