/*
 * Image streaming
 *
 * Copyright IBM, Corp. 2011
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "trace.h"
#include "block_int.h"

enum {
    /*
     * Size of data buffer for populating the image file.  This should be large
     * enough to process multiple clusters in a single call, so that populating
     * contiguous regions of the image is efficient.
     */
    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
};
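
/*
 * For scale (illustrative arithmetic): 512 KB is 1024 sectors of 512 bytes,
 * so with the typical 64 KB qcow2 cluster size one buffer covers 8 clusters
 * per copy request.
 */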

#define SLICE_TIME 100000000ULL /* ns */

typedef struct {
    int64_t next_slice_time;
    uint64_t slice_quota;
    uint64_t dispatched;
} RateLimit;

static int64_t ratelimit_calculate_delay(RateLimit *limit, uint64_t n)
{
    int64_t now = qemu_get_clock_ns(rt_clock);

    if (limit->next_slice_time < now) {
        limit->next_slice_time = now + SLICE_TIME;
        limit->dispatched = 0;
    }
    if (limit->dispatched == 0 || limit->dispatched + n <= limit->slice_quota) {
        limit->dispatched += n;
        return 0;
    } else {
        limit->dispatched = n;
        return limit->next_slice_time - now;
    }
}

static void ratelimit_set_speed(RateLimit *limit, uint64_t speed)
{
    limit->slice_quota = speed / (1000000000ULL / SLICE_TIME);
}
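
/*
 * Illustrative arithmetic: SLICE_TIME is 100 ms, so there are ten slices
 * per second and slice_quota = speed / 10 units per slice.  With speed =
 * 20480 sectors/s (10 MiB/s), up to 2048 sectors may be dispatched within
 * one 100 ms slice before ratelimit_calculate_delay() returns a delay.
 */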

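/*
 * Per-job state.  'base' is the image below which data is not copied
 * (NULL means the whole backing chain is streamed) and 'backing_file_id'
 * holds the backing file string to record in the image when the job
 * completes.
 */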
typedef struct StreamBlockJob {
    BlockJob common;
    RateLimit limit;
    BlockDriverState *base;
    char backing_file_id[1024];
} StreamBlockJob;

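/*
 * Copy the data for [sector_num, sector_num + nb_sectors) into the top
 * image, using 'buf' as the bounce buffer for a single copy-on-read
 * request.
 */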
static int coroutine_fn stream_populate(BlockDriverState *bs,
                                        int64_t sector_num, int nb_sectors,
                                        void *buf)
{
    struct iovec iov = {
        .iov_base = buf,
        .iov_len  = nb_sectors * BDRV_SECTOR_SIZE,
    };
    QEMUIOVector qiov;

    qemu_iovec_init_external(&qiov, &iov, 1);

    /* Copy-on-read the unallocated clusters */
    return bdrv_co_copy_on_readv(bs, sector_num, nb_sectors, &qiov);
}

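/*
 * Unlink and delete the images between 'top' and 'base' once their data
 * has been copied into 'top', then point top's backing file at 'base'.
 */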
static void close_unused_images(BlockDriverState *top, BlockDriverState *base,
                                const char *base_id)
{
    BlockDriverState *intermediate;
    intermediate = top->backing_hd;

    while (intermediate) {
        BlockDriverState *unused;

        /* reached base */
        if (intermediate == base) {
            break;
        }

        unused = intermediate;
        intermediate = intermediate->backing_hd;
        unused->backing_hd = NULL;
        bdrv_delete(unused);
    }
    top->backing_hd = base;
}

/*
 * Given an image chain: [BASE] -> [INTER1] -> [INTER2] -> [TOP]
 *
 * Return true if the given sector is allocated in top.
 * Return false if the given sector is allocated in an intermediate image:
 * its data still needs to be copied into top.
 * Return true otherwise, i.e. the sector is unallocated all the way down to
 * base and there is nothing to copy.
 *
 * 'pnum' is set to the number of sectors (including and immediately following
 *  the specified sector) that are known to be in the same
 *  allocated/unallocated state.
 *
 */
static int coroutine_fn is_allocated_base(BlockDriverState *top,
                                          BlockDriverState *base,
                                          int64_t sector_num,
                                          int nb_sectors, int *pnum)
{
    BlockDriverState *intermediate;
    int ret, n;

    ret = bdrv_co_is_allocated(top, sector_num, nb_sectors, &n);
    if (ret) {
        *pnum = n;
        return ret;
    }

    /*
     * Is the unallocated chunk [sector_num, sector_num + n) also
     * unallocated between base and top?
     */
    intermediate = top->backing_hd;

    while (intermediate != base) {
        int pnum_inter;

        ret = bdrv_co_is_allocated(intermediate, sector_num, nb_sectors,
                                   &pnum_inter);
        if (ret < 0) {
            return ret;
        } else if (ret) {
            *pnum = pnum_inter;
            return 0;
        }

        /*
         * [sector_num, sector_num + n) is unallocated on top, but an
         * intermediate image may have a shorter unallocated run starting
         * at sector_num, so clamp n to the smallest extent found so far.
         */
        if (n > pnum_inter) {
            n = pnum_inter;
        }

        intermediate = intermediate->backing_hd;
    }

    *pnum = n;
    return 1;
}

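/*
 * The job coroutine.  Scan the image from the first sector to the last,
 * copying data that is only present in backing files (down to 'base') into
 * the top image, throttling via s->limit when a speed has been set.  On
 * success the backing file link is shortened and the now-unused
 * intermediate images are deleted.
 */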
static void coroutine_fn stream_run(void *opaque)
{
    StreamBlockJob *s = opaque;
    BlockDriverState *bs = s->common.bs;
    BlockDriverState *base = s->base;
    int64_t sector_num, end;
    int ret = 0;
    int n;
    void *buf;

    s->common.len = bdrv_getlength(bs);
    if (s->common.len < 0) {
        block_job_complete(&s->common, s->common.len);
        return;
    }

    end = s->common.len >> BDRV_SECTOR_BITS;
    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);

    /* Turn on copy-on-read for the whole block device so that guest read
     * requests help us make progress.  Only do this when copying the entire
     * backing chain since the copy-on-read operation does not take base into
     * account.
     */
    if (!base) {
        bdrv_enable_copy_on_read(bs);
    }

    for (sector_num = 0; sector_num < end; sector_num += n) {
        uint64_t delay_ns = 0;

wait:
        /* Note that even when no rate limit is applied we need to yield
         * with no pending I/O here so that qemu_aio_flush() returns.
         */
        block_job_sleep_ns(&s->common, rt_clock, delay_ns);
        if (block_job_is_cancelled(&s->common)) {
            break;
        }

        ret = is_allocated_base(bs, base, sector_num,
                                STREAM_BUFFER_SIZE / BDRV_SECTOR_SIZE, &n);
        trace_stream_one_iteration(s, sector_num, n, ret);
        if (ret == 0) {
            if (s->common.speed) {
                delay_ns = ratelimit_calculate_delay(&s->limit, n);
                if (delay_ns > 0) {
                    goto wait;
                }
            }
            ret = stream_populate(bs, sector_num, n, buf);
        }
        if (ret < 0) {
            break;
        }
        ret = 0;

        /* Publish progress */
        s->common.offset += n * BDRV_SECTOR_SIZE;
    }

    if (!base) {
        bdrv_disable_copy_on_read(bs);
    }

    if (!block_job_is_cancelled(&s->common) && sector_num == end && ret == 0) {
        const char *base_id = NULL, *base_fmt = NULL;
        if (base) {
            base_id = s->backing_file_id;
            if (base->drv) {
                base_fmt = base->drv->format_name;
            }
        }
        ret = bdrv_change_backing_file(bs, base_id, base_fmt);
        close_unused_images(bs, base, base_id);
    }

    qemu_vfree(buf);
    block_job_complete(&s->common, ret);
}

static void stream_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    StreamBlockJob *s = container_of(job, StreamBlockJob, common);

    if (speed < 0) {
        error_set(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }
    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE);
}

static BlockJobType stream_job_type = {
    .instance_size = sizeof(StreamBlockJob),
    .job_type      = "stream",
    .set_speed     = stream_set_speed,
};

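/*
 * Create and start a streaming job on 'bs'.  'base' may be NULL to stream
 * the entire backing chain; 'base_id' is the backing file string written
 * into the image header when the job completes.  A sketch of typical use
 * (assuming a monitor command handler) is to look up the device's
 * BlockDriverState, optionally resolve a base image, and pass a completion
 * callback 'cb' that receives the job's return value; failure to create
 * the job is reported through 'errp'.
 */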
void stream_start(BlockDriverState *bs, BlockDriverState *base,
                  const char *base_id, int64_t speed,
                  BlockDriverCompletionFunc *cb,
                  void *opaque, Error **errp)
{
    StreamBlockJob *s;

    s = block_job_create(&stream_job_type, bs, speed, cb, opaque, errp);
    if (!s) {
        return;
    }

    s->base = base;
    if (base_id) {
        pstrcpy(s->backing_file_id, sizeof(s->backing_file_id), base_id);
    }

    s->common.co = qemu_coroutine_create(stream_run);
    trace_stream_start(bs, base, s, s->common.co, opaque);
    qemu_coroutine_enter(s->common.co, s);
}