From 3a28a71b007a26e7613d2f23094b8b4cbfe111ff Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 22 Dec 2020 15:32:41 +0800 Subject: [PATCH] [Improvement] Refactor code to support distributed tracing (#4270) * Refactor code to support tracing * Extension network protocol, support context and version * Extension master asynchronous queue support context * Extract scan task method from MasterSchedulerService for tracing * fix * fix * add test case * fix * fix Co-authored-by: hailin0 --- .../remote/codec/NettyDecoder.java | 33 +++++++ .../remote/codec/NettyEncoder.java | 7 ++ .../remote/command/Command.java | 14 +++ .../remote/command/CommandContext.java | 56 +++++++++++ .../remote/command/CommandHeader.java | 26 +++++ .../consumer/TaskPriorityQueueConsumer.java | 20 ++-- .../runner/MasterBaseTaskExecThread.java | 37 +++---- .../master/runner/MasterSchedulerService.java | 77 ++++++++------- .../TaskPriorityQueueConsumerTest.java | 27 ++++-- .../service/queue}/TaskPriority.java | 97 ++++++++++++------- .../service/queue/TaskPriorityQueueImpl.java | 46 +-------- .../src/test/java/queue/TaskPriorityTest.java | 83 ++++++++++++++++ .../test/java/queue/TaskUpdateQueueTest.java | 20 ++-- 13 files changed, 374 insertions(+), 169 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity => dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue}/TaskPriority.java (56%) create mode 100644 dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java index 179ae1bef..343e8c63d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandContext; import org.apache.dolphinscheduler.remote.command.CommandHeader; import org.apache.dolphinscheduler.remote.command.CommandType; import org.slf4j.Logger; @@ -54,16 +55,34 @@ public class NettyDecoder extends ReplayingDecoder { switch (state()){ case MAGIC: checkMagic(in.readByte()); + checkpoint(State.VERSION); + // fallthru + case VERSION: + checkVersion(in.readByte()); checkpoint(State.COMMAND); + // fallthru case COMMAND: commandHeader.setType(in.readByte()); checkpoint(State.OPAQUE); + // fallthru case OPAQUE: commandHeader.setOpaque(in.readLong()); + checkpoint(State.CONTEXT_LENGTH); + // fallthru + case CONTEXT_LENGTH: + commandHeader.setContextLength(in.readInt()); + checkpoint(State.CONTEXT); + // fallthru + case CONTEXT: + byte[] context = new byte[commandHeader.getContextLength()]; + in.readBytes(context); + commandHeader.setContext(context); checkpoint(State.BODY_LENGTH); + // fallthru case BODY_LENGTH: commandHeader.setBodyLength(in.readInt()); checkpoint(State.BODY); + // fallthru case BODY: byte[] body = new byte[commandHeader.getBodyLength()]; in.readBytes(body); @@ -71,6 +90,7 @@ public class NettyDecoder extends ReplayingDecoder { Command packet = new Command(); packet.setType(commandType(commandHeader.getType())); packet.setOpaque(commandHeader.getOpaque()); + packet.setContext(CommandContext.valueOf(commandHeader.getContext())); packet.setBody(body); out.add(packet); // @@ -105,10 +125,23 @@ public class NettyDecoder extends ReplayingDecoder { } } + /** + * check version + * @param version + */ + private void checkVersion(byte version) { + if (version != Command.VERSION) { + throw new IllegalArgumentException("illegal protocol [version]" + version); + } + } + enum State{ MAGIC, + VERSION, COMMAND, OPAQUE, + CONTEXT_LENGTH, + CONTEXT, BODY_LENGTH, BODY; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java index 4e9836a26..785ee5aaf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java @@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder { throw new Exception("encode msg is null"); } out.writeByte(Command.MAGIC); + out.writeByte(Command.VERSION); out.writeByte(msg.getType().ordinal()); out.writeLong(msg.getOpaque()); + writeContext(msg, out); out.writeInt(msg.getBody().length); out.writeBytes(msg.getBody()); } + private void writeContext(Command msg, ByteBuf out) { + byte[] headerBytes = msg.getContext().toBytes(); + out.writeInt(headerBytes.length); + out.writeBytes(headerBytes); + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java index ed46e1ff5..9baa321a9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java @@ -28,6 +28,7 @@ public class Command implements Serializable { private static final AtomicLong REQUEST_ID = new AtomicLong(1); public static final byte MAGIC = (byte) 0xbabe; + public static final byte VERSION = 0; public Command(){ this.opaque = REQUEST_ID.getAndIncrement(); @@ -47,6 +48,11 @@ public class Command implements Serializable { */ private long opaque; + /** + * request context + */ + private CommandContext context = new CommandContext(); + /** * data body */ @@ -76,6 +82,14 @@ public class Command implements Serializable { this.body = body; } + public CommandContext getContext() { + return context; + } + + public void setContext(CommandContext context) { + this.context = context; + } + @Override public int hashCode() { final int prime = 31; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java new file mode 100644 index 000000000..c9febee6f --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * command context + */ +public class CommandContext implements Serializable { + + private Map items = new LinkedHashMap<>(); + + public Map getItems() { + return items; + } + + public void setItems(Map items) { + this.items = items; + } + + public void put(String key, String value) { + items.put(key, value); + } + + public String get(String key) { + return items.get(key); + } + + public byte[] toBytes() { + return JSONUtils.toJsonByteArray(this); + } + + public static CommandContext valueOf(byte[] src) { + return JSONUtils.parseObject(src, CommandContext.class); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java index 78948a5c0..9e83a426f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java @@ -33,6 +33,16 @@ public class CommandHeader implements Serializable { */ private long opaque; + /** + * context length + */ + private int contextLength; + + /** + * context + */ + private byte[] context; + /** * body length */ @@ -61,4 +71,20 @@ public class CommandHeader implements Serializable { public void setOpaque(long opaque) { this.opaque = opaque; } + + public int getContextLength() { + return contextLength; + } + + public void setContextLength(int contextLength) { + this.contextLength = contextLength; + } + + public byte[] getContext() { + return context; + } + + public void setContext(byte[] context) { + this.context = context; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index a407e4ea9..23255084e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -50,13 +50,13 @@ import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskPriority; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import java.util.ArrayList; @@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread { * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService @@ -119,7 +119,7 @@ public class TaskPriorityQueueConsumer extends Thread { @Override public void run() { - List failedDispatchTasks = new ArrayList<>(); + List failedDispatchTasks = new ArrayList<>(); while (Stopper.isRunning()) { try { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); @@ -130,15 +130,14 @@ public class TaskPriorityQueueConsumer extends Thread { continue; } // if not task , blocking here - String taskPriorityInfo = taskPriorityQueue.take(); - TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); - boolean dispatchResult = dispatch(taskPriority.getTaskId()); + TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { - failedDispatchTasks.add(taskPriorityInfo); + failedDispatchTasks.add(taskPriority); } } if (!failedDispatchTasks.isEmpty()) { - for (String dispatchFailedTask : failedDispatchTasks) { + for (TaskPriority dispatchFailedTask : failedDispatchTasks) { taskPriorityQueue.put(dispatchFailedTask); } // If there are tasks in a cycle that cannot find the worker group, @@ -157,12 +156,13 @@ public class TaskPriorityQueueConsumer extends Thread { /** * dispatch task * - * @param taskInstanceId taskInstanceId + * @param taskPriority taskPriority * @return result */ - protected boolean dispatch(int taskInstanceId) { + protected boolean dispatch(TaskPriority taskPriority) { boolean result = false; try { + int taskInstanceId = taskPriority.getTaskId(); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f5c3708af..fcff67f15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,8 +16,6 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -27,17 +25,15 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import java.util.concurrent.Callable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; import java.util.Date; import java.util.concurrent.Callable; @@ -217,14 +213,14 @@ public class MasterBaseTaskExecThread implements Callable { logger.info("task ready to submit: {}", taskInstance); /** - * taskPriorityInfo + * taskPriority */ - String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(), + TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); - taskUpdateQueue.put(taskPriorityInfo); + taskUpdateQueue.put(taskPriority); logger.info(String.format("master submit success, task : %s", taskInstance.getName()) ); return true; }catch (Exception e){ @@ -235,29 +231,22 @@ public class MasterBaseTaskExecThread implements Callable { } /** - * buildTaskPriorityInfo + * buildTaskPriority * * @param processInstancePriority processInstancePriority * @param processInstanceId processInstanceId * @param taskInstancePriority taskInstancePriority * @param taskInstanceId taskInstanceId * @param workerGroup workerGroup - * @return TaskPriorityInfo + * @return TaskPriority */ - private String buildTaskPriorityInfo(int processInstancePriority, - int processInstanceId, - int taskInstancePriority, - int taskInstanceId, - String workerGroup) { - return processInstancePriority + - UNDERLINE + - processInstanceId + - UNDERLINE + - taskInstancePriority + - UNDERLINE + - taskInstanceId + - UNDERLINE + - workerGroup; + private TaskPriority buildTaskPriority(int processInstancePriority, + int processInstanceId, + int taskInstancePriority, + int taskInstanceId, + String workerGroup) { + return new TaskPriority(processInstancePriority, processInstanceId, + taskInstancePriority, taskInstanceId, workerGroup); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 30dd0f9f1..b0e0528c3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -124,52 +124,57 @@ public class MasterSchedulerService extends Thread { public void run() { logger.info("master scheduler started"); while (Stopper.isRunning()){ - InterProcessMutex mutex = null; try { boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); - if(!runCheckFlag) { + if (!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { + scheduleProcess(); + } + } catch (Exception e) { + logger.error("master scheduler thread error", e); + } + } + } - mutex = zkMasterClient.blockAcquireMutex(); - - int activeCount = masterExecService.getActiveCount(); - // make sure to scan and delete command table in one transaction - Command command = processService.findOneCommand(); - if (command != null) { - logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); - - try{ - - ProcessInstance processInstance = processService.handleCommand(logger, - getLocalAddress(), - this.masterConfig.getMasterExecThreads() - activeCount, command); - if (processInstance != null) { - logger.info("start master exec thread , split DAG ..."); - masterExecService.execute( - new MasterExecThread( - processInstance - , processService - , nettyRemotingClient - , alertManager - , masterConfig)); - } - }catch (Exception e){ - logger.error("scan command error ", e); - processService.moveToErrorCommand(command, e.toString()); - } - } else{ - //indicate that no command ,sleep for 1s - Thread.sleep(Constants.SLEEP_TIME_MILLIS); + private void scheduleProcess() throws Exception { + InterProcessMutex mutex = null; + try { + mutex = zkMasterClient.blockAcquireMutex(); + + int activeCount = masterExecService.getActiveCount(); + // make sure to scan and delete command table in one transaction + Command command = processService.findOneCommand(); + if (command != null) { + logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); + + try { + + ProcessInstance processInstance = processService.handleCommand(logger, + getLocalAddress(), + this.masterConfig.getMasterExecThreads() - activeCount, command); + if (processInstance != null) { + logger.info("start master exec thread , split DAG ..."); + masterExecService.execute( + new MasterExecThread( + processInstance + , processService + , nettyRemotingClient + , alertManager + , masterConfig)); } + } catch (Exception e) { + logger.error("scan command error ", e); + processService.moveToErrorCommand(command, e.toString()); } - } catch (Exception e){ - logger.error("master scheduler thread error",e); - } finally{ - zkMasterClient.releaseMutex(mutex); + } else { + //indicate that no command ,sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } + } finally { + zkMasterClient.releaseMutex(mutex); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 049e30e73..8c2321dd8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; @@ -72,7 +73,7 @@ public class TaskPriorityQueueConsumerTest { @Autowired - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueue; @Autowired private TaskPriorityQueueConsumer taskPriorityQueueConsumer; @@ -142,9 +143,8 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -180,7 +180,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(1); @@ -243,7 +244,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(80); @@ -310,7 +312,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(1); @@ -402,7 +405,8 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -455,7 +459,9 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - boolean res = taskPriorityQueueConsumer.dispatch(1); + TaskPriority taskPriority = new TaskPriority(); + taskPriority.setTaskId(1); + boolean res = taskPriorityQueueConsumer.dispatch(taskPriority); Assert.assertFalse(res); } @@ -649,7 +655,8 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + taskPriorityQueue.put(taskPriority); taskPriorityQueueConsumer.run(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java similarity index 56% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java index 991eeed49..a872f6db9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.service.queue; -import static org.apache.dolphinscheduler.common.Constants.*; +import java.util.Map; +import java.util.Objects; /** * task priority info */ -public class TaskPriority { +public class TaskPriority implements Comparable { /** * processInstancePriority @@ -50,9 +51,9 @@ public class TaskPriority { private String groupName; /** - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName} + * context */ - private String taskPriorityInfo; + private Map context; public TaskPriority(){} @@ -65,15 +66,6 @@ public class TaskPriority { this.taskInstancePriority = taskInstancePriority; this.taskId = taskId; this.groupName = groupName; - this.taskPriorityInfo = this.processInstancePriority + - UNDERLINE + - this.processInstanceId + - UNDERLINE + - this.taskInstancePriority + - UNDERLINE + - this.taskId + - UNDERLINE + - this.groupName; } public int getProcessInstancePriority() { @@ -104,6 +96,10 @@ public class TaskPriority { return taskId; } + public Map getContext() { + return context; + } + public void setTaskId(int taskId) { this.taskId = taskId; } @@ -116,32 +112,61 @@ public class TaskPriority { this.groupName = groupName; } - public String getTaskPriorityInfo() { - return taskPriorityInfo; + public void setContext(Map context) { + this.context = context; } - public void setTaskPriorityInfo(String taskPriorityInfo) { - this.taskPriorityInfo = taskPriorityInfo; - } + @Override + public int compareTo(TaskPriority other) { + if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) { + return 1; + } + if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) { + return -1; + } - /** - * taskPriorityInfo convert taskPriority - * - * @param taskPriorityInfo taskPriorityInfo - * @return TaskPriority - */ - public static TaskPriority of(String taskPriorityInfo){ - String[] parts = taskPriorityInfo.split(UNDERLINE); + if (this.getProcessInstanceId() > other.getProcessInstanceId()) { + return 1; + } + if (this.getProcessInstanceId() < other.getProcessInstanceId()) { + return -1; + } + + if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) { + return 1; + } + if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) { + return -1; + } + + if (this.getTaskId() > other.getTaskId()) { + return 1; + } + if (this.getTaskId() < other.getTaskId()) { + return -1; + } - if (parts.length != 5) { - throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo)); + return this.getGroupName().compareTo(other.getGroupName()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; } - TaskPriority taskPriority = new TaskPriority( - Integer.parseInt(parts[0]), - Integer.parseInt(parts[1]), - Integer.parseInt(parts[2]), - Integer.parseInt(parts[3]), - parts[4]); - return taskPriority; + TaskPriority that = (TaskPriority) o; + return processInstancePriority == that.processInstancePriority + && processInstanceId == that.processInstanceId + && taskInstancePriority == that.taskInstancePriority + && taskId == that.taskId + && Objects.equals(groupName, that.groupName); + } + + @Override + public int hashCode() { + return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index aefad8499..694d4c476 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -17,24 +17,18 @@ package org.apache.dolphinscheduler.service.queue; -import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; - import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; -import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; import org.springframework.stereotype.Service; - - /** * A singleton of a task queue implemented with zookeeper * tasks queue implementation */ @Service -public class TaskPriorityQueueImpl implements TaskPriorityQueue { +public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue size */ @@ -43,7 +37,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue */ - private PriorityBlockingQueue queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private PriorityBlockingQueue queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE); /** * put task takePriorityInfo @@ -52,7 +46,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * @throws TaskPriorityQueueException */ @Override - public void put(String taskPriorityInfo) throws TaskPriorityQueueException { + public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException { queue.put(taskPriorityInfo); } @@ -63,7 +57,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * @throws TaskPriorityQueueException */ @Override - public String take() throws TaskPriorityQueueException, InterruptedException { + public TaskPriority take() throws TaskPriorityQueueException, InterruptedException { return queue.take(); } @@ -77,36 +71,4 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { public int size() throws TaskPriorityQueueException { return queue.size(); } - - /** - * TaskInfoComparator - */ - private class TaskInfoComparator implements Comparator { - - /** - * compare o1 o2 - * - * @param o1 o1 - * @param o2 o2 - * @return compare result - */ - @Override - public int compare(String o1, String o2) { - String s1 = o1; - String s2 = o2; - String[] s1Array = s1.split(UNDERLINE); - if (s1Array.length > TASK_INFO_LENGTH) { - // warning: if this length > 5, need to be changed - s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE)); - } - - String[] s2Array = s2.split(UNDERLINE); - if (s2Array.length > TASK_INFO_LENGTH) { - // warning: if this length > 5, need to be changed - s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE)); - } - - return s1.compareTo(s2); - } - } } diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java new file mode 100644 index 000000000..151177016 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue; + +import org.apache.dolphinscheduler.service.queue.TaskPriority; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +public class TaskPriorityTest { + + @Test + public void testSort() { + TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default"); + TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default"); + TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default"); + List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 1, 0, 0, "default"); + priorityTwo = new TaskPriority(0, 2, 0, 0, "default"); + priorityThree = new TaskPriority(0, 3, 0, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 1, 0, "default"); + priorityTwo = new TaskPriority(0, 0, 2, 0, "default"); + priorityThree = new TaskPriority(0, 0, 3, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 1, "default"); + priorityTwo = new TaskPriority(0, 0, 0, 2, "default"); + priorityThree = new TaskPriority(0, 0, 0, 3, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, "default_3"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + } +} diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java index ca6c083a6..2c13afa22 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java @@ -17,6 +17,7 @@ package queue; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.junit.Test; @@ -31,19 +32,16 @@ public class TaskUpdateQueueTest { @Test public void testQueue() throws Exception{ - // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName} - /** * 1_1_2_1_default * 1_1_2_2_default * 1_1_0_3_default * 1_1_0_4_default */ - - String taskInfo1 = "1_1_2_1_default"; - String taskInfo2 = "1_1_2_2_default"; - String taskInfo3 = "1_1_0_3_default"; - String taskInfo4 = "1_1_0_4_default"; + TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default"); + TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default"); + TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default"); + TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default"); TaskPriorityQueue queue = new TaskPriorityQueueImpl(); queue.put(taskInfo1); @@ -51,9 +49,9 @@ public class TaskUpdateQueueTest { queue.put(taskInfo3); queue.put(taskInfo4); - assertEquals("1_1_0_3_default", queue.take()); - assertEquals("1_1_0_4_default", queue.take()); - assertEquals("1_1_2_1_default",queue.take()); - assertEquals("1_1_2_2_default",queue.take()); + assertEquals(taskInfo3, queue.take()); + assertEquals(taskInfo4, queue.take()); + assertEquals(taskInfo1, queue.take()); + assertEquals(taskInfo2, queue.take()); } } -- GitLab