AbstractJDBCImporter.java 7.0 KB
Newer Older
K
KomachiSion 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
19

A
avalon566 已提交
20 21
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
22
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
23
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
24 25 26 27 28 29 30 31
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
A
avalon566 已提交
32

A
avalon566 已提交
33
import javax.sql.DataSource;
A
avalon566 已提交
34
import java.sql.Connection;
A
avalon566 已提交
35 36 37
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
38
import java.util.ArrayList;
39
import java.util.Collections;
K
KomachiSion 已提交
40
import java.util.List;
A
avalon566 已提交
41 42

/**
43
 * Abstract JDBC importer implementation.
A
avalon566 已提交
44
 */
45
@Slf4j
46
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor implements Importer {
47
    
A
avalon566 已提交
48
    private final RdbmsConfiguration rdbmsConfiguration;
A
avalon566 已提交
49
    
50
    private final DataSourceManager dataSourceManager;
51 52 53
    
    private final AbstractSqlBuilder sqlBuilder;
    
54 55
    @Setter
    private Channel channel;
56
    
57
    public AbstractJDBCImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
A
avalon566 已提交
58
        this.rdbmsConfiguration = rdbmsConfiguration;
59
        this.dataSourceManager = dataSourceManager;
60
        sqlBuilder = createSqlBuilder();
A
avalon566 已提交
61
    }
62 63 64 65 66 67
    
    /**
     * Create sql builder.
     *
     * @return sql builder
     */
68
    protected abstract AbstractSqlBuilder createSqlBuilder();
69
    
70
    @Override
71 72
    public final void start() {
        super.start();
73
        write();
74
    }
75
    
A
avalon566 已提交
76
    @Override
77
    public final void write() {
78 79 80 81 82 83 84
        while (isRunning()) {
            List<Record> records = channel.fetchRecords(100, 3);
            if (null != records && records.size() > 0) {
                flush(dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()), records);
                if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
                    channel.ack();
                    break;
A
avalon566 已提交
85 86
                }
            }
87
            channel.ack();
A
avalon566 已提交
88 89
        }
    }
90
    
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
    private void flush(final DataSource dataSource, final List<Record> buffer) {
        List<Record> unflushed = tryFlush(dataSource, buffer);
        if (isRunning() && unflushed.size() > 0) {
            throw new SyncTaskExecuteException("write failed.");
        }
    }
    
    private List<Record> tryFlush(final DataSource dataSource, final List<Record> buffer) {
        int retryTimes = rdbmsConfiguration.getRetryTimes();
        List<Record> unflushed = buffer;
        do {
            unflushed = doFlush(dataSource, unflushed);
        } while (isRunning() && unflushed.size() > 0 && retryTimes-- > 0);
        return unflushed;
    }
    
    private List<Record> doFlush(final DataSource dataSource, final List<Record> buffer) {
        int i = 0;
A
avalon566 已提交
109
        try (Connection connection = dataSource.getConnection()) {
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
            for (; i < buffer.size(); i++) {
                execute(connection, buffer.get(i));
            }
        } catch (SQLException ex) {
            log.error("flush failed: {}", buffer.get(i), ex);
            return buffer.subList(i, buffer.size());
        }
        return Collections.emptyList();
    }
    
    private void execute(final Connection connection, final Record record) throws SQLException {
        if (DataRecord.class.equals(record.getClass())) {
            DataRecord dataRecord = (DataRecord) record;
            switch (dataRecord.getType()) {
                case "BOOTSTRAP-INSERT":
                case "INSERT":
                    executeInsert(connection, dataRecord);
                    break;
                case "UPDATE":
                    executeUpdate(connection, dataRecord);
                    break;
                case "DELETE":
                    executeDelete(connection, dataRecord);
                    break;
                default:
                    break;
A
avalon566 已提交
136 137 138
            }
        }
    }
139
    
A
avalon566 已提交
140
    private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
141
        String insertSql = sqlBuilder.buildInsertSQL(record);
A
avalon566 已提交
142 143 144 145 146 147 148
        PreparedStatement ps = connection.prepareStatement(insertSql);
        ps.setQueryTimeout(30);
        try {
            for (int i = 0; i < record.getColumnCount(); i++) {
                ps.setObject(i + 1, record.getColumn(i).getValue());
            }
            ps.execute();
149
        } catch (SQLIntegrityConstraintViolationException ignored) {
A
avalon566 已提交
150 151
        }
    }
152
    
A
avalon566 已提交
153
    private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
154
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
155
        List<Column> values = new ArrayList<>();
156
        values.addAll(RecordUtil.extractUpdatedColumns(record));
157 158
        values.addAll(conditionColumns);
        String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
A
avalon566 已提交
159
        PreparedStatement ps = connection.prepareStatement(updateSql);
A
avalon566 已提交
160 161 162 163 164
        for (int i = 0; i < values.size(); i++) {
            ps.setObject(i + 1, values.get(i).getValue());
        }
        ps.execute();
    }
165
    
A
avalon566 已提交
166
    private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
167 168
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
        String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
A
avalon566 已提交
169
        PreparedStatement ps = connection.prepareStatement(deleteSql);
170 171
        for (int i = 0; i < conditionColumns.size(); i++) {
            ps.setObject(i + 1, conditionColumns.get(i).getValue());
A
avalon566 已提交
172 173 174 175
        }
        ps.execute();
    }
}