mirror.c
/*
 * Image mirroring
 *
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Paolo Bonzini  <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "trace.h"
#include "block/blockjob.h"
#include "block/block_int.h"
#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"

#define SLICE_TIME    100000000ULL /* ns */
#define MAX_IN_FLIGHT 16

/* The mirroring buffer is a list of granularity-sized chunks.
 * Free chunks are organized in a list.
 */
typedef struct MirrorBuffer {
    QSIMPLEQ_ENTRY(MirrorBuffer) next;
} MirrorBuffer;

typedef struct MirrorBlockJob {
    BlockJob common;
    RateLimit limit;
    BlockDriverState *target;
    MirrorSyncMode mode;
    BlockdevOnError on_source_error, on_target_error;
    bool synced;
    bool should_complete;
    int64_t sector_num;
    int64_t granularity;
    size_t buf_size;
    unsigned long *cow_bitmap;
    HBitmapIter hbi;
    uint8_t *buf;
    QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
    int buf_free_count;

    unsigned long *in_flight_bitmap;
    int in_flight;
    int ret;
} MirrorBlockJob;

typedef struct MirrorOp {
    MirrorBlockJob *s;
    QEMUIOVector qiov;
    int64_t sector_num;
    int nb_sectors;
} MirrorOp;

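/* Apply the error policy configured for the failing side (source reads or
 * target writes).  Any error also takes the job out of the "synced" state.
 */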
static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                            int error)
{
    s->synced = false;
    if (read) {
        return block_job_error_action(&s->common, s->common.bs,
                                      s->on_source_error, true, error);
    } else {
        return block_job_error_action(&s->common, s->target,
                                      s->on_target_error, false, error);
    }
}

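/* Common completion path for a copy operation: return the granularity-sized
 * chunks to the free list, clear the in-flight bits, record successfully
 * copied chunks in the COW bitmap, and wake up the job coroutine.
 */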
static void mirror_iteration_done(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;
    struct iovec *iov;
    int64_t chunk_num;
    int i, nb_chunks, sectors_per_chunk;

    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors, ret);

    s->in_flight--;
    iov = op->qiov.iov;
    for (i = 0; i < op->qiov.niov; i++) {
        MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
        s->buf_free_count++;
    }

    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
    chunk_num = op->sector_num / sectors_per_chunk;
    nb_chunks = op->nb_sectors / sectors_per_chunk;
    bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
    if (s->cow_bitmap && ret >= 0) {
        bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
    }

    g_slice_free(MirrorOp, op);
    qemu_coroutine_enter(s->common.co, NULL);
}

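/* AIO callback for the target write.  On failure, re-dirty the source
 * sectors so they are retried and apply the target error policy.
 */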
static void mirror_write_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockDriverState *source = s->common.bs;
        BlockErrorAction action;

        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, false, -ret);
        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }
    }
    mirror_iteration_done(op, ret);
}

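/* AIO callback for the source read.  On success, chain into the target
 * write; on failure, re-dirty the sectors and apply the source error policy.
 */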
static void mirror_read_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockDriverState *source = s->common.bs;
        BlockErrorAction action;

        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, true, -ret);
        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }

        mirror_iteration_done(op, ret);
        return;
    }
    bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
                    mirror_write_complete, op);
}

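/* Pick the next dirty area from the dirty bitmap, build a QEMUIOVector out
 * of free buffer chunks and submit one asynchronous copy (read from the
 * source, then write to the target).
 */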
static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
    BlockDriverState *source = s->common.bs;
    int nb_sectors, sectors_per_chunk, nb_chunks;
    int64_t end, sector_num, next_chunk, next_sector, hbitmap_next_sector;
    MirrorOp *op;

    s->sector_num = hbitmap_iter_next(&s->hbi);
    if (s->sector_num < 0) {
        bdrv_dirty_iter_init(source, &s->hbi);
        s->sector_num = hbitmap_iter_next(&s->hbi);
        trace_mirror_restart_iter(s, bdrv_get_dirty_count(source));
        assert(s->sector_num >= 0);
    }

    hbitmap_next_sector = s->sector_num;
    sector_num = s->sector_num;
    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
    end = s->common.len >> BDRV_SECTOR_BITS;

    /* Extend the QEMUIOVector to include all adjacent blocks that will
     * be copied in this operation.
     *
     * We have to do this if we have no backing file yet in the destination,
     * and the cluster size is very large.  Then we need to do COW ourselves.
     * The first time a cluster is copied, copy it entirely.  Note that,
     * because both the granularity and the cluster size are powers of two,
     * the number of sectors to copy cannot exceed one cluster.
     *
     * We also want to extend the QEMUIOVector to include more adjacent
     * dirty blocks if possible, to limit the number of I/O operations and
     * run efficiently even with a small granularity.
     */
    nb_chunks = 0;
    nb_sectors = 0;
    next_sector = sector_num;
    next_chunk = sector_num / sectors_per_chunk;

    /* Wait for I/O to this cluster (from a previous iteration) to be done.  */
    while (test_bit(next_chunk, s->in_flight_bitmap)) {
        trace_mirror_yield_in_flight(s, sector_num, s->in_flight);
        qemu_coroutine_yield();
    }

    do {
        int added_sectors, added_chunks;

        if (!bdrv_get_dirty(source, next_sector) ||
            test_bit(next_chunk, s->in_flight_bitmap)) {
            assert(nb_sectors > 0);
            break;
        }

        added_sectors = sectors_per_chunk;
        if (s->cow_bitmap && !test_bit(next_chunk, s->cow_bitmap)) {
            bdrv_round_to_clusters(s->target,
                                   next_sector, added_sectors,
                                   &next_sector, &added_sectors);

            /* On the first iteration, the rounding may make us copy
             * sectors before the first dirty one.
             */
            if (next_sector < sector_num) {
                assert(nb_sectors == 0);
                sector_num = next_sector;
                next_chunk = next_sector / sectors_per_chunk;
            }
        }

        added_sectors = MIN(added_sectors, end - (sector_num + nb_sectors));
        added_chunks = (added_sectors + sectors_per_chunk - 1) / sectors_per_chunk;

        /* When doing COW, it may happen that there is not enough space for
         * a full cluster.  Wait if that is the case.
         */
        while (nb_chunks == 0 && s->buf_free_count < added_chunks) {
            trace_mirror_yield_buf_busy(s, nb_chunks, s->in_flight);
            qemu_coroutine_yield();
        }
        if (s->buf_free_count < nb_chunks + added_chunks) {
            trace_mirror_break_buf_busy(s, nb_chunks, s->in_flight);
            break;
        }

        /* We have enough free space to copy these sectors.  */
        bitmap_set(s->in_flight_bitmap, next_chunk, added_chunks);

        nb_sectors += added_sectors;
        nb_chunks += added_chunks;
        next_sector += added_sectors;
        next_chunk += added_chunks;
    } while (next_sector < end);

    /* Allocate a MirrorOp that is used as an AIO callback.  */
    op = g_slice_new(MirrorOp);
    op->s = s;
    op->sector_num = sector_num;
    op->nb_sectors = nb_sectors;

    /* Now make a QEMUIOVector taking enough granularity-sized chunks
     * from s->buf_free.
     */
    qemu_iovec_init(&op->qiov, nb_chunks);
    next_sector = sector_num;
    while (nb_chunks-- > 0) {
        MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
        QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
        s->buf_free_count--;
        qemu_iovec_add(&op->qiov, buf, s->granularity);

        /* Advance the HBitmapIter in parallel, so that we do not examine
         * the same sector twice.
         */
        if (next_sector > hbitmap_next_sector && bdrv_get_dirty(source, next_sector)) {
            hbitmap_next_sector = hbitmap_iter_next(&s->hbi);
        }

        next_sector += sectors_per_chunk;
    }

    bdrv_reset_dirty(source, sector_num, nb_sectors);

    /* Copy the dirty cluster.  */
    s->in_flight++;
    trace_mirror_one_iteration(s, sector_num, nb_sectors);
    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
                   mirror_read_complete, op);
}

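/* Carve the copy buffer into granularity-sized chunks and put them all on
 * the free list.
 */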
static void mirror_free_init(MirrorBlockJob *s)
{
    int granularity = s->granularity;
    size_t buf_size = s->buf_size;
    uint8_t *buf = s->buf;

    assert(s->buf_free_count == 0);
    QSIMPLEQ_INIT(&s->buf_free);
    while (buf_size != 0) {
        MirrorBuffer *cur = (MirrorBuffer *)buf;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
        s->buf_free_count++;
        buf_size -= granularity;
        buf += granularity;
    }
}

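/* Yield until all in-flight copy operations have completed.  */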
static void mirror_drain(MirrorBlockJob *s)
{
    while (s->in_flight > 0) {
        qemu_coroutine_yield();
    }
}

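/* Main loop of the mirror job coroutine: populate the dirty bitmap according
 * to the sync mode, then keep copying dirty data to the target until the job
 * is cancelled or completion is requested and the disks are in sync.
 */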
static void coroutine_fn mirror_run(void *opaque)
{
    MirrorBlockJob *s = opaque;
    BlockDriverState *bs = s->common.bs;
    int64_t sector_num, end, sectors_per_chunk, length;
    uint64_t last_pause_ns;
    BlockDriverInfo bdi;
    char backing_filename[1024];
    int ret = 0;
    int n;

    if (block_job_is_cancelled(&s->common)) {
        goto immediate_exit;
    }

    s->common.len = bdrv_getlength(bs);
    if (s->common.len <= 0) {
        block_job_completed(&s->common, s->common.len);
        return;
    }

    length = (bdrv_getlength(bs) + s->granularity - 1) / s->granularity;
    s->in_flight_bitmap = bitmap_new(length);

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(s->target, backing_filename,
                              sizeof(backing_filename));
    if (backing_filename[0] && !s->target->backing_hd) {
        bdrv_get_info(s->target, &bdi);
        if (s->granularity < bdi.cluster_size) {
            s->buf_size = MAX(s->buf_size, bdi.cluster_size);
            s->cow_bitmap = bitmap_new(length);
        }
    }

    end = s->common.len >> BDRV_SECTOR_BITS;
    s->buf = qemu_blockalign(bs, s->buf_size);
    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
    mirror_free_init(s);

    if (s->mode != MIRROR_SYNC_MODE_NONE) {
        /* First part, loop on the sectors and initialize the dirty bitmap.  */
        BlockDriverState *base;
        base = s->mode == MIRROR_SYNC_MODE_FULL ? NULL : bs->backing_hd;
        for (sector_num = 0; sector_num < end; ) {
            int64_t next = (sector_num | (sectors_per_chunk - 1)) + 1;
            ret = bdrv_co_is_allocated_above(bs, base,
                                             sector_num, next - sector_num, &n);

            if (ret < 0) {
                goto immediate_exit;
            }

            assert(n > 0);
            if (ret == 1) {
                bdrv_set_dirty(bs, sector_num, n);
                sector_num = next;
            } else {
                sector_num += n;
            }
        }
    }

    bdrv_dirty_iter_init(bs, &s->hbi);
    last_pause_ns = qemu_get_clock_ns(rt_clock);
    for (;;) {
        uint64_t delay_ns;
        int64_t cnt;
        bool should_complete;

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        cnt = bdrv_get_dirty_count(bs);

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that qemu_aio_flush() returns.
         * We do so every SLICE_TIME nanoseconds, or when there is an error,
         * or when the source is clean, whichever comes first.
         */
        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight == MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                (cnt == 0 && s->in_flight > 0)) {
                trace_mirror_yield(s, s->in_flight, s->buf_free_count, cnt);
                qemu_coroutine_yield();
                continue;
            } else if (cnt != 0) {
                mirror_iteration(s);
                continue;
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            ret = bdrv_flush(s->target);
            if (ret < 0) {
                if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
                    goto immediate_exit;
                }
            } else {
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                s->common.offset = end * BDRV_SECTOR_SIZE;
                if (!s->synced) {
                    block_job_ready(&s->common);
                    s->synced = true;
                }

                should_complete = s->should_complete ||
                    block_job_is_cancelled(&s->common);
                cnt = bdrv_get_dirty_count(bs);
            }
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(bs), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * mirror_populate runs.
             */
            trace_mirror_before_drain(s, cnt);
            bdrv_drain_all();
            cnt = bdrv_get_dirty_count(bs);
        }

        ret = 0;
        trace_mirror_before_sleep(s, cnt, s->synced);
        if (!s->synced) {
            /* Publish progress */
            s->common.offset = (end - cnt) * BDRV_SECTOR_SIZE;

            if (s->common.speed) {
                delay_ns = ratelimit_calculate_delay(&s->limit, sectors_per_chunk);
            } else {
                delay_ns = 0;
            }

            block_job_sleep_ns(&s->common, rt_clock, delay_ns);
            if (block_job_is_cancelled(&s->common)) {
                break;
            }
        } else if (!should_complete) {
            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
            block_job_sleep_ns(&s->common, rt_clock, delay_ns);
        } else if (cnt == 0) {
            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            s->common.cancelled = false;
            break;
        }
        last_pause_ns = qemu_get_clock_ns(rt_clock);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
        mirror_drain(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    g_free(s->in_flight_bitmap);
    bdrv_set_dirty_tracking(bs, 0);
    bdrv_iostatus_disable(s->target);
    if (s->should_complete && ret == 0) {
        if (bdrv_get_flags(s->target) != bdrv_get_flags(s->common.bs)) {
            bdrv_reopen(s->target, bdrv_get_flags(s->common.bs), NULL);
        }
        bdrv_swap(s->target, s->common.bs);
    }
    bdrv_close(s->target);
    bdrv_delete(s->target);
    block_job_completed(&s->common, ret);
}

static void mirror_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    if (speed < 0) {
        error_set(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }
    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
}

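/* Propagate an iostatus reset from the job to the mirror target.  */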
static void mirror_iostatus_reset(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    bdrv_iostatus_reset(s->target);
}

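/* Handle block-job-complete: make sure the target has a backing file if it
 * needs one, then ask the job to finish once source and target are in sync.
 */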
static void mirror_complete(BlockJob *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    int ret;

    ret = bdrv_open_backing_file(s->target, NULL);
    if (ret < 0) {
        char backing_filename[PATH_MAX];
        bdrv_get_full_backing_filename(s->target, backing_filename,
                                       sizeof(backing_filename));
        error_setg_file_open(errp, -ret, backing_filename);
        return;
    }
    if (!s->synced) {
        error_set(errp, QERR_BLOCK_JOB_NOT_READY, job->bs->device_name);
        return;
    }

    s->should_complete = true;
    block_job_resume(job);
}

static const BlockJobType mirror_job_type = {
    .instance_size = sizeof(MirrorBlockJob),
    .job_type      = "mirror",
    .set_speed     = mirror_set_speed,
    .iostatus_reset= mirror_iostatus_reset,
    .complete      = mirror_complete,
};

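/* Create and start a mirror job that copies the contents of bs to target,
 * picking a default granularity from the target cluster size if none was
 * given.
 */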
void mirror_start(BlockDriverState *bs, BlockDriverState *target,
                  int64_t speed, int64_t granularity, int64_t buf_size,
                  MirrorSyncMode mode, BlockdevOnError on_source_error,
                  BlockdevOnError on_target_error,
                  BlockDriverCompletionFunc *cb,
                  void *opaque, Error **errp)
{
    MirrorBlockJob *s;

    if (granularity == 0) {
        /* Choose the default granularity based on the target file's cluster
         * size, clamped between 4k and 64k.  */
        BlockDriverInfo bdi;
        if (bdrv_get_info(target, &bdi) >= 0 && bdi.cluster_size != 0) {
            granularity = MAX(4096, bdi.cluster_size);
            granularity = MIN(65536, granularity);
        } else {
            granularity = 65536;
        }
    }

    assert((granularity & (granularity - 1)) == 0);

    if ((on_source_error == BLOCKDEV_ON_ERROR_STOP ||
         on_source_error == BLOCKDEV_ON_ERROR_ENOSPC) &&
        !bdrv_iostatus_is_enabled(bs)) {
        error_set(errp, QERR_INVALID_PARAMETER, "on-source-error");
        return;
    }

    s = block_job_create(&mirror_job_type, bs, speed, cb, opaque, errp);
    if (!s) {
        return;
    }

    s->on_source_error = on_source_error;
    s->on_target_error = on_target_error;
    s->target = target;
    s->mode = mode;
    s->granularity = granularity;
    s->buf_size = MAX(buf_size, granularity);

    bdrv_set_dirty_tracking(bs, granularity);
    bdrv_set_enable_write_cache(s->target, true);
    bdrv_set_on_error(s->target, on_target_error, on_target_error);
    bdrv_iostatus_enable(s->target);
    s->common.co = qemu_coroutine_create(mirror_run);
    trace_mirror_start(bs, s, s->common.co, opaque);
    qemu_coroutine_enter(s->common.co, s);
}