/*
 * virkeepalive.c: keepalive handling
 *
 * Copyright (C) 2011-2013 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library.  If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * Author: Jiri Denemark <jdenemar@redhat.com>
 */

#include <config.h>

25
#include "viralloc.h"
26
#include "virthread.h"
27
#include "virfile.h"
28
#include "virlog.h"
29
#include "virerror.h"
30 31 32
#include "virnetsocket.h"
#include "virkeepaliveprotocol.h"
#include "virkeepalive.h"
33
#include "virprobe.h"
34 35 36

#define VIR_FROM_THIS VIR_FROM_RPC

37 38
VIR_LOG_INIT("rpc.keepalive");

39
struct _virKeepAlive {
40
    virObjectLockable parent;
41 42 43 44 45

    int interval;
    unsigned int count;
    unsigned int countToDeath;
    time_t lastPacketReceived;
46
    time_t intervalStart;
47 48 49 50 51 52 53 54 55
    int timer;

    virKeepAliveSendFunc sendCB;
    virKeepAliveDeadFunc deadCB;
    virKeepAliveFreeFunc freeCB;
    void *client;
};


56 57 58 59 60
static virClassPtr virKeepAliveClass;
static void virKeepAliveDispose(void *obj);

static int virKeepAliveOnceInit(void)
{
61
    if (!(virKeepAliveClass = virClassNew(virClassForObjectLockable(),
62
                                          "virKeepAlive",
63 64 65 66 67 68 69 70 71
                                          sizeof(virKeepAlive),
                                          virKeepAliveDispose)))
        return -1;

    return 0;
}

VIR_ONCE_GLOBAL_INIT(virKeepAlive)

72
static virNetMessagePtr
73
virKeepAliveMessage(virKeepAlivePtr ka, int proc)
74 75
{
    virNetMessagePtr msg;
76
    const char *procstr = NULL;
77

78 79 80 81 82 83 84 85 86
    switch (proc) {
    case KEEPALIVE_PROC_PING:
        procstr = "request";
        break;
    case KEEPALIVE_PROC_PONG:
        procstr = "response";
        break;
    default:
        VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
87
        return NULL;
88 89 90 91
    }

    if (!(msg = virNetMessageNew(false)))
        goto error;
92 93 94 95 96 97 98 99 100

    msg->header.prog = KEEPALIVE_PROGRAM;
    msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
    msg->header.type = VIR_NET_MESSAGE;
    msg->header.proc = proc;

    if (virNetMessageEncodeHeader(msg) < 0 ||
        virNetMessageEncodePayloadEmpty(msg) < 0) {
        virNetMessageFree(msg);
101
        goto error;
102 103
    }

104
    VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
105 106 107 108
    PROBE(RPC_KEEPALIVE_SEND,
          "ka=%p client=%p prog=%d vers=%d proc=%d",
          ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);

109
    return msg;
110

111
 error:
112 113
    VIR_WARN("Failed to generate keepalive %s", procstr);
    return NULL;
114 115 116
}


117 118 119
/*
 * Core of the keepalive timer; the caller must hold the lock on @ka.
 *
 * @ka:  keepalive object
 * @msg: set to a new keepalive request when one has to be sent; left
 *       untouched otherwise
 *
 * Returns true when the interval expired with no attempts left (an error
 * is reported), false in every other case.
 */
static bool
virKeepAliveTimerInternal(virKeepAlivePtr ka,
                          virNetMessagePtr *msg)
{
    time_t now = time(NULL);
    int idle;

    /* Keepalive disabled, or no interval currently running. */
    if (ka->interval <= 0 || ka->intervalStart == 0)
        return false;

    if (now - ka->intervalStart < ka->interval) {
        /* Woken up early: re-arm the timer for the rest of the interval. */
        int remaining = ka->interval - (now - ka->intervalStart);

        virEventUpdateTimeout(ka->timer, remaining * 1000);
        return false;
    }

    idle = now - ka->lastPacketReceived;
    PROBE(RPC_KEEPALIVE_TIMEOUT,
          "ka=%p client=%p countToDeath=%d idle=%d",
          ka, ka->client, ka->countToDeath, idle);

    if (ka->countToDeath == 0) {
        VIR_DEBUG("No response from client %p after %d keepalive messages "
                  "in %d seconds",
                  ka->client, ka->count, idle);
        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                       _("connection closed due to keepalive timeout"));
        return true;
    }

    /* Use up one attempt and begin a fresh interval with a new request. */
    ka->countToDeath--;
    ka->intervalStart = now;
    *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
    virEventUpdateTimeout(ka->timer, ka->interval * 1000);
    return false;
}


/*
 * Event loop timer callback.
 *
 * @timer:  timer id (unused)
 * @opaque: the virKeepAlive object the timer was registered for
 *
 * Runs the timer logic under the object lock, then invokes deadCB or
 * sendCB with the lock released.  An extra reference is held on the
 * object for the whole duration of the callback.
 */
static void
virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
{
    virKeepAlivePtr ka = opaque;
    virNetMessagePtr request = NULL;
    void *client;
    bool expired;

    virObjectRef(ka);

    virObjectLock(ka);
    client = ka->client;
    expired = virKeepAliveTimerInternal(ka, &request);
    virObjectUnlock(ka);

    if (expired) {
        ka->deadCB(client);
    } else if (request && ka->sendCB(client, request) < 0) {
        /* The message was not handed over, so free it here. */
        VIR_WARN("Failed to send keepalive request to client %p", client);
        virNetMessageFree(request);
    }

    virObjectUnref(ka);
}


/*
 * Allocate a keepalive object for @client.  No timer is registered here;
 * the timer field starts out as -1.
 *
 * @interval: initial keepalive interval
 * @count:    initial number of keepalive requests before giving up
 * @client:   opaque pointer passed to the callbacks
 * @sendCB:   callback used to send a keepalive message
 * @deadCB:   callback invoked on keepalive timeout
 * @freeCB:   callback invoked with @client when the object is disposed
 *
 * Returns the new object, or NULL if class initialization or object
 * allocation failed.
 */
virKeepAlivePtr
virKeepAliveNew(int interval,
                unsigned int count,
                void *client,
                virKeepAliveSendFunc sendCB,
                virKeepAliveDeadFunc deadCB,
                virKeepAliveFreeFunc freeCB)
{
    virKeepAlivePtr obj;

    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);

    if (virKeepAliveInitialize() < 0)
        return NULL;

    obj = virObjectLockableNew(virKeepAliveClass);
    if (!obj)
        return NULL;

    obj->client = client;
    obj->sendCB = sendCB;
    obj->deadCB = deadCB;
    obj->freeCB = freeCB;

    obj->interval = interval;
    obj->count = count;
    obj->countToDeath = count;
    obj->timer = -1;

    PROBE(RPC_KEEPALIVE_NEW,
          "ka=%p client=%p",
          obj, obj->client);

    return obj;
}


void
222
virKeepAliveDispose(void *obj)
223
{
224
    virKeepAlivePtr ka = obj;
225

226 227 228
    PROBE(RPC_KEEPALIVE_DISPOSE,
          "ka=%p", ka);

229 230 231 232 233 234 235 236 237 238 239 240
    ka->freeCB(ka->client);
}


/*
 * Start sending keepalive messages by registering the event loop timer.
 *
 * @ka:       keepalive object
 * @interval: interval to use; values <= 0 keep the stored interval
 * @count:    attempt count stored together with a positive @interval
 *
 * Returns 0 if the timer was registered, was already registered, or
 * keepalive is disabled (stored interval <= 0).  Returns -1 if a positive
 * @interval is passed while one is already stored, if @interval is too
 * large to be converted to milliseconds, or if the timer could not be
 * added.
 */
int
virKeepAliveStart(virKeepAlivePtr ka,
                  int interval,
                  unsigned int count)
{
    int rc = -1;
    int timeout;
    time_t now;
    time_t idle;

    virObjectLock(ka);

    if (ka->timer >= 0) {
        VIR_DEBUG("Keepalive messages already enabled");
        rc = 0;
        goto cleanup;
    }

    if (interval > 0) {
        if (ka->interval > 0) {
            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                           _("keepalive interval already set"));
            goto cleanup;
        }

        /* The interval is multiplied by 1000 below; guard against overflow */
        if (interval > INT_MAX / 1000) {
            virReportError(VIR_ERR_INTERNAL_ERROR,
                           _("keepalive interval %d too large"), interval);
            goto cleanup;
        }

        ka->interval = interval;
        ka->count = count;
        ka->countToDeath = count;
    }

    if (ka->interval <= 0) {
        VIR_DEBUG("Keepalive messages disabled by configuration");
        rc = 0;
        goto cleanup;
    }

    PROBE(RPC_KEEPALIVE_START,
          "ka=%p client=%p interval=%d count=%u",
          ka, ka->client, interval, count);

    /* Time already spent idle counts against the first interval. */
    now = time(NULL);
    idle = now - ka->lastPacketReceived;
    timeout = (idle > ka->interval) ? 0 : ka->interval - idle;

    /* Backdate the interval start so that it ends exactly at the timeout. */
    ka->intervalStart = now - (ka->interval - timeout);

    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
                                   ka, virObjectFreeCallback);
    if (ka->timer < 0)
        goto cleanup;

    /* the timer now has another reference to this object */
    virObjectRef(ka);
    rc = 0;

 cleanup:
    virObjectUnlock(ka);
    return rc;
}


300 301
/*
 * Stop sending keepalive messages by removing the event loop timer, if
 * one is registered.
 *
 * @ka: keepalive object
 *
 * virKeepAliveNew() initializes the timer field to -1, and both
 * virKeepAliveStart() and virKeepAliveCheckMessage() treat any value
 * >= 0 as a registered timer.  The same test is used here so that a
 * timer with id 0 is removed as well.
 */
void
virKeepAliveStop(virKeepAlivePtr ka)
{
    virObjectLock(ka);

    PROBE(RPC_KEEPALIVE_STOP,
          "ka=%p client=%p",
          ka, ka->client);

    if (ka->timer >= 0) {
        virEventRemoveTimeout(ka->timer);
        ka->timer = -1;
    }

    virObjectUnlock(ka);
}


318 319 320 321 322 323 324 325
/*
 * Compute the time left until the current keepalive interval ends.
 *
 * @ka: keepalive object, may be NULL
 *
 * Returns the remaining time in milliseconds (0 if the interval has
 * already ended), or -1 when @ka is NULL, keepalive is disabled, or no
 * interval is running.
 */
int
virKeepAliveTimeout(virKeepAlivePtr ka)
{
    int remaining = -1;

    if (!ka)
        return -1;

    virObjectLock(ka);

    if (ka->interval > 0 && ka->intervalStart != 0) {
        remaining = ka->interval - (time(NULL) - ka->intervalStart);
        if (remaining < 0)
            remaining = 0;
        /* Clamp so the conversion to milliseconds cannot overflow */
        if (remaining > INT_MAX / 1000)
            remaining = INT_MAX / 1000;
    }

    virObjectUnlock(ka);

    return remaining < 0 ? -1 : remaining * 1000;
}


/*
 * Run the keepalive timer logic on demand instead of from the event loop.
 *
 * @ka:  keepalive object, may be NULL
 * @msg: always reset to NULL first; set to a keepalive request when one
 *       has to be sent
 *
 * Returns true if the keepalive timeout was reached, false otherwise
 * (including when @ka is NULL).
 */
bool
virKeepAliveTrigger(virKeepAlivePtr ka,
                    virNetMessagePtr *msg)
{
    bool expired = false;

    *msg = NULL;

    if (ka) {
        virObjectLock(ka);
        expired = virKeepAliveTimerInternal(ka, msg);
        virObjectUnlock(ka);
    }

    return expired;
}


366 367
/*
 * Account for an incoming message and handle it if it is a keepalive one.
 *
 * @ka:       keepalive object, may be NULL
 * @msg:      the received message; dereferenced whenever @ka is non-NULL
 * @response: always reset to NULL first; set to a keepalive response when
 *            @msg is a keepalive request (stays NULL if the response
 *            could not be generated)
 *
 * Every message resets the attempt counter and restarts the interval,
 * whether or not it is a keepalive message.
 *
 * Returns true if @msg belongs to the keepalive program (matching
 * program, version and message type), false otherwise or if @ka is NULL.
 */
bool
virKeepAliveCheckMessage(virKeepAlivePtr ka,
                         virNetMessagePtr msg,
                         virNetMessagePtr *response)
{
    bool ret = false;

    /* %p needs a pointer value: log NULL when there is no keepalive
     * object rather than the address of a "(null)" string literal. */
    VIR_DEBUG("ka=%p, client=%p, msg=%p",
              ka, ka ? ka->client : NULL, msg);

    *response = NULL;
    if (!ka)
        return false;

    virObjectLock(ka);

    ka->countToDeath = ka->count;
    ka->lastPacketReceived = ka->intervalStart = time(NULL);

    if (msg->header.prog == KEEPALIVE_PROGRAM &&
        msg->header.vers == KEEPALIVE_PROTOCOL_VERSION &&
        msg->header.type == VIR_NET_MESSAGE) {
        PROBE(RPC_KEEPALIVE_RECEIVED,
              "ka=%p client=%p prog=%d vers=%d proc=%d",
              ka, ka->client, msg->header.prog,
              msg->header.vers, msg->header.proc);
        ret = true;
        switch (msg->header.proc) {
        case KEEPALIVE_PROC_PING:
            VIR_DEBUG("Got keepalive request from client %p", ka->client);
            *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
            break;

        case KEEPALIVE_PROC_PONG:
            VIR_DEBUG("Got keepalive response from client %p", ka->client);
            break;

        default:
            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
                      msg->header.proc, ka->client);
            break;
        }
    }

    /* Restart the full interval if the timer is registered. */
    if (ka->timer >= 0)
        virEventUpdateTimeout(ka->timer, ka->interval * 1000);

    virObjectUnlock(ka);

    return ret;
}