未验证 提交 be243566 编写于 作者: F Fankux 提交者: GitHub

upgrade and make logmessage as a single package (#11)

* upgrade and make logmessage as a single package

* omit OB unrelated stuffs

* omit duplicant set
上级 a20b0a67
......@@ -33,6 +33,12 @@
<artifactId>guava</artifactId>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- net -->
<dependency>
<groupId>io.netty</groupId>
......
......@@ -8,10 +8,14 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
package com.oceanbase.oms.logmessage;
import java.io.UnsupportedEncodingException;
/**
* ByteString store an array of bytes and take over all related transfers,
* such as judge if it should be null, empty or in some an encoding.
*/
public class ByteString {
private int len;
......
......@@ -8,7 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
package com.oceanbase.oms.logmessage;
public class Checkpoint {
......@@ -60,8 +60,8 @@ public class Checkpoint {
public void setPosition(final String position) {
String cp = position;
if (cp.contains("@mysql-bin.")) {
int m = cp.indexOf("@");
int p = cp.indexOf(".");
int m = cp.indexOf('@');
int p = cp.indexOf('.');
String cp1 = cp.substring(0, m);
String cp2 = cp.substring(p + 1);
long lcp2 = Long.parseLong(cp2);
......@@ -105,7 +105,7 @@ public class Checkpoint {
builder.append(db).append(DELIMITER).append(dbport).append(DELIMITER);
}
if (cp1 != null) {
if (cp1 != null && cp2 != null) {
builder.append(cp1).append(DELIMITER).append(cp2).append(DELIMITER);
} else {
builder.append(DELIMITER).append(DELIMITER);
......
......@@ -8,15 +8,15 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
package com.oceanbase.oms.logmessage;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.listener.FieldParseListener;
import com.oceanbase.clogproxy.client.util.StringUtils;
import com.oceanbase.oms.logmessage.enums.DBType;
import com.oceanbase.oms.logmessage.typehelper.LogTypeHelper;
import com.oceanbase.oms.logmessage.typehelper.OBLogTypeHelper;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -26,14 +26,17 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Message contains database updating data.
*/
public class DataMessage extends Message {
/**
* Record contains data of one record.
*
* @author erbai.qzc
*/
public static class Record {
public static LogTypeHelper logTypeHelper = OBLogTypeHelper.OB_LOG_TYPE_HELPER;
public static final String UTF8MB4_ENCODING = "utf8mb4";
public static final String TRACEID_STRING = "traceid";
......@@ -97,14 +100,12 @@ public class DataMessage extends Message {
return isConnectionFirstRecord;
}
public Long getLogSeqNum() throws UnsupportedEncodingException {
public Long getLogSeqNum() {
return 0L;
}
/**
* Field contains data of one field
*
* @author erbai.qzc
*/
public static class Field {
......@@ -170,6 +171,10 @@ public class DataMessage extends Message {
return primaryKey;
}
public final int getRawType() {
return type;
}
public void setPrimary(boolean primary) {
primaryKey = primary;
}
......@@ -226,6 +231,7 @@ public class DataMessage extends Message {
MYSQL_TYPES[206] = Type.FLOAT;
MYSQL_TYPES[207] = Type.STRING;
MYSQL_TYPES[208] = Type.STRING;
MYSQL_TYPES[209] = Type.STRING;
MYSQL_TYPES[255] = Type.GEOMETRY;
MYSQL_TYPES[254] = Type.STRING;
......@@ -254,13 +260,11 @@ public class DataMessage extends Message {
* @return the enumerated type of the field.
*/
public final Type getType() {
if ((type > 16 && type < 199) || (type > 208 && type < 245)) {
if ((type > 16 && type < 199) || (type > 209 && type < 245)) {
return Type.UNKOWN;
} else {
return MYSQL_TYPES[type];
}
}
public boolean isChangeValue() {
......@@ -395,7 +399,7 @@ public class DataMessage extends Message {
if (2 != kv.length) {
//Bug fix:trace id may contains ':'. Split by ':' and drop tuple contains more than 2 content lead to the miss of trace id.
if (kv.length > 2
&& org.apache.commons.lang3.StringUtils.equals(kv[0], TRACEID_STRING)) {
&& StringUtils.equals(kv[0], TRACEID_STRING)) {
kv[1] = line.substring(line.indexOf(':') + 1);
} else {
continue;
......@@ -461,45 +465,15 @@ public class DataMessage extends Message {
for (int i = 0; i < encodings.length; i++) {
String enc = encodings[i];
Field field = fields.get(i);
if (enc.isEmpty()) {
if (field.getType() == Field.Type.STRING) {
field.encoding = "binary";
} else if (field.getType() == Field.Type.JSON) {
field.encoding = UTF8MB4_ENCODING;
} else {
field.encoding = "";
}
} else {
if (field.getType() == Field.Type.BLOB) {
field.type = 15;
}
field.encoding = enc;
}
logTypeHelper.correctField(field, enc);
}
} else if (encodings.length * 2 == fields.size()) {
for (int i = 0; i < encodings.length; i++) {
String enc = encodings[i];
Field field1 = fields.get(i * 2);
Field field2 = fields.get(i * 2 + 1);
if (enc.isEmpty()) {
if (field1.getType() == Field.Type.STRING) {
field1.encoding = "binary";
field2.encoding = "binary";
} else if (field1.getType() == Field.Type.JSON) {
field1.encoding = UTF8MB4_ENCODING;
field2.encoding = UTF8MB4_ENCODING;
} else {
field1.encoding = "";
field2.encoding = "";
}
} else {
if (field1.getType() == Field.Type.BLOB) {
field1.type = 15;
field2.type = 15;
}
field1.encoding = enc;
field2.encoding = enc;
}
logTypeHelper.correctField(field1, enc);
logTypeHelper.correctField(field2, enc);
}
}
// ignore if mistake
......@@ -508,16 +482,38 @@ public class DataMessage extends Message {
/* Record type. */
public enum Type {
INSERT(0), UPDATE(1), DELETE(2), REPLACE(3), HEARTBEAT(4), CONSISTENCY_TEST(5), BEGIN(6), COMMIT(
7), DDL(
8), ROLLBACK(
9), DML(
10), UNKNOWN(
11), INDEX_INSERT(
128), INDEX_UPDATE(
129), INDEX_DELETE(
130), INDEX_REPLACE(
131);
// INSERT
INSERT(0),
// UPDATE
UPDATE(1),
// DELETE
DELETE(2),
// REPLACE
REPLACE(3),
// HEARTBEAT
HEARTBEAT(4),
// CONSISTENCY_TEST
CONSISTENCY_TEST(5),
// BEGIN
BEGIN(6),
// COMMIT
COMMIT(7),
// DDL
DDL(8),
// ROLLBACK
ROLLBACK(9),
// DML
DML(10),
// UNKNOWN
UNKNOWN(11),
// INDEX_INSERT
INDEX_INSERT(128),
// INDEX_UPDATE
INDEX_UPDATE(129),
// INDEX_DELETE
INDEX_DELETE(130),
// INDEX_REPLACE
INDEX_REPLACE(131);
final int _value;
......@@ -638,6 +634,9 @@ public class DataMessage extends Message {
public DBType getDbType() {
String type = getAttribute("source_type");
if (StringUtils.isEmpty(type)) {
return DBType.UNKNOWN;
}
if ("mysql".equalsIgnoreCase(type)) {
return DBType.MYSQL;
} else if ("oceanbase".equalsIgnoreCase(type)) {
......@@ -853,6 +852,24 @@ public class DataMessage extends Message {
return records;
}
/**
* Construct the message from DataInputStream.
*
* @param reader is the DataInputStream.
* @throws IOException
*/
public void mergeFrom(final DataInputStream reader, String regionId) throws IOException {
do {
Record record = new Record();
record.mergeFrom(reader);
record.setRegionId(regionId);
if (record.isEnding()) {
break;
}
records.add(record);
} while (true);
}
@Override
public void clear() {
super.clear();
......
......@@ -8,9 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.message.DataMessage;
package com.oceanbase.oms.logmessage;
/**
* This interface defined a kind of listener for field parsing.
......
......@@ -8,13 +8,11 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
package com.oceanbase.oms.logmessage;
import com.oceanbase.clogproxy.client.constants.DataType;
import com.oceanbase.clogproxy.client.enums.DBType;
import com.oceanbase.clogproxy.client.exception.LogMessageException;
import com.oceanbase.clogproxy.client.listener.FieldParseListener;
import com.oceanbase.clogproxy.client.util.BinaryMessageUtils;
import com.oceanbase.oms.logmessage.enums.DBType;
import com.oceanbase.oms.logmessage.enums.DataType;
import com.oceanbase.oms.logmessage.utils.BinaryMessageUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.StringUtils;
......@@ -32,14 +30,12 @@ import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;
public class LogMessage extends DataMessage.Record {
private static final Logger log = LoggerFactory.getLogger(LogMessage.class);
private static final String DEFAULT_ENCODING = "ASCII";
public static final String DEFAULT_ENCODING = "ASCII";
private static final String UTF8_ENCODING = "UTF-8";
public static final String UTF8_ENCODING = "UTF-8";
private static final String SEP = System.getProperty("line.separator");
//old version header length
......@@ -244,7 +240,7 @@ public class LogMessage extends DataMessage.Record {
}
}
}
return tableName == null || "".endsWith(tableName) ? null : tableName;
return "".endsWith(tableName) ? null : tableName;
}
@Override
......@@ -271,7 +267,7 @@ public class LogMessage extends DataMessage.Record {
}
}
}
return serverId == null || "".endsWith(serverId) ? null : serverId;
return "".endsWith(serverId) ? null : serverId;
}
@Override
......@@ -323,8 +319,6 @@ public class LogMessage extends DataMessage.Record {
currentNewColOffset = (int) wrapByteBuf.readUnsignedInt();
}
DBType dbType = getDbType();
//start loop
for (int i = 0; i < count; i++) {
//get pk boolean
......@@ -357,37 +351,21 @@ public class LogMessage extends DataMessage.Record {
+ i * elementSize);
flag = wrapByteBuf.readUnsignedByte();
}
Field.Type fieldType = Field.MYSQL_TYPES[type];
//get real encoding
String realEncoding = encodingStr;
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + colsEncodingOffset + BYTE_SIZE
+ INT_SIZE + (i + 1) * INT_SIZE));
if (colEncodingsCount > 0) {
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + colsEncodingOffset + BYTE_SIZE
+ INT_SIZE + (i + 1) * INT_SIZE));
int nextEncodingOffset = (int) wrapByteBuf.readUnsignedInt();
ByteString encodingByteString = new ByteString(wrapByteBuf.array(),
PREFIX_LENGTH + currentEncodingOffset + BYTE_SIZE + INT_SIZE + (count + 1)
* INT_SIZE + (int) colsEncodingOffset, nextEncodingOffset
- currentEncodingOffset - 1);
realEncoding = encodingByteString.toString();
if (realEncoding.isEmpty()) {
if ((type == 253 || type == 254)
&& Field.MYSQL_TYPES[type] == Field.Type.STRING) {
realEncoding = "binary";
} else if (Field.MYSQL_TYPES[type] == Field.Type.BLOB) {
realEncoding = "";
} else if (type == 245) {
realEncoding = UTF8MB4_ENCODING;
} else {
realEncoding = DEFAULT_ENCODING;
}
}
currentEncodingOffset = nextEncodingOffset;
} else if (dbType == DBType.DB2 && (fieldType == Field.Type.BLOB || fieldType == Field.Type.BINARY)) {
realEncoding = "";
}
if (this.getDbType() == DBType.MYSQL && realEncoding != null && !realEncoding.isEmpty() && Field.MYSQL_TYPES[type] == Field.Type.BLOB) {
type = 15;
}
realEncoding = logTypeHelper.correctEncoding(type, realEncoding);
type = logTypeHelper.correctCode(type, realEncoding);
//colName
wrapByteBuf
.readerIndex((int) (PREFIX_LENGTH + colNamesOffset + BYTE_SIZE + INT_SIZE + (i + 1)
......@@ -468,9 +446,20 @@ public class LogMessage extends DataMessage.Record {
|| newColsOffset < 0) {
return fields;
}
//global encoding
String encodingStr = BinaryMessageUtils.getString(byteBuf.array(), (int) encoding,
DEFAULT_ENCODING);
/*
* global encoding
*
* 对于 DDL 的默认编码改动, DDL DrcMessage 不携带编码信息,只能使用默认编码,但是 ASCII 对于中文处理出错
* 对于 DDL 默认编码改为 UTF-8,不改变除 DDL 之外其他行为
*/
String encodingStr = null;
if (this.getOpt() == Type.DDL) {
encodingStr = UTF8_ENCODING;
} else {
encodingStr = BinaryMessageUtils.getString(byteBuf.array(), (int) encoding,
DEFAULT_ENCODING);
}
//pk info
List<Integer> pks = null;
if ((int) pkKeysOffset > 0) {
......@@ -513,7 +502,6 @@ public class LogMessage extends DataMessage.Record {
if (0 != newColCount) {
currentNewColOffset = (int) wrapByteBuf.readUnsignedInt();
}
DBType dbType = getDbType();
//start loop
for (int i = 0; i < count; i++) {
//get pk boolean
......@@ -546,39 +534,28 @@ public class LogMessage extends DataMessage.Record {
+ INT_SIZE + i * elementSize);
flag = wrapByteBuf.readUnsignedByte();
}
Field.Type fieldType = Field.MYSQL_TYPES[type];
//get real encoding
String realEncoding = encodingStr;
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + colsEncodingOffset + BYTE_SIZE
+ INT_SIZE + (i + 1) * INT_SIZE));
// now deliver have fix encoding offset bug, encoding has been decoded correctly
// add db2 compatible code for new version db2 reader
// old else will also saved for old version store
// this code will deprecated in future release
// correct oracle code if oralce reader has update delivier version or correct type code
if (colEncodingsCount > 0) {
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + colsEncodingOffset + BYTE_SIZE
+ INT_SIZE + (i + 1) * INT_SIZE));
int nextEncodingOffset = (int) wrapByteBuf.readUnsignedInt();
ByteString encodingByteString = new ByteString(wrapByteBuf.array(),
PREFIX_LENGTH + currentEncodingOffset + BYTE_SIZE + INT_SIZE
+ (count + 1) * INT_SIZE + (int) colsEncodingOffset,
nextEncodingOffset - currentEncodingOffset - 1);
realEncoding = encodingByteString.toString();
if (realEncoding.isEmpty()) {
if ((type == 253 || type == 254)
&& Field.MYSQL_TYPES[type] == Field.Type.STRING) {
realEncoding = "binary";
} else if (Field.MYSQL_TYPES[type] == Field.Type.BLOB) {
realEncoding = "";
} else if (type == 245) {
realEncoding = UTF8MB4_ENCODING;
} else {
realEncoding = DEFAULT_ENCODING;
}
}
currentEncodingOffset = nextEncodingOffset;
} else if (dbType == DBType.DB2 && (fieldType == Field.Type.BLOB || fieldType == Field.Type.BINARY)) {
// TODO: DB2 的 BLOB 类型, 等 Store 改好之后删
realEncoding = "";
}
realEncoding = logTypeHelper.correctEncoding(type, realEncoding);
type = logTypeHelper.correctCode(type, realEncoding);
if (this.getDbType() == DBType.MYSQL && !realEncoding.isEmpty() && Field.MYSQL_TYPES[type] == Field.Type.BLOB) {
type = 15;
}
//colName
wrapByteBuf.readerIndex((int) (PREFIX_LENGTH + colNamesOffset + BYTE_SIZE
+ INT_SIZE + (i + 1) * INT_SIZE));
......@@ -671,7 +648,6 @@ public class LogMessage extends DataMessage.Record {
setByteBuf(inner);
}
/**
* Get the primary data to avoid parsing
* @return primary data
......@@ -858,11 +834,11 @@ public class LogMessage extends DataMessage.Record {
String keyStr = key.toString();
int m = 0;
while (true) {
int i = keyStr.indexOf("(", m);
int i = keyStr.indexOf('(', m);
if (i == -1) {
break;
}
int j = keyStr.indexOf(")", i);
int j = keyStr.indexOf(')', i);
if (j == -1) {
log.error("parse key error");
return null;
......@@ -1043,11 +1019,11 @@ public class LogMessage extends DataMessage.Record {
if (StringUtils.isNotEmpty(rawConstraintString)) {
int m = 0;
while (m < rawConstraintString.length()) {
int leftIndex = rawConstraintString.indexOf("(", m);
int leftIndex = rawConstraintString.indexOf('(', m);
if (leftIndex == -1) {
break;
}
int rightIndex = rawConstraintString.indexOf(")", leftIndex);
int rightIndex = rawConstraintString.indexOf(')', leftIndex);
if (rightIndex == -1) {
if (rawConstraintString.length() == 1) {
throw new IOException(
......@@ -1245,4 +1221,16 @@ public class LogMessage extends DataMessage.Record {
public long getFileOffset() {
return fileOffset;
}
public ByteBuf getByteBuff() {
return byteBuf;
}
public void releaseContents() {
// release reference to let gc collect mem
this.fields = null;
this.pkValues = null;
this.keysValue = null;
this.primaryKeyIndexList = null;
}
}
......@@ -8,10 +8,10 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.exception;
package com.oceanbase.oms.logmessage;
/**
* This is a subclasses of {@link RuntimeException} primarily used in process of parsing {@link com.oceanbase.clogproxy.client.message.LogMessage}.
* This is a subclasses of {@link RuntimeException} primarily used in process of parsing {@link LogMessage}.
*/
public class LogMessageException extends RuntimeException {
......
......@@ -8,9 +8,9 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.message;
package com.oceanbase.oms.logmessage;
import com.oceanbase.clogproxy.client.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
......
......@@ -8,10 +8,10 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.enums;
package com.oceanbase.oms.logmessage.enums;
/**
* Database type enumeration.
* Database type enumeration for open source.
*/
public enum DBType {
MYSQL,
......
......@@ -8,7 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.constants;
package com.oceanbase.oms.logmessage.enums;
/**
* The class that defines the constants that are used to identify data types.
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
// compatible with mysql type code
// same code may reference different schema type
public class LogMessageTypeCode {
public static final int DRC_MSG_TYPE_DECIMAL = 0;
public static final int DRC_MSG_TYPE_TINY = 1;
public static final int DRC_MSG_TYPE_SHORT = 2;
public static final int DRC_MSG_TYPE_LONG = 3;
public static final int DRC_MSG_TYPE_FLOAT = 4;
public static final int DRC_MSG_TYPE_DOUBLE = 5;
public static final int DRC_MSG_TYPE_NULL = 6;
public static final int DRC_MSG_TYPE_TIMESTAMP = 7;
public static final int DRC_MSG_TYPE_LONGLONG = 8;
public static final int DRC_MSG_TYPE_INT24 = 9;
public static final int DRC_MSG_TYPE_DATE = 10;
public static final int DRC_MSG_TYPE_TIME = 11;
public static final int DRC_MSG_TYPE_DATETIME = 12;
public static final int DRC_MSG_TYPE_YEAR = 13;
public static final int DRC_MSG_TYPE_NEWDATE = 14;
public static final int DRC_MSG_TYPE_VARCHAR = 15;
public static final int DRC_MSG_TYPE_BIT = 16;
public static final int DRC_MSG_TYPE_TIMESTAMP2 = 17;
public static final int DRC_MSG_TYPE_DATETIME2 = 18;
public static final int DRC_MSG_TYPE_TIME2 = 19;
// appeared in ob define, but should not appeared in drc types
public static final int DRC_MSG_COMPLEX = 160;
public static final int DRC_MSG_TYPE_ARRAY = 161;
public static final int DRC_MSG_TYPE_STRUCT = 162;
public static final int DRC_MSG_TYPE_CURSOR = 163;
public static final int DRC_MSG_TYPE_ORA_BLOB = 210;
public static final int DRC_MSG_TYPE_CLOB = 211;
public static final int DRC_MSG_TYPE_TEXT = 197;
public static final int DRC_MSG_TYPE_VAR_BINARY = 198;
public static final int DRC_MSG_TYPE_BINARY = 199;
public static final int DRC_MSG_TYPE_TIMESTAMP_WITH_TIME_ZONE = 200;
public static final int DRC_MSG_TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = 201;
public static final int DRC_MSG_TYPE_TIMESTAMP_NANO = 202;
public static final int DRC_MSG_TYPE_RAW = 203;
public static final int DRC_MSG_TYPE_INTERVAL_YEAR_TO_MONTH = 204;
public static final int DRC_MSG_TYPE_INTERVAL_DAY_TO_SECOND =205;
public static final int DRC_MSG_TYPE_NUMBER_FLOAT = 206;
public static final int DRC_MSG_TYPE_NVARCHAR2 = 207;
public static final int DRC_MSG_TYPE_NCHAR = 208;
public static final int DRC_MSG_TYPE_ROW_ID = 209;
public static final int DRC_MSG_TYPE_JSON = 245;
public static final int DRC_MSG_TYPE_NEWDECIMAL = 246;
public static final int DRC_MSG_TYPE_ENUM = 247;
public static final int DRC_MSG_TYPE_SET = 248;
public static final int DRC_MSG_TYPE_TINY_BLOB = 249;
public static final int DRC_MSG_TYPE_MEDIUM_BLOB = 250;
public static final int DRC_MSG_TYPE_LONG_BLOB = 251;
public static final int DRC_MSG_TYPE_BLOB = 252;
public static final int DRC_MSG_TYPE_VAR_STRING =253;
public static final int DRC_MSG_TYPE_STRING = 254;
public static final int DRC_MSG_TYPE_GEOMETRY = 255;
public static final int DRC_MSG_TYPE_ORA_BINARY_FLOAT = 256;
public static final int DRC_MSG_TYPE_ORA_BINARY_DOUBLE = 257;
public static final int DRC_MSG_TYPE_UNKNOWN = DRC_MSG_TYPE_ORA_BINARY_DOUBLE + 1;
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.enums.DBType;
public abstract class LogTypeHelper {
public static final String BINARY_STR = "binary";
public static final String EMPTY_ENCODING_STR = "";
protected final DBType dbType;
public LogTypeHelper(DBType dbType) {
this.dbType = dbType;
}
public DBType getDbType() {
return dbType;
}
abstract public String correctEncoding(int typeCode, String realEncoding);
abstract public int correctCode(int typeCode, String encoding);
abstract public void correctField(DataMessage.Record.Field f, String realEncoding);
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.logmessage.enums.DBType;
public abstract class LogTypeHelperFactory {
public static LogTypeHelper getInstance(DBType dbType) {
switch (dbType) {
case OCEANBASE:
case OCEANBASE1:
return OBLogTypeHelper.OB_LOG_TYPE_HELPER;
default:
throw new RuntimeException("unsupported dbtype");
}
}
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.enums.DBType;
import org.apache.commons.lang3.StringUtils;
public class OBLogTypeHelper extends LogTypeHelper {
public static final OBLogTypeHelper OB_LOG_TYPE_HELPER = new OBLogTypeHelper();
private static final String DEFAULT_ENCODING = "";
public OBLogTypeHelper() {
super(DBType.OCEANBASE1);
}
@Override
public String correctEncoding(int typeCode, String realEncoding) {
switch (typeCode) {
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING:
case LogMessageTypeCode.DRC_MSG_TYPE_STRING:
return realEncoding;
default:
if (StringUtils.equals(realEncoding, "binary")) {
return DEFAULT_ENCODING;
} else {
return realEncoding;
}
}
}
@Override
public int correctCode(int typeCode, String encoding) {
switch (typeCode) {
case LogMessageTypeCode.DRC_MSG_TYPE_TINY_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_MEDIUM_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_LONG_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_BLOB:
if (!StringUtils.isEmpty(encoding) && !StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_CLOB;
}
break;
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING:
if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_VAR_BINARY;
} else {
return LogMessageTypeCode.DRC_MSG_TYPE_VARCHAR;
}
case LogMessageTypeCode.DRC_MSG_TYPE_STRING:
if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_BINARY;
}
break;
}
return typeCode;
}
@Override
public void correctField(DataMessage.Record.Field f, String realEncoding) {
switch (f.type) {
case LogMessageTypeCode.DRC_MSG_TYPE_TINY_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_MEDIUM_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_LONG_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_BLOB:
if (!StringUtils.isEmpty(f.encoding) && !StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_CLOB;
}
break;
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING:
if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_VAR_BINARY;
} else {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_VARCHAR;
}
break;
case LogMessageTypeCode.DRC_MSG_TYPE_STRING:
if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_BINARY;
}
break;
default:
f.encoding = DEFAULT_ENCODING;
}
}
}
......@@ -8,10 +8,10 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
package com.oceanbase.oms.logmessage.utils;
import com.oceanbase.clogproxy.client.constants.DataType;
import com.oceanbase.clogproxy.client.message.ByteString;
import com.oceanbase.oms.logmessage.ByteString;
import com.oceanbase.oms.logmessage.enums.DataType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
......@@ -26,9 +26,6 @@ import java.util.List;
*/
public class BinaryMessageUtils {
/**
*
*/
private static final int PREFIX_LENGTH = 12;
/**
......
......@@ -8,7 +8,7 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.util;
package com.oceanbase.oms.logmessage.utils;
import java.util.ArrayList;
import java.util.List;
......
......@@ -14,12 +14,12 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
......
......@@ -11,8 +11,8 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.clogproxy.client.connection;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.handler.ssl.SslContext;
import java.util.concurrent.BlockingQueue;
......
......@@ -12,7 +12,7 @@ package com.oceanbase.clogproxy.client.listener;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.oms.logmessage.LogMessage;
/**
* This interface defined a kind of listener for record response.
......
......@@ -13,7 +13,7 @@ package com.oceanbase.clogproxy.client;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.message.LogMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.junit.Ignore;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册