| author | Alexander Aring <aahringo@redhat.com> | 2024-04-02 15:18:10 -0400 |
|---|---|---|
| committer | David Teigland <teigland@redhat.com> | 2024-04-09 11:47:51 -0500 |
| commit | 92d59adfaf710f34ae7788fa54f0731a7640833b (patch) | |
| tree | 4de104575a8dfdd25f5f978e72cf0414dffb86b6 /fs | |
| parent | 578acf9a87a87531df5b59b3799ccc1256a4bbcc (diff) | |
dlm: do message processing in softirq context
Move dlm message processing from an ordered workqueue context to an
ordered softirq context. Handling dlm messages in softirq will allow
requests to be cleared more quickly and efficiently, and should avoid
longer queues of incomplete requests. Later patches are expected to
run completion/blocking callbacks directly from this message processing
context, further reducing context switches required to complete a request.
In the longer term, concurrent message processing could be implemented.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
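For readers less familiar with the workqueue API touched below: the WQ_BH flag (a "bottom half" workqueue, introduced around Linux 6.9) runs its work items in softirq context on the CPU that queued them, rather than in a kworker thread, which is what gives the "ordered softirq context" described above. The following stand-alone sketch is illustrative only; the module and all names in it are hypothetical and not part of this patch.

```c
// Minimal sketch (not from the patch): allocating and using a softirq-backed
// (WQ_BH) workqueue. All identifiers here are made up for illustration.
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_bh_wq;

static void demo_work_fn(struct work_struct *work)
{
	/* Runs in softirq (bottom-half) context on the queueing CPU:
	 * it must not sleep and should finish quickly. */
	pr_info("demo: processed one message in BH context\n");
}
static DECLARE_WORK(demo_work, demo_work_fn);

static int __init demo_init(void)
{
	/* Same flag combination as the dlm conversion below:
	 * high-priority, softirq-executed work items. */
	demo_bh_wq = alloc_workqueue("demo_bh", WQ_HIGHPRI | WQ_BH, 0);
	if (!demo_bh_wq)
		return -ENOMEM;

	queue_work(demo_bh_wq, &demo_work);
	return 0;
}

static void __exit demo_exit(void)
{
	/* Drains any pending work before tearing the workqueue down. */
	destroy_workqueue(demo_bh_wq);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
```

Because the handler runs in softirq context it must never block, which is presumably why dlm's non-sleeping message processing path is a good fit for this conversion.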
Diffstat (limited to 'fs')
-rw-r--r-- | fs/dlm/lowcomms.c | 28 |
1 file changed, 20 insertions, 8 deletions
```diff
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 444dc858c4a4..6b8078085e56 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
 
 static DECLARE_WORK(process_work, process_dlm_messages);
 static DEFINE_SPINLOCK(processqueue_lock);
 static bool process_dlm_messages_pending;
+static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
 static atomic_t processqueue_count;
 static LIST_HEAD(processqueue);
@@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work)
 	}
 
 	list_del(&pentry->list);
-	atomic_dec(&processqueue_count);
+	if (atomic_dec_and_test(&processqueue_count))
+		wake_up(&processqueue_wq);
 	spin_unlock_bh(&processqueue_lock);
 
 	for (;;) {
@@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work)
 		}
 
 		list_del(&pentry->list);
-		atomic_dec(&processqueue_count);
+		if (atomic_dec_and_test(&processqueue_count))
+			wake_up(&processqueue_wq);
 		spin_unlock_bh(&processqueue_lock);
 	}
 }
@@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work)
 		/* CF_RECV_PENDING cleared */
 		break;
 	case DLM_IO_FLUSH:
-		flush_workqueue(process_workqueue);
+		/* We can't flush process_workqueue here because flushing a
+		 * non-WQ_MEM_RECLAIM workqueue such as process_workqueue
+		 * from a WQ_MEM_RECLAIM workqueue can deadlock. Instead we
+		 * have a waitqueue to wait until all messages are
+		 * processed.
+		 *
+		 * This handling is only necessary to back off the sender and
+		 * not queue all messages from the socket layer into the DLM
+		 * processqueue. When DLM is able to parse multiple messages,
+		 * e.g. on a per-socket basis, this handling might be
+		 * removed. Especially in a message burst we are too slow to
+		 * process messages and the queue will fill up memory.
+		 */
+		wait_event(processqueue_wq, !atomic_read(&processqueue_count));
 		fallthrough;
 	case DLM_IO_RESCHED:
 		cond_resched();
@@ -1701,11 +1717,7 @@ static int work_start(void)
 		return -ENOMEM;
 	}
 
-	/* ordered dlm message process queue,
-	 * should be converted to a tasklet
-	 */
-	process_workqueue = alloc_ordered_workqueue("dlm_process",
-						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
+	process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
 	if (!process_workqueue) {
 		log_print("can't start dlm_process");
 		destroy_workqueue(io_workqueue);
```
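The other half of the patch replaces flush_workqueue() with a drain wait: each queued message bumps processqueue_count, the processing side wakes processqueue_wq when the count reaches zero, and the receive worker sleeps on that waitqueue to back off the sender. Below is a minimal, hypothetical sketch of the same counter-plus-waitqueue pattern; the names are illustrative and not dlm's.

```c
/* Illustrative sketch (not part of the patch) of the drain/backpressure
 * pattern introduced above. All identifiers are hypothetical. */
#include <linux/atomic.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(queue_wq);
static atomic_t queue_count = ATOMIC_INIT(0);
static DEFINE_SPINLOCK(queue_lock);
static LIST_HEAD(queue);

struct entry {
	struct list_head list;
};

/* Producer: queue one entry and account for it. */
static void producer_enqueue(struct entry *e)
{
	spin_lock_bh(&queue_lock);
	list_add_tail(&e->list, &queue);
	atomic_inc(&queue_count);
	spin_unlock_bh(&queue_lock);
}

/* Consumer: remove one entry; wake waiters only when the queue drains. */
static struct entry *consumer_dequeue(void)
{
	struct entry *e;

	spin_lock_bh(&queue_lock);
	e = list_first_entry_or_null(&queue, struct entry, list);
	if (e) {
		list_del(&e->list);
		if (atomic_dec_and_test(&queue_count))
			wake_up(&queue_wq);
	}
	spin_unlock_bh(&queue_lock);
	return e;
}

/* Sender backoff: sleep (process context only) until the queue is empty.
 * Unlike flush_workqueue(), this does not create a WQ_MEM_RECLAIM
 * dependency on the consumer's workqueue. */
static void wait_for_drain(void)
{
	wait_event(queue_wq, !atomic_read(&queue_count));
}
```

Note that atomic_dec_and_test() fires the wake_up() only on the final decrement, so the common dequeue path stays cheap while waiters are still guaranteed to see the count reach zero.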