未验证 提交 707e3a6e 编写于 作者: A aaronlinv 提交者: GitHub

[Improvement][Logger] Logger server integrate into master and worker (#6894)

上级 73993e98
......@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
......@@ -192,19 +191,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
return getLogBytes(task);
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
/**
* query log
*
......@@ -214,11 +200,10 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
Host host = Host.of(taskInstance.getHost());
String host = getHost(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
PropertyUtils.getInt(Constants.RPC_PORT, 50051));
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
host.getPort());
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
......@@ -230,7 +215,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
}
log.append(logClient
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));
.rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));
return log.toString();
}
......@@ -242,12 +227,12 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
* @return log byte array
*/
private byte[] getLogBytes(TaskInstance taskInstance) {
String host = getHost(taskInstance.getHost());
Host host = Host.of(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
return Bytes.concat(head,
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
logClient.getLogBytes(host.getIp(), host.getPort(), taskInstance.getLogPath()));
}
}
......@@ -91,5 +91,4 @@ sudo.enable=true
development.state=false
# rpc port
rpc.port=50051
alert.rpc.port=50052
\ No newline at end of file
......@@ -49,41 +49,4 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>dolphinscheduler-logger-server</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>logger-server</finalName>
<descriptors>
<descriptor>src/main/assembly/dolphinscheduler-log-server.xml</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>docker</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.0 http://maven.apache.org/xsd/assembly-2.1.0.xsd">
<id>dolphinscheduler-logger-server</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<baseDirectory>logger-server</baseDirectory>
<fileSets>
<fileSet>
<directory>${basedir}/src/main/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>bin</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>${basedir}/../../dolphinscheduler-common/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>libs</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
if [[ "$DOCKER" == "true" ]]; then
JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport"
fi
java $JAVA_OPTS \
-cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \
org.apache.dolphinscheduler.server.log.LoggerServer
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-jre-slim-buster
ENV DOCKER true
ENV TZ Asia/Shanghai
ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler
RUN apt update ; \
apt install -y curl ; \
rm -rf /var/lib/apt/lists/*
WORKDIR $DOLPHINSCHEDULER_HOME
ADD ./target/logger-server $DOLPHINSCHEDULER_HOME
EXPOSE 50051
CMD [ "/bin/bash", "./bin/start.sh" ]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* logger server
*/
public class LoggerServer {
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
/**
* netty server
*/
private final NettyRemotingServer server;
/**
* netty server config
*/
private final NettyServerConfig serverConfig;
/**
* loggger request processor
*/
private final LoggerRequestProcessor requestProcessor;
public LoggerServer() {
this.serverConfig = new NettyServerConfig();
this.serverConfig.setListenPort(PropertyUtils.getInt(Constants.RPC_PORT, 50051));
this.server = new NettyRemotingServer(serverConfig);
this.requestProcessor = new LoggerRequestProcessor();
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**
* main launches the server from the command line.
* @param args arguments
*/
public static void main(String[] args) {
final LoggerServer server = new LoggerServer();
server.start();
}
/**
* server start
*/
public void start() {
this.server.start();
logger.info("logger server started, listening on port : {}", PropertyUtils.getInt(Constants.RPC_PORT, 50051));
Runtime.getRuntime().addShutdownHook(new Thread(LoggerServer.this::stop));
}
/**
* stop
*/
public void stop() {
this.server.close();
logger.info("logger server shut down");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class LoggerServerTest {
private LoggerServer loggerServer;
private LogClientService logClientService;
@Before
public void startServerAndClient() {
this.loggerServer = new LoggerServer();
this.loggerServer.start();
this.logClientService = new LogClientService();
}
@Test
public void testRollViewLog() throws IOException {
String expectedTmpDemoString = "testRolloViewLog";
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/demo.txt"), expectedTmpDemoString, Charset.defaultCharset());
String resultTmpDemoString = this.logClientService.rollViewLog(
"localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051), "/tmp/demo.txt", 0, 1000);
Assert.assertEquals(expectedTmpDemoString, resultTmpDemoString.replaceAll("[\r|\n|\t]", StringUtils.EMPTY));
FileUtils.deleteFile("/tmp/demo.txt");
}
@Test
public void testRemoveTaskLog() throws IOException {
String expectedTmpRemoveString = "testRemoveTaskLog";
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/remove.txt"), expectedTmpRemoveString, Charset.defaultCharset());
Boolean b = this.logClientService.removeTaskLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/remove.txt");
Assert.assertTrue(b);
String result = this.logClientService.viewLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/demo.txt");
Assert.assertEquals(StringUtils.EMPTY, result);
}
@After
public void stopServerAndClient() {
this.loggerServer.stop();
this.logClientService.close();
}
}
......@@ -82,6 +82,10 @@
<artifactId>dolphinscheduler-worker</artifactId>
<scope>test</scope> <!-- master should never depend on worker, this is only for tests -->
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
......@@ -115,6 +116,14 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
// logger server
LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// self tolerant
......
......@@ -187,9 +187,8 @@ public class ProcessUtils {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
String log;
try (LogClientService logClient = new LogClientService()) {
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
PropertyUtils.getInt(Constants.RPC_PORT, 50051),
taskExecutionContext.getLogPath());
Host host = Host.of(taskExecutionContext.getHost());
log = logClient.viewLog(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
}
if (!StringUtils.isEmpty(log)) {
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
......
......@@ -56,7 +56,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateEx
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
......@@ -523,16 +522,9 @@ public class ProcessService {
if (StringUtils.isEmpty(taskInstance.getHost())) {
continue;
}
int port = PropertyUtils.getInt(Constants.RPC_PORT, 50051);
String ip = "";
try {
ip = Host.of(taskInstance.getHost()).getIp();
} catch (Exception e) {
// compatible old version
ip = taskInstance.getHost();
}
Host host = Host.of(taskInstance.getHost());
// remove task log from loggerserver
logClient.removeTaskLog(ip, port, taskLogPath);
logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
}
}
}
......
......@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler;
import org.apache.dolphinscheduler.server.log.LoggerServer;
import org.apache.curator.test.TestingServer;
import org.springframework.boot.SpringApplication;
......@@ -29,7 +27,6 @@ public class StandaloneServer {
public static void main(String[] args) throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.zookeeper.connect-string", server.getConnectString());
new LoggerServer().start();
SpringApplication.run(StandaloneServer.class, args);
}
}
......@@ -126,6 +126,10 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
......@@ -125,6 +126,14 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
// logger server
LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// worker registry
......
......@@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
......@@ -138,7 +136,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.error("kill task error", e);
}
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
......@@ -179,10 +177,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
*/
private Pair<Boolean, List<String>> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
private Pair<Boolean, List<String>> killYarnJob(Host host, String logPath, String executePath, String tenantCode) {
try (LogClientService logClient = new LogClientService();) {
logger.info("view log host : {},logPath : {}", host, logPath);
String log = logClient.viewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), logPath);
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), logPath,
host.getPort());
String log = logClient.viewLog(host.getIp(), host.getPort(), logPath);
List<String> appIds = Collections.emptyList();
if (!StringUtils.isEmpty(log)) {
appIds = LoggerUtils.getAppIds(log, logger);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册