ShardingPreparedStatement.java 7.8 KB
Newer Older
T
terrymanu 已提交
1
/*
T
terrymanu 已提交
2 3 4 5 6
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
T
terrymanu 已提交
7
 *
T
terrymanu 已提交
8
 *      http://www.apache.org/licenses/LICENSE-2.0
T
terrymanu 已提交
9
 *
T
terrymanu 已提交
10 11 12 13 14 15 16 17
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

T
terrymanu 已提交
18
package com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.prepared;
T
terrymanu 已提交
19 20

import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor;
21
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
T
terrymanu 已提交
22
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
T
terrymanu 已提交
23 24
import com.dangdang.ddframe.rdb.sharding.jdbc.core.connection.ShardingConnection;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.BackendStatementWrapper;
T
terrymanu 已提交
25
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
26
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey;
T
terrymanu 已提交
27
import com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine;
T
terrymanu 已提交
28 29
import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.routing.SQLRouteResult;
G
Gao Hongtao 已提交
30 31 32
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
T
terrymanu 已提交
33

T
terrymanu 已提交
34 35 36 37 38 39
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
G
Gao Hongtao 已提交
40
import java.util.Objects;
T
terrymanu 已提交
41

T
terrymanu 已提交
42 43 44
/**
 * 支持分片的预编译语句对象.
 * 
T
terrymanu 已提交
45 46
 * @author zhangliang
 * @author caohao
T
terrymanu 已提交
47 48 49
 */
public final class ShardingPreparedStatement extends AbstractPreparedStatementAdapter {
    
T
terrymanu 已提交
50
    private final PreparedStatementRoutingEngine preparedStatementRoutingEngine;
T
terrymanu 已提交
51
    
G
Gao Hongtao 已提交
52
    private final List<PreparedStatementExecutorWrapper> cachedPreparedStatementWrappers = new ArrayList<>();
T
terrymanu 已提交
53
    
54 55
    private int batchIndex;
    
T
terrymanu 已提交
56
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql) {
G
fix #16  
gaohongtao 已提交
57
        this(shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
T
terrymanu 已提交
58 59
    }
    
T
terrymanu 已提交
60
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, 
G
gaohongtao 已提交
61
            final String sql, final int resultSetType, final int resultSetConcurrency) {
G
fix #16  
gaohongtao 已提交
62
        this(shardingConnection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
T
terrymanu 已提交
63 64
    }
    
T
terrymanu 已提交
65
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, 
66
            final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
G
fix #16  
gaohongtao 已提交
67
        super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability);
T
terrymanu 已提交
68
        preparedStatementRoutingEngine = new PreparedStatementRoutingEngine(sql, shardingConnection.getShardingContext());
T
terrymanu 已提交
69 70
    }
    
T
terrymanu 已提交
71
    public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) {
G
fix #16  
gaohongtao 已提交
72
        this(shardingConnection, sql);
T
terrymanu 已提交
73
        if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
T
terrymanu 已提交
74 75
            markReturnGeneratedKeys();
        }
T
terrymanu 已提交
76 77 78 79
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
80 81 82
        ResultSet rs;
        try {
            rs = ResultSetFactory.getResultSet(
83
                    new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeQuery(), getSqlRouteResult().getSqlStatement());
84 85 86 87 88
        } finally {
            clearRouteContext();
        }
        setCurrentResultSet(rs);
        return rs;
T
terrymanu 已提交
89 90 91 92
    }
    
    @Override
    public int executeUpdate() throws SQLException {
93 94 95 96 97
        try {
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeUpdate();
        } finally {
            clearRouteContext();
        }
T
terrymanu 已提交
98 99 100 101
    }
    
    @Override
    public boolean execute() throws SQLException {
102 103 104 105 106 107 108
        try {
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute();
        } finally {
            clearRouteContext();
        }
    }
    
109
    protected void clearRouteContext() throws SQLException {
G
Gao Hongtao 已提交
110 111
        resetBatch();
        cachedPreparedStatementWrappers.clear();
112
        batchIndex = 0;
G
Gao Hongtao 已提交
113 114 115 116 117
    }
    
    @Override
    public void clearBatch() throws SQLException {
        clearRouteContext();
T
terrymanu 已提交
118 119 120 121
    }
    
    @Override
    public void addBatch() throws SQLException {
122 123 124
        try {
            for (PreparedStatementExecutorWrapper each : routeSQL()) {
                each.getPreparedStatement().addBatch();
125
                each.mapBatchIndex(batchIndex);
126
            }
127
            batchIndex++;
128
        } finally {
G
Gao Hongtao 已提交
129
            resetBatch();
130
        }
T
terrymanu 已提交
131 132
    }
    
G
Gao Hongtao 已提交
133 134 135
    private void resetBatch() throws SQLException {
        super.clearRouteContext();
        clearParameters();
T
terrymanu 已提交
136 137 138 139
    }
    
    @Override
    public int[] executeBatch() throws SQLException {
140
        try {
141
            return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch(batchIndex);
142
        } finally {
G
Gao Hongtao 已提交
143
            clearRouteContext();
T
terrymanu 已提交
144 145 146
        }
    }
    
147
    private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
148
        List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
T
terrymanu 已提交
149
        SQLRouteResult sqlRouteResult = preparedStatementRoutingEngine.route(getParameters());
150
        setSqlRouteResult(sqlRouteResult);
T
terrymanu 已提交
151
        for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
T
terrymanu 已提交
152
            PreparedStatement preparedStatement = (PreparedStatement) getStatement(
153
                    getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatement().getType()), each.getSql());
T
terrymanu 已提交
154
            replayMethodsInvocation(preparedStatement);
G
gaohongtao 已提交
155
            getParameters().replayMethodsInvocation(preparedStatement);
G
Gao Hongtao 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
            result.add(wrap(preparedStatement, each));
        }
        return result;
    }
    
    private PreparedStatementExecutorWrapper wrap(final PreparedStatement preparedStatement, final SQLExecutionUnit sqlExecutionUnit) {
        Optional<PreparedStatementExecutorWrapper> wrapperOptional = Iterators.tryFind(cachedPreparedStatementWrappers.iterator(), new Predicate<PreparedStatementExecutorWrapper>() {
            @Override
            public boolean apply(final PreparedStatementExecutorWrapper input) {
                return Objects.equals(input.getPreparedStatement(), preparedStatement);
            }
        });
        if (wrapperOptional.isPresent()) {
            wrapperOptional.get().addBatchParameters(getParameters());
            return wrapperOptional.get();
T
terrymanu 已提交
171
        }
G
Gao Hongtao 已提交
172 173
        PreparedStatementExecutorWrapper result = new PreparedStatementExecutorWrapper(preparedStatement, getParameters(), sqlExecutionUnit);
        cachedPreparedStatementWrappers.add(result);
T
terrymanu 已提交
174 175 176
        return result;
    }
    
177
    @Override
178
    protected BackendStatementWrapper generateStatement(final Connection conn, final String shardingSql) throws SQLException {
179 180
        Optional<GeneratedKey> generatedKey = getGeneratedKey();
        if (isReturnGeneratedKeys() && generatedKey.isPresent()) {
T
terrymanu 已提交
181
            return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, RETURN_GENERATED_KEYS), shardingSql);
T
terrymanu 已提交
182
        }
183
        return new BackendPreparedStatementWrapper(conn.prepareStatement(shardingSql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), shardingSql);
T
terrymanu 已提交
184 185
    }
}