From 66d7a56a6254389587d0999dcaab1d2634cd4e24 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 11:19:20 -0400 Subject: SUNRPC: Refactor TCP receive Move the TCP data receive loop out of xs_tcp_data_ready(). Doing so will allow us to move the data receive out of the softirq context in a set of followup patches. Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0e..fa8d0c1 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1391,6 +1391,30 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns return len - desc.count; } +static void xs_tcp_data_receive(struct sock_xprt *transport) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct sock *sk; + read_descriptor_t rd_desc = { + .count = 2*1024*1024, + .arg.data = xprt, + }; + unsigned long total = 0; + int read = 0; + + sk = transport->inet; + + /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ + for (;;) { + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + if (read <= 0) + break; + total += read; + rd_desc.count = 65536; + } + trace_xs_tcp_data_ready(xprt, read, total); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1398,34 +1422,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns */ static void xs_tcp_data_ready(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - read_descriptor_t rd_desc; - int read; - unsigned long total = 0; dprintk("RPC: xs_tcp_data_ready...\n"); read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) { - read = 0; + if (!(xprt = xprt_from_sock(sk))) goto out; - } + transport = container_of(xprt, struct sock_xprt, xprt); + /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - rd_desc.arg.data = xprt; - do { - rd_desc.count = 65536; - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read > 0) - total += read; - } while (read > 0); + xs_tcp_data_receive(transport); out: - trace_xs_tcp_data_ready(xprt, read, total); read_unlock_bh(&sk->sk_callback_lock); } -- cgit v0.10.2 From edc1b01cd3b20a5fff049e98f82a2b0d24a34c89 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 10:53:49 -0400 Subject: SUNRPC: Move TCP receive data path into a workqueue context Stream protocols such as TCP can often build up a backlog of data to be read due to ordering. Combine this with the fact that some workloads such as NFS read()-intensive workloads need to receive a lot of data per RPC call, and it turns out that receiving the data from inside a softirq context can cause starvation. The following patch moves the TCP data receive into a workqueue context. We still end up calling tcp_read_sock(), but we do so from a process context, meaning that softirqs are enabled for most of the time. With this patch, I see a doubling of read bandwidth when running a multi-threaded iozone workload between a virtual client and server setup. Signed-off-by: Trond Myklebust diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h index 357e44c..0ece4ba 100644 --- a/include/linux/sunrpc/xprtsock.h +++ b/include/linux/sunrpc/xprtsock.h @@ -44,6 +44,8 @@ struct sock_xprt { */ unsigned long sock_state; struct delayed_work connect_worker; + struct work_struct recv_worker; + struct mutex recv_mutex; struct sockaddr_storage srcaddr; unsigned short srcport; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fa8d0c1..58dc90c 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -823,6 +823,7 @@ static void xs_reset_transport(struct sock_xprt *transport) kernel_sock_shutdown(sock, SHUT_RDWR); + mutex_lock(&transport->recv_mutex); write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -833,6 +834,7 @@ static void xs_reset_transport(struct sock_xprt *transport) xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); sock_release(sock); @@ -886,6 +888,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); + cancel_work_sync(&transport->recv_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -1243,12 +1246,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_rqst(xprt, transport->tcp_xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return -1; } @@ -1257,7 +1260,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1277,10 +1280,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; @@ -1291,7 +1294,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_bc_request(req, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1402,19 +1405,33 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) unsigned long total = 0; int read = 0; + mutex_lock(&transport->recv_mutex); sk = transport->inet; + if (sk == NULL) + goto out; /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ for (;;) { + lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + release_sock(sk); if (read <= 0) break; total += read; rd_desc.count = 65536; } +out: + mutex_unlock(&transport->recv_mutex); trace_xs_tcp_data_ready(xprt, read, total); } +static void xs_tcp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_tcp_data_receive(transport); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1437,8 +1454,8 @@ static void xs_tcp_data_ready(struct sock *sk) */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; + queue_work(rpciod_workqueue, &transport->recv_worker); - xs_tcp_data_receive(transport); out: read_unlock_bh(&sk->sk_callback_lock); } @@ -1840,6 +1857,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif +static void xs_dummy_data_receive_workfn(struct work_struct *work) +{ +} + static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -2664,6 +2685,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, } new = container_of(xprt, struct sock_xprt, xprt); + mutex_init(&new->recv_mutex); memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) @@ -2717,6 +2739,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); @@ -2788,21 +2811,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); break; default: @@ -2867,21 +2889,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); break; default: -- cgit v0.10.2 From f9b2ee714c5c945cda27e9cbca5f60d5199fb93f Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 16:26:05 -0400 Subject: SUNRPC: Move UDP receive data path into a workqueue context Now that we've done it for TCP, let's convert UDP as well. Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 58dc90c..df8bdcc 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -972,42 +972,36 @@ static void xs_local_data_ready(struct sock *sk) } /** - * xs_udp_data_ready - "data ready" callback for UDP sockets - * @sk: socket with data to read + * xs_udp_data_read_skb - receive callback for UDP sockets + * @xprt: transport + * @sk: socket + * @skb: skbuff * */ -static void xs_udp_data_ready(struct sock *sk) +static void xs_udp_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) - goto out; - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; - repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(struct udphdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -1028,10 +1022,54 @@ static void xs_udp_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_udp_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_udp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_udp_data_receive(transport); +} + +/** + * xs_data_ready - "data ready" callback for UDP sockets + * @sk: socket with data to read + * + */ +static void xs_data_ready(struct sock *sk) +{ + struct rpc_xprt *xprt; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: xs_data_ready...\n"); + xprt = xprt_from_sock(sk); + if (xprt != NULL) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); + queue_work(rpciod_workqueue, &transport->recv_worker); + } read_unlock_bh(&sk->sk_callback_lock); } @@ -2094,7 +2132,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_udp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_allocation = GFP_NOIO; @@ -2811,7 +2849,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { -- cgit v0.10.2 From a26480942c99d84a7682ea4c00f47a3a42ed41d2 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 17:03:00 -0400 Subject: SUNRPC: Move AF_LOCAL receive data path into a workqueue context Now that we've done it for TCP and UDP, let's convert AF_LOCAL as well. Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index df8bdcc..1471ecc 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -909,44 +909,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) } /** - * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets - * @sk: socket with data to read + * xs_local_data_read_skb + * @xprt: transport + * @sk: socket + * @skb: skbuff * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk) +static void xs_local_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: %s...\n", __func__); - xprt = xprt_from_sock(sk); - if (xprt == NULL) - goto out; - - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) - goto out; - repsize = skb->len - sizeof(rpc_fraghdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -964,11 +956,35 @@ static void xs_local_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock_bh(&sk->sk_callback_lock); + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_local_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_local_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_local_data_receive(transport); } /** @@ -1895,10 +1911,6 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif -static void xs_dummy_data_receive_workfn(struct work_struct *work) -{ -} - static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -1946,7 +1958,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_local_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2777,7 +2789,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); -- cgit v0.10.2 From 31303d6cbb24ba94e8b82170213bd2fde6365d9a Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 15:59:20 -0400 Subject: SUNRPC: Use MSG_SENDPAGE_NOTLAST in xs_send_pagedata() If we're sending more than one page via kernel_sendpage(), then set MSG_SENDPAGE_NOTLAST between the pages so that we don't send suboptimal frames (see commit 2f5338442425 and commit 35f9c09fe9c7). Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1471ecc..e71aff2 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i int flags = XS_SENDMSG_FLAGS; remainder -= len; - if (remainder != 0 || more) + if (more) flags |= MSG_MORE; + if (remainder != 0) + flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; -- cgit v0.10.2 From b3c2aa07454cf7ab4ec3ee882c586abbea033132 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 20 Sep 2015 14:32:45 -0400 Subject: NFSv4: Refactor NFSv4 error handling Prepare for unification of the synchronous and asynchronous error handling. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h index 50cfc4c..4afdee4 100644 --- a/fs/nfs/nfs4_fs.h +++ b/fs/nfs/nfs4_fs.h @@ -183,10 +183,12 @@ struct nfs4_state { struct nfs4_exception { - long timeout; - int retry; struct nfs4_state *state; struct inode *inode; + long timeout; + unsigned char delay : 1, + recovering : 1, + retry : 1; }; struct nfs4_state_recovery_ops { diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 5133bb1..d5c2a46 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -344,13 +344,16 @@ static int nfs4_delay(struct rpc_clnt *clnt, long *timeout) /* This is the error handling routine for processes that are allowed * to sleep. */ -int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception) +static int nfs4_do_handle_exception(struct nfs_server *server, + int errorcode, struct nfs4_exception *exception) { struct nfs_client *clp = server->nfs_client; struct nfs4_state *state = exception->state; struct inode *inode = exception->inode; int ret = errorcode; + exception->delay = 0; + exception->recovering = 0; exception->retry = 0; switch(errorcode) { case 0: @@ -411,9 +414,9 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_ } case -NFS4ERR_GRACE: case -NFS4ERR_DELAY: - ret = nfs4_delay(server->client, &exception->timeout); - if (ret != 0) - break; + exception->delay = 1; + return 0; + case -NFS4ERR_RETRY_UNCACHED_REP: case -NFS4ERR_OLD_STATEID: exception->retry = 1; @@ -434,9 +437,31 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_ /* We failed to handle the error */ return nfs4_map_errors(ret); wait_on_recovery: - ret = nfs4_wait_clnt_recover(clp); - if (test_bit(NFS_MIG_FAILED, &server->mig_status)) - return -EIO; + exception->recovering = 1; + return 0; +} + +/* This is the error handling routine for processes that are allowed + * to sleep. + */ +int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception) +{ + struct nfs_client *clp = server->nfs_client; + int ret; + + ret = nfs4_do_handle_exception(server, errorcode, exception); + if (exception->delay) { + ret = nfs4_delay(server->client, &exception->timeout); + goto out_retry; + } + if (exception->recovering) { + ret = nfs4_wait_clnt_recover(clp); + if (test_bit(NFS_MIG_FAILED, &server->mig_status)) + return -EIO; + goto out_retry; + } + return ret; +out_retry: if (ret == 0) exception->retry = 1; return ret; -- cgit v0.10.2 From 2598ed344586cf3612bfbd33041527c55543acfe Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 20 Sep 2015 16:10:18 -0400 Subject: NFSv4: Update the delay statistics counter for synchronous delays Currently, we only do so for asynchronous delays. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index d5c2a46..d044c7b 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -412,8 +412,9 @@ static int nfs4_do_handle_exception(struct nfs_server *server, ret = -EBUSY; break; } - case -NFS4ERR_GRACE: case -NFS4ERR_DELAY: + nfs_inc_server_stats(server, NFSIOS_DELAY); + case -NFS4ERR_GRACE: exception->delay = 1; return 0; -- cgit v0.10.2 From 516285ebe0efadc40b914a0e61a913a390604810 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 20 Sep 2015 16:15:24 -0400 Subject: NFSv4: nfs4_async_handle_error should take a non-const nfs_server For symmetry with the synchronous handler, and so that we can potentially handle errors such as NFS4ERR_BADNAME. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index d044c7b..ae5cde6 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -78,7 +78,7 @@ struct nfs4_opendata; static int _nfs4_proc_open(struct nfs4_opendata *data); static int _nfs4_recover_proc_open(struct nfs4_opendata *data); static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *); -static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *, long *); +static int nfs4_async_handle_error(struct rpc_task *, struct nfs_server *, struct nfs4_state *, long *); static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr); static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label); static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label); @@ -4982,7 +4982,7 @@ out: static int -nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server, +nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server, struct nfs4_state *state, long *timeout) { struct nfs_client *clp = server->nfs_client; @@ -5559,7 +5559,7 @@ struct nfs4_unlockdata { struct nfs4_lock_state *lsp; struct nfs_open_context *ctx; struct file_lock fl; - const struct nfs_server *server; + struct nfs_server *server; unsigned long timestamp; }; diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h index 52faf7e..53f2acc 100644 --- a/include/linux/nfs_xdr.h +++ b/include/linux/nfs_xdr.h @@ -528,7 +528,7 @@ struct nfs4_delegreturnargs { struct nfs4_delegreturnres { struct nfs4_sequence_res seq_res; struct nfs_fattr * fattr; - const struct nfs_server *server; + struct nfs_server *server; }; /* @@ -601,7 +601,7 @@ struct nfs_removeargs { struct nfs_removeres { struct nfs4_sequence_res seq_res; - const struct nfs_server *server; + struct nfs_server *server; struct nfs_fattr *dir_attr; struct nfs4_change_info cinfo; }; @@ -619,7 +619,7 @@ struct nfs_renameargs { struct nfs_renameres { struct nfs4_sequence_res seq_res; - const struct nfs_server *server; + struct nfs_server *server; struct nfs4_change_info old_cinfo; struct nfs_fattr *old_fattr; struct nfs4_change_info new_cinfo; -- cgit v0.10.2 From 4816fdadab9ff874ec1a4138dad43b636c7d220b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 20 Sep 2015 14:58:42 -0400 Subject: NFSv4: Don't use synchronous delegation recall in exception handling The code needs to be able to work from inside an asynchronous context. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c index be806ea..5166adc 100644 --- a/fs/nfs/delegation.c +++ b/fs/nfs/delegation.c @@ -721,14 +721,12 @@ int nfs_async_inode_return_delegation(struct inode *inode, struct nfs_client *clp = server->nfs_client; struct nfs_delegation *delegation; - filemap_flush(inode->i_mapping); - rcu_read_lock(); delegation = rcu_dereference(NFS_I(inode)->delegation); if (delegation == NULL) goto out_enoent; - - if (!clp->cl_mvops->match_stateid(&delegation->stateid, stateid)) + if (stateid != NULL && + !clp->cl_mvops->match_stateid(&delegation->stateid, stateid)) goto out_enoent; nfs_mark_return_delegation(server, delegation); rcu_read_unlock(); diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index ae5cde6..2a155ec 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -362,11 +362,9 @@ static int nfs4_do_handle_exception(struct nfs_server *server, case -NFS4ERR_DELEG_REVOKED: case -NFS4ERR_ADMIN_REVOKED: case -NFS4ERR_BAD_STATEID: - if (inode && nfs4_have_delegation(inode, FMODE_READ)) { - nfs4_inode_return_delegation(inode); - exception->retry = 1; - return 0; - } + if (inode && nfs_async_inode_return_delegation(inode, + NULL) == 0) + goto wait_on_recovery; if (state == NULL) break; ret = nfs4_schedule_stateid_recovery(server, state); -- cgit v0.10.2 From 037fc9808a777f6fb6a54c6510b9656716e0c8c8 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sun, 20 Sep 2015 15:51:00 -0400 Subject: NFSv4: Unify synchronous and asynchronous error handling They now only differ in the way we handle waiting, so let's unify. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 2a155ec..e32b7e9 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -78,7 +78,6 @@ struct nfs4_opendata; static int _nfs4_proc_open(struct nfs4_opendata *data); static int _nfs4_recover_proc_open(struct nfs4_opendata *data); static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *); -static int nfs4_async_handle_error(struct rpc_task *, struct nfs_server *, struct nfs4_state *, long *); static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr); static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label); static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label); @@ -466,6 +465,55 @@ out_retry: return ret; } +static int +nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server, + int errorcode, struct nfs4_exception *exception) +{ + struct nfs_client *clp = server->nfs_client; + int ret; + + ret = nfs4_do_handle_exception(server, errorcode, exception); + if (exception->delay) { + rpc_delay(task, nfs4_update_delay(&exception->timeout)); + goto out_retry; + } + if (exception->recovering) { + rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL); + if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0) + rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task); + goto out_retry; + } + if (test_bit(NFS_MIG_FAILED, &server->mig_status)) + ret = -EIO; + return ret; +out_retry: + if (ret == 0) + exception->retry = 1; + return ret; +} + +static int +nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server, + struct nfs4_state *state, long *timeout) +{ + struct nfs4_exception exception = { + .state = state, + }; + + if (task->tk_status >= 0) + return 0; + if (timeout) + exception.timeout = *timeout; + task->tk_status = nfs4_async_handle_exception(task, server, + task->tk_status, + &exception); + if (exception.delay && timeout) + *timeout = exception.timeout; + if (exception.retry) + return -EAGAIN; + return 0; +} + /* * Return 'true' if 'clp' is using an rpc_client that is integrity protected * or 'false' otherwise. @@ -4979,79 +5027,6 @@ out: #endif /* CONFIG_NFS_V4_SECURITY_LABEL */ -static int -nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server, - struct nfs4_state *state, long *timeout) -{ - struct nfs_client *clp = server->nfs_client; - - if (task->tk_status >= 0) - return 0; - switch(task->tk_status) { - case -NFS4ERR_DELEG_REVOKED: - case -NFS4ERR_ADMIN_REVOKED: - case -NFS4ERR_BAD_STATEID: - case -NFS4ERR_OPENMODE: - if (state == NULL) - break; - if (nfs4_schedule_stateid_recovery(server, state) < 0) - goto recovery_failed; - goto wait_on_recovery; - case -NFS4ERR_EXPIRED: - if (state != NULL) { - if (nfs4_schedule_stateid_recovery(server, state) < 0) - goto recovery_failed; - } - case -NFS4ERR_STALE_STATEID: - case -NFS4ERR_STALE_CLIENTID: - nfs4_schedule_lease_recovery(clp); - goto wait_on_recovery; - case -NFS4ERR_MOVED: - if (nfs4_schedule_migration_recovery(server) < 0) - goto recovery_failed; - goto wait_on_recovery; - case -NFS4ERR_LEASE_MOVED: - nfs4_schedule_lease_moved_recovery(clp); - goto wait_on_recovery; -#if defined(CONFIG_NFS_V4_1) - case -NFS4ERR_BADSESSION: - case -NFS4ERR_BADSLOT: - case -NFS4ERR_BAD_HIGH_SLOT: - case -NFS4ERR_DEADSESSION: - case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION: - case -NFS4ERR_SEQ_FALSE_RETRY: - case -NFS4ERR_SEQ_MISORDERED: - dprintk("%s ERROR %d, Reset session\n", __func__, - task->tk_status); - nfs4_schedule_session_recovery(clp->cl_session, task->tk_status); - goto wait_on_recovery; -#endif /* CONFIG_NFS_V4_1 */ - case -NFS4ERR_DELAY: - nfs_inc_server_stats(server, NFSIOS_DELAY); - rpc_delay(task, nfs4_update_delay(timeout)); - goto restart_call; - case -NFS4ERR_GRACE: - rpc_delay(task, NFS4_POLL_RETRY_MAX); - case -NFS4ERR_RETRY_UNCACHED_REP: - case -NFS4ERR_OLD_STATEID: - goto restart_call; - } - task->tk_status = nfs4_map_errors(task->tk_status); - return 0; -recovery_failed: - task->tk_status = -EIO; - return 0; -wait_on_recovery: - rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL); - if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0) - rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task); - if (test_bit(NFS_MIG_FAILED, &server->mig_status)) - goto recovery_failed; -restart_call: - task->tk_status = 0; - return -EAGAIN; -} - static void nfs4_init_boot_verifier(const struct nfs_client *clp, nfs4_verifier *bootverf) { -- cgit v0.10.2 From 36022770de6cf9a403c40a68712ed2d2ea2746be Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:34 +0800 Subject: nfs42: add CLONE xdr functions xdr definitions per draft-ietf-nfsv4-minorversion2-38.txt Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c index 0eb29e1..0ca482a 100644 --- a/fs/nfs/nfs42xdr.c +++ b/fs/nfs/nfs42xdr.c @@ -34,6 +34,12 @@ 1 /* opaque devaddr4 length */ + \ XDR_QUADLEN(PNFS_LAYOUTSTATS_MAXSIZE)) #define decode_layoutstats_maxsz (op_decode_hdr_maxsz) +#define encode_clone_maxsz (encode_stateid_maxsz + \ + encode_stateid_maxsz + \ + 2 /* src offset */ + \ + 2 /* dst offset */ + \ + 2 /* count */) +#define decode_clone_maxsz (op_decode_hdr_maxsz) #define NFS4_enc_allocate_sz (compound_encode_hdr_maxsz + \ encode_putfh_maxsz + \ @@ -65,7 +71,20 @@ decode_sequence_maxsz + \ decode_putfh_maxsz + \ PNFS_LAYOUTSTATS_MAXDEV * decode_layoutstats_maxsz) - +#define NFS4_enc_clone_sz (compound_encode_hdr_maxsz + \ + encode_sequence_maxsz + \ + encode_putfh_maxsz + \ + encode_savefh_maxsz + \ + encode_putfh_maxsz + \ + encode_clone_maxsz + \ + encode_getattr_maxsz) +#define NFS4_dec_clone_sz (compound_decode_hdr_maxsz + \ + decode_sequence_maxsz + \ + decode_putfh_maxsz + \ + decode_savefh_maxsz + \ + decode_putfh_maxsz + \ + decode_clone_maxsz + \ + decode_getattr_maxsz) static void encode_fallocate(struct xdr_stream *xdr, struct nfs42_falloc_args *args) @@ -128,6 +147,21 @@ static void encode_layoutstats(struct xdr_stream *xdr, encode_uint32(xdr, 0); } +static void encode_clone(struct xdr_stream *xdr, + struct nfs42_clone_args *args, + struct compound_hdr *hdr) +{ + __be32 *p; + + encode_op_hdr(xdr, OP_CLONE, decode_clone_maxsz, hdr); + encode_nfs4_stateid(xdr, &args->src_stateid); + encode_nfs4_stateid(xdr, &args->dst_stateid); + p = reserve_space(xdr, 3*8); + p = xdr_encode_hyper(p, args->src_offset); + p = xdr_encode_hyper(p, args->dst_offset); + xdr_encode_hyper(p, args->count); +} + /* * Encode ALLOCATE request */ @@ -206,6 +240,27 @@ static void nfs4_xdr_enc_layoutstats(struct rpc_rqst *req, encode_nops(&hdr); } +/* + * Encode CLONE request + */ +static void nfs4_xdr_enc_clone(struct rpc_rqst *req, + struct xdr_stream *xdr, + struct nfs42_clone_args *args) +{ + struct compound_hdr hdr = { + .minorversion = nfs4_xdr_minorversion(&args->seq_args), + }; + + encode_compound_hdr(xdr, req, &hdr); + encode_sequence(xdr, &args->seq_args, &hdr); + encode_putfh(xdr, args->src_fh, &hdr); + encode_savefh(xdr, &hdr); + encode_putfh(xdr, args->dst_fh, &hdr); + encode_clone(xdr, args, &hdr); + encode_getfattr(xdr, args->dst_bitmask, &hdr); + encode_nops(&hdr); +} + static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res) { return decode_op_hdr(xdr, OP_ALLOCATE); @@ -243,6 +298,11 @@ static int decode_layoutstats(struct xdr_stream *xdr) return decode_op_hdr(xdr, OP_LAYOUTSTATS); } +static int decode_clone(struct xdr_stream *xdr) +{ + return decode_op_hdr(xdr, OP_CLONE); +} + /* * Decode ALLOCATE request */ @@ -351,4 +411,39 @@ out: return status; } +/* + * Decode CLONE request + */ +static int nfs4_xdr_dec_clone(struct rpc_rqst *rqstp, + struct xdr_stream *xdr, + struct nfs42_clone_res *res) +{ + struct compound_hdr hdr; + int status; + + status = decode_compound_hdr(xdr, &hdr); + if (status) + goto out; + status = decode_sequence(xdr, &res->seq_res, rqstp); + if (status) + goto out; + status = decode_putfh(xdr); + if (status) + goto out; + status = decode_savefh(xdr); + if (status) + goto out; + status = decode_putfh(xdr); + if (status) + goto out; + status = decode_clone(xdr); + if (status) + goto out; + status = decode_getfattr(xdr, res->dst_fattr, res->server); + +out: + res->rpc_status = status; + return status; +} + #endif /* __LINUX_FS_NFS_NFS4_2XDR_H */ diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c index 788adf3..868472b 100644 --- a/fs/nfs/nfs4xdr.c +++ b/fs/nfs/nfs4xdr.c @@ -7465,6 +7465,7 @@ struct rpc_procinfo nfs4_procedures[] = { PROC(ALLOCATE, enc_allocate, dec_allocate), PROC(DEALLOCATE, enc_deallocate, dec_deallocate), PROC(LAYOUTSTATS, enc_layoutstats, dec_layoutstats), + PROC(CLONE, enc_clone, dec_clone), #endif /* CONFIG_NFS_V4_2 */ }; diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h index 00121f2..c0c695b 100644 --- a/include/linux/nfs4.h +++ b/include/linux/nfs4.h @@ -130,6 +130,7 @@ enum nfs_opnum4 { OP_READ_PLUS = 68, OP_SEEK = 69, OP_WRITE_SAME = 70, + OP_CLONE = 71, OP_ILLEGAL = 10044, }; @@ -501,6 +502,7 @@ enum { NFSPROC4_CLNT_ALLOCATE, NFSPROC4_CLNT_DEALLOCATE, NFSPROC4_CLNT_LAYOUTSTATS, + NFSPROC4_CLNT_CLONE, }; /* nfs41 types */ diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h index 52faf7e..ac678b7 100644 --- a/include/linux/nfs_xdr.h +++ b/include/linux/nfs_xdr.h @@ -359,6 +359,25 @@ struct nfs42_layoutstat_data { struct nfs42_layoutstat_res res; }; +struct nfs42_clone_args { + struct nfs4_sequence_args seq_args; + struct nfs_fh *src_fh; + struct nfs_fh *dst_fh; + nfs4_stateid src_stateid; + nfs4_stateid dst_stateid; + __u64 src_offset; + __u64 dst_offset; + __u64 count; + const u32 *dst_bitmask; +}; + +struct nfs42_clone_res { + struct nfs4_sequence_res seq_res; + unsigned int rpc_status; + struct nfs_fattr *dst_fattr; + const struct nfs_server *server; +}; + struct stateowner_id { __u64 create_time; __u32 uniquifier; -- cgit v0.10.2 From e5341f3a5762d17be9cdd06257c02c0098bdcab8 Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:35 +0800 Subject: nfs42: add CLONE proc functions Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h index 814c125..b587ccd 100644 --- a/fs/nfs/nfs42.h +++ b/fs/nfs/nfs42.h @@ -17,5 +17,6 @@ int nfs42_proc_deallocate(struct file *, loff_t, loff_t); loff_t nfs42_proc_llseek(struct file *, loff_t, int); int nfs42_proc_layoutstats_generic(struct nfs_server *, struct nfs42_layoutstat_data *); +int nfs42_proc_clone(struct file *, struct file *, loff_t, loff_t, loff_t); #endif /* __LINUX_FS_NFS_NFS4_2_H */ diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c index 0f020e4..3e92a3c 100644 --- a/fs/nfs/nfs42proc.c +++ b/fs/nfs/nfs42proc.c @@ -271,3 +271,74 @@ int nfs42_proc_layoutstats_generic(struct nfs_server *server, return PTR_ERR(task); return 0; } + +static int _nfs42_proc_clone(struct rpc_message *msg, struct file *src_f, + struct file *dst_f, loff_t src_offset, + loff_t dst_offset, loff_t count) +{ + struct inode *src_inode = file_inode(src_f); + struct inode *dst_inode = file_inode(dst_f); + struct nfs_server *server = NFS_SERVER(dst_inode); + struct nfs42_clone_args args = { + .src_fh = NFS_FH(src_inode), + .dst_fh = NFS_FH(dst_inode), + .src_offset = src_offset, + .dst_offset = dst_offset, + .dst_bitmask = server->cache_consistency_bitmask, + }; + struct nfs42_clone_res res = { + .server = server, + }; + int status; + + msg->rpc_argp = &args; + msg->rpc_resp = &res; + + status = nfs42_set_rw_stateid(&args.src_stateid, src_f, FMODE_READ); + if (status) + return status; + + status = nfs42_set_rw_stateid(&args.dst_stateid, dst_f, FMODE_WRITE); + if (status) + return status; + + res.dst_fattr = nfs_alloc_fattr(); + if (!res.dst_fattr) + return -ENOMEM; + + status = nfs4_call_sync(server->client, server, msg, + &args.seq_args, &res.seq_res, 0); + if (status == 0) + status = nfs_post_op_update_inode(dst_inode, res.dst_fattr); + + kfree(res.dst_fattr); + return status; +} + +int nfs42_proc_clone(struct file *src_f, struct file *dst_f, + loff_t src_offset, loff_t dst_offset, loff_t count) +{ + struct rpc_message msg = { + .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLONE], + }; + struct inode *inode = file_inode(src_f); + struct nfs_server *server = NFS_SERVER(file_inode(src_f)); + struct nfs4_exception exception = { }; + int err; + + if (!nfs_server_capable(inode, NFS_CAP_CLONE)) + return -EOPNOTSUPP; + + do { + err = _nfs42_proc_clone(&msg, src_f, dst_f, src_offset, + dst_offset, count); + if (err == -ENOTSUPP || err == -EOPNOTSUPP) { + NFS_SERVER(inode)->caps &= ~NFS_CAP_CLONE; + return -EOPNOTSUPP; + } + err = nfs4_handle_exception(server, err, &exception); + } while (exception.retry); + + return err; + +} diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 5133bb1..9688b1a 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -8729,7 +8729,8 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = { | NFS_CAP_ALLOCATE | NFS_CAP_DEALLOCATE | NFS_CAP_SEEK - | NFS_CAP_LAYOUTSTATS, + | NFS_CAP_LAYOUTSTATS + | NFS_CAP_CLONE, .init_client = nfs41_init_client, .shutdown_client = nfs41_shutdown_client, .match_stateid = nfs41_match_stateid, diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h index 570a7df..a50de10 100644 --- a/include/linux/nfs_fs_sb.h +++ b/include/linux/nfs_fs_sb.h @@ -243,5 +243,6 @@ struct nfs_server { #define NFS_CAP_ALLOCATE (1U << 20) #define NFS_CAP_DEALLOCATE (1U << 21) #define NFS_CAP_LAYOUTSTATS (1U << 22) +#define NFS_CAP_CLONE (1U << 23) #endif -- cgit v0.10.2 From bea51b30b281039f0f43fb4f42028ddf33fb601f Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:36 +0800 Subject: nfs42: add NFS_IOC_CLONE ioctl It can be called by user space to CLONE two files. Follow btrfs lead and define NFS_IOC_CLONE same as BTRFS_IOC_CLONE. Thus we don't mess up userspace with too many ioctls. Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index b0dbe0a..fc68ba5 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -4,6 +4,7 @@ * Copyright (C) 1992 Rick Sladkey */ #include +#include #include #include #include "delegation.h" @@ -192,8 +193,104 @@ static long nfs42_fallocate(struct file *filep, int mode, loff_t offset, loff_t return nfs42_proc_deallocate(filep, offset, len); return nfs42_proc_allocate(filep, offset, len); } + +static noinline long +nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd, + u64 src_off, u64 dst_off, u64 count) +{ + struct inode *dst_inode = file_inode(dst_file); + struct fd src_file; + struct inode *src_inode; + int ret; + + /* dst file must be opened for writing */ + if (!(dst_file->f_mode & FMODE_WRITE)) + return -EINVAL; + + ret = mnt_want_write_file(dst_file); + if (ret) + return ret; + + src_file = fdget(srcfd); + if (!src_file.file) { + ret = -EBADF; + goto out_drop_write; + } + + src_inode = file_inode(src_file.file); + + /* src and dst must be different files */ + ret = -EINVAL; + if (src_inode == dst_inode) + goto out_fput; + + /* src file must be opened for reading */ + if (!(src_file.file->f_mode & FMODE_READ)) + goto out_fput; + + /* src and dst must be regular files */ + ret = -EISDIR; + if (!S_ISREG(src_inode->i_mode) || !S_ISREG(dst_inode->i_mode)) + goto out_fput; + + ret = -EXDEV; + if (src_file.file->f_path.mnt != dst_file->f_path.mnt || + src_inode->i_sb != dst_inode->i_sb) + goto out_fput; + + /* XXX: do we lock at all? what if server needs CB_RECALL_LAYOUT? */ + if (dst_inode < src_inode) { + mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_PARENT); + mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_CHILD); + } else { + mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_PARENT); + mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_CHILD); + } + + /* flush all pending writes on both src and dst so that server + * has the latest data */ + ret = nfs_sync_inode(src_inode); + if (ret) + goto out_unlock; + ret = nfs_sync_inode(dst_inode); + if (ret) + goto out_unlock; + + ret = nfs42_proc_clone(src_file.file, dst_file, src_off, dst_off, count); + + /* truncate inode page cache of the dst range so that future reads can fetch + * new data from server */ + if (!ret) + truncate_inode_pages_range(&dst_inode->i_data, dst_off, dst_off + count - 1); + +out_unlock: + if (dst_inode < src_inode) { + mutex_unlock(&src_inode->i_mutex); + mutex_unlock(&dst_inode->i_mutex); + } else { + mutex_unlock(&dst_inode->i_mutex); + mutex_unlock(&src_inode->i_mutex); + } +out_fput: + fdput(src_file); +out_drop_write: + mnt_drop_write_file(dst_file); + return ret; +} #endif /* CONFIG_NFS_V4_2 */ +long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + switch (cmd) { +#ifdef CONFIG_NFS_V4_2 + case NFS_IOC_CLONE: + return nfs42_ioctl_clone(file, arg, 0, 0, 0); +#endif + } + + return -ENOTTY; +} + const struct file_operations nfs4_file_operations = { #ifdef CONFIG_NFS_V4_2 .llseek = nfs4_file_llseek, @@ -216,4 +313,9 @@ const struct file_operations nfs4_file_operations = { #endif /* CONFIG_NFS_V4_2 */ .check_flags = nfs_check_flags, .setlease = simple_nosetlease, +#ifdef CONFIG_COMPAT + .unlocked_ioctl = nfs4_ioctl, +#else + .compat_ioctl = nfs4_ioctl, +#endif /* CONFIG_COMPAT */ }; diff --git a/include/uapi/linux/nfs.h b/include/uapi/linux/nfs.h index 5199a36..d85748d 100644 --- a/include/uapi/linux/nfs.h +++ b/include/uapi/linux/nfs.h @@ -31,6 +31,10 @@ #define NFS_PIPE_DIRNAME "nfs" +/* NFS ioctls */ +/* Let's follow btrfs lead on CLONE to avoid messing userspace */ +#define NFS_IOC_CLONE _IOW(0x94, 9, int) + /* * NFS stats. The good thing with these values is that NFSv3 errors are * a superset of NFSv2 errors (with the exception of NFSERR_WFLUSH which -- cgit v0.10.2 From 2a92ee92d4545448066fb664674c0ae5a9d5ea99 Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:37 +0800 Subject: nfs: get clone_blksize when probing fsinfo NFSv42 CLONE operation is supposed to respect it. Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/client.c b/fs/nfs/client.c index 57c5a02..d6d5d2a 100644 --- a/fs/nfs/client.c +++ b/fs/nfs/client.c @@ -764,6 +764,7 @@ static void nfs_server_set_fsinfo(struct nfs_server *server, server->time_delta = fsinfo->time_delta; + server->clone_blksize = fsinfo->clone_blksize; /* We're airborne Set socket buffersize */ rpc_setbufsize(server->client, server->wsize + 100, server->rsize + 100); } diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 9688b1a..8814fbe 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -239,6 +239,7 @@ const u32 nfs4_fsinfo_bitmap[3] = { FATTR4_WORD0_MAXFILESIZE FATTR4_WORD1_TIME_DELTA | FATTR4_WORD1_FS_LAYOUT_TYPES, FATTR4_WORD2_LAYOUT_BLKSIZE + | FATTR4_WORD2_CLONE_BLKSIZE }; const u32 nfs4_fs_locations_bitmap[3] = { diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c index 868472b..9f65679 100644 --- a/fs/nfs/nfs4xdr.c +++ b/fs/nfs/nfs4xdr.c @@ -4764,6 +4764,28 @@ static int decode_attr_layout_blksize(struct xdr_stream *xdr, uint32_t *bitmap, return 0; } +/* + * The granularity of a CLONE operation. + */ +static int decode_attr_clone_blksize(struct xdr_stream *xdr, uint32_t *bitmap, + uint32_t *res) +{ + __be32 *p; + + dprintk("%s: bitmap is %x\n", __func__, bitmap[2]); + *res = 0; + if (bitmap[2] & FATTR4_WORD2_CLONE_BLKSIZE) { + p = xdr_inline_decode(xdr, 4); + if (unlikely(!p)) { + print_overflow_msg(__func__, xdr); + return -EIO; + } + *res = be32_to_cpup(p); + bitmap[2] &= ~FATTR4_WORD2_CLONE_BLKSIZE; + } + return 0; +} + static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo) { unsigned int savep; @@ -4798,6 +4820,9 @@ static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo) status = decode_attr_layout_blksize(xdr, bitmap, &fsinfo->blksize); if (status) goto xdr_error; + status = decode_attr_clone_blksize(xdr, bitmap, &fsinfo->clone_blksize); + if (status) + goto xdr_error; status = verify_attr_len(xdr, savep, attrlen); xdr_error: diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h index c0c695b..e7e7853 100644 --- a/include/linux/nfs4.h +++ b/include/linux/nfs4.h @@ -422,6 +422,7 @@ enum lock_type4 { #define FATTR4_WORD2_LAYOUT_TYPES (1UL << 0) #define FATTR4_WORD2_LAYOUT_BLKSIZE (1UL << 1) #define FATTR4_WORD2_MDSTHRESHOLD (1UL << 4) +#define FATTR4_WORD2_CLONE_BLKSIZE (1UL << 13) #define FATTR4_WORD2_SECURITY_LABEL (1UL << 16) /* MDS threshold bitmap bits */ diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h index a50de10..2469ab0 100644 --- a/include/linux/nfs_fs_sb.h +++ b/include/linux/nfs_fs_sb.h @@ -147,6 +147,7 @@ struct nfs_server { unsigned int acdirmax; unsigned int namelen; unsigned int options; /* extra options enabled by mount */ + unsigned int clone_blksize; /* granularity of a CLONE operation */ #define NFS_OPTION_FSCACHE 0x00000001 /* - local caching enabled */ #define NFS_OPTION_MIGRATION 0x00000002 /* - NFSv4 migration enabled */ diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h index ac678b7..92ff445 100644 --- a/include/linux/nfs_xdr.h +++ b/include/linux/nfs_xdr.h @@ -141,6 +141,7 @@ struct nfs_fsinfo { __u32 lease_time; /* in seconds */ __u32 layouttype; /* supported pnfs layout driver */ __u32 blksize; /* preferred pnfs io block size */ + __u32 clone_blksize; /* granularity of a CLONE operation */ }; struct nfs_fsstat { -- cgit v0.10.2 From 811b7b85d6641df580a6c43184cf13d6fcc7498d Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:38 +0800 Subject: nfs42: respect clone_blksize draft-ietf-nfsv4-minorversion2-38.txt says: Both cl_src_offset and cl_dst_offset must be aligned to the clone block size Section 12.2.1. The number of bytes to be cloned must be a multiple of the clone block size, except in the case in which cl_src_offset plus the number of bytes to be cloned is equal to the source file size. Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index fc68ba5..4f463dd 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -199,8 +199,10 @@ nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd, u64 src_off, u64 dst_off, u64 count) { struct inode *dst_inode = file_inode(dst_file); + struct nfs_server *server = NFS_SERVER(dst_inode); struct fd src_file; struct inode *src_inode; + unsigned int bs = server->clone_blksize; int ret; /* dst file must be opened for writing */ @@ -238,6 +240,15 @@ nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd, src_inode->i_sb != dst_inode->i_sb) goto out_fput; + /* check alignment w.r.t. clone_blksize */ + ret = -EINVAL; + if (bs) { + if (!IS_ALIGNED(src_off, bs) || !IS_ALIGNED(dst_off, bs)) + goto out_fput; + if (!IS_ALIGNED(count, bs) && i_size_read(src_inode) != (src_off + count)) + goto out_fput; + } + /* XXX: do we lock at all? what if server needs CB_RECALL_LAYOUT? */ if (dst_inode < src_inode) { mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_PARENT); -- cgit v0.10.2 From a340abcf4173461f688292a6879b4d5bc781c2b1 Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Sat, 26 Sep 2015 02:24:39 +0800 Subject: nfs42: add NFS_IOC_CLONE_RANGE ioctl It follows btrfs BTRFS_IOC_CLONE_RANGE lead on ioctl number and arguments. Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index 4f463dd..4384a1d 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -288,14 +288,28 @@ out_drop_write: mnt_drop_write_file(dst_file); return ret; } + +static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp) +{ + struct nfs_ioctl_clone_range_args args; + + if (copy_from_user(&args, argp, sizeof(args))) + return -EFAULT; + + return nfs42_ioctl_clone(dst_file, args.src_fd, args.src_off, args.dst_off, args.count); +} #endif /* CONFIG_NFS_V4_2 */ long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { + void __user *argp = (void __user *)arg; + switch (cmd) { #ifdef CONFIG_NFS_V4_2 case NFS_IOC_CLONE: return nfs42_ioctl_clone(file, arg, 0, 0, 0); + case NFS_IOC_CLONE_RANGE: + return nfs42_ioctl_clone_range(file, argp); #endif } diff --git a/include/uapi/linux/nfs.h b/include/uapi/linux/nfs.h index d85748d..c6b86cc 100644 --- a/include/uapi/linux/nfs.h +++ b/include/uapi/linux/nfs.h @@ -33,7 +33,14 @@ /* NFS ioctls */ /* Let's follow btrfs lead on CLONE to avoid messing userspace */ -#define NFS_IOC_CLONE _IOW(0x94, 9, int) +#define NFS_IOC_CLONE _IOW(0x94, 9, int) +#define NFS_IOC_CLONE_RANGE _IOW(0x94, 13, int) + +struct nfs_ioctl_clone_range_args { + __s64 src_fd; + __u64 src_off, count; + __u64 dst_off; +}; /* * NFS stats. The good thing with these values is that NFSv3 errors are -- cgit v0.10.2 From 275058a2188903786e42b380ea5889ef0a7cdf95 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 15 Oct 2015 16:20:11 -0400 Subject: NFS: Fix an 'unused variable' complaint when #ifndef CONFIG_NFS_V4_2 Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index 4384a1d..4aa5719 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -298,6 +298,17 @@ static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp) return nfs42_ioctl_clone(dst_file, args.src_fd, args.src_off, args.dst_off, args.count); } +#else +static long nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd, + u64 src_off, u64 dst_off, u64 count) +{ + return -ENOTTY; +} + +static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp) +{ + return -ENOTTY; +} #endif /* CONFIG_NFS_V4_2 */ long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg) @@ -305,12 +316,10 @@ long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg) void __user *argp = (void __user *)arg; switch (cmd) { -#ifdef CONFIG_NFS_V4_2 case NFS_IOC_CLONE: return nfs42_ioctl_clone(file, arg, 0, 0, 0); case NFS_IOC_CLONE_RANGE: return nfs42_ioctl_clone_range(file, argp); -#endif } return -ENOTTY; -- cgit v0.10.2 From b9788a446828703cf126dfb9d3843d240af44122 Mon Sep 17 00:00:00 2001 From: Peng Tao Date: Wed, 21 Oct 2015 03:18:16 +0800 Subject: nfs: add missing linux/types.h After merging the nfs tree, today's linux-next build (powerpc allyesconfig produced this warning: ./usr/include/linux/nfs.h:40: found __[us]{8,16,32,64} type without #include Reported-by: Stephen Rothwell Signed-off-by: Peng Tao Signed-off-by: Trond Myklebust diff --git a/include/uapi/linux/nfs.h b/include/uapi/linux/nfs.h index c6b86cc..654bae3 100644 --- a/include/uapi/linux/nfs.h +++ b/include/uapi/linux/nfs.h @@ -7,6 +7,8 @@ #ifndef _UAPI_LINUX_NFS_H #define _UAPI_LINUX_NFS_H +#include + #define NFS_PROGRAM 100003 #define NFS_PORT 2049 #define NFS_MAXDATA 8192 -- cgit v0.10.2 From c646619355d196293daffdbb4fd877c8f5048e49 Mon Sep 17 00:00:00 2001 From: Li RongQing Date: Thu, 24 Sep 2015 10:19:09 +0800 Subject: nfsroot: make nfsroot to accept the 1024 bytes long directory name although NFS_MAXPATHLEN is defined to 1024, nfs client hopes to accept a 1024 byte path, but nfs_root_parms is limited to 256, and the nfs path will truncated when a user inputs nfs path from kernel cmdline enlarge nfs_root_parms to 1024, to make it accept the 1024 bytes long directory name, since nfs_root_parms is defined as _initdata, it will be released after system bootup Signed-off-by: Li RongQing Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfsroot.c b/fs/nfs/nfsroot.c index 9bc9f04..89a15db 100644 --- a/fs/nfs/nfsroot.c +++ b/fs/nfs/nfsroot.c @@ -90,7 +90,7 @@ #define NFS_DEF_OPTIONS "vers=2,udp,rsize=4096,wsize=4096" /* Parameters passed from the kernel command line */ -static char nfs_root_parms[256] __initdata = ""; +static char nfs_root_parms[NFS_MAXPATHLEN + 1] __initdata = ""; /* Text-based mount options passed to super.c */ static char nfs_root_options[256] __initdata = NFS_DEF_OPTIONS; -- cgit v0.10.2 From d4e2ce096101bad9ca5bad3c27488905f91e30cb Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:55:21 +0800 Subject: NFS: Get rid of the unneeded addr stored in callback arguments Commit c36fca52f5 "NFS refactor nfs_find_client and reference client across callback processing" has store clp in cb_process_state which is set in cb_sequence. So that, it's unneeded to store address pointer in any callback arguments. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h index 84326e9..0a590f0 100644 --- a/fs/nfs/callback.h +++ b/fs/nfs/callback.h @@ -61,7 +61,6 @@ struct cb_compound_hdr_res { }; struct cb_getattrargs { - struct sockaddr *addr; struct nfs_fh fh; uint32_t bitmap[2]; }; @@ -76,7 +75,6 @@ struct cb_getattrres { }; struct cb_recallargs { - struct sockaddr *addr; struct nfs_fh fh; nfs4_stateid stateid; uint32_t truncate; @@ -134,7 +132,6 @@ extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation, #define RCA4_TYPE_MASK_ALL 0xf31f struct cb_recallanyargs { - struct sockaddr *craa_addr; uint32_t craa_objs_to_keep; uint32_t craa_type_mask; }; @@ -144,7 +141,6 @@ extern __be32 nfs4_callback_recallany(struct cb_recallanyargs *args, struct cb_process_state *cps); struct cb_recallslotargs { - struct sockaddr *crsa_addr; uint32_t crsa_target_highest_slotid; }; extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args, @@ -152,7 +148,6 @@ extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args, struct cb_process_state *cps); struct cb_layoutrecallargs { - struct sockaddr *cbl_addr; uint32_t cbl_recall_type; uint32_t cbl_layout_type; uint32_t cbl_layoutchanged; diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 6b1697a..e87d7f0 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -198,7 +198,6 @@ static __be32 decode_getattr_args(struct svc_rqst *rqstp, struct xdr_stream *xdr status = decode_fh(xdr, &args->fh); if (unlikely(status != 0)) goto out; - args->addr = svc_addr(rqstp); status = decode_bitmap(xdr, args->bitmap); out: dprintk("%s: exit with status = %d\n", __func__, ntohl(status)); @@ -210,7 +209,6 @@ static __be32 decode_recall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr, __be32 *p; __be32 status; - args->addr = svc_addr(rqstp); status = decode_stateid(xdr, &args->stateid); if (unlikely(status != 0)) goto out; @@ -236,7 +234,6 @@ static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp, __be32 status = 0; uint32_t iomode; - args->cbl_addr = svc_addr(rqstp); p = read_buf(xdr, 4 * sizeof(uint32_t)); if (unlikely(p == NULL)) { status = htonl(NFS4ERR_BADXDR); @@ -500,7 +497,6 @@ static __be32 decode_recallany_args(struct svc_rqst *rqstp, uint32_t bitmap[2]; __be32 *p, status; - args->craa_addr = svc_addr(rqstp); p = read_buf(xdr, 4); if (unlikely(p == NULL)) return htonl(NFS4ERR_BADXDR); @@ -519,7 +515,6 @@ static __be32 decode_recallslot_args(struct svc_rqst *rqstp, { __be32 *p; - args->crsa_addr = svc_addr(rqstp); p = read_buf(xdr, 4); if (unlikely(p == NULL)) return htonl(NFS4ERR_BADXDR); -- cgit v0.10.2 From 8c163d8e5ad2bd7982904bbe568706e1b0bbf60a Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:55:59 +0800 Subject: NFS: Remove the left global variable nfs_callback_tcpport Commit bbe0a3aa4e22 "NFS: make nfs_callback_tcpport per network context" has make nfs_callback_tcpport per network, but left the global nfs_callback_tcpport, remove it. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h index 0a590f0..cbcc903 100644 --- a/fs/nfs/callback.h +++ b/fs/nfs/callback.h @@ -204,6 +204,5 @@ extern int nfs4_set_callback_sessionid(struct nfs_client *clp); #define NFS41_BC_MAX_CALLBACKS 1 extern unsigned int nfs_callback_set_tcpport; -extern unsigned short nfs_callback_tcpport; #endif /* __LINUX_FS_NFS_CALLBACK_H */ diff --git a/fs/nfs/super.c b/fs/nfs/super.c index 383a027..f126828 100644 --- a/fs/nfs/super.c +++ b/fs/nfs/super.c @@ -2816,7 +2816,6 @@ out_invalid_transport_udp: * NFS client for backwards compatibility */ unsigned int nfs_callback_set_tcpport; -unsigned short nfs_callback_tcpport; /* Default cache timeout is 10 minutes */ unsigned int nfs_idmap_cache_timeout = 600; /* Turn off NFSv4 uid/gid mapping when using AUTH_SYS */ @@ -2827,7 +2826,6 @@ char nfs4_client_id_uniquifier[NFS4_CLIENT_ID_UNIQ_LEN] = ""; bool recover_lost_locks = false; EXPORT_SYMBOL_GPL(nfs_callback_set_tcpport); -EXPORT_SYMBOL_GPL(nfs_callback_tcpport); EXPORT_SYMBOL_GPL(nfs_idmap_cache_timeout); EXPORT_SYMBOL_GPL(nfs4_disable_idmapping); EXPORT_SYMBOL_GPL(max_session_slots); -- cgit v0.10.2 From f765bf762bb57d8bda825e5440a4801232cd835f Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:56:29 +0800 Subject: NFS: Remove the left function defines in callback.h Commit 778be232a207 "NFS do not find client in NFSv4 pg_authenticate" has remove the define and using of nfs4_set_callback_sessionid(), and commit 36281caa839f "NFSv4: Further clean-ups of delegation stateid validation" has update the checking of stateid, and move the code to nfs4proc.c. This patch remove those function defines left in callback.h Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h index cbcc903..ff8195b 100644 --- a/fs/nfs/callback.h +++ b/fs/nfs/callback.h @@ -117,9 +117,6 @@ extern __be32 nfs4_callback_sequence(struct cb_sequenceargs *args, struct cb_sequenceres *res, struct cb_process_state *cps); -extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation, - const nfs4_stateid *stateid); - #define RCA4_TYPE_MASK_RDATA_DLG 0 #define RCA4_TYPE_MASK_WDATA_DLG 1 #define RCA4_TYPE_MASK_DIR_DLG 2 @@ -191,9 +188,6 @@ extern __be32 nfs4_callback_recall(struct cb_recallargs *args, void *dummy, #if IS_ENABLED(CONFIG_NFS_V4) extern int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt); extern void nfs_callback_down(int minorversion, struct net *net); -extern int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation, - const nfs4_stateid *stateid); -extern int nfs4_set_callback_sessionid(struct nfs_client *clp); #endif /* CONFIG_NFS_V4 */ /* * nfs41: Callbacks are expected to not cause substantial latency, -- cgit v0.10.2 From 39de493e8801812cae076a02d84b4f80b88f94b9 Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:57:03 +0800 Subject: NFS: Remove unneeded NFS_DEBUG checking before define NFSDBG_FACILITY It's not needed to checking NFS_DEBUG before define NFSDBG_FACILITY, remove it. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c index b85cf7a..807eb6e 100644 --- a/fs/nfs/callback_proc.c +++ b/fs/nfs/callback_proc.c @@ -17,9 +17,7 @@ #include "nfs4session.h" #include "nfs4trace.h" -#ifdef NFS_DEBUG #define NFSDBG_FACILITY NFSDBG_CALLBACK -#endif __be32 nfs4_callback_getattr(struct cb_getattrargs *args, struct cb_getattrres *res, diff --git a/fs/nfs/mount_clnt.c b/fs/nfs/mount_clnt.c index 99a4528..09b1900 100644 --- a/fs/nfs/mount_clnt.c +++ b/fs/nfs/mount_clnt.c @@ -16,9 +16,7 @@ #include #include "internal.h" -#ifdef NFS_DEBUG -# define NFSDBG_FACILITY NFSDBG_MOUNT -#endif +#define NFSDBG_FACILITY NFSDBG_MOUNT /* * Defined by RFC 1094, section A.3; and RFC 1813, section 5.1.4 -- cgit v0.10.2 From 590184a6cea32c1f864a8e614f44e35a81a53fe0 Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:57:37 +0800 Subject: NFS: Use NFS4_MAX_SESSIONID_LEN directly for decode/encode sessionid It's no need to define a temporary variables for NFS4_MAX_SESSIONID_LEN. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index e87d7f0..09aeb9a 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -380,13 +380,12 @@ static __be32 decode_sessionid(struct xdr_stream *xdr, struct nfs4_sessionid *sid) { __be32 *p; - int len = NFS4_MAX_SESSIONID_LEN; - p = read_buf(xdr, len); + p = read_buf(xdr, NFS4_MAX_SESSIONID_LEN); if (unlikely(p == NULL)) return htonl(NFS4ERR_RESOURCE); - memcpy(sid->data, p, len); + memcpy(sid->data, p, NFS4_MAX_SESSIONID_LEN); return 0; } @@ -679,13 +678,12 @@ static __be32 encode_sessionid(struct xdr_stream *xdr, const struct nfs4_sessionid *sid) { __be32 *p; - int len = NFS4_MAX_SESSIONID_LEN; - p = xdr_reserve_space(xdr, len); + p = xdr_reserve_space(xdr, NFS4_MAX_SESSIONID_LEN); if (unlikely(p == NULL)) return htonl(NFS4ERR_RESOURCE); - memcpy(p, sid, len); + memcpy(p, sid, NFS4_MAX_SESSIONID_LEN); return 0; } -- cgit v0.10.2 From 45724e8a5b2a69b9524fd16ff73345fab9aae279 Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:57:58 +0800 Subject: NFS: Fix bad defines of callback response maxsize As CB_OP_TAGLEN_MAXSZ, all XXX_MAXSZ should be defined as bit. Each operation should not cantains CB_OP_TAGLEN_MAXSZ. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 09aeb9a..9f0f0f6 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -18,19 +18,21 @@ #include "internal.h" #include "nfs4session.h" -#define CB_OP_TAGLEN_MAXSZ (512) -#define CB_OP_HDR_RES_MAXSZ (2 + CB_OP_TAGLEN_MAXSZ) -#define CB_OP_GETATTR_BITMAP_MAXSZ (4) -#define CB_OP_GETATTR_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \ - CB_OP_GETATTR_BITMAP_MAXSZ + \ - 2 + 2 + 3 + 3) -#define CB_OP_RECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) +#define CB_OP_TAGLEN_MAXSZ (512) +#define CB_OP_HDR_RES_MAXSZ (2 * 4) // opcode, status +#define CB_OP_GETATTR_BITMAP_MAXSZ (4 * 4) // bitmap length, 3 bitmaps +#define CB_OP_GETATTR_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \ + CB_OP_GETATTR_BITMAP_MAXSZ + \ + /* change, size, ctime, mtime */\ + (2 + 2 + 3 + 3) * 4) +#define CB_OP_RECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) #if defined(CONFIG_NFS_V4_1) #define CB_OP_LAYOUTRECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) #define CB_OP_DEVICENOTIFY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) #define CB_OP_SEQUENCE_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \ - 4 + 1 + 3) + NFS4_MAX_SESSIONID_LEN + \ + (1 + 3) * 4) // seqid, 3 slotids #define CB_OP_RECALLANY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) #define CB_OP_RECALLSLOT_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) #endif /* CONFIG_NFS_V4_1 */ -- cgit v0.10.2 From 403889c0399c01a12877e3736ae1e96c9ded27be Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:58:16 +0800 Subject: NFS: Fix bad checking of max taglen in callback request The taglen should be checked with CB_OP_TAGLEN_MAXSZ directly. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 9f0f0f6..4ad39fe 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -159,7 +159,7 @@ static __be32 decode_compound_hdr_arg(struct xdr_stream *xdr, struct cb_compound if (unlikely(status != 0)) return status; /* We do not like overly long tags! */ - if (hdr->taglen > CB_OP_TAGLEN_MAXSZ - 12) { + if (hdr->taglen > CB_OP_TAGLEN_MAXSZ) { printk("NFS: NFSv4 CALLBACK %s: client sent tag of length %u\n", __func__, hdr->taglen); return htonl(NFS4ERR_RESOURCE); -- cgit v0.10.2 From e0a63c0bfcc6f8283377258b5d892f88abd913c1 Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Thu, 24 Sep 2015 20:58:38 +0800 Subject: NFS: Return directly if encode_sessionid fail encode_sessionid() may return error, nfs needs process the return value. Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 4ad39fe..646cdac 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -699,7 +699,9 @@ static __be32 encode_cb_sequence_res(struct svc_rqst *rqstp, if (unlikely(status != 0)) goto out; - encode_sessionid(xdr, &res->csr_sessionid); + status = encode_sessionid(xdr, &res->csr_sessionid); + if (status) + goto out; p = xdr_reserve_space(xdr, 4 * sizeof(uint32_t)); if (unlikely(p == NULL)) -- cgit v0.10.2 From 15ae2c7bdc9a5a46999319b88a465a64d265dc40 Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Fri, 16 Oct 2015 17:22:50 +0800 Subject: nfs/blocklayout: Fix bad using of page offset in bl_read_pagelist Blocklayout uses file offset for the read-back page's offset of first writing, it's definitely wrong, it writes data to bad address of page that cause userspace application segment fault. It must be the page base stored in header->args.pgbase. Also, the pg_offset has no influence with isect and extent length. Note: The offset of the non-first page is always zero. Ps: A test program will segment fault at read() as, #define _GNU_SOURCE #include #include #include #include #include #include #include int main(int argc, char **argv) { char buf[2049]; char *filename = NULL; int fd = -1; if (argc < 2) { printf("Usage: %s filename\n", argv[0]); return 0; } filename = argv[1]; fd = open(filename, O_RDONLY | O_DIRECT); if (fd < 0) { printf("Open %s fail: %m\n", filename); return 1; } lseek(fd, 2048, SEEK_SET); if (read(fd, buf, sizeof(buf) - 1) != (sizeof(buf) - 1)) printf("Read 4096 bityes data from %s fail: %m\n", filename); out: close(fd); return 0; } Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c index 9cd4eb3..ddd0138 100644 --- a/fs/nfs/blocklayout/blocklayout.c +++ b/fs/nfs/blocklayout/blocklayout.c @@ -229,7 +229,7 @@ bl_read_pagelist(struct nfs_pgio_header *header) struct parallel_io *par; loff_t f_offset = header->args.offset; size_t bytes_left = header->args.count; - unsigned int pg_offset, pg_len; + unsigned int pg_offset = header->args.pgbase, pg_len; struct page **pages = header->args.pages; int pg_index = header->args.pgbase >> PAGE_CACHE_SHIFT; const bool is_dio = (header->dreq != NULL); @@ -262,7 +262,6 @@ bl_read_pagelist(struct nfs_pgio_header *header) extent_length = be.be_length - (isect - be.be_f_offset); } - pg_offset = f_offset & ~PAGE_CACHE_MASK; if (is_dio) { if (pg_offset + bytes_left > PAGE_CACHE_SIZE) pg_len = PAGE_CACHE_SIZE - pg_offset; @@ -273,9 +272,6 @@ bl_read_pagelist(struct nfs_pgio_header *header) pg_len = PAGE_CACHE_SIZE; } - isect += (pg_offset >> SECTOR_SHIFT); - extent_length -= (pg_offset >> SECTOR_SHIFT); - if (is_hole(&be)) { bio = bl_submit_bio(READ, bio); /* Fill hole w/ zeroes w/o accessing device */ @@ -301,6 +297,7 @@ bl_read_pagelist(struct nfs_pgio_header *header) extent_length -= (pg_len >> SECTOR_SHIFT); f_offset += pg_len; bytes_left -= pg_len; + pg_offset = 0; } if ((isect << SECTOR_SHIFT) >= header->inode->i_size) { header->res.eof = 1; -- cgit v0.10.2 From f8417b481cce2bed4744fda733f2ff22278bd7ce Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Fri, 16 Oct 2015 17:23:29 +0800 Subject: NFSv4.1/pnfs: Retry through MDS when getting bad length of data If non rpc-based layout driver return bad length of data, nfs retries by calling rpc_restart_call_prepare() that cause an NULL reference panic. This patch lets nfs retry through MDS for non rpc-based layout driver return bad length of data. [13034.883329] BUG: unable to handle kernel NULL pointer dereference at (null) [13034.884902] IP: [] rpc_restart_call_prepare+0x62/0x90 [sunrpc] [13034.886558] PGD 0 [13034.888126] Oops: 0000 [#1] KASAN [13034.889710] Modules linked in: blocklayoutdriver(OE) nfsv4(OE) nfs(OE) fscache(E) nfsd(OE) xfs libcrc32c coretemp btrfs crct10dif_pclmul crc32_pclmul crc32c_intel ghash_clmulni_intel ppdev vmw_balloon auth_rpcgss shpchp nfs_acl lockd vmw_vmci parport_pc xor raid6_pq grace parport sunrpc i2c_piix4 vmwgfx drm_kms_helper ttm drm mptspi e1000 serio_raw scsi_transport_spi mptscsih mptbase ata_generic pata_acpi [last unloaded: fscache] [13034.898260] CPU: 0 PID: 10112 Comm: kworker/0:1 Tainted: G OE 4.3.0-rc5+ #279 [13034.899932] Hardware name: VMware, Inc. VMware Virtual Platform/440BX Desktop Reference Platform, BIOS 6.00 07/02/2015 [13034.903342] Workqueue: events bl_read_cleanup [blocklayoutdriver] [13034.905059] task: ffff88006a9148c0 ti: ffff880035e90000 task.ti: ffff880035e90000 [13034.906827] RIP: 0010:[] [] rpc_restart_call_prepare+0x62/0x90 [sunrpc] [13034.910522] RSP: 0018:ffff880035e97b58 EFLAGS: 00010282 [13034.912378] RAX: fffffbfff04a5a94 RBX: ffff880068fe4858 RCX: 0000000000000003 [13034.914339] RDX: dffffc0000000000 RSI: 0000000000000003 RDI: 0000000000000282 [13034.916236] RBP: ffff880035e97b68 R08: 0000000000000001 R09: 0000000000000001 [13034.918229] R10: 0000000000000000 R11: 0000000000000001 R12: 0000000000000000 [13034.920007] R13: ffff880068fe4858 R14: ffff880068fe4a60 R15: 0000000000001000 [13034.921845] FS: 0000000000000000(0000) GS:ffffffff82247000(0000) knlGS:0000000000000000 [13034.923645] CS: 0010 DS: 0000 ES: 0000 CR0: 0000000080050033 [13034.925525] CR2: 0000000000000000 CR3: 00000000063dd000 CR4: 00000000001406f0 [13034.932808] Stack: [13034.934813] ffff880068fe4780 0000000000001000 ffff880035e97ba8 ffffffffa08800d2 [13034.936675] ffffffffa088029d ffff880068fe4780 ffff880068fe4858 ffffffffa089c0a0 [13034.938593] ffff880068fe47e0 ffff88005d59faf0 ffff880035e97be0 ffffffffa087e08f [13034.940454] Call Trace: [13034.942388] [] nfs_readpage_result+0x112/0x200 [nfs] [13034.944317] [] ? nfs_readpage_done+0xdd/0x160 [nfs] [13034.946267] [] nfs_pgio_result+0x9f/0x120 [nfs] [13034.948166] [] pnfs_ld_read_done+0x7c/0x1e0 [nfsv4] [13034.950247] [] bl_read_cleanup+0x2e/0x60 [blocklayoutdriver] [13034.952156] [] process_one_work+0x412/0x870 [13034.954102] [] ? process_one_work+0x334/0x870 [13034.955949] [] ? queue_delayed_work_on+0x40/0x40 [13034.957985] [] worker_thread+0x81/0x6a0 [13034.959817] [] ? process_one_work+0x870/0x870 [13034.961785] [] kthread+0x17d/0x1a0 [13034.963544] [] ? kthread_create_on_node+0x330/0x330 [13034.965479] [] ? finish_task_switch+0x88/0x220 [13034.967223] [] ? kthread_create_on_node+0x330/0x330 [13034.968929] [] ret_from_fork+0x3f/0x70 [13034.970534] [] ? kthread_create_on_node+0x330/0x330 [13034.972176] Code: c7 43 50 40 84 0d a0 e8 3d fe 1c e1 48 8d 7b 58 c7 83 e4 00 00 00 00 00 00 00 e8 ca fe 1c e1 4c 8b 63 58 4c 89 e7 e8 be fe 1c e1 <49> 83 3c 24 00 74 12 48 c7 43 50 f0 a2 0e a0 b8 01 00 00 00 5b [13034.977148] RIP [] rpc_restart_call_prepare+0x62/0x90 [sunrpc] [13034.978780] RSP [13034.980399] CR2: 0000000000000000 Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c index 8abe271..93496c0 100644 --- a/fs/nfs/pnfs.c +++ b/fs/nfs/pnfs.c @@ -1912,12 +1912,13 @@ static void pnfs_ld_handle_write_error(struct nfs_pgio_header *hdr) */ void pnfs_ld_write_done(struct nfs_pgio_header *hdr) { - trace_nfs4_pnfs_write(hdr, hdr->pnfs_error); - if (!hdr->pnfs_error) { + if (likely(!hdr->pnfs_error)) { pnfs_set_layoutcommit(hdr->inode, hdr->lseg, hdr->mds_offset + hdr->res.count); hdr->mds_ops->rpc_call_done(&hdr->task, hdr); - } else + } + trace_nfs4_pnfs_write(hdr, hdr->pnfs_error); + if (unlikely(hdr->pnfs_error)) pnfs_ld_handle_write_error(hdr); hdr->mds_ops->rpc_release(hdr); } @@ -2028,11 +2029,12 @@ static void pnfs_ld_handle_read_error(struct nfs_pgio_header *hdr) */ void pnfs_ld_read_done(struct nfs_pgio_header *hdr) { - trace_nfs4_pnfs_read(hdr, hdr->pnfs_error); if (likely(!hdr->pnfs_error)) { __nfs4_read_done_cb(hdr); hdr->mds_ops->rpc_call_done(&hdr->task, hdr); - } else + } + trace_nfs4_pnfs_read(hdr, hdr->pnfs_error); + if (unlikely(hdr->pnfs_error)) pnfs_ld_handle_read_error(hdr); hdr->mds_ops->rpc_release(hdr); } diff --git a/fs/nfs/read.c b/fs/nfs/read.c index 01b8cc8..0a5e33f 100644 --- a/fs/nfs/read.c +++ b/fs/nfs/read.c @@ -246,6 +246,13 @@ static void nfs_readpage_retry(struct rpc_task *task, nfs_set_pgio_error(hdr, -EIO, argp->offset); return; } + + /* For non rpc-based layout drivers, retry-through-MDS */ + if (!task->tk_ops) { + hdr->pnfs_error = -EAGAIN; + return; + } + /* Yes, so retry the read at the end of the hdr */ hdr->mds_offset += resp->count; argp->offset += resp->count; @@ -268,7 +275,7 @@ static void nfs_readpage_result(struct rpc_task *task, hdr->good_bytes = bound - hdr->io_start; } spin_unlock(&hdr->lock); - } else if (hdr->res.count != hdr->args.count) + } else if (hdr->res.count < hdr->args.count) nfs_readpage_retry(task, hdr); } diff --git a/fs/nfs/write.c b/fs/nfs/write.c index 75ab762..7b93164 100644 --- a/fs/nfs/write.c +++ b/fs/nfs/write.c @@ -1505,6 +1505,13 @@ static void nfs_writeback_result(struct rpc_task *task, task->tk_status = -EIO; return; } + + /* For non rpc-based layout drivers, retry-through-MDS */ + if (!task->tk_ops) { + hdr->pnfs_error = -EAGAIN; + return; + } + /* Was this an NFSv2 write or an NFSv3 stable write? */ if (resp->verf->committed != NFS_UNSTABLE) { /* Resend from where the server left off */ -- cgit v0.10.2 From 8610586d820aa75dc7da470d77fc23492c2492f8 Mon Sep 17 00:00:00 2001 From: Steve Wise Date: Mon, 21 Sep 2015 12:24:34 -0500 Subject: xprtrdma: don't log warnings for flushed completions Unsignaled send WRs can get flushed as part of normal unmount, so don't log them as warnings. Signed-off-by: Steve Wise Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 5318951..0a36239 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -252,8 +252,11 @@ frwr_sendcompletion(struct ib_wc *wc) /* WARNING: Only wr_id and status are reliable at this point */ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + if (wc->status == IB_WC_WR_FLUSH_ERR) + dprintk("RPC: %s: frmr %p flushed\n", __func__, r); + else + pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", + __func__, r, ib_wc_status_msg(wc->status), wc->status); r->r.frmr.fr_state = FRMR_IS_STALE; } -- cgit v0.10.2 From a045178887ebafa9514d6b4cb840ac13a26c8365 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:29 -0400 Subject: xprtrdma: Enable swap-on-NFS/RDMA After adding a swapfile on an NFS/RDMA mount and removing the normal swap partition, I was able to push the NFS client well into swap without any issue. I forgot to swapoff the NFS file before rebooting. This pinned the NFS mount and the IB core and provider, causing shutdown to hang. I think this is expected and safe behavior. Probably shutdown scripts should "swapoff -a" before unmounting any filesystems. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 41e452b..e9e5ed7 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) static int xprt_rdma_enable_swap(struct rpc_xprt *xprt) { - return -EINVAL; + return 0; } static void -- cgit v0.10.2 From 7b3d770c67bc07db5035999e4f864c5f2ff7b10e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:37 -0400 Subject: xprtrdma: Re-arm after missed events ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns a positive value if WCs were added to a CQ after the last completion upcall but before the CQ has been re-armed. Commit 7f23f6f6e388 ("xprtrmda: Reduce lock contention in completion handlers") assumed that when ib_req_notify_cq() returned a positive RC, the CQ had also been successfully re-armed, making it safe to return control to the provider without losing any completion signals. That is an invalid assumption. Change both completion handlers to continue polling while ib_req_notify_cq() returns a positive value. Fixes: 7f23f6f6e388 ("xprtrmda: Reduce lock contention in ...") Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 5502d4d..61eea73 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -179,38 +179,17 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) return 0; } -/* - * Handle send, fast_reg_mr, and local_inv completions. - * - * Send events are typically suppressed and thus do not result - * in an upcall. Occasionally one is signaled, however. This - * prevents the provider's completion queue from wrapping and - * losing a completion. +/* Handle provider send completion upcalls. */ static void rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) { struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_sendcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } - - rpcrdma_sendcq_poll(cq, ep); + do { + rpcrdma_sendcq_poll(cq, ep); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); } static void @@ -274,42 +253,17 @@ out_schedule: return rc; } -/* - * Handle receive completions. - * - * It is reentrant but processes single events in order to maintain - * ordering of receives to keep server credits. - * - * It is the responsibility of the scheduled tasklet to return - * recv buffers to the pool. NOTE: this affects synchronization of - * connection shutdown. That is, the structures required for - * the completion of the reply handler must remain intact until - * all memory has been reclaimed. +/* Handle provider receive completion upcalls. */ static void rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_recvcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } - - rpcrdma_recvcq_poll(cq, ep); + do { + rpcrdma_recvcq_poll(cq, ep); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); } static void -- cgit v0.10.2 From 4220a07264c0517006a534aed201e29c8d297306 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:45 -0400 Subject: xprtrdma: Prevent loss of completion signals Commit 8301a2c047cc ("xprtrdma: Limit work done by completion handler") was supposed to prevent xprtrdma's upcall handlers from starving other softIRQ work by letting them return to the provider before all CQEs have been polled. The logic assumes the provider will call the upcall handler again immediately if the CQ is re-armed while there are still queued CQEs. This assumption is invalid. The IBTA spec says that after a CQ is armed, the hardware must interrupt only when a new CQE is inserted. xprtrdma can't rely on the provider calling again, even though some providers do. Therefore, leaving CQEs on queue makes sense only when there is another mechanism that ensures all remaining CQEs are consumed in a timely fashion. xprtrdma does not have such a mechanism. If a CQE remains queued, the transport can wait forever to send the next RPC. Finally, move the wcs array back onto the stack to ensure that the poll array is always local to the CPU where the completion upcall is running. Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...") Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 61eea73..6661b1b 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -158,25 +158,30 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc) } } -static int -rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +/* The common case is a single send completion is waiting. By + * passing two WC entries to ib_poll_cq, a return code of 1 + * means there is exactly one WC waiting and no more. We don't + * have to invoke ib_poll_cq again to know that the CQ has been + * properly drained. + */ +static void +rpcrdma_sendcq_poll(struct ib_cq *cq) { - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[2]; + int count, rc; - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_send_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - return rc; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_sendcq_process_wc(wcs++); - } while (rc == RPCRDMA_POLLSIZE && --budget); - return 0; + rpcrdma_sendcq_process_wc(pos++); + } while (rc == ARRAY_SIZE(wcs)); + return; } /* Handle provider send completion upcalls. @@ -184,10 +189,8 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) static void rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - do { - rpcrdma_sendcq_poll(cq, ep); + rpcrdma_sendcq_poll(cq); } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); } @@ -226,31 +229,32 @@ out_fail: goto out_schedule; } -static int -rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +/* The wc array is on stack: automatic memory is always CPU-local. + * + * struct ib_wc is 64 bytes, making the poll array potentially + * large. But this is at the bottom of the call chain. Further + * substantial work is done in another thread. + */ +static void +rpcrdma_recvcq_poll(struct ib_cq *cq) { - struct list_head sched_list; - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[4]; + LIST_HEAD(sched_list); + int count, rc; - INIT_LIST_HEAD(&sched_list); - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_recv_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - goto out_schedule; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_recvcq_process_wc(wcs++, &sched_list); - } while (rc == RPCRDMA_POLLSIZE && --budget); - rc = 0; + rpcrdma_recvcq_process_wc(pos++, &sched_list); + } while (rc == ARRAY_SIZE(wcs)); -out_schedule: rpcrdma_schedule_tasklet(&sched_list); - return rc; } /* Handle provider receive completion upcalls. @@ -258,10 +262,8 @@ out_schedule: static void rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - do { - rpcrdma_recvcq_poll(cq, ep); + rpcrdma_recvcq_poll(cq); } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); } @@ -623,7 +625,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -640,7 +642,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index c09414e..42c8d44 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -77,9 +77,6 @@ struct rpcrdma_ia { * RDMA Endpoint -- one per transport instance */ -#define RPCRDMA_WC_BUDGET (128) -#define RPCRDMA_POLLSIZE (16) - struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; @@ -89,8 +86,6 @@ struct rpcrdma_ep { struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE]; - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE]; }; /* -- cgit v0.10.2 From b0e178a2d8ad4bd6c6bbf5d3f3cf50ca8907581b Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:54 -0400 Subject: xprtrdma: Refactor reply handler error handling Clean up: The error cases in rpcrdma_reply_handler() almost never execute. Ensure the compiler places them out of the hot path. No behavior change expected. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index bc8bd65..60ffa63 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) unsigned long cwnd; u32 credits; - /* Check status. If bad, signal disconnect and return rep to pool */ - if (rep->rr_len == ~0U) { - rpcrdma_recv_buffer_put(rep); - if (r_xprt->rx_ep.rep_connected == 1) { - r_xprt->rx_ep.rep_connected = -EIO; - rpcrdma_conn_func(&r_xprt->rx_ep); - } - return; - } - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) { - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - } + dprintk("RPC: %s: incoming rep %p\n", __func__, rep); + + if (rep->rr_len == RPCRDMA_BAD_LEN) + goto out_badstatus; + if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + goto out_shortreply; + headerp = rdmab_to_msg(rep->rr_rdmabuf); - if (headerp->rm_vers != rpcrdma_version) { - dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(headerp->rm_vers)); - goto repost; - } + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; /* Get XID and try for a match. */ spin_lock(&xprt->transport_lock); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); - if (rqst == NULL) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p failed " - "to match any request xid 0x%08x len %d\n", - __func__, rep, be32_to_cpu(headerp->rm_xid), - rep->rr_len); -repost: - r_xprt->rx_stats.bad_reply_count++; - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) - rpcrdma_recv_buffer_put(rep); - - return; - } + if (!rqst) + goto out_nomatch; /* get request object */ req = rpcr_to_rdmar(rqst); - if (req->rl_reply) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: duplicate reply 0x%p to RPC " - "request 0x%p: xid 0x%08x\n", __func__, rep, req, - be32_to_cpu(headerp->rm_xid)); - goto repost; - } + if (req->rl_reply) + goto out_duplicate; dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", @@ -883,8 +858,44 @@ badheader: if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); + xprt_complete_rqst(rqst->rq_task, status); + spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); - xprt_complete_rqst(rqst->rq_task, status); + return; + +out_badstatus: + rpcrdma_recv_buffer_put(rep); + if (r_xprt->rx_ep.rep_connected == 1) { + r_xprt->rx_ep.rep_connected = -EIO; + rpcrdma_conn_func(&r_xprt->rx_ep); + } + return; + +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); + goto repost; + +out_badversion: + dprintk("RPC: %s: invalid version %d\n", + __func__, be32_to_cpu(headerp->rm_vers)); + goto repost; + +out_nomatch: + spin_unlock(&xprt->transport_lock); + dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", + __func__, be32_to_cpu(headerp->rm_xid), + rep->rr_len); + goto repost; + +out_duplicate: spin_unlock(&xprt->transport_lock); + dprintk("RPC: %s: " + "duplicate reply %p to RPC request %p: xid 0x%08x\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); + +repost: + r_xprt->rx_stats.bad_reply_count++; + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + rpcrdma_recv_buffer_put(rep); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 6661b1b..a60b4c4 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -225,7 +225,7 @@ out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) pr_err("RPC: %s: rep %p: %s\n", __func__, rep, ib_wc_status_msg(wc->status)); - rep->rr_len = ~0U; + rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 42c8d44..a13508b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -168,6 +168,8 @@ struct rpcrdma_rep { struct rpcrdma_regbuf *rr_rdmabuf; }; +#define RPCRDMA_BAD_LEN (~0U) + /* * struct rpcrdma_mw - external memory region metadata * -- cgit v0.10.2 From 1e465fd4ff475cc29c866ee75496c941b3908e69 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:02 -0400 Subject: xprtrdma: Replace send and receive arrays The rb_send_bufs and rb_recv_bufs arrays are used to implement a pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep structs. Replace those arrays with free lists. To allow more than 512 RPCs in-flight at once, each of these arrays would be larger than a page (assuming 8-byte addresses and 4KB pages). Allowing up to 64K in-flight RPCs (as TCP now does), each buffer array would have to be 128 pages. That's an order-6 allocation. (Not that we're going there.) A list is easier to expand dynamically. Instead of allocating a larger array of pointers and copying the existing pointers to the new array, simply append more buffers to each list. This also makes it simpler to manage receive buffers that might catch backwards-direction calls, or to post receive buffers in bulk to amortize the overhead of ib_post_recv. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index a60b4c4..c09f1b6 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -926,44 +926,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - char *p; - size_t len; int i, rc; - buf->rb_max_requests = cdata->max_requests; + buf->rb_max_requests = r_xprt->rx_data.max_requests; spin_lock_init(&buf->rb_lock); - /* Need to allocate: - * 1. arrays for send and recv pointers - * 2. arrays of struct rpcrdma_req to fill in pointers - * 3. array of struct rpcrdma_rep for replies - * Send/recv buffers in req/rep need to be registered - */ - len = buf->rb_max_requests * - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - - p = kzalloc(len, GFP_KERNEL); - if (p == NULL) { - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n", - __func__, len); - rc = -ENOMEM; - goto out; - } - buf->rb_pool = p; /* for freeing it later */ - - buf->rb_send_bufs = (struct rpcrdma_req **) p; - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests]; - buf->rb_recv_bufs = (struct rpcrdma_rep **) p; - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - rc = ia->ri_ops->ro_init(r_xprt); if (rc) goto out; + INIT_LIST_HEAD(&buf->rb_send_bufs); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; - struct rpcrdma_rep *rep; req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) { @@ -972,7 +946,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } - buf->rb_send_bufs[i] = req; + list_add(&req->rl_free, &buf->rb_send_bufs); + } + + INIT_LIST_HEAD(&buf->rb_recv_bufs); + for (i = 0; i < buf->rb_max_requests + 2; i++) { + struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); if (IS_ERR(rep)) { @@ -981,7 +960,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(rep); goto out; } - buf->rb_recv_bufs[i] = rep; + list_add(&rep->rr_list, &buf->rb_recv_bufs); } return 0; @@ -990,6 +969,28 @@ out: return rc; } +static struct rpcrdma_req * +rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_req *req; + + req = list_first_entry(&buf->rb_send_bufs, + struct rpcrdma_req, rl_free); + list_del(&req->rl_free); + return req; +} + +static struct rpcrdma_rep * +rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + rep = list_first_entry(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + list_del(&rep->rr_list); + return rep; +} + static void rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) { @@ -1015,25 +1016,22 @@ void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); - int i; - /* clean up in reverse order from create - * 1. recv mr memory (mr free, then kfree) - * 2. send mr memory (mr free, then kfree) - * 3. MWs - */ - dprintk("RPC: %s: entering\n", __func__); + while (!list_empty(&buf->rb_recv_bufs)) { + struct rpcrdma_rep *rep; - for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs) - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); - if (buf->rb_send_bufs) - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); + rep = rpcrdma_buffer_get_rep_locked(buf); + rpcrdma_destroy_rep(ia, rep); } - ia->ri_ops->ro_destroy(buf); + while (!list_empty(&buf->rb_send_bufs)) { + struct rpcrdma_req *req; - kfree(buf->rb_pool); + req = rpcrdma_buffer_get_req_locked(buf); + rpcrdma_destroy_req(ia, req); + } + + ia->ri_ops->ro_destroy(buf); } struct rpcrdma_mw * @@ -1065,25 +1063,10 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) spin_unlock(&buf->rb_mwlock); } -static void -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) -{ - buf->rb_send_bufs[--buf->rb_send_index] = req; - req->rl_niovs = 0; - if (req->rl_reply) { - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply; - req->rl_reply = NULL; - } -} - /* * Get a set of request/reply buffers. * - * Reply buffer (if needed) is attached to send buffer upon return. - * Rule: - * rb_send_index and rb_recv_index MUST always be pointing to the - * *next* available buffer (non-NULL). They are incremented after - * removing buffers, and decremented *before* returning them. + * Reply buffer (if available) is attached to send buffer upon return. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) @@ -1092,26 +1075,23 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); + if (list_empty(&buffers->rb_send_bufs)) + goto out_reqbuf; + req = rpcrdma_buffer_get_req_locked(buffers); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_repbuf; + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + return req; - if (buffers->rb_send_index == buffers->rb_max_requests) { - spin_unlock_irqrestore(&buffers->rb_lock, flags); - dprintk("RPC: %s: out of request buffers\n", __func__); - return ((struct rpcrdma_req *)NULL); - } - - req = buffers->rb_send_bufs[buffers->rb_send_index]; - if (buffers->rb_send_index < buffers->rb_recv_index) { - dprintk("RPC: %s: %d extra receives outstanding (ok)\n", - __func__, - buffers->rb_recv_index - buffers->rb_send_index); - req->rl_reply = NULL; - } else { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL; - +out_reqbuf: + spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("RPC: %s: out of request buffers\n", __func__); + return NULL; +out_repbuf: spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("RPC: %s: out of reply buffers\n", __func__); + req->rl_reply = NULL; return req; } @@ -1123,17 +1103,22 @@ void rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; + struct rpcrdma_rep *rep = req->rl_reply; unsigned long flags; + req->rl_niovs = 0; + req->rl_reply = NULL; + spin_lock_irqsave(&buffers->rb_lock, flags); - rpcrdma_buffer_put_sendbuf(req, buffers); + list_add_tail(&req->rl_free, &buffers->rb_send_bufs); + if (rep) + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); spin_unlock_irqrestore(&buffers->rb_lock, flags); } /* * Recover reply buffers from pool. - * This happens when recovering from error conditions. - * Post-increment counter/array index. + * This happens when recovering from disconnect. */ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) @@ -1142,10 +1127,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_recv_index < buffers->rb_max_requests) { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } + if (!list_empty(&buffers->rb_recv_bufs)) + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); } @@ -1160,7 +1143,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); spin_unlock_irqrestore(&buffers->rb_lock, flags); } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a13508b..e6a358f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ #define RPCRDMA_MAX_IOVS (2) struct rpcrdma_req { + struct list_head rl_free; unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; @@ -285,12 +286,10 @@ struct rpcrdma_buffer { struct list_head rb_all; char *rb_pool; - spinlock_t rb_lock; /* protect buf arrays */ + spinlock_t rb_lock; /* protect buf lists */ + struct list_head rb_send_bufs; + struct list_head rb_recv_bufs; u32 rb_max_requests; - int rb_send_index; - int rb_recv_index; - struct rpcrdma_req **rb_send_bufs; - struct rpcrdma_rep **rb_recv_bufs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) -- cgit v0.10.2 From fe97b47cd623ebbaa55a163c336abc47153526d1 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:10 -0400 Subject: xprtrdma: Use workqueue to process RPC/RDMA replies The reply tasklet is fast, but it's single threaded. After reply traffic saturates a single CPU, there's no more reply processing capacity. Replace the tasklet with a workqueue to spread reply handling across all CPUs. This also moves RPC/RDMA reply handling out of the soft IRQ context and into a context that allows sleeps. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 60ffa63..95774fc 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep) schedule_delayed_work(&ep->rep_connect_worker, 0); } -/* - * Called as a tasklet to do req/reply match and complete a request +/* Process received RPC/RDMA messages. + * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. */ @@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (headerp->rm_vers != rpcrdma_version) goto out_badversion; - /* Get XID and try for a match. */ - spin_lock(&xprt->transport_lock); + /* Match incoming rpcrdma_rep to an rpcrdma_req to + * get context for handling any incoming chunks. + */ + spin_lock_bh(&xprt->transport_lock); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); if (!rqst) goto out_nomatch; - /* get request object */ req = rpcr_to_rdmar(rqst); if (req->rl_reply) goto out_duplicate; @@ -859,7 +860,7 @@ badheader: xprt_release_rqst_cong(rqst->rq_task); xprt_complete_rqst(rqst->rq_task, status); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); return; @@ -882,14 +883,14 @@ out_badversion: goto repost; out_nomatch: - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); goto repost; out_duplicate: - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: " "duplicate reply %p to RPC request %p: xid 0x%08x\n", __func__, rep, req, be32_to_cpu(headerp->rm_xid)); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index e9e5ed7..897a2f3 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void) dprintk("RPC: %s: xprt_unregister returned %i\n", __func__, rc); + rpcrdma_destroy_wq(); frwr_destroy_recovery_wq(); } @@ -743,8 +744,15 @@ int xprt_rdma_init(void) if (rc) return rc; + rc = rpcrdma_alloc_wq(); + if (rc) { + frwr_destroy_recovery_wq(); + return rc; + } + rc = xprt_register_transport(&xprt_rdma); if (rc) { + rpcrdma_destroy_wq(); frwr_destroy_recovery_wq(); return rc; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c09f1b6..5c20629 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data) static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); +static struct workqueue_struct *rpcrdma_receive_wq; + +int +rpcrdma_alloc_wq(void) +{ + struct workqueue_struct *recv_wq; + + recv_wq = alloc_workqueue("xprtrdma_receive", + WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, + 0); + if (!recv_wq) + return -ENOMEM; + + rpcrdma_receive_wq = recv_wq; + return 0; +} + +void +rpcrdma_destroy_wq(void) +{ + struct workqueue_struct *wq; + + if (rpcrdma_receive_wq) { + wq = rpcrdma_receive_wq; + rpcrdma_receive_wq = NULL; + destroy_workqueue(wq); + } +} + static void rpcrdma_schedule_tasklet(struct list_head *sched_list) { @@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) } static void -rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) +rpcrdma_receive_worker(struct work_struct *work) +{ + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); + + rpcrdma_reply_handler(rep); +} + +static void +rpcrdma_recvcq_process_wc(struct ib_wc *wc) { struct rpcrdma_rep *rep = (struct rpcrdma_rep *)(unsigned long)wc->wr_id; @@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) prefetch(rdmab_to_msg(rep->rr_rdmabuf)); out_schedule: - list_add_tail(&rep->rr_list, sched_list); + queue_work(rpcrdma_receive_wq, &rep->rr_work); return; + out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) pr_err("RPC: %s: rep %p: %s\n", @@ -239,7 +278,6 @@ static void rpcrdma_recvcq_poll(struct ib_cq *cq) { struct ib_wc *pos, wcs[4]; - LIST_HEAD(sched_list); int count, rc; do { @@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq) count = rc; while (count-- > 0) - rpcrdma_recvcq_process_wc(pos++, &sched_list); + rpcrdma_recvcq_process_wc(pos++); } while (rc == ARRAY_SIZE(wcs)); - - rpcrdma_schedule_tasklet(&sched_list); } /* Handle provider receive completion upcalls. @@ -272,12 +308,9 @@ static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; - LIST_HEAD(sched_list); while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc, &sched_list); - if (!list_empty(&sched_list)) - rpcrdma_schedule_tasklet(&sched_list); + rpcrdma_recvcq_process_wc(&wc); while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) rpcrdma_sendcq_process_wc(&wc); } @@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) rep->rr_device = ia->ri_device; rep->rr_rxprt = r_xprt; + INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; out_free: diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e6a358f..6ea1dbe 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -164,6 +164,7 @@ struct rpcrdma_rep { unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; + struct work_struct rr_work; struct list_head rr_list; struct rpcrdma_regbuf *rr_rdmabuf; }; @@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); int frwr_alloc_recovery_wq(void); void frwr_destroy_recovery_wq(void); +int rpcrdma_alloc_wq(void); +void rpcrdma_destroy_wq(void); + /* * Wrappers for chunk registration, shared by read/write chunk code. */ -- cgit v0.10.2 From 2da9ab3008f359857eb594b0b4b0fee62f2a73c2 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:18 -0400 Subject: xprtrdma: Remove reply tasklet Clean up: The reply tasklet is no longer used. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 5c20629..3dd5a7c 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -68,38 +68,6 @@ * internal functions */ -/* - * handle replies in tasklet context, using a single, global list - * rdma tasklet function -- just turn around and call the func - * for all replies on the list - */ - -static DEFINE_SPINLOCK(rpcrdma_tk_lock_g); -static LIST_HEAD(rpcrdma_tasklets_g); - -static void -rpcrdma_run_tasklet(unsigned long data) -{ - struct rpcrdma_rep *rep; - unsigned long flags; - - data = data; - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - while (!list_empty(&rpcrdma_tasklets_g)) { - rep = list_entry(rpcrdma_tasklets_g.next, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - - rpcrdma_reply_handler(rep); - - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - } - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); -} - -static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); - static struct workqueue_struct *rpcrdma_receive_wq; int @@ -130,17 +98,6 @@ rpcrdma_destroy_wq(void) } static void -rpcrdma_schedule_tasklet(struct list_head *sched_list) -{ - unsigned long flags; - - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - list_splice_tail(sched_list, &rpcrdma_tasklets_g); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - tasklet_schedule(&rpcrdma_tasklet_g); -} - -static void rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) { struct rpcrdma_ep *ep = context; -- cgit v0.10.2 From a5b027e1897c811401862877d0ba4ca26fabc4da Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:27 -0400 Subject: xprtrdma: Saving IRQs no longer needed for rb_lock Now that RPC replies are processed in a workqueue, there's no need to disable IRQs when managing send and receive buffers. This saves noticeable overhead per RPC. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 3dd5a7c..baa0523 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1063,24 +1063,23 @@ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) { struct rpcrdma_req *req; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); if (list_empty(&buffers->rb_send_bufs)) goto out_reqbuf; req = rpcrdma_buffer_get_req_locked(buffers); if (list_empty(&buffers->rb_recv_bufs)) goto out_repbuf; req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); return req; out_reqbuf: - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); pr_warn("RPC: %s: out of request buffers\n", __func__); return NULL; out_repbuf: - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); pr_warn("RPC: %s: out of reply buffers\n", __func__); req->rl_reply = NULL; return req; @@ -1095,16 +1094,15 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; - unsigned long flags; req->rl_niovs = 0; req->rl_reply = NULL; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); list_add_tail(&req->rl_free, &buffers->rb_send_bufs); if (rep) list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* @@ -1115,12 +1113,11 @@ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); if (!list_empty(&buffers->rb_recv_bufs)) req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* @@ -1131,11 +1128,10 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* -- cgit v0.10.2 From 42e5c3e2725ba0c0affc1fc8a6aa1d5cf31ecb75 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:35 -0400 Subject: SUNRPC: Abstract backchannel operations xprt_{setup,destroy}_backchannel() won't be adequate for RPC/RMDA bi-direction. In particular, receive buffers have to be pre- registered and posted in order to receive incoming backchannel requests. Add a virtual function call to allow the insertion of appropriate backchannel setup and destruction methods for each transport. In addition, freeing a backchannel request is a little different for RPC/RDMA. Introduce an rpc_xprt_op to handle the difference. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h index 8df43c9f..4397a48 100644 --- a/include/linux/sunrpc/bc_xprt.h +++ b/include/linux/sunrpc/bc_xprt.h @@ -38,6 +38,11 @@ void xprt_free_bc_request(struct rpc_rqst *req); int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs); void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs); +/* Socket backchannel transport methods */ +int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs); +void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs); +void xprt_free_bc_rqst(struct rpc_rqst *req); + /* * Determine if a shared backchannel is in use */ diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 0fb9acb..3f79c4a 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -136,6 +136,11 @@ struct rpc_xprt_ops { int (*enable_swap)(struct rpc_xprt *xprt); void (*disable_swap)(struct rpc_xprt *xprt); void (*inject_disconnect)(struct rpc_xprt *xprt); + int (*bc_setup)(struct rpc_xprt *xprt, + unsigned int min_reqs); + void (*bc_free_rqst)(struct rpc_rqst *rqst); + void (*bc_destroy)(struct rpc_xprt *xprt, + unsigned int max_reqs); }; /* diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 6255d14..229956b 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -138,6 +138,14 @@ out_free: */ int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) { + if (!xprt->ops->bc_setup) + return 0; + return xprt->ops->bc_setup(xprt, min_reqs); +} +EXPORT_SYMBOL_GPL(xprt_setup_backchannel); + +int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) +{ struct rpc_rqst *req; struct list_head tmp_list; int i; @@ -192,7 +200,6 @@ out_free: dprintk("RPC: setup backchannel transport failed\n"); return -ENOMEM; } -EXPORT_SYMBOL_GPL(xprt_setup_backchannel); /** * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. @@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel); */ void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) { + if (xprt->ops->bc_destroy) + xprt->ops->bc_destroy(xprt, max_reqs); +} +EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); + +void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) +{ struct rpc_rqst *req = NULL, *tmp = NULL; dprintk("RPC: destroy backchannel transport\n"); @@ -227,7 +241,6 @@ out: dprintk("RPC: backchannel list empty= %s\n", list_empty(&xprt->bc_pa_list) ? "true" : "false"); } -EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) { @@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; + xprt->ops->bc_free_rqst(req); +} + +void xprt_free_bc_rqst(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + dprintk("RPC: free backchannel req=%p\n", req); req->rq_connect_cookie = xprt->connect_cookie - 1; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0e..44a81e4 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2580,6 +2580,11 @@ static struct rpc_xprt_ops xs_tcp_ops = { .enable_swap = xs_enable_swap, .disable_swap = xs_disable_swap, .inject_disconnect = xs_inject_disconnect, +#ifdef CONFIG_SUNRPC_BACKCHANNEL + .bc_setup = xprt_setup_bc, + .bc_free_rqst = xprt_free_bc_rqst, + .bc_destroy = xprt_destroy_bc, +#endif }; /* -- cgit v0.10.2 From f531a5dbc451afb66e9d6c71a69e8358d1847969 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:43 -0400 Subject: xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers xprtrdma's backward direction send and receive buffers are the same size as the forechannel's inline threshold, and must be pre- registered. The consumer has no control over which receive buffer the adapter chooses to catch an incoming backwards-direction call. Any receive buffer can be used for either a forward reply or a backward call. Thus both types of RPC message must all be the same size. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 48913de..33f99d3 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ svc_rdma.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ module.o +rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c new file mode 100644 index 0000000..3d01b32 --- /dev/null +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2015 Oracle. All rights reserved. + * + * Support for backward direction RPCs on RPC/RDMA. + */ + +#include + +#include "xprt_rdma.h" + +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) +# define RPCDBG_FACILITY RPCDBG_TRANS +#endif + +static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + + spin_lock(&buf->rb_reqslock); + list_del(&req->rl_all); + spin_unlock(&buf->rb_reqslock); + + rpcrdma_destroy_req(&r_xprt->rx_ia, req); + + kfree(rqst); +} + +static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) +{ + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_regbuf *rb; + struct rpcrdma_req *req; + struct xdr_buf *buf; + size_t size; + + req = rpcrdma_create_req(r_xprt); + if (!req) + return -ENOMEM; + req->rl_backchannel = true; + + size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + if (IS_ERR(rb)) + goto out_fail; + req->rl_rdmabuf = rb; + + size += RPCRDMA_INLINE_READ_THRESHOLD(rqst); + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + if (IS_ERR(rb)) + goto out_fail; + rb->rg_owner = req; + req->rl_sendbuf = rb; + /* so that rpcr_to_rdmar works when receiving a request */ + rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base; + + buf = &rqst->rq_snd_buf; + buf->head[0].iov_base = rqst->rq_buffer; + buf->head[0].iov_len = 0; + buf->tail[0].iov_base = NULL; + buf->tail[0].iov_len = 0; + buf->page_len = 0; + buf->len = 0; + buf->buflen = size; + + return 0; + +out_fail: + rpcrdma_bc_free_rqst(r_xprt, rqst); + return -ENOMEM; +} + +/* Allocate and add receive buffers to the rpcrdma_buffer's + * existing list of rep's. These are released when the + * transport is destroyed. + */ +static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, + unsigned int count) +{ + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; + struct rpcrdma_rep *rep; + unsigned long flags; + int rc = 0; + + while (count--) { + rep = rpcrdma_create_rep(r_xprt); + if (IS_ERR(rep)) { + pr_err("RPC: %s: reply buffer alloc failed\n", + __func__); + rc = PTR_ERR(rep); + break; + } + + spin_lock_irqsave(&buffers->rb_lock, flags); + list_add(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + } + + return rc; +} + +/** + * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests + * @xprt: transport associated with these backchannel resources + * @reqs: number of concurrent incoming requests to expect + * + * Returns 0 on success; otherwise a negative errno + */ +int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; + struct rpc_rqst *rqst; + unsigned int i; + int rc; + + /* The backchannel reply path returns each rpc_rqst to the + * bc_pa_list _after_ the reply is sent. If the server is + * faster than the client, it can send another backward + * direction request before the rpc_rqst is returned to the + * list. The client rejects the request in this case. + * + * Twice as many rpc_rqsts are prepared to ensure there is + * always an rpc_rqst available as soon as a reply is sent. + */ + for (i = 0; i < (reqs << 1); i++) { + rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); + if (!rqst) { + pr_err("RPC: %s: Failed to create bc rpc_rqst\n", + __func__); + goto out_free; + } + + rqst->rq_xprt = &r_xprt->rx_xprt; + INIT_LIST_HEAD(&rqst->rq_list); + INIT_LIST_HEAD(&rqst->rq_bc_list); + + if (rpcrdma_bc_setup_rqst(r_xprt, rqst)) + goto out_free; + + spin_lock_bh(&xprt->bc_pa_lock); + list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); + } + + rc = rpcrdma_bc_setup_reps(r_xprt, reqs); + if (rc) + goto out_free; + + rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs); + if (rc) + goto out_free; + + buffer->rb_bc_srv_max_requests = reqs; + request_module("svcrdma"); + + return 0; + +out_free: + xprt_rdma_bc_destroy(xprt, reqs); + + pr_err("RPC: %s: setup backchannel transport failed\n", __func__); + return -ENOMEM; +} + +/** + * xprt_rdma_bc_destroy - Release resources for handling backchannel requests + * @xprt: transport associated with these backchannel resources + * @reqs: number of incoming requests to destroy; ignored + */ +void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpc_rqst *rqst, *tmp; + + spin_lock_bh(&xprt->bc_pa_lock); + list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { + list_del(&rqst->rq_bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); + + rpcrdma_bc_free_rqst(r_xprt, rqst); + + spin_lock_bh(&xprt->bc_pa_lock); + } + spin_unlock_bh(&xprt->bc_pa_lock); +} + +/** + * xprt_rdma_bc_free_rqst - Release a backchannel rqst + * @rqst: request to release + */ +void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) +{ + struct rpc_xprt *xprt = rqst->rq_xprt; + + smp_mb__before_atomic(); + WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); + clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + smp_mb__after_atomic(); + + spin_lock_bh(&xprt->bc_pa_lock); + list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); +} diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 897a2f3..845278e 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = { .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, - .inject_disconnect = xprt_rdma_inject_disconnect + .inject_disconnect = xprt_rdma_inject_disconnect, +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + .bc_setup = xprt_rdma_bc_setup, + .bc_free_rqst = xprt_rdma_bc_free_rqst, + .bc_destroy = xprt_rdma_bc_destroy, +#endif }; static struct xprt_class xprt_rdma = { diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index baa0523..7f0ed30 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -831,7 +831,21 @@ retry: } rc = ep->rep_connected; } else { + struct rpcrdma_xprt *r_xprt; + unsigned int extras; + dprintk("RPC: %s: connected\n", __func__); + + r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); + extras = r_xprt->rx_buf.rb_bc_srv_max_requests; + + if (extras) { + rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); + if (rc) + pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n", + __func__, rc); + rc = 0; + } } out: @@ -868,20 +882,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) } } -static struct rpcrdma_req * +struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; struct rpcrdma_req *req; req = kzalloc(sizeof(*req), GFP_KERNEL); if (req == NULL) return ERR_PTR(-ENOMEM); + INIT_LIST_HEAD(&req->rl_free); + spin_lock(&buffer->rb_reqslock); + list_add(&req->rl_all, &buffer->rb_allreqs); + spin_unlock(&buffer->rb_reqslock); req->rl_buffer = &r_xprt->rx_buf; return req; } -static struct rpcrdma_rep * +struct rpcrdma_rep * rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; @@ -920,6 +939,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) int i, rc; buf->rb_max_requests = r_xprt->rx_data.max_requests; + buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); rc = ia->ri_ops->ro_init(r_xprt); @@ -927,6 +947,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) goto out; INIT_LIST_HEAD(&buf->rb_send_bufs); + INIT_LIST_HEAD(&buf->rb_allreqs); + spin_lock_init(&buf->rb_reqslock); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; @@ -937,6 +959,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } + req->rl_backchannel = false; list_add(&req->rl_free, &buf->rb_send_bufs); } @@ -985,19 +1008,13 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) static void rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) { - if (!rep) - return; - rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); kfree(rep); } -static void +void rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) { - if (!req) - return; - rpcrdma_free_regbuf(ia, req->rl_sendbuf); rpcrdma_free_regbuf(ia, req->rl_rdmabuf); kfree(req); @@ -1015,12 +1032,19 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) rpcrdma_destroy_rep(ia, rep); } - while (!list_empty(&buf->rb_send_bufs)) { + spin_lock(&buf->rb_reqslock); + while (!list_empty(&buf->rb_allreqs)) { struct rpcrdma_req *req; - req = rpcrdma_buffer_get_req_locked(buf); + req = list_first_entry(&buf->rb_allreqs, + struct rpcrdma_req, rl_all); + list_del(&req->rl_all); + + spin_unlock(&buf->rb_reqslock); rpcrdma_destroy_req(ia, req); + spin_lock(&buf->rb_reqslock); } + spin_unlock(&buf->rb_reqslock); ia->ri_ops->ro_destroy(buf); } @@ -1288,6 +1312,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, return rc; } +/** + * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests + * @r_xprt: transport associated with these backchannel resources + * @min_reqs: minimum number of incoming requests expected + * + * Returns zero if all requested buffers were posted, or a negative errno. + */ +int +rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) +{ + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + struct rpcrdma_rep *rep; + unsigned long flags; + int rc; + + while (count--) { + spin_lock_irqsave(&buffers->rb_lock, flags); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_reqbuf; + rep = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + + rc = rpcrdma_ep_post_recv(ia, ep, rep); + if (rc) + goto out_rc; + } + + return 0; + +out_reqbuf: + spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("%s: no extra receive buffers\n", __func__); + return -ENOMEM; + +out_rc: + rpcrdma_recv_buffer_put(rep); + return rc; +} + /* How many chunk list items fit within our inline buffers? */ unsigned int diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 6ea1dbe..1eb86c79f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -263,6 +263,9 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + + struct list_head rl_all; + bool rl_backchannel; }; static inline struct rpcrdma_req * @@ -291,6 +294,10 @@ struct rpcrdma_buffer { struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; + + u32 rb_bc_srv_max_requests; + spinlock_t rb_reqslock; /* protect rb_allreqs */ + struct list_head rb_allreqs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -411,6 +418,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, /* * Buffer calls - xprtrdma/verbs.c */ +struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); +struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); +void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); @@ -427,6 +437,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); +int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); void frwr_destroy_recovery_wq(void); @@ -494,6 +505,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *); int xprt_rdma_init(void); void xprt_rdma_cleanup(void); +/* Backchannel calls - xprtrdma/backchannel.c + */ +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); +int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void xprt_rdma_bc_free_rqst(struct rpc_rqst *); +void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* Temporary NFS request map cache. Created in svc_rdma.c */ extern struct kmem_cache *svc_rdma_map_cachep; /* WR context cache. Created in svc_rdma.c */ -- cgit v0.10.2 From 124fa17d3e33060fbb28e995a42c7f5c8b31b345 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:51 -0400 Subject: xprtrdma: Pre-allocate Work Requests for backchannel Pre-allocate extra send and receive Work Requests needed to handle backchannel receives and sends. The transport doesn't know how many extra WRs to pre-allocate until the xprt_setup_backchannel() call, but that's long after the WRs are allocated during forechannel setup. So, use a fixed value for now. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 3d01b32..3165ed6 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -125,6 +125,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) * Twice as many rpc_rqsts are prepared to ensure there is * always an rpc_rqst available as soon as a reply is sent. */ + if (reqs > RPCRDMA_BACKWARD_WRS >> 1) + goto out_err; + for (i = 0; i < (reqs << 1); i++) { rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); if (!rqst) { @@ -161,6 +164,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) out_free: xprt_rdma_bc_destroy(xprt, reqs); +out_err: pr_err("RPC: %s: setup backchannel transport failed\n", __func__); return -ENOMEM; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 7f0ed30..93883ff 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -568,6 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; struct ib_cq_init_attr cq_attr = {}; + unsigned int max_qp_wr; int rc, err; if (devattr->max_sge < RPCRDMA_MAX_IOVS) { @@ -576,18 +577,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return -ENOMEM; } + if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) { + dprintk("RPC: %s: insufficient wqe's available\n", + __func__); + return -ENOMEM; + } + max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS; + /* check provider's send/recv wr limits */ - if (cdata->max_requests > devattr->max_qp_wr) - cdata->max_requests = devattr->max_qp_wr; + if (cdata->max_requests > max_qp_wr) + cdata->max_requests = max_qp_wr; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 1eb86c79f..55d2660 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -101,6 +101,16 @@ struct rpcrdma_ep { */ #define RPCRDMA_IGNORE_COMPLETION (0ULL) +/* Pre-allocate extra Work Requests for handling backward receives + * and sends. This is a fixed value because the Work Queues are + * allocated when the forward channel is set up. + */ +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +#define RPCRDMA_BACKWARD_WRS (8) +#else +#define RPCRDMA_BACKWARD_WRS (0) +#endif + /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV * * The below structure appears at the front of a large region of kmalloc'd -- cgit v0.10.2 From 83128a60ca74e996c5e0336c4fff0579f4a8c909 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:59 -0400 Subject: xprtrdma: Add support for sending backward direction RPC replies Backward direction RPC replies are sent via the client transport's send_request method, the same way forward direction RPC calls are sent. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 3165ed6..ffc4853 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -170,6 +170,51 @@ out_err: } /** + * rpcrdma_bc_marshal_reply - Send backwards direction reply + * @rqst: buffer containing RPC reply data + * + * Returns zero on success. + */ +int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) +{ + struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct rpcrdma_msg *headerp; + size_t rpclen; + + headerp = rdmab_to_msg(req->rl_rdmabuf); + headerp->rm_xid = rqst->rq_xid; + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = + cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); + headerp->rm_type = rdma_msg; + headerp->rm_body.rm_chunks[0] = xdr_zero; + headerp->rm_body.rm_chunks[1] = xdr_zero; + headerp->rm_body.rm_chunks[2] = xdr_zero; + + rpclen = rqst->rq_svec[0].iov_len; + + pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n", + __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf)); + pr_info("RPC: %s: RPC/RDMA: %*ph\n", + __func__, (int)RPCRDMA_HDRLEN_MIN, headerp); + pr_info("RPC: %s: RPC: %*ph\n", + __func__, (int)rpclen, rqst->rq_svec[0].iov_base); + + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); + req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); + + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); + req->rl_send_iov[1].length = rpclen; + req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); + + req->rl_niovs = 2; + return 0; +} + +/** * xprt_rdma_bc_destroy - Release resources for handling backchannel requests * @xprt: transport associated with these backchannel resources * @reqs: number of incoming requests to destroy; ignored diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 95774fc..b7a21e5 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) + return rpcrdma_bc_marshal_reply(rqst); +#endif + /* * rpclen gets amount of data in first buffer, which is the * pre-registered buffer. diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 55d2660..e2d23ea 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -520,6 +520,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -- cgit v0.10.2 From 63cae47005af51c937f4cdcc4835f29075add2ba Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:08 -0400 Subject: xprtrdma: Handle incoming backward direction RPC calls Introduce a code path in the rpcrdma_reply_handler() to catch incoming backward direction RPC calls and route them to the ULP's backchannel server. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index ffc4853..0b3387f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -5,6 +5,8 @@ */ #include +#include +#include #include "xprt_rdma.h" @@ -12,6 +14,8 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif +#define RPCRDMA_BACKCHANNEL_DEBUG + static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { @@ -253,3 +257,117 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock_bh(&xprt->bc_pa_lock); } + +/** + * rpcrdma_bc_receive_call - Handle a backward direction call + * @xprt: transport receiving the call + * @rep: receive buffer containing the call + * + * Called in the RPC reply handler, which runs in a tasklet. + * Be quick about it. + * + * Operational assumptions: + * o Backchannel credits are ignored, just as the NFS server + * forechannel currently does + * o The ULP manages a replay cache (eg, NFSv4.1 sessions). + * No replay detection is done at the transport level + */ +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpcrdma_msg *headerp; + struct svc_serv *bc_serv; + struct rpcrdma_req *req; + struct rpc_rqst *rqst; + struct xdr_buf *buf; + size_t size; + __be32 *p; + + headerp = rdmab_to_msg(rep->rr_rdmabuf); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: callback XID %08x, length=%u\n", + __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); + pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); +#endif + + /* Sanity check: + * Need at least enough bytes for RPC/RDMA header, as code + * here references the header fields by array offset. Also, + * backward calls are always inline, so ensure there + * are some bytes beyond the RPC/RDMA header. + */ + if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24) + goto out_short; + p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); + size = rep->rr_len - RPCRDMA_HDRLEN_MIN; + + /* Grab a free bc rqst */ + spin_lock(&xprt->bc_pa_lock); + if (list_empty(&xprt->bc_pa_list)) { + spin_unlock(&xprt->bc_pa_lock); + goto out_overflow; + } + rqst = list_first_entry(&xprt->bc_pa_list, + struct rpc_rqst, rq_bc_pa_list); + list_del(&rqst->rq_bc_pa_list); + spin_unlock(&xprt->bc_pa_lock); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: using rqst %p\n", __func__, rqst); +#endif + + /* Prepare rqst */ + rqst->rq_reply_bytes_recvd = 0; + rqst->rq_bytes_sent = 0; + rqst->rq_xid = headerp->rm_xid; + set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + + buf = &rqst->rq_rcv_buf; + memset(buf, 0, sizeof(*buf)); + buf->head[0].iov_base = p; + buf->head[0].iov_len = size; + buf->len = size; + + /* The receive buffer has to be hooked to the rpcrdma_req + * so that it can be reposted after the server is done + * parsing it but just before sending the backward + * direction reply. + */ + req = rpcr_to_rdmar(rqst); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: attaching rep %p to req %p\n", + __func__, rep, req); +#endif + req->rl_reply = rep; + + /* Defeat the retransmit detection logic in send_request */ + req->rl_connect_cookie = 0; + + /* Queue rqst for ULP's callback service */ + bc_serv = xprt->bc_serv; + spin_lock(&bc_serv->sv_cb_lock); + list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + + wake_up(&bc_serv->sv_cb_waitq); + + r_xprt->rx_stats.bcall_count++; + return; + +out_overflow: + pr_warn("RPC/RDMA backchannel overflow\n"); + xprt_disconnect_done(xprt); + /* This receive buffer gets reposted automatically + * when the connection is re-established. + */ + return; + +out_short: + pr_warn("RPC/RDMA short backward direction call\n"); + + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + xprt_disconnect_done(xprt); + else + pr_warn("RPC: %s: reposting rep %p\n", + __func__, rep); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index b7a21e5..c10d969 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work) spin_unlock_bh(&xprt->transport_lock); } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. + */ +static bool +rpcrdma_is_bcall(struct rpcrdma_msg *headerp) +{ + __be32 *p = (__be32 *)headerp; + + if (headerp->rm_type != rdma_msg) + return false; + if (headerp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != headerp->rm_xid) + return false; + /* call direction */ + if (p[8] != cpu_to_be32(RPC_CALL)) + return false; + + return true; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* * This function is called when an async event is posted to * the connection which changes the connection state. All it @@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) headerp = rdmab_to_msg(rep->rr_rdmabuf); if (headerp->rm_vers != rpcrdma_version) goto out_badversion; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (rpcrdma_is_bcall(headerp)) + goto out_bcall; +#endif /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. @@ -878,6 +913,12 @@ out_badstatus: } return; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +out_bcall: + rpcrdma_bc_receive_call(r_xprt, rep); + return; +#endif + out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e2d23ea..eb87d96e8 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -353,6 +353,7 @@ struct rpcrdma_stats { unsigned long failed_marshal_count; unsigned long bad_reply_count; unsigned long nomsg_call_count; + unsigned long bcall_count; }; /* @@ -520,6 +521,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); -- cgit v0.10.2 From 9468431962616c2449d47c482208a5967e011bf9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:16 -0400 Subject: svcrdma: Add backward direction service for RPC/RDMA transport On NFSv4.1 mount points, the Linux NFS client uses this transport endpoint to receive backward direction calls and route replies back to the NFSv4.1 server. Signed-off-by: Chuck Lever Acked-by: "J. Bruce Fields" Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 7ccc961..fb4013e 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -228,9 +228,13 @@ extern void svc_rdma_put_frmr(struct svcxprt_rdma *, struct svc_rdma_fastreg_mr *); extern void svc_sq_reap(struct svcxprt_rdma *); extern void svc_rq_reap(struct svcxprt_rdma *); -extern struct svc_xprt_class svc_rdma_class; extern void svc_rdma_prep_reply_hdr(struct svc_rqst *); +extern struct svc_xprt_class svc_rdma_class; +#ifdef CONFIG_SUNRPC_BACKCHANNEL +extern struct svc_xprt_class svc_rdma_bc_class; +#endif + /* svc_rdma.c */ extern int svc_rdma_init(void); extern void svc_rdma_cleanup(void); diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 3f79c4a..82c0839 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -158,6 +158,7 @@ enum xprt_transports { XPRT_TRANSPORT_TCP = IPPROTO_TCP, XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC, XPRT_TRANSPORT_RDMA = 256, + XPRT_TRANSPORT_BC_RDMA = XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC, XPRT_TRANSPORT_LOCAL = 257, }; diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index 2cd252f..1b7051b 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -239,6 +239,9 @@ void svc_rdma_cleanup(void) unregister_sysctl_table(svcrdma_table_header); svcrdma_table_header = NULL; } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + svc_unreg_xprt_class(&svc_rdma_bc_class); +#endif svc_unreg_xprt_class(&svc_rdma_class); kmem_cache_destroy(svc_rdma_map_cachep); kmem_cache_destroy(svc_rdma_ctxt_cachep); @@ -286,6 +289,9 @@ int svc_rdma_init(void) /* Register RDMA with the SVC transport switch */ svc_reg_xprt_class(&svc_rdma_class); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + svc_reg_xprt_class(&svc_rdma_bc_class); +#endif return 0; err1: kmem_cache_destroy(svc_rdma_map_cachep); diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index fcc3eb8..a133b1e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -56,6 +56,7 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int); static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, struct net *net, struct sockaddr *sa, int salen, @@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = { .xcl_ident = XPRT_TRANSPORT_RDMA, }; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *, + struct sockaddr *, int, int); +static void svc_rdma_bc_detach(struct svc_xprt *); +static void svc_rdma_bc_free(struct svc_xprt *); + +static struct svc_xprt_ops svc_rdma_bc_ops = { + .xpo_create = svc_rdma_bc_create, + .xpo_detach = svc_rdma_bc_detach, + .xpo_free = svc_rdma_bc_free, + .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, + .xpo_secure_port = svc_rdma_secure_port, +}; + +struct svc_xprt_class svc_rdma_bc_class = { + .xcl_name = "rdma-bc", + .xcl_owner = THIS_MODULE, + .xcl_ops = &svc_rdma_bc_ops, + .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN) +}; + +static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv, + struct net *net, + struct sockaddr *sa, int salen, + int flags) +{ + struct svcxprt_rdma *cma_xprt; + struct svc_xprt *xprt; + + cma_xprt = rdma_create_xprt(serv, 0); + if (!cma_xprt) + return ERR_PTR(-ENOMEM); + xprt = &cma_xprt->sc_xprt; + + svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv); + serv->sv_bc_xprt = xprt; + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + return xprt; +} + +static void svc_rdma_bc_detach(struct svc_xprt *xprt) +{ + dprintk("svcrdma: %s(%p)\n", __func__, xprt); +} + +static void svc_rdma_bc_free(struct svc_xprt *xprt) +{ + struct svcxprt_rdma *rdma = + container_of(xprt, struct svcxprt_rdma, sc_xprt); + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + if (xprt) + kfree(rdma); +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) { struct svc_rdma_op_ctxt *ctxt; -- cgit v0.10.2 From 0f2e3bdab6590a5d8900e7d701e21ac9af19924c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:24 -0400 Subject: SUNRPC: Remove the TCP-only restriction in bc_svc_process() Allow the use of other transport classes when handling a backward direction RPC call. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index a8f579d..bc5b7b5 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, /* reset result send buffer "put" position */ resv->iov_len = 0; - if (rqstp->rq_prot != IPPROTO_TCP) { - printk(KERN_ERR "No support for Non-TCP transports!\n"); - BUG(); - } - /* * Skip the next two words because they've already been * processed in the transport -- cgit v0.10.2 From 135444126a1175912b43366f6109cb297018f034 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 2 Nov 2015 10:11:17 -0500 Subject: pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors If the pNFS/flexfiles file is mirrored, and a read to one mirror fails, then we should bump the mirror index, so that we retry to a different mirror. Once we've iterated through all mirrors and all failed, we can return the layout and issue a new LAYOUTGET. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c index fbc5a56..7fc14b9 100644 --- a/fs/nfs/flexfilelayout/flexfilelayout.c +++ b/fs/nfs/flexfilelayout/flexfilelayout.c @@ -741,17 +741,17 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg, } static struct nfs4_pnfs_ds * -ff_layout_choose_best_ds_for_read(struct nfs_pageio_descriptor *pgio, +ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg, + int start_idx, int *best_idx) { - struct nfs4_ff_layout_segment *fls; + struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg); struct nfs4_pnfs_ds *ds; int idx; - fls = FF_LAYOUT_LSEG(pgio->pg_lseg); /* mirrors are sorted by efficiency */ - for (idx = 0; idx < fls->mirror_array_cnt; idx++) { - ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, idx, false); + for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) { + ds = nfs4_ff_layout_prepare_ds(lseg, idx, false); if (ds) { *best_idx = idx; return ds; @@ -782,7 +782,7 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio, if (pgio->pg_lseg == NULL) goto out_mds; - ds = ff_layout_choose_best_ds_for_read(pgio, &ds_idx); + ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx); if (!ds) goto out_mds; mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx); @@ -1171,6 +1171,10 @@ static int ff_layout_read_done_cb(struct rpc_task *task, switch (err) { case -NFS4ERR_RESET_TO_PNFS: + if (ff_layout_choose_best_ds_for_read(hdr->lseg, + hdr->pgio_mirror_idx + 1, + &hdr->pgio_mirror_idx)) + goto out_eagain; set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE, &hdr->lseg->pls_layout->plh_flags); pnfs_read_resend_pnfs(hdr); @@ -1179,11 +1183,13 @@ static int ff_layout_read_done_cb(struct rpc_task *task, ff_layout_reset_read(hdr); return task->tk_status; case -EAGAIN: - rpc_restart_call_prepare(task); - return -EAGAIN; + goto out_eagain; } return 0; +out_eagain: + rpc_restart_call_prepare(task); + return -EAGAIN; } static bool -- cgit v0.10.2 From 260074cd8413489903d4484058e61649d6e08580 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 2 Nov 2015 09:59:00 -0500 Subject: pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS For loosely coupled pNFS/flexfiles systems, there is often no advantage at all in going through the MDS for I/O, since the MDS is subject to the same limitations as all other clients when talking to DSes. If a DS is unresponsive, I/O through the MDS will fail. For such systems, the only scalable solution is to have the pNFS clients retry doing pNFS, and so the protocol now provides a flag that allows the pNFS server to signal this. If LAYOUTGET returns FF_FLAGS_NO_IO_THRU_MDS, then we should assume that the MDS wants the client to retry using these devices, even if they were previously marked as being unavailable. To do so, we add a helper, ff_layout_mark_devices_valid() that will be called from layoutget. Signed-off-by: Trond Myklebust diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c index 7fc14b9..03516c8 100644 --- a/fs/nfs/flexfilelayout/flexfilelayout.c +++ b/fs/nfs/flexfilelayout/flexfilelayout.c @@ -339,6 +339,19 @@ static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls) } } +static void ff_layout_mark_devices_valid(struct nfs4_ff_layout_segment *fls) +{ + struct nfs4_deviceid_node *node; + int i; + + if (!(fls->flags & FF_FLAGS_NO_IO_THRU_MDS)) + return; + for (i = 0; i < fls->mirror_array_cnt; i++) { + node = &fls->mirror_array[i]->mirror_ds->id_node; + clear_bit(NFS_DEVICEID_UNAVAILABLE, &node->flags); + } +} + static struct pnfs_layout_segment * ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, struct nfs4_layoutget_res *lgr, @@ -499,6 +512,7 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, rc = ff_layout_check_layout(lgr); if (rc) goto out_err_free; + ff_layout_mark_devices_valid(fls); ret = &fls->generic_hdr; dprintk("<-- %s (success)\n", __func__); @@ -1035,7 +1049,8 @@ static int ff_layout_async_handle_error_v4(struct rpc_task *task, rpc_wake_up(&tbl->slot_tbl_waitq); /* fall through */ default: - if (ff_layout_has_available_ds(lseg)) + if (ff_layout_no_fallback_to_mds(lseg) || + ff_layout_has_available_ds(lseg)) return -NFS4ERR_RESET_TO_PNFS; reset: dprintk("%s Retry through MDS. Error %d\n", __func__, @@ -1153,7 +1168,6 @@ static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg, } /* NFS_PROTO call done callback routines */ - static int ff_layout_read_done_cb(struct rpc_task *task, struct nfs_pgio_header *hdr) { diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h index 68cc0d9..2bb08bc 100644 --- a/fs/nfs/flexfilelayout/flexfilelayout.h +++ b/fs/nfs/flexfilelayout/flexfilelayout.h @@ -10,6 +10,7 @@ #define FS_NFS_NFS4FLEXFILELAYOUT_H #define FF_FLAGS_NO_LAYOUTCOMMIT 1 +#define FF_FLAGS_NO_IO_THRU_MDS 2 #include "../pnfs.h" @@ -146,6 +147,12 @@ FF_LAYOUT_MIRROR_COUNT(struct pnfs_layout_segment *lseg) } static inline bool +ff_layout_no_fallback_to_mds(struct pnfs_layout_segment *lseg) +{ + return FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_IO_THRU_MDS; +} + +static inline bool ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node) { return nfs4_test_deviceid_unavailable(node); -- cgit v0.10.2 From 76566773a1f1c2295ed901b6f1241cfe10d99029 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:32 -0400 Subject: NFS: Enable client side NFSv4.1 backchannel to use other transports Forechannel transports get their own "bc_up" method to create an endpoint for the backchannel service. Signed-off-by: Chuck Lever [Anna Schumaker: Add forward declaration of struct net to xprt.h] Signed-off-by: Anna Schumaker diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c index 75f7c0a..a7f2e6e 100644 --- a/fs/nfs/callback.c +++ b/fs/nfs/callback.c @@ -99,17 +99,6 @@ nfs4_callback_up(struct svc_serv *serv) } #if defined(CONFIG_NFS_V4_1) -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net) -{ - /* - * Create an svc_sock for the back channel service that shares the - * fore channel connection. - * Returns the input port (0) and sets the svc_serv bc_xprt on success - */ - return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, - SVC_SOCK_ANONYMOUS); -} - /* * The callback service for NFSv4.1 callbacks */ @@ -184,11 +173,6 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt, xprt->bc_serv = serv; } #else -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net) -{ - return 0; -} - static void nfs_minorversion_callback_svc_setup(struct svc_serv *serv, struct svc_rqst **rqstpp, int (**callback_svc)(void *vrqstp)) { @@ -259,7 +243,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc svc_shutdown_net(serv, net); } -static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net) +static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, + struct net *net, struct rpc_xprt *xprt) { struct nfs_net *nn = net_generic(net, nfs_net_id); int ret; @@ -275,20 +260,11 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n goto err_bind; } - switch (minorversion) { - case 0: - ret = nfs4_callback_up_net(serv, net); - break; - case 1: - case 2: - ret = nfs41_callback_up_net(serv, net); - break; - default: - printk(KERN_ERR "NFS: unknown callback version: %d\n", - minorversion); - ret = -EINVAL; - break; - } + ret = -EPROTONOSUPPORT; + if (minorversion == 0) + ret = nfs4_callback_up_net(serv, net); + else if (xprt->ops->bc_up) + ret = xprt->ops->bc_up(serv, net); if (ret < 0) { printk(KERN_ERR "NFS: callback service start failed\n"); @@ -364,7 +340,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt) goto err_create; } - ret = nfs_callback_up_net(minorversion, serv, net); + ret = nfs_callback_up_net(minorversion, serv, net, xprt); if (ret < 0) goto err_net; diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 82c0839..69ef5b3 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -54,6 +54,8 @@ enum rpc_display_format_t { struct rpc_task; struct rpc_xprt; struct seq_file; +struct svc_serv; +struct net; /* * This describes a complete RPC request @@ -138,6 +140,7 @@ struct rpc_xprt_ops { void (*inject_disconnect)(struct rpc_xprt *xprt); int (*bc_setup)(struct rpc_xprt *xprt, unsigned int min_reqs); + int (*bc_up)(struct svc_serv *serv, struct net *net); void (*bc_free_rqst)(struct rpc_rqst *rqst); void (*bc_destroy)(struct rpc_xprt *xprt, unsigned int max_reqs); diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 0b3387f..2dcb44f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "xprt_rdma.h" @@ -174,6 +175,26 @@ out_err: } /** + * xprt_rdma_bc_up - Create transport endpoint for backchannel service + * @serv: server endpoint + * @net: network namespace + * + * The "xprt" is an implied argument: it supplies the name of the + * backchannel transport class. + * + * Returns zero on success, negative errno on failure + */ +int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0); + if (ret < 0) + return ret; + return 0; +} + +/** * rpcrdma_bc_marshal_reply - Send backwards direction reply * @rqst: buffer containing RPC reply data * diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 845278e..8c545f7 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -708,6 +708,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = { .inject_disconnect = xprt_rdma_inject_disconnect, #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, + .bc_up = xprt_rdma_bc_up, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index eb87d96e8..f8dd17b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -520,6 +520,7 @@ void xprt_rdma_cleanup(void); */ #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); +int xprt_rdma_bc_up(struct svc_serv *, struct net *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 44a81e4..dc47067 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1306,6 +1306,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } + +static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, + SVC_SOCK_ANONYMOUS); + if (ret < 0) + return ret; + return 0; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -2582,6 +2593,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { .inject_disconnect = xs_inject_disconnect, #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, + .bc_up = xs_tcp_bc_up, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif -- cgit v0.10.2 From 7fc561362da38253522899ccab30c7955d4132df Mon Sep 17 00:00:00 2001 From: Andrzej Hajda Date: Thu, 24 Sep 2015 16:00:09 +0200 Subject: SUNRPC: fix variable type Due to incorrect len type bc_send_request returned always zero. The problem has been detected using proposed semantic patch scripts/coccinelle/tests/assign_signed_to_unsigned.cocci [1]. [1]: http://permalink.gmane.org/gmane.linux.kernel/2046107 Signed-off-by: Andrzej Hajda Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 94824ff..1d1a704 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2570,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; - u32 len; + int len; dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* -- cgit v0.10.2 From 118c9163562426face9ff0903f1cbd3e1a959ca9 Mon Sep 17 00:00:00 2001 From: Yaowei Bai Date: Mon, 28 Sep 2015 22:30:32 +0800 Subject: fs/nfs: remove unnecessary new_valid_dev check As new_valid_dev always returns 1, so !new_valid_dev check is not needed, remove it. Signed-off-by: Yaowei Bai Signed-off-by: Trond Myklebust diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c index 3d8e4ff..ce5a218 100644 --- a/fs/nfs/dir.c +++ b/fs/nfs/dir.c @@ -1714,9 +1714,6 @@ nfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t rdev) dfprintk(VFS, "NFS: mknod(%s/%lu), %pd\n", dir->i_sb->s_id, dir->i_ino, dentry); - if (!new_valid_dev(rdev)) - return -EINVAL; - attr.ia_mode = mode; attr.ia_valid = ATTR_MODE; -- cgit v0.10.2 From 8fbcf237439f841e7e9c4675790e08ea1c295bd3 Mon Sep 17 00:00:00 2001 From: Andreas Gruenbacher Date: Tue, 3 Nov 2015 18:25:34 +0100 Subject: nfs: Remove unused xdr page offsets in getacl/setacl arguments The arguments passed around for getacl and setacl xdr encoding, struct nfs_setaclargs and struct nfs_getaclargs, both contain an array of pages, an offset into the first page, and the length of the page data. The offset is unused as it is always zero; remove it. Signed-off-by: Andreas Gruenbacher Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 87a081c..7ed8f2c 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -4603,7 +4603,7 @@ static inline int nfs4_server_supports_acls(struct nfs_server *server) #define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE) static int buf_to_pages_noslab(const void *buf, size_t buflen, - struct page **pages, unsigned int *pgbase) + struct page **pages) { struct page *newpage, **spages; int rc = 0; @@ -4747,7 +4747,6 @@ static ssize_t __nfs4_get_acl_uncached(struct inode *inode, void *buf, size_t bu goto out_free; args.acl_len = npages * PAGE_SIZE; - args.acl_pgbase = 0; dprintk("%s buf %p buflen %zu npages %d args.acl_len %zu\n", __func__, buf, buflen, npages, args.acl_len); @@ -4839,7 +4838,7 @@ static int __nfs4_proc_set_acl(struct inode *inode, const void *buf, size_t bufl return -EOPNOTSUPP; if (npages > ARRAY_SIZE(pages)) return -ERANGE; - i = buf_to_pages_noslab(buf, buflen, arg.acl_pages, &arg.acl_pgbase); + i = buf_to_pages_noslab(buf, buflen, arg.acl_pages); if (i < 0) return i; nfs4_inode_return_delegation(inode); diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c index 9f65679..22a1ddd 100644 --- a/fs/nfs/nfs4xdr.c +++ b/fs/nfs/nfs4xdr.c @@ -1659,7 +1659,7 @@ encode_setacl(struct xdr_stream *xdr, struct nfs_setaclargs *arg, struct compoun *p = cpu_to_be32(FATTR4_WORD0_ACL); p = reserve_space(xdr, 4); *p = cpu_to_be32(arg->acl_len); - xdr_write_pages(xdr, arg->acl_pages, arg->acl_pgbase, arg->acl_len); + xdr_write_pages(xdr, arg->acl_pages, 0, arg->acl_len); } static void @@ -2491,7 +2491,7 @@ static void nfs4_xdr_enc_getacl(struct rpc_rqst *req, struct xdr_stream *xdr, encode_getattr_two(xdr, FATTR4_WORD0_ACL, 0, &hdr); xdr_inline_pages(&req->rq_rcv_buf, replen << 2, - args->acl_pages, args->acl_pgbase, args->acl_len); + args->acl_pages, 0, args->acl_len); encode_nops(&hdr); } diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h index 4728e7e..570d630 100644 --- a/include/linux/nfs_xdr.h +++ b/include/linux/nfs_xdr.h @@ -705,7 +705,6 @@ struct nfs_setaclargs { struct nfs4_sequence_args seq_args; struct nfs_fh * fh; size_t acl_len; - unsigned int acl_pgbase; struct page ** acl_pages; }; @@ -717,7 +716,6 @@ struct nfs_getaclargs { struct nfs4_sequence_args seq_args; struct nfs_fh * fh; size_t acl_len; - unsigned int acl_pgbase; struct page ** acl_pages; }; -- cgit v0.10.2 From 1ca843a2d28dec89e58e7227c27a9d55f21f59e1 Mon Sep 17 00:00:00 2001 From: Andreas Gruenbacher Date: Tue, 3 Nov 2015 18:25:33 +0100 Subject: nfs: Fix GETATTR bitmap verification When decoding GETATTR replies, the client checks the attribute bitmap for which attributes the server has sent. It misses bits at the word boundaries, though; fix that. Signed-off-by: Andreas Gruenbacher Signed-off-by: Trond Myklebust diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c index 22a1ddd..dfed4f5 100644 --- a/fs/nfs/nfs4xdr.c +++ b/fs/nfs/nfs4xdr.c @@ -4375,6 +4375,11 @@ static int decode_statfs(struct xdr_stream *xdr, struct nfs_fsstat *fsstat) goto xdr_error; if ((status = decode_attr_files_total(xdr, bitmap, &fsstat->tfiles)) != 0) goto xdr_error; + + status = -EIO; + if (unlikely(bitmap[0])) + goto xdr_error; + if ((status = decode_attr_space_avail(xdr, bitmap, &fsstat->abytes)) != 0) goto xdr_error; if ((status = decode_attr_space_free(xdr, bitmap, &fsstat->fbytes)) != 0) @@ -4574,6 +4579,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap, goto xdr_error; fattr->valid |= status; + status = -EIO; + if (unlikely(bitmap[0])) + goto xdr_error; + status = decode_attr_mode(xdr, bitmap, &fmode); if (status < 0) goto xdr_error; @@ -4627,6 +4636,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap, goto xdr_error; fattr->valid |= status; + status = -EIO; + if (unlikely(bitmap[1])) + goto xdr_error; + status = decode_attr_mdsthreshold(xdr, bitmap, fattr->mdsthreshold); if (status < 0) goto xdr_error; @@ -4811,12 +4824,22 @@ static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo) if ((status = decode_attr_maxwrite(xdr, bitmap, &fsinfo->wtmax)) != 0) goto xdr_error; fsinfo->wtpref = fsinfo->wtmax; + + status = -EIO; + if (unlikely(bitmap[0])) + goto xdr_error; + status = decode_attr_time_delta(xdr, bitmap, &fsinfo->time_delta); if (status != 0) goto xdr_error; status = decode_attr_pnfstype(xdr, bitmap, &fsinfo->layouttype); if (status != 0) goto xdr_error; + + status = -EIO; + if (unlikely(bitmap[1])) + goto xdr_error; + status = decode_attr_layout_blksize(xdr, bitmap, &fsinfo->blksize); if (status) goto xdr_error; -- cgit v0.10.2 From 941c3ff3102ccce440034d59cf9e4e9cc10b720d Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Sat, 12 Sep 2015 09:37:18 +0800 Subject: Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug The sunrpc debug sysctl files only accept decimal number right now. But all the XXXDBUG_XXX macros are defined as hexadecimal. It is not easy to set or check an separate flag. This patch let those files support accepting hexadecimal number, (decimal number is also supported). Also, display it as hexadecimal. v2, Remove duplicate parsing of '0x...', just using simple_strtol(tmpbuf, &s, 0) Fix a bug of isspace() checking after parsing Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c index 887f018..c88d9bc 100644 --- a/net/sunrpc/sysctl.c +++ b/net/sunrpc/sysctl.c @@ -76,7 +76,7 @@ static int proc_dodebug(struct ctl_table *table, int write, void __user *buffer, size_t *lenp, loff_t *ppos) { - char tmpbuf[20], c, *s; + char tmpbuf[20], c, *s = NULL; char __user *p; unsigned int value; size_t left, len; @@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write, return -EFAULT; tmpbuf[left] = '\0'; - for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--) - value = 10 * value + (*s - '0'); - if (*s && !isspace(*s)) - return -EINVAL; - while (left && isspace(*s)) - left--, s++; + value = simple_strtol(tmpbuf, &s, 0); + if (s) { + left -= (s - tmpbuf); + if (left && !isspace(*s)) + return -EINVAL; + while (left && isspace(*s)) + left--, s++; + } else + left = 0; *(unsigned int *) table->data = value; /* Display the RPC tasks on writing to rpc_debug */ if (strcmp(table->procname, "rpc_debug") == 0) rpc_show_tasks(&init_net); } else { - if (!access_ok(VERIFY_WRITE, buffer, left)) - return -EFAULT; - len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data); + len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data); if (len > left) len = left; - if (__copy_to_user(buffer, tmpbuf, len)) + if (copy_to_user(buffer, tmpbuf, len)) return -EFAULT; if ((left -= len) > 0) { if (put_user('\n', (char __user *)buffer + len)) -- cgit v0.10.2