未验证 提交 eec74213 编写于 作者: S Serge Rider 提交者: GitHub

Merge pull request #14366 from dbeaver/postgresql_copy

#8994  Postgresql copy
......@@ -59,6 +59,8 @@ public class DTUIMessages extends NLS {
public static String database_consumer_wizard_checkbox_multi_insert_skip_bind_values_description;
public static String database_consumer_wizard_disable_import_batches_label;
public static String database_consumer_wizard_disable_import_batches_description;
public static String database_consumer_wizard_use_bulk_load_label;
public static String database_consumer_wizard_use_bulk_load_description;
public static String database_consumer_wizard_on_duplicate_key_insert_method_text;
public static String database_consumer_wizard_link_label_replace_method_wiki;
public static String database_consumer_wizard_label_replace_method_not_supported;
......
......@@ -34,6 +34,8 @@ database_consumer_wizard_checkbox_multi_insert_skip_bind_values_label = Skip bin
database_consumer_wizard_checkbox_multi_insert_skip_bind_values_description = Use skip bind values if you want to directly set the values instead of binding them. \nIt gives better performance, but opens up chances of SQL injection attack.\nUse this option, if you are certain about the file's contents.
database_consumer_wizard_disable_import_batches_label = Disable batches
database_consumer_wizard_disable_import_batches_description = Disable the use of batch imports. Import row by row.\nEnabling this function will show all import errors, but make the import process slower.
database_consumer_wizard_use_bulk_load_label = Use bulk load
database_consumer_wizard_use_bulk_load_description = Use database bulk load. Ignores transaction settings and loads entire dataset using database-native tool.
database_consumer_wizard_on_duplicate_key_insert_method_text = Replace method
database_consumer_wizard_link_label_replace_method_wiki = Replace/Ignore method documentation
database_consumer_wizard_label_replace_method_not_supported = Replace method not supported by target database
......
......@@ -29,6 +29,8 @@ database_consumer_wizard_checkbox_multi_insert_description = \u0418\u0441\u043F\
database_consumer_wizard_spinner_multi_insert_batch_size = \u0420\u0430\u0437\u043C\u0435\u0440 \u043F\u0430\u043A\u0435\u0442\u0430 \u043C\u0443\u043B\u044C\u0442\u0438\u0432\u0441\u0442\u0430\u0432\u043A\u0438
database_consumer_wizard_disable_import_batches_label = \u041E\u0442\u043A\u043B\u044E\u0447\u0438\u0442\u044C \u043F\u0430\u043A\u0435\u0442\u043D\u044B\u0439 \u0438\u043C\u043F\u043E\u0440\u0442
database_consumer_wizard_disable_import_batches_description = \u041E\u0442\u043A\u043B\u044E\u0447\u0438\u0442\u044C \u043F\u0430\u043A\u0435\u0442\u043D\u044B\u0439 \u0438\u043C\u043F\u043E\u0440\u0442. \u0418\u043C\u043F\u043E\u0440\u0442 \u0431\u0443\u0434\u0435\u0442 \u043F\u0440\u043E\u0438\u0441\u0445\u043E\u0434\u0438\u0442\u044C \u043F\u043E\u0441\u0442\u0440\u043E\u0447\u043D\u043E.\n\u0412\u043A\u043B\u044E\u0447\u0435\u043D\u0438\u0435 \u044D\u0442\u043E\u0439 \u0444\u0443\u043D\u043A\u0446\u0438\u0438 \u043F\u043E\u043A\u0430\u0436\u0435\u0442 \u0432\u0441\u0435 \u043E\u0448\u0438\u0431\u043A\u0438 \u0438\u043C\u043F\u043E\u0440\u0442\u0430, \u043D\u043E \u0437\u0430\u043C\u0435\u0434\u043B\u0438\u0442 \u043F\u0440\u043E\u0446\u0435\u0441\u0441.
database_consumer_wizard_use_bulk_load_label = \u0418\u0441\u043F\u043E\u043B\u044C\u0437\u043E\u0432\u0430\u0442\u044C \u043C\u0430\u0441\u0441\u043E\u0432\u0443\u044E \u0437\u0430\u0433\u0440\u0443\u0437\u043A\u0443
database_consumer_wizard_use_bulk_load_description = \u0418\u0441\u043F\u043E\u043B\u044C\u0437\u043E\u0432\u0430\u0442\u044C \u043C\u0430\u0441\u0441\u043E\u0432\u0443\u044E \u0437\u0430\u0433\u0440\u0443\u0437\u043A\u0443. \u0418\u0433\u043D\u043E\u0440\u0438\u0440\u0443\u0435\u0442 \u043D\u0430\u0441\u0442\u0440\u043E\u0439\u043A\u0438 \u0442\u0440\u0430\u043D\u0437\u0430\u043A\u0446\u0438\u0438 \u0438 \u0437\u0430\u0433\u0440\u0443\u0436\u0430\u0435\u0442 \u0434\u0430\u0442\u0430\u0441\u0435\u0442 \u0446\u0435\u043B\u0438\u043A\u043E\u043C \u0438\u0441\u043F\u043E\u043B\u044C\u0437\u0443\u044F \u0432\u043D\u0443\u0442\u0440\u0435\u043D\u043D\u0438\u0435 \u0438\u043D\u0441\u0442\u0440\u0443\u043C\u0435\u043D\u0442\u044B \u0411\u0414.
database_consumer_wizard_on_duplicate_key_insert_method_text = \u041C\u0435\u0442\u043E\u0434\u044B \u0437\u0430\u043C\u0435\u043D\u044B
database_consumer_wizard_link_label_replace_method_wiki = \u0414\u043E\u043A\u0443\u043C\u0435\u043D\u0442\u0430\u0446\u0438\u044F \u043C\u0435\u0442\u043E\u0434\u043E\u0432 \u0438\u0433\u043D\u043E\u0440\u0438\u0440\u043E\u0432\u0430\u043D\u0438\u044F/\u0437\u0430\u043C\u0435\u043D\u044B
database_consumer_wizard_label_replace_method_not_supported = \u041C\u0435\u0442\u043E\u0434 \u0437\u0430\u043C\u0435\u043D\u044B \u043D\u0435 \u043F\u043E\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0435\u0442\u0441\u044F \u0434\u0430\u043D\u043D\u043E\u0439 \u0411\u0414
......
......@@ -26,10 +26,12 @@ import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBPDataSource;
import org.jkiss.dbeaver.model.DBPReferentialIntegrityController;
import org.jkiss.dbeaver.model.DBUtils;
import org.jkiss.dbeaver.model.navigator.DBNDatabaseNode;
import org.jkiss.dbeaver.model.sql.registry.SQLDialectDescriptor;
import org.jkiss.dbeaver.model.sql.registry.SQLDialectRegistry;
import org.jkiss.dbeaver.model.sql.registry.SQLInsertReplaceMethodDescriptor;
import org.jkiss.dbeaver.model.struct.DBSDataBulkLoader;
import org.jkiss.dbeaver.model.struct.DBSDataManipulator;
import org.jkiss.dbeaver.tools.transfer.database.DatabaseConsumerSettings;
import org.jkiss.dbeaver.tools.transfer.database.DatabaseMappingContainer;
......@@ -59,9 +61,11 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
private Group loadSettings;
private String disableReferentialIntegrityCheckboxTooltip;
private boolean isDisablingReferentialIntegritySupported;
private Spinner multiRowInsertBatch;
private Text multiRowInsertBatch;
private Button skipBindValues;
private Button useBatchCheck;
private Button useBulkLoadCheck;
private List<SQLInsertReplaceMethodDescriptor> availableInsertMethodsDescriptors;
public DatabaseConsumerPageLoadSettings() {
super(DTUIMessages.database_consumer_wizard_name);
......@@ -125,7 +129,19 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
UIUtils.createControlLabel(loadSettings, DTUIMessages.database_consumer_wizard_on_duplicate_key_insert_method_text);
onDuplicateKeyInsertMethods = new Combo(loadSettings, SWT.DROP_DOWN | SWT.READ_ONLY);
onDuplicateKeyInsertMethods.setLayoutData(new GridData(GridData.HORIZONTAL_ALIGN_BEGINNING));
onDuplicateKeyInsertMethods.setLayoutData(new GridData(GridData.FILL_HORIZONTAL));
onDuplicateKeyInsertMethods.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
int selIndex = onDuplicateKeyInsertMethods.getSelectionIndex();
if (selIndex > 0 && !CommonUtils.isEmpty(availableInsertMethodsDescriptors)) {
SQLInsertReplaceMethodDescriptor methodDescriptor = availableInsertMethodsDescriptors.get(selIndex - 1);
settings.setOnDuplicateKeyInsertMethodId(methodDescriptor.getId());
} else {
settings.setOnDuplicateKeyInsertMethodId(onDuplicateKeyInsertMethods.getText());
}
}
});
Link urlLabel = UIUtils.createLink(loadSettings, "<a href=\"" + HelpUtils.getHelpExternalReference(HELP_TOPIC_REPLACE_METHOD) + "\">"
+ DTUIMessages.database_consumer_wizard_link_label_replace_method_wiki + "</a>", new SelectionAdapter() {
......@@ -179,18 +195,17 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
}
});
final Spinner commitAfterEdit = UIUtils.createLabelSpinner(performanceSettings, DTUIMessages.database_consumer_wizard_commit_spinner_label, settings.getCommitAfterRows(), 1, Integer.MAX_VALUE);
commitAfterEdit.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
settings.setCommitAfterRows(commitAfterEdit.getSelection());
}
});
commitAfterEdit.setLayoutData(new GridData(GridData.HORIZONTAL_ALIGN_BEGINNING, GridData.VERTICAL_ALIGN_BEGINNING, false, false, 3, 1));
final Text commitAfterEdit = UIUtils.createLabelText(performanceSettings, DTUIMessages.database_consumer_wizard_commit_spinner_label, String.valueOf(settings.getCommitAfterRows()), SWT.BORDER);
commitAfterEdit.addModifyListener(e -> settings.setCommitAfterRows(CommonUtils.toInt(commitAfterEdit.getText())));
GridData gd = new GridData(GridData.HORIZONTAL_ALIGN_BEGINNING, GridData.VERTICAL_ALIGN_BEGINNING, false, false, 3, 1);
gd.widthHint = UIUtils.getFontHeight(commitAfterEdit) * 6;
commitAfterEdit.setLayoutData(gd);
final Button useMultiRowInsert = UIUtils.createCheckbox(performanceSettings, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_label, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_description, settings.isUseMultiRowInsert(), 4);
if (useBatchCheck != null && ((!useBatchCheck.isDisposed() && useBatchCheck.getSelection())
|| (useBatchCheck.isDisposed() && settings.isDisableUsingBatches()))) {
final Button useMultiRowInsert = UIUtils.createCheckbox(performanceSettings, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_label, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_description, settings.isUseMultiRowInsert(), 1);
if (useBatchCheck != null && (
(!useBatchCheck.isDisposed() && useBatchCheck.getSelection()) ||
(useBatchCheck.isDisposed() && settings.isDisableUsingBatches())))
{
useMultiRowInsert.setEnabled(false);
}
useMultiRowInsert.addSelectionListener(new SelectionAdapter() {
......@@ -207,17 +222,16 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
}
});
multiRowInsertBatch = UIUtils.createLabelSpinner(performanceSettings, DTUIMessages.database_consumer_wizard_spinner_multi_insert_batch_size, settings.getMultiRowInsertBatch(), 2, Integer.MAX_VALUE);
multiRowInsertBatch = new Text(performanceSettings, SWT.BORDER);
multiRowInsertBatch.setToolTipText(DTUIMessages.database_consumer_wizard_spinner_multi_insert_batch_size);
gd = new GridData(GridData.FILL_HORIZONTAL);
gd.horizontalSpan = 3;
multiRowInsertBatch.setLayoutData(gd);
multiRowInsertBatch.setText(String.valueOf(settings.getMultiRowInsertBatch()));
if (!useMultiRowInsert.getSelection() || useBatchCheck != null && !useBatchCheck.isDisposed() && useBatchCheck.getSelection()) {
multiRowInsertBatch.setEnabled(false);
}
multiRowInsertBatch.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
settings.setMultiRowInsertBatch(multiRowInsertBatch.getSelection());
}
});
multiRowInsertBatch.setLayoutData(new GridData(GridData.HORIZONTAL_ALIGN_BEGINNING, GridData.VERTICAL_ALIGN_BEGINNING, false, false, 3, 1));
multiRowInsertBatch.addModifyListener(e -> settings.setMultiRowInsertBatch(CommonUtils.toInt(multiRowInsertBatch.getText())));
skipBindValues = UIUtils.createCheckbox(performanceSettings, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_skip_bind_values_label, DTUIMessages.database_consumer_wizard_checkbox_multi_insert_skip_bind_values_description, settings.isSkipBindValues(), 4);
skipBindValues.addSelectionListener(new SelectionAdapter() {
......@@ -227,7 +241,12 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
}
});
useBatchCheck = UIUtils.createCheckbox(performanceSettings, DTUIMessages.database_consumer_wizard_disable_import_batches_label, DTUIMessages.database_consumer_wizard_disable_import_batches_description, settings.isDisableUsingBatches(), 4);
useBatchCheck = UIUtils.createCheckbox(
performanceSettings,
DTUIMessages.database_consumer_wizard_disable_import_batches_label,
DTUIMessages.database_consumer_wizard_disable_import_batches_description,
settings.isDisableUsingBatches(),
4);
useBatchCheck.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
......@@ -242,6 +261,19 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
}
}
});
useBulkLoadCheck = UIUtils.createCheckbox(
performanceSettings,
DTUIMessages.database_consumer_wizard_use_bulk_load_label,
DTUIMessages.database_consumer_wizard_use_bulk_load_description,
settings.isUseBulkLoad(),
4);
useBulkLoadCheck.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
settings.setUseBulkLoad(useBulkLoadCheck.getSelection());
}
});
}
setControl(composite);
......@@ -309,11 +341,20 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
truncateTargetTable.setSelection(false);
settings.setTruncateBeforeLoad(false);
}
if (useBulkLoadCheck != null && !useBulkLoadCheck.isDisposed()) {
DBPDataSource dataSource = settings.getContainerNode() == null ? null : settings.getContainerNode().getDataSource();
useBulkLoadCheck.setEnabled(DBUtils.getAdapter(DBSDataBulkLoader.class, dataSource) != null);
}
loadInsertMethods();
}
private boolean confirmDataTruncate() {
Shell shell = getContainer().getShell();
if (shell == null) {
return true;
}
if (shell.isVisible() || getSettings().isTruncateBeforeLoad()) {
String tableNames = getWizard().getSettings().getDataPipes().stream().map(pipe -> pipe.getConsumer() == null ? "" : pipe.getConsumer().getObjectName()).collect(Collectors.joining(","));
String checkbox_question = NLS.bind(DTUIMessages.database_consumer_wizard_truncate_checkbox_question, tableNames);
......@@ -333,12 +374,14 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
}
DBPDataSource dataSource = containerNode.getDataSource();
List<SQLInsertReplaceMethodDescriptor> insertMethodsDescriptors = null;
if (dataSource != null) {
SQLDialectDescriptor dialectDescriptor = SQLDialectRegistry.getInstance().getDialect(dataSource.getSQLDialect().getDialectId());
insertMethodsDescriptors = dialectDescriptor.getSupportedInsertReplaceMethodsDescriptors();
}
onDuplicateKeyInsertMethods.removeAll();
onDuplicateKeyInsertMethods.add(DBSDataManipulator.INSERT_NONE_METHOD);
if (!CommonUtils.isEmpty(insertMethodsDescriptors)) {
boolean emptyButton = true;
......@@ -359,28 +402,9 @@ public class DatabaseConsumerPageLoadSettings extends DataTransferPageNodeSettin
} else {
onDuplicateKeyInsertMethods.setText(DBSDataManipulator.INSERT_NONE_METHOD);
onDuplicateKeyInsertMethods.setEnabled(false);
Label descLabel = new Label(loadSettings, SWT.NONE);
descLabel.setLayoutData(new GridData(GridData.HORIZONTAL_ALIGN_BEGINNING, GridData.VERTICAL_ALIGN_BEGINNING, false, false, 2, 1));
descLabel.setText(DTUIMessages.database_consumer_wizard_label_replace_method_not_supported);
if (!CommonUtils.isEmpty(settings.getOnDuplicateKeyInsertMethodId())) {
// May be this setting was used for another database
settings.setOnDuplicateKeyInsertMethodId(null);
}
}
List<SQLInsertReplaceMethodDescriptor> finalInsertMethodsDescriptors = insertMethodsDescriptors;
onDuplicateKeyInsertMethods.addSelectionListener(new SelectionAdapter() {
@Override
public void widgetSelected(SelectionEvent e) {
int selIndex = onDuplicateKeyInsertMethods.getSelectionIndex();
if (selIndex > 0 && !CommonUtils.isEmpty(finalInsertMethodsDescriptors)) {
SQLInsertReplaceMethodDescriptor methodDescriptor = finalInsertMethodsDescriptors.get(selIndex - 1);
settings.setOnDuplicateKeyInsertMethodId(methodDescriptor.getId());
} else {
settings.setOnDuplicateKeyInsertMethodId(onDuplicateKeyInsertMethods.getText());
}
}
});
availableInsertMethodsDescriptors = insertMethodsDescriptors;
}
@Override
......
......@@ -58,9 +58,10 @@ public class DatabaseConsumerSettings implements IDataTransferSettings {
private boolean truncateBeforeLoad = false;
private boolean openTableOnFinish = true;
private boolean useMultiRowInsert;
private int multiRowInsertBatch = 100;
private int multiRowInsertBatch = 500;
private boolean skipBindValues;
private boolean disableUsingBatches = false;
private boolean useBulkLoad = false;
private String onDuplicateKeyInsertMethodId;
private boolean disableReferentialIntegrity;
......@@ -196,6 +197,14 @@ public class DatabaseConsumerSettings implements IDataTransferSettings {
this.commitAfterRows = commitAfterRows;
}
public boolean isUseBulkLoad() {
return useBulkLoad;
}
public void setUseBulkLoad(boolean useBulkLoad) {
this.useBulkLoad = useBulkLoad;
}
@Nullable
public DBPDataSource getTargetDataSource(DatabaseMappingObject attrMapping) {
DBSObjectContainer container = getContainer();
......@@ -223,6 +232,7 @@ public class DatabaseConsumerSettings implements IDataTransferSettings {
disableUsingBatches = CommonUtils.getBoolean(settings.get("disableUsingBatches"), disableUsingBatches);
transferAutoGeneratedColumns = CommonUtils.getBoolean(settings.get("transferAutoGeneratedColumns"), transferAutoGeneratedColumns);
disableReferentialIntegrity = CommonUtils.getBoolean(settings.get("disableReferentialIntegrity"), disableReferentialIntegrity);
useBulkLoad = CommonUtils.getBoolean(settings.get("useBulkLoad"), useBulkLoad);
truncateBeforeLoad = CommonUtils.getBoolean(settings.get("truncateBeforeLoad"), truncateBeforeLoad);
openTableOnFinish = CommonUtils.getBoolean(settings.get("openTableOnFinish"), openTableOnFinish);
......@@ -292,6 +302,7 @@ public class DatabaseConsumerSettings implements IDataTransferSettings {
settings.put("onDuplicateKeyMethod", onDuplicateKeyInsertMethodId);
settings.put("transferAutoGeneratedColumns", transferAutoGeneratedColumns);
settings.put("disableReferentialIntegrity", disableReferentialIntegrity);
settings.put("useBulkLoad", useBulkLoad);
settings.put("truncateBeforeLoad", truncateBeforeLoad);
settings.put("openTableOnFinish", openTableOnFinish);
......@@ -325,6 +336,7 @@ public class DatabaseConsumerSettings implements IDataTransferSettings {
DTUtils.addSummary(summary, DTMessages.database_consumer_settings_option_on_duplicate_key_method_label, onDuplicateKeyInsertMethodId);
DTUtils.addSummary(summary, DTMessages.database_consumer_settings_option_transfer_auto_generated_columns, transferAutoGeneratedColumns);
DTUtils.addSummary(summary, DTMessages.database_consumer_settings_option_disable_referential_integrity, disableReferentialIntegrity);
DTUtils.addSummary(summary, DTMessages.database_consumer_settings_option_use_bulk_load, useBulkLoad);
DTUtils.addSummary(summary, DTMessages.database_consumer_settings_option_truncate_before_load, truncateBeforeLoad);
return summary.toString();
......
......@@ -258,12 +258,15 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
options.put(DBSDataManipulator.OPTION_SKIP_BIND_VALUES, settings.isSkipBindValues());
if (!isPreview) {
DBSDataBulkLoader bulkLoader = DBUtils.getAdapter(DBSDataBulkLoader.class, targetObject);
if (bulkLoader != null) {
try {
bulkLoadManager = bulkLoader.createBulkLoad(targetSession, attributes, executionSource, settings.getCommitAfterRows(), options);
} catch (Exception e) {
log.debug("Error creating bulk loader", e);
if (settings.isUseBulkLoad()) {
DBSDataBulkLoader bulkLoader = DBUtils.getAdapter(DBSDataBulkLoader.class, targetContext.getDataSource());
if (targetObject != null && bulkLoader != null) {
try {
bulkLoadManager = bulkLoader.createBulkLoad(
targetSession, targetObject, attributes, executionSource, settings.getCommitAfterRows(), options);
} catch (Exception e) {
log.debug("Error creating bulk loader", e);
}
}
}
if (bulkLoadManager == null) {
......@@ -345,7 +348,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
}
if (bulkLoadManager != null) {
bulkLoadManager.addRow(session, rowValues);
bulkLoadManager.addRow(targetSession, rowValues);
} else {
executeBatch.add(rowValues);
}
......@@ -366,6 +369,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
if (needCommit) {
bulkLoadManager.flushRows(targetSession);
}
return;
} else {
boolean disableUsingBatches = settings.isDisableUsingBatches();
if ((needCommit || disableUsingBatches) && executeBatch != null) {
......@@ -452,8 +456,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
insertBatch(true);
}
if (bulkLoadManager != null) {
bulkLoadManager.close();
bulkLoadManager = null;
bulkLoadManager.finishBulkLoad(targetSession);
} else if (executeBatch != null) {
executeBatch.close();
executeBatch = null;
......@@ -510,7 +513,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
}
}
DBSObject checkTargetContainer(DBRProgressMonitor monitor) throws DBException {
private DBSObject checkTargetContainer(DBRProgressMonitor monitor) throws DBException {
DBSDataManipulator targetObject = getTargetObject();
if (targetObject == null) {
if (settings.getContainerNode() != null && settings.getContainerNode().getDataSource() == null) {
......@@ -549,6 +552,11 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
targetContext.close();
targetContext = null;
}
if (bulkLoadManager != null) {
bulkLoadManager.close();
bulkLoadManager = null;
}
}
@Override
......
......@@ -132,6 +132,7 @@ public class DTMessages extends NLS {
public static String database_consumer_settings_option_on_duplicate_key_method_label;
public static String database_consumer_settings_option_transfer_auto_generated_columns;
public static String database_consumer_settings_option_disable_referential_integrity;
public static String database_consumer_settings_option_use_bulk_load;
public static String database_consumer_settings_option_truncate_before_load;
public static String data_transfer_settings_title_find_producer;
......
......@@ -105,6 +105,7 @@ database_consumer_settings_option_use_transactions = Use transactions
database_consumer_settings_option_commit_after = Do Commit after row insert
database_consumer_settings_option_transfer_auto_generated_columns = Transfer auto-generated columns
database_consumer_settings_option_disable_referential_integrity = Disable referential integrity
database_consumer_settings_option_use_bulk_load = Use bulk load
database_consumer_settings_option_truncate_before_load = Truncate before load
database_consumer_settings_option_use_multi_insert = Use multi-row Insert
database_consumer_settings_option_multi_insert_batch = Multi-row insert batch size
......
......@@ -19,6 +19,7 @@ package org.jkiss.dbeaver.tools.transfer.stream.importer;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBFetchProgress;
import org.jkiss.dbeaver.model.DBPDataKind;
import org.jkiss.dbeaver.model.DBPDataSource;
import org.jkiss.dbeaver.model.DBUtils;
......@@ -270,8 +271,8 @@ public class DataImporterCSV extends StreamImporterAbstract {
consumer.fetchRow(producerSession, resultSet);
lineNum++;
if (lineNum % 1000 == 0) {
monitor.subTask(String.valueOf(lineNum) + " rows processed");
if (DBFetchProgress.monitorFetchProgress(lineNum)) {
monitor.subTask(lineNum + " rows processed");
}
}
}
......
......@@ -143,6 +143,11 @@ public class PostgreServerGreenplum extends PostgreServerExtensionBase {
return true;
}
@Override
public boolean supportsCopyFromStdIn() {
return true;
}
@Override
public boolean supportsExternalTypes() {
return true;
......
......@@ -15,7 +15,7 @@ Require-Bundle: org.jkiss.dbeaver.model,
com.sun.jna.platform,
org.jkiss.dbeaver.model.sql
Bundle-ActivationPolicy: lazy
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Bundle-RequiredExecutionEnvironment: JavaSE-11
Bundle-Vendor: %Bundle-Vendor
Bundle-ClassPath: .
Export-Package: org.jkiss.dbeaver.ext.postgresql,
......
......@@ -18,56 +18,200 @@ package org.jkiss.dbeaver.ext.postgresql.model;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBPEvaluationContext;
import org.jkiss.dbeaver.model.DBUtils;
import org.jkiss.dbeaver.model.data.DBDDisplayFormat;
import org.jkiss.dbeaver.model.data.DBDValueHandler;
import org.jkiss.dbeaver.model.exec.DBCException;
import org.jkiss.dbeaver.model.exec.DBCExecutionSource;
import org.jkiss.dbeaver.model.exec.DBCSession;
import org.jkiss.dbeaver.model.exec.DBCTransactionManager;
import org.jkiss.dbeaver.model.exec.jdbc.JDBCSession;
import org.jkiss.dbeaver.model.struct.DBSAttributeBase;
import org.jkiss.dbeaver.model.struct.DBSDataBulkLoader;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.runtime.DBWorkbench;
import org.jkiss.utils.ArrayUtils;
import org.jkiss.utils.CommonUtils;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
* Bulk loader based on CopyManager
*
* // new CopyManager((BaseConnection) conn)
* // .copyIn(
* // "COPY table1 FROM STDIN (FORMAT csv)",
* // new BufferedReader(new FileReader("data.csv"))
* // );
*/
public class PostgreCopyLoader implements DBSDataBulkLoader, DBSDataBulkLoader.BulkLoadManager {
private static final Log log = Log.getLog(PostgreCopyLoader.class);
private final PostgreTableReal table;
private final PostgreDataSource dataSource;
private PostgreTableReal table;
private Object copyManager;
private Method copyInMethod;
private Writer csvWriter;
private File csvFile;
private AttrMapping[] mappings;
private int copyBufferSize = 100 * 1024;
public PostgreCopyLoader(PostgreTableReal table) {
this.table = table;
private static class AttrMapping {
PostgreTableColumn tableAttr;
DBDValueHandler valueHandler;
int srcPos;
AttrMapping(PostgreTableColumn tableAttr, DBDValueHandler valueHandler, int srcPos) {
this.tableAttr = tableAttr;
this.valueHandler = valueHandler;
this.srcPos = srcPos;
}
}
public PostgreCopyLoader(PostgreDataSource dataSource) {
this.dataSource = dataSource;
}
@NotNull
@Override
public BulkLoadManager createBulkLoad(@NotNull DBCSession session, @NotNull DBSAttributeBase[] attributes, @NotNull DBCExecutionSource source, int batchSize, Map<String, Object> options) throws DBCException {
public BulkLoadManager createBulkLoad(
@NotNull DBCSession session,
@NotNull DBSDataContainer dataContainer,
@NotNull DBSAttributeBase[] attributes,
@NotNull DBCExecutionSource source,
int batchSize,
Map<String, Object> options) throws DBCException
{
this.table = (PostgreTableReal) dataContainer;
try {
Object driverInstance = session.getDataSource().getContainer().getDriver().getDriverInstance(session.getProgressMonitor());
Object copyManager = Class.forName("CopyManager", true, driverInstance.getClass().getClassLoader()).getConstructor().newInstance();
// Use reflection to create copy manager
Connection pgConnection = ((JDBCSession) session).getOriginal();
ClassLoader driverClassLoader = pgConnection.getClass().getClassLoader();
Class<?> baseConnectionClass = Class.forName("org.postgresql.core.BaseConnection", true, driverClassLoader);
Class<?> copyManagerClass = Class.forName("org.postgresql.copy.CopyManager", true, driverClassLoader);
// Get method copyIn(final String sql, Reader from, int bufferSize)
copyInMethod = copyManagerClass.getMethod("copyIn", String.class, Reader.class, Integer.TYPE);
copyManager = copyManagerClass.getConstructor(baseConnectionClass).newInstance(pgConnection);
File tempFolder = DBWorkbench.getPlatform().getTempFolder(session.getProgressMonitor(), "postgesql-copy-datasets");
csvFile = new File(tempFolder, CommonUtils.escapeFileName(table.getFullyQualifiedName(DBPEvaluationContext.DML)) + "-" + System.currentTimeMillis() + ".csv"); //$NON-NLS-1$ //$NON-NLS-2$
if (!csvFile.createNewFile()){
throw new IOException("Can't create CSV file " + csvFile.getAbsolutePath());
}
csvWriter = new BufferedWriter(
new FileWriter(csvFile, StandardCharsets.UTF_8),
copyBufferSize
);
List<? extends PostgreTableColumn> tableAttrs = CommonUtils.safeList(table.getAttributes(session.getProgressMonitor()));
tableAttrs.removeIf(a -> a.getOrdinalPosition() < 0);
mappings = new AttrMapping[tableAttrs.size()];
for (int i = 0; i < tableAttrs.size(); i++) {
PostgreTableColumn attr = tableAttrs.get(i);
DBDValueHandler valueHandler = DBUtils.findValueHandler(session, attr);
AttrMapping mapping = new AttrMapping(
attr,
valueHandler,
ArrayUtils.indexOf(attributes, attr)
);
mappings[i] = mapping;
}
} catch (Exception e) {
throw new DBCException("Can't instantiate CopyManager", e);
}
return this;
// new CopyManager((BaseConnection) conn)
// .copyIn(
// "COPY table1 FROM STDIN (FORMAT csv, HEADER)",
// new BufferedReader(new FileReader("data.csv"))
// ); throw new DBCException("Not implemented");
}
@Override
public void addRow(@NotNull DBCSession session, @NotNull Object[] attributeValues) throws DBCException {
StringBuilder line = new StringBuilder();
for (int i = 0; i < mappings.length; i++) {
if (i > 0) {
line.append(",");
}
AttrMapping attr = mappings[i];
if (attr.srcPos >= 0) {
String strValue = attr.valueHandler.getValueDisplayString(
attr.tableAttr, attributeValues[attr.srcPos], DBDDisplayFormat.NATIVE);
line.append(strValue);
}
}
line.append("\n");
try {
csvWriter.write(line.toString());
} catch (IOException e) {
throw new DBCException("Error writing CSV line", e);
}
}
@Override
public void flushRows(@NotNull DBCSession session) throws DBCException {
try {
csvWriter.flush();
} catch (IOException e) {
throw new DBCException("Error saving CSV data", e);
}
}
@Override
public void flushRows(@NotNull DBCSession session) {
public void finishBulkLoad(@NotNull DBCSession session) throws DBCException {
try {
csvWriter.flush();
csvWriter.close();
} catch (IOException e) {
log.debug(e);
}
csvWriter = null;
String tableFQN = table.getFullyQualifiedName(DBPEvaluationContext.DML);
session.getProgressMonitor().subTask("Copy into " + tableFQN);
String queryText = "COPY " + tableFQN + " FROM STDIN (FORMAT csv)";
try {
Reader csvReader = new FileReader(csvFile, StandardCharsets.UTF_8);
Object rowCount = copyInMethod.invoke(copyManager, queryText, csvReader, copyBufferSize);
// Commit changes
DBCTransactionManager txnManager = DBUtils.getTransactionManager(session.getExecutionContext());
if (txnManager != null && !txnManager.isAutoCommit()) {
session.getProgressMonitor().subTask("Commit COPY");
txnManager.commit(session);
}
log.debug("CSV has been imported (" + rowCount + ")");
} catch (Throwable e) {
if (e instanceof InvocationTargetException) {
e = ((InvocationTargetException) e).getTargetException();
}
throw new DBCException("Error copying dataset on remote server", e);
}
}
@Override
public void close() {
public void close() {
if (csvFile != null && csvFile.exists()) {
if (!csvFile.delete()) {
log.debug("Error deleting CSV file " + csvFile.getAbsolutePath());
csvFile.deleteOnExit();
}
}
}
}
......@@ -481,8 +481,14 @@ public class PostgreDataSource extends JDBCDataSource implements DBPDataTypeMapp
return adapter.cast(new PostgreSessionManager(this));
} else if (adapter == DBCQueryPlanner.class) {
return adapter.cast(new PostgreQueryPlaner(this));
} else if (getServerType().supportsAlterUserChangePassword() && adapter == DBAUserChangePassword.class) {
return adapter.cast(new PostgresUserChangePassword(this));
} else if (adapter == DBSDataBulkLoader.class) {
if (getDataSource().getServerType().supportsCopyFromStdIn()) {
return adapter.cast(new PostgreCopyLoader(this));
}
} else if (adapter == DBAUserChangePassword.class) {
if (getServerType().supportsAlterUserChangePassword()) {
return adapter.cast(new PostgresUserChangePassword(this));
}
}
return super.getAdapter(adapter);
}
......
......@@ -152,7 +152,10 @@ public interface PostgreServerExtension {
boolean supportsAlterUserChangePassword();
boolean supportsCopyFromStdIn();
int getParameterBindType(DBSTypedObject type, Object value);
int getTruncateToolModes();
}
......@@ -16,7 +16,6 @@
*/
package org.jkiss.dbeaver.ext.postgresql.model;
import org.eclipse.core.runtime.IAdaptable;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.DBException;
......@@ -36,7 +35,6 @@ import org.jkiss.dbeaver.model.meta.Association;
import org.jkiss.dbeaver.model.meta.Property;
import org.jkiss.dbeaver.model.preferences.DBPPropertySource;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.struct.DBSDataBulkLoader;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.model.struct.DBSObject;
import org.jkiss.utils.ByteNumberFormat;
......@@ -50,7 +48,7 @@ import java.util.List;
/**
* PostgreTable base
*/
public abstract class PostgreTableReal extends PostgreTableBase implements DBPObjectStatistics, IAdaptable
public abstract class PostgreTableReal extends PostgreTableBase implements DBPObjectStatistics
{
private static final Log log = Log.getLog(PostgreTableReal.class);
......@@ -271,14 +269,6 @@ public abstract class PostgreTableReal extends PostgreTableBase implements DBPOb
}
@Override
public <T> T getAdapter(Class<T> adapter) {
if (adapter == DBSDataBulkLoader.class) {
return adapter.cast(new PostgreCopyLoader(this));
}
return null;
}
class RuleCache extends JDBCObjectCache<PostgreTableReal, PostgreRule> {
@NotNull
@Override
......
......@@ -252,6 +252,11 @@ public class PostgreServerCockroachDB extends PostgreServerExtensionBase {
return true;
}
@Override
public boolean supportsCopyFromStdIn() {
return true;
}
@Override
public int getTruncateToolModes() {
return TRUNCATE_TOOL_MODE_SUPPORT_ONLY_ONE_TABLE | TRUNCATE_TOOL_MODE_SUPPORT_CASCADE;
......
......@@ -34,6 +34,11 @@ public class PostgreServerEdb extends PostgreServerExtensionBase {
return true;
}
@Override
public boolean supportsCopyFromStdIn() {
return true;
}
@Override
public String getServerTypeName() {
return "EnterpriseDB";
......
......@@ -495,6 +495,11 @@ public abstract class PostgreServerExtensionBase implements PostgreServerExtensi
return false;
}
@Override
public boolean supportsCopyFromStdIn() {
return false;
}
@Override
public int getParameterBindType(DBSTypedObject type, Object value) {
return Types.OTHER;
......
......@@ -80,4 +80,9 @@ public class PostgreServerPostgreSQL extends PostgreServerExtensionBase {
public boolean supportsAlterUserChangePassword() {
return true;
}
@Override
public boolean supportsCopyFromStdIn() {
return true;
}
}
......@@ -32,6 +32,11 @@ public class PostgreServerTimescale extends PostgreServerExtensionBase {
return true;
}
@Override
public boolean supportsCopyFromStdIn() {
return true;
}
@Override
public String getServerTypeName() {
return "Timescale";
......
......@@ -70,8 +70,10 @@ public final class DBFetchProgress {
return fetchedRows % 100 == 0;
} else if (fetchedRows < 100000) {
return fetchedRows % 1000 == 0;
} else {
} else if (fetchedRows < 1000000) {
return fetchedRows % 10000 == 0;
} else {
return fetchedRows % 100000 == 0;
}
}
......
......@@ -32,7 +32,9 @@ public interface DBSDataBulkLoader {
interface BulkLoadManager extends AutoCloseable {
void addRow(@NotNull DBCSession session, @NotNull Object[] attributeValues) throws DBCException;
void flushRows(@NotNull DBCSession session);
void flushRows(@NotNull DBCSession session) throws DBCException;
void finishBulkLoad(@NotNull DBCSession session) throws DBCException;
void close();
}
......@@ -40,6 +42,7 @@ public interface DBSDataBulkLoader {
@NotNull
BulkLoadManager createBulkLoad(
@NotNull DBCSession session,
@NotNull DBSDataContainer dataContainer,
@NotNull DBSAttributeBase[] attributes,
@NotNull DBCExecutionSource source,
int batchSize,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册