diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 0a4851b3cd4b89457270607c3e200c282367c937..14ca3eda6a835027ba2ed55937bebaf5a1963e15 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -81,10 +81,13 @@ struct connection { #define CF_CONNECTED 10 #define CF_RECONNECT 11 #define CF_DELAY_CONNECT 12 +#define CF_EOF 13 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; + atomic_t writequeue_cnt; void (*connect_action) (struct connection *); /* What to do to connect */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ + bool (*eof_condition)(struct connection *con); /* What to do to eof check */ int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -179,6 +182,11 @@ static struct connection *__find_con(int nodeid, int r) return NULL; } +static bool tcp_eof_condition(struct connection *con) +{ + return atomic_read(&con->writequeue_cnt); +} + static int dlm_con_init(struct connection *con, int nodeid) { con->rx_buflen = dlm_config.ci_buffer_size; @@ -190,6 +198,7 @@ static int dlm_con_init(struct connection *con, int nodeid) mutex_init(&con->sock_mutex); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); + atomic_set(&con->writequeue_cnt, 0); INIT_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); init_waitqueue_head(&con->shutdown_wait); @@ -197,6 +206,7 @@ static int dlm_con_init(struct connection *con, int nodeid) if (dlm_config.ci_protocol == 0) { con->connect_action = tcp_connect_to_sock; con->shutdown_action = dlm_tcp_shutdown; + con->eof_condition = tcp_eof_condition; } else { con->connect_action = sctp_connect_to_sock; } @@ -723,6 +733,7 @@ static void close_connection(struct connection *con, bool and_other, clear_bit(CF_CONNECTED, &con->flags); clear_bit(CF_DELAY_CONNECT, &con->flags); clear_bit(CF_RECONNECT, &con->flags); + clear_bit(CF_EOF, &con->flags); mutex_unlock(&con->sock_mutex); clear_bit(CF_CLOSING, &con->flags); } @@ -860,16 +871,26 @@ static int receive_from_sock(struct connection *con) return -EAGAIN; out_close: - mutex_unlock(&con->sock_mutex); if (ret == 0) { - close_connection(con, false, true, false); log_print("connection %p got EOF from %d", con, con->nodeid); - /* handling for tcp shutdown */ - clear_bit(CF_SHUTDOWN, &con->flags); - wake_up(&con->shutdown_wait); + + if (con->eof_condition && con->eof_condition(con)) { + set_bit(CF_EOF, &con->flags); + mutex_unlock(&con->sock_mutex); + } else { + mutex_unlock(&con->sock_mutex); + close_connection(con, false, true, false); + + /* handling for tcp shutdown */ + clear_bit(CF_SHUTDOWN, &con->flags); + wake_up(&con->shutdown_wait); + } + /* signal to breaking receive worker */ ret = -1; + } else { + mutex_unlock(&con->sock_mutex); } return ret; } @@ -1021,6 +1042,7 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed) if (e->len == 0 && e->users == 0) { list_del(&e->list); + atomic_dec(&e->con->writequeue_cnt); free_entry(e); } } @@ -1417,6 +1439,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, *ppc = page_address(e->page); e->end += len; + atomic_inc(&con->writequeue_cnt); spin_lock(&con->writequeue_lock); list_add_tail(&e->list, &con->writequeue); @@ -1536,6 +1559,21 @@ static void send_to_sock(struct connection *con) writequeue_entry_complete(e, ret); } spin_unlock(&con->writequeue_lock); + + /* close if we got EOF */ + if (test_and_clear_bit(CF_EOF, &con->flags)) { + mutex_unlock(&con->sock_mutex); + close_connection(con, false, false, true); + + /* handling for tcp shutdown */ + clear_bit(CF_SHUTDOWN, &con->flags); + wake_up(&con->shutdown_wait); + } else { + mutex_unlock(&con->sock_mutex); + } + + return; + out: mutex_unlock(&con->sock_mutex); return;