summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/lowcomms.c48
1 files changed, 43 insertions, 5 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 0a4851b3cd4b..14ca3eda6a83 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -81,10 +81,13 @@ struct connection {
#define CF_CONNECTED 10
#define CF_RECONNECT 11
#define CF_DELAY_CONNECT 12
+#define CF_EOF 13
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
+ atomic_t writequeue_cnt;
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+ bool (*eof_condition)(struct connection *con); /* What to do to eof check */
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -179,6 +182,11 @@ static struct connection *__find_con(int nodeid, int r)
return NULL;
}
+static bool tcp_eof_condition(struct connection *con)
+{
+ return atomic_read(&con->writequeue_cnt);
+}
+
static int dlm_con_init(struct connection *con, int nodeid)
{
con->rx_buflen = dlm_config.ci_buffer_size;
@@ -190,6 +198,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
+ atomic_set(&con->writequeue_cnt, 0);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
@@ -197,6 +206,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
if (dlm_config.ci_protocol == 0) {
con->connect_action = tcp_connect_to_sock;
con->shutdown_action = dlm_tcp_shutdown;
+ con->eof_condition = tcp_eof_condition;
} else {
con->connect_action = sctp_connect_to_sock;
}
@@ -723,6 +733,7 @@ static void close_connection(struct connection *con, bool and_other,
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
+ clear_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
@@ -860,16 +871,26 @@ out_resched:
return -EAGAIN;
out_close:
- mutex_unlock(&con->sock_mutex);
if (ret == 0) {
- close_connection(con, false, true, false);
log_print("connection %p got EOF from %d",
con, con->nodeid);
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
+
+ if (con->eof_condition && con->eof_condition(con)) {
+ set_bit(CF_EOF, &con->flags);
+ mutex_unlock(&con->sock_mutex);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, true, false);
+
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ }
+
/* signal to breaking receive worker */
ret = -1;
+ } else {
+ mutex_unlock(&con->sock_mutex);
}
return ret;
}
@@ -1021,6 +1042,7 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
free_entry(e);
}
}
@@ -1417,6 +1439,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
*ppc = page_address(e->page);
e->end += len;
+ atomic_inc(&con->writequeue_cnt);
spin_lock(&con->writequeue_lock);
list_add_tail(&e->list, &con->writequeue);
@@ -1536,6 +1559,21 @@ static void send_to_sock(struct connection *con)
writequeue_entry_complete(e, ret);
}
spin_unlock(&con->writequeue_lock);
+
+ /* close if we got EOF */
+ if (test_and_clear_bit(CF_EOF, &con->flags)) {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, false, true);
+
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ }
+
+ return;
+
out:
mutex_unlock(&con->sock_mutex);
return;