提交 84390bed 编写于 作者: S Stefan Hajnoczi

sheepdog: implement .bdrv_detach/attach_aio_context()

Drop the assumption that we're using the main AioContext.  Convert
qemu_aio_set_fd_handler() to aio_set_fd_handler() and qemu_aio_wait() to
aio_poll().

The .bdrv_detach/attach_aio_context() interfaces also need to be
implemented to move the socket fd handler from the old to the new
AioContext.

Cc: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Acked-by: NLiu Yuan <namei.unix@gmail.com>
Signed-off-by: NStefan Hajnoczi <stefanha@redhat.com>
上级 ea800191
...@@ -314,6 +314,7 @@ struct SheepdogAIOCB { ...@@ -314,6 +314,7 @@ struct SheepdogAIOCB {
typedef struct BDRVSheepdogState { typedef struct BDRVSheepdogState {
BlockDriverState *bs; BlockDriverState *bs;
AioContext *aio_context;
SheepdogInode inode; SheepdogInode inode;
...@@ -496,7 +497,7 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb) ...@@ -496,7 +497,7 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
sd_finish_aiocb(acb); sd_finish_aiocb(acb);
return; return;
} }
qemu_aio_wait(); aio_poll(s->aio_context, true);
} }
} }
...@@ -578,6 +579,7 @@ static void restart_co_req(void *opaque) ...@@ -578,6 +579,7 @@ static void restart_co_req(void *opaque)
typedef struct SheepdogReqCo { typedef struct SheepdogReqCo {
int sockfd; int sockfd;
AioContext *aio_context;
SheepdogReq *hdr; SheepdogReq *hdr;
void *data; void *data;
unsigned int *wlen; unsigned int *wlen;
...@@ -598,14 +600,14 @@ static coroutine_fn void do_co_req(void *opaque) ...@@ -598,14 +600,14 @@ static coroutine_fn void do_co_req(void *opaque)
unsigned int *rlen = srco->rlen; unsigned int *rlen = srco->rlen;
co = qemu_coroutine_self(); co = qemu_coroutine_self();
qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co); aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, co);
ret = send_co_req(sockfd, hdr, data, wlen); ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) { if (ret < 0) {
goto out; goto out;
} }
qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co); aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, co);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret != sizeof(*hdr)) { if (ret != sizeof(*hdr)) {
...@@ -630,18 +632,19 @@ static coroutine_fn void do_co_req(void *opaque) ...@@ -630,18 +632,19 @@ static coroutine_fn void do_co_req(void *opaque)
out: out:
/* there is at most one request for this sockfd, so it is safe to /* there is at most one request for this sockfd, so it is safe to
* set each handler to NULL. */ * set each handler to NULL. */
qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL); aio_set_fd_handler(srco->aio_context, sockfd, NULL, NULL, NULL);
srco->ret = ret; srco->ret = ret;
srco->finished = true; srco->finished = true;
} }
static int do_req(int sockfd, SheepdogReq *hdr, void *data, static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
unsigned int *wlen, unsigned int *rlen) void *data, unsigned int *wlen, unsigned int *rlen)
{ {
Coroutine *co; Coroutine *co;
SheepdogReqCo srco = { SheepdogReqCo srco = {
.sockfd = sockfd, .sockfd = sockfd,
.aio_context = aio_context,
.hdr = hdr, .hdr = hdr,
.data = data, .data = data,
.wlen = wlen, .wlen = wlen,
...@@ -656,7 +659,7 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, ...@@ -656,7 +659,7 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
co = qemu_coroutine_create(do_co_req); co = qemu_coroutine_create(do_co_req);
qemu_coroutine_enter(co, &srco); qemu_coroutine_enter(co, &srco);
while (!srco.finished) { while (!srco.finished) {
qemu_aio_wait(); aio_poll(aio_context, true);
} }
} }
...@@ -709,7 +712,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque) ...@@ -709,7 +712,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque)
BDRVSheepdogState *s = opaque; BDRVSheepdogState *s = opaque;
AIOReq *aio_req, *next; AIOReq *aio_req, *next;
qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL); aio_set_fd_handler(s->aio_context, s->fd, NULL, NULL, NULL);
close(s->fd); close(s->fd);
s->fd = -1; s->fd = -1;
...@@ -922,7 +925,7 @@ static int get_sheep_fd(BDRVSheepdogState *s, Error **errp) ...@@ -922,7 +925,7 @@ static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
return fd; return fd;
} }
qemu_aio_set_fd_handler(fd, co_read_response, NULL, s); aio_set_fd_handler(s->aio_context, fd, co_read_response, NULL, s);
return fd; return fd;
} }
...@@ -1092,7 +1095,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename, ...@@ -1092,7 +1095,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
hdr.snapid = snapid; hdr.snapid = snapid;
hdr.flags = SD_FLAG_CMD_WRITE; hdr.flags = SD_FLAG_CMD_WRITE;
ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen); ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) { if (ret) {
error_setg_errno(errp, -ret, "cannot get vdi info"); error_setg_errno(errp, -ret, "cannot get vdi info");
goto out; goto out;
...@@ -1173,7 +1176,8 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, ...@@ -1173,7 +1176,8 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
qemu_co_mutex_lock(&s->lock); qemu_co_mutex_lock(&s->lock);
s->co_send = qemu_coroutine_self(); s->co_send = qemu_coroutine_self();
qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s); aio_set_fd_handler(s->aio_context, s->fd,
co_read_response, co_write_request, s);
socket_set_cork(s->fd, 1); socket_set_cork(s->fd, 1);
/* send a header */ /* send a header */
...@@ -1191,12 +1195,13 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, ...@@ -1191,12 +1195,13 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
} }
out: out:
socket_set_cork(s->fd, 0); socket_set_cork(s->fd, 0);
qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s); aio_set_fd_handler(s->aio_context, s->fd, co_read_response, NULL, s);
s->co_send = NULL; s->co_send = NULL;
qemu_co_mutex_unlock(&s->lock); qemu_co_mutex_unlock(&s->lock);
} }
static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies, static int read_write_object(int fd, AioContext *aio_context, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset, unsigned int datalen, uint64_t offset,
bool write, bool create, uint32_t cache_flags) bool write, bool create, uint32_t cache_flags)
{ {
...@@ -1229,7 +1234,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies, ...@@ -1229,7 +1234,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
hdr.offset = offset; hdr.offset = offset;
hdr.copies = copies; hdr.copies = copies;
ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen); ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) { if (ret) {
error_report("failed to send a request to the sheep"); error_report("failed to send a request to the sheep");
return ret; return ret;
...@@ -1244,19 +1249,23 @@ static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies, ...@@ -1244,19 +1249,23 @@ static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
} }
} }
static int read_object(int fd, char *buf, uint64_t oid, uint8_t copies, static int read_object(int fd, AioContext *aio_context, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset, unsigned int datalen, uint64_t offset,
uint32_t cache_flags) uint32_t cache_flags)
{ {
return read_write_object(fd, buf, oid, copies, datalen, offset, false, return read_write_object(fd, aio_context, buf, oid, copies,
datalen, offset, false,
false, cache_flags); false, cache_flags);
} }
static int write_object(int fd, char *buf, uint64_t oid, uint8_t copies, static int write_object(int fd, AioContext *aio_context, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset, bool create, unsigned int datalen, uint64_t offset, bool create,
uint32_t cache_flags) uint32_t cache_flags)
{ {
return read_write_object(fd, buf, oid, copies, datalen, offset, true, return read_write_object(fd, aio_context, buf, oid, copies,
datalen, offset, true,
create, cache_flags); create, cache_flags);
} }
...@@ -1284,7 +1293,7 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag) ...@@ -1284,7 +1293,7 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
goto out; goto out;
} }
ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid), ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags); s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
if (ret < 0) { if (ret < 0) {
goto out; goto out;
...@@ -1359,6 +1368,22 @@ out: ...@@ -1359,6 +1368,22 @@ out:
} }
} }
static void sd_detach_aio_context(BlockDriverState *bs)
{
BDRVSheepdogState *s = bs->opaque;
aio_set_fd_handler(s->aio_context, s->fd, NULL, NULL, NULL);
}
static void sd_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
BDRVSheepdogState *s = bs->opaque;
s->aio_context = new_context;
aio_set_fd_handler(new_context, s->fd, co_read_response, NULL, s);
}
/* TODO Convert to fine grained options */ /* TODO Convert to fine grained options */
static QemuOptsList runtime_opts = { static QemuOptsList runtime_opts = {
.name = "sheepdog", .name = "sheepdog",
...@@ -1387,6 +1412,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, ...@@ -1387,6 +1412,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
const char *filename; const char *filename;
s->bs = bs; s->bs = bs;
s->aio_context = bdrv_get_aio_context(bs);
opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
qemu_opts_absorb_qdict(opts, options, &local_err); qemu_opts_absorb_qdict(opts, options, &local_err);
...@@ -1448,8 +1474,8 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, ...@@ -1448,8 +1474,8 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
} }
buf = g_malloc(SD_INODE_SIZE); buf = g_malloc(SD_INODE_SIZE);
ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0, ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
s->cache_flags); 0, SD_INODE_SIZE, 0, s->cache_flags);
closesocket(fd); closesocket(fd);
...@@ -1469,7 +1495,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, ...@@ -1469,7 +1495,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
g_free(buf); g_free(buf);
return 0; return 0;
out: out:
qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL); aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, NULL, NULL, NULL);
if (s->fd >= 0) { if (s->fd >= 0) {
closesocket(s->fd); closesocket(s->fd);
} }
...@@ -1512,7 +1538,7 @@ static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot, ...@@ -1512,7 +1538,7 @@ static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
hdr.copy_policy = s->inode.copy_policy; hdr.copy_policy = s->inode.copy_policy;
hdr.copies = s->inode.nr_copies; hdr.copies = s->inode.nr_copies;
ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen); ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
closesocket(fd); closesocket(fd);
...@@ -1766,7 +1792,8 @@ static void sd_close(BlockDriverState *bs) ...@@ -1766,7 +1792,8 @@ static void sd_close(BlockDriverState *bs)
hdr.data_length = wlen; hdr.data_length = wlen;
hdr.flags = SD_FLAG_CMD_WRITE; hdr.flags = SD_FLAG_CMD_WRITE;
ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen); ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);
closesocket(fd); closesocket(fd);
...@@ -1775,7 +1802,7 @@ static void sd_close(BlockDriverState *bs) ...@@ -1775,7 +1802,7 @@ static void sd_close(BlockDriverState *bs)
error_report("%s, %s", sd_strerror(rsp->result), s->name); error_report("%s, %s", sd_strerror(rsp->result), s->name);
} }
qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL); aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, NULL, NULL, NULL);
closesocket(s->fd); closesocket(s->fd);
g_free(s->host_spec); g_free(s->host_spec);
} }
...@@ -1812,8 +1839,9 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset) ...@@ -1812,8 +1839,9 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/* we don't need to update entire object */ /* we don't need to update entire object */
datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id); datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
s->inode.vdi_size = offset; s->inode.vdi_size = offset;
ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id), ret = write_object(fd, s->aio_context, (char *)&s->inode,
s->inode.nr_copies, datalen, 0, false, s->cache_flags); vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
close(fd); close(fd);
if (ret < 0) { if (ret < 0) {
...@@ -1882,7 +1910,8 @@ static bool sd_delete(BDRVSheepdogState *s) ...@@ -1882,7 +1910,8 @@ static bool sd_delete(BDRVSheepdogState *s)
return false; return false;
} }
ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen); ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);
closesocket(fd); closesocket(fd);
if (ret) { if (ret) {
return false; return false;
...@@ -1939,8 +1968,8 @@ static int sd_create_branch(BDRVSheepdogState *s) ...@@ -1939,8 +1968,8 @@ static int sd_create_branch(BDRVSheepdogState *s)
goto out; goto out;
} }
ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies, ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
SD_INODE_SIZE, 0, s->cache_flags); s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
closesocket(fd); closesocket(fd);
...@@ -2187,8 +2216,9 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) ...@@ -2187,8 +2216,9 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup; goto cleanup;
} }
ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id), ret = write_object(fd, s->aio_context, (char *)&s->inode,
s->inode.nr_copies, datalen, 0, false, s->cache_flags); vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
if (ret < 0) { if (ret < 0) {
error_report("failed to write snapshot's inode."); error_report("failed to write snapshot's inode.");
goto cleanup; goto cleanup;
...@@ -2203,8 +2233,9 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) ...@@ -2203,8 +2233,9 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup; goto cleanup;
} }
ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid), ret = read_object(fd, s->aio_context, (char *)inode,
s->inode.nr_copies, datalen, 0, s->cache_flags); vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
s->cache_flags);
if (ret < 0) { if (ret < 0) {
error_report("failed to read new inode info. %s", strerror(errno)); error_report("failed to read new inode info. %s", strerror(errno));
...@@ -2311,7 +2342,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) ...@@ -2311,7 +2342,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
req.opcode = SD_OP_READ_VDIS; req.opcode = SD_OP_READ_VDIS;
req.data_length = max; req.data_length = max;
ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen); ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
vdi_inuse, &wlen, &rlen);
closesocket(fd); closesocket(fd);
if (ret) { if (ret) {
...@@ -2338,7 +2370,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) ...@@ -2338,7 +2370,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
} }
/* we don't need to read entire object */ /* we don't need to read entire object */
ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid), ret = read_object(fd, s->aio_context, (char *)&inode,
vid_to_vdi_oid(vid),
0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0, 0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
s->cache_flags); s->cache_flags);
...@@ -2403,11 +2436,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data, ...@@ -2403,11 +2436,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
create = (offset == 0); create = (offset == 0);
if (load) { if (load) {
ret = read_object(fd, (char *)data, vmstate_oid, ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset, s->inode.nr_copies, data_len, offset,
s->cache_flags); s->cache_flags);
} else { } else {
ret = write_object(fd, (char *)data, vmstate_oid, ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset, create, s->inode.nr_copies, data_len, offset, create,
s->cache_flags); s->cache_flags);
} }
...@@ -2580,6 +2613,9 @@ static BlockDriver bdrv_sheepdog = { ...@@ -2580,6 +2613,9 @@ static BlockDriver bdrv_sheepdog = {
.bdrv_save_vmstate = sd_save_vmstate, .bdrv_save_vmstate = sd_save_vmstate,
.bdrv_load_vmstate = sd_load_vmstate, .bdrv_load_vmstate = sd_load_vmstate,
.bdrv_detach_aio_context = sd_detach_aio_context,
.bdrv_attach_aio_context = sd_attach_aio_context,
.create_options = sd_create_options, .create_options = sd_create_options,
}; };
...@@ -2610,6 +2646,9 @@ static BlockDriver bdrv_sheepdog_tcp = { ...@@ -2610,6 +2646,9 @@ static BlockDriver bdrv_sheepdog_tcp = {
.bdrv_save_vmstate = sd_save_vmstate, .bdrv_save_vmstate = sd_save_vmstate,
.bdrv_load_vmstate = sd_load_vmstate, .bdrv_load_vmstate = sd_load_vmstate,
.bdrv_detach_aio_context = sd_detach_aio_context,
.bdrv_attach_aio_context = sd_attach_aio_context,
.create_options = sd_create_options, .create_options = sd_create_options,
}; };
...@@ -2640,6 +2679,9 @@ static BlockDriver bdrv_sheepdog_unix = { ...@@ -2640,6 +2679,9 @@ static BlockDriver bdrv_sheepdog_unix = {
.bdrv_save_vmstate = sd_save_vmstate, .bdrv_save_vmstate = sd_save_vmstate,
.bdrv_load_vmstate = sd_load_vmstate, .bdrv_load_vmstate = sd_load_vmstate,
.bdrv_detach_aio_context = sd_detach_aio_context,
.bdrv_attach_aio_context = sd_attach_aio_context,
.create_options = sd_create_options, .create_options = sd_create_options,
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册