提交 09d0726c 编写于 作者: A Anthony Liguori

Merge remote-tracking branch 'bonzini/nbd-next' into staging

* bonzini/nbd-next:
  nbd: add nbd_export_get_blockdev
  nbd: negotiate with named exports
  nbd: register named exports
  qemu-nbd: rewrite termination conditions to use a state machine
  nbd: add notification for closing an NBDExport
  nbd: track clients into NBDExport
  nbd: add reference counting to NBDExport
  nbd: do not leak nbd_trip coroutines when a connection is torn down
  nbd: make refcount interface public
  nbd: do not close BlockDriverState in nbd_export_close
  nbd: pass NBDClient to nbd_send_negotiate
  nbd: add more constants
......@@ -57,9 +57,12 @@
/* This is all part of the "official" NBD API */
#define NBD_REQUEST_SIZE (4 + 4 + 8 + 8 + 4)
#define NBD_REPLY_SIZE (4 + 4 + 8)
#define NBD_REQUEST_MAGIC 0x25609513
#define NBD_REPLY_MAGIC 0x67446698
#define NBD_OPTS_MAGIC 0x49484156454F5054LL
#define NBD_CLIENT_MAGIC 0x0000420281861253LL
#define NBD_SET_SOCK _IO(0xab, 0)
#define NBD_SET_BLKSIZE _IO(0xab, 1)
......@@ -75,6 +78,49 @@
#define NBD_OPT_EXPORT_NAME (1 << 0)
/* Definitions for opaque data types */
typedef struct NBDRequest NBDRequest;
struct NBDRequest {
QSIMPLEQ_ENTRY(NBDRequest) entry;
NBDClient *client;
uint8_t *data;
};
struct NBDExport {
int refcount;
void (*close)(NBDExport *exp);
BlockDriverState *bs;
char *name;
off_t dev_offset;
off_t size;
uint32_t nbdflags;
QTAILQ_HEAD(, NBDClient) clients;
QSIMPLEQ_HEAD(, NBDRequest) requests;
QTAILQ_ENTRY(NBDExport) next;
};
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
struct NBDClient {
int refcount;
void (*close)(NBDClient *client);
NBDExport *exp;
int sock;
Coroutine *recv_coroutine;
CoMutex send_lock;
Coroutine *send_coroutine;
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
};
/* That's all folks */
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
......@@ -192,11 +238,23 @@ int unix_socket_outgoing(const char *path)
return unix_connect(path);
}
/* Basic flow
/* Basic flow for negotiation
Server Client
Negotiate
or
Server Client
Negotiate #1
Option
Negotiate #2
----
followed by
Server Client
Request
Response
Request
......@@ -204,18 +262,111 @@ int unix_socket_outgoing(const char *path)
...
...
Request (type == 2)
*/
static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
static int nbd_receive_options(NBDClient *client)
{
int csock = client->sock;
char name[256];
uint32_t tmp, length;
uint64_t magic;
int rc;
/* Client sends:
[ 0 .. 3] reserved (0)
[ 4 .. 11] NBD_OPTS_MAGIC
[12 .. 15] NBD_OPT_EXPORT_NAME
[16 .. 19] length
[20 .. xx] export name (length bytes)
*/
rc = -EINVAL;
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
goto fail;
}
TRACE("Checking reserved");
if (tmp != 0) {
LOG("Bad reserved received");
goto fail;
}
if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("read failed");
goto fail;
}
TRACE("Checking reserved");
if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
LOG("Bad magic received");
goto fail;
}
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
goto fail;
}
TRACE("Checking option");
if (tmp != be32_to_cpu(NBD_OPT_EXPORT_NAME)) {
LOG("Bad option received");
goto fail;
}
if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
LOG("read failed");
goto fail;
}
TRACE("Checking length");
length = be32_to_cpu(length);
if (length > 255) {
LOG("Bad length received");
goto fail;
}
if (read_sync(csock, name, length) != length) {
LOG("read failed");
goto fail;
}
name[length] = '\0';
client->exp = nbd_export_find(name);
if (!client->exp) {
LOG("export not found");
goto fail;
}
QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
nbd_export_get(client->exp);
TRACE("Option negotiation succeeded.");
rc = 0;
fail:
return rc;
}
static int nbd_send_negotiate(NBDClient *client)
{
int csock = client->sock;
char buf[8 + 8 + 8 + 128];
int rc;
const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
/* Negotiate
/* Negotiation header without options:
[ 0 .. 7] passwd ("NBDMAGIC")
[ 8 .. 15] magic (0x00420281861253)
[ 8 .. 15] magic (NBD_CLIENT_MAGIC)
[16 .. 23] size
[24 .. 27] flags
[24 .. 25] server flags (0)
[24 .. 27] export flags
[28 .. 151] reserved (0)
Negotiation header with options, part 1:
[ 0 .. 7] passwd ("NBDMAGIC")
[ 8 .. 15] magic (NBD_OPTS_MAGIC)
[16 .. 17] server flags (0)
part 2 (after options are sent):
[18 .. 25] size
[26 .. 27] export flags
[28 .. 151] reserved (0)
*/
......@@ -224,17 +375,40 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
TRACE("Beginning negotiation.");
memcpy(buf, "NBDMAGIC", 8);
cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL);
cpu_to_be64w((uint64_t*)(buf + 16), size);
cpu_to_be32w((uint32_t*)(buf + 24),
flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
if (client->exp) {
assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 8), NBD_CLIENT_MAGIC);
cpu_to_be64w((uint64_t*)(buf + 16), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
} else {
cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC);
}
memset(buf + 28, 0, 124);
if (client->exp) {
if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed");
goto fail;
}
} else {
if (write_sync(csock, buf, 18) != 18) {
LOG("write failed");
goto fail;
}
rc = nbd_receive_options(client);
if (rc < 0) {
LOG("option negotiation failed");
goto fail;
}
assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
LOG("write failed");
goto fail;
}
}
TRACE("Negotiation succeeded.");
rc = 0;
......@@ -295,7 +469,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
uint32_t namesize;
TRACE("Checking magic (opts_magic)");
if (magic != 0x49484156454F5054LL) {
if (magic != NBD_OPTS_MAGIC) {
LOG("Bad magic received");
goto fail;
}
......@@ -334,7 +508,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
} else {
TRACE("Checking magic (cli_magic)");
if (magic != 0x00420281861253LL) {
if (magic != NBD_CLIENT_MAGIC) {
LOG("Bad magic received");
goto fail;
}
......@@ -477,7 +651,7 @@ int nbd_client(int fd)
ssize_t nbd_send_request(int csock, struct nbd_request *request)
{
uint8_t buf[4 + 4 + 8 + 8 + 4];
uint8_t buf[NBD_REQUEST_SIZE];
ssize_t ret;
cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC);
......@@ -504,7 +678,7 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request)
static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
{
uint8_t buf[4 + 4 + 8 + 8 + 4];
uint8_t buf[NBD_REQUEST_SIZE];
uint32_t magic;
ssize_t ret;
......@@ -582,7 +756,7 @@ ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
{
uint8_t buf[4 + 4 + 8];
uint8_t buf[NBD_REPLY_SIZE];
ssize_t ret;
/* Reply
......@@ -610,58 +784,47 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
#define MAX_NBD_REQUESTS 16
typedef struct NBDRequest NBDRequest;
struct NBDRequest {
QSIMPLEQ_ENTRY(NBDRequest) entry;
NBDClient *client;
uint8_t *data;
};
struct NBDExport {
BlockDriverState *bs;
off_t dev_offset;
off_t size;
uint32_t nbdflags;
QSIMPLEQ_HEAD(, NBDRequest) requests;
};
struct NBDClient {
int refcount;
void (*close)(NBDClient *client);
NBDExport *exp;
int sock;
Coroutine *recv_coroutine;
CoMutex send_lock;
Coroutine *send_coroutine;
int nb_requests;
};
static void nbd_client_get(NBDClient *client)
void nbd_client_get(NBDClient *client)
{
client->refcount++;
}
static void nbd_client_put(NBDClient *client)
void nbd_client_put(NBDClient *client)
{
if (--client->refcount == 0) {
/* The last reference should be dropped by client->close,
* which is called by nbd_client_close.
*/
assert(client->closing);
qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
close(client->sock);
client->sock = -1;
if (client->exp) {
QTAILQ_REMOVE(&client->exp->clients, client, next);
nbd_export_put(client->exp);
}
g_free(client);
}
}
static void nbd_client_close(NBDClient *client)
void nbd_client_close(NBDClient *client)
{
qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
close(client->sock);
client->sock = -1;
if (client->closing) {
return;
}
client->closing = true;
/* Force requests to finish. They will drop their own references,
* then we'll close the socket and free the NBDClient.
*/
shutdown(client->sock, 2);
/* Also tell the client, so that they release their reference. */
if (client->close) {
client->close(client);
}
nbd_client_put(client);
}
static NBDRequest *nbd_request_get(NBDClient *client)
......@@ -695,19 +858,86 @@ static void nbd_request_put(NBDRequest *req)
}
NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
off_t size, uint32_t nbdflags)
off_t size, uint32_t nbdflags,
void (*close)(NBDExport *))
{
NBDExport *exp = g_malloc0(sizeof(NBDExport));
QSIMPLEQ_INIT(&exp->requests);
exp->refcount = 1;
QTAILQ_INIT(&exp->clients);
exp->bs = bs;
exp->dev_offset = dev_offset;
exp->nbdflags = nbdflags;
exp->size = size == -1 ? bdrv_getlength(bs) : size;
exp->close = close;
return exp;
}
NBDExport *nbd_export_find(const char *name)
{
NBDExport *exp;
QTAILQ_FOREACH(exp, &exports, next) {
if (strcmp(name, exp->name) == 0) {
return exp;
}
}
return NULL;
}
void nbd_export_set_name(NBDExport *exp, const char *name)
{
if (exp->name == name) {
return;
}
nbd_export_get(exp);
if (exp->name != NULL) {
g_free(exp->name);
exp->name = NULL;
QTAILQ_REMOVE(&exports, exp, next);
nbd_export_put(exp);
}
if (name != NULL) {
nbd_export_get(exp);
exp->name = g_strdup(name);
QTAILQ_INSERT_TAIL(&exports, exp, next);
}
nbd_export_put(exp);
}
void nbd_export_close(NBDExport *exp)
{
NBDClient *client, *next;
nbd_export_get(exp);
QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
nbd_client_close(client);
}
nbd_export_set_name(exp, NULL);
nbd_export_put(exp);
}
void nbd_export_get(NBDExport *exp)
{
assert(exp->refcount > 0);
exp->refcount++;
}
void nbd_export_put(NBDExport *exp)
{
assert(exp->refcount > 0);
if (exp->refcount == 1) {
nbd_export_close(exp);
}
if (--exp->refcount == 0) {
assert(exp->name == NULL);
if (exp->close) {
exp->close(exp);
}
while (!QSIMPLEQ_EMPTY(&exp->requests)) {
NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests);
QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry);
......@@ -715,8 +945,22 @@ void nbd_export_close(NBDExport *exp)
g_free(first);
}
bdrv_close(exp->bs);
g_free(exp);
}
}
BlockDriverState *nbd_export_get_blockdev(NBDExport *exp)
{
return exp->bs;
}
void nbd_export_close_all(void)
{
NBDExport *exp, *next;
QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
nbd_export_close(exp);
}
}
static int nbd_can_read(void *opaque);
......@@ -805,14 +1049,18 @@ out:
static void nbd_trip(void *opaque)
{
NBDClient *client = opaque;
NBDRequest *req = nbd_request_get(client);
NBDExport *exp = client->exp;
NBDRequest *req;
struct nbd_request request;
struct nbd_reply reply;
ssize_t ret;
TRACE("Reading request.");
if (client->closing) {
return;
}
req = nbd_request_get(client);
ret = nbd_co_receive_request(req, &request);
if (ret == -EAGAIN) {
goto done;
......@@ -974,15 +1222,21 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
void (*close)(NBDClient *))
{
NBDClient *client;
if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) < 0) {
return NULL;
}
client = g_malloc0(sizeof(NBDClient));
client->refcount = 1;
client->exp = exp;
client->sock = csock;
if (nbd_send_negotiate(client) < 0) {
g_free(client);
return NULL;
}
client->close = close;
qemu_co_mutex_init(&client->send_lock);
qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
if (exp) {
QTAILQ_INSERT_TAIL(&exp->clients, client, next);
nbd_export_get(exp);
}
return client;
}
......@@ -79,9 +79,22 @@ typedef struct NBDExport NBDExport;
typedef struct NBDClient NBDClient;
NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
off_t size, uint32_t nbdflags);
off_t size, uint32_t nbdflags,
void (*close)(NBDExport *));
void nbd_export_close(NBDExport *exp);
void nbd_export_get(NBDExport *exp);
void nbd_export_put(NBDExport *exp);
BlockDriverState *nbd_export_get_blockdev(NBDExport *exp);
NBDExport *nbd_export_find(const char *name);
void nbd_export_set_name(NBDExport *exp, const char *name);
void nbd_export_close_all(void);
NBDClient *nbd_client_new(NBDExport *exp, int csock,
void (*close)(NBDClient *));
void nbd_client_close(NBDClient *client);
void nbd_client_get(NBDClient *client);
void nbd_client_put(NBDClient *client);
#endif
......@@ -41,8 +41,8 @@ static NBDExport *exp;
static int verbose;
static char *srcpath;
static char *sockpath;
static bool sigterm_reported;
static bool nbd_started;
static int persistent = 0;
static enum { RUNNING, TERMINATE, TERMINATING, TERMINATED } state;
static int shared = 1;
static int nb_fds;
......@@ -186,7 +186,7 @@ static int find_partition(BlockDriverState *bs, int partition,
static void termsig_handler(int signum)
{
sigterm_reported = true;
state = TERMINATE;
qemu_notify_event();
}
......@@ -269,10 +269,20 @@ static int nbd_can_accept(void *opaque)
return nb_fds < shared;
}
static void nbd_export_closed(NBDExport *exp)
{
assert(state == TERMINATING);
state = TERMINATED;
}
static void nbd_client_closed(NBDClient *client)
{
nb_fds--;
if (nb_fds == 0 && !persistent && state == RUNNING) {
state = TERMINATE;
}
qemu_notify_event();
nbd_client_put(client);
}
static void nbd_accept(void *opaque)
......@@ -282,7 +292,11 @@ static void nbd_accept(void *opaque)
socklen_t addr_len = sizeof(addr);
int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len);
nbd_started = true;
if (state >= TERMINATE) {
close(fd);
return;
}
if (fd >= 0 && nbd_client_new(exp, fd, nbd_client_closed)) {
nb_fds++;
}
......@@ -329,7 +343,6 @@ int main(int argc, char **argv)
int partition = -1;
int ret;
int fd;
int persistent = 0;
bool seen_cache = false;
#ifdef CONFIG_LINUX_AIO
bool seen_aio = false;
......@@ -546,7 +559,7 @@ int main(int argc, char **argv)
}
}
exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags);
exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags, nbd_export_closed);
if (sockpath) {
fd = unix_socket_incoming(sockpath);
......@@ -581,11 +594,18 @@ int main(int argc, char **argv)
err(EXIT_FAILURE, "Could not chdir to root directory");
}
state = RUNNING;
do {
main_loop_wait(false);
} while (!sigterm_reported && (persistent || !nbd_started || nb_fds > 0));
if (state == TERMINATE) {
state = TERMINATING;
nbd_export_close(exp);
nbd_export_put(exp);
exp = NULL;
}
} while (state != TERMINATED);
bdrv_close(bs);
if (sockpath) {
unlink(sockpath);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册