object_event.c 31.8 KB
Newer Older
1 2 3
/*
 * object_event.c: object event queue processing helpers
 *
4
 * Copyright (C) 2010-2014 Red Hat, Inc.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
 * Copyright (C) 2008 VirtualIron
 * Copyright (C) 2013 SUSE LINUX Products GmbH, Nuernberg, Germany.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library.  If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * Author: Ben Guthro
 */

#include <config.h>

#include "domain_event.h"
28
#include "network_event.h"
29 30 31 32 33 34
#include "object_event.h"
#include "object_event_private.h"
#include "virlog.h"
#include "datatypes.h"
#include "viralloc.h"
#include "virerror.h"
35
#include "virobject.h"
36 37 38 39
#include "virstring.h"

#define VIR_FROM_THIS VIR_FROM_NONE

40 41
VIR_LOG_INIT("conf.object_event");

42 43 44 45 46 47
struct _virObjectEventCallback {
    int callbackID;
    virClassPtr klass;
    int eventID;
    virConnectPtr conn;
    int remoteID;
48 49
    bool key_filter;
    char *key;
50 51 52 53 54 55 56 57 58 59 60
    virObjectEventCallbackFilter filter;
    void *filter_opaque;
    virConnectObjectEventGenericCallback cb;
    void *opaque;
    virFreeCallback freecb;
    bool deleted;
    bool legacy; /* true if end user does not know callbackID */
};
typedef struct _virObjectEventCallback virObjectEventCallback;
typedef virObjectEventCallback *virObjectEventCallbackPtr;

61 62 63 64 65 66
struct _virObjectEventCallbackList {
    unsigned int nextID;
    size_t count;
    virObjectEventCallbackPtr *callbacks;
};

67
struct _virObjectEventQueue {
68
    size_t count;
69 70
    virObjectEventPtr *events;
};
71 72 73 74
typedef struct _virObjectEventQueue virObjectEventQueue;
typedef virObjectEventQueue *virObjectEventQueuePtr;

struct _virObjectEventState {
75
    virObjectLockable parent;
76 77 78 79 80 81 82 83 84 85
    /* The list of domain event callbacks */
    virObjectEventCallbackListPtr callbacks;
    /* The queue of object events */
    virObjectEventQueuePtr queue;
    /* Timer for flushing events queue */
    int timer;
    /* Flag if we're in process of dispatching */
    bool isDispatching;
};

86
static virClassPtr virObjectEventClass;
87
static virClassPtr virObjectEventStateClass;
88 89

static void virObjectEventDispose(void *obj);
90
static void virObjectEventStateDispose(void *obj);
91

92 93
static int
virObjectEventOnceInit(void)
94
{
95 96 97 98 99 100 101
    if (!(virObjectEventStateClass =
          virClassNew(virClassForObjectLockable(),
                      "virObjectEventState",
                      sizeof(virObjectEventState),
                      virObjectEventStateDispose)))
        return -1;

102 103 104 105 106 107
    if (!(virObjectEventClass =
          virClassNew(virClassForObject(),
                      "virObjectEvent",
                      sizeof(virObjectEvent),
                      virObjectEventDispose)))
        return -1;
108

109 110 111 112 113
    return 0;
}

VIR_ONCE_GLOBAL_INIT(virObjectEvent)

114 115 116 117 118 119
/**
 * virClassForObjectEvent:
 *
 * Return the class object to be used as a parent when creating an
 * event subclass.
 */
120 121
virClassPtr
virClassForObjectEvent(void)
122 123 124 125 126 127 128
{
    if (virObjectEventInitialize() < 0)
        return NULL;
    return virObjectEventClass;
}


129 130
static void
virObjectEventDispose(void *obj)
131 132 133 134 135 136
{
    virObjectEventPtr event = obj;

    VIR_DEBUG("obj=%p", event);

    VIR_FREE(event->meta.name);
137
    VIR_FREE(event->meta.key);
138 139
}

140 141 142 143 144 145 146 147 148 149 150 151 152
/**
 * virObjectEventCallbackFree:
 * @list: event callback to free
 *
 * Free the memory in the domain event callback
 */
static void
virObjectEventCallbackFree(virObjectEventCallbackPtr cb)
{
    if (!cb)
        return;

    virObjectUnref(cb->conn);
153
    VIR_FREE(cb->key);
154 155 156
    VIR_FREE(cb);
}

157 158 159 160 161 162 163 164 165 166 167 168 169
/**
 * virObjectEventCallbackListFree:
 * @list: event callback list head
 *
 * Free the memory in the domain event callback list
 */
static void
virObjectEventCallbackListFree(virObjectEventCallbackListPtr list)
{
    size_t i;
    if (!list)
        return;

170
    for (i = 0; i < list->count; i++) {
171 172 173 174 175 176 177 178 179 180
        virFreeCallback freecb = list->callbacks[i]->freecb;
        if (freecb)
            (*freecb)(list->callbacks[i]->opaque);
        VIR_FREE(list->callbacks[i]);
    }
    VIR_FREE(list->callbacks);
    VIR_FREE(list);
}


181 182 183 184 185 186
/**
 * virObjectEventCallbackListCount:
 * @conn: pointer to the connection
 * @cbList: the list
 * @klass: the base event class
 * @eventID: the event ID
187
 * @key: optional key of per-object filtering
188
 * @serverFilter: true if server supports object filtering
189 190
 *
 * Internal function to count how many callbacks remain registered for
191
 * the given @eventID and @key; knowing this allows the client side
192 193 194 195 196
 * of the remote driver know when it must send an RPC to adjust the
 * callbacks on the server.  When @serverFilter is false, this function
 * returns a count that includes both global and per-object callbacks,
 * since the remote side will use a single global event to feed both.
 * When true, the count is limited to the callbacks with the same
197
 * @key, and where a remoteID has already been set on the callback
198 199 200 201
 * with virObjectEventStateSetRemote().  Note that this function
 * intentionally ignores the legacy field, since RPC calls use only a
 * single callback on the server to manage both legacy and modern
 * global domain lifecycle events.
202 203 204 205 206
 */
static int
virObjectEventCallbackListCount(virConnectPtr conn,
                                virObjectEventCallbackListPtr cbList,
                                virClassPtr klass,
207
                                int eventID,
208
                                const char *key,
209
                                bool serverFilter)
210 211 212 213 214 215 216
{
    size_t i;
    int ret = 0;

    for (i = 0; i < cbList->count; i++) {
        virObjectEventCallbackPtr cb = cbList->callbacks[i];

217 218
        if (cb->filter)
            continue;
219 220 221
        if (cb->klass == klass &&
            cb->eventID == eventID &&
            cb->conn == conn &&
222 223 224
            !cb->deleted &&
            (!serverFilter ||
             (cb->remoteID >= 0 &&
225 226
              ((key && cb->key_filter && STREQ(cb->key, key)) ||
               (!key && !cb->key_filter)))))
227 228 229 230 231
            ret++;
    }
    return ret;
}

232 233 234 235 236
/**
 * virObjectEventCallbackListRemoveID:
 * @conn: pointer to the connection
 * @cbList: the list
 * @callback: the callback to remove
237
 * @doFreeCb: Inhibit calling the freecb
238 239 240 241 242 243
 *
 * Internal function to remove a callback from a virObjectEventCallbackListPtr
 */
static int
virObjectEventCallbackListRemoveID(virConnectPtr conn,
                                   virObjectEventCallbackListPtr cbList,
244 245
                                   int callbackID,
                                   bool doFreeCb)
246 247 248
{
    size_t i;

249 250 251 252
    for (i = 0; i < cbList->count; i++) {
        virObjectEventCallbackPtr cb = cbList->callbacks[i];

        if (cb->callbackID == callbackID && cb->conn == conn) {
253 254
            int ret;

255 256 257
            ret = cb->filter ? 0 :
                (virObjectEventCallbackListCount(conn, cbList, cb->klass,
                                                 cb->eventID,
258
                                                 cb->key_filter ? cb->key : NULL,
259
                                                 cb->remoteID >= 0) - 1);
260

261 262 263 264
            /* @doFreeCb inhibits calling @freecb from error paths in
             * register functions to ensure the caller of a failed register
             * function won't end up with a double free error */
            if (doFreeCb && cb->freecb)
265
                (*cb->freecb)(cb->opaque);
266
            virObjectEventCallbackFree(cb);
267
            VIR_DELETE_ELEMENT(cbList->callbacks, i, cbList->count);
268
            return ret;
269 270 271
        }
    }

272 273 274
    virReportError(VIR_ERR_INVALID_ARG,
                   _("could not find event callback %d for deletion"),
                   callbackID);
275 276 277 278 279 280 281 282 283 284
    return -1;
}


static int
virObjectEventCallbackListMarkDeleteID(virConnectPtr conn,
                                       virObjectEventCallbackListPtr cbList,
                                       int callbackID)
{
    size_t i;
285

286
    for (i = 0; i < cbList->count; i++) {
287 288 289 290
        virObjectEventCallbackPtr cb = cbList->callbacks[i];

        if (cb->callbackID == callbackID && cb->conn == conn) {
            cb->deleted = true;
291 292 293
            return cb->filter ? 0 :
                virObjectEventCallbackListCount(conn, cbList, cb->klass,
                                                cb->eventID,
294
                                                cb->key_filter ? cb->key : NULL,
295
                                                cb->remoteID >= 0);
296 297 298
        }
    }

299 300 301
    virReportError(VIR_ERR_INVALID_ARG,
                   _("could not find event callback %d for deletion"),
                   callbackID);
302 303 304 305 306 307 308
    return -1;
}


static int
virObjectEventCallbackListPurgeMarked(virObjectEventCallbackListPtr cbList)
{
309
    size_t n;
310 311 312 313 314
    for (n = 0; n < cbList->count; n++) {
        if (cbList->callbacks[n]->deleted) {
            virFreeCallback freecb = cbList->callbacks[n]->freecb;
            if (freecb)
                (*freecb)(cbList->callbacks[n]->opaque);
315
            virObjectEventCallbackFree(cbList->callbacks[n]);
316

317
            VIR_DELETE_ELEMENT(cbList->callbacks, n, cbList->count);
318 319 320 321 322 323 324
            n--;
        }
    }
    return 0;
}


325 326 327 328
/**
 * virObjectEventCallbackLookup:
 * @conn: pointer to the connection
 * @cbList: the list
329
 * @key: the key of the object to filter on
330 331 332
 * @klass: the base event class
 * @eventID: the event ID
 * @callback: the callback to locate
333
 * @legacy: true if callback is tracked by function instead of callbackID
334
 * @remoteID: optionally return a known remoteID
335 336
 *
 * Internal function to determine if @callback already has a
337 338 339 340
 * callbackID in @cbList for the given @conn and other filters.  If
 * @remoteID is non-NULL, and another callback exists that can be
 * serviced by the same remote event, then set it to that remote ID.
 *
341 342 343 344 345
 * Return the id if found, or -1 with no error issued if not present.
 */
static int ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2)
virObjectEventCallbackLookup(virConnectPtr conn,
                             virObjectEventCallbackListPtr cbList,
346
                             const char *key,
347 348
                             virClassPtr klass,
                             int eventID,
349
                             virConnectObjectEventGenericCallback callback,
350 351
                             bool legacy,
                             int *remoteID)
352 353 354
{
    size_t i;

355 356 357
    if (remoteID)
        *remoteID = -1;

358 359 360 361 362
    for (i = 0; i < cbList->count; i++) {
        virObjectEventCallbackPtr cb = cbList->callbacks[i];

        if (cb->deleted)
            continue;
363
        if (cb->klass == klass &&
364 365
            cb->eventID == eventID &&
            cb->conn == conn &&
366 367
            ((key && cb->key_filter && STREQ(cb->key, key)) ||
             (!key && !cb->key_filter))) {
368 369
            if (remoteID)
                *remoteID = cb->remoteID;
370 371
            if (cb->legacy == legacy &&
                cb->cb == callback)
372
                return cb->callbackID;
373 374
        }
    }
375
    return -1;
376 377 378
}


379 380 381 382
/**
 * virObjectEventCallbackListAddID:
 * @conn: pointer to the connection
 * @cbList: the list
383
 * @key: the optional key of the object to filter on
384 385
 * @filter: optional last-ditch filter callback
 * @filter_opaque: opaque data to pass to @filter
386
 * @klass: the base event class
387 388
 * @eventID: the event ID
 * @callback: the callback to add
389 390
 * @opaque: opaque data to pass to @callback
 * @freecb: callback to free @opaque
391
 * @legacy: true if callback is tracked by function instead of callbackID
392
 * @callbackID: filled with callback ID
393
 * @serverFilter: true if server supports object filtering
394 395 396
 *
 * Internal function to add a callback from a virObjectEventCallbackListPtr
 */
397
static int
398 399
virObjectEventCallbackListAddID(virConnectPtr conn,
                                virObjectEventCallbackListPtr cbList,
400
                                const char *key,
401 402
                                virObjectEventCallbackFilter filter,
                                void *filter_opaque,
403
                                virClassPtr klass,
404 405 406 407
                                int eventID,
                                virConnectObjectEventGenericCallback callback,
                                void *opaque,
                                virFreeCallback freecb,
408
                                bool legacy,
409 410
                                int *callbackID,
                                bool serverFilter)
411
{
412
    virObjectEventCallbackPtr cb;
413
    int ret = -1;
414
    int remoteID = -1;
415

416
    VIR_DEBUG("conn=%p cblist=%p key=%p filter=%p filter_opaque=%p "
417 418
              "klass=%p eventID=%d callback=%p opaque=%p "
              "legacy=%d callbackID=%p serverFilter=%d",
419
              conn, cbList, key, filter, filter_opaque, klass, eventID,
420
              callback, opaque, legacy, callbackID, serverFilter);
421

422
    /* Check incoming */
423
    if (!cbList)
424 425
        return -1;

426 427 428
    /* If there is no additional filtering, then check if we already
     * have this callback on our list.  */
    if (!filter &&
429
        virObjectEventCallbackLookup(conn, cbList, key,
430
                                     klass, eventID, callback, legacy,
431
                                     serverFilter ? &remoteID : NULL) != -1) {
432
        virReportError(VIR_ERR_INVALID_ARG, "%s",
433 434
                       _("event callback already tracked"));
        return -1;
435
    }
436 437
    /* Allocate new cb */
    if (VIR_ALLOC(cb) < 0)
438
        goto cleanup;
439 440 441 442 443 444 445 446
    cb->conn = virObjectRef(conn);
    *callbackID = cb->callbackID = cbList->nextID++;
    cb->cb = callback;
    cb->klass = klass;
    cb->eventID = eventID;
    cb->opaque = opaque;
    cb->freecb = freecb;
    cb->remoteID = remoteID;
447

448 449 450
    if (key) {
        cb->key_filter = true;
        if (VIR_STRDUP(cb->key, key) < 0)
C
Cole Robinson 已提交
451
            goto cleanup;
452
    }
453 454 455
    cb->filter = filter;
    cb->filter_opaque = filter_opaque;
    cb->legacy = legacy;
456

457
    if (VIR_APPEND_ELEMENT(cbList->callbacks, cbList->count, cb) < 0)
458
        goto cleanup;
459

460 461 462 463 464 465
    /* When additional filtering is being done, every client callback
     * is matched to exactly one server callback.  */
    if (filter) {
        ret = 1;
    } else {
        ret = virObjectEventCallbackListCount(conn, cbList, klass, eventID,
466
                                              key, serverFilter);
467 468 469
        if (serverFilter && remoteID < 0)
            ret++;
    }
470

471
 cleanup:
472
    virObjectEventCallbackFree(cb);
473
    return ret;
474 475 476 477 478 479 480 481 482
}


/**
 * virObjectEventQueueClear:
 * @queue: pointer to the queue
 *
 * Removes all elements from the queue
 */
483
static void
484 485 486 487 488 489
virObjectEventQueueClear(virObjectEventQueuePtr queue)
{
    size_t i;
    if (!queue)
        return;

490
    for (i = 0; i < queue->count; i++)
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
        virObjectUnref(queue->events[i]);
    VIR_FREE(queue->events);
    queue->count = 0;
}

/**
 * virObjectEventQueueFree:
 * @queue: pointer to the queue
 *
 * Free the memory in the queue. We process this like a list here
 */
static void
virObjectEventQueueFree(virObjectEventQueuePtr queue)
{
    if (!queue)
        return;

    virObjectEventQueueClear(queue);
    VIR_FREE(queue);
}

static virObjectEventQueuePtr
virObjectEventQueueNew(void)
{
    virObjectEventQueuePtr ret;

    ignore_value(VIR_ALLOC(ret));
    return ret;
}

521 522

/**
523
 * virObjectEventStateDispose:
524 525 526 527
 * @list: virObjectEventStatePtr to free
 *
 * Free a virObjectEventStatePtr and its members, and unregister the timer.
 */
528 529
static void
virObjectEventStateDispose(void *obj)
530
{
531 532 533
    virObjectEventStatePtr state = obj;

    VIR_DEBUG("obj=%p", state);
534 535 536 537 538 539 540 541 542 543 544

    virObjectEventCallbackListFree(state->callbacks);
    virObjectEventQueueFree(state->queue);

    if (state->timer != -1)
        virEventRemoveTimeout(state->timer);
}


static void virObjectEventStateFlush(virObjectEventStatePtr state);

545 546 547 548 549 550 551 552 553 554

/**
 * virObjectEventTimer:
 * @timer: id of the event loop timer
 * @opaque: the event state object
 *
 * Register this function with the event state as its opaque data as
 * the callback of a periodic timer on the event loop, in order to
 * flush the callback queue.
 */
555
static void
556 557 558 559 560 561 562
virObjectEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
{
    virObjectEventStatePtr state = opaque;

    virObjectEventStateFlush(state);
}

563

564 565
/**
 * virObjectEventStateNew:
566 567
 *
 * Allocate a new event state object.
568 569 570 571 572 573
 */
virObjectEventStatePtr
virObjectEventStateNew(void)
{
    virObjectEventStatePtr state = NULL;

574 575
    if (virObjectEventInitialize() < 0)
        return NULL;
576

577 578
    if (!(state = virObjectLockableNew(virObjectEventStateClass)))
        return NULL;
579 580 581 582 583 584 585 586 587 588 589

    if (VIR_ALLOC(state->callbacks) < 0)
        goto error;

    if (!(state->queue = virObjectEventQueueNew()))
        goto error;

    state->timer = -1;

    return state;

590
 error:
591
    virObjectUnref(state);
592 593 594
    return NULL;
}

595 596 597 598 599 600 601 602 603

/**
 * virObjectEventNew:
 * @klass: subclass of event to be created
 * @dispatcher: callback for dispatching the particular subclass of event
 * @eventID: id of the event
 * @id: id of the object the event describes, or 0
 * @name: name of the object the event describes
 * @uuid: uuid of the object the event describes
604
 * @key: key for per-object filtering
605 606 607
 *
 * Create a new event, with the information common to all events.
 */
608 609
void *
virObjectEventNew(virClassPtr klass,
610
                  virObjectEventDispatchFunc dispatcher,
611 612 613
                  int eventID,
                  int id,
                  const char *name,
614 615
                  const unsigned char *uuid,
                  const char *key)
616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631
{
    virObjectEventPtr event;

    if (virObjectEventInitialize() < 0)
        return NULL;

    if (!virClassIsDerivedFrom(klass, virObjectEventClass)) {
        virReportInvalidArg(klass,
                            _("Class %s must derive from virObjectEvent"),
                            virClassName(klass));
        return NULL;
    }

    if (!(event = virObjectNew(klass)))
        return NULL;

632
    event->dispatch = dispatcher;
633
    event->eventID = eventID;
634
    event->remoteID = -1;
635

636 637 638
    if (VIR_STRDUP(event->meta.name, name) < 0 ||
        VIR_STRDUP(event->meta.key, key) < 0) {
        virObjectUnref(event);
639 640
        return NULL;
    }
641
    event->meta.id = id;
642 643
    if (uuid)
        memcpy(event->meta.uuid, uuid, VIR_UUID_BUFLEN);
644 645 646 647 648

    VIR_DEBUG("obj=%p", event);
    return event;
}

649

650 651 652 653 654 655 656 657 658 659 660 661 662
/**
 * virObjectEventQueuePush:
 * @evtQueue: the object event queue
 * @event: the event to add
 *
 * Internal function to push to the back of a virObjectEventQueue
 *
 * Returns: 0 on success, -1 on failure
 */
static int
virObjectEventQueuePush(virObjectEventQueuePtr evtQueue,
                        virObjectEventPtr event)
{
663
    if (!evtQueue)
664 665
        return -1;

666
    if (VIR_APPEND_ELEMENT(evtQueue->events, evtQueue->count, event) < 0)
667 668 669 670 671
        return -1;
    return 0;
}


E
Eric Blake 已提交
672
static bool
673 674
virObjectEventDispatchMatchCallback(virObjectEventPtr event,
                                    virObjectEventCallbackPtr cb)
675 676
{
    if (!cb)
E
Eric Blake 已提交
677
        return false;
678
    if (cb->deleted)
E
Eric Blake 已提交
679
        return false;
680
    if (!virObjectIsClass(event, cb->klass))
E
Eric Blake 已提交
681
        return false;
682
    if (cb->eventID != event->eventID)
E
Eric Blake 已提交
683
        return false;
684 685
    if (cb->remoteID != event->remoteID)
        return false;
686

687 688 689
    if (cb->filter && !(cb->filter)(cb->conn, event, cb->filter_opaque))
        return false;

690 691
    if (cb->key_filter)
        return STREQ(event->meta.key, cb->key);
E
Eric Blake 已提交
692
    return true;
693 694 695 696
}


static void
697 698 699
virObjectEventStateDispatchCallbacks(virObjectEventStatePtr state,
                                     virObjectEventPtr event,
                                     virObjectEventCallbackListPtr callbacks)
700 701 702 703 704
{
    size_t i;
    /* Cache this now, since we may be dropping the lock,
       and have more callbacks added. We're guaranteed not
       to have any removed */
705
    size_t cbCount = callbacks->count;
706 707

    for (i = 0; i < cbCount; i++) {
708 709 710
        virObjectEventCallbackPtr cb = callbacks->callbacks[i];

        if (!virObjectEventDispatchMatchCallback(event, cb))
711 712
            continue;

713
        /* Drop the lock whle dispatching, for sake of re-entrancy */
714
        virObjectUnlock(state);
715
        event->dispatch(cb->conn, event, cb->cb, cb->opaque);
716
        virObjectLock(state);
717 718 719 720 721
    }
}


static void
722 723 724
virObjectEventStateQueueDispatch(virObjectEventStatePtr state,
                                 virObjectEventQueuePtr queue,
                                 virObjectEventCallbackListPtr callbacks)
725 726 727 728
{
    size_t i;

    for (i = 0; i < queue->count; i++) {
729 730
        virObjectEventStateDispatchCallbacks(state, queue->events[i],
                                             callbacks);
731 732 733 734 735 736
        virObjectUnref(queue->events[i]);
    }
    VIR_FREE(queue->events);
    queue->count = 0;
}

737 738

/**
739
 * virObjectEventStateQueueRemote:
740 741
 * @state: the event state object
 * @event: event to add to the queue
742
 * @remoteID: limit dispatch to callbacks with the same remote id
743 744 745
 *
 * Adds @event to the queue of events to be dispatched at the next
 * safe moment.  The caller should no longer use @event after this
746 747 748
 * call.  If @remoteID is non-negative, the event will only be sent to
 * callbacks where virObjectEventStateSetRemote() registered a remote
 * id.
749
 */
750
void
751 752 753
virObjectEventStateQueueRemote(virObjectEventStatePtr state,
                               virObjectEventPtr event,
                               int remoteID)
754 755 756 757 758 759
{
    if (state->timer < 0) {
        virObjectUnref(event);
        return;
    }

760
    virObjectLock(state);
761

762
    event->remoteID = remoteID;
763 764 765 766 767 768 769
    if (virObjectEventQueuePush(state->queue, event) < 0) {
        VIR_DEBUG("Error adding event to queue");
        virObjectUnref(event);
    }

    if (state->queue->count == 1)
        virEventUpdateTimeout(state->timer, 0);
770
    virObjectUnlock(state);
771 772 773
}


774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790
/**
 * virObjectEventStateQueue:
 * @state: the event state object
 * @event: event to add to the queue
 *
 * Adds @event to the queue of events to be dispatched at the next
 * safe moment.  The caller should no longer use @event after this
 * call.
 */
void
virObjectEventStateQueue(virObjectEventStatePtr state,
                         virObjectEventPtr event)
{
    virObjectEventStateQueueRemote(state, event, -1);
}


791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
static void
virObjectEventStateCleanupTimer(virObjectEventStatePtr state, bool clear_queue)
{
    /* There are still some callbacks, keep the timer. */
    if (state->callbacks->count)
        return;

    /* The timer is not registered, nothing to do. */
    if (state->timer == -1)
        return;

    virEventRemoveTimeout(state->timer);
    state->timer = -1;

    if (clear_queue)
        virObjectEventQueueClear(state->queue);
}


810 811 812 813 814
static void
virObjectEventStateFlush(virObjectEventStatePtr state)
{
    virObjectEventQueue tempQueue;

815 816 817
    /* We need to lock as well as ref due to the fact that we might
     * unref the state we're working on in this very function */
    virObjectRef(state);
818
    virObjectLock(state);
819 820 821 822 823 824 825 826
    state->isDispatching = true;

    /* Copy the queue, so we're reentrant safe when dispatchFunc drops the
     * driver lock */
    tempQueue.count = state->queue->count;
    tempQueue.events = state->queue->events;
    state->queue->count = 0;
    state->queue->events = NULL;
827 828
    if (state->timer != -1)
        virEventUpdateTimeout(state->timer, -1);
829

830 831 832
    virObjectEventStateQueueDispatch(state,
                                     &tempQueue,
                                     state->callbacks);
833 834 835 836

    /* Purge any deleted callbacks */
    virObjectEventCallbackListPurgeMarked(state->callbacks);

837 838 839 840
    /* If we purged all callbacks, we need to remove the timeout as
     * well like virObjectEventStateDeregisterID() would do. */
    virObjectEventStateCleanupTimer(state, true);

841
    state->isDispatching = false;
842
    virObjectUnlock(state);
843
    virObjectUnref(state);
844 845 846 847 848 849 850
}


/**
 * virObjectEventStateRegisterID:
 * @conn: connection to associate with callback
 * @state: domain event state
851
 * @key: key of the object for event filtering
852
 * @klass: the base event class
853
 * @eventID: ID of the event type to register for
854 855
 * @cb: function to invoke when event occurs
 * @opaque: data blob to pass to @callback
856
 * @freecb: callback to free @opaque
857
 * @legacy: true if callback is tracked by function instead of callbackID
858
 * @callbackID: filled with callback ID
859
 * @serverFilter: true if server supports object filtering
860
 *
861 862 863
 * Register the function @cb with connection @conn, from @state, for
 * events of type @eventID, and return the registration handle in
 * @callbackID.
864
 *
865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
 * The return value is only important when registering client-side
 * mirroring of remote events (since the public API is documented to
 * return the callbackID rather than a count).  A return of 1 means
 * that this is the first use of this type of event, so a remote event
 * must be enabled; a return larger than 1 means that an existing
 * remote event can already feed this callback.  If @serverFilter is
 * false, the return count assumes that a single global remote feeds
 * both generic and per-object callbacks, and that the event queue
 * will be fed with virObjectEventStateQueue().  If it is true, then
 * the return count assumes that the remote side is capable of per-
 * object filtering; the user must call virObjectEventStateSetRemote()
 * to record the remote id, and queue events with
 * virObjectEventStateQueueRemote().
 *
 * Returns: the number of callbacks now registered, or -1 on error.
880 881 882 883
 */
int
virObjectEventStateRegisterID(virConnectPtr conn,
                              virObjectEventStatePtr state,
884
                              const char *key,
885 886
                              virObjectEventCallbackFilter filter,
                              void *filter_opaque,
887
                              virClassPtr klass,
888 889 890 891
                              int eventID,
                              virConnectObjectEventGenericCallback cb,
                              void *opaque,
                              virFreeCallback freecb,
892
                              bool legacy,
893 894
                              int *callbackID,
                              bool serverFilter)
895 896 897
{
    int ret = -1;

898
    virObjectLock(state);
899 900 901 902 903 904

    if ((state->callbacks->count == 0) &&
        (state->timer == -1) &&
        (state->timer = virEventAddTimeout(-1,
                                           virObjectEventTimer,
                                           state,
905
                                           virObjectFreeCallback)) < 0) {
906 907 908 909 910
        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                       _("could not initialize domain event timer"));
        goto cleanup;
    }

911 912 913 914
    /* event loop has one reference, but we need one more for the
     * timer's opaque argument */
    virObjectRef(state);

915
    ret = virObjectEventCallbackListAddID(conn, state->callbacks,
916
                                          key, filter, filter_opaque,
917
                                          klass, eventID,
918
                                          cb, opaque, freecb,
919
                                          legacy, callbackID, serverFilter);
920

921 922
    if (ret < 0)
        virObjectEventStateCleanupTimer(state, false);
923

924
 cleanup:
925
    virObjectUnlock(state);
926 927 928 929 930 931 932 933 934
    return ret;
}


/**
 * virObjectEventStateDeregisterID:
 * @conn: connection to associate with callback
 * @state: object event state
 * @callbackID: ID of the function to remove from event
935
 * @doFreeCb: Allow the calling of a freecb
936 937
 *
 * Unregister the function @callbackID with connection @conn,
938 939 940 941 942
 * from @state, for events. If @doFreeCb is false, then we
 * are being called from a remote call failure path for the
 * Event registration indicating a -1 return to the caller. The
 * caller wouldn't expect us to run their freecb function if it
 * exists, so we cannot do so.
943 944 945 946 947 948
 *
 * Returns: the number of callbacks still registered, or -1 on error
 */
int
virObjectEventStateDeregisterID(virConnectPtr conn,
                                virObjectEventStatePtr state,
949 950
                                int callbackID,
                                bool doFreeCb)
951 952 953
{
    int ret;

954
    virObjectLock(state);
955 956
    if (state->isDispatching)
        ret = virObjectEventCallbackListMarkDeleteID(conn,
957 958
                                                     state->callbacks,
                                                     callbackID);
959
    else
960 961
        ret = virObjectEventCallbackListRemoveID(conn, state->callbacks,
                                                 callbackID, doFreeCb);
962

963
    virObjectEventStateCleanupTimer(state, true);
964

965
    virObjectUnlock(state);
966 967 968
    return ret;
}

969 970 971 972 973 974 975
/**
 * virObjectEventStateCallbackID:
 * @conn: connection associated with callback
 * @state: object event state
 * @klass: the base event class
 * @eventID: the event ID
 * @callback: function registered as a callback
976
 * @remoteID: optional output, containing resulting remote id
977 978
 *
 * Returns the callbackID of @callback, or -1 with an error issued if the
979 980 981
 * function is not currently registered.  This only finds functions
 * registered via virConnectDomainEventRegister, even if modern style
 * virConnectDomainEventRegisterAny also registers the same callback.
982 983 984 985 986 987
 */
int
virObjectEventStateCallbackID(virConnectPtr conn,
                              virObjectEventStatePtr state,
                              virClassPtr klass,
                              int eventID,
988 989
                              virConnectObjectEventGenericCallback callback,
                              int *remoteID)
990 991 992
{
    int ret = -1;

993
    virObjectLock(state);
994
    ret = virObjectEventCallbackLookup(conn, state->callbacks, NULL,
995 996
                                       klass, eventID, callback, true,
                                       remoteID);
997
    virObjectUnlock(state);
998 999

    if (ret < 0)
1000
        virReportError(VIR_ERR_INVALID_ARG,
1001 1002 1003 1004 1005
                       _("event callback function %p not registered"),
                       callback);
    return ret;
}

1006 1007 1008 1009 1010 1011

/**
 * virObjectEventStateEventID:
 * @conn: connection associated with the callback
 * @state: object event state
 * @callbackID: the callback to query
1012
 * @remoteID: optionally output remote ID of the callback
1013
 *
1014 1015 1016 1017
 * Query what event ID type is associated with the callback
 * @callbackID for connection @conn.  If @remoteID is non-null, it
 * will be set to the remote id previously registered with
 * virObjectEventStateSetRemote().
1018 1019 1020 1021 1022 1023
 *
 * Returns 0 on success, -1 on error
 */
int
virObjectEventStateEventID(virConnectPtr conn,
                           virObjectEventStatePtr state,
1024 1025
                           int callbackID,
                           int *remoteID)
1026
{
1027 1028 1029
    int ret = -1;
    size_t i;
    virObjectEventCallbackListPtr cbList = state->callbacks;
1030

1031
    virObjectLock(state);
1032 1033 1034 1035 1036 1037 1038
    for (i = 0; i < cbList->count; i++) {
        virObjectEventCallbackPtr cb = cbList->callbacks[i];

        if (cb->deleted)
            continue;

        if (cb->callbackID == callbackID && cb->conn == conn) {
1039 1040
            if (remoteID)
                *remoteID = cb->remoteID;
1041 1042 1043 1044
            ret = cb->eventID;
            break;
        }
    }
1045
    virObjectUnlock(state);
1046 1047 1048 1049 1050

    if (ret < 0)
        virReportError(VIR_ERR_INVALID_ARG,
                       _("event callback id %d not registered"),
                       callbackID);
1051 1052
    return ret;
}
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074


/**
 * virObjectEventStateSetRemote:
 * @conn: connection associated with the callback
 * @state: object event state
 * @callbackID: the callback to adjust
 * @remoteID: the remote ID to associate with the callback
 *
 * Update @callbackID for connection @conn to record that it is now
 * tied to @remoteID, and will therefore only match events that are
 * sent with virObjectEventStateQueueRemote() with the same remote ID.
 * Silently does nothing if @callbackID is invalid.
 */
void
virObjectEventStateSetRemote(virConnectPtr conn,
                             virObjectEventStatePtr state,
                             int callbackID,
                             int remoteID)
{
    size_t i;

1075
    virObjectLock(state);
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    for (i = 0; i < state->callbacks->count; i++) {
        virObjectEventCallbackPtr cb = state->callbacks->callbacks[i];

        if (cb->deleted)
            continue;

        if (cb->callbackID == callbackID && cb->conn == conn) {
            cb->remoteID = remoteID;
            break;
        }
    }
1087
    virObjectUnlock(state);
1088
}