From ba41dbf2c1c785d3044a99741ffcce4b3315e13b Mon Sep 17 00:00:00 2001 From: He Wang Date: Mon, 25 Apr 2022 19:09:31 +0800 Subject: [PATCH] update ClientConf (#35) --- .../clogproxy/client/LogProxyClient.java | 16 +- .../clogproxy/client/config/ClientConf.java | 177 ++++++++++++++++-- .../client/connection/ClientHandler.java | 10 +- .../client/connection/ClientStream.java | 29 +-- .../client/connection/ConnectionFactory.java | 14 +- .../client/connection/NettyEventLoopUtil.java | 51 ----- .../client/connection/StreamContext.java | 38 ++-- .../NamedThreadFactory.java | 2 +- .../clogproxy/client/LogProxyClientTest.java | 5 +- .../client/config/ClientConfTest.java | 35 ++++ 10 files changed, 260 insertions(+), 117 deletions(-) delete mode 100644 logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java rename logproxy-client/src/main/java/com/oceanbase/clogproxy/client/{connection => util}/NamedThreadFactory.java (97%) create mode 100644 logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java index 1adc69f..ce0fa84 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java @@ -17,7 +17,6 @@ import com.oceanbase.clogproxy.client.connection.ClientStream; import com.oceanbase.clogproxy.client.connection.ConnectionParams; import com.oceanbase.clogproxy.client.listener.RecordListener; import com.oceanbase.clogproxy.client.listener.StatusListener; -import com.oceanbase.clogproxy.client.util.ClientIdGenerator; import com.oceanbase.clogproxy.client.util.Validator; import com.oceanbase.clogproxy.common.packet.ProtocolVersion; import io.netty.handler.ssl.SslContext; @@ -34,10 +33,10 @@ public class LogProxyClient { * @param host Log proxy hostname name or ip. * @param port Log proxy port. * @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}. - * @param sslContext {@link SslContext} to create netty handler. + * @param clientConf {@link ClientConf} used to create netty handler. */ public LogProxyClient( - String host, int port, AbstractConnectionConfig config, SslContext sslContext) { + String host, int port, AbstractConnectionConfig config, ClientConf clientConf) { try { Validator.notNull(config.getLogType(), "log type cannot be null"); Validator.notEmpty(host, "server cannot be null"); @@ -48,14 +47,15 @@ public class LogProxyClient { if (!config.valid()) { throw new IllegalArgumentException("Illegal argument for LogProxyClient"); } - String clientId = - ClientConf.USER_DEFINED_CLIENTID.isEmpty() - ? ClientIdGenerator.generate() - : ClientConf.USER_DEFINED_CLIENTID; + if (clientConf == null) { + clientConf = ClientConf.builder().build(); + } + + String clientId = clientConf.getClientId(); ConnectionParams connectionParams = new ConnectionParams(config.getLogType(), clientId, host, port, config); connectionParams.setProtocolVersion(ProtocolVersion.V2); - this.stream = new ClientStream(connectionParams, sslContext); + this.stream = new ClientStream(clientConf, connectionParams); } /** diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java index e398644..cbbf842 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java @@ -11,7 +11,9 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.config; +import com.oceanbase.clogproxy.client.util.ClientIdGenerator; import com.oceanbase.clogproxy.common.config.SharedConf; +import io.netty.handler.ssl.SslContext; /** The class that defines the constants that are used to generate the connection. */ public class ClientConf extends SharedConf { @@ -19,34 +21,183 @@ public class ClientConf extends SharedConf { public static final String VERSION = "1.0.1"; /** Queue size for storing records received from log proxy. */ - public static int TRANSFER_QUEUE_SIZE = 20000; + private final int transferQueueSize; /** Connection timeout in milliseconds. */ - public static int CONNECT_TIMEOUT_MS = 5000; + private final int connectTimeoutMs; /** Reading queue timeout in milliseconds. */ - public static int READ_WAIT_TIME_MS = 2000; + private final int readWaitTimeMs; /** Time to sleep in seconds when retrying. */ - public static int RETRY_INTERVAL_S = 2; + private final int retryIntervalS; + + /** Idle timeout in seconds for netty handler. */ + private final int idleTimeoutS; /** - * Maximum number of retries after disconnect, if not data income lasting {@link - * #IDLE_TIMEOUT_S}, a reconnection will be triggered. + * Maximum number of retries after disconnect, if not data income lasting {@link #idleTimeoutS}, + * a reconnection will be triggered. */ - public static int MAX_RECONNECT_TIMES = -1; - - /** Idle timeout in seconds for netty handler. */ - public static int IDLE_TIMEOUT_S = 15; + private final int maxReconnectTimes; /** Maximum number of reads, after which data will be discarded. */ - public static int NETTY_DISCARD_AFTER_READS = 16; + private final int nettyDiscardAfterReads; + /** User defined client id. */ - public static String USER_DEFINED_CLIENTID = ""; + private final String clientId; /** * Ignore unknown or unsupported record type with a warning log instead of throwing an * exception. */ - public static boolean IGNORE_UNKNOWN_RECORD_TYPE = false; + private final boolean ignoreUnknownRecordType; + + /** Netty ssl context */ + private final SslContext sslContext; + + private ClientConf( + int transferQueueSize, + int connectTimeoutMs, + int readWaitTimeMs, + int retryIntervalS, + int maxReconnectTimes, + int idleTimeoutS, + int nettyDiscardAfterReads, + String clientId, + boolean ignoreUnknownRecordType, + SslContext sslContext) { + this.transferQueueSize = transferQueueSize; + this.connectTimeoutMs = connectTimeoutMs; + this.readWaitTimeMs = readWaitTimeMs; + this.retryIntervalS = retryIntervalS; + this.maxReconnectTimes = maxReconnectTimes; + this.idleTimeoutS = idleTimeoutS; + this.nettyDiscardAfterReads = nettyDiscardAfterReads; + this.clientId = clientId; + this.ignoreUnknownRecordType = ignoreUnknownRecordType; + this.sslContext = sslContext; + } + + public int getTransferQueueSize() { + return transferQueueSize; + } + + public int getConnectTimeoutMs() { + return connectTimeoutMs; + } + + public int getReadWaitTimeMs() { + return readWaitTimeMs; + } + + public int getRetryIntervalS() { + return retryIntervalS; + } + + public int getMaxReconnectTimes() { + return maxReconnectTimes; + } + + public int getIdleTimeoutS() { + return idleTimeoutS; + } + + public int getNettyDiscardAfterReads() { + return nettyDiscardAfterReads; + } + + public String getClientId() { + return clientId; + } + + public boolean isIgnoreUnknownRecordType() { + return ignoreUnknownRecordType; + } + + public SslContext getSslContext() { + return sslContext; + } + + public static Builder builder() { + return new Builder(); + } + + /** ClientConf builder with default values. */ + public static class Builder { + private int transferQueueSize = 20000; + private int connectTimeoutMs = 5000; + private int readWaitTimeMs = 2000; + private int retryIntervalS = 2; + private int maxReconnectTimes = -1; + private int idleTimeoutS = 15; + private int nettyDiscardAfterReads = 16; + private String clientId = ClientIdGenerator.generate(); + private boolean ignoreUnknownRecordType = false; + private SslContext sslContext = null; + + public Builder transferQueueSize(int transferQueueSize) { + this.transferQueueSize = transferQueueSize; + return this; + } + + public Builder connectTimeoutMs(int connectTimeoutMs) { + this.connectTimeoutMs = connectTimeoutMs; + return this; + } + + public Builder readWaitTimeMs(int readWaitTimeMs) { + this.readWaitTimeMs = readWaitTimeMs; + return this; + } + + public Builder retryIntervalS(int retryIntervalS) { + this.retryIntervalS = retryIntervalS; + return this; + } + + public Builder maxReconnectTimes(int maxReconnectTimes) { + this.maxReconnectTimes = maxReconnectTimes; + return this; + } + + public Builder idleTimeoutS(int idleTimeoutS) { + this.idleTimeoutS = idleTimeoutS; + return this; + } + + public Builder nettyDiscardAfterReads(int nettyDiscardAfterReads) { + this.nettyDiscardAfterReads = nettyDiscardAfterReads; + return this; + } + + public Builder clientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder ignoreUnknownRecordType(boolean ignoreUnknownRecordType) { + this.ignoreUnknownRecordType = ignoreUnknownRecordType; + return this; + } + + public Builder sslContext(SslContext sslContext) { + this.sslContext = sslContext; + return this; + } + + public ClientConf build() { + return new ClientConf( + transferQueueSize, + connectTimeoutMs, + readWaitTimeMs, + retryIntervalS, + maxReconnectTimes, + idleTimeoutS, + nettyDiscardAfterReads, + clientId, + ignoreUnknownRecordType, + sslContext); + } + } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index be10307..35b3d05 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -52,6 +52,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { /** A client stream. */ private ClientStream stream; + /** Client config. */ + private ClientConf config; + /** Connection params. */ private ConnectionParams params; @@ -156,7 +159,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { numReads = 0; buffer.release(); buffer = null; - } else if (++numReads >= ClientConf.NETTY_DISCARD_AFTER_READS) { + } else if (++numReads >= config.getNettyDiscardAfterReads()) { numReads = 0; discardSomeReadBytes(); } @@ -317,7 +320,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { byte[] data = new byte[dataLength + 8]; System.arraycopy(bytes, offset, data, 0, data.length); logMessage.parse(data); - if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) { + if (config.isIgnoreUnknownRecordType()) { // unsupported type, ignore logger.debug("Unsupported record type: {}", logMessage); offset += (8 + dataLength); @@ -361,7 +364,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { StreamContext context = ctx.channel().attr(ConnectionFactory.CONTEXT_KEY).get(); stream = context.stream(); - params = context.getParams(); + config = context.config(); + params = context.params(); recordQueue = context.recordQueue(); logger.info( diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 992d02e..20a7a37 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -16,7 +16,6 @@ import com.oceanbase.clogproxy.client.enums.ErrorCode; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.listener.RecordListener; import com.oceanbase.clogproxy.client.listener.StatusListener; -import io.netty.handler.ssl.SslContext; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ public class ClientStream { private Thread thread = null; /** Context of stream. */ - private StreamContext context = null; + private StreamContext context; /** Checkpoint string used to resume writing into the queue. */ private String checkpointString; @@ -74,10 +73,10 @@ public class ClientStream { * Sole constructor. * * @param connectionParams Connection params. - * @param sslContext A {@link SslContext} for encrypted communication. + * @param clientConf Client config. */ - public ClientStream(ConnectionParams connectionParams, SslContext sslContext) { - context = new StreamContext(this, connectionParams, sslContext); + public ClientStream(ClientConf clientConf, ConnectionParams connectionParams) { + this.context = new StreamContext(this, clientConf, connectionParams); } /** Close and wait the connection. */ @@ -132,7 +131,7 @@ public class ClientStream { /** Start the process thread. */ public void start() { // if status listener exist, enable monitor - context.params.setEnableMonitor(!statusListeners.isEmpty()); + context.params().setEnableMonitor(!statusListeners.isEmpty()); if (started.compareAndSet(false, true)) { thread = @@ -150,7 +149,8 @@ public class ClientStream { } if (state == ReconnectState.RETRY) { try { - TimeUnit.SECONDS.sleep(ClientConf.RETRY_INTERVAL_S); + TimeUnit.SECONDS.sleep( + context.config().getRetryIntervalS()); } catch (InterruptedException e) { // do nothing } @@ -163,7 +163,8 @@ public class ClientStream { packet = context.recordQueue() .poll( - ClientConf.READ_WAIT_TIME_MS, + context.config() + .getReadWaitTimeMs(), TimeUnit.MILLISECONDS); break; } catch (InterruptedException e) { @@ -241,11 +242,11 @@ public class ClientStream { logger.warn("start to reconnect..."); try { - if (ClientConf.MAX_RECONNECT_TIMES != -1 - && retryTimes >= ClientConf.MAX_RECONNECT_TIMES) { + if (context.config().getMaxReconnectTimes() != -1 + && retryTimes >= context.config().getMaxReconnectTimes()) { logger.error( "failed to reconnect, exceed max reconnect retry time: {}", - ClientConf.MAX_RECONNECT_TIMES); + context.config().getMaxReconnectTimes()); reconnect.set(true); return ReconnectState.EXIT; } @@ -258,7 +259,7 @@ public class ClientStream { // reconnection. if (StringUtils.isNotEmpty(checkpointString)) { logger.warn("reconnect set checkpoint: {}", checkpointString); - context.getParams().updateCheckpoint(checkpointString); + context.params().updateCheckpoint(checkpointString); } connection = ConnectionFactory.instance().createConnection(context); if (connection != null) { @@ -271,7 +272,7 @@ public class ClientStream { logger.error( "failed to reconnect, retry count: {}, max: {}", ++retryTimes, - ClientConf.MAX_RECONNECT_TIMES); + context.config().getMaxReconnectTimes()); // not success, retry next time reconnect.set(true); return ReconnectState.RETRY; @@ -280,7 +281,7 @@ public class ClientStream { logger.error( "failed to reconnect, retry count: {}, max: {}, message: {}", ++retryTimes, - ClientConf.MAX_RECONNECT_TIMES, + context.config().getMaxReconnectTimes(), e); // not success, retry next time reconnect.set(true); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java index c8d4b37..507b71c 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java @@ -11,9 +11,10 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.connection; -import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.enums.ErrorCode; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; +import com.oceanbase.clogproxy.client.util.NamedThreadFactory; +import com.oceanbase.clogproxy.client.util.NettyEventLoopUtil; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -70,7 +71,7 @@ public class ConnectionFactory { .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true); - SslContext sslContext = context.getSslContext(); + SslContext sslContext = context.config().getSslContext(); bootstrap.handler( new ChannelInitializer() { @@ -80,7 +81,9 @@ public class ConnectionFactory { ch.pipeline().addFirst(sslContext.newHandler(ch.alloc())); } ch.pipeline() - .addLast(new IdleStateHandler(ClientConf.IDLE_TIMEOUT_S, 0, 0)); + .addLast( + new IdleStateHandler( + context.config().getIdleTimeoutS(), 0, 0)); ch.pipeline().addLast(new ClientHandler()); } }); @@ -96,11 +99,12 @@ public class ConnectionFactory { */ public Connection createConnection(StreamContext context) throws LogProxyClientException { Bootstrap bootstrap = initBootstrap(context); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConf.CONNECT_TIMEOUT_MS); + bootstrap.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, context.config().getConnectTimeoutMs()); ChannelFuture channelFuture = bootstrap.connect( new InetSocketAddress( - context.getParams().getHost(), context.getParams().getPort())); + context.params().getHost(), context.params().getPort())); channelFuture.awaitUninterruptibly(); if (!channelFuture.isDone()) { throw new LogProxyClientException(ErrorCode.E_CONNECT, "timeout of create connection!"); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java deleted file mode 100644 index b109a9e..0000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.connection; - - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import java.util.concurrent.ThreadFactory; - -/** This is a factory class of netty event loop. */ -public class NettyEventLoopUtil { - - /** Flag of whether epoll is enabled. */ - private static final boolean EPOLL_ENABLED = Epoll.isAvailable(); - - /** - * Create the right event loop according to current platform and system property, fallback to - * NIO when epoll not enabled. - * - * @param nThreads Number of threads. - * @param threadFactory ThreadFactory instance. - * @return An EventLoopGroup suitable for the current platform. - */ - public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - return EPOLL_ENABLED - ? new EpollEventLoopGroup(nThreads, threadFactory) - : new NioEventLoopGroup(nThreads, threadFactory); - } - - /** - * Get the suitable {@link SocketChannel} for the given EventLoopGroup implementation. - * - * @return A {@link SocketChannel} class suitable for the given EventLoopGroup implementation. - */ - public static Class getClientSocketChannelClass() { - return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class; - } -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java index a4b41bc..4c7548d 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java @@ -15,7 +15,7 @@ import static com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.Runti import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.common.packet.HeaderType; import com.oceanbase.oms.logmessage.LogMessage; -import io.netty.handler.ssl.SslContext; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -79,33 +79,29 @@ public class StreamContext { } /** Blocking queue which stores {@link TransferPacket}. */ - private final BlockingQueue recordQueue = - new LinkedBlockingQueue<>(ClientConf.TRANSFER_QUEUE_SIZE); + private final BlockingQueue recordQueue; /** Client stream. */ private final ClientStream stream; - /** Connection params. */ - ConnectionParams params; + /** Client config. */ + private final ClientConf config; - /** - * Netty ssl context. - * - * @see SslContext - */ - private final SslContext sslContext; + /** Connection params. */ + private final ConnectionParams params; /** * Constructor of StreamContext. * * @param stream Client stream. + * @param config Client config. * @param params Connection params. - * @param sslContext Netty ssl context. */ - public StreamContext(ClientStream stream, ConnectionParams params, SslContext sslContext) { - this.stream = stream; - this.params = params; - this.sslContext = sslContext; + public StreamContext(ClientStream stream, ClientConf config, ConnectionParams params) { + this.stream = Objects.requireNonNull(stream); + this.config = Objects.requireNonNull(config); + this.params = Objects.requireNonNull(params); + this.recordQueue = new LinkedBlockingQueue<>(config.getTransferQueueSize()); } /** @@ -113,17 +109,17 @@ public class StreamContext { * * @return Connection params. */ - public ConnectionParams getParams() { + public ConnectionParams params() { return params; } /** - * Get netty ssl context. + * Get client config. * - * @return Netty ssl context. + * @return Client config. */ - public SslContext getSslContext() { - return sslContext; + public ClientConf config() { + return config; } /** diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NamedThreadFactory.java similarity index 97% rename from logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java rename to logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NamedThreadFactory.java index e22a977..b5b5680 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NamedThreadFactory.java @@ -8,7 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ -package com.oceanbase.clogproxy.client.connection; +package com.oceanbase.clogproxy.client.util; import java.util.concurrent.ThreadFactory; diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java index f3965c3..323c35b 100644 --- a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java @@ -11,6 +11,7 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client; +import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.config.ObReaderConfig; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.listener.RecordListener; @@ -68,7 +69,9 @@ public class LogProxyClientTest { config.setStartTimestamp(0L); config.setTableWhiteList("sys.test.*"); - LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, sslContext()); + ClientConf clientConf = ClientConf.builder().sslContext(sslContext()).build(); + + LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, clientConf); client.addListener( new RecordListener() { diff --git a/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java new file mode 100644 index 0000000..9d72c76 --- /dev/null +++ b/logproxy-client/src/test/java/com/oceanbase/clogproxy/client/config/ClientConfTest.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. + * oblogclient is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package com.oceanbase.clogproxy.client.config; + + +import org.junit.Assert; +import org.junit.Test; + +public class ClientConfTest { + + @Test + public void builderTest() { + ClientConf clientConf = ClientConf.builder().build(); + Assert.assertEquals(clientConf.getTransferQueueSize(), 20000); + Assert.assertEquals(clientConf.getConnectTimeoutMs(), 5000); + Assert.assertEquals(clientConf.getReadWaitTimeMs(), 2000); + Assert.assertEquals(clientConf.getRetryIntervalS(), 2); + Assert.assertEquals(clientConf.getMaxReconnectTimes(), -1); + Assert.assertEquals(clientConf.getIdleTimeoutS(), 15); + Assert.assertEquals(clientConf.getNettyDiscardAfterReads(), 16); + Assert.assertNotNull(clientConf.getClientId()); + Assert.assertFalse(clientConf.isIgnoreUnknownRecordType()); + Assert.assertNull(clientConf.getSslContext()); + } +} -- GitLab