未验证 提交 513eb769 编写于 作者: L luoyuan 提交者: GitHub

[Improvement-5147][Service]This judgment always returns true (#5156)

* this expression which always evaluates "true"

* delete unused variables

* LogClientService add AutoCloseable

* reformat code

* reformat code

* reformat code

* add log in ProcessService

* reformat code

* Optimize the code

* Optimize the code

* [improvement]modify all methods that refer to LogClientService in the code.

* delete unused code
上级 fe144e46
......@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
......@@ -283,7 +282,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
// send warn mail
ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return true;
......
......@@ -442,17 +442,11 @@ public class ProcessUtils {
public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null;
String log;
try {
logClient = new LogClientService();
try (LogClientService logClient = new LogClientService()) {
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
Constants.RPC_PORT,
taskExecutionContext.getLogPath());
} finally {
if (logClient != null) {
logClient.close();
}
}
if (StringUtils.isNotEmpty(log)) {
List<String> appIds = LoggerUtils.getAppIds(log, logger);
......
......@@ -97,7 +97,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Pair<Boolean, List<String>> result = doKill(killCommand);
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
new NettyRemoteChannel(channel, command.getOpaque()));
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
......@@ -107,7 +107,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* do kill
*
* @param killCommand
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
......@@ -148,7 +147,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* build TaskKillResponseCommand
*
* @param killCommand kill command
* @param result exe result
* @param result exe result
* @return build TaskKillResponseCommand
*/
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
......@@ -168,16 +167,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* kill yarn job
*
* @param host host
* @param logPath logPath
* @param host host
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @return Pair<Boolean, List<String>> yarn kill result
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
*/
private Pair<Boolean, List<String>> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
LogClientService logClient = null;
try {
logClient = new LogClientService();
try (LogClientService logClient = new LogClientService();) {
logger.info("view log host : {},logPath : {}", host, logPath);
String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
List<String> appIds = Collections.emptyList();
......@@ -194,10 +191,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return Pair.of(true, appIds);
} catch (Exception e) {
logger.error("kill yarn job error", e);
} finally {
if (logClient != null) {
logClient.close();
}
}
return Pair.of(false, Collections.emptyList());
}
......
......@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* log client
*/
public class LogClientService {
public class LogClientService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
......@@ -67,6 +67,7 @@ public class LogClientService {
/**
* close
*/
@Override
public void close() {
this.client.close();
this.isRunning = false;
......
......@@ -384,17 +384,11 @@ public class ProcessService {
* @param processInstanceId processInstanceId
*/
public void removeTaskLogFile(Integer processInstanceId) {
LogClientService logClient = null;
try {
logClient = new LogClientService();
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
try (LogClientService logClient = new LogClientService()) {
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (StringUtils.isEmpty(taskInstance.getHost())) {
......@@ -408,14 +402,9 @@ public class ProcessService {
// compatible old version
ip = taskInstance.getHost();
}
// remove task log from loggerserver
logClient.removeTaskLog(ip, port, taskLogPath);
}
} finally {
if (logClient != null) {
logClient.close();
}
}
}
......@@ -435,7 +424,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
* @param ids ids
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
......@@ -466,7 +455,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
......@@ -518,7 +507,7 @@ public class ProcessService {
/**
* get schedule time from command
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return date
*/
......@@ -534,8 +523,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
* @param command command
* @param cmdParam cmdParam map
* @param command command
* @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
......@@ -569,7 +558,7 @@ public class ProcessService {
processInstance.setConnects(processDefinition.getConnects());
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition,cmdParam);
setGlobalParamIfCommanded(processDefinition, cmdParam);
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
......@@ -621,7 +610,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
* @param userId userId
* @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
......@@ -644,7 +633,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
......@@ -664,7 +653,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
* @param host host
* @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host) {
......@@ -827,7 +816,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
* @param command command
* @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
......@@ -842,8 +831,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
* @param processInstance processInstance
* @param cmdParam cmdParam
* @param processInstance processInstance
* @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
......@@ -915,7 +904,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams
* @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
......@@ -985,7 +974,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
......@@ -1014,7 +1003,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
......@@ -1040,7 +1029,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
* @param task task
* @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
......@@ -1067,7 +1056,7 @@ public class ProcessService {
/**
* complement data needs transform parent parameter to child.
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,Map<String,String> fatherParams) {
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, Map<String, String> fatherParams) {
// set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
......@@ -1111,13 +1100,13 @@ public class ProcessService {
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
Map<String,String> fatherParams = new HashMap<>();
Map<String, String> fatherParams = new HashMap<>();
if (CollectionUtils.isNotEmpty(allParam)) {
for (Property info : allParam) {
fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
}
}
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance,fatherParams);
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
return new Command(
commandType,
......@@ -1163,7 +1152,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId
* @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
......@@ -1177,7 +1166,7 @@ public class ProcessService {
/**
* submit task to mysql
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
......@@ -1231,7 +1220,7 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
......@@ -1407,7 +1396,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
......@@ -1462,7 +1451,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
......@@ -1484,7 +1473,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
......@@ -1516,12 +1505,12 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param startTime startTime
* @param host host
* @param state state
* @param startTime startTime
* @param host host
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
* @param logPath logPath
* @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
......@@ -1549,12 +1538,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String processJson,
......@@ -1575,10 +1564,10 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param endTime endTime
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
......@@ -1746,7 +1735,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
......@@ -1783,7 +1772,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
* @param resName resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
......@@ -1821,9 +1810,9 @@ public class ProcessService {
/**
* get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId
* @param masterId masterId
* @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
......@@ -1836,8 +1825,8 @@ public class ProcessService {
/**
* get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId
* @param ids ids
* @param masterId masterId
* @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
......@@ -1932,8 +1921,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionId process definition id
* @param startTime start time
* @param endTime end time
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
......@@ -2033,7 +2022,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
* @param userId user id
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
......@@ -2124,8 +2113,6 @@ public class ProcessService {
/**
* solve the branch rename bug
*
* @param processData
* @param oldJson
* @return String
*/
public String changeJson(ProcessData processData, String oldJson) {
......@@ -2180,6 +2167,7 @@ public class ProcessService {
/**
* add authorized resources
*
* @param ownResources own resources
* @param userId userId
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册