提交 db06246f 编写于 作者: A avalon566

Refactore RowsEvent

上级 0c274619
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import io.netty.buffer.ByteBuf;
import java.io.Serializable;
/**
* @author avalon566
*/
public class BlobValueDecoder {
public static Serializable decodeBlob(final int meta, final ByteBuf in) {
switch (meta) {
case 1:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt1(in), in);
case 2:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt2LE(in), in);
case 3:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt3LE(in), in);
case 4:
return DataTypesCodec.readBytes((int) DataTypesCodec.readUnsignedInt4LE(in), in);
default:
throw new UnsupportedOperationException();
}
}
}
......@@ -347,6 +347,19 @@ public final class DataTypesCodec {
}
}
/**
* Write big endian byte order n byte integer to {@code ByteBuf}.
*
* @param length length of n
* @param data the data
* @param out target byte buf
*/
public static void writeIntN(final int length, final long data, final ByteBuf out) {
for (int i = length - 1; i >= 0; i--) {
out.writeByte((byte) (data >> (8 * i)));
}
}
/**
* Write little endian byte order 2 byte integer to {@code ByteBuf}.
*
......@@ -367,6 +380,19 @@ public final class DataTypesCodec {
out.writeIntLE(data);
}
/**
* Write little endian byte order n byte integer to {@code ByteBuf}.
*
* @param length length of n
* @param data the data
* @param out target byte buf
*/
public static void writeIntNLE(final int length, final long data, final ByteBuf out) {
for (int i = 0; i < length; i++) {
out.writeByte((byte) (data >> (8 * i)));
}
}
/**
* Write little endian byte order length coded integer to {@code ByteBuf}.
*
......
......@@ -18,6 +18,7 @@
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.BinlogContext;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.BlobValueDecoder;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DateAndTimeValueDecoder;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DecimalValueDecoder;
......@@ -151,7 +152,7 @@ public class RowsEvent {
case ColumnTypes.MYSQL_TYPE_YEAR:
return DateAndTimeValueDecoder.decodeYear(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_BLOB:
return decodeBlob(columnDef.getMeta(), in);
return BlobValueDecoder.decodeBlob(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_VARCHAR:
case ColumnTypes.MYSQL_TYPE_VAR_STRING:
return decodeVarString(columnDef.getMeta(), in);
......@@ -203,21 +204,6 @@ public class RowsEvent {
}
}
private Serializable decodeBlob(final int meta, final ByteBuf in) {
switch (meta) {
case 1:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt1(in), in);
case 2:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt2LE(in), in);
case 3:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt3LE(in), in);
case 4:
return DataTypesCodec.readBytes((int) DataTypesCodec.readUnsignedInt4LE(in), in);
default:
throw new UnsupportedOperationException();
}
}
private Serializable decodeVarString(final int meta, final ByteBuf in) {
int length = 0;
if (256 > meta) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class BlobValueDecoderTest {
@Test
public void assertDecodeBlob() {
byte[] value = new byte[(1 << 8) - 1];
assertDecodeBlob(1, value, value);
value = new byte[(1 << 16) - 1];
assertDecodeBlob(2, value, value);
value = new byte[(1 << 24) - 1];
assertDecodeBlob(3, value, value);
value = new byte[(1 << 32) - 1];
assertDecodeBlob(4, value, value);
}
private void assertDecodeBlob(int meta, byte[] value, byte[] expect) {
ByteBuf byteBuf = Unpooled.buffer();
DataTypesCodec.writeIntNLE(meta, value.length, byteBuf);
byteBuf.writeBytes(value);
byte[] actual = (byte[])BlobValueDecoder.decodeBlob(meta, byteBuf);
assertThat(actual.length, is(expect.length));
}
}
......@@ -18,6 +18,7 @@
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
......@@ -240,6 +241,26 @@ public class DataTypesCodecTest {
verify(byteBuf, times(4)).writeByte(0x00);
}
@Test
public void assertWriteIntN() {
long value = 0xff;
long actual = writeIntN(1, value).readUnsignedByte();
assertThat(actual, is(value));
value = 0xff00;
actual = writeIntN(2, value).readUnsignedShort();
assertThat(actual, is(value));
value = 0x00ff;
actual = writeIntN(2, value).readUnsignedShort();
assertThat(actual, is(value));
assertThat(writeIntN(2, value).writerIndex(), is(2));
}
private ByteBuf writeIntN(final int length, final long value) {
ByteBuf byteBuf = Unpooled.buffer();
DataTypesCodec.writeIntN(length, value, byteBuf);
return byteBuf;
}
@Test
public void assertWriteInt2LE() {
final short data = 0x00;
......@@ -254,6 +275,26 @@ public class DataTypesCodecTest {
verify(byteBuf).writeIntLE(data);
}
@Test
public void assertWriteIntNLE() {
long value = 0xff;
long actual = writeIntNLE(1, value).readUnsignedByte();
assertThat(actual, is(value));
value = 0xff00;
actual = writeIntNLE(2, value).readUnsignedShortLE();
assertThat(actual, is(value));
value = 0x00ff;
actual = writeIntNLE(2, value).readUnsignedShortLE();
assertThat(actual, is(value));
assertThat(writeIntNLE(2, value).writerIndex(), is(2));
}
private ByteBuf writeIntNLE(final int length, final long value) {
ByteBuf byteBuf = Unpooled.buffer();
DataTypesCodec.writeIntNLE(length, value, byteBuf);
return byteBuf;
}
@Test
public void assertWriteLengthCodedInt() {
DataTypesCodec.writeLengthCodedInt(1, byteBuf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册