Commit c7d52749 authored by Serge Rider

#8669 Stream import: use target column type as a hint for transformation

Parent commit: 55bbd6c0
......@@ -58,6 +58,10 @@ public interface IDataTransferConsumer<SETTINGS extends IDataTransferSettings, P
*/
void finishTransfer(DBRProgressMonitor monitor, boolean last);
/**
 * Target object of this consumer. May be null, or the target database object (e.g. a table).
 */
@Nullable
Object getTargetObject();

/**
 * If not null then this consumer is a fake one which must be replaced by
 * explicit target consumers on the configuration stage.
 */
@Nullable
Object getTargetObjectContainer();
......
......@@ -20,8 +20,18 @@ package org.jkiss.dbeaver.tools.transfer.database;
* Mapping type
*/
public enum DatabaseMappingType {
    unspecified(false),
    existing(true),
    create(true),
    skip(false);

    // True for mapping types that resolve to an actual target column/table
    // (an existing one or one to be created); false for unspecified/skipped mappings.
    private final boolean isValid;

    DatabaseMappingType(boolean isValid) {
        this.isValid = isValid;
    }

    /**
     * @return true if this mapping points to a usable target (existing or to-be-created)
     */
    public boolean isValid() {
        return isValid;
    }
}
......@@ -27,7 +27,7 @@ import org.jkiss.dbeaver.model.struct.DBSEntityAttribute;
public class StreamDataImporterColumnInfo extends AbstractAttribute implements DBSEntityAttribute {
private StreamEntityMapping entityMapping;
private final DBPDataKind dataKind;
private DBPDataKind dataKind;
public StreamDataImporterColumnInfo(StreamEntityMapping entity, int columnIndex, String columnName, String typeName, int maxLength, DBPDataKind dataKind) {
super(columnName, typeName, -1, columnIndex, maxLength, null, null, false, false);
......@@ -57,4 +57,8 @@ public class StreamDataImporterColumnInfo extends AbstractAttribute implements D
public DBPDataSource getDataSource() {
    // Delegate to the entity mapping that owns this column
    return this.entityMapping.getDataSource();
}
public void setDataKind(DBPDataKind kind) {
    // Allows importers to refine the detected kind (e.g. using the target column type as a hint)
    this.dataKind = kind;
}
}
......@@ -395,6 +395,11 @@ public class StreamTransferConsumer implements IDataTransferConsumer<StreamConsu
}
}
@Nullable
@Override
public Object getTargetObject() {
    // Stream consumer writes to an output stream/file, so there is no target database object.
    // @Nullable added for consistency with the IDataTransferConsumer declaration
    // and the sibling getTargetObjectContainer() override.
    return null;
}
@Nullable
@Override
public Object getTargetObjectContainer() {
......
......@@ -60,6 +60,10 @@ public class StreamTransferResultSet implements DBCResultSet {
.collect(Collectors.toList());
}
/**
 * Returns the column infos of this stream result set.
 * NOTE(review): exposes the internal list directly — callers mutate the contained
 * column infos in place (e.g. to override the data kind); confirm before
 * replacing with a defensive copy.
 */
public List<StreamDataImporterColumnInfo> getAttributeMappings() {
    return attributeMappings;
}
public void setStreamRow(Object[] row) {
    // Current row of raw values fetched from the stream source
    this.streamRow = row;
}
......@@ -93,7 +97,7 @@ public class StreamTransferResultSet implements DBCResultSet {
} catch (Exception e) {
LocalDateTime localDT = LocalDateTime.from(ta);
if (localDT != null) {
value = java.util.Date.from(localDT.atZone(ZoneId.systemDefault()).toInstant());
value = java.util.Date.from(localDT.atZone(ZoneId.of("UTC")).toInstant());
}
}
} catch (Exception e) {
......
......@@ -19,7 +19,6 @@ package org.jkiss.dbeaver.tools.transfer.stream.importer;
import au.com.bytecode.opencsv.CSVReader;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBPDataKind;
import org.jkiss.dbeaver.model.DBPDataSource;
import org.jkiss.dbeaver.model.DBUtils;
......@@ -34,7 +33,6 @@ import org.jkiss.dbeaver.utils.GeneralUtils;
import org.jkiss.utils.CommonUtils;
import java.io.*;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -44,8 +42,6 @@ import java.util.Map;
*/
public class DataImporterCSV extends StreamImporterAbstract {
private static final Log log = Log.getLog(DataImporterCSV.class);
private static final String PROP_ENCODING = "encoding";
private static final String PROP_HEADER = "header";
private static final String PROP_DELIMITER = "delimiter";
......@@ -132,28 +128,16 @@ public class DataImporterCSV extends StreamImporterAbstract {
HeaderPosition headerPosition = getHeaderPosition(properties);
boolean emptyStringNull = CommonUtils.getBoolean(properties.get(PROP_EMPTY_STRING_NULL), false);
String nullValueMark = CommonUtils.toString(properties.get(PROP_NULL_STRING));
DateTimeFormatter tsFormat = null;
String tsFormatPattern = CommonUtils.toString(properties.get(PROP_TIMESTAMP_FORMAT));
if (!CommonUtils.isEmpty(tsFormatPattern)) {
try {
tsFormat = DateTimeFormatter.ofPattern(tsFormatPattern);
} catch (Exception e) {
log.error("Wrong timestamp format: " + tsFormatPattern, e);
}
//Map<Object, Object> defTSProps = site.getSourceObject().getDataSource().getContainer().getDataFormatterProfile().getFormatterProperties(DBDDataFormatter.TYPE_NAME_TIMESTAMP);
}
DBCExecutionContext context = streamDataSource.getDefaultInstance().getDefaultContext(monitor, false);
try (DBCSession producerSession = context.openSession(monitor, DBCExecutionPurpose.UTIL, "Transfer stream data")) {
LocalStatement localStatement = new LocalStatement(producerSession, "SELECT * FROM Stream");
StreamTransferResultSet resultSet = new StreamTransferResultSet(producerSession, localStatement, entityMapping);
if (tsFormat != null) {
resultSet.setDateTimeFormat(tsFormat);
}
consumer.fetchStart(producerSession, resultSet, -1, -1);
applyTransformHints(resultSet, consumer, getTimeStampFormat(properties, PROP_TIMESTAMP_FORMAT));
try (Reader reader = openStreamReader(inputStream, properties)) {
try (CSVReader csvReader = openCSVReader(reader, properties)) {
......
......@@ -17,15 +17,30 @@
package org.jkiss.dbeaver.tools.transfer.stream.importer;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.DBException;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBPDataKind;
import org.jkiss.dbeaver.model.struct.DBSEntity;
import org.jkiss.dbeaver.model.struct.DBSEntityAttribute;
import org.jkiss.dbeaver.tools.transfer.IDataTransferConsumer;
import org.jkiss.dbeaver.tools.transfer.database.DatabaseTransferConsumer;
import org.jkiss.dbeaver.tools.transfer.stream.IStreamDataImporter;
import org.jkiss.dbeaver.tools.transfer.stream.IStreamDataImporterSite;
import org.jkiss.dbeaver.tools.transfer.stream.StreamDataImporterColumnInfo;
import org.jkiss.dbeaver.tools.transfer.stream.StreamTransferResultSet;
import org.jkiss.utils.CommonUtils;
import java.time.format.DateTimeFormatter;
import java.util.Map;
/**
* Abstract stream importer
*/
public abstract class StreamImporterAbstract implements IStreamDataImporter {
private static final Log log = Log.getLog(StreamImporterAbstract.class);
private IStreamDataImporterSite site;
public IStreamDataImporterSite getSite()
......@@ -45,4 +60,54 @@ public abstract class StreamImporterAbstract implements IStreamDataImporter {
// do nothing
}
@Nullable
protected DateTimeFormatter getTimeStampFormat(Map<String, Object> properties, String propName) {
    // Read the pattern from importer properties; absent/blank means "no custom format".
    final String pattern = CommonUtils.toString(properties.get(propName));
    if (CommonUtils.isEmpty(pattern)) {
        return null;
    }
    try {
        return DateTimeFormatter.ofPattern(pattern);
    } catch (Exception e) {
        // Bad user-supplied pattern: log and fall back to default parsing (null)
        log.error("Wrong timestamp format: " + pattern, e);
        return null;
    }
}
/**
 * Applies transformation hints to the stream result set before data transfer.
 * Sets the optional timestamp format and, when the consumer is a database consumer,
 * uses the target column data kind as a hint for string-typed source columns:
 * DATETIME/NUMERIC/BOOLEAN targets override the source column's data kind so values
 * are converted during import.
 *
 * @param resultSet source stream result set whose column infos may be adjusted in place
 * @param consumer  transfer consumer; only DatabaseTransferConsumer provides mapping hints
 * @param tsFormat  optional timestamp format, or null to keep default parsing
 */
protected void applyTransformHints(StreamTransferResultSet resultSet, IDataTransferConsumer consumer, DateTimeFormatter tsFormat) throws DBException {
    if (tsFormat != null) {
        resultSet.setDateTimeFormat(tsFormat);
    }
    // Try to match source/target attributes and adjust the source data kind.
    // Do it only for valid mappings (existing/create); skipped or unspecified
    // mappings have no usable target.
    if (consumer instanceof DatabaseTransferConsumer) {
        for (DatabaseTransferConsumer.ColumnMapping cm : ((DatabaseTransferConsumer) consumer).getColumnMappings()) {
            if (!cm.targetAttr.getMappingType().isValid()) {
                continue; // hoisted out of the inner loop: does not depend on attributeMapping
            }
            for (StreamDataImporterColumnInfo attributeMapping : resultSet.getAttributeMappings()) {
                if (cm.sourceAttr.getDataKind() == DBPDataKind.STRING && cm.sourceAttr.getName().equals(attributeMapping.getName())) {
                    DBSEntityAttribute targetAttr = cm.targetAttr.getTarget();
                    if (targetAttr != null) {
                        switch (targetAttr.getDataKind()) {
                            case DATETIME:
                            case NUMERIC:
                            case BOOLEAN:
                                attributeMapping.setDataKind(targetAttr.getDataKind());
                                break;
                            default:
                                // Other kinds (STRING, BINARY, ...) need no conversion hint
                                break;
                        }
                    }
                }
            }
        }
        // NOTE(review): removed dead code — an empty `if (consumer.getTargetObject()
        // instanceof DBSEntity) { }` block with no body; it had no effect.
    }
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册