diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/config/ShareConf.java b/common/src/main/java/com/oceanbase/clogproxy/common/config/SharedConf.java similarity index 79% rename from common/src/main/java/com/oceanbase/clogproxy/common/config/ShareConf.java rename to common/src/main/java/com/oceanbase/clogproxy/common/config/SharedConf.java index b1150d36b5544b9e68edb3a73e41c5e432d7bc89..87d59bcf8625a6a84b078e090cc9863240e26cf9 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/config/ShareConf.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/config/SharedConf.java @@ -10,6 +10,13 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.common.config; -public class ShareConf { +/** + * The class that defines the shared constants. + */ +public class SharedConf { + + /** + * Flag of whether to use the hash function to process password. + */ public static boolean AUTH_PASSWORD_HASH = true; } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/packet/CompressType.java b/common/src/main/java/com/oceanbase/clogproxy/common/packet/CompressType.java index 344f0aa8ccd593b4f2a4f54812a618f83a350f8d..b96f3552ec6d90d67cae0785b7c854d61fdb18f9 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/packet/CompressType.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/packet/CompressType.java @@ -10,23 +10,40 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.common.packet; +/** + * Compress type enumeration. Primarily used for {@link com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.RecordData}. + */ public enum CompressType { /** - * no compress + * No compress. */ NONE(0), /** - * lz4 compress + * LZ4 compress. */ LZ4(1); - private int code; + /** + * The ordinal of this enumeration constant. + */ + private final int code; + /** + * Constructor. + * + * @param code The ordinal of this enumeration constant. + */ CompressType(int code) { this.code = code; } + /** + * Returns the enum constant of CompressType with the specified code. + * + * @param code The ordinal of this enumeration constant. + * @return The enum constant. + */ public static CompressType codeOf(int code) { for (CompressType v : values()) { if (v.code == code) { @@ -36,6 +53,11 @@ public enum CompressType { return null; } + /** + * Get the ordinal of this enumeration constant. + * + * @return The ordinal of this enumeration constant. + */ public int code() { return code; } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/packet/HeaderType.java b/common/src/main/java/com/oceanbase/clogproxy/common/packet/HeaderType.java index 3c6564f7f08920bbb763b2f257ce7e6b359b54f9..38f1c976d4a2fd8f6f19e100ec9dbcc27a9542ad 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/packet/HeaderType.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/packet/HeaderType.java @@ -10,59 +10,75 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.common.packet; +/** + * Header type enumeration. Used to identify the type of request or response in protobuf. + */ public enum HeaderType { /** - * error response + * Error response. */ ERROR_RESPONSE(-1), /** - * client request handshake + * Client handshake request. */ HANDSHAKE_REQUEST_CLIENT(1), /** - * response to client handshake + * Client handshake response. */ HANDSHAKE_RESPONSE_CLIENT(2), /** - * logreader request handshake + * LogReader handshake request. */ HANDSHAKE_REQUEST_LOGREADER(3), /** - * logreader response handshake + * LogReader handshake response. */ HANDSHAKE_RESPONSE_LOGREADER(4), /** - * logreader data stream + * LogReader data stream. */ DATA_LOGREADER(5), /** - * client data stream + * Client data stream. */ DATA_CLIENT(6), /** - * status info of server runtime + * Status info of server runtime. */ STATUS(7), /** - * status info of LogReader + * Status info of LogReader. */ - STATUS_LOGREADER(8), - ; + STATUS_LOGREADER(8); + /** + * The ordinal of this enumeration constant. + */ private final int code; + /** + * Constructor. + * + * @param code The ordinal of this enumeration constant. + */ HeaderType(int code) { this.code = code; } + /** + * Returns the enum constant of HeaderType with the specified code. + * + * @param code The ordinal of this enumeration constant. + * @return The enum constant. + */ public static HeaderType codeOf(int code) { for (HeaderType t : values()) { if (t.code == code) { @@ -72,6 +88,11 @@ public enum HeaderType { return null; } + /** + * Get the ordinal of this enumeration constant. + * + * @return The ordinal of this enumeration constant. + */ public int code() { return code; } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/packet/LogType.java b/common/src/main/java/com/oceanbase/clogproxy/common/packet/LogType.java index 7d7372e56c2371504e42a90a9b72367782d410f2..ec3b81f3e5bb49d30aa8418fd8a04be911046e1b 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/packet/LogType.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/packet/LogType.java @@ -10,45 +10,51 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.common.packet; -import java.util.HashMap; -import java.util.Map; - +/** + * Log type enumeration. + */ public enum LogType { /** - * LogProxy OceanBase LogReader + * LogProxy OceanBase LogReader. */ OCEANBASE(0); + /** + * The ordinal of this enumeration constant. + */ private final int code; - private static final Map CODE_TYPES = new HashMap<>(values().length); - - static { - for (LogType logCaptureType : values()) { - CODE_TYPES.put(logCaptureType.code, logCaptureType); - } - } - + /** + * Constructor. + * + * @param code The ordinal of this enumeration constant. + */ LogType(int code) { this.code = code; } - public int getCode() { - return this.code; - } - - public static LogType fromString(String string) { - if (string == null) { - throw new NullPointerException("logTypeString is null"); + /** + * Returns the enum constant of LogType with the specified code. + * + * @param code The ordinal of this enumeration constant. + * @return The enum constant. + */ + public static LogType codeOf(int code) { + for (LogType t : values()) { + if (t.code == code) { + return t; + } } - return valueOf(string.toUpperCase()); + return null; } - public static LogType fromCode(int code) { - if (CODE_TYPES.containsKey(code)) { - return CODE_TYPES.get(code); - } - return null; + /** + * Get the ordinal of this enumeration constant. + * + * @return The ordinal of this enumeration constant. + */ + public int code() { + return this.code; } } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/packet/ProtocolVersion.java b/common/src/main/java/com/oceanbase/clogproxy/common/packet/ProtocolVersion.java index a689c17f1e4c37f7f96977cd52a651fdfeed371d..bef5a0b0bdb1552b551965ec8c5d4682c9f531cb 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/packet/ProtocolVersion.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/packet/ProtocolVersion.java @@ -10,20 +10,46 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.common.packet; +/** + * Protocol version enumeration. + */ public enum ProtocolVersion { + /** - * v0 version + * Protocol version 0. */ V0(0), + + /** + * Protocol version 1. + */ V1(1), + + /** + * Protocol version 2. + */ V2(2); + /** + * The ordinal of this enumeration constant. + */ private final int code; + /** + * Constructor. + * + * @param code The ordinal of this enumeration constant. + */ ProtocolVersion(int code) { this.code = code; } + /** + * Returns the enum constant of ProtocolVersion with the specified code. + * + * @param code The ordinal of this enumeration constant. + * @return The enum constant. + */ public static ProtocolVersion codeOf(int code) { for (ProtocolVersion v : values()) { if (v.code == code) { @@ -33,6 +59,11 @@ public enum ProtocolVersion { return null; } + /** + * Get the ordinal of this enumeration constant. + * + * @return The ordinal of this enumeration constant. + */ public int code() { return code; } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/CryptoUtil.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/CryptoUtil.java index 217f1251f06c723e54af3802340ed04c820d0ef4..1ceef226d96cd9ca554ff650d679b5ed16ac7a98 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/CryptoUtil.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/util/CryptoUtil.java @@ -22,26 +22,73 @@ import java.security.InvalidAlgorithmParameterException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +/** + * Utils class for crypto. + */ public class CryptoUtil { + /** + * Default cipher key. + */ private static final String KEY = "LogProxy123*"; + /** + * AES key length. + */ private static final int AES_KEY_SIZE = 256; + + /** + * GCM tag length. + */ private static final int GCM_TAG_LENGTH = 16; + /** + * Create an {@link Encryptor} instance using given cipher key. + * + * @param key Cipher key. + * @return An {@link Encryptor} instance. + */ public static Encryptor newEncryptor(String key) { return new Encryptor(key); } + /** + * Create an Encryptor instance using default cipher key. + * + * @return An {@link Encryptor} instance. + */ public static Encryptor newEncryptor() { return new Encryptor(KEY); } + /** + * This class provides the functionality of encryption and decryption with a specific cipher key. + */ public static class Encryptor { + /** + * The cipher instance. + */ private Cipher cipher = null; // not thread-safe - private byte[] key = new byte[AES_KEY_SIZE / 16]; - private byte[] iv = new byte[12]; + /** + * The key material of the secret key. + * + * @see SecretKeySpec#SecretKeySpec(byte[], String) + */ + private final byte[] key = new byte[AES_KEY_SIZE / 16]; + + /** + * The IV source buffer. + * + * @see GCMParameterSpec#GCMParameterSpec(int, byte[]) + */ + private final byte[] iv = new byte[12]; + + /** + * Constructor. + * + * @param cipherKey The cipher key used to generate {@link Encryptor#key} and {@link Encryptor#iv}. + */ private Encryptor(String cipherKey) { try { cipher = Cipher.getInstance("AES/GCM/NoPadding"); @@ -55,6 +102,12 @@ public class CryptoUtil { } } + /** + * Encrypt given text. + * + * @param text The original text. + * @return Encrypted data. + */ public byte[] encrypt(String text) { SecretKeySpec keySpec = new SecretKeySpec(key, "AES"); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv); @@ -68,6 +121,12 @@ public class CryptoUtil { } } + /** + * Decrypt given data. + * + * @param cipherText Encrypted data. + * @return The original string. + */ public String decrypt(byte[] cipherText) { SecretKeySpec keySpec = new SecretKeySpec(key, "AES"); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv); @@ -82,10 +141,22 @@ public class CryptoUtil { } } + /** + * Compute hash value of given array of bytes. + * + * @param bytes The origin array of bytes. + * @return The array of bytes for the resulting hash value. + */ public static byte[] sha1(byte[] bytes) { return DigestUtils.sha1(bytes); } + /** + * Compute hash value of given string. + * + * @param text The origin string. + * @return The array of bytes for the resulting hash value. + */ public static byte[] sha1(String text) { return DigestUtils.sha1(text); } diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/Decoder.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/Decoder.java deleted file mode 100644 index 3eaea7716b21516345971d2c3c8d5c659d4913fd..0000000000000000000000000000000000000000 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/Decoder.java +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.common.util; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - -public class Decoder { - - public static String decodeStringInt(ByteBuf buffer) { - if (buffer.readableBytes() < Integer.BYTES) { - return null; - } - buffer.markReaderIndex(); - int length = buffer.readInt(); - if (buffer.readableBytes() < length) { - buffer.resetReaderIndex(); - return null; - } - byte[] bytes = new byte[length]; - buffer.readBytes(bytes); - String str = new String(bytes); - if (str.isEmpty()) { - throw new RuntimeException("decode string is null or empty"); - } - return str; - } - - public static String decodeStringByte(ByteBuf buffer) { - if (buffer.readableBytes() < Byte.BYTES) { - return null; - } - buffer.markReaderIndex(); - short length = buffer.readByte(); - if (buffer.readableBytes() < length) { - buffer.resetReaderIndex(); - return null; - } - byte[] bytes = new byte[length]; - buffer.readBytes(bytes); - String str = new String(bytes); - if (str.isEmpty()) { - throw new RuntimeException("decode string is null or empty"); - } - return str; - } - - public static ByteBuf encodeStringInt(String string) { - if (string == null || string.length() == 0) { - throw new RuntimeException("encode string is null or empty"); - } - ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(4 + string.length()); - byteBuf.writeInt(string.length()); - byteBuf.writeBytes(string.getBytes()); - return byteBuf; - } -} diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/Hex.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/Hex.java index ba7c521c82bcc931b710766fcce492b6dbcef52e..e7d67b26bf1dbab68ef81ef0bac2a670907eef97 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/Hex.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/util/Hex.java @@ -14,24 +14,54 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import org.apache.commons.codec.DecoderException; +/** + * This class is used to convert hexadecimal strings. + */ public final class Hex { + + /** + * Returns a multi-line hexadecimal dump of the array that is easy to read by humans. + * + * @param array An array of bytes + * @return A multi-line hexadecimal dump string + */ public static String dump(byte[] array) { return dump(array, 0, array.length); } + /** + * Returns a multi-line hexadecimal dump of the specified sub-region of bytes that is easy to read by humans. + * + * @param bytes An array of bytes + * @param offset The offset of the sub-region start position + * @param length The length of the sub-region + * @return A multi-line hexadecimal dump string + */ public static String dump(byte[] bytes, int offset, int length) { return ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(bytes, offset, length)); } + /** + * Converts an array of bytes into a string representing the hexadecimal values of each byte in order. + * + * @param bytes An array of bytes + * @return A String containing uppercase hexadecimal characters + */ public static String str(byte[] bytes) { return org.apache.commons.codec.binary.Hex.encodeHexString(bytes, false); } - public static byte[] toBytes(String hexstr) { + /** + * Converts a String representing hexadecimal values into an array of bytes of those same values. + * + * @param hexStr A String representing hexadecimal values + * @return An array of bytes + */ + public static byte[] toBytes(String hexStr) { try { - return org.apache.commons.codec.binary.Hex.decodeHex(hexstr); + return org.apache.commons.codec.binary.Hex.decodeHex(hexStr); } catch (DecoderException e) { return null; } } -} \ No newline at end of file +} diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/NetworkUtil.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/NetworkUtil.java index bb3f9c101e88ee46f7828ec40ed973d1ce0b36ce..938a4b72aad9762261e15438f92c7a33f643838a 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/NetworkUtil.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/util/NetworkUtil.java @@ -13,17 +13,17 @@ package com.oceanbase.clogproxy.common.util; import io.netty.channel.Channel; import org.apache.commons.lang3.StringUtils; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.InterfaceAddress; -import java.net.NetworkInterface; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.UnknownHostException; +import java.net.*; import java.util.Enumeration; +/** + * Utils class for network. + */ public class NetworkUtil { + /** + * Local ip. + */ private static String IP; static { @@ -50,9 +50,9 @@ public class NetworkUtil { } /** - * get local ip + * Get local ip. * - * @return local ip + * @return Local ip. */ public static String getLocalIp() { return IP; @@ -60,6 +60,9 @@ public class NetworkUtil { /** * Parse the remote address of the channel. + * + * @param channel A channel + * @return The address string. */ public static String parseRemoteAddress(final Channel channel) { if (null == channel) { @@ -70,9 +73,10 @@ public class NetworkUtil { } /** + * Parse the address with rules: *
    - *
  1. if an address starts with a '/', skip it. - *
  2. if an address contains a '/', substring it. + *
  3. If an address starts with a '/', skip it. + *
  4. If an address contains a '/', substring it. *
*/ private static String doParse(String addr) { diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/Password.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/Password.java deleted file mode 100644 index dc3d4a1c03227bbc9a80e526d2def0e468fcd6c1..0000000000000000000000000000000000000000 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/Password.java +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.common.util; - -public class Password { - private String value; - - public void set(String value) { - this.value = value; - } - - public String get() { - return value; - } - - @Override - public String toString() { - return "******"; - } -} diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/TaskExecutor.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/TaskExecutor.java deleted file mode 100644 index 702e8fc35d2a5f4ef498687898d260ab615173b9..0000000000000000000000000000000000000000 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/TaskExecutor.java +++ /dev/null @@ -1,146 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.common.util; - -import com.google.common.collect.Maps; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class TaskExecutor { - - private static class Singleton { - private static final TaskExecutor INSTANCE = new TaskExecutor(); - } - - public static TaskExecutor instance() { - return Singleton.INSTANCE; - } - - private TaskExecutor() { } - - public static class Task { - protected Future future; - protected Failure failure; - - public T get() { - try { - return future.get(); - } catch (InterruptedException | ExecutionException e) { - if (failure != null) { - failure.onError(e); - } - return null; - } - } - - public void join() { - get(); - } - } - - public static class BackgroundTask extends Task { - - @Override - public void join() { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - if (failure != null) { - failure.onError(e); - } - } - } - } - - public interface Failure { - void onError(Exception e); - } - - private ExecutorService asyncTasks = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), (ThreadFactory) Thread::new); - - private ExecutorService bgTasks = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), (ThreadFactory) Thread::new); - - private Map concurrentTasks = Maps.newConcurrentMap(); - - public Task async(Callable callable) { - return async(callable, null); - } - - public Task async(Callable callable, Failure failure) { - Task task = new Task<>(); - task.future = asyncTasks.submit(callable); - task.failure = failure; - return task; - } - - public BackgroundTask background(Callable callable) { - return background(callable, null); - } - - public BackgroundTask background(Callable callable, Failure failure) { - BackgroundTask task = new BackgroundTask(); - task.future = bgTasks.submit(callable); - task.failure = failure; - return task; - } - - public static class ConcurrentTask { - private ExecutorService concurrentTasks; - - public ConcurrentTask(int parallelism) { - // Never exceed actual CPU core count for init count, or got an Exception - concurrentTasks = new ForkJoinPool(Math.min(parallelism, - Runtime.getRuntime().availableProcessors()), - ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); - } - - public Future concurrent(Callable callable) { - return concurrentTasks.submit(callable); - } - } - - public ConcurrentTask refConcurrent(String name, int parallelism) { - ConcurrentTask task = concurrentTasks.get(name); - if (task != null) { - return task; - } - task = new ConcurrentTask(parallelism); - concurrentTasks.put(name, task); - return task; - } - - public int getAsyncTaskCount() { - return ((ThreadPoolExecutor) asyncTasks).getActiveCount(); - } - - public int getBgTaskCount() { - return ((ThreadPoolExecutor) bgTasks).getActiveCount(); - } - - public int getConcurrentTaskCount() { - int count = 0; - for (ConcurrentTask t : concurrentTasks.values()) { - count += ((ForkJoinPool) t.concurrentTasks).getActiveThreadCount(); - } - return count; - } -} diff --git a/common/src/main/java/com/oceanbase/clogproxy/common/util/TypeTrait.java b/common/src/main/java/com/oceanbase/clogproxy/common/util/TypeTrait.java index 83a7208e0718d377a0194e66cf726090501d8cb4..ad4df0cc9738909d3a286ab137ef0580a74b3ff6 100644 --- a/common/src/main/java/com/oceanbase/clogproxy/common/util/TypeTrait.java +++ b/common/src/main/java/com/oceanbase/clogproxy/common/util/TypeTrait.java @@ -14,56 +14,123 @@ import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Field; +/** + * Utils class to check and convert data types. + */ public class TypeTrait { + + /** + * Checks if it is a number type object. + * + * @param obj An object to check. + * @return True if it is a number type object, false otherwise. + */ public static boolean isNumber(Object obj) { return (obj instanceof Byte) || (obj instanceof Short) || - (obj instanceof Integer) || (obj instanceof Long); + (obj instanceof Integer) || (obj instanceof Long); } + /** + * Checks if it is a number type {@link Field}. + * + * @param field A field to check. + * @return True if it is a number type field, false otherwise. + */ public static boolean isNumber(Field field) { String typeName = field.getGenericType().getTypeName(); return "byte".equals(typeName) || "java.lang.Byte".equals(typeName) || - "short".equals(typeName) || "java.lang.Short".equals(typeName) || - "int".equals(typeName) || "java.lang.Integer".equals(typeName) || - "long".equals(typeName) || "java.lang.Long".equals(typeName); + "short".equals(typeName) || "java.lang.Short".equals(typeName) || + "int".equals(typeName) || "java.lang.Integer".equals(typeName) || + "long".equals(typeName) || "java.lang.Long".equals(typeName); } + /** + * Checks if it is a real number type object. + * + * @param obj An object to check. + * @return True if it is a real number type object, false otherwise. + */ public static boolean isReal(Object obj) { return (obj instanceof Float) || (obj instanceof Double); } + /** + * Checks if it is a real number type {@link Field}. + * + * @param field A field to check. + * @return True if it is a real number type field, false otherwise. + */ public static boolean isReal(Field field) { String typeName = field.getGenericType().getTypeName(); return "float".equals(typeName) || "java.lang.Float".equals(typeName) || - "double".equals(typeName) || "java.lang.Double".equals(typeName); + "double".equals(typeName) || "java.lang.Double".equals(typeName); } + /** + * Checks if it is a boolean type object. + * + * @param obj An object to check. + * @return True if it is a boolean type object, false otherwise. + */ public static boolean isBool(Object obj) { return obj instanceof Boolean; } + /** + * Checks if it is a boolean type {@link Field}. + * + * @param field A field to check. + * @return True if it is a boolean type field, false otherwise. + */ public static boolean isBool(Field field) { String typeName = field.getGenericType().getTypeName(); return "boolean".equals(typeName) || "java.lang.Boolean".equals(typeName); } + /** + * Checks if it is a string type object. + * + * @param obj An object to check. + * @return True if it is a string type object, false otherwise. + */ public static boolean isString(Object obj) { return (obj instanceof Character) || (obj instanceof String); } + /** + * Checks if it is a string type {@link Field}. + * + * @param field A field to check. + * @return True if it is a string type field, false otherwise. + */ public static boolean isString(Field field) { String typeName = field.getGenericType().getTypeName(); return "char".equals(typeName) || "java.lang.Character".equals(typeName) || - "java.lang.String".equals(typeName); + "java.lang.String".equals(typeName); } + /** + * Checks if the object and field are the same loose type. + * + * @param object An object to check. + * @param field A field to check. + * @return True if the object and field are the same loose type, false otherwise. + */ public static boolean isSameLooseType(Object object, Field field) { return (isNumber(object) && isNumber(field)) || - (isReal(object) && isReal(field)) || - (isBool(object) && isBool(field)) || - (isString(object) && isString(field)); + (isReal(object) && isReal(field)) || + (isBool(object) && isBool(field)) || + (isString(object) && isString(field)); } + /** + * Convert a value from string type. + * + * @param value The source string. + * @param clazz Value class. + * @param Expected value type. + * @return The value converted from string. + */ @SuppressWarnings("unchecked") public static T fromString(String value, Class clazz) { if (clazz == Byte.class || clazz == byte.class) { diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java index 6918b990ea9b868f172903d7b808fae9b59bf860..221fe29a2f19d0b265d69e753c31e4bbd767cc79 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/LogProxyClient.java @@ -21,46 +21,87 @@ import com.oceanbase.clogproxy.client.util.Validator; import com.oceanbase.clogproxy.common.packet.ProtocolVersion; import io.netty.handler.ssl.SslContext; +/** + * A client that makes it easy to connect to log proxy and start a {@link ClientStream}. + */ public class LogProxyClient { + /** + * A {@link ClientStream} instance. + */ private final ClientStream stream; /** - * @param host server hostname name or ip - * @param port server port - * @param config real config object according to what-you-expected - * @param sslContext ssl context to create netty handler + * Constructor with {@link SslContext}. + * + * @param host Log proxy hostname name or ip. + * @param port Log proxy port. + * @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}. + * @param sslContext {@link SslContext} to create netty handler. */ public LogProxyClient(String host, int port, AbstractConnectionConfig config, SslContext sslContext) { - Validator.notNull(config.getLogType(), "log type cannot be null"); - Validator.notNull(host, "server cannot be null"); - Validator.validatePort(port, "port is not valid"); + try { + Validator.notNull(config.getLogType(), "log type cannot be null"); + Validator.notEmpty(host, "server cannot be null"); + Validator.validatePort(port, "port is not valid"); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal argument for LogProxyClient"); + } + if (!config.valid()) { + throw new IllegalArgumentException("Illegal argument for LogProxyClient"); + } String clientId = ClientConf.USER_DEFINED_CLIENTID.isEmpty() ? ClientIdGenerator.generate() : ClientConf.USER_DEFINED_CLIENTID; ConnectionParams connectionParams = new ConnectionParams(config.getLogType(), clientId, host, port, config); connectionParams.setProtocolVersion(ProtocolVersion.V2); this.stream = new ClientStream(connectionParams, sslContext); } + /** + * Constructor without {@link SslContext}. + * + * @param host Log proxy hostname name or ip. + * @param port Log proxy port. + * @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}. + */ public LogProxyClient(String host, int port, AbstractConnectionConfig config) { this(host, port, config, null); } + /** + * Start the client. + */ public void start() { stream.start(); } + /** + * Stop the client. + */ public void stop() { stream.stop(); } + /** + * Join and wait the client. + */ public void join() { stream.join(); } + /** + * Add a {@link RecordListener} to {@link #stream}. + * + * @param recordListener A {@link RecordListener}. + */ public synchronized void addListener(RecordListener recordListener) { stream.addListener(recordListener); } + /** + * Add a {@link StatusListener} to {@link #stream}. + * + * @param statusListener A {@link StatusListener}. + */ public synchronized void addStatusListener(StatusListener statusListener) { stream.addStatusListener(statusListener); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java index c799c43907aef92eff49e0af7a8371382d03945d..9217ccb1b0f165dfffcd4a18ca7c42b264fc77a7 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/AbstractConnectionConfig.java @@ -17,33 +17,57 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +/** + * This is an abstract implementation class of the interface {@link ConnectionConfig}. + */ public abstract class AbstractConnectionConfig implements ConnectionConfig { /** - * defined structure configurations + * Defined configurations map. */ protected static Map> configs = new HashMap<>(); /** - * extra configurations + * Extra configurations map. */ protected final Map extraConfigs = new HashMap<>(); + /** + * This class is used to define configuration with a default value. + * + * @param The type of stored value. + */ @SuppressWarnings("unchecked") protected static class ConfigItem { protected String key; protected T val; + /** + * Sole constructor. + * + * @param key Config key. + * @param val Config value. + */ public ConfigItem(String key, T val) { this.key = key; this.val = val; configs.put(key, (ConfigItem) this); } + /** + * Set value to config item. + * + * @param val Value of specific type. + */ public void set(T val) { this.val = val; } + /** + * Set value of specific type from string. + * + * @param val Value of string type. + */ public void fromString(String val) { this.val = TypeTrait.fromString(val, this.val.getClass()); } @@ -54,6 +78,11 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig { } } + /** + * Sole constructor. + * + * @param allConfigs The map of configurations. + */ public AbstractConnectionConfig(Map allConfigs) { if (allConfigs != null) { for (Entry entry : allConfigs.entrySet()) { @@ -66,12 +95,28 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig { } } + /** + * Get log type set in configurations. + * + * @return The enum constant of {@link LogType}. + */ public abstract LogType getLogType(); + /** + * Add configurations to {@link #extraConfigs} + * + * @param extraConfigs A map of configurations. + */ public void setExtraConfigs(Map extraConfigs) { this.extraConfigs.putAll(extraConfigs); } + /** + * Update value into define configurations map. + * + * @param key Config key. + * @param val New config value. + */ void set(String key, String val) { ConfigItem cs = configs.get(key); if (cs != null) { @@ -80,9 +125,9 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig { } /** - * validate if defined configurations + * Validate defined configurations. * - * @return True or False + * @return Flag of whether all the defined configurations are valid. */ public abstract boolean valid(); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java index 0f9bf61be9543618782f063b3708aa441e5d752c..01b58d11a249cb6bba3ab74c8925924e509a7681 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ClientConf.java @@ -10,29 +10,58 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.config; -import com.oceanbase.clogproxy.common.config.ShareConf; +import com.oceanbase.clogproxy.common.config.SharedConf; -public class ClientConf extends ShareConf { - public static final String VERSION = "1.1.0"; +/** + * The class that defines the constants that are used to generate the connection. + */ +public class ClientConf extends SharedConf { + /** + * Client version. + */ + public static final String VERSION = "1.0.1"; + /** + * Queue size for storing records received from log proxy. + */ public static int TRANSFER_QUEUE_SIZE = 20000; + + /** + * Connection timeout in milliseconds. + */ public static int CONNECT_TIMEOUT_MS = 5000; + + /** + * Reading queue timeout in milliseconds. + */ public static int READ_WAIT_TIME_MS = 2000; + + /** + * Time to sleep in seconds when retrying. + */ public static int RETRY_INTERVAL_S = 2; + /** - * max retry time after disconnect, if not data income lasting IDLE_TIMEOUT_S, a reconnect we be trigger + * Maximum number of retries after disconnect, if not data income lasting {@link #IDLE_TIMEOUT_S}, a reconnection will be triggered. */ public static int MAX_RECONNECT_TIMES = -1; + + /** + * Idle timeout in seconds for netty handler. + */ public static int IDLE_TIMEOUT_S = 15; + + /** + * Maximum number of reads, after which data will be discarded. + */ public static int NETTY_DISCARD_AFTER_READS = 16; /** - * set user defined userid, - * for inner use only + * User defined client id. */ public static String USER_DEFINED_CLIENTID = ""; /** - * ignore unknown or unsupported record type with a warning log instead throwing an exception + * Ignore unknown or unsupported record type with a warning log instead of throwing an exception. */ public static boolean IGNORE_UNKNOWN_RECORD_TYPE = false; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java index c4722b2b015a127c6e4a6e979b814569c3fb1947..df6b143368d9268720a9b60e93e4f24009820b73 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ConnectionConfig.java @@ -10,10 +10,30 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.config; +/** + * This is the interface of connection config. + */ public interface ConnectionConfig { + + /** + * Generate a configuration string from connection parameters. + * + * @return The configuration string. + */ String generateConfigurationString(); + /** + * Update the checkpoint. + * + * @param checkpoint A checkpoint string. + */ void updateCheckpoint(String checkpoint); + /** + * Overrides {@link Object#toString()} to structure a string. + * + * @return The structured string. + */ + @Override String toString(); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/DRCConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/DRCConfig.java deleted file mode 100644 index 21ad022989cd258747a175f55fdac3ecd6fb1f96..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/DRCConfig.java +++ /dev/null @@ -1,688 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.config; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; - -import com.oceanbase.clogproxy.client.fliter.DataFilterBase; -import com.oceanbase.clogproxy.client.message.Checkpoint; -import com.oceanbase.clogproxy.client.util.StringUtils; - -public class DRCConfig { - - public enum DataAcquireOption { - ASK_SELF_UNIT, ASK_OTHER_UNIT, ASK_ALL_DATA, - }; - - /* Configures used by DRCClient. */ - private final Map configures; - - /* All configures used to send to the server is stored */ - private final Map userDefinedParams; - - /* All persistent-required attributes. */ - private final Set persists; - - private Checkpoint checkpoint; - private DataFilterBase filter; - private String blackList; - private int recordsPerBatch = 0; - private int maxRetryTimes = 100; - private int socketTimeout = 10; - private int connectionTimeout = 10; - private boolean useBinaryFormat = false; - private boolean txnMark = true; - private boolean requireCompleteTxn = false; - private int maxRecordsCached = 10240; - private int maxRecordsBatched = 1024; - private int maxTxnsBatched = 10240; - private long maxTimeoutBatched = 500; // ms - private boolean useDrcNet = false; - private boolean drcMarkWorking = false; - private boolean requiredOtherUnitData = false; - private boolean usePublicIp = false; - - private boolean useCaseSensitive = false; - - private boolean trimLongType = false; - - private boolean useCheckCRC = false; - - private boolean useIndexIterator = false; - - private boolean needUKRecord = false; - - /* Properties defined for DRCClient, always defined in drc.properties. */ - private final static String DRC_VERSION = "version"; - private final static String DRC_MANAGERHOST = "manager.host"; - private final static String HTTPS_USE = "client.https"; - private final static String DRC_BINLOGLOGNAME = "DRCClient.Binlog"; - private final static String DRC_CHECKPOINT_POLLPERIOD = "checkpoint.period"; - private static final String SERVER_MAX_RETRIES = "server.maxRetriedTimes"; - private static final String SERVER_MESSAGE_TYPE = "server.messageType"; - private static final String CLIENT_SO_TIMEOUT = "client.socketTimeout"; - private static final String CLIENT_CONN_TIMEOUT = "client.connectionTimeout"; - private static final String CLIENT_REQ_COMP_TXN = "client.requireCompleteTxn"; - private static final String CLIENT_MAX_RECS_CACHE = "client.maxNumOfRecordsCached"; - private static final String CLIENT_MAX_RECS_BATCH = "client.maxNumOfRecordsPerMessage"; - private static final String CLIENT_MAX_TXNS_BATCH = "client.maxNumOfTxnsPerMessage"; - private static final String CLIENT_MAX_TIMEOUT_BATCH = "client.maxTimeoutPerMessage"; - private static final String CLIENT_USE_INDEX_ITERATOR = "enableIndexIter"; - - /* Parameters could be sent to remote servers, usually provided by users. */ - private final static String USER_FILTERCONDITIONS = "condition"; - private final static String USER_DBNAME = "dbname"; - private final static String USER_GROUPNAME = "groupname"; - private final static String USER_IDENTIFICATION = "password"; - private final static String USER_GROUP = "group"; - private final static String USER_SUBGROUP = "subgroup"; - private final static String USER_MYSQL = "instance"; - private final static String DRC_MARK = "drcMark"; - private final static String NEED_UK_RECORD = "needUKRecord"; - private final static String BLACK_REGION_NO = "black_region_no"; - - private final static String DEFAULT_DRC_MARK = "drc.t*x_begin4unit_mark_[0-9]*|*.drc_txn"; - private static final String USE_DRC_NET = "useDrcNet"; - private static final String IPMAPS = "ipmaps"; - //from 40,if store receive this parameter,timestamp return is unix timestamp,otherwise yyyy-mm-dd hh:mm:ss - public static final String CLIENT_VERSION = "client.version"; - public static final String CLIENT_VERSION_ID = "58_SP"; - public static final String REQUIRE_OTHER_UNIT_DATA = "askOtherUnit"; - public static final String DATA_REQUIRED_OPTION = "client.data_acquire_option"; - public static final String SELF_UNIT = "self_unit"; - public static final String OTHER_UNIT = "other_unit"; - @Deprecated - private final static String USER_FILTERSTRICT = "strict"; - @Deprecated - private final static String USER_FILTERWHERE = "where"; - - /* - * Mark the persistent location information in the local file. - */ - public final static String POSITION_INFO = "Global_position_info:"; - - private static Map ipportMaps; - - private String regionId; - private String blackRegionNo; - - /** - * Private constructor providing primary initialization. - */ - private DRCConfig() { - configures = new HashMap(); - userDefinedParams = new HashMap(); - userDefinedParams.put(DRC_MARK, DEFAULT_DRC_MARK); - persists = new HashSet(); - configures.put(DRC_VERSION, "2.0.0"); - configures.put(DRC_CHECKPOINT_POLLPERIOD, "500"); - configures.put(CLIENT_SO_TIMEOUT, "120"); - configures.put(CLIENT_CONN_TIMEOUT, "120"); - checkpoint = new Checkpoint(); - ipportMaps = new HashMap(); - useDrcNet = false; - } - - /** - * Read configures from a reader. - * @param reader java.io.Reader - * @throws IOException when properties load failed - */ - public DRCConfig(final Reader reader) throws IOException { - this(); - Properties properties = new Properties(); - properties.load(reader); - loadProperties(properties); - } - - /** - * Read configures from a properties file. - * @param propertiesFile is the file under the classpath. - * @throws IOException when properties load failed - */ - public DRCConfig(final String propertiesFile) throws IOException { - this(); - InputStream drcProperties = this.getClass().getClassLoader() - .getResourceAsStream(propertiesFile); - Properties properties = new Properties(); - properties.load(drcProperties); - loadProperties(properties); - } - - public DRCConfig(final Properties properties) { - this(); - loadProperties(properties); - } - - /** - * Use privately to load all properties to inner map. - * @param properties - */ - private void loadProperties(Properties properties) { - - for (Entry entry : properties.entrySet()) { - if (entry.getKey().equals(SERVER_MAX_RETRIES)) { - maxRetryTimes = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(CLIENT_SO_TIMEOUT)) { - socketTimeout = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(CLIENT_CONN_TIMEOUT)) { - connectionTimeout = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(SERVER_MESSAGE_TYPE)) { - useBinaryFormat = "binary".equals(entry.getValue()); - } else if (entry.getKey().equals(CLIENT_REQ_COMP_TXN)) { - requireCompleteTxn = "true".equals(entry.getValue()); - } else if (entry.getKey().equals(CLIENT_MAX_RECS_BATCH)) { - // requireCompleteTxn = true; - maxRecordsBatched = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(CLIENT_MAX_TIMEOUT_BATCH)) { - // requireCompleteTxn = true; - maxTimeoutBatched = Long.parseLong((String) entry.getValue()); - } else if (entry.getKey().equals(CLIENT_MAX_TXNS_BATCH)) { - maxTxnsBatched = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(CLIENT_MAX_RECS_CACHE)) { - maxRecordsCached = Integer.parseInt((String) entry.getValue()); - } else if (entry.getKey().equals(IPMAPS)) { - String[] ipmaps = StringUtils.split((String) entry.getValue(), '|'); - for (String ippair : ipmaps) { - String[] ips = StringUtils.split(ippair, '-'); - if (ips.length == 2) { - ipportMaps.put(ips[0], ips[1]); - } - } - } else if (entry.getKey().equals(USE_DRC_NET)) { - useDrcNet = "true".equals(entry.getValue()); - } else if (entry.getKey().equals(CLIENT_USE_INDEX_ITERATOR)) { - useIndexIterator = "true".equals(entry.getValue()); - } else if (entry.getKey().equals(DATA_REQUIRED_OPTION)) { - String value = ((String) entry.getValue()); - DataAcquireOption dataAcquireOption = DataAcquireOption.ASK_ALL_DATA; - if (org.apache.commons.lang3.StringUtils.equals(value, SELF_UNIT)) { - dataAcquireOption = DataAcquireOption.ASK_SELF_UNIT; - } else if (org.apache.commons.lang3.StringUtils.equals(value, OTHER_UNIT)) { - dataAcquireOption = DataAcquireOption.ASK_OTHER_UNIT; - } - setRequireOtherUnitData(dataAcquireOption); - } else if (entry.getKey().equals(NEED_UK_RECORD)) { - needUKRecord = "true".equals(entry.getValue()); - } else if (entry.getKey().equals(BLACK_REGION_NO)) { - blackRegionNo = (String) entry.getValue(); - } - configures.put((String) entry.getKey(), (String) entry.getValue()); - } - } - - public boolean needUKRecord() { - return needUKRecord; - } - - public String getBlackRegionNo() { - return blackRegionNo; - } - - public boolean getUseDrcNet() { - return useDrcNet; - } - - public void setUseDrcNet(boolean useDrcNet) { - this.useDrcNet = useDrcNet; - } - - public void setForceUseIndexIter(boolean forceUseIndexIter) { - this.useIndexIterator = forceUseIndexIter; - } - - public boolean getForceUseIndexIter() { - return this.useIndexIterator; - } - - public void setDataFilter(final DataFilterBase filter) { - this.filter = filter; - } - - public DataFilterBase getDataFilter() { - return filter; - } - - public Checkpoint getCheckpoint() { - return checkpoint; - } - - public int getMaxRetriedTimes() { - return maxRetryTimes; - } - - public int getSocketTimeout() { - return socketTimeout; - } - - public int getConnectionTimeout() { - return connectionTimeout; - } - - /** - * Get the group name. - * @return the group name. - */ - final String getGroupName() { - return userDefinedParams.get(USER_GROUPNAME); - } - - /** - * Set the group name. - * @param groupName is the group name. - */ - final void setGroupName(final String groupName) { - userDefinedParams.put(USER_GROUPNAME, groupName); - } - - /** - * Get the group title, if empty, use group name (user) instead. - * @return - */ - final String getGroup() { - final String group = userDefinedParams.get(USER_GROUP); - if (group == null || group.isEmpty()) { - return getGroupName(); - } - return group; - } - - /** - * Set the group title, note that the group could be different - * from groupname(user name). - * @param group - */ - final void setGroup(final String group) { - userDefinedParams.put(USER_GROUP, group); - } - - /** - * Set the target physical database name. - * @param dbname database name such as icdb0, uic_main_000 and so on. - */ - final void setDbname(final String dbname) { - userDefinedParams.put(USER_DBNAME, dbname); - } - - /** - * Set the subgroup name. - * @param subgroup - */ - final void setSubGroup(final String subgroup) { - userDefinedParams.put(USER_SUBGROUP, subgroup); - } - - /** - * Get the subgroup name, if empty, use db name instead - * @return subgroup - */ - final String getSubGroup() { - final String subgroup = userDefinedParams.get(USER_SUBGROUP); - if (subgroup == null || subgroup.isEmpty()) { - return getDbname(); - } - return subgroup; - } - - /** - * Get the target physical database name. - * @return the database name. - */ - final String getDbname() { - return userDefinedParams.get(USER_DBNAME); - } - - /** - * Set the user's identification, e.t., password. - * @param id is the identification. - */ - final void setIdentification(final String id) { - userDefinedParams.put(USER_IDENTIFICATION, id); - } - - /** - * Set a filename to store checkpoint. - * @param filename - */ - final void setBinlogFilename(final String filename) { - configures.put(DRC_BINLOGLOGNAME, filename); - } - - final String getIdentification() { - return userDefinedParams.get(USER_IDENTIFICATION); - } - - /** - * Get the binlog filename. - * @return the filename. - */ - final String getBinlogFilename() { - return configures.get(DRC_BINLOGLOGNAME); - } - - /** - * Get the period to record one checkpoint. - * @return the period. - */ - final int getCheckpointPeriod() { - return Integer.parseInt(configures.get(DRC_CHECKPOINT_POLLPERIOD)); - } - - /** - * Set user-defined checkpoint. - * @param checkpoint - */ - final void setCheckpoint(final Checkpoint checkpoint) { - this.checkpoint = checkpoint; - } - - final void setCheckpoint(final String checkpoint) { - this.checkpoint.setPosition(checkpoint); - } - - /** - * Set user-defined starting time stamp. - * @param timestamp - */ - final void setGmtModified(final String timestamp) { - checkpoint.setTimestamp(timestamp); - } - - /** - * Set checkpoint or gmtModified as the starting point. - * @param startingPoint - */ - final void setStartingPoint(final String startingPoint) { - - if (startingPoint.contains("@")) { - setCheckpoint(startingPoint); - } else { - if (startingPoint.length() == 13) { - throw new IllegalArgumentException( - "Error the unit of the starting time is second, but " + startingPoint - + " is in ms"); - } - setGmtModified(startingPoint); - } - } - - /** - * Get meta version. - * @return meta version. - */ - final String getVersion() { - return configures.get(DRC_VERSION); - } - - /** - * Get address of the cluster manager. - * @return a URL. - */ - final String getClusterManagerAddresses() { - return configures.get(DRC_MANAGERHOST); - } - - /** - * Set the connected mysql address. - * @param mysql is the mysql address. - */ - final void setInstance(final String mysql) { - userDefinedParams.put(USER_MYSQL, mysql); - checkpoint.setServerId(mysql); - } - - /** - * Get the last connected mysql address. - * @return the mysql address. - */ - final String getInstance() { - return userDefinedParams.get(USER_MYSQL); - } - - /** - * Usually use internally to add user-defined parameters. - * @param key is the name of the parameter. - * @param value is the value of the parameter. - */ - final void addParam(final String key, final String value) { - userDefinedParams.put(key, value); - } - - /** - * Get user-defined parameter by name. - * @param key is the name of the parameter. - * @return - */ - final String getParam(final String key) { - return userDefinedParams.get(key); - } - - /** - * Get all user-defined parameters. - * @return parameters used by the server. - */ - final Map getParams() { - return userDefinedParams; - } - - /** - * Usually use internally to add drc-related configures. - * @param key is the name of the configure. - * @param value is the value of the configure. - */ - final void addConfigure(final String key, final String value) { - configures.put(key, value); - } - - /** - * Get the value of one specific parameter. - * @param key the name of the parameter. - * @return - */ - final String getConfigure(final String key) { - return configures.get(key); - } - - /** - * Get all configures. - * @return all the configures. - */ - final Map getConfigures() { - return configures; - } - - /** - * Get the persistent-required attributes' names. - * @return the list of names. - */ - final Set getPersists() { - return persists; - } - - /** - * Add more persistent-required attributes. - * @param p is the new attributes. - */ - final void addPersists(List p) { - persists.addAll(p); - } - - /** - * Set user-wanted tables and columns. - * @param conditions well-formatted tables and columns. - */ - final void setRequiredTablesAndColumns(final String conditions) { - userDefinedParams.put(USER_FILTERCONDITIONS, conditions); - } - - /** - * Set user-wanted where conditions. - * @param where well-formated where conditions. - */ - final void setWhereFilters(final String where) { - userDefinedParams.put(USER_FILTERWHERE, where); - } - - /** - * Define that the user only want a record which at least has one - * field required changed. - */ - final void setFilterUnchangedRecords() { - userDefinedParams.put(USER_FILTERSTRICT, "true"); - } - - public void setDRCMark(String mark) { - userDefinedParams.put(DRC_MARK, mark); - } - - public String getDRCMark() { - return userDefinedParams.get(DRC_MARK); - } - - final void requireTxnMark(boolean need) { - txnMark = need; - } - - final boolean isTxnMarkRequired() { - return txnMark; - } - - final boolean isBinaryFormat() { - return useBinaryFormat; - } - - public void setNumOfRecordsPerBatch(int threshold) { - recordsPerBatch = threshold; - } - - public int getNumOfRecordsPerBatch() { - return recordsPerBatch; - } - - public boolean isTxnRequiredCompleted() { - return requireCompleteTxn; - } - - public int getMaxRecordsPerTxn() { - return maxRecordsCached; - } - - public int getMaxRecordsBatched() { - return maxRecordsBatched; - } - - public long getMaxTimeoutBatched() { - return maxTimeoutBatched; - } - - public int getMaxTxnsBatched() { - return maxTxnsBatched; - } - - public final String getMappedIpPort(final String ip) { - return ipportMaps.get(ip); - } - - public void useDrcMark() { - drcMarkWorking = true; - } - - public boolean isDrcMarkWorking() { - return drcMarkWorking; - } - - public String getBlackList() { - return blackList; - } - - public void setBlackList(String blackList) { - this.blackList = blackList; - } - - public boolean getUseHTTPS() { - String use = configures.get(HTTPS_USE); - return use != null && !"false".equals(use); - } - - public void usePublicIp() { - usePublicIp = true; - } - - public boolean isUsePublicIp() { - return usePublicIp; - } - - public void useCaseSensitive() { - useCaseSensitive = true; - } - - public boolean isUseCaseSensitive() { - return useCaseSensitive; - } - - public void trimLongType() { - trimLongType = true; - } - - public boolean isTrimLongType() { - return trimLongType; - } - - public boolean isUseCheckCRC() { - return useCheckCRC; - } - - public void setUseCheckCRC(boolean useCheckCRC) { - this.useCheckCRC = useCheckCRC; - } - - public String getRegionId() { - return regionId; - } - - public void setRegionId(String regionId) { - this.regionId = regionId; - } - - public void setRequireOtherUnitData(DataAcquireOption dataAcquireOption) { - switch (dataAcquireOption) { - case ASK_SELF_UNIT: { - this.requiredOtherUnitData = false; - this.drcMarkWorking = true; - break; - } - case ASK_OTHER_UNIT: { - this.requiredOtherUnitData = true; - this.drcMarkWorking = false; - break; - } - case ASK_ALL_DATA: { - this.requiredOtherUnitData = false; - this.drcMarkWorking = false; - break; - } - } - } - - public boolean getRequireOtherUnitData() { - return this.requiredOtherUnitData; - } -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java index bf2e1385aab3f13812b37ac68197e92437277607..a5219ea6e0dc0048257ecdb80c90ed984b4787af 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/config/ObReaderConfig.java @@ -12,7 +12,7 @@ package com.oceanbase.clogproxy.client.config; import com.google.common.collect.Maps; import com.oceanbase.clogproxy.client.util.Validator; -import com.oceanbase.clogproxy.common.config.ShareConf; +import com.oceanbase.clogproxy.common.config.SharedConf; import com.oceanbase.clogproxy.common.packet.LogType; import com.oceanbase.clogproxy.common.util.CryptoUtil; import com.oceanbase.clogproxy.common.util.Hex; @@ -21,19 +21,49 @@ import org.slf4j.LoggerFactory; import java.util.Map; +/** + * This is a configuration class for connection to log proxy. + */ public class ObReaderConfig extends AbstractConnectionConfig { private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class); + /** + * Root server list. + */ private static final ConfigItem RS_LIST = new ConfigItem<>("rootserver_list", ""); + + /** + * Cluster username. + */ private static final ConfigItem CLUSTER_USER = new ConfigItem<>("cluster_user", ""); + + /** + * Cluster password. + */ private static final ConfigItem CLUSTER_PASSWORD = new ConfigItem<>("cluster_password", ""); + + /** + * Table whitelist. + */ private static final ConfigItem TABLE_WHITE_LIST = new ConfigItem<>("tb_white_list", ""); + + /** + * Start timestamp. + */ private static final ConfigItem START_TIMESTAMP = new ConfigItem<>("first_start_timestamp", 0L); + /** + * Constructor with empty arguments. + */ public ObReaderConfig() { super(Maps.newHashMap()); } + /** + * Constructor with a config map. + * + * @param allConfigs Config map. + */ public ObReaderConfig(Map allConfigs) { super(allConfigs); } @@ -64,7 +94,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { StringBuilder sb = new StringBuilder(); for (Map.Entry> entry : configs.entrySet()) { String value = entry.getValue().val.toString(); - if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && ShareConf.AUTH_PASSWORD_HASH) { + if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) { value = Hex.str(CryptoUtil.sha1(value)); } sb.append(entry.getKey()).append("=").append(value).append(" "); @@ -88,49 +118,50 @@ public class ObReaderConfig extends AbstractConnectionConfig { @Override public String toString() { return "rootserver_list=" + RS_LIST + ", cluster_user=" + CLUSTER_USER + ", cluster_password=******, " + - "tb_white_list=" + TABLE_WHITE_LIST + ", start_timestamp=" + START_TIMESTAMP; + "tb_white_list=" + TABLE_WHITE_LIST + ", start_timestamp=" + START_TIMESTAMP; } /** - * 设置管控服务列表 + * Set root server list. * - * @param rsList 管控服务列表 + * @param rsList Root server list. */ public void setRsList(String rsList) { RS_LIST.set(rsList); } /** - * 设置连接OB用户名 + * Set cluster username * - * @param clusterUser 用户名 + * @param clusterUser Cluster username. */ public void setUsername(String clusterUser) { CLUSTER_USER.set(clusterUser); } /** - * 设置连接OB密码 + * Set cluster password * - * @param clusterPassword 密码 + * @param clusterPassword Cluster password. */ public void setPassword(String clusterPassword) { CLUSTER_PASSWORD.set(clusterPassword); } /** - * 配置过滤规则,由租户.库.表3个维度组成,每一段 * 表示任意,如:A.foo.bar,B.foo.*,C.*.*,*.*.* + * Set table whitelist. It is composed of three dimensions: tenant, library, and table. + * Asterisk means any, such as: "A.foo.bar", "B.foo.*", "C.*.*", "*.*.*". * - * @param tableWhiteList 监听表的过滤规则 + * @param tableWhiteList Table whitelist. */ public void setTableWhiteList(String tableWhiteList) { TABLE_WHITE_LIST.set(tableWhiteList); } /** - * 设置起始订阅的 UNIX时间戳,0表示从当前,通常不要早于1小时 + * Set start timestamp, zero means from now on. * - * @param startTimestamp 起始时间戳 + * @param startTimestamp Start timestamp. */ public void setStartTimestamp(Long startTimestamp) { START_TIMESTAMP.set(startTimestamp); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index c66e787dc053b340f26f819d3dd5e89bfb323ca4..103fe77b7d6c10e5ce97a0efc5e4fdf970bbb5a6 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -11,9 +11,9 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.connection; import com.google.protobuf.InvalidProtocolBufferException; +import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.enums.ErrorCode; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; -import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.message.LogMessage; import com.oceanbase.clogproxy.common.packet.CompressType; import com.oceanbase.clogproxy.common.packet.HeaderType; @@ -35,42 +35,129 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +/** + * This is an implementation class of {@link ChannelInboundHandlerAdapter}. + */ public class ClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); + /** + * Magic string used to request log proxy. + */ private static final byte[] MAGIC_STRING = new byte[]{'x', 'i', '5', '3', 'g', ']', 'q'}; + + /** + * Client ip address. + */ private static final String CLIENT_IP = NetworkUtil.getLocalIp(); - private static final int HEAD_LENGTH = 7; + /** + * Length of packet header. + */ + private static final int HEAD_LENGTH = 7; + + /** + * A client stream. + */ private ClientStream stream; + + /** + * Connection params. + */ private ConnectionParams params; + + /** + * Record queue, it's a {@link BlockingQueue} for storing {@link StreamContext.TransferPacket}. + */ private BlockingQueue recordQueue; + /** + * Handshake type enumeration. + */ enum HandshakeStateV1 { + /** + * State of parsing the packet header. + */ PB_HEAD, + /** + * State of handling handshake response. + */ CLIENT_HANDSHAKE_RESPONSE, + /** + * State of handling record. + */ RECORD, + /** + * State of handling error response. + */ ERROR_RESPONSE, + /** + * State of handling runtime status response. + */ STATUS } + /** + * Handshake state. + */ private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD; + /** + * A {@link Cumulator} instance. + */ private final Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR; + + /** + * A {@link ByteBuf} used for channel reading. + */ ByteBuf buffer; + + /** + * A flag of whether channel is active. + */ private boolean poolFlag = true; + + /** + * A flag of whether it is the first part of {@link ByteBuf}. + */ private boolean first; + + /** + * Number of read attempts. + */ private int numReads = 0; + + /** + * A flag of whether the message is not readable. + */ private boolean dataNotEnough = false; + + /** + * The length of message body. + */ private int dataLength = 0; + /** + * A {@link LZ4Factory} instance. + */ LZ4Factory factory = LZ4Factory.fastestInstance(); + + /** + * A {@link LZ4FastDecompressor} instance. + */ LZ4FastDecompressor fastDecompressor = factory.fastDecompressor(); - public ClientHandler() { } + /** + * Constructor with empty arguments. + */ + public ClientHandler() { + } + /** + * Reset {@link #state} to {@link HandshakeStateV1#PB_HEAD}. + */ protected void resetState() { state = HandshakeStateV1.PB_HEAD; } @@ -125,6 +212,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Handle header response. + */ private void handleHeader() { if (buffer.readableBytes() >= HEAD_LENGTH) { int version = buffer.readShort(); @@ -133,13 +223,13 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { checkHeader(version, type, dataLength); HeaderType headerType = HeaderType.codeOf(type); - if(headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) { + if (headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) { state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE; - } else if(headerType == HeaderType.ERROR_RESPONSE) { + } else if (headerType == HeaderType.ERROR_RESPONSE) { state = HandshakeStateV1.ERROR_RESPONSE; - } else if(headerType == HeaderType.DATA_CLIENT) { + } else if (headerType == HeaderType.DATA_CLIENT) { state = HandshakeStateV1.RECORD; - } else if(headerType == HeaderType.STATUS) { + } else if (headerType == HeaderType.STATUS) { state = HandshakeStateV1.STATUS; } } else { @@ -147,8 +237,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Handle handshake response. + */ private void handleHandshakeResponse() throws InvalidProtocolBufferException { - if(buffer.readableBytes() >= dataLength) { + if (buffer.readableBytes() >= dataLength) { byte[] bytes = new byte[dataLength]; buffer.readBytes(bytes); LogProxyProto.ClientHandshakeResponse response = LogProxyProto.ClientHandshakeResponse.parseFrom(bytes); @@ -159,8 +252,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Handle error response. + */ private void handleErrorResponse() throws InvalidProtocolBufferException { - if(buffer.readableBytes() >= dataLength) { + if (buffer.readableBytes() >= dataLength) { byte[] bytes = new byte[dataLength]; buffer.readBytes(bytes); LogProxyProto.ErrorResponse response = LogProxyProto.ErrorResponse.parseFrom(bytes); @@ -171,8 +267,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Handle server status response. + */ private void handleServerStatus() throws InvalidProtocolBufferException { - if(buffer.readableBytes() >= dataLength) { + if (buffer.readableBytes() >= dataLength) { byte[] bytes = new byte[dataLength]; buffer.readBytes(bytes); LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes); @@ -183,8 +282,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Handle record data response. + */ private void handleRecord() { - if(buffer.readableBytes() >= dataLength) { + if (buffer.readableBytes() >= dataLength) { parseDataNew(); state = HandshakeStateV1.PB_HEAD; } else { @@ -192,6 +294,13 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Check if the header is valid. + * + * @param version Protocol version. + * @param type Header type. + * @param length Data length. + */ private void checkHeader(int version, int type, int length) { if (ProtocolVersion.codeOf(version) == null) { logger.error("unsupported protocol version: {}", version); @@ -207,6 +316,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Do parse record data from buffer. It will firstly decompress the raw data if necessary. + */ private void parseDataNew() { try { byte[] buff = new byte[dataLength]; @@ -221,7 +333,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { int decompress = fastDecompressor.decompress(rawData, 0, bytes, 0, compressedLen); if (decompress != rawLen) { throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress - + "] is not expected [" + rawLen + "]"); + + "] is not expected [" + rawLen + "]"); } parseRecord(bytes); } else { @@ -232,26 +344,30 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } - - + /** + * Do parse record data from an array of bytes to a {@link LogMessage} and add it into {@link #recordQueue}. + * + * @param bytes An array of bytes of record data. + * @throws LogProxyClientException If exception occurs. + */ private void parseRecord(byte[] bytes) throws LogProxyClientException { int offset = 0; while (offset < bytes.length) { int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4); - LogMessage drcRecord; + LogMessage logMessage; try { /* * We must copy a byte array and call parse after then, * or got a !!!RIDICULOUS EXCEPTION!!!, - * if we wrap a upooled buffer with offset and call setByteBuf just as same as `parse` function do. + * if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do. */ - drcRecord = new LogMessage(false); + logMessage = new LogMessage(false); byte[] data = new byte[dataLength + 8]; System.arraycopy(bytes, offset, data, 0, data.length); - drcRecord.parse(data); + logMessage.parse(data); if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) { // unsupported type, ignore - logger.debug("Unsupported record type: {}", drcRecord); + logger.debug("Unsupported record type: {}", logMessage); offset += (8 + dataLength); continue; } @@ -262,7 +378,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { while (true) { try { - recordQueue.put(new StreamContext.TransferPacket(drcRecord)); + recordQueue.put(new StreamContext.TransferPacket(logMessage)); break; } catch (InterruptedException e) { // do nothing @@ -273,6 +389,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * Discard the bytes in buffer. + */ protected final void discardSomeReadBytes() { if (buffer != null && !first && buffer.refCnt() == 1) { // discard some bytes if possible to make more room in the @@ -299,9 +418,14 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ctx.channel().writeAndFlush(generateConnectRequest(params.getProtocolVersion())); } + /** + * Generate the request body for protocol v2. + * + * @return Request body. + */ public ByteBuf generateConnectRequestV2() { LogProxyProto.ClientHandshakeRequest handShake = LogProxyProto.ClientHandshakeRequest.newBuilder(). - setLogType(params.getLogType().getCode()). + setLogType(params.getLogType().code()). setIp(CLIENT_IP). setId(params.getClientId()). setVersion(ClientConf.VERSION). @@ -319,6 +443,12 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { return byteBuf; } + /** + * Generate the request body. + * + * @param version Protocol version. + * @return Request body. + */ public ByteBuf generateConnectRequest(ProtocolVersion version) { if (version == ProtocolVersion.V2) { return generateConnectRequestV2(); @@ -331,7 +461,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1); byteBuf.writeShort(ProtocolVersion.V0.code()); byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code()); - byteBuf.writeByte(params.getLogType().getCode()); + byteBuf.writeByte(params.getLogType().code()); // body int length = CLIENT_IP.length(); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 27a8db9f691ebd574138a66c3c3f4e3358c200e6..20731fe67ca73bd9ee26b6aab76f958a733fe981 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -26,27 +26,64 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +/** + * This class represents a stream of log client. Stream means a channel of log transmission here. + */ public class ClientStream { private static final Logger logger = LoggerFactory.getLogger(ClientStream.class); - // routine + /** + * Flag of whether the stream is started. + */ private final AtomicBoolean started = new AtomicBoolean(false); + /** + * The process thread. + */ private Thread thread = null; - // status + /** + * Context of stream. + */ private StreamContext context = null; + + /** + * Checkpoint string used to resume writing into the queue. + */ private String checkpointString; - // reconnection + /** + * Number of reconnections + */ private int retryTimes = 0; + + /** + * Connection to log proxy with netty channel. + */ private Connection connection = null; + + /** + * Flag of whether the stream is reconnecting now. + */ private final AtomicBoolean reconnecting = new AtomicBoolean(true); + + /** + * Flag of whether the stream need reconnect. + */ private final AtomicBoolean reconnect = new AtomicBoolean(true); - // user callbacks + /** + * The list of {@link RecordListener}. + */ private final List listeners = new ArrayList<>(); + + /** + * The list of {@link StatusListener} + */ private final List statusListeners = new ArrayList<>(); + /** + * Reconnect state type enumeration. + */ private enum ReconnectState { /** * success @@ -62,10 +99,19 @@ public class ClientStream { EXIT; } + /** + * Sole constructor. + * + * @param connectionParams Connection params. + * @param sslContext A {@link SslContext} for encrypted communication. + */ public ClientStream(ConnectionParams connectionParams, SslContext sslContext) { context = new StreamContext(this, connectionParams, sslContext); } + /** + * Close and wait the connection. + */ public void stop() { if (!started.compareAndSet(true, false)) { logger.info("stopping LogProxy Client...."); @@ -81,6 +127,9 @@ public class ClientStream { logger.info("stopped LogProxy Client"); } + /** + * Call {@link Thread#join()} method of process thread. + */ public void join() { if (thread != null) { try { @@ -91,10 +140,18 @@ public class ClientStream { } } + /** + * Call {@link #stop()} asynchronously. + */ public void triggerStop() { new Thread(this::stop).start(); } + /** + * Call {@link RecordListener#onException(LogProxyClientException)} asynchronously. + * + * @param e An exception. + */ public void triggerException(LogProxyClientException e) { // use thread make sure non-blocking new Thread(() -> { @@ -104,6 +161,9 @@ public class ClientStream { }).start(); } + /** + * Start the process thread. + */ public void start() { // if status listener exist, enable monitor context.params.setEnableMonitor(CollectionUtils.isNotEmpty(statusListeners)); @@ -182,10 +242,20 @@ public class ClientStream { } } + /** + * Get the flag of whether the stream is started. + * + * @return The flag of whether the stream is started. + */ public boolean isRunning() { return started.get(); } + /** + * Reconnect to log proxy. It is also used for first time connecting. + * + * @return A {@link ReconnectState}. + */ private ReconnectState reconnect() { // reconnect flag mark, tiny load for checking if (reconnect.compareAndSet(true, false)) { @@ -233,6 +303,9 @@ public class ClientStream { return ReconnectState.SUCCESS; } + /** + * Reset the flags for reconnection. + */ public void triggerReconnect() { // reconnection action guard, avoid concurrent or multiple invoke if (reconnecting.compareAndSet(false, true)) { @@ -240,10 +313,20 @@ public class ClientStream { } } + /** + * Add a {@link RecordListener} to {@link #listeners}. + * + * @param recordListener A {@link RecordListener}. + */ public synchronized void addListener(RecordListener recordListener) { listeners.add(recordListener); } + /** + * Add a {@link StatusListener} to {@link #statusListeners}. + * + * @param statusListener A {@link StatusListener}. + */ public synchronized void addStatusListener(StatusListener statusListener) { statusListeners.add(statusListener); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/Connection.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/Connection.java index 89cbe655b06fde8d0975d5f67183a0402efb1436..4dbb614550a5452d9a055b505037a6a9f62593cb 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/Connection.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/Connection.java @@ -18,18 +18,35 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; +/** + * This class represents a connection which contains a netty channel. + */ public class Connection { private static final Logger logger = LoggerFactory.getLogger(Connection.class); + /** + * A netty channel. + */ private Channel channel; + /** + * A flag of whether the channel is closed. + */ private final AtomicBoolean closed = new AtomicBoolean(false); + /** + * Sole constructor. + * + * @param channel A netty channel. + */ public Connection(Channel channel) { this.channel = channel; } + /** + * Close this connection. + */ public void close() { if (!closed.compareAndSet(false, true)) { logger.warn("connection already closed"); @@ -40,13 +57,19 @@ public class Connection { channel.close().addListener(this::logCloseResult).syncUninterruptibly(); } catch (Exception e) { logger.warn("close connection to remote address {} exception", - NetworkUtil.parseRemoteAddress(channel), e); + NetworkUtil.parseRemoteAddress(channel), e); } } channel = null; } } + /** + * A callback that will logging the result of {@link Channel#close()}. + * + * @param future The source {@link Future} which called this callback. + */ + @SuppressWarnings("rawtypes") private void logCloseResult(Future future) { if (future.isSuccess()) { if (logger.isInfoEnabled()) { diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java index 2800e466151b0848b7b2cbe6830fc15c3577a6cc..033fcc041e62821f8cd37d693e08f8a30bbf68e1 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionFactory.java @@ -25,31 +25,60 @@ import io.netty.util.AttributeKey; import java.net.InetSocketAddress; +/** + * This is a factory class of {@link Connection}. + */ public class ConnectionFactory { + /** + * A static class that holds the singleton instance of {@link ConnectionFactory}. + */ private static class Singleton { + /** + * The singleton instance of {@link ConnectionFactory}. + */ private static final ConnectionFactory INSTANCE = new ConnectionFactory(); } + /** + * Get the singleton instance of {@link ConnectionFactory}. + * + * @return The singleton instance of {@link ConnectionFactory}. + */ public static ConnectionFactory instance() { return Singleton.INSTANCE; } + /** + * Sole constructor. It can only be used in {@link Singleton} class. + */ private ConnectionFactory() { } + /** + * Context key. + */ public static final AttributeKey CONTEXT_KEY = AttributeKey.valueOf("context"); + /** + * Worker group in type of {@link EventLoopGroup}. + */ private static final EventLoopGroup WORKER_GROUP = NettyEventLoopUtil.newEventLoopGroup(1, - new NamedThreadFactory("log-proxy-client-worker", true)); + new NamedThreadFactory("log-proxy-client-worker", true)); + /** + * Create a {@link Bootstrap} instance. + * + * @param sslContext The {@link SslContext} used for encrypted communication. + * @return A {@link Bootstrap} instance. + */ private Bootstrap initBootstrap(SslContext sslContext) { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(WORKER_GROUP) - .channel(NettyEventLoopUtil.getClientSocketChannelClass()) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true); + .channel(NettyEventLoopUtil.getClientSocketChannelClass()) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer() { @@ -65,6 +94,13 @@ public class ConnectionFactory { return bootstrap; } + /** + * Create a {@link Connection} with specific {@link StreamContext}. + * + * @param context Stream context. + * @return A {@link Connection}. + * @throws LogProxyClientException If exception occurs. + */ public Connection createConnection(StreamContext context) throws LogProxyClientException { Bootstrap bootstrap = initBootstrap(context.getSslContext()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConf.CONNECT_TIMEOUT_MS); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionParams.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionParams.java index b252b11450c6cf72562b4999c5bc0f4ade40fa5e..8f2ff0f507a650808d0545523c2f10a9f61b0e61 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionParams.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ConnectionParams.java @@ -16,19 +16,60 @@ import com.oceanbase.clogproxy.common.packet.ProtocolVersion; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; - +/** + * This is a configuration class of connection parameters. + */ public class ConnectionParams { + + /** + * Log type. + */ private final LogType logType; + + /** + * Client id. + */ private final String clientId; + + /** + * Log proxy host. + */ private final String host; + + /** + * Log proxy port. + */ private final int port; + /** + * Connection config. + */ private final ConnectionConfig connectionConfig; + + /** + * Generated configuration string. + */ private String configurationString; + /** + * Protocol version. + */ private ProtocolVersion protocolVersion; + + /** + * Flag of whether enable monitor. + */ private boolean enableMonitor; + /** + * Constructor. + * + * @param logType Log type. + * @param clientId Client id. + * @param host Log proxy host. + * @param port Log proxy port. + * @param connectionConfig Connection config. + */ public ConnectionParams(LogType logType, String clientId, String host, int port, ConnectionConfig connectionConfig) { this.logType = logType; this.clientId = clientId; @@ -38,6 +79,11 @@ public class ConnectionParams { this.configurationString = connectionConfig.generateConfigurationString(); } + /** + * Update checkpoint in connection config. + * + * @param checkpoint Checkpoint of the last record put in queue. + */ public void updateCheckpoint(String checkpoint) { connectionConfig.updateCheckpoint(checkpoint); configurationString = connectionConfig.generateConfigurationString(); @@ -48,42 +94,92 @@ public class ConnectionParams { return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); } + /** + * Get the basic info of connection, which contains client id and connection config. + * + * @return A string of client id and connection config. + */ public String info() { return clientId + ": " + connectionConfig.toString(); } + /** + * Get the log type. + * + * @return The log type. + */ public LogType getLogType() { return logType; } + /** + * Get the client id. + * + * @return The client id. + */ public String getClientId() { return clientId; } + /** + * Get the host of log proxy. + * + * @return The host of log proxy. + */ public String getHost() { return host; } + /** + * Get the port of log proxy. + * + * @return The port of log proxy. + */ public int getPort() { return port; } + /** + * Get the generated configuration string. + * + * @return The configuration string. + */ public String getConfigurationString() { return configurationString; } + /** + * Get the protocol version. + * + * @return Protocol version. + */ public ProtocolVersion getProtocolVersion() { return protocolVersion; } + /** + * Set the protocol version. + * + * @param protocolVersion Protocol version. + */ public void setProtocolVersion(ProtocolVersion protocolVersion) { this.protocolVersion = protocolVersion; } + /** + * Get the flag of whether enable monitor. + * + * @return The flag of whether enable monitor. + */ public boolean isEnableMonitor() { return enableMonitor; } + /** + * Set the flag of whether enable monitor. + * + * @param enableMonitor The flag of whether enable monitor. + */ public void setEnableMonitor(boolean enableMonitor) { this.enableMonitor = enableMonitor; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java index 05017a3ca7fc5f581ffa0008e64163589e0156ba..04ec8e0611a5aa5721049cfd1c2b1a86d253c001 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NamedThreadFactory.java @@ -13,22 +13,54 @@ package com.oceanbase.clogproxy.client.connection; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +/** + * This is a factory class for {@link ThreadFactory}. + */ public class NamedThreadFactory implements ThreadFactory { + /** + * Pool number. + */ private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final ThreadGroup group; - private final String namePrefix; - private final boolean isDaemon; + /** + * Thread number. + */ + private final AtomicInteger threadNumber = new AtomicInteger(1); + /** + * Thread group. + */ + private final ThreadGroup group; + /** + * Prefix of thread name. + */ + private final String namePrefix; + /** + * Flag of whether the thread is daemon. + */ + private final boolean isDaemon; + /** + * Constructor with no arguments. It will take "ThreadPool" as its name. + */ public NamedThreadFactory() { this("ThreadPool"); } + /** + * Constructor with name. + * + * @param name Name of thread factory. + */ public NamedThreadFactory(String name) { this(name, false); } + /** + * Constructor with name prefix and daemon flag. + * + * @param prefix Name prefix of thread factory. + * @param daemon A flag of whether starting the thread on daemon mode. + */ public NamedThreadFactory(String prefix, boolean daemon) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); @@ -36,11 +68,6 @@ public class NamedThreadFactory implements ThreadFactory { isDaemon = daemon; } - /** - * Create a thread. - * - * @see ThreadFactory#newThread(Runnable) - */ @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java index 99dcce35931b15236a0d0ac9f16a3228d716d369..4badc6d9f1b1e59d1f597ec2f3e9b9b16571c283 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/NettyEventLoopUtil.java @@ -20,27 +20,34 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.util.concurrent.ThreadFactory; +/** + * This is a factory class of netty event loop. + */ public class NettyEventLoopUtil { - /** check whether epoll enabled, and it would not be changed during runtime. */ - private static boolean epollEnabled = Epoll.isAvailable(); + /** + * Flag of whether epoll is enabled. + */ + private static final boolean EPOLL_ENABLED = Epoll.isAvailable(); /** * Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled. * - * @param nThreads number of threads - * @param threadFactory ThreadFactory - * @return an EventLoopGroup suitable for the current platform + * @param nThreads Number of threads. + * @param threadFactory ThreadFactory instance. + * @return An EventLoopGroup suitable for the current platform. */ public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory) + return EPOLL_ENABLED ? new EpollEventLoopGroup(nThreads, threadFactory) : new NioEventLoopGroup(nThreads, threadFactory); } /** - * @return a SocketChannel class suitable for the given EventLoopGroup implementation + * Get the suitable {@link SocketChannel} for the given EventLoopGroup implementation. + * + * @return A {@link SocketChannel} class suitable for the given EventLoopGroup implementation. */ public static Class getClientSocketChannelClass() { - return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class; + return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class; } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java index 19414800ac373aa2e3f4ba2434f0b0fbded86449..131fcdff5dd933a92f171a533174d726fd8e65b8 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/StreamContext.java @@ -20,59 +20,140 @@ import java.util.concurrent.LinkedBlockingQueue; import static com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.RuntimeStatus; +/** + * This class represents the context of client stream. + */ public class StreamContext { + public static class TransferPacket { - private HeaderType type; + /** + * Packet header type. + */ + private final HeaderType type; + /** + * Log message record. + */ private LogMessage record; + /** + * Log proxy runtime status. + */ private RuntimeStatus status; + /** + * Constructor with a {@link LogMessage}. + * + * @param record A {@link LogMessage}. + */ public TransferPacket(LogMessage record) { this.type = HeaderType.DATA_CLIENT; this.record = record; } + /** + * Constructor with a {@link RuntimeStatus}. + * + * @param status A {@link RuntimeStatus}. + */ public TransferPacket(RuntimeStatus status) { this.type = HeaderType.STATUS; this.status = status; } + /** + * Get header type. + * + * @return Packet header type. + */ public HeaderType getType() { return type; } + /** + * Get the log message record. + * + * @return Log message record. + */ public LogMessage getRecord() { return record; } + /** + * Get the log proxy runtime status. + * + * @return Log proxy runtime status. + */ public RuntimeStatus getStatus() { return status; } } + /** + * Blocking queue which stores {@link TransferPacket}. + */ private final BlockingQueue recordQueue = new LinkedBlockingQueue<>(ClientConf.TRANSFER_QUEUE_SIZE); + /** + * Client stream. + */ private final ClientStream stream; + + /** + * Connection params. + */ ConnectionParams params; + + /** + * Netty ssl context. + * + * @see SslContext + */ private final SslContext sslContext; + /** + * Constructor of StreamContext. + * + * @param stream Client stream. + * @param params Connection params. + * @param sslContext Netty ssl context. + */ public StreamContext(ClientStream stream, ConnectionParams params, SslContext sslContext) { this.stream = stream; this.params = params; this.sslContext = sslContext; } + /** + * Get connection params. + * + * @return Connection params. + */ public ConnectionParams getParams() { return params; } + /** + * Get netty ssl context. + * + * @return Netty ssl context. + */ public SslContext getSslContext() { return sslContext; } + /** + * Get the client stream. + * + * @return Client stream. + */ public ClientStream stream() { return stream; } + /** + * Get the record queue. + * + * @return Record queue. + */ public BlockingQueue recordQueue() { return recordQueue; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/constants/DataType.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/constants/DataType.java index 70dea8a8843ebe6d933a7c5cfdf173324468d44a..b4e98a8909d06cc743e703010fb5c354372bde24 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/constants/DataType.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/constants/DataType.java @@ -10,6 +10,9 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.constants; +/** + * The class that defines the constants that are used to identify data types. + */ public class DataType { public static final byte DT_UNKNOWN = 0x00; diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/DBType.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/DBType.java index 5048f6a12e8028f547d83245e2240168521198ba..af464a3934ec47a89ea6e3a247335d5c29e098ff 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/DBType.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/DBType.java @@ -10,6 +10,9 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.enums; +/** + * Database type enumeration. + */ public enum DBType { MYSQL, diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/ErrorCode.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/ErrorCode.java index 46c7a18b027476a73c8080afb49c6cb2c7860c67..b3460c349c9ca740d915b88f112db17bf88f9174 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/ErrorCode.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/enums/ErrorCode.java @@ -10,66 +10,77 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.enums; +/** + * Error code enumeration. + */ public enum ErrorCode { ////////// 0~499: process error //////////// /** - * general error + * General error. */ NONE(0), /** - * inner error + * Inner error */ E_INNER(1), /** - * failed to connect + * Failed to connect. */ E_CONNECT(2), /** - * exceed max retry connect count + * Exceed max retry connect count. */ E_MAX_RECONNECT(3), /** - * user callback throws exception + * User callback throws exception. */ E_USER(4), ////////// 500~: receive data error //////////// /** - * unknown data protocol + * Unknown data protocol. */ E_PROTOCOL(500), /** - * unknown header type + * Unknown header type. */ E_HEADER_TYPE(501), /** - * failed to auth + * Failed to auth. */ NO_AUTH(502), /** - * unknown compress type + * Unknown compress type. */ E_COMPRESS_TYPE(503), /** - * length not match + * Length not match. */ E_LEN(504), /** - * failed to parse data + * Failed to parse data. */ E_PARSE(505); + /** + * The ordinal of this enumeration constant. + */ int code; + /** + * Constructor. + * + * @param code The ordinal of this enumeration constant. + */ ErrorCode(int code) { this.code = code; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientException.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientException.java deleted file mode 100644 index 15f7f488a604ace3d74d2550afc535125961c258..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.exception; - -public class DRCClientException extends Exception { - - private static final long serialVersionUID = 1L; - - public DRCClientException() { - } - - public DRCClientException(final String message) { - super(message); - } -} \ No newline at end of file diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientRunTimeException.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientRunTimeException.java deleted file mode 100644 index fdaccc7e8566f80ed3be532ec0d4ad8190c80f8e..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/DRCClientRunTimeException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.exception; - -public class DRCClientRunTimeException extends RuntimeException { - public DRCClientRunTimeException(String errMessage) { - super(errMessage); - } - - public DRCClientRunTimeException() { - super(); - } - - public DRCClientRunTimeException(Throwable cause) { - super(cause); - } - - public DRCClientRunTimeException(String errMessage, Throwable cause) { - super(errMessage, cause); - } -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogMessageException.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogMessageException.java new file mode 100644 index 0000000000000000000000000000000000000000..2969c4a61d69f0488e0a996fe1f16916d43cd030 --- /dev/null +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogMessageException.java @@ -0,0 +1,52 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +oblogclient is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +package com.oceanbase.clogproxy.client.exception; + +/** + * This is a subclasses of {@link RuntimeException} primarily used in process of parsing {@link com.oceanbase.clogproxy.client.message.LogMessage}. + */ +public class LogMessageException extends RuntimeException { + + /** + * Constructor with message. + * + * @param errMessage Error message. + */ + public LogMessageException(String errMessage) { + super(errMessage); + } + + /** + * Constructor with no arguments. + */ + public LogMessageException() { + super(); + } + + /** + * Constructor with cause. + * + * @param cause Cause error or exception. + */ + public LogMessageException(Throwable cause) { + super(cause); + } + + /** + * Constructor with error message and cause. + * + * @param errMessage Error message. + * @param cause Cause error or exception. + */ + public LogMessageException(String errMessage, Throwable cause) { + super(errMessage, cause); + } +} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogProxyClientException.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogProxyClientException.java index 0c18c894b9ce45d021cf23dbe1b14eda3dd306f6..5d70c138c1550d2a7e4d1e8856f94e700a2e2354 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogProxyClientException.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/exception/LogProxyClientException.java @@ -12,31 +12,67 @@ package com.oceanbase.clogproxy.client.exception; import com.oceanbase.clogproxy.client.enums.ErrorCode; +/** + * This is a subclasses of {@link RuntimeException} used to indicate the exception occurs in client with an error code. + */ public class LogProxyClientException extends RuntimeException { + + /** + * Error code. + */ private ErrorCode code = ErrorCode.NONE; + /** + * Constructor with error code and message. + * + * @param code Error code. + * @param message Error message. + */ public LogProxyClientException(ErrorCode code, String message) { super(message); this.code = code; } + /** + * Constructor with error code and exception. + * + * @param code Error code. + * @param exception Exception instance. + */ public LogProxyClientException(ErrorCode code, Exception exception) { super(exception.getMessage(), exception.getCause()); this.code = code; } + /** + * Constructor with error code, error message and cause. + * + * @param code Error code. + * @param message Error message. + * @param throwable Cause. + */ public LogProxyClientException(ErrorCode code, String message, Throwable throwable) { super(message, throwable); this.code = code; } + /** + * Identify whether the client should stop the stream. + * + * @return The flag of whether the client should stop the stream. + */ public boolean needStop() { return (code == ErrorCode.E_MAX_RECONNECT) || (code == ErrorCode.E_PROTOCOL) || - (code == ErrorCode.E_HEADER_TYPE) || (code == ErrorCode.NO_AUTH) || - (code == ErrorCode.E_COMPRESS_TYPE) || (code == ErrorCode.E_LEN) || - (code == ErrorCode.E_PARSE); + (code == ErrorCode.E_HEADER_TYPE) || (code == ErrorCode.NO_AUTH) || + (code == ErrorCode.E_COMPRESS_TYPE) || (code == ErrorCode.E_LEN) || + (code == ErrorCode.E_PARSE); } + /** + * Get the error code. + * + * @return Error code. + */ public ErrorCode getCode() { return code; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilter.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilter.java deleted file mode 100644 index 14dff958e4ad34d24a395a48a0b5d5a338c23a8d..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilter.java +++ /dev/null @@ -1,228 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.fliter; - -import com.oceanbase.clogproxy.client.enums.DBType; -import com.oceanbase.clogproxy.client.exception.DRCClientException; -import com.oceanbase.clogproxy.client.util.DataFilterUtil; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class DataFilter implements DataFilterBase { - /* Used to be compatibility with old version */ - private String oldBranchDb; - - private String filterInfo; - // String save the source filter string user passed through - private String sourceFilter; - // String save the filter that will be sent to store to acquire data - // For ob1.0, that must be four columns, dbname like a.b. - private String connectStoreFilterConditions; - - private final StringBuilder builder; - - private final Map>> requires; - - private final Map>> dbTableColsReflectionMap; - //If all cols needed is '*', then we don't need do filter operation. - //In that case, we can save a lot compute. - private boolean isAllMatch = true; - - private String tenant; - - public DataFilter() { - oldBranchDb = null; - filterInfo = null; - builder = new StringBuilder(); - requires = new HashMap>>(); - dbTableColsReflectionMap = new HashMap>>(); - } - - /** - * Initialize the filter using formatted string. - * @param tenant tenant name - * @param tableFields the formatted filter information such as - * "tableName1;fieldName1;fieldName2|tableName2;fieldName1". No ";" or "|" should be - * transfer to - * *.tableName1.fieldName1|*.tableName1.fieldName2|... - * added at the beginning or end of the string. - */ - public DataFilter(String tenant, String tableFields) { - this(tableFields); - this.tenant = tenant; - } - - public DataFilter(String tableFields) { - oldBranchDb = null; - builder = new StringBuilder(); - requires = new HashMap>>(); - dbTableColsReflectionMap = new HashMap>>(); - builder.append(tableFields); - this.sourceFilter = tableFields; - } - - /** - * The current version uses topic instead of dbname, so use the - * method to be compatible with the older version. - * @param db is the original branched db name. - */ - @Override - public void setBranchDb(final String db) { - oldBranchDb = db; - } - - /** - * Add more filter information after initializing, note that the user should - * make it consistent to the formatted parameters. - * @param tableFields consistent formatted filter information. - */ - public void addTablesFields(String tableFields) { - builder.append(tableFields); - } - - @Override - public boolean getIsAllMatch() { - return isAllMatch; - } - - @Override - public Map>> getReflectionMap() { - return dbTableColsReflectionMap; - } - - @Override - public Map>> getRequireMap() { - return requires; - } - - //Before validate function called, toString may return null; - //Yet, user should not care about this. That's inter behavior. - @Override - public String toString() { - return connectStoreFilterConditions; - } - - /** - * The validate function will form mysql, ob0.5, oracle eg filter condition. - */ - private boolean validateNormalFilterString() { - if (filterInfo != null) { - return true; - } - - String s = builder.toString(); - String[] tbs = s.split("\\|"); - - int colStart; - StringBuilder builder1 = new StringBuilder(); - for (String s1 : tbs) { - String[] tb = s1.split("[;,\\.]"); - if (tb.length > 0) { - - String itemDb; - String itemTb; - - if (tb.length <= 2) { - if (oldBranchDb != null) { - itemDb = oldBranchDb; - } else { - itemDb = "*"; - } - colStart = 1; - itemTb = tb[0]; - } else { - colStart = 2; - itemDb = tb[0]; - itemTb = tb[1]; - } - if (tenant != null) { - builder1.append(tenant).append("."); - } - builder1.append(itemDb).append(".").append(itemTb).append("|"); - if (tb.length > colStart) { - List cols = new ArrayList(); - for (int i = colStart; i < tb.length; i++) { - cols.add(tb[i]); - //here, we don't use trim in case that " *" or "* " or " * " is kind of col names - if (!"*".equals(tb[i])) { - isAllMatch = false; - } - } - - DataFilterUtil.putColNames(itemDb, itemTb, cols, this); - } - } - } - if (builder1.charAt(builder1.length() - 1) == '|') { - builder1.deleteCharAt(builder1.length() - 1); - } - filterInfo = builder1.toString(); - connectStoreFilterConditions = filterInfo; - return true; - } - - /** - * The validate function will reform the filter condition and cols info - */ - private boolean validateOB10FilterString() { - if (sourceFilter == null) { - return false; - } - String[] tenantAndDbAndTBAndCols = sourceFilter.split("\\|"); - requires.clear(); - StringBuilder filterConditionBuilder = new StringBuilder(); - for (String s1 : tenantAndDbAndTBAndCols) { - String[] tb = s1.split("[;,\\.]"); - if (tb.length < 4) { - // tenant dbname tableName columnName is strictly required for 0b1.0 - return false; - } - String tenant = tb[0]; - String dbname = (oldBranchDb != null) ? oldBranchDb : tb[1]; - String tableName = tb[2]; - List cols = new ArrayList(); - for (int i = 3; i < tb.length; ++i) { - cols.add(tb[i]); - if (!"*".equals(tb[i])) { - isAllMatch = false; - } - } - //format string passed to store - String formatDBName = tenant + "." + dbname; - filterConditionBuilder.append(formatDBName).append(FILTER_SEPARATOR_INNER); - filterConditionBuilder.append(tableName).append(FILTER_SEPARATOR); - DataFilterUtil.putColNames(formatDBName, tableName, cols, this); - } - connectStoreFilterConditions = filterConditionBuilder.toString(); - return true; - } - - // When source type is ocean base 1.0, filter's content is like tenant.dbname.tablename.colvalues| .... - @Override - public boolean validateFilter(DBType dbType) throws DRCClientException { - switch (dbType) { - case OCEANBASE1: { - return validateOB10FilterString(); - } - default: { - return validateNormalFilterString(); - } - } - } - - @Override - public String getConnectStoreFilterConditions() { - return this.connectStoreFilterConditions; - } -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilterBase.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilterBase.java deleted file mode 100644 index 81991849ede933c0d80b33fbb4a3916cd4954f55..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/fliter/DataFilterBase.java +++ /dev/null @@ -1,55 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.fliter; - -import com.oceanbase.clogproxy.client.enums.DBType; -import com.oceanbase.clogproxy.client.exception.DRCClientException; - -import java.util.List; -import java.util.Map; - -public interface DataFilterBase { - String FILTER_SEPARATOR_INNER = "."; - String FILTER_SEPARATOR = "|"; - - /** - * Get the formatted filter string which will be delivered to store. - * Notice: before validate filter function called, getConnectStoreFilterConditions may return null. - * @return filter string - */ - String getConnectStoreFilterConditions(); - - /** - * Validate if the filter user passed is legal - * @param dbType database type which may be ob, mysql, oracle. - * For now, only ob1.0 need special handle which 4 tuple contains tenant, db, tb, cols is strictly required. - * @return true if filter is valid - * @throws DRCClientException if exception occurs - */ - boolean validateFilter(DBType dbType) throws DRCClientException; - - /** - * This function is compatible for old usage. - * @param branchDb the original branched db name. - */ - void setBranchDb(String branchDb); - - /** - * Fast match if cols are all needed. - * @return true if all cols are needed - */ - boolean getIsAllMatch(); - - Map>> getReflectionMap(); - - Map>> getRequireMap(); - -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/FieldParseListener.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/FieldParseListener.java index 0fa20dbb9ac63f09611280efb30ff125f3205b19..1b557462c3a1143e90b8d619bd1980c2fb313b72 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/FieldParseListener.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/FieldParseListener.java @@ -12,7 +12,17 @@ package com.oceanbase.clogproxy.client.listener; import com.oceanbase.clogproxy.client.message.DataMessage; +/** + * This interface defined a kind of listener for field parsing. + */ public interface FieldParseListener { - public void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception; -} \ No newline at end of file + /** + * Handle the filed parsing result. + * + * @param prev The original field. + * @param next The field after parsing. + * @throws Exception When exception occurs. + */ + void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception; +} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/RecordListener.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/RecordListener.java index 1681663e10e2cfafe482385ea0e3a7e071c624df..52787606a3e96b8f1fd885909e6e7fbd80af0e18 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/RecordListener.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/RecordListener.java @@ -14,9 +14,22 @@ package com.oceanbase.clogproxy.client.listener; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.message.LogMessage; +/** + * This interface defined a kind of listener for record response. + */ public interface RecordListener { - void notify(LogMessage record); + /** + * Handle the {@link LogMessage}. + * + * @param logMessage A {@link LogMessage} instance. + */ + void notify(LogMessage logMessage); + /** + * Handle the exception. + * + * @param e An exception. + */ void onException(LogProxyClientException e); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/StatusListener.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/StatusListener.java index c846429c2f087bdb500fe7e3375c6b459ee33cca..dd000c65de01247c0ea8f411388671f8f46d62f2 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/StatusListener.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/listener/StatusListener.java @@ -12,6 +12,14 @@ package com.oceanbase.clogproxy.client.listener; import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto; +/** + * This interface defined a kind of listener for {@link LogProxyProto.RuntimeStatus} response. + */ public interface StatusListener { + /** + * Handle the response of {@link LogProxyProto.RuntimeStatus}. + * + * @param status A {@link LogProxyProto.RuntimeStatus} response. + */ void notify(LogProxyProto.RuntimeStatus status); } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/DataMessage.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/DataMessage.java index d6cb21b460cf39a0446db96ccbc9b398e2a89988..4e93e01f6155e333b7e50d80923328bb70877fa0 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/DataMessage.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/DataMessage.java @@ -10,7 +10,6 @@ See the Mulan PSL v2 for more details. */ package com.oceanbase.clogproxy.client.message; -import com.oceanbase.clogproxy.client.config.DRCConfig; import com.oceanbase.clogproxy.client.enums.DBType; import com.oceanbase.clogproxy.client.listener.FieldParseListener; import com.oceanbase.clogproxy.client.util.StringUtils; @@ -854,25 +853,6 @@ public class DataMessage extends Message { return records; } - /** - * Construct the message from DataInputStream. - * - * @param reader is the DataInputStream. - * @param drcConfig DRCConfig - * @throws IOException if an I/O error occurs - */ - public void mergeFrom(final DataInputStream reader, DRCConfig drcConfig) throws IOException { - do { - Record record = new Record(); - record.mergeFrom(reader); - record.setRegionId(drcConfig.getRegionId()); - if (record.isEnding()) { - break; - } - records.add(record); - } while (true); - } - @Override public void clear() { super.clear(); diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/LogMessage.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/LogMessage.java index 9c2029add60df6362f5ce4c681d9848691b8ebaf..5a640414c1ce04425357a2dae1058beb2ff75de1 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/LogMessage.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/message/LogMessage.java @@ -12,7 +12,7 @@ package com.oceanbase.clogproxy.client.message; import com.oceanbase.clogproxy.client.constants.DataType; import com.oceanbase.clogproxy.client.enums.DBType; -import com.oceanbase.clogproxy.client.exception.DRCClientRunTimeException; +import com.oceanbase.clogproxy.client.exception.LogMessageException; import com.oceanbase.clogproxy.client.listener.FieldParseListener; import com.oceanbase.clogproxy.client.util.BinaryMessageUtils; import io.netty.buffer.ByteBuf; @@ -224,7 +224,7 @@ public class LogMessage extends DataMessage.Record { try { dbName = BinaryMessageUtils.getString(byteBuf.array(), (int) dbNameOffset, UTF8_ENCODING); } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } } @@ -240,7 +240,7 @@ public class LogMessage extends DataMessage.Record { try { tableName = BinaryMessageUtils.getString(byteBuf.array(), (int) tbNameOffset, UTF8_ENCODING); } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } } @@ -267,7 +267,7 @@ public class LogMessage extends DataMessage.Record { serverId = BinaryMessageUtils.getString(byteBuf.array(), (int) instanceOffset, DEFAULT_ENCODING); } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } } @@ -634,7 +634,7 @@ public class LogMessage extends DataMessage.Record { } catch (Exception e) { fields = null; - throw new DRCClientRunTimeException(e.getMessage(), e); + throw new LogMessageException(e.getMessage(), e); } return fields; @@ -660,7 +660,7 @@ public class LogMessage extends DataMessage.Record { } } } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } return primaryKeyIndexList; } @@ -1014,7 +1014,7 @@ public class LogMessage extends DataMessage.Record { return pkValues; } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } @@ -1076,7 +1076,7 @@ public class LogMessage extends DataMessage.Record { } } } catch (Exception e) { - throw new DRCClientRunTimeException(e); + throw new LogMessageException(e); } return tuples; @@ -1092,7 +1092,7 @@ public class LogMessage extends DataMessage.Record { } } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } @@ -1157,7 +1157,7 @@ public class LogMessage extends DataMessage.Record { } } } catch (Exception e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } return uniqueKeyList; } @@ -1234,7 +1234,7 @@ public class LogMessage extends DataMessage.Record { return BinaryMessageUtils.getString(byteBuf.array(), (int) encoding, DEFAULT_ENCODING); } catch (UnsupportedEncodingException e) { - throw new DRCClientRunTimeException(e.getMessage(), e.getCause()); + throw new LogMessageException(e.getMessage(), e.getCause()); } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/BinaryMessageUtils.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/BinaryMessageUtils.java index a15ed2d96e54d4664390138b02ecb25b9c8fbf82..e76fcf339fc4b6863614dee9d44a71b6d60dea82 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/BinaryMessageUtils.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/BinaryMessageUtils.java @@ -21,19 +21,24 @@ import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; - +/** + * Utils class for binary message. + */ public class BinaryMessageUtils { + /** + * + */ private static final int PREFIX_LENGTH = 12; /** - * get string begin with offset + * Get string begin with offset. * - * @param data bytes array - * @param offset read offset - * @param encoding string encoding - * @return result string - * @throws UnsupportedEncodingException when the encoding is not supported + * @param data A bytes array. + * @param offset Reading offset. + * @param encoding String encoding. + * @return Result string. + * @throws UnsupportedEncodingException When the encoding is not supported. */ public static String getString(byte[] data, int offset, String encoding) throws UnsupportedEncodingException { ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN); @@ -47,12 +52,12 @@ public class BinaryMessageUtils { } /** - * get list begin with offset + * Get list begin with offset. * - * @param data bytes array - * @param offset read offset - * @return result list - * @throws IOException if data type is unsigned long + * @param data A bytes array. + * @param offset Reading offset. + * @return Result list. + * @throws IOException If data type is unsigned long. */ public static List getArray(byte[] data, int offset) throws IOException { ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN); @@ -107,11 +112,11 @@ public class BinaryMessageUtils { } /** - * get ByteString begin with offset + * Get ByteString begin with offset. * - * @param data bytes array - * @param offset read offset - * @return list of ByteString + * @param data A bytes array. + * @param offset Reading offset. + * @return A list of {@link ByteString}. */ public static List getByteStringList(byte[] data, long offset) { if (offset == -1) { @@ -138,8 +143,8 @@ public class BinaryMessageUtils { lists.add(null); } else { lists.add(new ByteString(wrapByteBuf.array(), - PREFIX_LENGTH + currentOffset + readBytes + (int) offset, - nextOffset - currentOffset - 1)); + PREFIX_LENGTH + currentOffset + readBytes + (int) offset, + nextOffset - currentOffset - 1)); } currentOffset = nextOffset; } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/ClientIdGenerator.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/ClientIdGenerator.java index 26eab7ce418b275e1234075c032c511c151b6520..f653ad031153c242c2a27d9c044edaff25befe9f 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/ClientIdGenerator.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/ClientIdGenerator.java @@ -14,18 +14,25 @@ import com.oceanbase.clogproxy.common.util.NetworkUtil; import java.lang.management.ManagementFactory; +/** + * The class used to generate client id. + */ public class ClientIdGenerator { /** - * LocalIP_PID_currentTimestamp - * pattern may be change, never depend on the content of this - - * @return client id string + * Generate a new client id in format "LocalIP"."PID"."currentTimestamp". + * Pattern may be changed, never depend on the content of this. + * + * @return Client id string. */ public static String generate() { - return NetworkUtil.getLocalIp() + "_" + getProcessId() + "_" + (System.currentTimeMillis() / 1000); } + /** + * Get the process id. + * + * @return Process id. + */ private static String getProcessId() { // Note: may fail in some JVM implementations // therefore fallback has to be provided diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/DataFilterUtil.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/DataFilterUtil.java deleted file mode 100644 index 68e8572164475a92dcdbb3307ffde1a0a9ff8b60..0000000000000000000000000000000000000000 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/DataFilterUtil.java +++ /dev/null @@ -1,163 +0,0 @@ -/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. -oblogclient is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. */ - -package com.oceanbase.clogproxy.client.util; - -import com.oceanbase.clogproxy.client.fliter.DataFilterBase; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class DataFilterUtil { - /** - * - * @param db db name - * @param tb table name - * @param cols column names - * @param dataFilterBase DataFilterBase - */ - public static void putColNames(String db, String tb, List cols, - DataFilterBase dataFilterBase) { - - if (tb == null) { - return; - } - Map>> dbAndTablePair = dataFilterBase.getRequireMap(); - boolean founded = false; - for (Map.Entry>> dbEntry : dbAndTablePair.entrySet()) { - if (db == null || db.equalsIgnoreCase(dbEntry.getKey())) { - for (Map.Entry> entry : dbEntry.getValue().entrySet()) { - if (tb.equalsIgnoreCase(entry.getKey())) { - founded = true; - entry.getValue().addAll(cols); - } - } - - if (!founded) { - // db is already in the filter, but the table is not, so add the table - Map> tabMap = dbEntry.getValue(); - tabMap.put(tb, cols); - founded = true; - } - } - } - - if (!founded) { - // db is not in the filter, so add two maps - Map> tabMap = new HashMap>(); - tabMap.put(tb, cols); - dbAndTablePair.put(db, tabMap); - } - } - - /** - * Use the give db and tb name to retrieve cols list - * @param db db name - * @param tb table name - * @param dataFilterBase DataFilterBase - * @return cols reference to corresponded db name and table name - * Note: this function get cols from map in old DataFilter implementation - */ - public static List getColNamesWithMapping(String db, String tb, - DataFilterBase dataFilterBase) { - if (tb == null) { - return null; - } - Map>> dbAndTablePair = dataFilterBase.getReflectionMap(); - Map> tableAndCols = dbAndTablePair.get(db); - if (tableAndCols == null) { - //if we don't find tableAndCols, that mean this dbName appears for the first time, - //and we use getColNames to require the missing cols and update map; - tableAndCols = new HashMap>(); - List cols = getColNames(db, tb, dataFilterBase); - tableAndCols.put(tb, cols); - dbAndTablePair.put(db, tableAndCols); - return cols; - } else { - List needCols = tableAndCols.get(tb); - //we propose the cols can't be null, so we use null to determinate whether the cols we - //needed has existed in the map - if (needCols == null) { - //the cols we needed is missing ,use getColNames to require the missing cols - List cols = getColNames(db, tb, dataFilterBase); - tableAndCols.put(tb, cols); - return cols; - } else { - //the cols existed, just return the value. - return needCols; - } - } - } - - /** - * Use the give db and tb name to retrieve cols list - * @param db db name - * @param tb table name - * @param dataFilterBase DataFilterBase - * @return cols reference to corresponded db name and table name - */ - public static List getColNames(String db, String tb, DataFilterBase dataFilterBase) { - - if (tb == null) { - return null; - } - Map>> requireMap = dataFilterBase.getRequireMap(); - for (Map.Entry>> dbEntry : requireMap.entrySet()) { - StringBuffer buf = new StringBuffer(dbEntry.getKey()); - processStringToRegularExpress(buf); - if (db == null || db.toLowerCase().matches(buf.toString().toLowerCase())) { - for (Map.Entry> entry : dbEntry.getValue().entrySet()) { - buf = new StringBuffer(entry.getKey()); - processStringToRegularExpress(buf); - if (tb.toLowerCase().matches(buf.toString().toLowerCase())) { - return entry.getValue(); - } - } - } - } - return null; - } - - /** - * This function will first replace all "." to "\.", then replace all "*" to ".*" - * - * @param stringBuffer original string buffer - */ - public static void processStringToRegularExpress(StringBuffer stringBuffer) { - int index; - int beginIndex = 0; - while (-1 != (index = stringBuffer.indexOf(".", beginIndex))) { - stringBuffer.insert(index, '\\'); - beginIndex = index + 2; - } - beginIndex = 0; - while (-1 != (index = stringBuffer.indexOf("*", beginIndex))) { - stringBuffer.insert(index, '.'); - beginIndex = index + 2; - } - } - - /** - * Judge if the given col name exists in col lists - * @param col col to be judged - * @param s cols list - * @return true if exists in, else false - */ - public static boolean isColInArray(final String col, final List s) { - for (int i = 0; i < s.size(); i++) { - if ("*".equals(s.get(i)) || col.equalsIgnoreCase(s.get(i))) { - return true; - } - } - return false; - } - -} diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NettyEventLoopUtil.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NettyEventLoopUtil.java index 919b1047e302b3fafd6a2fe1bab80486fbe8a579..c630ad80b896b8488fa22532b9d6f8b160ab2e64 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NettyEventLoopUtil.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/NettyEventLoopUtil.java @@ -20,27 +20,34 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.util.concurrent.ThreadFactory; +/** + * Utils class for netty. + */ public class NettyEventLoopUtil { - /** check whether epoll enabled, and it would not be changed during runtime. */ - private static boolean epollEnabled = Epoll.isAvailable(); + /** + * Check whether epoll enabled, and it would not be changed during runtime. + */ + private static final boolean EPOLL_ENABLED = Epoll.isAvailable(); /** - * Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled. + * Create a new {@link EventLoopGroup} according to current platform and system property, fallback to NIO when epoll not enabled. * - * @param nThreads number of threads - * @param threadFactory ThreadFactory - * @return an EventLoopGroup suitable for the current platform + * @param nThreads Number of threads. + * @param threadFactory A {@link ThreadFactory} instance. + * @return An {@link EventLoopGroup} instance. */ public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory) + return EPOLL_ENABLED ? new EpollEventLoopGroup(nThreads, threadFactory) : new NioEventLoopGroup(nThreads, threadFactory); } /** - * @return a SocketChannel class suitable for the given EventLoopGroup implementation + * Get the suitable {@link SocketChannel} class according to current platform and system property. + * + * @return A {@link SocketChannel} implementation class. */ public static Class getClientSocketChannelClass() { - return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class; + return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class; } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/StringUtils.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/StringUtils.java index 8cfe6fd4f318536be067e89edb24d05ba590aa18..83fe7e2320ea4470edf9108f5f8fabfc2d86a606 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/StringUtils.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/StringUtils.java @@ -13,14 +13,17 @@ package com.oceanbase.clogproxy.client.util; import java.util.ArrayList; import java.util.List; +/** + * Utils class for string. + */ public class StringUtils { /** * Split a string by one separator character. The performance * is better than Java String split. - * - * @param str is the string need be split. - * @param separatorChar the single separator character. - * @return the array of split items. + * + * @param str The string need be split. + * @param separatorChar The single separator character. + * @return The array of split items. */ public static String[] split(String str, char separatorChar) { if (str == null) { @@ -57,6 +60,6 @@ public class StringUtils { list.add(str.substring(start, i)); } - return (String[]) list.toArray(new String[list.size()]); + return list.toArray(new String[list.size()]); } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/Validator.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/Validator.java index 00de5c98ead14697c99cde896bd58a294746c7df..55420605a66390e773973dd8a691ffd6bd974720 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/Validator.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/util/Validator.java @@ -12,29 +12,56 @@ package com.oceanbase.clogproxy.client.util; import java.util.Map; +/** + * Utils class used to validate arguments. + */ public class Validator { private static final int MINIMAL_VALID_PORT = 1; private static final int MAXIMAL_VALID_PORT = 65535; + /** + * Validate the object is not null, otherwise throws an {@link NullPointerException}. + * + * @param obj Object to be verified. + * @param message Message in the NullPointerException. + */ public static void notNull(Object obj, String message) { if (obj == null) { throw new NullPointerException(message); } } + /** + * Validate the port number is valid, otherwise throws an {@link IllegalArgumentException}. + * + * @param port Port number to be verified. + * @param message Message in the IllegalArgumentException. + */ public static void validatePort(int port, String message) { if (port < MINIMAL_VALID_PORT || port >= MAXIMAL_VALID_PORT) { throw new IllegalArgumentException(message); } } + /** + * Validate the string is not null or empty, otherwise throws an {@link IllegalArgumentException}. + * + * @param val String to be verified. + * @param message Message in the IllegalArgumentException. + */ public static void notEmpty(String val, String message) { if (val == null || val.isEmpty()) { throw new IllegalArgumentException(message); } } + /** + * Validate the map is not null or empty, otherwise throws an {@link IllegalArgumentException}. + * + * @param map Map to be verified. + * @param message Message in the IllegalArgumentException. + */ public static void notEmpty(Map map, String message) { if (map == null || map.isEmpty()) { throw new IllegalArgumentException(message);