virkeepalive.c 9.9 KB
Newer Older
1 2 3
/*
 * virkeepalive.c: keepalive handling
 *
4
 * Copyright (C) 2011-2013 Red Hat, Inc.
5 6 7 8 9 10 11 12 13 14 15 16
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
17
 * License along with this library.  If not, see
O
Osier Yang 已提交
18
 * <http://www.gnu.org/licenses/>.
19 20 21 22 23 24
 *
 * Author: Jiri Denemark <jdenemar@redhat.com>
 */

#include <config.h>

25
#include "viralloc.h"
26
#include "virthread.h"
27
#include "virfile.h"
28
#include "virlog.h"
29
#include "virerror.h"
30 31 32
#include "virnetsocket.h"
#include "virkeepaliveprotocol.h"
#include "virkeepalive.h"
33
#include "virprobe.h"
34 35 36

#define VIR_FROM_THIS VIR_FROM_RPC

37 38
VIR_LOG_INIT("rpc.keepalive");

39
struct _virKeepAlive {
40
    virObjectLockable parent;
41 42 43 44 45

    int interval;
    unsigned int count;
    unsigned int countToDeath;
    time_t lastPacketReceived;
46
    time_t intervalStart;
47 48 49 50 51 52 53 54 55
    int timer;

    virKeepAliveSendFunc sendCB;
    virKeepAliveDeadFunc deadCB;
    virKeepAliveFreeFunc freeCB;
    void *client;
};


56 57 58 59 60
static virClassPtr virKeepAliveClass;
static void virKeepAliveDispose(void *obj);

static int virKeepAliveOnceInit(void)
{
61
    if (!(virKeepAliveClass = virClassNew(virClassForObjectLockable(),
62
                                          "virKeepAlive",
63 64 65 66 67 68 69 70 71
                                          sizeof(virKeepAlive),
                                          virKeepAliveDispose)))
        return -1;

    return 0;
}

VIR_ONCE_GLOBAL_INIT(virKeepAlive)

72
static virNetMessagePtr
73
virKeepAliveMessage(virKeepAlivePtr ka, int proc)
74 75
{
    virNetMessagePtr msg;
76
    const char *procstr = NULL;
77

78 79 80 81 82 83 84 85 86
    switch (proc) {
    case KEEPALIVE_PROC_PING:
        procstr = "request";
        break;
    case KEEPALIVE_PROC_PONG:
        procstr = "response";
        break;
    default:
        VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
87
        return NULL;
88 89 90 91
    }

    if (!(msg = virNetMessageNew(false)))
        goto error;
92 93 94 95 96 97 98 99 100

    msg->header.prog = KEEPALIVE_PROGRAM;
    msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
    msg->header.type = VIR_NET_MESSAGE;
    msg->header.proc = proc;

    if (virNetMessageEncodeHeader(msg) < 0 ||
        virNetMessageEncodePayloadEmpty(msg) < 0) {
        virNetMessageFree(msg);
101
        goto error;
102 103
    }

104
    VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
105 106 107 108
    PROBE(RPC_KEEPALIVE_SEND,
          "ka=%p client=%p prog=%d vers=%d proc=%d",
          ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);

109
    return msg;
110

111
 error:
112 113
    VIR_WARN("Failed to generate keepalive %s", procstr);
    return NULL;
114 115 116
}


117 118 119
static bool
virKeepAliveTimerInternal(virKeepAlivePtr ka,
                          virNetMessagePtr *msg)
120 121
{
    time_t now = time(NULL);
122
    int timeval;
123

124 125 126 127
    if (ka->interval <= 0 || ka->intervalStart == 0)
        return false;

    if (now - ka->intervalStart < ka->interval) {
128 129
        timeval = ka->interval - (now - ka->intervalStart);
        virEventUpdateTimeout(ka->timer, timeval * 1000);
130 131
        return false;
    }
132

133
    timeval = now - ka->lastPacketReceived;
134 135
    PROBE(RPC_KEEPALIVE_TIMEOUT,
          "ka=%p client=%p countToDeath=%d idle=%d",
136
          ka, ka->client, ka->countToDeath, timeval);
137 138

    if (ka->countToDeath == 0) {
139 140 141 142 143
        VIR_DEBUG("No response from client %p after %d keepalive messages "
                  "in %d seconds",
                  ka->client, ka->count, timeval);
        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                       _("connection closed due to keepalive timeout"));
144 145 146
        return true;
    } else {
        ka->countToDeath--;
147
        ka->intervalStart = now;
148
        *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
149 150 151 152 153 154 155 156 157 158 159 160
        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
        return false;
    }
}


static void
virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
{
    virKeepAlivePtr ka = opaque;
    virNetMessagePtr msg = NULL;
    bool dead;
161
    void *client;
162

163
    virObjectLock(ka);
164

165
    client = ka->client;
166 167
    dead = virKeepAliveTimerInternal(ka, &msg);

168 169
    if (!dead && !msg)
        goto cleanup;
170

171
    virObjectRef(ka);
172
    virObjectUnlock(ka);
173

174 175 176 177 178
    if (dead) {
        ka->deadCB(client);
    } else if (ka->sendCB(client, msg) < 0) {
        VIR_WARN("Failed to send keepalive request to client %p", client);
        virNetMessageFree(msg);
179 180
    }

181
    virObjectLock(ka);
182
    virObjectUnref(ka);
183

184
 cleanup:
185
    virObjectUnlock(ka);
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
}


virKeepAlivePtr
virKeepAliveNew(int interval,
                unsigned int count,
                void *client,
                virKeepAliveSendFunc sendCB,
                virKeepAliveDeadFunc deadCB,
                virKeepAliveFreeFunc freeCB)
{
    virKeepAlivePtr ka;

    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);

201 202 203
    if (virKeepAliveInitialize() < 0)
        return NULL;

204
    if (!(ka = virObjectLockableNew(virKeepAliveClass)))
205 206 207 208 209 210 211 212 213 214 215 216
        return NULL;

    ka->interval = interval;
    ka->count = count;
    ka->countToDeath = count;
    ka->timer = -1;
    ka->client = client;
    ka->sendCB = sendCB;
    ka->deadCB = deadCB;
    ka->freeCB = freeCB;

    PROBE(RPC_KEEPALIVE_NEW,
217 218
          "ka=%p client=%p",
          ka, ka->client);
219 220 221 222 223 224

    return ka;
}


void
225
virKeepAliveDispose(void *obj)
226
{
227
    virKeepAlivePtr ka = obj;
228

229 230 231
    PROBE(RPC_KEEPALIVE_DISPOSE,
          "ka=%p", ka);

232 233 234 235 236 237 238 239 240 241 242 243
    ka->freeCB(ka->client);
}


int
virKeepAliveStart(virKeepAlivePtr ka,
                  int interval,
                  unsigned int count)
{
    int ret = -1;
    time_t delay;
    int timeout;
244
    time_t now;
245

246
    virObjectLock(ka);
247 248 249 250 251 252 253 254 255

    if (ka->timer >= 0) {
        VIR_DEBUG("Keepalive messages already enabled");
        ret = 0;
        goto cleanup;
    }

    if (interval > 0) {
        if (ka->interval > 0) {
256 257
            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                           _("keepalive interval already set"));
258 259
            goto cleanup;
        }
260 261 262 263 264 265
        /* Guard against overflow */
        if (interval > INT_MAX / 1000) {
            virReportError(VIR_ERR_INTERNAL_ERROR,
                           _("keepalive interval %d too large"), interval);
            goto cleanup;
        }
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
        ka->interval = interval;
        ka->count = count;
        ka->countToDeath = count;
    }

    if (ka->interval <= 0) {
        VIR_DEBUG("Keepalive messages disabled by configuration");
        ret = 0;
        goto cleanup;
    }

    PROBE(RPC_KEEPALIVE_START,
          "ka=%p client=%p interval=%d count=%u",
          ka, ka->client, interval, count);

281 282
    now = time(NULL);
    delay = now - ka->lastPacketReceived;
283 284 285 286
    if (delay > ka->interval)
        timeout = 0;
    else
        timeout = ka->interval - delay;
287
    ka->intervalStart = now - (ka->interval - timeout);
288
    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
289
                                   ka, virObjectFreeCallback);
290 291 292 293
    if (ka->timer < 0)
        goto cleanup;

    /* the timer now has another reference to this object */
294
    virObjectRef(ka);
295 296
    ret = 0;

297
 cleanup:
298
    virObjectUnlock(ka);
299 300 301 302
    return ret;
}


303 304
void
virKeepAliveStop(virKeepAlivePtr ka)
305
{
306
    virObjectLock(ka);
307 308

    PROBE(RPC_KEEPALIVE_STOP,
309 310
          "ka=%p client=%p",
          ka, ka->client);
311 312 313 314 315 316

    if (ka->timer > 0) {
        virEventRemoveTimeout(ka->timer);
        ka->timer = -1;
    }

317
    virObjectUnlock(ka);
318 319 320
}


321 322 323 324 325 326 327 328
int
virKeepAliveTimeout(virKeepAlivePtr ka)
{
    int timeout;

    if (!ka)
        return -1;

329
    virObjectLock(ka);
330 331 332 333 334 335 336

    if (ka->interval <= 0 || ka->intervalStart == 0) {
        timeout = -1;
    } else {
        timeout = ka->interval - (time(NULL) - ka->intervalStart);
        if (timeout < 0)
            timeout = 0;
337 338 339
        /* Guard against overflow */
        if (timeout > INT_MAX / 1000)
            timeout = INT_MAX / 1000;
340 341
    }

342
    virObjectUnlock(ka);
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360

    if (timeout < 0)
        return -1;
    else
        return timeout * 1000;
}


bool
virKeepAliveTrigger(virKeepAlivePtr ka,
                    virNetMessagePtr *msg)
{
    bool dead;

    *msg = NULL;
    if (!ka)
        return false;

361
    virObjectLock(ka);
362
    dead = virKeepAliveTimerInternal(ka, msg);
363
    virObjectUnlock(ka);
364 365 366 367 368

    return dead;
}


369 370
bool
virKeepAliveCheckMessage(virKeepAlivePtr ka,
371 372
                         virNetMessagePtr msg,
                         virNetMessagePtr *response)
373 374 375 376 377 378
{
    bool ret = false;

    VIR_DEBUG("ka=%p, client=%p, msg=%p",
              ka, ka ? ka->client : "(null)", msg);

379
    *response = NULL;
380 381 382
    if (!ka)
        return false;

383
    virObjectLock(ka);
384 385

    ka->countToDeath = ka->count;
386
    ka->lastPacketReceived = ka->intervalStart = time(NULL);
387 388 389 390 391 392 393 394 395 396 397 398

    if (msg->header.prog == KEEPALIVE_PROGRAM &&
        msg->header.vers == KEEPALIVE_PROTOCOL_VERSION &&
        msg->header.type == VIR_NET_MESSAGE) {
        PROBE(RPC_KEEPALIVE_RECEIVED,
              "ka=%p client=%p prog=%d vers=%d proc=%d",
              ka, ka->client, msg->header.prog,
              msg->header.vers, msg->header.proc);
        ret = true;
        switch (msg->header.proc) {
        case KEEPALIVE_PROC_PING:
            VIR_DEBUG("Got keepalive request from client %p", ka->client);
399
            *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
            break;

        case KEEPALIVE_PROC_PONG:
            VIR_DEBUG("Got keepalive response from client %p", ka->client);
            break;

        default:
            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
                      msg->header.proc, ka->client);
        }
    }

    if (ka->timer >= 0)
        virEventUpdateTimeout(ka->timer, ka->interval * 1000);

415
    virObjectUnlock(ka);
416 417 418

    return ret;
}