diff --git a/plugins/org.jkiss.dbeaver.core/src/org/jkiss/dbeaver/ui/dialogs/tools/AbstractToolWizard.java b/plugins/org.jkiss.dbeaver.core/src/org/jkiss/dbeaver/ui/dialogs/tools/AbstractToolWizard.java index c3ae53c6dfccbd037c0b99c39661d67ad474a934..151c493d56c306eecd6127dd08f501780b071023 100644 --- a/plugins/org.jkiss.dbeaver.core/src/org/jkiss/dbeaver/ui/dialogs/tools/AbstractToolWizard.java +++ b/plugins/org.jkiss.dbeaver.core/src/org/jkiss/dbeaver/ui/dialogs/tools/AbstractToolWizard.java @@ -37,6 +37,7 @@ import org.jkiss.dbeaver.model.preferences.DBPPreferenceStore; import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor; import org.jkiss.dbeaver.model.runtime.DBRRunnableWithProgress; import org.jkiss.dbeaver.model.struct.DBSObject; +import org.jkiss.dbeaver.runtime.ProgressStreamReader; import org.jkiss.dbeaver.runtime.ui.DBUserInterface; import org.jkiss.dbeaver.ui.UIUtils; import org.jkiss.utils.CommonUtils; @@ -436,6 +437,7 @@ public abstract class AbstractToolWizard readColumnsInfo(InputStream inputStream, StreamProducerSettings settings, Map processorProperties) throws DBException; + void runImport(DBRProgressMonitor monitor, InputStream inputStream, StreamProducerSettings.EntityMapping mapping, Map properties, int rowCount, IDataTransferConsumer consumer) throws DBException; + void dispose(); } diff --git a/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/StreamProducerPagePreview.java b/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/StreamProducerPagePreview.java index eedaf77a9468c1ccc520f14b1a8205c4115863e7..a32a32b9f73c5a63cf69a45179f746b46e16d25d 100644 --- a/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/StreamProducerPagePreview.java +++ b/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/StreamProducerPagePreview.java @@ -196,22 +196,41 @@ public class StreamProducerPagePreview extends ActiveWizardPage { + IDataTransferProcessor importer = processor.getInstance(); + DBRProgressMonitor monitor = new DefaultProgressMonitor(mon); + monitor.beginTask("Load mappings", 3); try { + monitor.subTask("Load attributes form target object"); for (DBSEntityAttribute attr : CommonUtils.safeCollection(entity.getAttributes(monitor))) { if (DBUtils.isPseudoAttribute(attr) || DBUtils.isHiddenObject(attr)) { continue; } entityMapping.getAttributeMapping(attr); } + monitor.worked(1); + + // Load header and mappings + monitor.subTask("Load attribute mappings"); + if (importer instanceof IStreamDataImporter) { + loadStreamMappings((IStreamDataImporter)importer, entityMapping, currentProducer); + } - IDataTransferProcessor importer = processor.getInstance(); + UIUtils.syncExec(() -> updateAttributeMappings(entityMapping)); + monitor.worked(1); + + // Load preview + monitor.subTask("Load import preview"); if (importer instanceof IStreamDataImporter) { - loadStreamMappings((IStreamDataImporter)importer, entityMapping, currentPipe); + loadImportPreview(monitor, (IStreamDataImporter)importer, entityMapping, currentProducer); } + monitor.worked(1); + + monitor.done(); } catch (DBException e) { throw new InvocationTargetException(e); @@ -224,6 +243,14 @@ public class StreamProducerPagePreview extends ActiveWizardPage { + UIUtils.packColumns(mappingsTable, true); + UIUtils.packColumns(previewTable, false); + }); + } + + private void updateAttributeMappings(StreamProducerSettings.EntityMapping entityMapping) { for (StreamProducerSettings.AttributeMapping am : entityMapping.getAttributeMappings()) { // Create mapping item TableItem mappingItem = new TableItem(mappingsTable, SWT.NONE); @@ -242,19 +269,13 @@ public class StreamProducerPagePreview extends ActiveWizardPage { - UIUtils.packColumns(mappingsTable, true); - UIUtils.packColumns(previewTable, false); - }); } public DataTransferPipe getCurrentPipe() { return pipeList.get(tableList.getSelectionIndex()); } - private void loadStreamMappings(IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, DataTransferPipe currentPipe) throws DBException { - StreamTransferProducer currentProducer = (StreamTransferProducer) currentPipe.getProducer(); + private void loadStreamMappings(IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, StreamTransferProducer currentProducer) throws DBException { File inputFile = currentProducer.getInputFile(); final StreamProducerSettings settings = getWizard().getPageSettings(this, StreamProducerSettings.class); @@ -272,28 +293,52 @@ public class StreamProducerPagePreview extends ActiveWizardPage attributeMappings = entityMapping.getAttributeMappings(); for (StreamDataImporterColumnInfo columnInfo : columnInfos) { + boolean mappingFound = false; if (columnInfo.getColumnName() != null) { for (StreamProducerSettings.AttributeMapping attr : attributeMappings) { if (CommonUtils.equalObjects(attr.getTargetAttributeName(), columnInfo.getColumnName())) { if (attr.getMappingType() == StreamProducerSettings.AttributeMapping.MappingType.NONE) { // Set source name only if it wasn't set attr.setSourceAttributeName(columnInfo.getColumnName()); + attr.setSourceAttributeIndex(columnInfo.getColumnIndex()); attr.setMappingType(StreamProducerSettings.AttributeMapping.MappingType.IMPORT); } + mappingFound = true; break; } } - } else { + } + if (!mappingFound) { if (columnInfo.getColumnIndex() >= 0 && columnInfo.getColumnIndex() < attributeMappings.size()) { StreamProducerSettings.AttributeMapping attr = attributeMappings.get(columnInfo.getColumnIndex()); if (attr.getMappingType() == StreamProducerSettings.AttributeMapping.MappingType.NONE) { + if (!CommonUtils.isEmpty(columnInfo.getColumnName())) { + attr.setSourceAttributeName(columnInfo.getColumnName()); + } attr.setSourceAttributeIndex(columnInfo.getColumnIndex()); + attr.setMappingType(StreamProducerSettings.AttributeMapping.MappingType.IMPORT); } } } } } + private void loadImportPreview(DBRProgressMonitor monitor, IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, StreamTransferProducer currentProducer) throws DBException { + File inputFile = currentProducer.getInputFile(); + + final StreamProducerSettings settings = getWizard().getPageSettings(this, StreamProducerSettings.class); + final Map processorProperties = getWizard().getSettings().getProcessorProperties(); + + try (InputStream is = new FileInputStream(inputFile)) { + importer.init(new StreamDataImporterSite()); + //importer.runImport(monitor, is, entityMapping, processorProperties, 10, ); + importer.dispose(); + } catch (IOException e) { + throw new DBException("IO error", e); + } + + } + @Override public void deactivatePage() { diff --git a/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/importer/DataImporterCSV.java b/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/importer/DataImporterCSV.java index 2bb51f18d19afb542a2cfa07f644d1ebcdfdc750..f0d0174d5052bdc0f9b1249d7ba079df958f705e 100644 --- a/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/importer/DataImporterCSV.java +++ b/plugins/org.jkiss.dbeaver.data.transfer/src/org/jkiss/dbeaver/tools/transfer/stream/importer/DataImporterCSV.java @@ -19,6 +19,8 @@ package org.jkiss.dbeaver.tools.transfer.stream.importer; import au.com.bytecode.opencsv.CSVReader; import org.jkiss.dbeaver.DBException; import org.jkiss.dbeaver.Log; +import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor; +import org.jkiss.dbeaver.tools.transfer.IDataTransferConsumer; import org.jkiss.dbeaver.tools.transfer.stream.StreamDataImporterColumnInfo; import org.jkiss.dbeaver.tools.transfer.stream.StreamProducerSettings; import org.jkiss.dbeaver.utils.GeneralUtils; @@ -52,7 +54,36 @@ public class DataImporterCSV extends StreamImporterAbstract { public List readColumnsInfo(InputStream inputStream, StreamProducerSettings settings, Map processorProperties) throws DBException { List columnsInfo = new ArrayList<>(); - String encoding = CommonUtils.toString(processorProperties.get(PROP_ENCODING), GeneralUtils.UTF8_ENCODING); + HeaderPosition headerPosition = getHeaderPosition(processorProperties); + + try (Reader reader = openStreamReader(inputStream, processorProperties)) { + try (CSVReader csvReader = openCSVReader(reader, processorProperties)) { + for (;;) { + String[] line = csvReader.readNext(); + if (line == null) { + break; + } + if (line.length == 0) { + continue; + } + for (int i = 0; i < line.length; i++) { + String column = line[i]; + if (headerPosition == HeaderPosition.none) { + column = null; + } + columnsInfo.add(new StreamDataImporterColumnInfo(i, column)); + } + break; + } + } + } catch (IOException e) { + throw new DBException("IO error reading CSV", e); + } + + return columnsInfo; + } + + private HeaderPosition getHeaderPosition(Map processorProperties) { String header = CommonUtils.toString(processorProperties.get(PROP_HEADER), HeaderPosition.top.name()); HeaderPosition headerPosition = HeaderPosition.none; try { @@ -60,9 +91,26 @@ public class DataImporterCSV extends StreamImporterAbstract { } catch (IllegalArgumentException e) { log.warn("Invalid header position: " + header); } - try (Reader reader = new InputStreamReader(inputStream, encoding)) { - try (CSVReader csvReader = new CSVReader(reader)) { - for (;;) { + return headerPosition; + } + + private CSVReader openCSVReader(Reader reader, Map processorProperties) { + return new CSVReader(reader); + } + + private InputStreamReader openStreamReader(InputStream inputStream, Map processorProperties) throws UnsupportedEncodingException { + String encoding = CommonUtils.toString(processorProperties.get(PROP_ENCODING), GeneralUtils.UTF8_ENCODING); + return new InputStreamReader(inputStream, encoding); + } + + @Override + public void runImport(DBRProgressMonitor monitor, InputStream inputStream, StreamProducerSettings.EntityMapping mapping, Map properties, int rowCount, IDataTransferConsumer consumer) throws DBException { + HeaderPosition headerPosition = getHeaderPosition(properties); + + try (Reader reader = openStreamReader(inputStream, properties)) { + try (CSVReader csvReader = openCSVReader(reader, properties)) { + boolean headerRead = false; + for (int lineNum = 0; rowCount > 0 && lineNum < rowCount; lineNum++) { String[] line = csvReader.readNext(); if (line == null) { break; @@ -70,12 +118,16 @@ public class DataImporterCSV extends StreamImporterAbstract { if (line.length == 0) { continue; } + if (headerPosition != HeaderPosition.none && !headerRead) { + // First line is a header + headerRead = true; + continue; + } for (int i = 0; i < line.length; i++) { String column = line[i]; if (headerPosition == HeaderPosition.none) { column = null; } - columnsInfo.add(new StreamDataImporterColumnInfo(i, column)); } break; } @@ -83,7 +135,6 @@ public class DataImporterCSV extends StreamImporterAbstract { } catch (IOException e) { throw new DBException("IO error reading CSV", e); } - - return columnsInfo; } + } \ No newline at end of file diff --git a/plugins/org.jkiss.dbeaver.model/src/org/jkiss/dbeaver/runtime/ProgressStreamReader.java b/plugins/org.jkiss.dbeaver.model/src/org/jkiss/dbeaver/runtime/ProgressStreamReader.java new file mode 100644 index 0000000000000000000000000000000000000000..60808df9e74bc6adfc329b0633a0c7c0c39ddf36 --- /dev/null +++ b/plugins/org.jkiss.dbeaver.model/src/org/jkiss/dbeaver/runtime/ProgressStreamReader.java @@ -0,0 +1,94 @@ +/* + * DBeaver - Universal Database Manager + * Copyright (C) 2010-2017 Serge Rider (serge@jkiss.org) + * Copyright (C) 2011-2012 Eugene Fradkin (eugene.fradkin@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jkiss.dbeaver.runtime; + +import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor; + +import java.io.IOException; +import java.io.InputStream; + +public class ProgressStreamReader extends InputStream { + + static final int BUFFER_SIZE = 10000; + + private final DBRProgressMonitor monitor; + private final InputStream original; + private final long streamLength; + private long totalRead; + + public ProgressStreamReader(DBRProgressMonitor monitor, String task, InputStream original, long streamLength) + { + this.monitor = monitor; + this.original = original; + this.streamLength = streamLength; + this.totalRead = 0; + + monitor.beginTask(task, (int)streamLength); + } + + @Override + public int read() throws IOException + { + int res = original.read(); + showProgress(res); + return res; + } + + @Override + public int read(byte[] b) throws IOException + { + int res = original.read(b); + showProgress(res); + return res; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + int res = original.read(b, off, len); + showProgress(res); + return res; + } + + @Override + public long skip(long n) throws IOException + { + long res = original.skip(n); + showProgress(res); + return res; + } + + @Override + public int available() throws IOException + { + return original.available(); + } + + @Override + public void close() throws IOException + { + monitor.done(); + original.close(); + } + + private void showProgress(long length) + { + totalRead += length; + monitor.worked((int)length); + } +}