未验证 提交 c168ae56 编写于 作者: A avalon5666 提交者: GitHub

Merge and batch execute the rows change (#8147)

* Fix jdbc dumper judge primary key error

* Merge and batch execute the rows change (#7836)
上级 c50e3da5
......@@ -94,7 +94,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
record.setType(ScalingConstant.INSERT);
record.setTableName(inventoryDumperConfiguration.getTableNameMap().get(inventoryDumperConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i)));
record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i - 1)));
}
pushRecord(record);
}
......
......@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
......@@ -28,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
......@@ -36,9 +38,8 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Abstract JDBC importer implementation.
......@@ -46,6 +47,8 @@ import java.util.List;
@Slf4j
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
private static final DataRecordMerger MERGER = new DataRecordMerger();
private final ImporterConfiguration importerConfig;
private final DataSourceManager dataSourceManager;
......@@ -77,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
@Override
public final void write() {
while (isRunning()) {
List<Record> records = channel.fetchRecords(100, 3);
List<Record> records = channel.fetchRecords(1024, 3);
if (null != records && !records.isEmpty()) {
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfiguration()), records);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
......@@ -90,65 +93,79 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void flush(final DataSource dataSource, final List<Record> buffer) {
List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream()
.filter(each -> each instanceof DataRecord)
.map(each -> (DataRecord) each)
.collect(Collectors.toList()));
groupedDataRecords.forEach(each -> {
if (CollectionUtils.isNotEmpty(each.getDeleteDataRecords())) {
flushInternal(dataSource, each.getDeleteDataRecords());
}
if (CollectionUtils.isNotEmpty(each.getInsertDataRecords())) {
flushInternal(dataSource, each.getInsertDataRecords());
}
if (CollectionUtils.isNotEmpty(each.getUpdateDataRecords())) {
flushInternal(dataSource, each.getUpdateDataRecords());
}
});
}
private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
boolean success = tryFlush(dataSource, buffer);
if (isRunning() && !success) {
throw new SyncTaskExecuteException("write failed.");
}
}
private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) {
private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
int retryTimes = importerConfig.getRetryTimes();
List<Record> unflushed = buffer;
do {
unflushed = doFlush(dataSource, unflushed);
} while (isRunning() && !unflushed.isEmpty() && retryTimes-- > 0);
return unflushed.isEmpty();
try {
doFlush(dataSource, buffer);
return true;
} catch (SQLException ex) {
log.error("flush failed: ", ex);
}
} while (isRunning() && retryTimes-- > 0);
return false;
}
private List<Record> doFlush(final DataSource dataSource, final List<Record> buffer) {
int i = 0;
private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
for (; i < buffer.size(); i++) {
execute(connection, buffer.get(i));
}
connection.commit();
} catch (final SQLException ex) {
log.error("flush failed: {}", buffer.get(i), ex);
return buffer.subList(i, buffer.size());
}
return Collections.emptyList();
}
private void execute(final Connection connection, final Record record) throws SQLException {
if (DataRecord.class.equals(record.getClass())) {
DataRecord dataRecord = (DataRecord) record;
switch (dataRecord.getType()) {
switch (buffer.get(0).getType()) {
case ScalingConstant.INSERT:
executeInsert(connection, dataRecord);
executeBatchInsert(connection, buffer);
break;
case ScalingConstant.UPDATE:
executeUpdate(connection, dataRecord);
executeUpdate(connection, buffer);
break;
case ScalingConstant.DELETE:
executeDelete(connection, dataRecord);
executeBatchDelete(connection, buffer);
break;
default:
break;
}
connection.commit();
}
}
private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
String insertSql = sqlBuilder.buildInsertSQL(record);
private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
PreparedStatement ps = connection.prepareStatement(insertSql);
ps.setQueryTimeout(30);
try {
for (int i = 0; i < record.getColumnCount(); i++) {
ps.setObject(i + 1, record.getColumn(i).getValue());
for (DataRecord each : dataRecords) {
for (int i = 0; i < each.getColumnCount(); i++) {
ps.setObject(i + 1, each.getColumn(i).getValue());
}
ps.execute();
} catch (final SQLIntegrityConstraintViolationException ignored) {
ps.addBatch();
}
ps.executeBatch();
}
private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
for (DataRecord each : dataRecords) {
executeUpdate(connection, each);
}
}
......@@ -169,13 +186,18 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
ps.execute();
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < conditionColumns.size(); i++) {
ps.setObject(i + 1, conditionColumns.get(i).getValue());
private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecords.get(0), importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSQL);
ps.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
conditionColumns = RecordUtil.extractConditionColumns(each, importerConfig.getShardingColumnsMap().get(each.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
ps.setObject(i + 1, conditionColumns.get(i).getValue());
}
ps.addBatch();
}
ps.execute();
ps.executeBatch();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Data Record merger.
*/
public class DataRecordMerger {
/**
* Merge data record.
* insert + insert -> exception
* update + insert -> exception
* delete + insert -> insert
* insert + update -> insert
* update + update -> update
* delete + update -> exception
* insert + delete -> delete
* update + delete -> delete
* delete + delete -> exception
*
* @param dataRecords data records
* @return merged data records
*/
public List<DataRecord> merge(final List<DataRecord> dataRecords) {
Map<DataRecord.Key, DataRecord> result = new HashMap<>();
dataRecords.forEach(dataRecord -> {
if (ScalingConstant.INSERT.equals(dataRecord.getType())) {
mergeInsert(dataRecord, result);
} else if (ScalingConstant.UPDATE.equals(dataRecord.getType())) {
mergeUpdate(dataRecord, result);
} else if (ScalingConstant.DELETE.equals(dataRecord.getType())) {
mergeDelete(dataRecord, result);
}
});
return new ArrayList<>(result.values());
}
/**
* Group by table and type.
*
* @param dataRecords data records
* @return grouped data records
*/
public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
List<DataRecord> mergedDataRecords = merge(dataRecords);
List<GroupedDataRecord> result = new ArrayList<>(100);
Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
for (Map.Entry<String, List<DataRecord>> each : tableGroup.entrySet()) {
Map<String, List<DataRecord>> typeGroup = each.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
result.add(new GroupedDataRecord(each.getKey(),
typeGroup.get(ScalingConstant.INSERT),
typeGroup.get(ScalingConstant.UPDATE),
typeGroup.get(ScalingConstant.DELETE)));
}
return result;
}
private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
if (null != beforeDataRecord && !ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
}
dataRecords.put(dataRecord.getKey(), dataRecord);
}
private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord)
? dataRecords.get(dataRecord.getOldKey())
: dataRecords.get(dataRecord.getKey());
if (null == beforeDataRecord) {
dataRecords.put(dataRecord.getKey(), dataRecord);
return;
}
if (ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
throw new UnsupportedOperationException();
}
if (checkUpdatedPrimaryKey(dataRecord) && dataRecords.containsKey(dataRecord.getOldKey())) {
dataRecords.remove(dataRecord.getOldKey());
}
if (ScalingConstant.INSERT.equals(beforeDataRecord.getType())) {
DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
mergedDataRecord.setTableName(dataRecord.getTableName());
mergedDataRecord.setType(ScalingConstant.INSERT);
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
return;
}
if (ScalingConstant.UPDATE.equals(beforeDataRecord.getType())) {
DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
mergedDataRecord.setTableName(dataRecord.getTableName());
mergedDataRecord.setType(ScalingConstant.UPDATE);
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
return;
}
}
private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
if (null != beforeDataRecord && (ScalingConstant.DELETE.equals(beforeDataRecord.getType()))) {
throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
}
if (null != beforeDataRecord && ScalingConstant.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
// primary key updated + delete
DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
mergedDataRecord.addColumn(new Column(
dataRecord.getColumn(i).getName(),
dataRecord.getColumn(i).isPrimaryKey()
? beforeDataRecord.getColumn(i).getOldValue()
: beforeDataRecord.getColumn(i).getValue(),
true,
dataRecord.getColumn(i).isPrimaryKey()
));
}
mergedDataRecord.setTableName(dataRecord.getTableName());
mergedDataRecord.setType(ScalingConstant.DELETE);
dataRecords.remove(beforeDataRecord.getKey());
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
} else {
dataRecords.put(dataRecord.getKey(), dataRecord);
}
}
private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
return RecordUtil.extractPrimaryColumns(dataRecord).stream().anyMatch(each -> each.isUpdated());
}
private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
result.addColumn(new Column(
curDataRecord.getColumn(i).getName(),
preDataRecord.getColumn(i).isPrimaryKey()
? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
: null,
curDataRecord.getColumn(i).getValue(),
preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
curDataRecord.getColumn(i).isPrimaryKey()
));
}
return result;
}
private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
return beforeColumn.isUpdated()
? beforeColumn.getOldValue()
: (column.isUpdated() ? column.getOldValue() : null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
@RequiredArgsConstructor
public class UnexpectedDataRecordOrder extends RuntimeException {
private final DataRecord beforeDataRecord;
private final DataRecord afterDataRecord;
}
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.scaling.core.job.position.Position;
......@@ -41,6 +42,8 @@ public final class DataRecord extends Record {
private final List<Object> primaryKeyValue = new LinkedList<>();
private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
private String type;
private String tableName;
......@@ -59,6 +62,7 @@ public final class DataRecord extends Record {
columns.add(data);
if (data.isPrimaryKey()) {
primaryKeyValue.add(data.getValue());
oldPrimaryKeyValues.add(data.getOldValue());
}
}
......@@ -80,4 +84,31 @@ public final class DataRecord extends Record {
public Column getColumn(final int index) {
return columns.get(index);
}
/**
* Get key.
*
* @return key
*/
public Key getKey() {
return new Key(tableName, primaryKeyValue);
}
/**
* Get old key.
*
* @return key
*/
public Key getOldKey() {
return new Key(tableName, oldPrimaryKeyValues);
}
@EqualsAndHashCode
@RequiredArgsConstructor
public static class Key {
private final String tableName;
private final List<Object> primaryKeyValues;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.execute.executor.record;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.List;
@Getter
@RequiredArgsConstructor
public class GroupedDataRecord {
private final String tableName;
private final List<DataRecord> insertDataRecords;
private final List<DataRecord> updateDataRecords;
private final List<DataRecord> deleteDataRecords;
}
......@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
......@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -103,12 +104,12 @@ public final class AbstractJDBCImporterTest {
DataRecord insertRecord = getDataRecord("INSERT");
when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(insertRecord));
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).setObject(3, "INSERT");
verify(preparedStatement).execute();
verify(preparedStatement).addBatch();
}
@Test
......@@ -116,11 +117,11 @@ public final class AbstractJDBCImporterTest {
DataRecord deleteRecord = getDataRecord("DELETE");
when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).execute();
verify(preparedStatement).addBatch();
}
@Test
......@@ -128,7 +129,7 @@ public final class AbstractJDBCImporterTest {
DataRecord updateRecord = getDataRecord("UPDATE");
when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 10);
verify(preparedStatement).setObject(2, "UPDATE");
......@@ -142,7 +143,7 @@ public final class AbstractJDBCImporterTest {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Lists;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertThat;
public class DataRecordMergerTest {
private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
private DataRecord beforeDataRecord;
private DataRecord afterDataRecord;
private Collection<DataRecord> actual;
@Test(expected = UnexpectedDataRecordOrder.class)
public void assertInsertBeforeInsert() {
beforeDataRecord = mockInsertDataRecord(1, 1, 1);
afterDataRecord = mockInsertDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
}
@Test(expected = UnexpectedDataRecordOrder.class)
public void assertUpdateBeforeInsert() {
beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
afterDataRecord = mockInsertDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
}
@Test
public void assertDeleteBeforeInsert() {
beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
afterDataRecord = mockInsertDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
public void assertInsertBeforeUpdate() {
beforeDataRecord = mockInsertDataRecord(1, 1, 1);
afterDataRecord = mockUpdateDataRecord(1, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
assertThat(dataRecord.getColumn(0).getValue(), is(1));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
public void assertInsertBeforeUpdatePrimaryKey() {
beforeDataRecord = mockInsertDataRecord(1, 1, 1);
afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
assertThat(dataRecord.getColumn(0).getValue(), is(2));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
public void assertUpdateBeforeUpdate() {
beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
afterDataRecord = mockUpdateDataRecord(1, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
assertThat(dataRecord.getColumn(0).getValue(), is(1));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
public void assertUpdateBeforeUpdatePrimaryKey() {
beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
assertThat(dataRecord.getColumn(0).getValue(), is(2));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
public void assertUpdatePrimaryKeyBeforeUpdate() {
beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
afterDataRecord = mockUpdateDataRecord(2, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
assertThat(dataRecord.getColumn(0).getValue(), is(2));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
public void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
assertThat(dataRecord.getColumn(0).getValue(), is(3));
assertThat(dataRecord.getColumn(1).getValue(), is(2));
assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test(expected = UnsupportedOperationException.class)
public void assertDeleteBeforeUpdate() {
beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
afterDataRecord = mockUpdateDataRecord(1, 2, 2);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
}
@Test
public void assertInsertBeforeDelete() {
beforeDataRecord = mockInsertDataRecord(1, 1, 1);
afterDataRecord = mockDeleteDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
public void assertUpdateBeforeDelete() {
beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
afterDataRecord = mockDeleteDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
public void assertUpdatePrimaryKeyBeforeDelete() {
beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
afterDataRecord = mockDeleteDataRecord(2, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(ScalingConstant.DELETE));
assertThat(dataRecord.getTableName(), is("order"));
assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
assertThat(dataRecord.getColumn(0).getValue(), is(1));
assertThat(dataRecord.getColumn(1).getValue(), is(1));
assertThat(dataRecord.getColumn(2).getValue(), is(1));
}
@Test(expected = UnexpectedDataRecordOrder.class)
public void assertDeleteBeforeDelete() {
beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
afterDataRecord = mockDeleteDataRecord(1, 1, 1);
actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
}
@Test
public void assertGroup() {
List<DataRecord> dataRecords = mockDataRecords();
List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
assertThat(groupedDataRecords.size(), is(2));
assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
}
private List<DataRecord> mockDataRecords() {
return Lists.newArrayList(
mockInsertDataRecord("t1", 1, 1, 1),
mockUpdateDataRecord("t1", 1, 2, 1),
mockUpdateDataRecord("t1", 1, 2, 2),
mockUpdateDataRecord("t1", 2, 1, 1),
mockUpdateDataRecord("t1", 2, 2, 1),
mockUpdateDataRecord("t1", 2, 2, 2),
mockDeleteDataRecord("t1", 3, 1, 1),
mockInsertDataRecord("t2", 1, 1, 1)
);
}
private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
return mockInsertDataRecord("order", id, userId, totalPrice);
}
private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
DataRecord result = new DataRecord(new NopPosition(), 3);
result.setType(ScalingConstant.INSERT);
result.setTableName(tableName);
result.addColumn(new Column("id", id, true, true));
result.addColumn(new Column("user_id", userId, true, false));
result.addColumn(new Column("total_price", totalPrice, true, false));
return result;
}
private DataRecord mockUpdateDataRecord(final int id, final int userId, final int totalPrice) {
return mockUpdateDataRecord("order", null, id, userId, totalPrice);
}
private DataRecord mockUpdateDataRecord(final Integer oldId, final int id, final int userId, final int totalPrice) {
return mockUpdateDataRecord("order", oldId, id, userId, totalPrice);
}
private DataRecord mockUpdateDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
return mockUpdateDataRecord(tableName, null, id, userId, totalPrice);
}
private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
DataRecord result = new DataRecord(new NopPosition(), 3);
result.setType(ScalingConstant.UPDATE);
result.setTableName(tableName);
result.addColumn(new Column("id", oldId, id, null != oldId, true));
result.addColumn(new Column("user_id", userId, true, false));
result.addColumn(new Column("total_price", totalPrice, true, false));
return result;
}
private DataRecord mockDeleteDataRecord(final int id, final int userId, final int totalPrice) {
return mockDeleteDataRecord("order", id, userId, totalPrice);
}
private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
DataRecord preDataRecord = new DataRecord(new NopPosition(), 3);
preDataRecord.setType(ScalingConstant.DELETE);
preDataRecord.setTableName(tableName);
preDataRecord.addColumn(new Column("id", id, true, true));
preDataRecord.addColumn(new Column("user_id", userId, true, false));
preDataRecord.addColumn(new Column("total_price", totalPrice, true, false));
return preDataRecord;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.execute.executor.record;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class DataRecordTest {
private DataRecord beforeDataRecord;
private DataRecord afterDataRecord;
@Test
public void assertKeyEqual() {
beforeDataRecord = new DataRecord(new NopPosition(), 2);
beforeDataRecord.setTableName("t1");
beforeDataRecord.addColumn(new Column("id", 1, true, true));
beforeDataRecord.addColumn(new Column("name", "1", true, false));
afterDataRecord = new DataRecord(new NopPosition(), 2);
afterDataRecord.setTableName("t1");
afterDataRecord.addColumn(new Column("id", 1, true, true));
afterDataRecord.addColumn(new Column("name", "2", true, false));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
}
@Test
public void assertOldKeyEqual() {
beforeDataRecord = new DataRecord(new NopPosition(), 2);
beforeDataRecord.setTableName("t1");
beforeDataRecord.addColumn(new Column("id", 1, true, true));
beforeDataRecord.addColumn(new Column("name", "1", true, false));
afterDataRecord = new DataRecord(new NopPosition(), 2);
afterDataRecord.setTableName("t1");
afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
afterDataRecord.addColumn(new Column("name", "2", true, false));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册