From 176e21ee2ec89cae8d45cf1a850ea45a45428fb8 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Mon, 9 May 2011 15:22:44 -0400 Subject: SUNRPC: Support for RPC over AF_LOCAL transports TI-RPC introduces the capability of performing RPC over AF_LOCAL sockets. It uses this mainly for registering and unregistering local RPC services securely with the local rpcbind, but we could also conceivably use it as a generic upcall mechanism. This patch provides a client-side only implementation for the moment. We might also consider a server-side implementation to provide AF_LOCAL access to NLM (for statd downcalls, and such like). Autobinding is not supported on kernel AF_LOCAL transports at this time. Kernel ULPs must specify the pathname of the remote endpoint when an AF_LOCAL transport is created. rpcbind supports registering services available via AF_LOCAL, so the kernel could handle it with some adjustment to ->rpcbind and ->set_port. But we don't need this feature for doing upcalls via well-known named sockets. This has not been tested with ULPs that move a substantial amount of data. Thus, I can't attest to how robust the write_space and congestion management logic is. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 8 + net/sunrpc/xprtsock.c | 395 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 400 insertions(+), 3 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 08ed49629b86..b84d7395535e 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -28,7 +28,9 @@ #include #include #include +#include #include +#include #include #include @@ -294,6 +296,8 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) * up a string representation of the passed-in address. */ if (args->servername == NULL) { + struct sockaddr_un *sun = + (struct sockaddr_un *)args->address; struct sockaddr_in *sin = (struct sockaddr_in *)args->address; struct sockaddr_in6 *sin6 = @@ -301,6 +305,10 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) servername[0] = '\0'; switch (args->address->sa_family) { + case AF_LOCAL: + snprintf(servername, sizeof(servername), "%s", + sun->sun_path); + break; case AF_INET: snprintf(servername, sizeof(servername), "%pI4", &sin->sin_addr.s_addr); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 079366974e1f..72abb7358933 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -19,6 +19,7 @@ */ #include +#include #include #include #include @@ -28,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +47,9 @@ #include #include "sunrpc.h" + +static void xs_close(struct rpc_xprt *xprt); + /* * xprtsock tunables */ @@ -261,6 +266,11 @@ static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) return (struct sockaddr *) &xprt->addr; } +static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt) +{ + return (struct sockaddr_un *) &xprt->addr; +} + static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt) { return (struct sockaddr_in *) &xprt->addr; @@ -276,23 +286,34 @@ static void xs_format_common_peer_addresses(struct rpc_xprt *xprt) struct sockaddr *sap = xs_addr(xprt); struct sockaddr_in6 *sin6; struct sockaddr_in *sin; + struct sockaddr_un *sun; char buf[128]; - (void)rpc_ntop(sap, buf, sizeof(buf)); - xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL); - switch (sap->sa_family) { + case AF_LOCAL: + sun = xs_addr_un(xprt); + strlcpy(buf, sun->sun_path, sizeof(buf)); + xprt->address_strings[RPC_DISPLAY_ADDR] = + kstrdup(buf, GFP_KERNEL); + break; case AF_INET: + (void)rpc_ntop(sap, buf, sizeof(buf)); + xprt->address_strings[RPC_DISPLAY_ADDR] = + kstrdup(buf, GFP_KERNEL); sin = xs_addr_in(xprt); snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr)); break; case AF_INET6: + (void)rpc_ntop(sap, buf, sizeof(buf)); + xprt->address_strings[RPC_DISPLAY_ADDR] = + kstrdup(buf, GFP_KERNEL); sin6 = xs_addr_in6(xprt); snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr); break; default: BUG(); } + xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); } @@ -505,6 +526,60 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); } +/** + * xs_local_send_request - write an RPC request to an AF_LOCAL socket + * @task: RPC task that manages the state of an RPC request + * + * Return values: + * 0: The request has been sent + * EAGAIN: The socket was blocked, please call again later to + * complete the request + * ENOTCONN: Caller needs to invoke connect logic then call again + * other: Some other error occured, the request was not sent + */ +static int xs_local_send_request(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct xdr_buf *xdr = &req->rq_snd_buf; + int status; + + xs_encode_stream_record_marker(&req->rq_snd_buf); + + xs_pktdump("packet data:", + req->rq_svec->iov_base, req->rq_svec->iov_len); + + status = xs_sendpages(transport->sock, NULL, 0, + xdr, req->rq_bytes_sent); + dprintk("RPC: %s(%u) = %d\n", + __func__, xdr->len - req->rq_bytes_sent, status); + if (likely(status >= 0)) { + req->rq_bytes_sent += status; + req->rq_xmit_bytes_sent += status; + if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_bytes_sent = 0; + return 0; + } + status = -EAGAIN; + } + + switch (status) { + case -EAGAIN: + status = xs_nospace(task); + break; + default: + dprintk("RPC: sendmsg returned unrecognized error %d\n", + -status); + case -EPIPE: + xs_close(xprt); + status = -ENOTCONN; + } + + return status; +} + /** * xs_udp_send_request - write an RPC request to a UDP socket * @task: address of RPC task that manages the state of an RPC request @@ -788,6 +863,88 @@ static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) return (struct rpc_xprt *) sk->sk_user_data; } +static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) +{ + struct xdr_skb_reader desc = { + .skb = skb, + .offset = sizeof(rpc_fraghdr), + .count = skb->len - sizeof(rpc_fraghdr), + }; + + if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0) + return -1; + if (desc.count) + return -1; + return 0; +} + +/** + * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets + * @sk: socket with data to read + * @len: how much data to read + * + * Currently this assumes we can read the whole reply in a single gulp. + */ +static void xs_local_data_ready(struct sock *sk, int len) +{ + struct rpc_task *task; + struct rpc_xprt *xprt; + struct rpc_rqst *rovr; + struct sk_buff *skb; + int err, repsize, copied; + u32 _xid; + __be32 *xp; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: %s...\n", __func__); + xprt = xprt_from_sock(sk); + if (xprt == NULL) + goto out; + + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + goto out; + + if (xprt->shutdown) + goto dropit; + + repsize = skb->len - sizeof(rpc_fraghdr); + if (repsize < 4) { + dprintk("RPC: impossible RPC reply size %d\n", repsize); + goto dropit; + } + + /* Copy the XID from the skb... */ + xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); + if (xp == NULL) + goto dropit; + + /* Look up and lock the request corresponding to the given XID */ + spin_lock(&xprt->transport_lock); + rovr = xprt_lookup_rqst(xprt, *xp); + if (!rovr) + goto out_unlock; + task = rovr->rq_task; + + copied = rovr->rq_private_buf.buflen; + if (copied > repsize) + copied = repsize; + + if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { + dprintk("RPC: sk_buff copy failed\n"); + goto out_unlock; + } + + xprt_complete_rqst(task, copied); + + out_unlock: + spin_unlock(&xprt->transport_lock); + dropit: + skb_free_datagram(sk, skb); + out: + read_unlock_bh(&sk->sk_callback_lock); +} + /** * xs_udp_data_ready - "data ready" callback for UDP sockets * @sk: socket with data to read @@ -1573,11 +1730,31 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) return err; } +/* + * We don't support autobind on AF_LOCAL sockets + */ +static void xs_local_rpcbind(struct rpc_task *task) +{ + xprt_set_bound(task->tk_xprt); +} + +static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) +{ +} #ifdef CONFIG_DEBUG_LOCK_ALLOC static struct lock_class_key xs_key[2]; static struct lock_class_key xs_slock_key[2]; +static inline void xs_reclassify_socketu(struct socket *sock) +{ + struct sock *sk = sock->sk; + + BUG_ON(sock_owned_by_user(sk)); + sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC", + &xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]); +} + static inline void xs_reclassify_socket4(struct socket *sock) { struct sock *sk = sock->sk; @@ -1599,6 +1776,9 @@ static inline void xs_reclassify_socket6(struct socket *sock) static inline void xs_reclassify_socket(int family, struct socket *sock) { switch (family) { + case AF_LOCAL: + xs_reclassify_socketu(sock); + break; case AF_INET: xs_reclassify_socket4(sock); break; @@ -1608,6 +1788,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } } #else +static inline void xs_reclassify_socketu(struct socket *sock) +{ +} + static inline void xs_reclassify_socket4(struct socket *sock) { } @@ -1646,6 +1830,94 @@ out: return ERR_PTR(err); } +static int xs_local_finish_connecting(struct rpc_xprt *xprt, + struct socket *sock) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, + xprt); + + if (!transport->inet) { + struct sock *sk = sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + + xs_save_old_callbacks(transport, sk); + + sk->sk_user_data = xprt; + sk->sk_data_ready = xs_local_data_ready; + sk->sk_write_space = xs_udp_write_space; + sk->sk_error_report = xs_error_report; + sk->sk_allocation = GFP_ATOMIC; + + xprt_clear_connected(xprt); + + /* Reset to new socket */ + transport->sock = sock; + transport->inet = sk; + + write_unlock_bh(&sk->sk_callback_lock); + } + + /* Tell the socket layer to start connecting... */ + xprt->stat.connect_count++; + xprt->stat.connect_start = jiffies; + return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); +} + +/** + * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint + * @xprt: RPC transport to connect + * @transport: socket transport to connect + * @create_sock: function to create a socket of the correct type + * + * Invoked by a work queue tasklet. + */ +static void xs_local_setup_socket(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, connect_worker.work); + struct rpc_xprt *xprt = &transport->xprt; + struct socket *sock; + int status = -EIO; + + if (xprt->shutdown) + goto out; + + clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); + status = __sock_create(xprt->xprt_net, AF_LOCAL, + SOCK_STREAM, 0, &sock, 1); + if (status < 0) { + dprintk("RPC: can't create AF_LOCAL " + "transport socket (%d).\n", -status); + goto out; + } + xs_reclassify_socketu(sock); + + dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", + xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); + + status = xs_local_finish_connecting(xprt, sock); + switch (status) { + case 0: + dprintk("RPC: xprt %p connected to %s\n", + xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); + xprt_set_connected(xprt); + break; + case -ENOENT: + dprintk("RPC: xprt %p: socket %s does not exist\n", + xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); + break; + default: + printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n", + __func__, -status, + xprt->address_strings[RPC_DISPLAY_ADDR]); + } + +out: + xprt_clear_connecting(xprt); + xprt_wake_pending_tasks(xprt, status); +} + static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -1929,6 +2201,32 @@ static void xs_connect(struct rpc_task *task) } } +/** + * xs_local_print_stats - display AF_LOCAL socket-specifc stats + * @xprt: rpc_xprt struct containing statistics + * @seq: output file + * + */ +static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) +{ + long idle_time = 0; + + if (xprt_connected(xprt)) + idle_time = (long)(jiffies - xprt->last_used) / HZ; + + seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu " + "%llu %llu\n", + xprt->stat.bind_count, + xprt->stat.connect_count, + xprt->stat.connect_time, + idle_time, + xprt->stat.sends, + xprt->stat.recvs, + xprt->stat.bad_xids, + xprt->stat.req_u, + xprt->stat.bklog_u); +} + /** * xs_udp_print_stats - display UDP socket-specifc stats * @xprt: rpc_xprt struct containing statistics @@ -2099,6 +2397,21 @@ static void bc_destroy(struct rpc_xprt *xprt) { } +static struct rpc_xprt_ops xs_local_ops = { + .reserve_xprt = xprt_reserve_xprt, + .release_xprt = xs_tcp_release_xprt, + .rpcbind = xs_local_rpcbind, + .set_port = xs_local_set_port, + .connect = xs_connect, + .buf_alloc = rpc_malloc, + .buf_free = rpc_free, + .send_request = xs_local_send_request, + .set_retrans_timeout = xprt_set_retrans_timeout_def, + .close = xs_close, + .destroy = xs_destroy, + .print_stats = xs_local_print_stats, +}; + static struct rpc_xprt_ops xs_udp_ops = { .set_buffer_size = xs_udp_set_buffer_size, .reserve_xprt = xprt_reserve_xprt_cong, @@ -2160,6 +2473,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap) }; switch (family) { + case AF_LOCAL: + break; case AF_INET: memcpy(sap, &sin, sizeof(sin)); break; @@ -2207,6 +2522,70 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, return xprt; } +static const struct rpc_timeout xs_local_default_timeout = { + .to_initval = 10 * HZ, + .to_maxval = 10 * HZ, + .to_retries = 2, +}; + +/** + * xs_setup_local - Set up transport to use an AF_LOCAL socket + * @args: rpc transport creation arguments + * + * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP + */ +static struct rpc_xprt *xs_setup_local(struct xprt_create *args) +{ + struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr; + struct sock_xprt *transport; + struct rpc_xprt *xprt; + struct rpc_xprt *ret; + + xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); + if (IS_ERR(xprt)) + return xprt; + transport = container_of(xprt, struct sock_xprt, xprt); + + xprt->prot = 0; + xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); + xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; + + xprt->bind_timeout = XS_BIND_TO; + xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; + xprt->idle_timeout = XS_IDLE_DISC_TO; + + xprt->ops = &xs_local_ops; + xprt->timeout = &xs_local_default_timeout; + + switch (sun->sun_family) { + case AF_LOCAL: + if (sun->sun_path[0] != '/') { + dprintk("RPC: bad AF_LOCAL address: %s\n", + sun->sun_path); + ret = ERR_PTR(-EINVAL); + goto out_err; + } + xprt_set_bound(xprt); + INIT_DELAYED_WORK(&transport->connect_worker, + xs_local_setup_socket); + xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); + break; + default: + ret = ERR_PTR(-EAFNOSUPPORT); + goto out_err; + } + + dprintk("RPC: set up xprt to %s via AF_LOCAL\n", + xprt->address_strings[RPC_DISPLAY_ADDR]); + + if (try_module_get(THIS_MODULE)) + return xprt; + ret = ERR_PTR(-EINVAL); +out_err: + xprt_free(xprt); + return ret; +} + static const struct rpc_timeout xs_udp_default_timeout = { .to_initval = 5 * HZ, .to_maxval = 30 * HZ, @@ -2448,6 +2827,14 @@ out_err: return ret; } +static struct xprt_class xs_local_transport = { + .list = LIST_HEAD_INIT(xs_local_transport.list), + .name = "named UNIX socket", + .owner = THIS_MODULE, + .ident = XPRT_TRANSPORT_LOCAL, + .setup = xs_setup_local, +}; + static struct xprt_class xs_udp_transport = { .list = LIST_HEAD_INIT(xs_udp_transport.list), .name = "udp", @@ -2483,6 +2870,7 @@ int init_socket_xprt(void) sunrpc_table_header = register_sysctl_table(sunrpc_table); #endif + xprt_register_transport(&xs_local_transport); xprt_register_transport(&xs_udp_transport); xprt_register_transport(&xs_tcp_transport); xprt_register_transport(&xs_bc_tcp_transport); @@ -2503,6 +2891,7 @@ void cleanup_socket_xprt(void) } #endif + xprt_unregister_transport(&xs_local_transport); xprt_unregister_transport(&xs_udp_transport); xprt_unregister_transport(&xs_tcp_transport); xprt_unregister_transport(&xs_bc_tcp_transport); -- cgit v1.2.3