Commit 937d45c2 authored by serge-rider

#8994 PG copy bulk load support

Parent 3c4b75a2
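
For context, this commit implements bulk loading through the PostgreSQL COPY protocol, using the JDBC driver's CopyManager to stream client-side data into the target table. A minimal, self-contained sketch of that underlying technique, separate from the diff below (connection settings, table and file names are illustrative, and the cast assumes a direct, unwrapped pgjdbc connection):

import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;

import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

public class CopyInSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "password")) {
            // COPY ... FROM STDIN reads rows from a client-side Reader instead of issuing INSERTs
            long rows = new CopyManager((BaseConnection) conn)
                .copyIn("COPY table1 FROM STDIN (FORMAT csv)",
                        new BufferedReader(new FileReader("data.csv")));
            System.out.println("Rows copied: " + rows);
        }
    }
}
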
......@@ -345,7 +345,7 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
}
if (bulkLoadManager != null) {
bulkLoadManager.addRow(session, rowValues);
bulkLoadManager.addRow(targetSession, rowValues);
} else {
executeBatch.add(rowValues);
}
......@@ -452,7 +452,11 @@ public class DatabaseTransferConsumer implements IDataTransferConsumer<DatabaseC
insertBatch(true);
}
if (bulkLoadManager != null) {
bulkLoadManager.close();
try {
bulkLoadManager.finishBulkLoad(targetSession);
} finally {
bulkLoadManager.close();
}
bulkLoadManager = null;
} else if (executeBatch != null) {
executeBatch.close();
......
......@@ -15,7 +15,7 @@ Require-Bundle: org.jkiss.dbeaver.model,
com.sun.jna.platform,
org.jkiss.dbeaver.model.sql
Bundle-ActivationPolicy: lazy
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Bundle-RequiredExecutionEnvironment: JavaSE-11
Bundle-Vendor: %Bundle-Vendor
Bundle-ClassPath: .
Export-Package: org.jkiss.dbeaver.ext.postgresql,
......
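
A side note on the execution-environment bump above: JavaSE-11 appears to be needed because the new PostgreCopyLoader below relies on the FileWriter(File, Charset) and FileReader(File, Charset) constructors, which were only added in Java 11.
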
......@@ -18,22 +18,63 @@ package org.jkiss.dbeaver.ext.postgresql.model;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.DBPEvaluationContext;
import org.jkiss.dbeaver.model.DBUtils;
import org.jkiss.dbeaver.model.data.DBDDisplayFormat;
import org.jkiss.dbeaver.model.data.DBDValueHandler;
import org.jkiss.dbeaver.model.exec.DBCException;
import org.jkiss.dbeaver.model.exec.DBCExecutionSource;
import org.jkiss.dbeaver.model.exec.DBCSession;
import org.jkiss.dbeaver.model.exec.DBCTransactionManager;
import org.jkiss.dbeaver.model.exec.jdbc.JDBCSession;
import org.jkiss.dbeaver.model.struct.DBSAttributeBase;
import org.jkiss.dbeaver.model.struct.DBSDataBulkLoader;
import org.jkiss.dbeaver.runtime.DBWorkbench;
import org.jkiss.utils.ArrayUtils;
import org.jkiss.utils.CommonUtils;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
* Bulk loader based on CopyManager
*
* // new CopyManager((BaseConnection) conn)
* // .copyIn(
* // "COPY table1 FROM STDIN (FORMAT csv)",
* // new BufferedReader(new FileReader("data.csv"))
* // );
*/
public class PostgreCopyLoader implements DBSDataBulkLoader, DBSDataBulkLoader.BulkLoadManager {
private static final Log log = Log.getLog(PostgreCopyLoader.class);
private final PostgreTableReal table;
private Object copyManager;
private Method copyInMethod;
private Writer csvWriter;
private File csvFile;
private AttrMapping[] mappings;
private int copyBufferSize = 100 * 1024;
private static class AttrMapping {
PostgreTableColumn tableAttr;
DBDValueHandler valueHandler;
int srcPos;
AttrMapping(PostgreTableColumn tableAttr, DBDValueHandler valueHandler, int srcPos) {
this.tableAttr = tableAttr;
this.valueHandler = valueHandler;
this.srcPos = srcPos;
}
}
public PostgreCopyLoader(PostgreTableReal table) {
this.table = table;
......@@ -43,31 +84,119 @@ public class PostgreCopyLoader implements DBSDataBulkLoader, DBSDataBulkLoader.B
@Override
public BulkLoadManager createBulkLoad(@NotNull DBCSession session, @NotNull DBSAttributeBase[] attributes, @NotNull DBCExecutionSource source, int batchSize, Map<String, Object> options) throws DBCException {
try {
Object driverInstance = session.getDataSource().getContainer().getDriver().getDriverInstance(session.getProgressMonitor());
Object copyManager = Class.forName("CopyManager", true, driverInstance.getClass().getClassLoader()).getConstructor().newInstance();
// Use reflection to create copy manager
Connection pgConnection = ((JDBCSession) session).getOriginal();
ClassLoader driverClassLoader = pgConnection.getClass().getClassLoader();
Class<?> baseConnectionClass = Class.forName("org.postgresql.core.BaseConnection", true, driverClassLoader);
Class<?> copyManagerClass = Class.forName("org.postgresql.copy.CopyManager", true, driverClassLoader);
// Get method copyIn(final String sql, Reader from, int bufferSize)
copyInMethod = copyManagerClass.getMethod("copyIn", String.class, Reader.class, Integer.TYPE);
copyManager = copyManagerClass.getConstructor(baseConnectionClass).newInstance(pgConnection);
File tempFolder = DBWorkbench.getPlatform().getTempFolder(session.getProgressMonitor(), "postgresql-copy-datasets");
csvFile = new File(tempFolder, CommonUtils.escapeFileName(table.getFullyQualifiedName(DBPEvaluationContext.DML)) + "-" + System.currentTimeMillis() + ".csv"); //$NON-NLS-1$ //$NON-NLS-2$
if (!csvFile.createNewFile()) {
throw new IOException("Can't create CSV file " + csvFile.getAbsolutePath());
}
csvWriter = new BufferedWriter(
new FileWriter(csvFile, StandardCharsets.UTF_8),
copyBufferSize
);
List<? extends PostgreTableColumn> tableAttrs = CommonUtils.safeList(table.getAttributes(session.getProgressMonitor()));
tableAttrs.removeIf(a -> a.getOrdinalPosition() < 0);
mappings = new AttrMapping[tableAttrs.size()];
for (int i = 0; i < tableAttrs.size(); i++) {
PostgreTableColumn attr = tableAttrs.get(i);
DBDValueHandler valueHandler = DBUtils.findValueHandler(session, attr);
AttrMapping mapping = new AttrMapping(
attr,
valueHandler,
ArrayUtils.indexOf(attributes, attr)
);
mappings[i] = mapping;
}
} catch (Exception e) {
throw new DBCException("Can't instantiate CopyManager", e);
}
return this;
// new CopyManager((BaseConnection) conn)
// .copyIn(
// "COPY table1 FROM STDIN (FORMAT csv, HEADER)",
// new BufferedReader(new FileReader("data.csv"))
// );
throw new DBCException("Not implemented");
}
@Override
public void addRow(@NotNull DBCSession session, @NotNull Object[] attributeValues) throws DBCException {
StringBuilder line = new StringBuilder();
for (int i = 0; i < mappings.length; i++) {
if (i > 0) {
line.append(",");
}
AttrMapping attr = mappings[i];
if (attr.srcPos >= 0) {
String strValue = attr.valueHandler.getValueDisplayString(
attr.tableAttr, attributeValues[attr.srcPos], DBDDisplayFormat.NATIVE);
line.append(strValue);
}
}
line.append("\n");
try {
csvWriter.write(line.toString());
} catch (IOException e) {
throw new DBCException("Error writing CSV line", e);
}
}
@Override
public void flushRows(@NotNull DBCSession session) throws DBCException {
try {
csvWriter.flush();
} catch (IOException e) {
throw new DBCException("Error saving CSV data", e);
}
}
@Override
public void flushRows(@NotNull DBCSession session) {
public void finishBulkLoad(@NotNull DBCSession session) throws DBCException {
try {
csvWriter.flush();
csvWriter.close();
} catch (IOException e) {
log.debug(e);
}
csvWriter = null;
String queryText = "COPY " + table.getFullyQualifiedName(DBPEvaluationContext.DML) +
" FROM STDIN (FORMAT csv)";
try {
Reader csvReader = new FileReader(csvFile, StandardCharsets.UTF_8);
Object rowCount = copyInMethod.invoke(copyManager, queryText, csvReader, copyBufferSize);
// Commit changes
DBCTransactionManager txnManager = DBUtils.getTransactionManager(session.getExecutionContext());
if (txnManager != null && !txnManager.isAutoCommit()) {
txnManager.commit(session);
}
log.debug("CSV has been imported (" + rowCount + ")");
} catch (Throwable e) {
if (e instanceof InvocationTargetException) {
e = ((InvocationTargetException) e).getTargetException();
}
throw new DBCException("Error copying dataset on remote server", e);
}
}
@Override
public void close() {
if (csvFile != null && csvFile.exists()) {
if (!csvFile.delete()) {
log.debug("Error deleting CSV file " + csvFile.getAbsolutePath());
csvFile.deleteOnExit();
}
}
}
}
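
To make the data flow concrete with a small, hypothetical example: for a target table public.person with columns (id, name) both mapped from the source, a row {1, "Bob"} passed to addRow is appended to the temporary file as the CSV line 1,Bob; finishBulkLoad then streams that file to the server via COPY public.person FROM STDIN (FORMAT csv) and commits if the session is not in auto-commit mode, while close() deletes the temporary file.
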
......@@ -32,7 +32,9 @@ public interface DBSDataBulkLoader {
interface BulkLoadManager extends AutoCloseable {
void addRow(@NotNull DBCSession session, @NotNull Object[] attributeValues) throws DBCException;
void flushRows(@NotNull DBCSession session);
void flushRows(@NotNull DBCSession session) throws DBCException;
void finishBulkLoad(@NotNull DBCSession session) throws DBCException;
void close();
}
......
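
As a rough illustration of how a consumer drives this contract (a sketch only: resolveBulkLoader, rowSource, and the surrounding variables are placeholders, and the real call site is the DatabaseTransferConsumer change in the first hunk):

// Hypothetical caller; imports and session/attribute setup follow the DBeaver API shown above.
DBSDataBulkLoader bulkLoader = resolveBulkLoader(targetTable);   // e.g. adapt the target table to DBSDataBulkLoader
DBSDataBulkLoader.BulkLoadManager manager =
    bulkLoader.createBulkLoad(targetSession, targetAttributes, executionSource, batchSize, options);
try {
    for (Object[] row : rowSource) {
        manager.addRow(targetSession, row);     // PostgreCopyLoader spools each row as a CSV line
    }
    manager.flushRows(targetSession);           // flush any buffered data
    manager.finishBulkLoad(targetSession);      // run COPY ... FROM STDIN and commit
} finally {
    manager.close();                            // always clean up the temporary file
}
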