提交 fe22f2bb 编写于 作者: Y youyong

Merge branch 'master' of ssh://192.168.8.22:58422/cat

...@@ -7,15 +7,19 @@ import com.dianping.bee.engine.spi.meta.RowSet; ...@@ -7,15 +7,19 @@ import com.dianping.bee.engine.spi.meta.RowSet;
public interface Statement { public interface Statement {
public IndexMeta getIndex(); public IndexMeta getIndex();
public int getParameterSize();
public RowFilter getRowFilter(); public RowFilter getRowFilter();
public ColumnMeta[] getSelectColumns(); public ColumnMeta[] getSelectColumns();
public RowSet query();
public void setIndex(IndexMeta index); public void setIndex(IndexMeta index);
public void setParameterSize(int m_parameterSize);
public void setRowFilter(RowFilter rowFilter); public void setRowFilter(RowFilter rowFilter);
public void setSelectColumns(ColumnMeta[] selectColumns); public void setSelectColumns(ColumnMeta[] selectColumns);
public RowSet query();
} }
...@@ -3,7 +3,13 @@ package com.dianping.bee.engine.spi; ...@@ -3,7 +3,13 @@ package com.dianping.bee.engine.spi;
import java.sql.SQLSyntaxErrorException; import java.sql.SQLSyntaxErrorException;
public interface StatementManager { public interface StatementManager {
public Statement parseSQL(String sql) throws SQLSyntaxErrorException;
public Statement build(String sql) throws SQLSyntaxErrorException; public Statement build(String sql) throws SQLSyntaxErrorException;
public Statement prepare(String sql) throws SQLSyntaxErrorException; public long stmtPrepare(Statement stmt);
public Statement stmtExecute(long stmtId);
public void stmtClose(long stmtId);
} }
...@@ -78,27 +78,25 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements ...@@ -78,27 +78,25 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements
} }
public void write(MySQLPacket packet) { public void write(MySQLPacket packet) {
packet.packetId = m_packetId++;
m_buffer = packet.write(m_buffer, m_conn); m_buffer = packet.write(m_buffer, m_conn);
} }
public void writeEOF() { public void writeEOF() {
EOFPacket eof = new EOFPacket(); EOFPacket eof = new EOFPacket();
eof.packetId = m_packetId++;
write(eof); write(eof);
} }
public void writeField(String name, int fieldType) { public void writeField(String name, int fieldType) {
FieldPacket field = PacketUtil.getField(name, fieldType); FieldPacket field = PacketUtil.getField(name, fieldType);
field.packetId = m_packetId++;
write(field); write(field);
} }
public void writeHeader(int fieldCount) { public void writeHeader(int fieldCount) {
ResultSetHeaderPacket header = PacketUtil.getHeader(fieldCount); ResultSetHeaderPacket header = PacketUtil.getHeader(fieldCount);
header.packetId = m_packetId++;
write(header); write(header);
} }
...@@ -114,7 +112,6 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements ...@@ -114,7 +112,6 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements
row.add(StringUtil.encode(values[i], m_charset)); row.add(StringUtil.encode(values[i], m_charset));
} }
row.packetId = m_packetId++;
write(row); write(row);
} }
...@@ -161,7 +158,6 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements ...@@ -161,7 +158,6 @@ public abstract class AbstractCommandHandler extends ContainerHolder implements
} }
} }
packet.packetId = m_packetId++;
write(packet); write(packet);
} }
} }
......
...@@ -18,10 +18,11 @@ import java.sql.SQLSyntaxErrorException; ...@@ -18,10 +18,11 @@ import java.sql.SQLSyntaxErrorException;
import java.util.List; import java.util.List;
import com.alibaba.cobar.ErrorCode; import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.Fields;
import com.alibaba.cobar.server.ServerConnection; import com.alibaba.cobar.server.ServerConnection;
import com.dianping.bee.engine.spi.Statement;
import com.dianping.bee.engine.spi.StatementManager; import com.dianping.bee.engine.spi.StatementManager;
import com.dianping.bee.engine.spi.handler.AbstractCommandHandler; import com.dianping.bee.engine.spi.handler.AbstractCommandHandler;
import com.site.helper.Joiners;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
/** /**
...@@ -33,11 +34,60 @@ public class PrepareHandler extends AbstractCommandHandler { ...@@ -33,11 +34,60 @@ public class PrepareHandler extends AbstractCommandHandler {
@Override @Override
protected void handle(ServerConnection c, List<String> parts) { protected void handle(ServerConnection c, List<String> parts) {
String stmt = Joiners.by(' ').join(parts); // String stmt = Joiners.by(' ').join(parts);
// try {
// m_manager.stmtPrepare(stmt);
// } catch (SQLSyntaxErrorException e) {
// error(c, ErrorCode.ER_SYNTAX_ERROR, e.getMessage());
// }
}
/**
* @param sql
* @param c
* @param offset
*/
public void close(String sql, ServerConnection c, int offset) {
}
/**
* @param sql
* @param c
* @param offset
*/
public void execute(String sql, ServerConnection c, int offset) {
}
/**
* @param sql
* @param c
* @param offset
*/
public void prepare(String sql, ServerConnection c, int offset) {
Statement stmt = null;
try { try {
m_manager.prepare(stmt); stmt = m_manager.parseSQL(sql);
} catch (SQLSyntaxErrorException e) { } catch (SQLSyntaxErrorException e) {
error(c, ErrorCode.ER_SYNTAX_ERROR, e.getMessage()); error(c, ErrorCode.ER_SYNTAX_ERROR, e.getMessage());
} }
long stmtId = m_manager.stmtPrepare(stmt);
CommandContext ctx = new CommandContext(c);
int columnSize = stmt.getSelectColumns().length;
int parameterSize = stmt.getParameterSize();
PreparePacket packet = new PreparePacket(stmtId, columnSize, parameterSize);
ctx.write(packet);
// FIXME: just some sample code here
for (int i = 0; i < parameterSize; i++) {
PrepareParameterPacket parameterPacket = new PrepareParameterPacket(Fields.FIELD_TYPE_STRING,
Fields.NOT_NULL_FLAG, (byte) 0, 50);
ctx.write(parameterPacket);
}
ctx.complete();
} }
} }
\ No newline at end of file
/**
* Project: bee-engine
*
* File Created at 2012-8-31
*
* Copyright 2012 dianping.com.
* All rights reserved.
*
* This software is the confidential and proprietary information of
* Dianping Company. ("Confidential Information"). You shall not
* disclose such Confidential Information and shall use it only in
* accordance with the terms of the license agreement you entered into
* with dianping.com.
*/
package com.dianping.bee.engine.spi.handler.internal;
import java.nio.ByteBuffer;
import com.alibaba.cobar.net.FrontendConnection;
import com.alibaba.cobar.net.util.BufferUtil;
import com.alibaba.cobar.protocol.MySQLPacket;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class PreparePacket extends MySQLPacket {
public static final byte FIELD_COUNT = 0x00;
private byte m_fieldCount = FIELD_COUNT;
private long m_statementId;
private int m_columnSize;
private int m_parameterSize;
public PreparePacket(long statementId, int columnSize, int parameterSize) {
this.m_statementId = statementId;
this.m_columnSize = columnSize;
this.m_parameterSize = parameterSize;
}
@Override
public ByteBuffer write(ByteBuffer buffer, FrontendConnection c) {
int size = calcPacketSize();
buffer = c.checkWriteBuffer(buffer, c.getPacketHeaderSize() + size);
BufferUtil.writeUB3(buffer, size);
buffer.put(packetId);
buffer.put(m_fieldCount);
BufferUtil.writeUB4(buffer, m_statementId);
BufferUtil.writeUB2(buffer, m_columnSize);
BufferUtil.writeUB2(buffer, m_parameterSize);
return buffer;
}
@Override
public int calcPacketSize() {
return 9;// 1+4+2+2
}
@Override
protected String getPacketInfo() {
return "MySQL Prepare Packet";
}
}
\ No newline at end of file
/**
* Project: bee-engine
*
* File Created at 2012-8-31
*
* Copyright 2012 dianping.com.
* All rights reserved.
*
* This software is the confidential and proprietary information of
* Dianping Company. ("Confidential Information"). You shall not
* disclose such Confidential Information and shall use it only in
* accordance with the terms of the license agreement you entered into
* with dianping.com.
*/
package com.dianping.bee.engine.spi.handler.internal;
import java.nio.ByteBuffer;
import com.alibaba.cobar.net.FrontendConnection;
import com.alibaba.cobar.net.util.BufferUtil;
import com.alibaba.cobar.protocol.MySQLPacket;
/**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/
public class PrepareParameterPacket extends MySQLPacket {
private int m_fieldType;
private int m_columnFlag;
private byte m_decimal;
private int m_length;
public PrepareParameterPacket(int fieldType, int columnFlag, byte decimal, int length) {
this.m_fieldType = fieldType;
this.m_columnFlag = columnFlag;
this.m_decimal = decimal;
this.m_length = length;
}
@Override
public ByteBuffer write(ByteBuffer buffer, FrontendConnection c) {
int size = calcPacketSize();
buffer = c.checkWriteBuffer(buffer, c.getPacketHeaderSize() + size);
BufferUtil.writeUB3(buffer, size);
buffer.put(packetId);
BufferUtil.writeUB2(buffer, m_fieldType);
BufferUtil.writeUB2(buffer, m_columnFlag);
buffer.put(m_decimal);
BufferUtil.writeUB4(buffer, m_length);
return buffer;
}
@Override
public int calcPacketSize() {
return 9; // 2+2+1+4
}
@Override
protected String getPacketInfo() {
return "MySQL Prepare Parameter Packet";
}
}
...@@ -23,6 +23,11 @@ public class DefaultMultiTableStatement implements MultiTableStatement { ...@@ -23,6 +23,11 @@ public class DefaultMultiTableStatement implements MultiTableStatement {
return m_index; return m_index;
} }
@Override
public int getParameterSize() {
return 0;
}
@Override @Override
public RowFilter getRowFilter() { public RowFilter getRowFilter() {
return m_rowFilter; return m_rowFilter;
...@@ -38,11 +43,20 @@ public class DefaultMultiTableStatement implements MultiTableStatement { ...@@ -38,11 +43,20 @@ public class DefaultMultiTableStatement implements MultiTableStatement {
return m_tables; return m_tables;
} }
@Override
public RowSet query() {
return null;
}
@Override @Override
public void setIndex(IndexMeta index) { public void setIndex(IndexMeta index) {
m_index = index; m_index = index;
} }
@Override
public void setParameterSize(int m_parameterSize) {
}
@Override @Override
public void setRowFilter(RowFilter rowFilter) { public void setRowFilter(RowFilter rowFilter) {
m_rowFilter = rowFilter; m_rowFilter = rowFilter;
...@@ -57,9 +71,4 @@ public class DefaultMultiTableStatement implements MultiTableStatement { ...@@ -57,9 +71,4 @@ public class DefaultMultiTableStatement implements MultiTableStatement {
public void setTables(List<TableProvider> tables) { public void setTables(List<TableProvider> tables) {
m_tables = tables; m_tables = tables;
} }
@Override
public RowSet query() {
return null;
}
} }
...@@ -16,11 +16,26 @@ public class DefaultSingleTableStatement implements SingleTableStatement { ...@@ -16,11 +16,26 @@ public class DefaultSingleTableStatement implements SingleTableStatement {
private ColumnMeta[] m_selectColumns; private ColumnMeta[] m_selectColumns;
private int m_parameterSize;
/**
* @param providerRowSet
* @return
*/
private RowSet buildReturnRowSet(RowSet c) {
return c;
}
@Override @Override
public IndexMeta getIndex() { public IndexMeta getIndex() {
return m_index; return m_index;
} }
@Override
public int getParameterSize() {
return m_parameterSize;
}
@Override @Override
public RowFilter getRowFilter() { public RowFilter getRowFilter() {
return m_rowFilter; return m_rowFilter;
...@@ -36,11 +51,29 @@ public class DefaultSingleTableStatement implements SingleTableStatement { ...@@ -36,11 +51,29 @@ public class DefaultSingleTableStatement implements SingleTableStatement {
return m_table; return m_table;
} }
@Override
public RowSet query() {
// Query By Index
RowSet providerRowSet = m_table.queryByIndex(m_index, m_selectColumns);
// Filter
if (providerRowSet != null) {
providerRowSet.filter(m_rowFilter);
}
// Build select columns
RowSet returnRowSet = buildReturnRowSet(providerRowSet);
return returnRowSet;
}
@Override @Override
public void setIndex(IndexMeta index) { public void setIndex(IndexMeta index) {
m_index = index; m_index = index;
} }
@Override
public void setParameterSize(int m_parameterSize) {
this.m_parameterSize = m_parameterSize;
}
@Override @Override
public void setRowFilter(RowFilter rowFilter) { public void setRowFilter(RowFilter rowFilter) {
m_rowFilter = rowFilter; m_rowFilter = rowFilter;
...@@ -59,25 +92,4 @@ public class DefaultSingleTableStatement implements SingleTableStatement { ...@@ -59,25 +92,4 @@ public class DefaultSingleTableStatement implements SingleTableStatement {
public void setTable(TableProvider table) { public void setTable(TableProvider table) {
m_table = table; m_table = table;
} }
@Override
public RowSet query() {
// Query By Index
RowSet providerRowSet = m_table.queryByIndex(m_index, m_selectColumns);
// Filter
if (providerRowSet != null) {
providerRowSet.filter(m_rowFilter);
}
// Build select columns
RowSet returnRowSet = buildReturnRowSet(providerRowSet);
return returnRowSet;
}
/**
* @param providerRowSet
* @return
*/
private RowSet buildReturnRowSet(RowSet c) {
return c;
}
} }
...@@ -13,7 +13,9 @@ import com.site.lookup.ContainerHolder; ...@@ -13,7 +13,9 @@ import com.site.lookup.ContainerHolder;
public class DefaultStatementManager extends ContainerHolder implements StatementManager { public class DefaultStatementManager extends ContainerHolder implements StatementManager {
private Map<String, Statement> m_statements = new HashMap<String, Statement>(); private Map<String, Statement> m_statements = new HashMap<String, Statement>();
private Map<String, Statement> m_prepares = new HashMap<String, Statement>(); private Map<Long, Statement> m_prepares = new HashMap<Long, Statement>();
private static long stmtId = 0;
@Override @Override
public Statement build(String sql) throws SQLSyntaxErrorException { public Statement build(String sql) throws SQLSyntaxErrorException {
...@@ -33,7 +35,7 @@ public class DefaultStatementManager extends ContainerHolder implements Statemen ...@@ -33,7 +35,7 @@ public class DefaultStatementManager extends ContainerHolder implements Statemen
return statement; return statement;
} }
private Statement parseSQL(String sql) throws SQLSyntaxErrorException { public Statement parseSQL(String sql) throws SQLSyntaxErrorException {
SQLStatement statement = SQLParserDelegate.parse(sql); SQLStatement statement = SQLParserDelegate.parse(sql);
DefaultStatementVisitor defaultVisitor = new DefaultStatementVisitor(); DefaultStatementVisitor defaultVisitor = new DefaultStatementVisitor();
...@@ -53,20 +55,23 @@ public class DefaultStatementManager extends ContainerHolder implements Statemen ...@@ -53,20 +55,23 @@ public class DefaultStatementManager extends ContainerHolder implements Statemen
} }
@Override @Override
public Statement prepare(String sql) throws SQLSyntaxErrorException { public long stmtPrepare(Statement stmt) {
Statement statement = m_prepares.get(sql); synchronized (m_prepares) {
m_prepares.put(stmtId++ % Long.MAX_VALUE, stmt);
}
if (statement == null) { return stmtId;
synchronized (m_prepares) { }
statement = m_prepares.get(sql);
if (statement == null) { @Override
statement = parseSQL(sql); public Statement stmtExecute(long stmtId) {
m_prepares.put(sql, statement); return m_prepares.get(stmtId);
} }
}
}
return statement; @Override
public void stmtClose(long stmtId) {
synchronized (m_prepares) {
m_prepares.remove(stmtId);
}
} }
} }
...@@ -5,6 +5,7 @@ import java.util.List; ...@@ -5,6 +5,7 @@ import java.util.List;
import com.alibaba.cobar.parser.ast.expression.Expression; import com.alibaba.cobar.parser.ast.expression.Expression;
import com.alibaba.cobar.parser.ast.expression.primary.Identifier; import com.alibaba.cobar.parser.ast.expression.primary.Identifier;
import com.alibaba.cobar.parser.ast.expression.primary.ParamMarker;
import com.alibaba.cobar.parser.ast.fragment.tableref.TableRefFactor; import com.alibaba.cobar.parser.ast.fragment.tableref.TableRefFactor;
import com.alibaba.cobar.parser.ast.fragment.tableref.TableReference; import com.alibaba.cobar.parser.ast.fragment.tableref.TableReference;
import com.alibaba.cobar.parser.ast.stmt.dml.DMLSelectStatement; import com.alibaba.cobar.parser.ast.stmt.dml.DMLSelectStatement;
...@@ -46,6 +47,8 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor { ...@@ -46,6 +47,8 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor {
private String m_databaseName; private String m_databaseName;
private int m_parameterSize;
private Clause m_clause; private Clause m_clause;
private List<ColumnMeta> m_selectColumns = new ArrayList<ColumnMeta>(); private List<ColumnMeta> m_selectColumns = new ArrayList<ColumnMeta>();
...@@ -139,6 +142,8 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor { ...@@ -139,6 +142,8 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor {
} else { } else {
m_stmt.setIndex(m_helper.findIndex(m_databaseName, m_tableName, m_whereColumns)); m_stmt.setIndex(m_helper.findIndex(m_databaseName, m_tableName, m_whereColumns));
} }
m_stmt.setParameterSize(m_parameterSize);
} }
} }
...@@ -178,4 +183,9 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor { ...@@ -178,4 +183,9 @@ public class SingleTableStatementVisitor extends EmptySQLASTVisitor {
m_databaseName = node.getTable().getParent().getIdTextUpUnescape(); m_databaseName = node.getTable().getParent().getIdTextUpUnescape();
} }
} }
@Override
public void visit(ParamMarker node) {
m_parameterSize++;
}
} }
...@@ -93,7 +93,67 @@ public class SimpleServerConnection extends ServerConnection { ...@@ -93,7 +93,67 @@ public class SimpleServerConnection extends ServerConnection {
// 执行查询 // 执行查询
if (queryHandler != null) { if (queryHandler != null) {
((SimpleServerQueryHandler) queryHandler).prepare(sql); ((SimpleServerQueryHandler) queryHandler).stmtPrepare(sql);
} else {
writeErrMessage(ErrorCode.ER_YES, "Empty QueryHandler");
}
} finally {
m_sessionManager.removeSession();
}
}
@Override
public void stmtExecute(byte[] data) {
m_sessionManager.getSession().setDatabase(getSchema());
try {
// 取得查询语句
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
String sql = null;
try {
sql = mm.readString(charset);
} catch (UnsupportedEncodingException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
return;
}
if (sql == null || sql.length() == 0) {
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty Prepared SQL");
return;
}
// 执行查询
if (queryHandler != null) {
((SimpleServerQueryHandler) queryHandler).stmtExecute(sql);
} else {
writeErrMessage(ErrorCode.ER_YES, "Empty QueryHandler");
}
} finally {
m_sessionManager.removeSession();
}
}
@Override
public void stmtClose(byte[] data) {
m_sessionManager.getSession().setDatabase(getSchema());
try {
// 取得查询语句
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
String sql = null;
try {
sql = mm.readString(charset);
} catch (UnsupportedEncodingException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
return;
}
if (sql == null || sql.length() == 0) {
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty Prepared SQL");
return;
}
// 执行查询
if (queryHandler != null) {
((SimpleServerQueryHandler) queryHandler).stmtClose(sql);
} else { } else {
writeErrMessage(ErrorCode.ER_YES, "Empty QueryHandler"); writeErrMessage(ErrorCode.ER_YES, "Empty QueryHandler");
} }
......
...@@ -84,12 +84,28 @@ public class SimpleServerQueryHandler implements FrontendQueryHandler { ...@@ -84,12 +84,28 @@ public class SimpleServerQueryHandler implements FrontendQueryHandler {
} }
} }
public void prepare(String sql) { public void setServerConnection(ServerConnection c) {
m_conn = c;
}
/**
* @param sql
*/
public void stmtClose(String sql) {
ServerConnection c = m_conn; ServerConnection c = m_conn;
m_prepareHandler.handle(sql, c, -1); m_prepareHandler.close(sql, c, -1);
} }
public void setServerConnection(ServerConnection c) { /**
m_conn = c; * @param sql
*/
public void stmtExecute(String sql) {
ServerConnection c = m_conn;
m_prepareHandler.execute(sql, c, -1);
}
public void stmtPrepare(String sql) {
ServerConnection c = m_conn;
m_prepareHandler.prepare(sql, c, -1);
} }
} }
...@@ -20,6 +20,7 @@ import java.sql.PreparedStatement; ...@@ -20,6 +20,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Properties;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -39,14 +40,15 @@ public class PreparedStatementTest extends ComponentTestCase { ...@@ -39,14 +40,15 @@ public class PreparedStatementTest extends ComponentTestCase {
String url = "jdbc:mysql://localhost:2330/"; String url = "jdbc:mysql://localhost:2330/";
String dbName = "cat"; String dbName = "cat";
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String userName = "test"; Properties props = new Properties();
String password = "test"; props.setProperty("user", "test");
String arg = "?useServerPrepStmts=true"; props.setProperty("password", "test");
props.setProperty("useServerPrepStmts", "true");
String sql = "select type, sum(failures) from transaction where domain=? and starttime=?"; String sql = "select type, sum(failures) from transaction where domain=? and starttime=?";
Class.forName(driver).newInstance(); Class.forName(driver).newInstance();
DriverManager.setLoginTimeout(600); DriverManager.setLoginTimeout(600);
Connection conn = DriverManager.getConnection(url + dbName + (arg == null ? "" : arg), userName, password); Connection conn = DriverManager.getConnection(url + dbName, props);
PreparedStatement stmt = conn.prepareStatement(sql); PreparedStatement stmt = conn.prepareStatement(sql);
Assert.assertNotNull(stmt); Assert.assertNotNull(stmt);
stmt.setString(1, "MobiApi"); stmt.setString(1, "MobiApi");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册