提交 262db388 编写于 作者: P Paolo Bonzini

qemu-nbd: asynchronous operation

Using coroutines enable asynchronous operation on both the network and
the block side.  Network can be owned by two coroutines at the same time,
one writing and one reading.  On the send side, mutual exclusion is
guaranteed by a CoMutex.  On the receive side, mutual exclusion is
guaranteed because new coroutines immediately start receiving data,
and no new coroutines are created as long as the previous one is receiving.

Between receive and send, qemu-nbd can have an arbitrary number of
in-flight block transfers.  Throttling is implemented by the next
patch.
Signed-off-by: NPaolo Bonzini <pbonzini@redhat.com>
上级 72deddc5
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include "block.h" #include "block.h"
#include "block_int.h" #include "block_int.h"
#include "qemu-coroutine.h"
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#ifndef _WIN32 #ifndef _WIN32
...@@ -607,6 +609,11 @@ struct NBDClient { ...@@ -607,6 +609,11 @@ struct NBDClient {
NBDExport *exp; NBDExport *exp;
int sock; int sock;
Coroutine *recv_coroutine;
CoMutex send_lock;
Coroutine *send_coroutine;
}; };
static void nbd_client_get(NBDClient *client) static void nbd_client_get(NBDClient *client)
...@@ -681,13 +688,20 @@ void nbd_export_close(NBDExport *exp) ...@@ -681,13 +688,20 @@ void nbd_export_close(NBDExport *exp)
g_free(exp); g_free(exp);
} }
static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, static void nbd_read(void *opaque);
static void nbd_restart_write(void *opaque);
static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
int len) int len)
{ {
NBDClient *client = req->client; NBDClient *client = req->client;
int csock = client->sock; int csock = client->sock;
int rc, ret; int rc, ret;
qemu_co_mutex_lock(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
client->send_coroutine = qemu_coroutine_self();
if (!len) { if (!len) {
rc = nbd_send_reply(csock, reply); rc = nbd_send_reply(csock, reply);
if (rc == -1) { if (rc == -1) {
...@@ -697,7 +711,7 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, ...@@ -697,7 +711,7 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
socket_set_cork(csock, 1); socket_set_cork(csock, 1);
rc = nbd_send_reply(csock, reply); rc = nbd_send_reply(csock, reply);
if (rc != -1) { if (rc != -1) {
ret = write_sync(csock, req->data, len); ret = qemu_co_send(csock, req->data, len);
if (ret != len) { if (ret != len) {
errno = EIO; errno = EIO;
rc = -1; rc = -1;
...@@ -708,15 +722,20 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, ...@@ -708,15 +722,20 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
} }
socket_set_cork(csock, 0); socket_set_cork(csock, 0);
} }
client->send_coroutine = NULL;
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
qemu_co_mutex_unlock(&client->send_lock);
return rc; return rc;
} }
static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
{ {
NBDClient *client = req->client; NBDClient *client = req->client;
int csock = client->sock; int csock = client->sock;
int rc; int rc;
client->recv_coroutine = qemu_coroutine_self();
if (nbd_receive_request(csock, request) == -1) { if (nbd_receive_request(csock, request) == -1) {
rc = -EIO; rc = -EIO;
goto out; goto out;
...@@ -741,7 +760,7 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) ...@@ -741,7 +760,7 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) {
TRACE("Reading %u byte(s)", request->len); TRACE("Reading %u byte(s)", request->len);
if (read_sync(csock, req->data, request->len) != request->len) { if (qemu_co_recv(csock, req->data, request->len) != request->len) {
LOG("reading from socket failed"); LOG("reading from socket failed");
rc = -EIO; rc = -EIO;
goto out; goto out;
...@@ -750,21 +769,22 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) ...@@ -750,21 +769,22 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
rc = 0; rc = 0;
out: out:
client->recv_coroutine = NULL;
return rc; return rc;
} }
static int nbd_trip(NBDClient *client) static void nbd_trip(void *opaque)
{ {
NBDClient *client = opaque;
NBDRequest *req = nbd_request_get(client); NBDRequest *req = nbd_request_get(client);
NBDExport *exp = client->exp; NBDExport *exp = client->exp;
struct nbd_request request; struct nbd_request request;
struct nbd_reply reply; struct nbd_reply reply;
int rc = -1;
int ret; int ret;
TRACE("Reading request."); TRACE("Reading request.");
ret = nbd_do_receive_request(req, &request); ret = nbd_co_receive_request(req, &request);
if (ret == -EIO) { if (ret == -EIO) {
goto out; goto out;
} }
...@@ -799,7 +819,7 @@ static int nbd_trip(NBDClient *client) ...@@ -799,7 +819,7 @@ static int nbd_trip(NBDClient *client)
} }
TRACE("Read %u byte(s)", request.len); TRACE("Read %u byte(s)", request.len);
if (nbd_do_send_reply(req, &reply, request.len) < 0) if (nbd_co_send_reply(req, &reply, request.len) < 0)
goto out; goto out;
break; break;
case NBD_CMD_WRITE: case NBD_CMD_WRITE:
...@@ -822,7 +842,7 @@ static int nbd_trip(NBDClient *client) ...@@ -822,7 +842,7 @@ static int nbd_trip(NBDClient *client)
} }
if (request.type & NBD_CMD_FLAG_FUA) { if (request.type & NBD_CMD_FLAG_FUA) {
ret = bdrv_flush(exp->bs); ret = bdrv_co_flush(exp->bs);
if (ret < 0) { if (ret < 0) {
LOG("flush failed"); LOG("flush failed");
reply.error = -ret; reply.error = -ret;
...@@ -830,34 +850,34 @@ static int nbd_trip(NBDClient *client) ...@@ -830,34 +850,34 @@ static int nbd_trip(NBDClient *client)
} }
} }
if (nbd_do_send_reply(req, &reply, 0) < 0) if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out; goto out;
break; break;
case NBD_CMD_DISC: case NBD_CMD_DISC:
TRACE("Request type is DISCONNECT"); TRACE("Request type is DISCONNECT");
errno = 0; errno = 0;
return 1; goto out;
case NBD_CMD_FLUSH: case NBD_CMD_FLUSH:
TRACE("Request type is FLUSH"); TRACE("Request type is FLUSH");
ret = bdrv_flush(exp->bs); ret = bdrv_co_flush(exp->bs);
if (ret < 0) { if (ret < 0) {
LOG("flush failed"); LOG("flush failed");
reply.error = -ret; reply.error = -ret;
} }
if (nbd_do_send_reply(req, &reply, 0) < 0) if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out; goto out;
break; break;
case NBD_CMD_TRIM: case NBD_CMD_TRIM:
TRACE("Request type is TRIM"); TRACE("Request type is TRIM");
ret = bdrv_discard(exp->bs, (request.from + exp->dev_offset) / 512, ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512,
request.len / 512); request.len / 512);
if (ret < 0) { if (ret < 0) {
LOG("discard failed"); LOG("discard failed");
reply.error = -ret; reply.error = -ret;
} }
if (nbd_do_send_reply(req, &reply, 0) < 0) if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out; goto out;
break; break;
default: default:
...@@ -865,28 +885,39 @@ static int nbd_trip(NBDClient *client) ...@@ -865,28 +885,39 @@ static int nbd_trip(NBDClient *client)
invalid_request: invalid_request:
reply.error = -EINVAL; reply.error = -EINVAL;
error_reply: error_reply:
if (nbd_do_send_reply(req, &reply, 0) == -1) if (nbd_co_send_reply(req, &reply, 0) == -1)
goto out; goto out;
break; break;
} }
TRACE("Request/Reply complete"); TRACE("Request/Reply complete");
rc = 0; nbd_request_put(req);
return;
out: out:
nbd_request_put(req); nbd_request_put(req);
return rc; nbd_client_close(client);
} }
static void nbd_read(void *opaque) static void nbd_read(void *opaque)
{ {
NBDClient *client = opaque; NBDClient *client = opaque;
if (nbd_trip(client) != 0) { if (client->recv_coroutine) {
nbd_client_close(client); qemu_coroutine_enter(client->recv_coroutine, NULL);
} else {
qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
} }
} }
static void nbd_restart_write(void *opaque)
{
NBDClient *client = opaque;
qemu_coroutine_enter(client->send_coroutine, NULL);
}
NBDClient *nbd_client_new(NBDExport *exp, int csock, NBDClient *nbd_client_new(NBDExport *exp, int csock,
void (*close)(NBDClient *)) void (*close)(NBDClient *))
{ {
...@@ -899,6 +930,7 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, ...@@ -899,6 +930,7 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
client->exp = exp; client->exp = exp;
client->sock = csock; client->sock = csock;
client->close = close; client->close = close;
qemu_co_mutex_init(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
return client; return client;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册