/*
 * DBeaver - Universal Database Manager
 * Copyright (C) 2010-2017 Serge Rider (serge@jkiss.org)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jkiss.dbeaver.runtime.qm;
J
jurgen 已提交
18 19 20

import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
S
Serge Rider 已提交
21
import org.jkiss.code.NotNull;
J
jurgen 已提交
22
import org.jkiss.dbeaver.Log;
J
jurgen 已提交
23 24 25 26
import org.jkiss.dbeaver.model.exec.DBCExecutionContext;
import org.jkiss.dbeaver.model.exec.DBCResultSet;
import org.jkiss.dbeaver.model.exec.DBCSavepoint;
import org.jkiss.dbeaver.model.exec.DBCStatement;
J
jurgen 已提交
27 28 29 30
import org.jkiss.dbeaver.model.qm.QMMCollector;
import org.jkiss.dbeaver.model.qm.QMMetaEvent;
import org.jkiss.dbeaver.model.qm.QMMetaListener;
import org.jkiss.dbeaver.model.qm.meta.*;
J
jurgen 已提交
31
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
S
Serge Rider 已提交
32
import org.jkiss.dbeaver.model.runtime.AbstractJob;
J
jurgen 已提交
33 34 35 36 37 38

import java.util.*;

/**
 * Query manager execution handler implementation
 */
J
jurgen 已提交
39
public class QMMCollectorImpl extends DefaultExecutionHandler implements QMMCollector {
J
jurgen 已提交
40

41
    private static final Log log = Log.getLog(QMMCollectorImpl.class);
J
jurgen 已提交
42 43

    private static final long EVENT_DISPATCH_PERIOD = 250;
J
jurgen 已提交
44
    private static final int MAX_HISTORY_EVENTS = 1000;
J
jurgen 已提交
45

46
    // Session map
J
jurgen 已提交
47
    private Map<String, QMMSessionInfo> sessionMap = new HashMap<>();
48 49

    // External listeners
J
jurgen 已提交
50
    private List<QMMetaListener> listeners = new ArrayList<>();
51 52

    // Temporary event pool
J
jurgen 已提交
53
    private List<QMMetaEvent> eventPool = new ArrayList<>();
54
    // Sync object
J
jurgen 已提交
55
    private final Object historySync = new Object();
56
    // History (may be purged when limit reached)
J
jurgen 已提交
57
    private List<QMMetaEvent> pastEvents = new ArrayList<>();
J
jurgen 已提交
58 59
    private boolean running = true;

J
jurgen 已提交
60
    public QMMCollectorImpl()
J
jurgen 已提交
61 62 63 64 65 66 67
    {
        new EventDispatcher().schedule(EVENT_DISPATCH_PERIOD);
    }

    public synchronized void dispose()
    {
        if (!sessionMap.isEmpty()) {
J
jurgen 已提交
68
            List<QMMSessionInfo> openSessions = new ArrayList<>();
J
jurgen 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
            for (QMMSessionInfo session : sessionMap.values()) {
                if (!session.isClosed()) {
                    openSessions.add(session);
                }
            }
            if (!openSessions.isEmpty()) {
                log.warn("Some sessions are still open: " + openSessions);
            }
        }
        if (!listeners.isEmpty()) {
            log.warn("Some QM meta collector listeners are still open: " + listeners);
            listeners.clear();
        }
        running = false;
    }

    boolean isRunning()
    {
        return running;
    }

S
Serge Rider 已提交
90
    @NotNull
J
jurgen 已提交
91 92 93 94 95 96
    @Override
    public String getHandlerName()
    {
        return "Meta info collector";
    }

97 98 99 100
    private String makeContextId(DBCExecutionContext context) {
        return context.getDataSource().getContainer().getId() + ":" + context.getContextName();
    }

J
jurgen 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
    public synchronized void addListener(QMMetaListener listener)
    {
        listeners.add(listener);
    }

    public synchronized void removeListener(QMMetaListener listener)
    {
        if (!listeners.remove(listener)) {
            log.warn("Listener '" + listener + "' is not registered in QM meta collector");
        }
    }

    private synchronized List<QMMetaListener> getListeners()
    {
        if (listeners.isEmpty()) {
            return Collections.emptyList();
        }
J
jurgen 已提交
118 119 120
        if (listeners.size() == 1) {
            return Collections.singletonList(listeners.get(0));
        }
J
jurgen 已提交
121
        return new ArrayList<>(listeners);
J
jurgen 已提交
122 123 124 125 126 127 128 129 130
    }

    private synchronized void fireMetaEvent(final QMMObject object, final QMMetaEvent.Action action)
    {
        eventPool.add(new QMMetaEvent(object, action));
    }

    private synchronized List<QMMetaEvent> obtainEvents()
    {
J
jurgen 已提交
131 132 133
        if (eventPool.isEmpty()) {
            return Collections.emptyList();
        }
J
jurgen 已提交
134
        List<QMMetaEvent> events = eventPool;
J
jurgen 已提交
135
        eventPool = new ArrayList<>();
J
jurgen 已提交
136 137 138
        return events;
    }

J
jurgen 已提交
139
    public QMMSessionInfo getSessionInfo(DBCExecutionContext context)
J
jurgen 已提交
140
    {
141
        String contextId = makeContextId(context);
142
        QMMSessionInfo sessionInfo = sessionMap.get(contextId);
J
jurgen 已提交
143
        if (sessionInfo == null) {
J
jurgen 已提交
144
            log.warn("Can't find sessionInfo meta information: " + contextId);
J
jurgen 已提交
145
        }
J
jurgen 已提交
146
        return sessionInfo;
J
jurgen 已提交
147 148 149 150
    }

    public List<QMMetaEvent> getPastEvents()
    {
J
jurgen 已提交
151
        synchronized (historySync) {
J
jurgen 已提交
152
            return new ArrayList<>(pastEvents);
J
jurgen 已提交
153 154 155 156
        }
    }

    @Override
S
Serge Rider 已提交
157
    public synchronized void handleContextOpen(@NotNull DBCExecutionContext context, boolean transactional)
J
jurgen 已提交
158
    {
159
        String contextId = makeContextId(context);
S
serge-rider 已提交
160 161 162
        if (sessionMap.containsKey(contextId)) {
            log.debug("Duplicate session '" + contextId + "' open");
        }
J
jurgen 已提交
163
        QMMSessionInfo session = new QMMSessionInfo(
J
jurgen 已提交
164
            context,
J
jurgen 已提交
165
            transactional,
166 167
            sessionMap.get(contextId));
        sessionMap.put(contextId, session);
J
jurgen 已提交
168 169

        if (session.getPrevious() != null && !session.getPrevious().isClosed()) {
J
jurgen 已提交
170 171 172
            // Is it really a problem? Maybe better to remove warning at all
            // Happens when we have open connection and perform another connection test
            log.debug("Previous '" + contextId + "' session wasn't closed");
J
jurgen 已提交
173 174 175 176 177
        }
        fireMetaEvent(session, QMMetaEvent.Action.BEGIN);
    }

    @Override
S
Serge Rider 已提交
178
    public synchronized void handleContextClose(@NotNull DBCExecutionContext context)
J
jurgen 已提交
179
    {
J
jurgen 已提交
180
        QMMSessionInfo session = getSessionInfo(context);
J
jurgen 已提交
181 182 183 184
        if (session != null) {
            session.close();
            fireMetaEvent(session, QMMetaEvent.Action.END);
        }
185 186 187
        // Remove closed context from map (otherwise we'll be out of memory eventually)
        String contextId = makeContextId(context);
        sessionMap.remove(contextId);
J
jurgen 已提交
188 189 190
    }

    @Override
S
Serge Rider 已提交
191
    public synchronized void handleTransactionAutocommit(@NotNull DBCExecutionContext context, boolean autoCommit)
J
jurgen 已提交
192
    {
193
        QMMSessionInfo sessionInfo = getSessionInfo(context);
J
jurgen 已提交
194 195
        if (sessionInfo != null) {
            QMMTransactionInfo oldTxn = sessionInfo.changeTransactional(!autoCommit);
J
jurgen 已提交
196 197 198
            if (oldTxn != null) {
                fireMetaEvent(oldTxn, QMMetaEvent.Action.END);
            }
J
jurgen 已提交
199
            fireMetaEvent(sessionInfo, QMMetaEvent.Action.UPDATE);
J
jurgen 已提交
200 201 202 203
        }
    }

    @Override
S
Serge Rider 已提交
204
    public synchronized void handleTransactionCommit(@NotNull DBCExecutionContext context)
J
jurgen 已提交
205
    {
206
        QMMSessionInfo sessionInfo = getSessionInfo(context);
J
jurgen 已提交
207 208
        if (sessionInfo != null) {
            QMMTransactionInfo oldTxn = sessionInfo.commit();
J
jurgen 已提交
209 210 211 212 213 214 215
            if (oldTxn != null) {
                fireMetaEvent(oldTxn, QMMetaEvent.Action.END);
            }
        }
    }

    @Override
S
Serge Rider 已提交
216
    public synchronized void handleTransactionRollback(@NotNull DBCExecutionContext context, DBCSavepoint savepoint)
J
jurgen 已提交
217
    {
218
        QMMSessionInfo sessionInfo = getSessionInfo(context);
J
jurgen 已提交
219 220
        if (sessionInfo != null) {
            QMMObject oldTxn = sessionInfo.rollback(savepoint);
J
jurgen 已提交
221 222 223 224 225 226 227
            if (oldTxn != null) {
                fireMetaEvent(oldTxn, QMMetaEvent.Action.END);
            }
        }
    }

    @Override
S
Serge Rider 已提交
228
    public synchronized void handleStatementOpen(@NotNull DBCStatement statement)
J
jurgen 已提交
229
    {
230
        QMMSessionInfo session = getSessionInfo(statement.getSession().getExecutionContext());
J
jurgen 已提交
231 232 233 234 235 236 237
        if (session != null) {
            QMMStatementInfo stat = session.openStatement(statement);
            fireMetaEvent(stat, QMMetaEvent.Action.BEGIN);
        }
    }

    @Override
S
Serge Rider 已提交
238
    public synchronized void handleStatementClose(@NotNull DBCStatement statement, long rows)
J
jurgen 已提交
239
    {
240
        QMMSessionInfo session = getSessionInfo(statement.getSession().getExecutionContext());
J
jurgen 已提交
241
        if (session != null) {
J
jurgen 已提交
242
            QMMStatementInfo stat = session.closeStatement(statement, rows);
J
jurgen 已提交
243
            if (stat == null) {
J
jurgen 已提交
244
                log.warn("Can't properly handle statement close");
J
jurgen 已提交
245 246 247 248 249 250 251
            } else {
                fireMetaEvent(stat, QMMetaEvent.Action.END);
            }
        }
    }

    @Override
S
Serge Rider 已提交
252
    public synchronized void handleStatementExecuteBegin(@NotNull DBCStatement statement)
J
jurgen 已提交
253
    {
254
        QMMSessionInfo session = getSessionInfo(statement.getSession().getExecutionContext());
J
jurgen 已提交
255 256 257 258 259 260 261 262 263
        if (session != null) {
            QMMStatementExecuteInfo exec = session.beginExecution(statement);
            if (exec != null) {
                fireMetaEvent(exec, QMMetaEvent.Action.BEGIN);
            }
        }
    }

    @Override
S
Serge Rider 已提交
264
    public synchronized void handleStatementExecuteEnd(@NotNull DBCStatement statement, long rows, Throwable error)
J
jurgen 已提交
265
    {
266
        QMMSessionInfo session = getSessionInfo(statement.getSession().getExecutionContext());
J
jurgen 已提交
267 268 269 270 271 272 273 274 275
        if (session != null) {
            QMMStatementExecuteInfo exec = session.endExecution(statement, rows, error);
            if (exec != null) {
                fireMetaEvent(exec, QMMetaEvent.Action.END);
            }
        }
    }

    @Override
S
Serge Rider 已提交
276
    public synchronized void handleResultSetOpen(@NotNull DBCResultSet resultSet)
J
jurgen 已提交
277
    {
278
        QMMSessionInfo session = getSessionInfo(resultSet.getSession().getExecutionContext());
J
jurgen 已提交
279 280 281 282 283 284 285 286 287
        if (session != null) {
            QMMStatementExecuteInfo exec = session.beginFetch(resultSet);
            if (exec != null) {
                fireMetaEvent(exec, QMMetaEvent.Action.UPDATE);
            }
        }
    }

    @Override
S
Serge Rider 已提交
288
    public synchronized void handleResultSetClose(@NotNull DBCResultSet resultSet, long rowCount)
J
jurgen 已提交
289
    {
290
        QMMSessionInfo session = getSessionInfo(resultSet.getSession().getExecutionContext());
J
jurgen 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
        if (session != null) {
            QMMStatementExecuteInfo exec = session.endFetch(resultSet, rowCount);
            if (exec != null) {
                fireMetaEvent(exec, QMMetaEvent.Action.UPDATE);
            }
        }
    }

    private class EventDispatcher extends AbstractJob {

        protected EventDispatcher()
        {
            super("QM meta events dispatcher");
            setUser(false);
            setSystem(true);
        }

        @Override
        protected IStatus run(DBRProgressMonitor monitor)
        {
J
jurgen 已提交
311
            final List<QMMetaEvent> events = obtainEvents();
J
jurgen 已提交
312 313 314
            final List<QMMetaListener> listeners = getListeners();
            if (!listeners.isEmpty() && !events.isEmpty()) {
                // Dispatch all events
J
jurgen 已提交
315 316 317 318 319
                for (QMMetaListener listener : listeners) {
                    try {
                        listener.metaInfoChanged(events);
                    } catch (Throwable e) {
                        log.error("Error notifying event listener", e);
J
jurgen 已提交
320
                    }
J
jurgen 已提交
321
                }
J
jurgen 已提交
322
            }
J
jurgen 已提交
323
            synchronized (historySync) {
J
jurgen 已提交
324
                pastEvents.addAll(events);
J
jurgen 已提交
325 326
                int size = pastEvents.size();
                if (size > MAX_HISTORY_EVENTS) {
J
jurgen 已提交
327
                    pastEvents = new ArrayList<>(pastEvents.subList(
J
jurgen 已提交
328 329 330
                        size - MAX_HISTORY_EVENTS,
                        size));
                }
J
jurgen 已提交
331 332 333 334 335 336 337 338 339
            }
            if (isRunning()) {
                this.schedule(EVENT_DISPATCH_PERIOD);
            }
            return Status.OK_STATUS;
        }
    }

}