提交 da6c0602 编写于 作者: S serge@jkiss.org

#5934 References panel - avoid too many data read requests

上级 8556f26a
......@@ -18,6 +18,8 @@ package org.jkiss.dbeaver.ui.controls.resultset;
import org.eclipse.core.runtime.IAdaptable;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.IJobChangeEvent;
import org.eclipse.core.runtime.jobs.JobChangeAdapter;
import org.eclipse.jface.action.*;
......@@ -66,6 +68,7 @@ import org.jkiss.dbeaver.model.impl.AbstractExecutionSource;
import org.jkiss.dbeaver.model.impl.local.StatResultSet;
import org.jkiss.dbeaver.model.messages.ModelMessages;
import org.jkiss.dbeaver.model.preferences.DBPPreferenceStore;
import org.jkiss.dbeaver.model.runtime.AbstractJob;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRRunnableWithProgress;
import org.jkiss.dbeaver.model.runtime.VoidProgressMonitor;
......@@ -78,7 +81,6 @@ import org.jkiss.dbeaver.model.struct.*;
import org.jkiss.dbeaver.model.virtual.*;
import org.jkiss.dbeaver.runtime.DBWorkbench;
import org.jkiss.dbeaver.runtime.DBeaverNotifications;
import org.jkiss.dbeaver.runtime.jobs.DataSourceJob;
import org.jkiss.dbeaver.tools.transfer.registry.DataTransferNodeDescriptor;
import org.jkiss.dbeaver.tools.transfer.registry.DataTransferProcessorDescriptor;
import org.jkiss.dbeaver.tools.transfer.registry.DataTransferRegistry;
......@@ -112,8 +114,9 @@ import org.jkiss.utils.CommonUtils;
import java.lang.reflect.InvocationTargetException;
import java.text.DecimalFormat;
import java.util.*;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* ResultSetViewer
......@@ -195,7 +198,8 @@ public class ResultSetViewer extends Viewer
private final List<IResultSetListener> listeners = new ArrayList<>();
private volatile ResultSetJobDataRead dataPumpJob;
private final List<ResultSetJobDataRead> dataPumpJobQueue = new ArrayList<>();
private final AtomicBoolean dataPumpRunning = new AtomicBoolean();
private final ResultSetModel model = new ResultSetModel();
private HistoryStateItem curState = null;
......@@ -2036,12 +2040,24 @@ public class ResultSetViewer extends Viewer
}
public boolean isRefreshInProgress() {
return dataPumpJob != null;
synchronized (dataPumpJobQueue) {
return !dataPumpJobQueue.isEmpty();
}
}
public DataSourceJob getDataReadJob() {
return dataPumpJob;
public void cancelJobs() {
List<ResultSetJobDataRead> dpjCopy;
synchronized (dataPumpJobQueue) {
dpjCopy = new ArrayList<>(this.dataPumpJobQueue);
this.dataPumpJobQueue.clear();
}
for (ResultSetJobDataRead dpj : dpjCopy) {
if (dpj.isActiveTask()) {
dpj.cancel();
}
}
}
///////////////////////////////////////////////////////
// Context menu & filters
......@@ -2985,12 +3001,9 @@ public class ResultSetViewer extends Viewer
// Pump data
DBSDataContainer dataContainer = getDataContainer();
if (dataContainer == null) {
return;
}
DBDDataFilter dataFilter = restoreDataFilter(dataContainer);
if (container.isReadyToRun() && dataContainer != null && dataPumpJob == null) {
if (dataContainer != null) {
DBDDataFilter dataFilter = restoreDataFilter(dataContainer);
int segmentSize = getSegmentMaxRows();
Runnable finalizer = () -> {
if (activePresentation.getControl() != null && !activePresentation.getControl().isDisposed()) {
......@@ -3001,13 +3014,7 @@ public class ResultSetViewer extends Viewer
dataReceiver.setNextSegmentRead(false);
runDataPump(dataContainer, dataFilter, 0, segmentSize, -1, true, false, finalizer);
} else {
DBWorkbench.getPlatformUI().showError(
"Error executing query",
dataContainer == null ?
"Viewer detached from data source" :
dataPumpJob == null ?
"Can't refresh after reconnect. Re-execute query." :
"Previous query is still running");
DBWorkbench.getPlatformUI().showError("Error executing query", "Viewer detached from data source");
}
}
......@@ -3059,7 +3066,7 @@ public class ResultSetViewer extends Viewer
}
DBSDataContainer dataContainer = getDataContainer();
if (container.isReadyToRun() && dataContainer != null && dataPumpJob == null) {
if (dataContainer != null) {
int segmentSize = getSegmentMaxRows();
if (curRow != null && curRow.getVisualNumber() >= segmentSize && segmentSize > 0) {
segmentSize = (curRow.getVisualNumber() / segmentSize + 1) * segmentSize;
......@@ -3077,7 +3084,7 @@ public class ResultSetViewer extends Viewer
return;
}
DBSDataContainer dataContainer = getDataContainer();
if (dataContainer != null && !model.isUpdateInProgress() && dataPumpJob == null) {
if (dataContainer != null && !model.isUpdateInProgress()) {
dataReceiver.setHasMoreData(false);
dataReceiver.setNextSegmentRead(true);
......@@ -3109,7 +3116,7 @@ public class ResultSetViewer extends Viewer
}
DBSDataContainer dataContainer = getDataContainer();
if (dataContainer != null && !model.isUpdateInProgress() && dataPumpJob == null) {
if (dataContainer != null && !model.isUpdateInProgress()) {
dataReceiver.setHasMoreData(false);
dataReceiver.setNextSegmentRead(true);
......@@ -3189,13 +3196,6 @@ public class ResultSetViewer extends Viewer
final boolean scroll,
@Nullable final Runnable finalizer)
{
if (dataPumpJob != null) {
UIUtils.waitInUI(() -> dataPumpJob != null, 2000);
if (dataPumpJob != null) {
UIUtils.showMessageBox(viewerPanel.getShell(), "Data read", "Data read is in progress - can't run another", SWT.ICON_WARNING);
}
return false;
}
if (viewerPanel.isDisposed()) {
return false;
}
......@@ -3215,120 +3215,170 @@ public class ResultSetViewer extends Viewer
progressControl = (Composite) activePresentation.getControl();
}
synchronized (this) {
final Object presentationState = savePresentationState();
dataReceiver.setFocusRow(focusRow);
// Set explicit target container
dataReceiver.setTargetDataContainer(dataContainer);
dataPumpJob = new ResultSetJobDataRead(
dataContainer,
useDataFilter,
this,
executionContext,
progressControl);
dataPumpJob.addJobChangeListener(new JobChangeAdapter() {
@Override
public void aboutToRun(IJobChangeEvent event) {
model.setUpdateInProgress(true);
model.setStatistics(null);
if (filtersPanel != null) {
UIUtils.syncExec(() -> filtersPanel.enableFilters(false));
}
final Object presentationState = savePresentationState();
ResultSetJobDataRead dataPumpJob = new ResultSetJobDataRead(
dataContainer,
useDataFilter,
this,
executionContext,
progressControl);
dataPumpJob.addJobChangeListener(new JobChangeAdapter() {
@Override
public void aboutToRun(IJobChangeEvent event) {
dataPumpRunning.set(true);
dataReceiver.setFocusRow(focusRow);
// Set explicit target container
dataReceiver.setTargetDataContainer(dataContainer);
model.setUpdateInProgress(true);
model.setStatistics(null);
if (filtersPanel != null) {
UIUtils.syncExec(() -> filtersPanel.enableFilters(false));
}
}
@Override
public void done(IJobChangeEvent event) {
final ResultSetJobDataRead job = (ResultSetJobDataRead) event.getJob();
final Throwable error = job.getError();
if (job.getStatistics() != null) {
model.setStatistics(job.getStatistics());
}
final Control control = getControl();
if (control.isDisposed()) {
return;
}
UIUtils.syncExec(() -> {
try {
final Control control1 = getControl();
if (control1.isDisposed()) {
return;
@Override
public void done(IJobChangeEvent event) {
dataPumpRunning.set(false);
final ResultSetJobDataRead job = (ResultSetJobDataRead) event.getJob();
final Throwable error = job.getError();
if (job.getStatistics() != null) {
model.setStatistics(job.getStatistics());
}
final Control control = getControl();
if (control.isDisposed()) {
return;
}
UIUtils.syncExec(() -> {
try {
final Control control1 = getControl();
if (control1.isDisposed()) {
return;
}
model.setUpdateInProgress(false);
final boolean metadataChanged = model.isMetadataChanged();
if (error != null) {
String errorMessage = error.getMessage();
setStatus(errorMessage, DBPMessageType.ERROR);
String sqlText;
if (error instanceof DBSQLException) {
sqlText = ((DBSQLException) error).getSqlQuery();
} else if (dataContainer instanceof SQLQueryContainer) {
sqlText = ((SQLQueryContainer) dataContainer).getQuery().getText();
} else {
sqlText = getActiveQueryText();
}
model.setUpdateInProgress(false);
final boolean metadataChanged = model.isMetadataChanged();
if (error != null) {
String errorMessage = error.getMessage();
setStatus(errorMessage, DBPMessageType.ERROR);
String sqlText;
if (error instanceof DBSQLException) {
sqlText = ((DBSQLException) error).getSqlQuery();
} else if (dataContainer instanceof SQLQueryContainer) {
sqlText = ((SQLQueryContainer) dataContainer).getQuery().getText();
} else {
sqlText = getActiveQueryText();
}
if (getPreferenceStore().getBoolean(ResultSetPreferences.RESULT_SET_SHOW_ERRORS_IN_DIALOG)) {
DBWorkbench.getPlatformUI().showError("Error executing query", "Query execution failed", error);
} else {
showErrorPresentation(sqlText, CommonUtils.isEmpty(errorMessage) ? "Error executing query" : errorMessage, error);
log.error("Error executing query", error);
}
if (getPreferenceStore().getBoolean(ResultSetPreferences.RESULT_SET_SHOW_ERRORS_IN_DIALOG)) {
DBWorkbench.getPlatformUI().showError("Error executing query", "Query execution failed", error);
} else {
if (!metadataChanged && focusRow >= 0 && focusRow < model.getRowCount() && model.getVisibleAttributeCount() > 0) {
// Seems to be refresh
// Restore original position
restorePresentationState(presentationState);
}
showErrorPresentation(sqlText, CommonUtils.isEmpty(errorMessage) ? "Error executing query" : errorMessage, error);
log.error("Error executing query", error);
}
if (metadataChanged) {
activePresentation.updateValueView();
} else {
if (!metadataChanged && focusRow >= 0 && focusRow < model.getRowCount() && model.getVisibleAttributeCount() > 0) {
// Seems to be refresh
// Restore original position
restorePresentationState(presentationState);
}
updatePanelsContent(false);
}
if (metadataChanged) {
activePresentation.updateValueView();
}
updatePanelsContent(false);
if (!scroll) {
// Add new history item
if (saveHistory && error == null) {
setNewState(dataContainer, dataFilter);
}
if (!scroll) {
// Add new history item
if (saveHistory && error == null) {
setNewState(dataContainer, dataFilter);
}
if (dataFilter != null) {
model.updateDataFilter(dataFilter, true);
// New data filter may have different columns visibility
redrawData(true, false);
}
if (dataFilter != null) {
model.updateDataFilter(dataFilter, true);
// New data filter may have different columns visibility
redrawData(true, false);
}
if (job.getStatistics() == null || !job.getStatistics().isEmpty()) {
if (error == null) {
// Update status (update execution statistics)
updateStatusMessage();
}
updateFiltersText(true);
updateToolbar();
fireResultSetLoad();
}
if (job.getStatistics() == null || !job.getStatistics().isEmpty()) {
if (error == null) {
// Update status (update execution statistics)
updateStatusMessage();
}
// auto-refresh
autoRefreshControl.scheduleAutoRefresh(error != null);
} finally {
if (finalizer != null) {
try {
finalizer.run();
} catch (Throwable e) {
log.error(e);
}
updateFiltersText(true);
updateToolbar();
fireResultSetLoad();
}
// auto-refresh
autoRefreshControl.scheduleAutoRefresh(error != null);
} finally {
if (finalizer != null) {
try {
finalizer.run();
} catch (Throwable e) {
log.error(e);
}
}
finishDataPump(dataPumpJob);
}
});
}
});
dataPumpJob.setOffset(offset);
dataPumpJob.setMaxRows(maxRows);
dataPumpJob = null;
queueDataPump(dataPumpJob);
return true;
}
/**
* Adds new data read job in queue.
* In some cases there may be many frequent data read requests (e.g. when user works
* with references panel). We need to execute only current one and the last one. All
* intrmediate data read requests must be ignored.
* @param dataPumpJob
*/
private void queueDataPump(ResultSetJobDataRead dataPumpJob) {
synchronized (dataPumpJobQueue) {
if (dataPumpJobQueue.size() > 1) {
// Dequeue all jobs but last and new one
dataPumpJobQueue.removeAll(
dataPumpJobQueue.subList(1, dataPumpJobQueue.size() - 1));
}
dataPumpJobQueue.add(dataPumpJob);
}
new AbstractJob("Initiate data read") {
{
setUser(false);
}
@Override
protected IStatus run(DBRProgressMonitor monitor) {
if (dataPumpRunning.get()) {
// Retry later
schedule(50);
} else {
synchronized (dataPumpJobQueue) {
if (dataPumpRunning.get()) {
schedule(50);
} else if (!dataPumpJobQueue.isEmpty()) {
ResultSetJobDataRead curJob = dataPumpJobQueue.get(0);
curJob.schedule();
}
});
}
}
});
dataPumpJob.setOffset(offset);
dataPumpJob.setMaxRows(maxRows);
}
dataPumpJob.schedule();
return Status.OK_STATUS;
}
}.schedule();
}
return true;
private void finishDataPump(ResultSetJobDataRead dataPumpJob) {
synchronized (dataPumpJobQueue) {
dataPumpJobQueue.remove(dataPumpJob);
}
}
public void clearData()
......
......@@ -2393,10 +2393,7 @@ public class SQLEditor extends SQLEditorBase implements
public void cancelJob() {
for (QueryResultsContainer rc : resultContainers) {
DataSourceJob dataReadJob = rc.viewer.getDataReadJob();
if (dataReadJob != null && !dataReadJob.isFinished()) {
dataReadJob.cancel();
}
rc.viewer.cancelJobs();
}
final SQLQueryJob job = curJob;
if (job != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册