未验证 提交 1686e1dc 编写于 作者: W whhe 提交者: GitHub

add comments and cleanup redundant class (#6)

* add comment and cleanup redundant class

* update java doc

* add javadoc to listeners

* cleanup exception class
上级 8cae74a2
......@@ -10,6 +10,13 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.config;
public class ShareConf {
/**
* The class that defines the shared constants.
*/
public class SharedConf {
/**
* Flag of whether to use the hash function to process password.
*/
public static boolean AUTH_PASSWORD_HASH = true;
}
......@@ -10,23 +10,40 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
/**
* Compress type enumeration. Primarily used for {@link com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.RecordData}.
*/
public enum CompressType {
/**
* no compress
* No compress.
*/
NONE(0),
/**
* lz4 compress
* LZ4 compress.
*/
LZ4(1);
private int code;
/**
* The ordinal of this enumeration constant.
*/
private final int code;
/**
* Constructor.
*
* @param code The ordinal of this enumeration constant.
*/
CompressType(int code) {
this.code = code;
}
/**
* Returns the enum constant of CompressType with the specified code.
*
* @param code The ordinal of this enumeration constant.
* @return The enum constant.
*/
public static CompressType codeOf(int code) {
for (CompressType v : values()) {
if (v.code == code) {
......@@ -36,6 +53,11 @@ public enum CompressType {
return null;
}
/**
* Get the ordinal of this enumeration constant.
*
* @return The ordinal of this enumeration constant.
*/
public int code() {
return code;
}
......
......@@ -10,59 +10,75 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
/**
* Header type enumeration. Used to identify the type of request or response in protobuf.
*/
public enum HeaderType {
/**
* error response
* Error response.
*/
ERROR_RESPONSE(-1),
/**
* client request handshake
* Client handshake request.
*/
HANDSHAKE_REQUEST_CLIENT(1),
/**
* response to client handshake
* Client handshake response.
*/
HANDSHAKE_RESPONSE_CLIENT(2),
/**
* logreader request handshake
* LogReader handshake request.
*/
HANDSHAKE_REQUEST_LOGREADER(3),
/**
* logreader response handshake
* LogReader handshake response.
*/
HANDSHAKE_RESPONSE_LOGREADER(4),
/**
* logreader data stream
* LogReader data stream.
*/
DATA_LOGREADER(5),
/**
* client data stream
* Client data stream.
*/
DATA_CLIENT(6),
/**
* status info of server runtime
* Status info of server runtime.
*/
STATUS(7),
/**
* status info of LogReader
* Status info of LogReader.
*/
STATUS_LOGREADER(8),
;
STATUS_LOGREADER(8);
/**
* The ordinal of this enumeration constant.
*/
private final int code;
/**
* Constructor.
*
* @param code The ordinal of this enumeration constant.
*/
HeaderType(int code) {
this.code = code;
}
/**
* Returns the enum constant of HeaderType with the specified code.
*
* @param code The ordinal of this enumeration constant.
* @return The enum constant.
*/
public static HeaderType codeOf(int code) {
for (HeaderType t : values()) {
if (t.code == code) {
......@@ -72,6 +88,11 @@ public enum HeaderType {
return null;
}
/**
* Get the ordinal of this enumeration constant.
*
* @return The ordinal of this enumeration constant.
*/
public int code() {
return code;
}
......
......@@ -10,45 +10,51 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
import java.util.HashMap;
import java.util.Map;
/**
* Log type enumeration.
*/
public enum LogType {
/**
* LogProxy OceanBase LogReader
* LogProxy OceanBase LogReader.
*/
OCEANBASE(0);
/**
* The ordinal of this enumeration constant.
*/
private final int code;
private static final Map<Integer, LogType> CODE_TYPES = new HashMap<>(values().length);
static {
for (LogType logCaptureType : values()) {
CODE_TYPES.put(logCaptureType.code, logCaptureType);
}
}
/**
* Constructor.
*
* @param code The ordinal of this enumeration constant.
*/
LogType(int code) {
this.code = code;
}
public int getCode() {
return this.code;
}
public static LogType fromString(String string) {
if (string == null) {
throw new NullPointerException("logTypeString is null");
/**
* Returns the enum constant of LogType with the specified code.
*
* @param code The ordinal of this enumeration constant.
* @return The enum constant.
*/
public static LogType codeOf(int code) {
for (LogType t : values()) {
if (t.code == code) {
return t;
}
}
return valueOf(string.toUpperCase());
return null;
}
public static LogType fromCode(int code) {
if (CODE_TYPES.containsKey(code)) {
return CODE_TYPES.get(code);
}
return null;
/**
* Get the ordinal of this enumeration constant.
*
* @return The ordinal of this enumeration constant.
*/
public int code() {
return this.code;
}
}
......@@ -10,20 +10,46 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
/**
* Protocol version enumeration.
*/
public enum ProtocolVersion {
/**
* v0 version
* Protocol version 0.
*/
V0(0),
/**
* Protocol version 1.
*/
V1(1),
/**
* Protocol version 2.
*/
V2(2);
/**
* The ordinal of this enumeration constant.
*/
private final int code;
/**
* Constructor.
*
* @param code The ordinal of this enumeration constant.
*/
ProtocolVersion(int code) {
this.code = code;
}
/**
* Returns the enum constant of ProtocolVersion with the specified code.
*
* @param code The ordinal of this enumeration constant.
* @return The enum constant.
*/
public static ProtocolVersion codeOf(int code) {
for (ProtocolVersion v : values()) {
if (v.code == code) {
......@@ -33,6 +59,11 @@ public enum ProtocolVersion {
return null;
}
/**
* Get the ordinal of this enumeration constant.
*
* @return The ordinal of this enumeration constant.
*/
public int code() {
return code;
}
......
......@@ -22,26 +22,73 @@ import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
* Utils class for crypto.
*/
public class CryptoUtil {
/**
* Default cipher key.
*/
private static final String KEY = "LogProxy123*";
/**
* AES key length.
*/
private static final int AES_KEY_SIZE = 256;
/**
* GCM tag length.
*/
private static final int GCM_TAG_LENGTH = 16;
/**
* Create an {@link Encryptor} instance using given cipher key.
*
* @param key Cipher key.
* @return An {@link Encryptor} instance.
*/
public static Encryptor newEncryptor(String key) {
return new Encryptor(key);
}
/**
* Create an Encryptor instance using default cipher key.
*
* @return An {@link Encryptor} instance.
*/
public static Encryptor newEncryptor() {
return new Encryptor(KEY);
}
/**
* This class provides the functionality of encryption and decryption with a specific cipher key.
*/
public static class Encryptor {
/**
* The cipher instance.
*/
private Cipher cipher = null; // not thread-safe
private byte[] key = new byte[AES_KEY_SIZE / 16];
private byte[] iv = new byte[12];
/**
* The key material of the secret key.
*
* @see SecretKeySpec#SecretKeySpec(byte[], String)
*/
private final byte[] key = new byte[AES_KEY_SIZE / 16];
/**
* The IV source buffer.
*
* @see GCMParameterSpec#GCMParameterSpec(int, byte[])
*/
private final byte[] iv = new byte[12];
/**
* Constructor.
*
* @param cipherKey The cipher key used to generate {@link Encryptor#key} and {@link Encryptor#iv}.
*/
private Encryptor(String cipherKey) {
try {
cipher = Cipher.getInstance("AES/GCM/NoPadding");
......@@ -55,6 +102,12 @@ public class CryptoUtil {
}
}
/**
* Encrypt given text.
*
* @param text The original text.
* @return Encrypted data.
*/
public byte[] encrypt(String text) {
SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv);
......@@ -68,6 +121,12 @@ public class CryptoUtil {
}
}
/**
* Decrypt given data.
*
* @param cipherText Encrypted data.
* @return The original string.
*/
public String decrypt(byte[] cipherText) {
SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv);
......@@ -82,10 +141,22 @@ public class CryptoUtil {
}
}
/**
* Compute hash value of given array of bytes.
*
* @param bytes The origin array of bytes.
* @return The array of bytes for the resulting hash value.
*/
public static byte[] sha1(byte[] bytes) {
return DigestUtils.sha1(bytes);
}
/**
* Compute hash value of given string.
*
* @param text The origin string.
* @return The array of bytes for the resulting hash value.
*/
public static byte[] sha1(String text) {
return DigestUtils.sha1(text);
}
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class Decoder {
public static String decodeStringInt(ByteBuf buffer) {
if (buffer.readableBytes() < Integer.BYTES) {
return null;
}
buffer.markReaderIndex();
int length = buffer.readInt();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
String str = new String(bytes);
if (str.isEmpty()) {
throw new RuntimeException("decode string is null or empty");
}
return str;
}
public static String decodeStringByte(ByteBuf buffer) {
if (buffer.readableBytes() < Byte.BYTES) {
return null;
}
buffer.markReaderIndex();
short length = buffer.readByte();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
String str = new String(bytes);
if (str.isEmpty()) {
throw new RuntimeException("decode string is null or empty");
}
return str;
}
public static ByteBuf encodeStringInt(String string) {
if (string == null || string.length() == 0) {
throw new RuntimeException("encode string is null or empty");
}
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(4 + string.length());
byteBuf.writeInt(string.length());
byteBuf.writeBytes(string.getBytes());
return byteBuf;
}
}
......@@ -14,24 +14,54 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.apache.commons.codec.DecoderException;
/**
* This class is used to convert hexadecimal strings.
*/
public final class Hex {
/**
* Returns a multi-line hexadecimal dump of the array that is easy to read by humans.
*
* @param array An array of bytes
* @return A multi-line hexadecimal dump string
*/
public static String dump(byte[] array) {
return dump(array, 0, array.length);
}
/**
* Returns a multi-line hexadecimal dump of the specified sub-region of bytes that is easy to read by humans.
*
* @param bytes An array of bytes
* @param offset The offset of the sub-region start position
* @param length The length of the sub-region
* @return A multi-line hexadecimal dump string
*/
public static String dump(byte[] bytes, int offset, int length) {
return ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(bytes, offset, length));
}
/**
* Converts an array of bytes into a string representing the hexadecimal values of each byte in order.
*
* @param bytes An array of bytes
* @return A String containing uppercase hexadecimal characters
*/
public static String str(byte[] bytes) {
return org.apache.commons.codec.binary.Hex.encodeHexString(bytes, false);
}
public static byte[] toBytes(String hexstr) {
/**
* Converts a String representing hexadecimal values into an array of bytes of those same values.
*
* @param hexStr A String representing hexadecimal values
* @return An array of bytes
*/
public static byte[] toBytes(String hexStr) {
try {
return org.apache.commons.codec.binary.Hex.decodeHex(hexstr);
return org.apache.commons.codec.binary.Hex.decodeHex(hexStr);
} catch (DecoderException e) {
return null;
}
}
}
\ No newline at end of file
}
......@@ -13,17 +13,17 @@ package com.oceanbase.clogproxy.common.util;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.net.*;
import java.util.Enumeration;
/**
* Utils class for network.
*/
public class NetworkUtil {
/**
* Local ip.
*/
private static String IP;
static {
......@@ -50,9 +50,9 @@ public class NetworkUtil {
}
/**
* get local ip
* Get local ip.
*
* @return local ip
* @return Local ip.
*/
public static String getLocalIp() {
return IP;
......@@ -60,6 +60,9 @@ public class NetworkUtil {
/**
* Parse the remote address of the channel.
*
* @param channel A channel
* @return The address string.
*/
public static String parseRemoteAddress(final Channel channel) {
if (null == channel) {
......@@ -70,9 +73,10 @@ public class NetworkUtil {
}
/**
* Parse the address with rules:
* <ol>
* <li>if an address starts with a '/', skip it.
* <li>if an address contains a '/', substring it.
* <li>If an address starts with a '/', skip it.
* <li>If an address contains a '/', substring it.
* </ol>
*/
private static String doParse(String addr) {
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.util;
public class Password {
private String value;
public void set(String value) {
this.value = value;
}
public String get() {
return value;
}
@Override
public String toString() {
return "******";
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.util;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TaskExecutor {
private static class Singleton {
private static final TaskExecutor INSTANCE = new TaskExecutor();
}
public static TaskExecutor instance() {
return Singleton.INSTANCE;
}
private TaskExecutor() { }
public static class Task<T> {
protected Future<T> future;
protected Failure failure;
public T get() {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
if (failure != null) {
failure.onError(e);
}
return null;
}
}
public void join() {
get();
}
}
public static class BackgroundTask extends Task<Void> {
@Override
public void join() {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
if (failure != null) {
failure.onError(e);
}
}
}
}
public interface Failure {
void onError(Exception e);
}
private ExecutorService asyncTasks = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), (ThreadFactory) Thread::new);
private ExecutorService bgTasks = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), (ThreadFactory) Thread::new);
private Map<String, ConcurrentTask> concurrentTasks = Maps.newConcurrentMap();
public <T> Task<T> async(Callable<T> callable) {
return async(callable, null);
}
public <T> Task<T> async(Callable<T> callable, Failure failure) {
Task<T> task = new Task<>();
task.future = asyncTasks.submit(callable);
task.failure = failure;
return task;
}
public BackgroundTask background(Callable<Void> callable) {
return background(callable, null);
}
public BackgroundTask background(Callable<Void> callable, Failure failure) {
BackgroundTask task = new BackgroundTask();
task.future = bgTasks.submit(callable);
task.failure = failure;
return task;
}
public static class ConcurrentTask {
private ExecutorService concurrentTasks;
public ConcurrentTask(int parallelism) {
// Never exceed actual CPU core count for init count, or got an Exception
concurrentTasks = new ForkJoinPool(Math.min(parallelism,
Runtime.getRuntime().availableProcessors()),
ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
public <T> Future<T> concurrent(Callable<T> callable) {
return concurrentTasks.submit(callable);
}
}
public ConcurrentTask refConcurrent(String name, int parallelism) {
ConcurrentTask task = concurrentTasks.get(name);
if (task != null) {
return task;
}
task = new ConcurrentTask(parallelism);
concurrentTasks.put(name, task);
return task;
}
public int getAsyncTaskCount() {
return ((ThreadPoolExecutor) asyncTasks).getActiveCount();
}
public int getBgTaskCount() {
return ((ThreadPoolExecutor) bgTasks).getActiveCount();
}
public int getConcurrentTaskCount() {
int count = 0;
for (ConcurrentTask t : concurrentTasks.values()) {
count += ((ForkJoinPool) t.concurrentTasks).getActiveThreadCount();
}
return count;
}
}
......@@ -14,56 +14,123 @@ import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Field;
/**
* Utils class to check and convert data types.
*/
public class TypeTrait {
/**
* Checks if it is a number type object.
*
* @param obj An object to check.
* @return True if it is a number type object, false otherwise.
*/
public static boolean isNumber(Object obj) {
return (obj instanceof Byte) || (obj instanceof Short) ||
(obj instanceof Integer) || (obj instanceof Long);
(obj instanceof Integer) || (obj instanceof Long);
}
/**
* Checks if it is a number type {@link Field}.
*
* @param field A field to check.
* @return True if it is a number type field, false otherwise.
*/
public static boolean isNumber(Field field) {
String typeName = field.getGenericType().getTypeName();
return "byte".equals(typeName) || "java.lang.Byte".equals(typeName) ||
"short".equals(typeName) || "java.lang.Short".equals(typeName) ||
"int".equals(typeName) || "java.lang.Integer".equals(typeName) ||
"long".equals(typeName) || "java.lang.Long".equals(typeName);
"short".equals(typeName) || "java.lang.Short".equals(typeName) ||
"int".equals(typeName) || "java.lang.Integer".equals(typeName) ||
"long".equals(typeName) || "java.lang.Long".equals(typeName);
}
/**
* Checks if it is a real number type object.
*
* @param obj An object to check.
* @return True if it is a real number type object, false otherwise.
*/
public static boolean isReal(Object obj) {
return (obj instanceof Float) || (obj instanceof Double);
}
/**
* Checks if it is a real number type {@link Field}.
*
* @param field A field to check.
* @return True if it is a real number type field, false otherwise.
*/
public static boolean isReal(Field field) {
String typeName = field.getGenericType().getTypeName();
return "float".equals(typeName) || "java.lang.Float".equals(typeName) ||
"double".equals(typeName) || "java.lang.Double".equals(typeName);
"double".equals(typeName) || "java.lang.Double".equals(typeName);
}
/**
* Checks if it is a boolean type object.
*
* @param obj An object to check.
* @return True if it is a boolean type object, false otherwise.
*/
public static boolean isBool(Object obj) {
return obj instanceof Boolean;
}
/**
* Checks if it is a boolean type {@link Field}.
*
* @param field A field to check.
* @return True if it is a boolean type field, false otherwise.
*/
public static boolean isBool(Field field) {
String typeName = field.getGenericType().getTypeName();
return "boolean".equals(typeName) || "java.lang.Boolean".equals(typeName);
}
/**
* Checks if it is a string type object.
*
* @param obj An object to check.
* @return True if it is a string type object, false otherwise.
*/
public static boolean isString(Object obj) {
return (obj instanceof Character) || (obj instanceof String);
}
/**
* Checks if it is a string type {@link Field}.
*
* @param field A field to check.
* @return True if it is a string type field, false otherwise.
*/
public static boolean isString(Field field) {
String typeName = field.getGenericType().getTypeName();
return "char".equals(typeName) || "java.lang.Character".equals(typeName) ||
"java.lang.String".equals(typeName);
"java.lang.String".equals(typeName);
}
/**
* Checks if the object and field are the same loose type.
*
* @param object An object to check.
* @param field A field to check.
* @return True if the object and field are the same loose type, false otherwise.
*/
public static boolean isSameLooseType(Object object, Field field) {
return (isNumber(object) && isNumber(field)) ||
(isReal(object) && isReal(field)) ||
(isBool(object) && isBool(field)) ||
(isString(object) && isString(field));
(isReal(object) && isReal(field)) ||
(isBool(object) && isBool(field)) ||
(isString(object) && isString(field));
}
/**
* Convert a value from string type.
*
* @param value The source string.
* @param clazz Value class.
* @param <T> Expected value type.
* @return The value converted from string.
*/
@SuppressWarnings("unchecked")
public static <T> T fromString(String value, Class<?> clazz) {
if (clazz == Byte.class || clazz == byte.class) {
......
......@@ -21,46 +21,87 @@ import com.oceanbase.clogproxy.client.util.Validator;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import io.netty.handler.ssl.SslContext;
/**
* A client that makes it easy to connect to log proxy and start a {@link ClientStream}.
*/
public class LogProxyClient {
/**
* A {@link ClientStream} instance.
*/
private final ClientStream stream;
/**
* @param host server hostname name or ip
* @param port server port
* @param config real config object according to what-you-expected
* @param sslContext ssl context to create netty handler
* Constructor with {@link SslContext}.
*
* @param host Log proxy hostname name or ip.
* @param port Log proxy port.
* @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}.
* @param sslContext {@link SslContext} to create netty handler.
*/
public LogProxyClient(String host, int port, AbstractConnectionConfig config, SslContext sslContext) {
Validator.notNull(config.getLogType(), "log type cannot be null");
Validator.notNull(host, "server cannot be null");
Validator.validatePort(port, "port is not valid");
try {
Validator.notNull(config.getLogType(), "log type cannot be null");
Validator.notEmpty(host, "server cannot be null");
Validator.validatePort(port, "port is not valid");
} catch (Exception e) {
throw new IllegalArgumentException("Illegal argument for LogProxyClient");
}
if (!config.valid()) {
throw new IllegalArgumentException("Illegal argument for LogProxyClient");
}
String clientId = ClientConf.USER_DEFINED_CLIENTID.isEmpty() ? ClientIdGenerator.generate() : ClientConf.USER_DEFINED_CLIENTID;
ConnectionParams connectionParams = new ConnectionParams(config.getLogType(), clientId, host, port, config);
connectionParams.setProtocolVersion(ProtocolVersion.V2);
this.stream = new ClientStream(connectionParams, sslContext);
}
/**
* Constructor without {@link SslContext}.
*
* @param host Log proxy hostname name or ip.
* @param port Log proxy port.
* @param config {@link AbstractConnectionConfig} used to create the {@link ClientStream}.
*/
public LogProxyClient(String host, int port, AbstractConnectionConfig config) {
this(host, port, config, null);
}
/**
* Start the client.
*/
public void start() {
stream.start();
}
/**
* Stop the client.
*/
public void stop() {
stream.stop();
}
/**
* Join and wait the client.
*/
public void join() {
stream.join();
}
/**
* Add a {@link RecordListener} to {@link #stream}.
*
* @param recordListener A {@link RecordListener}.
*/
public synchronized void addListener(RecordListener recordListener) {
stream.addListener(recordListener);
}
/**
* Add a {@link StatusListener} to {@link #stream}.
*
* @param statusListener A {@link StatusListener}.
*/
public synchronized void addStatusListener(StatusListener statusListener) {
stream.addStatusListener(statusListener);
}
......
......@@ -17,33 +17,57 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* This is an abstract implementation class of the interface {@link ConnectionConfig}.
*/
public abstract class AbstractConnectionConfig implements ConnectionConfig {
/**
* defined structure configurations
* Defined configurations map.
*/
protected static Map<String, ConfigItem<Object>> configs = new HashMap<>();
/**
* extra configurations
* Extra configurations map.
*/
protected final Map<String, String> extraConfigs = new HashMap<>();
/**
* This class is used to define configuration with a default value.
*
* @param <T> The type of stored value.
*/
@SuppressWarnings("unchecked")
protected static class ConfigItem<T> {
protected String key;
protected T val;
/**
* Sole constructor.
*
* @param key Config key.
* @param val Config value.
*/
public ConfigItem(String key, T val) {
this.key = key;
this.val = val;
configs.put(key, (ConfigItem<Object>) this);
}
/**
* Set value to config item.
*
* @param val Value of specific type.
*/
public void set(T val) {
this.val = val;
}
/**
* Set value of specific type from string.
*
* @param val Value of string type.
*/
public void fromString(String val) {
this.val = TypeTrait.fromString(val, this.val.getClass());
}
......@@ -54,6 +78,11 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
}
}
/**
* Sole constructor.
*
* @param allConfigs The map of configurations.
*/
public AbstractConnectionConfig(Map<String, String> allConfigs) {
if (allConfigs != null) {
for (Entry<String, String> entry : allConfigs.entrySet()) {
......@@ -66,12 +95,28 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
}
}
/**
* Get log type set in configurations.
*
* @return The enum constant of {@link LogType}.
*/
public abstract LogType getLogType();
/**
* Add configurations to {@link #extraConfigs}
*
* @param extraConfigs A map of configurations.
*/
public void setExtraConfigs(Map<String, String> extraConfigs) {
this.extraConfigs.putAll(extraConfigs);
}
/**
* Update value into define configurations map.
*
* @param key Config key.
* @param val New config value.
*/
void set(String key, String val) {
ConfigItem<Object> cs = configs.get(key);
if (cs != null) {
......@@ -80,9 +125,9 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
}
/**
* validate if defined configurations
* Validate defined configurations.
*
* @return True or False
* @return Flag of whether all the defined configurations are valid.
*/
public abstract boolean valid();
}
......@@ -10,29 +10,58 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.common.config.ShareConf;
import com.oceanbase.clogproxy.common.config.SharedConf;
public class ClientConf extends ShareConf {
public static final String VERSION = "1.1.0";
/**
* The class that defines the constants that are used to generate the connection.
*/
public class ClientConf extends SharedConf {
/**
* Client version.
*/
public static final String VERSION = "1.0.1";
/**
* Queue size for storing records received from log proxy.
*/
public static int TRANSFER_QUEUE_SIZE = 20000;
/**
* Connection timeout in milliseconds.
*/
public static int CONNECT_TIMEOUT_MS = 5000;
/**
* Reading queue timeout in milliseconds.
*/
public static int READ_WAIT_TIME_MS = 2000;
/**
* Time to sleep in seconds when retrying.
*/
public static int RETRY_INTERVAL_S = 2;
/**
* max retry time after disconnect, if not data income lasting IDLE_TIMEOUT_S, a reconnect we be trigger
* Maximum number of retries after disconnect, if not data income lasting {@link #IDLE_TIMEOUT_S}, a reconnection will be triggered.
*/
public static int MAX_RECONNECT_TIMES = -1;
/**
* Idle timeout in seconds for netty handler.
*/
public static int IDLE_TIMEOUT_S = 15;
/**
* Maximum number of reads, after which data will be discarded.
*/
public static int NETTY_DISCARD_AFTER_READS = 16;
/**
* set user defined userid,
* for inner use only
* User defined client id.
*/
public static String USER_DEFINED_CLIENTID = "";
/**
* ignore unknown or unsupported record type with a warning log instead throwing an exception
* Ignore unknown or unsupported record type with a warning log instead of throwing an exception.
*/
public static boolean IGNORE_UNKNOWN_RECORD_TYPE = false;
}
......@@ -10,10 +10,30 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
/**
* This is the interface of connection config.
*/
public interface ConnectionConfig {
/**
* Generate a configuration string from connection parameters.
*
* @return The configuration string.
*/
String generateConfigurationString();
/**
* Update the checkpoint.
*
* @param checkpoint A checkpoint string.
*/
void updateCheckpoint(String checkpoint);
/**
* Overrides {@link Object#toString()} to structure a string.
*
* @return The structured string.
*/
@Override
String toString();
}
......@@ -12,7 +12,7 @@ package com.oceanbase.clogproxy.client.config;
import com.google.common.collect.Maps;
import com.oceanbase.clogproxy.client.util.Validator;
import com.oceanbase.clogproxy.common.config.ShareConf;
import com.oceanbase.clogproxy.common.config.SharedConf;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.util.CryptoUtil;
import com.oceanbase.clogproxy.common.util.Hex;
......@@ -21,19 +21,49 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* This is a configuration class for connection to log proxy.
*/
public class ObReaderConfig extends AbstractConnectionConfig {
private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class);
/**
* Root server list.
*/
private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", "");
/**
* Cluster username.
*/
private static final ConfigItem<String> CLUSTER_USER = new ConfigItem<>("cluster_user", "");
/**
* Cluster password.
*/
private static final ConfigItem<String> CLUSTER_PASSWORD = new ConfigItem<>("cluster_password", "");
/**
* Table whitelist.
*/
private static final ConfigItem<String> TABLE_WHITE_LIST = new ConfigItem<>("tb_white_list", "");
/**
* Start timestamp.
*/
private static final ConfigItem<Long> START_TIMESTAMP = new ConfigItem<>("first_start_timestamp", 0L);
/**
* Constructor with empty arguments.
*/
public ObReaderConfig() {
super(Maps.newHashMap());
}
/**
* Constructor with a config map.
*
* @param allConfigs Config map.
*/
public ObReaderConfig(Map<String, String> allConfigs) {
super(allConfigs);
}
......@@ -64,7 +94,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString();
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && ShareConf.AUTH_PASSWORD_HASH) {
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
sb.append(entry.getKey()).append("=").append(value).append(" ");
......@@ -88,49 +118,50 @@ public class ObReaderConfig extends AbstractConnectionConfig {
@Override
public String toString() {
return "rootserver_list=" + RS_LIST + ", cluster_user=" + CLUSTER_USER + ", cluster_password=******, " +
"tb_white_list=" + TABLE_WHITE_LIST + ", start_timestamp=" + START_TIMESTAMP;
"tb_white_list=" + TABLE_WHITE_LIST + ", start_timestamp=" + START_TIMESTAMP;
}
/**
* 设置管控服务列表
* Set root server list.
*
* @param rsList 管控服务列表
* @param rsList Root server list.
*/
public void setRsList(String rsList) {
RS_LIST.set(rsList);
}
/**
* 设置连接OB用户名
* Set cluster username
*
* @param clusterUser 用户名
* @param clusterUser Cluster username.
*/
public void setUsername(String clusterUser) {
CLUSTER_USER.set(clusterUser);
}
/**
* 设置连接OB密码
* Set cluster password
*
* @param clusterPassword 密码
* @param clusterPassword Cluster password.
*/
public void setPassword(String clusterPassword) {
CLUSTER_PASSWORD.set(clusterPassword);
}
/**
* 配置过滤规则,由租户.库.表3个维度组成,每一段 * 表示任意,如:A.foo.bar,B.foo.*,C.*.*,*.*.*
* Set table whitelist. It is composed of three dimensions: tenant, library, and table.
* Asterisk means any, such as: "A.foo.bar", "B.foo.*", "C.*.*", "*.*.*".
*
* @param tableWhiteList 监听表的过滤规则
* @param tableWhiteList Table whitelist.
*/
public void setTableWhiteList(String tableWhiteList) {
TABLE_WHITE_LIST.set(tableWhiteList);
}
/**
* 设置起始订阅的 UNIX时间戳,0表示从当前,通常不要早于1小时
* Set start timestamp, zero means from now on.
*
* @param startTimestamp 起始时间戳
* @param startTimestamp Start timestamp.
*/
public void setStartTimestamp(Long startTimestamp) {
START_TIMESTAMP.set(startTimestamp);
......
......@@ -11,9 +11,9 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
......@@ -35,42 +35,129 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
/**
* This is an implementation class of {@link ChannelInboundHandlerAdapter}.
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
/**
* Magic string used to request log proxy.
*/
private static final byte[] MAGIC_STRING = new byte[]{'x', 'i', '5', '3', 'g', ']', 'q'};
/**
* Client ip address.
*/
private static final String CLIENT_IP = NetworkUtil.getLocalIp();
private static final int HEAD_LENGTH = 7;
/**
* Length of packet header.
*/
private static final int HEAD_LENGTH = 7;
/**
* A client stream.
*/
private ClientStream stream;
/**
* Connection params.
*/
private ConnectionParams params;
/**
* Record queue, it's a {@link BlockingQueue} for storing {@link StreamContext.TransferPacket}.
*/
private BlockingQueue<StreamContext.TransferPacket> recordQueue;
/**
* Handshake type enumeration.
*/
enum HandshakeStateV1 {
/**
* State of parsing the packet header.
*/
PB_HEAD,
/**
* State of handling handshake response.
*/
CLIENT_HANDSHAKE_RESPONSE,
/**
* State of handling record.
*/
RECORD,
/**
* State of handling error response.
*/
ERROR_RESPONSE,
/**
* State of handling runtime status response.
*/
STATUS
}
/**
* Handshake state.
*/
private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD;
/**
* A {@link Cumulator} instance.
*/
private final Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR;
/**
* A {@link ByteBuf} used for channel reading.
*/
ByteBuf buffer;
/**
* A flag of whether channel is active.
*/
private boolean poolFlag = true;
/**
* A flag of whether it is the first part of {@link ByteBuf}.
*/
private boolean first;
/**
* Number of read attempts.
*/
private int numReads = 0;
/**
* A flag of whether the message is not readable.
*/
private boolean dataNotEnough = false;
/**
* The length of message body.
*/
private int dataLength = 0;
/**
* A {@link LZ4Factory} instance.
*/
LZ4Factory factory = LZ4Factory.fastestInstance();
/**
* A {@link LZ4FastDecompressor} instance.
*/
LZ4FastDecompressor fastDecompressor = factory.fastDecompressor();
public ClientHandler() { }
/**
* Constructor with empty arguments.
*/
public ClientHandler() {
}
/**
* Reset {@link #state} to {@link HandshakeStateV1#PB_HEAD}.
*/
protected void resetState() {
state = HandshakeStateV1.PB_HEAD;
}
......@@ -125,6 +212,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Handle header response.
*/
private void handleHeader() {
if (buffer.readableBytes() >= HEAD_LENGTH) {
int version = buffer.readShort();
......@@ -133,13 +223,13 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
checkHeader(version, type, dataLength);
HeaderType headerType = HeaderType.codeOf(type);
if(headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
if (headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE;
} else if(headerType == HeaderType.ERROR_RESPONSE) {
} else if (headerType == HeaderType.ERROR_RESPONSE) {
state = HandshakeStateV1.ERROR_RESPONSE;
} else if(headerType == HeaderType.DATA_CLIENT) {
} else if (headerType == HeaderType.DATA_CLIENT) {
state = HandshakeStateV1.RECORD;
} else if(headerType == HeaderType.STATUS) {
} else if (headerType == HeaderType.STATUS) {
state = HandshakeStateV1.STATUS;
}
} else {
......@@ -147,8 +237,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Handle handshake response.
*/
private void handleHandshakeResponse() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
if (buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.ClientHandshakeResponse response = LogProxyProto.ClientHandshakeResponse.parseFrom(bytes);
......@@ -159,8 +252,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Handle error response.
*/
private void handleErrorResponse() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
if (buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.ErrorResponse response = LogProxyProto.ErrorResponse.parseFrom(bytes);
......@@ -171,8 +267,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Handle server status response.
*/
private void handleServerStatus() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
if (buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes);
......@@ -183,8 +282,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Handle record data response.
*/
private void handleRecord() {
if(buffer.readableBytes() >= dataLength) {
if (buffer.readableBytes() >= dataLength) {
parseDataNew();
state = HandshakeStateV1.PB_HEAD;
} else {
......@@ -192,6 +294,13 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Check if the header is valid.
*
* @param version Protocol version.
* @param type Header type.
* @param length Data length.
*/
private void checkHeader(int version, int type, int length) {
if (ProtocolVersion.codeOf(version) == null) {
logger.error("unsupported protocol version: {}", version);
......@@ -207,6 +316,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Do parse record data from buffer. It will firstly decompress the raw data if necessary.
*/
private void parseDataNew() {
try {
byte[] buff = new byte[dataLength];
......@@ -221,7 +333,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
int decompress = fastDecompressor.decompress(rawData, 0, bytes, 0, compressedLen);
if (decompress != rawLen) {
throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress
+ "] is not expected [" + rawLen + "]");
+ "] is not expected [" + rawLen + "]");
}
parseRecord(bytes);
} else {
......@@ -232,26 +344,30 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Do parse record data from an array of bytes to a {@link LogMessage} and add it into {@link #recordQueue}.
*
* @param bytes An array of bytes of record data.
* @throws LogProxyClientException If exception occurs.
*/
private void parseRecord(byte[] bytes) throws LogProxyClientException {
int offset = 0;
while (offset < bytes.length) {
int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
LogMessage drcRecord;
LogMessage logMessage;
try {
/*
* We must copy a byte array and call parse after then,
* or got a !!!RIDICULOUS EXCEPTION!!!,
* if we wrap a upooled buffer with offset and call setByteBuf just as same as `parse` function do.
* if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do.
*/
drcRecord = new LogMessage(false);
logMessage = new LogMessage(false);
byte[] data = new byte[dataLength + 8];
System.arraycopy(bytes, offset, data, 0, data.length);
drcRecord.parse(data);
logMessage.parse(data);
if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) {
// unsupported type, ignore
logger.debug("Unsupported record type: {}", drcRecord);
logger.debug("Unsupported record type: {}", logMessage);
offset += (8 + dataLength);
continue;
}
......@@ -262,7 +378,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(drcRecord));
recordQueue.put(new StreamContext.TransferPacket(logMessage));
break;
} catch (InterruptedException e) {
// do nothing
......@@ -273,6 +389,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* Discard the bytes in buffer.
*/
protected final void discardSomeReadBytes() {
if (buffer != null && !first && buffer.refCnt() == 1) {
// discard some bytes if possible to make more room in the
......@@ -299,9 +418,14 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().writeAndFlush(generateConnectRequest(params.getProtocolVersion()));
}
/**
* Generate the request body for protocol v2.
*
* @return Request body.
*/
public ByteBuf generateConnectRequestV2() {
LogProxyProto.ClientHandshakeRequest handShake = LogProxyProto.ClientHandshakeRequest.newBuilder().
setLogType(params.getLogType().getCode()).
setLogType(params.getLogType().code()).
setIp(CLIENT_IP).
setId(params.getClientId()).
setVersion(ClientConf.VERSION).
......@@ -319,6 +443,12 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
return byteBuf;
}
/**
* Generate the request body.
*
* @param version Protocol version.
* @return Request body.
*/
public ByteBuf generateConnectRequest(ProtocolVersion version) {
if (version == ProtocolVersion.V2) {
return generateConnectRequestV2();
......@@ -331,7 +461,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1);
byteBuf.writeShort(ProtocolVersion.V0.code());
byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
byteBuf.writeByte(params.getLogType().getCode());
byteBuf.writeByte(params.getLogType().code());
// body
int length = CLIENT_IP.length();
......
......@@ -26,27 +26,64 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class represents a stream of log client. Stream means a channel of log transmission here.
*/
public class ClientStream {
private static final Logger logger = LoggerFactory.getLogger(ClientStream.class);
// routine
/**
* Flag of whether the stream is started.
*/
private final AtomicBoolean started = new AtomicBoolean(false);
/**
* The process thread.
*/
private Thread thread = null;
// status
/**
* Context of stream.
*/
private StreamContext context = null;
/**
* Checkpoint string used to resume writing into the queue.
*/
private String checkpointString;
// reconnection
/**
* Number of reconnections
*/
private int retryTimes = 0;
/**
* Connection to log proxy with netty channel.
*/
private Connection connection = null;
/**
* Flag of whether the stream is reconnecting now.
*/
private final AtomicBoolean reconnecting = new AtomicBoolean(true);
/**
* Flag of whether the stream need reconnect.
*/
private final AtomicBoolean reconnect = new AtomicBoolean(true);
// user callbacks
/**
* The list of {@link RecordListener}.
*/
private final List<RecordListener> listeners = new ArrayList<>();
/**
* The list of {@link StatusListener}
*/
private final List<StatusListener> statusListeners = new ArrayList<>();
/**
* Reconnect state type enumeration.
*/
private enum ReconnectState {
/**
* success
......@@ -62,10 +99,19 @@ public class ClientStream {
EXIT;
}
/**
* Sole constructor.
*
* @param connectionParams Connection params.
* @param sslContext A {@link SslContext} for encrypted communication.
*/
public ClientStream(ConnectionParams connectionParams, SslContext sslContext) {
context = new StreamContext(this, connectionParams, sslContext);
}
/**
* Close and wait the connection.
*/
public void stop() {
if (!started.compareAndSet(true, false)) {
logger.info("stopping LogProxy Client....");
......@@ -81,6 +127,9 @@ public class ClientStream {
logger.info("stopped LogProxy Client");
}
/**
* Call {@link Thread#join()} method of process thread.
*/
public void join() {
if (thread != null) {
try {
......@@ -91,10 +140,18 @@ public class ClientStream {
}
}
/**
* Call {@link #stop()} asynchronously.
*/
public void triggerStop() {
new Thread(this::stop).start();
}
/**
* Call {@link RecordListener#onException(LogProxyClientException)} asynchronously.
*
* @param e An exception.
*/
public void triggerException(LogProxyClientException e) {
// use thread make sure non-blocking
new Thread(() -> {
......@@ -104,6 +161,9 @@ public class ClientStream {
}).start();
}
/**
* Start the process thread.
*/
public void start() {
// if status listener exist, enable monitor
context.params.setEnableMonitor(CollectionUtils.isNotEmpty(statusListeners));
......@@ -182,10 +242,20 @@ public class ClientStream {
}
}
/**
* Get the flag of whether the stream is started.
*
* @return The flag of whether the stream is started.
*/
public boolean isRunning() {
return started.get();
}
/**
* Reconnect to log proxy. It is also used for first time connecting.
*
* @return A {@link ReconnectState}.
*/
private ReconnectState reconnect() {
// reconnect flag mark, tiny load for checking
if (reconnect.compareAndSet(true, false)) {
......@@ -233,6 +303,9 @@ public class ClientStream {
return ReconnectState.SUCCESS;
}
/**
* Reset the flags for reconnection.
*/
public void triggerReconnect() {
// reconnection action guard, avoid concurrent or multiple invoke
if (reconnecting.compareAndSet(false, true)) {
......@@ -240,10 +313,20 @@ public class ClientStream {
}
}
/**
* Add a {@link RecordListener} to {@link #listeners}.
*
* @param recordListener A {@link RecordListener}.
*/
public synchronized void addListener(RecordListener recordListener) {
listeners.add(recordListener);
}
/**
* Add a {@link StatusListener} to {@link #statusListeners}.
*
* @param statusListener A {@link StatusListener}.
*/
public synchronized void addStatusListener(StatusListener statusListener) {
statusListeners.add(statusListener);
}
......
......@@ -18,18 +18,35 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class represents a connection which contains a netty channel.
*/
public class Connection {
private static final Logger logger = LoggerFactory.getLogger(Connection.class);
/**
* A netty channel.
*/
private Channel channel;
/**
* A flag of whether the channel is closed.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Sole constructor.
*
* @param channel A netty channel.
*/
public Connection(Channel channel) {
this.channel = channel;
}
/**
* Close this connection.
*/
public void close() {
if (!closed.compareAndSet(false, true)) {
logger.warn("connection already closed");
......@@ -40,13 +57,19 @@ public class Connection {
channel.close().addListener(this::logCloseResult).syncUninterruptibly();
} catch (Exception e) {
logger.warn("close connection to remote address {} exception",
NetworkUtil.parseRemoteAddress(channel), e);
NetworkUtil.parseRemoteAddress(channel), e);
}
}
channel = null;
}
}
/**
* A callback that will logging the result of {@link Channel#close()}.
*
* @param future The source {@link Future} which called this callback.
*/
@SuppressWarnings("rawtypes")
private void logCloseResult(Future future) {
if (future.isSuccess()) {
if (logger.isInfoEnabled()) {
......
......@@ -25,31 +25,60 @@ import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
/**
* This is a factory class of {@link Connection}.
*/
public class ConnectionFactory {
/**
* A static class that holds the singleton instance of {@link ConnectionFactory}.
*/
private static class Singleton {
/**
* The singleton instance of {@link ConnectionFactory}.
*/
private static final ConnectionFactory INSTANCE = new ConnectionFactory();
}
/**
* Get the singleton instance of {@link ConnectionFactory}.
*
* @return The singleton instance of {@link ConnectionFactory}.
*/
public static ConnectionFactory instance() {
return Singleton.INSTANCE;
}
/**
* Sole constructor. It can only be used in {@link Singleton} class.
*/
private ConnectionFactory() {
}
/**
* Context key.
*/
public static final AttributeKey<StreamContext> CONTEXT_KEY = AttributeKey.valueOf("context");
/**
* Worker group in type of {@link EventLoopGroup}.
*/
private static final EventLoopGroup WORKER_GROUP = NettyEventLoopUtil.newEventLoopGroup(1,
new NamedThreadFactory("log-proxy-client-worker", true));
new NamedThreadFactory("log-proxy-client-worker", true));
/**
* Create a {@link Bootstrap} instance.
*
* @param sslContext The {@link SslContext} used for encrypted communication.
* @return A {@link Bootstrap} instance.
*/
private Bootstrap initBootstrap(SslContext sslContext) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(WORKER_GROUP)
.channel(NettyEventLoopUtil.getClientSocketChannelClass())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
.channel(NettyEventLoopUtil.getClientSocketChannelClass())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
......@@ -65,6 +94,13 @@ public class ConnectionFactory {
return bootstrap;
}
/**
* Create a {@link Connection} with specific {@link StreamContext}.
*
* @param context Stream context.
* @return A {@link Connection}.
* @throws LogProxyClientException If exception occurs.
*/
public Connection createConnection(StreamContext context) throws LogProxyClientException {
Bootstrap bootstrap = initBootstrap(context.getSslContext());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConf.CONNECT_TIMEOUT_MS);
......
......@@ -16,19 +16,60 @@ import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
/**
* This is a configuration class of connection parameters.
*/
public class ConnectionParams {
/**
* Log type.
*/
private final LogType logType;
/**
* Client id.
*/
private final String clientId;
/**
* Log proxy host.
*/
private final String host;
/**
* Log proxy port.
*/
private final int port;
/**
* Connection config.
*/
private final ConnectionConfig connectionConfig;
/**
* Generated configuration string.
*/
private String configurationString;
/**
* Protocol version.
*/
private ProtocolVersion protocolVersion;
/**
* Flag of whether enable monitor.
*/
private boolean enableMonitor;
/**
* Constructor.
*
* @param logType Log type.
* @param clientId Client id.
* @param host Log proxy host.
* @param port Log proxy port.
* @param connectionConfig Connection config.
*/
public ConnectionParams(LogType logType, String clientId, String host, int port, ConnectionConfig connectionConfig) {
this.logType = logType;
this.clientId = clientId;
......@@ -38,6 +79,11 @@ public class ConnectionParams {
this.configurationString = connectionConfig.generateConfigurationString();
}
/**
* Update checkpoint in connection config.
*
* @param checkpoint Checkpoint of the last record put in queue.
*/
public void updateCheckpoint(String checkpoint) {
connectionConfig.updateCheckpoint(checkpoint);
configurationString = connectionConfig.generateConfigurationString();
......@@ -48,42 +94,92 @@ public class ConnectionParams {
return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
/**
* Get the basic info of connection, which contains client id and connection config.
*
* @return A string of client id and connection config.
*/
public String info() {
return clientId + ": " + connectionConfig.toString();
}
/**
* Get the log type.
*
* @return The log type.
*/
public LogType getLogType() {
return logType;
}
/**
* Get the client id.
*
* @return The client id.
*/
public String getClientId() {
return clientId;
}
/**
* Get the host of log proxy.
*
* @return The host of log proxy.
*/
public String getHost() {
return host;
}
/**
* Get the port of log proxy.
*
* @return The port of log proxy.
*/
public int getPort() {
return port;
}
/**
* Get the generated configuration string.
*
* @return The configuration string.
*/
public String getConfigurationString() {
return configurationString;
}
/**
* Get the protocol version.
*
* @return Protocol version.
*/
public ProtocolVersion getProtocolVersion() {
return protocolVersion;
}
/**
* Set the protocol version.
*
* @param protocolVersion Protocol version.
*/
public void setProtocolVersion(ProtocolVersion protocolVersion) {
this.protocolVersion = protocolVersion;
}
/**
* Get the flag of whether enable monitor.
*
* @return The flag of whether enable monitor.
*/
public boolean isEnableMonitor() {
return enableMonitor;
}
/**
* Set the flag of whether enable monitor.
*
* @param enableMonitor The flag of whether enable monitor.
*/
public void setEnableMonitor(boolean enableMonitor) {
this.enableMonitor = enableMonitor;
}
......
......@@ -13,22 +13,54 @@ package com.oceanbase.clogproxy.client.connection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This is a factory class for {@link ThreadFactory}.
*/
public class NamedThreadFactory implements ThreadFactory {
/**
* Pool number.
*/
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
private final boolean isDaemon;
/**
* Thread number.
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
/**
* Thread group.
*/
private final ThreadGroup group;
/**
* Prefix of thread name.
*/
private final String namePrefix;
/**
* Flag of whether the thread is daemon.
*/
private final boolean isDaemon;
/**
* Constructor with no arguments. It will take "ThreadPool" as its name.
*/
public NamedThreadFactory() {
this("ThreadPool");
}
/**
* Constructor with name.
*
* @param name Name of thread factory.
*/
public NamedThreadFactory(String name) {
this(name, false);
}
/**
* Constructor with name prefix and daemon flag.
*
* @param prefix Name prefix of thread factory.
* @param daemon A flag of whether starting the thread on daemon mode.
*/
public NamedThreadFactory(String prefix, boolean daemon) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
......@@ -36,11 +68,6 @@ public class NamedThreadFactory implements ThreadFactory {
isDaemon = daemon;
}
/**
* Create a thread.
*
* @see ThreadFactory#newThread(Runnable)
*/
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
......
......@@ -20,27 +20,34 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
/**
* This is a factory class of netty event loop.
*/
public class NettyEventLoopUtil {
/** check whether epoll enabled, and it would not be changed during runtime. */
private static boolean epollEnabled = Epoll.isAvailable();
/**
* Flag of whether epoll is enabled.
*/
private static final boolean EPOLL_ENABLED = Epoll.isAvailable();
/**
* Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
*
* @param nThreads number of threads
* @param threadFactory ThreadFactory
* @return an EventLoopGroup suitable for the current platform
* @param nThreads Number of threads.
* @param threadFactory ThreadFactory instance.
* @return An EventLoopGroup suitable for the current platform.
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
return EPOLL_ENABLED ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}
/**
* @return a SocketChannel class suitable for the given EventLoopGroup implementation
* Get the suitable {@link SocketChannel} for the given EventLoopGroup implementation.
*
* @return A {@link SocketChannel} class suitable for the given EventLoopGroup implementation.
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class;
}
}
......@@ -20,59 +20,140 @@ import java.util.concurrent.LinkedBlockingQueue;
import static com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.RuntimeStatus;
/**
* This class represents the context of client stream.
*/
public class StreamContext {
public static class TransferPacket {
private HeaderType type;
/**
* Packet header type.
*/
private final HeaderType type;
/**
* Log message record.
*/
private LogMessage record;
/**
* Log proxy runtime status.
*/
private RuntimeStatus status;
/**
* Constructor with a {@link LogMessage}.
*
* @param record A {@link LogMessage}.
*/
public TransferPacket(LogMessage record) {
this.type = HeaderType.DATA_CLIENT;
this.record = record;
}
/**
* Constructor with a {@link RuntimeStatus}.
*
* @param status A {@link RuntimeStatus}.
*/
public TransferPacket(RuntimeStatus status) {
this.type = HeaderType.STATUS;
this.status = status;
}
/**
* Get header type.
*
* @return Packet header type.
*/
public HeaderType getType() {
return type;
}
/**
* Get the log message record.
*
* @return Log message record.
*/
public LogMessage getRecord() {
return record;
}
/**
* Get the log proxy runtime status.
*
* @return Log proxy runtime status.
*/
public RuntimeStatus getStatus() {
return status;
}
}
/**
* Blocking queue which stores {@link TransferPacket}.
*/
private final BlockingQueue<TransferPacket> recordQueue = new LinkedBlockingQueue<>(ClientConf.TRANSFER_QUEUE_SIZE);
/**
* Client stream.
*/
private final ClientStream stream;
/**
* Connection params.
*/
ConnectionParams params;
/**
* Netty ssl context.
*
* @see SslContext
*/
private final SslContext sslContext;
/**
* Constructor of StreamContext.
*
* @param stream Client stream.
* @param params Connection params.
* @param sslContext Netty ssl context.
*/
public StreamContext(ClientStream stream, ConnectionParams params, SslContext sslContext) {
this.stream = stream;
this.params = params;
this.sslContext = sslContext;
}
/**
* Get connection params.
*
* @return Connection params.
*/
public ConnectionParams getParams() {
return params;
}
/**
* Get netty ssl context.
*
* @return Netty ssl context.
*/
public SslContext getSslContext() {
return sslContext;
}
/**
* Get the client stream.
*
* @return Client stream.
*/
public ClientStream stream() {
return stream;
}
/**
* Get the record queue.
*
* @return Record queue.
*/
public BlockingQueue<TransferPacket> recordQueue() {
return recordQueue;
}
......
......@@ -10,6 +10,9 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.constants;
/**
* The class that defines the constants that are used to identify data types.
*/
public class DataType {
public static final byte DT_UNKNOWN = 0x00;
......
......@@ -10,6 +10,9 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.enums;
/**
* Database type enumeration.
*/
public enum DBType {
MYSQL,
......
......@@ -10,66 +10,77 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.enums;
/**
* Error code enumeration.
*/
public enum ErrorCode {
////////// 0~499: process error ////////////
/**
* general error
* General error.
*/
NONE(0),
/**
* inner error
* Inner error
*/
E_INNER(1),
/**
* failed to connect
* Failed to connect.
*/
E_CONNECT(2),
/**
* exceed max retry connect count
* Exceed max retry connect count.
*/
E_MAX_RECONNECT(3),
/**
* user callback throws exception
* User callback throws exception.
*/
E_USER(4),
////////// 500~: receive data error ////////////
/**
* unknown data protocol
* Unknown data protocol.
*/
E_PROTOCOL(500),
/**
* unknown header type
* Unknown header type.
*/
E_HEADER_TYPE(501),
/**
* failed to auth
* Failed to auth.
*/
NO_AUTH(502),
/**
* unknown compress type
* Unknown compress type.
*/
E_COMPRESS_TYPE(503),
/**
* length not match
* Length not match.
*/
E_LEN(504),
/**
* failed to parse data
* Failed to parse data.
*/
E_PARSE(505);
/**
* The ordinal of this enumeration constant.
*/
int code;
/**
* Constructor.
*
* @param code The ordinal of this enumeration constant.
*/
ErrorCode(int code) {
this.code = code;
}
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
public class DRCClientException extends Exception {
private static final long serialVersionUID = 1L;
public DRCClientException() {
}
public DRCClientException(final String message) {
super(message);
}
}
\ No newline at end of file
......@@ -10,20 +10,43 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
public class DRCClientRunTimeException extends RuntimeException {
public DRCClientRunTimeException(String errMessage) {
/**
* This is a subclasses of {@link RuntimeException} primarily used in process of parsing {@link com.oceanbase.clogproxy.client.message.LogMessage}.
*/
public class LogMessageException extends RuntimeException {
/**
* Constructor with message.
*
* @param errMessage Error message.
*/
public LogMessageException(String errMessage) {
super(errMessage);
}
public DRCClientRunTimeException() {
/**
* Constructor with no arguments.
*/
public LogMessageException() {
super();
}
public DRCClientRunTimeException(Throwable cause) {
/**
* Constructor with cause.
*
* @param cause Cause error or exception.
*/
public LogMessageException(Throwable cause) {
super(cause);
}
public DRCClientRunTimeException(String errMessage, Throwable cause) {
/**
* Constructor with error message and cause.
*
* @param errMessage Error message.
* @param cause Cause error or exception.
*/
public LogMessageException(String errMessage, Throwable cause) {
super(errMessage, cause);
}
}
......@@ -12,31 +12,67 @@ package com.oceanbase.clogproxy.client.exception;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
/**
* This is a subclasses of {@link RuntimeException} used to indicate the exception occurs in client with an error code.
*/
public class LogProxyClientException extends RuntimeException {
/**
* Error code.
*/
private ErrorCode code = ErrorCode.NONE;
/**
* Constructor with error code and message.
*
* @param code Error code.
* @param message Error message.
*/
public LogProxyClientException(ErrorCode code, String message) {
super(message);
this.code = code;
}
/**
* Constructor with error code and exception.
*
* @param code Error code.
* @param exception Exception instance.
*/
public LogProxyClientException(ErrorCode code, Exception exception) {
super(exception.getMessage(), exception.getCause());
this.code = code;
}
/**
* Constructor with error code, error message and cause.
*
* @param code Error code.
* @param message Error message.
* @param throwable Cause.
*/
public LogProxyClientException(ErrorCode code, String message, Throwable throwable) {
super(message, throwable);
this.code = code;
}
/**
* Identify whether the client should stop the stream.
*
* @return The flag of whether the client should stop the stream.
*/
public boolean needStop() {
return (code == ErrorCode.E_MAX_RECONNECT) || (code == ErrorCode.E_PROTOCOL) ||
(code == ErrorCode.E_HEADER_TYPE) || (code == ErrorCode.NO_AUTH) ||
(code == ErrorCode.E_COMPRESS_TYPE) || (code == ErrorCode.E_LEN) ||
(code == ErrorCode.E_PARSE);
(code == ErrorCode.E_HEADER_TYPE) || (code == ErrorCode.NO_AUTH) ||
(code == ErrorCode.E_COMPRESS_TYPE) || (code == ErrorCode.E_LEN) ||
(code == ErrorCode.E_PARSE);
}
/**
* Get the error code.
*
* @return Error code.
*/
public ErrorCode getCode() {
return code;
}
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.fliter;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.DRCClientException;
import com.oceanbase.clogproxy.client.util.DataFilterUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DataFilter implements DataFilterBase {
/* Used to be compatibility with old version */
private String oldBranchDb;
private String filterInfo;
// String save the source filter string user passed through
private String sourceFilter;
// String save the filter that will be sent to store to acquire data
// For ob1.0, that must be four columns, dbname like a.b.
private String connectStoreFilterConditions;
private final StringBuilder builder;
private final Map<String, Map<String, List<String>>> requires;
private final Map<String, Map<String, List<String>>> dbTableColsReflectionMap;
//If all cols needed is '*', then we don't need do filter operation.
//In that case, we can save a lot compute.
private boolean isAllMatch = true;
private String tenant;
public DataFilter() {
oldBranchDb = null;
filterInfo = null;
builder = new StringBuilder();
requires = new HashMap<String, Map<String, List<String>>>();
dbTableColsReflectionMap = new HashMap<String, Map<String, List<String>>>();
}
/**
* Initialize the filter using formatted string.
* @param tenant tenant name
* @param tableFields the formatted filter information such as
* "tableName1;fieldName1;fieldName2|tableName2;fieldName1". No ";" or "|" should be
* transfer to
* *.tableName1.fieldName1|*.tableName1.fieldName2|...
* added at the beginning or end of the string.
*/
public DataFilter(String tenant, String tableFields) {
this(tableFields);
this.tenant = tenant;
}
public DataFilter(String tableFields) {
oldBranchDb = null;
builder = new StringBuilder();
requires = new HashMap<String, Map<String, List<String>>>();
dbTableColsReflectionMap = new HashMap<String, Map<String, List<String>>>();
builder.append(tableFields);
this.sourceFilter = tableFields;
}
/**
* The current version uses topic instead of dbname, so use the
* method to be compatible with the older version.
* @param db is the original branched db name.
*/
@Override
public void setBranchDb(final String db) {
oldBranchDb = db;
}
/**
* Add more filter information after initializing, note that the user should
* make it consistent to the formatted parameters.
* @param tableFields consistent formatted filter information.
*/
public void addTablesFields(String tableFields) {
builder.append(tableFields);
}
@Override
public boolean getIsAllMatch() {
return isAllMatch;
}
@Override
public Map<String, Map<String, List<String>>> getReflectionMap() {
return dbTableColsReflectionMap;
}
@Override
public Map<String, Map<String, List<String>>> getRequireMap() {
return requires;
}
//Before validate function called, toString may return null;
//Yet, user should not care about this. That's inter behavior.
@Override
public String toString() {
return connectStoreFilterConditions;
}
/**
* The validate function will form mysql, ob0.5, oracle eg filter condition.
*/
private boolean validateNormalFilterString() {
if (filterInfo != null) {
return true;
}
String s = builder.toString();
String[] tbs = s.split("\\|");
int colStart;
StringBuilder builder1 = new StringBuilder();
for (String s1 : tbs) {
String[] tb = s1.split("[;,\\.]");
if (tb.length > 0) {
String itemDb;
String itemTb;
if (tb.length <= 2) {
if (oldBranchDb != null) {
itemDb = oldBranchDb;
} else {
itemDb = "*";
}
colStart = 1;
itemTb = tb[0];
} else {
colStart = 2;
itemDb = tb[0];
itemTb = tb[1];
}
if (tenant != null) {
builder1.append(tenant).append(".");
}
builder1.append(itemDb).append(".").append(itemTb).append("|");
if (tb.length > colStart) {
List<String> cols = new ArrayList<String>();
for (int i = colStart; i < tb.length; i++) {
cols.add(tb[i]);
//here, we don't use trim in case that " *" or "* " or " * " is kind of col names
if (!"*".equals(tb[i])) {
isAllMatch = false;
}
}
DataFilterUtil.putColNames(itemDb, itemTb, cols, this);
}
}
}
if (builder1.charAt(builder1.length() - 1) == '|') {
builder1.deleteCharAt(builder1.length() - 1);
}
filterInfo = builder1.toString();
connectStoreFilterConditions = filterInfo;
return true;
}
/**
* The validate function will reform the filter condition and cols info
*/
private boolean validateOB10FilterString() {
if (sourceFilter == null) {
return false;
}
String[] tenantAndDbAndTBAndCols = sourceFilter.split("\\|");
requires.clear();
StringBuilder filterConditionBuilder = new StringBuilder();
for (String s1 : tenantAndDbAndTBAndCols) {
String[] tb = s1.split("[;,\\.]");
if (tb.length < 4) {
// tenant dbname tableName columnName is strictly required for 0b1.0
return false;
}
String tenant = tb[0];
String dbname = (oldBranchDb != null) ? oldBranchDb : tb[1];
String tableName = tb[2];
List<String> cols = new ArrayList<String>();
for (int i = 3; i < tb.length; ++i) {
cols.add(tb[i]);
if (!"*".equals(tb[i])) {
isAllMatch = false;
}
}
//format string passed to store
String formatDBName = tenant + "." + dbname;
filterConditionBuilder.append(formatDBName).append(FILTER_SEPARATOR_INNER);
filterConditionBuilder.append(tableName).append(FILTER_SEPARATOR);
DataFilterUtil.putColNames(formatDBName, tableName, cols, this);
}
connectStoreFilterConditions = filterConditionBuilder.toString();
return true;
}
// When source type is ocean base 1.0, filter's content is like tenant.dbname.tablename.colvalues| ....
@Override
public boolean validateFilter(DBType dbType) throws DRCClientException {
switch (dbType) {
case OCEANBASE1: {
return validateOB10FilterString();
}
default: {
return validateNormalFilterString();
}
}
}
@Override
public String getConnectStoreFilterConditions() {
return this.connectStoreFilterConditions;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.fliter;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.DRCClientException;
import java.util.List;
import java.util.Map;
public interface DataFilterBase {
String FILTER_SEPARATOR_INNER = ".";
String FILTER_SEPARATOR = "|";
/**
* Get the formatted filter string which will be delivered to store.
* Notice: before validate filter function called, getConnectStoreFilterConditions may return null.
* @return filter string
*/
String getConnectStoreFilterConditions();
/**
* Validate if the filter user passed is legal
* @param dbType database type which may be ob, mysql, oracle.
* For now, only ob1.0 need special handle which 4 tuple contains tenant, db, tb, cols is strictly required.
* @return true if filter is valid
* @throws DRCClientException if exception occurs
*/
boolean validateFilter(DBType dbType) throws DRCClientException;
/**
* This function is compatible for old usage.
* @param branchDb the original branched db name.
*/
void setBranchDb(String branchDb);
/**
* Fast match if cols are all needed.
* @return true if all cols are needed
*/
boolean getIsAllMatch();
Map<String, Map<String, List<String>>> getReflectionMap();
Map<String, Map<String, List<String>>> getRequireMap();
}
......@@ -12,7 +12,17 @@ package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.message.DataMessage;
/**
* This interface defined a kind of listener for field parsing.
*/
public interface FieldParseListener {
public void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception;
}
\ No newline at end of file
/**
* Handle the filed parsing result.
*
* @param prev The original field.
* @param next The field after parsing.
* @throws Exception When exception occurs.
*/
void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception;
}
......@@ -14,9 +14,22 @@ package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.message.LogMessage;
/**
* This interface defined a kind of listener for record response.
*/
public interface RecordListener {
void notify(LogMessage record);
/**
* Handle the {@link LogMessage}.
*
* @param logMessage A {@link LogMessage} instance.
*/
void notify(LogMessage logMessage);
/**
* Handle the exception.
*
* @param e An exception.
*/
void onException(LogProxyClientException e);
}
......@@ -12,6 +12,14 @@ package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
/**
* This interface defined a kind of listener for {@link LogProxyProto.RuntimeStatus} response.
*/
public interface StatusListener {
/**
* Handle the response of {@link LogProxyProto.RuntimeStatus}.
*
* @param status A {@link LogProxyProto.RuntimeStatus} response.
*/
void notify(LogProxyProto.RuntimeStatus status);
}
......@@ -10,7 +10,6 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
import com.oceanbase.clogproxy.client.config.DRCConfig;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.listener.FieldParseListener;
import com.oceanbase.clogproxy.client.util.StringUtils;
......@@ -854,25 +853,6 @@ public class DataMessage extends Message {
return records;
}
/**
* Construct the message from DataInputStream.
*
* @param reader is the DataInputStream.
* @param drcConfig DRCConfig
* @throws IOException if an I/O error occurs
*/
public void mergeFrom(final DataInputStream reader, DRCConfig drcConfig) throws IOException {
do {
Record record = new Record();
record.mergeFrom(reader);
record.setRegionId(drcConfig.getRegionId());
if (record.isEnding()) {
break;
}
records.add(record);
} while (true);
}
@Override
public void clear() {
super.clear();
......
......@@ -12,7 +12,7 @@ package com.oceanbase.clogproxy.client.message;
import com.oceanbase.clogproxy.client.constants.DataType;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.DRCClientRunTimeException;
import com.oceanbase.clogproxy.client.exception.LogMessageException;
import com.oceanbase.clogproxy.client.listener.FieldParseListener;
import com.oceanbase.clogproxy.client.util.BinaryMessageUtils;
import io.netty.buffer.ByteBuf;
......@@ -224,7 +224,7 @@ public class LogMessage extends DataMessage.Record {
try {
dbName = BinaryMessageUtils.getString(byteBuf.array(), (int) dbNameOffset, UTF8_ENCODING);
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
}
......@@ -240,7 +240,7 @@ public class LogMessage extends DataMessage.Record {
try {
tableName = BinaryMessageUtils.getString(byteBuf.array(), (int) tbNameOffset, UTF8_ENCODING);
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
}
......@@ -267,7 +267,7 @@ public class LogMessage extends DataMessage.Record {
serverId = BinaryMessageUtils.getString(byteBuf.array(), (int) instanceOffset,
DEFAULT_ENCODING);
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
}
......@@ -634,7 +634,7 @@ public class LogMessage extends DataMessage.Record {
} catch (Exception e) {
fields = null;
throw new DRCClientRunTimeException(e.getMessage(), e);
throw new LogMessageException(e.getMessage(), e);
}
return fields;
......@@ -660,7 +660,7 @@ public class LogMessage extends DataMessage.Record {
}
}
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
return primaryKeyIndexList;
}
......@@ -1014,7 +1014,7 @@ public class LogMessage extends DataMessage.Record {
return pkValues;
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
......@@ -1076,7 +1076,7 @@ public class LogMessage extends DataMessage.Record {
}
}
} catch (Exception e) {
throw new DRCClientRunTimeException(e);
throw new LogMessageException(e);
}
return tuples;
......@@ -1092,7 +1092,7 @@ public class LogMessage extends DataMessage.Record {
}
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
......@@ -1157,7 +1157,7 @@ public class LogMessage extends DataMessage.Record {
}
}
} catch (Exception e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
return uniqueKeyList;
}
......@@ -1234,7 +1234,7 @@ public class LogMessage extends DataMessage.Record {
return BinaryMessageUtils.getString(byteBuf.array(), (int) encoding,
DEFAULT_ENCODING);
} catch (UnsupportedEncodingException e) {
throw new DRCClientRunTimeException(e.getMessage(), e.getCause());
throw new LogMessageException(e.getMessage(), e.getCause());
}
}
......
......@@ -21,19 +21,24 @@ import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
/**
* Utils class for binary message.
*/
public class BinaryMessageUtils {
/**
*
*/
private static final int PREFIX_LENGTH = 12;
/**
* get string begin with offset
* Get string begin with offset.
*
* @param data bytes array
* @param offset read offset
* @param encoding string encoding
* @return result string
* @throws UnsupportedEncodingException when the encoding is not supported
* @param data A bytes array.
* @param offset Reading offset.
* @param encoding String encoding.
* @return Result string.
* @throws UnsupportedEncodingException When the encoding is not supported.
*/
public static String getString(byte[] data, int offset, String encoding) throws UnsupportedEncodingException {
ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN);
......@@ -47,12 +52,12 @@ public class BinaryMessageUtils {
}
/**
* get list begin with offset
* Get list begin with offset.
*
* @param data bytes array
* @param offset read offset
* @return result list
* @throws IOException if data type is unsigned long
* @param data A bytes array.
* @param offset Reading offset.
* @return Result list.
* @throws IOException If data type is unsigned long.
*/
public static List getArray(byte[] data, int offset) throws IOException {
ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN);
......@@ -107,11 +112,11 @@ public class BinaryMessageUtils {
}
/**
* get ByteString begin with offset
* Get ByteString begin with offset.
*
* @param data bytes array
* @param offset read offset
* @return list of ByteString
* @param data A bytes array.
* @param offset Reading offset.
* @return A list of {@link ByteString}.
*/
public static List<ByteString> getByteStringList(byte[] data, long offset) {
if (offset == -1) {
......@@ -138,8 +143,8 @@ public class BinaryMessageUtils {
lists.add(null);
} else {
lists.add(new ByteString(wrapByteBuf.array(),
PREFIX_LENGTH + currentOffset + readBytes + (int) offset,
nextOffset - currentOffset - 1));
PREFIX_LENGTH + currentOffset + readBytes + (int) offset,
nextOffset - currentOffset - 1));
}
currentOffset = nextOffset;
}
......
......@@ -14,18 +14,25 @@ import com.oceanbase.clogproxy.common.util.NetworkUtil;
import java.lang.management.ManagementFactory;
/**
* The class used to generate client id.
*/
public class ClientIdGenerator {
/**
* LocalIP_PID_currentTimestamp
* pattern may be change, never depend on the content of this
* @return client id string
* Generate a new client id in format "LocalIP"."PID"."currentTimestamp".
* Pattern may be changed, never depend on the content of this.
*
* @return Client id string.
*/
public static String generate() {
return NetworkUtil.getLocalIp() + "_" + getProcessId() + "_" + (System.currentTimeMillis() / 1000);
}
/**
* Get the process id.
*
* @return Process id.
*/
private static String getProcessId() {
// Note: may fail in some JVM implementations
// therefore fallback has to be provided
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import com.oceanbase.clogproxy.client.fliter.DataFilterBase;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DataFilterUtil {
/**
*
* @param db db name
* @param tb table name
* @param cols column names
* @param dataFilterBase DataFilterBase
*/
public static void putColNames(String db, String tb, List<String> cols,
DataFilterBase dataFilterBase) {
if (tb == null) {
return;
}
Map<String, Map<String, List<String>>> dbAndTablePair = dataFilterBase.getRequireMap();
boolean founded = false;
for (Map.Entry<String, Map<String, List<String>>> dbEntry : dbAndTablePair.entrySet()) {
if (db == null || db.equalsIgnoreCase(dbEntry.getKey())) {
for (Map.Entry<String, List<String>> entry : dbEntry.getValue().entrySet()) {
if (tb.equalsIgnoreCase(entry.getKey())) {
founded = true;
entry.getValue().addAll(cols);
}
}
if (!founded) {
// db is already in the filter, but the table is not, so add the table
Map<String, List<String>> tabMap = dbEntry.getValue();
tabMap.put(tb, cols);
founded = true;
}
}
}
if (!founded) {
// db is not in the filter, so add two maps
Map<String, List<String>> tabMap = new HashMap<String, List<String>>();
tabMap.put(tb, cols);
dbAndTablePair.put(db, tabMap);
}
}
/**
* Use the give db and tb name to retrieve cols list
* @param db db name
* @param tb table name
* @param dataFilterBase DataFilterBase
* @return cols reference to corresponded db name and table name
* Note: this function get cols from map in old DataFilter implementation
*/
public static List<String> getColNamesWithMapping(String db, String tb,
DataFilterBase dataFilterBase) {
if (tb == null) {
return null;
}
Map<String, Map<String, List<String>>> dbAndTablePair = dataFilterBase.getReflectionMap();
Map<String, List<String>> tableAndCols = dbAndTablePair.get(db);
if (tableAndCols == null) {
//if we don't find tableAndCols, that mean this dbName appears for the first time,
//and we use getColNames to require the missing cols and update map;
tableAndCols = new HashMap<String, List<String>>();
List<String> cols = getColNames(db, tb, dataFilterBase);
tableAndCols.put(tb, cols);
dbAndTablePair.put(db, tableAndCols);
return cols;
} else {
List<String> needCols = tableAndCols.get(tb);
//we propose the cols can't be null, so we use null to determinate whether the cols we
//needed has existed in the map
if (needCols == null) {
//the cols we needed is missing ,use getColNames to require the missing cols
List<String> cols = getColNames(db, tb, dataFilterBase);
tableAndCols.put(tb, cols);
return cols;
} else {
//the cols existed, just return the value.
return needCols;
}
}
}
/**
* Use the give db and tb name to retrieve cols list
* @param db db name
* @param tb table name
* @param dataFilterBase DataFilterBase
* @return cols reference to corresponded db name and table name
*/
public static List<String> getColNames(String db, String tb, DataFilterBase dataFilterBase) {
if (tb == null) {
return null;
}
Map<String, Map<String, List<String>>> requireMap = dataFilterBase.getRequireMap();
for (Map.Entry<String, Map<String, List<String>>> dbEntry : requireMap.entrySet()) {
StringBuffer buf = new StringBuffer(dbEntry.getKey());
processStringToRegularExpress(buf);
if (db == null || db.toLowerCase().matches(buf.toString().toLowerCase())) {
for (Map.Entry<String, List<String>> entry : dbEntry.getValue().entrySet()) {
buf = new StringBuffer(entry.getKey());
processStringToRegularExpress(buf);
if (tb.toLowerCase().matches(buf.toString().toLowerCase())) {
return entry.getValue();
}
}
}
}
return null;
}
/**
* This function will first replace all "." to "\.", then replace all "*" to ".*"
*
* @param stringBuffer original string buffer
*/
public static void processStringToRegularExpress(StringBuffer stringBuffer) {
int index;
int beginIndex = 0;
while (-1 != (index = stringBuffer.indexOf(".", beginIndex))) {
stringBuffer.insert(index, '\\');
beginIndex = index + 2;
}
beginIndex = 0;
while (-1 != (index = stringBuffer.indexOf("*", beginIndex))) {
stringBuffer.insert(index, '.');
beginIndex = index + 2;
}
}
/**
* Judge if the given col name exists in col lists
* @param col col to be judged
* @param s cols list
* @return true if exists in, else false
*/
public static boolean isColInArray(final String col, final List<String> s) {
for (int i = 0; i < s.size(); i++) {
if ("*".equals(s.get(i)) || col.equalsIgnoreCase(s.get(i))) {
return true;
}
}
return false;
}
}
......@@ -20,27 +20,34 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
/**
* Utils class for netty.
*/
public class NettyEventLoopUtil {
/** check whether epoll enabled, and it would not be changed during runtime. */
private static boolean epollEnabled = Epoll.isAvailable();
/**
* Check whether epoll enabled, and it would not be changed during runtime.
*/
private static final boolean EPOLL_ENABLED = Epoll.isAvailable();
/**
* Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
* Create a new {@link EventLoopGroup} according to current platform and system property, fallback to NIO when epoll not enabled.
*
* @param nThreads number of threads
* @param threadFactory ThreadFactory
* @return an EventLoopGroup suitable for the current platform
* @param nThreads Number of threads.
* @param threadFactory A {@link ThreadFactory} instance.
* @return An {@link EventLoopGroup} instance.
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
return EPOLL_ENABLED ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}
/**
* @return a SocketChannel class suitable for the given EventLoopGroup implementation
* Get the suitable {@link SocketChannel} class according to current platform and system property.
*
* @return A {@link SocketChannel} implementation class.
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
return EPOLL_ENABLED ? EpollSocketChannel.class : NioSocketChannel.class;
}
}
......@@ -13,14 +13,17 @@ package com.oceanbase.clogproxy.client.util;
import java.util.ArrayList;
import java.util.List;
/**
* Utils class for string.
*/
public class StringUtils {
/**
* Split a string by one separator character. The performance
* is better than Java String split.
*
* @param str is the string need be split.
* @param separatorChar the single separator character.
* @return the array of split items.
*
* @param str The string need be split.
* @param separatorChar The single separator character.
* @return The array of split items.
*/
public static String[] split(String str, char separatorChar) {
if (str == null) {
......@@ -57,6 +60,6 @@ public class StringUtils {
list.add(str.substring(start, i));
}
return (String[]) list.toArray(new String[list.size()]);
return list.toArray(new String[list.size()]);
}
}
......@@ -12,29 +12,56 @@ package com.oceanbase.clogproxy.client.util;
import java.util.Map;
/**
* Utils class used to validate arguments.
*/
public class Validator {
private static final int MINIMAL_VALID_PORT = 1;
private static final int MAXIMAL_VALID_PORT = 65535;
/**
* Validate the object is not null, otherwise throws an {@link NullPointerException}.
*
* @param obj Object to be verified.
* @param message Message in the NullPointerException.
*/
public static void notNull(Object obj, String message) {
if (obj == null) {
throw new NullPointerException(message);
}
}
/**
* Validate the port number is valid, otherwise throws an {@link IllegalArgumentException}.
*
* @param port Port number to be verified.
* @param message Message in the IllegalArgumentException.
*/
public static void validatePort(int port, String message) {
if (port < MINIMAL_VALID_PORT || port >= MAXIMAL_VALID_PORT) {
throw new IllegalArgumentException(message);
}
}
/**
* Validate the string is not null or empty, otherwise throws an {@link IllegalArgumentException}.
*
* @param val String to be verified.
* @param message Message in the IllegalArgumentException.
*/
public static void notEmpty(String val, String message) {
if (val == null || val.isEmpty()) {
throw new IllegalArgumentException(message);
}
}
/**
* Validate the map is not null or empty, otherwise throws an {@link IllegalArgumentException}.
*
* @param map Map to be verified.
* @param message Message in the IllegalArgumentException.
*/
public static void notEmpty(Map<String, String> map, String message) {
if (map == null || map.isEmpty()) {
throw new IllegalArgumentException(message);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册