未验证 提交 426f63aa 编写于 作者: H He Wang 提交者: GitHub

update logmessage based on internal version (#51)

* update logmessage from internal store client

* update comments
上级 92bcf94e
...@@ -8,21 +8,16 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, ...@@ -8,21 +8,16 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.enums; package com.oceanbase.oms.common.enums;
/** Database type enumeration for open source. */ public enum DbCategoryEnum {
public enum DBType {
MYSQL, /** Relational database. */
RDB,
OCEANBASE, /** Message queue. */
MQ,
HBASE, /** Big data. */
BIGDATA,
ORACLE, /** Not only SQL. */
NOSQL;
OCEANBASE1,
DB2,
UNKNOWN
} }
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.common.enums;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public enum DbTypeEnum {
/** OceanBase 0.5. */
OB_05(DbCategoryEnum.RDB, new HashSet<>(Arrays.asList("oceanbase", "ob05"))),
/** OceanBase in MySQL mode. */
OB_MYSQL(
DbCategoryEnum.RDB,
new HashSet<>(Arrays.asList("oceanbase1", "ob10", "oceanbase_mysql_mode"))),
/** OceanBase in Oracle mode. */
OB_ORACLE(
DbCategoryEnum.RDB,
new HashSet<>(Arrays.asList("oceanbase_oracle_mode", "ob_in_oracle_mode"))),
UNKNOWN(null);
DbTypeEnum(DbCategoryEnum category) {
this.category = category;
this.aliases = Collections.emptySet();
}
DbTypeEnum(DbCategoryEnum category, Set<String> aliases) {
this.category = category;
this.aliases = aliases;
}
public static DbTypeEnum fromAlias(String alias) {
return ALIAS_ENUM_MAP.get(alias.toLowerCase());
}
public static DbTypeEnum valueOfIgnoreCase(String value) {
try {
if (DbTypeEnum.fromAlias(value) != null) {
return DbTypeEnum.fromAlias(value);
} else {
return DbTypeEnum.valueOf(value.toUpperCase());
}
} catch (IllegalArgumentException e) {
return null;
}
}
private static final Map<String, DbTypeEnum> ALIAS_ENUM_MAP = new HashMap<>();
static {
for (DbTypeEnum one : values()) {
for (String alias : one.getAliases()) {
assert !ALIAS_ENUM_MAP.containsKey(alias.toLowerCase());
ALIAS_ENUM_MAP.put(alias.toLowerCase(), one);
}
}
}
private final DbCategoryEnum category;
private final Set<String> aliases;
public DbCategoryEnum getCategory() {
return category;
}
public Set<String> getAliases() {
return aliases;
}
}
...@@ -11,9 +11,9 @@ See the Mulan PSL v2 for more details. */ ...@@ -11,9 +11,9 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage; package com.oceanbase.oms.logmessage;
import com.oceanbase.oms.logmessage.enums.DBType; import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.typehelper.LogTypeHelper; import com.oceanbase.oms.logmessage.typehelper.LogTypeHelper;
import com.oceanbase.oms.logmessage.typehelper.OBLogTypeHelper; import com.oceanbase.oms.logmessage.typehelper.LogTypeHelperFactory;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -31,7 +31,6 @@ public class DataMessage extends Message { ...@@ -31,7 +31,6 @@ public class DataMessage extends Message {
/** Record contains data of one record. */ /** Record contains data of one record. */
public static class Record { public static class Record {
public static LogTypeHelper logTypeHelper = OBLogTypeHelper.OB_LOG_TYPE_HELPER;
public static final String UTF8MB4_ENCODING = "utf8mb4"; public static final String UTF8MB4_ENCODING = "utf8mb4";
public static final String TRACEID_STRING = "traceid"; public static final String TRACEID_STRING = "traceid";
...@@ -122,6 +121,8 @@ public class DataMessage extends Message { ...@@ -122,6 +121,8 @@ public class DataMessage extends Message {
public boolean prev = false; public boolean prev = false;
public boolean notNull = false;
public enum Type { public enum Type {
INT8, INT8,
INT16, INT16,
...@@ -190,6 +191,10 @@ public class DataMessage extends Message { ...@@ -190,6 +191,10 @@ public class DataMessage extends Message {
this.flag = flag; this.flag = flag;
} }
public void setNotNull(boolean notNull) {
this.notNull = notNull;
}
public final boolean isPrimary() { public final boolean isPrimary() {
return primaryKey; return primaryKey;
} }
...@@ -223,6 +228,10 @@ public class DataMessage extends Message { ...@@ -223,6 +228,10 @@ public class DataMessage extends Message {
return encoding; return encoding;
} }
public final boolean getNotNull() {
return notNull;
}
public static Type[] MYSQL_TYPES = new Type[256]; public static Type[] MYSQL_TYPES = new Type[256];
static { static {
...@@ -371,6 +380,7 @@ public class DataMessage extends Message { ...@@ -371,6 +380,7 @@ public class DataMessage extends Message {
builder.append("Field name: " + name + System.getProperty("line.separator")); builder.append("Field name: " + name + System.getProperty("line.separator"));
builder.append("Field type: " + type + System.getProperty("line.separator")); builder.append("Field type: " + type + System.getProperty("line.separator"));
builder.append("Field length: " + length + System.getProperty("line.separator")); builder.append("Field length: " + length + System.getProperty("line.separator"));
builder.append("Field notNull: " + notNull + System.getProperty("line.separator"));
if (value != null) { if (value != null) {
if ("binary".equalsIgnoreCase(encoding)) { if ("binary".equalsIgnoreCase(encoding)) {
builder.append( builder.append(
...@@ -447,7 +457,9 @@ public class DataMessage extends Message { ...@@ -447,7 +457,9 @@ public class DataMessage extends Message {
type = Type.valueOf(stype.toUpperCase()); type = Type.valueOf(stype.toUpperCase());
// set timestamp,process heartbeat between tx // set timestamp,process heartbeat between tx
timestamp = getAttribute("timestamp"); timestamp = getAttribute("timestamp");
if (getDbType() == DBType.OCEANBASE1) { DbTypeEnum dbType = getDbType();
LogTypeHelper logTypeHelper = LogTypeHelperFactory.getInstance(dbType);
if (dbType == DbTypeEnum.OB_MYSQL || dbType == DbTypeEnum.OB_ORACLE) {
if (type == Type.HEARTBEAT) { if (type == Type.HEARTBEAT) {
globalSafeTimestamp.set(timestamp); globalSafeTimestamp.set(timestamp);
} else { } else {
...@@ -654,25 +666,8 @@ public class DataMessage extends Message { ...@@ -654,25 +666,8 @@ public class DataMessage extends Message {
return getAttribute("unique"); return getAttribute("unique");
} }
public DBType getDbType() { public DbTypeEnum getDbType() {
String type = getAttribute("source_type"); return parseDbTypeStr(getAttribute("source_type"));
if (StringUtils.isEmpty(type)) {
return DBType.UNKNOWN;
}
if ("mysql".equalsIgnoreCase(type)) {
return DBType.MYSQL;
} else if ("oceanbase".equalsIgnoreCase(type)) {
return DBType.OCEANBASE;
} else if ("oracle".equalsIgnoreCase(type)) {
return DBType.ORACLE;
} else if ("hbase".equalsIgnoreCase(type)) {
return DBType.HBASE;
} else if ("oceanbase_1_0".equalsIgnoreCase(type)) {
return DBType.OCEANBASE1;
} else if ("db2".equalsIgnoreCase(type)) {
return DBType.DB2;
}
return DBType.UNKNOWN;
} }
public boolean isQueryBack() { public boolean isQueryBack() {
...@@ -772,15 +767,12 @@ public class DataMessage extends Message { ...@@ -772,15 +767,12 @@ public class DataMessage extends Message {
} }
public String getMessageUniqueIdStr() throws Exception { public String getMessageUniqueIdStr() throws Exception {
DBType dbType = getDbType(); DbTypeEnum dbType = getDbType();
this.checkDBType(dbType); this.checkDBType(dbType);
StringBuilder messageId = new StringBuilder(); StringBuilder messageId = new StringBuilder();
if (dbType == DBType.MYSQL) {
messageId.append(getServerId());
}
messageId.append("/").append(this.getCommonPart()).append("/"); messageId.append("/").append(this.getCommonPart()).append("/");
if (dbType == DBType.OCEANBASE1) { if (dbType == DbTypeEnum.OB_MYSQL || dbType == DbTypeEnum.OB_ORACLE) {
messageId.append("/"); messageId.append("/");
} else { } else {
String checkpoint = getCheckpoint(); String checkpoint = getCheckpoint();
...@@ -791,7 +783,7 @@ public class DataMessage extends Message { ...@@ -791,7 +783,7 @@ public class DataMessage extends Message {
} }
messageId.append("/"); messageId.append("/");
if (dbType == DBType.OCEANBASE1) { if (dbType == DbTypeEnum.OB_MYSQL || dbType == DbTypeEnum.OB_ORACLE) {
messageId.append(getOB10UniqueId()); messageId.append(getOB10UniqueId());
} }
...@@ -799,12 +791,13 @@ public class DataMessage extends Message { ...@@ -799,12 +791,13 @@ public class DataMessage extends Message {
return messageId.toString(); return messageId.toString();
} }
private void checkDBType(DBType dbType) { private void checkDBType(DbTypeEnum dbType) {
if (dbType != DBType.MYSQL switch (dbType) {
&& dbType != DBType.OCEANBASE case OB_MYSQL:
&& dbType != DBType.OCEANBASE1 case OB_ORACLE:
&& dbType != DBType.ORACLE case OB_05:
&& dbType != DBType.DB2) { break;
default:
throw new IllegalStateException( throw new IllegalStateException(
"dbType [" + dbType + "] is not valid for messageId"); "dbType [" + dbType + "] is not valid for messageId");
} }
...@@ -914,4 +907,27 @@ public class DataMessage extends Message { ...@@ -914,4 +907,27 @@ public class DataMessage extends Message {
public void addRecord(Record r) { public void addRecord(Record r) {
records.add(r); records.add(r);
} }
public static DbTypeEnum parseDbTypeStr(String dbTypeInStr) {
if (StringUtils.isEmpty(dbTypeInStr)) {
return DbTypeEnum.UNKNOWN;
}
if ("oceanbase".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.OB_05;
} else if ("oceanbase_1_0".equalsIgnoreCase(dbTypeInStr)) {
return DbTypeEnum.OB_MYSQL;
}
return DbTypeEnum.UNKNOWN;
}
public static DbTypeEnum parseDBTypeCode(int dbTypeCode) {
switch (dbTypeCode) {
case 1:
return DbTypeEnum.OB_05;
case 4:
return DbTypeEnum.OB_MYSQL;
default:
return DbTypeEnum.UNKNOWN;
}
}
} }
...@@ -10,15 +10,44 @@ See the Mulan PSL v2 for more details. */ ...@@ -10,15 +10,44 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage; package com.oceanbase.oms.logmessage;
import com.oceanbase.oms.logmessage.typehelper.LogMessageTypeCode;
/** This interface defined a kind of listener for field parsing. */ /** This interface defined a kind of listener for field parsing. */
public interface FieldParseListener { public interface FieldParseListener {
/** /**
* Handle the filed parsing result. * Handle the filed parsing result.
* *
* @param prev The original field. * @param fieldName Field name.
* @param next The field after parsing. * @param type {@link LogMessageTypeCode}.
* @throws Exception When exception occurs. * @param encoding Encoding of value.
* @param value Field value.
* @param notNull Flag of whether the field is not null (not optional).
* @param isPrev Flag of whether the value is the old one.
*/
void parseNotify(
String fieldName,
int type,
String encoding,
ByteString value,
boolean notNull,
boolean isPrev);
/**
* Handle the filed parsing result. Only support value, as we already know schema info.
*
* @param type {@link LogMessageTypeCode}.
* @param value Field value.
* @param encoding Encoding of value.
* @param isPrev Flag of whether the value is the old one.
*/
void parseNotify(int type, ByteString value, String encoding, boolean isPrev);
/**
* Flag of whether schema info (fieldName, type, encoding, notNull) is needed.
*
* @return True for needed, otherwise false.
*/ */
void parseNotify(DataMessage.Record.Field prev, DataMessage.Record.Field next) throws Exception; boolean needSchemaInfo();
} }
...@@ -11,8 +11,10 @@ See the Mulan PSL v2 for more details. */ ...@@ -11,8 +11,10 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage; package com.oceanbase.oms.logmessage;
import com.oceanbase.oms.logmessage.enums.DBType; import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.enums.DataType; import com.oceanbase.oms.logmessage.enums.DataType;
import com.oceanbase.oms.logmessage.typehelper.LogTypeHelper;
import com.oceanbase.oms.logmessage.typehelper.LogTypeHelperFactory;
import com.oceanbase.oms.logmessage.utils.BinaryMessageUtils; import com.oceanbase.oms.logmessage.utils.BinaryMessageUtils;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
...@@ -38,10 +40,9 @@ public class LogMessage extends DataMessage.Record { ...@@ -38,10 +40,9 @@ public class LogMessage extends DataMessage.Record {
public static final String UTF8_ENCODING = "UTF-8"; public static final String UTF8_ENCODING = "UTF-8";
private static final String SEP = System.getProperty("line.separator"); private static final String SEP = System.getProperty("line.separator");
// old version header length
private static final int OLD_VERSION_2_HEADER_LEN = 88; private static final int OLD_VERSION_2_HEADER_LEN = 88;
// new version header length
private static final int NEW_VERSION_2_HEADER_LEN = 96; private static final int NEW_VERSION_2_HEADER_LEN = 96;
private static final int VERSION_3_HEADER_LEN = 104; private static final int VERSION_3_HEADER_LEN = 104;
...@@ -85,7 +86,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -85,7 +86,7 @@ public class LogMessage extends DataMessage.Record {
private long newColsOffset = -1; private long newColsOffset = -1;
private long m_pkValOffset = -1; private long pkValOffset = -1;
private long pkKeysOffset = -1; private long pkKeysOffset = -1;
...@@ -101,6 +102,8 @@ public class LogMessage extends DataMessage.Record { ...@@ -101,6 +102,8 @@ public class LogMessage extends DataMessage.Record {
private long colFlagOffset = -1; private long colFlagOffset = -1;
private long colNotNullOffset = -1;
/** buf parse data */ /** buf parse data */
private String dbName; private String dbName;
...@@ -120,16 +123,10 @@ public class LogMessage extends DataMessage.Record { ...@@ -120,16 +123,10 @@ public class LogMessage extends DataMessage.Record {
private List<Long> timeMarks = null; private List<Long> timeMarks = null;
private List<String> colFilter;
private boolean keyChange = false; private boolean keyChange = false;
/** /** type size map, used to get array type bytes by type index */
* type bitmap,get array type bytes by type index array first byte : 1, array element is private static final int[] ELEMENT_ARRAY = {0, 1, 1, 2, 2, 4, 4, 8, 8};
* unsigned byte 2, array element is unsigned short 4, array element is unsigned int 8, array
* element is long
*/
private static final int[] elementArray = {0, 1, 1, 2, 2, 4, 4, 8, 8};
private static final int BYTE_SIZE = 1; private static final int BYTE_SIZE = 1;
...@@ -152,33 +149,13 @@ public class LogMessage extends DataMessage.Record { ...@@ -152,33 +149,13 @@ public class LogMessage extends DataMessage.Record {
return keyChange; return keyChange;
} }
@Override
public void setColFilter(List<String> colFilter) {
this.colFilter = colFilter;
}
public int getVersion() { public int getVersion() {
return brVersion; return brVersion;
} }
@Override @Override
public DBType getDbType() { public DbTypeEnum getDbType() {
switch (srcType) { return DataMessage.parseDBTypeCode(srcType);
case 0:
return DBType.MYSQL;
case 1:
return DBType.OCEANBASE;
case 2:
return DBType.HBASE;
case 3:
return DBType.ORACLE;
case 4:
return DBType.OCEANBASE1;
case 5:
return DBType.DB2;
default:
return DBType.UNKNOWN;
}
} }
@Override @Override
...@@ -275,13 +252,19 @@ public class LogMessage extends DataMessage.Record { ...@@ -275,13 +252,19 @@ public class LogMessage extends DataMessage.Record {
if (colNamesOffset < 0 || colTypesOffset < 0 || oldColsOffset < 0 || newColsOffset < 0) { if (colNamesOffset < 0 || colTypesOffset < 0 || oldColsOffset < 0 || newColsOffset < 0) {
return; return;
} }
// global encoding
String encodingStr = /*
* global encoding
*
* 对于 DDL 的默认编码改动, DDL DrcMessage 不携带编码信息,只能使用默认编码,但是 ASCII 对于中文处理出错
* 对于 DDL 默认编码改为 UTF-8,不改变除 DDL 之外其他行为
*/
String encodingStr = null;
if (this.getOpt() == Type.DDL) {
encodingStr = UTF8_ENCODING;
} else {
encodingStr =
BinaryMessageUtils.getString(byteBuf.array(), (int) encoding, DEFAULT_ENCODING); BinaryMessageUtils.getString(byteBuf.array(), (int) encoding, DEFAULT_ENCODING);
// pk info
List<Integer> pks = null;
if ((int) pkKeysOffset > 0) {
pks = BinaryMessageUtils.getArray(byteBuf.array(), (int) pkKeysOffset);
} }
// get column count // get column count
ByteBuf wrapByteBuf = ByteBuf wrapByteBuf =
...@@ -291,7 +274,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -291,7 +274,7 @@ public class LogMessage extends DataMessage.Record {
// op type array // op type array
wrapByteBuf.readerIndex(PREFIX_LENGTH + (int) colTypesOffset); wrapByteBuf.readerIndex(PREFIX_LENGTH + (int) colTypesOffset);
byte t = wrapByteBuf.readByte(); byte t = wrapByteBuf.readByte();
int elementSize = elementArray[t & DataType.DT_MASK]; int elementSize = ELEMENT_ARRAY[t & DataType.DT_MASK];
// encoding // encoding
int colEncodingsCount = 0; int colEncodingsCount = 0;
int currentEncodingOffset = 0; int currentEncodingOffset = 0;
...@@ -318,14 +301,9 @@ public class LogMessage extends DataMessage.Record { ...@@ -318,14 +301,9 @@ public class LogMessage extends DataMessage.Record {
if (0 != newColCount) { if (0 != newColCount) {
currentNewColOffset = (int) wrapByteBuf.readUnsignedInt(); currentNewColOffset = (int) wrapByteBuf.readUnsignedInt();
} }
LogTypeHelper logTypeHelper = LogTypeHelperFactory.getInstance(getDbType());
// start loop // start loop
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// get pk boolean
boolean isPk = false;
if (pks != null && pks.contains(i)) {
isPk = true;
}
// get real op type // get real op type
int type = 0; int type = 0;
wrapByteBuf.readerIndex( wrapByteBuf.readerIndex(
...@@ -344,19 +322,26 @@ public class LogMessage extends DataMessage.Record { ...@@ -344,19 +322,26 @@ public class LogMessage extends DataMessage.Record {
type = (int) wrapByteBuf.readLong(); type = (int) wrapByteBuf.readLong();
break; break;
} }
// get col flag boolean notNull = false;
int flag = 0; if (fieldParseListener.needSchemaInfo()) {
if (colFlagOffset > 0) { if (colNotNullOffset > 0) {
wrapByteBuf.readerIndex( wrapByteBuf.readerIndex(
PREFIX_LENGTH PREFIX_LENGTH
+ (int) colFlagOffset + (int) colNotNullOffset
+ BYTE_SIZE + BYTE_SIZE
+ INT_SIZE + INT_SIZE
+ i * elementSize); + i * elementSize);
flag = wrapByteBuf.readUnsignedByte(); notNull = wrapByteBuf.readBoolean();
}
} }
// get real encoding // get real encoding
String realEncoding = encodingStr; String realEncoding = encodingStr;
// now deliver have fix encoding offset bug, encoding has been decoded correctly
// add db2 compatible code for new version db2 reader
// old else will also saved for old version store
// this code will deprecated in future release
// correct oracle code if oralce reader has update delivier version or correct type code
if (colEncodingsCount > 0) { if (colEncodingsCount > 0) {
wrapByteBuf.readerIndex( wrapByteBuf.readerIndex(
(int) (int)
...@@ -380,7 +365,10 @@ public class LogMessage extends DataMessage.Record { ...@@ -380,7 +365,10 @@ public class LogMessage extends DataMessage.Record {
currentEncodingOffset = nextEncodingOffset; currentEncodingOffset = nextEncodingOffset;
} }
realEncoding = logTypeHelper.correctEncoding(type, realEncoding); realEncoding = logTypeHelper.correctEncoding(type, realEncoding);
String columnName = null;
if (fieldParseListener.needSchemaInfo()) {
type = logTypeHelper.correctCode(type, realEncoding); type = logTypeHelper.correctCode(type, realEncoding);
// colName // colName
wrapByteBuf.readerIndex( wrapByteBuf.readerIndex(
(int) (int)
...@@ -390,7 +378,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -390,7 +378,7 @@ public class LogMessage extends DataMessage.Record {
+ INT_SIZE + INT_SIZE
+ (i + 1) * INT_SIZE)); + (i + 1) * INT_SIZE));
int nextColNameOffset = (int) wrapByteBuf.readUnsignedInt(); int nextColNameOffset = (int) wrapByteBuf.readUnsignedInt();
ByteString ColNameByteString = ByteString colNameByteString =
new ByteString( new ByteString(
wrapByteBuf.array(), wrapByteBuf.array(),
PREFIX_LENGTH PREFIX_LENGTH
...@@ -400,23 +388,9 @@ public class LogMessage extends DataMessage.Record { ...@@ -400,23 +388,9 @@ public class LogMessage extends DataMessage.Record {
+ (count + 1) * INT_SIZE + (count + 1) * INT_SIZE
+ (int) colNamesOffset, + (int) colNamesOffset,
nextColNameOffset - currentColNameOffset - 1); nextColNameOffset - currentColNameOffset - 1);
String columnName = ColNameByteString.toString(); columnName = colNameByteString.toString();
currentColNameOffset = nextColNameOffset; currentColNameOffset = nextColNameOffset;
Field prev = null;
Field next = null;
boolean match = false;
// col filter check
if (colFilter != null && colFilter.size() > 0) {
for (String col : colFilter) {
if (col.equalsIgnoreCase(columnName) || "*".equalsIgnoreCase(col)) {
match = true;
} }
}
} else {
match = true;
}
// old col // old col
if (oldColCount != 0) { if (oldColCount != 0) {
wrapByteBuf.readerIndex( wrapByteBuf.readerIndex(
...@@ -440,10 +414,12 @@ public class LogMessage extends DataMessage.Record { ...@@ -440,10 +414,12 @@ public class LogMessage extends DataMessage.Record {
+ (int) oldColsOffset, + (int) oldColsOffset,
nextOldColOffset - currentOldColOffset - 1); nextOldColOffset - currentOldColOffset - 1);
} }
Field field = new Field(columnName, type, realEncoding, value, isPk); if (fieldParseListener.needSchemaInfo()) {
field.setFlag(flag); fieldParseListener.parseNotify(
field.setPrev(true); columnName, type, realEncoding, value, notNull, true);
prev = field; } else {
fieldParseListener.parseNotify(type, value, realEncoding, true);
}
currentOldColOffset = nextOldColOffset; currentOldColOffset = nextOldColOffset;
} }
// new col // new col
...@@ -469,14 +445,13 @@ public class LogMessage extends DataMessage.Record { ...@@ -469,14 +445,13 @@ public class LogMessage extends DataMessage.Record {
+ (int) newColsOffset, + (int) newColsOffset,
nextNewColOffset - currentNewColOffset - 1); nextNewColOffset - currentNewColOffset - 1);
} }
Field field = new Field(columnName, type, realEncoding, value, isPk); if (fieldParseListener.needSchemaInfo()) {
field.setFlag(flag); fieldParseListener.parseNotify(
field.setPrev(false); columnName, type, realEncoding, value, notNull, false);
next = field; } else {
currentNewColOffset = nextNewColOffset; fieldParseListener.parseNotify(type, value, realEncoding, false);
} }
if (match) { currentNewColOffset = nextNewColOffset;
fieldParseListener.parseNotify(prev, next);
} }
} }
} }
...@@ -491,7 +466,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -491,7 +466,7 @@ public class LogMessage extends DataMessage.Record {
|| newColsOffset < 0) { || newColsOffset < 0) {
return fields; return fields;
} }
LogTypeHelper logTypeHelper = LogTypeHelperFactory.getInstance(getDbType());
/* /*
* global encoding * global encoding
* *
...@@ -509,7 +484,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -509,7 +484,7 @@ public class LogMessage extends DataMessage.Record {
// pk info // pk info
List<Integer> pks = null; List<Integer> pks = null;
if ((int) pkKeysOffset > 0) { if ((int) pkKeysOffset > 0) {
pks = BinaryMessageUtils.getArray(byteBuf.array(), (int) pkKeysOffset); pks = (BinaryMessageUtils.getArray(byteBuf.array(), (int) pkKeysOffset));
} }
// get column count // get column count
ByteBuf wrapByteBuf = ByteBuf wrapByteBuf =
...@@ -520,7 +495,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -520,7 +495,7 @@ public class LogMessage extends DataMessage.Record {
// op type array // op type array
wrapByteBuf.readerIndex(PREFIX_LENGTH + (int) colTypesOffset); wrapByteBuf.readerIndex(PREFIX_LENGTH + (int) colTypesOffset);
byte t = wrapByteBuf.readByte(); byte t = wrapByteBuf.readByte();
int elementSize = elementArray[t & DataType.DT_MASK]; int elementSize = ELEMENT_ARRAY[t & DataType.DT_MASK];
// encoding // encoding
int colEncodingsCount = 0; int colEncodingsCount = 0;
int currentEncodingOffset = 0; int currentEncodingOffset = 0;
...@@ -589,6 +564,16 @@ public class LogMessage extends DataMessage.Record { ...@@ -589,6 +564,16 @@ public class LogMessage extends DataMessage.Record {
+ i * elementSize); + i * elementSize);
flag = wrapByteBuf.readUnsignedByte(); flag = wrapByteBuf.readUnsignedByte();
} }
boolean notNull = false;
if (colNotNullOffset > 0) {
wrapByteBuf.readerIndex(
PREFIX_LENGTH
+ (int) colNotNullOffset
+ BYTE_SIZE
+ INT_SIZE
+ i * elementSize);
notNull = wrapByteBuf.readBoolean();
}
// get real encoding // get real encoding
String realEncoding = encodingStr; String realEncoding = encodingStr;
...@@ -632,7 +617,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -632,7 +617,7 @@ public class LogMessage extends DataMessage.Record {
+ INT_SIZE + INT_SIZE
+ (i + 1) * INT_SIZE)); + (i + 1) * INT_SIZE));
int nextColNameOffset = (int) wrapByteBuf.readUnsignedInt(); int nextColNameOffset = (int) wrapByteBuf.readUnsignedInt();
ByteString ColNameByteString = ByteString colNameByteString =
new ByteString( new ByteString(
wrapByteBuf.array(), wrapByteBuf.array(),
PREFIX_LENGTH PREFIX_LENGTH
...@@ -642,7 +627,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -642,7 +627,7 @@ public class LogMessage extends DataMessage.Record {
+ (count + 1) * INT_SIZE + (count + 1) * INT_SIZE
+ (int) colNamesOffset, + (int) colNamesOffset,
nextColNameOffset - currentColNameOffset - 1); nextColNameOffset - currentColNameOffset - 1);
String columnName = ColNameByteString.toString(); String columnName = colNameByteString.toString();
currentColNameOffset = nextColNameOffset; currentColNameOffset = nextColNameOffset;
// old col // old col
if (oldColCount != 0) { if (oldColCount != 0) {
...@@ -669,6 +654,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -669,6 +654,7 @@ public class LogMessage extends DataMessage.Record {
} }
Field field = new Field(columnName, type, realEncoding, value, isPk); Field field = new Field(columnName, type, realEncoding, value, isPk);
field.setFlag(flag); field.setFlag(flag);
field.setNotNull(notNull);
fields.add(field); fields.add(field);
field.setPrev(true); field.setPrev(true);
currentOldColOffset = nextOldColOffset; currentOldColOffset = nextOldColOffset;
...@@ -698,6 +684,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -698,6 +684,7 @@ public class LogMessage extends DataMessage.Record {
} }
Field field = new Field(columnName, type, realEncoding, value, isPk); Field field = new Field(columnName, type, realEncoding, value, isPk);
field.setFlag(flag); field.setFlag(flag);
field.setNotNull(notNull);
fields.add(field); fields.add(field);
field.setPrev(false); field.setPrev(false);
currentNewColOffset = nextNewColOffset; currentNewColOffset = nextNewColOffset;
...@@ -800,9 +787,8 @@ public class LogMessage extends DataMessage.Record { ...@@ -800,9 +787,8 @@ public class LogMessage extends DataMessage.Record {
colNamesOffset = byteBuf.readInt(); colNamesOffset = byteBuf.readInt();
colTypesOffset = byteBuf.readInt(); colTypesOffset = byteBuf.readInt();
// process old version
if (!old) { if (!old) {
m_pkValOffset = byteBuf.readInt(); pkValOffset = byteBuf.readInt();
fileNameOffset = byteBuf.readLong(); fileNameOffset = byteBuf.readLong();
fileOffset = byteBuf.readLong(); fileOffset = byteBuf.readLong();
if (fileNameOffset < -1 || fileOffset < -1) { if (fileNameOffset < -1 || fileOffset < -1) {
...@@ -816,12 +802,11 @@ public class LogMessage extends DataMessage.Record { ...@@ -816,12 +802,11 @@ public class LogMessage extends DataMessage.Record {
oldColsOffset = byteBuf.readInt(); oldColsOffset = byteBuf.readInt();
newColsOffset = byteBuf.readInt(); newColsOffset = byteBuf.readInt();
} else { } else {
// process new version
fileNameOffset = byteBuf.readInt(); fileNameOffset = byteBuf.readInt();
fileOffset = byteBuf.readInt(); fileOffset = byteBuf.readInt();
oldColsOffset = byteBuf.readInt(); oldColsOffset = byteBuf.readInt();
newColsOffset = byteBuf.readInt(); newColsOffset = byteBuf.readInt();
m_pkValOffset = byteBuf.readInt(); pkValOffset = byteBuf.readInt();
} }
pkKeysOffset = byteBuf.readInt(); pkKeysOffset = byteBuf.readInt();
...@@ -836,16 +821,20 @@ public class LogMessage extends DataMessage.Record { ...@@ -836,16 +821,20 @@ public class LogMessage extends DataMessage.Record {
tailOffset = byteBuf.readInt(); tailOffset = byteBuf.readInt();
long version = id >> 56; long version = id >> 56;
if (version == 1 || version == 2) { if (version >= 1) {
metaVersion = byteBuf.readInt(); metaVersion = byteBuf.readInt();
colFlagOffset = byteBuf.readInt(); colFlagOffset = byteBuf.readInt();
} }
if (version >= 2) {
colNotNullOffset = byteBuf.readInt();
}
} }
// timestamp,process heartbeat between tx // timestamp,process heartbeat between tx
Type type = Type.valueOf(op); Type type = Type.valueOf(op);
String ts = Long.toString(timestamp); String ts = Long.toString(timestamp);
if (getDbType() == DBType.OCEANBASE1) { DbTypeEnum dbTypeEnum = getDbType();
if (dbTypeEnum == DbTypeEnum.OB_MYSQL || dbTypeEnum == DbTypeEnum.OB_ORACLE) {
globalSafeTimestamp.set(String.valueOf(fileNameOffset)); globalSafeTimestamp.set(String.valueOf(fileNameOffset));
} else { } else {
if (type == Type.BEGIN) { if (type == Type.BEGIN) {
...@@ -860,7 +849,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -860,7 +849,7 @@ public class LogMessage extends DataMessage.Record {
txEnd.set(true); txEnd.set(true);
} }
} }
safeTimestamp = new String(globalSafeTimestamp.get()); safeTimestamp = globalSafeTimestamp.get();
if (isCheckCRC) { if (isCheckCRC) {
checkCRC(); checkCRC();
} }
...@@ -1006,7 +995,7 @@ public class LogMessage extends DataMessage.Record { ...@@ -1006,7 +995,7 @@ public class LogMessage extends DataMessage.Record {
// get key str // get key str
keysValue = new HashSet<String>(); keysValue = new HashSet<String>();
List<ByteString> keys = List<ByteString> keys =
BinaryMessageUtils.getByteStringList(byteBuf.array(), (int) m_pkValOffset); BinaryMessageUtils.getByteStringList(byteBuf.array(), (int) pkValOffset);
if (keys == null || keys.size() == 0) { if (keys == null || keys.size() == 0) {
return null; return null;
} }
...@@ -1025,9 +1014,8 @@ public class LogMessage extends DataMessage.Record { ...@@ -1025,9 +1014,8 @@ public class LogMessage extends DataMessage.Record {
case UPDATE: case UPDATE:
case INDEX_UPDATE: case INDEX_UPDATE:
switch (getDbType()) { switch (getDbType()) {
case ORACLE: case OB_MYSQL:
case MYSQL: case OB_ORACLE:
case OCEANBASE1:
prev.addAll(getKeys((int) oldColsOffset, keys)); prev.addAll(getKeys((int) oldColsOffset, keys));
next.addAll(getKeys((int) newColsOffset, keys)); next.addAll(getKeys((int) newColsOffset, keys));
if (!prev.equals(next)) { if (!prev.equals(next)) {
...@@ -1122,9 +1110,9 @@ public class LogMessage extends DataMessage.Record { ...@@ -1122,9 +1110,9 @@ public class LogMessage extends DataMessage.Record {
public List<int[]> getPrimaryAndUniqueConstraintColumnIndexTuples() { public List<int[]> getPrimaryAndUniqueConstraintColumnIndexTuples() {
List<int[]> tuples = new ArrayList<int[]>(); List<int[]> tuples = new ArrayList<int[]>();
try { try {
if ((int) m_pkValOffset > 0) { if ((int) pkValOffset > 0) {
List<ByteString> rawConstraintByteString = List<ByteString> rawConstraintByteString =
BinaryMessageUtils.getByteStringList(byteBuf.array(), m_pkValOffset); BinaryMessageUtils.getByteStringList(byteBuf.array(), pkValOffset);
if (rawConstraintByteString != null && !rawConstraintByteString.isEmpty()) { if (rawConstraintByteString != null && !rawConstraintByteString.isEmpty()) {
/** /**
* The raw format is "(0,1),(2,3)" or "(", the last one is for empty primary or * The raw format is "(0,1),(2,3)" or "(", the last one is for empty primary or
...@@ -1180,10 +1168,10 @@ public class LogMessage extends DataMessage.Record { ...@@ -1180,10 +1168,10 @@ public class LogMessage extends DataMessage.Record {
@Override @Override
public List<ByteString> getFirstPKValue() { public List<ByteString> getFirstPKValue() {
try { try {
if ((int) m_pkValOffset < 0) { if ((int) pkValOffset < 0) {
return null; return null;
} else { } else {
return BinaryMessageUtils.getByteStringList(byteBuf.array(), m_pkValOffset); return BinaryMessageUtils.getByteStringList(byteBuf.array(), pkValOffset);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -13,62 +13,62 @@ package com.oceanbase.oms.logmessage.typehelper; ...@@ -13,62 +13,62 @@ package com.oceanbase.oms.logmessage.typehelper;
// compatible with mysql type code // compatible with mysql type code
// same code may reference different schema type // same code may reference different schema type
public class LogMessageTypeCode { public class LogMessageTypeCode {
public static final int DRC_MSG_TYPE_DECIMAL = 0; public static final int LOG_MSG_TYPE_DECIMAL = 0;
public static final int DRC_MSG_TYPE_TINY = 1; public static final int LOG_MSG_TYPE_TINY = 1;
public static final int DRC_MSG_TYPE_SHORT = 2; public static final int LOG_MSG_TYPE_SHORT = 2;
public static final int DRC_MSG_TYPE_LONG = 3; public static final int LOG_MSG_TYPE_LONG = 3;
public static final int DRC_MSG_TYPE_FLOAT = 4; public static final int LOG_MSG_TYPE_FLOAT = 4;
public static final int DRC_MSG_TYPE_DOUBLE = 5; public static final int LOG_MSG_TYPE_DOUBLE = 5;
public static final int DRC_MSG_TYPE_NULL = 6; public static final int LOG_MSG_TYPE_NULL = 6;
public static final int DRC_MSG_TYPE_TIMESTAMP = 7; public static final int LOG_MSG_TYPE_TIMESTAMP = 7;
public static final int DRC_MSG_TYPE_LONGLONG = 8; public static final int LOG_MSG_TYPE_LONGLONG = 8;
public static final int DRC_MSG_TYPE_INT24 = 9; public static final int LOG_MSG_TYPE_INT24 = 9;
public static final int DRC_MSG_TYPE_DATE = 10; public static final int LOG_MSG_TYPE_DATE = 10;
public static final int DRC_MSG_TYPE_TIME = 11; public static final int LOG_MSG_TYPE_TIME = 11;
public static final int DRC_MSG_TYPE_DATETIME = 12; public static final int LOG_MSG_TYPE_DATETIME = 12;
public static final int DRC_MSG_TYPE_YEAR = 13; public static final int LOG_MSG_TYPE_YEAR = 13;
public static final int DRC_MSG_TYPE_NEWDATE = 14; public static final int LOG_MSG_TYPE_NEWDATE = 14;
public static final int DRC_MSG_TYPE_VARCHAR = 15; public static final int LOG_MSG_TYPE_VARCHAR = 15;
public static final int DRC_MSG_TYPE_BIT = 16; public static final int LOG_MSG_TYPE_BIT = 16;
public static final int DRC_MSG_TYPE_TIMESTAMP2 = 17; public static final int LOG_MSG_TYPE_TIMESTAMP2 = 17;
public static final int DRC_MSG_TYPE_DATETIME2 = 18; public static final int LOG_MSG_TYPE_DATETIME2 = 18;
public static final int DRC_MSG_TYPE_TIME2 = 19; public static final int LOG_MSG_TYPE_TIME2 = 19;
// appeared in ob define, but should not appeared in drc types // appeared in ob define, but should not appeared in drc types
public static final int DRC_MSG_COMPLEX = 160; public static final int LOG_MSG_COMPLEX = 160;
public static final int DRC_MSG_TYPE_ARRAY = 161; public static final int LOG_MSG_TYPE_ARRAY = 161;
public static final int DRC_MSG_TYPE_STRUCT = 162; public static final int LOG_MSG_TYPE_STRUCT = 162;
public static final int DRC_MSG_TYPE_CURSOR = 163; public static final int LOG_MSG_TYPE_CURSOR = 163;
public static final int DRC_MSG_TYPE_ORA_BLOB = 210; public static final int LOG_MSG_TYPE_ORA_BLOB = 210;
public static final int DRC_MSG_TYPE_CLOB = 211; public static final int LOG_MSG_TYPE_CLOB = 211;
public static final int DRC_MSG_TYPE_TEXT = 197; public static final int LOG_MSG_TYPE_TEXT = 197;
public static final int DRC_MSG_TYPE_VAR_BINARY = 198; public static final int LOG_MSG_TYPE_VAR_BINARY = 198;
public static final int DRC_MSG_TYPE_BINARY = 199; public static final int LOG_MSG_TYPE_BINARY = 199;
public static final int DRC_MSG_TYPE_TIMESTAMP_WITH_TIME_ZONE = 200; public static final int LOG_MSG_TYPE_TIMESTAMP_WITH_TIME_ZONE = 200;
public static final int DRC_MSG_TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = 201; public static final int LOG_MSG_TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = 201;
public static final int DRC_MSG_TYPE_TIMESTAMP_NANO = 202; public static final int LOG_MSG_TYPE_TIMESTAMP_NANO = 202;
public static final int DRC_MSG_TYPE_RAW = 203; public static final int LOG_MSG_TYPE_RAW = 203;
public static final int DRC_MSG_TYPE_INTERVAL_YEAR_TO_MONTH = 204; public static final int LOG_MSG_TYPE_INTERVAL_YEAR_TO_MONTH = 204;
public static final int DRC_MSG_TYPE_INTERVAL_DAY_TO_SECOND = 205; public static final int LOG_MSG_TYPE_INTERVAL_DAY_TO_SECOND = 205;
public static final int DRC_MSG_TYPE_NUMBER_FLOAT = 206; public static final int LOG_MSG_TYPE_NUMBER_FLOAT = 206;
public static final int DRC_MSG_TYPE_NVARCHAR2 = 207; public static final int LOG_MSG_TYPE_NVARCHAR2 = 207;
public static final int DRC_MSG_TYPE_NCHAR = 208; public static final int LOG_MSG_TYPE_NCHAR = 208;
public static final int DRC_MSG_TYPE_ROW_ID = 209; public static final int LOG_MSG_TYPE_ROW_ID = 209;
public static final int DRC_MSG_TYPE_JSON = 245; public static final int LOG_MSG_TYPE_JSON = 245;
public static final int DRC_MSG_TYPE_NEWDECIMAL = 246; public static final int LOG_MSG_TYPE_NEWDECIMAL = 246;
public static final int DRC_MSG_TYPE_ENUM = 247; public static final int LOG_MSG_TYPE_ENUM = 247;
public static final int DRC_MSG_TYPE_SET = 248; public static final int LOG_MSG_TYPE_SET = 248;
public static final int DRC_MSG_TYPE_TINY_BLOB = 249; public static final int LOG_MSG_TYPE_TINY_BLOB = 249;
public static final int DRC_MSG_TYPE_MEDIUM_BLOB = 250; public static final int LOG_MSG_TYPE_MEDIUM_BLOB = 250;
public static final int DRC_MSG_TYPE_LONG_BLOB = 251; public static final int LOG_MSG_TYPE_LONG_BLOB = 251;
public static final int DRC_MSG_TYPE_BLOB = 252; public static final int LOG_MSG_TYPE_BLOB = 252;
public static final int DRC_MSG_TYPE_VAR_STRING = 253; public static final int LOG_MSG_TYPE_VAR_STRING = 253;
public static final int DRC_MSG_TYPE_STRING = 254; public static final int LOG_MSG_TYPE_STRING = 254;
public static final int DRC_MSG_TYPE_GEOMETRY = 255; public static final int LOG_MSG_TYPE_GEOMETRY = 255;
public static final int DRC_MSG_TYPE_ORA_BINARY_FLOAT = 256; public static final int LOG_MSG_TYPE_ORA_BINARY_FLOAT = 256;
public static final int DRC_MSG_TYPE_ORA_BINARY_DOUBLE = 257; public static final int LOG_MSG_TYPE_ORA_BINARY_DOUBLE = 257;
public static final int DRC_MSG_TYPE_UNKNOWN = DRC_MSG_TYPE_ORA_BINARY_DOUBLE + 1; public static final int LOG_MSG_TYPE_UNKNOWN = LOG_MSG_TYPE_ORA_BINARY_DOUBLE + 1;
} }
...@@ -11,8 +11,8 @@ See the Mulan PSL v2 for more details. */ ...@@ -11,8 +11,8 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper; package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage; import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.enums.DBType;
public abstract class LogTypeHelper { public abstract class LogTypeHelper {
...@@ -20,13 +20,13 @@ public abstract class LogTypeHelper { ...@@ -20,13 +20,13 @@ public abstract class LogTypeHelper {
public static final String EMPTY_ENCODING_STR = ""; public static final String EMPTY_ENCODING_STR = "";
protected final DBType dbType; protected final DbTypeEnum dbType;
public LogTypeHelper(DBType dbType) { public LogTypeHelper(DbTypeEnum dbType) {
this.dbType = dbType; this.dbType = dbType;
} }
public DBType getDbType() { public DbTypeEnum getDbType() {
return dbType; return dbType;
} }
......
...@@ -11,17 +11,18 @@ See the Mulan PSL v2 for more details. */ ...@@ -11,17 +11,18 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper; package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.logmessage.enums.DBType; import com.oceanbase.oms.common.enums.DbTypeEnum;
public abstract class LogTypeHelperFactory { public abstract class LogTypeHelperFactory {
public static LogTypeHelper getInstance(DBType dbType) { public static LogTypeHelper getInstance(DbTypeEnum dbType) {
switch (dbType) { switch (dbType) {
case OCEANBASE: case OB_MYSQL:
case OCEANBASE1: case OB_ORACLE:
case OB_05:
return OBLogTypeHelper.OB_LOG_TYPE_HELPER; return OBLogTypeHelper.OB_LOG_TYPE_HELPER;
default: default:
throw new RuntimeException("unsupported dbtype"); throw new IllegalArgumentException("Unsupported dbType " + dbType);
} }
} }
} }
...@@ -11,8 +11,8 @@ See the Mulan PSL v2 for more details. */ ...@@ -11,8 +11,8 @@ See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.typehelper; package com.oceanbase.oms.logmessage.typehelper;
import com.oceanbase.oms.common.enums.DbTypeEnum;
import com.oceanbase.oms.logmessage.DataMessage; import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.enums.DBType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
public class OBLogTypeHelper extends LogTypeHelper { public class OBLogTypeHelper extends LogTypeHelper {
...@@ -21,14 +21,14 @@ public class OBLogTypeHelper extends LogTypeHelper { ...@@ -21,14 +21,14 @@ public class OBLogTypeHelper extends LogTypeHelper {
private static final String DEFAULT_ENCODING = ""; private static final String DEFAULT_ENCODING = "";
public OBLogTypeHelper() { public OBLogTypeHelper() {
super(DBType.OCEANBASE1); super(DbTypeEnum.OB_MYSQL);
} }
@Override @Override
public String correctEncoding(int typeCode, String realEncoding) { public String correctEncoding(int typeCode, String realEncoding) {
switch (typeCode) { switch (typeCode) {
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING:
case LogMessageTypeCode.DRC_MSG_TYPE_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_STRING:
return realEncoding; return realEncoding;
default: default:
if (StringUtils.equals(realEncoding, "binary")) { if (StringUtils.equals(realEncoding, "binary")) {
...@@ -42,23 +42,23 @@ public class OBLogTypeHelper extends LogTypeHelper { ...@@ -42,23 +42,23 @@ public class OBLogTypeHelper extends LogTypeHelper {
@Override @Override
public int correctCode(int typeCode, String encoding) { public int correctCode(int typeCode, String encoding) {
switch (typeCode) { switch (typeCode) {
case LogMessageTypeCode.DRC_MSG_TYPE_TINY_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_MEDIUM_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_MEDIUM_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_LONG_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_LONG_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
if (!StringUtils.isEmpty(encoding) && !StringUtils.equals(encoding, "binary")) { if (!StringUtils.isEmpty(encoding) && !StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_CLOB; return LogMessageTypeCode.LOG_MSG_TYPE_CLOB;
} }
break; break;
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING:
if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) { if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_VAR_BINARY; return LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY;
} else { } else {
return LogMessageTypeCode.DRC_MSG_TYPE_VARCHAR; return LogMessageTypeCode.LOG_MSG_TYPE_VARCHAR;
} }
case LogMessageTypeCode.DRC_MSG_TYPE_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_STRING:
if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) { if (StringUtils.isEmpty(encoding) || StringUtils.equals(encoding, "binary")) {
return LogMessageTypeCode.DRC_MSG_TYPE_BINARY; return LogMessageTypeCode.LOG_MSG_TYPE_BINARY;
} }
break; break;
} }
...@@ -68,24 +68,24 @@ public class OBLogTypeHelper extends LogTypeHelper { ...@@ -68,24 +68,24 @@ public class OBLogTypeHelper extends LogTypeHelper {
@Override @Override
public void correctField(DataMessage.Record.Field f, String realEncoding) { public void correctField(DataMessage.Record.Field f, String realEncoding) {
switch (f.type) { switch (f.type) {
case LogMessageTypeCode.DRC_MSG_TYPE_TINY_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_TINY_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_MEDIUM_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_MEDIUM_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_LONG_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_LONG_BLOB:
case LogMessageTypeCode.DRC_MSG_TYPE_BLOB: case LogMessageTypeCode.LOG_MSG_TYPE_BLOB:
if (!StringUtils.isEmpty(f.encoding) && !StringUtils.equals(f.encoding, "binary")) { if (!StringUtils.isEmpty(f.encoding) && !StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_CLOB; f.type = LogMessageTypeCode.LOG_MSG_TYPE_CLOB;
} }
break; break;
case LogMessageTypeCode.DRC_MSG_TYPE_VAR_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_VAR_STRING:
if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) { if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_VAR_BINARY; f.type = LogMessageTypeCode.LOG_MSG_TYPE_VAR_BINARY;
} else { } else {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_VARCHAR; f.type = LogMessageTypeCode.LOG_MSG_TYPE_VARCHAR;
} }
break; break;
case LogMessageTypeCode.DRC_MSG_TYPE_STRING: case LogMessageTypeCode.LOG_MSG_TYPE_STRING:
if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) { if (StringUtils.isEmpty(f.encoding) || StringUtils.equals(f.encoding, "binary")) {
f.type = LogMessageTypeCode.DRC_MSG_TYPE_BINARY; f.type = LogMessageTypeCode.LOG_MSG_TYPE_BINARY;
} }
break; break;
default: default:
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
oblogclient is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
package com.oceanbase.oms.logmessage.utils;
import java.util.ArrayList;
import java.util.List;
/** Utils class for string. */
public class StringUtils {
/**
* Split a string by one separator character. The performance is better than Java String split.
*
* @param str The string need be split.
* @param separatorChar The single separator character.
* @return The array of split items.
*/
public static String[] split(String str, char separatorChar) {
if (str == null) {
return null;
}
int length = str.length();
if (length == 0) {
return null;
}
List<String> list = new ArrayList<String>();
int i = 0;
int start = 0;
boolean match = false;
while (i < length) {
if (str.charAt(i) == separatorChar) {
if (match) {
list.add(str.substring(start, i));
match = false;
}
start = ++i;
continue;
}
match = true;
i++;
}
if (match) {
list.add(str.substring(start, i));
}
return list.toArray(new String[list.size()]);
}
}
...@@ -114,7 +114,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -114,7 +114,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
} }
@Override @Override
public Map<String, String> generateConfigurationMap(boolean encrypt_password) { public Map<String, String> generateConfigurationMap(boolean encryptPassword) {
Map<String, String> result = new HashMap<>(); Map<String, String> result = new HashMap<>();
for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) { for (Map.Entry<String, ConfigItem<Object>> entry : configs.entrySet()) {
String value = entry.getValue().val.toString(); String value = entry.getValue().val.toString();
...@@ -123,7 +123,7 @@ public class ObReaderConfig extends AbstractConnectionConfig { ...@@ -123,7 +123,7 @@ public class ObReaderConfig extends AbstractConnectionConfig {
if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) { if (clusterUrl.key.equals(entry.getKey()) && StringUtils.isEmpty(value)) {
continue; continue;
} }
if (encrypt_password if (encryptPassword
&& clusterPassword.key.equals(entry.getKey()) && clusterPassword.key.equals(entry.getKey())
&& SharedConf.AUTH_PASSWORD_HASH) { && SharedConf.AUTH_PASSWORD_HASH) {
value = Hex.str(CryptoUtil.sha1(value)); value = Hex.str(CryptoUtil.sha1(value));
......
...@@ -259,7 +259,7 @@ See the Mulan PSL v2 for more details. ...@@ -259,7 +259,7 @@ See the Mulan PSL v2 for more details.
<id>spotless-check</id> <id>spotless-check</id>
<phase>validate</phase> <phase>validate</phase>
<goals> <goals>
<goal>check</goal> <goal>apply</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册