未验证 提交 ba41dbf2 编写于 作者: H He Wang 提交者: GitHub

update ClientConf (#35)

上级 4d3ed115
......@@ -17,7 +17,6 @@ import com.oceanbase.clogproxy.client.connection.ClientStream;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.client.util.Validator;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import io.netty.handler.ssl.SslContext;
......@@ -34,10 +33,10 @@ public class LogProxyClient {
* @param host Log proxy hostname name or ip.
* @param port Log proxy port.
* @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}.
* @param sslContext {@link SslContext} to create netty handler.
* @param clientConf {@link ClientConf} used to create netty handler.
*/
public LogProxyClient(
String host, int port, AbstractConnectionConfig config, SslContext sslContext) {
String host, int port, AbstractConnectionConfig config, ClientConf clientConf) {
try {
Validator.notNull(config.getLogType(), "log type cannot be null");
Validator.notEmpty(host, "server cannot be null");
......@@ -48,14 +47,15 @@ public class LogProxyClient {
if (!config.valid()) {
throw new IllegalArgumentException("Illegal argument for LogProxyClient");
}
String clientId =
ClientConf.USER_DEFINED_CLIENTID.isEmpty()
? ClientIdGenerator.generate()
: ClientConf.USER_DEFINED_CLIENTID;
if (clientConf == null) {
clientConf = ClientConf.builder().build();
}
String clientId = clientConf.getClientId();
ConnectionParams connectionParams =
new ConnectionParams(config.getLogType(), clientId, host, port, config);
connectionParams.setProtocolVersion(ProtocolVersion.V2);
this.stream = new ClientStream(connectionParams, sslContext);
this.stream = new ClientStream(clientConf, connectionParams);
}
/**
......
......@@ -11,7 +11,9 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.common.config.SharedConf;
import io.netty.handler.ssl.SslContext;
/** The class that defines the constants that are used to generate the connection. */
public class ClientConf extends SharedConf {
......@@ -19,34 +21,183 @@ public class ClientConf extends SharedConf {
public static final String VERSION = "1.0.1";
/** Queue size for storing records received from log proxy. */
public static int TRANSFER_QUEUE_SIZE = 20000;
private final int transferQueueSize;
/** Connection timeout in milliseconds. */
public static int CONNECT_TIMEOUT_MS = 5000;
private final int connectTimeoutMs;
/** Reading queue timeout in milliseconds. */
public static int READ_WAIT_TIME_MS = 2000;
private final int readWaitTimeMs;
/** Time to sleep in seconds when retrying. */
public static int RETRY_INTERVAL_S = 2;
private final int retryIntervalS;
/** Idle timeout in seconds for netty handler. */
private final int idleTimeoutS;
/**
* Maximum number of retries after disconnect, if not data income lasting {@link
* #IDLE_TIMEOUT_S}, a reconnection will be triggered.
* Maximum number of retries after disconnect, if not data income lasting {@link #idleTimeoutS},
* a reconnection will be triggered.
*/
public static int MAX_RECONNECT_TIMES = -1;
/** Idle timeout in seconds for netty handler. */
public static int IDLE_TIMEOUT_S = 15;
private final int maxReconnectTimes;
/** Maximum number of reads, after which data will be discarded. */
public static int NETTY_DISCARD_AFTER_READS = 16;
private final int nettyDiscardAfterReads;
/** User defined client id. */
public static String USER_DEFINED_CLIENTID = "";
private final String clientId;
/**
* Ignore unknown or unsupported record type with a warning log instead of throwing an
* exception.
*/
public static boolean IGNORE_UNKNOWN_RECORD_TYPE = false;
private final boolean ignoreUnknownRecordType;
/** Netty ssl context */
private final SslContext sslContext;
private ClientConf(
int transferQueueSize,
int connectTimeoutMs,
int readWaitTimeMs,
int retryIntervalS,
int maxReconnectTimes,
int idleTimeoutS,
int nettyDiscardAfterReads,
String clientId,
boolean ignoreUnknownRecordType,
SslContext sslContext) {
this.transferQueueSize = transferQueueSize;
this.connectTimeoutMs = connectTimeoutMs;
this.readWaitTimeMs = readWaitTimeMs;
this.retryIntervalS = retryIntervalS;
this.maxReconnectTimes = maxReconnectTimes;
this.idleTimeoutS = idleTimeoutS;
this.nettyDiscardAfterReads = nettyDiscardAfterReads;
this.clientId = clientId;
this.ignoreUnknownRecordType = ignoreUnknownRecordType;
this.sslContext = sslContext;
}
public int getTransferQueueSize() {
return transferQueueSize;
}
public int getConnectTimeoutMs() {
return connectTimeoutMs;
}
public int getReadWaitTimeMs() {
return readWaitTimeMs;
}
public int getRetryIntervalS() {
return retryIntervalS;
}
public int getMaxReconnectTimes() {
return maxReconnectTimes;
}
public int getIdleTimeoutS() {
return idleTimeoutS;
}
public int getNettyDiscardAfterReads() {
return nettyDiscardAfterReads;
}
public String getClientId() {
return clientId;
}
public boolean isIgnoreUnknownRecordType() {
return ignoreUnknownRecordType;
}
public SslContext getSslContext() {
return sslContext;
}
public static Builder builder() {
return new Builder();
}
/** ClientConf builder with default values. */
public static class Builder {
private int transferQueueSize = 20000;
private int connectTimeoutMs = 5000;
private int readWaitTimeMs = 2000;
private int retryIntervalS = 2;
private int maxReconnectTimes = -1;
private int idleTimeoutS = 15;
private int nettyDiscardAfterReads = 16;
private String clientId = ClientIdGenerator.generate();
private boolean ignoreUnknownRecordType = false;
private SslContext sslContext = null;
public Builder transferQueueSize(int transferQueueSize) {
this.transferQueueSize = transferQueueSize;
return this;
}
public Builder connectTimeoutMs(int connectTimeoutMs) {
this.connectTimeoutMs = connectTimeoutMs;
return this;
}
public Builder readWaitTimeMs(int readWaitTimeMs) {
this.readWaitTimeMs = readWaitTimeMs;
return this;
}
public Builder retryIntervalS(int retryIntervalS) {
this.retryIntervalS = retryIntervalS;
return this;
}
public Builder maxReconnectTimes(int maxReconnectTimes) {
this.maxReconnectTimes = maxReconnectTimes;
return this;
}
public Builder idleTimeoutS(int idleTimeoutS) {
this.idleTimeoutS = idleTimeoutS;
return this;
}
public Builder nettyDiscardAfterReads(int nettyDiscardAfterReads) {
this.nettyDiscardAfterReads = nettyDiscardAfterReads;
return this;
}
public Builder clientId(String clientId) {
this.clientId = clientId;
return this;
}
public Builder ignoreUnknownRecordType(boolean ignoreUnknownRecordType) {
this.ignoreUnknownRecordType = ignoreUnknownRecordType;
return this;
}
public Builder sslContext(SslContext sslContext) {
this.sslContext = sslContext;
return this;
}
public ClientConf build() {
return new ClientConf(
transferQueueSize,
connectTimeoutMs,
readWaitTimeMs,
retryIntervalS,
maxReconnectTimes,
idleTimeoutS,
nettyDiscardAfterReads,
clientId,
ignoreUnknownRecordType,
sslContext);
}
}
}
......@@ -52,6 +52,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
/** A client stream. */
private ClientStream stream;
/** Client config. */
private ClientConf config;
/** Connection params. */
private ConnectionParams params;
......@@ -156,7 +159,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
numReads = 0;
buffer.release();
buffer = null;
} else if (++numReads >= ClientConf.NETTY_DISCARD_AFTER_READS) {
} else if (++numReads >= config.getNettyDiscardAfterReads()) {
numReads = 0;
discardSomeReadBytes();
}
......@@ -317,7 +320,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
byte[] data = new byte[dataLength + 8];
System.arraycopy(bytes, offset, data, 0, data.length);
logMessage.parse(data);
if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) {
if (config.isIgnoreUnknownRecordType()) {
// unsupported type, ignore
logger.debug("Unsupported record type: {}", logMessage);
offset += (8 + dataLength);
......@@ -361,7 +364,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
StreamContext context = ctx.channel().attr(ConnectionFactory.CONTEXT_KEY).get();
stream = context.stream();
params = context.getParams();
config = context.config();
params = context.params();
recordQueue = context.recordQueue();
logger.info(
......
......@@ -16,7 +16,6 @@ import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import io.netty.handler.ssl.SslContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
......@@ -37,7 +36,7 @@ public class ClientStream {
private Thread thread = null;
/** Context of stream. */
private StreamContext context = null;
private StreamContext context;
/** Checkpoint string used to resume writing into the queue. */
private String checkpointString;
......@@ -74,10 +73,10 @@ public class ClientStream {
* Sole constructor.
*
* @param connectionParams Connection params.
* @param sslContext A {@link SslContext} for encrypted communication.
* @param clientConf Client config.
*/
public ClientStream(ConnectionParams connectionParams, SslContext sslContext) {
context = new StreamContext(this, connectionParams, sslContext);
public ClientStream(ClientConf clientConf, ConnectionParams connectionParams) {
this.context = new StreamContext(this, clientConf, connectionParams);
}
/** Close and wait the connection. */
......@@ -132,7 +131,7 @@ public class ClientStream {
/** Start the process thread. */
public void start() {
// if status listener exist, enable monitor
context.params.setEnableMonitor(!statusListeners.isEmpty());
context.params().setEnableMonitor(!statusListeners.isEmpty());
if (started.compareAndSet(false, true)) {
thread =
......@@ -150,7 +149,8 @@ public class ClientStream {
}
if (state == ReconnectState.RETRY) {
try {
TimeUnit.SECONDS.sleep(ClientConf.RETRY_INTERVAL_S);
TimeUnit.SECONDS.sleep(
context.config().getRetryIntervalS());
} catch (InterruptedException e) {
// do nothing
}
......@@ -163,7 +163,8 @@ public class ClientStream {
packet =
context.recordQueue()
.poll(
ClientConf.READ_WAIT_TIME_MS,
context.config()
.getReadWaitTimeMs(),
TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException e) {
......@@ -241,11 +242,11 @@ public class ClientStream {
logger.warn("start to reconnect...");
try {
if (ClientConf.MAX_RECONNECT_TIMES != -1
&& retryTimes >= ClientConf.MAX_RECONNECT_TIMES) {
if (context.config().getMaxReconnectTimes() != -1
&& retryTimes >= context.config().getMaxReconnectTimes()) {
logger.error(
"failed to reconnect, exceed max reconnect retry time: {}",
ClientConf.MAX_RECONNECT_TIMES);
context.config().getMaxReconnectTimes());
reconnect.set(true);
return ReconnectState.EXIT;
}
......@@ -258,7 +259,7 @@ public class ClientStream {
// reconnection.
if (StringUtils.isNotEmpty(checkpointString)) {
logger.warn("reconnect set checkpoint: {}", checkpointString);
context.getParams().updateCheckpoint(checkpointString);
context.params().updateCheckpoint(checkpointString);
}
connection = ConnectionFactory.instance().createConnection(context);
if (connection != null) {
......@@ -271,7 +272,7 @@ public class ClientStream {
logger.error(
"failed to reconnect, retry count: {}, max: {}",
++retryTimes,
ClientConf.MAX_RECONNECT_TIMES);
context.config().getMaxReconnectTimes());
// not success, retry next time
reconnect.set(true);
return ReconnectState.RETRY;
......@@ -280,7 +281,7 @@ public class ClientStream {
logger.error(
"failed to reconnect, retry count: {}, max: {}, message: {}",
++retryTimes,
ClientConf.MAX_RECONNECT_TIMES,
context.config().getMaxReconnectTimes(),
e);
// not success, retry next time
reconnect.set(true);
......
......@@ -11,9 +11,10 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.util.NamedThreadFactory;
import com.oceanbase.clogproxy.client.util.NettyEventLoopUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
......@@ -70,7 +71,7 @@ public class ConnectionFactory {
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
SslContext sslContext = context.getSslContext();
SslContext sslContext = context.config().getSslContext();
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
......@@ -80,7 +81,9 @@ public class ConnectionFactory {
ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast(new IdleStateHandler(ClientConf.IDLE_TIMEOUT_S, 0, 0));
.addLast(
new IdleStateHandler(
context.config().getIdleTimeoutS(), 0, 0));
ch.pipeline().addLast(new ClientHandler());
}
});
......@@ -96,11 +99,12 @@ public class ConnectionFactory {
*/
public Connection createConnection(StreamContext context) throws LogProxyClientException {
Bootstrap bootstrap = initBootstrap(context);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConf.CONNECT_TIMEOUT_MS);
bootstrap.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, context.config().getConnectTimeoutMs());
ChannelFuture channelFuture =
bootstrap.connect(
new InetSocketAddress(
context.getParams().getHost(), context.getParams().getPort()));
context.params().getHost(), context.params().getPort()));
channelFuture.awaitUninterruptibly();
if (!channelFuture.isDone()) {
throw new LogProxyClientException(ErrorCode.E_CONNECT, "timeout of create connection!");
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
/** This is a factory class of netty event loop. */
public class NettyEventLoopUtil {
/** Flag of whether epoll is enabled. */
private static final boolean EPOLL_ENABLED = Epoll.isAvailable();
/**
* Create the right event loop according to current platform and system property, fallback to
* NIO when epoll not enabled.
*
* @param nThreads Number of threads.
* @param threadFactory ThreadFactory instance.
* @return An EventLoopGroup suitable for the current platform.
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return EPOLL_ENABLED
? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}
/**
* Get the suitable {@link SocketChannel} for the given EventLoopGroup implementation.
*
* @return A {@link SocketChannel} class suitable for the given EventLoopGroup implementation.
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class;
}
}
......@@ -15,7 +15,7 @@ import static com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.Runti
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.handler.ssl.SslContext;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -79,33 +79,29 @@ public class StreamContext {
}
/** Blocking queue which stores {@link TransferPacket}. */
private final BlockingQueue<TransferPacket> recordQueue =
new LinkedBlockingQueue<>(ClientConf.TRANSFER_QUEUE_SIZE);
private final BlockingQueue<TransferPacket> recordQueue;
/** Client stream. */
private final ClientStream stream;
/** Connection params. */
ConnectionParams params;
/** Client config. */
private final ClientConf config;
/**
* Netty ssl context.
*
* @see SslContext
*/
private final SslContext sslContext;
/** Connection params. */
private final ConnectionParams params;
/**
* Constructor of StreamContext.
*
* @param stream Client stream.
* @param config Client config.
* @param params Connection params.
* @param sslContext Netty ssl context.
*/
public StreamContext(ClientStream stream, ConnectionParams params, SslContext sslContext) {
this.stream = stream;
this.params = params;
this.sslContext = sslContext;
public StreamContext(ClientStream stream, ClientConf config, ConnectionParams params) {
this.stream = Objects.requireNonNull(stream);
this.config = Objects.requireNonNull(config);
this.params = Objects.requireNonNull(params);
this.recordQueue = new LinkedBlockingQueue<>(config.getTransferQueueSize());
}
/**
......@@ -113,17 +109,17 @@ public class StreamContext {
*
* @return Connection params.
*/
public ConnectionParams getParams() {
public ConnectionParams params() {
return params;
}
/**
* Get netty ssl context.
* Get client config.
*
* @return Netty ssl context.
* @return Client config.
*/
public SslContext getSslContext() {
return sslContext;
public ClientConf config() {
return config;
}
/**
......
......@@ -8,7 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
package com.oceanbase.clogproxy.client.util;
import java.util.concurrent.ThreadFactory;
......
......@@ -11,6 +11,7 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
......@@ -68,7 +69,9 @@ public class LogProxyClientTest {
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.test.*");
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, sslContext());
ClientConf clientConf = ClientConf.builder().sslContext(sslContext()).build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf);
client.addListener(
new RecordListener() {
......
/*
* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
* oblogclient is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package com.oceanbase.clogproxy.client.config;
import org.junit.Assert;
import org.junit.Test;
public class ClientConfTest {
@Test
public void builderTest() {
ClientConf clientConf = ClientConf.builder().build();
Assert.assertEquals(clientConf.getTransferQueueSize(), 20000);
Assert.assertEquals(clientConf.getConnectTimeoutMs(), 5000);
Assert.assertEquals(clientConf.getReadWaitTimeMs(), 2000);
Assert.assertEquals(clientConf.getRetryIntervalS(), 2);
Assert.assertEquals(clientConf.getMaxReconnectTimes(), -1);
Assert.assertEquals(clientConf.getIdleTimeoutS(), 15);
Assert.assertEquals(clientConf.getNettyDiscardAfterReads(), 16);
Assert.assertNotNull(clientConf.getClientId());
Assert.assertFalse(clientConf.isIgnoreUnknownRecordType());
Assert.assertNull(clientConf.getSslContext());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册