colo-compare.c 34.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
 * (a.k.a. Fault Tolerance or Continuous Replication)
 *
 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
 * Copyright (c) 2016 FUJITSU LIMITED
 * Copyright (c) 2016 Intel Corporation
 *
 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later.  See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/error-report.h"
17
#include "trace.h"
18 19 20
#include "qemu-common.h"
#include "qapi/error.h"
#include "net/net.h"
21
#include "net/eth.h"
22 23 24 25
#include "qom/object_interfaces.h"
#include "qemu/iov.h"
#include "qom/object.h"
#include "net/queue.h"
26
#include "chardev/char-fe.h"
27
#include "qemu/sockets.h"
28
#include "colo.h"
29
#include "sysemu/iothread.h"
30 31
#include "net/colo-compare.h"
#include "migration/colo.h"
32
#include "migration/migration.h"
33 34 35 36 37

#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

38 39 40
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

41 42 43
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

44
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
45 46
#define MAX_QUEUE_SIZE 1024

47 48 49
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

50 51 52
/* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000

53 54 55 56
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;

57
/*
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
79 80 81 82 83 84
typedef struct CompareState {
    Object parent;

    char *pri_indev;
    char *sec_indev;
    char *outdev;
85 86 87
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
88 89
    SocketReadState pri_rs;
    SocketReadState sec_rs;
90
    bool vnet_hdr;
91

92 93 94
    /*
     * Record the connection that through the NIC
     * Element type: Connection
95 96
     */
    GQueue conn_list;
97
    /* Record the connection without repetition */
98
    GHashTable *connection_track_table;
99

100
    IOThread *iothread;
101
    GMainContext *worker_context;
102
    QEMUTimer *packet_check_timer;
103 104 105 106 107

    QEMUBH *event_bh;
    enum colo_event event;

    QTAILQ_ENTRY(CompareState) next;
108 109 110 111 112 113
} CompareState;

/* Class structure for colo-compare; it only embeds the parent ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

114 115 116 117 118
/*
 * Identifies which input a packet arrived on; passed as the 'mode'
 * argument of packet_enqueue().
 */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

119 120 121 122 123 124
/*
 * Fire every notifier registered on colo_compare_notifiers, passing the
 * current migration state object as the notification data.
 */
static void colo_compare_inconsistency_notify(void)
{
    notifier_list_notify(&colo_compare_notifiers,
                migrate_get_current());
}

125
/* Forward declaration; the definition follows the comparison helpers. */
static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len);
129

130 131 132 133 134 135 136 137 138
/*
 * Comparator used for sorted insertion of TCP packets: orders @a and @b
 * by the sequence number in their TCP headers. @data is unused.
 */
static gint seq_sorter(Packet *a, Packet *b, gpointer data)
{
    struct tcphdr *hdr_a = (struct tcphdr *)(a->transport_header);
    struct tcphdr *hdr_b = (struct tcphdr *)(b->transport_header);

    return ntohl(hdr_a->th_seq) - ntohl(hdr_b->th_seq);
}

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
/*
 * Decode the TCP header of the packet in @data and cache the derived
 * values (seq, ack, header/payload size, end sequence, flags) in the
 * Packet. *@max_ack is raised to the packet's ack number when that is
 * larger.
 */
static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
{
    Packet *pkt = data;
    struct tcphdr *tcphd = (struct tcphdr *)pkt->transport_header;

    pkt->tcp_seq = ntohl(tcphd->th_seq);
    pkt->tcp_ack = ntohl(tcphd->th_ack);
    pkt->flags = tcphd->th_flags;

    /* Track the largest ack number seen so far */
    if (!(*max_ack > pkt->tcp_ack)) {
        *max_ack = pkt->tcp_ack;
    }

    /* th_off counts 32-bit words, hence the shift by two */
    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
    pkt->payload_size = pkt->size - pkt->header_size;
    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
}

156 157 158 159
/*
 * Return 1 on success, if return 0 means the
 * packet will be dropped
 */
160
static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
161 162 163
{
    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
        if (pkt->ip->ip_p == IPPROTO_TCP) {
164
            fill_pkt_tcp_info(pkt, max_ack);
165 166 167 168 169 170 171 172 173 174 175 176
            g_queue_insert_sorted(queue,
                                  pkt,
                                  (GCompareDataFunc)seq_sorter,
                                  NULL);
        } else {
            g_queue_push_tail(queue, pkt);
        }
        return 1;
    }
    return 0;
}

177 178 179 180
/*
 * Return 0 on success, if return -1 means the pkt
 * is unsupported(arp and ipv6) and will be sent later
 */
181
static int packet_enqueue(CompareState *s, int mode, Connection **con)
182
{
183
    ConnectionKey key;
184
    Packet *pkt = NULL;
185
    Connection *conn;
186 187

    if (mode == PRIMARY_IN) {
188 189 190
        pkt = packet_new(s->pri_rs.buf,
                         s->pri_rs.packet_len,
                         s->pri_rs.vnet_hdr_len);
191
    } else {
192 193 194
        pkt = packet_new(s->sec_rs.buf,
                         s->sec_rs.packet_len,
                         s->sec_rs.vnet_hdr_len);
195 196 197 198 199 200 201
    }

    if (parse_packet_early(pkt)) {
        packet_destroy(pkt, NULL);
        pkt = NULL;
        return -1;
    }
202
    fill_connection_key(pkt, &key);
203

204 205 206
    conn = connection_get(s->connection_track_table,
                          &key,
                          &s->conn_list);
207

208 209 210 211 212 213
    if (!conn->processing) {
        g_queue_push_tail(&s->conn_list, conn);
        conn->processing = true;
    }

    if (mode == PRIMARY_IN) {
214
        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
215 216 217 218
            error_report("colo compare primary queue size too big,"
                         "drop packet");
        }
    } else {
219
        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
220 221 222 223
            error_report("colo compare secondary queue size too big,"
                         "drop packet");
        }
    }
224
    *con = conn;
225 226 227 228

    return 0;
}

229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
/*
 * Wrap-around aware sequence comparison: true when @seq1 lies strictly
 * after @seq2, judged by the sign of their 32-bit difference.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}

/*
 * Forward a primary packet to the output chardev and free it.
 * A send failure is reported but the packet is destroyed regardless.
 */
static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
{
    if (compare_chr_send(s, pkt->data, pkt->size, pkt->vnet_hdr_len) < 0) {
        error_report("colo send primary packet failed");
    }

    trace_colo_compare_main("packet same and release packet");
    packet_destroy(pkt, NULL);
}

248 249 250 251 252 253 254
/*
 * The IP packets sent by primary and secondary
 * will be compared in here
 * TODO support ip fragment, Out-Of-Order
 * return:    0  means packet same
 *            > 0 || < 0 means packet different
 */
255 256 257 258 259 260
static int colo_compare_packet_payload(Packet *ppkt,
                                       Packet *spkt,
                                       uint16_t poffset,
                                       uint16_t soffset,
                                       uint16_t len)

261
{
262
    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
263 264 265 266 267 268 269 270 271 272 273
        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];

        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));

        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
                                   pri_ip_dst, spkt->size,
                                   sec_ip_src, sec_ip_dst);
    }
274

275
    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
276 277
}

278
/*
 * Compare the not-yet-compared payload of a primary/secondary TCP packet
 * pair and tell the caller which of the two can be released.
 *
 * @mark is set to a combination of COLO_COMPARE_FREE_PRIMARY and
 * COLO_COMPARE_FREE_SECONDARY (0 when nothing may be released).
 * @max_ack is the highest ack number the caller allows a primary packet
 * to carry and still be sent out.
 *
 * Return true when the compared payload is consistent and the caller
 * should go on to the next comparison; false means a checkpoint is
 * needed.
 *
 * colo_compare_packet_payload() returns the memcmp() result, so 0 means
 * "same"; every check below therefore tests for a zero result.
 */
static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
                              int8_t *mark, uint32_t max_ack)
{
    *mark = 0;

    /* Both packets cover exactly the same sequence range */
    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size, spkt->header_size,
                                         ppkt->payload_size)) {
            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
            return true;
        }
    }

    /* one part of secondary packet payload still need to be compared */
    if (!after(ppkt->seq_end, spkt->seq_end)) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         ppkt->payload_size - ppkt->offset)) {
            if (!after(ppkt->tcp_ack, max_ack)) {
                *mark = COLO_COMPARE_FREE_PRIMARY;
                spkt->offset += ppkt->payload_size - ppkt->offset;
                return true;
            } else {
                /* secondary guest hasn't ack the data, don't send
                 * out this packet
                 */
                return false;
            }
        }
    } else {
        /* primary packet is longer than secondary packet, compare
         * the same part and mark the primary packet offset
         */
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         spkt->payload_size - spkt->offset)) {
            *mark = COLO_COMPARE_FREE_SECONDARY;
            ppkt->offset += spkt->payload_size - spkt->offset;
            return true;
        }
    }

    return false;
}
338

339 340 341 342
/*
 * Compare the queued primary and secondary TCP packets of @conn.
 *
 * Packets are popped from the head of both lists. Packets without
 * payload and packets that end at or before conn->compare_seq are
 * released (primary) or destroyed (secondary) without comparison.
 * Remaining pairs go through colo_mark_tcp_pkt(); on a mismatch both
 * packets are pushed back, dumped to stderr, and the inconsistency
 * notifiers are fired.
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     */
    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

pri:
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_head(&conn->primary_list);
sec:
    if (g_queue_is_empty(&conn->secondary_list)) {
        /* Nothing to compare against yet: keep the primary packet queued */
        g_queue_push_head(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_head(&conn->secondary_list);

    /* Primary packet without payload: send it out right away */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* Secondary packet without payload: nothing to compare */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        if (!ppkt) {
            /* Primary was already released; retry with the next primary */
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
        if (mark == COLO_COMPARE_FREE_SECONDARY) {
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        }
        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* Keep both packets queued; they are still valid for the dumps */
        g_queue_push_head(&conn->primary_list, ppkt);
        g_queue_push_head(&conn->secondary_list, spkt);

        qemu_hexdump((char *)ppkt->data, stderr,
                     "colo-compare ppkt", ppkt->size);
        qemu_hexdump((char *)spkt->data, stderr,
                     "colo-compare spkt", spkt->size);

        colo_compare_inconsistency_notify();
    }
}

444

445 446 447 448 449 450
/*
 * Called from the compare thread on the primary
 * to compare a UDP packet pair.
 *
 * Return 0 when both packets have the same size and the same bytes
 * after the vnet, Ethernet and IP headers; -1 otherwise.
 */
static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
{
    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;

    trace_colo_compare_main("compare udp");

    /*
     * Because of ppkt and spkt are both in the same connection,
     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
     * same with spkt. In addition, IP header's Identification is a random
     * field, we can handle it in IP fragmentation function later.
     * COLO just concern the response net packet payload from primary guest
     * and secondary guest are same or not, So we ignored all IP header include
     * other field like TOS,TTL,IP Checksum. we only need to compare
     * the ip payload here.
     */
    if (ppkt->size != spkt->size) {
        trace_colo_compare_main("UDP: payload size of packets are different");
        return -1;
    }
    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
                                    ppkt->size - offset)) {
        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
                         ppkt->size);
            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
                         spkt->size);
        }
        return -1;
    } else {
        return 0;
    }
}

/*
 * Called from the compare thread on the primary
 * to compare an ICMP packet pair.
 *
 * Return 0 when both packets have the same size and the same bytes
 * after the vnet, Ethernet and IP headers; -1 otherwise.
 */
static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
{
    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;

    trace_colo_compare_main("compare icmp");

    /*
     * Because of ppkt and spkt are both in the same connection,
     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
     * same with spkt. In addition, IP header's Identification is a random
     * field, we can handle it in IP fragmentation function later.
     * COLO just concern the response net packet payload from primary guest
     * and secondary guest are same or not, So we ignored all IP header include
     * other field like TOS,TTL,IP Checksum. we only need to compare
     * the ip payload here.
     */
    if (ppkt->size != spkt->size) {
        trace_colo_compare_main("ICMP: payload size of packets are different");
        return -1;
    }
    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
                                    ppkt->size - offset)) {
        trace_colo_compare_icmp_miscompare("primary pkt size",
                                           ppkt->size);
        trace_colo_compare_icmp_miscompare("Secondary pkt size",
                                           spkt->size);
        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
                         ppkt->size);
            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
                         spkt->size);
        }
        return -1;
    } else {
        return 0;
    }
}

/*
 * Called from the compare thread on the primary
 * to compare a packet pair of any other protocol.
 *
 * Everything after the vnet header is compared. Return 0 when the
 * packets match, -1 when their sizes differ, otherwise the non-zero
 * result of colo_compare_packet_payload().
 */
static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
{
    uint16_t offset = ppkt->vnet_hdr_len;

    trace_colo_compare_main("compare other");
    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];

        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));

        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
                                   pri_ip_dst, spkt->size,
                                   sec_ip_src, sec_ip_dst);
    }

    if (ppkt->size != spkt->size) {
        trace_colo_compare_main("Other: payload size of packets are different");
        return -1;
    }
    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
                                       ppkt->size - offset);
}

/*
 * Search callback: return 0 (a "match") when @pkt has been queued for
 * longer than *@check_time milliseconds of host clock time, 1 otherwise.
 */
static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
{
    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);

    if ((now - pkt->creation_ms) <= (*check_time)) {
        return 1;
    }

    trace_colo_old_packet_check_found(pkt->creation_ms);
    return 0;
}

571 572 573 574 575 576 577 578 579 580
/* Add @notify to the list fired by colo_compare_inconsistency_notify(). */
void colo_compare_register_notifier(Notifier *notify)
{
    notifier_list_add(&colo_compare_notifiers, notify);
}

/* Remove a notifier previously added with colo_compare_register_notifier(). */
void colo_compare_unregister_notifier(Notifier *notify)
{
    notifier_remove(notify);
}

581
/*
 * Search callback for the connection list: look for a packet in @conn's
 * primary list that is older than REGULAR_PACKET_CHECK_MS.
 *
 * Return 0 (a "match", which stops the search) after firing the
 * inconsistency notifiers when such a packet exists, 1 otherwise.
 * @user_data is unused.
 */
static int colo_old_packet_check_one_conn(Connection *conn,
                                           void *user_data)
{
    GList *result = NULL;
    int64_t check_time = REGULAR_PACKET_CHECK_MS;

    result = g_queue_find_custom(&conn->primary_list,
                                 &check_time,
                                 (GCompareFunc)colo_old_packet_check_one);

    if (result) {
        /* Do checkpoint will flush old packet */
        colo_compare_inconsistency_notify();
        return 0;
    }

    return 1;
}

/*
 * Look for old packets that the secondary hasn't matched,
 * if we have some then we have to checkpoint to wake
 * the secondary up.
 */
static void colo_old_packet_check(void *opaque)
{
    CompareState *s = opaque;

609 610 611 612 613 614
    /*
     * If we find one old packet, stop finding job and notify
     * COLO frame do checkpoint.
     */
    g_queue_find_custom(&s->conn_list, NULL,
                        (GCompareFunc)colo_old_packet_check_one_conn);
615 616
}

617 618 619
/*
 * Compare the queued non-TCP packets of @conn.
 *
 * Each primary packet is popped and searched for in the secondary list
 * using @HandlePacket, which is called as HandlePacket(spkt, ppkt) and
 * must return 0 for a match. A matched pair is consumed: the primary
 * packet is sent out and both packets are freed. On the first primary
 * packet without a match it is pushed back, the inconsistency notifiers
 * are fired and the loop stops.
 */
static void colo_compare_packet(CompareState *s, Connection *conn,
                                int (*HandlePacket)(Packet *spkt,
                                Packet *ppkt))
{
    Packet *pkt = NULL;
    GList *result = NULL;

    while (!g_queue_is_empty(&conn->primary_list) &&
           !g_queue_is_empty(&conn->secondary_list)) {
        pkt = g_queue_pop_head(&conn->primary_list);
        result = g_queue_find_custom(&conn->secondary_list,
                 pkt, (GCompareFunc)HandlePacket);

        if (result) {
            Packet *spkt = result->data;

            colo_release_primary_pkt(s, pkt);
            /*
             * Unlinking alone would leak the secondary packet, so free
             * it as well once it is off the queue.
             */
            g_queue_delete_link(&conn->secondary_list, result);
            packet_destroy(spkt, NULL);
        } else {
            /*
             * If one packet arrive late, the secondary_list or
             * primary_list will be empty, so we can't compare it
             * until next comparison. If the packets in the list are
             * timeout, it will trigger a checkpoint request.
             */
            trace_colo_compare_main("packet different");
            g_queue_push_head(&conn->primary_list, pkt);
            colo_compare_inconsistency_notify();
            break;
        }
    }
}

648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
/*
 * Called from the compare thread on the primary
 * for compare packet with secondary list of the
 * specified connection when a new packet was
 * queued to it.
 */
static void colo_compare_connection(void *opaque, void *user_data)
{
    CompareState *s = user_data;
    Connection *conn = opaque;

    switch (conn->ip_proto) {
    case IPPROTO_TCP:
        colo_compare_tcp(s, conn);
        break;
    case IPPROTO_UDP:
        colo_compare_packet(s, conn, colo_packet_compare_udp);
        break;
    case IPPROTO_ICMP:
        colo_compare_packet(s, conn, colo_packet_compare_icmp);
        break;
    default:
        colo_compare_packet(s, conn, colo_packet_compare_other);
        break;
    }
}

675
/*
 * Write one packet to the output chardev.
 *
 * The wire format is: 32-bit big-endian @size, then (only when
 * s->vnet_hdr is set) 32-bit big-endian @vnet_hdr_len, then @size bytes
 * of @buf.
 *
 * Return 0 on success or when @size is 0 (nothing is written). On a
 * failed or short write return the negative write result, or -EIO when
 * that result was not negative.
 */
static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len)
{
    int ret = 0;
    uint32_t len = htonl(size);

    if (!size) {
        return 0;
    }

    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
    if (ret != sizeof(len)) {
        goto err;
    }

    if (s->vnet_hdr) {
        /*
         * We send vnet header len make other module(like filter-redirector)
         * know how to parse net packet correctly.
         */
        len = htonl(vnet_hdr_len);
        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
        if (ret != sizeof(len)) {
            goto err;
        }
    }

    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
    if (ret != size) {
        goto err;
    }

    return 0;

err:
    return ret < 0 ? ret : -EIO;
}

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730
/*
 * Chardev can-read callback: always advertises COMPARE_READ_LEN_MAX
 * bytes of room. @opaque is unused.
 */
static int compare_chr_can_read(void *opaque)
{
    return COMPARE_READ_LEN_MAX;
}

/*
 * Called from the main thread on the primary for packets
 * arriving over the socket from the primary.
 */
static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
{
    CompareState *s = COLO_COMPARE(opaque);
    int ret;

    ret = net_fill_rstate(&s->pri_rs, buf, size);
    if (ret == -1) {
731
        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
732
                                 NULL, NULL, true);
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747
        error_report("colo-compare primary_in error");
    }
}

/*
 * Called from the main thread on the primary for packets
 * arriving over the socket from the secondary.
 */
static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
{
    CompareState *s = COLO_COMPARE(opaque);
    int ret;

    ret = net_fill_rstate(&s->sec_rs, buf, size);
    if (ret == -1) {
748
        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
749
                                 NULL, NULL, true);
750 751 752 753
        error_report("colo-compare secondary_in error");
    }
}

754 755 756 757
/*
 * Check old packet regularly so it can watch for any packets
 * that the secondary hasn't produced equivalents of.
 */
758
static void check_old_packet_regular(void *opaque)
759 760 761 762 763
{
    CompareState *s = opaque;

    /* if have old packet we will notify checkpoint */
    colo_old_packet_check(s);
764 765 766 767
    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
                REGULAR_PACKET_CHECK_MS);
}

768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786
/*
 * Public API, used by the COLO frame to notify a compare event.
 *
 * Stores @event in every registered CompareState, schedules its event
 * bottom half and then blocks until each of them has been handled.
 * @opaque and @errp are not used by this implementation.
 */
void colo_notify_compares_event(void *opaque, int event, Error **errp)
{
    CompareState *s;

    qemu_mutex_lock(&event_mtx);
    QTAILQ_FOREACH(s, &net_compares, next) {
        s->event = event;
        qemu_bh_schedule(s->event_bh);
        /* One outstanding event per compare object, counted under event_mtx */
        event_unhandled_count++;
    }
    /* Wait all compare threads to finish handling this event */
    while (event_unhandled_count > 0) {
        qemu_cond_wait(&event_complete_cond, &event_mtx);
    }

    qemu_mutex_unlock(&event_mtx);
}

787 788 789
/*
 * Create the periodic old-packet check timer on the iothread's
 * AioContext and arm it REGULAR_PACKET_CHECK_MS from now.
 */
static void colo_compare_timer_init(CompareState *s)
{
    AioContext *ctx = iothread_get_aio_context(s->iothread);

    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
                                SCALE_MS, check_old_packet_regular,
                                s);
    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
                    REGULAR_PACKET_CHECK_MS);
}

798
/*
 * Stop and free the old-packet check timer, if there is one. The
 * pointer is cleared so that a repeated call does nothing.
 */
static void colo_compare_timer_del(CompareState *s)
{
    if (s->packet_check_timer) {
        timer_del(s->packet_check_timer);
        timer_free(s->packet_check_timer);
        s->packet_check_timer = NULL;
    }
}
806

807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
static void colo_flush_packets(void *opaque, void *user_data);

/*
 * Bottom half run for each CompareState after
 * colo_notify_compares_event() posted an event.
 *
 * A checkpoint event flushes the packets of every connection; failover
 * and unknown events need no work here. Afterwards the event is marked
 * as handled and the waiting notifier is woken.
 */
static void colo_compare_handle_event(void *opaque)
{
    CompareState *s = opaque;

    switch (s->event) {
    case COLO_EVENT_CHECKPOINT:
        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
        break;
    case COLO_EVENT_FAILOVER:
        break;
    default:
        break;
    }

    qemu_mutex_lock(&event_mtx);
    /*
     * The counter is only modified under event_mtx, and the notifier
     * increments it after scheduling this bottom half, so it must be
     * inspected with the mutex held.
     */
    assert(event_unhandled_count > 0);
    event_unhandled_count--;
    qemu_cond_broadcast(&event_complete_cond);
    qemu_mutex_unlock(&event_mtx);
}

831 832 833 834
/*
 * Attach the compare object to its iothread: take a reference on the
 * iothread, install the read handlers of both input chardevs on the
 * iothread's GMainContext, start the old-packet timer and create the
 * event bottom half.
 */
static void colo_compare_iothread(CompareState *s)
{
    object_ref(OBJECT(s->iothread));
    s->worker_context = iothread_get_g_main_context(s->iothread);

    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
                             compare_pri_chr_in, NULL, NULL,
                             s, s->worker_context, true);
    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
                             compare_sec_chr_in, NULL, NULL,
                             s, s->worker_context, true);

    colo_compare_timer_init(s);
    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
}

847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891
/* Getter: return a newly allocated copy of the stored pri_indev string. */
static char *compare_get_pri_indev(Object *obj, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    return g_strdup(s->pri_indev);
}

/* Setter: replace pri_indev with a copy of @value, freeing the old string. */
static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    g_free(s->pri_indev);
    s->pri_indev = g_strdup(value);
}

/* Getter: return a newly allocated copy of the stored sec_indev string. */
static char *compare_get_sec_indev(Object *obj, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    return g_strdup(s->sec_indev);
}

/* Setter: replace sec_indev with a copy of @value, freeing the old string. */
static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    g_free(s->sec_indev);
    s->sec_indev = g_strdup(value);
}

/* Getter: return a newly allocated copy of the stored outdev string. */
static char *compare_get_outdev(Object *obj, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    return g_strdup(s->outdev);
}

/* Setter: replace outdev with a copy of @value, freeing the old string. */
static void compare_set_outdev(Object *obj, const char *value, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    g_free(s->outdev);
    s->outdev = g_strdup(value);
}

892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
/* Getter: return the current vnet_hdr flag. */
static bool compare_get_vnet_hdr(Object *obj, Error **errp)
{
    CompareState *s = COLO_COMPARE(obj);

    return s->vnet_hdr;
}

/* Setter for the vnet_hdr flag: stores @value in the CompareState. */
static void compare_set_vnet_hdr(Object *obj,
                                 bool value,
                                 Error **errp)
{
    COLO_COMPARE(obj)->vnet_hdr = value;
}

908 909
/*
 * Finalize callback of the primary SocketReadState (installed with
 * net_socket_rs_init() in colo_compare_complete()).
 *
 * The packet is handed to packet_enqueue().  If that reports failure the
 * buffered data is passed on unchanged through compare_chr_send();
 * otherwise the connection returned by packet_enqueue() is compared.
 */
static void compare_pri_rs_finalize(SocketReadState *pri_rs)
{
    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
    Connection *conn = NULL;

    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
        trace_colo_compare_main("primary: unsupported packet in");
        compare_chr_send(s,
                         pri_rs->buf,
                         pri_rs->packet_len,
                         pri_rs->vnet_hdr_len);
    } else {
        /* compare packet in the specified connection */
        colo_compare_connection(conn, s);
    }
}

/*
 * Finalize callback of the secondary SocketReadState (installed with
 * net_socket_rs_init() in colo_compare_complete()).
 *
 * The packet is handed to packet_enqueue().  On failure only a trace
 * event is emitted (unlike the primary side, nothing is forwarded);
 * otherwise the connection returned by packet_enqueue() is compared.
 */
static void compare_sec_rs_finalize(SocketReadState *sec_rs)
{
    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
    Connection *conn = NULL;

    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
        trace_colo_compare_main("secondary: unsupported packet in");
    } else {
        /* compare packet in the specified connection */
        colo_compare_connection(conn, s);
    }
}


/*
 * Return 0 is success.
 * Return 1 is failed.
 */
943
static int find_and_check_chardev(Chardev **chr,
944 945 946 947 948 949 950 951 952 953
                                  char *chr_name,
                                  Error **errp)
{
    *chr = qemu_chr_find(chr_name);
    if (*chr == NULL) {
        error_setg(errp, "Device '%s' not found",
                   chr_name);
        return 1;
    }

954 955
    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
        error_setg(errp, "chardev \"%s\" is not reconnectable",
956 957 958
                   chr_name);
        return 1;
    }
959

960 961 962 963 964 965 966 967 968 969
    return 0;
}

/*
 * Called from the main thread on the primary
 * to setup colo-compare.
 *
 * Validates that all required properties are set and that the three
 * chardev names differ, binds the chardev frontends, initialises the
 * socket read states and connection bookkeeping, registers the instance
 * in net_compares and finally attaches it to the iothread.
 *
 * On any failure an error is stored in @errp and the function returns
 * early; frontends that were already initialised are not released here
 * (colo_compare_finalize() calls qemu_chr_fe_deinit() on all three).
 */
static void colo_compare_complete(UserCreatable *uc, Error **errp)
{
    CompareState *s = COLO_COMPARE(uc);
    Chardev *chr;

    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
        error_setg(errp, "colo compare needs 'primary_in' ,"
                   "'secondary_in','outdev','iothread' property set");
        return;
    } else if (!strcmp(s->pri_indev, s->outdev) ||
               !strcmp(s->sec_indev, s->outdev) ||
               !strcmp(s->pri_indev, s->sec_indev)) {
        error_setg(errp, "'indev' and 'outdev' could not be same "
                   "for compare module");
        return;
    }

    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
        return;
    }

    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
        return;
    }

    if (find_and_check_chardev(&chr, s->outdev, errp) ||
        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
        return;
    }

    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);

    QTAILQ_INSERT_TAIL(&net_compares, s, next);

    g_queue_init(&s->conn_list);

    /*
     * NOTE(review): event_mtx and event_complete_cond are file-scope
     * objects, yet they are initialised here once per instance.  Creating
     * a second colo-compare object re-initialises them -- confirm whether
     * more than one instance is expected to be supported.
     */
    qemu_mutex_init(&event_mtx);
    qemu_cond_init(&event_complete_cond);

    /* Keys are released with g_free(), values with connection_destroy(). */
    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                      connection_key_equal,
                                                      g_free,
                                                      connection_destroy);

    /* Must stay last: no failure path may follow the iothread setup. */
    colo_compare_iothread(s);
}

1018 1019 1020 1021 1022 1023 1024 1025
static void colo_flush_packets(void *opaque, void *user_data)
{
    CompareState *s = user_data;
    Connection *conn = opaque;
    Packet *pkt = NULL;

    while (!g_queue_is_empty(&conn->primary_list)) {
        pkt = g_queue_pop_head(&conn->primary_list);
1026 1027 1028 1029
        compare_chr_send(s,
                         pkt->data,
                         pkt->size,
                         pkt->vnet_hdr_len);
1030 1031 1032 1033 1034 1035 1036 1037
        packet_destroy(pkt, NULL);
    }
    while (!g_queue_is_empty(&conn->secondary_list)) {
        pkt = g_queue_pop_head(&conn->secondary_list);
        packet_destroy(pkt, NULL);
    }
}

1038 1039 1040 1041 1042 1043 1044 1045 1046
/* Class initialiser: installs colo_compare_complete() as the complete hook. */
static void colo_compare_class_init(ObjectClass *oc, void *data)
{
    USER_CREATABLE_CLASS(oc)->complete = colo_compare_complete;
}

/*
 * Instance initialiser: registers the object's properties.
 *
 *   primary_in, secondary_in, outdev - string properties backed by the
 *                                      matching getter/setter pairs
 *   iothread                         - link to a TYPE_IOTHREAD object,
 *                                      stored in s->iothread
 *   vnet_hdr_support                 - bool property, defaults to false
 */
static void colo_compare_init(Object *obj)
{
    CompareState *s = COLO_COMPARE(obj);

    object_property_add_str(obj, "primary_in",
                            compare_get_pri_indev, compare_set_pri_indev,
                            NULL);
    object_property_add_str(obj, "secondary_in",
                            compare_get_sec_indev, compare_set_sec_indev,
                            NULL);
    object_property_add_str(obj, "outdev",
                            compare_get_outdev, compare_set_outdev,
                            NULL);
    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
                            (Object **)&s->iothread,
                            object_property_allow_set_link,
                            OBJ_PROP_LINK_STRONG, NULL);

    s->vnet_hdr = false;
    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
                             compare_set_vnet_hdr, NULL);
}

/*
 * Instance finaliser: releases everything colo_compare_complete() and
 * colo_compare_iothread() set up.
 *
 * This also runs for objects whose colo_compare_complete() returned early
 * with an error, so every step has to cope with state that was never
 * initialised:
 *   - s->event_bh is only created at the end of colo_compare_iothread();
 *   - the reference on s->iothread is only taken in colo_compare_iothread();
 *   - event_mtx / event_complete_cond are file-scope objects shared by all
 *     instances and are only initialised after the instance has been
 *     inserted into net_compares.
 */
static void colo_compare_finalize(Object *obj)
{
    CompareState *s = COLO_COMPARE(obj);
    CompareState *tmp = NULL;
    /*
     * event_bh is the last thing colo_compare_iothread() assigns, so a
     * non-NULL value means the iothread setup (including object_ref())
     * has run for this instance.
     */
    bool iothread_attached = s->event_bh != NULL;
    bool was_registered = false;

    qemu_chr_fe_deinit(&s->chr_pri_in, false);
    qemu_chr_fe_deinit(&s->chr_sec_in, false);
    qemu_chr_fe_deinit(&s->chr_out, false);
    if (s->iothread) {
        colo_compare_timer_del(s);
    }

    /* Never created if completion failed; do not pass NULL down. */
    if (s->event_bh) {
        qemu_bh_delete(s->event_bh);
        s->event_bh = NULL;
    }

    QTAILQ_FOREACH(tmp, &net_compares, next) {
        if (tmp == s) {
            QTAILQ_REMOVE(&net_compares, s, next);
            was_registered = true;
            break;
        }
    }

    /* Release all unhandled packets after compare thread exited */
    g_queue_foreach(&s->conn_list, colo_flush_packets, s);

    g_queue_clear(&s->conn_list);

    if (s->connection_track_table) {
        g_hash_table_destroy(s->connection_track_table);
    }

    /* Only drop the reference that colo_compare_iothread() actually took. */
    if (iothread_attached && s->iothread) {
        object_unref(OBJECT(s->iothread));
    }

    /*
     * The mutex and condition variable are shared by every instance:
     * destroy them only when this instance had initialised them (it was
     * registered) and no other registered instance is left to use them.
     */
    if (was_registered && QTAILQ_EMPTY(&net_compares)) {
        qemu_mutex_destroy(&event_mtx);
        qemu_cond_destroy(&event_complete_cond);
    }

    g_free(s->pri_indev);
    g_free(s->sec_indev);
    g_free(s->outdev);
}

/*
 * Type description of "colo-compare": a TYPE_OBJECT subclass that
 * implements the TYPE_USER_CREATABLE interface.
 */
static const TypeInfo colo_compare_info = {
    .name              = TYPE_COLO_COMPARE,
    .parent            = TYPE_OBJECT,
    /* per-class data */
    .class_size        = sizeof(CompareClass),
    .class_init        = colo_compare_class_init,
    /* per-instance data and lifecycle hooks */
    .instance_size     = sizeof(CompareState),
    .instance_init     = colo_compare_init,
    .instance_finalize = colo_compare_finalize,
    .interfaces = (InterfaceInfo[]) {
        { TYPE_USER_CREATABLE },
        { }
    }
};

/* Registers the colo-compare type described by colo_compare_info. */
static void register_types(void)
{
    type_register_static(&colo_compare_info);
}

/* Hand register_types() to the type_init() hook. */
type_init(register_types);