diff options
Diffstat (limited to 'net/rds/ib_recv.c')
-rw-r--r-- | net/rds/ib_recv.c | 549 |
1 files changed, 332 insertions, 217 deletions
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index c74e9904a6b..e29e0ca32f7 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -43,42 +43,6 @@ static struct kmem_cache *rds_ib_incoming_slab; static struct kmem_cache *rds_ib_frag_slab; static atomic_t rds_ib_allocation = ATOMIC_INIT(0); -static void rds_ib_frag_drop_page(struct rds_page_frag *frag) -{ - rdsdebug("frag %p page %p\n", frag, frag->f_page); - __free_page(frag->f_page); - frag->f_page = NULL; -} - -static void rds_ib_frag_free(struct rds_page_frag *frag) -{ - rdsdebug("frag %p page %p\n", frag, frag->f_page); - BUG_ON(frag->f_page != NULL); - kmem_cache_free(rds_ib_frag_slab, frag); -} - -/* - * We map a page at a time. Its fragments are posted in order. This - * is called in fragment order as the fragments get send completion events. - * Only the last frag in the page performs the unmapping. - * - * It's OK for ring cleanup to call this in whatever order it likes because - * DMA is not in flight and so we can unmap while other ring entries still - * hold page references in their frags. - */ -static void rds_ib_recv_unmap_page(struct rds_ib_connection *ic, - struct rds_ib_recv_work *recv) -{ - struct rds_page_frag *frag = recv->r_frag; - - rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page); - if (frag->f_mapped) - ib_dma_unmap_page(ic->i_cm_id->device, - frag->f_mapped, - RDS_FRAG_SIZE, DMA_FROM_DEVICE); - frag->f_mapped = 0; -} - void rds_ib_recv_init_ring(struct rds_ib_connection *ic) { struct rds_ib_recv_work *recv; @@ -95,16 +59,161 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic) recv->r_wr.sg_list = recv->r_sge; recv->r_wr.num_sge = RDS_IB_RECV_SGE; - sge = rds_ib_data_sge(ic, recv->r_sge); + sge = &recv->r_sge[0]; + sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header)); + sge->length = sizeof(struct rds_header); + sge->lkey = ic->i_mr->lkey; + + sge = &recv->r_sge[1]; sge->addr = 0; sge->length = RDS_FRAG_SIZE; sge->lkey = ic->i_mr->lkey; + } +} - sge = rds_ib_header_sge(ic, recv->r_sge); - sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header)); - sge->length = sizeof(struct rds_header); - sge->lkey = ic->i_mr->lkey; +/* + * The entire 'from' list, including the from element itself, is put on + * to the tail of the 'to' list. + */ +static void list_splice_entire_tail(struct list_head *from, + struct list_head *to) +{ + struct list_head *from_last = from->prev; + + list_splice_tail(from_last, to); + list_add_tail(from_last, to); +} + +static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache) +{ + struct list_head *tmp; + + tmp = xchg(&cache->xfer, NULL); + if (tmp) { + if (cache->ready) + list_splice_entire_tail(tmp, cache->ready); + else + cache->ready = tmp; + } +} + +static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache) +{ + struct rds_ib_cache_head *head; + int cpu; + + cache->percpu = alloc_percpu(struct rds_ib_cache_head); + if (!cache->percpu) + return -ENOMEM; + + for_each_possible_cpu(cpu) { + head = per_cpu_ptr(cache->percpu, cpu); + head->first = NULL; + head->count = 0; + } + cache->xfer = NULL; + cache->ready = NULL; + + return 0; +} + +int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic) +{ + int ret; + + ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs); + if (!ret) { + ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags); + if (ret) + free_percpu(ic->i_cache_incs.percpu); } + + return ret; +} + +static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache, + struct list_head *caller_list) +{ + struct rds_ib_cache_head *head; + int cpu; + + for_each_possible_cpu(cpu) { + head = per_cpu_ptr(cache->percpu, cpu); + if (head->first) { + list_splice_entire_tail(head->first, caller_list); + head->first = NULL; + } + } + + if (cache->ready) { + list_splice_entire_tail(cache->ready, caller_list); + cache->ready = NULL; + } +} + +void rds_ib_recv_free_caches(struct rds_ib_connection *ic) +{ + struct rds_ib_incoming *inc; + struct rds_ib_incoming *inc_tmp; + struct rds_page_frag *frag; + struct rds_page_frag *frag_tmp; + LIST_HEAD(list); + + rds_ib_cache_xfer_to_ready(&ic->i_cache_incs); + rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list); + free_percpu(ic->i_cache_incs.percpu); + + list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) { + list_del(&inc->ii_cache_entry); + WARN_ON(!list_empty(&inc->ii_frags)); + kmem_cache_free(rds_ib_incoming_slab, inc); + } + + rds_ib_cache_xfer_to_ready(&ic->i_cache_frags); + rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list); + free_percpu(ic->i_cache_frags.percpu); + + list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) { + list_del(&frag->f_cache_entry); + WARN_ON(!list_empty(&frag->f_item)); + kmem_cache_free(rds_ib_frag_slab, frag); + } +} + +/* fwd decl */ +static void rds_ib_recv_cache_put(struct list_head *new_item, + struct rds_ib_refill_cache *cache); +static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache); + + +/* Recycle frag and attached recv buffer f_sg */ +static void rds_ib_frag_free(struct rds_ib_connection *ic, + struct rds_page_frag *frag) +{ + rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg)); + + rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags); +} + +/* Recycle inc after freeing attached frags */ +void rds_ib_inc_free(struct rds_incoming *inc) +{ + struct rds_ib_incoming *ibinc; + struct rds_page_frag *frag; + struct rds_page_frag *pos; + struct rds_ib_connection *ic = inc->i_conn->c_transport_data; + + ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); + + /* Free attached frags */ + list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) { + list_del_init(&frag->f_item); + rds_ib_frag_free(ic, frag); + } + BUG_ON(!list_empty(&ibinc->ii_frags)); + + rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc); + rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs); } static void rds_ib_recv_clear_one(struct rds_ib_connection *ic, @@ -115,10 +224,8 @@ static void rds_ib_recv_clear_one(struct rds_ib_connection *ic, recv->r_ibinc = NULL; } if (recv->r_frag) { - rds_ib_recv_unmap_page(ic, recv); - if (recv->r_frag->f_page) - rds_ib_frag_drop_page(recv->r_frag); - rds_ib_frag_free(recv->r_frag); + ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE); + rds_ib_frag_free(ic, recv->r_frag); recv->r_frag = NULL; } } @@ -129,84 +236,111 @@ void rds_ib_recv_clear_ring(struct rds_ib_connection *ic) for (i = 0; i < ic->i_recv_ring.w_nr; i++) rds_ib_recv_clear_one(ic, &ic->i_recvs[i]); - - if (ic->i_frag.f_page) - rds_ib_frag_drop_page(&ic->i_frag); } -static int rds_ib_recv_refill_one(struct rds_connection *conn, - struct rds_ib_recv_work *recv, - gfp_t kptr_gfp, gfp_t page_gfp) +static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic, + gfp_t slab_mask) { - struct rds_ib_connection *ic = conn->c_transport_data; - dma_addr_t dma_addr; - struct ib_sge *sge; - int ret = -ENOMEM; + struct rds_ib_incoming *ibinc; + struct list_head *cache_item; + int avail_allocs; - if (recv->r_ibinc == NULL) { - if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) { + cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs); + if (cache_item) { + ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry); + } else { + avail_allocs = atomic_add_unless(&rds_ib_allocation, + 1, rds_ib_sysctl_max_recv_allocation); + if (!avail_allocs) { rds_ib_stats_inc(s_ib_rx_alloc_limit); - goto out; + return NULL; } - recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, - kptr_gfp); - if (recv->r_ibinc == NULL) { + ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask); + if (!ibinc) { atomic_dec(&rds_ib_allocation); - goto out; + return NULL; } - INIT_LIST_HEAD(&recv->r_ibinc->ii_frags); - rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr); } + INIT_LIST_HEAD(&ibinc->ii_frags); + rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr); - if (recv->r_frag == NULL) { - recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, kptr_gfp); - if (recv->r_frag == NULL) - goto out; - INIT_LIST_HEAD(&recv->r_frag->f_item); - recv->r_frag->f_page = NULL; + return ibinc; +} + +static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic, + gfp_t slab_mask, gfp_t page_mask) +{ + struct rds_page_frag *frag; + struct list_head *cache_item; + int ret; + + cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags); + if (cache_item) { + frag = container_of(cache_item, struct rds_page_frag, f_cache_entry); + } else { + frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask); + if (!frag) + return NULL; + + sg_init_table(&frag->f_sg, 1); + ret = rds_page_remainder_alloc(&frag->f_sg, + RDS_FRAG_SIZE, page_mask); + if (ret) { + kmem_cache_free(rds_ib_frag_slab, frag); + return NULL; + } } - if (ic->i_frag.f_page == NULL) { - ic->i_frag.f_page = alloc_page(page_gfp); - if (ic->i_frag.f_page == NULL) - goto out; - ic->i_frag.f_offset = 0; + INIT_LIST_HEAD(&frag->f_item); + + return frag; +} + +static int rds_ib_recv_refill_one(struct rds_connection *conn, + struct rds_ib_recv_work *recv, int prefill) +{ + struct rds_ib_connection *ic = conn->c_transport_data; + struct ib_sge *sge; + int ret = -ENOMEM; + gfp_t slab_mask = GFP_NOWAIT; + gfp_t page_mask = GFP_NOWAIT; + + if (prefill) { + slab_mask = GFP_KERNEL; + page_mask = GFP_HIGHUSER; } - dma_addr = ib_dma_map_page(ic->i_cm_id->device, - ic->i_frag.f_page, - ic->i_frag.f_offset, - RDS_FRAG_SIZE, - DMA_FROM_DEVICE); - if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr)) - goto out; + if (!ic->i_cache_incs.ready) + rds_ib_cache_xfer_to_ready(&ic->i_cache_incs); + if (!ic->i_cache_frags.ready) + rds_ib_cache_xfer_to_ready(&ic->i_cache_frags); /* - * Once we get the RDS_PAGE_LAST_OFF frag then rds_ib_frag_unmap() - * must be called on this recv. This happens as completions hit - * in order or on connection shutdown. + * ibinc was taken from recv if recv contained the start of a message. + * recvs that were continuations will still have this allocated. */ - recv->r_frag->f_page = ic->i_frag.f_page; - recv->r_frag->f_offset = ic->i_frag.f_offset; - recv->r_frag->f_mapped = dma_addr; + if (!recv->r_ibinc) { + recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask); + if (!recv->r_ibinc) + goto out; + } - sge = rds_ib_data_sge(ic, recv->r_sge); - sge->addr = dma_addr; - sge->length = RDS_FRAG_SIZE; + WARN_ON(recv->r_frag); /* leak! */ + recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask); + if (!recv->r_frag) + goto out; + + ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, + 1, DMA_FROM_DEVICE); + WARN_ON(ret != 1); - sge = rds_ib_header_sge(ic, recv->r_sge); + sge = &recv->r_sge[0]; sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header); sge->length = sizeof(struct rds_header); - get_page(recv->r_frag->f_page); - - if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) { - ic->i_frag.f_offset += RDS_FRAG_SIZE; - } else { - put_page(ic->i_frag.f_page); - ic->i_frag.f_page = NULL; - ic->i_frag.f_offset = 0; - } + sge = &recv->r_sge[1]; + sge->addr = sg_dma_address(&recv->r_frag->f_sg); + sge->length = sg_dma_len(&recv->r_frag->f_sg); ret = 0; out: @@ -216,13 +350,11 @@ out: /* * This tries to allocate and post unused work requests after making sure that * they have all the allocations they need to queue received fragments into - * sockets. The i_recv_mutex is held here so that ring_alloc and _unalloc - * pairs don't go unmatched. + * sockets. * * -1 is returned if posting fails due to temporary resource exhaustion. */ -int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp, - gfp_t page_gfp, int prefill) +void rds_ib_recv_refill(struct rds_connection *conn, int prefill) { struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_recv_work *recv; @@ -236,28 +368,25 @@ int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp, if (pos >= ic->i_recv_ring.w_nr) { printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n", pos); - ret = -EINVAL; break; } recv = &ic->i_recvs[pos]; - ret = rds_ib_recv_refill_one(conn, recv, kptr_gfp, page_gfp); + ret = rds_ib_recv_refill_one(conn, recv, prefill); if (ret) { - ret = -1; break; } /* XXX when can this fail? */ ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr); rdsdebug("recv %p ibinc %p page %p addr %lu ret %d\n", recv, - recv->r_ibinc, recv->r_frag->f_page, - (long) recv->r_frag->f_mapped, ret); + recv->r_ibinc, sg_page(&recv->r_frag->f_sg), + (long) sg_dma_address(&recv->r_frag->f_sg), ret); if (ret) { rds_ib_conn_error(conn, "recv post on " "%pI4 returned %d, disconnecting and " "reconnecting\n", &conn->c_faddr, ret); - ret = -1; break; } @@ -270,37 +399,73 @@ int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp, if (ret) rds_ib_ring_unalloc(&ic->i_recv_ring, 1); - return ret; } -void rds_ib_inc_purge(struct rds_incoming *inc) +/* + * We want to recycle several types of recv allocations, like incs and frags. + * To use this, the *_free() function passes in the ptr to a list_head within + * the recyclee, as well as the cache to put it on. + * + * First, we put the memory on a percpu list. When this reaches a certain size, + * We move it to an intermediate non-percpu list in a lockless manner, with some + * xchg/compxchg wizardry. + * + * N.B. Instead of a list_head as the anchor, we use a single pointer, which can + * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and + * list_empty() will return true with one element is actually present. + */ +static void rds_ib_recv_cache_put(struct list_head *new_item, + struct rds_ib_refill_cache *cache) { - struct rds_ib_incoming *ibinc; - struct rds_page_frag *frag; - struct rds_page_frag *pos; + unsigned long flags; + struct rds_ib_cache_head *chp; + struct list_head *old; - ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); - rdsdebug("purging ibinc %p inc %p\n", ibinc, inc); + local_irq_save(flags); - list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) { - list_del_init(&frag->f_item); - rds_ib_frag_drop_page(frag); - rds_ib_frag_free(frag); - } + chp = per_cpu_ptr(cache->percpu, smp_processor_id()); + if (!chp->first) + INIT_LIST_HEAD(new_item); + else /* put on front */ + list_add_tail(new_item, chp->first); + chp->first = new_item; + chp->count++; + + if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT) + goto end; + + /* + * Return our per-cpu first list to the cache's xfer by atomically + * grabbing the current xfer list, appending it to our per-cpu list, + * and then atomically returning that entire list back to the + * cache's xfer list as long as it's still empty. + */ + do { + old = xchg(&cache->xfer, NULL); + if (old) + list_splice_entire_tail(old, chp->first); + old = cmpxchg(&cache->xfer, NULL, chp->first); + } while (old); + + chp->first = NULL; + chp->count = 0; +end: + local_irq_restore(flags); } -void rds_ib_inc_free(struct rds_incoming *inc) +static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache) { - struct rds_ib_incoming *ibinc; - - ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); + struct list_head *head = cache->ready; + + if (head) { + if (!list_empty(head)) { + cache->ready = head->next; + list_del_init(head); + } else + cache->ready = NULL; + } - rds_ib_inc_purge(inc); - rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc); - BUG_ON(!list_empty(&ibinc->ii_frags)); - kmem_cache_free(rds_ib_incoming_slab, ibinc); - atomic_dec(&rds_ib_allocation); - BUG_ON(atomic_read(&rds_ib_allocation) < 0); + return head; } int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov, @@ -336,13 +501,13 @@ int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov, to_copy = min_t(unsigned long, to_copy, len - copied); rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag " - "[%p, %lu] + %lu\n", + "[%p, %u] + %lu\n", to_copy, iov->iov_base, iov->iov_len, iov_off, - frag->f_page, frag->f_offset, frag_off); + sg_page(&frag->f_sg), frag->f_sg.offset, frag_off); /* XXX needs + offset for multiple recvs per page */ - ret = rds_page_copy_to_user(frag->f_page, - frag->f_offset + frag_off, + ret = rds_page_copy_to_user(sg_page(&frag->f_sg), + frag->f_sg.offset + frag_off, iov->iov_base + iov_off, to_copy); if (ret) { @@ -557,47 +722,6 @@ u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic) return rds_ib_get_ack(ic); } -static struct rds_header *rds_ib_get_header(struct rds_connection *conn, - struct rds_ib_recv_work *recv, - u32 data_len) -{ - struct rds_ib_connection *ic = conn->c_transport_data; - void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs]; - void *addr; - u32 misplaced_hdr_bytes; - - /* - * Support header at the front (RDS 3.1+) as well as header-at-end. - * - * Cases: - * 1) header all in header buff (great!) - * 2) header all in data page (copy all to header buff) - * 3) header split across hdr buf + data page - * (move bit in hdr buff to end before copying other bit from data page) - */ - if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDS_FRAG_SIZE) - return hdr_buff; - - if (data_len <= (RDS_FRAG_SIZE - sizeof(struct rds_header))) { - addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0); - memcpy(hdr_buff, - addr + recv->r_frag->f_offset + data_len, - sizeof(struct rds_header)); - kunmap_atomic(addr, KM_SOFTIRQ0); - return hdr_buff; - } - - misplaced_hdr_bytes = (sizeof(struct rds_header) - (RDS_FRAG_SIZE - data_len)); - - memmove(hdr_buff + misplaced_hdr_bytes, hdr_buff, misplaced_hdr_bytes); - - addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0); - memcpy(hdr_buff, addr + recv->r_frag->f_offset + data_len, - sizeof(struct rds_header) - misplaced_hdr_bytes); - kunmap_atomic(addr, KM_SOFTIRQ0); - return hdr_buff; -} - /* * It's kind of lame that we're copying from the posted receive pages into * long-lived bitmaps. We could have posted the bitmaps and rdma written into @@ -639,7 +763,7 @@ static void rds_ib_cong_recv(struct rds_connection *conn, to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off); BUG_ON(to_copy & 7); /* Must be 64bit aligned. */ - addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0); + addr = kmap_atomic(sg_page(&frag->f_sg), KM_SOFTIRQ0); src = addr + frag_off; dst = (void *)map->m_page_addrs[map_page] + map_off; @@ -710,7 +834,7 @@ static void rds_ib_process_recv(struct rds_connection *conn, } data_len -= sizeof(struct rds_header); - ihdr = rds_ib_get_header(conn, recv, data_len); + ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs]; /* Validate the checksum. */ if (!rds_message_verify_checksum(ihdr)) { @@ -742,12 +866,12 @@ static void rds_ib_process_recv(struct rds_connection *conn, * the inc is freed. We don't go that route, so we have to drop the * page ref ourselves. We can't just leave the page on the recv * because that confuses the dma mapping of pages and each recv's use - * of a partial page. We can leave the frag, though, it will be - * reused. + * of a partial page. * * FIXME: Fold this into the code path below. */ - rds_ib_frag_drop_page(recv->r_frag); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; return; } @@ -757,7 +881,7 @@ static void rds_ib_process_recv(struct rds_connection *conn, * into the inc and save the inc so we can hang upcoming fragments * off its list. */ - if (ibinc == NULL) { + if (!ibinc) { ibinc = recv->r_ibinc; recv->r_ibinc = NULL; ic->i_ibinc = ibinc; @@ -842,32 +966,38 @@ static inline void rds_poll_cq(struct rds_ib_connection *ic, struct rds_ib_recv_work *recv; while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) { - rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", - (unsigned long long)wc.wr_id, wc.status, wc.byte_len, + rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", + (unsigned long long)wc.wr_id, wc.status, + rds_ib_wc_status_str(wc.status), wc.byte_len, be32_to_cpu(wc.ex.imm_data)); rds_ib_stats_inc(s_ib_rx_cq_event); recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)]; - rds_ib_recv_unmap_page(ic, recv); + ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE); /* * Also process recvs in connecting state because it is possible * to get a recv completion _before_ the rdmacm ESTABLISHED * event is processed. */ - if (rds_conn_up(conn) || rds_conn_connecting(conn)) { + if (wc.status == IB_WC_SUCCESS) { + rds_ib_process_recv(conn, recv, wc.byte_len, state); + } else { /* We expect errors as the qp is drained during shutdown */ - if (wc.status == IB_WC_SUCCESS) { - rds_ib_process_recv(conn, recv, wc.byte_len, state); - } else { - rds_ib_conn_error(conn, "recv completion on " - "%pI4 had status %u, disconnecting and " - "reconnecting\n", &conn->c_faddr, - wc.status); - } + if (rds_conn_up(conn) || rds_conn_connecting(conn)) + rds_ib_conn_error(conn, "recv completion on %pI4 had " + "status %u (%s), disconnecting and " + "reconnecting\n", &conn->c_faddr, + wc.status, + rds_ib_wc_status_str(wc.status)); } + /* + * It's very important that we only free this ring entry if we've truly + * freed the resources allocated to the entry. The refilling path can + * leak if we don't. + */ rds_ib_ring_free(&ic->i_recv_ring, 1); } } @@ -897,11 +1027,8 @@ void rds_ib_recv_tasklet_fn(unsigned long data) if (rds_ib_ring_empty(&ic->i_recv_ring)) rds_ib_stats_inc(s_ib_rx_ring_empty); - /* - * If the ring is running low, then schedule the thread to refill. - */ if (rds_ib_ring_low(&ic->i_recv_ring)) - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + rds_ib_recv_refill(conn, 0); } int rds_ib_recv(struct rds_connection *conn) @@ -910,25 +1037,13 @@ int rds_ib_recv(struct rds_connection *conn) int ret = 0; rdsdebug("conn %p\n", conn); - - /* - * If we get a temporary posting failure in this context then - * we're really low and we want the caller to back off for a bit. - */ - mutex_lock(&ic->i_recv_mutex); - if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0)) - ret = -ENOMEM; - else - rds_ib_stats_inc(s_ib_rx_refill_from_thread); - mutex_unlock(&ic->i_recv_mutex); - if (rds_conn_up(conn)) rds_ib_attempt_ack(ic); return ret; } -int __init rds_ib_recv_init(void) +int rds_ib_recv_init(void) { struct sysinfo si; int ret = -ENOMEM; @@ -939,14 +1054,14 @@ int __init rds_ib_recv_init(void) rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming", sizeof(struct rds_ib_incoming), - 0, 0, NULL); - if (rds_ib_incoming_slab == NULL) + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!rds_ib_incoming_slab) goto out; rds_ib_frag_slab = kmem_cache_create("rds_ib_frag", sizeof(struct rds_page_frag), - 0, 0, NULL); - if (rds_ib_frag_slab == NULL) + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!rds_ib_frag_slab) kmem_cache_destroy(rds_ib_incoming_slab); else ret = 0; |