AbstractJDBCImporter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.scaling.core.execute.executor.importer;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Abstract JDBC importer implementation.
 */
@Slf4j
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
    
    private static final DataRecordMerger MERGER = new DataRecordMerger();
    
    private final ImporterConfiguration importerConfig;
    
    private final DataSourceManager dataSourceManager;
    
    private final AbstractSQLBuilder sqlBuilder;
    
    @Setter
    private Channel channel;
    
    protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        this.dataSourceManager = dataSourceManager;
        sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
    }
    
    /**
     * Create the database-specific SQL builder.
     *
     * @param shardingColumnsMap sharding columns map, keyed by table name
     * @return SQL builder
     */
    protected abstract AbstractSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
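    // A concrete subclass plugs in its dialect-specific builder here. A minimal sketch,
    // assuming a hypothetical MySQLSQLBuilder that extends AbstractSQLBuilder:
    //
    //     @Override
    //     protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
    //         return new MySQLSQLBuilder(shardingColumnsMap);
    //     }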
    
    @Override
    public final void start() {
        super.start();
        write();
    }
    
    @Override
    public final void write() {
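        // keep draining the channel until the task is stopped or a FinishedRecord is seen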
        while (isRunning()) {
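            // fetch up to 1024 records per batch; the second argument is the fetch timeout in seconds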
            List<Record> records = channel.fetchRecords(1024, 3);
            if (null != records && !records.isEmpty()) {
                flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfiguration()), records);
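                // a FinishedRecord marks the end of the sync task: acknowledge and stop writing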
                if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
                    channel.ack();
                    break;
                }
            }
            channel.ack();
        }
    }
    
    private void flush(final DataSource dataSource, final List<Record> buffer) {
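        // group records by table and operation type, then flush deletes, inserts and updates separately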
        List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream()
                .filter(each -> each instanceof DataRecord)
                .map(each -> (DataRecord) each)
                .collect(Collectors.toList()));
        groupedDataRecords.forEach(each -> {
            if (CollectionUtils.isNotEmpty(each.getDeleteDataRecords())) {
                flushInternal(dataSource, each.getDeleteDataRecords());
            }
            if (CollectionUtils.isNotEmpty(each.getInsertDataRecords())) {
                flushInternal(dataSource, each.getInsertDataRecords());
            }
            if (CollectionUtils.isNotEmpty(each.getUpdateDataRecords())) {
                flushInternal(dataSource, each.getUpdateDataRecords());
            }
        });
    }
    
    private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
        boolean success = tryFlush(dataSource, buffer);
        if (isRunning() && !success) {
            throw new SyncTaskExecuteException("write failed.");
        }
    }
    
    private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
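        // retry on SQLException until the write succeeds, retries run out, or the task stops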
        int retryTimes = importerConfig.getRetryTimes();
        do {
            try {
                doFlush(dataSource, buffer);
                return true;
            } catch (SQLException ex) {
                log.error("flush failed: ", ex);
            }
        } while (isRunning() && retryTimes-- > 0);
        return false;
    }
    
    private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
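            // each buffer holds records of a single operation type, so dispatch on the first record;
            // the whole batch commits as one transaction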
            switch (buffer.get(0).getType()) {
                case ScalingConstant.INSERT:
                    executeBatchInsert(connection, buffer);
                    break;
                case ScalingConstant.UPDATE:
                    executeUpdate(connection, buffer);
                    break;
                case ScalingConstant.DELETE:
                    executeBatchDelete(connection, buffer);
                    break;
                default:
                    break;
            }
            connection.commit();
        }
    }
    
    private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
        // all records in one group target the same table, so a single INSERT statement is reused for the batch
        String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
        try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                for (int i = 0; i < each.getColumnCount(); i++) {
                    ps.setObject(i + 1, each.getColumn(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
    
    private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            executeUpdate(connection, each);
        }
    }
    
    private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
        List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
        String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
        try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
            // bind the updated column values first, then the condition column values
            for (int i = 0; i < updatedColumns.size(); i++) {
                ps.setObject(i + 1, updatedColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // sharding columns can not be updated, so an updated primary key matches on its old value
                ps.setObject(updatedColumns.size() + i + 1,
                        (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
            }
            ps.execute();
        }
    }
    
    private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecords.get(0), importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
        String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
        try (PreparedStatement ps = connection.prepareStatement(deleteSQL)) {
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                // re-extract the condition column values for every record
                conditionColumns = RecordUtil.extractConditionColumns(each, importerConfig.getShardingColumnsMap().get(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    ps.setObject(i + 1, conditionColumns.get(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}