提交 dd37d83f 编写于 作者: T terrymanu

abstract base component and mysql component

上级 7fabde2c
......@@ -17,6 +17,8 @@
package io.shardingjdbc.proxy;
import io.shardingjdbc.proxy.transport.ShardingProxy;
/**
* Sharding-Proxy Bootstrap.
*
......
......@@ -19,7 +19,6 @@ package io.shardingjdbc.proxy.backend;
import io.shardingjdbc.core.api.ShardingDataSourceFactory;
import io.shardingjdbc.core.exception.ShardingJdbcException;
import io.shardingjdbc.proxy.packet.command.ComQueryPacket;
import lombok.Getter;
import javax.sql.DataSource;
......@@ -42,7 +41,7 @@ public final class DataSourceManager {
private DataSourceManager() {
try {
dataSource = ShardingDataSourceFactory.createDataSource(new File(ComQueryPacket.class.getResource("/conf/sharding-config.yaml").getFile()));
dataSource = ShardingDataSourceFactory.createDataSource(new File(DataSourceManager.class.getResource("/conf/sharding-config.yaml").getFile()));
} catch (final IOException | SQLException ex) {
throw new ShardingJdbcException(ex);
}
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingjdbc.proxy;
package io.shardingjdbc.proxy.transport;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
......@@ -25,8 +25,9 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.shardingjdbc.proxy.codec.MySQLPacketCodec;
import io.shardingjdbc.proxy.handler.ServerHandler;
import io.shardingjdbc.core.constant.DatabaseType;
import io.shardingjdbc.proxy.transport.codec.PacketCodecFactory;
import io.shardingjdbc.proxy.transport.handler.DatabaseProxyHandlerFactory;
/**
* Sharding-Proxy.
......@@ -52,8 +53,9 @@ public final class ShardingProxy {
@Override
public void initChannel(final SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MySQLPacketCodec());
pipeline.addLast(new ServerHandler());
// TODO load database type from yaml or startup arguments
pipeline.addLast(PacketCodecFactory.createPacketCodecInstance(DatabaseType.MySQL));
pipeline.addLast(DatabaseProxyHandlerFactory.createDatabaseProxyHandlerInstance(DatabaseType.MySQL));
}
});
ChannelFuture future = bootstrap.bind(port).sync();
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.shardingjdbc.proxy.transport.packet.SentPacket;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Database protocol packet codec.
*
* @author zhangliang
*/
@Slf4j
public abstract class PacketCodec<T extends SentPacket> extends ByteToMessageCodec<T> {
@Override
protected void decode(final ChannelHandlerContext context, final ByteBuf in, final List<Object> out) {
int readableBytes = in.readableBytes();
if (!isValidHeader(readableBytes)) {
return;
}
if (log.isDebugEnabled()) {
log.debug("Read from client: \n {}", ByteBufUtil.prettyHexDump(in));
}
doDecode(context, in, out, readableBytes);
}
protected abstract boolean isValidHeader(int readableBytes);
protected abstract void doDecode(ChannelHandlerContext context, ByteBuf in, List<Object> out, int readableBytes);
@Override
protected void encode(final ChannelHandlerContext context, final T message, final ByteBuf out) {
doEncode(context, message, out);
if (log.isDebugEnabled()) {
log.debug("Write to client: \n {}", ByteBufUtil.prettyHexDump(out));
}
}
protected abstract void doEncode(ChannelHandlerContext context, T message, ByteBuf out);
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.codec;
import io.shardingjdbc.core.constant.DatabaseType;
import io.shardingjdbc.proxy.transport.codec.mysql.MySQLPacketCodec;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
* Database protocol packet codec factory.
*
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PacketCodecFactory {
/**
* Create packet codec instance.
*
* @param databaseType database type
* @return packet codec instance
*/
public static PacketCodec createPacketCodecInstance(final DatabaseType databaseType) {
switch (databaseType) {
case MySQL:
return new MySQLPacketCodec();
default:
throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseType));
}
}
}
......@@ -15,16 +15,14 @@
* </p>
*/
package io.shardingjdbc.proxy.codec;
package io.shardingjdbc.proxy.transport.codec.mysql;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.shardingjdbc.proxy.packet.MySQLPacket;
import io.shardingjdbc.proxy.packet.MySQLPacketPayload;
import io.shardingjdbc.proxy.packet.MySQLSentPacket;
import lombok.extern.slf4j.Slf4j;
import io.shardingjdbc.proxy.transport.codec.PacketCodec;
import io.shardingjdbc.proxy.transport.packet.mysql.MySQLPacket;
import io.shardingjdbc.proxy.transport.packet.mysql.MySQLPacketPayload;
import io.shardingjdbc.proxy.transport.packet.mysql.MySQLSentPacket;
import java.util.List;
......@@ -33,18 +31,15 @@ import java.util.List;
*
* @author zhangliang
*/
@Slf4j
public final class MySQLPacketCodec extends ByteToMessageCodec<MySQLSentPacket> {
public final class MySQLPacketCodec extends PacketCodec<MySQLSentPacket> {
@Override
protected void decode(final ChannelHandlerContext context, final ByteBuf in, final List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes < MySQLPacket.PAYLOAD_LENGTH + MySQLPacket.SEQUENCE_LENGTH) {
return;
}
if (log.isDebugEnabled()) {
log.debug("Read from client: \n {}", ByteBufUtil.prettyHexDump(in));
}
protected boolean isValidHeader(final int readableBytes) {
return readableBytes > MySQLPacket.PAYLOAD_LENGTH + MySQLPacket.SEQUENCE_LENGTH;
}
@Override
protected void doDecode(final ChannelHandlerContext context, final ByteBuf in, final List<Object> out, final int readableBytes) {
int payloadLength = in.markReaderIndex().readMediumLE();
int realPacketLength = payloadLength + MySQLPacket.PAYLOAD_LENGTH + MySQLPacket.SEQUENCE_LENGTH;
if (readableBytes < realPacketLength) {
......@@ -59,14 +54,11 @@ public final class MySQLPacketCodec extends ByteToMessageCodec<MySQLSentPacket>
}
@Override
protected void encode(final ChannelHandlerContext context, final MySQLSentPacket message, final ByteBuf out) throws Exception {
protected void doEncode(final ChannelHandlerContext context, final MySQLSentPacket message, final ByteBuf out) {
MySQLPacketPayload mysqlPacketPayload = new MySQLPacketPayload(context.alloc().buffer());
message.write(mysqlPacketPayload);
out.writeMediumLE(mysqlPacketPayload.getByteBuf().readableBytes());
out.writeByte(message.getSequenceId());
out.writeBytes(mysqlPacketPayload.getByteBuf());
if (log.isDebugEnabled()) {
log.debug("Write to client: \n {}", ByteBufUtil.prettyHexDump(out));
}
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Database proxy handler.
*
* @author zhangliang
*/
public abstract class DatabaseProxyHandler extends ChannelInboundHandlerAdapter {
private boolean authorized;
@Override
public void channelActive(final ChannelHandlerContext context) {
handshake(context);
}
protected abstract void handshake(ChannelHandlerContext context);
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
if (!authorized) {
auth(context, (ByteBuf) message);
authorized = true;
} else {
executeCommand(context, (ByteBuf) message);
}
}
protected abstract void auth(ChannelHandlerContext context, ByteBuf message);
protected abstract void executeCommand(ChannelHandlerContext context, ByteBuf message);
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.handler;
import io.shardingjdbc.core.constant.DatabaseType;
import io.shardingjdbc.proxy.transport.handler.mysql.MySQLProxyHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
* Database proxy handler factory.
*
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DatabaseProxyHandlerFactory {
/**
* Create database proxy handler instance.
*
* @param databaseType database type
* @return database proxy handler instance
*/
public static DatabaseProxyHandler createDatabaseProxyHandlerInstance(final DatabaseType databaseType) {
switch (databaseType) {
case MySQL:
return new MySQLProxyHandler();
default:
throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseType));
}
}
}
......@@ -15,57 +15,48 @@
* </p>
*/
package io.shardingjdbc.proxy.handler;
package io.shardingjdbc.proxy.transport.handler.mysql;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.shardingjdbc.proxy.constant.StatusFlag;
import io.shardingjdbc.proxy.packet.MySQLSentPacket;
import io.shardingjdbc.proxy.packet.MySQLPacketPayload;
import io.shardingjdbc.proxy.packet.command.CommandPacket;
import io.shardingjdbc.proxy.packet.command.CommandPacketFactory;
import io.shardingjdbc.proxy.packet.handshake.AuthPluginData;
import io.shardingjdbc.proxy.packet.handshake.ConnectionIdGenerator;
import io.shardingjdbc.proxy.packet.handshake.HandshakePacket;
import io.shardingjdbc.proxy.packet.handshake.HandshakeResponse41Packet;
import io.shardingjdbc.proxy.packet.ok.OKPacket;
import io.shardingjdbc.proxy.transport.handler.DatabaseProxyHandler;
import io.shardingjdbc.proxy.transport.packet.mysql.MySQLPacketPayload;
import io.shardingjdbc.proxy.transport.packet.mysql.MySQLSentPacket;
import io.shardingjdbc.proxy.transport.packet.mysql.command.CommandPacket;
import io.shardingjdbc.proxy.transport.packet.mysql.command.CommandPacketFactory;
import io.shardingjdbc.proxy.transport.packet.mysql.handshake.AuthPluginData;
import io.shardingjdbc.proxy.transport.packet.mysql.handshake.ConnectionIdGenerator;
import io.shardingjdbc.proxy.transport.packet.mysql.handshake.HandshakePacket;
import io.shardingjdbc.proxy.transport.packet.mysql.handshake.HandshakeResponse41Packet;
import io.shardingjdbc.proxy.transport.packet.mysql.ok.OKPacket;
/**
* Server handler.
* MySQL proxy handler.
*
* @author zhangliang
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
public final class MySQLProxyHandler extends DatabaseProxyHandler {
private AuthPluginData authPluginData;
private boolean authorized;
@Override
public void channelActive(final ChannelHandlerContext context) {
protected void handshake(final ChannelHandlerContext context) {
authPluginData = new AuthPluginData();
context.writeAndFlush(new HandshakePacket(ConnectionIdGenerator.getInstance().nextId(), authPluginData));
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
MySQLPacketPayload mysqlPacketPayload = new MySQLPacketPayload((ByteBuf) message);
if (!authorized) {
auth(context, mysqlPacketPayload);
} else {
executeCommand(context, mysqlPacketPayload);
}
}
private void auth(final ChannelHandlerContext context, final MySQLPacketPayload mysqlPacketPayload) {
HandshakeResponse41Packet response41 = new HandshakeResponse41Packet().read(mysqlPacketPayload);
protected void auth(final ChannelHandlerContext context, final ByteBuf message) {
MySQLPacketPayload mysqlPacketPayload = new MySQLPacketPayload(message);
// TODO use authPluginData to auth
authorized = true;
HandshakeResponse41Packet response41 = new HandshakeResponse41Packet().read(mysqlPacketPayload);
context.writeAndFlush(new OKPacket(response41.getSequenceId() + 1, 0L, 0L, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue(), 0, ""));
}
private void executeCommand(final ChannelHandlerContext context, final MySQLPacketPayload mysqlPacketPayload) {
@Override
protected void executeCommand(final ChannelHandlerContext context, final ByteBuf message) {
MySQLPacketPayload mysqlPacketPayload = new MySQLPacketPayload(message);
int sequenceId = mysqlPacketPayload.readInt1();
CommandPacket commandPacket = CommandPacketFactory.getCommandPacket(mysqlPacketPayload.readInt1());
commandPacket.setSequenceId(sequenceId);
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.packet;
/**
* Database protocol packet.
*
* @author zhangliang
*/
public interface DatabaseProtocolPacket {
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.packet;
/**
* Database received protocol packet.
*
* @author zhangliang
*/
public interface ReceivedPacket extends DatabaseProtocolPacket {
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingjdbc.proxy.transport.packet;
/**
* Database sent protocol packet.
*
* @author zhangliang
*/
public interface SentPacket extends DatabaseProtocolPacket {
}
......@@ -17,7 +17,7 @@
package io.shardingjdbc.proxy.util;
import io.shardingjdbc.proxy.packet.handshake.RandomGenerator;
import io.shardingjdbc.proxy.transport.packet.mysql.handshake.RandomGenerator;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册