diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java index a5b8176a489c66772d5578d938ee5fdf1412cfc4..7d612b8b1d306b6d4bb248f5b515e76a0933fb0e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java @@ -17,25 +17,34 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_LOG_ERROR; + import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.User; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; -import springfox.documentation.annotations.ApiIgnore; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; -import static org.apache.dolphinscheduler.api.enums.Status.*; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import springfox.documentation.annotations.ApiIgnore; /** @@ -70,7 +79,7 @@ public class LoggerController extends BaseController { @GetMapping(value = "/detail") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) - public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "taskInstanceId") int taskInstanceId, @RequestParam(value = "skipLineNum") int skipNum, @RequestParam(value = "limit") int limit) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java index 90d1afea49f3d1abab84973ad4c47d1af341894c..cd6ac2b622d4b1caf273b280bae4b4a7f7aa800b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java @@ -18,17 +18,18 @@ package org.apache.dolphinscheduler.api.exceptions; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Result; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestControllerAdvice; import org.springframework.web.method.HandlerMethod; /** * Exception Handler */ -@ControllerAdvice +@RestControllerAdvice @ResponseBody public class ApiExceptionHandler { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index 3c7b421d5e259a546e37ec00b9782c76568c34c8..14440ee61ee7dd00ea75e7f94943ebabc860ebf9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -16,117 +16,30 @@ */ package org.apache.dolphinscheduler.api.service; -import java.nio.charset.StandardCharsets; -import javax.annotation.PreDestroy; -import org.apache.commons.lang.ArrayUtils; -import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.log.LogClientService; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; /** * log service */ -@Service -public class LoggerService { +public interface LoggerService { + + /** + * view log + * + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + Result queryLog(int taskInstId, int skipLineNum, int limit); + + + /** + * get log size + * + * @param taskInstId task instance id + * @return log byte array + */ + byte[] getLogBytes(int taskInstId); - private static final Logger logger = LoggerFactory.getLogger(LoggerService.class); - - private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s"; - - @Autowired - private ProcessService processService; - - private final LogClientService logClient; - - public LoggerService() { - logClient = new LogClientService(); - } - - @PreDestroy - public void close() { - logClient.close(); - } - - /** - * view log - * - * @param taskInstId task instance id - * @param skipLineNum skip line number - * @param limit limit - * @return log string data - */ - public Result queryLog(int taskInstId, int skipLineNum, int limit) { - - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - - if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { - return Result.error(Status.TASK_INSTANCE_NOT_FOUND); - } - - String host = getHost(taskInstance.getHost()); - - Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); - - logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), - Constants.RPC_PORT); - - StringBuilder log = new StringBuilder(); - if (skipLineNum == 0) { - String head = String.format(LOG_HEAD_FORMAT, - taskInstance.getLogPath(), - host, - Constants.SYSTEM_LINE_SEPARATOR); - log.append(head); - } - - log.append(logClient - .rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit)); - - result.setData(log); - return result; - } - - - /** - * get log size - * - * @param taskInstId task instance id - * @return log byte array - */ - public byte[] getLogBytes(int taskInstId) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { - throw new RuntimeException("task instance is null or host is null"); - } - String host = getHost(taskInstance.getHost()); - byte[] head = String.format(LOG_HEAD_FORMAT, - taskInstance.getLogPath(), - host, - Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); - return ArrayUtils.addAll(head, - logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath())); - } - - - /** - * get host - * - * @param address address - * @return old version return true ,otherwise return false - */ - private String getHost(String address) { - if (Host.isOldVersion(address)) { - return address; - } - return Host.of(address).getIp(); - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..c71f2980f52636c33947b16791ca1d69c95f9c3f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.LoggerService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.log.LogClientService; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.apache.commons.lang.ArrayUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * log service + */ +@Service +public class LoggerServiceImpl implements LoggerService { + + private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class); + + private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s"; + + @Autowired + private ProcessService processService; + + private LogClientService logClient; + + @PostConstruct + public void init() { + if (Objects.isNull(this.logClient)) { + this.logClient = new LogClientService(); + } + } + + @PreDestroy + public void close() { + if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) { + logClient.close(); + } + } + + /** + * view log + * + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + @SuppressWarnings("unchecked") + public Result queryLog(int taskInstId, int skipLineNum, int limit) { + + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + + if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { + return Result.error(Status.TASK_INSTANCE_NOT_FOUND); + } + + String host = getHost(taskInstance.getHost()); + + Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); + + logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), + Constants.RPC_PORT); + + StringBuilder log = new StringBuilder(); + if (skipLineNum == 0) { + String head = String.format(LOG_HEAD_FORMAT, + taskInstance.getLogPath(), + host, + Constants.SYSTEM_LINE_SEPARATOR); + log.append(head); + } + + log.append(logClient + .rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit)); + + result.setData(log.toString()); + return result; + } + + + /** + * get log size + * + * @param taskInstId task instance id + * @return log byte array + */ + public byte[] getLogBytes(int taskInstId) { + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { + throw new ServiceException("task instance is null or host is null"); + } + String host = getHost(taskInstance.getHost()); + byte[] head = String.format(LOG_HEAD_FORMAT, + taskInstance.getLogPath(), + host, + Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); + return ArrayUtils.addAll(head, + logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath())); + } + + + /** + * get host + * + * @param address address + * @return old version return true ,otherwise return false + */ + private String getHost(String address) { + if (Boolean.TRUE.equals(Host.isOldVersion(address))) { + return address; + } + return Host.of(address).getIp(); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 4e41ed39b010f272e992cc1770e2c08ea69f032b..3952a25542c07b9ad5bfea8d116168a63e14c928 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -17,10 +17,14 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -32,25 +36,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @RunWith(MockitoJUnitRunner.class) -@PrepareForTest({LoggerService.class}) +@PrepareForTest({LoggerServiceImpl.class}) public class LoggerServiceTest { private static final Logger logger = LoggerFactory.getLogger(LoggerServiceTest.class); @InjectMocks - private LoggerService loggerService; + private LoggerServiceImpl loggerService; @Mock private ProcessService processService; + @Before + public void init() { + this.loggerService.init(); + } + @Test - public void testQueryDataSourceList(){ + public void testQueryDataSourceList() { TaskInstance taskInstance = new TaskInstance(); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); - Result result = loggerService.queryLog(2,1,1); + Result result = loggerService.queryLog(2, 1, 1); //TASK_INSTANCE_NOT_FOUND - Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(),result.getCode().intValue()); + Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); try { //HOST NOT FOUND OR ILLEGAL @@ -59,36 +68,36 @@ public class LoggerServiceTest { Assert.assertTrue(true); logger.error("testQueryDataSourceList error {}", e.getMessage()); } - Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(),result.getCode().intValue()); + Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); //SUCCESS taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); - result = loggerService.queryLog(1,1,1); - Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); + result = loggerService.queryLog(1, 1, 1); + Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } @Test - public void testGetLogBytes(){ + public void testGetLogBytes() { TaskInstance taskInstance = new TaskInstance(); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); //task instance is null - try{ + try { loggerService.getLogBytes(2); - }catch (RuntimeException e){ + } catch (RuntimeException e) { Assert.assertTrue(true); - logger.error("testGetLogBytes error: {}","task instance is null"); + logger.error("testGetLogBytes error: {}", "task instance is null"); } //task instance host is null - try{ + try { loggerService.getLogBytes(1); - }catch (RuntimeException e){ + } catch (RuntimeException e) { Assert.assertTrue(true); - logger.error("testGetLogBytes error: {}","task instance host is null"); + logger.error("testGetLogBytes error: {}", "task instance host is null"); } //success @@ -100,4 +109,9 @@ public class LoggerServiceTest { } + @After + public void close() { + this.loggerService.close(); + } + } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 651964bb1657067237ddeb7e35a3ab2fb5ccc9aa..95ce1794c3ea71f25ff9396ae004318be0a027cd 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -16,16 +16,43 @@ */ package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.io.IOException; +import java.text.MessageFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,22 +60,13 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; -import java.io.IOException; -import java.text.MessageFormat; -import java.text.ParseException; -import java.util.*; - -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.when; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @RunWith(MockitoJUnitRunner.Silent.class) @SpringBootTest(classes = ApiApplicationServer.class) public class ProcessInstanceServiceTest { - private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceServiceTest.class); @InjectMocks ProcessInstanceService processInstanceService; @@ -78,9 +96,7 @@ public class ProcessInstanceServiceTest { TaskInstanceMapper taskInstanceMapper; @Mock - LoggerService loggerService; - - + LoggerServiceImpl loggerService; @Mock UsersService usersService; @@ -153,16 +169,16 @@ public class ProcessInstanceServiceTest { User loginUser = getAdminUser(); Map result = new HashMap<>(5); putMsg(result, Status.PROJECT_NOT_FOUNT, projectName); - int size=10; - String startTime="2020-01-01 00:00:00"; - String endTime="2020-08-02 00:00:00"; + int size = 10; + String startTime = "2020-01-01 00:00:00"; + String endTime = "2020-08-02 00:00:00"; Date start = DateUtils.getScheduleDate(startTime); Date end = DateUtils.getScheduleDate(endTime); //project auth fail when(projectMapper.queryByName(projectName)).thenReturn(null); when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result); - Map proejctAuthFailRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime); + Map proejctAuthFailRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser, projectName, size, startTime, endTime); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS)); //project auth success @@ -176,7 +192,7 @@ public class ProcessInstanceServiceTest { when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); - Map successRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime); + Map successRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser, projectName, size, startTime, endTime); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index 92d38d470a842f292444b6152fc51c989cda230b..474bf12c77474246ab91e8bc1b506d96c12254d9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -16,13 +16,20 @@ */ package org.apache.dolphinscheduler.service.log; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.log.*; +import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +45,10 @@ public class LogClientService { private final NettyRemotingClient client; + private volatile boolean isRunning; + /** - * request time out + * request time out */ private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L; @@ -50,18 +59,21 @@ public class LogClientService { this.clientConfig = new NettyClientConfig(); this.clientConfig.setWorkerThreads(4); this.client = new NettyRemotingClient(clientConfig); + this.isRunning = true; } /** * close */ - public void close() { + public void close() { this.client.close(); + this.isRunning = false; logger.info("logger client closed"); } /** * roll view log + * * @param host host * @param port port * @param path path @@ -69,7 +81,7 @@ public class LogClientService { * @param limit limit * @return log content */ - public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) { + public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) { logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); String result = ""; @@ -77,7 +89,7 @@ public class LogClientService { try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); - if(response != null){ + if (response != null) { RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize( response.getBody(), RollViewLogResponseCommand.class); return rollReviewLog.getMsg(); @@ -92,6 +104,7 @@ public class LogClientService { /** * view log + * * @param host host * @param port port * @param path path @@ -105,7 +118,7 @@ public class LogClientService { try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); - if(response != null){ + if (response != null) { ViewLogResponseCommand viewLog = JsonSerializer.deserialize( response.getBody(), ViewLogResponseCommand.class); return viewLog.getMsg(); @@ -120,6 +133,7 @@ public class LogClientService { /** * get log size + * * @param host host * @param port port * @param path log path @@ -133,7 +147,7 @@ public class LogClientService { try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); - if(response != null){ + if (response != null) { GetLogBytesResponseCommand getLog = JsonSerializer.deserialize( response.getBody(), GetLogBytesResponseCommand.class); return getLog.getData(); @@ -149,6 +163,7 @@ public class LogClientService { /** * remove task log + * * @param host host * @param port port * @param path path @@ -162,7 +177,7 @@ public class LogClientService { try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); - if(response != null){ + if (response != null) { RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize( response.getBody(), RemoveTaskLogResponseCommand.class); return taskLogResponse.getStatus(); @@ -174,4 +189,8 @@ public class LogClientService { } return result; } + + public boolean isRunning() { + return isRunning; + } } \ No newline at end of file