未验证 提交 3208c901 编写于 作者: F Fankux 提交者: GitHub

introduce `startTimestampUs`, `clusterId`, `sysUser` and `sysPassword` to startup config (#58)

* start timestamp in us

* fix pom

* fix stuffs reviewed in pr

* upgrade version

* fix PR review issues

* fix ut
上级 6a399cdf
......@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.8-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -90,7 +90,6 @@ public class CryptoUtil {
System.arraycopy(cipherBytes, 0, iv, 0, Math.min(iv.length, cipherBytes.length));
} catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
System.out.println("failed to init AES key generator, exit!!! : " + e);
System.exit(-1);
}
}
......@@ -112,7 +111,6 @@ public class CryptoUtil {
| InvalidAlgorithmParameterException
| IllegalBlockSizeException
| BadPaddingException e) {
System.out.println("failed to encrypt AES 256 GCM: " + e);
return null;
}
}
......@@ -134,7 +132,6 @@ public class CryptoUtil {
| InvalidAlgorithmParameterException
| IllegalBlockSizeException
| BadPaddingException e) {
System.out.println("failed to decrypt AES 256 GCM: " + e);
return "";
}
}
......
......@@ -33,6 +33,11 @@ public enum DbTypeEnum {
DbCategoryEnum.RDB,
new HashSet<>(Arrays.asList("oceanbase_oracle_mode", "ob_in_oracle_mode"))),
MYSQL(DbCategoryEnum.RDB),
ORACLE(DbCategoryEnum.RDB),
DB2_LUW(DbCategoryEnum.RDB, Collections.singleton("db2")),
POSTGRESQL(DbCategoryEnum.RDB),
HBASE(DbCategoryEnum.NOSQL),
UNKNOWN(null);
DbTypeEnum(DbCategoryEnum category) {
......
......@@ -776,6 +776,9 @@ public class DataMessage extends Message {
DbTypeEnum dbType = getDbType();
this.checkDBType(dbType);
StringBuilder messageId = new StringBuilder();
if (dbType == DbTypeEnum.MYSQL) {
messageId.append(getServerId());
}
messageId.append("/").append(this.getCommonPart()).append("/");
if (dbType == DbTypeEnum.OB_MYSQL || dbType == DbTypeEnum.OB_ORACLE) {
......@@ -799,6 +802,9 @@ public class DataMessage extends Message {
private void checkDBType(DbTypeEnum dbType) {
switch (dbType) {
case MYSQL:
case ORACLE:
case DB2_LUW:
case OB_MYSQL:
case OB_ORACLE:
case OB_05:
......@@ -918,20 +924,40 @@ public class DataMessage extends Message {
if (StringUtils.isEmpty(dbTypeInStr)) {
return DbTypeEnum.UNKNOWN;
}
if ("oceanbase".equalsIgnoreCase(dbTypeInStr)) {
if ("mysql".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.MYSQL;
} else if ("oceanbase".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.OB_05;
} else if ("oracle".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.ORACLE;
} else if ("hbase".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.HBASE;
} else if ("oceanbase_1_0".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.OB_MYSQL;
} else if ("db2".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.DB2_LUW;
} else if ("postgresql".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.POSTGRESQL;
}
return DbTypeEnum.UNKNOWN;
}
public static DbTypeEnum parseDBTypeCode(int dbTypeCode) {
switch (dbTypeCode) {
case 0:
return DbTypeEnum.MYSQL;
case 1:
return DbTypeEnum.OB_05;
case 2:
return DbTypeEnum.HBASE;
case 3:
return DbTypeEnum.ORACLE;
case 4:
return DbTypeEnum.OB_MYSQL;
case 5:
return DbTypeEnum.DB2_LUW;
case 6:
return DbTypeEnum.POSTGRESQL;
default:
return DbTypeEnum.UNKNOWN;
}
......
......@@ -1014,6 +1014,8 @@ public class LogMessage extends DataMessage.Record {
case UPDATE:
case INDEX_UPDATE:
switch (getDbType()) {
case ORACLE:
case MYSQL:
case OB_MYSQL:
case OB_ORACLE:
prev.addAll(getKeys((int) oldColsOffset, keys));
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import static com.oceanbase.oms.logmessage.LogMessage.UTF8_ENCODING;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage;
public class DB2LogTypeHelper extends LogTypeHelper {
public static final DB2LogTypeHelper DB2_LOG_TYPE_HELPER = new DB2LogTypeHelper();
public DB2LogTypeHelper() {
super(DbTypeEnum.DB2_LUW);
}
@Override
public String correctEncoding(int typeCode, String realEncoding) {
switch (typeCode) {
case LogMessageTypeCode.LOG_MSG_TYPE_BINARY:
case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY:
return EMPTY_ENCODING_STR;
default:
return UTF8_ENCODING;
}
}
@Override
public int correctCode(int typeCode, String encoding) {
if (typeCode == LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB) {
return LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY;
} else {
return typeCode;
}
}
@Override
public void correctField(DataMessage.Record.Field f, String realEncoding) {
switch (f.type) {
case LogMessageTypeCode.LOG_MSG_TYPE_BINARY:
case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY:
f.encoding = EMPTY_ENCODING_STR;
break;
case LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB:
f.encoding = EMPTY_ENCODING_STR;
f.type = LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY;
break;
}
}
}
......@@ -71,4 +71,42 @@ public class LogMessageTypeCode {
public static final int LOG_MSG_TYPE_ORA_BINARY_FLOAT = 256;
public static final int LOG_MSG_TYPE_ORA_BINARY_DOUBLE = 257;
public static final int LOG_MSG_TYPE_UNKNOWN = LOG_MSG_TYPE_ORA_BINARY_DOUBLE + 1;
// type code for xlog
public static final int XLOG_MSG_TYPE_SHORT = 50;
public static final int XLOG_MSG_TYPE_INT = 51;
public static final int XLOG_MSG_TYPE_LONG = 52;
public static final int XLOG_MSG_TYPE_DECIMAL = 53;
public static final int XLOG_MSG_TYPE_FLOAT = 54;
public static final int XLOG_MSG_TYPE_DOUBLE = 55;
public static final int XLOG_MSG_TYPE_BOOLEAN = 56;
public static final int XLOG_MSG_TYPE_TINY = 57;
public static final int XLOG_MSG_TYPE_LONGLONG = 58;
public static final int XLOG_MSG_TYPE_CHAR = 70;
public static final int XLOG_MSG_TYPE_VARCHAR = 71;
public static final int XLOG_MSG_TYPE_BINARY = 72;
public static final int XLOG_MSG_TYPE_JSON = 73;
public static final int XLOG_MSG_TYPE_CLOB = 74;
public static final int XLOG_MSG_TYPE_ENUM = 75;
public static final int XLOG_MSG_TYPE_SET = 76;
public static final int XLOG_MSG_TYPE_UUID = 77;
public static final int XLOG_MSG_TYPE_ROWID = 78;
public static final int XLOG_MSG_TYPE_VARBINARY = 79;
public static final int XLOG_MSG_TYPE_TIMESTAMP = 80;
public static final int XLOG_MSG_TYPE_TIMESTAMP_WITH_TIME_ZONE = 81;
public static final int XLOG_MSG_TYPE_DATE = 82;
public static final int XLOG_MSG_TYPE_TIME = 83;
public static final int XLOG_MSG_TYPE_TIME_WITH_TIME_ZONE = 84;
public static final int XLOG_MSG_TYPE_INSTANT = 85;
public static final int XLOG_MSG_TYPE_YEAR = 86;
public static final int XLOG_MSG_TYPE_INTERVAL_YEAR_TO_MONTH = 87;
public static final int XLOG_MSG_TYPE_INTERVAL_DAY_TO_SECOND = 88;
public static final int XLOG_MSG_TYPE_INTERVAL_YEAR_TO_SECOND = 89;
public static final int XLOG_MSG_TYPE_XML = 90;
public static final int XLOG_MSG_TYPE_BITMAP = 91;
public static final int XLOG_MSG_TYPE_GEOMETRY_EWKT = 92;
public static final int XLOG_MSG_TYPE_GEOMETRY_EWKB = 93;
public static final int XLOG_MSG_TYPE_GEOGRAPHY_EWKT = 94;
public static final int XLOG_MSG_TYPE_GEOGRAPHY_EWKB = 95;
public static final int XLOG_MSG_TYPE_BLOB = 102;
}
......@@ -21,6 +21,14 @@ public abstract class LogTypeHelperFactory {
case OB_ORACLE:
case OB_05:
return OBLogTypeHelper.OB_LOG_TYPE_HELPER;
case MYSQL:
return MySQLLogTypeHelper.MYSQL_LOG_TYPE_HELPER;
case ORACLE:
return OracleLogTypeHelper.ORACLE_LOG_TYPE_HELPER;
case DB2_LUW:
return DB2LogTypeHelper.DB2_LOG_TYPE_HELPER;
case POSTGRESQL:
return XLogTypeHelper.XLOG_TYPE_HELPER;
default:
throw new IllegalArgumentException("Unsupported dbType " + dbType);
}
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import static com.oceanbase.oms.logmessage.DataMessage.Record.UTF8MB4_ENCODING;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage;
import org.apache.commons.lang3.StringUtils;
public class MySQLLogTypeHelper extends LogTypeHelper {
public static final MySQLLogTypeHelper MYSQL_LOG_TYPE_HELPER = new MySQLLogTypeHelper();
public MySQLLogTypeHelper() {
super(DbTypeEnum.MYSQL);
}
@Override
public String correctEncoding(int type, String realEncoding) {
switch (type) {
case LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING:
case LogMessageTypeCode.LOG_MSG_TYPE_STRING:
return realEncoding.isEmpty() ? BINARY_STR : realEncoding;
case LogMessageTypeCode.LOG_MSG_TYPE_JSON:
return UTF8MB4_ENCODING;
default:
return realEncoding;
}
}
@Override
public int correctCode(int typeCode, String encoding) {
switch (typeCode) {
case LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING:
return StringUtils.equals(encoding, BINARY_STR)
? LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY
: LogMessageTypeCode.LOG_MSG_TYPE_VARCHAR;
case LogMessageTypeCode.LOG_MSG_TYPE_STRING:
return StringUtils.equals(encoding, BINARY_STR)
? LogMessageTypeCode.LOG_MSG_TYPE_BINARY
: LogMessageTypeCode.LOG_MSG_TYPE_STRING;
case LogMessageTypeCode.LOG_MSG_TYPE_LONG_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_MEDIUM_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
case LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB:
return StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, BINARY_STR)
? LogMessageTypeCode.LOG_MSG_TYPE_BLOB
: LogMessageTypeCode.LOG_MSG_TYPE_TEXT;
default:
return typeCode;
}
}
@Override
public void correctField(DataMessage.Record.Field field, String enc) {
if (enc.isEmpty()) {
if (field.type == LogMessageTypeCode.LOG_MSG_TYPE_STRING) {
field.encoding = BINARY_STR;
field.type = LogMessageTypeCode.LOG_MSG_TYPE_BINARY;
} else if (field.type == LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING) {
field.encoding = BINARY_STR;
field.type = LogMessageTypeCode.LOG_MSG_TYPE_VARCHAR;
} else if (field.type == LogMessageTypeCode.LOG_MSG_TYPE_JSON) {
field.encoding = UTF8MB4_ENCODING;
}
} else {
if (field.type >= LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB
&& field.type <= LogMessageTypeCode.LOG_MSG_TYPE_BLOB) {
field.type = LogMessageTypeCode.LOG_MSG_TYPE_TEXT;
}
field.encoding = enc;
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage;
public class OracleLogTypeHelper extends LogTypeHelper {
public static final OracleLogTypeHelper ORACLE_LOG_TYPE_HELPER = new OracleLogTypeHelper();
public OracleLogTypeHelper() {
super(DbTypeEnum.ORACLE);
}
@Override
public String correctEncoding(int typeCode, String realEncoding) {
switch (typeCode) {
case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
return EMPTY_ENCODING_STR;
default:
return realEncoding;
}
}
@Override
public int correctCode(int typeCode, String encoding) {
return typeCode;
}
@Override
public void correctField(DataMessage.Record.Field f, String realEncoding) {
switch (f.type) {
case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
f.encoding = EMPTY_ENCODING_STR;
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage;
public class XLogTypeHelper extends LogTypeHelper {
public static final XLogTypeHelper XLOG_TYPE_HELPER = new XLogTypeHelper();
public XLogTypeHelper() {
super(DbTypeEnum.POSTGRESQL);
}
@Override
public String correctEncoding(int typeCode, String realEncoding) {
return realEncoding;
}
@Override
public int correctCode(int typeCode, String encoding) {
return typeCode;
}
@Override
public void correctField(DataMessage.Record.Field f, String realEncoding) {}
}
......@@ -16,14 +16,14 @@ See the Mulan PSL v2 for more details.
<parent>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.8-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>logproxy-client</artifactId>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>
<description>The Client for OceanBase Log Proxy.</description>
<description>The Client for OceanBase LogProxy.</description>
<dependencies>
<!-- common dependency -->
......
......@@ -54,7 +54,8 @@ public class LogProxyClient {
String clientId = clientConf.getClientId();
ConnectionParams connectionParams =
new ConnectionParams(config.getLogType(), clientId, host, port, config);
connectionParams.setProtocolVersion(ProtocolVersion.V2);
connectionParams.setProtocolVersion(
ProtocolVersion.codeOf(clientConf.getProtocolVersion()));
this.stream = new ClientStream(clientConf, connectionParams);
}
......
......@@ -13,6 +13,7 @@ package com.oceanbase.clogproxy.client.config;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.common.config.SharedConf;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import io.netty.handler.ssl.SslContext;
import java.io.Serializable;
......@@ -22,7 +23,7 @@ public class ClientConf extends SharedConf implements Serializable {
private static final long serialVersionUID = 1L;
/** Client version. */
public static final String VERSION = "1.0.7";
public static final String VERSION = "1.1.0";
/** Queue size for storing records received from log proxy. */
private final int transferQueueSize;
......@@ -48,6 +49,8 @@ public class ClientConf extends SharedConf implements Serializable {
/** Maximum number of reads, after which data will be discarded. */
private final int nettyDiscardAfterReads;
private final int protocolVersion;
/** User defined client id. */
private final String clientId;
......@@ -68,6 +71,7 @@ public class ClientConf extends SharedConf implements Serializable {
int maxReconnectTimes,
int idleTimeoutS,
int nettyDiscardAfterReads,
int protocolVersion,
String clientId,
boolean ignoreUnknownRecordType,
SslContext sslContext) {
......@@ -78,6 +82,7 @@ public class ClientConf extends SharedConf implements Serializable {
this.maxReconnectTimes = maxReconnectTimes;
this.idleTimeoutS = idleTimeoutS;
this.nettyDiscardAfterReads = nettyDiscardAfterReads;
this.protocolVersion = protocolVersion;
this.clientId = clientId;
this.ignoreUnknownRecordType = ignoreUnknownRecordType;
this.sslContext = sslContext;
......@@ -111,6 +116,10 @@ public class ClientConf extends SharedConf implements Serializable {
return nettyDiscardAfterReads;
}
public int getProtocolVersion() {
return protocolVersion;
}
public String getClientId() {
return clientId;
}
......@@ -136,6 +145,7 @@ public class ClientConf extends SharedConf implements Serializable {
private int maxReconnectTimes = -1;
private int idleTimeoutS = 15;
private int nettyDiscardAfterReads = 16;
private int protocolVersion = ProtocolVersion.V2.code();
private String clientId = ClientIdGenerator.generate();
private boolean ignoreUnknownRecordType = false;
private SslContext sslContext = null;
......@@ -175,6 +185,11 @@ public class ClientConf extends SharedConf implements Serializable {
return this;
}
public Builder protocolVersion(int protocolVersion) {
this.protocolVersion = protocolVersion;
return this;
}
public Builder clientId(String clientId) {
this.clientId = clientId;
return this;
......@@ -199,6 +214,7 @@ public class ClientConf extends SharedConf implements Serializable {
maxReconnectTimes,
idleTimeoutS,
nettyDiscardAfterReads,
protocolVersion,
clientId,
ignoreUnknownRecordType,
sslContext);
......
......@@ -29,6 +29,9 @@ public class ObReaderConfig extends AbstractConnectionConfig {
private static final Logger logger = LoggerFactory.getLogger(ObReaderConfig.class);
/** Cluster Id. */
private final ConfigItem<String> clusterId = new ConfigItem<>("cluster_id", "");
/** Cluster config url. */
private final ConfigItem<String> clusterUrl = new ConfigItem<>("cluster_url", "");
......@@ -41,6 +44,10 @@ public class ObReaderConfig extends AbstractConnectionConfig {
/** Cluster password. */
private final ConfigItem<String> clusterPassword = new ConfigItem<>("cluster_password", "");
private final ConfigItem<String> sysUser = new ConfigItem<>("sys_user", "");
private final ConfigItem<String> sysPassword = new ConfigItem<>("sys_password", "");
/** Table whitelist. */
private final ConfigItem<String> tableWhitelist = new ConfigItem<>("tb_white_list", "*.*.*");
......@@ -50,6 +57,10 @@ public class ObReaderConfig extends AbstractConnectionConfig {
/** Start timestamp. */
private final ConfigItem<Long> startTimestamp = new ConfigItem<>("first_start_timestamp", 0L);
/** Start timestamp in microsecond. */
private final ConfigItem<Long> startTimestampUs =
new ConfigItem<>("first_start_timestamp_us", 0L);
/** Timezone offset. */
private final ConfigItem<String> timezone = new ConfigItem<>("timezone", "+8:00");
......@@ -101,6 +112,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (sysUser.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (sysPassword.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (clusterPassword.key.equals(entry.getKey()) && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value));
}
......@@ -123,6 +140,12 @@ public class ObReaderConfig extends AbstractConnectionConfig {
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (sysUser.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (sysPassword.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue;
}
if (encryptPassword
&& clusterPassword.key.equals(entry.getKey())
&& SharedConf.AUTH_PASSWORD_HASH) {
......@@ -153,21 +176,37 @@ public class ObReaderConfig extends AbstractConnectionConfig {
return (StringUtils.isNotEmpty(clusterUrl.val))
? ("cluster_url=" + clusterUrl)
: ("rootserver_list=" + rsList)
+ ", cluster_id="
+ clusterId
+ ", cluster_user="
+ clusterUser
+ ", cluster_password=******, "
+ ", sys_user="
+ sysUser
+ ", sys_password=******, "
+ "tb_white_list="
+ tableWhitelist
+ ", tb_black_list="
+ tableBlacklist
+ ", start_timestamp="
+ startTimestamp
+ ", start_timestamp_us="
+ startTimestampUs
+ ", timezone="
+ timezone
+ ", working_mode="
+ workingMode;
}
/**
* Set cluster id.
*
* @param clusterId Cluster Id.
*/
public void setClusterId(String clusterId) {
this.clusterId.set(clusterId);
}
/**
* Set cluster config url.
*
......@@ -204,6 +243,24 @@ public class ObReaderConfig extends AbstractConnectionConfig {
this.clusterPassword.set(clusterPassword);
}
/**
* Set cluster sys username
*
* @param sysUsername Cluster sys username.
*/
public void setSysUsername(String sysUsername) {
this.sysUser.set(sysUsername);
}
/**
* Set cluster sys password
*
* @param sysPassword Cluster sys password.
*/
public void setSysPassword(String sysPassword) {
this.sysPassword.set(sysPassword);
}
/**
* Set table whitelist. It is composed of three dimensions: tenant, db and table. Pattern
* matching is provided by `fnmatch`, so asterisk means any, for example: "A.foo.bar",
......@@ -233,6 +290,10 @@ public class ObReaderConfig extends AbstractConnectionConfig {
this.startTimestamp.set(startTimestamp);
}
public void setStartTimestampUs(Long startTimestampUs) {
this.startTimestampUs.set(startTimestampUs);
}
/**
* Set the timezone which is used to convert timestamp column.
*
......
......@@ -64,7 +64,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
private BlockingQueue<StreamContext.TransferPacket> recordQueue;
/** Handshake type enumeration. */
enum HandshakeStateV1 {
enum HandshakeState {
/** State of parsing the packet header. */
PB_HEAD,
/** State of handling handshake response. */
......@@ -78,7 +78,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
/** Handshake state. */
private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD;
private HandshakeState state = HandshakeState.PB_HEAD;
/** A {@link Cumulator} instance. */
private final Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR;
......@@ -107,12 +107,18 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
/** A {@link LZ4FastDecompressor} instance. */
LZ4FastDecompressor fastDecompressor = factory.fastDecompressor();
ClientHandlerV01 clientHandlerV01;
/** Constructor with empty arguments. */
public ClientHandler() {}
/** Reset {@link #state} to {@link HandshakeStateV1#PB_HEAD}. */
/** Reset {@link #state} to {@link HandshakeState#PB_HEAD}. */
protected void resetState() {
state = HandshakeStateV1.PB_HEAD;
if (params.getProtocolVersion().code() < ProtocolVersion.V2.code()) {
clientHandlerV01.resetState();
} else {
state = HandshakeState.PB_HEAD;
}
}
@Override
......@@ -136,6 +142,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
while (poolFlag && buffer.isReadable() && !dataNotEnough) {
if (params.getProtocolVersion().code() < ProtocolVersion.V2.code()) {
dataNotEnough = clientHandlerV01.channelRead(poolFlag, buffer, dataNotEnough);
continue;
}
switch (state) {
case PB_HEAD:
handleHeader();
......@@ -175,13 +186,13 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
HeaderType headerType = HeaderType.codeOf(type);
if (headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE;
state = HandshakeState.CLIENT_HANDSHAKE_RESPONSE;
} else if (headerType == HeaderType.ERROR_RESPONSE) {
state = HandshakeStateV1.ERROR_RESPONSE;
state = HandshakeState.ERROR_RESPONSE;
} else if (headerType == HeaderType.DATA_CLIENT) {
state = HandshakeStateV1.RECORD;
state = HandshakeState.RECORD;
} else if (headerType == HeaderType.STATUS) {
state = HandshakeStateV1.STATUS;
state = HandshakeState.STATUS;
}
} else {
dataNotEnough = true;
......@@ -199,7 +210,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
"Connected to LogProxyServer, ip:{}, version:{}",
response.getIp(),
response.getVersion());
state = HandshakeStateV1.PB_HEAD;
state = HandshakeState.PB_HEAD;
} else {
dataNotEnough = true;
}
......@@ -227,7 +238,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
buffer.readBytes(bytes);
LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(bytes);
logger.debug("Server status: {}", response.toString());
state = HandshakeStateV1.PB_HEAD;
state = HandshakeState.PB_HEAD;
} else {
dataNotEnough = true;
}
......@@ -236,8 +247,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
/** Handle record data response. */
private void handleRecord() {
if (buffer.readableBytes() >= dataLength) {
parseDataNew();
state = HandshakeStateV1.PB_HEAD;
parseData();
state = HandshakeState.PB_HEAD;
} else {
dataNotEnough = true;
}
......@@ -251,7 +262,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
* @param length Data length.
*/
private void checkHeader(int version, int type, int length) {
if (ProtocolVersion.codeOf(version) == null) {
if (ProtocolVersion.codeOf(version) == null && version != ProtocolVersion.V2.code()) {
logger.error("Unsupported protocol version: {}", version);
throw new LogProxyClientException(
ErrorCode.E_PROTOCOL, "Unsupported protocol version: " + version);
......@@ -268,7 +279,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
/** Do parse record data from buffer. It will firstly decompress the raw data if necessary. */
private void parseDataNew() {
private void parseData() {
try {
byte[] buff = new byte[dataLength];
buffer.readBytes(buff, 0, dataLength);
......@@ -342,12 +353,6 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
}
}
try {
stream.setCheckpointString(logMessage.getSafeTimestamp());
} catch (IllegalArgumentException e) {
logger.error("Failed to update checkpoint for log message: " + logMessage, e);
}
offset += (8 + dataLength);
}
}
......@@ -375,6 +380,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
config = context.config();
params = context.params();
recordQueue = context.recordQueue();
clientHandlerV01 = new ClientHandlerV01(config, params, recordQueue, fastDecompressor);
logger.info(
"ClientId: {} connecting LogProxy: {}",
......@@ -386,9 +392,14 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* Generate the request body for protocol v2.
*
* @param version version of protocol
* @return Request body.
*/
public ByteBuf generateConnectRequestV2() {
public ByteBuf generateConnectRequest(ProtocolVersion version) {
if (version.code() < ProtocolVersion.V2.code()) {
return clientHandlerV01.generateConnectRequest();
}
LogProxyProto.ClientHandshakeRequest handShake =
LogProxyProto.ClientHandshakeRequest.newBuilder()
.setLogType(params.getLogType().code())
......@@ -411,50 +422,6 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
return byteBuf;
}
/**
* Generate the request body.
*
* @param version Protocol version.
* @return Request body.
*/
public ByteBuf generateConnectRequest(ProtocolVersion version) {
if (version == ProtocolVersion.V2) {
return generateConnectRequestV2();
}
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
byteBuf.writeBytes(MAGIC_STRING);
// header
byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1);
byteBuf.writeShort(ProtocolVersion.V0.code());
byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
byteBuf.writeByte(params.getLogType().code());
// body
int length = CLIENT_IP.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(CLIENT_IP.getBytes());
length = params.getClientId().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getClientId().getBytes());
length = ClientConf.VERSION.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(ClientConf.VERSION.getBytes());
length = params.getConfigurationString().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getConfigurationString().getBytes());
return byteBuf;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
poolFlag = false;
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.packet.protocol.V1Proto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import java.util.concurrent.BlockingQueue;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/* Compatitable for legacy V0 and V1 only, however you should not use */
public class ClientHandlerV01 {
private static final Logger logger = LoggerFactory.getLogger(ClientHandlerV01.class);
private static final byte[] MAGIC_STRING = new byte[] {'x', 'i', '5', '3', 'g', ']', 'q'};
private static final String CLIENT_IP = NetworkUtil.getLocalIp();
private final ClientConf config;
private ConnectionParams params;
private final BlockingQueue<StreamContext.TransferPacket> recordQueue;
private final LZ4FastDecompressor fastDecompressor;
enum HandshakeState {
/** State of protocol version */
PROTOCOL_VERSION,
/** State of header code */
HEADER_CODE,
/** State of handshake response code */
RESPONSE_CODE,
/** State of error message */
MESSAGE,
/** State of handsake response version */
LOGPROXY_IP,
/** State of handsake response version */
LOGPROXY_VERSION,
/** State of data record */
STREAM
}
private HandshakeState state = HandshakeState.PROTOCOL_VERSION;
private String logProxyIp;
public ClientHandlerV01(
ClientConf config,
ConnectionParams params,
BlockingQueue<StreamContext.TransferPacket> recordQueue,
LZ4FastDecompressor fastDecompressor) {
this.config = config;
this.params = params;
this.recordQueue = recordQueue;
this.fastDecompressor = fastDecompressor;
}
public void setParams(ConnectionParams params) {
this.params = params;
}
public boolean channelRead(boolean poolflag, ByteBuf buffer, boolean inDataNotEnough)
throws Exception {
boolean dataNotEnough = inDataNotEnough;
switch (state) {
case PROTOCOL_VERSION:
if (buffer.readableBytes() >= Short.BYTES) {
int code = buffer.readShort();
ProtocolVersion version = ProtocolVersion.codeOf(code);
if (version == null) {
resetState();
logger.error("unsupport protocol version: {}", code);
throw new LogProxyClientException(
ErrorCode.E_PROTOCOL, "unsupport protocol version: " + code);
}
state = HandshakeState.HEADER_CODE;
} else {
dataNotEnough = true;
}
break;
case HEADER_CODE:
if (buffer.readableBytes() >= Integer.BYTES) {
int code = buffer.readInt();
if ((code != HeaderType.HANDSHAKE_RESPONSE_CLIENT.code())
&& (code != HeaderType.ERROR_RESPONSE.code())) {
resetState();
logger.error(
"unexpected Header Type, expected: {}({}), income: {}",
HeaderType.HANDSHAKE_RESPONSE_CLIENT.code(),
HeaderType.HANDSHAKE_RESPONSE_CLIENT.name(),
code);
throw new LogProxyClientException(
ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + code);
}
state = HandshakeState.RESPONSE_CODE;
} else {
dataNotEnough = true;
}
break;
case RESPONSE_CODE:
if (buffer.readableBytes() >= 4) {
int code = buffer.readInt();
if (code != 0) {
state = HandshakeState.MESSAGE;
} else {
state = HandshakeState.LOGPROXY_IP;
}
} else {
dataNotEnough = true;
}
break;
case MESSAGE:
String message = decodeStringInt(buffer);
if (message != null) {
resetState();
logger.error("LogProxy refused handshake request: {}", message);
throw new LogProxyClientException(
ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + message);
} else {
dataNotEnough = true;
}
break;
case LOGPROXY_IP:
logProxyIp = decodeStringByte(buffer);
if (logProxyIp != null) {
state = HandshakeState.LOGPROXY_VERSION;
} else {
dataNotEnough = true;
}
break;
case LOGPROXY_VERSION:
String logProxyVersion = decodeStringByte(buffer);
if (logProxyVersion != null) {
logger.info("Connected to LogProxy: {}, {}", logProxyIp, logProxyVersion);
state = HandshakeState.STREAM;
} else {
dataNotEnough = true;
}
break;
case STREAM:
parseData(poolflag, buffer);
dataNotEnough = true;
break;
}
return dataNotEnough;
}
private static String decodeStringInt(ByteBuf buffer) {
if (buffer.readableBytes() < Integer.BYTES) {
return null;
}
buffer.markReaderIndex();
int length = buffer.readInt();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
String str = new String(bytes);
if (str.isEmpty()) {
throw new RuntimeException("decode string is null or empty");
}
return str;
}
private static String decodeStringByte(ByteBuf buffer) {
if (buffer.readableBytes() < Byte.BYTES) {
return null;
}
buffer.markReaderIndex();
short length = buffer.readByte();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
String str = new String(bytes);
if (str.isEmpty()) {
throw new RuntimeException("decode string is null or empty");
}
return str;
}
private void parseData(boolean poolflag, ByteBuf buffer) throws LogProxyClientException {
// TODO... parse data exception handle
while (poolflag && buffer.readableBytes() >= 2) {
buffer.markReaderIndex();
int code = buffer.readShort();
ProtocolVersion version = ProtocolVersion.codeOf(code);
if (version == null) {
resetState();
logger.error("unsupport protocol version: {}", code);
throw new LogProxyClientException(
ErrorCode.E_PROTOCOL, "unsupport protocol version: " + code);
}
boolean go;
switch (version) {
case V1:
go = parseDataV1(buffer);
break;
case V0:
default:
go = parseDataV0(buffer);
}
if (!go) {
break;
}
}
}
private boolean parseDataV0(ByteBuf buffer) {
if (buffer.readableBytes() < 8) {
buffer.resetReaderIndex();
return false;
}
int code = buffer.readInt();
if (code != HeaderType.DATA_CLIENT.code()) {
resetState();
logger.error(
"unexpected Header Type, expected: {}({}), income: {}",
HeaderType.DATA_CLIENT.code(),
HeaderType.DATA_CLIENT.name(),
code);
throw new LogProxyClientException(
ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + code);
}
int dataLength = buffer.readInt();
if (buffer.readableBytes() < dataLength) {
buffer.resetReaderIndex();
return false;
}
code = buffer.readByte();
if (CompressType.codeOf(code) == null) {
throw new LogProxyClientException(
ErrorCode.E_COMPRESS_TYPE, "unexpected Compress Type: " + code);
}
int totalLength = buffer.readInt();
int rawDataLength = buffer.readInt();
byte[] rawData = new byte[rawDataLength];
buffer.readBytes(rawData);
if (code == CompressType.LZ4.code()) {
byte[] bytes = new byte[totalLength];
int decompress = fastDecompressor.decompress(rawData, 0, bytes, 0, totalLength);
if (decompress != rawDataLength) {
throw new LogProxyClientException(
ErrorCode.E_LEN,
"decompressed length ["
+ decompress
+ "] is not expected ["
+ rawDataLength
+ "]");
}
parseRecord(bytes);
} else {
parseRecord(rawData);
}
// complete
return true;
}
/**
* Do parse record data from an array of bytes to a {@link LogMessage} and add it into {@link
* #recordQueue}.
*
* @param bytes An array of bytes of record data.
* @throws LogProxyClientException If exception occurs.
*/
private void parseRecord(byte[] bytes) throws LogProxyClientException {
int offset = 0;
while (offset < bytes.length) {
int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
dataLength = ByteBufUtil.swapInt(dataLength);
/*
* We must copy a byte array and call parse after then,
* or got a !!!RIDICULOUS EXCEPTION!!!,
* if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do.
*/
LogMessage logMessage = new LogMessage(false);
byte[] data = new byte[dataLength];
System.arraycopy(bytes, offset + 8, data, 0, data.length);
try {
logMessage.parse(data);
} catch (Exception e) {
if (config.isIgnoreUnknownRecordType()) {
// unsupported type, ignore
logger.debug("Unsupported record type: {}", logMessage);
offset += (8 + dataLength);
continue;
}
throw new LogProxyClientException(ErrorCode.E_PARSE, e);
}
if (logger.isTraceEnabled()) {
logger.trace("Log message: {}", logMessage);
}
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(logMessage));
break;
} catch (InterruptedException e) {
// do nothing
}
}
offset += (8 + dataLength);
}
}
private boolean parseDataV1(ByteBuf buffer) {
if (buffer.readableBytes() < 4) {
buffer.resetReaderIndex();
return false;
}
int length = buffer.readInt();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return false;
}
byte[] buff = new byte[length];
buffer.readBytes(buff, 0, length);
try {
V1Proto.PbPacket packet = V1Proto.PbPacket.parseFrom(buff);
if (packet.getCompressType() != CompressType.NONE.code()) {
// TODO..
throw new LogProxyClientException(
ErrorCode.E_COMPRESS_TYPE,
"Unsupport Compress Type: " + packet.getCompressType());
}
if (packet.getType() != HeaderType.STATUS.code()) {
// TODO.. header type dispatcher
throw new LogProxyClientException(
ErrorCode.E_HEADER_TYPE, "Unsupport Header Type: " + packet.getType());
}
LogProxyProto.RuntimeStatus status =
LogProxyProto.RuntimeStatus.parseFrom(packet.getPayload());
if (status == null) {
throw new LogProxyClientException(
ErrorCode.E_PARSE, "Failed to read PB packet, empty Runtime Status");
}
while (true) {
try {
recordQueue.put(new StreamContext.TransferPacket(status));
break;
} catch (InterruptedException e) {
// do nothing
}
}
} catch (InvalidProtocolBufferException e) {
throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", e);
}
return true;
}
public void resetState() {
state = HandshakeState.PROTOCOL_VERSION;
}
public ByteBuf generateConnectRequest() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
byteBuf.writeBytes(MAGIC_STRING);
// header
byteBuf.capacity(byteBuf.capacity() + 2 + 4 + 1);
byteBuf.writeShort(ProtocolVersion.V0.code());
byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
byteBuf.writeByte(params.getLogType().code());
// body
int length = CLIENT_IP.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(CLIENT_IP.getBytes());
length = params.getClientId().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getClientId().getBytes());
length = ClientConf.VERSION.length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(ClientConf.VERSION.getBytes());
length = params.getConfigurationString().length();
byteBuf.capacity(byteBuf.capacity() + length + 4);
byteBuf.writeInt(length);
byteBuf.writeBytes(params.getConfigurationString().getBytes());
return byteBuf;
}
}
......@@ -192,6 +192,20 @@ public class ClientStream {
"Unsupported Packet Type: "
+ packet.getType());
}
try {
setCheckpointString(
packet.getRecord().getSafeTimestamp());
} catch (IllegalArgumentException e) {
logger.error(
"Failed to update checkpoint for log message: "
+ packet.getRecord(),
e);
throw new LogProxyClientException(
ErrorCode.E_INNER,
"Failed to update checkpoint");
}
} catch (LogProxyClientException e) {
triggerException(e);
break;
......@@ -295,7 +309,7 @@ public class ClientStream {
*
* @param checkpointString Checkpoint string.
*/
public void setCheckpointString(String checkpointString) {
private void setCheckpointString(String checkpointString) {
long timestamp = Long.parseLong(checkpointString);
if (timestamp <= 0) {
throw new IllegalArgumentException(
......
......@@ -24,11 +24,11 @@ public class StreamContext {
public static class TransferPacket {
/** Packet header type. */
private final HeaderType type;
protected final HeaderType type;
/** Log message record. */
private LogMessage record;
protected LogMessage record;
/** Log proxy runtime status. */
private RuntimeStatus status;
protected RuntimeStatus status;
/**
* Constructor with a {@link LogMessage}.
......
......@@ -30,6 +30,7 @@ public class ObReaderConfigTest {
config.setUsername("root@test_tenant");
config.setPassword("password");
config.setStartTimestamp(0L);
config.setStartTimestampUs(0L);
config.setTableWhiteList("test_tenant.test.*");
config.setTableBlackList("|");
config.setTimezone("+8:00");
......@@ -53,7 +54,7 @@ public class ObReaderConfigTest {
Assert.assertTrue(object instanceof ObReaderConfig);
Map<String, String> configMap = ((ObReaderConfig) object).generateConfigurationMap(false);
Assert.assertEquals(configMap.size(), 8);
Assert.assertEquals(configMap.size(), 10);
Assert.assertEquals(configMap, config.generateConfigurationMap(false));
}
}
......@@ -10,12 +10,13 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logclient</artifactId>
<version>1.0.8-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.groupId}:${project.artifactId}</name>
......@@ -71,7 +72,7 @@ See the Mulan PSL v2 for more details.
<commons-lang.version>3.12.0</commons-lang.version>
<commons-codec.version>1.15</commons-codec.version>
<netty.version>4.1.68.Final</netty.version>
<netty.version>4.1.77.Final</netty.version>
<protobuf.version>3.19.6</protobuf.version>
<lz4.version>1.8.0</lz4.version>
<slf4j.version>1.7.32</slf4j.version>
......@@ -210,7 +211,7 @@ See the Mulan PSL v2 for more details.
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>MulanPSL2</licenseFamilyCategory>
<licenseFamilyName>Mulan Public License,Version 2</licenseFamilyName>
<notes />
<notes/>
<patterns>
<pattern>Mulan PSL v2</pattern>
</patterns>
......@@ -245,13 +246,14 @@ See the Mulan PSL v2 for more details.
<excludes>
<!-- Generated files. -->
<exclude>**/LogProxyProto.java</exclude>
<exclude>**/V1Proto.java</exclude>
</excludes>
<googleJavaFormat>
<version>1.7</version>
<style>AOSP</style>
</googleJavaFormat>
<importOrder />
<removeUnusedImports />
<importOrder/>
<removeUnusedImports/>
</java>
</configuration>
<executions>
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
syntax = "proto3";
package oms;
option java_package = "com.oceanbase.clogproxy.common.packet.protocol";
option java_outer_classname = "V1Proto";
message PbPacket {
int32 type = 1; // HeaderType
int32 compress_type = 2; // CompressType
// resevered for other options
bytes payload = 100;
}
message ClientHandShake {
int32 log_type = 1; // LogType
string client_ip = 2;
string client_id = 3;
string client_version = 4;
bool enable_monitor = 5;
string configuration = 6;
}
message RuntimeStatus {
string ip = 1;
int32 port = 2;
int32 stream_count = 3;
int32 worker_count = 4;
}
......@@ -13,4 +13,5 @@
cd "$(dirname "$0")/.." || exit
protoc --java_out=common/src/main/java common/src/main/java/com/oceanbase/clogproxy/common/packet/protocol/logproxy.proto
protoc --java_out=common/src/main/java proto/logproxy.proto
protoc --java_out=common/src/main/java proto/v1.proto
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册