summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael S. Tsirkin <mst@redhat.com>2018-01-26 01:36:27 +0200
committerDavid S. Miller <davem@davemloft.net>2018-01-29 12:02:53 -0500
commit406de7555424d93849166684d0bd172743d2a30c (patch)
tree8f440597d97aade025b56f2127d0a39bb60fd124
parent7ece54a60ee2ba7a386308cae73c790bd580589c (diff)
ptr_ring: keep consumer_head valid at all times
The comment near __ptr_ring_peek says: * If ring is never resized, and if the pointer is merely * tested, there's no need to take the lock - see e.g. __ptr_ring_empty. but this was in fact never possible since consumer_head would sometimes point outside the ring. Refactor the code so that it's always pointing within a ring. Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array") Signed-off-by: Michael S. Tsirkin <mst@redhat.com> Acked-by: John Fastabend <john.fastabend@gmail.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--include/linux/ptr_ring.h25
1 files changed, 16 insertions, 9 deletions
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 9ca1726ff963..5ebcdd40df99 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
/* Fundamentally, what we want to do is update consumer
* index and zero out the entry so producer can reuse it.
* Doing it naively at each consume would be as simple as:
- * r->queue[r->consumer++] = NULL;
- * if (unlikely(r->consumer >= r->size))
- * r->consumer = 0;
+ * consumer = r->consumer;
+ * r->queue[consumer++] = NULL;
+ * if (unlikely(consumer >= r->size))
+ * consumer = 0;
+ * r->consumer = consumer;
* but that is suboptimal when the ring is full as producer is writing
* out new entries in the same cache line. Defer these updates until a
* batch of entries has been consumed.
*/
- int head = r->consumer_head++;
+ /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+ * to work correctly.
+ */
+ int consumer_head = r->consumer_head;
+ int head = consumer_head++;
/* Once we have processed enough entries invalidate them in
* the ring all at once so producer can reuse their space in the ring.
* We also do this when we reach end of the ring - not mandatory
* but helps keep the implementation simple.
*/
- if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
- r->consumer_head >= r->size)) {
+ if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+ consumer_head >= r->size)) {
/* Zero out entries in the reverse order: this way we touch the
* cache line that producer might currently be reading the last;
* producer won't make progress and touch other cache lines
@@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
*/
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
- r->consumer_tail = r->consumer_head;
+ r->consumer_tail = consumer_head;
}
- if (unlikely(r->consumer_head >= r->size)) {
- r->consumer_head = 0;
+ if (unlikely(consumer_head >= r->size)) {
+ consumer_head = 0;
r->consumer_tail = 0;
}
+ r->consumer_head = consumer_head;
}
static inline void *__ptr_ring_consume(struct ptr_ring *r)