提交 91f15215 编写于 作者: S Serge Rider

#130 Data transfer fixes (db to db, stream to db).

上级 52dc41da
......@@ -175,7 +175,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
attrValue = resultSet.getAttributeValue(i);
}
rowValues[column.targetIndex] = column.targetValueHandler.getValueFromObject(
session,
targetSession,
column.targetAttr.getTarget(),
attrValue,
false);
......
......@@ -113,11 +113,11 @@ public class DataImporterCSV extends StreamImporterAbstract {
Map<Object, Object> properties = site.getProcessorProperties();
HeaderPosition headerPosition = getHeaderPosition(properties);
try (StreamTransferSession session = new StreamTransferSession(monitor, DBCExecutionPurpose.UTIL, "Transfer stream data")) {
LocalStatement localStatement = new LocalStatement(session, "SELECT * FROM Stream");
StreamTransferResultSet resultSet = new StreamTransferResultSet(session, localStatement, entityMapping);
try (StreamTransferSession producerSession = new StreamTransferSession(monitor, DBCExecutionPurpose.UTIL, "Transfer stream data")) {
LocalStatement localStatement = new LocalStatement(producerSession, "SELECT * FROM Stream");
StreamTransferResultSet resultSet = new StreamTransferResultSet(producerSession, localStatement, entityMapping);
consumer.fetchStart(session, resultSet, -1, -1);
consumer.fetchStart(producerSession, resultSet, -1, -1);
try (Reader reader = openStreamReader(inputStream, properties)) {
try (CSVReader csvReader = openCSVReader(reader, properties)) {
......@@ -153,14 +153,14 @@ public class DataImporterCSV extends StreamImporterAbstract {
}
resultSet.setStreamRow(line);
consumer.fetchRow(session, resultSet);
consumer.fetchRow(producerSession, resultSet);
lineNum++;
}
}
} catch (IOException e) {
throw new DBException("IO error reading CSV", e);
} finally {
consumer.fetchEnd(session, resultSet);
consumer.fetchEnd(producerSession, resultSet);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册