/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.driver.executor;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.StatementExecuteUnit;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLExecutor;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLExecutorCallback;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Abstract statement executor.
 *
 * <p>Holds the data sources, meta data contexts and SQL executor shared by concrete statement executors,
 * and provides helpers to accumulate update counts and to refresh the schema after a statement has been executed.</p>
 */
@Getter
@RequiredArgsConstructor
public abstract class AbstractStatementExecutor {
    
    static {
        // Register the notifier SPI so that notifySchemaChanged can look up its implementations.
        ShardingSphereServiceLoader.register(SchemaChangedNotifier.class);
    }
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final MetaDataContexts metaDataContexts;
    
    private final SQLExecutor sqlExecutor;
    
    /**
     * Judge whether the update counts of the routed executions need to be accumulated.
     *
     * <p>Rules that are not a {@link DataNodeContainedRule} are skipped instead of being cast blindly,
     * so an unfiltered rule collection does not cause a {@link ClassCastException}.</p>
     *
     * @param rules rules to be consulted
     * @param sqlStatementContext SQL statement context providing the table names
     * @return {@code true} if any data node contained rule requires accumulation for the statement's tables
     */
    protected final boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
        return rules.stream().anyMatch(each -> each instanceof DataNodeContainedRule
                && ((DataNodeContainedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
    }
    
    /**
     * Sum up the given results.
     *
     * @param results results to be summed, {@code null} elements are counted as {@code 0}
     * @return sum of all results
     */
    protected final int accumulate(final List<Integer> results) {
        return results.stream().mapToInt(each -> null == each ? 0 : each).sum();
    }
    
    /**
     * Refresh schema meta data according to the executed SQL statement and notify the schema changed notifiers.
     *
     * <p>Does nothing if the SQL statement is {@code null} or if no schema refresher exists for it.</p>
     *
     * @param metaData meta data whose schema is refreshed
     * @param sqlStatement executed SQL statement
     * @param routeUnits route units of the executed SQL statement
     * @throws SQLException SQL exception
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected final void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
        if (null == sqlStatement) {
            return;
        }
        // The refresher is kept as a raw type on purpose: its type argument depends on the concrete statement type.
        Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
        if (schemaRefresher.isPresent()) {
            Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
            SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
            schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
            notifySchemaChanged(DefaultSchema.LOGIC_NAME, metaData.getSchema());
        }
    }
    
    /**
     * Pass the changed schema to every {@link SchemaChangedNotifier} registered for it.
     *
     * @param schemaName name of the changed schema
     * @param schema changed schema
     */
    private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
        OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
    }
    
    /**
     * Execute the input groups with the given callback, then refresh the schema of the default meta data.
     *
     * @param inputGroups input groups
     * @param sqlStatement SQL statement
     * @param routeUnits route units
     * @param sqlExecutorCallback callback used to execute each statement
     * @return the first execution result, or {@code false} if there is no result or the first result is {@code null}
     * @throws SQLException SQL exception
     */
    protected final boolean executeAndRefreshMetaData(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatement sqlStatement,
                                                      final Collection<RouteUnit> routeUnits, final SQLExecutorCallback<Boolean> sqlExecutorCallback) throws SQLException {
        List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
        refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
        return null != result && !result.isEmpty() && null != result.get(0) && result.get(0);
    }
    
    /**
     * Execute SQL.
     *
     * @param inputGroups input groups
     * @param sqlStatement SQL statement
     * @param routeUnits route units
     * @return {@code true} if is DQL, {@code false} if is DML
     * @throws SQLException SQL exception
     */
    public abstract boolean execute(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatement sqlStatement, Collection<RouteUnit> routeUnits) throws SQLException;
    
    /**
     * Execute query.
     *
     * @param inputGroups input groups
     * @return result set list
     * @throws SQLException SQL exception
     */
    public abstract List<QueryResult> executeQuery(Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException;
    
    /**
     * Execute update.
     *
     * @param inputGroups input groups
     * @param sqlStatementContext SQL statement context
     * @param routeUnits route units
     * @return affected records count
     * @throws SQLException SQL exception
     */
    public abstract int executeUpdate(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatementContext<?> sqlStatementContext, Collection<RouteUnit> routeUnits) throws SQLException;
}