diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/sbitmap.c | 113 |
1 files changed, 81 insertions, 32 deletions
diff --git a/lib/sbitmap.c b/lib/sbitmap.c index e6a9c06ec70c..6fdc6267f4a8 100644 --- a/lib/sbitmap.c +++ b/lib/sbitmap.c @@ -270,18 +270,33 @@ void sbitmap_bitmap_show(struct sbitmap *sb, struct seq_file *m) } EXPORT_SYMBOL_GPL(sbitmap_bitmap_show); -static unsigned int sbq_calc_wake_batch(unsigned int depth) +static unsigned int sbq_calc_wake_batch(struct sbitmap_queue *sbq, + unsigned int depth) { unsigned int wake_batch; + unsigned int shallow_depth; /* * For each batch, we wake up one queue. We need to make sure that our - * batch size is small enough that the full depth of the bitmap is - * enough to wake up all of the queues. + * batch size is small enough that the full depth of the bitmap, + * potentially limited by a shallow depth, is enough to wake up all of + * the queues. + * + * Each full word of the bitmap has bits_per_word bits, and there might + * be a partial word. There are depth / bits_per_word full words and + * depth % bits_per_word bits left over. In bitwise arithmetic: + * + * bits_per_word = 1 << shift + * depth / bits_per_word = depth >> shift + * depth % bits_per_word = depth & ((1 << shift) - 1) + * + * Each word can be limited to sbq->min_shallow_depth bits. */ - wake_batch = SBQ_WAKE_BATCH; - if (wake_batch > depth / SBQ_WAIT_QUEUES) - wake_batch = max(1U, depth / SBQ_WAIT_QUEUES); + shallow_depth = min(1U << sbq->sb.shift, sbq->min_shallow_depth); + depth = ((depth >> sbq->sb.shift) * shallow_depth + + min(depth & ((1U << sbq->sb.shift) - 1), shallow_depth)); + wake_batch = clamp_t(unsigned int, depth / SBQ_WAIT_QUEUES, 1, + SBQ_WAKE_BATCH); return wake_batch; } @@ -307,7 +322,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth, *per_cpu_ptr(sbq->alloc_hint, i) = prandom_u32() % depth; } - sbq->wake_batch = sbq_calc_wake_batch(depth); + sbq->min_shallow_depth = UINT_MAX; + sbq->wake_batch = sbq_calc_wake_batch(sbq, depth); atomic_set(&sbq->wake_index, 0); sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node); @@ -327,21 +343,28 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth, } EXPORT_SYMBOL_GPL(sbitmap_queue_init_node); -void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth) +static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq, + unsigned int depth) { - unsigned int wake_batch = sbq_calc_wake_batch(depth); + unsigned int wake_batch = sbq_calc_wake_batch(sbq, depth); int i; if (sbq->wake_batch != wake_batch) { WRITE_ONCE(sbq->wake_batch, wake_batch); /* - * Pairs with the memory barrier in sbq_wake_up() to ensure that - * the batch size is updated before the wait counts. + * Pairs with the memory barrier in sbitmap_queue_wake_up() + * to ensure that the batch size is updated before the wait + * counts. */ smp_mb__before_atomic(); for (i = 0; i < SBQ_WAIT_QUEUES; i++) atomic_set(&sbq->ws[i].wait_cnt, 1); } +} + +void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth) +{ + sbitmap_queue_update_wake_batch(sbq, depth); sbitmap_resize(&sbq->sb, depth); } EXPORT_SYMBOL_GPL(sbitmap_queue_resize); @@ -380,6 +403,8 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, unsigned int hint, depth; int nr; + WARN_ON_ONCE(shallow_depth < sbq->min_shallow_depth); + hint = this_cpu_read(*sbq->alloc_hint); depth = READ_ONCE(sbq->sb.depth); if (unlikely(hint >= depth)) { @@ -403,6 +428,14 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, } EXPORT_SYMBOL_GPL(__sbitmap_queue_get_shallow); +void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq, + unsigned int min_shallow_depth) +{ + sbq->min_shallow_depth = min_shallow_depth; + sbitmap_queue_update_wake_batch(sbq, sbq->sb.depth); +} +EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth); + static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) { int i, wake_index; @@ -425,52 +458,67 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) return NULL; } -static void sbq_wake_up(struct sbitmap_queue *sbq) +static bool __sbq_wake_up(struct sbitmap_queue *sbq) { struct sbq_wait_state *ws; unsigned int wake_batch; int wait_cnt; - /* - * Pairs with the memory barrier in set_current_state() to ensure the - * proper ordering of clear_bit()/waitqueue_active() in the waker and - * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the - * waiter. See the comment on waitqueue_active(). This is __after_atomic - * because we just did clear_bit_unlock() in the caller. - */ - smp_mb__after_atomic(); - ws = sbq_wake_ptr(sbq); if (!ws) - return; + return false; wait_cnt = atomic_dec_return(&ws->wait_cnt); if (wait_cnt <= 0) { + int ret; + wake_batch = READ_ONCE(sbq->wake_batch); + /* * Pairs with the memory barrier in sbitmap_queue_resize() to * ensure that we see the batch size update before the wait * count is reset. */ smp_mb__before_atomic(); + /* - * If there are concurrent callers to sbq_wake_up(), the last - * one to decrement the wait count below zero will bump it back - * up. If there is a concurrent resize, the count reset will - * either cause the cmpxchg to fail or overwrite after the - * cmpxchg. + * For concurrent callers of this, the one that failed the + * atomic_cmpxhcg() race should call this function again + * to wakeup a new batch on a different 'ws'. */ - atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch); - sbq_index_atomic_inc(&sbq->wake_index); - wake_up_nr(&ws->wait, wake_batch); + ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch); + if (ret == wait_cnt) { + sbq_index_atomic_inc(&sbq->wake_index); + wake_up_nr(&ws->wait, wake_batch); + return false; + } + + return true; } + + return false; +} + +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq) +{ + while (__sbq_wake_up(sbq)) + ; } +EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up); void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr, unsigned int cpu) { sbitmap_clear_bit_unlock(&sbq->sb, nr); - sbq_wake_up(sbq); + /* + * Pairs with the memory barrier in set_current_state() to ensure the + * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker + * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the + * waiter. See the comment on waitqueue_active(). + */ + smp_mb__after_atomic(); + sbitmap_queue_wake_up(sbq); + if (likely(!sbq->round_robin && nr < sbq->sb.depth)) *per_cpu_ptr(sbq->alloc_hint, cpu) = nr; } @@ -482,7 +530,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq) /* * Pairs with the memory barrier in set_current_state() like in - * sbq_wake_up(). + * sbitmap_queue_wake_up(). */ smp_mb(); wake_index = atomic_read(&sbq->wake_index); @@ -528,5 +576,6 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m) seq_puts(m, "}\n"); seq_printf(m, "round_robin=%d\n", sbq->round_robin); + seq_printf(m, "min_shallow_depth=%u\n", sbq->min_shallow_depth); } EXPORT_SYMBOL_GPL(sbitmap_queue_show); |