/*
 * Image mirroring
 *
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Paolo Bonzini  <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/coroutine.h"
#include "qemu/range.h"
#include "trace.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qmp/qerror.h"
#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"

#define MAX_IN_FLIGHT 16
#define MAX_IO_BYTES (1 << 20) /* 1 MiB */
#define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)

/* The mirroring buffer is a list of granularity-sized chunks.
 * Free chunks are organized in a list.
 */
typedef struct MirrorBuffer {
    QSIMPLEQ_ENTRY(MirrorBuffer) next;
} MirrorBuffer;

typedef struct MirrorOp MirrorOp;

typedef struct MirrorBlockJob {
    BlockJob common;
    BlockBackend *target;
    BlockDriverState *mirror_top_bs;
    BlockDriverState *base;

    /* The name of the graph node to replace */
    char *replaces;
    /* The BDS to replace */
    BlockDriverState *to_replace;
    /* Used to block operations on the drive-mirror-replace target */
    Error *replace_blocker;
    bool is_none_mode;
    BlockMirrorBackingMode backing_mode;
    MirrorCopyMode copy_mode;
    BlockdevOnError on_source_error, on_target_error;
    bool synced;
    /* Set when the target is synced (dirty bitmap is clean, nothing
     * in flight) and the job is running in active mode */
    bool actively_synced;
    bool should_complete;
    int64_t granularity;
    size_t buf_size;
    int64_t bdev_length;
    unsigned long *cow_bitmap;
    BdrvDirtyBitmap *dirty_bitmap;
    BdrvDirtyBitmapIter *dbi;
    uint8_t *buf;
    QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
    int buf_free_count;

    uint64_t last_pause_ns;
    unsigned long *in_flight_bitmap;
    int in_flight;
    int64_t bytes_in_flight;
    QTAILQ_HEAD(MirrorOpList, MirrorOp) ops_in_flight;
    int ret;
    bool unmap;
    int target_cluster_size;
    int max_iov;
    bool initial_zeroing_ongoing;
    int in_active_write_counter;
    bool prepared;
} MirrorBlockJob;

typedef struct MirrorBDSOpaque {
    MirrorBlockJob *job;
} MirrorBDSOpaque;

struct MirrorOp {
    MirrorBlockJob *s;
    QEMUIOVector qiov;
    int64_t offset;
    uint64_t bytes;

    /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
     * mirror_co_discard() before yielding for the first time */
    int64_t *bytes_handled;

    bool is_pseudo_op;
    bool is_active_write;
    CoQueue waiting_requests;

    QTAILQ_ENTRY(MirrorOp) next;
};

typedef enum MirrorMethod {
    MIRROR_METHOD_COPY,
    MIRROR_METHOD_ZERO,
    MIRROR_METHOD_DISCARD,
} MirrorMethod;

static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                            int error)
{
    s->synced = false;
    s->actively_synced = false;
    if (read) {
        return block_job_error_action(&s->common, s->on_source_error,
                                      true, error);
    } else {
        return block_job_error_action(&s->common, s->on_target_error,
                                      false, error);
    }
}

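/* Wait until no other in-flight operation overlaps the chunk range covered
 * by [offset, offset + bytes).  @self may be NULL (as in the background
 * iteration, before any MirrorOp exists for the range); otherwise @self is
 * skipped while scanning s->ops_in_flight. */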
static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
                                                  MirrorBlockJob *s,
                                                  uint64_t offset,
                                                  uint64_t bytes)
{
    uint64_t self_start_chunk = offset / s->granularity;
    uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
    uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;

    while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
                         self_start_chunk) < self_end_chunk &&
           s->ret >= 0)
    {
        MirrorOp *op;

        QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
            uint64_t op_start_chunk = op->offset / s->granularity;
            uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
                                                 s->granularity) -
                                    op_start_chunk;

            if (op == self) {
                continue;
            }

            if (ranges_overlap(self_start_chunk, self_nb_chunks,
                               op_start_chunk, op_nb_chunks))
            {
                qemu_co_queue_wait(&op->waiting_requests, NULL);
                break;
            }
        }
    }
}

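/* Complete a background MirrorOp: return its buffer chunks to the free list,
 * clear its bits in the in-flight bitmap, account progress on success, wake
 * up any requests waiting on it and free the operation. */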
static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;
    struct iovec *iov;
    int64_t chunk_num;
    int i, nb_chunks;

    trace_mirror_iteration_done(s, op->offset, op->bytes, ret);

    s->in_flight--;
    s->bytes_in_flight -= op->bytes;
    iov = op->qiov.iov;
    for (i = 0; i < op->qiov.niov; i++) {
        MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
        s->buf_free_count++;
    }

    chunk_num = op->offset / s->granularity;
    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);

    bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
    QTAILQ_REMOVE(&s->ops_in_flight, op, next);
    if (ret >= 0) {
        if (s->cow_bitmap) {
            bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
        }
        if (!s->initial_zeroing_ongoing) {
            job_progress_update(&s->common.job, op->bytes);
        }
    }
    qemu_iovec_destroy(&op->qiov);

    qemu_co_queue_restart_all(&op->waiting_requests);
    g_free(op);
}

static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;

    if (ret < 0) {
        BlockErrorAction action;

        bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
        action = mirror_error_action(s, false, -ret);
        if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }
    }

    mirror_iteration_done(op, ret);
}

static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;

    if (ret < 0) {
        BlockErrorAction action;

        bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
        action = mirror_error_action(s, true, -ret);
        if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }

        mirror_iteration_done(op, ret);
        return;
    }

    ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
    mirror_write_complete(op, ret);
}

/* Clip bytes relative to offset to not exceed end-of-file */
static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
                                        int64_t offset,
                                        int64_t bytes)
{
    return MIN(bytes, s->bdev_length - offset);
}

/* Round offset and/or bytes to target cluster if COW is needed, and
 * return the offset of the adjusted tail against original. */
static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
                            uint64_t *bytes)
{
    bool need_cow;
    int ret = 0;
    int64_t align_offset = *offset;
    int64_t align_bytes = *bytes;
    int max_bytes = s->granularity * s->max_iov;

    need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
    need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
                          s->cow_bitmap);
    if (need_cow) {
        bdrv_round_to_clusters(blk_bs(s->target), *offset, *bytes,
                               &align_offset, &align_bytes);
    }

    if (align_bytes > max_bytes) {
        align_bytes = max_bytes;
        if (need_cow) {
            align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
        }
    }
    /* Clipping may result in align_bytes unaligned to chunk boundary, but
     * that doesn't matter because it's already the end of source image. */
    align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);

    ret = align_offset + align_bytes - (*offset + *bytes);
    *offset = align_offset;
    *bytes = align_bytes;
    assert(ret >= 0);
    return ret;
}

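/* Yield until some real (non-pseudo) operation of the requested kind
 * completes; @active selects between active writes and background copies. */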
static inline void mirror_wait_for_any_operation(MirrorBlockJob *s, bool active)
{
    MirrorOp *op;

    QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
        /* Do not wait on pseudo ops, because it may in turn wait on
         * some other operation to start, which may in fact be the
         * caller of this function.  Since there is only one pseudo op
         * at any given time, we will always find some real operation
         * to wait on. */
        if (!op->is_pseudo_op && op->is_active_write == active) {
            qemu_co_queue_wait(&op->waiting_requests, NULL);
            return;
        }
    }
    abort();
}

static inline void mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
{
    /* Only non-active operations use up in-flight slots */
    mirror_wait_for_any_operation(s, false);
}

/* Perform a mirror copy operation.
 *
 * *op->bytes_handled is set to the number of bytes copied after and
 * including offset, excluding any bytes copied prior to offset due
 * to alignment.  This will be op->bytes if no alignment is necessary,
 * or (new_end - op->offset) if the tail is rounded up or down due to
 * alignment or buffer limit.
 */
static void coroutine_fn mirror_co_read(void *opaque)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    int nb_chunks;
    uint64_t ret;
    uint64_t max_bytes;

    max_bytes = s->granularity * s->max_iov;

    /* We can only handle as much as buf_size at a time. */
    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
    assert(op->bytes);
    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
    *op->bytes_handled = op->bytes;

    if (s->cow_bitmap) {
        *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
    }
    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
    assert(*op->bytes_handled <= UINT_MAX);
    assert(op->bytes <= s->buf_size);
    /* The offset is granularity-aligned because:
     * 1) Caller passes in aligned values;
     * 2) mirror_cow_align is used only when target cluster is larger. */
    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
    /* The range is sector-aligned, since bdrv_getlength() rounds up. */
    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);

    while (s->buf_free_count < nb_chunks) {
        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
        mirror_wait_for_free_in_flight_slot(s);
    }

    /* Now make a QEMUIOVector taking enough granularity-sized chunks
     * from s->buf_free.
     */
    qemu_iovec_init(&op->qiov, nb_chunks);
    while (nb_chunks-- > 0) {
        MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
        size_t remaining = op->bytes - op->qiov.size;

        QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
        s->buf_free_count--;
        qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
    }

    /* Copy the dirty cluster.  */
    s->in_flight++;
    s->bytes_in_flight += op->bytes;
    trace_mirror_one_iteration(s, op->offset, op->bytes);

    ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
                         &op->qiov, 0);
    mirror_read_complete(op, ret);
}

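/* Zero and discard variants of a mirror operation: no bounce buffer is
 * needed, so the request is accounted and submitted to the target directly. */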
static void coroutine_fn mirror_co_zero(void *opaque)
{
    MirrorOp *op = opaque;
    int ret;

    op->s->in_flight++;
    op->s->bytes_in_flight += op->bytes;
    *op->bytes_handled = op->bytes;

    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
    mirror_write_complete(op, ret);
}

static void coroutine_fn mirror_co_discard(void *opaque)
{
    MirrorOp *op = opaque;
    int ret;

    op->s->in_flight++;
    op->s->bytes_in_flight += op->bytes;
    *op->bytes_handled = op->bytes;

    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
    mirror_write_complete(op, ret);
}

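/* Start a single copy/zero/discard operation in its own coroutine and return
 * the number of bytes it will handle (as reported through bytes_handled). */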
static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
                               unsigned bytes, MirrorMethod mirror_method)
{
    MirrorOp *op;
    Coroutine *co;
    int64_t bytes_handled = -1;

    op = g_new(MirrorOp, 1);
    *op = (MirrorOp){
        .s              = s,
        .offset         = offset,
        .bytes          = bytes,
        .bytes_handled  = &bytes_handled,
    };
    qemu_co_queue_init(&op->waiting_requests);

    switch (mirror_method) {
    case MIRROR_METHOD_COPY:
        co = qemu_coroutine_create(mirror_co_read, op);
        break;
    case MIRROR_METHOD_ZERO:
        co = qemu_coroutine_create(mirror_co_zero, op);
        break;
    case MIRROR_METHOD_DISCARD:
        co = qemu_coroutine_create(mirror_co_discard, op);
        break;
    default:
        abort();
    }

    QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
    qemu_coroutine_enter(co);
    /* At this point, ownership of op has been moved to the coroutine
     * and the object may already be freed */

    /* Assert that this value has been set */
    assert(bytes_handled >= 0);

    /* Same assertion as in mirror_co_read() (and for mirror_co_read()
     * and mirror_co_discard(), bytes_handled == op->bytes, which
     * is the @bytes parameter given to this function) */
    assert(bytes_handled <= UINT_MAX);
    return bytes_handled;
}

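/* One pass of the background copy loop: pick the next dirty area from the
 * bitmap iterator, extend it over consecutive dirty chunks up to buf_size,
 * and issue copy/zero/discard operations for it.  Returns the rate-limiting
 * delay in nanoseconds for the caller to sleep. */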
static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
    BlockDriverState *source = s->mirror_top_bs->backing->bs;
    MirrorOp *pseudo_op;
    int64_t offset;
    uint64_t delay_ns = 0, ret = 0;
    /* At least the first dirty chunk is mirrored in one iteration. */
    int nb_chunks = 1;
    bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
    int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);

    bdrv_dirty_bitmap_lock(s->dirty_bitmap);
    offset = bdrv_dirty_iter_next(s->dbi);
    if (offset < 0) {
        bdrv_set_dirty_iter(s->dbi, 0);
        offset = bdrv_dirty_iter_next(s->dbi);
        trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
        assert(offset >= 0);
    }
    bdrv_dirty_bitmap_unlock(s->dirty_bitmap);

    mirror_wait_on_conflicts(NULL, s, offset, 1);

    job_pause_point(&s->common.job);

    /* Find the number of consecutive dirty chunks following the first dirty
     * one, and wait for in flight requests in them. */
    bdrv_dirty_bitmap_lock(s->dirty_bitmap);
    while (nb_chunks * s->granularity < s->buf_size) {
        int64_t next_dirty;
        int64_t next_offset = offset + nb_chunks * s->granularity;
        int64_t next_chunk = next_offset / s->granularity;
        if (next_offset >= s->bdev_length ||
            !bdrv_get_dirty_locked(source, s->dirty_bitmap, next_offset)) {
            break;
        }
        if (test_bit(next_chunk, s->in_flight_bitmap)) {
            break;
        }

        next_dirty = bdrv_dirty_iter_next(s->dbi);
        if (next_dirty > next_offset || next_dirty < 0) {
            /* The bitmap iterator's cache is stale, refresh it */
            bdrv_set_dirty_iter(s->dbi, next_offset);
            next_dirty = bdrv_dirty_iter_next(s->dbi);
        }
        assert(next_dirty == next_offset);
        nb_chunks++;
    }

    /* Clear dirty bits before querying the block status, because
     * calling bdrv_block_status_above could yield - if some blocks are
     * marked dirty in this window, we need to know.
     */
    bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
                                   nb_chunks * s->granularity);
    bdrv_dirty_bitmap_unlock(s->dirty_bitmap);

    /* Before claiming an area in the in-flight bitmap, we have to
     * create a MirrorOp for it so that conflicting requests can wait
     * for it.  mirror_perform() will create the real MirrorOps later,
     * for now we just create a pseudo operation that will wake up all
     * conflicting requests once all real operations have been
     * launched. */
    pseudo_op = g_new(MirrorOp, 1);
    *pseudo_op = (MirrorOp){
        .offset         = offset,
        .bytes          = nb_chunks * s->granularity,
        .is_pseudo_op   = true,
    };
    qemu_co_queue_init(&pseudo_op->waiting_requests);
    QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);

    bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
    while (nb_chunks > 0 && offset < s->bdev_length) {
        int ret;
        int64_t io_bytes;
        int64_t io_bytes_acct;
        MirrorMethod mirror_method = MIRROR_METHOD_COPY;

        assert(!(offset % s->granularity));
        ret = bdrv_block_status_above(source, NULL, offset,
                                      nb_chunks * s->granularity,
                                      &io_bytes, NULL, NULL);
        if (ret < 0) {
            io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
        } else if (ret & BDRV_BLOCK_DATA) {
            io_bytes = MIN(io_bytes, max_io_bytes);
        }

        io_bytes -= io_bytes % s->granularity;
        if (io_bytes < s->granularity) {
            io_bytes = s->granularity;
        } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
            int64_t target_offset;
            int64_t target_bytes;
            bdrv_round_to_clusters(blk_bs(s->target), offset, io_bytes,
                                   &target_offset, &target_bytes);
            if (target_offset == offset &&
                target_bytes == io_bytes) {
                mirror_method = ret & BDRV_BLOCK_ZERO ?
                                    MIRROR_METHOD_ZERO :
                                    MIRROR_METHOD_DISCARD;
            }
        }

        while (s->in_flight >= MAX_IN_FLIGHT) {
            trace_mirror_yield_in_flight(s, offset, s->in_flight);
            mirror_wait_for_free_in_flight_slot(s);
        }

        if (s->ret < 0) {
            ret = 0;
            goto fail;
        }

        io_bytes = mirror_clip_bytes(s, offset, io_bytes);
        io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
        if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
            io_bytes_acct = 0;
        } else {
            io_bytes_acct = io_bytes;
        }
        assert(io_bytes);
        offset += io_bytes;
        nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
        delay_ns = block_job_ratelimit_get_delay(&s->common, io_bytes_acct);
    }

    ret = delay_ns;
fail:
    QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
    qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
    g_free(pseudo_op);

    return ret;
}

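/* Carve s->buf into granularity-sized chunks and put them all on the
 * buf_free list. */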
static void mirror_free_init(MirrorBlockJob *s)
{
    int granularity = s->granularity;
    size_t buf_size = s->buf_size;
    uint8_t *buf = s->buf;

    assert(s->buf_free_count == 0);
    QSIMPLEQ_INIT(&s->buf_free);
    while (buf_size != 0) {
        MirrorBuffer *cur = (MirrorBuffer *)buf;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
        s->buf_free_count++;
        buf_size -= granularity;
        buf += granularity;
    }
}

/* This is also used for the .pause callback. There is no matching
 * mirror_resume() because mirror_run() will begin iterating again
 * when the job is resumed.
 */
static void mirror_wait_for_all_io(MirrorBlockJob *s)
{
    while (s->in_flight > 0) {
        mirror_wait_for_free_in_flight_slot(s);
    }
}

/**
 * mirror_exit_common: handle both abort() and prepare() cases.
 * for .prepare, returns 0 on success and -errno on failure.
 * for .abort cases, denoted by abort = true, MUST return 0.
 */
static int mirror_exit_common(Job *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockJob *bjob = &s->common;
    MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque;
    AioContext *replace_aio_context = NULL;
    BlockDriverState *src = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    BlockDriverState *mirror_top_bs = s->mirror_top_bs;
    Error *local_err = NULL;
    bool abort = job->ret < 0;
    int ret = 0;

    if (s->prepared) {
        return 0;
    }
    s->prepared = true;

    bdrv_release_dirty_bitmap(src, s->dirty_bitmap);

    /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
     * before we can call bdrv_drained_end */
    bdrv_ref(src);
    bdrv_ref(mirror_top_bs);
    bdrv_ref(target_bs);

    /* Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
     * inserting target_bs at s->to_replace, where we might not be able to get
     * these permissions.
     *
     * Note that blk_unref() alone doesn't necessarily drop permissions because
     * we might be running nested inside mirror_drain(), which takes an extra
     * reference, so use an explicit blk_set_perm() first. */
    blk_set_perm(s->target, 0, BLK_PERM_ALL, &error_abort);
    blk_unref(s->target);
    s->target = NULL;

    /* We don't access the source any more. Dropping any WRITE/RESIZE is
     * required before it could become a backing file of target_bs. */
    bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL,
                            &error_abort);
    if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
        BlockDriverState *backing = s->is_none_mode ? src : s->base;
        if (backing_bs(target_bs) != backing) {
            bdrv_set_backing_hd(target_bs, backing, &local_err);
            if (local_err) {
                error_report_err(local_err);
                ret = -EPERM;
            }
        }
    }

    if (s->to_replace) {
        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);
    }

    if (s->should_complete && !abort) {
        BlockDriverState *to_replace = s->to_replace ?: src;
        bool ro = bdrv_is_read_only(to_replace);

        if (ro != bdrv_is_read_only(target_bs)) {
            bdrv_reopen_set_read_only(target_bs, ro, NULL);
        }

        /* The mirror job has no requests in flight any more, but we need to
         * drain potential other users of the BDS before changing the graph. */
        bdrv_drained_begin(target_bs);
        bdrv_replace_node(to_replace, target_bs, &local_err);
        bdrv_drained_end(target_bs);
        if (local_err) {
            error_report_err(local_err);
            ret = -EPERM;
        }
    }
    if (s->to_replace) {
        bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
        error_free(s->replace_blocker);
        bdrv_unref(s->to_replace);
    }
    if (replace_aio_context) {
        aio_context_release(replace_aio_context);
    }
    g_free(s->replaces);
    bdrv_unref(target_bs);

    /* Remove the mirror filter driver from the graph. Before this, get rid of
     * the blockers on the intermediate nodes so that the resulting state is
     * valid. Also give up permissions on mirror_top_bs->backing, which might
     * block the removal. */
    block_job_remove_all_bdrv(bjob);
    bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL,
                            &error_abort);
    bdrv_replace_node(mirror_top_bs, backing_bs(mirror_top_bs), &error_abort);

    /* We just changed the BDS the job BB refers to (with either or both of the
     * bdrv_replace_node() calls), so switch the BB back so the cleanup does
     * the right thing. We don't need any permissions any more now. */
    blk_remove_bs(bjob->blk);
    blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
    blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);

    bs_opaque->job = NULL;

    bdrv_drained_end(src);
    bdrv_unref(mirror_top_bs);
    bdrv_unref(src);

    return ret;
}

static int mirror_prepare(Job *job)
{
    return mirror_exit_common(job);
}

static void mirror_abort(Job *job)
{
    int ret = mirror_exit_common(job);
    assert(ret == 0);
}

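/* Cooperative scheduling point: sleep once per BLOCK_JOB_SLICE_TIME of
 * elapsed time, otherwise just honour pending pause requests. */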
static void mirror_throttle(MirrorBlockJob *s)
{
    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);

    if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
        s->last_pause_ns = now;
        job_sleep_ns(&s->common.job, 0);
    } else {
        job_pause_point(&s->common.job);
    }
}

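/* Populate the dirty bitmap before the main loop starts.  If there is no
 * base image and the target does not zero-initialize, either mark the whole
 * device dirty (when efficient zero writes are unavailable) or zero out the
 * target up front.  Then mark every area allocated above @base as dirty so
 * it gets copied. */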
static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
{
    int64_t offset;
    BlockDriverState *base = s->base;
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    int ret;
    int64_t count;

    if (base == NULL && !bdrv_has_zero_init(target_bs)) {
        if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
            bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
            return 0;
        }

        s->initial_zeroing_ongoing = true;
        for (offset = 0; offset < s->bdev_length; ) {
            int bytes = MIN(s->bdev_length - offset,
                            QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

            mirror_throttle(s);

            if (job_is_cancelled(&s->common.job)) {
                s->initial_zeroing_ongoing = false;
                return 0;
            }

            if (s->in_flight >= MAX_IN_FLIGHT) {
                trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
                                   s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                continue;
            }

            mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
            offset += bytes;
        }

        mirror_wait_for_all_io(s);
        s->initial_zeroing_ongoing = false;
    }

    /* First part, loop on the sectors and initialize the dirty bitmap.  */
    for (offset = 0; offset < s->bdev_length; ) {
        /* Just to make sure we are not exceeding int limit. */
        int bytes = MIN(s->bdev_length - offset,
                        QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

        mirror_throttle(s);

        if (job_is_cancelled(&s->common.job)) {
            return 0;
        }

        ret = bdrv_is_allocated_above(bs, base, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        assert(count);
        if (ret == 1) {
            bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
        }
        offset += count;
    }
    return 0;
}

/* Called when going out of the streaming phase to flush the bulk of the
 * data to the medium, or just before completing.
 */
static int mirror_flush(MirrorBlockJob *s)
{
    int ret = blk_flush(s->target);
    if (ret < 0) {
        if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
            s->ret = ret;
        }
    }
    return ret;
}

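/* The main mirror job coroutine.  It resizes the target for active commit
 * when necessary, runs mirror_dirty_init() for non-"none" sync modes, and
 * then loops: start background copies for dirty areas, transition to READY
 * once the target is in sync, and exit when completion is requested or the
 * job is cancelled. */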
static int coroutine_fn mirror_run(Job *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    bool need_drain = true;
    int64_t length;
    BlockDriverInfo bdi;
    char backing_filename[2]; /* we only need 2 characters because we are only
                                 checking for an empty string */
    int ret = 0;

    if (job_is_cancelled(&s->common.job)) {
        goto immediate_exit;
    }

    s->bdev_length = bdrv_getlength(bs);
    if (s->bdev_length < 0) {
        ret = s->bdev_length;
        goto immediate_exit;
    }

    /* Active commit must resize the base image if its size differs from the
     * active layer. */
    if (s->base == blk_bs(s->target)) {
        int64_t base_length;

        base_length = blk_getlength(s->target);
        if (base_length < 0) {
            ret = base_length;
            goto immediate_exit;
        }

        if (s->bdev_length > base_length) {
            ret = blk_truncate(s->target, s->bdev_length, PREALLOC_MODE_OFF,
                               NULL);
            if (ret < 0) {
                goto immediate_exit;
            }
        }
    }

    if (s->bdev_length == 0) {
        /* Transition to the READY state and wait for complete. */
        job_transition_to_ready(&s->common.job);
        s->synced = true;
        s->actively_synced = true;
        while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
            job_yield(&s->common.job);
        }
        s->common.job.cancelled = false;
        goto immediate_exit;
    }

    length = DIV_ROUND_UP(s->bdev_length, s->granularity);
    s->in_flight_bitmap = bitmap_new(length);

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(target_bs, backing_filename,
                              sizeof(backing_filename));
    if (!bdrv_get_info(target_bs, &bdi) && bdi.cluster_size) {
        s->target_cluster_size = bdi.cluster_size;
    } else {
        s->target_cluster_size = BDRV_SECTOR_SIZE;
    }
    if (backing_filename[0] && !target_bs->backing &&
        s->granularity < s->target_cluster_size) {
        s->buf_size = MAX(s->buf_size, s->target_cluster_size);
        s->cow_bitmap = bitmap_new(length);
    }
    s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);

    s->buf = qemu_try_blockalign(bs, s->buf_size);
    if (s->buf == NULL) {
        ret = -ENOMEM;
        goto immediate_exit;
    }

    mirror_free_init(s);

    s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    if (!s->is_none_mode) {
        ret = mirror_dirty_init(s);
        if (ret < 0 || job_is_cancelled(&s->common.job)) {
            goto immediate_exit;
        }
    }

    assert(!s->dbi);
    s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
    for (;;) {
        uint64_t delay_ns = 0;
        int64_t cnt, delta;
        bool should_complete;

        /* Do not start passive operations while there are active
         * writes in progress */
        while (s->in_active_write_counter) {
            mirror_wait_for_any_operation(s, true);
        }

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        job_pause_point(&s->common.job);

        cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
         * the number of bytes currently being processed; together those are
         * the current remaining operation length */
        job_progress_set_remaining(&s->common.job, s->bytes_in_flight + cnt);

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that bdrv_drain_all() returns.
         * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
         * an error, or when the source is clean, whichever comes first. */
        delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
        if (delta < BLOCK_JOB_SLICE_TIME &&
            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                (cnt == 0 && s->in_flight > 0)) {
                trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                continue;
            } else if (cnt != 0) {
                delay_ns = mirror_iteration(s);
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            if (!s->synced) {
                if (mirror_flush(s) < 0) {
                    /* Go check s->ret.  */
                    continue;
                }
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                job_transition_to_ready(&s->common.job);
                s->synced = true;
                if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
                    s->actively_synced = true;
                }
            }

            should_complete = s->should_complete ||
                job_is_cancelled(&s->common.job);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(bs), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * mirror_populate runs, so pause it now.  Before deciding
             * whether to switch to target check one last time if I/O has
             * come in the meanwhile, and if not flush the data to disk.
             */
            trace_mirror_before_drain(s, cnt);

            bdrv_drained_begin(bs);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
            if (cnt > 0 || mirror_flush(s) < 0) {
                bdrv_drained_end(bs);
                continue;
            }

            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            s->common.job.cancelled = false;
            need_drain = false;
            break;
        }

        ret = 0;

        if (s->synced && !should_complete) {
            delay_ns = (s->in_flight == 0 &&
                        cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
        }
        trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
        job_sleep_ns(&s->common.job, delay_ns);
        if (job_is_cancelled(&s->common.job) &&
            (!s->synced || s->common.job.force_cancel))
        {
            break;
        }
        s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
               job_is_cancelled(&s->common.job)));
        assert(need_drain);
        mirror_wait_for_all_io(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    g_free(s->in_flight_bitmap);
    bdrv_dirty_iter_free(s->dbi);

    if (need_drain) {
        bdrv_drained_begin(bs);
    }

    return ret;
}

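/* .complete callback (block-job-complete): optionally open the target's
 * backing chain, block operations on the node to be replaced, and ask the
 * main loop to finish by setting should_complete. */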
static void mirror_complete(Job *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockDriverState *target;

    target = blk_bs(s->target);

    if (!s->synced) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    if (s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
        int ret;

        assert(!target->backing);
        ret = bdrv_open_backing_file(target, NULL, "backing", errp);
        if (ret < 0) {
            return;
        }
    }

    /* block all operations on to_replace bs */
    if (s->replaces) {
        AioContext *replace_aio_context;

        s->to_replace = bdrv_find_node(s->replaces);
        if (!s->to_replace) {
            error_setg(errp, "Node name '%s' not found", s->replaces);
            return;
        }

        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);

        /* TODO Translate this into permission system. Current definition of
         * GRAPH_MOD would require to request it for the parents; they might
         * not even be BlockDriverStates, however, so a BdrvChild can't address
         * them. May need redefinition of GRAPH_MOD. */
        error_setg(&s->replace_blocker,
                   "block device is in use by block-job-complete");
        bdrv_op_block_all(s->to_replace, s->replace_blocker);
        bdrv_ref(s->to_replace);

        aio_context_release(replace_aio_context);
    }

    s->should_complete = true;
    job_enter(job);
}

static void mirror_pause(Job *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);

    mirror_wait_for_all_io(s);
}

static bool mirror_drained_poll(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    return !!s->in_flight;
}

static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    blk_set_aio_context(s->target, new_context);
}

static void mirror_drain(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    /* Need to keep a reference in case blk_drain triggers execution
     * of mirror_complete...
     */
    if (s->target) {
        BlockBackend *target = s->target;
        blk_ref(target);
        blk_drain(target);
        blk_unref(target);
    }
}

static const BlockJobDriver mirror_job_driver = {
    .job_driver = {
        .instance_size          = sizeof(MirrorBlockJob),
        .job_type               = JOB_TYPE_MIRROR,
        .free                   = block_job_free,
        .user_resume            = block_job_user_resume,
        .drain                  = block_job_drain,
        .run                    = mirror_run,
        .prepare                = mirror_prepare,
        .abort                  = mirror_abort,
        .pause                  = mirror_pause,
        .complete               = mirror_complete,
    },
    .drained_poll           = mirror_drained_poll,
    .attached_aio_context   = mirror_attached_aio_context,
    .drain                  = mirror_drain,
};

static const BlockJobDriver commit_active_job_driver = {
    .job_driver = {
        .instance_size          = sizeof(MirrorBlockJob),
        .job_type               = JOB_TYPE_COMMIT,
        .free                   = block_job_free,
        .user_resume            = block_job_user_resume,
        .drain                  = block_job_drain,
        .run                    = mirror_run,
        .prepare                = mirror_prepare,
        .abort                  = mirror_abort,
        .pause                  = mirror_pause,
        .complete               = mirror_complete,
    },
    .drained_poll           = mirror_drained_poll,
    .attached_aio_context   = mirror_attached_aio_context,
    .drain                  = mirror_drain,
};

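/* Write-blocking (active) mirror path: replay a guest write/zero/discard on
 * the target for every area inside [offset, offset + bytes) that is still
 * marked dirty, clearing the corresponding dirty bits as we go. */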
static void do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
                                 uint64_t offset, uint64_t bytes,
                                 QEMUIOVector *qiov, int flags)
{
    BdrvDirtyBitmapIter *iter;
    QEMUIOVector target_qiov;
    uint64_t dirty_offset;
    int dirty_bytes;

    if (qiov) {
        qemu_iovec_init(&target_qiov, qiov->niov);
    }

    iter = bdrv_dirty_iter_new(job->dirty_bitmap);
    bdrv_set_dirty_iter(iter, offset);

    while (true) {
        bool valid_area;
        int ret;

        bdrv_dirty_bitmap_lock(job->dirty_bitmap);
        valid_area = bdrv_dirty_iter_next_area(iter, offset + bytes,
                                               &dirty_offset, &dirty_bytes);
        if (!valid_area) {
            bdrv_dirty_bitmap_unlock(job->dirty_bitmap);
            break;
        }

        bdrv_reset_dirty_bitmap_locked(job->dirty_bitmap,
                                       dirty_offset, dirty_bytes);
        bdrv_dirty_bitmap_unlock(job->dirty_bitmap);

        job_progress_increase_remaining(&job->common.job, dirty_bytes);

        assert(dirty_offset - offset <= SIZE_MAX);
        if (qiov) {
            qemu_iovec_reset(&target_qiov);
            qemu_iovec_concat(&target_qiov, qiov,
                              dirty_offset - offset, dirty_bytes);
        }

        switch (method) {
        case MIRROR_METHOD_COPY:
            ret = blk_co_pwritev(job->target, dirty_offset, dirty_bytes,
                                 qiov ? &target_qiov : NULL, flags);
            break;

        case MIRROR_METHOD_ZERO:
            assert(!qiov);
            ret = blk_co_pwrite_zeroes(job->target, dirty_offset, dirty_bytes,
                                       flags);
            break;

        case MIRROR_METHOD_DISCARD:
            assert(!qiov);
            ret = blk_co_pdiscard(job->target, dirty_offset, dirty_bytes);
            break;

        default:
            abort();
        }

        if (ret >= 0) {
            job_progress_update(&job->common.job, dirty_bytes);
        } else {
            BlockErrorAction action;

            bdrv_set_dirty_bitmap(job->dirty_bitmap, dirty_offset, dirty_bytes);
            job->actively_synced = false;

            action = mirror_error_action(job, false, -ret);
            if (action == BLOCK_ERROR_ACTION_REPORT) {
                if (!job->ret) {
                    job->ret = ret;
                }
                break;
            }
        }
    }

    bdrv_dirty_iter_free(iter);
    if (qiov) {
        qemu_iovec_destroy(&target_qiov);
    }
}

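/* Track an active (guest-triggered) write: register a MirrorOp for it, wait
 * for conflicting background operations and mark its chunks in flight. */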
static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
                                                   uint64_t offset,
                                                   uint64_t bytes)
{
    MirrorOp *op;
    uint64_t start_chunk = offset / s->granularity;
    uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);

    op = g_new(MirrorOp, 1);
    *op = (MirrorOp){
        .s                  = s,
        .offset             = offset,
        .bytes              = bytes,
        .is_active_write    = true,
    };
    qemu_co_queue_init(&op->waiting_requests);
    QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);

    s->in_active_write_counter++;

    mirror_wait_on_conflicts(op, s, offset, bytes);

    bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);

    return op;
}

static void coroutine_fn active_write_settle(MirrorOp *op)
{
    uint64_t start_chunk = op->offset / op->s->granularity;
    uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
                                      op->s->granularity);

    if (!--op->s->in_active_write_counter && op->s->actively_synced) {
        BdrvChild *source = op->s->mirror_top_bs->backing;

        if (QLIST_FIRST(&source->bs->parents) == source &&
            QLIST_NEXT(source, next_parent) == NULL)
        {
            /* Assert that we are back in sync once all active write
             * operations are settled.
             * Note that we can only assert this if the mirror node
             * is the source node's only parent. */
            assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
        }
    }
    bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
    QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
    qemu_co_queue_restart_all(&op->waiting_requests);
    g_free(op);
}

static int coroutine_fn bdrv_mirror_top_preadv(BlockDriverState *bs,
    uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
}

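/* Forward a guest write/zero/discard to the source (bs->backing) and, in
 * write-blocking copy mode, synchronously mirror it to the target via
 * do_sync_target_write(). */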
static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs,
    MirrorMethod method, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov,
    int flags)
{
    MirrorOp *op = NULL;
    MirrorBDSOpaque *s = bs->opaque;
    int ret = 0;
    bool copy_to_target;

    copy_to_target = s->job->ret >= 0 &&
                     s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;

    if (copy_to_target) {
        op = active_write_prepare(s->job, offset, bytes);
    }

    switch (method) {
    case MIRROR_METHOD_COPY:
        ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
        break;

    case MIRROR_METHOD_ZERO:
        ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
        break;

    case MIRROR_METHOD_DISCARD:
        ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
        break;

    default:
        abort();
    }

    if (ret < 0) {
        goto out;
    }

    if (copy_to_target) {
        do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
    }

out:
    if (copy_to_target) {
        active_write_settle(op);
    }
    return ret;
}

static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
    uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    MirrorBDSOpaque *s = bs->opaque;
    QEMUIOVector bounce_qiov;
    void *bounce_buf;
    int ret = 0;
    bool copy_to_target;

    copy_to_target = s->job->ret >= 0 &&
                     s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;

    if (copy_to_target) {
        /* The guest might concurrently modify the data to write; but
         * the data on source and destination must match, so we have
         * to use a bounce buffer if we are going to write to the
         * target now. */
        bounce_buf = qemu_blockalign(bs, bytes);
        iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);

        qemu_iovec_init(&bounce_qiov, 1);
        qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
        qiov = &bounce_qiov;
    }

    ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov,
                                   flags);

    if (copy_to_target) {
        qemu_iovec_destroy(&bounce_qiov);
        qemu_vfree(bounce_buf);
    }

    return ret;
}

static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs)
{
    if (bs->backing == NULL) {
        /* we can be here after failed bdrv_append in mirror_start_job */
        return 0;
    }
    return bdrv_co_flush(bs->backing->bs);
}

static int coroutine_fn bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs,
    int64_t offset, int bytes, BdrvRequestFlags flags)
{
    return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL,
                                    flags);
}

static int coroutine_fn bdrv_mirror_top_pdiscard(BlockDriverState *bs,
    int64_t offset, int bytes)
{
    return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes,
                                    NULL, 0);
}

static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs, QDict *opts)
{
    if (bs->backing == NULL) {
        /* we can be here after failed bdrv_attach_child in
         * bdrv_set_backing_hd */
        return;
    }
    bdrv_refresh_filename(bs->backing->bs);
    pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
            bs->backing->bs->filename);
}

static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
                                       const BdrvChildRole *role,
                                       BlockReopenQueue *reopen_queue,
                                       uint64_t perm, uint64_t shared,
                                       uint64_t *nperm, uint64_t *nshared)
{
    /* Must be able to forward guest writes to the real image */
    *nperm = 0;
    if (perm & BLK_PERM_WRITE) {
        *nperm |= BLK_PERM_WRITE;
    }

    *nshared = BLK_PERM_ALL;
}

/* Dummy node that provides consistent read to its users without requiring it
 * from its backing file and that allows writes on the backing file chain. */
static BlockDriver bdrv_mirror_top = {
    .format_name                = "mirror_top",
    .bdrv_co_preadv             = bdrv_mirror_top_preadv,
    .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
    .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
    .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
    .bdrv_co_flush              = bdrv_mirror_top_flush,
    .bdrv_co_block_status       = bdrv_co_block_status_from_backing,
    .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
    .bdrv_child_perm            = bdrv_mirror_top_child_perm,
};

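/* Common setup for mirror and active-commit jobs: insert the mirror_top
 * filter node above @bs, create the block job with @driver, attach @target
 * through a new BlockBackend with the appropriate permissions, and
 * initialize the MirrorBlockJob state. */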
static void mirror_start_job(const char *job_id, BlockDriverState *bs,
                             int creation_flags, BlockDriverState *target,
                             const char *replaces, int64_t speed,
                             uint32_t granularity, int64_t buf_size,
                             BlockMirrorBackingMode backing_mode,
                             BlockdevOnError on_source_error,
                             BlockdevOnError on_target_error,
                             bool unmap,
                             BlockCompletionFunc *cb,
                             void *opaque,
                             const BlockJobDriver *driver,
                             bool is_none_mode, BlockDriverState *base,
                             bool auto_complete, const char *filter_node_name,
                             bool is_mirror, MirrorCopyMode copy_mode,
                             Error **errp)
{
    MirrorBlockJob *s;
    MirrorBDSOpaque *bs_opaque;
    BlockDriverState *mirror_top_bs;
    bool target_graph_mod;
    bool target_is_backing;
    Error *local_err = NULL;
    int ret;

    if (granularity == 0) {
        granularity = bdrv_get_default_bitmap_granularity(target);
    }

    assert(is_power_of_2(granularity));

    if (buf_size < 0) {
        error_setg(errp, "Invalid parameter 'buf-size'");
        return;
    }

    if (buf_size == 0) {
        buf_size = DEFAULT_MIRROR_BUF_SIZE;
    }

    if (bs == target) {
        error_setg(errp, "Can't mirror node into itself");
        return;
    }

    /* In the case of active commit, add dummy driver to provide consistent
     * reads on the top, while disabling it in the intermediate nodes, and make
     * the backing chain writable. */
    mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
                                         BDRV_O_RDWR, errp);
    if (mirror_top_bs == NULL) {
        return;
    }
    if (!filter_node_name) {
        mirror_top_bs->implicit = true;
    }
    mirror_top_bs->total_sectors = bs->total_sectors;
    mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
    mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED;
    bs_opaque = g_new0(MirrorBDSOpaque, 1);
    mirror_top_bs->opaque = bs_opaque;
    bdrv_set_aio_context(mirror_top_bs, bdrv_get_aio_context(bs));

    /* bdrv_append takes ownership of the mirror_top_bs reference, need to keep
     * it alive until block_job_create() succeeds even if bs has no parent. */
    bdrv_ref(mirror_top_bs);
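    /* Insert mirror_top_bs above bs while bs is drained so that the graph
     * change does not happen while requests are in flight. */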
    bdrv_drained_begin(bs);
    bdrv_append(mirror_top_bs, bs, &local_err);
    bdrv_drained_end(bs);

    if (local_err) {
        bdrv_unref(mirror_top_bs);
        error_propagate(errp, local_err);
        return;
    }

    /* Make sure that the source is not resized while the job is running */
    s = block_job_create(job_id, driver, NULL, mirror_top_bs,
                         BLK_PERM_CONSISTENT_READ,
                         BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
                         BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD, speed,
                         creation_flags, cb, opaque, errp);
    if (!s) {
        goto fail;
    }
    bs_opaque->job = s;

    /* The block job now has a reference to this node */
    bdrv_unref(mirror_top_bs);

    s->mirror_top_bs = mirror_top_bs;

    /* No resize for the target either; while the mirror is still running, a
     * consistent read isn't necessarily possible. We could possibly allow
     * writes and graph modifications, though it would likely defeat the
     * purpose of a mirror, so leave them blocked for now.
     *
     * In the case of active commit, things look a bit different, though,
     * because the target is an already populated backing file in active use.
     * We can allow anything except resize there.*/
    target_is_backing = bdrv_chain_contains(bs, target);
    target_graph_mod = (backing_mode != MIRROR_LEAVE_BACKING_CHAIN);
    s->target = blk_new(BLK_PERM_WRITE | BLK_PERM_RESIZE |
                        (target_graph_mod ? BLK_PERM_GRAPH_MOD : 0),
                        BLK_PERM_WRITE_UNCHANGED |
                        (target_is_backing ? BLK_PERM_CONSISTENT_READ |
                                             BLK_PERM_WRITE |
                                             BLK_PERM_GRAPH_MOD : 0));
    ret = blk_insert_bs(s->target, target, errp);
    if (ret < 0) {
        goto fail;
    }
    if (is_mirror) {
        /* XXX: Mirror target could be a NBD server of target QEMU in the case
         * of non-shared block migration. To allow migration completion, we
         * have to allow "inactivate" of the target BB.  When that happens, we
         * know the job is drained, and the vcpus are stopped, so no write
         * operation will be performed. Block layer already has assertions to
         * ensure that. */
        blk_set_force_allow_inactivate(s->target);
    }

    s->replaces = g_strdup(replaces);
    s->on_source_error = on_source_error;
    s->on_target_error = on_target_error;
    s->is_none_mode = is_none_mode;
    s->backing_mode = backing_mode;
    s->copy_mode = copy_mode;
    s->base = base;
    s->granularity = granularity;
    s->buf_size = ROUND_UP(buf_size, granularity);
    s->unmap = unmap;
    if (auto_complete) {
        s->should_complete = true;
    }

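    /* The dirty bitmap tracks which parts of the source still have to be
     * copied to the target; guest writes re-dirty the affected areas. */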
    s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp);
    if (!s->dirty_bitmap) {
        goto fail;
    }

    /* Required permissions are already taken with blk_new() */
    block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
                       &error_abort);

    /* In commit_active_start() all intermediate nodes disappear, so
     * any jobs in them must be blocked */
    if (target_is_backing) {
        BlockDriverState *iter;
        for (iter = backing_bs(bs); iter != target; iter = backing_bs(iter)) {
            /* XXX BLK_PERM_WRITE needs to be allowed so we don't block
             * ourselves at s->base (if writes are blocked for a node, they are
             * also blocked for its backing file). The other options would be a
             * second filter driver above s->base (== target). */
            ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
                                     BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE,
                                     errp);
            if (ret < 0) {
                goto fail;
            }
        }
    }

    QTAILQ_INIT(&s->ops_in_flight);

    trace_mirror_start(bs, s, opaque);
    job_start(&s->common.job);
    return;

fail:
    if (s) {
        /* Make sure this BDS does not go away until we have completed the graph
         * changes below */
        bdrv_ref(mirror_top_bs);

        g_free(s->replaces);
        blk_unref(s->target);
        bs_opaque->job = NULL;
        job_early_fail(&s->common.job);
    }

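    /* Undo the graph change: drop the filter's permission requirements on its
     * backing child and replace mirror_top_bs with the node below it again. */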
    bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL,
                            &error_abort);
    bdrv_replace_node(mirror_top_bs, backing_bs(mirror_top_bs), &error_abort);

    bdrv_unref(mirror_top_bs);
}
1659

1660 1661
void mirror_start(const char *job_id, BlockDriverState *bs,
                  BlockDriverState *target, const char *replaces,
1662 1663
                  int creation_flags, int64_t speed,
                  uint32_t granularity, int64_t buf_size,
1664 1665
                  MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
                  BlockdevOnError on_source_error,
1666
                  BlockdevOnError on_target_error,
1667 1668
                  bool unmap, const char *filter_node_name,
                  MirrorCopyMode copy_mode, Error **errp)
1669 1670 1671 1672
{
    bool is_none_mode;
    BlockDriverState *base;

1673 1674
    if (mode == MIRROR_SYNC_MODE_INCREMENTAL) {
        error_setg(errp, "Sync mode 'incremental' not supported");
1675 1676
        return;
    }
1677
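    /* sync=none copies nothing up front; sync=top copies only the data
     * allocated above the backing file, so the backing file becomes the
     * job's base. */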
    is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
    base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL;
    mirror_start_job(job_id, bs, creation_flags, target, replaces,
                     speed, granularity, buf_size, backing_mode,
                     on_source_error, on_target_error, unmap, NULL, NULL,
                     &mirror_job_driver, is_none_mode, base, false,
                     filter_node_name, true, copy_mode, errp);
}

void commit_active_start(const char *job_id, BlockDriverState *bs,
                         BlockDriverState *base, int creation_flags,
                         int64_t speed, BlockdevOnError on_error,
                         const char *filter_node_name,
                         BlockCompletionFunc *cb, void *opaque,
                         bool auto_complete, Error **errp)
{
    bool base_read_only;
    Error *local_err = NULL;

    base_read_only = bdrv_is_read_only(base);

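    /* Active commit writes the data of bs into base, so base must be
     * writable; if it was read-only, reopen it read-write here and restore
     * the read-only state if starting the job fails. */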
    if (base_read_only) {
        if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
            return;
        }
    }

    mirror_start_job(job_id, bs, creation_flags, base, NULL, speed, 0, 0,
                     MIRROR_LEAVE_BACKING_CHAIN,
                     on_error, on_error, true, cb, opaque,
                     &commit_active_job_driver, false, base, auto_complete,
                     filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
                     &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        goto error_restore_flags;
    }

    return;

error_restore_flags:
    /* ignore error and errp for bdrv_reopen, because we want to propagate
     * the original error */
    if (base_read_only) {
        bdrv_reopen_set_read_only(base, true, NULL);
    }
    return;
}