提交 d0d6ff58 编写于 作者: D Daniel P. Berrange 提交者: Paolo Bonzini

nbd: convert qemu-nbd server to use I/O channels for connection setup

This converts the qemu-nbd server to use the QIOChannelSocket
class for initial listener socket setup and accepting of client
connections. Actual I/O is still being performed against the
socket file descriptor using the POSIX socket APIs.
Signed-off-by: NDaniel P. Berrange <berrange@redhat.com>
Message-Id: <1455129674-17255-5-git-send-email-berrange@redhat.com>
Signed-off-by: NPaolo Bonzini <pbonzini@redhat.com>
上级 064097d9
...@@ -22,19 +22,17 @@ ...@@ -22,19 +22,17 @@
#include "block/block_int.h" #include "block/block_int.h"
#include "block/nbd.h" #include "block/nbd.h"
#include "qemu/main-loop.h" #include "qemu/main-loop.h"
#include "qemu/sockets.h"
#include "qemu/error-report.h" #include "qemu/error-report.h"
#include "qemu/config-file.h" #include "qemu/config-file.h"
#include "block/snapshot.h" #include "block/snapshot.h"
#include "qapi/util.h" #include "qapi/util.h"
#include "qapi/qmp/qstring.h" #include "qapi/qmp/qstring.h"
#include "qom/object_interfaces.h" #include "qom/object_interfaces.h"
#include "io/channel-socket.h"
#include <getopt.h> #include <getopt.h>
#include <sys/socket.h> #include <sys/types.h>
#include <netinet/in.h> #include <signal.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <libgen.h> #include <libgen.h>
#include <pthread.h> #include <pthread.h>
...@@ -53,7 +51,8 @@ static int persistent = 0; ...@@ -53,7 +51,8 @@ static int persistent = 0;
static enum { RUNNING, TERMINATE, TERMINATING, TERMINATED } state; static enum { RUNNING, TERMINATE, TERMINATING, TERMINATED } state;
static int shared = 1; static int shared = 1;
static int nb_fds; static int nb_fds;
static int server_fd; static QIOChannelSocket *server_ioc;
static int server_watch = -1;
static void usage(const char *name) static void usage(const char *name)
{ {
...@@ -236,19 +235,21 @@ static void *nbd_client_thread(void *arg) ...@@ -236,19 +235,21 @@ static void *nbd_client_thread(void *arg)
char *device = arg; char *device = arg;
off_t size; off_t size;
uint32_t nbdflags; uint32_t nbdflags;
int fd, sock; QIOChannelSocket *sioc;
int fd;
int ret; int ret;
pthread_t show_parts_thread; pthread_t show_parts_thread;
Error *local_error = NULL; Error *local_error = NULL;
sioc = qio_channel_socket_new();
sock = socket_connect(saddr, &local_error, NULL, NULL); if (qio_channel_socket_connect_sync(sioc,
if (sock < 0) { saddr,
&local_error) < 0) {
error_report_err(local_error); error_report_err(local_error);
goto out; goto out;
} }
ret = nbd_receive_negotiate(sock, NULL, &nbdflags, ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
&size, &local_error); &size, &local_error);
if (ret < 0) { if (ret < 0) {
if (local_error) { if (local_error) {
...@@ -264,7 +265,7 @@ static void *nbd_client_thread(void *arg) ...@@ -264,7 +265,7 @@ static void *nbd_client_thread(void *arg)
goto out_socket; goto out_socket;
} }
ret = nbd_init(fd, sock, nbdflags, size); ret = nbd_init(fd, sioc->fd, nbdflags, size);
if (ret < 0) { if (ret < 0) {
goto out_fd; goto out_fd;
} }
...@@ -285,13 +286,14 @@ static void *nbd_client_thread(void *arg) ...@@ -285,13 +286,14 @@ static void *nbd_client_thread(void *arg)
goto out_fd; goto out_fd;
} }
close(fd); close(fd);
object_unref(OBJECT(sioc));
kill(getpid(), SIGTERM); kill(getpid(), SIGTERM);
return (void *) EXIT_SUCCESS; return (void *) EXIT_SUCCESS;
out_fd: out_fd:
close(fd); close(fd);
out_socket: out_socket:
closesocket(sock); object_unref(OBJECT(sioc));
out: out:
kill(getpid(), SIGTERM); kill(getpid(), SIGTERM);
return (void *) EXIT_FAILURE; return (void *) EXIT_FAILURE;
...@@ -308,7 +310,7 @@ static void nbd_export_closed(NBDExport *exp) ...@@ -308,7 +310,7 @@ static void nbd_export_closed(NBDExport *exp)
state = TERMINATED; state = TERMINATED;
} }
static void nbd_update_server_fd_handler(int fd); static void nbd_update_server_watch(void);
static void nbd_client_closed(NBDClient *client) static void nbd_client_closed(NBDClient *client)
{ {
...@@ -316,37 +318,51 @@ static void nbd_client_closed(NBDClient *client) ...@@ -316,37 +318,51 @@ static void nbd_client_closed(NBDClient *client)
if (nb_fds == 0 && !persistent && state == RUNNING) { if (nb_fds == 0 && !persistent && state == RUNNING) {
state = TERMINATE; state = TERMINATE;
} }
nbd_update_server_fd_handler(server_fd); nbd_update_server_watch();
nbd_client_put(client); nbd_client_put(client);
} }
static void nbd_accept(void *opaque) static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
{ {
struct sockaddr_in addr; QIOChannelSocket *cioc;
socklen_t addr_len = sizeof(addr); int fd;
int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len); cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
if (fd < 0) { NULL);
perror("accept"); if (!cioc) {
return; return TRUE;
} }
if (state >= TERMINATE) { if (state >= TERMINATE) {
close(fd); object_unref(OBJECT(cioc));
return; return TRUE;
} }
nb_fds++; nb_fds++;
nbd_update_server_fd_handler(server_fd); nbd_update_server_watch();
fd = dup(cioc->fd);
if (fd >= 0) {
nbd_client_new(exp, fd, nbd_client_closed); nbd_client_new(exp, fd, nbd_client_closed);
}
object_unref(OBJECT(cioc));
return TRUE;
} }
static void nbd_update_server_fd_handler(int fd) static void nbd_update_server_watch(void)
{ {
if (nbd_can_accept()) { if (nbd_can_accept()) {
qemu_set_fd_handler(fd, nbd_accept, NULL, (void *)(uintptr_t)fd); if (server_watch == -1) {
server_watch = qio_channel_add_watch(QIO_CHANNEL(server_ioc),
G_IO_IN,
nbd_accept,
NULL, NULL);
}
} else { } else {
qemu_set_fd_handler(fd, NULL, NULL, NULL); if (server_watch != -1) {
g_source_remove(server_watch);
server_watch = -1;
}
} }
} }
...@@ -433,7 +449,6 @@ int main(int argc, char **argv) ...@@ -433,7 +449,6 @@ int main(int argc, char **argv)
int flags = BDRV_O_RDWR; int flags = BDRV_O_RDWR;
int partition = -1; int partition = -1;
int ret = 0; int ret = 0;
int fd;
bool seen_cache = false; bool seen_cache = false;
bool seen_discard = false; bool seen_discard = false;
bool seen_aio = false; bool seen_aio = false;
...@@ -632,15 +647,15 @@ int main(int argc, char **argv) ...@@ -632,15 +647,15 @@ int main(int argc, char **argv)
} }
if (disconnect) { if (disconnect) {
fd = open(argv[optind], O_RDWR); int nbdfd = open(argv[optind], O_RDWR);
if (fd < 0) { if (nbdfd < 0) {
error_report("Cannot open %s: %s", argv[optind], error_report("Cannot open %s: %s", argv[optind],
strerror(errno)); strerror(errno));
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
nbd_disconnect(fd); nbd_disconnect(nbdfd);
close(fd); close(nbdfd);
printf("%s disconnected\n", argv[optind]); printf("%s disconnected\n", argv[optind]);
...@@ -773,8 +788,9 @@ int main(int argc, char **argv) ...@@ -773,8 +788,9 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
fd = socket_listen(saddr, &local_err); server_ioc = qio_channel_socket_new();
if (fd < 0) { if (qio_channel_socket_listen_sync(server_ioc, saddr, &local_err) < 0) {
object_unref(OBJECT(server_ioc));
error_report_err(local_err); error_report_err(local_err);
return 1; return 1;
} }
...@@ -792,8 +808,7 @@ int main(int argc, char **argv) ...@@ -792,8 +808,7 @@ int main(int argc, char **argv)
memset(&client_thread, 0, sizeof(client_thread)); memset(&client_thread, 0, sizeof(client_thread));
} }
server_fd = fd; nbd_update_server_watch();
nbd_update_server_fd_handler(fd);
/* now when the initialization is (almost) complete, chdir("/") /* now when the initialization is (almost) complete, chdir("/")
* to free any busy filesystems */ * to free any busy filesystems */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册