diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/cache.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 28 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 143 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 1 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 108 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 204 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 14 |
7 files changed, 253 insertions, 247 deletions
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 2b32fd602669..273bc3a35425 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -1225,7 +1225,7 @@ int qword_get(char **bpp, char *dest, int bufsize) if (bp[0] == '\\' && bp[1] == 'x') { /* HEX STRING */ bp += 2; - while (len < bufsize) { + while (len < bufsize - 1) { int h, l; h = hex_to_bin(bp[0]); diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index c14f3a4bff68..b289e106540b 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (!r) goto out; - r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * - sizeof(u64), GFP_KERNEL); - if (!r->r.fmr.physaddrs) + r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * + sizeof(u64), GFP_KERNEL); + if (!r->fmr.physaddrs) goto out_free; - r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->r.fmr.fmr)) + r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); + if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; list_add(&r->mw_list, &buf->rb_mws); @@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) return 0; out_fmr_err: - rc = PTR_ERR(r->r.fmr.fmr); + rc = PTR_ERR(r->fmr.fmr); dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); out_free: kfree(r); out: @@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r) { LIST_HEAD(l); - list_add(&r->r.fmr.fmr->list, &l); + list_add(&r->fmr.fmr->list, &l); return ib_unmap_fmr(&l); } @@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { rpcrdma_map_one(device, seg, direction); - mw->r.fmr.physaddrs[i] = seg->mr_dma; + mw->fmr.physaddrs[i] = seg->mr_dma; len += seg->mr_len; ++seg; ++i; @@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, break; } - rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs, + rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, i, seg1->mr_dma); if (rc) goto out_maperr; seg1->rl_mw = mw; - seg1->mr_rkey = mw->r.fmr.fmr->rkey; + seg1->mr_rkey = mw->fmr.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; mw = seg->rl_mw; - list_add(&mw->r.fmr.fmr->list, &unmap_list); + list_add(&mw->fmr.fmr->list, &unmap_list); i += seg->mr_nsegs; } @@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); - rc = ib_dealloc_fmr(r->r.fmr.fmr); + rc = ib_dealloc_fmr(r->fmr.fmr); if (rc) dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", __func__, rc); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index e16567389e28..c250924a9fd3 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -109,20 +109,20 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - r.frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt; + frmr.fr_work); + struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - if (ib_dereg_mr(r->r.frmr.fr_mr)) + if (ib_dereg_mr(r->frmr.fr_mr)) goto out_fail; - r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->r.frmr.fr_mr)) + r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(r->frmr.fr_mr)) goto out_fail; dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->r.frmr.fr_state = FRMR_IS_INVALID; + r->frmr.fr_state = FRMR_IS_INVALID; rpcrdma_put_mw(r_xprt, r); return; @@ -137,15 +137,15 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->r.frmr.fr_work); + INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->frmr.fr_work); } static int __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, unsigned int depth) { - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *f = &r->frmr; int rc; f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); @@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, sg_init_table(f->sg, depth); + init_completion(&f->fr_linv_done); + return 0; out_mr_err: @@ -179,11 +181,11 @@ __frwr_release(struct rpcrdma_mw *r) { int rc; - rc = ib_dereg_mr(r->r.frmr.fr_mr); + rc = ib_dereg_mr(r->frmr.fr_mr); if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->r.frmr.sg); + kfree(r->frmr.sg); } static int @@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); } -/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs - * to be reset. +static void +__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr, + const char *wr) +{ + frmr->fr_state = FRMR_IS_STALE; + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: %s: %s (%u/0x%x)\n", + wr, ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); +} + +/** + * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * WARNING: Only wr_id and status are reliable at this point */ static void -__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r) +frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { - if (likely(wc->status == IB_WC_SUCCESS)) - return; - - /* WARNING: Only wr_id and status are reliable at this point */ - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - if (wc->status == IB_WC_WR_FLUSH_ERR) - dprintk("RPC: %s: frmr %p flushed\n", __func__, r); - else - pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - r->r.frmr.fr_state = FRMR_IS_STALE; + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "fastreg"); + } } +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -frwr_sendcompletion(struct ib_wc *wc) +frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - if (unlikely(wc->status != IB_WC_SUCCESS)) - __frwr_sendcompletion_flush(wc, r); + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + } +} - if (f->fr_waiter) - complete(&f->fr_linv_done); +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + * Awaken anyone waiting for an MR to finish being fenced. + */ +static void +frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; + + /* WARNING: Only wr_cqe and status are reliable at this point */ + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + if (wc->status != IB_WC_SUCCESS) + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + complete_all(&frmr->fr_linv_done); } static int @@ -313,8 +352,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->mw_sendcompletion = frwr_sendcompletion; - r->r.frmr.fr_xprt = r_xprt; + r->frmr.fr_xprt = r_xprt; } return 0; @@ -347,10 +385,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw = rpcrdma_get_mw(r_xprt); if (!mw) return -ENOMEM; - } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); - frmr = &mw->r.frmr; + } while (mw->frmr.fr_state != FRMR_IS_INVALID); + frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; - frmr->fr_waiter = false; mr = frmr->fr_mr; reg_wr = &frmr->fr_regwr; @@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, reg_wr->wr.next = NULL; reg_wr->wr.opcode = IB_WR_REG_MR; - reg_wr->wr.wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_fastreg; + reg_wr->wr.wr_cqe = &frmr->fr_cqe; reg_wr->wr.num_sge = 0; reg_wr->wr.send_flags = 0; reg_wr->mr = mr; @@ -434,15 +472,15 @@ static struct ib_send_wr * __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) { struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; - f->fr_waiter = false; f->fr_state = FRMR_IS_INVALID; invalidate_wr = &f->fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (unsigned long)(void *)mw; + f->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &f->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey; @@ -455,7 +493,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, { struct ib_device *device = r_xprt->rx_ia.ri_device; struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; seg->rl_mw = NULL; @@ -504,15 +542,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) i += seg->mr_nsegs; } - f = &seg->rl_mw->r.frmr; + f = &seg->rl_mw->frmr; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ f->fr_invwr.send_flags = IB_SEND_SIGNALED; - f->fr_waiter = true; - init_completion(&f->fr_linv_done); + f->fr_cqe.done = frwr_wc_localinv_wake; + reinit_completion(&f->fr_linv_done); INIT_CQCOUNT(&r_xprt->rx_ep); /* Transport disconnect drains the receive CQ before it @@ -520,14 +558,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) + if (rc) { pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + rdma_disconnect(ia->ri_id); + goto unmap; + } wait_for_completion(&f->fr_linv_done); /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ +unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; @@ -549,7 +591,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->r.frmr; + struct rpcrdma_frmr *frmr = &mw->frmr; struct ib_send_wr *invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; @@ -557,10 +599,11 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) seg1->rl_mw = NULL; frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->r.frmr.fr_invwr; + invalidate_wr = &mw->frmr.fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &frmr->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index dbb302ecf590..481b9b6f4a15 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); seg->mr_rkey = ia->ri_dma_mr->rkey; seg->mr_base = seg->mr_dma; - seg->mr_nsegs = 1; return 1; } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 0f28f2d743ed..888823bb6dae 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) return tlen; } +/* Split "vec" on page boundaries into segments. FMR registers pages, + * not a byte range. Other modes coalesce these segments into a single + * MR when they can. + */ +static int +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, + int n, int nsegs) +{ + size_t page_offset; + u32 remaining; + char *base; + + base = vec->iov_base; + page_offset = offset_in_page(base); + remaining = vec->iov_len; + while (remaining && n < nsegs) { + seg[n].mr_page = NULL; + seg[n].mr_offset = base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); + remaining -= seg[n].mr_len; + base += seg[n].mr_len; + ++n; + page_offset = 0; + } + return n; +} + /* * Chunk assembly from upper layer xdr_buf. * @@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, int page_base; struct page **ppages; - if (pos == 0 && xdrbuf->head[0].iov_len) { - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->head[0].iov_base; - seg[n].mr_len = xdrbuf->head[0].iov_len; - ++n; + if (pos == 0) { + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); + if (n == nsegs) + return -EIO; } len = xdrbuf->page_len; @@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); if (n == nsegs) - /* Tail remains, but we're out of segments */ return -EIO; - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->tail[0].iov_base; - seg[n].mr_len = xdrbuf->tail[0].iov_len; - ++n; } return n; @@ -773,20 +795,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt; __be32 *iptr; - int rdmalen, status; + int rdmalen, status, rmerr; unsigned long cwnd; - u32 credits; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); if (rep->rr_len == RPCRDMA_BAD_LEN) goto out_badstatus; - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) goto out_shortreply; headerp = rdmab_to_msg(rep->rr_rdmabuf); - if (headerp->rm_vers != rpcrdma_version) - goto out_badversion; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (rpcrdma_is_bcall(headerp)) goto out_bcall; @@ -809,15 +828,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) */ list_del_init(&rqst->rq_list); spin_unlock_bh(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" - " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, - be32_to_cpu(headerp->rm_xid)); + dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; xprt->reestablish_timeout = 0; + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; + /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { @@ -878,6 +898,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) status = rdmalen; break; + case rdma_error: + goto out_rdmaerr; + badheader: default: dprintk("%s: invalid rpcrdma reply header (type %d):" @@ -893,6 +916,7 @@ badheader: break; } +out: /* Invalidate and flush the data payloads before waking the * waiting application. This guarantees the memory region is * properly fenced from the server before the application @@ -903,15 +927,9 @@ badheader: if (req->rl_nchunks) r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); - credits = be32_to_cpu(headerp->rm_credit); - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > r_xprt->rx_buf.rb_max_requests) - credits = r_xprt->rx_buf.rb_max_requests; - spin_lock_bh(&xprt->transport_lock); cwnd = xprt->cwnd; - xprt->cwnd = credits << RPC_CWNDSHIFT; + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); @@ -935,13 +953,43 @@ out_bcall: return; #endif -out_shortreply: - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - +/* If the incoming reply terminated a pending RPC, the next + * RPC call will post a replacement receive buffer as it is + * being marshaled. + */ out_badversion: dprintk("RPC: %s: invalid version %d\n", __func__, be32_to_cpu(headerp->rm_vers)); + status = -EIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +out_rdmaerr: + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err); + switch (rmerr) { + case ERR_VERS: + pr_err("%s: server reports header version error (%u-%u)\n", + __func__, + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low), + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high)); + break; + case ERR_CHUNK: + pr_err("%s: server reports header decoding error\n", + __func__); + break; + default: + pr_err("%s: server reports unknown error %d\n", + __func__, rmerr); + } + status = -EREMOTEIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +/* If no pending RPC transaction was matched, post a replacement + * receive buffer before returning. + */ +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; out_nomatch: diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 878f1bfb1db9..f5ed9f982cd7 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -112,89 +112,65 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) } } +/** + * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) +rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_ep *ep = context; - - pr_err("RPC: %s: %s on device %s ep %p\n", - __func__, ib_event_msg(event->event), - event->device->name, context); - if (ep->rep_connected == 1) { - ep->rep_connected = -EIO; - rpcrdma_conn_func(ep); - wake_up_all(&ep->rep_connect_wait); - } + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); } static void -rpcrdma_sendcq_process_wc(struct ib_wc *wc) +rpcrdma_receive_worker(struct work_struct *work) { - /* WARNING: Only wr_id and status are reliable at this point */ - if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) { - if (wc->status != IB_WC_SUCCESS && - wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: SEND: %s\n", - __func__, ib_wc_status_msg(wc->status)); - } else { - struct rpcrdma_mw *r; + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - r->mw_sendcompletion(wc); - } + rpcrdma_reply_handler(rep); } -/* The common case is a single send completion is waiting. By - * passing two WC entries to ib_poll_cq, a return code of 1 - * means there is exactly one WC waiting and no more. We don't - * have to invoke ib_poll_cq again to know that the CQ has been - * properly drained. +/* Perform basic sanity checking to avoid using garbage + * to update the credit grant value. */ static void -rpcrdma_sendcq_poll(struct ib_cq *cq) +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) { - struct ib_wc *pos, wcs[2]; - int count, rc; + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf); + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; + u32 credits; - do { - pos = wcs; + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) + return; - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; + credits = be32_to_cpu(rmsgp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > buffer->rb_max_requests) + credits = buffer->rb_max_requests; - count = rc; - while (count-- > 0) - rpcrdma_sendcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); - return; + atomic_set(&buffer->rb_credits, credits); } -/* Handle provider send completion upcalls. +/** + * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC + * @cq: completion queue (ignored) + * @wc: completed WR + * */ static void -rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) { - do { - rpcrdma_sendcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); -} - -static void -rpcrdma_receive_worker(struct work_struct *work) -{ - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); - - rpcrdma_reply_handler(rep); -} - -static void -rpcrdma_recvcq_process_wc(struct ib_wc *wc) -{ - struct rpcrdma_rep *rep = - (struct rpcrdma_rep *)(unsigned long)wc->wr_id; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, + rr_cqe); /* WARNING: Only wr_id and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS) @@ -211,7 +187,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc) ib_dma_sync_single_for_cpu(rep->rr_device, rdmab_addr(rep->rr_rdmabuf), rep->rr_len, DMA_FROM_DEVICE); - prefetch(rdmab_to_msg(rep->rr_rdmabuf)); + + rpcrdma_update_granted_credits(rep); out_schedule: queue_work(rpcrdma_receive_wq, &rep->rr_work); @@ -219,57 +196,20 @@ out_schedule: out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: rep %p: %s\n", - __func__, rep, ib_wc_status_msg(wc->status)); + pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } -/* The wc array is on stack: automatic memory is always CPU-local. - * - * struct ib_wc is 64 bytes, making the poll array potentially - * large. But this is at the bottom of the call chain. Further - * substantial work is done in another thread. - */ -static void -rpcrdma_recvcq_poll(struct ib_cq *cq) -{ - struct ib_wc *pos, wcs[4]; - int count, rc; - - do { - pos = wcs; - - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; - - count = rc; - while (count-- > 0) - rpcrdma_recvcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); -} - -/* Handle provider receive completion upcalls. - */ -static void -rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) -{ - do { - rpcrdma_recvcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); -} - static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc); - while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) - rpcrdma_sendcq_process_wc(&wc); + rpcrdma_receive_wc(NULL, &wc); } static int @@ -330,6 +270,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) connected: dprintk("RPC: %s: %sconnected\n", __func__, connstate > 0 ? "" : "dis"); + atomic_set(&xprt->rx_buf.rb_credits, 1); ep->rep_connected = connstate; rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); @@ -560,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { struct ib_cq *sendcq, *recvcq; - struct ib_cq_init_attr cq_attr = {}; unsigned int max_qp_wr; - int rc, err; + int rc; if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { dprintk("RPC: %s: insufficient sge's available\n", @@ -614,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); - cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; - sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + sendcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_send_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -624,16 +564,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out1; } - rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - goto out2; - } - - cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; - recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + recvcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_recv_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@ -641,14 +574,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out2; } - rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - ib_destroy_cq(recvcq); - goto out2; - } - ep->rep_attr.send_cq = sendcq; ep->rep_attr.recv_cq = recvcq; @@ -673,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return 0; out2: - err = ib_destroy_cq(sendcq); - if (err) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, err); + ib_free_cq(sendcq); out1: if (ia->ri_dma_mr) ib_dereg_mr(ia->ri_dma_mr); @@ -711,15 +633,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - rc = ib_destroy_cq(ep->rep_attr.recv_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); - - rc = ib_destroy_cq(ep->rep_attr.send_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); + ib_free_cq(ep->rep_attr.recv_cq); + ib_free_cq(ep->rep_attr.send_cq); if (ia->ri_dma_mr) { rc = ib_dereg_mr(ia->ri_dma_mr); @@ -898,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_lock(&buffer->rb_reqslock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_reqslock); + req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; return req; } @@ -923,6 +839,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) } rep->rr_device = ia->ri_device; + rep->rr_cqe.done = rpcrdma_receive_wc; rep->rr_rxprt = r_xprt; INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; @@ -943,6 +860,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); + atomic_set(&buf->rb_credits, 1); rc = ia->ri_ops->ro_init(r_xprt); if (rc) @@ -1259,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, } send_wr.next = NULL; - send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION; + send_wr.wr_cqe = &req->rl_cqe; send_wr.sg_list = iov; send_wr.num_sge = req->rl_niovs; send_wr.opcode = IB_WR_SEND; @@ -1297,7 +1215,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, int rc; recv_wr.next = NULL; - recv_wr.wr_id = (u64) (unsigned long) rep; + recv_wr.wr_cqe = &rep->rr_cqe; recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 38fe11b09875..2ebc743cb96f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -95,10 +95,6 @@ struct rpcrdma_ep { #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) -/* Force completion handler to ignore the signal - */ -#define RPCRDMA_IGNORE_COMPLETION (0ULL) - /* Pre-allocate extra Work Requests for handling backward receives * and sends. This is a fixed value because the Work Queues are * allocated when the forward channel is set up. @@ -171,6 +167,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) struct rpcrdma_buffer; struct rpcrdma_rep { + struct ib_cqe rr_cqe; unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; @@ -204,11 +201,11 @@ struct rpcrdma_frmr { struct scatterlist *sg; int sg_nents; struct ib_mr *fr_mr; + struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; + struct completion fr_linv_done; struct work_struct fr_work; struct rpcrdma_xprt *fr_xprt; - bool fr_waiter; - struct completion fr_linv_done;; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -224,8 +221,7 @@ struct rpcrdma_mw { union { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; - } r; - void (*mw_sendcompletion)(struct ib_wc *); + }; struct list_head mw_list; struct list_head mw_all; }; @@ -281,6 +277,7 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; }; @@ -311,6 +308,7 @@ struct rpcrdma_buffer { struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; + atomic_t rb_credits; /* most recent credit grant */ u32 rb_bc_srv_max_requests; spinlock_t rb_reqslock; /* protect rb_allreqs */ |