未验证 提交 ad186a5d 编写于 作者: H He Wang

Initial commit

上级
.idea/
*.iml
*.ipr
*.iws
*/target/
target
Legal Disclaimer
Within this source code, the comments in Chinese shall be the original, governing version. Any comment in other languages are for reference only. In the event of any conflict between the Chinese language version comments and other language version comments, the Chinese language version shall prevail.
法律免责声明
关于代码注释部分,中文注释为官方版本,其它语言注释仅做参考。中文注释可能与其它语言注释存在不一致,当中文注释与其它语言注释存在不一致时,请以中文注释为准。
\ No newline at end of file
木兰宽松许可证, 第2版
2020年1月 http://license.coscl.org.cn/MulanPSL2
您对“软件”的复制、使用、修改及分发受木兰宽松许可证,第2版(“本许可证”)的如下条款的约束:
0. 定义
“软件” 是指由“贡献”构成的许可在“本许可证”下的程序和相关文档的集合。
“贡献” 是指由任一“贡献者”许可在“本许可证”下的受版权法保护的作品。
“贡献者” 是指将受版权法保护的作品许可在“本许可证”下的自然人或“法人实体”。
“法人实体” 是指提交贡献的机构及其“关联实体”。
“关联实体” 是指,对“本许可证”下的行为方而言,控制、受控制或与其共同受控制的机构,此处的控制是指有受控方或共同受控方至少50%直接或间接的投票权、资金或其他有价证券。
1. 授予版权许可
每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的版权许可,您可以复制、使用、修改、分发其“贡献”,不论修改与否。
2. 授予专利许可
每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的(根据本条规定撤销除外)专利许可,供您制造、委托制造、使用、许诺销售、销售、进口其“贡献”或以其他方式转移其“贡献”。前述专利许可仅限于“贡献者”现在或将来拥有或控制的其“贡献”本身或其“贡献”与许可“贡献”时的“软件”结合而将必然会侵犯的专利权利要求,不包括对“贡献”的修改或包含“贡献”的其他结合。如果您或您的“关联实体”直接或间接地,就“软件”或其中的“贡献”对任何人发起专利侵权诉讼(包括反诉或交叉诉讼)或其他专利维权行动,指控其侵犯专利权,则“本许可证”授予您对“软件”的专利许可自您提起诉讼或发起维权行动之日终止。
3. 无商标许可
“本许可证”不提供对“贡献者”的商品名称、商标、服务标志或产品名称的商标许可,但您为满足第4条规定的声明义务而必须使用除外。
4. 分发限制
您可以在任何媒介中将“软件”以源程序形式或可执行形式重新分发,不论修改与否,但您必须向接收者提供“本许可证”的副本,并保留“软件”中的版权、商标、专利及免责声明。
5. 免责声明与责任限制
“软件”及其中的“贡献”在提供时不带任何明示或默示的担保。在任何情况下,“贡献者”或版权所有者不对任何人因使用“软件”或其中的“贡献”而引发的任何直接或间接损失承担责任,不论因何种原因导致或者基于何种法律理论,即使其曾被建议有此种损失的可能性。
6. 语言
“本许可证”以中英文双语表述,中英文版本具有同等法律效力。如果中英文版本存在任何冲突不一致,以中文版为准。
条款结束
Mulan Permissive Software License,Version 2 (Mulan PSL v2)
January 2020 http://license.coscl.org.cn/MulanPSL2
Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions:
0. Definition
Software means the program and related documents which are licensed under this License and comprise all Contribution(s).
Contribution means the copyrightable work licensed by a particular Contributor under this License.
Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License.
Legal Entity means the entity making a Contribution and all its Affiliates.
Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity.
1. Grant of Copyright License
Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not.
2. Grant of Patent License
Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken.
3. No Trademark License
No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in section 4.
4. Distribution Restriction
You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software.
5. Disclaimer of Warranty and Limitation of Liability
THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
6. Language
THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL.
END OF THE TERMS AND CONDITIONS
OceanBase Log Client
---------------
OceanBase Log Client is a Java client for [oblogproxy](https://github.com/oceanbase/oblogproxy). It uses [netty](https://github.com/netty/netty) to connect to the log proxy server and receive the incremental change log in real time.
Communication
---------------
* [Official Q&A Website (Chinese)](https://open.oceanbase.com/answer) (Q&A, Ideas, General discussion)
* [GitHub Issues](https://github.com/oceanbase/oblogclient/issues) (Bug reports, feature requests)
* DingTalk Group (chat): 33254054
Binaries/Download
----------------
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.
Releases are available in the Maven Central repository. Take also a look at the [Releases](https://github.com/oceanbase/oblogclient/releases).
Example for Maven:
```xml
<dependency>
<groupId>com.oceanbase.logproxy.client</groupId>
<artifactId>client</artifactId>
<version>x.y.z</version>
</dependency>
```
If you'd rather like the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
```xml
<dependency>
<groupId>com.oceanbase.logproxy.client</groupId>
<artifactId>client</artifactId>
<version>x.y.z-SNAPSHOT</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```
Usage
-----------
Basic usage
```java
ObReaderConfig config = new ObReaderConfig();
// set root server list in format [ip:rpc_port:sql_port]
config.setRsList("127.0.0.1:2882:2881");
// username and password
config.setUsername("root@sys");
config.setPassword("root@sys");
// timestamp of start point, zero means starting from now
config.setStartTimestamp(0L);
// whitelist in format [tenant.db.table]
config.setTableWhiteList("sys.*.*");
// create a client
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config);
// add handler
client.addListener(new RecordListener() {
@Override
public void notify(LogMessage message){
// process
}
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
// handle error and stop client
client.stop();
}
}
});
// start and wait
client.start();
client.join();
```
Use [SslContext](https://netty.io/4.1/api/io/netty/handler/ssl/SslContext.html) to encrypt communication between log client and log proxy.
```java
SslContext sslContext = SslContextBuilder.forClient()
.sslProvider(SslContext.defaultClientProvider())
.trustManager(this.getClass().getClassLoader().getResourceAsStream("server.crt"))
.keyManager(this.getClass().getClassLoader().getResourceAsStream("client.crt"),
this.getClass().getClassLoader().getResourceAsStream("client.key"))
.build();
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, sslContext);
```
Once the client is successfully started, you should be able to receive `HEARTBEAT` and other messages in the notify method.
License
-------
Mulan Permissive Software License, Version 2 (Mulan PSL v2). See the [LICENSE](LICENCE) file for details.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.oceanbase.logproxy.client</groupId>
<artifactId>logproxy-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>client</artifactId>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>com.oceanbase.logproxy.client</groupId>
<artifactId>common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client;
import com.oceanbase.clogproxy.client.config.AbstractConnectionConfig;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.ClientStream;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.client.util.Validator;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import io.netty.handler.ssl.SslContext;
public class LogProxyClient {
private final ClientStream stream;
/**
* @param host server hostname name or ip
* @param port server port
* @param config real config object according to what-you-expected
* @param sslContext ssl context to create netty handler
*/
public LogProxyClient(String host, int port, AbstractConnectionConfig config, SslContext sslContext) {
Validator.notNull(config.getLogType(), "log type cannot be null");
Validator.notNull(host, "server cannot be null");
Validator.validatePort(port, "port is not valid");
String clientId = ClientConf.USER_DEFINED_CLIENTID.isEmpty() ? ClientIdGenerator.generate() : ClientConf.USER_DEFINED_CLIENTID;
ConnectionParams connectionParams = new ConnectionParams(config.getLogType(), clientId, host, port, config);
connectionParams.setProtocolVersion(ProtocolVersion.V2);
this.stream = new ClientStream(connectionParams, sslContext);
}
public LogProxyClient(String host, int port, AbstractConnectionConfig config) {
this(host, port, config, null);
}
public void start() {
stream.start();
}
public void stop() {
stream.stop();
}
public void join() {
stream.join();
}
public synchronized void addListener(RecordListener recordListener) {
stream.addListener(recordListener);
}
public synchronized void addStatusListener(StatusListener statusListener) {
stream.addStatusListener(statusListener);
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.util.TypeTrait;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
public abstract class AbstractConnectionConfig implements ConnectionConfig {
/**
* defined structure configurations
*/
protected static Map<String, ConfigItem<Object>> configs = new HashMap<>();
/**
* extra configurations
*/
protected final Map<String, String> extraConfigs = new HashMap<>();
@SuppressWarnings("unchecked")
protected static class ConfigItem<T> {
protected String key;
protected T val;
public ConfigItem(String key, T val) {
this.key = key;
this.val = val;
configs.put(key, (ConfigItem<Object>) this);
}
public void set(T val) {
this.val = val;
}
public void fromString(String val) {
this.val = TypeTrait.fromString(val, this.val.getClass());
}
@Override
public String toString() {
return val.toString();
}
}
public AbstractConnectionConfig(Map<String, String> allConfigs) {
if (allConfigs != null) {
for (Entry<String, String> entry : allConfigs.entrySet()) {
if (!configs.containsKey(entry.getKey())) {
extraConfigs.put(entry.getKey(), entry.getValue());
} else {
set(entry.getKey(), entry.getValue());
}
}
}
}
public abstract LogType getLogType();
public void setExtraConfigs(Map<String, String> extraConfigs) {
this.extraConfigs.putAll(extraConfigs);
}
void set(String key, String val) {
ConfigItem<Object> cs = configs.get(key);
if (cs != null) {
cs.fromString(val);
}
}
/**
* validate if defined configurations
*
* @return True or False
*/
public abstract boolean valid();
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.common.config.ShareConf;
public class ClientConf extends ShareConf {
public static final String VERSION = "1.1.0";
public static int TRANSFER_QUEUE_SIZE = 20000;
public static int CONNECT_TIMEOUT_MS = 5000;
public static int READ_WAIT_TIME_MS = 2000;
public static int RETRY_INTERVAL_S = 2;
/**
* max retry time after disconnect, if not data income lasting IDLE_TIMEOUT_S, a reconnect we be trigger
*/
public static int MAX_RECONNECT_TIMES = -1;
public static int IDLE_TIMEOUT_S = 15;
public static int NETTY_DISCARD_AFTER_READS = 16;
/**
* set user defined userid,
* for inner use only
*/
public static String USER_DEFINED_CLIENTID = "";
/**
* ignore unknown or unsupported record type with a warning log instead throwing an exception
*/
public static boolean IGNORE_UNKNOWN_RECORD_TYPE = false;
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
public interface ConnectionConfig {
String generateConfigurationString();
void updateCheckpoint(String checkpoint);
String toString();
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.config;
import com.google.common.collect.Maps;
import com.oceanbase.clogproxy.client.util.Validator;
import com.oceanbase.clogproxy.common.config.ShareConf;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.util.CryptoUtil;
import com.oceanbase.clogproxy.common.util.Hex;
import java.util.Map;
public class ObReaderConfig extends AbstractConnectionConfig {
private static final ConfigItem<String> RS_LIST = new ConfigItem<>("rootserver_list", "");
private static final ConfigItem<String> CLUSTER_USER = new ConfigItem<>("cluster_user", "");
private static final ConfigItem<String> CLUSTER_PASSWORD = new ConfigItem<>("cluster_password", "");
private static final ConfigItem<String> TABLE_WHITE_LIST = new ConfigItem<>("tb_white_list", "");
private static final ConfigItem<Long> START_TIMESTAMP = new ConfigItem<>("first_start_timestamp", 0L);
public ObReaderConfig() {
super(Maps.newHashMap());
}
public ObReaderConfig(Map<String, String> allConfigs) {
super(allConfigs);
}
@Override
public LogType getLogType() {
return LogType.OCEANBASE;
}
@Override
public boolean valid() {
try {
Validator.notEmpty(RS_LIST.val, "invalid rsList");
Validator.notEmpty(CLUSTER_USER.val, "invalid clusterUser");
Validator.notEmpty(CLUSTER_PASSWORD.val, "invalid clusterPassword");
if (START_TIMESTAMP.val < 0L) {
throw new IllegalArgumentException("invalid startTimestamp");
}
return true;
} catch (IllegalArgumentException e) {
return false;
}
}
@Override
public String generateConfigurationString() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString();
if (CLUSTER_PASSWORD.key.equals(entry.getKey()) && ShareConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
sb.append(entry.getKey()).append("=").append(value).append(" ");
}
for (Map.Entry<String, String> entry : extraConfigs.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
}
return sb.toString();
}
@Override
public void updateCheckpoint(String checkpoint) {
try {
START_TIMESTAMP.set(Long.parseLong(checkpoint));
} catch (NumberFormatException e) {
// do nothing
}
}
@Override
public String toString() {
return "rootserver_list=" + RS_LIST + ", cluster_user=" + CLUSTER_USER + ", cluster_password=******, " +
"tb_white_list=" + TABLE_WHITE_LIST + ", start_timestamp=" + START_TIMESTAMP;
}
/**
* 设置管控服务列表
*
* @param rsList 管控服务列表
*/
public void setRsList(String rsList) {
RS_LIST.set(rsList);
}
/**
* 设置连接OB用户名
*
* @param clusterUser 用户名
*/
public void setUsername(String clusterUser) {
CLUSTER_USER.set(clusterUser);
}
/**
* 设置连接OB密码
*
* @param clusterPassword 密码
*/
public void setPassword(String clusterPassword) {
CLUSTER_PASSWORD.set(clusterPassword);
}
/**
* 配置过滤规则,由租户.库.表3个维度组成,每一段 * 表示任意,如:A.foo.bar,B.foo.*,C.*.*,*.*.*
*
* @param tableWhiteList 监听表的过滤规则
*/
public void setTableWhiteList(String tableWhiteList) {
TABLE_WHITE_LIST.set(tableWhiteList);
}
/**
* 设置起始订阅的 UNIX时间戳,0表示从当前,通常不要早于1小时
*
* @param startTimestamp 起始时间戳
*/
public void setStartTimestamp(Long startTimestamp) {
START_TIMESTAMP.set(startTimestamp);
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
import io.netty.handler.timeout.IdleStateEvent;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
public class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private static final byte[] MAGIC_STRING = new byte[]{'x', 'i', '5', '3', 'g', ']', 'q'};
private static final String CLIENT_IP = NetworkUtil.getLocalIp();
private static final int HEAD_LENGTH = 7;
private ClientStream stream;
private ConnectionParams params;
private BlockingQueue<StreamContext.TransferPacket> recordQueue;
enum HandshakeStateV1 {
PB_HEAD,
CLIENT_HANDSHAKE_RESPONSE,
RECORD,
ERROR_RESPONSE,
STATUS
}
private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD;
private final Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR;
ByteBuf buffer;
private boolean poolFlag = true;
private boolean first;
private int numReads = 0;
private boolean dataNotEnough = false;
private int dataLength = 0;
LZ4Factory factory = LZ4Factory.fastestInstance();
LZ4FastDecompressor fastDecompressor = factory.fastDecompressor();
public ClientHandler() { }
protected void resetState() {
state = HandshakeStateV1.PB_HEAD;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
dataNotEnough = false;
ByteBuf data = (ByteBuf) msg;
first = buffer == null;
if (first) {
buffer = data;
} else {
buffer = cumulator.cumulate(ctx.alloc(), buffer, data);
}
} else if (msg instanceof IdleStateEvent) {
if (stream != null) {
stream.triggerReconnect();
}
return;
} else {
return;
}
while (poolFlag && buffer.isReadable() && !dataNotEnough) {
switch (state) {
case PB_HEAD:
handleHeader();
break;
case CLIENT_HANDSHAKE_RESPONSE:
handleHandshakeResponse();
break;
case ERROR_RESPONSE:
handleErrorResponse();
break;
case STATUS:
handleServerStatus();
break;
case RECORD:
handleRecord();
break;
}
}
if (buffer != null && !buffer.isReadable()) {
numReads = 0;
buffer.release();
buffer = null;
} else if (++numReads >= ClientConf.NETTY_DISCARD_AFTER_READS) {
numReads = 0;
discardSomeReadBytes();
}
}
private void handleHeader() {
if (buffer.readableBytes() >= HEAD_LENGTH) {
int version = buffer.readShort();
int type = buffer.readByte();
dataLength = buffer.readInt();
checkHeader(version, type, dataLength);
HeaderType headerType = HeaderType.codeOf(type);
if(headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE;
} else if(headerType == HeaderType.ERROR_RESPONSE) {
state = HandshakeStateV1.ERROR_RESPONSE;
} else if(headerType == HeaderType.DATA_CLIENT) {
state = HandshakeStateV1.RECORD;
} else if(headerType == HeaderType.STATUS) {
state = HandshakeStateV1.STATUS;
}
} else {
dataNotEnough = true;
}
}
private void handleHandshakeResponse() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.ClientHandshakeResponse response = LogProxyProto.ClientHandshakeResponse.parseFrom(bytes);
logger.info("Connected to LogProxyServer, ip:{}, version:{}", response.getIp(), response.getVersion());
state = HandshakeStateV1.PB_HEAD;
} else {
dataNotEnough = true;
}
}
private void handleErrorResponse() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.ErrorResponse response = LogProxyProto.ErrorResponse.parseFrom(bytes);
logger.error("LogProxy refused handshake request: {}", response.toString());
throw new LogProxyClientException(ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + response.toString());
} else {
dataNotEnough = true;
}
}
private void handleServerStatus() throws InvalidProtocolBufferException {
if(buffer.readableBytes() >= dataLength) {
byte[] bytes = new byte[dataLength];
buffer.readBytes(bytes);
LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes);
logger.debug("server status: {}", response.toString());
state = HandshakeStateV1.PB_HEAD;
} else {
dataNotEnough = true;
}
}
private void handleRecord() {
if(buffer.readableBytes() >= dataLength) {
parseDataNew();
state = HandshakeStateV1.PB_HEAD;
} else {
dataNotEnough = true;
}
}
private void checkHeader(int version, int type, int length) {
if (ProtocolVersion.codeOf(version) == null) {
logger.error("unsupported protocol version: {}", version);
throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupported protocol version: " + version);
}
if (HeaderType.codeOf(type) == null) {
logger.error("unsupported header type: {}", type);
throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unsupported header type: " + type);
}
if (length <= 0) {
logger.error("data length equals 0");
throw new LogProxyClientException(ErrorCode.E_LEN, "data length equals 0");
}
}
private void parseDataNew() {
try {
byte[] buff = new byte[dataLength];
buffer.readBytes(buff, 0, dataLength);
LogProxyProto.RecordData recordData = LogProxyProto.RecordData.parseFrom(buff);
int compressType = recordData.getCompressType();
int compressedLen = recordData.getCompressedLen();
int rawLen = recordData.getRawLen();
byte[] rawData = recordData.getRecords().toByteArray();
if (compressType == CompressType.LZ4.code()) {
byte[] bytes = new byte[compressedLen];
int decompress = fastDecompressor.decompress(rawData, 0, bytes, 0, compressedLen);
if (decompress != rawLen) {
throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress
+ "] is not expected [" + rawLen + "]");
}
parseRecord(bytes);
} else {
parseRecord(rawData);
}
} catch (InvalidProtocolBufferException e) {
throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", e);
}
}
private void parseRecord(byte[] bytes) throws LogProxyClientException {
int offset = 0;
while (offset < bytes.length) {
int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
LogMessage drcRecord;
try {
/*
* We must copy a byte array and call parse after then,
* or got a !!!RIDICULOUS EXCEPTION!!!,
* if we wrap a upooled buffer with offset and call setByteBuf just as same as `parse` function do.
*/
drcRecord = new LogMessage(false);
byte[] data = new byte[dataLength + 8];
System.arraycopy(bytes, offset, data, 0, data.length);
drcRecord.parse(data);
if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) {
// unsupported type, ignore
logger.debug("Unsupported record type: {}", drcRecord);
offset += (8 + dataLength);
continue;
}
} catch (Exception e) {
throw new LogProxyClientException(ErrorCode.E_PARSE, e);
}
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(drcRecord));
break;
} catch (InterruptedException e) {
// do nothing
}
}
offset += (8 + dataLength);
}
}
protected final void discardSomeReadBytes() {
if (buffer != null && !first && buffer.refCnt() == 1) {
// discard some bytes if possible to make more room in the
// buffer but only if the refCnt == 1 as otherwise the user may have
// used slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer.discardSomeReadBytes();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
poolFlag = true;
StreamContext context = ctx.channel().attr(ConnectionFactory.CONTEXT_KEY).get();
stream = context.stream();
params = context.getParams();
recordQueue = context.recordQueue();
logger.info("ClientId: {} connecting LogProxy: {}", params.info(), NetworkUtil.parseRemoteAddress(ctx.channel()));
ctx.channel().writeAndFlush(generateConnectRequest(params.getProtocolVersion()));
}
public ByteBuf generateConnectRequestV2() {
LogProxyProto.ClientHandshakeRequest handShake = LogProxyProto.ClientHandshakeRequest.newBuilder().
setLogType(params.getLogType().getCode()).
setIp(CLIENT_IP).
setId(params.getClientId()).
setVersion(ClientConf.VERSION).
setEnableMonitor(params.isEnableMonitor()).
setConfiguration(params.getConfigurationString()).
build();
byte[] packetBytes = handShake.toByteArray();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length + 2 + 1 + 4 + packetBytes.length);
byteBuf.writeBytes(MAGIC_STRING);
byteBuf.writeShort(ProtocolVersion.V2.code());
byteBuf.writeByte(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
byteBuf.writeInt(packetBytes.length);
byteBuf.writeBytes(packetBytes);
return byteBuf;
}
public ByteBuf generateConnectRequest(ProtocolVersion version) {
if (version == ProtocolVersion.V2) {
return generateConnectRequestV2();
}
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
byteBuf.writeBytes(MAGIC_STRING);
// header
byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1);
byteBuf.writeShort(ProtocolVersion.V0.code());
byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
byteBuf.writeByte(params.getLogType().getCode());
// body
int length = CLIENT_IP.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(CLIENT_IP.getBytes());
length = params.getClientId().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getClientId().getBytes());
length = ClientConf.VERSION.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(ClientConf.VERSION.getBytes());
length = params.getConfigurationString().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getConfigurationString().getBytes());
return byteBuf;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
poolFlag = false;
logger.info("Connect broken of ClientId: {} with LogProxy: {}", params.info(), NetworkUtil.parseRemoteAddress(ctx.channel()));
ctx.channel().disconnect();
ctx.close();
if (stream != null) {
stream.triggerReconnect();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
poolFlag = false;
resetState();
logger.error("Exception occurred ClientId: {}, with LogProxy: {}", params.info(), NetworkUtil.parseRemoteAddress(ctx.channel()), cause);
ctx.channel().disconnect();
ctx.close();
if (stream != null) {
if (cause instanceof LogProxyClientException) {
if (((LogProxyClientException) cause).needStop()) {
stream.stop();
stream.triggerException((LogProxyClientException) cause);
}
} else {
stream.triggerReconnect();
}
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import io.netty.handler.ssl.SslContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ClientStream {
private static final Logger logger = LoggerFactory.getLogger(ClientStream.class);
// routine
private final AtomicBoolean started = new AtomicBoolean(false);
private Thread thread = null;
// status
private StreamContext context = null;
private String checkpointString;
// reconnection
private int retryTimes = 0;
private Connection connection = null;
private final AtomicBoolean reconnecting = new AtomicBoolean(true);
private final AtomicBoolean reconnect = new AtomicBoolean(true);
// user callbacks
private final List<RecordListener> listeners = new ArrayList<>();
private final List<StatusListener> statusListeners = new ArrayList<>();
private enum ReconnectState {
/**
* success
*/
SUCCESS,
/**
* retry connect next round
*/
RETRY,
/**
* failed, exit thread
*/
EXIT;
}
public ClientStream(ConnectionParams connectionParams, SslContext sslContext) {
context = new StreamContext(this, connectionParams, sslContext);
}
public void stop() {
if (!started.compareAndSet(true, false)) {
logger.info("stopping LogProxy Client....");
if (connection != null) {
connection.close();
connection = null;
}
join();
thread = null;
}
logger.info("stopped LogProxy Client");
}
public void join() {
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
logger.warn("exception occurred when join LogProxy exit: ", e);
}
}
}
public void triggerStop() {
new Thread(this::stop).start();
}
public void triggerException(LogProxyClientException e) {
// use thread make sure non-blocking
new Thread(() -> {
for (RecordListener listener : listeners) {
listener.onException(e);
}
}).start();
}
public void start() {
// if status listener exist, enable monitor
context.params.setEnableMonitor(CollectionUtils.isNotEmpty(statusListeners));
if (started.compareAndSet(false, true)) {
thread = new Thread(() -> {
while (isRunning()) {
ReconnectState state = reconnect();
if (state == ReconnectState.EXIT) {
logger.error("read thread to exit");
triggerException(new LogProxyClientException(ErrorCode.E_MAX_RECONNECT, "exceed max reconnect retry"));
break;
}
if (state == ReconnectState.RETRY) {
try {
TimeUnit.SECONDS.sleep(ClientConf.RETRY_INTERVAL_S);
} catch (InterruptedException e) {
// do nothing
}
continue;
}
StreamContext.TransferPacket packet;
while (true) {
try {
packet = context.recordQueue().poll(ClientConf.READ_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException e) {
// do nothing
}
}
if (packet == null) {
continue;
}
try {
switch (packet.getType()) {
case DATA_CLIENT:
for (RecordListener listener : listeners) {
listener.notify(packet.getRecord());
}
break;
case STATUS:
for (StatusListener listener : statusListeners) {
listener.notify(packet.getStatus());
}
break;
default:
throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "Unsupported Packet Type: " + packet.getType());
}
} catch (LogProxyClientException e) {
triggerStop();
triggerException(e);
return;
} catch (Exception e) {
// if exception occured, we exit
triggerStop();
triggerException(new LogProxyClientException(ErrorCode.E_USER, e));
return;
}
}
started.set(false);
if (connection != null) {
connection.close();
}
thread = null;
// TODO... if exception occurred, run handler callback
logger.warn("!!! read thread exit !!!");
});
thread.setDaemon(false);
thread.start();
}
}
public boolean isRunning() {
return started.get();
}
private ReconnectState reconnect() {
// reconnect flag mark, tiny load for checking
if (reconnect.compareAndSet(true, false)) {
logger.warn("start to reconnect...");
try {
if (ClientConf.MAX_RECONNECT_TIMES != -1 && retryTimes >= ClientConf.MAX_RECONNECT_TIMES) {
logger.error("failed to reconnect, exceed max reconnect retry time: {}", ClientConf.MAX_RECONNECT_TIMES);
reconnect.set(true);
return ReconnectState.EXIT;
}
if (connection != null) {
connection.close();
connection = null;
}
// when stopped, context.recordQueue may not empty, just use checkpointString to do reconnection.
if (StringUtils.isNotEmpty(checkpointString)) {
logger.warn("reconnect set checkpoint: {}", checkpointString);
context.getParams().updateCheckpoint(checkpointString);
}
connection = ConnectionFactory.instance().createConnection(context);
if (connection != null) {
logger.warn("reconnect SUCC");
retryTimes = 0;
reconnect.compareAndSet(true, false);
return ReconnectState.SUCCESS;
}
logger.error("failed to reconnect, retry count: {}, max: {}", ++retryTimes, ClientConf.MAX_RECONNECT_TIMES);
// not success, retry next time
reconnect.set(true);
return ReconnectState.RETRY;
} catch (Exception e) {
logger.error("failed to reconnect, retry count: {}, max: {}", ++retryTimes, ClientConf.MAX_RECONNECT_TIMES);
// not success, retry next time
reconnect.set(true);
return ReconnectState.RETRY;
} finally {
reconnecting.set(false);
}
}
return ReconnectState.SUCCESS;
}
public void triggerReconnect() {
// reconnection action guard, avoid concurrent or multiple invoke
if (reconnecting.compareAndSet(false, true)) {
reconnect.compareAndSet(false, true);
}
}
public synchronized void addListener(RecordListener recordListener) {
listeners.add(recordListener);
}
public synchronized void addStatusListener(StatusListener statusListener) {
statusListeners.add(statusListener);
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class Connection {
private static final Logger logger = LoggerFactory.getLogger(Connection.class);
private Channel channel;
private final AtomicBoolean closed = new AtomicBoolean(false);
public Connection(Channel channel) {
this.channel = channel;
}
public void close() {
if (!closed.compareAndSet(false, true)) {
logger.warn("connection already closed");
}
if (channel != null) {
if (channel.isActive()) {
try {
channel.close().addListener(this::logCloseResult).syncUninterruptibly();
} catch (Exception e) {
logger.warn("close connection to remote address {} exception",
NetworkUtil.parseRemoteAddress(channel), e);
}
}
channel = null;
}
}
private void logCloseResult(Future future) {
if (future.isSuccess()) {
if (logger.isInfoEnabled()) {
logger.info("close connection to remote address {} success", NetworkUtil.parseRemoteAddress(channel));
}
} else {
logger.warn("close connection to remote address {} fail", NetworkUtil.parseRemoteAddress(channel), future.cause());
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
public class ConnectionFactory {
private static class Singleton {
private static final ConnectionFactory INSTANCE = new ConnectionFactory();
}
public static ConnectionFactory instance() {
return Singleton.INSTANCE;
}
private ConnectionFactory() {
}
public static final AttributeKey<StreamContext> CONTEXT_KEY = AttributeKey.valueOf("context");
private static final EventLoopGroup WORKER_GROUP = NettyEventLoopUtil.newEventLoopGroup(1,
new NamedThreadFactory("log-proxy-client-worker", true));
private Bootstrap initBootstrap(SslContext sslContext) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(WORKER_GROUP)
.channel(NettyEventLoopUtil.getClientSocketChannelClass())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
if (sslContext != null) {
ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()));
}
ch.pipeline().addLast(new IdleStateHandler(ClientConf.IDLE_TIMEOUT_S, 0, 0));
ch.pipeline().addLast(new ClientHandler());
}
});
return bootstrap;
}
public Connection createConnection(StreamContext context) throws LogProxyClientException {
Bootstrap bootstrap = initBootstrap(context.getSslContext());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConf.CONNECT_TIMEOUT_MS);
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(context.getParams().getHost(), context.getParams().getPort()));
channelFuture.channel().attr(CONTEXT_KEY).set(context);
channelFuture.awaitUninterruptibly();
if (!channelFuture.isDone()) {
throw new LogProxyClientException(ErrorCode.E_CONNECT, "timeout of create connection!");
}
if (channelFuture.isCancelled()) {
throw new LogProxyClientException(ErrorCode.E_CONNECT, "cancelled by user of create connection!");
}
if (!channelFuture.isSuccess()) {
throw new LogProxyClientException(ErrorCode.E_CONNECT, "failed to create connection!", channelFuture.cause());
}
return new Connection(channelFuture.channel());
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ConnectionConfig;
import com.oceanbase.clogproxy.common.packet.LogType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
public class ConnectionParams {
private final LogType logType;
private final String clientId;
private final String host;
private final int port;
private final ConnectionConfig connectionConfig;
private String configurationString;
private ProtocolVersion protocolVersion;
private boolean enableMonitor;
public ConnectionParams(LogType logType, String clientId, String host, int port, ConnectionConfig connectionConfig) {
this.logType = logType;
this.clientId = clientId;
this.host = host;
this.port = port;
this.connectionConfig = connectionConfig;
this.configurationString = connectionConfig.generateConfigurationString();
}
public void updateCheckpoint(String checkpoint) {
connectionConfig.updateCheckpoint(checkpoint);
configurationString = connectionConfig.generateConfigurationString();
}
@Override
public String toString() {
return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
public String info() {
return clientId + ": " + connectionConfig.toString();
}
public LogType getLogType() {
return logType;
}
public String getClientId() {
return clientId;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getConfigurationString() {
return configurationString;
}
public ProtocolVersion getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(ProtocolVersion protocolVersion) {
this.protocolVersion = protocolVersion;
}
public boolean isEnableMonitor() {
return enableMonitor;
}
public void setEnableMonitor(boolean enableMonitor) {
this.enableMonitor = enableMonitor;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
private final boolean isDaemon;
public NamedThreadFactory() {
this("ThreadPool");
}
public NamedThreadFactory(String name) {
this(name, false);
}
public NamedThreadFactory(String prefix, boolean daemon) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = prefix + "-" + POOL_NUMBER.getAndIncrement() + "-thread-";
isDaemon = daemon;
}
/**
* Create a thread.
*
* @see ThreadFactory#newThread(Runnable)
*/
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(isDaemon);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
public class NettyEventLoopUtil {
/** check whether epoll enabled, and it would not be changed during runtime. */
private static boolean epollEnabled = Epoll.isAvailable();
/**
* Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
*
* @param nThreads number of threads
* @param threadFactory ThreadFactory
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}
/**
* @return a SocketChannel class suitable for the given EventLoopGroup implementation
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import io.netty.handler.ssl.SslContext;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto.RuntimeStatus;
public class StreamContext {
public static class TransferPacket {
private HeaderType type;
private LogMessage record;
private RuntimeStatus status;
public TransferPacket(LogMessage record) {
this.type = HeaderType.DATA_CLIENT;
this.record = record;
}
public TransferPacket(RuntimeStatus status) {
this.type = HeaderType.STATUS;
this.status = status;
}
public HeaderType getType() {
return type;
}
public LogMessage getRecord() {
return record;
}
public RuntimeStatus getStatus() {
return status;
}
}
private final BlockingQueue<TransferPacket> recordQueue = new LinkedBlockingQueue<>(ClientConf.TRANSFER_QUEUE_SIZE);
private final ClientStream stream;
ConnectionParams params;
private final SslContext sslContext;
public StreamContext(ClientStream stream, ConnectionParams params, SslContext sslContext) {
this.stream = stream;
this.params = params;
this.sslContext = sslContext;
}
public ConnectionParams getParams() {
return params;
}
public SslContext getSslContext() {
return sslContext;
}
public ClientStream stream() {
return stream;
}
public BlockingQueue<TransferPacket> recordQueue() {
return recordQueue;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.constants;
public class DataType {
public static final byte DT_UNKNOWN = 0x00;
public static final byte DT_INT8 = 0x01;
public static final byte DT_UINT8 = 0x02;
public static final byte DT_INT16 = 0x03;
public static final byte DT_UINT16 = 0x04;
public static final byte DT_INT32 = 0x05;
public static final byte DT_UINT32 = 0x06;
public static final byte DT_INT64 = 0x07;
public static final byte DT_UINT64 = 0x08;
public static final byte DT_FLOAT = 0x09;
public static final byte DT_DOUBLE = 0x0a;
public static final byte DT_STRING = 0x0b;
public static final byte TOTAL_DT = 0x0c;
public static final byte DT_MASK = 0x0f;
public static final byte DC_ARRAY = 0x10;
public static final byte DC_NULL = 0x20;
public static final byte DC_MASK = 0x30;
public static final int getDataTypeLen() {
return 2;
}
public static final int getStringLengthLen() {
return 4;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.enums;
public enum DBType {
MYSQL,
OCEANBASE,
HBASE,
ORACLE,
OCEANBASE1,
DB2,
UNKNOWN
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.enums;
public enum ErrorCode {
////////// 0~499: process error ////////////
/**
* general error
*/
NONE(0),
/**
* inner error
*/
E_INNER(1),
/**
* failed to connect
*/
E_CONNECT(2),
/**
* exceed max retry connect count
*/
E_MAX_RECONNECT(3),
/**
* user callback throws exception
*/
E_USER(4),
////////// 500~: receive data error ////////////
/**
* unknown data protocol
*/
E_PROTOCOL(500),
/**
* unknown header type
*/
E_HEADER_TYPE(501),
/**
* failed to auth
*/
NO_AUTH(502),
/**
* unknown compress type
*/
E_COMPRESS_TYPE(503),
/**
* length not match
*/
E_LEN(504),
/**
* failed to parse data
*/
E_PARSE(505);
int code;
ErrorCode(int code) {
this.code = code;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
public class DRCClientException extends Exception {
private static final long serialVersionUID = 1L;
public DRCClientException() {
}
public DRCClientException(final String message) {
super(message);
}
}
\ No newline at end of file
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
public class DRCClientRunTimeException extends RuntimeException {
public DRCClientRunTimeException(String errMessage) {
super(errMessage);
}
public DRCClientRunTimeException() {
super();
}
public DRCClientRunTimeException(Throwable cause) {
super(cause);
}
public DRCClientRunTimeException(String errMessage, Throwable cause) {
super(errMessage, cause);
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
public class LogProxyClientException extends RuntimeException {
private ErrorCode code = ErrorCode.NONE;
public LogProxyClientException(ErrorCode code, String message) {
super(message);
this.code = code;
}
public LogProxyClientException(ErrorCode code, Exception exception) {
super(exception.getMessage(), exception.getCause());
this.code = code;
}
public LogProxyClientException(ErrorCode code, String message, Throwable throwable) {
super(message, throwable);
this.code = code;
}
public boolean needStop() {
return (code == ErrorCode.E_MAX_RECONNECT) || (code == ErrorCode.E_PROTOCOL) ||
(code == ErrorCode.E_HEADER_TYPE) || (code == ErrorCode.NO_AUTH) ||
(code == ErrorCode.E_COMPRESS_TYPE) || (code == ErrorCode.E_LEN) ||
(code == ErrorCode.E_PARSE);
}
public ErrorCode getCode() {
return code;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.fliter;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.DRCClientException;
import com.oceanbase.clogproxy.client.util.DataFilterUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DataFilter implements DataFilterBase {
/* Used to be compatibility with old version */
private String oldBranchDb;
private String filterInfo;
// String save the source filter string user passed through
private String sourceFilter;
// String save the filter that will be sent to store to acquire data
// For ob1.0, that must be four columns, dbname like a.b.
private String connectStoreFilterConditions;
private final StringBuilder builder;
private final Map<String, Map<String, List<String>>> requires;
private final Map<String, Map<String, List<String>>> dbTableColsReflectionMap;
//If all cols needed is '*', then we don't need do filter operation.
//In that case, we can save a lot compute.
private boolean isAllMatch = true;
private String tenant;
public DataFilter() {
oldBranchDb = null;
filterInfo = null;
builder = new StringBuilder();
requires = new HashMap<String, Map<String, List<String>>>();
dbTableColsReflectionMap = new HashMap<String, Map<String, List<String>>>();
}
/**
* Initialize the filter using formatted string.
* @param tenant tenant name
* @param tableFields the formatted filter information such as
* "tableName1;fieldName1;fieldName2|tableName2;fieldName1". No ";" or "|" should be
* transfer to
* *.tableName1.fieldName1|*.tableName1.fieldName2|...
* added at the beginning or end of the string.
*/
public DataFilter(String tenant, String tableFields) {
this(tableFields);
this.tenant = tenant;
}
public DataFilter(String tableFields) {
oldBranchDb = null;
builder = new StringBuilder();
requires = new HashMap<String, Map<String, List<String>>>();
dbTableColsReflectionMap = new HashMap<String, Map<String, List<String>>>();
builder.append(tableFields);
this.sourceFilter = tableFields;
}
/**
* The current version uses topic instead of dbname, so use the
* method to be compatible with the older version.
* @param db is the original branched db name.
*/
@Override
public void setBranchDb(final String db) {
oldBranchDb = db;
}
/**
* Add more filter information after initializing, note that the user should
* make it consistent to the formatted parameters.
* @param tableFields consistent formatted filter information.
*/
public void addTablesFields(String tableFields) {
builder.append(tableFields);
}
@Override
public boolean getIsAllMatch() {
return isAllMatch;
}
@Override
public Map<String, Map<String, List<String>>> getReflectionMap() {
return dbTableColsReflectionMap;
}
@Override
public Map<String, Map<String, List<String>>> getRequireMap() {
return requires;
}
//Before validate function called, toString may return null;
//Yet, user should not care about this. That's inter behavior.
@Override
public String toString() {
return connectStoreFilterConditions;
}
/**
* The validate function will form mysql, ob0.5, oracle eg filter condition.
*/
private boolean validateNormalFilterString() {
if (filterInfo != null) {
return true;
}
String s = builder.toString();
String[] tbs = s.split("\\|");
int colStart;
StringBuilder builder1 = new StringBuilder();
for (String s1 : tbs) {
String[] tb = s1.split("[;,\\.]");
if (tb.length > 0) {
String itemDb;
String itemTb;
if (tb.length <= 2) {
if (oldBranchDb != null) {
itemDb = oldBranchDb;
} else {
itemDb = "*";
}
colStart = 1;
itemTb = tb[0];
} else {
colStart = 2;
itemDb = tb[0];
itemTb = tb[1];
}
if (tenant != null) {
builder1.append(tenant).append(".");
}
builder1.append(itemDb).append(".").append(itemTb).append("|");
if (tb.length > colStart) {
List<String> cols = new ArrayList<String>();
for (int i = colStart; i < tb.length; i++) {
cols.add(tb[i]);
//here, we don't use trim in case that " *" or "* " or " * " is kind of col names
if (!"*".equals(tb[i])) {
isAllMatch = false;
}
}
DataFilterUtil.putColNames(itemDb, itemTb, cols, this);
}
}
}
if (builder1.charAt(builder1.length() - 1) == '|') {
builder1.deleteCharAt(builder1.length() - 1);
}
filterInfo = builder1.toString();
connectStoreFilterConditions = filterInfo;
return true;
}
/**
* The validate function will reform the filter condition and cols info
*/
private boolean validateOB10FilterString() {
if (sourceFilter == null) {
return false;
}
String[] tenantAndDbAndTBAndCols = sourceFilter.split("\\|");
requires.clear();
StringBuilder filterConditionBuilder = new StringBuilder();
for (String s1 : tenantAndDbAndTBAndCols) {
String[] tb = s1.split("[;,\\.]");
if (tb.length < 4) {
// tenant dbname tableName columnName is strictly required for 0b1.0
return false;
}
String tenant = tb[0];
String dbname = (oldBranchDb != null) ? oldBranchDb : tb[1];
String tableName = tb[2];
List<String> cols = new ArrayList<String>();
for (int i = 3; i < tb.length; ++i) {
cols.add(tb[i]);
if (!"*".equals(tb[i])) {
isAllMatch = false;
}
}
//format string passed to store
String formatDBName = tenant + "." + dbname;
filterConditionBuilder.append(formatDBName).append(FILTER_SEPARATOR_INNER);
filterConditionBuilder.append(tableName).append(FILTER_SEPARATOR);
DataFilterUtil.putColNames(formatDBName, tableName, cols, this);
}
connectStoreFilterConditions = filterConditionBuilder.toString();
return true;
}
// When source type is ocean base 1.0, filter's content is like tenant.dbname.tablename.colvalues| ....
@Override
public boolean validateFilter(DBType dbType) throws DRCClientException {
switch (dbType) {
case OCEANBASE1: {
return validateOB10FilterString();
}
default: {
return validateNormalFilterString();
}
}
}
@Override
public String getConnectStoreFilterConditions() {
return this.connectStoreFilterConditions;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.fliter;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.DRCClientException;
import java.util.List;
import java.util.Map;
public interface DataFilterBase {
String FILTER_SEPARATOR_INNER = ".";
String FILTER_SEPARATOR = "|";
/**
* Get the formatted filter string which will be delivered to store.
* Notice: before validate filter function called, getConnectStoreFilterConditions may return null.
* @return filter string
*/
String getConnectStoreFilterConditions();
/**
* Validate if the filter user passed is legal
* @param dbType database type which may be ob, mysql, oracle.
* For now, only ob1.0 need special handle which 4 tuple contains tenant, db, tb, cols is strictly required.
* @return true if filter is valid
* @throws DRCClientException if exception occurs
*/
boolean validateFilter(DBType dbType) throws DRCClientException;
/**
* This function is compatible for old usage.
* @param branchDb the original branched db name.
*/
void setBranchDb(String branchDb);
/**
* Fast match if cols are all needed.
* @return true if all cols are needed
*/
boolean getIsAllMatch();
Map<String, Map<String, List<String>>> getReflectionMap();
Map<String, Map<String, List<String>>> getRequireMap();
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.message.DataMessage;
public interface FieldParseListener {
public void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception;
}
\ No newline at end of file
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.message.LogMessage;
public interface RecordListener {
void notify(LogMessage record);
void onException(LogProxyClientException e);
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
public interface StatusListener {
void notify(LogProxyProto.RuntimeStatus status);
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
import java.io.UnsupportedEncodingException;
public class ByteString {
private int len;
private int offset;
private byte[] bytes;
public ByteString(byte[] bytes, int len) {
this.bytes = bytes;
this.len = len;
}
public ByteString(byte[] bytes, int offset, int len) {
this.bytes = bytes;
this.len = len;
this.offset = offset;
}
/**
* Convert the bytes to any encoding.
*
* @param encoding the target encoding.
* @return the encoded string.
*/
public String toString(final String encoding) {
if (len == 0) {
return "";
}
if ("binary".equalsIgnoreCase(encoding)) {
throw new IllegalArgumentException(
"field encoding: binary, use getBytes() instead of toString()");
}
String realEncoding = encoding;
if (encoding.isEmpty() || "null".equalsIgnoreCase(encoding)) {
realEncoding = "ASCII";
} else if ("utf8mb4".equalsIgnoreCase(encoding)) {
realEncoding = "utf8";
} else if ("latin1".equalsIgnoreCase(encoding)) {
realEncoding = "cp1252";
} else if ("latin2".equalsIgnoreCase(encoding)) {
realEncoding = "iso-8859-2";
}
try {
return new String(bytes, offset, len, realEncoding);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
if (len == 0) {
return "";
}
byte[] byteArray = this.bytes;
char[] charArray = new char[len];
for (int i = 0; i < len; i++) {
charArray[i] = (char) byteArray[i + offset];
}
return String.valueOf(charArray);
}
public byte[] getBytes() {
byte[] t = new byte[len];
System.arraycopy(bytes, offset, t, 0, len);
return t;
}
public int getLen() {
return len;
}
public int getOffset() {
return offset;
}
public byte[] getRawBytes() {
return bytes;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof ByteString)) {
return false;
}
ByteString other = (ByteString) obj;
if (this.getLen() != other.getLen()) {
return false;
}
for (int i = 0; i < getLen(); i++) {
byte x = bytes[offset + i];
byte y = other.getRawBytes()[other.getOffset() + i];
if (x != y) {
return false;
}
}
return true;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
public class Checkpoint {
private String recordId;
private String position;
private String timestamp;
private String serverId;
private static final String DELIMITER = ":";
public Checkpoint() {
recordId = position = serverId = timestamp = null;
}
public Checkpoint(final String recordId, final String position, final String serverId,
final String timestamp) {
this.recordId = recordId;
this.position = position;
this.serverId = serverId;
this.timestamp = timestamp;
}
public Checkpoint(final Checkpoint cp) {
this(cp.recordId, cp.position, cp.serverId, cp.timestamp);
}
public boolean equals(final String cp) {
if (position != null && cp != null) {
return position.equals(cp);
}
if (timestamp != null && cp != null) {
return timestamp.equals(cp);
}
return false;
}
public String getRecordId() {
return recordId;
}
public void setRecordId(final String recordId) {
this.recordId = recordId;
}
public String getPosition() {
return position;
}
public void setPosition(final String position) {
String cp = position;
if (cp.contains("@mysql-bin.")) {
int m = cp.indexOf("@");
int p = cp.indexOf(".");
String cp1 = cp.substring(0, m);
String cp2 = cp.substring(p + 1);
long lcp2 = Long.parseLong(cp2);
cp = cp1 + "@" + lcp2;
}
this.position = cp;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(final String timestamp) {
this.timestamp = timestamp;
}
public String getServerId() {
return serverId;
}
public void setServerId(String serverId) {
this.serverId = serverId;
}
@Override
public String toString() {
String cp1 = null, cp2 = null;
if (position != null && !position.isEmpty()) {
int in = position.indexOf('@');
cp1 = position.substring(in + 1);
cp2 = position.substring(0, in);
}
StringBuilder builder = new StringBuilder();
if (serverId == null || serverId.isEmpty()) {
builder.append(DELIMITER).append(DELIMITER);
} else {
int in = serverId.indexOf('-');
String db = serverId.substring(0, in);
String dbport = serverId.substring(in + 1);
builder.append(db).append(DELIMITER).append(dbport).append(DELIMITER);
}
if (cp1 != null) {
builder.append(cp1).append(DELIMITER).append(cp2).append(DELIMITER);
} else {
builder.append(DELIMITER).append(DELIMITER);
}
if (timestamp != null) {
builder.append(timestamp).append(DELIMITER);
} else {
builder.append(DELIMITER);
}
if (recordId != null) {
builder.append(recordId);
}
return builder.toString();
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
import com.oceanbase.clogproxy.client.util.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
public class Message {
protected int type;
protected long id;
protected Map<String, String> attributes;
//this attribute means the source store's ip and port which this client is pulling data from.
public static final String SOURCEIPANDPORT = "sourceIPAndPort";
public Message() {
attributes = new HashMap<String, String>();
}
public long getMid() {
return id;
}
public int getType() {
return type;
}
public String getAttribute(final String key) {
return attributes.get(key);
}
public String getStoreIpAndPort() {
return attributes.get(SOURCEIPANDPORT);
}
public void setId(long id) {
this.id = id;
}
public void setType(int type) {
this.type = type;
}
public void addAttribute(final String key, final String value) {
attributes.put(key, value);
}
public void addAttributes(final Map<String, String> attrs) {
attributes.putAll(attrs);
}
@SuppressWarnings("deprecation")
void mergeFrom(final DataInputStream reader) throws IOException {
String line;
while (!(line = reader.readLine()).isEmpty()) {
String[] kv = StringUtils.split(line, ':');
if (kv.length != 2) {
throw new IOException("Parse message attribute " + line + " error");
}
addAttribute(kv[0], kv[1]);
}
}
public void clear() {
type = 0;
id = -1;
attributes.clear();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
for (Entry<String, String> entry : attributes.entrySet()) {
builder.append(entry.getKey() + ":" + entry.getValue());
builder.append(System.getProperty("line.separator"));
}
builder.append(System.getProperty("line.separator"));
return builder.toString();
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import com.oceanbase.clogproxy.client.constants.DataType;
import com.oceanbase.clogproxy.client.message.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
public class BinaryMessageUtils {
private static final int PREFIX_LENGTH = 12;
/**
* get string begin with offset
*
* @param data bytes array
* @param offset read offset
* @param encoding string encoding
* @return result string
* @throws UnsupportedEncodingException when the encoding is not supported
*/
public static String getString(byte[] data, int offset, String encoding) throws UnsupportedEncodingException {
ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN);
wrapByteBuf.readerIndex(PREFIX_LENGTH + offset);
byte t = wrapByteBuf.readByte();
if ((t & DataType.DC_ARRAY) != 0 || (t & DataType.DC_NULL) != 0) {
return null;
}
int length = (int) wrapByteBuf.readUnsignedInt();
return new String(wrapByteBuf.array(), PREFIX_LENGTH + 5 + offset, length - 1, encoding);
}
/**
* get list begin with offset
*
* @param data bytes array
* @param offset read offset
* @return result list
* @throws IOException if data type is unsigned long
*/
public static List getArray(byte[] data, int offset) throws IOException {
ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN);
wrapByteBuf.readerIndex(PREFIX_LENGTH + offset);
byte t = wrapByteBuf.readByte();
if ((t & DataType.DC_ARRAY) == 0) {
return null;
}
int count = (int) wrapByteBuf.readUnsignedInt();
if (count == 0) {
return null;
}
List lists = new ArrayList(count);
int type = t & DataType.DT_MASK;
for (int i = 0; i < count; i++) {
switch (type) {
case DataType.DT_INT8: {
lists.add(wrapByteBuf.readByte());
break;
}
case DataType.DT_UINT8: {
lists.add((int) wrapByteBuf.readUnsignedByte());
break;
}
case DataType.DT_INT16: {
lists.add(wrapByteBuf.readShort());
break;
}
case DataType.DT_UINT16: {
lists.add((int) wrapByteBuf.readUnsignedShort());
break;
}
case DataType.DT_INT32: {
lists.add(wrapByteBuf.readInt());
break;
}
case DataType.DT_UINT32: {
lists.add((long) wrapByteBuf.readUnsignedInt());
break;
}
case DataType.DT_INT64: {
lists.add((long) wrapByteBuf.readLong());
break;
}
case DataType.DT_UINT64: {
throw new IOException("Unsupported unsigned long");
}
}
}
return lists;
}
/**
* get ByteString begin with offset
*
* @param data bytes array
* @param offset read offset
* @return list of ByteString
*/
public static List<ByteString> getByteStringList(byte[] data, long offset) {
if (offset == -1) {
return null;
}
ByteBuf wrapByteBuf = Unpooled.wrappedBuffer(data).order(ByteOrder.LITTLE_ENDIAN);
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + offset));
byte t = wrapByteBuf.readByte();
if ((t & DataType.DC_ARRAY) == 0 || (t & DataType.DT_MASK) != DataType.DT_STRING) {
throw new RuntimeException("Data type not array or not string");
}
int count = wrapByteBuf.readInt();
if (count == 0) {
return null;
}
int readBytes = 5;
readBytes += (count + 1) * 4;
List<ByteString> lists = new ArrayList<>(count);
int currentOffset = (int) wrapByteBuf.readUnsignedInt();
int nextOffset;
for (int i = 0; i < count; i++) {
nextOffset = (int) wrapByteBuf.readUnsignedInt();
if (nextOffset == currentOffset) {
lists.add(null);
} else {
lists.add(new ByteString(wrapByteBuf.array(),
PREFIX_LENGTH + currentOffset + readBytes + (int) offset,
nextOffset - currentOffset - 1));
}
currentOffset = nextOffset;
}
return lists;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import java.lang.management.ManagementFactory;
public class ClientIdGenerator {
/**
* LocalIP_PID_currentTimestamp
* pattern may be change, never depend on the content of this
* @return client id string
*/
public static String generate() {
return NetworkUtil.getLocalIp() + "_" + getProcessId() + "_" + (System.currentTimeMillis() / 1000);
}
private static String getProcessId() {
// Note: may fail in some JVM implementations
// therefore fallback has to be provided
// something like '<pid>@<hostname>', at least in SUN / Oracle JVMs
final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
final int index = jvmName.indexOf('@');
if (index < 1) {
return "NOPID";
}
try {
return Long.toString(Long.parseLong(jvmName.substring(0, index)));
} catch (NumberFormatException e) {
return "NOPID";
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import com.oceanbase.clogproxy.client.fliter.DataFilterBase;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DataFilterUtil {
/**
*
* @param db db name
* @param tb table name
* @param cols column names
* @param dataFilterBase DataFilterBase
*/
public static void putColNames(String db, String tb, List<String> cols,
DataFilterBase dataFilterBase) {
if (tb == null) {
return;
}
Map<String, Map<String, List<String>>> dbAndTablePair = dataFilterBase.getRequireMap();
boolean founded = false;
for (Map.Entry<String, Map<String, List<String>>> dbEntry : dbAndTablePair.entrySet()) {
if (db == null || db.equalsIgnoreCase(dbEntry.getKey())) {
for (Map.Entry<String, List<String>> entry : dbEntry.getValue().entrySet()) {
if (tb.equalsIgnoreCase(entry.getKey())) {
founded = true;
entry.getValue().addAll(cols);
}
}
if (!founded) {
// db is already in the filter, but the table is not, so add the table
Map<String, List<String>> tabMap = dbEntry.getValue();
tabMap.put(tb, cols);
founded = true;
}
}
}
if (!founded) {
// db is not in the filter, so add two maps
Map<String, List<String>> tabMap = new HashMap<String, List<String>>();
tabMap.put(tb, cols);
dbAndTablePair.put(db, tabMap);
}
}
/**
* Use the give db and tb name to retrieve cols list
* @param db db name
* @param tb table name
* @param dataFilterBase DataFilterBase
* @return cols reference to corresponded db name and table name
* Note: this function get cols from map in old DataFilter implementation
*/
public static List<String> getColNamesWithMapping(String db, String tb,
DataFilterBase dataFilterBase) {
if (tb == null) {
return null;
}
Map<String, Map<String, List<String>>> dbAndTablePair = dataFilterBase.getReflectionMap();
Map<String, List<String>> tableAndCols = dbAndTablePair.get(db);
if (tableAndCols == null) {
//if we don't find tableAndCols, that mean this dbName appears for the first time,
//and we use getColNames to require the missing cols and update map;
tableAndCols = new HashMap<String, List<String>>();
List<String> cols = getColNames(db, tb, dataFilterBase);
tableAndCols.put(tb, cols);
dbAndTablePair.put(db, tableAndCols);
return cols;
} else {
List<String> needCols = tableAndCols.get(tb);
//we propose the cols can't be null, so we use null to determinate whether the cols we
//needed has existed in the map
if (needCols == null) {
//the cols we needed is missing ,use getColNames to require the missing cols
List<String> cols = getColNames(db, tb, dataFilterBase);
tableAndCols.put(tb, cols);
return cols;
} else {
//the cols existed, just return the value.
return needCols;
}
}
}
/**
* Use the give db and tb name to retrieve cols list
* @param db db name
* @param tb table name
* @param dataFilterBase DataFilterBase
* @return cols reference to corresponded db name and table name
*/
public static List<String> getColNames(String db, String tb, DataFilterBase dataFilterBase) {
if (tb == null) {
return null;
}
Map<String, Map<String, List<String>>> requireMap = dataFilterBase.getRequireMap();
for (Map.Entry<String, Map<String, List<String>>> dbEntry : requireMap.entrySet()) {
StringBuffer buf = new StringBuffer(dbEntry.getKey());
processStringToRegularExpress(buf);
if (db == null || db.toLowerCase().matches(buf.toString().toLowerCase())) {
for (Map.Entry<String, List<String>> entry : dbEntry.getValue().entrySet()) {
buf = new StringBuffer(entry.getKey());
processStringToRegularExpress(buf);
if (tb.toLowerCase().matches(buf.toString().toLowerCase())) {
return entry.getValue();
}
}
}
}
return null;
}
/**
* This function will first replace all "." to "\.", then replace all "*" to ".*"
*
* @param stringBuffer original string buffer
*/
public static void processStringToRegularExpress(StringBuffer stringBuffer) {
int index;
int beginIndex = 0;
while (-1 != (index = stringBuffer.indexOf(".", beginIndex))) {
stringBuffer.insert(index, '\\');
beginIndex = index + 2;
}
beginIndex = 0;
while (-1 != (index = stringBuffer.indexOf("*", beginIndex))) {
stringBuffer.insert(index, '.');
beginIndex = index + 2;
}
}
/**
* Judge if the given col name exists in col lists
* @param col col to be judged
* @param s cols list
* @return true if exists in, else false
*/
public static boolean isColInArray(final String col, final List<String> s) {
for (int i = 0; i < s.size(); i++) {
if ("*".equals(s.get(i)) || col.equalsIgnoreCase(s.get(i))) {
return true;
}
}
return false;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ThreadFactory;
public class NettyEventLoopUtil {
/** check whether epoll enabled, and it would not be changed during runtime. */
private static boolean epollEnabled = Epoll.isAvailable();
/**
* Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
*
* @param nThreads number of threads
* @param threadFactory ThreadFactory
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}
/**
* @return a SocketChannel class suitable for the given EventLoopGroup implementation
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import java.util.ArrayList;
import java.util.List;
public class StringUtils {
/**
* Split a string by one separator character. The performance
* is better than Java String split.
*
* @param str is the string need be split.
* @param separatorChar the single separator character.
* @return the array of split items.
*/
public static String[] split(String str, char separatorChar) {
if (str == null) {
return null;
}
int length = str.length();
if (length == 0) {
return null;
}
List<String> list = new ArrayList<String>();
int i = 0;
int start = 0;
boolean match = false;
while (i < length) {
if (str.charAt(i) == separatorChar) {
if (match) {
list.add(str.substring(start, i));
match = false;
}
start = ++i;
continue;
}
match = true;
i++;
}
if (match) {
list.add(str.substring(start, i));
}
return (String[]) list.toArray(new String[list.size()]);
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
import java.util.Map;
public class Validator {
private static final int MINIMAL_VALID_PORT = 1;
private static final int MAXIMAL_VALID_PORT = 65535;
public static void notNull(Object obj, String message) {
if (obj == null) {
throw new NullPointerException(message);
}
}
public static void validatePort(int port, String message) {
if (port < MINIMAL_VALID_PORT || port >= MAXIMAL_VALID_PORT) {
throw new IllegalArgumentException(message);
}
}
public static void notEmpty(String val, String message) {
if (val == null || val.isEmpty()) {
throw new IllegalArgumentException(message);
}
}
public static void notEmpty(Map<String, String> map, String message) {
if (map == null || map.isEmpty()) {
throw new IllegalArgumentException(message);
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.message.LogMessage;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.junit.Ignore;
import org.junit.Test;
import javax.net.ssl.SSLException;
@Ignore
public class LogProxyClientTest {
@Test
public void testLogProxyClient() {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@sys");
config.setPassword("root@sys");
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.test.*");
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config);
client.addListener(new RecordListener() {
@Override
public void notify(LogMessage record) {
System.out.println(record);
}
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
System.out.println(e.getMessage());
client.stop();
}
}
});
client.start();
client.join();
}
@Test
public void testLogProxyClientWithSsl() throws SSLException {
ObReaderConfig config = new ObReaderConfig();
config.setRsList("127.0.0.1:2882:2881");
config.setUsername("root@sys");
config.setPassword("root@sys");
config.setStartTimestamp(0L);
config.setTableWhiteList("sys.test.*");
LogProxyClient client = new LogProxyClient("127.0.0.1", 2983, config, sslContext());
client.addListener(new RecordListener() {
@Override
public void notify(LogMessage record) {
System.out.println(record);
}
@Override
public void onException(LogProxyClientException e) {
if (e.needStop()) {
System.out.println(e.getMessage());
client.stop();
}
}
});
client.start();
client.join();
}
private SslContext sslContext() throws SSLException {
return SslContextBuilder.forClient()
.sslProvider(SslContext.defaultClientProvider())
.trustManager(this.getClass().getClassLoader().getResourceAsStream("certs/ca.crt"))
.keyManager(this.getClass().getClassLoader().getResourceAsStream("certs/client.crt"),
this.getClass().getClassLoader().getResourceAsStream("certs/client.key"))
.build();
}
}
-----BEGIN CERTIFICATE-----
MIICeDCCAeGgAwIBAgIJAMK6EbZONqr5MA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV
BAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5
MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBhbnkgTHRkMB4XDTIxMDkxMzA1NTk1MloX
DTIxMTAxMzA1NTk1MlowVTELMAkGA1UEBhMCQ04xETAPBgNVBAgMCFpoZUppYW5n
MRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFu
eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPA7Wl5b5q3anZoGorUE
9Q/7WexpY3YrjawseYwzBIgtuQlOG/K8EZO2VztcsY1fkKQfOy56Y4awpWPVsM13
GAl/qkT6Dw358O14fadQ5UVnama8UB52DPsXHkvoZQBT45nXckWYTyycr3WhzlH/
fwygORnTJujhcrk69IkROAjpAgMBAAGjUDBOMB0GA1UdDgQWBBRa9yv01ZQEklvt
o07396rg0lemyTAfBgNVHSMEGDAWgBRa9yv01ZQEklvto07396rg0lemyTAMBgNV
HRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GBAEHhtxKnQJ7DHcoloO61OI3TCxuf
9P/qYcLn+PkkAEHbOxG/+hLom/AgiI4licKGAtrNtxqfHc/05yFQYS6KMmtJWk3q
wVuCUVHoZbEdkubV0quBtr5MDpQC7IzRCFmPxNRt7G8EAYcoEsPoMFZCMEh4hj6i
KXXPa+iYoc2HHsX+
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICZjCCAc8CCQDnFsEOdj0oqDANBgkqhkiG9w0BAQsFADBVMQswCQYDVQQGEwJD
TjERMA8GA1UECAwIWmhlSmlhbmcxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoG
A1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDAeFw0yMTA5MTMxMTAzMTNaFw0yMTEw
MTMxMTAzMTNaMIGZMQswCQYDVQQGEwJDTjERMA8GA1UECAwIWmhlSmlhbmcxETAP
BgNVBAcMCEhhbmdaaG91MQ8wDQYDVQQKDAZBbnRGaW4xCzAJBgNVBAsMAk9CMRow
GAYDVQQDDBFrMDhqMTUyNjQuZXU5NXNxYTEqMCgGCSqGSIb3DQEJARYbd2FuZ3l1
bmxhaS53eWxAYW50Z3JvdXAuY29tMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQD3AucmBGU87Rqr87L3cjr/Ca6vXiKLxOVXDspSEVtpwiACeMK7NOvP9X+iXEOc
LLZWLJFIWWtiu9M5AXVsZe1pJci7bqAXsa96iaNQ8FdcLqy7MLSPj71bdJ8yv3wW
1Q2tzqRrKx/Q7YlJQ2RtRqJLQwfPOAhXKvYBzf3+ocDn+QIDAQABMA0GCSqGSIb3
DQEBCwUAA4GBAGRMRgU+sT5JFX58AYY0PO1YAiz94LJoLeBq9NttDicOgFGk2wlo
fAlbcDYZd9EBHJ4Cn/LCVLAR1icRGClQpObNz8fYdcsihWzFYy2cGDZ6m2ACSKeh
Agn4XgEfGbwjgCQuKORYBkDAfA0OU2lFcfzkrigktvwFgRMLsyseexb0
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAPcC5yYEZTztGqvz
svdyOv8Jrq9eIovE5VcOylIRW2nCIAJ4wrs068/1f6JcQ5wstlYskUhZa2K70zkB
dWxl7WklyLtuoBexr3qJo1DwV1wurLswtI+PvVt0nzK/fBbVDa3OpGsrH9DtiUlD
ZG1GoktDB884CFcq9gHN/f6hwOf5AgMBAAECgYEAp9AFEcLytI2xDRkngQzOH+6I
CwQ9HA/Mb3TQ5yh7nkIQVR2NznmJq2LyL2/XTwbhaXIl0a1OU2mfep8PE3G789nU
0a9DWteUxleS00OPvfsxkcKwqFLb3SFnGaZpZvShJ3Ug9mwnSRY59qQiTnbJ4lx3
edvYmWr+dX/MGoyuUzECQQD73AExkNBvKi0Ti+C/GjmIBLRKobgege26rTPtXnYt
SEbd+USR9q1dD6EF/CFbHp6cbv1ir8sRyrIICIv83l/DAkEA+xJ+jlrFe8o99US0
5i0kbZNqHTvgjXk/4eSXr3Bel02ljYSnP1FDxus+UXT6G1mUdymtCgWm9ppHNGm3
Z/S5kwJBAIZFXRGKrcSGDK/+A5x+I6vDLkcXfmwtQosiKavjj0dG4BkY+hiDFRum
6GajazkD0vV9KnMBW1ap5E3qGI+AEjcCQAPL6MwARWI00bEGw/GDFzzs8LrWb/PT
tIqW6VBG07dX/jvgmKLVeL/mSL/0k9+cACm5IJu5MCgkdxUs0BArXC8CQHjZIuVA
1KVhJpRiy9HtbQczoLjSm2SkD17Azv9H7MrlM4Db4qKNX9NUzsXkgKOm9R639aF6
b/uC6OcdsA0jPTE=
-----END PRIVATE KEY-----
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.oceanbase.logproxy.client</groupId>
<artifactId>logproxy-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>common</artifactId>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>
<dependencies>
<!-- common -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- net -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- data protocol -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
</dependencies>
</project>
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.config;
public class ShareConf {
public static boolean AUTH_PASSWORD_HASH = true;
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
public enum CompressType {
/**
* no compress
*/
NONE(0),
/**
* lz4 compress
*/
LZ4(1);
private int code;
CompressType(int code) {
this.code = code;
}
public static CompressType codeOf(int code) {
for (CompressType v : values()) {
if (v.code == code) {
return v;
}
}
return null;
}
public int code() {
return code;
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.common.packet;
import java.util.HashMap;
import java.util.Map;
public enum LogType {
/**
* LogProxy OceanBase LogReader
*/
OCEANBASE(0);
private final int code;
private static final Map<Integer, LogType> CODE_TYPES = new HashMap<>(values().length);
static {
for (LogType logCaptureType : values()) {
CODE_TYPES.put(logCaptureType.code, logCaptureType);
}
}
LogType(int code) {
this.code = code;
}
public int getCode() {
return this.code;
}
public static LogType fromString(String string) {
if (string == null) {
throw new NullPointerException("logTypeString is null");
}
return valueOf(string.toUpperCase());
}
public static LogType fromCode(int code) {
if (CODE_TYPES.containsKey(code)) {
return CODE_TYPES.get(code);
}
return null;
}
}
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册