未验证 提交 9bf67d80 编写于 作者: _和's avatar _和 提交者: GitHub

Revise annotation spelling errors & Enhanced code robustness (#3042)

* revise annotation spelling errors & enhanced code robustness

* revise annotation spelling errors & enhanced code robustness
Co-authored-by: Nsunchaohe <sunzhaohe@linklogis.com>
Co-authored-by: Ndailidong <dailidong66@gmail.com>
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>
上级 8213da50
...@@ -115,7 +115,7 @@ public class CheckUtils { ...@@ -115,7 +115,7 @@ public class CheckUtils {
* *
* @param parameter parameter * @param parameter parameter
* @param taskType task type * @param taskType task type
* @return true if taks node parameters are valid, otherwise return false * @return true if task node parameters are valid, otherwise return false
*/ */
public static boolean checkTaskNodeParameters(String parameter, String taskType) { public static boolean checkTaskNodeParameters(String parameter, String taskType) {
AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter); AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter);
......
...@@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{ ...@@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{
/** /**
* taks name * task name
*/ */
private String taskName; private String taskName;
......
...@@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{ ...@@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{
/** /**
* taks name * task name
*/ */
private String taskName; private String taskName;
......
...@@ -34,7 +34,7 @@ public class NettyRemoteChannel { ...@@ -34,7 +34,7 @@ public class NettyRemoteChannel {
private final Channel channel; private final Channel channel;
/** /**
* equest unique identification * request unique identification
*/ */
private final long opaque; private final long opaque;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
...@@ -33,14 +32,12 @@ import org.slf4j.Logger; ...@@ -33,14 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
* taks callback service * task callback service
*/ */
@Service @Service
public class TaskCallbackService { public class TaskCallbackService {
......
...@@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/** /**
* thread executor service * thread executor service
*/ */
...@@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("received command : {}", taskRequestCommand); logger.info("received command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext(); if(taskRequestCommand == null){
logger.error("task execute request command is null");
return;
}
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){
logger.error("task execution context is null");
return;
}
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// local execute path // local execute path
...@@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// tell master that task is in executing // tell master that task is in executing
final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
try { try {
RetryerUtils.retryCall(() -> { RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
......
...@@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor; ...@@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
...@@ -52,10 +55,23 @@ import java.util.Date; ...@@ -52,10 +55,23 @@ import java.util.Date;
* test task call back service * test task call back service
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, @ContextConfiguration(classes={
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, TaskCallbackServiceTestConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, SpringZKServer.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) SpringApplicationContext.class,
MasterRegistry.class,
WorkerRegistry.class,
ZookeeperRegistryCenter.class,
MasterConfig.class,
WorkerConfig.class,
ZookeeperCachedOperator.class,
ZookeeperConfig.class,
ZookeeperNodeManager.class,
TaskCallbackService.class,
TaskResponseService.class,
TaskAckProcessor.class,
TaskResponseProcessor.class,
TaskExecuteProcessor.class})
public class TaskCallbackServiceTest { public class TaskCallbackServiceTest {
@Autowired @Autowired
...@@ -70,6 +86,9 @@ public class TaskCallbackServiceTest { ...@@ -70,6 +86,9 @@ public class TaskCallbackServiceTest {
@Autowired @Autowired
private TaskResponseProcessor taskResponseProcessor; private TaskResponseProcessor taskResponseProcessor;
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
/** /**
* send ack test * send ack test
* @throws Exception * @throws Exception
...@@ -176,6 +195,35 @@ public class TaskCallbackServiceTest { ...@@ -176,6 +195,35 @@ public class TaskCallbackServiceTest {
} }
} }
@Test
public void testTaskExecuteProcessor() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand();
nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext()));
nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
// @Test(expected = IllegalStateException.class) // @Test(expected = IllegalStateException.class)
// public void testSendAckWithIllegalStateException2(){ // public void testSendAckWithIllegalStateException2(){
// masterRegistry.registry(); // masterRegistry.registry();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册