diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/dlm/lowcomms.c | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 444dc858c4a4..6b8078085e56 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work); static DECLARE_WORK(process_work, process_dlm_messages); static DEFINE_SPINLOCK(processqueue_lock); static bool process_dlm_messages_pending; +static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq); static atomic_t processqueue_count; static LIST_HEAD(processqueue); @@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work) } list_del(&pentry->list); - atomic_dec(&processqueue_count); + if (atomic_dec_and_test(&processqueue_count)) + wake_up(&processqueue_wq); spin_unlock_bh(&processqueue_lock); for (;;) { @@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work) } list_del(&pentry->list); - atomic_dec(&processqueue_count); + if (atomic_dec_and_test(&processqueue_count)) + wake_up(&processqueue_wq); spin_unlock_bh(&processqueue_lock); } } @@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work) /* CF_RECV_PENDING cleared */ break; case DLM_IO_FLUSH: - flush_workqueue(process_workqueue); + /* we can't flush the process_workqueue here because a + * WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non + * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead + * we have a waitqueue to wait until all messages are + * processed. + * + * This handling is only necessary to backoff the sender and + * not queue all messages from the socket layer into DLM + * processqueue. When DLM is capable to parse multiple messages + * on an e.g. per socket basis this handling can might be + * removed. Especially in a message burst we are too slow to + * process messages and the queue will fill up memory. + */ + wait_event(processqueue_wq, !atomic_read(&processqueue_count)); fallthrough; case DLM_IO_RESCHED: cond_resched(); @@ -1701,11 +1717,7 @@ static int work_start(void) return -ENOMEM; } - /* ordered dlm message process queue, - * should be converted to a tasklet - */ - process_workqueue = alloc_ordered_workqueue("dlm_process", - WQ_HIGHPRI | WQ_MEM_RECLAIM); + process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0); if (!process_workqueue) { log_print("can't start dlm_process"); destroy_workqueue(io_workqueue); |