未验证 提交 be99018e 编写于 作者: J Juan Pan(Trista) 提交者: GitHub

Create RDLBackendHandler (#6934)

上级 d0626f97
......@@ -22,13 +22,9 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.kernel.context.SchemaContext;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.SQLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.engine.RDLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.engine.jdbc.JDBCExecuteEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.wrapper.JDBCExecutorWrapper;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.wrapper.PreparedStatementExecutorWrapper;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.wrapper.StatementExecutorWrapper;
import org.apache.shardingsphere.rdl.parser.statement.rdl.RDLStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.SQLStatement;
import java.util.List;
......@@ -60,7 +56,7 @@ public final class DatabaseCommunicationEngineFactory {
*/
public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatement sqlStatement, final String sql, final BackendConnection backendConnection) {
SchemaContext schema = backendConnection.getSchema();
return new JDBCDatabaseCommunicationEngine(sql, backendConnection, createSQLExecuteEngine(schema, sqlStatement, backendConnection, new StatementExecutorWrapper(schema, sqlStatement)));
return new JDBCDatabaseCommunicationEngine(sql, backendConnection, new JDBCExecuteEngine(backendConnection, new StatementExecutorWrapper(schema, sqlStatement)));
}
/**
......@@ -75,10 +71,6 @@ public final class DatabaseCommunicationEngineFactory {
public DatabaseCommunicationEngine newBinaryProtocolInstance(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final BackendConnection backendConnection) {
SchemaContext schema = backendConnection.getSchema();
return new JDBCDatabaseCommunicationEngine(sql,
backendConnection, createSQLExecuteEngine(schema, sqlStatement, backendConnection, new PreparedStatementExecutorWrapper(schema, sqlStatement, parameters)));
}
private SQLExecuteEngine createSQLExecuteEngine(final SchemaContext schema, final SQLStatement sqlStatement, final BackendConnection backendConnection, final JDBCExecutorWrapper executorWrapper) {
return sqlStatement instanceof RDLStatement ? new RDLExecuteEngine(schema, sqlStatement) : new JDBCExecuteEngine(backendConnection, executorWrapper);
backendConnection, new JDBCExecuteEngine(backendConnection, new PreparedStatementExecutorWrapper(schema, sqlStatement, parameters)));
}
}
......@@ -23,6 +23,7 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.text.admin.BroadcastBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.admin.RDLBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.admin.ShowDatabasesBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.admin.ShowTablesBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.admin.UnicastBackendHandler;
......@@ -33,12 +34,14 @@ import org.apache.shardingsphere.proxy.backend.text.sctl.utils.SCTLUtils;
import org.apache.shardingsphere.proxy.backend.text.transaction.SkipBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.transaction.TransactionBackendHandler;
import org.apache.shardingsphere.rdl.parser.engine.ShardingSphereSQLParserEngineFactory;
import org.apache.shardingsphere.rdl.parser.statement.rdl.RDLStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.dal.DALStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.dal.SetStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.dal.dialect.mysql.ShowDatabasesStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.dal.dialect.mysql.ShowTablesStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.dal.dialect.mysql.UseStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.ddl.CreateDatabaseStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.tcl.BeginTransactionStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.tcl.CommitStatement;
import org.apache.shardingsphere.sql.parser.sql.statement.tcl.RollbackStatement;
......@@ -70,6 +73,9 @@ public final class TextProtocolBackendHandlerFactory {
return ShardingCTLBackendHandlerFactory.newInstance(trimSQL, backendConnection);
}
SQLStatement sqlStatement = ShardingSphereSQLParserEngineFactory.getSQLParserEngine(databaseType.getName()).parse(sql, false);
if (sqlStatement instanceof RDLStatement || sqlStatement instanceof CreateDatabaseStatement) {
return new RDLBackendHandler(backendConnection, sqlStatement);
}
if (sqlStatement instanceof TCLStatement) {
return createTCLBackendHandler(sql, (TCLStatement) sqlStatement, backendConnection);
}
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.engine;
package org.apache.shardingsphere.proxy.backend.text.admin;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.callback.orchestration.DataSourceCallback;
......@@ -24,17 +24,15 @@ import org.apache.shardingsphere.infra.callback.orchestration.SchemaNameCallback
import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.kernel.context.SchemaContext;
import org.apache.shardingsphere.kernel.context.StandardSchemaContexts;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.SQLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.config.util.DataSourceConverter;
import org.apache.shardingsphere.proxy.config.yaml.YamlDataSourceParameter;
import org.apache.shardingsphere.proxy.convert.CreateDataSourcesStatementContextConverter;
......@@ -52,30 +50,25 @@ import org.apache.shardingsphere.sql.parser.sql.statement.SQLStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
/**
* RDL execute engine.
* Backer handler for rdl.
*/
@RequiredArgsConstructor
public final class RDLExecuteEngine implements SQLExecuteEngine {
public final class RDLBackendHandler implements TextProtocolBackendHandler {
private final SchemaContext schema;
private final BackendConnection backendConnection;
private final SQLStatement sqlStatement;
@Override
public ExecutionContext execute(final String sql) {
return new ExecutionContext(getSqlStatementContext(), Collections.singleton(getExecutionUnit(sql)));
}
@Override
public BackendResponse execute(final ExecutionContext executionContext) {
public BackendResponse execute() {
SQLStatementContext context = getSqlStatementContext();
if (!isRegistryCenterExisted()) {
return new ErrorResponse(new SQLException("No Registry center to execute `%s` SQL", executionContext.getSqlStatementContext().getClass().getSimpleName()));
return new ErrorResponse(new SQLException("No Registry center to execute `%s` SQL", context.getClass().getSimpleName()));
}
return getBackendResponse(executionContext.getSqlStatementContext());
return getBackendResponse(context);
}
private BackendResponse execute(final CreateSchemaStatementContext context) {
......@@ -90,7 +83,7 @@ public final class RDLExecuteEngine implements SQLExecuteEngine {
Map<String, YamlDataSourceParameter> parameters = new CreateDataSourcesStatementContextConverter().convert(context);
Map<String, DataSourceConfiguration> dataSources = DataSourceConverter.getDataSourceConfigurationMap(DataSourceConverter.getDataSourceParameterMap2(parameters));
// TODO Need to get the executed feedback from registry center for returning.
DataSourceCallback.getInstance().run(schema.getName(), dataSources);
DataSourceCallback.getInstance().run(backendConnection.getSchema().getName(), dataSources);
UpdateResponse result = new UpdateResponse();
result.setType("CREATE");
return result;
......@@ -100,7 +93,7 @@ public final class RDLExecuteEngine implements SQLExecuteEngine {
YamlShardingRuleConfiguration configurations = new CreateShardingRuleStatementContextConverter().convert(context);
Collection<RuleConfiguration> rules = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(Collections.singleton(configurations));
// TODO Need to get the executed feedback from registry center for returning.
RuleCallback.getInstance().run(schema.getName(), rules);
RuleCallback.getInstance().run(backendConnection.getSchema().getName(), rules);
UpdateResponse result = new UpdateResponse();
result.setType("CREATE");
return result;
......@@ -116,10 +109,6 @@ public final class RDLExecuteEngine implements SQLExecuteEngine {
return new CreateSchemaStatementContext((CreateSchemaStatement) sqlStatement);
}
private ExecutionUnit getExecutionUnit(final String sql) {
return new ExecutionUnit("", new SQLUnit(sql, new LinkedList<>()));
}
private BackendResponse getBackendResponse(final SQLStatementContext context) {
if (context instanceof CreateSchemaStatementContext) {
return execute((CreateSchemaStatementContext) context);
......@@ -133,4 +122,14 @@ public final class RDLExecuteEngine implements SQLExecuteEngine {
private boolean isRegistryCenterExisted() {
return !(ProxySchemaContexts.getInstance().getSchemaContexts() instanceof StandardSchemaContexts);
}
@Override
public boolean next() {
return false;
}
@Override
public QueryData getQueryData() {
return new QueryData(Collections.emptyList(), Collections.emptyList());
}
}
......@@ -15,81 +15,54 @@
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.backend.communication.jdbc.execute;
package org.apache.shardingsphere.proxy.backend.text.admin;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.kernel.context.SchemaContext;
import org.apache.shardingsphere.kernel.context.StandardSchemaContexts;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.engine.RDLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.OrchestrationSchemaContextsFixture;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
import org.apache.shardingsphere.rdl.parser.binder.context.CreateDataSourcesStatementContext;
import org.apache.shardingsphere.rdl.parser.binder.context.CreateShardingRuleStatementContext;
import org.apache.shardingsphere.rdl.parser.statement.rdl.CreateDataSourcesStatement;
import org.apache.shardingsphere.rdl.parser.statement.rdl.CreateShardingRuleStatement;
import org.apache.shardingsphere.rdl.parser.statement.rdl.TableRuleSegment;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class RDLExecuteEngineTest {
private CreateDataSourcesStatementContext dataSourcesContext;
private CreateShardingRuleStatementContext ruleContext;
@Before
public void setUp() {
createDataSourcesContext();
createRuleContext();
}
private void createDataSourcesContext() {
dataSourcesContext = new CreateDataSourcesStatementContext(new CreateDataSourcesStatement(Collections.emptyList()), new MySQLDatabaseType());
}
private void createRuleContext() {
TableRuleSegment segment = new TableRuleSegment();
segment.setLogicTable("t_order");
segment.setDataSources(Arrays.asList("ds0", "ds1"));
segment.setShardingColumn("order_id");
segment.setAlgorithmType("MOD");
segment.setProperties(Collections.singleton("2"));
ruleContext = new CreateShardingRuleStatementContext(new CreateShardingRuleStatement(Collections.singleton(segment)));
}
public final class RDLBackendHandlerTest {
@Test
public void assertExecuteDataSourcesContext() {
SchemaContext context = new SchemaContext("sharding_db", null, null);
RDLExecuteEngine executeEngine = new RDLExecuteEngine(context, mock(CreateDataSourcesStatement.class));
BackendResponse response = executeEngine.execute(new ExecutionContext(dataSourcesContext, new LinkedList<>()));
BackendConnection connection = mock(BackendConnection.class);
when(connection.getSchema()).thenReturn(context);
RDLBackendHandler executeEngine = new RDLBackendHandler(connection, mock(CreateDataSourcesStatement.class));
BackendResponse response = executeEngine.execute();
assertThat(response, instanceOf(ErrorResponse.class));
setOrchestrationSchemaContexts(true);
response = executeEngine.execute(new ExecutionContext(dataSourcesContext, new LinkedList<>()));
response = executeEngine.execute();
assertThat(response, instanceOf(UpdateResponse.class));
}
@Test
public void assertExecuteShardingRuleContext() {
SchemaContext context = new SchemaContext("sharding_db", null, null);
RDLExecuteEngine executeEngine = new RDLExecuteEngine(context, mock(CreateShardingRuleStatement.class));
BackendResponse response = executeEngine.execute(new ExecutionContext(ruleContext, new LinkedList<>()));
BackendConnection connection = mock(BackendConnection.class);
when(connection.getSchema()).thenReturn(context);
RDLBackendHandler executeEngine = new RDLBackendHandler(connection, mock(CreateShardingRuleStatement.class));
BackendResponse response = executeEngine.execute();
assertThat(response, instanceOf(ErrorResponse.class));
setOrchestrationSchemaContexts(true);
response = executeEngine.execute(new ExecutionContext(ruleContext, new LinkedList<>()));
response = executeEngine.execute();
assertThat(response, instanceOf(UpdateResponse.class));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册