提交 9a70e779 编写于 作者: Q qiaozhanwei 提交者: GitHub

1,remove dolphinscheduler-rpc module 2,add dolphinscheduler-remote module...

1,remove dolphinscheduler-rpc module  2,add dolphinscheduler-remote module 3,add dolphinscheduler-service module 4,refactor LoggerServer module (#1925)

* 1,remove dolphinscheduler-rpc module
2,add dolphinscheduler-remote module
3,add dolphinscheduler-service module
4,refactor LoggerServer module

* ProcessUtils modify
上级 b77dddfe
...@@ -129,13 +129,13 @@ ...@@ -129,13 +129,13 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.github.xiaoymin</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>swagger-bootstrap-ui</artifactId> <artifactId>dolphinscheduler-service</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>com.github.xiaoymin</groupId>
<artifactId>dolphinscheduler-rpc</artifactId> <artifactId>swagger-bootstrap-ui</artifactId>
</dependency> </dependency>
<dependency> <dependency>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.log;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import org.apache.dolphinscheduler.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* log client
*/
public class LogClient {
private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
private final ManagedChannel channel;
private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
/**
* construct client connecting to HelloWorld server at {@code host:port}
*
* @param host host
* @param port port
*/
public LogClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext(true));
}
/**
* construct client for accessing RouteGuide server using the existing channel
*
*/
LogClient(ManagedChannelBuilder<?> channelBuilder) {
/**
* set max read size
*/
channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
channel = channelBuilder.build();
blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
}
/**
* shutdown
*
* @throws InterruptedException InterruptedException
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* roll view log
*
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
public String rollViewLog(String path,int skipLineNum,int limit) {
logger.info("roll view log : path {},skipLineNum {} ,limit {}", path, skipLineNum, limit);
LogParameter pathParameter = LogParameter
.newBuilder()
.setPath(path)
.setSkipLineNum(skipLineNum)
.setLimit(limit)
.build();
RetStrInfo retStrInfo;
try {
retStrInfo = blockingStub.rollViewLog(pathParameter);
return retStrInfo.getMsg();
} catch (StatusRuntimeException e) {
logger.error("roll view log error", e);
return null;
}
}
/**
* view log
*
* @param path path
* @return log content
*/
public String viewLog(String path) {
logger.info("view log path {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetStrInfo retStrInfo;
try {
retStrInfo = blockingStub.viewLog(pathParameter);
return retStrInfo.getMsg();
} catch (StatusRuntimeException e) {
logger.error("view log error", e);
return null;
}
}
/**
* get log size
*
* @param path log path
* @return log content bytes
*/
public byte[] getLogBytes(String path) {
logger.info("log path {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetByteInfo retByteInfo;
try {
retByteInfo = blockingStub.getLogBytes(pathParameter);
return retByteInfo.getData().toByteArray();
} catch (StatusRuntimeException e) {
logger.error("log size error", e);
return null;
}
}
}
\ No newline at end of file
...@@ -17,12 +17,12 @@ ...@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.log.LogClient;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -65,7 +65,7 @@ public class LoggerService { ...@@ -65,7 +65,7 @@ public class LoggerService {
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
LogClient logClient = new LogClient(host, Constants.RPC_PORT); LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log); result.setData(log);
logger.info(log); logger.info(log);
...@@ -85,7 +85,7 @@ public class LoggerService { ...@@ -85,7 +85,7 @@ public class LoggerService {
throw new RuntimeException("task instance is null"); throw new RuntimeException("task instance is null");
} }
String host = taskInstance.getHost(); String host = taskInstance.getHost();
LogClient logClient = new LogClient(host, Constants.RPC_PORT); LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath()); return logClient.getLogBytes(taskInstance.getLogPath());
} }
} }
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-remote</artifactId>
<name>dolphinscheduler-remote</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* remoting netty client
*/
public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
private final Bootstrap bootstrap = new Bootstrap();
private final NettyEncoder encoder = new NettyEncoder();
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final NioEventLoopGroup workerGroup;
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
private final NettyClientConfig clientConfig;
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
this.start();
}
private void start(){
this.bootstrap
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyDecoder(),
clientHandler,
encoder);
}
});
//
isStarted.compareAndSet(false, true);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
registerProcessor(commandType, processor, null);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
public void send(final Address address, final Command command) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
logger.info("sent command {} to {}", command, address);
} else{
logger.error("send command {} to {} failed, error {}", command, address, future.cause());
}
}
});
} catch (Exception ex) {
String msg = String.format("send command %s to address %s encounter error", command, address);
throw new RemotingException(msg, ex);
}
}
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
return channel;
}
return createChannel(address, true);
}
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
synchronized (bootstrap){
future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
}
if(isSync){
future.sync();
}
if (future.isSuccess()) {
Channel channel = future.channel();
channels.put(address, channel);
return channel;
}
} catch (Exception ex) {
logger.info("connect to {} error {}", address, ex);
}
return null;
}
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
closeChannels();
if(workerGroup != null){
this.workerGroup.shutdownGracefully();
}
if(defaultExecutor != null){
defaultExecutor.shutdown();
}
} catch (Exception ex) {
logger.error("netty client close exception", ex);
}
logger.info("netty client closed");
}
}
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
}
this.channels.clear();
}
public void removeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
channel.close();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* remoting netty server
*/
public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final NettyEncoder encoder = new NettyEncoder();
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
private final NioEventLoopGroup bossGroup;
private final NioEventLoopGroup workGroup;
private final NettyServerConfig serverConfig;
private final NettyServerHandler serverHandler = new NettyServerHandler(this);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
public NettyRemotingServer(final NettyServerConfig serverConfig){
this.serverConfig = serverConfig;
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
public void start(){
if(this.isStarted.get()){
return;
}
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
} else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
//
isStarted.compareAndSet(false, true);
}
private void initNettyChannel(NioSocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder());
pipeline.addLast("handler", serverHandler);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor);
}
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
if(bossGroup != null){
this.bossGroup.shutdownGracefully();
}
if(workGroup != null){
this.workGroup.shutdownGracefully();
}
if(defaultExecutor != null){
defaultExecutor.shutdown();
}
} catch (Exception ex) {
logger.error("netty server close exception", ex);
}
logger.info("netty server closed");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandHeader;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.util.List;
/**
* netty decoder
*/
public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
public NettyDecoder(){
super(State.MAGIC);
}
private final CommandHeader commandHeader = new CommandHeader();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()){
case MAGIC:
checkMagic(in.readByte());
checkpoint(State.COMMAND);
case COMMAND:
commandHeader.setType(in.readByte());
checkpoint(State.OPAQUE);
case OPAQUE:
commandHeader.setOpaque(in.readLong());
checkpoint(State.BODY_LENGTH);
case BODY_LENGTH:
commandHeader.setBodyLength(in.readInt());
checkpoint(State.BODY);
case BODY:
byte[] body = new byte[commandHeader.getBodyLength()];
in.readBytes(body);
//
Command packet = new Command();
packet.setType(commandType(commandHeader.getType()));
packet.setOpaque(commandHeader.getOpaque());
packet.setBody(body);
out.add(packet);
//
checkpoint(State.MAGIC);
}
}
private CommandType commandType(byte type){
for(CommandType ct : CommandType.values()){
if(ct.ordinal() == type){
return ct;
}
}
return null;
}
private void checkMagic(byte magic) {
if (magic != Command.MAGIC) {
throw new IllegalArgumentException("illegal packet [magic]" + magic);
}
}
enum State{
MAGIC,
COMMAND,
OPAQUE,
BODY_LENGTH,
BODY;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
/**
* netty encoder
*/
@Sharable
public class NettyEncoder extends MessageToByteEncoder<Command> {
@Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("encode msg is null");
}
out.writeByte(Command.MAGIC);
out.writeByte(msg.getType().ordinal());
out.writeLong(msg.getOpaque());
out.writeInt(msg.getBody().length);
out.writeBytes(msg.getBody());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
/**
* receive task log request command and content fill
* for netty data serializable transfer
*/
public class Command implements Serializable {
private static final long serialVersionUID = 1L;
public static final byte MAGIC = (byte) 0xbabe;
public Command(){
}
public Command(long opaque){
this.opaque = opaque;
}
/**
* comman type
*/
private CommandType type;
/**
* request unique identification
*/
private long opaque;
private byte[] body;
public CommandType getType() {
return type;
}
public void setType(CommandType type) {
this.type = type;
}
public long getOpaque() {
return opaque;
}
public void setOpaque(long opaque) {
this.opaque = opaque;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (opaque ^ (opaque >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Command other = (Command) obj;
return opaque == other.opaque;
}
@Override
public String toString() {
return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + (body == null ? 0 : body.length) + "]";
}
}
...@@ -6,96 +6,50 @@ ...@@ -6,96 +6,50 @@
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*
*/
syntax = "proto3";
package schduler;
option java_multiple_files = true;
option java_package = "org.apache.dolphinscheduler.rpc";
option java_outer_classname = "SchdulerProto";
/**
* return str info
*/ */
message RetStrInfo { package org.apache.dolphinscheduler.remote.command;
/**
* str msg info
*/
string msg = 1 ;
}
/** import java.io.Serializable;
* return byte info
*/
message RetByteInfo {
/**
* byte data info
*/
bytes data = 1;
}
/** /**
* log parameter * command header
*/ */
message LogParameter { public class CommandHeader implements Serializable {
/** private byte type;
* path
*/
string path = 1 ;
/** private long opaque;
* skip line num
*/
int32 skipLineNum = 2 ;
/** private int bodyLength;
* display limt num
*/
int32 limit = 3 ;
}
public int getBodyLength() {
return bodyLength;
}
/** public void setBodyLength(int bodyLength) {
* path parameter this.bodyLength = bodyLength;
*/ }
message PathParameter {
/**
* path
*/
string path = 1 ;
}
/** public byte getType() {
* log view service return type;
*/ }
service LogViewService {
/** public void setType(byte type) {
* roll view log this.type = type;
*/ }
rpc rollViewLog(LogParameter) returns (RetStrInfo) {};
/** public long getOpaque() {
* view all log return opaque;
*/ }
rpc viewLog(PathParameter) returns (RetStrInfo) {};
/** public void setOpaque(long opaque) {
* get log bytes this.opaque = opaque;
*/ }
rpc getLogBytes(PathParameter) returns (RetByteInfo) {};
} }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, PING, PONG;}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.List;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class Ping implements Serializable {
private static final AtomicLong ID = new AtomicLong(1);
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PING_BUF;
static {
ByteBuf ping = Unpooled.buffer();
ping.writeByte(Command.MAGIC);
ping.writeByte(CommandType.PING.ordinal());
ping.writeLong(0);
ping.writeInt(0);
ping.writeBytes(EMPTY_BODY);
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
public static ByteBuf pingContent(){
return PING_BUF.duplicate();
}
public static Command create(){
Command command = new Command(ID.getAndIncrement());
command.setType(CommandType.PING);
command.setBody(EMPTY_BODY_ARRAY);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.Serializable;
public class Pong implements Serializable {
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PONG_BUF;
static {
ByteBuf ping = Unpooled.buffer();
ping.writeByte(Command.MAGIC);
ping.writeByte(CommandType.PONG.ordinal());
ping.writeLong(0);
ping.writeInt(0);
ping.writeBytes(EMPTY_BODY);
PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
public static ByteBuf pingContent(){
return PONG_BUF.duplicate();
}
public static Command create(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.PONG);
command.setBody(EMPTY_BODY_ARRAY);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* get log bytes request command
*/
public class GetLogBytesRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
public GetLogBytesRequestCommand() {
}
public GetLogBytesRequestCommand(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
/**
*
* @return
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* get log bytes response command
*/
public class GetLogBytesResponseCommand implements Serializable {
private byte[] data;
public GetLogBytesResponseCommand() {
}
public GetLogBytesResponseCommand(byte[] data) {
this.data = data;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* roll view log request command
*/
public class RollViewLogRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
private int skipLineNum;
private int limit;
public RollViewLogRequestCommand() {
}
public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
this.path = path;
this.skipLineNum = skipLineNum;
this.limit = limit;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public int getSkipLineNum() {
return skipLineNum;
}
public void setSkipLineNum(int skipLineNum) {
this.skipLineNum = skipLineNum;
}
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* roll view log response command
*/
public class RollViewLogResponseCommand implements Serializable {
private String msg;
public RollViewLogResponseCommand() {
}
public RollViewLogResponseCommand(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* view log request command
*/
public class ViewLogRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
public ViewLogRequestCommand() {
}
public ViewLogRequestCommand(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* view log response command
*/
public class ViewLogResponseCommand implements Serializable {
private String msg;
public ViewLogResponseCommand() {
}
public ViewLogResponseCommand(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.config;
import org.apache.dolphinscheduler.remote.utils.Constants;
/**
* netty client config
*/
public class NettyClientConfig {
private int workerThreads = Constants.CPUS;
private boolean tcpNoDelay = true;
private boolean soKeepalive = true;
private int sendBufferSize = 65535;
private int receiveBufferSize = 65535;
public int getWorkerThreads() {
return workerThreads;
}
public void setWorkerThreads(int workerThreads) {
this.workerThreads = workerThreads;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public boolean isSoKeepalive() {
return soKeepalive;
}
public void setSoKeepalive(boolean soKeepalive) {
this.soKeepalive = soKeepalive;
}
public int getSendBufferSize() {
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
public int getReceiveBufferSize() {
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.config;
import org.apache.dolphinscheduler.remote.utils.Constants;
/**
* netty server config
*/
public class NettyServerConfig {
private int soBacklog = 1024;
private boolean tcpNoDelay = true;
private boolean soKeepalive = true;
private int sendBufferSize = 65535;
private int receiveBufferSize = 65535;
private int workerThread = Constants.CPUS;
private int listenPort = 12346;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getSoBacklog() {
return soBacklog;
}
public void setSoBacklog(int soBacklog) {
this.soBacklog = soBacklog;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public boolean isSoKeepalive() {
return soKeepalive;
}
public void setSoKeepalive(boolean soKeepalive) {
this.soKeepalive = soKeepalive;
}
public int getSendBufferSize() {
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
public int getReceiveBufferSize() {
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public int getWorkerThread() {
return workerThread;
}
public void setWorkerThread(int workerThread) {
this.workerThread = workerThread;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* remote exception
*/
public class RemotingException extends Exception {
public RemotingException() {
super();
}
/** Constructs a new runtime exception with the specified detail message.
* The cause is not initialized, and may subsequently be initialized by a
* call to {@link #initCause}.
*
* @param message the detail message. The detail message is saved for
* later retrieval by the {@link #getMessage()} method.
*/
public RemotingException(String message) {
super(message);
}
/**
* Constructs a new runtime exception with the specified detail message and
* cause. <p>Note that the detail message associated with
* {@code cause} is <i>not</i> automatically incorporated in
* this runtime exception's detail message.
*
* @param message the detail message (which is saved for later retrieval
* by the {@link #getMessage()} method).
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A <tt>null</tt> value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @since 1.4
*/
public RemotingException(String message, Throwable cause) {
super(message, cause);
}
/** Constructs a new runtime exception with the specified cause and a
* detail message of <tt>(cause==null ? null : cause.toString())</tt>
* (which typically contains the class and detail message of
* <tt>cause</tt>). This constructor is useful for runtime exceptions
* that are little more than wrappers for other throwables.
*
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A <tt>null</tt> value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @since 1.4
*/
public RemotingException(Throwable cause) {
super(cause);
}
/**
* Constructs a new runtime exception with the specified detail
* message, cause, suppression enabled or disabled, and writable
* stack trace enabled or disabled.
*
* @param message the detail message.
* @param cause the cause. (A {@code null} value is permitted,
* and indicates that the cause is nonexistent or unknown.)
* @param enableSuppression whether or not suppression is enabled
* or disabled
* @param writableStackTrace whether or not the stack trace should
* be writable
*
* @since 1.7
*/
protected RemotingException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
/**
* netty client request handler
*/
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private final NettyRemotingClient nettyRemotingClient;
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
this.nettyRemotingClient = nettyRemotingClient;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
executorRef = nettyRemotingClient.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
}
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = new Runnable() {
@Override
public void run() {
try {
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error : {}", msg, ex);
}
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("commandType {} not support", commandType);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
new Object[]{ch, config.getWriteBufferHighWaterMark()});
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
new Object[]{ch, config.getWriteBufferLowWaterMark()});
}
config.setAutoRead(true);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
/**
* netty server request handler
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final NettyRemotingServer nettyRemotingServer;
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
this.nettyRemotingServer = nettyRemotingServer;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
executorRef = nettyRemotingServer.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
}
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = new Runnable() {
public void run() {
try {
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error : {}", msg, ex);
}
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("commandType {} not support", commandType);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
ctx.channel().close();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
new Object[]{ch, config.getWriteBufferHighWaterMark()});
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
new Object[]{ch, config.getWriteBufferLowWaterMark()});
}
config.setAutoRead(true);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
/**
* netty request processor
*/
public interface NettyRequestProcessor {
void process(final Channel channel, final Command command);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable;
/**
* server address
*/
public class Address implements Serializable {
private String host;
private int port;
public Address(){
//NOP
}
public Address(String host, int port){
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Address other = (Address) obj;
if (host == null) {
if (other.host != null) {
return false;
}
} else if (!host.equals(other.host)) {
return false;
}
return port == other.port;
}
@Override
public String toString() {
return "Address [host=" + host + ", port=" + port + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
/**
* channel utils
*/
public class ChannelUtils {
public static String getLocalAddress(Channel channel){
return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress();
}
public static String getRemoteAddress(Channel channel){
return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress();
}
public static Address toAddress(Channel channel){
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import java.nio.charset.Charset;
public class Constants {
public static final String COMMA = ",";
public static final String SLASH = "/";
public static final Charset UTF8 = Charset.forName("UTF-8");
public static final int CPUS = Runtime.getRuntime().availableProcessors();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import com.alibaba.fastjson.JSON;
/**
* json serialize or deserialize
*/
public class FastJsonSerializer {
public static <T> byte[] serialize(T obj) {
String json = JSON.toJSONString(obj);
return json.getBytes(Constants.UTF8);
}
public static <T> String serializeToString(T obj) {
return JSON.toJSONString(obj);
}
public static <T> T deserialize(byte[] src, Class<T> clazz) {
return JSON.parseObject(new String(src, Constants.UTF8), clazz);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
public class Pair<L, R> {
private L left;
private R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public L getLeft() {
return left;
}
public void setLeft(L left) {
this.left = left;
}
public R getRight() {
return right;
}
public void setRight(R right) {
this.right = right;
}
}
package org.apache.dolphinscheduler.remote;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.Ping;
import org.apache.dolphinscheduler.remote.command.Pong;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
public class NettyRemotingClientTest {
@Test
public void testSend(){
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
@Override
public void process(Channel channel, Command command) {
channel.writeAndFlush(Pong.create(command.getOpaque()));
}
});
server.start();
//
CountDownLatch latch = new CountDownLatch(1);
AtomicLong opaque = new AtomicLong(1);
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
@Override
public void process(Channel channel, Command command) {
opaque.set(command.getOpaque());
latch.countDown();
}
});
Command commandPing = Ping.create();
try {
client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(opaque.get(), commandPing.getOpaque());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>1.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-rpc</artifactId>
<name>dolphinscheduler-rpc</name>
<url>https://github.com/apache/incubator-dolphinscheduler</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<protobuf.version>3.5.1</protobuf.version>
<grpc.version>1.9.0</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>compile-custom</id>
<goals>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
...@@ -71,7 +71,7 @@ ...@@ -71,7 +71,7 @@
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-rpc</artifactId> <artifactId>dolphinscheduler-service</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
......
...@@ -14,152 +14,101 @@ ...@@ -14,152 +14,101 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.rpc; package org.apache.dolphinscheduler.server.log;
import io.grpc.stub.StreamObserver; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.remote.command.Command;
import com.google.protobuf.ByteString; import org.apache.dolphinscheduler.remote.command.CommandType;
import io.grpc.Server; import org.apache.dolphinscheduler.remote.command.log.*;
import io.grpc.ServerBuilder; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.rpc.*; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.*; import java.io.*;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* logger server * logger request process logic
*/ */
public class LoggerServer { public class LoggerRequestProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class); private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
/** private final ThreadPoolExecutor executor;
* server
*/
private Server server;
/**
* server start
* @throws IOException io exception
*/
public void start() throws IOException {
/* The port on which the server should run */
int port = Constants.RPC_PORT;
server = ServerBuilder.forPort(port)
.addService(new LogViewServiceGrpcImpl())
.build()
.start();
logger.info("server started, listening on port : {}" , port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
logger.info("shutting down gRPC server since JVM is shutting down");
LoggerServer.this.stop();
logger.info("server shut down");
}
});
}
/** public LoggerRequestProcessor(){
* stop this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
*/
private void stop() {
if (server != null) {
server.shutdown();
}
} }
/** @Override
* await termination on the main thread since the grpc library uses daemon threads. public void process(Channel channel, Command command) {
*/ logger.info("received command : {}", command);
private void blockUntilShutdown() throws InterruptedException {
if (server != null) { /**
server.awaitTermination(); * reuqest task log command type
*/
final CommandType commandType = command.getType();
switch (commandType){
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
case ROLL_VIEW_LOG_REQUEST:
RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), RollViewLogRequestCommand.class);
List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
StringBuilder builder = new StringBuilder();
for (String line : lines){
builder.append(line + "\r\n");
}
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType");
} }
} }
/** public ExecutorService getExecutor(){
* main launches the server from the command line. return this.executor;
*/
/**
* main launches the server from the command line.
* @param args arguments
* @throws IOException io exception
* @throws InterruptedException interrupted exception
*/
public static void main(String[] args) throws IOException, InterruptedException {
final LoggerServer server = new LoggerServer();
server.start();
server.blockUntilShutdown();
} }
/** /**
* Log View Service Grpc Implementation * get files content bytes,for down load file
*/
static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase {
@Override
public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) {
logger.info("log parameter path : {} ,skip line : {}, limit : {}",
request.getPath(),
request.getSkipLineNum(),
request.getLimit());
List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit());
StringBuilder sb = new StringBuilder();
boolean errorLineFlag = false;
for (String line : list){
sb.append(line + "\r\n");
}
RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
responseObserver.onNext(retInfoBuild);
responseObserver.onCompleted();
}
@Override
public void viewLog(PathParameter request, StreamObserver<RetStrInfo> responseObserver) {
logger.info("task path is : {} " , request.getPath());
RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build();
responseObserver.onNext(retInfoBuild);
responseObserver.onCompleted();
}
@Override
public void getLogBytes(PathParameter request, StreamObserver<RetByteInfo> responseObserver) {
try {
ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath()));
RetByteInfo.Builder builder = RetByteInfo.newBuilder();
builder.setData(bytes);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}catch (Exception e){
logger.error("get log bytes failed",e);
}
}
}
/**
* get files bytes
* *
* @param path path * @param filePath file path
* @return byte array of file * @return byte array of file
* @throws Exception exception * @throws Exception exception
*/ */
private static byte[] getFileBytes(String path){ private byte[] getFileContentBytes(String filePath){
InputStream in = null; InputStream in = null;
ByteArrayOutputStream bos = null; ByteArrayOutputStream bos = null;
try { try {
in = new FileInputStream(path); in = new FileInputStream(filePath);
bos = new ByteArrayOutputStream(); bos = new ByteArrayOutputStream();
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
int len = 0; int len;
while ((len = in.read(buf)) != -1) { while ((len = in.read(buf)) != -1) {
bos.write(buf, 0, len); bos.write(buf, 0, len);
} }
...@@ -170,69 +119,61 @@ public class LoggerServer { ...@@ -170,69 +119,61 @@ public class LoggerServer {
if (bos != null){ if (bos != null){
try { try {
bos.close(); bos.close();
} catch (IOException e) { } catch (IOException ignore) {}
e.printStackTrace();
}
} }
if (in != null){ if (in != null){
try { try {
in.close(); in.close();
} catch (IOException e) { } catch (IOException ignore) {}
e.printStackTrace();
}
} }
} }
return null; return new byte[0];
} }
/** /**
* read file content * read part file content,can skip any line and read some lines
* *
* @param path * @param filePath file path
* @param skipLine * @param skipLine skip line
* @param limit * @param limit read lines limit
* @return * @return part file content
*/ */
private static List<String> readFile(String path,int skipLine,int limit){ private List<String> readPartFileContent(String filePath,
try (Stream<String> stream = Files.lines(Paths.get(path))) { int skipLine,
int limit){
try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
} catch (IOException e) { } catch (IOException e) {
logger.error("read file failed",e); logger.error("read file error",e);
} }
return null; return Collections.EMPTY_LIST;
} }
/** /**
* read file content * read whole file content
* *
* @param path path * @param filePath file path
* @return string of file content * @return whole file content
* @throws Exception exception
*/ */
private static String readFile(String path){ private String readWholeFileContent(String filePath){
BufferedReader br = null; BufferedReader br = null;
String line = null; String line;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
try { try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(path))); br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
boolean errorLineFlag = false;
while ((line = br.readLine()) != null){ while ((line = br.readLine()) != null){
sb.append(line + "\r\n"); sb.append(line + "\r\n");
} }
return sb.toString(); return sb.toString();
}catch (IOException e){ }catch (IOException e){
logger.error("read file failed",e); logger.error("read file error",e);
}finally { }finally {
try { try {
if (br != null){ if (br != null){
br.close(); br.close();
} }
} catch (IOException e) { } catch (IOException ignore) {}
logger.error(e.getMessage(),e);
}
} }
return null; return "";
} }
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* logger server
*/
public class LoggerServer {
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
/**
* netty server
*/
private final NettyRemotingServer server;
/**
* netty server config
*/
private final NettyServerConfig serverConfig;
/**
* loggger request processor
*/
private final LoggerRequestProcessor requestProcessor;
public LoggerServer(){
this.serverConfig = new NettyServerConfig();
this.serverConfig.setListenPort(Constants.RPC_PORT);
this.server = new NettyRemotingServer(serverConfig);
this.requestProcessor = new LoggerRequestProcessor();
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**
* main launches the server from the command line.
* @param args arguments
*/
public static void main(String[] args) {
final LoggerServer server = new LoggerServer();
server.start();
}
/**
* server start
*/
public void start() {
this.server.start();
logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LoggerServer.this.stop();
}
});
}
/**
* stop
*/
public void stop() {
this.server.close();
logger.info("logger server shut down");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.rpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import org.apache.dolphinscheduler.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* log client
*/
public class LogClient {
/**
* logger of LogClient
*/
private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
/**
* managed channel
*/
private final ManagedChannel channel;
/**
* blocking stub
*/
private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
/**
* Construct client connecting to HelloWorld server at host:port.
*
* @param host host
* @param port port
*/
public LogClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext(true));
}
/**
* Construct client for accessing RouteGuide server using the existing channel.
*
* @param channelBuilder channel builder
*/
LogClient(ManagedChannelBuilder<?> channelBuilder) {
/**
* set max message read size
*/
channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
channel = channelBuilder.build();
blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
}
/**
* shut down channel
*
* @throws InterruptedException interrupted exception
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* roll view log
*
* @param path log path
* @param skipLineNum skip line num
* @param limit limit
* @return log content
*/
public String rollViewLog(String path,int skipLineNum,int limit) {
logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit);
LogParameter pathParameter = LogParameter
.newBuilder()
.setPath(path)
.setSkipLineNum(skipLineNum)
.setLimit(limit)
.build();
RetStrInfo retStrInfo;
try {
retStrInfo = blockingStub.rollViewLog(pathParameter);
return retStrInfo.getMsg();
} catch (StatusRuntimeException e) {
logger.error("roll view log failed", e);
return null;
}
}
/**
* view all log
*
* @param path log path
* @return log content
*/
public String viewLog(String path) {
logger.info("view log path : {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetStrInfo retStrInfo;
try {
retStrInfo = blockingStub.viewLog(pathParameter);
return retStrInfo.getMsg();
} catch (StatusRuntimeException e) {
logger.error("view log failed", e);
return null;
}
}
/**
* get log bytes
*
* @param path log path
* @return log content
*/
public byte[] getLogBytes(String path) {
logger.info("get log bytes {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetByteInfo retByteInfo;
try {
retByteInfo = blockingStub.getLogBytes(pathParameter);
return retByteInfo.getData().toByteArray();
} catch (StatusRuntimeException e) {
logger.error("get log bytes failed ", e);
return null;
}
}
}
\ No newline at end of file
...@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; ...@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -375,7 +375,7 @@ public class ProcessUtils { ...@@ -375,7 +375,7 @@ public class ProcessUtils {
public static void killYarnJob(TaskInstance taskInstance) { public static void killYarnJob(TaskInstance taskInstance) {
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT); LogClientService logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
String log = logClient.viewLog(taskInstance.getLogPath()); String log = logClient.viewLog(taskInstance.getLogPath());
if (StringUtils.isNotEmpty(log)) { if (StringUtils.isNotEmpty(log)) {
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-service</artifactId>
<name>dolphinscheduler-service</name>
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-remote</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* view log response command
*/
public class MasterResponseCommand implements Serializable {
private String msg;
public MasterResponseCommand() {
}
public MasterResponseCommand(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.MASTER_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* view log request command
*/
public class WorkerRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
public WorkerRequestCommand() {
}
public WorkerRequestCommand(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.WORKER_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.log;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* log client
*/
public class LogClientService implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
private final NettyClientConfig clientConfig;
private final NettyRemotingClient client;
private final Address address;
/**
* request time out
*/
private final long logRequestTimeout = 10 * 1000;
/**
* construct client
* @param host host
* @param port port
*/
public LogClientService(String host, int port) {
this.address = new Address(host, port);
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1);
this.client = new NettyRemotingClient(clientConfig);
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
}
/**
* shutdown
*/
public void shutdown() {
this.client.close();
logger.info("logger client shutdown");
}
/**
* roll view log
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
public String rollViewLog(String path,int skipLineNum,int limit) {
logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = ((String)promise.getResult());
} catch (Exception e) {
logger.error("roll view log error", e);
}
return result;
}
/**
* view log
* @param path path
* @return log content
*/
public String viewLog(String path) {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = ((String)promise.getResult());
} catch (Exception e) {
logger.error("view log error", e);
}
return result;
}
/**
* get log size
* @param path log path
* @return log content bytes
*/
public byte[] getLogBytes(String path) {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = (byte[])promise.getResult();
} catch (Exception e) {
logger.error("get log size error", e);
}
return result;
}
@Override
public void process(Channel channel, Command command) {
logger.info("received log response : {}", command);
switch (command.getType()){
case ROLL_VIEW_LOG_RESPONSE:
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
command.getBody(), RollViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
break;
case VIEW_WHOLE_LOG_RESPONSE:
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
command.getBody(), ViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), viewLog.getMsg());
break;
case GET_LOG_BYTES_RESPONSE:
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesResponseCommand.class);
LogPromise.notify(command.getOpaque(), getLog.getData());
break;
default:
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
}
}
public static void main(String[] args) throws Exception{
LogClientService logClient = new LogClientService("192.168.220.247", 50051);
byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
System.out.println(new String(logBytes));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.log;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* log asyc callback
*/
public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
private long opaque;
private final long start;
private final long timeout;
private final CountDownLatch latch;
private Object result;
public LogPromise(long opaque, long timeout){
this.opaque = opaque;
this.timeout = timeout;
this.start = System.currentTimeMillis();
this.latch = new CountDownLatch(1);
PROMISES.put(opaque, this);
}
/**
* notify client finish
* @param opaque unique identification
* @param result result
*/
public static void notify(long opaque, Object result){
LogPromise promise = PROMISES.remove(opaque);
if(promise != null){
promise.doCountDown(result);
}
}
private void doCountDown(Object result){
this.result = result;
this.latch.countDown();
}
public boolean isTimeout(){
return System.currentTimeMillis() - start > timeout;
}
public Object getResult(){
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
PROMISES.remove(opaque);
return this.result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.worker;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.MasterResponseCommand;
import org.apache.dolphinscheduler.service.WorkerRequestCommand;
import org.apache.dolphinscheduler.service.log.LogPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* log client
*/
public class WorkerClientService implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(WorkerClientService.class);
private final NettyClientConfig clientConfig;
private final NettyRemotingClient client;
private final Address address;
/**
* request time out
*/
private final long logRequestTimeout = 10 * 1000;
/**
* construct client
* @param host host
* @param port port
*/
public WorkerClientService(String host, int port) {
this.address = new Address(host, port);
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1);
this.client = new NettyRemotingClient(clientConfig);
this.client.registerProcessor(CommandType.MASTER_RESPONSE, this);
}
/**
* shutdown
*/
public void shutdown() {
this.client.close();
logger.info("logger client shutdown");
}
public String reportResult() {
WorkerRequestCommand request = new WorkerRequestCommand();
String result = "";
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = ((String)promise.getResult());
} catch (Exception e) {
e.printStackTrace();
logger.error("roll view log error", e);
}
return result;
}
@Override
public void process(Channel channel, Command command) {
logger.info("received log response : {}", command);
MasterResponseCommand masterResponseCommand = FastJsonSerializer.deserialize(
command.getBody(), MasterResponseCommand.class);
LogPromise.notify(command.getOpaque(), masterResponseCommand.getMsg());
}
public static void main(String[] args) throws Exception{
WorkerClientService workerClientService = new WorkerClientService("192.168.220.247", 1128);
String result = workerClientService.reportResult();
System.out.println(result);
}
}
\ No newline at end of file
...@@ -229,7 +229,12 @@ ...@@ -229,7 +229,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-rpc</artifactId> <artifactId>dolphinscheduler-remote</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
...@@ -770,9 +775,6 @@ ...@@ -770,9 +775,6 @@
<exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude> <exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude> <exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude> <exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude>
<exclude>
**/dolphinscheduler-rpc/src/main/java/org/apache/dolphinscheduler/rpc/LogViewServiceGrpc.java
</exclude>
</excludes> </excludes>
<consoleOutput>true</consoleOutput> <consoleOutput>true</consoleOutput>
</configuration> </configuration>
...@@ -859,8 +861,9 @@ ...@@ -859,8 +861,9 @@
<module>dolphinscheduler-api</module> <module>dolphinscheduler-api</module>
<module>dolphinscheduler-dao</module> <module>dolphinscheduler-dao</module>
<module>dolphinscheduler-alert</module> <module>dolphinscheduler-alert</module>
<module>dolphinscheduler-rpc</module>
<module>dolphinscheduler-dist</module> <module>dolphinscheduler-dist</module>
<module>dolphinscheduler-remote</module>
<module>dolphinscheduler-service</module>
</modules> </modules>
</project> </project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册