From 88c853c3f5c0a07c5db61b494ee25152535cfeee Mon Sep 17 00:00:00 2001
From: David Howells
Date: Tue, 23 Jul 2019 11:24:59 +0100
Subject: afs: Fix cell refcounting by splitting the usage counter

Management of the lifetime of the afs_cell struct has some problems
due to the usage counter being used to determine whether objects of
that type are in use, in addition to whether anyone might be
interested in the structure.

This is made trickier by cell objects being cached for a period of
time in case they're quickly reused, as they hold the result of a
setup process that may be slow (DNS lookups, AFS RPC ops).

Problems include the cached root volume from alias resolution pinning
its parent cell record, rmmod occasionally hanging and occasionally
producing assertion failures.

Fix this by splitting the count of active users from the struct
reference count.  Things then work as follows:

 (1) The cell cache keeps +1 on the cell's activity count and this
     has to be dropped before the cell can be removed.
     afs_manage_cell() tries to exchange the 1 to a 0 with the
     cells_lock write-locked, and if successful, the record is
     removed from net->cells.

 (2) One struct ref is 'owned' by the activity count.  That is put
     when the active count is reduced to 0 (final_destruction label).

 (3) A ref can be held on a cell whilst it is queued for management
     on a work queue without confusing the active count.
     afs_queue_cell() is added to wrap this.

 (4) The queue's ref is dropped at the end of the management.  This
     is split out into a separate function, afs_manage_cell_work().

 (5) The root volume record is put after a cell is removed (at the
     final_destruction label) rather than in the RCU destruction
     routine.

 (6) Volumes hold struct refs, but aren't active users.

 (7) Both counts are displayed in /proc/net/afs/cells.

There are some management function changes:

 (*) afs_put_cell() now just decrements the refcount and triggers the
     RCU destruction if it becomes 0.  It no longer sets a timer to
     have the manager do this.

 (*) afs_use_cell() and afs_unuse_cell() are added to increase and
     decrease the active count.  afs_unuse_cell() sets the management
     timer.

 (*) afs_queue_cell() is added to queue a cell with appropriate refs.

There are also some other fixes:

 (*) Don't let /proc/net/afs/cells access a cell's vllist if it's
     NULL.

 (*) Make sure that candidate cells in lookups are properly destroyed
     rather than being simply kfree'd.  This ensures the bits they
     point to are destroyed also.

 (*) afs_dec_cells_outstanding() is now called in cell destruction
     rather than at "final_destruction".  This ensures that cell->net
     is still valid to the end of the destructor.

 (*) As a consequence of the previous two changes, move the increment
     of net->cells_outstanding that was at the point of insertion
     into the tree to the allocation routine to correctly balance
     things.
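To illustrate the shape of the split (and not the afs code itself),
here is a compilable userspace sketch of the two-counter pattern using
C11 atomics.  All names in it (obj, obj_get, obj_put, obj_use,
obj_unuse, start_teardown_timer, destroy) are hypothetical stand-ins:

	#include <stdatomic.h>
	#include <stdlib.h>

	struct obj {
		atomic_int ref;		/* memory refs: last put frees */
		atomic_int active;	/* active users: the cache owns one */
	};

	static void destroy(struct obj *o)	/* stand-in for the RCU free */
	{
		free(o);
	}

	static void start_teardown_timer(struct obj *o)	/* stand-in */
	{
		/* A manager would later try to swap active 1 -> 0 and,
		 * if it wins, drop the struct ref that the active count
		 * owns.
		 */
		(void)o;
	}

	static struct obj *obj_get(struct obj *o)
	{
		atomic_fetch_add(&o->ref, 1);
		return o;
	}

	static void obj_put(struct obj *o)
	{
		/* fetch_sub returns the old value; 1 means we were last */
		if (atomic_fetch_sub(&o->ref, 1) == 1)
			destroy(o);
	}

	static struct obj *obj_use(struct obj *o)
	{
		atomic_fetch_add(&o->active, 1);	/* caller holds a ref */
		return o;
	}

	static void obj_unuse(struct obj *o)
	{
		/* Old value 2 means only the cache's +1 now remains, so
		 * the object becomes a candidate for expiry.
		 */
		if (atomic_fetch_sub(&o->active, 1) == 2)
			start_teardown_timer(o);
	}

The same pattern appears in the patch below: cell->ref pins the memory
and is released through call_rcu(), whilst cell->active decides when
the manager may tear the cell down and detach it from net->cells.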
Fixes: 989782dcdc91 ("afs: Overhaul cell database management")
Signed-off-by: David Howells
---
 fs/afs/cell.c | 149 ++++++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 103 insertions(+), 46 deletions(-)

diff --git a/fs/afs/cell.c b/fs/afs/cell.c
index 5da83e84952a..c906000b0ff8 100644
--- a/fs/afs/cell.c
+++ b/fs/afs/cell.c
@@ -19,7 +19,7 @@ static unsigned __read_mostly afs_cell_gc_delay = 10;
 static unsigned __read_mostly afs_cell_min_ttl = 10 * 60;
 static unsigned __read_mostly afs_cell_max_ttl = 24 * 60 * 60;
 
-static void afs_manage_cell(struct work_struct *);
+static void afs_manage_cell_work(struct work_struct *);
 
 static void afs_dec_cells_outstanding(struct afs_net *net)
 {
@@ -62,8 +62,7 @@ static struct afs_cell *afs_find_cell_locked(struct afs_net *net,
 		cell = net->ws_cell;
 		if (!cell)
 			return ERR_PTR(-EDESTADDRREQ);
-		afs_get_cell(cell);
-		return cell;
+		goto found;
 	}
 
 	p = net->cells.rb_node;
@@ -85,12 +84,12 @@ static struct afs_cell *afs_find_cell_locked(struct afs_net *net,
 	return ERR_PTR(-ENOENT);
 
 found:
-	if (!atomic_inc_not_zero(&cell->usage))
-		return ERR_PTR(-ENOENT);
-
-	return cell;
+	return afs_use_cell(cell);
 }
 
+/*
+ * Look up and get an activation reference on a cell record.
+ */
 struct afs_cell *afs_find_cell(struct afs_net *net,
 			       const char *name, unsigned int namesz)
 {
@@ -153,8 +152,9 @@ static struct afs_cell *afs_alloc_cell(struct afs_net *net,
 		cell->name[i] = tolower(name[i]);
 	cell->name[i] = 0;
 
-	atomic_set(&cell->usage, 2);
-	INIT_WORK(&cell->manager, afs_manage_cell);
+	atomic_set(&cell->ref, 1);
+	atomic_set(&cell->active, 0);
+	INIT_WORK(&cell->manager, afs_manage_cell_work);
 	cell->volumes = RB_ROOT;
 	INIT_HLIST_HEAD(&cell->proc_volumes);
 	seqlock_init(&cell->volume_lock);
@@ -193,6 +193,7 @@ static struct afs_cell *afs_alloc_cell(struct afs_net *net,
 	cell->dns_source = vllist->source;
 	cell->dns_status = vllist->status;
 	smp_store_release(&cell->dns_lookup_count, 1); /* vs source/status */
+	atomic_inc(&net->cells_outstanding);
 
 	_leave(" = %p", cell);
 	return cell;
@@ -275,12 +276,12 @@ struct afs_cell *afs_lookup_cell(struct afs_net *net,
 
 	cell = candidate;
 	candidate = NULL;
+	atomic_set(&cell->active, 2);
 	rb_link_node_rcu(&cell->net_node, parent, pp);
 	rb_insert_color(&cell->net_node, &net->cells);
-	atomic_inc(&net->cells_outstanding);
 	up_write(&net->cells_lock);
 
-	queue_work(afs_wq, &cell->manager);
+	afs_queue_cell(cell);
 
 wait_for_cell:
 	_debug("wait_for_cell");
@@ -305,16 +306,17 @@ cell_already_exists:
 	if (excl) {
 		ret = -EEXIST;
 	} else {
-		afs_get_cell(cursor);
+		afs_use_cell(cursor);
 		ret = 0;
 	}
 	up_write(&net->cells_lock);
-	kfree(candidate);
+	if (candidate)
+		afs_put_cell(candidate);
 	if (ret == 0)
 		goto wait_for_cell;
 	goto error_noput;
 error:
-	afs_put_cell(net, cell);
+	afs_unuse_cell(net, cell);
 error_noput:
 	_leave(" = %d [error]", ret);
 	return ERR_PTR(ret);
@@ -359,7 +361,7 @@ int afs_cell_init(struct afs_net *net, const char *rootcell)
 	}
 
 	if (!test_and_set_bit(AFS_CELL_FL_NO_GC, &new_root->flags))
-		afs_get_cell(new_root);
+		afs_use_cell(new_root);
 
 	/* install the new cell */
 	down_write(&net->cells_lock);
@@ -367,7 +369,7 @@ int afs_cell_init(struct afs_net *net, const char *rootcell)
 	net->ws_cell = new_root;
 	up_write(&net->cells_lock);
 
-	afs_put_cell(net, old_root);
+	afs_unuse_cell(net, old_root);
 	_leave(" = 0");
 	return 0;
 }
@@ -473,18 +475,21 @@ out_wake:
 
 static void afs_cell_destroy(struct rcu_head *rcu)
 {
 	struct afs_cell *cell = container_of(rcu, struct afs_cell, rcu);
+	struct afs_net *net = cell->net;
+	int u;
 
 	_enter("%p{%s}", cell, cell->name);
 
-	ASSERTCMP(atomic_read(&cell->usage), ==, 0);
+	u = atomic_read(&cell->ref);
+	ASSERTCMP(u, ==, 0);
 
-	afs_put_volume(cell->net, cell->root_volume, afs_volume_trace_put_cell_root);
-	afs_put_vlserverlist(cell->net, rcu_access_pointer(cell->vl_servers));
-	afs_put_cell(cell->net, cell->alias_of);
+	afs_put_vlserverlist(net, rcu_access_pointer(cell->vl_servers));
+	afs_unuse_cell(net, cell->alias_of);
 	key_put(cell->anonymous_key);
 	kfree(cell->name);
 	kfree(cell);
 
+	afs_dec_cells_outstanding(net);
 	_leave(" [destroyed]");
 }
@@ -519,16 +524,50 @@ void afs_cells_timer(struct timer_list *timer)
  */
 struct afs_cell *afs_get_cell(struct afs_cell *cell)
 {
-	atomic_inc(&cell->usage);
+	if (atomic_read(&cell->ref) <= 0)
+		BUG();
+
+	atomic_inc(&cell->ref);
 	return cell;
 }
 
 /*
  * Drop a reference on a cell record.
  */
-void afs_put_cell(struct afs_net *net, struct afs_cell *cell)
+void afs_put_cell(struct afs_cell *cell)
+{
+	if (cell) {
+		unsigned int u, a;
+
+		u = atomic_dec_return(&cell->ref);
+		if (u == 0) {
+			a = atomic_read(&cell->active);
+			WARN(a != 0, "Cell active count %u > 0\n", a);
+			call_rcu(&cell->rcu, afs_cell_destroy);
+		}
+	}
+}
+
+/*
+ * Note a cell becoming more active.
+ */
+struct afs_cell *afs_use_cell(struct afs_cell *cell)
+{
+	if (atomic_read(&cell->ref) <= 0)
+		BUG();
+
+	atomic_inc(&cell->active);
+	return cell;
+}
+
+/*
+ * Record a cell becoming less active.  When the active counter reaches 1, it
+ * is scheduled for destruction, but may get reactivated.
+ */
+void afs_unuse_cell(struct afs_net *net, struct afs_cell *cell)
 {
 	time64_t now, expire_delay;
+	int a;
 
 	if (!cell)
 		return;
@@ -541,11 +580,21 @@ void afs_put_cell(struct afs_net *net, struct afs_cell *cell)
 	if (cell->vl_servers->nr_servers)
 		expire_delay = afs_cell_gc_delay;
 
-	if (atomic_dec_return(&cell->usage) > 1)
-		return;
+	a = atomic_dec_return(&cell->active);
+	WARN_ON(a == 0);
+	if (a == 1)
+		/* 'cell' may now be garbage collected. */
+		afs_set_cell_timer(net, expire_delay);
+}
 
-	/* 'cell' may now be garbage collected. */
-	afs_set_cell_timer(net, expire_delay);
+/*
+ * Queue a cell for management, giving the workqueue a ref to hold.
+ */
+void afs_queue_cell(struct afs_cell *cell)
+{
+	afs_get_cell(cell);
+	if (!queue_work(afs_wq, &cell->manager))
+		afs_put_cell(cell);
 }
 
 /*
@@ -645,12 +694,11 @@ static void afs_deactivate_cell(struct afs_net *net, struct afs_cell *cell)
  * Manage a cell record, initialising and destroying it, maintaining its DNS
  * records.
  */
-static void afs_manage_cell(struct work_struct *work)
+static void afs_manage_cell(struct afs_cell *cell)
 {
-	struct afs_cell *cell = container_of(work, struct afs_cell, manager);
 	struct afs_net *net = cell->net;
 	bool deleted;
-	int ret, usage;
+	int ret, active;
 
 	_enter("%s", cell->name);
 
@@ -660,10 +708,11 @@ again:
 	case AFS_CELL_INACTIVE:
 	case AFS_CELL_FAILED:
 		down_write(&net->cells_lock);
-		usage = 1;
-		deleted = atomic_try_cmpxchg_relaxed(&cell->usage, &usage, 0);
-		if (deleted)
+		active = 1;
+		deleted = atomic_try_cmpxchg_relaxed(&cell->active, &active, 0);
+		if (deleted) {
 			rb_erase(&cell->net_node, &net->cells);
+		}
 		up_write(&net->cells_lock);
 		if (deleted)
 			goto final_destruction;
@@ -688,7 +737,7 @@ again:
 		goto again;
 
 	case AFS_CELL_ACTIVE:
-		if (atomic_read(&cell->usage) > 1) {
+		if (atomic_read(&cell->active) > 1) {
 			if (test_and_clear_bit(AFS_CELL_FL_DO_LOOKUP, &cell->flags)) {
 				ret = afs_update_cell(cell);
 				if (ret < 0)
@@ -701,7 +750,7 @@ again:
 		goto again;
 
 	case AFS_CELL_DEACTIVATING:
-		if (atomic_read(&cell->usage) > 1)
+		if (atomic_read(&cell->active) > 1)
 			goto reverse_deactivation;
 		afs_deactivate_cell(net, cell);
 		smp_store_release(&cell->state, AFS_CELL_INACTIVE);
@@ -733,9 +782,18 @@ done:
 	return;
 
 final_destruction:
-	call_rcu(&cell->rcu, afs_cell_destroy);
-	afs_dec_cells_outstanding(net);
-	_leave(" [destruct %d]", atomic_read(&net->cells_outstanding));
+	/* The root volume is pinning the cell */
+	afs_put_volume(cell->net, cell->root_volume, afs_volume_trace_put_cell_root);
+	cell->root_volume = NULL;
+	afs_put_cell(cell);
+}
+
+static void afs_manage_cell_work(struct work_struct *work)
+{
+	struct afs_cell *cell = container_of(work, struct afs_cell, manager);
+
+	afs_manage_cell(cell);
+	afs_put_cell(cell);
 }
 
 /*
@@ -769,21 +827,20 @@ void afs_manage_cells(struct work_struct *work)
 
 	for (cursor = rb_first(&net->cells); cursor; cursor = rb_next(cursor)) {
 		struct afs_cell *cell =
 			rb_entry(cursor, struct afs_cell, net_node);
-		unsigned usage;
+		unsigned active;
 		bool sched_cell = false;
 
-		usage = atomic_read(&cell->usage);
-		_debug("manage %s %u", cell->name, usage);
+		active = atomic_read(&cell->active);
+		_debug("manage %s %u %u", cell->name, atomic_read(&cell->ref), active);
 
-		ASSERTCMP(usage, >=, 1);
+		ASSERTCMP(active, >=, 1);
 
 		if (purging) {
 			if (test_and_clear_bit(AFS_CELL_FL_NO_GC, &cell->flags))
-				usage = atomic_dec_return(&cell->usage);
-			ASSERTCMP(usage, ==, 1);
+				atomic_dec(&cell->active);
 		}
 
-		if (usage == 1) {
+		if (active == 1) {
 			struct afs_vlserver_list *vllist;
 			time64_t expire_at = cell->last_inactive;
@@ -806,7 +863,7 @@ void afs_manage_cells(struct work_struct *work)
 		}
 
 		if (sched_cell)
-			queue_work(afs_wq, &cell->manager);
+			afs_queue_cell(cell);
 	}
 
 	up_read(&net->cells_lock);
@@ -843,7 +900,7 @@ void afs_cell_purge(struct afs_net *net)
 	ws = net->ws_cell;
	net->ws_cell = NULL;
 	up_write(&net->cells_lock);
-	afs_put_cell(net, ws);
+	afs_unuse_cell(net, ws);
 
 	_debug("del timer");
 	if (del_timer_sync(&net->cells_timer))
-- 