diff --git a/MAINTAINERS b/MAINTAINERS index efd3f3875fd77375e4b42a167e2e39d8aee60038..c45e886d886e335c6337fe3d789a654a4aa42ebf 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -1975,6 +1975,7 @@ F: ioport.c F: include/exec/memop.h F: include/exec/memory.h F: include/exec/ram_addr.h +F: include/exec/ramblock.h F: memory.c F: include/exec/memory-internal.h F: exec.c diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h index 5adebb0bc7ccb4b47f43aff56973c4278ea46160..5e59a3d8d7d97ba13f4b6d698885b94d6f33c1a7 100644 --- a/include/exec/ram_addr.h +++ b/include/exec/ram_addr.h @@ -24,45 +24,7 @@ #include "hw/xen/xen.h" #include "sysemu/tcg.h" #include "exec/ramlist.h" - -struct RAMBlock { - struct rcu_head rcu; - struct MemoryRegion *mr; - uint8_t *host; - uint8_t *colo_cache; /* For colo, VM's ram cache */ - ram_addr_t offset; - ram_addr_t used_length; - ram_addr_t max_length; - void (*resized)(const char*, uint64_t length, void *host); - uint32_t flags; - /* Protected by iothread lock. */ - char idstr[256]; - /* RCU-enabled, writes protected by the ramlist lock */ - QLIST_ENTRY(RAMBlock) next; - QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers; - int fd; - size_t page_size; - /* dirty bitmap used during migration */ - unsigned long *bmap; - /* bitmap of already received pages in postcopy */ - unsigned long *receivedmap; - - /* - * bitmap to track already cleared dirty bitmap. When the bit is - * set, it means the corresponding memory chunk needs a log-clear. - * Set this up to non-NULL to enable the capability to postpone - * and split clearing of dirty bitmap on the remote node (e.g., - * KVM). The bitmap will be set only when doing global sync. - * - * NOTE: this bitmap is different comparing to the other bitmaps - * in that one bit can represent multiple guest pages (which is - * decided by the `clear_bmap_shift' variable below). On - * destination side, this should always be NULL, and the variable - * `clear_bmap_shift' is meaningless. - */ - unsigned long *clear_bmap; - uint8_t clear_bmap_shift; -}; +#include "exec/ramblock.h" /** * clear_bmap_size: calculate clear bitmap size diff --git a/include/exec/ramblock.h b/include/exec/ramblock.h new file mode 100644 index 0000000000000000000000000000000000000000..07d50864d862686298efa4b11066676895d341ee --- /dev/null +++ b/include/exec/ramblock.h @@ -0,0 +1,64 @@ +/* + * Declarations for cpu physical memory functions + * + * Copyright 2011 Red Hat, Inc. and/or its affiliates + * + * Authors: + * Avi Kivity + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + * + */ + +/* + * This header is for use by exec.c and memory.c ONLY. Do not include it. + * The functions declared here will be removed soon. + */ + +#ifndef QEMU_EXEC_RAMBLOCK_H +#define QEMU_EXEC_RAMBLOCK_H + +#ifndef CONFIG_USER_ONLY +#include "cpu-common.h" + +struct RAMBlock { + struct rcu_head rcu; + struct MemoryRegion *mr; + uint8_t *host; + uint8_t *colo_cache; /* For colo, VM's ram cache */ + ram_addr_t offset; + ram_addr_t used_length; + ram_addr_t max_length; + void (*resized)(const char*, uint64_t length, void *host); + uint32_t flags; + /* Protected by iothread lock. */ + char idstr[256]; + /* RCU-enabled, writes protected by the ramlist lock */ + QLIST_ENTRY(RAMBlock) next; + QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers; + int fd; + size_t page_size; + /* dirty bitmap used during migration */ + unsigned long *bmap; + /* bitmap of already received pages in postcopy */ + unsigned long *receivedmap; + + /* + * bitmap to track already cleared dirty bitmap. When the bit is + * set, it means the corresponding memory chunk needs a log-clear. + * Set this up to non-NULL to enable the capability to postpone + * and split clearing of dirty bitmap on the remote node (e.g., + * KVM). The bitmap will be set only when doing global sync. + * + * NOTE: this bitmap is different comparing to the other bitmaps + * in that one bit can represent multiple guest pages (which is + * decided by the `clear_bmap_shift' variable below). On + * destination side, this should always be NULL, and the variable + * `clear_bmap_shift' is meaningless. + */ + unsigned long *clear_bmap; + uint8_t clear_bmap_shift; +}; +#endif +#endif diff --git a/include/qemu/queue.h b/include/qemu/queue.h index 4d4554a7cef82f34e052ff93df5fb3f3c0fc610c..19425f973f3aaad5f40d1a782c57533e789b2ea5 100644 --- a/include/qemu/queue.h +++ b/include/qemu/queue.h @@ -515,6 +515,12 @@ union { \ (elm); \ (elm) = *QLIST_RAW_NEXT(elm, entry)) +#define QLIST_RAW_INSERT_AFTER(head, prev, elem, entry) do { \ + *QLIST_RAW_NEXT(prev, entry) = elem; \ + *QLIST_RAW_PREVIOUS(elem, entry) = QLIST_RAW_NEXT(prev, entry); \ + *QLIST_RAW_NEXT(elem, entry) = NULL; \ +} while (0) + #define QLIST_RAW_INSERT_HEAD(head, elm, entry) do { \ void *first = *QLIST_RAW_FIRST(head); \ *QLIST_RAW_FIRST(head) = elm; \ @@ -527,17 +533,4 @@ union { \ } \ } while (0) -#define QLIST_RAW_REVERSE(head, elm, entry) do { \ - void *iter = *QLIST_RAW_FIRST(head), *prev = NULL, *next; \ - while (iter) { \ - next = *QLIST_RAW_NEXT(iter, entry); \ - *QLIST_RAW_PREVIOUS(iter, entry) = QLIST_RAW_NEXT(next, entry); \ - *QLIST_RAW_NEXT(iter, entry) = prev; \ - prev = iter; \ - iter = next; \ - } \ - *QLIST_RAW_FIRST(head) = prev; \ - *QLIST_RAW_PREVIOUS(prev, entry) = QLIST_RAW_FIRST(head); \ -} while (0) - #endif /* QEMU_SYS_QUEUE_H */ diff --git a/migration/Makefile.objs b/migration/Makefile.objs index a4f3bafd860262047bb97bf5cff20734f931b44a..d3623d5f9b991555de387dbb22a1df79a46a97c1 100644 --- a/migration/Makefile.objs +++ b/migration/Makefile.objs @@ -7,6 +7,7 @@ common-obj-y += qemu-file-channel.o common-obj-y += xbzrle.o postcopy-ram.o common-obj-y += qjson.o common-obj-y += block-dirty-bitmap.o +common-obj-y += multifd.o common-obj-$(CONFIG_RDMA) += rdma.o diff --git a/migration/migration.c b/migration/migration.c index efd5350e84a33b42deaa73aeef341c132500b20b..3a21a4686c31aa2149347c4aec88cd7f8a02f5e3 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -53,6 +53,7 @@ #include "monitor/monitor.h" #include "net/announce.h" #include "qemu/queue.h" +#include "multifd.h" #define MAX_THROTTLE (32 << 20) /* Migration transfer speed throttling */ @@ -518,13 +519,23 @@ fail: exit(EXIT_FAILURE); } -static void migration_incoming_setup(QEMUFile *f) +/** + * @migration_incoming_setup: Setup incoming migration + * + * Returns 0 for no error or 1 for error + * + * @f: file for main migration channel + * @errp: where to put errors + */ +static int migration_incoming_setup(QEMUFile *f, Error **errp) { MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; - if (multifd_load_setup() != 0) { + if (multifd_load_setup(&local_err) != 0) { /* We haven't been able to create multifd threads nothing better to do */ + error_report_err(local_err); exit(EXIT_FAILURE); } @@ -532,6 +543,7 @@ static void migration_incoming_setup(QEMUFile *f) mis->from_src_file = f; } qemu_file_set_blocking(f, false); + return 0; } void migration_incoming_process(void) @@ -572,19 +584,27 @@ static bool postcopy_try_recover(QEMUFile *f) return false; } -void migration_fd_process_incoming(QEMUFile *f) +void migration_fd_process_incoming(QEMUFile *f, Error **errp) { + Error *local_err = NULL; + if (postcopy_try_recover(f)) { return; } - migration_incoming_setup(f); + if (migration_incoming_setup(f, &local_err)) { + if (local_err) { + error_propagate(errp, local_err); + } + return; + } migration_incoming_process(); } void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) { MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; bool start_migration; if (!mis->from_src_file) { @@ -596,7 +616,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) return; } - migration_incoming_setup(f); + if (migration_incoming_setup(f, &local_err)) { + if (local_err) { + error_propagate(errp, local_err); + } + return; + } /* * Common migration only needs one channel, so we can start @@ -604,7 +629,6 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) */ start_migration = !migrate_use_multifd(); } else { - Error *local_err = NULL; /* Multiple connections */ assert(migrate_use_multifd()); start_migration = multifd_recv_new_channel(ioc, &local_err); @@ -829,6 +853,27 @@ bool migration_is_setup_or_active(int state) } } +bool migration_is_running(int state) +{ + switch (state) { + case MIGRATION_STATUS_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_PAUSED: + case MIGRATION_STATUS_POSTCOPY_RECOVER: + case MIGRATION_STATUS_SETUP: + case MIGRATION_STATUS_PRE_SWITCHOVER: + case MIGRATION_STATUS_DEVICE: + case MIGRATION_STATUS_WAIT_UNPLUG: + case MIGRATION_STATUS_CANCELLING: + case MIGRATION_STATUS_COLO: + return true; + + default: + return false; + + } +} + static void populate_time_info(MigrationInfo *info, MigrationState *s) { info->has_status = true; @@ -1077,7 +1122,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params, MigrationCapabilityStatusList *cap; bool cap_list[MIGRATION_CAPABILITY__MAX]; - if (migration_is_setup_or_active(s->state)) { + if (migration_is_running(s->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return; } @@ -1590,7 +1635,7 @@ static void migrate_fd_cancel(MigrationState *s) do { old_state = s->state; - if (!migration_is_setup_or_active(old_state)) { + if (!migration_is_running(old_state)) { break; } /* If the migration is paused, kick it out of the pause */ @@ -1888,9 +1933,7 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc, return true; } - if (migration_is_setup_or_active(s->state) || - s->state == MIGRATION_STATUS_CANCELLING || - s->state == MIGRATION_STATUS_COLO) { + if (migration_is_running(s->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return false; } @@ -3348,6 +3391,7 @@ static void *migration_thread(void *opaque) void migrate_fd_connect(MigrationState *s, Error *error_in) { + Error *local_err = NULL; int64_t rate_limit; bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED; @@ -3396,7 +3440,8 @@ void migrate_fd_connect(MigrationState *s, Error *error_in) return; } - if (multifd_save_setup() != 0) { + if (multifd_save_setup(&local_err) != 0) { + error_report_err(local_err); migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED); migrate_fd_cleanup(s); diff --git a/migration/migration.h b/migration/migration.h index aa9ff6f27b19d7ee165048aa7b6de81d41154be0..8473ddfc887b81f88e6e50b6050e252f3dab0481 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -265,7 +265,7 @@ struct MigrationState void migrate_set_state(int *state, int old_state, int new_state); -void migration_fd_process_incoming(QEMUFile *f); +void migration_fd_process_incoming(QEMUFile *f, Error **errp); void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp); void migration_incoming_process(void); @@ -279,6 +279,7 @@ void migrate_fd_error(MigrationState *s, const Error *error); void migrate_fd_connect(MigrationState *s, Error *error_in); bool migration_is_setup_or_active(int state); +bool migration_is_running(int state); void migrate_init(MigrationState *s); bool migration_is_blocked(Error **errp); diff --git a/migration/multifd.c b/migration/multifd.c new file mode 100644 index 0000000000000000000000000000000000000000..b3e8ae9bcca54d8082935ee16871868ef0ffee31 --- /dev/null +++ b/migration/multifd.c @@ -0,0 +1,899 @@ +/* + * Multifd common code + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/rcu.h" +#include "exec/target_page.h" +#include "sysemu/sysemu.h" +#include "exec/ramblock.h" +#include "qemu/error-report.h" +#include "qapi/error.h" +#include "ram.h" +#include "migration.h" +#include "socket.h" +#include "qemu-file.h" +#include "trace.h" +#include "multifd.h" + +/* Multiple fd's */ + +#define MULTIFD_MAGIC 0x11223344U +#define MULTIFD_VERSION 1 + +typedef struct { + uint32_t magic; + uint32_t version; + unsigned char uuid[16]; /* QemuUUID */ + uint8_t id; + uint8_t unused1[7]; /* Reserved for future use */ + uint64_t unused2[4]; /* Reserved for future use */ +} __attribute__((packed)) MultiFDInit_t; + +static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) +{ + MultiFDInit_t msg = {}; + int ret; + + msg.magic = cpu_to_be32(MULTIFD_MAGIC); + msg.version = cpu_to_be32(MULTIFD_VERSION); + msg.id = p->id; + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); + + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + return 0; +} + +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) +{ + MultiFDInit_t msg; + int ret; + + ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + + msg.magic = be32_to_cpu(msg.magic); + msg.version = be32_to_cpu(msg.version); + + if (msg.magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet magic %x " + "expected %x", msg.magic, MULTIFD_MAGIC); + return -1; + } + + if (msg.version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { + char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); + char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); + + error_setg(errp, "multifd: received uuid '%s' and expected " + "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); + g_free(uuid); + g_free(msg_uuid); + return -1; + } + + if (msg.id > migrate_multifd_channels()) { + error_setg(errp, "multifd: received channel version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + return msg.id; +} + +static MultiFDPages_t *multifd_pages_init(size_t size) +{ + MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); + + pages->allocated = size; + pages->iov = g_new0(struct iovec, size); + pages->offset = g_new0(ram_addr_t, size); + + return pages; +} + +static void multifd_pages_clear(MultiFDPages_t *pages) +{ + pages->used = 0; + pages->allocated = 0; + pages->packet_num = 0; + pages->block = NULL; + g_free(pages->iov); + pages->iov = NULL; + g_free(pages->offset); + pages->offset = NULL; + g_free(pages); +} + +static void multifd_send_fill_packet(MultiFDSendParams *p) +{ + MultiFDPacket_t *packet = p->packet; + int i; + + packet->flags = cpu_to_be32(p->flags); + packet->pages_alloc = cpu_to_be32(p->pages->allocated); + packet->pages_used = cpu_to_be32(p->pages->used); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); + packet->packet_num = cpu_to_be64(p->packet_num); + + if (p->pages->block) { + strncpy(packet->ramblock, p->pages->block->idstr, 256); + } + + for (i = 0; i < p->pages->used; i++) { + /* there are architectures where ram_addr_t is 32 bit */ + uint64_t temp = p->pages->offset[i]; + + packet->offset[i] = cpu_to_be64(temp); + } +} + +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + MultiFDPacket_t *packet = p->packet; + uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + RAMBlock *block; + int i; + + packet->magic = be32_to_cpu(packet->magic); + if (packet->magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet " + "magic %x and expected magic %x", + packet->magic, MULTIFD_MAGIC); + return -1; + } + + packet->version = be32_to_cpu(packet->version); + if (packet->version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet " + "version %d and expected version %d", + packet->version, MULTIFD_VERSION); + return -1; + } + + p->flags = be32_to_cpu(packet->flags); + + packet->pages_alloc = be32_to_cpu(packet->pages_alloc); + /* + * If we received a packet that is 100 times bigger than expected + * just stop migration. It is a magic number. + */ + if (packet->pages_alloc > pages_max * 100) { + error_setg(errp, "multifd: received packet " + "with size %d and expected a maximum size of %d", + packet->pages_alloc, pages_max * 100) ; + return -1; + } + /* + * We received a packet that is bigger than expected but inside + * reasonable limits (see previous comment). Just reallocate. + */ + if (packet->pages_alloc > p->pages->allocated) { + multifd_pages_clear(p->pages); + p->pages = multifd_pages_init(packet->pages_alloc); + } + + p->pages->used = be32_to_cpu(packet->pages_used); + if (p->pages->used > packet->pages_alloc) { + error_setg(errp, "multifd: received packet " + "with %d pages and expected maximum pages are %d", + p->pages->used, packet->pages_alloc) ; + return -1; + } + + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + p->packet_num = be64_to_cpu(packet->packet_num); + + if (p->pages->used == 0) { + return 0; + } + + /* make sure that ramblock is 0 terminated */ + packet->ramblock[255] = 0; + block = qemu_ram_block_by_name(packet->ramblock); + if (!block) { + error_setg(errp, "multifd: unknown ram block %s", + packet->ramblock); + return -1; + } + + for (i = 0; i < p->pages->used; i++) { + uint64_t offset = be64_to_cpu(packet->offset[i]); + + if (offset > (block->used_length - qemu_target_page_size())) { + error_setg(errp, "multifd: offset too long %" PRIu64 + " (max " RAM_ADDR_FMT ")", + offset, block->max_length); + return -1; + } + p->pages->iov[i].iov_base = block->host + offset; + p->pages->iov[i].iov_len = qemu_target_page_size(); + } + + return 0; +} + +struct { + MultiFDSendParams *params; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* send channels ready */ + QemuSemaphore channels_ready; + /* + * Have we already run terminate threads. There is a race when it + * happens that we got one error while we are exiting. + * We will use atomic operations. Only valid values are 0 and 1. + */ + int exiting; +} *multifd_send_state; + +/* + * How we use multifd_send_state->pages and channel->pages? + * + * We create a pages for each channel, and a main one. Each time that + * we need to send a batch of pages we interchange the ones between + * multifd_send_state and the channel that is sending it. There are + * two reasons for that: + * - to not have to do so many mallocs during migration + * - to make easier to know what to free at the end of migration + * + * This way we always know who is the owner of each "pages" struct, + * and we don't need any locking. It belongs to the migration thread + * or to the channel thread. Switching is safe because the migration + * thread is using the channel mutex when changing it, and the channel + * have to had finish with its own, otherwise pending_job can't be + * false. + */ + +static int multifd_send_pages(QEMUFile *f) +{ + int i; + static int next_channel; + MultiFDSendParams *p = NULL; /* make happy gcc */ + MultiFDPages_t *pages = multifd_send_state->pages; + uint64_t transferred; + + if (atomic_read(&multifd_send_state->exiting)) { + return -1; + } + + qemu_sem_wait(&multifd_send_state->channels_ready); + for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (p->quit) { + error_report("%s: channel %d has already quit!", __func__, i); + qemu_mutex_unlock(&p->mutex); + return -1; + } + if (!p->pending_job) { + p->pending_job++; + next_channel = (i + 1) % migrate_multifd_channels(); + break; + } + qemu_mutex_unlock(&p->mutex); + } + assert(!p->pages->used); + assert(!p->pages->block); + + p->packet_num = multifd_send_state->packet_num++; + multifd_send_state->pages = p->pages; + p->pages = pages; + transferred = ((uint64_t) pages->used) * qemu_target_page_size() + + p->packet_len; + qemu_file_update_transfer(f, transferred); + ram_counters.multifd_bytes += transferred; + ram_counters.transferred += transferred;; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + + return 1; +} + +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages = multifd_send_state->pages; + + if (!pages->block) { + pages->block = block; + } + + if (pages->block == block) { + pages->offset[pages->used] = offset; + pages->iov[pages->used].iov_base = block->host + offset; + pages->iov[pages->used].iov_len = qemu_target_page_size(); + pages->used++; + + if (pages->used < pages->allocated) { + return 1; + } + } + + if (multifd_send_pages(f) < 0) { + return -1; + } + + if (pages->block != block) { + return multifd_queue_page(f, block, offset); + } + + return 1; +} + +static void multifd_send_terminate_threads(Error *err) +{ + int i; + + trace_multifd_send_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_PRE_SWITCHOVER || + s->state == MIGRATION_STATUS_DEVICE || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + /* + * We don't want to exit each threads twice. Depending on where + * we get the error, or if there are two independent errors in two + * threads at the same time, we can end calling this function + * twice. + */ + if (atomic_xchg(&multifd_send_state->exiting, 1)) { + return; + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + qemu_sem_post(&p->sem); + qemu_mutex_unlock(&p->mutex); + } +} + +void multifd_save_cleanup(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + multifd_send_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + if (p->running) { + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + socket_send_channel_destroy(p->c); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_send_state->channels_ready); + g_free(multifd_send_state->params); + multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; + g_free(multifd_send_state); + multifd_send_state = NULL; +} + +void multifd_send_sync_main(QEMUFile *f) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + if (multifd_send_state->pages->used) { + if (multifd_send_pages(f) < 0) { + error_report("%s: multifd_send_pages fail", __func__); + return; + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_signal(p->id); + + qemu_mutex_lock(&p->mutex); + + if (p->quit) { + error_report("%s: channel %d has already quit", __func__, i); + qemu_mutex_unlock(&p->mutex); + return; + } + + p->packet_num = multifd_send_state->packet_num++; + p->flags |= MULTIFD_FLAG_SYNC; + p->pending_job++; + qemu_file_update_transfer(f, p->packet_len); + ram_counters.multifd_bytes += p->packet_len; + ram_counters.transferred += p->packet_len; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_wait(p->id); + qemu_sem_wait(&p->sem_sync); + } + trace_multifd_send_sync_main(multifd_send_state->packet_num); +} + +static void *multifd_send_thread(void *opaque) +{ + MultiFDSendParams *p = opaque; + Error *local_err = NULL; + int ret = 0; + uint32_t flags = 0; + + trace_multifd_send_thread_start(p->id); + rcu_register_thread(); + + if (multifd_send_initial_packet(p, &local_err) < 0) { + ret = -1; + goto out; + } + /* initial packet */ + p->num_packets = 1; + + while (true) { + qemu_sem_wait(&p->sem); + + if (atomic_read(&multifd_send_state->exiting)) { + break; + } + qemu_mutex_lock(&p->mutex); + + if (p->pending_job) { + uint32_t used = p->pages->used; + uint64_t packet_num = p->packet_num; + flags = p->flags; + + p->next_packet_size = used * qemu_target_page_size(); + multifd_send_fill_packet(p); + p->flags = 0; + p->num_packets++; + p->num_pages += used; + p->pages->used = 0; + p->pages->block = NULL; + qemu_mutex_unlock(&p->mutex); + + trace_multifd_send(p->id, packet_num, used, flags, + p->next_packet_size); + + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + + if (used) { + ret = qio_channel_writev_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + qemu_mutex_lock(&p->mutex); + p->pending_job--; + qemu_mutex_unlock(&p->mutex); + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&p->sem_sync); + } + qemu_sem_post(&multifd_send_state->channels_ready); + } else if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ + } + } + +out: + if (local_err) { + trace_multifd_send_error(p->id); + multifd_send_terminate_threads(local_err); + } + + /* + * Error happen, I will exit, but I can't just leave, tell + * who pay attention to me. + */ + if (ret != 0) { + qemu_sem_post(&p->sem_sync); + qemu_sem_post(&multifd_send_state->channels_ready); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) +{ + MultiFDSendParams *p = opaque; + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); + Error *local_err = NULL; + + trace_multifd_new_send_channel_async(p->id); + if (qio_task_propagate_error(task, &local_err)) { + migrate_set_error(migrate_get_current(), local_err); + /* Error happen, we need to tell who pay attention to me */ + qemu_sem_post(&multifd_send_state->channels_ready); + qemu_sem_post(&p->sem_sync); + /* + * Although multifd_send_thread is not created, but main migration + * thread neet to judge whether it is running, so we need to mark + * its status. + */ + p->quit = true; + } else { + p->c = QIO_CHANNEL(sioc); + qio_channel_set_delay(p->c, false); + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); + } +} + +int multifd_save_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); + multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); + multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->channels_ready, 0); + atomic_set(&multifd_send_state->exiting, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->pending_job = 0; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet->version = cpu_to_be32(MULTIFD_VERSION); + p->name = g_strdup_printf("multifdsend_%d", i); + socket_send_channel_create(multifd_new_send_channel_async, p); + } + return 0; +} + +struct { + MultiFDRecvParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; +} *multifd_recv_state; + +static void multifd_recv_terminate_threads(Error *err) +{ + int i; + + trace_multifd_recv_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + /* + * We could arrive here for two reasons: + * - normal quit, i.e. everything went fine, just finished + * - error quit: We close the channels so the channel threads + * finish the qio_channel_read_all_eof() + */ + if (p->c) { + qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + } + qemu_mutex_unlock(&p->mutex); + } +} + +int multifd_load_cleanup(Error **errp) +{ + int i; + int ret = 0; + + if (!migrate_use_multifd()) { + return 0; + } + multifd_recv_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + if (p->running) { + p->quit = true; + /* + * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, + * however try to wakeup it without harm in cleanup phase. + */ + qemu_sem_post(&p->sem_sync); + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + object_unref(OBJECT(p->c)); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_recv_state->sem_sync); + g_free(multifd_recv_state->params); + multifd_recv_state->params = NULL; + g_free(multifd_recv_state); + multifd_recv_state = NULL; + + return ret; +} + +void multifd_recv_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_wait(p->id); + qemu_sem_wait(&multifd_recv_state->sem_sync); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (multifd_recv_state->packet_num < p->packet_num) { + multifd_recv_state->packet_num = p->packet_num; + } + qemu_mutex_unlock(&p->mutex); + trace_multifd_recv_sync_main_signal(p->id); + qemu_sem_post(&p->sem_sync); + } + trace_multifd_recv_sync_main(multifd_recv_state->packet_num); +} + +static void *multifd_recv_thread(void *opaque) +{ + MultiFDRecvParams *p = opaque; + Error *local_err = NULL; + int ret; + + trace_multifd_recv_thread_start(p->id); + rcu_register_thread(); + + while (true) { + uint32_t used; + uint32_t flags; + + if (p->quit) { + break; + } + + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0) { /* EOF */ + break; + } + if (ret == -1) { /* Error */ + break; + } + + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { + qemu_mutex_unlock(&p->mutex); + break; + } + + used = p->pages->used; + flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, used, flags, + p->next_packet_size); + p->num_packets++; + p->num_pages += used; + qemu_mutex_unlock(&p->mutex); + + if (used) { + ret = qio_channel_readv_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } + } + + if (local_err) { + multifd_recv_terminate_threads(local_err); + } + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +int multifd_load_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); + multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); + atomic_set(&multifd_recv_state->count, 0); + qemu_sem_init(&multifd_recv_state->sem_sync, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->name = g_strdup_printf("multifdrecv_%d", i); + } + return 0; +} + +bool multifd_recv_all_channels_created(void) +{ + int thread_count = migrate_multifd_channels(); + + if (!migrate_use_multifd()) { + return true; + } + + return thread_count == atomic_read(&multifd_recv_state->count); +} + +/* + * Try to receive all multifd channels to get ready for the migration. + * - Return true and do not set @errp when correctly receving all channels; + * - Return false and do not set @errp when correctly receiving the current one; + * - Return false and set @errp when failing to receive the current channel. + */ +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) +{ + MultiFDRecvParams *p; + Error *local_err = NULL; + int id; + + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + error_propagate_prepend(errp, local_err, + "failed to receive packet" + " via multifd channel %d: ", + atomic_read(&multifd_recv_state->count)); + return false; + } + trace_multifd_recv_new_channel(id); + + p = &multifd_recv_state->params[id]; + if (p->c != NULL) { + error_setg(&local_err, "multifd: received id '%d' already setup'", + id); + multifd_recv_terminate_threads(local_err); + error_propagate(errp, local_err); + return false; + } + p->c = ioc; + object_ref(OBJECT(ioc)); + /* initial packet */ + p->num_packets = 1; + + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, + QEMU_THREAD_JOINABLE); + atomic_inc(&multifd_recv_state->count); + return atomic_read(&multifd_recv_state->count) == + migrate_multifd_channels(); +} + diff --git a/migration/multifd.h b/migration/multifd.h new file mode 100644 index 0000000000000000000000000000000000000000..d8b0205977a6cadc8978babef0a6bb4396a85e56 --- /dev/null +++ b/migration/multifd.h @@ -0,0 +1,139 @@ +/* + * Multifd common functions + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#ifndef QEMU_MIGRATION_MULTIFD_H +#define QEMU_MIGRATION_MULTIFD_H + +int multifd_save_setup(Error **errp); +void multifd_save_cleanup(void); +int multifd_load_setup(Error **errp); +int multifd_load_cleanup(Error **errp); +bool multifd_recv_all_channels_created(void); +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); +void multifd_recv_sync_main(void); +void multifd_send_sync_main(QEMUFile *f); +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset); + +#define MULTIFD_FLAG_SYNC (1 << 0) + +/* This value needs to be a multiple of qemu_target_page_size() */ +#define MULTIFD_PACKET_SIZE (512 * 1024) + +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t flags; + /* maximum number of allocated pages */ + uint32_t pages_alloc; + uint32_t pages_used; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + uint64_t packet_num; + uint64_t unused[4]; /* Reserved for future use */ + char ramblock[256]; + uint64_t offset[]; +} __attribute__((packed)) MultiFDPacket_t; + +typedef struct { + /* number of used pages */ + uint32_t used; + /* number of allocated pages */ + uint32_t allocated; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* offset of each page */ + ram_addr_t *offset; + /* pointer to each page */ + struct iovec *iov; + RAMBlock *block; +} MultiFDPages_t; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* thread has work to do */ + int pending_job; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDSendParams; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* array of pages to receive */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDRecvParams; + +#endif + diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 26fb25ddc1341af62b80571f8699b5e9740dd8d3..1c3a358a140d83de6709ddadf7e3ec6e2d7bfbb8 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -53,6 +53,8 @@ struct QEMUFile { int last_error; Error *last_error_obj; + /* has the file has been shutdown */ + bool shutdown; }; /* @@ -61,10 +63,18 @@ struct QEMUFile { */ int qemu_file_shutdown(QEMUFile *f) { + int ret; + + f->shutdown = true; if (!f->ops->shut_down) { return -ENOSYS; } - return f->ops->shut_down(f->opaque, true, true, NULL); + ret = f->ops->shut_down(f->opaque, true, true, NULL); + + if (!f->last_error) { + qemu_file_set_error(f, -EIO); + } + return ret; } /* @@ -214,6 +224,9 @@ void qemu_fflush(QEMUFile *f) return; } + if (f->shutdown) { + return; + } if (f->iovcnt > 0) { expect = iov_size(f->iov, f->iovcnt); ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos, @@ -328,6 +341,10 @@ static ssize_t qemu_fill_buffer(QEMUFile *f) f->buf_index = 0; f->buf_size = pending; + if (f->shutdown) { + return 0; + } + len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos, IO_BUF_SIZE - pending, &local_error); if (len > 0) { @@ -642,6 +659,9 @@ int64_t qemu_ftell(QEMUFile *f) int qemu_file_rate_limit(QEMUFile *f) { + if (f->shutdown) { + return 1; + } if (qemu_file_get_error(f)) { return 1; } @@ -744,11 +764,8 @@ static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, /* Compress size bytes of data start at p and store the compressed * data to the buffer of f. * - * When f is not writable, return -1 if f has no space to save the - * compressed data. - * When f is wirtable and it has no space to save the compressed data, - * do fflush first, if f still has no space to save the compressed - * data, return -1. + * Since the file is dummy file with empty_ops, return -1 if f has no space to + * save the compressed data. */ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, const uint8_t *p, size_t size) @@ -756,14 +773,7 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); if (blen < compressBound(size)) { - if (!qemu_file_is_writable(f)) { - return -1; - } - qemu_fflush(f); - blen = IO_BUF_SIZE - sizeof(int32_t); - if (blen < compressBound(size)) { - return -1; - } + return -1; } blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), diff --git a/migration/ram.c b/migration/ram.c index d2208b553422aa43928fff1116d7f643f7892e72..ed23ed1c7cc712f77086b02975c2d73b5d0c5c30 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -36,7 +36,6 @@ #include "xbzrle.h" #include "ram.h" #include "migration.h" -#include "socket.h" #include "migration/register.h" #include "migration/misc.h" #include "qemu-file.h" @@ -53,9 +52,9 @@ #include "migration/colo.h" #include "block.h" #include "sysemu/sysemu.h" -#include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "multifd.h" /***********************************************************/ /* ram save/restore */ @@ -575,980 +574,6 @@ exit: return -1; } -/* Multiple fd's */ - -#define MULTIFD_MAGIC 0x11223344U -#define MULTIFD_VERSION 1 - -#define MULTIFD_FLAG_SYNC (1 << 0) - -/* This value needs to be a multiple of qemu_target_page_size() */ -#define MULTIFD_PACKET_SIZE (512 * 1024) - -typedef struct { - uint32_t magic; - uint32_t version; - unsigned char uuid[16]; /* QemuUUID */ - uint8_t id; - uint8_t unused1[7]; /* Reserved for future use */ - uint64_t unused2[4]; /* Reserved for future use */ -} __attribute__((packed)) MultiFDInit_t; - -typedef struct { - uint32_t magic; - uint32_t version; - uint32_t flags; - /* maximum number of allocated pages */ - uint32_t pages_alloc; - uint32_t pages_used; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - uint64_t packet_num; - uint64_t unused[4]; /* Reserved for future use */ - char ramblock[256]; - uint64_t offset[]; -} __attribute__((packed)) MultiFDPacket_t; - -typedef struct { - /* number of used pages */ - uint32_t used; - /* number of allocated pages */ - uint32_t allocated; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* offset of each page */ - ram_addr_t *offset; - /* pointer to each page */ - struct iovec *iov; - RAMBlock *block; -} MultiFDPages_t; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* sem where to wait for more work */ - QemuSemaphore sem; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* thread has work to do */ - int pending_job; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDSendParams; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* array of pages to receive */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDRecvParams; - -static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) -{ - MultiFDInit_t msg = {}; - int ret; - - msg.magic = cpu_to_be32(MULTIFD_MAGIC); - msg.version = cpu_to_be32(MULTIFD_VERSION); - msg.id = p->id; - memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); - - ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - return 0; -} - -static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) -{ - MultiFDInit_t msg; - int ret; - - ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - - msg.magic = be32_to_cpu(msg.magic); - msg.version = be32_to_cpu(msg.version); - - if (msg.magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet magic %x " - "expected %x", msg.magic, MULTIFD_MAGIC); - return -1; - } - - if (msg.version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { - char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); - char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); - - error_setg(errp, "multifd: received uuid '%s' and expected " - "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); - g_free(uuid); - g_free(msg_uuid); - return -1; - } - - if (msg.id > migrate_multifd_channels()) { - error_setg(errp, "multifd: received channel version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - return msg.id; -} - -static MultiFDPages_t *multifd_pages_init(size_t size) -{ - MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); - - pages->allocated = size; - pages->iov = g_new0(struct iovec, size); - pages->offset = g_new0(ram_addr_t, size); - - return pages; -} - -static void multifd_pages_clear(MultiFDPages_t *pages) -{ - pages->used = 0; - pages->allocated = 0; - pages->packet_num = 0; - pages->block = NULL; - g_free(pages->iov); - pages->iov = NULL; - g_free(pages->offset); - pages->offset = NULL; - g_free(pages); -} - -static void multifd_send_fill_packet(MultiFDSendParams *p) -{ - MultiFDPacket_t *packet = p->packet; - int i; - - packet->flags = cpu_to_be32(p->flags); - packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->pages_used = cpu_to_be32(p->pages->used); - packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); - - if (p->pages->block) { - strncpy(packet->ramblock, p->pages->block->idstr, 256); - } - - for (i = 0; i < p->pages->used; i++) { - /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = p->pages->offset[i]; - - packet->offset[i] = cpu_to_be64(temp); - } -} - -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) -{ - MultiFDPacket_t *packet = p->packet; - uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - RAMBlock *block; - int i; - - packet->magic = be32_to_cpu(packet->magic); - if (packet->magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet " - "magic %x and expected magic %x", - packet->magic, MULTIFD_MAGIC); - return -1; - } - - packet->version = be32_to_cpu(packet->version); - if (packet->version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet " - "version %d and expected version %d", - packet->version, MULTIFD_VERSION); - return -1; - } - - p->flags = be32_to_cpu(packet->flags); - - packet->pages_alloc = be32_to_cpu(packet->pages_alloc); - /* - * If we received a packet that is 100 times bigger than expected - * just stop migration. It is a magic number. - */ - if (packet->pages_alloc > pages_max * 100) { - error_setg(errp, "multifd: received packet " - "with size %d and expected a maximum size of %d", - packet->pages_alloc, pages_max * 100) ; - return -1; - } - /* - * We received a packet that is bigger than expected but inside - * reasonable limits (see previous comment). Just reallocate. - */ - if (packet->pages_alloc > p->pages->allocated) { - multifd_pages_clear(p->pages); - p->pages = multifd_pages_init(packet->pages_alloc); - } - - p->pages->used = be32_to_cpu(packet->pages_used); - if (p->pages->used > packet->pages_alloc) { - error_setg(errp, "multifd: received packet " - "with %d pages and expected maximum pages are %d", - p->pages->used, packet->pages_alloc) ; - return -1; - } - - p->next_packet_size = be32_to_cpu(packet->next_packet_size); - p->packet_num = be64_to_cpu(packet->packet_num); - - if (p->pages->used == 0) { - return 0; - } - - /* make sure that ramblock is 0 terminated */ - packet->ramblock[255] = 0; - block = qemu_ram_block_by_name(packet->ramblock); - if (!block) { - error_setg(errp, "multifd: unknown ram block %s", - packet->ramblock); - return -1; - } - - for (i = 0; i < p->pages->used; i++) { - uint64_t offset = be64_to_cpu(packet->offset[i]); - - if (offset > (block->used_length - TARGET_PAGE_SIZE)) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, block->max_length); - return -1; - } - p->pages->iov[i].iov_base = block->host + offset; - p->pages->iov[i].iov_len = TARGET_PAGE_SIZE; - } - - return 0; -} - -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. - */ - int exiting; -} *multifd_send_state; - -/* - * How we use multifd_send_state->pages and channel->pages? - * - * We create a pages for each channel, and a main one. Each time that - * we need to send a batch of pages we interchange the ones between - * multifd_send_state and the channel that is sending it. There are - * two reasons for that: - * - to not have to do so many mallocs during migration - * - to make easier to know what to free at the end of migration - * - * This way we always know who is the owner of each "pages" struct, - * and we don't need any locking. It belongs to the migration thread - * or to the channel thread. Switching is safe because the migration - * thread is using the channel mutex when changing it, and the channel - * have to had finish with its own, otherwise pending_job can't be - * false. - */ - -static int multifd_send_pages(RAMState *rs) -{ - int i; - static int next_channel; - MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; - - if (atomic_read(&multifd_send_state->exiting)) { - return -1; - } - - qemu_sem_wait(&multifd_send_state->channels_ready); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { - p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (p->quit) { - error_report("%s: channel %d has already quit!", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; - } - if (!p->pending_job) { - p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); - break; - } - qemu_mutex_unlock(&p->mutex); - } - assert(!p->pages->used); - assert(!p->pages->block); - - p->packet_num = multifd_send_state->packet_num++; - multifd_send_state->pages = p->pages; - p->pages = pages; - transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len; - qemu_file_update_transfer(rs->f, transferred); - ram_counters.multifd_bytes += transferred; - ram_counters.transferred += transferred;; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - - return 1; -} - -static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) -{ - MultiFDPages_t *pages = multifd_send_state->pages; - - if (!pages->block) { - pages->block = block; - } - - if (pages->block == block) { - pages->offset[pages->used] = offset; - pages->iov[pages->used].iov_base = block->host + offset; - pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE; - pages->used++; - - if (pages->used < pages->allocated) { - return 1; - } - } - - if (multifd_send_pages(rs) < 0) { - return -1; - } - - if (pages->block != block) { - return multifd_queue_page(rs, block, offset); - } - - return 1; -} - -static void multifd_send_terminate_threads(Error *err) -{ - int i; - - trace_multifd_send_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_PRE_SWITCHOVER || - s->state == MIGRATION_STATUS_DEVICE || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - /* - * We don't want to exit each threads twice. Depending on where - * we get the error, or if there are two independent errors in two - * threads at the same time, we can end calling this function - * twice. - */ - if (atomic_xchg(&multifd_send_state->exiting, 1)) { - return; - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - qemu_sem_post(&p->sem); - qemu_mutex_unlock(&p->mutex); - } -} - -void multifd_save_cleanup(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - if (p->running) { - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - socket_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; -} - -static void multifd_send_sync_main(RAMState *rs) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - if (multifd_send_state->pages->used) { - if (multifd_send_pages(rs) < 0) { - error_report("%s: multifd_send_pages fail", __func__); - return; - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_signal(p->id); - - qemu_mutex_lock(&p->mutex); - - if (p->quit) { - error_report("%s: channel %d has already quit", __func__, i); - qemu_mutex_unlock(&p->mutex); - return; - } - - p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; - qemu_file_update_transfer(rs->f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; - ram_counters.transferred += p->packet_len; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_wait(p->id); - qemu_sem_wait(&p->sem_sync); - } - trace_multifd_send_sync_main(multifd_send_state->packet_num); -} - -static void *multifd_send_thread(void *opaque) -{ - MultiFDSendParams *p = opaque; - Error *local_err = NULL; - int ret = 0; - uint32_t flags = 0; - - trace_multifd_send_thread_start(p->id); - rcu_register_thread(); - - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; - } - /* initial packet */ - p->num_packets = 1; - - while (true) { - qemu_sem_wait(&p->sem); - - if (atomic_read(&multifd_send_state->exiting)) { - break; - } - qemu_mutex_lock(&p->mutex); - - if (p->pending_job) { - uint32_t used = p->pages->used; - uint64_t packet_num = p->packet_num; - flags = p->flags; - - p->next_packet_size = used * qemu_target_page_size(); - multifd_send_fill_packet(p); - p->flags = 0; - p->num_packets++; - p->num_pages += used; - p->pages->used = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); - - trace_multifd_send(p->id, packet_num, used, flags, - p->next_packet_size); - - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; - } - - if (used) { - ret = qio_channel_writev_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - qemu_mutex_lock(&p->mutex); - p->pending_job--; - qemu_mutex_unlock(&p->mutex); - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); - } - qemu_sem_post(&multifd_send_state->channels_ready); - } else if (p->quit) { - qemu_mutex_unlock(&p->mutex); - break; - } else { - qemu_mutex_unlock(&p->mutex); - /* sometimes there are spurious wakeups */ - } - } - -out: - if (local_err) { - trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); - } - - /* - * Error happen, I will exit, but I can't just leave, tell - * who pay attention to me. - */ - if (ret != 0) { - qemu_sem_post(&p->sem_sync); - qemu_sem_post(&multifd_send_state->channels_ready); - } - - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *local_err = NULL; - - trace_multifd_new_send_channel_async(p->id); - if (qio_task_propagate_error(task, &local_err)) { - migrate_set_error(migrate_get_current(), local_err); - multifd_save_cleanup(); - } else { - p->c = QIO_CHANNEL(sioc); - qio_channel_set_delay(p->c, false); - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - } -} - -int multifd_save_setup(void) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); - multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->pages = multifd_pages_init(page_count); - qemu_sem_init(&multifd_send_state->channels_ready, 0); - atomic_set(&multifd_send_state->exiting, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem, 0); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->pending_job = 0; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); - p->name = g_strdup_printf("multifdsend_%d", i); - socket_send_channel_create(multifd_new_send_channel_async, p); - } - return 0; -} - -struct { - MultiFDRecvParams *params; - /* number of created threads */ - int count; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; - /* global number of generated multifd packets */ - uint64_t packet_num; -} *multifd_recv_state; - -static void multifd_recv_terminate_threads(Error *err) -{ - int i; - - trace_multifd_recv_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - /* We could arrive here for two reasons: - - normal quit, i.e. everything went fine, just finished - - error quit: We close the channels so the channel threads - finish the qio_channel_read_all_eof() */ - if (p->c) { - qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } - qemu_mutex_unlock(&p->mutex); - } -} - -int multifd_load_cleanup(Error **errp) -{ - int i; - int ret = 0; - - if (!migrate_use_multifd()) { - return 0; - } - multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - if (p->running) { - p->quit = true; - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - object_unref(OBJECT(p->c)); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_recv_state->sem_sync); - g_free(multifd_recv_state->params); - multifd_recv_state->params = NULL; - g_free(multifd_recv_state); - multifd_recv_state = NULL; - - return ret; -} - -static void multifd_recv_sync_main(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - trace_multifd_recv_sync_main_wait(p->id); - qemu_sem_wait(&multifd_recv_state->sem_sync); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (multifd_recv_state->packet_num < p->packet_num) { - multifd_recv_state->packet_num = p->packet_num; - } - qemu_mutex_unlock(&p->mutex); - trace_multifd_recv_sync_main_signal(p->id); - qemu_sem_post(&p->sem_sync); - } - trace_multifd_recv_sync_main(multifd_recv_state->packet_num); -} - -static void *multifd_recv_thread(void *opaque) -{ - MultiFDRecvParams *p = opaque; - Error *local_err = NULL; - int ret; - - trace_multifd_recv_thread_start(p->id); - rcu_register_thread(); - - while (true) { - uint32_t used; - uint32_t flags; - - if (p->quit) { - break; - } - - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0) { /* EOF */ - break; - } - if (ret == -1) { /* Error */ - break; - } - - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; - } - - used = p->pages->used; - flags = p->flags; - trace_multifd_recv(p->id, p->packet_num, used, flags, - p->next_packet_size); - p->num_packets++; - p->num_pages += used; - qemu_mutex_unlock(&p->mutex); - - if (used) { - ret = qio_channel_readv_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&multifd_recv_state->sem_sync); - qemu_sem_wait(&p->sem_sync); - } - } - - if (local_err) { - multifd_recv_terminate_threads(local_err); - } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -int multifd_load_setup(void) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); - multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - atomic_set(&multifd_recv_state->count, 0); - qemu_sem_init(&multifd_recv_state->sem_sync, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->name = g_strdup_printf("multifdrecv_%d", i); - } - return 0; -} - -bool multifd_recv_all_channels_created(void) -{ - int thread_count = migrate_multifd_channels(); - - if (!migrate_use_multifd()) { - return true; - } - - return thread_count == atomic_read(&multifd_recv_state->count); -} - -/* - * Try to receive all multifd channels to get ready for the migration. - * - Return true and do not set @errp when correctly receving all channels; - * - Return false and do not set @errp when correctly receiving the current one; - * - Return false and set @errp when failing to receive the current channel. - */ -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) -{ - MultiFDRecvParams *p; - Error *local_err = NULL; - int id; - - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - atomic_read(&multifd_recv_state->count)); - return false; - } - trace_multifd_recv_new_channel(id); - - p = &multifd_recv_state->params[id]; - if (p->c != NULL) { - error_setg(&local_err, "multifd: received id '%d' already setup'", - id); - multifd_recv_terminate_threads(local_err); - error_propagate(errp, local_err); - return false; - } - p->c = ioc; - object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; - - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); - atomic_inc(&multifd_recv_state->count); - return atomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); -} - /** * save_page_header: write page header to wire * @@ -2128,7 +1153,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - if (multifd_queue_page(rs, block, offset) < 0) { + if (multifd_queue_page(rs->f, block, offset) < 0) { return -1; } ram_counters.normal++; @@ -3426,7 +2451,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); - multifd_send_sync_main(*rsp); + multifd_send_sync_main(f); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); qemu_fflush(f); @@ -3445,7 +2470,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) { RAMState **temp = opaque; RAMState *rs = *temp; - int ret; + int ret = 0; int i; int64_t t0; int done = 0; @@ -3524,12 +2549,15 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_ROUND); out: - multifd_send_sync_main(rs); - qemu_put_be64(f, RAM_SAVE_FLAG_EOS); - qemu_fflush(f); - ram_counters.transferred += 8; + if (ret >= 0 + && migration_is_setup_or_active(migrate_get_current()->state)) { + multifd_send_sync_main(rs->f); + qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); + ram_counters.transferred += 8; - ret = qemu_file_get_error(f); + ret = qemu_file_get_error(f); + } if (ret < 0) { return ret; } @@ -3581,9 +2609,11 @@ static int ram_save_complete(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_FINISH); } - multifd_send_sync_main(rs); - qemu_put_be64(f, RAM_SAVE_FLAG_EOS); - qemu_fflush(f); + if (ret >= 0) { + multifd_send_sync_main(rs->f); + qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); + } return ret; } diff --git a/migration/ram.h b/migration/ram.h index bd0eee79b6cfa6bb522a563960106ffb8f9d6db9..a553d40751fe9b98e56feca9da15f0d4754f62ad 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -41,13 +41,6 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_total(void); -int multifd_save_setup(void); -void multifd_save_cleanup(void); -int multifd_load_setup(void); -int multifd_load_cleanup(Error **errp); -bool multifd_recv_all_channels_created(void); -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); - uint64_t ram_pagesize_summary(void); int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len); void acct_update_position(QEMUFile *f, size_t size, bool zero); diff --git a/migration/rdma.c b/migration/rdma.c index e241dcb9925e5e2d33b51acf05c5d1e862724f80..2379b8345b2f8b0508ca242513a17d4264aba14f 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -4004,7 +4004,7 @@ static void rdma_accept_incoming_migration(void *opaque) } rdma->migration_started_on_destination = 1; - migration_fd_process_incoming(f); + migration_fd_process_incoming(f, errp); } void rdma_start_incoming_migration(const char *host_port, Error **errp) diff --git a/migration/savevm.c b/migration/savevm.c index adfdca26ac5c55fd8b453c5649457ee9a3e02f18..f19cb9ec7a89c3c0c9ab06f9d55c55c4b3fa6529 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -1531,9 +1531,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp) MigrationState *ms = migrate_get_current(); MigrationStatus status; - if (migration_is_setup_or_active(ms->state) || - ms->state == MIGRATION_STATUS_CANCELLING || - ms->state == MIGRATION_STATUS_COLO) { + if (migration_is_running(ms->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return -EINVAL; } diff --git a/migration/vmstate-types.c b/migration/vmstate-types.c index 1eee36773a168192a8ff49fd3da5d97433d8a98d..35e784c9d938f2120ddf04972dfcaeba151ec05d 100644 --- a/migration/vmstate-types.c +++ b/migration/vmstate-types.c @@ -879,7 +879,7 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size, /* offset of the QLIST entry in a QLIST element */ size_t entry_offset = field->start; int version_id = field->version_id; - void *elm; + void *elm, *prev = NULL; trace_get_qlist(field->name, vmsd->name, vmsd->version_id); if (version_id > vmsd->version_id) { @@ -900,9 +900,13 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size, g_free(elm); return ret; } - QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset); + if (!prev) { + QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset); + } else { + QLIST_RAW_INSERT_AFTER(pv, prev, elm, entry_offset); + } + prev = elm; } - QLIST_RAW_REVERSE(pv, elm, entry_offset); trace_get_qlist_end(field->name, vmsd->name); return ret; diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c index 26e2e77289882cc8880ef06612fada5c34fe51e3..cf27ebbc9d3b1a63ab0b0a621f78feebfca7926d 100644 --- a/tests/qtest/migration-test.c +++ b/tests/qtest/migration-test.c @@ -424,6 +424,14 @@ static void migrate_recover(QTestState *who, const char *uri) qobject_unref(rsp); } +static void migrate_cancel(QTestState *who) +{ + QDict *rsp; + + rsp = wait_command(who, "{ 'execute': 'migrate_cancel' }"); + qobject_unref(rsp); +} + static void migrate_set_capability(QTestState *who, const char *capability, bool value) { @@ -456,6 +464,8 @@ static void migrate_postcopy_start(QTestState *from, QTestState *to) typedef struct { bool hide_stderr; bool use_shmem; + /* only launch the target process */ + bool only_target; char *opts_source; char *opts_target; } MigrateStart; @@ -571,7 +581,9 @@ static int test_migrate_start(QTestState **from, QTestState **to, arch_source, shmem_opts, args->opts_source, ignore_stderr); g_free(arch_source); - *from = qtest_init(cmd_source); + if (!args->only_target) { + *from = qtest_init(cmd_source); + } g_free(cmd_source); cmd_target = g_strdup_printf("-accel kvm -accel tcg%s%s " @@ -1291,7 +1303,104 @@ static void test_multifd_tcp(void) wait_for_serial("dest_serial"); wait_for_migration_complete(from); test_migrate_end(from, to, true); - free(uri); + g_free(uri); +} + +/* + * This test does: + * source target + * migrate_incoming + * migrate + * migrate_cancel + * launch another target + * migrate + * + * And see that it works + */ + +static void test_multifd_tcp_cancel(void) +{ + MigrateStart *args = migrate_start_new(); + QTestState *from, *to, *to2; + QDict *rsp; + char *uri; + + args->hide_stderr = true; + + if (test_migrate_start(&from, &to, "defer", args)) { + return; + } + + /* + * We want to pick a speed slow enough that the test completes + * quickly, but that it doesn't complete precopy even on a slow + * machine, so also set the downtime. + */ + /* 1 ms should make it not converge*/ + migrate_set_parameter_int(from, "downtime-limit", 1); + /* 300MB/s */ + migrate_set_parameter_int(from, "max-bandwidth", 30000000); + + migrate_set_parameter_int(from, "multifd-channels", 16); + migrate_set_parameter_int(to, "multifd-channels", 16); + + migrate_set_capability(from, "multifd", "true"); + migrate_set_capability(to, "multifd", "true"); + + /* Start incoming migration from the 1st socket */ + rsp = wait_command(to, "{ 'execute': 'migrate-incoming'," + " 'arguments': { 'uri': 'tcp:127.0.0.1:0' }}"); + qobject_unref(rsp); + + /* Wait for the first serial output from the source */ + wait_for_serial("src_serial"); + + uri = migrate_get_socket_address(to, "socket-address"); + + migrate_qmp(from, uri, "{}"); + + wait_for_migration_pass(from); + + migrate_cancel(from); + + args = migrate_start_new(); + args->only_target = true; + + if (test_migrate_start(&from, &to2, "defer", args)) { + return; + } + + migrate_set_parameter_int(to2, "multifd-channels", 16); + + migrate_set_capability(to2, "multifd", "true"); + + /* Start incoming migration from the 1st socket */ + rsp = wait_command(to2, "{ 'execute': 'migrate-incoming'," + " 'arguments': { 'uri': 'tcp:127.0.0.1:0' }}"); + qobject_unref(rsp); + + uri = migrate_get_socket_address(to2, "socket-address"); + + wait_for_migration_status(from, "cancelled", NULL); + + /* 300ms it should converge */ + migrate_set_parameter_int(from, "downtime-limit", 300); + /* 1GB/s */ + migrate_set_parameter_int(from, "max-bandwidth", 1000000000); + + migrate_qmp(from, uri, "{}"); + + wait_for_migration_pass(from); + + if (!got_stop) { + qtest_qmp_eventwait(from, "STOP"); + } + qtest_qmp_eventwait(to2, "RESUME"); + + wait_for_serial("dest_serial"); + wait_for_migration_complete(from); + test_migrate_end(from, to2, true); + g_free(uri); } int main(int argc, char **argv) @@ -1359,6 +1468,7 @@ int main(int argc, char **argv) qtest_add_func("/migration/auto_converge", test_migrate_auto_converge); qtest_add_func("/migration/multifd/tcp", test_multifd_tcp); + qtest_add_func("/migration/multifd/tcp/cancel", test_multifd_tcp_cancel); ret = g_test_run();