diff --git a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoder.java b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoder.java new file mode 100644 index 0000000000000000000000000000000000000000..cef9df8f8bd99eb23ef501e99db05f363ec0bc66 --- /dev/null +++ b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package info.avalon566.shardingscaling.sync.mysql.binlog.codec; + +import io.netty.buffer.ByteBuf; + +import java.io.Serializable; + +/** + * @author avalon566 + */ +public class BlobValueDecoder { + + public static Serializable decodeBlob(final int meta, final ByteBuf in) { + switch (meta) { + case 1: + return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt1(in), in); + case 2: + return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt2LE(in), in); + case 3: + return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt3LE(in), in); + case 4: + return DataTypesCodec.readBytes((int) DataTypesCodec.readUnsignedInt4LE(in), in); + default: + throw new UnsupportedOperationException(); + } + } +} diff --git a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodec.java b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodec.java index afe8b51c10d482391049d8421992442785e6fbc0..be20ac09c5d6d15c25fce7efcf4e0a7c3c547d47 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodec.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodec.java @@ -347,6 +347,19 @@ public final class DataTypesCodec { } } + /** + * Write big endian byte order n byte integer to {@code ByteBuf}. + * + * @param length length of n + * @param data the data + * @param out target byte buf + */ + public static void writeIntN(final int length, final long data, final ByteBuf out) { + for (int i = length - 1; i >= 0; i--) { + out.writeByte((byte) (data >> (8 * i))); + } + } + /** * Write little endian byte order 2 byte integer to {@code ByteBuf}. * @@ -367,6 +380,19 @@ public final class DataTypesCodec { out.writeIntLE(data); } + /** + * Write little endian byte order n byte integer to {@code ByteBuf}. + * + * @param length length of n + * @param data the data + * @param out target byte buf + */ + public static void writeIntNLE(final int length, final long data, final ByteBuf out) { + for (int i = 0; i < length; i++) { + out.writeByte((byte) (data >> (8 * i))); + } + } + /** * Write little endian byte order length coded integer to {@code ByteBuf}. * diff --git a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/RowsEvent.java b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/RowsEvent.java index 10168b2d3144a5672cbf2b43bebbbfad530e9365..bd042cfe8ef326bf7426bf519c1f46a1a5be1abf 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/RowsEvent.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/RowsEvent.java @@ -18,6 +18,7 @@ package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog; import info.avalon566.shardingscaling.sync.mysql.binlog.BinlogContext; +import info.avalon566.shardingscaling.sync.mysql.binlog.codec.BlobValueDecoder; import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec; import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DateAndTimeValueDecoder; import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DecimalValueDecoder; @@ -151,7 +152,7 @@ public class RowsEvent { case ColumnTypes.MYSQL_TYPE_YEAR: return DateAndTimeValueDecoder.decodeYear(columnDef.getMeta(), in); case ColumnTypes.MYSQL_TYPE_BLOB: - return decodeBlob(columnDef.getMeta(), in); + return BlobValueDecoder.decodeBlob(columnDef.getMeta(), in); case ColumnTypes.MYSQL_TYPE_VARCHAR: case ColumnTypes.MYSQL_TYPE_VAR_STRING: return decodeVarString(columnDef.getMeta(), in); @@ -203,21 +204,6 @@ public class RowsEvent { } } - private Serializable decodeBlob(final int meta, final ByteBuf in) { - switch (meta) { - case 1: - return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt1(in), in); - case 2: - return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt2LE(in), in); - case 3: - return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt3LE(in), in); - case 4: - return DataTypesCodec.readBytes((int) DataTypesCodec.readUnsignedInt4LE(in), in); - default: - throw new UnsupportedOperationException(); - } - } - private Serializable decodeVarString(final int meta, final ByteBuf in) { int length = 0; if (256 > meta) { diff --git a/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoderTest.java b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d28e9d60c69daaae6fd6a11a8f58925a9ee778ca --- /dev/null +++ b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/BlobValueDecoderTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package info.avalon566.shardingscaling.sync.mysql.binlog.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class BlobValueDecoderTest { + + @Test + public void assertDecodeBlob() { + byte[] value = new byte[(1 << 8) - 1]; + assertDecodeBlob(1, value, value); + value = new byte[(1 << 16) - 1]; + assertDecodeBlob(2, value, value); + value = new byte[(1 << 24) - 1]; + assertDecodeBlob(3, value, value); + value = new byte[(1 << 32) - 1]; + assertDecodeBlob(4, value, value); + } + + private void assertDecodeBlob(int meta, byte[] value, byte[] expect) { + ByteBuf byteBuf = Unpooled.buffer(); + DataTypesCodec.writeIntNLE(meta, value.length, byteBuf); + byteBuf.writeBytes(value); + byte[] actual = (byte[])BlobValueDecoder.decodeBlob(meta, byteBuf); + assertThat(actual.length, is(expect.length)); + } +} diff --git a/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodecTest.java b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodecTest.java index 8392af1020e5476237781f6e4f669c99433023fe..c424112d6980be33493ca5590b0e14b4683de9b7 100644 --- a/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodecTest.java +++ b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/DataTypesCodecTest.java @@ -18,6 +18,7 @@ package info.avalon566.shardingscaling.sync.mysql.binlog.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -240,6 +241,26 @@ public class DataTypesCodecTest { verify(byteBuf, times(4)).writeByte(0x00); } + @Test + public void assertWriteIntN() { + long value = 0xff; + long actual = writeIntN(1, value).readUnsignedByte(); + assertThat(actual, is(value)); + value = 0xff00; + actual = writeIntN(2, value).readUnsignedShort(); + assertThat(actual, is(value)); + value = 0x00ff; + actual = writeIntN(2, value).readUnsignedShort(); + assertThat(actual, is(value)); + assertThat(writeIntN(2, value).writerIndex(), is(2)); + } + + private ByteBuf writeIntN(final int length, final long value) { + ByteBuf byteBuf = Unpooled.buffer(); + DataTypesCodec.writeIntN(length, value, byteBuf); + return byteBuf; + } + @Test public void assertWriteInt2LE() { final short data = 0x00; @@ -254,6 +275,26 @@ public class DataTypesCodecTest { verify(byteBuf).writeIntLE(data); } + @Test + public void assertWriteIntNLE() { + long value = 0xff; + long actual = writeIntNLE(1, value).readUnsignedByte(); + assertThat(actual, is(value)); + value = 0xff00; + actual = writeIntNLE(2, value).readUnsignedShortLE(); + assertThat(actual, is(value)); + value = 0x00ff; + actual = writeIntNLE(2, value).readUnsignedShortLE(); + assertThat(actual, is(value)); + assertThat(writeIntNLE(2, value).writerIndex(), is(2)); + } + + private ByteBuf writeIntNLE(final int length, final long value) { + ByteBuf byteBuf = Unpooled.buffer(); + DataTypesCodec.writeIntNLE(length, value, byteBuf); + return byteBuf; + } + @Test public void assertWriteLengthCodedInt() { DataTypesCodec.writeLengthCodedInt(1, byteBuf);