提交 683bca4f 编写于 作者: S serge-rider

#2372 Task view: progress monitoring fixes. Data transfer from custom SQL


Former-commit-id: 15db4df8
上级 49d56301
......@@ -19,6 +19,7 @@ Require-Bundle: org.eclipse.core.runtime,
org.eclipse.core.expressions,
org.eclipse.core.resources,
org.jkiss.dbeaver.model,
org.jkiss.dbeaver.model.sql,
net.sf.opencsv
Bundle-ClassPath: .
Automatic-Module-Name: org.jkiss.dbeaver.data.transfer
......@@ -63,6 +63,7 @@ public class DataTransferJob implements DBRRunnableWithProgress {
@Override
public void run(DBRProgressMonitor monitor) throws InvocationTargetException, InterruptedException {
monitor.beginTask("Perform data transfer", 1);
hasErrors = false;
long startTime = System.currentTimeMillis();
for (; ;) {
......@@ -82,6 +83,7 @@ public class DataTransferJob implements DBRRunnableWithProgress {
throw new InvocationTargetException(e);
}
}
monitor.done();
listener.subTaskFinished(null);
elapsedTime = System.currentTimeMillis() - startTime;
}
......
......@@ -27,10 +27,13 @@ import org.jkiss.dbeaver.model.app.DBPProject;
import org.jkiss.dbeaver.model.data.DBDDataFilter;
import org.jkiss.dbeaver.model.exec.*;
import org.jkiss.dbeaver.model.impl.AbstractExecutionSource;
import org.jkiss.dbeaver.model.impl.DataSourceContextProvider;
import org.jkiss.dbeaver.model.meta.DBSerializable;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRRunnableContext;
import org.jkiss.dbeaver.model.sql.SQLQuery;
import org.jkiss.dbeaver.model.sql.SQLQueryContainer;
import org.jkiss.dbeaver.model.sql.data.SQLQueryDataContainer;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.model.struct.DBSEntity;
import org.jkiss.dbeaver.model.task.DBTTask;
......@@ -284,14 +287,18 @@ public class DatabaseTransferProducer implements IDataTransferProducer<DatabaseP
}
case "query": {
String dsId = CommonUtils.toString(state.get("dataSource"));
String query = CommonUtils.toString(state.get("query"));
String queryText = CommonUtils.toString(state.get("query"));
DBPDataSourceContainer ds = project.getDataSourceRegistry().getDataSource(dsId);
if (ds == null) {
log.debug("Can't find datasource "+ dsId);
return;
}
//producer.dataContainer = new SQLQueryContainer()
throw new DBException("SQL data containers not supported yet");
if (!ds.isConnected()) {
ds.connect(monitor, true, true);
}
SQLQuery query = new SQLQuery(ds.getDataSource(), queryText);
producer.dataContainer = new SQLQueryDataContainer(new DataSourceContextProvider(ds), query, log);
//throw new DBException("SQL data containers not supported yet");
}
default:
log.warn("Unsupported selector type: " + selType);
......
......@@ -54,15 +54,19 @@ public class DTTaskHandlerTransfer implements DBTTaskHandler {
List<DataTransferPipe> dataPipes = settings.getDataPipes();
try {
runnableContext.run(false, false, monitor -> {
runnableContext.run(true, false, monitor -> {
monitor.beginTask("Initialize pipes", dataPipes.size());
try {
for (int i = 0; i < dataPipes.size(); i++) {
DataTransferPipe pipe = dataPipes.get(i);
pipe.initPipe(settings, i, dataPipes.size());
pipe.getConsumer().startTransfer(monitor);
monitor.worked(1);
}
} catch (DBException e) {
throw new InvocationTargetException(e);
} finally {
monitor.done();
}
});
} catch (InvocationTargetException e) {
......
......@@ -10,6 +10,7 @@ Bundle-ActivationPolicy: lazy
Bundle-Activator: org.jkiss.dbeaver.model.sql.internal.SQLModelActivator
Export-Package: org.jkiss.dbeaver.model.sql,
org.jkiss.dbeaver.model.sql.completion,
org.jkiss.dbeaver.model.sql.data,
org.jkiss.dbeaver.model.sql.format,
org.jkiss.dbeaver.model.sql.format.external,
org.jkiss.dbeaver.model.sql.format.tokenized,
......
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2019 Serge Rider (serge@jkiss.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jkiss.dbeaver.model.sql.data;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.*;
import org.jkiss.dbeaver.model.data.DBDDataFilter;
import org.jkiss.dbeaver.model.data.DBDDataReceiver;
import org.jkiss.dbeaver.model.exec.*;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.sql.*;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.model.struct.DBSObject;
/**
* Data container for single SQL query.
* Doesn't support multiple resulsets.
*/
public class SQLQueryDataContainer implements DBSDataContainer, SQLQueryContainer, DBPContextProvider {
private DBPContextProvider contextProvider;
private SQLQuery query;
private Log log;
public SQLQueryDataContainer(DBPContextProvider contextProvider, SQLQuery query, Log log) {
this.contextProvider = contextProvider;
this.query = query;
this.log = log;
}
@Override
public DBCExecutionContext getExecutionContext() {
return contextProvider.getExecutionContext();
}
@Override
public int getSupportedFeatures() {
return DATA_SELECT;
}
@NotNull
@Override
public DBCStatistics readData(@NotNull DBCExecutionSource source, @NotNull DBCSession session, @NotNull DBDDataReceiver dataReceiver, DBDDataFilter dataFilter, long firstRow, long maxRows, long flags, int fetchSize) throws DBCException
{
DBCStatistics statistics = new DBCStatistics();
// Modify query (filters + parameters)
DBPDataSource dataSource = session.getDataSource();
SQLQuery sqlQuery = query;
String queryText = sqlQuery.getText();//.trim();
if (dataFilter != null && dataFilter.hasFilters() && dataSource instanceof SQLDataSource) {
String filteredQueryText = ((SQLDataSource) dataSource).getSQLDialect().addFiltersToQuery(
dataSource, queryText, dataFilter);
sqlQuery = new SQLQuery(dataSource, filteredQueryText, sqlQuery);
} else {
sqlQuery = new SQLQuery(dataSource, queryText, sqlQuery);
}
final SQLQueryResult curResult = new SQLQueryResult(sqlQuery);
if (firstRow > 0) {
curResult.setRowOffset(firstRow);
}
statistics.setQueryText(sqlQuery.getText());
long startTime = System.currentTimeMillis();
try (final DBCStatement dbcStatement = DBUtils.makeStatement(
source,
session,
DBCStatementType.SCRIPT,
sqlQuery,
firstRow,
maxRows))
{
DBExecUtils.setStatementFetchSize(dbcStatement, firstRow, maxRows, fetchSize);
// Execute statement
session.getProgressMonitor().subTask("Execute query");
boolean hasResultSet = dbcStatement.executeStatement();
statistics.addExecuteTime(System.currentTimeMillis() - startTime);
statistics.addStatementsCount();
curResult.setHasResultSet(hasResultSet);
if (hasResultSet) {
DBCResultSet resultSet = dbcStatement.openResultSet();
if (resultSet != null) {
SQLQueryResult.ExecuteResult executeResult = curResult.addExecuteResult(true);
DBRProgressMonitor monitor = session.getProgressMonitor();
monitor.subTask("Fetch result set");
DBFetchProgress fetchProgress = new DBFetchProgress(session.getProgressMonitor());
dataReceiver.fetchStart(session, resultSet, firstRow, maxRows);
try {
long fetchStartTime = System.currentTimeMillis();
// Fetch all rows
while (!fetchProgress.isMaxRowsFetched(maxRows) && !fetchProgress.isCanceled() && resultSet.nextRow()) {
dataReceiver.fetchRow(session, resultSet);
fetchProgress.monitorRowFetch();
}
statistics.addFetchTime(System.currentTimeMillis() - fetchStartTime);
}
finally {
try {
resultSet.close();
} catch (Throwable e) {
log.error("Error while closing resultset", e);
}
try {
dataReceiver.fetchEnd(session, resultSet);
} catch (Throwable e) {
log.error("Error while handling end of result set fetch", e);
}
dataReceiver.close();
}
if (executeResult != null) {
executeResult.setRowCount(fetchProgress.getRowCount());
}
statistics.setRowsFetched(fetchProgress.getRowCount());
monitor.subTask(fetchProgress.getRowCount() + " rows fetched");
}
} else {
log.warn("No results returned by query execution");
}
try {
curResult.addWarnings(dbcStatement.getStatementWarnings());
} catch (Throwable e) {
log.warn("Can't read execution warnings", e);
}
}
return statistics;
}
@Override
public long countData(@NotNull DBCExecutionSource source, @NotNull DBCSession session, DBDDataFilter dataFilter, long flags)
throws DBCException
{
return -1;
}
@Nullable
@Override
public String getDescription()
{
return "SQL Query";
}
@Nullable
@Override
public DBSObject getParentObject()
{
return getDataSource();
}
@Nullable
@Override
public DBPDataSource getDataSource()
{
DBCExecutionContext executionContext = getExecutionContext();
return executionContext == null ? null : executionContext.getDataSource();
}
@Override
public boolean isPersisted() {
return false;
}
@NotNull
@Override
public String getName()
{
String name = query.getOriginalText();
if (name == null) {
name = "SQL";
}
return name;
}
@Nullable
@Override
public DBPDataSourceContainer getDataSourceContainer() {
DBPDataSource dataSource = getDataSource();
return dataSource == null ? null : dataSource.getContainer();
}
@Override
public String toString() {
return query.getOriginalText();
}
@Override
public SQLScriptElement getQuery() {
return query;
}
}
......@@ -16,32 +16,9 @@
*/
package org.jkiss.dbeaver.model;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.ModelPreferences;
import org.jkiss.dbeaver.model.data.DBDBinaryFormatter;
import org.jkiss.dbeaver.model.data.DBDDataFormatter;
import org.jkiss.dbeaver.model.data.DBDDisplayFormat;
import org.jkiss.dbeaver.model.exec.DBCStatistics;
import org.jkiss.dbeaver.model.messages.ModelMessages;
import org.jkiss.dbeaver.model.preferences.DBPPreferenceStore;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.sql.SQLDataSource;
import org.jkiss.dbeaver.model.struct.DBSDataType;
import org.jkiss.dbeaver.model.struct.DBSTypedObject;
import org.jkiss.dbeaver.model.struct.DBSTypedObjectEx;
import org.jkiss.dbeaver.utils.GeneralUtils;
import org.jkiss.utils.CommonUtils;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.ParseException;
import java.util.Collection;
import java.util.Locale;
/**
* DB value formatting utilities
......@@ -85,7 +62,7 @@ public final class DBFetchProgress {
}
public boolean isMaxRowsFetched(long maxRows) {
return rowCount >= maxRows;
return maxRows > 0 && rowCount >= maxRows;
}
public static boolean monitorFetchProgress(long fetchedRows) {
......
......@@ -80,12 +80,14 @@ public class TaskRunJob extends AbstractJob implements DBRRunnableContext {
try (OutputStream logStream = new FileOutputStream(logFile)) {
taskLog = new Log(getName(), logStream);
try {
monitor.beginTask("Run task '" + task.getName() + " (" + task.getType().getName() + ")", 1);
executeTask(new LoggingProgressMonitor(monitor));
} catch (Throwable e) {
taskError = e;
taskLog.error("Task fatal error", e);
throw e;
} finally {
monitor.done();
taskLog.flush();
}
} finally {
......
......@@ -76,6 +76,7 @@ import org.jkiss.dbeaver.model.runtime.DBRProgressListener;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRRunnableWithProgress;
import org.jkiss.dbeaver.model.sql.*;
import org.jkiss.dbeaver.model.sql.data.SQLQueryDataContainer;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.model.struct.DBSInstance;
import org.jkiss.dbeaver.model.struct.DBSObject;
......@@ -2505,7 +2506,7 @@ public class SQLEditor extends SQLEditorBase implements
}
} else {
SQLQuery query = (SQLQuery) element;
producers.add(new DatabaseTransferProducer(new SQLQueryDataContainer(job, resultsConsumer, query, i), null));
producers.add(new DatabaseTransferProducer(new SQLQueryDataContainer(SQLEditor.this, query, log), null));
}
}
......
package org.jkiss.dbeaver.ui.editors.sql;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.model.DBPContextProvider;
import org.jkiss.dbeaver.model.DBPDataSource;
import org.jkiss.dbeaver.model.DBPDataSourceContainer;
import org.jkiss.dbeaver.model.data.DBDDataFilter;
import org.jkiss.dbeaver.model.data.DBDDataReceiver;
import org.jkiss.dbeaver.model.exec.*;
import org.jkiss.dbeaver.model.sql.SQLQuery;
import org.jkiss.dbeaver.model.sql.SQLQueryContainer;
import org.jkiss.dbeaver.model.sql.SQLScriptElement;
import org.jkiss.dbeaver.model.struct.DBSDataContainer;
import org.jkiss.dbeaver.model.struct.DBSObject;
import org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob;
import org.jkiss.dbeaver.ui.editors.sql.internal.SQLEditorMessages;
class SQLQueryDataContainer implements DBSDataContainer, SQLQueryContainer, DBPContextProvider {
private SQLQueryJob queryJob;
private SQLQueryResultsConsumer resultsConsumer;
private SQLQuery query;
private int resultSetNumber;
SQLQueryDataContainer(SQLQueryJob queryJob, SQLQueryResultsConsumer resultsConsumer, SQLQuery query, int resultSetNumber)
{
this.queryJob = queryJob;
this.resultsConsumer = resultsConsumer;
this.query = query;
this.resultSetNumber = resultSetNumber;
}
@Override
public DBCExecutionContext getExecutionContext() {
return queryJob.getExecutionContext();
}
@Override
public int getSupportedFeatures() {
return DATA_SELECT;
}
@NotNull
@Override
public DBCStatistics readData(@NotNull DBCExecutionSource source, @NotNull DBCSession session, @NotNull DBDDataReceiver dataReceiver, DBDDataFilter dataFilter, long firstRow, long maxRows, long flags, int fetchSize) throws DBCException
{
if (query.getResultsMaxRows() >= 0) {
firstRow = query.getResultsOffset();
maxRows = query.getResultsMaxRows();
}
resultsConsumer.setDataReceiver(dataReceiver);
// Count number of results for this query. If > 1 then we will refresh them all at once
if (resultSetNumber > 0) {
queryJob.setFetchResultSetNumber(resultSetNumber);
} else {
queryJob.setFetchResultSetNumber(-1);
}
queryJob.setResultSetLimit(firstRow, maxRows);
queryJob.setReadFlags(flags);
queryJob.setDataFilter(dataFilter);
queryJob.extractData(session, this.query, resultSetNumber);
return queryJob.getStatistics();
}
@Override
public long countData(@NotNull DBCExecutionSource source, @NotNull DBCSession session, DBDDataFilter dataFilter, long flags)
throws DBCException
{
return -1;
}
@Nullable
@Override
public String getDescription()
{
return SQLEditorMessages.editors_sql_description;
}
@Nullable
@Override
public DBSObject getParentObject()
{
return getDataSource();
}
@Nullable
@Override
public DBPDataSource getDataSource()
{
return queryJob.getDataSourceContainer().getDataSource();
}
@Override
public boolean isPersisted() {
return false;
}
@NotNull
@Override
public String getName()
{
String name = query.getOriginalText();
if (name == null) {
name = "SQL";
}
return name;
}
@Nullable
@Override
public DBPDataSourceContainer getDataSourceContainer() {
return queryJob.getDataSourceContainer();
}
@Override
public String toString() {
return query.getOriginalText();
}
@Override
public SQLScriptElement getQuery() {
return query;
}
}
......@@ -335,6 +335,8 @@ public class DatabaseTasksView extends ViewPart implements DBTTaskListener {
DBTTaskRun lastRun = task.getLastRun();
if (lastRun != null && !lastRun.isRunSuccess()) {
cell.setBackground(colorError);
} else {
cell.setBackground(null);
}
update(cell, task);
}
......
......@@ -121,7 +121,7 @@ public class TaskProcessorUI implements DBRRunnableContext, DBTTaskExecutionList
@Override
public void run(boolean fork, boolean cancelable, DBRRunnableWithProgress runnable) throws InvocationTargetException, InterruptedException {
staticContext.run(false, true, runnable);
staticContext.run(true, true, runnable);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册