diff options
author | David Teigland <teigland@redhat.com> | 2007-09-27 15:53:38 -0500 |
---|---|---|
committer | Steven Whitehouse <swhiteho@redhat.com> | 2007-10-10 08:56:38 +0100 |
commit | c36258b5925e6cf6bf72904635100593573bfcff (patch) | |
tree | 565f1ce29a7f8a2cd1c25f2d36c932727adbdbc2 | |
parent | b434eda6fda5bcdcc2dd918e5ffbf7184f2d4e17 (diff) |
[DLM] block dlm_recv in recovery transition
Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm. This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.
The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle. While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up. dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero. This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.
The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv. The switch from normal to recovery
mode means dlm_recv goes from processing locking messages, to saving them
for later, and vice versa. Races are avoided by blocking dlm_recv when
setting the flag that switches between modes.
Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
-rw-r--r-- | fs/dlm/dlm_internal.h | 1 | ||||
-rw-r--r-- | fs/dlm/lock.c | 136 | ||||
-rw-r--r-- | fs/dlm/lock.h | 3 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 1 | ||||
-rw-r--r-- | fs/dlm/member.c | 41 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 17 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 36 | ||||
-rw-r--r-- | fs/dlm/rcom.h | 5 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 11 | ||||
-rw-r--r-- | fs/dlm/requestqueue.c | 58 | ||||
-rw-r--r-- | fs/dlm/requestqueue.h | 4 |
11 files changed, 161 insertions, 152 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 74901e981e10..d2fc2384c3be 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -491,6 +491,7 @@ struct dlm_ls { uint64_t ls_recover_seq; struct dlm_recover *ls_recover_args; struct rw_semaphore ls_in_recovery; /* block local requests */ + struct rw_semaphore ls_recv_active; /* block dlm_recv */ struct list_head ls_requestqueue;/* queue remote requests */ struct mutex ls_requestqueue_mutex; char *ls_recover_buf; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 031229f144fa..3915b8e14146 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) +static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) { - struct dlm_message *ms = (struct dlm_message *) hd; - struct dlm_ls *ls; - int error = 0; - - if (!recovery) - dlm_message_in(ms); - - ls = dlm_find_lockspace_global(hd->h_lockspace); - if (!ls) { - log_print("drop message %d from %d for unknown lockspace %d", - ms->m_type, nodeid, hd->h_lockspace); - return -EINVAL; - } - - /* recovery may have just ended leaving a bunch of backed-up requests - in the requestqueue; wait while dlm_recoverd clears them */ - - if (!recovery) - dlm_wait_requestqueue(ls); - - /* recovery may have just started while there were a bunch of - in-flight requests -- save them in requestqueue to be processed - after recovery. we can't let dlm_recvd block on the recovery - lock. if dlm_recoverd is calling this function to clear the - requestqueue, it needs to be interrupted (-EINTR) if another - recovery operation is starting. */ - - while (1) { - if (dlm_locking_stopped(ls)) { - if (recovery) { - error = -EINTR; - goto out; - } - error = dlm_add_requestqueue(ls, nodeid, hd); - if (error == -EAGAIN) - continue; - else { - error = -EINTR; - goto out; - } - } - - if (dlm_lock_recovery_try(ls)) - break; - schedule(); - } - switch (ms->m_type) { /* messages sent to a master node */ @@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) log_error(ls, "unknown message type %d", ms->m_type); } - dlm_unlock_recovery(ls); - out: - dlm_put_lockspace(ls); dlm_astd_wake(); - return error; } +/* If the lockspace is in recovery mode (locking stopped), then normal + messages are saved on the requestqueue for processing after recovery is + done. When not in recovery mode, we wait for dlm_recoverd to drain saved + messages off the requestqueue before we process new ones. This occurs right + after recovery completes when we transition from saving all messages on + requestqueue, to processing all the saved messages, to processing new + messages as they arrive. */ -/* - * Recovery related - */ +static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, + int nodeid) +{ + if (dlm_locking_stopped(ls)) { + dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms); + } else { + dlm_wait_requestqueue(ls); + _receive_message(ls, ms); + } +} + +/* This is called by dlm_recoverd to process messages that were saved on + the requestqueue. */ + +void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) +{ + _receive_message(ls, ms); +} + +/* This is called by the midcomms layer when something is received for + the lockspace. It could be either a MSG (normal message sent as part of + standard locking activity) or an RCOM (recovery message sent as part of + lockspace recovery). */ + +void dlm_receive_buffer(struct dlm_header *hd, int nodeid) +{ + struct dlm_message *ms = (struct dlm_message *) hd; + struct dlm_rcom *rc = (struct dlm_rcom *) hd; + struct dlm_ls *ls; + int type = 0; + + switch (hd->h_cmd) { + case DLM_MSG: + dlm_message_in(ms); + type = ms->m_type; + break; + case DLM_RCOM: + dlm_rcom_in(rc); + type = rc->rc_type; + break; + default: + log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); + return; + } + + if (hd->h_nodeid != nodeid) { + log_print("invalid h_nodeid %d from %d lockspace %x", + hd->h_nodeid, nodeid, hd->h_lockspace); + return; + } + + ls = dlm_find_lockspace_global(hd->h_lockspace); + if (!ls) { + log_print("invalid h_lockspace %x from %d cmd %d type %d", + hd->h_lockspace, nodeid, hd->h_cmd, type); + + if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) + dlm_send_ls_not_ready(nodeid, rc); + return; + } + + /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to + be inactive (in this ls) before transitioning to recovery mode */ + + down_read(&ls->ls_recv_active); + if (hd->h_cmd == DLM_MSG) + dlm_receive_message(ls, ms, nodeid); + else + dlm_receive_rcom(ls, rc, nodeid); + up_read(&ls->ls_recv_active); + + dlm_put_lockspace(ls); +} static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) { diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 1720313c22df..ada04680a1e5 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -16,7 +16,8 @@ void dlm_print_rsb(struct dlm_rsb *r); void dlm_dump_rsb(struct dlm_rsb *r); void dlm_print_lkb(struct dlm_lkb *lkb); -int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery); +void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms); +void dlm_receive_buffer(struct dlm_header *hd, int nodeid); int dlm_modes_compat(int mode1, int mode2); int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, unsigned int flags, struct dlm_rsb **r_ret); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 1dc72105ab12..628eaa669e68 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, ls->ls_recover_seq = 0; ls->ls_recover_args = NULL; init_rwsem(&ls->ls_in_recovery); + init_rwsem(&ls->ls_recv_active); INIT_LIST_HEAD(&ls->ls_requestqueue); mutex_init(&ls->ls_requestqueue_mutex); mutex_init(&ls->ls_clear_proc_locks); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index d09977528f69..e9cdcab306e2 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -18,10 +18,6 @@ #include "rcom.h" #include "config.h" -/* - * Following called by dlm_recoverd thread - */ - static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) { struct dlm_member *memb = NULL; @@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) return error; } -/* - * Following called from lockspace.c - */ +/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before + dlm_ls_start() is called on any of them to start the new recovery. */ int dlm_ls_stop(struct dlm_ls *ls) { int new; /* - * A stop cancels any recovery that's in progress (see RECOVERY_STOP, - * dlm_recovery_stopped()) and prevents any new locks from being - * processed (see RUNNING, dlm_locking_stopped()). + * Prevent dlm_recv from being in the middle of something when we do + * the stop. This includes ensuring dlm_recv isn't processing a + * recovery message (rcom), while dlm_recoverd is aborting and + * resetting things from an in-progress recovery. i.e. we want + * dlm_recoverd to abort its recovery without worrying about dlm_recv + * processing an rcom at the same time. Stopping dlm_recv also makes + * it easy for dlm_receive_message() to check locking stopped and add a + * message to the requestqueue without races. + */ + + down_write(&ls->ls_recv_active); + + /* + * Abort any recovery that's in progress (see RECOVERY_STOP, + * dlm_recovery_stopped()) and tell any other threads running in the + * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). */ spin_lock(&ls->ls_recover_lock); @@ -271,8 +279,14 @@ int dlm_ls_stop(struct dlm_ls *ls) spin_unlock(&ls->ls_recover_lock); /* + * Let dlm_recv run again, now any normal messages will be saved on the + * requestqueue for later. + */ + + up_write(&ls->ls_recv_active); + + /* * This in_recovery lock does two things: - * * 1) Keeps this function from returning until all threads are out * of locking routines and locking is truely stopped. * 2) Keeps any new requests from being processed until it's unlocked @@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls) /* * The recoverd suspend/resume makes sure that dlm_recoverd (if - * running) has noticed the clearing of RUNNING above and quit - * processing the previous recovery. This will be true for all nodes - * before any nodes start the new recovery. + * running) has noticed RECOVERY_STOP above and quit processing the + * previous recovery. */ dlm_recoverd_suspend(ls); diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index a5126e0c68a6..f8c69dda16a0 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -27,7 +27,6 @@ #include "dlm_internal.h" #include "lowcomms.h" #include "config.h" -#include "rcom.h" #include "lock.h" #include "midcomms.h" @@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base, offset &= (limit - 1); len -= msglen; - switch (msg->h_cmd) { - case DLM_MSG: - dlm_receive_message(msg, nodeid, 0); - break; - - case DLM_RCOM: - dlm_receive_rcom(msg, nodeid); - break; - - default: - log_print("unknown msg type %x from %u: %u %u %u %u", - msg->h_cmd, nodeid, msglen, len, offset, ret); - } + dlm_receive_buffer(msg, nodeid); } if (msg != (struct dlm_header *) __tmp) diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 188b91c027e4..ae2fd97fa4ad 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) dlm_recover_process_copy(ls, rc_in); } -static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) +/* If the lockspace doesn't exist then still send a status message + back; it's possible that it just doesn't have its global_id yet. */ + +int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) { struct dlm_rcom *rc; struct rcom_config *rf; @@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc) return rv; } -/* Called by dlm_recvd; corresponds to dlm_receive_message() but special +/* Called by dlm_recv; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. */ -void dlm_receive_rcom(struct dlm_header *hd, int nodeid) +void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) { - struct dlm_rcom *rc = (struct dlm_rcom *) hd; - struct dlm_ls *ls; - - dlm_rcom_in(rc); - - /* If the lockspace doesn't exist then still send a status message - back; it's possible that it just doesn't have its global_id yet. */ - - ls = dlm_find_lockspace_global(hd->h_lockspace); - if (!ls) { - log_print("lockspace %x from %d type %x not found", - hd->h_lockspace, nodeid, rc->rc_type); - if (rc->rc_type == DLM_RCOM_STATUS) - send_ls_not_ready(nodeid, rc); - return; - } - if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { log_debug(ls, "ignoring recovery message %x from %d", rc->rc_type, nodeid); @@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid) if (is_old_reply(ls, rc)) goto out; - if (nodeid != rc->rc_header.h_nodeid) { - log_error(ls, "bad rcom nodeid %d from %d", - rc->rc_header.h_nodeid, nodeid); - goto out; - } - switch (rc->rc_type) { case DLM_RCOM_STATUS: receive_rcom_status(ls, rc); @@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid) DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type);); } out: - dlm_put_lockspace(ls); + return; } diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h index d7984321ff41..b09abd29ba38 100644 --- a/fs/dlm/rcom.h +++ b/fs/dlm/rcom.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid); int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); -void dlm_receive_rcom(struct dlm_header *hd, int nodeid); +void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); +int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); #endif diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 66575997861c..4b89e20eebe7 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -24,19 +24,28 @@ /* If the start for which we're re-enabling locking (seq) has been superseded - by a newer stop (ls_recover_seq), we need to leave locking disabled. */ + by a newer stop (ls_recover_seq), we need to leave locking disabled. + + We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees + locking stopped and b) adds a message to the requestqueue, but dlm_recoverd + enables locking and clears the requestqueue between a and b. */ static int enable_locking(struct dlm_ls *ls, uint64_t seq) { int error = -EINTR; + down_write(&ls->ls_recv_active); + spin_lock(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); + /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); error = 0; } spin_unlock(&ls->ls_recover_lock); + + up_write(&ls->ls_recv_active); return error; } diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index 65008d79c96d..0de04f17ccea 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -20,7 +20,7 @@ struct rq_entry { struct list_head list; int nodeid; - char request[1]; + char request[0]; }; /* @@ -30,42 +30,39 @@ struct rq_entry { * lockspace is enabled on some while still suspended on others. */ -int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) { struct rq_entry *e; int length = hd->h_length; - int rv = 0; e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); if (!e) { - log_print("dlm_add_requestqueue: out of memory\n"); - return 0; + log_print("dlm_add_requestqueue: out of memory len %d", length); + return; } e->nodeid = nodeid; memcpy(e->request, hd, length); - /* We need to check dlm_locking_stopped() after taking the mutex to - avoid a race where dlm_recoverd enables locking and runs - process_requestqueue between our earlier dlm_locking_stopped check - and this addition to the requestqueue. */ - mutex_lock(&ls->ls_requestqueue_mutex); - if (dlm_locking_stopped(ls)) - list_add_tail(&e->list, &ls->ls_requestqueue); - else { - log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid); - kfree(e); - rv = -EAGAIN; - } + list_add_tail(&e->list, &ls->ls_requestqueue); mutex_unlock(&ls->ls_requestqueue_mutex); - return rv; } +/* + * Called by dlm_recoverd to process normal messages saved while recovery was + * happening. Normal locking has been enabled before this is called. dlm_recv + * upon receiving a message, will wait for all saved messages to be drained + * here before processing the message it got. If a new dlm_ls_stop() arrives + * while we're processing these saved messages, it may block trying to suspend + * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that + * case, we don't abort since locking_stopped is still 0. If dlm_recv is not + * waiting for us, then this processing may be aborted due to locking_stopped. + */ + int dlm_process_requestqueue(struct dlm_ls *ls) { struct rq_entry *e; - struct dlm_header *hd; int error = 0; mutex_lock(&ls->ls_requestqueue_mutex); @@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls) e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); mutex_unlock(&ls->ls_requestqueue_mutex); - hd = (struct dlm_header *) e->request; - error = dlm_receive_message(hd, e->nodeid, 1); - - if (error == -EINTR) { - /* entry is left on requestqueue */ - log_debug(ls, "process_requestqueue abort eintr"); - break; - } + dlm_receive_message_saved(ls, (struct dlm_message *)e->request); mutex_lock(&ls->ls_requestqueue_mutex); list_del(&e->list); @@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls) /* * After recovery is done, locking is resumed and dlm_recoverd takes all the - * saved requests and processes them as they would have been by dlm_recvd. At - * the same time, dlm_recvd will start receiving new requests from remote - * nodes. We want to delay dlm_recvd processing new requests until - * dlm_recoverd has finished processing the old saved requests. + * saved requests and processes them as they would have been by dlm_recv. At + * the same time, dlm_recv will start receiving new requests from remote nodes. + * We want to delay dlm_recv processing new requests until dlm_recoverd has + * finished processing the old saved requests. We don't check for locking + * stopped here because dlm_ls_stop won't stop locking until it's suspended us + * (dlm_recv). */ void dlm_wait_requestqueue(struct dlm_ls *ls) @@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls) mutex_lock(&ls->ls_requestqueue_mutex); if (list_empty(&ls->ls_requestqueue)) break; - if (dlm_locking_stopped(ls)) - break; mutex_unlock(&ls->ls_requestqueue_mutex); schedule(); } diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h index 6a53ea03335d..aba34fc05ee4 100644 --- a/fs/dlm/requestqueue.h +++ b/fs/dlm/requestqueue.h @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -13,7 +13,7 @@ #ifndef __REQUESTQUEUE_DOT_H__ #define __REQUESTQUEUE_DOT_H__ -int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); int dlm_process_requestqueue(struct dlm_ls *ls); void dlm_wait_requestqueue(struct dlm_ls *ls); void dlm_purge_requestqueue(struct dlm_ls *ls); |