virkeepalive.c 9.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * virkeepalive.c: keepalive handling
 *
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
17
 * License along with this library.  If not, see
O
Osier Yang 已提交
18
 * <http://www.gnu.org/licenses/>.
19 20 21 22 23 24
 *
 * Author: Jiri Denemark <jdenemar@redhat.com>
 */

#include <config.h>

25
#include "viralloc.h"
26
#include "virthread.h"
27
#include "virfile.h"
28
#include "virlog.h"
29
#include "virutil.h"
30
#include "virerror.h"
31 32 33 34 35 36 37
#include "virnetsocket.h"
#include "virkeepaliveprotocol.h"
#include "virkeepalive.h"

#define VIR_FROM_THIS VIR_FROM_RPC

struct _virKeepAlive {
38
    virObjectLockable parent;
39 40 41 42 43

    int interval;
    unsigned int count;
    unsigned int countToDeath;
    time_t lastPacketReceived;
44
    time_t intervalStart;
45 46 47 48 49 50 51 52 53
    int timer;

    virKeepAliveSendFunc sendCB;
    virKeepAliveDeadFunc deadCB;
    virKeepAliveFreeFunc freeCB;
    void *client;
};


54 55 56 57 58
static virClassPtr virKeepAliveClass;
static void virKeepAliveDispose(void *obj);

static int virKeepAliveOnceInit(void)
{
59
    if (!(virKeepAliveClass = virClassNew(virClassForObjectLockable(),
60
                                          "virKeepAlive",
61 62 63 64 65 66 67 68 69
                                          sizeof(virKeepAlive),
                                          virKeepAliveDispose)))
        return -1;

    return 0;
}

VIR_ONCE_GLOBAL_INIT(virKeepAlive)

70
static virNetMessagePtr
71
virKeepAliveMessage(virKeepAlivePtr ka, int proc)
72 73
{
    virNetMessagePtr msg;
74
    const char *procstr = NULL;
75

76 77 78 79 80 81 82 83 84
    switch (proc) {
    case KEEPALIVE_PROC_PING:
        procstr = "request";
        break;
    case KEEPALIVE_PROC_PONG:
        procstr = "response";
        break;
    default:
        VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
85
        return NULL;
86 87 88 89
    }

    if (!(msg = virNetMessageNew(false)))
        goto error;
90 91 92 93 94 95 96 97 98

    msg->header.prog = KEEPALIVE_PROGRAM;
    msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
    msg->header.type = VIR_NET_MESSAGE;
    msg->header.proc = proc;

    if (virNetMessageEncodeHeader(msg) < 0 ||
        virNetMessageEncodePayloadEmpty(msg) < 0) {
        virNetMessageFree(msg);
99
        goto error;
100 101
    }

102
    VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
103 104 105 106
    PROBE(RPC_KEEPALIVE_SEND,
          "ka=%p client=%p prog=%d vers=%d proc=%d",
          ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);

107
    return msg;
108

109 110 111
error:
    VIR_WARN("Failed to generate keepalive %s", procstr);
    return NULL;
112 113 114
}


115 116 117
static bool
virKeepAliveTimerInternal(virKeepAlivePtr ka,
                          virNetMessagePtr *msg)
118 119 120
{
    time_t now = time(NULL);

121 122 123 124 125
    if (ka->interval <= 0 || ka->intervalStart == 0)
        return false;

    if (now - ka->intervalStart < ka->interval) {
        int timeout = ka->interval - (now - ka->intervalStart);
126 127 128
        virEventUpdateTimeout(ka->timer, timeout * 1000);
        return false;
    }
129 130 131 132 133 134 135 136 137 138 139 140 141

    PROBE(RPC_KEEPALIVE_TIMEOUT,
          "ka=%p client=%p countToDeath=%d idle=%d",
          ka, ka->client, ka->countToDeath,
          (int) (now - ka->lastPacketReceived));


    if (ka->countToDeath == 0) {
        VIR_WARN("No response from client %p after %d keepalive messages in"
                 " %d seconds",
                 ka->client,
                 ka->count,
                 (int) (now - ka->lastPacketReceived));
142 143 144
        return true;
    } else {
        ka->countToDeath--;
145
        ka->intervalStart = now;
146
        *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
147 148 149 150 151 152 153 154 155 156 157 158
        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
        return false;
    }
}


static void
virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
{
    virKeepAlivePtr ka = opaque;
    virNetMessagePtr msg = NULL;
    bool dead;
159
    void *client;
160

161
    virObjectLock(ka);
162

163
    client = ka->client;
164 165
    dead = virKeepAliveTimerInternal(ka, &msg);

166 167
    if (!dead && !msg)
        goto cleanup;
168

169
    virObjectRef(ka);
170
    virObjectUnlock(ka);
171

172 173 174 175 176
    if (dead) {
        ka->deadCB(client);
    } else if (ka->sendCB(client, msg) < 0) {
        VIR_WARN("Failed to send keepalive request to client %p", client);
        virNetMessageFree(msg);
177 178
    }

179
    virObjectLock(ka);
180
    virObjectUnref(ka);
181

182
cleanup:
183
    virObjectUnlock(ka);
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
}


virKeepAlivePtr
virKeepAliveNew(int interval,
                unsigned int count,
                void *client,
                virKeepAliveSendFunc sendCB,
                virKeepAliveDeadFunc deadCB,
                virKeepAliveFreeFunc freeCB)
{
    virKeepAlivePtr ka;

    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);

199 200 201
    if (virKeepAliveInitialize() < 0)
        return NULL;

202
    if (!(ka = virObjectLockableNew(virKeepAliveClass)))
203 204 205 206 207 208 209 210 211 212 213 214
        return NULL;

    ka->interval = interval;
    ka->count = count;
    ka->countToDeath = count;
    ka->timer = -1;
    ka->client = client;
    ka->sendCB = sendCB;
    ka->deadCB = deadCB;
    ka->freeCB = freeCB;

    PROBE(RPC_KEEPALIVE_NEW,
215 216
          "ka=%p client=%p",
          ka, ka->client);
217 218 219 220 221 222

    return ka;
}


void
223
virKeepAliveDispose(void *obj)
224
{
225
    virKeepAlivePtr ka = obj;
226 227 228 229 230 231 232 233 234 235 236 237 238

    ka->freeCB(ka->client);
}


int
virKeepAliveStart(virKeepAlivePtr ka,
                  int interval,
                  unsigned int count)
{
    int ret = -1;
    time_t delay;
    int timeout;
239
    time_t now;
240

241
    virObjectLock(ka);
242 243 244 245 246 247 248 249 250

    if (ka->timer >= 0) {
        VIR_DEBUG("Keepalive messages already enabled");
        ret = 0;
        goto cleanup;
    }

    if (interval > 0) {
        if (ka->interval > 0) {
251 252
            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                           _("keepalive interval already set"));
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
            goto cleanup;
        }
        ka->interval = interval;
        ka->count = count;
        ka->countToDeath = count;
    }

    if (ka->interval <= 0) {
        VIR_DEBUG("Keepalive messages disabled by configuration");
        ret = 0;
        goto cleanup;
    }

    PROBE(RPC_KEEPALIVE_START,
          "ka=%p client=%p interval=%d count=%u",
          ka, ka->client, interval, count);

270 271
    now = time(NULL);
    delay = now - ka->lastPacketReceived;
272 273 274 275
    if (delay > ka->interval)
        timeout = 0;
    else
        timeout = ka->interval - delay;
276
    ka->intervalStart = now - (ka->interval - timeout);
277
    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
278
                                   ka, virObjectFreeCallback);
279 280 281 282
    if (ka->timer < 0)
        goto cleanup;

    /* the timer now has another reference to this object */
283
    virObjectRef(ka);
284 285 286
    ret = 0;

cleanup:
287
    virObjectUnlock(ka);
288 289 290 291
    return ret;
}


292 293
void
virKeepAliveStop(virKeepAlivePtr ka)
294
{
295
    virObjectLock(ka);
296 297

    PROBE(RPC_KEEPALIVE_STOP,
298 299
          "ka=%p client=%p",
          ka, ka->client);
300 301 302 303 304 305

    if (ka->timer > 0) {
        virEventRemoveTimeout(ka->timer);
        ka->timer = -1;
    }

306
    virObjectUnlock(ka);
307 308 309
}


310 311 312 313 314 315 316 317
int
virKeepAliveTimeout(virKeepAlivePtr ka)
{
    int timeout;

    if (!ka)
        return -1;

318
    virObjectLock(ka);
319 320 321 322 323 324 325 326 327

    if (ka->interval <= 0 || ka->intervalStart == 0) {
        timeout = -1;
    } else {
        timeout = ka->interval - (time(NULL) - ka->intervalStart);
        if (timeout < 0)
            timeout = 0;
    }

328
    virObjectUnlock(ka);
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346

    if (timeout < 0)
        return -1;
    else
        return timeout * 1000;
}


bool
virKeepAliveTrigger(virKeepAlivePtr ka,
                    virNetMessagePtr *msg)
{
    bool dead;

    *msg = NULL;
    if (!ka)
        return false;

347
    virObjectLock(ka);
348
    dead = virKeepAliveTimerInternal(ka, msg);
349
    virObjectUnlock(ka);
350 351 352 353 354

    return dead;
}


355 356
bool
virKeepAliveCheckMessage(virKeepAlivePtr ka,
357 358
                         virNetMessagePtr msg,
                         virNetMessagePtr *response)
359 360 361 362 363 364
{
    bool ret = false;

    VIR_DEBUG("ka=%p, client=%p, msg=%p",
              ka, ka ? ka->client : "(null)", msg);

365
    *response = NULL;
366 367 368
    if (!ka)
        return false;

369
    virObjectLock(ka);
370 371

    ka->countToDeath = ka->count;
372
    ka->lastPacketReceived = ka->intervalStart = time(NULL);
373 374 375 376 377 378 379 380 381 382 383 384

    if (msg->header.prog == KEEPALIVE_PROGRAM &&
        msg->header.vers == KEEPALIVE_PROTOCOL_VERSION &&
        msg->header.type == VIR_NET_MESSAGE) {
        PROBE(RPC_KEEPALIVE_RECEIVED,
              "ka=%p client=%p prog=%d vers=%d proc=%d",
              ka, ka->client, msg->header.prog,
              msg->header.vers, msg->header.proc);
        ret = true;
        switch (msg->header.proc) {
        case KEEPALIVE_PROC_PING:
            VIR_DEBUG("Got keepalive request from client %p", ka->client);
385
            *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
            break;

        case KEEPALIVE_PROC_PONG:
            VIR_DEBUG("Got keepalive response from client %p", ka->client);
            break;

        default:
            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
                      msg->header.proc, ka->client);
        }
    }

    if (ka->timer >= 0)
        virEventUpdateTimeout(ka->timer, ka->interval * 1000);

401
    virObjectUnlock(ka);
402 403 404

    return ret;
}