buffered_file.c 6.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/*
 * QEMU buffered QEMUFile
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
12 13
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
14 15 16 17
 */

#include "qemu-common.h"
#include "hw/hw.h"
18
#include "qemu/timer.h"
19 20 21 22 23 24
#include "buffered_file.h"

//#define DEBUG_BUFFERED_FILE

typedef struct QEMUFileBuffered
{
25
    MigrationState *migration_state;
26 27 28 29 30 31 32 33 34 35 36
    QEMUFile *file;
    int freeze_output;
    size_t bytes_xfer;
    size_t xfer_limit;
    uint8_t *buffer;
    size_t buffer_size;
    size_t buffer_capacity;
    QEMUTimer *timer;
} QEMUFileBuffered;

/*
 * Debug tracing.  With DEBUG_BUFFERED_FILE defined, DPRINTF() prints its
 * arguments through printf() with a "buffered-file: " prefix; otherwise it
 * expands to an empty statement and its arguments are not compiled.
 */
#if defined(DEBUG_BUFFERED_FILE)
#define DPRINTF(fmt, ...)                                   \
    do {                                                    \
        printf("buffered-file: " fmt, ## __VA_ARGS__);      \
    } while (0)
#else
#define DPRINTF(fmt, ...) do { } while (0)
#endif

/*
 * Queue @size bytes from @buf at the end of the pending buffer.
 *
 * When the free room is too small the buffer is grown by @size + 1024
 * bytes via g_realloc() before the data is copied in.
 */
static void buffered_append(QEMUFileBuffered *s,
                            const uint8_t *buf, size_t size)
{
    size_t room = s->buffer_capacity - s->buffer_size;

    if (room < size) {
        size_t growth = size + 1024;

        DPRINTF("increasing buffer capacity from %zu by %zu\n",
                s->buffer_capacity, growth);

        s->buffer_capacity += growth;
        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
    }

    memcpy(&s->buffer[s->buffer_size], buf, size);
    s->buffer_size += size;
}

60
/*
 * Push as much pending data as possible to the backend.
 *
 * Data is handed to migrate_fd_put_buffer() until the buffer is empty,
 * bytes_xfer reaches xfer_limit, or the backend stops accepting data.
 * A -EAGAIN result is not treated as an error: it sets freeze_output and
 * ends the loop.  Whatever was not written stays at the front of the
 * buffer.
 *
 * Returns the number of bytes flushed by this call, or the negative value
 * returned by migrate_fd_put_buffer() if the last write failed.
 */
static ssize_t buffered_flush(QEMUFileBuffered *s)
{
    size_t offset = 0;
    ssize_t ret = 0;

    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);

    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
        ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
                                    s->buffer_size - offset);
        if (ret == -EAGAIN) {
            DPRINTF("backend not ready, freezing\n");
            ret = 0;
            s->freeze_output = 1;
            break;
        }

        if (ret <= 0) {
            DPRINTF("error flushing data, %zd\n", ret);
            break;
        }

        DPRINTF("flushed %zd byte(s)\n", ret);
        offset += ret;
        s->bytes_xfer += ret;
    }

    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);

    /*
     * Only compact when something was consumed.  With offset == 0 the move
     * would be a no-op, and s->buffer may still be NULL (nothing appended
     * yet), which must not be passed to memmove() even for a zero length.
     */
    if (offset > 0) {
        memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
        s->buffer_size -= offset;
    }

    if (ret < 0) {
        return ret;
    }
    return offset;
}

static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{
    QEMUFileBuffered *s = opaque;
101
    ssize_t error;
102

M
malc 已提交
103
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
104

105 106 107 108
    error = qemu_file_get_error(s->file);
    if (error) {
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
        return error;
109 110
    }

M
malc 已提交
111
    DPRINTF("unfreezing output\n");
112 113
    s->freeze_output = 0;

P
Paolo Bonzini 已提交
114
    if (size > 0) {
M
malc 已提交
115
        DPRINTF("buffering %d bytes\n", size - offset);
P
Paolo Bonzini 已提交
116
        buffered_append(s, buf, size);
117 118
    }

119 120 121 122 123
    error = buffered_flush(s);
    if (error < 0) {
        DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
        return error;
    }
P
Paolo Bonzini 已提交
124

125 126
    if (pos == 0 && size == 0) {
        DPRINTF("file is ready\n");
P
Paolo Bonzini 已提交
127
        if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) {
128
            DPRINTF("notifying client\n");
129
            migrate_fd_put_ready(s->migration_state);
130 131 132
        }
    }

P
Paolo Bonzini 已提交
133
    return size;
134 135 136 137 138
}

static int buffered_close(void *opaque)
{
    QEMUFileBuffered *s = opaque;
139 140
    ssize_t ret = 0;
    int ret2;
141

M
malc 已提交
142
    DPRINTF("closing\n");
143

P
Paolo Bonzini 已提交
144
    s->xfer_limit = INT_MAX;
145
    while (!qemu_file_get_error(s->file) && s->buffer_size) {
146 147 148 149
        ret = buffered_flush(s);
        if (ret < 0) {
            break;
        }
150 151 152 153 154 155
        if (s->freeze_output) {
            ret = migrate_fd_wait_for_unfreeze(s->migration_state);
            if (ret < 0) {
                break;
            }
        }
156 157
    }

158 159 160 161
    ret2 = migrate_fd_close(s->migration_state);
    if (ret >= 0) {
        ret = ret2;
    }
162 163
    qemu_del_timer(s->timer);
    qemu_free_timer(s->timer);
164 165
    g_free(s->buffer);
    g_free(s);
166 167 168 169

    return ret;
}

170 171 172 173
/*
 * Return values of buffered_rate_limit():
 *   0: We can continue sending
 *   1: Time to stop
 *   negative: There has been an error
 */
P
Paolo Bonzini 已提交
176 177 178 179 180 181 182
static int buffered_get_fd(void *opaque)
{
    QEMUFileBuffered *s = opaque;

    return qemu_get_fd(s->file);
}

183 184 185
static int buffered_rate_limit(void *opaque)
{
    QEMUFileBuffered *s = opaque;
186
    int ret;
187

188 189 190
    ret = qemu_file_get_error(s->file);
    if (ret) {
        return ret;
191
    }
192 193 194 195 196 197 198 199 200
    if (s->freeze_output)
        return 1;

    if (s->bytes_xfer > s->xfer_limit)
        return 1;

    return 0;
}

M
Michael S. Tsirkin 已提交
201
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
202 203
{
    QEMUFileBuffered *s = opaque;
204
    if (qemu_file_get_error(s->file)) {
205
        goto out;
206
    }
M
Michael S. Tsirkin 已提交
207 208 209 210
    if (new_rate > SIZE_MAX) {
        new_rate = SIZE_MAX;
    }

211 212 213 214 215 216
    s->xfer_limit = new_rate / 10;
    
out:
    return s->xfer_limit;
}

M
Michael S. Tsirkin 已提交
217
static int64_t buffered_get_rate_limit(void *opaque)
L
lirans@il.ibm.com 已提交
218 219 220 221 222 223
{
    QEMUFileBuffered *s = opaque;
  
    return s->xfer_limit;
}

224 225 226 227
static void buffered_rate_tick(void *opaque)
{
    QEMUFileBuffered *s = opaque;

228
    if (qemu_file_get_error(s->file)) {
229
        buffered_close(s);
230
        return;
231
    }
232

233
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
234 235 236 237 238 239

    if (s->freeze_output)
        return;

    s->bytes_xfer = 0;

P
Paolo Bonzini 已提交
240
    buffered_put_buffer(s, NULL, 0, 0);
241 242
}

243
static const QEMUFileOps buffered_file_ops = {
P
Paolo Bonzini 已提交
244
    .get_fd =         buffered_get_fd,
245 246 247 248 249 250 251
    .put_buffer =     buffered_put_buffer,
    .close =          buffered_close,
    .rate_limit =     buffered_rate_limit,
    .get_rate_limit = buffered_get_rate_limit,
    .set_rate_limit = buffered_set_rate_limit,
};

252
/*
 * Create a buffered QEMUFile for @migration_state.
 *
 * The zero-initialised state starts with a transfer limit of
 * migration_state->bandwidth_limit / 10, is wrapped in a QEMUFile using
 * buffered_file_ops, and gets a timer that first fires 100 ms from now
 * and runs buffered_rate_tick().
 *
 * Returns the new QEMUFile.
 */
QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
{
    QEMUFileBuffered *s = g_malloc0(sizeof(*s));

    s->migration_state = migration_state;
    s->xfer_limit = migration_state->bandwidth_limit / 10;

    s->file = qemu_fopen_ops(s, &buffered_file_ops);

    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);

    return s->file;
}