summaryrefslogtreecommitdiff
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:44 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 16:00:03 -0800
commitcb1b728096f54e7408d60fb571944bed00c5b771 (patch)
tree1b1e50e705f4ddd3b36a43ac9067538db3e8233f /net/tipc/bcast.c
parent3c724acdd5049907555a831f814bfd5927c3350c (diff)
tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during unicast message reception. Here, we resolve the same problem at multicast reception. We apply the same technique: an input queue serializing the delivery of arriving buffers. The main difference is that here we do it in two steps. First, the broadcast link feeds arriving buffers into the tail of an arrival queue, which head is consumed at the socket level, and where destination lookup is performed. Second, if the lookup is successful, the resulting buffer clones are fed into a second queue, the input queue. This queue is consumed at reception in the socket just like in the unicast case. Both queues are protected by the same lock, -the one of the input queue. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c57
1 files changed, 35 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
tipc_link_reset_all(node);
}
+void tipc_bclink_input(struct net *net)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+ tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
uint tipc_bclink_get_mtu(void)
{
return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
* and to identified node local sockets
* @net: the applicable net namespace
* @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
int rc = 0;
int bc = 0;
struct sk_buff *skb;
+ struct sk_buff_head arrvq;
+ struct sk_buff_head inputq;
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
return -EHOSTUNREACH;
}
- /* Broadcast to all other nodes */
+ /* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
if (unlikely(!bc))
__skb_queue_purge(list);
- /* Deliver message clone */
- if (likely(!rc))
- tipc_sk_mcast_rcv(net, skb);
- else
+ if (unlikely(rc)) {
kfree_skb(skb);
-
+ return rc;
+ }
+ /* Deliver message clone */
+ __skb_queue_head_init(&arrvq);
+ skb_queue_head_init(&inputq);
+ __skb_queue_tail(&arrvq, skb);
+ tipc_sk_mcast_rcv(net, &arrvq, &inputq);
return rc;
}
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
int deferred = 0;
int pos = 0;
struct sk_buff *iskb;
- struct sk_buff_head msgs;
+ struct sk_buff_head *arrvq, *inputq;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
+ arrvq = &tn->bclink->arrvq;
+ inputq = &tn->bclink->inputq;
if (likely(seqno == next_in)) {
receive:
@@ -493,21 +507,26 @@ receive:
if (likely(msg_isdata(msg))) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, buf);
+ spin_unlock_bh(&inputq->lock);
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- if (likely(msg_mcast(msg)))
- tipc_sk_mcast_rcv(net, buf);
- else
- kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
+ pos = 0;
+ while (tipc_msg_extract(buf, &iskb, &pos)) {
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, iskb);
+ spin_unlock_bh(&inputq->lock);
+ }
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- while (tipc_msg_extract(buf, &iskb, &pos))
- tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
}
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
- tipc_bclink_lock(net);
- bclink_accept_pkt(node, seqno);
- tipc_bclink_unlock(net);
- tipc_node_unlock(node);
- skb_queue_head_init(&msgs);
- skb_queue_tail(&msgs, buf);
- tipc_named_rcv(net, &msgs);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
+ __skb_queue_head_init(&bclink->arrvq);
+ skb_queue_head_init(&bclink->inputq);
bcl->owner = &bclink->node;
bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;