提交 00815801 编写于 作者: S serge-rider

#130 Import column mappings and preview

上级 3f286dcd
......@@ -37,6 +37,7 @@ import org.jkiss.dbeaver.model.preferences.DBPPreferenceStore;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRRunnableWithProgress;
import org.jkiss.dbeaver.model.struct.DBSObject;
import org.jkiss.dbeaver.runtime.ProgressStreamReader;
import org.jkiss.dbeaver.runtime.ui.DBUserInterface;
import org.jkiss.dbeaver.ui.UIUtils;
import org.jkiss.utils.CommonUtils;
......@@ -436,6 +437,7 @@ public abstract class AbstractToolWizard<BASE_OBJECT extends DBSObject, PROCESS_
try {
try (InputStream scriptStream = new ProgressStreamReader(
monitor,
task,
new FileInputStream(inputFile),
inputFile.length()))
{
......@@ -482,6 +484,7 @@ public abstract class AbstractToolWizard<BASE_OBJECT extends DBSObject, PROCESS_
{
try (InputStream scriptStream = new ProgressStreamReader(
monitor,
task,
new FileInputStream(inputFile),
inputFile.length()))
{
......@@ -511,75 +514,4 @@ public abstract class AbstractToolWizard<BASE_OBJECT extends DBSObject, PROCESS_
}
}
private class ProgressStreamReader extends InputStream {
static final int BUFFER_SIZE = 10000;
private final DBRProgressMonitor monitor;
private final InputStream original;
private final long streamLength;
private long totalRead;
private ProgressStreamReader(DBRProgressMonitor monitor, InputStream original, long streamLength)
{
this.monitor = monitor;
this.original = original;
this.streamLength = streamLength;
this.totalRead = 0;
monitor.beginTask(task, (int)streamLength);
}
@Override
public int read() throws IOException
{
int res = original.read();
showProgress(res);
return res;
}
@Override
public int read(byte[] b) throws IOException
{
int res = original.read(b);
showProgress(res);
return res;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int res = original.read(b, off, len);
showProgress(res);
return res;
}
@Override
public long skip(long n) throws IOException
{
long res = original.skip(n);
showProgress(res);
return res;
}
@Override
public int available() throws IOException
{
return original.available();
}
@Override
public void close() throws IOException
{
monitor.done();
original.close();
}
private void showProgress(long length)
{
totalRead += length;
monitor.worked((int)length);
}
}
}
......@@ -18,9 +18,10 @@
package org.jkiss.dbeaver.tools.transfer.stream;
import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.tools.transfer.IDataTransferConsumer;
import org.jkiss.dbeaver.tools.transfer.IDataTransferProcessor;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
......@@ -34,6 +35,8 @@ public interface IStreamDataImporter extends IDataTransferProcessor {
List<StreamDataImporterColumnInfo> readColumnsInfo(InputStream inputStream, StreamProducerSettings settings, Map<Object, Object> processorProperties) throws DBException;
void runImport(DBRProgressMonitor monitor, InputStream inputStream, StreamProducerSettings.EntityMapping mapping, Map<Object, Object> properties, int rowCount, IDataTransferConsumer consumer) throws DBException;
void dispose();
}
......@@ -196,22 +196,41 @@ public class StreamProducerPagePreview extends ActiveWizardPage<DataTransferWiza
final DBSEntity entity = (DBSEntity) currentObject;
StreamProducerSettings.EntityMapping entityMapping = settings.getEntityMapping(entity);
DataTransferPipe currentPipe = getCurrentPipe();
StreamTransferProducer currentProducer = (StreamTransferProducer) currentPipe.getProducer();
try {
getWizard().getContainer().run(true, true, mon -> {
IDataTransferProcessor importer = processor.getInstance();
DBRProgressMonitor monitor = new DefaultProgressMonitor(mon);
monitor.beginTask("Load mappings", 3);
try {
monitor.subTask("Load attributes form target object");
for (DBSEntityAttribute attr : CommonUtils.safeCollection(entity.getAttributes(monitor))) {
if (DBUtils.isPseudoAttribute(attr) || DBUtils.isHiddenObject(attr)) {
continue;
}
entityMapping.getAttributeMapping(attr);
}
monitor.worked(1);
// Load header and mappings
monitor.subTask("Load attribute mappings");
if (importer instanceof IStreamDataImporter) {
loadStreamMappings((IStreamDataImporter)importer, entityMapping, currentProducer);
}
IDataTransferProcessor importer = processor.getInstance();
UIUtils.syncExec(() -> updateAttributeMappings(entityMapping));
monitor.worked(1);
// Load preview
monitor.subTask("Load import preview");
if (importer instanceof IStreamDataImporter) {
loadStreamMappings((IStreamDataImporter)importer, entityMapping, currentPipe);
loadImportPreview(monitor, (IStreamDataImporter)importer, entityMapping, currentProducer);
}
monitor.worked(1);
monitor.done();
} catch (DBException e) {
throw new InvocationTargetException(e);
......@@ -224,6 +243,14 @@ public class StreamProducerPagePreview extends ActiveWizardPage<DataTransferWiza
return;
}
UIUtils.asyncExec(() -> {
UIUtils.packColumns(mappingsTable, true);
UIUtils.packColumns(previewTable, false);
});
}
private void updateAttributeMappings(StreamProducerSettings.EntityMapping entityMapping) {
for (StreamProducerSettings.AttributeMapping am : entityMapping.getAttributeMappings()) {
// Create mapping item
TableItem mappingItem = new TableItem(mappingsTable, SWT.NONE);
......@@ -242,19 +269,13 @@ public class StreamProducerPagePreview extends ActiveWizardPage<DataTransferWiza
// Create preview column
UIUtils.createTableColumn(previewTable, SWT.LEFT, am.getTargetAttributeName());
}
UIUtils.asyncExec(() -> {
UIUtils.packColumns(mappingsTable, true);
UIUtils.packColumns(previewTable, false);
});
}
public DataTransferPipe getCurrentPipe() {
return pipeList.get(tableList.getSelectionIndex());
}
private void loadStreamMappings(IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, DataTransferPipe currentPipe) throws DBException {
StreamTransferProducer currentProducer = (StreamTransferProducer) currentPipe.getProducer();
private void loadStreamMappings(IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, StreamTransferProducer currentProducer) throws DBException {
File inputFile = currentProducer.getInputFile();
final StreamProducerSettings settings = getWizard().getPageSettings(this, StreamProducerSettings.class);
......@@ -272,28 +293,52 @@ public class StreamProducerPagePreview extends ActiveWizardPage<DataTransferWiza
// Map source columns
List<StreamProducerSettings.AttributeMapping> attributeMappings = entityMapping.getAttributeMappings();
for (StreamDataImporterColumnInfo columnInfo : columnInfos) {
boolean mappingFound = false;
if (columnInfo.getColumnName() != null) {
for (StreamProducerSettings.AttributeMapping attr : attributeMappings) {
if (CommonUtils.equalObjects(attr.getTargetAttributeName(), columnInfo.getColumnName())) {
if (attr.getMappingType() == StreamProducerSettings.AttributeMapping.MappingType.NONE) {
// Set source name only if it wasn't set
attr.setSourceAttributeName(columnInfo.getColumnName());
attr.setSourceAttributeIndex(columnInfo.getColumnIndex());
attr.setMappingType(StreamProducerSettings.AttributeMapping.MappingType.IMPORT);
}
mappingFound = true;
break;
}
}
} else {
}
if (!mappingFound) {
if (columnInfo.getColumnIndex() >= 0 && columnInfo.getColumnIndex() < attributeMappings.size()) {
StreamProducerSettings.AttributeMapping attr = attributeMappings.get(columnInfo.getColumnIndex());
if (attr.getMappingType() == StreamProducerSettings.AttributeMapping.MappingType.NONE) {
if (!CommonUtils.isEmpty(columnInfo.getColumnName())) {
attr.setSourceAttributeName(columnInfo.getColumnName());
}
attr.setSourceAttributeIndex(columnInfo.getColumnIndex());
attr.setMappingType(StreamProducerSettings.AttributeMapping.MappingType.IMPORT);
}
}
}
}
}
private void loadImportPreview(DBRProgressMonitor monitor, IStreamDataImporter importer, StreamProducerSettings.EntityMapping entityMapping, StreamTransferProducer currentProducer) throws DBException {
File inputFile = currentProducer.getInputFile();
final StreamProducerSettings settings = getWizard().getPageSettings(this, StreamProducerSettings.class);
final Map<Object, Object> processorProperties = getWizard().getSettings().getProcessorProperties();
try (InputStream is = new FileInputStream(inputFile)) {
importer.init(new StreamDataImporterSite());
//importer.runImport(monitor, is, entityMapping, processorProperties, 10, );
importer.dispose();
} catch (IOException e) {
throw new DBException("IO error", e);
}
}
@Override
public void deactivatePage()
{
......
......@@ -19,6 +19,8 @@ package org.jkiss.dbeaver.tools.transfer.stream.importer;
import au.com.bytecode.opencsv.CSVReader;
import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.tools.transfer.IDataTransferConsumer;
import org.jkiss.dbeaver.tools.transfer.stream.StreamDataImporterColumnInfo;
import org.jkiss.dbeaver.tools.transfer.stream.StreamProducerSettings;
import org.jkiss.dbeaver.utils.GeneralUtils;
......@@ -52,7 +54,36 @@ public class DataImporterCSV extends StreamImporterAbstract {
public List<StreamDataImporterColumnInfo> readColumnsInfo(InputStream inputStream, StreamProducerSettings settings, Map<Object, Object> processorProperties) throws DBException {
List<StreamDataImporterColumnInfo> columnsInfo = new ArrayList<>();
String encoding = CommonUtils.toString(processorProperties.get(PROP_ENCODING), GeneralUtils.UTF8_ENCODING);
HeaderPosition headerPosition = getHeaderPosition(processorProperties);
try (Reader reader = openStreamReader(inputStream, processorProperties)) {
try (CSVReader csvReader = openCSVReader(reader, processorProperties)) {
for (;;) {
String[] line = csvReader.readNext();
if (line == null) {
break;
}
if (line.length == 0) {
continue;
}
for (int i = 0; i < line.length; i++) {
String column = line[i];
if (headerPosition == HeaderPosition.none) {
column = null;
}
columnsInfo.add(new StreamDataImporterColumnInfo(i, column));
}
break;
}
}
} catch (IOException e) {
throw new DBException("IO error reading CSV", e);
}
return columnsInfo;
}
private HeaderPosition getHeaderPosition(Map<Object, Object> processorProperties) {
String header = CommonUtils.toString(processorProperties.get(PROP_HEADER), HeaderPosition.top.name());
HeaderPosition headerPosition = HeaderPosition.none;
try {
......@@ -60,9 +91,26 @@ public class DataImporterCSV extends StreamImporterAbstract {
} catch (IllegalArgumentException e) {
log.warn("Invalid header position: " + header);
}
try (Reader reader = new InputStreamReader(inputStream, encoding)) {
try (CSVReader csvReader = new CSVReader(reader)) {
for (;;) {
return headerPosition;
}
private CSVReader openCSVReader(Reader reader, Map<Object, Object> processorProperties) {
return new CSVReader(reader);
}
private InputStreamReader openStreamReader(InputStream inputStream, Map<Object, Object> processorProperties) throws UnsupportedEncodingException {
String encoding = CommonUtils.toString(processorProperties.get(PROP_ENCODING), GeneralUtils.UTF8_ENCODING);
return new InputStreamReader(inputStream, encoding);
}
@Override
public void runImport(DBRProgressMonitor monitor, InputStream inputStream, StreamProducerSettings.EntityMapping mapping, Map<Object, Object> properties, int rowCount, IDataTransferConsumer consumer) throws DBException {
HeaderPosition headerPosition = getHeaderPosition(properties);
try (Reader reader = openStreamReader(inputStream, properties)) {
try (CSVReader csvReader = openCSVReader(reader, properties)) {
boolean headerRead = false;
for (int lineNum = 0; rowCount > 0 && lineNum < rowCount; lineNum++) {
String[] line = csvReader.readNext();
if (line == null) {
break;
......@@ -70,12 +118,16 @@ public class DataImporterCSV extends StreamImporterAbstract {
if (line.length == 0) {
continue;
}
if (headerPosition != HeaderPosition.none && !headerRead) {
// First line is a header
headerRead = true;
continue;
}
for (int i = 0; i < line.length; i++) {
String column = line[i];
if (headerPosition == HeaderPosition.none) {
column = null;
}
columnsInfo.add(new StreamDataImporterColumnInfo(i, column));
}
break;
}
......@@ -83,7 +135,6 @@ public class DataImporterCSV extends StreamImporterAbstract {
} catch (IOException e) {
throw new DBException("IO error reading CSV", e);
}
return columnsInfo;
}
}
\ No newline at end of file
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2017 Serge Rider (serge@jkiss.org)
* Copyright (C) 2011-2012 Eugene Fradkin (eugene.fradkin@gmail.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jkiss.dbeaver.runtime;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import java.io.IOException;
import java.io.InputStream;
public class ProgressStreamReader extends InputStream {
static final int BUFFER_SIZE = 10000;
private final DBRProgressMonitor monitor;
private final InputStream original;
private final long streamLength;
private long totalRead;
public ProgressStreamReader(DBRProgressMonitor monitor, String task, InputStream original, long streamLength)
{
this.monitor = monitor;
this.original = original;
this.streamLength = streamLength;
this.totalRead = 0;
monitor.beginTask(task, (int)streamLength);
}
@Override
public int read() throws IOException
{
int res = original.read();
showProgress(res);
return res;
}
@Override
public int read(byte[] b) throws IOException
{
int res = original.read(b);
showProgress(res);
return res;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int res = original.read(b, off, len);
showProgress(res);
return res;
}
@Override
public long skip(long n) throws IOException
{
long res = original.skip(n);
showProgress(res);
return res;
}
@Override
public int available() throws IOException
{
return original.available();
}
@Override
public void close() throws IOException
{
monitor.done();
original.close();
}
private void showProgress(long length)
{
totalRead += length;
monitor.worked((int)length);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册