未验证 提交 cb2c4942 编写于 作者: 杨翊 SionYang 提交者: GitHub

Refactor RowsEventPacket to database/protocol module (#4797)

* Add MySQLBinlogProtocolValueFactory

* Use MySQLNullBitmap replace MySQLBinlogBitmap

* Refactor RowsEventPacket to database/protocol module
上级 f58f74dd
......@@ -36,11 +36,12 @@ import java.util.List;
*
* <p>
* https://dev.mysql.com/doc/internals/en/rows-event.html
*
* Refactor by extends {@link org.apache.shardingsphere.database.protocol.mysql.packet.command.MySQLCommandPacket}.
* </p>
*
* @deprecated Replaced with {@link org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.MySQLBinlogRowsEventPacket}
*/
@Getter
@Deprecated
public final class RowsEventPacket {
private final BinlogEventHeader binlogEventHeader;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row;
import java.util.BitSet;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
/**
* MySQL binlog bitmap.
*
* @see <a href="https://dev.mysql.com/doc/internals/en/bitmaps.html">Bitmaps</a>
*/
public final class MySQLBinlogBitmap {
private final BitSet bitSet;
public MySQLBinlogBitmap(final int length, final MySQLPacketPayload payload) {
this.bitSet = new BitSet(length);
fillBitmap(length, payload);
}
private void fillBitmap(final int length, final MySQLPacketPayload payload) {
for (int bit = 0; bit < length; bit += 8) {
int flag = payload.readInt1();
if (0 != flag) {
for (int i = 0; i < 8; i++) {
if (0 != (flag & (0x01 << i))) {
bitSet.set(bit + i);
}
}
}
}
}
/**
* Whether contain bit of index in bitmap.
*
* @param index index of bit
* @return {@code true} if contain bit, otherwise {@code false}
*/
public boolean containBit(final int index) {
return bitSet.get(index);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLBinlogEventType;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.AbstractMySQLBinlogEventPacket;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.MySQLBinlogProtocolValueFactory;
import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.binary.execute.MySQLNullBitmap;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import lombok.Getter;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
/**
* MySQL binlog rows event packet.
*
* @see <a href="https://dev.mysql.com/doc/internals/en/rows-event.html">ROWS_EVENT</a>
*/
@Getter
public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPacket {
private final long tableId;
private final int flags;
private final int columnNumber;
private final MySQLNullBitmap columnsPresentBitmap;
private final MySQLNullBitmap columnsPresentBitmap2;
private final List<Serializable[]> rows = new LinkedList<>();
private final List<Serializable[]> rows2 = new LinkedList<>();
public MySQLBinlogRowsEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
super(binlogEventHeader);
tableId = payload.readInt6();
flags = payload.readInt2();
skipExtraData(payload);
columnNumber = (int) payload.readIntLenenc();
columnsPresentBitmap = new MySQLNullBitmap(columnNumber, payload);
columnsPresentBitmap2 = readUpdateColumnsPresentBitmap(payload);
}
private void skipExtraData(final MySQLPacketPayload payload) {
if (isRowsEventVersion2(getBinlogEventHeader().getEventType())) {
int extraDataLength = payload.readInt2();
payload.skipReserved(extraDataLength);
}
}
private boolean isRowsEventVersion2(final int eventType) {
return MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue() == eventType
|| MySQLBinlogEventType.DELETE_ROWS_EVENTv2.getValue() == eventType;
}
private MySQLNullBitmap readUpdateColumnsPresentBitmap(final MySQLPacketPayload payload) {
return isUpdateRowsEvent(getBinlogEventHeader().getEventType()) ? new MySQLNullBitmap(columnNumber, payload) : null;
}
private boolean isUpdateRowsEvent(final int eventType) {
return MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENTv1.getValue() == eventType;
}
/**
* Read rows in binlog.
*
* @param tableMapEventPacket TABLE_MAP_EVENT packet before this ROWS_EVENT
* @param payload ROWS_EVENT packet payload
*/
public void readRows(final MySQLBinlogTableMapEventPacket tableMapEventPacket, final MySQLPacketPayload payload) {
List<MySQLBinlogColumnDef> columnDefs = tableMapEventPacket.getColumnDefs();
while (hasNextRow(payload)) {
rows.add(readRow(columnDefs, payload));
if (isUpdateRowsEvent(getBinlogEventHeader().getEventType())) {
rows2.add(readRow(columnDefs, payload));
}
}
}
private boolean hasNextRow(final MySQLPacketPayload payload) {
return payload.getByteBuf().isReadable();
}
private Serializable[] readRow(final List<MySQLBinlogColumnDef> columnDefs, final MySQLPacketPayload payload) {
MySQLNullBitmap nullBitmap = new MySQLNullBitmap(columnNumber, payload);
Serializable[] result = new Serializable[columnNumber];
for (int i = 0; i < columnNumber; i++) {
MySQLBinlogColumnDef columnDef = columnDefs.get(i);
result[i] = nullBitmap.isNullParameter(i) ? null : MySQLBinlogProtocolValueFactory.getBinlogProtocolValue(columnDef.getColumnType()).read(columnDef, payload);
}
return result;
}
@Override
protected void writeEvent(final MySQLPacketPayload payload) {
// TODO
}
}
......@@ -21,12 +21,12 @@ import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLColumnTyp
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.AbstractMySQLBinlogEventPacket;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.binary.execute.MySQLNullBitmap;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import lombok.Getter;
import java.util.Collection;
import java.util.LinkedList;
import lombok.Getter;
import java.util.List;
/**
* MySQL binlog table map event packet.
......@@ -46,11 +46,11 @@ public final class MySQLBinlogTableMapEventPacket extends AbstractMySQLBinlogEve
private final int columnCount;
private final Collection<MySQLBinlogColumnDef> columnDefs;
private final List<MySQLBinlogColumnDef> columnDefs;
private final MySQLBinlogBitmap nullBitMap;
private final MySQLNullBitmap nullBitMap;
protected MySQLBinlogTableMapEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
public MySQLBinlogTableMapEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
super(binlogEventHeader);
this.tableId = payload.readInt6();
this.flags = payload.readInt2();
......@@ -62,7 +62,7 @@ public final class MySQLBinlogTableMapEventPacket extends AbstractMySQLBinlogEve
columnDefs = new LinkedList<>();
readColumnDefs(payload);
readColumnMetaDefs(payload);
nullBitMap = new MySQLBinlogBitmap(columnCount, payload);
nullBitMap = new MySQLNullBitmap(columnCount, payload);
}
private void readColumnDefs(final MySQLPacketPayload payload) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value;
import com.google.common.base.Preconditions;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.blob.MySQLBlobBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.decimal.MySQLDecimalBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.decimal.MySQLDoubleBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.decimal.MySQLFloatBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLInt24BinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLLongBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLLongLongBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLShortBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLTinyBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.string.MySQLStringBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.string.MySQLVarcharBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLDateBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLDatetime2BinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLDatetimeBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLTime2BinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLTimeBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLTimestamp2BinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLTimestampBinlogProtocolValue;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.time.MySQLYearBinlogProtocolValue;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* Binlog protocol value factory of MySQL.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MySQLBinlogProtocolValueFactory {
private static final Map<MySQLColumnType, MySQLBinlogProtocolValue> BINLOG_PROTOCOL_VALUES = new HashMap<>();
static {
registerIntegerTypeValue();
registerDecimalTypeValue();
registerTimeTypeValue();
registerStringTypeValue();
registerBlobTypeValue();
}
private static void registerIntegerTypeValue() {
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_TINY, new MySQLTinyBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_SHORT, new MySQLShortBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_INT24, new MySQLInt24BinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_LONG, new MySQLLongBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_LONGLONG, new MySQLLongLongBinlogProtocolValue());
}
private static void registerDecimalTypeValue() {
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_NEWDECIMAL, new MySQLDecimalBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_DOUBLE, new MySQLDoubleBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_FLOAT, new MySQLFloatBinlogProtocolValue());
}
private static void registerTimeTypeValue() {
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_YEAR, new MySQLYearBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_DATE, new MySQLDateBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_TIME, new MySQLTimeBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_TIME2, new MySQLTime2BinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_TIMESTAMP, new MySQLTimestampBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_TIMESTAMP2, new MySQLTimestamp2BinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_DATETIME, new MySQLDatetimeBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_DATETIME2, new MySQLDatetime2BinlogProtocolValue());
}
private static void registerStringTypeValue() {
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_STRING, new MySQLStringBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_VARCHAR, new MySQLVarcharBinlogProtocolValue());
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_VAR_STRING, new MySQLVarcharBinlogProtocolValue());
}
private static void registerBlobTypeValue() {
BINLOG_PROTOCOL_VALUES.put(MySQLColumnType.MYSQL_TYPE_BLOB, new MySQLBlobBinlogProtocolValue());
}
/**
* Get binlog protocol value.
*
* @param columnType column type
* @return binlog protocol value
*/
public static MySQLBinlogProtocolValue getBinlogProtocolValue(final MySQLColumnType columnType) {
Preconditions.checkArgument(BINLOG_PROTOCOL_VALUES.containsKey(columnType), "Cannot find MySQL type '%s' in column type when process binlog protocol value", columnType);
return BINLOG_PROTOCOL_VALUES.get(columnType);
}
}
......@@ -17,6 +17,8 @@
package org.apache.shardingsphere.database.protocol.mysql.packet.command.query.binary.execute;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import lombok.Getter;
/**
......@@ -36,6 +38,18 @@ public final class MySQLNullBitmap {
nullBitmap = new int[calculateLength(columnsNumbers, offset)];
}
public MySQLNullBitmap(final int columnNumbers, final MySQLPacketPayload payload) {
this.offset = 0;
nullBitmap = new int[calculateLength(columnNumbers, 0)];
fillBitmap(payload);
}
private void fillBitmap(final MySQLPacketPayload payload) {
for (int i = 0; i < nullBitmap.length; i++) {
nullBitmap[i] = payload.readInt1();
}
}
private int calculateLength(final int columnsNumbers, final int offset) {
return (columnsNumbers + offset + 7) / 8;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row;
import io.netty.buffer.ByteBuf;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLBinlogEventType;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLBinlogRowsEventPacketTest {
@Mock
private MySQLPacketPayload payload;
@Mock
private ByteBuf byteBuf;
@Mock
private MySQLBinlogEventHeader binlogEventHeader;
@Mock
private MySQLBinlogTableMapEventPacket tableMapEventPacket;
private List<MySQLBinlogColumnDef> columnDefs;
@Before
public void setUp() {
mockColumnDefs();
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
when(payload.readInt6()).thenReturn(1L);
when(payload.readInt2()).thenReturn(2);
when(payload.readIntLenenc()).thenReturn(1L);
when(payload.getByteBuf()).thenReturn(byteBuf);
when(byteBuf.isReadable()).thenReturn(true, false);
}
private void mockColumnDefs() {
columnDefs = new ArrayList<>();
columnDefs.add(new MySQLBinlogColumnDef(MySQLColumnType.MYSQL_TYPE_LONGLONG));
}
@Test
public void assertReadWriteRowV1WithoutNullValue() {
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV1BeforeRows(actual);
assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
assertNull(actual.getColumnsPresentBitmap2());
assertThat(actual.getRows().size(), is(1));
assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
assertTrue(actual.getRows2().isEmpty());
}
@Test
public void assertReadWriteRowV1WithNullValue() {
when(payload.readInt1()).thenReturn(0x01);
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV1BeforeRows(actual);
assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
assertNull(actual.getColumnsPresentBitmap2());
assertThat(actual.getRows().size(), is(1));
assertNull(actual.getRows().get(0)[0]);
assertTrue(actual.getRows2().isEmpty());
}
@Test
public void assertReadUpdateRowV1WithoutNullValue() {
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv1.getValue());
when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV1BeforeRows(actual);
assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
assertThat(actual.getRows().size(), is(1));
assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
assertThat(actual.getRows2().size(), is(1));
assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
}
private void assertBinlogRowsEventV1BeforeRows(final MySQLBinlogRowsEventPacket actual) {
assertThat(actual.getTableId(), is(1L));
assertThat(actual.getFlags(), is(2));
verify(payload, never()).skipReserved(2);
assertThat(actual.getColumnNumber(), is(1));
}
@Test
public void assertReadWriteRowV2WithoutNullValue() {
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV2BeforeRows(actual);
assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
assertNull(actual.getColumnsPresentBitmap2());
assertThat(actual.getRows().size(), is(1));
assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
assertTrue(actual.getRows2().isEmpty());
}
@Test
public void assertReadWriteRowV2WithNullValue() {
when(payload.readInt1()).thenReturn(0x01);
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV2BeforeRows(actual);
assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
assertNull(actual.getColumnsPresentBitmap2());
assertThat(actual.getRows().size(), is(1));
assertNull(actual.getRows().get(0)[0]);
assertTrue(actual.getRows2().isEmpty());
}
@Test
public void assertReadUpdateRowV2WithoutNullValue() {
when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue());
when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV2BeforeRows(actual);
assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
assertThat(actual.getRows().size(), is(1));
assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
assertThat(actual.getRows2().size(), is(1));
assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
}
private void assertBinlogRowsEventV2BeforeRows(final MySQLBinlogRowsEventPacket actual) {
assertThat(actual.getTableId(), is(1L));
assertThat(actual.getFlags(), is(2));
verify(payload).skipReserved(2);
assertThat(actual.getColumnNumber(), is(1));
}
}
......@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.binary.execute.MySQLNullBitmap;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -85,10 +86,10 @@ public final class MySQLBinlogTableMapEventPacketTest {
assertThat(actual.getColumnMeta(), is(columnMeta));
}
private void assertNullBitmap(final MySQLBinlogBitmap actualNullBitMap) {
assertFalse(actualNullBitMap.containBit(0));
assertTrue(actualNullBitMap.containBit(1));
assertTrue(actualNullBitMap.containBit(2));
assertTrue(actualNullBitMap.containBit(3));
private void assertNullBitmap(final MySQLNullBitmap actualNullBitMap) {
assertFalse(actualNullBitMap.isNullParameter(0));
assertTrue(actualNullBitMap.isNullParameter(1));
assertTrue(actualNullBitMap.isNullParameter(2));
assertTrue(actualNullBitMap.isNullParameter(3));
}
}
......@@ -15,33 +15,24 @@
* limitations under the License.
*/
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row;
package org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.database.protocol.mysql.packet.binlog.row.column.value.integer.MySQLTinyBinlogProtocolValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLBinlogBitmapTest {
@Mock
private MySQLPacketPayload payload;
public final class MySQLBinlogProtocolValueFactoryTest {
@Test
public void assertContainBit() {
when(payload.readInt1()).thenReturn(0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80);
MySQLBinlogBitmap actual = new MySQLBinlogBitmap(64, payload);
assertTrue(actual.containBit(0));
assertTrue(actual.containBit(9));
assertTrue(actual.containBit(63));
assertFalse(actual.containBit(1));
assertFalse(actual.containBit(62));
public void assertGetBinlogProtocolValue() {
assertThat(MySQLBinlogProtocolValueFactory.getBinlogProtocolValue(MySQLColumnType.MYSQL_TYPE_TINY), instanceOf(MySQLTinyBinlogProtocolValue.class));
}
@Test(expected = IllegalArgumentException.class)
public void assertGetBinlogProtocolValueFailure() {
MySQLBinlogProtocolValueFactory.getBinlogProtocolValue(MySQLColumnType.MYSQL_TYPE_GEOMETRY);
}
}
......@@ -17,15 +17,35 @@
package org.apache.shardingsphere.database.protocol.mysql.packet.command.query.binary.execute;
import org.apache.shardingsphere.database.protocol.mysql.payload.MySQLPacketPayload;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLNullBitmapTest {
@Mock
private MySQLPacketPayload payload;
@Test
public void assertNewMySQLNullBitmapFromPayload() {
when(payload.readInt1()).thenReturn(0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80);
MySQLNullBitmap actual = new MySQLNullBitmap(64, payload);
assertTrue(actual.isNullParameter(0));
assertTrue(actual.isNullParameter(9));
assertTrue(actual.isNullParameter(63));
assertFalse(actual.isNullParameter(1));
assertFalse(actual.isNullParameter(62));
}
@Test
public void assertGetNullBitmap() {
MySQLNullBitmap actual = new MySQLNullBitmap(8, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册