提交 4dcaa7b2 编写于 作者: K KomachiSion

Refactor MysqlCommandPacketDecoder to improve readability

上级 03ddad93
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
/**
* MySQL client/server protocol Authentication Method
*
* MySQL Internals Manual / MySQL Client/Server Protocol / Authentication Method / SHA256
* https://dev.mysql.com/doc/internals/en/sha256.html
*
* @author yangyi
*/
public final class AuthenticationMethod {
public static final String OLD_PASSWORD_AUTHENTICATION = "mysql_old_password";
public static final String SECURE_PASSWORD_AUTHENTICATION = "mysql_native_password";
public static final String CLEAR_TEXT_AUTHENTICATION = "mysql_clear_password";
public static final String WINDOWS_NATIVE_AUTHENTICATION = "authentication_windows_client";
public static final String SHA256 = "sha256_password";
}
......@@ -24,6 +24,5 @@ public class MysqlBinlogEventPacketDecoder extends ByteToMessageDecoder {
LOGGER.info(Byte.toString(eventHeader.getTypeCode()));
LOGGER.info("readable:{},length:{}", in.readableBytes(), eventHeader.getEventLength() - 19);
in.readBytes(eventHeader.getEventLength() - 19);
return;
}
}
......@@ -5,79 +5,96 @@ import info.avalon566.shardingscaling.sync.mysql.binlog.packet.response.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* MySQL Command Packet decoder
*
* @author avalon566
* @author yangyi
*/
@Slf4j
public class MysqlCommandPacketDecoder extends ByteToMessageDecoder {
private Logger LOGGER = LoggerFactory.getLogger(MysqlCommandPacketDecoder.class);
private enum States {OkOrError, FieldPacket, RowDataPacket}
private boolean initiated = false;
private States expectedState = States.OkOrError;
private enum States {Initiate, ResponsePacket, FieldPacket, RowDataPacket}
private States currentState = States.Initiate;
private InternalResultSet internalResultSet = null;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// first packet from server is handshake initialization packet
if (!initiated) {
var handshake = new HandshakeInitializationPacket();
handshake.fromByteBuf(in);
if (handshake.getProtocolVersion() != 0x0a) {
throw new UnsupportedOperationException();
}
if (!"mysql_native_password".equals(handshake.getAuthPluginName())) {
throw new UnsupportedOperationException();
}
out.add(handshake);
initiated = true;
if (States.Initiate.equals(currentState)) {
out.add(decodeHandshakeInitializationPacket(in));
currentState = States.ResponsePacket;
return;
}
if (States.FieldPacket.equals(expectedState)) {
if (-2 != in.getByte(0)) {
var fieldPacket = new FieldPacket();
fieldPacket.fromByteBuf(in);
internalResultSet.getFieldDescriptors().add(fieldPacket);
} else {
var eofPacket = new EofPacket();
eofPacket.fromByteBuf(in);
expectedState = States.RowDataPacket;
}
if (States.FieldPacket.equals(currentState)) {
decodeFieldPacket(in);
return;
}
if (States.RowDataPacket.equals(expectedState)) {
if (-2 != in.getByte(0)) {
var rowDataPacket = new RowDataPacket();
rowDataPacket.fromByteBuf(in);
internalResultSet.getFieldValues().add(rowDataPacket);
} else {
var eofPacket = new EofPacket();
eofPacket.fromByteBuf(in);
out.add(internalResultSet);
expectedState = States.OkOrError;
internalResultSet = null;
}
if (States.RowDataPacket.equals(currentState)) {
decodeRowDataPacket(in, out);
return;
}
if (-1 == in.getByte(0)) {
decodeResponsePacket(in, out);
}
private HandshakeInitializationPacket decodeHandshakeInitializationPacket(final ByteBuf in) {
var result = new HandshakeInitializationPacket();
result.fromByteBuf(in);
if (PacketConstants.PROTOCOL_VERSION != result.getProtocolVersion()) {
throw new UnsupportedOperationException();
}
if (!AuthenticationMethod.SECURE_PASSWORD_AUTHENTICATION.equals(result.getAuthPluginName())) {
throw new UnsupportedOperationException();
}
return result;
}
private void decodeFieldPacket(final ByteBuf in) {
if (PacketConstants.EOF_PACKET_MARK != in.getByte(0)) {
var fieldPacket = new FieldPacket();
fieldPacket.fromByteBuf(in);
internalResultSet.getFieldDescriptors().add(fieldPacket);
} else {
var eofPacket = new EofPacket();
eofPacket.fromByteBuf(in);
currentState = States.RowDataPacket;
}
}
private void decodeRowDataPacket(final ByteBuf in, final List<Object> out) {
if (PacketConstants.EOF_PACKET_MARK != in.getByte(0)) {
var rowDataPacket = new RowDataPacket();
rowDataPacket.fromByteBuf(in);
internalResultSet.getFieldValues().add(rowDataPacket);
} else {
var eofPacket = new EofPacket();
eofPacket.fromByteBuf(in);
out.add(internalResultSet);
currentState = States.ResponsePacket;
internalResultSet = null;
}
}
private void decodeResponsePacket(final ByteBuf in, final List<Object> out) {
if (PacketConstants.ERR_PACKET_MARK == in.getByte(0)) {
var error = new ErrorPacket();
error.fromByteBuf((ByteBuf) in);
error.fromByteBuf(in);
out.add(error);
} else if (0 == in.getByte(0)) {
} else if (PacketConstants.OK_PACKET_MARK == in.getByte(0)) {
var ok = new OkPacket();
ok.fromByteBuf(in);
out.add(ok);
} else {
var resultSetHeaderPacket = new ResultSetHeaderPacket();
resultSetHeaderPacket.fromByteBuf(in);
expectedState = States.FieldPacket;
currentState = States.FieldPacket;
internalResultSet = new InternalResultSet(resultSetHeaderPacket);
}
}
......
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
/**
* MySQL protocol constants.
*
* @author yangyi
*/
public final class PacketConstants {
public static final byte PROTOCOL_VERSION = 0x0a;
public static final byte OK_PACKET_MARK = 0x00;
public static final byte EOF_PACKET_MARK = (byte) 0xfe;
public static final byte ERR_PACKET_MARK = (byte) 0xff;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册