提交 815e4e6e 编写于 作者: F Frankie Wu

fix compilation errors

上级 7630b243
......@@ -19,7 +19,9 @@ import com.dianping.bee.engine.spi.internal.DefaultTableProviderManager;
import com.dianping.bee.engine.spi.internal.MultiTableStatementVisitor;
import com.dianping.bee.engine.spi.internal.SingleTableStatementVisitor;
import com.dianping.bee.engine.spi.internal.TableHelper;
import com.dianping.bee.server.SelectHandler;
import com.dianping.bee.server.SimpleServer;
import com.dianping.bee.server.SimpleServerQueryHandler;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -48,6 +50,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MultiTableStatementVisitor.class).is(PER_LOOKUP) //
.req(TableHelper.class, MultiTableStatement.class, RowFilter.class));
all.add(C(SelectHandler.class) //
.req(StatementManager.class));
all.add(C(SimpleServerQueryHandler.class).is(PER_LOOKUP) //
.req(SelectHandler.class));
return all;
}
......
......@@ -3,5 +3,5 @@ package com.dianping.bee.engine.spi;
import java.sql.SQLSyntaxErrorException;
public interface StatementManager {
public Statement parse(String sql) throws SQLSyntaxErrorException;
public Statement build(String sql) throws SQLSyntaxErrorException;
}
......@@ -21,6 +21,7 @@ public class DefaultRowFilter implements RowFilter {
public String toString() {
MySQLOutputASTVisitor visitor = new MySQLOutputASTVisitor(new StringBuilder());
m_expr.accept(visitor);
return visitor.getSql();
}
......
......@@ -14,7 +14,7 @@ public class DefaultStatementManager extends ContainerHolder implements Statemen
private Map<String, Statement> m_statements = new HashMap<String, Statement>();
@Override
public Statement parse(String sql) throws SQLSyntaxErrorException {
public Statement build(String sql) throws SQLSyntaxErrorException {
Statement statement = m_statements.get(sql);
if (statement == null) {
......
......@@ -2,6 +2,8 @@ package com.dianping.bee.engine.spi.meta;
public interface ColumnMeta {
public String getName();
public Class<?> getType();
public int getCobarType();
}
......@@ -12,54 +12,60 @@
* accordance with the terms of the license agreement you entered into
* with dianping.com.
*/
package com.dianping.bee.server.mysql;
package com.dianping.bee.server;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import java.sql.SQLSyntaxErrorException;
import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.Fields;
import com.alibaba.cobar.config.model.DataSourceConfig;
import com.alibaba.cobar.config.model.SchemaConfig;
import com.alibaba.cobar.net.util.PacketUtil;
import com.alibaba.cobar.parser.util.ParseUtil;
import com.alibaba.cobar.protocol.mysql.EOFPacket;
import com.alibaba.cobar.protocol.mysql.FieldPacket;
import com.alibaba.cobar.protocol.mysql.ResultSetHeaderPacket;
import com.alibaba.cobar.protocol.mysql.RowDataPacket;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.parser.ServerParseSelect;
import com.alibaba.cobar.server.response.SelectDatabase;
import com.alibaba.cobar.server.response.SelectIdentity;
import com.alibaba.cobar.server.response.SelectLastInsertId;
import com.alibaba.cobar.server.response.SelectUser;
import com.alibaba.cobar.server.response.SelectVersion;
import com.alibaba.cobar.server.response.SelectVersionComment;
import com.alibaba.cobar.util.IntegerUtil;
import com.alibaba.cobar.util.StringUtil;
import com.dianping.bee.engine.spi.Cell;
import com.dianping.bee.engine.spi.Row;
import com.dianping.bee.engine.spi.RowSet;
import com.dianping.bee.engine.spi.Statement;
import com.dianping.bee.engine.spi.StatementManager;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
import com.site.lookup.annotation.Inject;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class SelectResponse {
public class SelectHandler {
@Inject
private StatementManager m_manager;
private static final Logger LOGGER = Logger.getLogger(SelectResponse.class);
private RowDataPacket getRow(RowSet rowset, int rowIndex, String charset) {
int cols = rowset.getColumns();
RowDataPacket packet = new RowDataPacket(cols);
Row row = rowset.getRow(rowIndex);
/**
*
* @param rowData
* @param rowType
* @param charset
* @return
*/
private static RowDataPacket getRow(Object[] rowData, int[] rowType, String charset) {
RowDataPacket row = new RowDataPacket(rowData.length);
for (int i = 0; i < rowData.length; i++) {
switch (rowType[i]) {
for (int i = 0; i < cols; i++) {
ColumnMeta column = rowset.getColumn(i);
Cell cell = row.getCell(i);
String value = cell.getValue().toString();
switch (column.getCobarType()) {
case Fields.FIELD_TYPE_STRING:
row.add(StringUtil.encode(rowData[i].toString(), charset));
packet.add(StringUtil.encode(value, charset));
break;
case Fields.FIELD_TYPE_INT24:
row.add(IntegerUtil.toBytes(Integer.parseInt(rowData[i].toString())));
packet.add(value == null ? null : IntegerUtil.toBytes(Integer.parseInt(value)));
break;
case Fields.FIELD_TYPE_DECIMAL:
case Fields.FIELD_TYPE_TINY:
......@@ -87,28 +93,92 @@ public class SelectResponse {
case Fields.FIELD_TYPE_VAR_STRING:
case Fields.FIELD_TYPE_GEOMETRY:
default:
row.add(StringUtil.encode(rowData[i].toString(), charset));
packet.add(StringUtil.encode(value, charset));
}
}
return packet;
}
public void handle(String stmt, ServerConnection c, int offs) {
int offset = offs;
switch (ServerParseSelect.parse(stmt, offs)) {
case ServerParseSelect.VERSION_COMMENT:
SelectVersionComment.response(c);
break;
case ServerParseSelect.DATABASE:
SelectDatabase.response(c);
break;
case ServerParseSelect.USER:
SelectUser.response(c);
break;
case ServerParseSelect.VERSION:
SelectVersion.response(c);
break;
case ServerParseSelect.LAST_INSERT_ID:
// offset = ParseUtil.move(stmt, 0, "select".length());
loop: for (; offset < stmt.length(); ++offset) {
switch (stmt.charAt(offset)) {
case ' ':
continue;
case '/':
case '#':
offset = ParseUtil.comment(stmt, offset);
continue;
case 'L':
case 'l':
break loop;
}
}
offset = ServerParseSelect.indexAfterLastInsertIdFunc(stmt, offset);
offset = ServerParseSelect.skipAs(stmt, offset);
SelectLastInsertId.response(c, stmt, offset);
break;
case ServerParseSelect.IDENTITY:
// offset = ParseUtil.move(stmt, 0, "select".length());
loop: for (; offset < stmt.length(); ++offset) {
switch (stmt.charAt(offset)) {
case ' ':
continue;
case '/':
case '#':
offset = ParseUtil.comment(stmt, offset);
continue;
case '@':
break loop;
}
}
int indexOfAtAt = offset;
offset += 2;
offset = ServerParseSelect.indexAfterIdentity(stmt, offset);
String orgName = stmt.substring(indexOfAtAt, offset);
offset = ServerParseSelect.skipAs(stmt, offset);
SelectIdentity.response(c, stmt, offset, orgName);
break;
default:
try {
response(c, stmt);
} catch (SQLSyntaxErrorException e) {
c.writeErrMessage(ErrorCode.ER_SYNTAX_ERROR, e.getMessage());
}
}
return row;
}
/**
*
* @param c
* @param stmt
* @throws SQLSyntaxErrorException
*/
public static void response(ServerConnection c, String sql) {
StatementManager statementManager = lookup(StatementManager.class);
Statement stmt = statementManager.parse(sql);
public void response(ServerConnection c, String sql) throws SQLSyntaxErrorException {
Statement stmt = m_manager.build(sql);
RowSet result = stmt.query();
RowSet rowset = stmt.query();
byte packetId = 0;
EOFPacket eof = new EOFPacket();
ByteBuffer buffer = c.allocate();
// write header
int fieldCount = result.getColumns();
int fieldCount = rowset.getColumns();
ResultSetHeaderPacket header = PacketUtil.getHeader(fieldCount);
header.packetId = ++packetId;
buffer = header.write(buffer, c);
......@@ -117,7 +187,7 @@ public class SelectResponse {
int columnIndex = 0;
FieldPacket[] fields = new FieldPacket[fieldCount];
for (int i = 0; i < fieldCount; i++) {
fields[columnIndex] = PacketUtil.getField(result.getColumn(i).getName(), result.getColumn(i).getType());
fields[columnIndex] = PacketUtil.getField(rowset.getColumn(i).getName(), rowset.getColumn(i).getCobarType());
fields[columnIndex++].packetId = ++packetId;
}
eof.packetId = ++packetId;
......@@ -128,8 +198,9 @@ public class SelectResponse {
buffer = eof.write(buffer, c);
// write rows
for (int rowIndex = 0; rowIndex < result.getRows(); rowIndex++) {
RowDataPacket row = getRow(result.getRow(rowIndex), result.getMetaData().getColumnTypes(), c.getCharset());
for (int rowIndex = 0; rowIndex < rowset.getRows(); rowIndex++) {
RowDataPacket row = getRow(rowset, rowIndex, c.getCharset());
row.packetId = ++packetId;
buffer = row.write(buffer, c);
}
......
......@@ -8,7 +8,6 @@ import org.codehaus.plexus.logging.Logger;
import com.alibaba.cobar.net.NIOAcceptor;
import com.alibaba.cobar.net.NIOConnector;
import com.alibaba.cobar.net.NIOProcessor;
import com.alibaba.cobar.server.ServerConnectionFactory;
import com.site.helper.Threads;
import com.site.helper.Threads.Task;
import com.site.lookup.annotation.Inject;
......@@ -29,6 +28,7 @@ public class SimpleServer implements LogEnabled {
}
public void startup() throws IOException {
// start processors
NIOProcessor[] processors = new NIOProcessor[4];
for (int i = 0; i < processors.length; i++) {
......@@ -36,15 +36,14 @@ public class SimpleServer implements LogEnabled {
processors[i].startup();
}
Threads.forGroup("Bee").start(new ProcessorCheckTask(processors));
// startup connector
NIOConnector connector = new NIOConnector("BeeConnector");
connector.setProcessors(processors);
connector.start();
// startup server
ServerConnectionFactory sf = new ServerConnectionFactory();
SimpleServerConnectionFactory sf = new SimpleServerConnectionFactory();
sf.setIdleTimeout(3600 * 1000L); // one hour
......@@ -53,6 +52,8 @@ public class SimpleServer implements LogEnabled {
server.setProcessors(processors);
server.start();
Threads.forGroup("Bee").start(new ProcessorCheckTask(processors));
m_logger.info(String.format("BEE server started at %s", m_port));
}
......
......@@ -2,6 +2,9 @@ package com.dianping.bee.server;
import java.nio.channels.SocketChannel;
import org.codehaus.plexus.PlexusContainer;
import org.codehaus.plexus.component.repository.exception.ComponentLookupException;
import com.alibaba.cobar.CobarPrivileges;
import com.alibaba.cobar.Isolations;
import com.alibaba.cobar.net.FrontendConnection;
......@@ -9,21 +12,41 @@ import com.alibaba.cobar.net.factory.FrontendConnectionFactory;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.session.BlockingSession;
import com.alibaba.cobar.server.session.NonBlockingSession;
import com.site.lookup.ContainerLoader;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class SimpleServerConnectionFactory extends FrontendConnectionFactory {
private PlexusContainer m_container;
public void initialize() {
m_container = ContainerLoader.getDefaultContainer();
}
@Override
protected FrontendConnection getConnection(SocketChannel channel) {
ServerConnection c = new ServerConnection(channel);
SimpleServerQueryHandler queryHandler = getQueryHandler(c);
c.setQueryHandler(queryHandler);
c.setPrivileges(new CobarPrivileges());
c.setTxIsolation(Isolations.REPEATED_READ);
c.setQueryHandler(new SimpleServerQueryHandler(c));
c.setSession(new BlockingSession(c));
c.setSession2(new NonBlockingSession(c));
return c;
}
private SimpleServerQueryHandler getQueryHandler(ServerConnection c) {
try {
SimpleServerQueryHandler queryHandler = m_container.lookup(SimpleServerQueryHandler.class);
queryHandler.setServerConnection(c);
return queryHandler;
} catch (ComponentLookupException e) {
throw new RuntimeException(
"Unable to get SimpleServerQueryHandler instance, please check if the environment is setup correctly!", e);
}
}
}
......@@ -28,26 +28,27 @@ import com.alibaba.cobar.server.handler.ShowHandler;
import com.alibaba.cobar.server.handler.StartHandler;
import com.alibaba.cobar.server.handler.UseHandler;
import com.alibaba.cobar.server.parser.ServerParse;
import com.dianping.bee.server.mysql.SelectHandler;
import com.site.lookup.annotation.Inject;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class SimpleServerQueryHandler implements FrontendQueryHandler {
private static final Logger LOGGER = Logger.getLogger(SimpleServerQueryHandler.class);
@Inject
private SelectHandler m_selectHandler;
private final ServerConnection source;
private static final Logger LOGGER = Logger.getLogger(SimpleServerQueryHandler.class);
public SimpleServerQueryHandler(ServerConnection source) {
this.source = source;
}
private ServerConnection m_conn;
@Override
public void query(String sql) {
ServerConnection c = this.source;
ServerConnection c = this.m_conn;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
}
int rs = ServerParse.parse(sql);
switch (rs & 0xff) {
case ServerParse.EXPLAIN:
......@@ -60,7 +61,7 @@ public class SimpleServerQueryHandler implements FrontendQueryHandler {
ShowHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.SELECT:
SelectHandler.handle(sql, c, rs >>> 8);
m_selectHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.START:
StartHandler.handle(sql, c, rs >>> 8);
......@@ -91,4 +92,7 @@ public class SimpleServerQueryHandler implements FrontendQueryHandler {
}
}
public void setServerConnection(ServerConnection c) {
m_conn = c;
}
}
/**
* Project: bee-engine
*
* File Created at 2012-8-14
*
* Copyright 2012 dianping.com.
* All rights reserved.
*
* This software is the confidential and proprietary information of
* Dianping Company. ("Confidential Information"). You shall not
* disclose such Confidential Information and shall use it only in
* accordance with the terms of the license agreement you entered into
* with dianping.com.
*/
package com.dianping.bee.server.mysql;
import com.alibaba.cobar.parser.util.ParseUtil;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.parser.ServerParseSelect;
import com.alibaba.cobar.server.response.SelectDatabase;
import com.alibaba.cobar.server.response.SelectIdentity;
import com.alibaba.cobar.server.response.SelectLastInsertId;
import com.alibaba.cobar.server.response.SelectUser;
import com.alibaba.cobar.server.response.SelectVersion;
import com.alibaba.cobar.server.response.SelectVersionComment;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class SelectHandler {
public static void handle(String stmt, ServerConnection c, int offs) {
int offset = offs;
switch (ServerParseSelect.parse(stmt, offs)) {
case ServerParseSelect.VERSION_COMMENT:
SelectVersionComment.response(c);
break;
case ServerParseSelect.DATABASE:
SelectDatabase.response(c);
break;
case ServerParseSelect.USER:
SelectUser.response(c);
break;
case ServerParseSelect.VERSION:
SelectVersion.response(c);
break;
case ServerParseSelect.LAST_INSERT_ID:
// offset = ParseUtil.move(stmt, 0, "select".length());
loop: for (; offset < stmt.length(); ++offset) {
switch (stmt.charAt(offset)) {
case ' ':
continue;
case '/':
case '#':
offset = ParseUtil.comment(stmt, offset);
continue;
case 'L':
case 'l':
break loop;
}
}
offset = ServerParseSelect.indexAfterLastInsertIdFunc(stmt, offset);
offset = ServerParseSelect.skipAs(stmt, offset);
SelectLastInsertId.response(c, stmt, offset);
break;
case ServerParseSelect.IDENTITY:
// offset = ParseUtil.move(stmt, 0, "select".length());
loop: for (; offset < stmt.length(); ++offset) {
switch (stmt.charAt(offset)) {
case ' ':
continue;
case '/':
case '#':
offset = ParseUtil.comment(stmt, offset);
continue;
case '@':
break loop;
}
}
int indexOfAtAt = offset;
offset += 2;
offset = ServerParseSelect.indexAfterIdentity(stmt, offset);
String orgName = stmt.substring(indexOfAtAt, offset);
offset = ServerParseSelect.skipAs(stmt, offset);
SelectIdentity.response(c, stmt, offset, orgName);
break;
default:
SelectResponse.response(c, stmt);
}
}
}
......@@ -2,8 +2,6 @@ package com.dianping.bee.db;
import com.dianping.bee.engine.spi.DatabaseProvider;
import com.dianping.bee.engine.spi.Index;
import com.dianping.bee.engine.spi.RowSet;
import com.dianping.bee.engine.spi.Statement;
import com.dianping.bee.engine.spi.TableProvider;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
......@@ -57,11 +55,6 @@ public class CatDatabase implements DatabaseProvider {
public String getName() {
return m_name;
}
@Override
public RowSet query(Statement stmt) {
return null;
}
}
public static enum TransactionColumn implements ColumnMeta {
......@@ -109,6 +102,12 @@ public class CatDatabase implements DatabaseProvider {
public Class<?> getType() {
return m_type;
}
@Override
public int getCobarType() {
// TODO
return 0;
}
}
public static enum TransactionIndex implements Index {
......
......@@ -12,8 +12,7 @@ public class SpiTest extends ComponentTestCase {
@Test
public void sample() throws Exception {
StatementManager statementManager = lookup(StatementManager.class);
Statement stmt = statementManager
.parse("select type, sum(failures) from transaction where domain=? and starttime=?");
Statement stmt = statementManager.build("select type, sum(failures) from transaction where domain=? and starttime=?");
RowSet rowset = stmt.query();
display(rowset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册