summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c119
1 files changed, 53 insertions, 66 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 96ead526b125..693966d3f33b 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -78,8 +78,7 @@ static const char transfertypes[][12] = {
* elements. Segments are then coalesced when registered, if possible
* within the selected memreg mode.
*
- * Note, this routine is never called if the connection's memory
- * registration strategy is 0 (bounce buffers).
+ * Returns positive number of segments converted, or a negative errno.
*/
static int
@@ -102,10 +101,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
page_base = xdrbuf->page_base & ~PAGE_MASK;
p = 0;
while (len && n < nsegs) {
+ if (!ppages[p]) {
+ /* alloc the pagelist for receiving buffer */
+ ppages[p] = alloc_page(GFP_ATOMIC);
+ if (!ppages[p])
+ return -ENOMEM;
+ }
seg[n].mr_page = ppages[p];
seg[n].mr_offset = (void *)(unsigned long) page_base;
seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
- BUG_ON(seg[n].mr_len > PAGE_SIZE);
+ if (seg[n].mr_len > PAGE_SIZE)
+ return -EIO;
len -= seg[n].mr_len;
++n;
++p;
@@ -114,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
/* Message overflows the seg array */
if (len && n == nsegs)
- return 0;
+ return -EIO;
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
@@ -123,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
return n;
if (n == nsegs)
/* Tail remains, but we're out of segments */
- return 0;
+ return -EIO;
seg[n].mr_page = NULL;
seg[n].mr_offset = xdrbuf->tail[0].iov_base;
seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* Reply chunk (a counted array):
* N elements:
* 1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
*/
-static unsigned int
+static ssize_t
rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
{
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- int nsegs, nchunks = 0;
+ int n, nsegs, nchunks = 0;
unsigned int pos;
struct rpcrdma_mr_seg *seg = req->rl_segments;
struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
pos = target->head[0].iov_len;
nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
- if (nsegs == 0)
- return 0;
+ if (nsegs < 0)
+ return nsegs;
do {
- /* bind/register the memory, then build chunk from result. */
- int n = rpcrdma_register_external(seg, nsegs,
+ n = rpcrdma_register_external(seg, nsegs,
cur_wchunk != NULL, r_xprt);
if (n <= 0)
goto out;
@@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
/* success. all failures return above */
req->rl_nchunks = nchunks;
- BUG_ON(nchunks == 0);
- BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
- && (nchunks > 3));
-
/*
* finish off header. If write, marshal discrim and nchunks.
*/
@@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
out:
for (pos = 0; nchunks--;)
pos += rpcrdma_deregister_external(
- &req->rl_segments[pos], r_xprt, NULL);
- return 0;
+ &req->rl_segments[pos], r_xprt);
+ return n;
}
/*
@@ -361,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
* [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
* [2] -- optional padding.
* [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
*/
int
@@ -370,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
char *base;
- size_t hdrlen, rpclen, padlen;
+ size_t rpclen, padlen;
+ ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
@@ -441,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
/* The following simplification is not true forever */
if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
wtype = rpcrdma_noch;
- BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
-
- if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
- (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
- /* forced to "pure inline"? */
- dprintk("RPC: %s: too much data (%d/%d) for inline\n",
- __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
- return -1;
+ if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+ dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
+ __func__);
+ return -EIO;
}
hdrlen = 28; /*sizeof *headerp;*/
@@ -474,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
- BUG_ON(wtype != rpcrdma_noch);
-
+ if (wtype != rpcrdma_noch) {
+ dprintk("RPC: %s: invalid chunk list\n",
+ __func__);
+ return -EIO;
+ }
} else {
headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -492,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* on receive. Therefore, we request a reply chunk
* for non-writes wherever feasible and efficient.
*/
- if (wtype == rpcrdma_noch &&
- r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+ if (wtype == rpcrdma_noch)
wtype = rpcrdma_replych;
}
}
@@ -511,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
hdrlen = rpcrdma_create_chunks(rqst,
&rqst->rq_rcv_buf, headerp, wtype);
}
-
- if (hdrlen == 0)
- return -1;
+ if (hdrlen < 0)
+ return hdrlen;
dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
@@ -680,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
rqst->rq_private_buf = rqst->rq_rcv_buf;
}
-/*
- * This function is called when an async event is posted to
- * the connection which changes the connection state. All it
- * does at this point is mark the connection up/down, the rpc
- * timers do the rest.
- */
void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
+rpcrdma_connect_worker(struct work_struct *work)
{
+ struct rpcrdma_ep *ep =
+ container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
struct rpc_xprt *xprt = ep->rep_xprt;
spin_lock_bh(&xprt->transport_lock);
@@ -705,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
}
/*
- * This function is called when memory window unbind which we are waiting
- * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
*/
-static void
-rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
- wake_up(&rep->rr_unbind);
+ schedule_delayed_work(&ep->rep_connect_worker, 0);
}
/*
@@ -728,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpc_xprt *xprt = rep->rr_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
__be32 *iptr;
- int i, rdmalen, status;
+ int rdmalen, status;
+ unsigned long cwnd;
/* Check status. If bad, signal disconnect and return rep to pool */
if (rep->rr_len == ~0U) {
@@ -783,6 +785,7 @@ repost:
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
+ xprt->reestablish_timeout = 0;
/* check for expected message types */
/* The order of some of these tests is important. */
@@ -857,26 +860,10 @@ badheader:
break;
}
- /* If using mw bind, start the deregister process now. */
- /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
- if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
- case RPCRDMA_MEMWINDOWS:
- for (i = 0; req->rl_nchunks-- > 1;)
- i += rpcrdma_deregister_external(
- &req->rl_segments[i], r_xprt, NULL);
- /* Optionally wait (not here) for unbinds to complete */
- rep->rr_func = rpcrdma_unbind_func;
- (void) rpcrdma_deregister_external(&req->rl_segments[i],
- r_xprt, rep);
- break;
- case RPCRDMA_MEMWINDOWS_ASYNC:
- for (i = 0; req->rl_nchunks--;)
- i += rpcrdma_deregister_external(&req->rl_segments[i],
- r_xprt, NULL);
- break;
- default:
- break;
- }
+ cwnd = xprt->cwnd;
+ xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+ if (xprt->cwnd > cwnd)
+ xprt_release_rqst_cong(rqst->rq_task);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);