ShardingPreparedStatement.java 9.4 KB
Newer Older
T
terrymanu 已提交
1
/*
T
terrymanu 已提交
2 3 4 5 6
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
T
terrymanu 已提交
7
 *
T
terrymanu 已提交
8
 *      http://www.apache.org/licenses/LICENSE-2.0
T
terrymanu 已提交
9
 *
T
terrymanu 已提交
10 11 12 13 14 15 16 17
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

T
terrymanu 已提交
18
package com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.prepared;
T
terrymanu 已提交
19 20

import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor;
21
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
T
terrymanu 已提交
22
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
T
terrymanu 已提交
23
import com.dangdang.ddframe.rdb.sharding.jdbc.core.connection.ShardingConnection;
T
terrymanu 已提交
24
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
25
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey;
T
terrymanu 已提交
26
import com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine;
T
terrymanu 已提交
27 28
import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.routing.SQLRouteResult;
G
Gao Hongtao 已提交
29 30 31
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
T
terrymanu 已提交
32

T
terrymanu 已提交
33 34 35 36 37
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
T
terrymanu 已提交
38
import java.util.LinkedList;
T
terrymanu 已提交
39
import java.util.List;
G
Gao Hongtao 已提交
40
import java.util.Objects;
T
terrymanu 已提交
41

T
terrymanu 已提交
42 43 44
/**
 * 支持分片的预编译语句对象.
 * 
T
terrymanu 已提交
45 46
 * @author zhangliang
 * @author caohao
T
terrymanu 已提交
47 48 49
 */
public final class ShardingPreparedStatement extends AbstractPreparedStatementAdapter {
    
T
terrymanu 已提交
50
    private final PreparedStatementRoutingEngine preparedStatementRoutingEngine;
T
terrymanu 已提交
51
    
T
terrymanu 已提交
52 53 54
    private final List<BackendPreparedStatementWrapper> cachedRoutedPreparedStatements = new LinkedList<>();
    
    private final List<PreparedStatementExecutorWrapper> cachedPreparedStatementWrappers = new LinkedList<>();
T
terrymanu 已提交
55
    
56 57
    private int batchIndex;
    
T
terrymanu 已提交
58
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql) {
G
fix #16  
gaohongtao 已提交
59
        this(shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
T
terrymanu 已提交
60 61
    }
    
T
terrymanu 已提交
62
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int resultSetType, final int resultSetConcurrency) {
G
fix #16  
gaohongtao 已提交
63
        this(shardingConnection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
T
terrymanu 已提交
64 65
    }
    
T
terrymanu 已提交
66
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
G
fix #16  
gaohongtao 已提交
67
        super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability);
T
terrymanu 已提交
68
        preparedStatementRoutingEngine = new PreparedStatementRoutingEngine(sql, shardingConnection.getShardingContext());
T
terrymanu 已提交
69 70
    }
    
T
terrymanu 已提交
71
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) {
G
fix #16  
gaohongtao 已提交
72
        this(shardingConnection, sql);
T
terrymanu 已提交
73
        if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
T
terrymanu 已提交
74 75
            markReturnGeneratedKeys();
        }
T
terrymanu 已提交
76 77 78 79
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
T
terrymanu 已提交
80
        ResultSet result;
81
        try {
T
terrymanu 已提交
82
            result = ResultSetFactory.getResultSet(
83
                    new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeQuery(), getRouteResult().getSqlStatement());
84 85 86
        } finally {
            clearRouteContext();
        }
T
terrymanu 已提交
87 88
        setCurrentResultSet(result);
        return result;
T
terrymanu 已提交
89 90 91 92
    }
    
    @Override
    public int executeUpdate() throws SQLException {
93 94 95 96 97
        try {
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeUpdate();
        } finally {
            clearRouteContext();
        }
T
terrymanu 已提交
98 99 100 101
    }
    
    @Override
    public boolean execute() throws SQLException {
102 103 104 105 106 107 108
        try {
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute();
        } finally {
            clearRouteContext();
        }
    }
    
109
    protected void clearRouteContext() throws SQLException {
G
Gao Hongtao 已提交
110 111
        resetBatch();
        cachedPreparedStatementWrappers.clear();
112
        batchIndex = 0;
G
Gao Hongtao 已提交
113 114 115 116 117
    }
    
    @Override
    public void clearBatch() throws SQLException {
        clearRouteContext();
T
terrymanu 已提交
118 119 120 121
    }
    
    @Override
    public void addBatch() throws SQLException {
122
        try {
T
terrymanu 已提交
123
            for (PreparedStatementExecutorWrapper each : routeSQLForBatch()) {
124
                each.getPreparedStatement().addBatch();
125
                each.mapBatchIndex(batchIndex);
126
            }
127
            batchIndex++;
128
        } finally {
G
Gao Hongtao 已提交
129
            resetBatch();
130
        }
T
terrymanu 已提交
131 132
    }
    
G
Gao Hongtao 已提交
133
    private void resetBatch() throws SQLException {
T
terrymanu 已提交
134
        setCurrentResultSet(null);
G
Gao Hongtao 已提交
135
        clearParameters();
T
terrymanu 已提交
136 137 138 139
    }
    
    @Override
    public int[] executeBatch() throws SQLException {
140
        try {
141
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch(batchIndex);
142
        } finally {
G
Gao Hongtao 已提交
143
            clearRouteContext();
T
terrymanu 已提交
144 145 146
        }
    }
    
147
    private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
148
        List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
T
terrymanu 已提交
149
        SQLRouteResult sqlRouteResult = preparedStatementRoutingEngine.route(getParameters());
150
        setRouteResult(sqlRouteResult);
T
terrymanu 已提交
151
        for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
T
terrymanu 已提交
152
            BackendPreparedStatementWrapper backendPreparedStatementWrapper = generateStatement(
153
                    getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatement().getType()), each.getSql());
T
terrymanu 已提交
154 155 156 157
            getRoutedStatements().add(backendPreparedStatementWrapper.getPreparedStatement());
            replayMethodsInvocation(backendPreparedStatementWrapper.getPreparedStatement());
            getParameters().replayMethodsInvocation(backendPreparedStatementWrapper.getPreparedStatement());
            result.add(wrap(backendPreparedStatementWrapper.getPreparedStatement(), each));
G
Gao Hongtao 已提交
158 159 160 161
        }
        return result;
    }
    
T
terrymanu 已提交
162 163 164
    private List<PreparedStatementExecutorWrapper> routeSQLForBatch() throws SQLException {
        List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
        SQLRouteResult sqlRouteResult = preparedStatementRoutingEngine.route(getParameters());
165
        setRouteResult(sqlRouteResult);
T
terrymanu 已提交
166
        for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
T
terrymanu 已提交
167
            PreparedStatement preparedStatement = getStatementForBatch(
T
terrymanu 已提交
168 169 170 171 172 173 174 175
                    getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatement().getType()), each.getSql());
            replayMethodsInvocation(preparedStatement);
            getParameters().replayMethodsInvocation(preparedStatement);
            result.add(wrap(preparedStatement, each));
        }
        return result;
    }
    
T
terrymanu 已提交
176
    private PreparedStatement getStatementForBatch(final Connection connection, final String sql) throws SQLException {
T
terrymanu 已提交
177
        for  (BackendPreparedStatementWrapper each : cachedRoutedPreparedStatements) {
T
terrymanu 已提交
178
            if (each.isBelongTo(connection, sql)) {
T
terrymanu 已提交
179
                return each.getPreparedStatement();
T
terrymanu 已提交
180 181
            }
        }
T
terrymanu 已提交
182 183 184 185 186 187 188 189 190 191 192 193
        BackendPreparedStatementWrapper statement = generateStatement(connection, sql);
        getRoutedStatements().add(statement.getPreparedStatement());
        cachedRoutedPreparedStatements.add(statement);
        return statement.getPreparedStatement();
    }
    
    private BackendPreparedStatementWrapper generateStatement(final Connection connection, final String sql) throws SQLException {
        Optional<GeneratedKey> generatedKey = getGeneratedKey();
        if (isReturnGeneratedKeys() && generatedKey.isPresent()) {
            return new BackendPreparedStatementWrapper(connection.prepareStatement(sql, RETURN_GENERATED_KEYS), sql);
        }
        return new BackendPreparedStatementWrapper(connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), sql);
T
terrymanu 已提交
194 195
    }
    
G
Gao Hongtao 已提交
196 197
    private PreparedStatementExecutorWrapper wrap(final PreparedStatement preparedStatement, final SQLExecutionUnit sqlExecutionUnit) {
        Optional<PreparedStatementExecutorWrapper> wrapperOptional = Iterators.tryFind(cachedPreparedStatementWrappers.iterator(), new Predicate<PreparedStatementExecutorWrapper>() {
198
            
G
Gao Hongtao 已提交
199 200 201 202 203 204 205 206
            @Override
            public boolean apply(final PreparedStatementExecutorWrapper input) {
                return Objects.equals(input.getPreparedStatement(), preparedStatement);
            }
        });
        if (wrapperOptional.isPresent()) {
            wrapperOptional.get().addBatchParameters(getParameters());
            return wrapperOptional.get();
T
terrymanu 已提交
207
        }
208
        PreparedStatementExecutorWrapper result = new PreparedStatementExecutorWrapper(getRouteResult().getSqlStatement().getType(), preparedStatement, sqlExecutionUnit);
G
Gao Hongtao 已提交
209
        cachedPreparedStatementWrappers.add(result);
T
terrymanu 已提交
210 211 212
        return result;
    }
}