buffered_file.c 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/*
 * QEMU buffered QEMUFile
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
12 13
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
14 15 16 17
 */

#include "qemu-common.h"
#include "hw/hw.h"
18
#include "qemu/timer.h"
19
#include "buffered_file.h"
20
#include "qemu/thread.h"
21 22 23 24 25

//#define DEBUG_BUFFERED_FILE

typedef struct QEMUFileBuffered
{
26
    MigrationState *migration_state;
27 28 29 30 31 32 33
    QEMUFile *file;
    int freeze_output;
    size_t bytes_xfer;
    size_t xfer_limit;
    uint8_t *buffer;
    size_t buffer_size;
    size_t buffer_capacity;
34 35
    QemuThread thread;
    bool complete;
36 37 38
} QEMUFileBuffered;

#ifdef DEBUG_BUFFERED_FILE
M
malc 已提交
39
#define DPRINTF(fmt, ...) \
40 41
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
#else
M
malc 已提交
42
#define DPRINTF(fmt, ...) \
43 44 45 46 47 48 49
    do { } while (0)
#endif

static void buffered_append(QEMUFileBuffered *s,
                            const uint8_t *buf, size_t size)
{
    if (size > (s->buffer_capacity - s->buffer_size)) {
M
malc 已提交
50
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
51 52 53 54
                s->buffer_capacity, size + 1024);

        s->buffer_capacity += size + 1024;

55
        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
56 57 58 59 60 61
    }

    memcpy(s->buffer + s->buffer_size, buf, size);
    s->buffer_size += size;
}

62
static ssize_t buffered_flush(QEMUFileBuffered *s)
63 64
{
    size_t offset = 0;
65
    ssize_t ret = 0;
66

M
malc 已提交
67
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
68

P
Paolo Bonzini 已提交
69
    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
70
        size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
71
        ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
72
                                    to_send);
73
        if (ret == -EAGAIN) {
M
malc 已提交
74
            DPRINTF("backend not ready, freezing\n");
75
            ret = 0;
76 77 78 79 80
            s->freeze_output = 1;
            break;
        }

        if (ret <= 0) {
M
malc 已提交
81
            DPRINTF("error flushing data, %zd\n", ret);
82 83
            break;
        } else {
M
malc 已提交
84
            DPRINTF("flushed %zd byte(s)\n", ret);
85
            offset += ret;
P
Paolo Bonzini 已提交
86
            s->bytes_xfer += ret;
87 88 89
        }
    }

M
malc 已提交
90
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
91 92
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
    s->buffer_size -= offset;
93 94 95 96 97

    if (ret < 0) {
        return ret;
    }
    return offset;
98 99 100 101 102
}

static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{
    QEMUFileBuffered *s = opaque;
103
    ssize_t error;
104

M
malc 已提交
105
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
106

107 108 109 110
    error = qemu_file_get_error(s->file);
    if (error) {
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
        return error;
111 112
    }

M
malc 已提交
113
    DPRINTF("unfreezing output\n");
114 115
    s->freeze_output = 0;

P
Paolo Bonzini 已提交
116
    if (size > 0) {
M
malc 已提交
117
        DPRINTF("buffering %d bytes\n", size - offset);
P
Paolo Bonzini 已提交
118
        buffered_append(s, buf, size);
119 120
    }

121 122 123 124 125
    error = buffered_flush(s);
    if (error < 0) {
        DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
        return error;
    }
P
Paolo Bonzini 已提交
126

127 128
    if (pos == 0 && size == 0) {
        DPRINTF("file is ready\n");
P
Paolo Bonzini 已提交
129
        if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) {
130
            DPRINTF("notifying client\n");
131
            migrate_fd_put_ready(s->migration_state);
132 133 134
        }
    }

P
Paolo Bonzini 已提交
135
    return size;
136 137 138 139 140
}

static int buffered_close(void *opaque)
{
    QEMUFileBuffered *s = opaque;
141 142
    ssize_t ret = 0;
    int ret2;
143

M
malc 已提交
144
    DPRINTF("closing\n");
145

P
Paolo Bonzini 已提交
146
    s->xfer_limit = INT_MAX;
147
    while (!qemu_file_get_error(s->file) && s->buffer_size) {
148 149 150 151
        ret = buffered_flush(s);
        if (ret < 0) {
            break;
        }
152 153 154 155 156 157
        if (s->freeze_output) {
            ret = migrate_fd_wait_for_unfreeze(s->migration_state);
            if (ret < 0) {
                break;
            }
        }
158 159
    }

160 161 162 163
    ret2 = migrate_fd_close(s->migration_state);
    if (ret >= 0) {
        ret = ret2;
    }
164 165
    ret = migrate_fd_close(s->migration_state);
    s->complete = true;
166 167 168
    return ret;
}

169 170 171 172
/*
 * The meaning of the return values is:
 *   0: We can continue sending
 *   1: Time to stop
173
 *   negative: There has been an error
174
 */
P
Paolo Bonzini 已提交
175 176 177 178 179 180 181
static int buffered_get_fd(void *opaque)
{
    QEMUFileBuffered *s = opaque;

    return qemu_get_fd(s->file);
}

182 183 184
static int buffered_rate_limit(void *opaque)
{
    QEMUFileBuffered *s = opaque;
185
    int ret;
186

187 188 189
    ret = qemu_file_get_error(s->file);
    if (ret) {
        return ret;
190
    }
191 192 193 194 195 196 197 198 199
    if (s->freeze_output)
        return 1;

    if (s->bytes_xfer > s->xfer_limit)
        return 1;

    return 0;
}

M
Michael S. Tsirkin 已提交
200
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
201 202
{
    QEMUFileBuffered *s = opaque;
203
    if (qemu_file_get_error(s->file)) {
204
        goto out;
205
    }
M
Michael S. Tsirkin 已提交
206 207 208 209
    if (new_rate > SIZE_MAX) {
        new_rate = SIZE_MAX;
    }

210 211 212 213 214 215
    s->xfer_limit = new_rate / 10;
    
out:
    return s->xfer_limit;
}

M
Michael S. Tsirkin 已提交
216
static int64_t buffered_get_rate_limit(void *opaque)
L
lirans@il.ibm.com 已提交
217 218 219 220 221 222
{
    QEMUFileBuffered *s = opaque;
  
    return s->xfer_limit;
}

223 224 225 226
/* 10ms  xfer_limit is the limit that we should write each 10ms */
#define BUFFER_DELAY 100

static void *buffered_file_thread(void *opaque)
227 228
{
    QEMUFileBuffered *s = opaque;
229
    int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;
230

231 232
    while (true) {
        int64_t current_time = qemu_get_clock_ms(rt_clock);
233

234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
        if (s->complete) {
            break;
        }
        if (s->freeze_output) {
            continue;
        }
        if (current_time >= expire_time) {
            s->bytes_xfer = 0;
            expire_time = current_time + BUFFER_DELAY;
        }
        if (s->bytes_xfer >= s->xfer_limit) {
            /* usleep expects microseconds */
            g_usleep((expire_time - current_time)*1000);
        }
        qemu_mutex_lock_iothread();
        buffered_put_buffer(s, NULL, 0, 0);
        qemu_mutex_unlock_iothread();
    }
    g_free(s->buffer);
    g_free(s);
    return NULL;
255 256
}

257
static const QEMUFileOps buffered_file_ops = {
P
Paolo Bonzini 已提交
258
    .get_fd =         buffered_get_fd,
259 260 261 262 263 264 265
    .put_buffer =     buffered_put_buffer,
    .close =          buffered_close,
    .rate_limit =     buffered_rate_limit,
    .get_rate_limit = buffered_get_rate_limit,
    .set_rate_limit = buffered_set_rate_limit,
};

266
QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
267 268 269
{
    QEMUFileBuffered *s;

270
    s = g_malloc0(sizeof(*s));
271

272
    s->migration_state = migration_state;
273
    s->xfer_limit = migration_state->bandwidth_limit / 10;
274
    s->complete = false;
275

276
    s->file = qemu_fopen_ops(s, &buffered_file_ops);
277

278 279
    qemu_thread_create(&s->thread, buffered_file_thread, s,
                       QEMU_THREAD_DETACHED);
280 281 282

    return s->file;
}