未验证 提交 bc1495f2 编写于 作者: W Wenjun Ruan 提交者: GitHub

Refactor log (#148)

上级 434b43a2
......@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.RollViewLogResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.utils.Result;
......@@ -75,10 +76,10 @@ public class LoggerController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
public Result<RollViewLogResponse> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
......@@ -128,12 +129,12 @@ public class LoggerController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<RollViewLogResponse> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit));
return Result.success(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit));
}
/**
......
package org.apache.dolphinscheduler.api.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RollViewLogResponse {
/**
* Current log message
*/
private String log;
/**
* Current log line number
*/
private long currentLogLineNumber;
/**
* False means there are no extra log.
*/
private boolean hasNext;
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.RollViewLogResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
......@@ -35,7 +36,7 @@ public interface LoggerService {
* @param limit limit
* @return log string data
*/
Result<String> queryLog(int taskInstId, int skipLineNum, int limit);
Result<RollViewLogResponse> queryLog(int taskInstId, int skipLineNum, int limit);
/**
* get log size
......@@ -55,7 +56,7 @@ public interface LoggerService {
* @param limit limit
* @return log string data
*/
Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);
RollViewLogResponse queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);
/**
* get log bytes
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import com.google.common.primitives.Bytes;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.RollViewLogResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService;
......@@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -73,14 +75,14 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
/**
* view log
*
* @param taskInstId task instance id
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
public Result<RollViewLogResponse> queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
......@@ -90,10 +92,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
if (StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance, skipLineNum, limit);
result.setData(log);
return result;
RollViewLogResponse rollViewLogResponse = queryLog(taskInstance, skipLineNum, limit);
return Result.success(rollViewLogResponse);
}
/**
......@@ -123,28 +123,24 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
*/
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
public RollViewLogResponse queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
throw new ServiceException((Status) result.get(Constants.STATUS));
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND);
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
return result;
throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
}
String log = queryLog(task, skipLineNum, limit);
result.put(Constants.DATA_LIST, log);
return result;
return queryLog(task, skipLineNum, limit);
}
/**
......@@ -179,30 +175,34 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
/**
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
private RollViewLogResponse queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
Host host = Host.of(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
host.getPort());
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
String head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR);
String head = String.format(LOG_HEAD_FORMAT, taskInstance.getLogPath(), host, Constants.SYSTEM_LINE_SEPARATOR);
log.append(head);
}
log.append(logClient
.rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));
return log.toString();
RollViewLogResponseCommand rollViewLogResponseCommand = logClient.rollViewLog(host, taskInstance.getLogPath(), skipLineNum, limit);
if (rollViewLogResponseCommand.getResponseStatus() != RollViewLogResponseCommand.Status.SUCCESS) {
log.append(rollViewLogResponseCommand.getResponseStatus().getDesc());
return RollViewLogResponse.builder()
.log(log.toString())
.hasNext(false)
.build();
}
log.append(rollViewLogResponseCommand.getLog());
// If the task doesn't finish or the log doesn't end can query next
return RollViewLogResponse.builder()
.log(log.toString())
.currentLogLineNumber(rollViewLogResponseCommand.getCurrentLineNumber())
.hasNext(!taskInstance.getState().typeIsFinished()
|| rollViewLogResponseCommand.getCurrentLineNumber() < rollViewLogResponseCommand.getCurrentTotalLineNumber())
.build();
}
/**
......
......@@ -21,6 +21,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.RollViewLogResponse;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
......@@ -369,11 +370,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
for (TaskInstance taskInstance : taskInstanceList) {
if (TASK_TYPE_DEPENDENT.equalsIgnoreCase(taskInstance.getTaskType())) {
Result<String> logResult = loggerService.queryLog(
taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
Result<RollViewLogResponse> logResult = loggerService.queryLog(taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
if (logResult.getCode() == Status.SUCCESS.ordinal()) {
String log = logResult.getData();
Map<String, DependResult> resultMap = parseLogForDependentResult(log);
RollViewLogResponse response = logResult.getData();
Map<String, DependResult> resultMap = parseLogForDependentResult(response.getLog());
taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap));
}
}
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.RollViewLogResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
......@@ -154,8 +155,8 @@ public class LoggerServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
RollViewLogResponse rollViewLogResponse = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assert.assertNotNull(rollViewLogResponse);
}
@Test
......
......@@ -47,7 +47,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -95,22 +94,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
case ROLL_VIEW_LOG_REQUEST:
RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject(
command.getBody(), RollViewLogRequestCommand.class);
String rollViewLogPath = rollViewLogRequest.getPath();
if (!checkPathSecurity(rollViewLogPath)) {
throw new IllegalArgumentException("Illegal path");
}
List<String> lines = readPartFileContent(rollViewLogPath,
rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
StringBuilder builder = new StringBuilder();
for (String line : lines) {
builder.append(line).append("\r\n");
}
RollViewLogResponseCommand rollViewLogRequestResponse =
new RollViewLogResponseCommand(builder.toString());
RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject(command.getBody(), RollViewLogRequestCommand.class);
// todo: solve the NPE, this shouldn't happen in normal case
RollViewLogResponseCommand rollViewLogRequestResponse = readPartFileContent(rollViewLogRequest);
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
case REMOVE_TAK_LOG_REQUEST:
......@@ -193,28 +179,34 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
return new byte[0];
}
/**
* read part file content,can skip any line and read some lines
*
* @param filePath file path
* @param skipLine skip line
* @param limit read lines limit
* @return part file content
*/
private List<String> readPartFileContent(String filePath,
int skipLine,
int limit) {
File file = new File(filePath);
if (file.exists() && file.isFile()) {
try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
} catch (IOException e) {
logger.error("read file error", e);
}
} else {
logger.info("file path: {} not exists", filePath);
private RollViewLogResponseCommand readPartFileContent(RollViewLogRequestCommand rollViewLogRequest) {
String rollViewLogPath = rollViewLogRequest.getPath();
if (!checkPathSecurity(rollViewLogPath)) {
logger.error("Log file path: {} is not a security path", rollViewLogPath);
return RollViewLogResponseCommand.error(RollViewLogResponseCommand.Status.LOG_PATH_IS_NOT_SECURITY);
}
File file = new File(rollViewLogPath);
if (!file.exists() || file.isFile()) {
logger.error("Log file path: {} doesn't exists", rollViewLogPath);
return RollViewLogResponseCommand.error(RollViewLogResponseCommand.Status.LOG_FILE_NOT_FOUND);
}
int skipLine = rollViewLogRequest.getSkipLineNum();
int limit = rollViewLogRequest.getLimit();
try (Stream<String> stream = Files.lines(Paths.get(rollViewLogPath))) {
List<String> lines = stream.skip(skipLine).limit(limit).collect(Collectors.toList());
long totalLineNumber = stream.count();
return RollViewLogResponseCommand.builder()
.currentLineNumber(skipLine + lines.size())
.currentTotalLineNumber(totalLineNumber)
.log(String.join("\r\n", lines))
.build();
} catch (IOException e) {
logger.error("Rolling view log error, meet an unknown exception, request: {}", rollViewLogRequest, e);
return RollViewLogResponseCommand.error(RollViewLogResponseCommand.Status.UNKNOWN_ERROR);
}
return Collections.emptyList();
}
}
......@@ -17,15 +17,20 @@
package org.apache.dolphinscheduler.remote.command.log;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
/**
* roll view log request command
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RollViewLogRequestCommand implements Serializable {
/**
......@@ -43,39 +48,6 @@ public class RollViewLogRequestCommand implements Serializable {
*/
private int limit;
public RollViewLogRequestCommand() {
}
public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
this.path = path;
this.skipLineNum = skipLineNum;
this.limit = limit;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public int getSkipLineNum() {
return skipLineNum;
}
public void setSkipLineNum(int skipLineNum) {
this.skipLineNum = skipLineNum;
}
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
/**
* package request command
*
......
......@@ -17,43 +17,38 @@
package org.apache.dolphinscheduler.remote.command.log;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
/**
* roll view log response command
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RollViewLogResponseCommand implements Serializable {
/**
* response data
*/
private String msg;
private String log;
public RollViewLogResponseCommand() {
}
@Builder.Default
private Status responseStatus = Status.SUCCESS;
public RollViewLogResponseCommand(String msg) {
this.msg = msg;
}
private long currentLineNumber;
private long currentTotalLineNumber;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
public static RollViewLogResponseCommand error(Status status) {
RollViewLogResponseCommand rollViewLogResponseCommand = new RollViewLogResponseCommand();
rollViewLogResponseCommand.setResponseStatus(status);
return rollViewLogResponseCommand;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque) {
Command command = new Command(opaque);
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
......@@ -61,4 +56,22 @@ public class RollViewLogResponseCommand implements Serializable {
command.setBody(body);
return command;
}
public enum Status {
SUCCESS("success"),
LOG_PATH_IS_NOT_SECURITY("Log file path is not at a security directory"),
LOG_FILE_NOT_FOUND("Log file doesn't exist"),
UNKNOWN_ERROR("Meet an unknown exception"),
;
private final String desc;
Status(String desc) {
this.desc = desc;
}
public String getDesc() {
return desc;
}
}
}
......@@ -63,31 +63,24 @@ public class LogClient implements AutoCloseable {
/**
* roll view log
*
* @param host host
* @param port port
* @param path path
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @param limit limit
* @return log content
*/
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
skipLineNum, limit);
public RollViewLogResponseCommand rollViewLog(Host host, String path, int skipLineNum, int limit) {
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
Command response = client.sendSync(host, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RollViewLogResponseCommand rollReviewLog =
JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
return JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
}
return "Roll view log response is null";
logger.error("Roll view log response is null, request: {}", request);
return RollViewLogResponseCommand.error(RollViewLogResponseCommand.Status.UNKNOWN_ERROR);
} catch (Exception e) {
logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port,
path, skipLineNum, limit, e);
return "Roll view log error: " + e.getMessage();
logger.error("Roll view log failed, meet an unknown exception: {}", request, e);
return RollViewLogResponseCommand.error(RollViewLogResponseCommand.Status.UNKNOWN_ERROR);
}
}
......
......@@ -27,9 +27,6 @@ import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseComma
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Test.None;
......@@ -39,6 +36,8 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.charset.StandardCharsets;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
public class LogClientTest {
......@@ -97,13 +96,16 @@ public class LogClientTest {
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonByteArray(new RollViewLogResponseCommand("success")));
RollViewLogResponseCommand rollViewLogResponseCommand = RollViewLogResponseCommand.builder()
.log("success")
.build();
command.setBody(JSONUtils.toJsonByteArray(rollViewLogResponseCommand));
PowerMockito
.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClient logClient = new LogClient();
String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
RollViewLogResponseCommand msg = logClient.rollViewLog(Host.of("localhost:1234"), "/tmp/log", 0, 10);
Assert.assertNotNull(msg);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册