提交 34ffbd7a 编写于 作者: F Frankie Wu

changes to bee-engine and add cat-data project

上级 704dab8a
...@@ -11,6 +11,10 @@ import com.dianping.bee.engine.spi.RowFilter; ...@@ -11,6 +11,10 @@ import com.dianping.bee.engine.spi.RowFilter;
import com.dianping.bee.engine.spi.SingleTableStatement; import com.dianping.bee.engine.spi.SingleTableStatement;
import com.dianping.bee.engine.spi.StatementManager; import com.dianping.bee.engine.spi.StatementManager;
import com.dianping.bee.engine.spi.TableProviderManager; import com.dianping.bee.engine.spi.TableProviderManager;
import com.dianping.bee.engine.spi.handler.internal.DefaultServerQueryHandler;
import com.dianping.bee.engine.spi.handler.internal.DescHandler;
import com.dianping.bee.engine.spi.handler.internal.ShowHandler;
import com.dianping.bee.engine.spi.handler.internal.UseHandler;
import com.dianping.bee.engine.spi.internal.DefaultMultiTableStatement; import com.dianping.bee.engine.spi.internal.DefaultMultiTableStatement;
import com.dianping.bee.engine.spi.internal.DefaultRowFilter; import com.dianping.bee.engine.spi.internal.DefaultRowFilter;
import com.dianping.bee.engine.spi.internal.DefaultSingleTableStatement; import com.dianping.bee.engine.spi.internal.DefaultSingleTableStatement;
...@@ -19,6 +23,8 @@ import com.dianping.bee.engine.spi.internal.DefaultTableProviderManager; ...@@ -19,6 +23,8 @@ import com.dianping.bee.engine.spi.internal.DefaultTableProviderManager;
import com.dianping.bee.engine.spi.internal.MultiTableStatementVisitor; import com.dianping.bee.engine.spi.internal.MultiTableStatementVisitor;
import com.dianping.bee.engine.spi.internal.SingleTableStatementVisitor; import com.dianping.bee.engine.spi.internal.SingleTableStatementVisitor;
import com.dianping.bee.engine.spi.internal.TableHelper; import com.dianping.bee.engine.spi.internal.TableHelper;
import com.dianping.bee.engine.spi.session.DefaultSessionManager;
import com.dianping.bee.engine.spi.session.SessionManager;
import com.dianping.bee.server.InformationSchemaDatabase; import com.dianping.bee.server.InformationSchemaDatabase;
import com.dianping.bee.server.SimpleServer; import com.dianping.bee.server.SimpleServer;
import com.dianping.bee.server.handler.SimpleDescHandler; import com.dianping.bee.server.handler.SimpleDescHandler;
...@@ -40,8 +46,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -40,8 +46,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(DatabaseProvider.class, "cat", CatDatabase.class)); all.add(C(DatabaseProvider.class, "cat", CatDatabase.class));
all.add(C(DatabaseProvider.class, "dog", DogDatabase.class)); all.add(C(DatabaseProvider.class, "dog", DogDatabase.class));
all.add(C(SessionManager.class, DefaultSessionManager.class));
all.add(C(TableProviderManager.class, DefaultTableProviderManager.class) // all.add(C(TableProviderManager.class, DefaultTableProviderManager.class) //
.req(DatabaseProvider.class)); .req(SessionManager.class));
all.add(C(StatementManager.class, DefaultStatementManager.class)); all.add(C(StatementManager.class, DefaultStatementManager.class));
all.add(C(SingleTableStatement.class, DefaultSingleTableStatement.class).is(PER_LOOKUP)); all.add(C(SingleTableStatement.class, DefaultSingleTableStatement.class).is(PER_LOOKUP));
all.add(C(MultiTableStatement.class, DefaultMultiTableStatement.class).is(PER_LOOKUP)); all.add(C(MultiTableStatement.class, DefaultMultiTableStatement.class).is(PER_LOOKUP));
...@@ -65,9 +72,21 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -65,9 +72,21 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(SimpleServerQueryHandler.class).is(PER_LOOKUP) // all.add(C(SimpleServerQueryHandler.class).is(PER_LOOKUP) //
.req(SimpleSelectHandler.class, SimpleShowHandler.class, SimpleDescHandler.class, SimpleUseHandler.class)); .req(SimpleSelectHandler.class, SimpleShowHandler.class, SimpleDescHandler.class, SimpleUseHandler.class));
defineHandlers(all);
return all; return all;
} }
private void defineHandlers(List<Component> all) {
all.add(C(DefaultServerQueryHandler.class).is(PER_LOOKUP) //
.req(SimpleSelectHandler.class, ShowHandler.class, DescHandler.class, UseHandler.class));
all.add(C(UseHandler.class));
all.add(C(ShowHandler.class));
all.add(C(DescHandler.class) //
.req(TableProviderManager.class));
}
public static void main(String[] args) { public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator()); generatePlexusComponentsXmlFile(new ComponentsConfigurator());
} }
......
package com.dianping.bee.engine.spi;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
import com.dianping.bee.engine.spi.meta.RowSet;
public interface TableExecutor<S, T extends ColumnMeta> {
public RowSet execute(S index, T[] columns, RowFilter filter);
}
package com.dianping.bee.engine.spi; package com.dianping.bee.engine.spi;
public interface TableProviderManager { public interface TableProviderManager {
public String getDatabaseName();
public TableProvider getTableProvider(String table); public TableProvider getTableProvider(String table);
} }
package com.dianping.bee.engine.spi.handler;
import java.nio.ByteBuffer;
import java.util.List;
import com.alibaba.cobar.net.util.PacketUtil;
import com.alibaba.cobar.protocol.MySQLPacket;
import com.alibaba.cobar.protocol.mysql.EOFPacket;
import com.alibaba.cobar.protocol.mysql.FieldPacket;
import com.alibaba.cobar.protocol.mysql.OkPacket;
import com.alibaba.cobar.protocol.mysql.ResultSetHeaderPacket;
import com.alibaba.cobar.protocol.mysql.RowDataPacket;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.util.StringUtil;
import com.site.helper.Splitters;
import com.site.lookup.ContainerHolder;
public abstract class AbstractCommandHandler extends ContainerHolder implements CommandHandler {
protected void error(ServerConnection c, int errorCode, String pattern, Object... args) {
if (args.length == 0) {
c.writeErrMessage(errorCode, pattern);
} else {
c.writeErrMessage(errorCode, String.format(pattern, args));
}
}
protected abstract void handle(ServerConnection c, List<String> parts);
@Override
public void handle(String sql, ServerConnection c, int offset) {
List<String> parts = Splitters.by(' ').noEmptyItem().trim().split(sql.substring(offset + 1));
handle(c, parts);
}
protected String unescape(String str) {
if (str == null || str.length() < 2) {
return str;
}
int length = str.length();
if (str.charAt(0) == '`' && str.charAt(length - 1) == '`') {
return str.substring(1, length - 1);
} else {
return str;
}
}
protected ByteBuffer writeHeader(ServerConnection c, ByteBuffer buffer, MySQLPacket packet) {
return packet.write(buffer, c);
}
protected static class CommandContext {
private ServerConnection m_conn;
private ByteBuffer m_buffer;
private String m_charset;
private byte m_packetId;
public CommandContext(ServerConnection c) {
m_conn = c;
m_buffer = c.allocate();
m_charset = m_conn.getCharset();
m_packetId = 1;
}
public void complete() {
m_conn.write(m_buffer);
}
public void write(MySQLPacket packet) {
m_buffer = packet.write(m_buffer, m_conn);
}
public void writeEOF() {
EOFPacket eof = new EOFPacket();
eof.packetId = m_packetId++;
write(eof);
}
public void writeField(String name, int fieldType) {
FieldPacket field = PacketUtil.getField(name, fieldType);
field.packetId = m_packetId++;
write(field);
}
public void writeHeader(int fieldCount) {
ResultSetHeaderPacket header = PacketUtil.getHeader(fieldCount);
header.packetId = m_packetId++;
write(header);
}
public void writeOk() {
m_buffer = m_conn.writeToBuffer(OkPacket.OK, m_buffer);
}
public void writeRow(String... values) {
int cols = values.length;
RowDataPacket row = new RowDataPacket(cols);
for (int i = 0; i < cols; i++) {
row.add(StringUtil.encode(values[i], m_charset));
}
row.packetId = m_packetId++;
write(row);
}
}
}
package com.dianping.bee.engine.spi.handler;
import com.alibaba.cobar.server.ServerConnection;
public interface CommandHandler {
public void handle(String sql, ServerConnection c, int offset);
}
package com.dianping.bee.engine.spi.handler.internal;
import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.net.handler.FrontendQueryHandler;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.handler.BeginHandler;
import com.alibaba.cobar.server.handler.ExplainHandler;
import com.alibaba.cobar.server.handler.KillHandler;
import com.alibaba.cobar.server.handler.SavepointHandler;
import com.alibaba.cobar.server.handler.SetHandler;
import com.alibaba.cobar.server.handler.StartHandler;
import com.dianping.bee.server.handler.SimpleSelectHandler;
import com.dianping.bee.server.parse.SimpleServerParse;
import com.site.lookup.annotation.Inject;
public class DefaultServerQueryHandler implements FrontendQueryHandler {
@Inject
private SimpleSelectHandler m_selectHandler;
@Inject
private ShowHandler m_showHandler;
@Inject
private DescHandler m_descHandler;
@Inject
private UseHandler m_useHandler;
private ServerConnection m_conn;
@Override
public void query(String sql) {
ServerConnection c = m_conn;
int rs = SimpleServerParse.parse(sql);
switch (rs & 0xff) {
case SimpleServerParse.EXPLAIN:
ExplainHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.SET:
SetHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.DESC:
m_descHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.SHOW:
m_showHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.SELECT:
m_selectHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.START:
StartHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.BEGIN:
BeginHandler.handle(sql, c);
break;
case SimpleServerParse.SAVEPOINT:
SavepointHandler.handle(sql, c);
break;
case SimpleServerParse.KILL:
KillHandler.handle(sql, rs >>> 8, c);
break;
case SimpleServerParse.KILL_QUERY:
c.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported command");
break;
case SimpleServerParse.USE:
m_useHandler.handle(sql, c, rs >>> 8);
break;
case SimpleServerParse.COMMIT:
c.commit();
break;
case SimpleServerParse.ROLLBACK:
c.rollback();
break;
default:
c.execute(sql, rs);
}
}
public void setServerConnection(ServerConnection c) {
m_conn = c;
}
}
package com.dianping.bee.engine.spi.handler.internal;
import java.util.List;
import com.alibaba.cobar.CobarServer;
import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.Fields;
import com.alibaba.cobar.config.model.SchemaConfig;
import com.alibaba.cobar.server.ServerConnection;
import com.dianping.bee.engine.spi.TableProvider;
import com.dianping.bee.engine.spi.TableProviderManager;
import com.dianping.bee.engine.spi.handler.AbstractCommandHandler;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
import com.dianping.bee.engine.spi.meta.internal.TypeUtils;
public class DescHandler extends AbstractCommandHandler {
private TableProviderManager m_manager;
@Override
public void handle(ServerConnection c, List<String> parts) {
String tableName = unescape(parts.get(0));
String db = c.getSchema();
if (db == null) {
error(c, ErrorCode.ER_NO_DB_ERROR, "No database selected");
return;
}
SchemaConfig schema = CobarServer.getInstance().getConfig().getSchemas().get(db);
if (schema == null) {
error(c, ErrorCode.ER_BAD_DB_ERROR, "Unknown database '%s'", db);
return;
}
TableProvider table = m_manager.getTableProvider(tableName);
if (table == null) {
error(c, ErrorCode.ER_BAD_TABLE_ERROR, "Unknown table '%s'", tableName);
return;
}
ColumnMeta[] columns = table.getColumns();
CommandContext ctx = new CommandContext(c);
String[] names = { "Field", "Type", "Null", "Key", "Default", "Extra" };
ctx.writeHeader(names.length);
for (String name : names) {
ctx.writeField(name, Fields.FIELD_TYPE_VAR_STRING);
}
ctx.writeEOF();
for (ColumnMeta column : columns) {
String[] values = new String[names.length];
int index = 0;
values[index++] = column.getName();
values[index++] = TypeUtils.convertFieldTypeToString(TypeUtils.convertJavaTypeToFieldType(column.getType()));
ctx.writeRow(values);
}
ctx.writeEOF();
ctx.complete();
}
}
package com.dianping.bee.engine.spi.handler.internal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.cobar.CobarServer;
import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.Fields;
import com.alibaba.cobar.config.model.SchemaConfig;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.response.ShowDatabases;
import com.dianping.bee.engine.spi.DatabaseProvider;
import com.dianping.bee.engine.spi.TableProvider;
import com.dianping.bee.engine.spi.handler.AbstractCommandHandler;
public class ShowHandler extends AbstractCommandHandler {
@Override
public void handle(ServerConnection c, List<String> parts) {
int len = parts.size();
String first = len > 0 ? parts.get(0) : null;
String second = len > 1 ? parts.get(1) : null;
String third = len > 2 ? parts.get(2) : null;
String forth = len > 3 ? parts.get(3) : null;
if ("databases".equalsIgnoreCase(first)) {
ShowDatabases.response(c);
} else if ("tables".equalsIgnoreCase(first)) {
showTables(c);
} else if ("table".equalsIgnoreCase(first)) {
if ("status".equalsIgnoreCase(second) && "from".equalsIgnoreCase(third)) {
showTableStatus(c, unescape(forth));
}
} else if ("status".equalsIgnoreCase(first)) {
showStatus(c);
} else if ("variables".equalsIgnoreCase(first)) {
showVariables(c);
} else {
error(c, ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported show command");
}
}
private void showStatus(ServerConnection c) {
Map<String, String> map = new HashMap<String, String>();
map.put("SampleName", "SampleValue");
// TODO real data here
CommandContext ctx = new CommandContext(c);
String[] names = { "Variable_name", "Value" };
ctx.writeHeader(names.length);
for (String name : names) {
ctx.writeField(name, Fields.FIELD_TYPE_VAR_STRING);
}
ctx.writeEOF();
for (Map.Entry<String, String> e : map.entrySet()) {
String[] values = new String[names.length];
int index = 0;
values[index++] = e.getKey();
values[index++] = e.getValue();
ctx.writeRow(values);
}
ctx.writeEOF();
ctx.complete();
}
private void showTables(ServerConnection c) {
// 检查当前使用的DB
String db = c.getSchema();
if (db == null) {
c.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected");
return;
}
SchemaConfig schema = CobarServer.getInstance().getConfig().getSchemas().get(db);
if (schema == null) {
c.writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'");
return;
}
DatabaseProvider provider = null;
try {
provider = lookup(DatabaseProvider.class, db);
} catch (Exception e) {
// ignore it
}
if (provider == null) {
c.writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Can not load database '" + db + "'");
return;
}
TableProvider[] tables = provider.getTables();
CommandContext ctx = new CommandContext(c);
ctx.writeHeader(1);
ctx.writeField("TABLE", Fields.FIELD_TYPE_VAR_STRING);
ctx.writeEOF();
for (TableProvider table : tables) {
ctx.writeRow(table.getName());
}
ctx.writeEOF();
ctx.complete();
}
private void showTableStatus(ServerConnection c, String dbName) {
if (dbName == null) {
error(c, ErrorCode.ER_NO_DB_ERROR, "No database specified");
return;
}
DatabaseProvider provider = null;
try {
provider = lookup(DatabaseProvider.class, dbName);
} catch (Exception e) {
error(c, ErrorCode.ER_BAD_DB_ERROR, "Can not load database '%s'", dbName);
return;
}
TableProvider[] tables = provider.getTables();
CommandContext ctx = new CommandContext(c);
String[] names = { "Name", "Engine", "Version", "Row_format", "Rows", "Avg_row_length", "Data_length", "Max_data_length",
"Index_length", "Data_free", "Auto_increment", "Create_time", "Update_time", "Check_time", "Collation", "Checksum",
"Create_options", "Comment" };
ctx.writeHeader(names.length);
for (String name : names) {
ctx.writeField(name, Fields.FIELD_TYPE_VAR_STRING);
}
ctx.writeEOF();
for (TableProvider table : tables) {
String[] values = new String[names.length];
int index = 0;
values[index++] = table.getName();
values[index++] = "Bee";
values[index++] = "1.0";
ctx.writeRow(values);
}
ctx.writeEOF();
ctx.complete();
}
private void showVariables(ServerConnection c) {
Map<String, String> map = new HashMap<String, String>();
map.put("BeeStatus", "Good");
// TODO real data here
CommandContext ctx = new CommandContext(c);
String[] names = { "Variable_name", "Value" };
ctx.writeHeader(names.length);
for (String name : names) {
ctx.writeField(name, Fields.FIELD_TYPE_VAR_STRING);
}
ctx.writeEOF();
for (Map.Entry<String, String> e : map.entrySet()) {
String[] values = new String[names.length];
int index = 0;
values[index++] = e.getKey();
values[index++] = e.getValue();
ctx.writeRow(values);
}
ctx.writeEOF();
ctx.complete();
}
}
package com.dianping.bee.engine.spi.handler.internal;
import java.util.List;
import java.util.Set;
import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.net.handler.FrontendPrivileges;
import com.alibaba.cobar.server.ServerConnection;
import com.dianping.bee.engine.spi.handler.AbstractCommandHandler;
public class UseHandler extends AbstractCommandHandler {
@Override
public void handle(ServerConnection c, List<String> parts) {
String schema = unescape(parts.get(0));
// 检查schema的有效性
FrontendPrivileges privileges = c.getPrivileges();
if (schema == null || !privileges.schemaExists(schema)) {
error(c, ErrorCode.ER_BAD_DB_ERROR, "Unknown database '%s'", schema);
return;
}
String user = c.getUser();
if (!privileges.userExists(user, c.getHost())) {
error(c, ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '%s'", c.getUser());
return;
}
Set<String> schemas = privileges.getUserSchemas(user);
if (schemas == null || schemas.size() == 0 || schemas.contains(schema)) {
CommandContext ctx = new CommandContext(c);
c.setSchema(schema);
ctx.writeOk();
ctx.complete();
} else {
error(c, ErrorCode.ER_DBACCESS_DENIED_ERROR, "Access denied for user '%s' to database '%s'", c.getUser(), schema);
}
}
}
package com.dianping.bee.engine.spi.internal; package com.dianping.bee.engine.spi.internal;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
...@@ -9,30 +10,40 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationExce ...@@ -9,30 +10,40 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationExce
import com.dianping.bee.engine.spi.DatabaseProvider; import com.dianping.bee.engine.spi.DatabaseProvider;
import com.dianping.bee.engine.spi.TableProvider; import com.dianping.bee.engine.spi.TableProvider;
import com.dianping.bee.engine.spi.TableProviderManager; import com.dianping.bee.engine.spi.TableProviderManager;
import com.dianping.bee.engine.spi.session.SessionManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
public class DefaultTableProviderManager implements TableProviderManager, Initializable { public class DefaultTableProviderManager extends ContainerHolder implements TableProviderManager, Initializable {
@Inject @Inject
private DatabaseProvider m_databaseProvider; private SessionManager m_sessionManager;
private Map<String, TableProvider> m_tables = new HashMap<String, TableProvider>(); private Map<String, Map<String, TableProvider>> m_map = new HashMap<String, Map<String, TableProvider>>();
@Override
public String getDatabaseName() {
return m_databaseProvider.getName();
}
@Override @Override
public TableProvider getTableProvider(String table) { public TableProvider getTableProvider(String table) {
return m_tables.get(table.toUpperCase()); String database = m_sessionManager.getSession().getDatabase();
Map<String, TableProvider> map = m_map.get(database);
if (map != null) {
return map.get(table.toUpperCase());
} else {
return null;
}
} }
@Override @Override
public void initialize() throws InitializationException { public void initialize() throws InitializationException {
TableProvider[] tables = m_databaseProvider.getTables(); List<DatabaseProvider> providers = lookupList(DatabaseProvider.class);
for (DatabaseProvider provider : providers) {
Map<String, TableProvider> map = new HashMap<String, TableProvider>();
for (TableProvider table : provider.getTables()) {
map.put(table.getName().toUpperCase(), table);
}
for (TableProvider table : tables) { m_map.put(provider.getName(), map);
m_tables.put(table.getName().toUpperCase(), table);
} }
} }
} }
package com.dianping.bee.engine.spi.meta;
import com.dianping.bee.engine.spi.RowFilter;
public interface Indexer {
public Index getMeta();
public void setValue(int index, Object value);
public void addValue(int index, Object value, PredicateType type);
public void scan(RowSet rowset, RowFilter filter);
public static enum PredicateType {
LT("<"),
LE("<="),
EQ("="),
GT(">"),
GE(">=");
private String m_symbol;
private PredicateType(String symbol) {
m_symbol = symbol;
}
public String getSymbol() {
return m_symbol;
}
}
}
package com.dianping.bee.engine.spi.session;
public class DefaultSession implements Session {
private String m_database;
@Override
public String getDatabase() {
System.out.println(Thread.currentThread()+":"+m_database);
return m_database;
}
@Override
public void setDatabase(String database) {
m_database = database;
System.out.println(Thread.currentThread()+":"+m_database);
}
}
package com.dianping.bee.engine.spi.session;
public class DefaultSessionManager implements SessionManager {
private ThreadLocal<Session> m_threadLocalSession = new ThreadLocal<Session>() {
@Override
protected Session initialValue() {
return new DefaultSession();
}
};
@Override
public Session getSession() {
return m_threadLocalSession.get();
}
@Override
public void removeSession() {
m_threadLocalSession.remove();
}
}
package com.dianping.bee.engine.spi.session;
public interface Session {
public String getDatabase();
public void setDatabase(String database);
}
package com.dianping.bee.engine.spi.session;
public interface SessionManager {
public Session getSession();
public void removeSession();
}
...@@ -21,12 +21,15 @@ import com.alibaba.cobar.ErrorCode; ...@@ -21,12 +21,15 @@ import com.alibaba.cobar.ErrorCode;
import com.alibaba.cobar.net.util.MySQLMessage; import com.alibaba.cobar.net.util.MySQLMessage;
import com.alibaba.cobar.protocol.mysql.OkPacket; import com.alibaba.cobar.protocol.mysql.OkPacket;
import com.alibaba.cobar.server.ServerConnection; import com.alibaba.cobar.server.ServerConnection;
import com.dianping.bee.engine.spi.session.SessionManager;
/** /**
* @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a> * @author <a href="mailto:yiming.liu@dianping.com">Yiming Liu</a>
*/ */
public class SimpleServerConnection extends ServerConnection { public class SimpleServerConnection extends ServerConnection {
private SessionManager m_sessionManager;
/** /**
* @param channel * @param channel
*/ */
...@@ -44,20 +47,20 @@ public class SimpleServerConnection extends ServerConnection { ...@@ -44,20 +47,20 @@ public class SimpleServerConnection extends ServerConnection {
String db = mm.readString(); String db = mm.readString();
// 检查schema是否已经设置 // 检查schema是否已经设置
// if (schema != null) { // if (schema != null) {
// if (schema.equals(db)) { // if (schema.equals(db)) {
// write(writeToBuffer(OkPacket.OK, allocate())); // write(writeToBuffer(OkPacket.OK, allocate()));
// } else { // } else {
// writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, "Not allowed to change the database!"); // writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, "Not allowed to change the database!");
// } // }
// return; // return;
// } // }
// 检查schema的有效性 // 检查schema的有效性
// if (db == null || !privileges.schemaExists(db)) { // if (db == null || !privileges.schemaExists(db)) {
// writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'"); // writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'");
// return; // return;
// } // }
if (!privileges.userExists(user, host)) { if (!privileges.userExists(user, host)) {
writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'"); writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'");
return; return;
...@@ -71,4 +74,14 @@ public class SimpleServerConnection extends ServerConnection { ...@@ -71,4 +74,14 @@ public class SimpleServerConnection extends ServerConnection {
writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s); writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
} }
} }
@Override
public void setSchema(String schema) {
super.setSchema(schema);
m_sessionManager.getSession().setDatabase(schema);
}
public void setSessionManager(SessionManager sessionManager) {
m_sessionManager = sessionManager;
}
} }
...@@ -9,9 +9,12 @@ import com.alibaba.cobar.CobarPrivileges; ...@@ -9,9 +9,12 @@ import com.alibaba.cobar.CobarPrivileges;
import com.alibaba.cobar.Isolations; import com.alibaba.cobar.Isolations;
import com.alibaba.cobar.net.FrontendConnection; import com.alibaba.cobar.net.FrontendConnection;
import com.alibaba.cobar.net.factory.FrontendConnectionFactory; import com.alibaba.cobar.net.factory.FrontendConnectionFactory;
import com.alibaba.cobar.net.handler.FrontendQueryHandler;
import com.alibaba.cobar.server.ServerConnection; import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.session.BlockingSession; import com.alibaba.cobar.server.session.BlockingSession;
import com.alibaba.cobar.server.session.NonBlockingSession; import com.alibaba.cobar.server.session.NonBlockingSession;
import com.dianping.bee.engine.spi.handler.internal.DefaultServerQueryHandler;
import com.dianping.bee.engine.spi.session.SessionManager;
import com.dianping.bee.server.handler.SimpleServerQueryHandler; import com.dianping.bee.server.handler.SimpleServerQueryHandler;
/** /**
...@@ -22,9 +25,10 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory { ...@@ -22,9 +25,10 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory {
@Override @Override
protected FrontendConnection getConnection(SocketChannel channel) { protected FrontendConnection getConnection(SocketChannel channel) {
ServerConnection c = new SimpleServerConnection(channel); SimpleServerConnection c = new SimpleServerConnection(channel);
SimpleServerQueryHandler queryHandler = getQueryHandler(c); FrontendQueryHandler queryHandler = getDefaultQueryHandler(c); // TODO use another one for test
c.setSessionManager(getSessionManager());
c.setQueryHandler(queryHandler); c.setQueryHandler(queryHandler);
c.setPrivileges(new CobarPrivileges()); c.setPrivileges(new CobarPrivileges());
c.setTxIsolation(Isolations.REPEATED_READ); c.setTxIsolation(Isolations.REPEATED_READ);
...@@ -33,7 +37,19 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory { ...@@ -33,7 +37,19 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory {
return c; return c;
} }
protected SimpleServerQueryHandler getQueryHandler(ServerConnection c) { protected DefaultServerQueryHandler getDefaultQueryHandler(ServerConnection c) {
try {
DefaultServerQueryHandler queryHandler = m_container.lookup(DefaultServerQueryHandler.class);
queryHandler.setServerConnection(c);
return queryHandler;
} catch (ComponentLookupException e) {
throw new RuntimeException(
"Unable to get DefaultServerQueryHandler instance, please check if the environment is setup correctly!", e);
}
}
protected SimpleServerQueryHandler getSimpleQueryHandler(ServerConnection c) {
try { try {
SimpleServerQueryHandler queryHandler = m_container.lookup(SimpleServerQueryHandler.class); SimpleServerQueryHandler queryHandler = m_container.lookup(SimpleServerQueryHandler.class);
...@@ -41,8 +57,17 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory { ...@@ -41,8 +57,17 @@ public class SimpleServerConnectionFactory extends FrontendConnectionFactory {
return queryHandler; return queryHandler;
} catch (ComponentLookupException e) { } catch (ComponentLookupException e) {
throw new RuntimeException( throw new RuntimeException(
"Unable to get SimpleServerQueryHandler instance, please check if the environment is setup correctly!", "Unable to get SimpleServerQueryHandler instance, please check if the environment is setup correctly!", e);
e); }
}
protected SessionManager getSessionManager() {
try {
SessionManager sessionManager = m_container.lookup(SessionManager.class);
return sessionManager;
} catch (ComponentLookupException e) {
throw new RuntimeException("Unable to get SessionManager instance, please check if the environment is setup correctly!", e);
} }
} }
......
...@@ -19,12 +19,16 @@ ...@@ -19,12 +19,16 @@
<role-hint>dog</role-hint> <role-hint>dog</role-hint>
<implementation>com.dianping.bee.db.DogDatabase</implementation> <implementation>com.dianping.bee.db.DogDatabase</implementation>
</component> </component>
<component>
<role>com.dianping.bee.engine.spi.session.SessionManager</role>
<implementation>com.dianping.bee.engine.spi.session.DefaultSessionManager</implementation>
</component>
<component> <component>
<role>com.dianping.bee.engine.spi.TableProviderManager</role> <role>com.dianping.bee.engine.spi.TableProviderManager</role>
<implementation>com.dianping.bee.engine.spi.internal.DefaultTableProviderManager</implementation> <implementation>com.dianping.bee.engine.spi.internal.DefaultTableProviderManager</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.bee.engine.spi.DatabaseProvider</role> <role>com.dianping.bee.engine.spi.session.SessionManager</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
...@@ -138,5 +142,41 @@ ...@@ -138,5 +142,41 @@
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
<component>
<role>com.dianping.bee.engine.spi.handler.internal.DefaultServerQueryHandler</role>
<implementation>com.dianping.bee.engine.spi.handler.internal.DefaultServerQueryHandler</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.bee.server.handler.SimpleSelectHandler</role>
</requirement>
<requirement>
<role>com.dianping.bee.engine.spi.handler.internal.ShowHandler</role>
</requirement>
<requirement>
<role>com.dianping.bee.engine.spi.handler.internal.DescHandler</role>
</requirement>
<requirement>
<role>com.dianping.bee.engine.spi.handler.internal.UseHandler</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.bee.engine.spi.handler.internal.UseHandler</role>
<implementation>com.dianping.bee.engine.spi.handler.internal.UseHandler</implementation>
</component>
<component>
<role>com.dianping.bee.engine.spi.handler.internal.ShowHandler</role>
<implementation>com.dianping.bee.engine.spi.handler.internal.ShowHandler</implementation>
</component>
<component>
<role>com.dianping.bee.engine.spi.handler.internal.DescHandler</role>
<implementation>com.dianping.bee.engine.spi.handler.internal.DescHandler</implementation>
<requirements>
<requirement>
<role>com.dianping.bee.engine.spi.TableProviderManager</role>
</requirement>
</requirements>
</component>
</components> </components>
</plexus> </plexus>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-data</artifactId>
<name>CAT Data Service</name>
<dependencies>
<dependency>
<groupId>com.dianping.bee</groupId>
<artifactId>bee-engine</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.dianping.cat.data;
import com.dianping.bee.engine.spi.DatabaseProvider;
public class CatDatabaseProvider implements DatabaseProvider {
public static final String ID = "cat";
@Override
public String getName() {
return ID;
}
@Override
public CatTableProvider[] getTables() {
return CatTableProvider.values();
}
}
package com.dianping.cat.data;
import com.dianping.bee.engine.spi.TableProvider;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
import com.dianping.bee.engine.spi.meta.Index;
import com.dianping.bee.engine.spi.meta.RowSet;
import com.dianping.cat.data.event.EventColumn;
import com.dianping.cat.data.event.EventIndex;
import com.dianping.cat.data.transaction.TransactionColumn;
import com.dianping.cat.data.transaction.TransactionIndex;
public enum CatTableProvider implements TableProvider {
Transaction("transaction", TransactionColumn.values(), TransactionIndex.values()),
Event("event", EventColumn.values(), EventIndex.values()),
Heartbeat("heartbeat"),
Problem("problem");
private String m_name;
private ColumnMeta[] m_columns;
private Index[] m_indexes;
private CatTableProvider(String name) {
m_name = name;
}
private CatTableProvider(String name, ColumnMeta[] columns, Index[] indexes) {
m_name = name;
m_columns = columns;
m_indexes = indexes;
}
@Override
public ColumnMeta[] getColumns() {
return m_columns;
}
@Override
public Index[] getIndexes() {
return m_indexes;
}
@Override
public String getName() {
return m_name;
}
@Override
public RowSet queryByIndex(Index index, ColumnMeta[] selectColumns) {
return null;
}
}
\ No newline at end of file
package com.dianping.cat.data.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.bee.engine.spi.DatabaseProvider;
import com.dianping.cat.data.CatDatabaseProvider;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(DatabaseProvider.class, CatDatabaseProvider.ID, CatDatabaseProvider.class));
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
package com.dianping.cat.data.event;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
public enum EventColumn implements ColumnMeta {
StartTime(String.class), // 20120822(for daily), 2012082213(for hour)
Domain(String.class), // MobileApi
Type(String.class), // URL
Name(String.class), // /deallist.bin
TotalCount(Integer.class), // 2033
Failures(Integer.class), // 5
SampleMessage(String.class); // MobileApi-0a0101a6-1345600834200-1
private String m_name;
private Class<?> m_type;
private EventColumn(Class<?> type) {
m_type = type;
m_name = name().toLowerCase();
}
public static EventColumn findByName(String name) {
for (EventColumn column : values()) {
if (column.getName().equalsIgnoreCase(name)) {
return column;
}
}
throw new RuntimeException(String.format("Column(%s) is not found in %s", name, EventColumn.class.getName()));
}
@Override
public String getName() {
return m_name;
}
@Override
public Class<?> getType() {
return m_type;
}
}
package com.dianping.cat.data.event;
import com.dianping.bee.engine.spi.meta.Index;
public enum EventIndex implements Index {
IDX_STARTTIME_DOMAIN(EventColumn.StartTime, false, EventColumn.Domain, true);
private EventColumn[] m_columns;
private boolean[] m_orders;
private EventIndex(Object... args) {
int length = args.length;
if (length % 2 != 0) {
throw new IllegalArgumentException(String.format("Parameters should be paired for %s(%s)!", getClass(), name()));
}
m_columns = new EventColumn[length / 2];
m_orders = new boolean[length / 2];
for (int i = 0; i < length / 2; i++) {
m_columns[i] = (EventColumn) args[2 * i];
m_orders[i] = (Boolean) args[2 * i + 1];
}
}
@Override
public EventColumn getColumn(int index) {
if (index >= 0 && index < m_columns.length) {
return m_columns[index];
} else {
throw new IndexOutOfBoundsException("size: " + m_columns.length + ", index: " + index);
}
}
@Override
public int getLength() {
return m_columns.length;
}
@Override
public boolean isAscend(int index) {
if (index >= 0 && index < m_orders.length) {
return m_orders[index];
} else {
throw new IndexOutOfBoundsException("size: " + m_orders.length + ", index: " + index);
}
}
}
\ No newline at end of file
package com.dianping.cat.data.transaction;
import com.dianping.bee.engine.spi.meta.ColumnMeta;
public enum TransactionColumn implements ColumnMeta {
StartTime(String.class), // 20120822(for daily), 2012082213(for hour)
Domain(String.class), // MobileApi
Type(String.class), // URL
Name(String.class), // /deallist.bin
TotalCount(Integer.class), // 2033
Failures(Integer.class), // 5
SampleMessage(String.class), // MobileApi-0a0101a6-1345600834200-1
MinDuration(Integer.class), // 1
MaxDuration(Integer.class), // 1234
SumDuration(Long.class), // 123456
Sum2Duration(Long.class), // 2364233
Line95(Integer.class); // 123
private String m_name;
private Class<?> m_type;
private TransactionColumn(Class<?> type) {
m_type = type;
m_name = name().toLowerCase();
}
public static TransactionColumn findByName(String name) {
for (TransactionColumn column : values()) {
if (column.getName().equalsIgnoreCase(name)) {
return column;
}
}
throw new RuntimeException(String.format("Column(%s) is not found in %s", name, TransactionColumn.class.getName()));
}
@Override
public String getName() {
return m_name;
}
@Override
public Class<?> getType() {
return m_type;
}
}
package com.dianping.cat.data.transaction;
import com.dianping.bee.engine.spi.RowFilter;
import com.dianping.bee.engine.spi.TableExecutor;
import com.dianping.bee.engine.spi.meta.RowSet;
import com.dianping.bee.engine.spi.meta.internal.DefaultRowSet;
public class TransactionExecutor implements TableExecutor<TransactionIndex, TransactionColumn> {
@Override
public RowSet execute(TransactionIndex index, TransactionColumn[] columns, RowFilter filter) {
RowSet rowset = new DefaultRowSet(columns);
if (index == TransactionIndex.IDX_STARTTIME_DOMAIN) {
} else {
}
return rowset;
}
}
package com.dianping.cat.data.transaction;
import com.dianping.bee.engine.spi.meta.Index;
public enum TransactionIndex implements Index {
IDX_STARTTIME_DOMAIN(TransactionColumn.StartTime, false, TransactionColumn.Domain, true);
private TransactionColumn[] m_columns;
private boolean[] m_orders;
private TransactionIndex(Object... args) {
int length = args.length;
if (length % 2 != 0) {
throw new IllegalArgumentException(String.format("Parameters should be paired for %s(%s)!", getClass(), name()));
}
m_columns = new TransactionColumn[length / 2];
m_orders = new boolean[length / 2];
for (int i = 0; i < length / 2; i++) {
m_columns[i] = (TransactionColumn) args[2 * i];
m_orders[i] = (Boolean) args[2 * i + 1];
}
}
@Override
public TransactionColumn getColumn(int index) {
if (index >= 0 && index < m_columns.length) {
return m_columns[index];
} else {
throw new IndexOutOfBoundsException("size: " + m_columns.length + ", index: " + index);
}
}
@Override
public int getLength() {
return m_columns.length;
}
@Override
public boolean isAscend(int index) {
if (index >= 0 && index < m_orders.length) {
return m_orders[index];
} else {
throw new IndexOutOfBoundsException("size: " + m_orders.length + ", index: " + index);
}
}
}
\ No newline at end of file
<plexus>
<components>
<component>
<role>com.dianping.bee.engine.spi.DatabaseProvider</role>
<role-hint>cat</role-hint>
<implementation>com.dianping.cat.data.CatDatabaseProvider</implementation>
</component>
</components>
</plexus>
...@@ -3,6 +3,12 @@ package com.dianping.dog.build; ...@@ -3,6 +3,12 @@ package com.dianping.dog.build;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.dianping.dog.alarm.rule.DefaultRuleContext;
import com.dianping.dog.alarm.rule.RuleContext;
import com.dianping.dog.event.DefaultEventDispatcher;
import com.dianping.dog.event.DefaultEventListenerRegistry;
import com.dianping.dog.event.EventDispatcher;
import com.dianping.dog.event.EventListenerRegistry;
import com.site.lookup.configuration.AbstractResourceConfigurator; import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component; import com.site.lookup.configuration.Component;
...@@ -11,6 +17,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -11,6 +17,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() { public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>(); List<Component> all = new ArrayList<Component>();
all.add(C(EventListenerRegistry.class, DefaultEventListenerRegistry.class));
all.add(C(EventDispatcher.class, DefaultEventDispatcher.class) //
.req(EventListenerRegistry.class));
all.add(C(RuleContext.class, DefaultRuleContext.class).is(PER_LOOKUP));
// Please keep it as last // Please keep it as last
all.addAll(new WebComponentConfigurator().defineComponents()); all.addAll(new WebComponentConfigurator().defineComponents());
......
<plexus> <plexus>
<components> <components>
<component>
<role>com.dianping.dog.event.EventListenerRegistry</role>
<implementation>com.dianping.dog.event.DefaultEventListenerRegistry</implementation>
</component>
<component>
<role>com.dianping.dog.event.EventDispatcher</role>
<implementation>com.dianping.dog.event.DefaultEventDispatcher</implementation>
<requirements>
<requirement>
<role>com.dianping.dog.event.EventListenerRegistry</role>
</requirement>
</requirements>
</component>
<component> <component>
<role>com.site.web.mvc.model.ModuleRegistry</role> <role>com.site.web.mvc.model.ModuleRegistry</role>
<implementation>com.site.web.mvc.model.ModuleRegistry</implementation> <implementation>com.site.web.mvc.model.ModuleRegistry</implementation>
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
<module>dog-gateway</module> <module>dog-gateway</module>
<module>dog-home</module> <module>dog-home</module>
<module>bee-engine</module> <module>bee-engine</module>
<module>cat-data</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册