summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2016-08-31 05:08:52 (GMT)
committerDavid S. Miller <davem@davemloft.net>2016-08-31 05:08:52 (GMT)
commit792438e59ab64bf44d559fbc9c1a42f878c50b4d (patch)
treedf8353d249f3103aad19f72c7a7750d71133f9d9
parentcf4d13fecfe02ce695fc44d8a136d28505365e8e (diff)
parent4de48af663d88d8c9a2550e60725f5a5c660970b (diff)
downloadlinux-792438e59ab64bf44d559fbc9c1a42f878c50b4d.tar.xz
Merge tag 'rxrpc-rewrite-20160830-1' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs
David Howells says: ==================== rxrpc: Preparation for removal of use of skbs from AFS Here's a set of patches that prepare the way for the removal of the use of sk_buffs from fs/afs (they'll be entirely retained within net/rxrpc): (1) Fix a potential NULL-pointer deref in rxrpc_abort_calls(). (2) Condense all the terminal call state machine states to a single one plus supplementary info. (3) Add a trace point for rxrpc call usage debugging. (4) Cleanups and missing headers. (5) Provide a way for AFS to ask about a call's peer address without having an sk_buff to query. (6) Use call->peer directly rather than going via call->conn (which might be NULL). (7) Pass struct socket * to various rxrpc kernel interface functions so they can use that directly rather than getting it from the rxrpc_call struct. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--Documentation/networking/rxrpc.txt18
-rw-r--r--fs/afs/cmservice.c26
-rw-r--r--fs/afs/internal.h5
-rw-r--r--fs/afs/main.c1
-rw-r--r--fs/afs/rxrpc.c28
-rw-r--r--fs/afs/server.c11
-rw-r--r--include/net/af_rxrpc.h12
-rw-r--r--include/trace/events/rxrpc.h39
-rw-r--r--net/rxrpc/af_rxrpc.c5
-rw-r--r--net/rxrpc/ar-internal.h135
-rw-r--r--net/rxrpc/call_accept.c24
-rw-r--r--net/rxrpc/call_event.c63
-rw-r--r--net/rxrpc/call_object.c133
-rw-r--r--net/rxrpc/conn_client.c3
-rw-r--r--net/rxrpc/conn_event.c53
-rw-r--r--net/rxrpc/conn_object.c4
-rw-r--r--net/rxrpc/input.c72
-rw-r--r--net/rxrpc/output.c48
-rw-r--r--net/rxrpc/peer_event.c25
-rw-r--r--net/rxrpc/peer_object.c15
-rw-r--r--net/rxrpc/proc.c3
-rw-r--r--net/rxrpc/recvmsg.c13
-rw-r--r--net/rxrpc/skbuff.c4
23 files changed, 470 insertions, 270 deletions
diff --git a/Documentation/networking/rxrpc.txt b/Documentation/networking/rxrpc.txt
index 70c926a..cfc8cb9 100644
--- a/Documentation/networking/rxrpc.txt
+++ b/Documentation/networking/rxrpc.txt
@@ -725,7 +725,8 @@ The kernel interface functions are as follows:
(*) End a client call.
- void rxrpc_kernel_end_call(struct rxrpc_call *call);
+ void rxrpc_kernel_end_call(struct socket *sock,
+ struct rxrpc_call *call);
This is used to end a previously begun call. The user_call_ID is expunged
from AF_RXRPC's knowledge and will not be seen again in association with
@@ -733,7 +734,9 @@ The kernel interface functions are as follows:
(*) Send data through a call.
- int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
+ int rxrpc_kernel_send_data(struct socket *sock,
+ struct rxrpc_call *call,
+ struct msghdr *msg,
size_t len);
This is used to supply either the request part of a client call or the
@@ -747,7 +750,9 @@ The kernel interface functions are as follows:
(*) Abort a call.
- void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code);
+ void rxrpc_kernel_abort_call(struct socket *sock,
+ struct rxrpc_call *call,
+ u32 abort_code);
This is used to abort a call if it's still in an abortable state. The
abort code specified will be placed in the ABORT message sent.
@@ -868,6 +873,13 @@ The kernel interface functions are as follows:
This is used to allocate a null RxRPC key that can be used to indicate
anonymous security for a particular domain.
+ (*) Get the peer address of a call.
+
+ void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call,
+ struct sockaddr_rxrpc *_srx);
+
+ This is used to find the remote peer address of a call.
+
=======================
CONFIGURABLE PARAMETERS
diff --git a/fs/afs/cmservice.c b/fs/afs/cmservice.c
index 85737e9..77ee481 100644
--- a/fs/afs/cmservice.c
+++ b/fs/afs/cmservice.c
@@ -17,10 +17,6 @@
#include "internal.h"
#include "afs_cm.h"
-#if 0
-struct workqueue_struct *afs_cm_workqueue;
-#endif /* 0 */
-
static int afs_deliver_cb_init_call_back_state(struct afs_call *,
struct sk_buff *, bool);
static int afs_deliver_cb_init_call_back_state3(struct afs_call *,
@@ -171,9 +167,9 @@ static void SRXAFSCB_CallBack(struct work_struct *work)
static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
bool last)
{
+ struct sockaddr_rxrpc srx;
struct afs_callback *cb;
struct afs_server *server;
- struct in_addr addr;
__be32 *bp;
u32 tmp;
int ret, loop;
@@ -182,6 +178,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
switch (call->unmarshall) {
case 0:
+ rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
call->offset = 0;
call->unmarshall++;
@@ -282,13 +279,11 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
break;
}
-
call->state = AFS_CALL_REPLYING;
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
- memcpy(&addr, &ip_hdr(skb)->saddr, 4);
- server = afs_find_server(&addr);
+ server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
@@ -319,12 +314,14 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call,
struct sk_buff *skb,
bool last)
{
+ struct sockaddr_rxrpc srx;
struct afs_server *server;
- struct in_addr addr;
int ret;
_enter(",{%u},%d", skb->len, last);
+ rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
+
ret = afs_data_complete(call, skb, last);
if (ret < 0)
return ret;
@@ -334,8 +331,7 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call,
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
- memcpy(&addr, &ip_hdr(skb)->saddr, 4);
- server = afs_find_server(&addr);
+ server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
@@ -352,11 +348,13 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call,
struct sk_buff *skb,
bool last)
{
+ struct sockaddr_rxrpc srx;
struct afs_server *server;
- struct in_addr addr;
_enter(",{%u},%d", skb->len, last);
+ rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
+
/* There are some arguments that we ignore */
afs_data_consumed(call, skb);
if (!last)
@@ -367,8 +365,7 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call,
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
- memcpy(&addr, &ip_hdr(skb)->saddr, 4);
- server = afs_find_server(&addr);
+ server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
@@ -426,7 +423,6 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
_enter("");
-
if (memcmp(r, &afs_uuid, sizeof(afs_uuid)) == 0)
reply.match = htonl(0);
else
diff --git a/fs/afs/internal.h b/fs/afs/internal.h
index df976b2..d97552d 100644
--- a/fs/afs/internal.h
+++ b/fs/afs/internal.h
@@ -20,6 +20,7 @@
#include <linux/sched.h>
#include <linux/fscache.h>
#include <linux/backing-dev.h>
+#include <net/af_rxrpc.h>
#include "afs.h"
#include "afs_vl.h"
@@ -607,6 +608,8 @@ extern void afs_proc_cell_remove(struct afs_cell *);
/*
* rxrpc.c
*/
+extern struct socket *afs_socket;
+
extern int afs_open_socket(void);
extern void afs_close_socket(void);
extern void afs_data_consumed(struct afs_call *, struct sk_buff *);
@@ -654,7 +657,7 @@ do { \
extern struct afs_server *afs_lookup_server(struct afs_cell *,
const struct in_addr *);
-extern struct afs_server *afs_find_server(const struct in_addr *);
+extern struct afs_server *afs_find_server(const struct sockaddr_rxrpc *);
extern void afs_put_server(struct afs_server *);
extern void __exit afs_purge_servers(void);
diff --git a/fs/afs/main.c b/fs/afs/main.c
index 35de0c0..0b187ef 100644
--- a/fs/afs/main.c
+++ b/fs/afs/main.c
@@ -14,6 +14,7 @@
#include <linux/init.h>
#include <linux/completion.h>
#include <linux/sched.h>
+#include <linux/random.h>
#include "internal.h"
MODULE_DESCRIPTION("AFS Client File System");
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index 14d04c8..7b0d189 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -16,7 +16,7 @@
#include "internal.h"
#include "afs_cm.h"
-static struct socket *afs_socket; /* my RxRPC socket */
+struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static atomic_t afs_outstanding_calls;
static atomic_t afs_outstanding_skbs;
@@ -207,7 +207,7 @@ static void afs_free_call(struct afs_call *call)
static void afs_end_call_nofree(struct afs_call *call)
{
if (call->rxcall) {
- rxrpc_kernel_end_call(call->rxcall);
+ rxrpc_kernel_end_call(afs_socket, call->rxcall);
call->rxcall = NULL;
}
if (call->type->destructor)
@@ -325,8 +325,8 @@ static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
* returns from sending the request */
if (first + loop >= last)
call->state = AFS_CALL_AWAIT_REPLY;
- ret = rxrpc_kernel_send_data(call->rxcall, msg,
- to - offset);
+ ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
+ msg, to - offset);
kunmap(pages[loop]);
if (ret < 0)
break;
@@ -406,7 +406,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
* request */
if (!call->send_pages)
call->state = AFS_CALL_AWAIT_REPLY;
- ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
+ ret = rxrpc_kernel_send_data(afs_socket, rxcall,
+ &msg, call->request_size);
if (ret < 0)
goto error_do_abort;
@@ -421,7 +422,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
return wait_mode->wait(call);
error_do_abort:
- rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
+ rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT);
while ((skb = skb_dequeue(&call->rx_queue)))
afs_free_skb(skb);
error_kill_call:
@@ -509,7 +510,8 @@ static void afs_deliver_to_call(struct afs_call *call)
if (call->state != AFS_CALL_AWAIT_REPLY)
abort_code = RXGEN_SS_UNMARSHAL;
do_abort:
- rxrpc_kernel_abort_call(call->rxcall,
+ rxrpc_kernel_abort_call(afs_socket,
+ call->rxcall,
abort_code);
call->error = ret;
call->state = AFS_CALL_ERROR;
@@ -605,7 +607,7 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
/* kill the call */
if (call->state < AFS_CALL_COMPLETE) {
_debug("call incomplete");
- rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
+ rxrpc_kernel_abort_call(afs_socket, call->rxcall, RX_CALL_DEAD);
while ((skb = skb_dequeue(&call->rx_queue)))
afs_free_skb(skb);
}
@@ -823,14 +825,15 @@ void afs_send_empty_reply(struct afs_call *call)
msg.msg_flags = 0;
call->state = AFS_CALL_AWAIT_ACK;
- switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
+ switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
case 0:
_leave(" [replied]");
return;
case -ENOMEM:
_debug("oom");
- rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
+ rxrpc_kernel_abort_call(afs_socket, call->rxcall,
+ RX_USER_ABORT);
default:
afs_end_call(call);
_leave(" [error]");
@@ -859,7 +862,7 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
msg.msg_flags = 0;
call->state = AFS_CALL_AWAIT_ACK;
- n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
+ n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
if (n >= 0) {
/* Success */
_leave(" [replied]");
@@ -868,7 +871,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
if (n == -ENOMEM) {
_debug("oom");
- rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
+ rxrpc_kernel_abort_call(afs_socket, call->rxcall,
+ RX_USER_ABORT);
}
afs_end_call(call);
_leave(" [error]");
diff --git a/fs/afs/server.c b/fs/afs/server.c
index f342acf..d4066ab 100644
--- a/fs/afs/server.c
+++ b/fs/afs/server.c
@@ -178,13 +178,18 @@ server_in_two_cells:
/*
* look up a server by its IP address
*/
-struct afs_server *afs_find_server(const struct in_addr *_addr)
+struct afs_server *afs_find_server(const struct sockaddr_rxrpc *srx)
{
struct afs_server *server = NULL;
struct rb_node *p;
- struct in_addr addr = *_addr;
+ struct in_addr addr = srx->transport.sin.sin_addr;
- _enter("%pI4", &addr.s_addr);
+ _enter("{%d,%pI4}", srx->transport.family, &addr.s_addr);
+
+ if (srx->transport.family != AF_INET) {
+ WARN(true, "AFS does not yes support non-IPv4 addresses\n");
+ return NULL;
+ }
read_lock(&afs_servers_lock);
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index 7b0f886..f8d8079 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -15,6 +15,9 @@
#include <linux/skbuff.h>
#include <linux/rxrpc.h>
+struct key;
+struct sock;
+struct socket;
struct rxrpc_call;
/*
@@ -39,15 +42,18 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
struct key *,
unsigned long,
gfp_t);
-int rxrpc_kernel_send_data(struct rxrpc_call *, struct msghdr *, size_t);
+int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
+ struct msghdr *, size_t);
void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
-void rxrpc_kernel_abort_call(struct rxrpc_call *, u32);
-void rxrpc_kernel_end_call(struct rxrpc_call *);
+void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32);
+void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_is_data_last(struct sk_buff *);
u32 rxrpc_kernel_get_abort_code(struct sk_buff *);
int rxrpc_kernel_get_error_number(struct sk_buff *);
void rxrpc_kernel_free_skb(struct sk_buff *);
struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long);
int rxrpc_kernel_reject_call(struct socket *);
+void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
+ struct sockaddr_rxrpc *);
#endif /* _NET_RXRPC_H */
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 15283ee..cbe574e 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -16,6 +16,45 @@
#include <linux/tracepoint.h>
+TRACE_EVENT(rxrpc_call,
+ TP_PROTO(struct rxrpc_call *call, int op, int usage, int nskb,
+ const void *where, const void *aux),
+
+ TP_ARGS(call, op, usage, nskb, where, aux),
+
+ TP_STRUCT__entry(
+ __field(struct rxrpc_call *, call )
+ __field(int, op )
+ __field(int, usage )
+ __field(int, nskb )
+ __field(const void *, where )
+ __field(const void *, aux )
+ ),
+
+ TP_fast_assign(
+ __entry->call = call;
+ __entry->op = op;
+ __entry->usage = usage;
+ __entry->nskb = nskb;
+ __entry->where = where;
+ __entry->aux = aux;
+ ),
+
+ TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
+ __entry->call,
+ (__entry->op == 0 ? "NWc" :
+ __entry->op == 1 ? "NWs" :
+ __entry->op == 2 ? "SEE" :
+ __entry->op == 3 ? "GET" :
+ __entry->op == 4 ? "Gsb" :
+ __entry->op == 5 ? "PUT" :
+ "Psb"),
+ __entry->usage,
+ __entry->nskb,
+ __entry->where,
+ __entry->aux)
+ );
+
TRACE_EVENT(rxrpc_skb,
TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count,
const void *where),
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index c7cf356..e07c91a 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -279,15 +279,16 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call);
/**
* rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
+ * @sock: The socket the call is on
* @call: The call to end
*
* Allow a kernel service to end a call it was using. The call must be
* complete before this is called (the call should be aborted if necessary).
*/
-void rxrpc_kernel_end_call(struct rxrpc_call *call)
+void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
{
_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
- rxrpc_remove_user_ID(call->socket, call);
+ rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
rxrpc_put_call(call);
}
EXPORT_SYMBOL(rxrpc_kernel_end_call);
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index c761124..0c320b2 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -289,8 +289,6 @@ enum rxrpc_conn_proto_state {
RXRPC_CONN_SERVICE, /* Service secured connection */
RXRPC_CONN_REMOTELY_ABORTED, /* Conn aborted by peer */
RXRPC_CONN_LOCALLY_ABORTED, /* Conn aborted locally */
- RXRPC_CONN_NETWORK_ERROR, /* Conn terminated by network error */
- RXRPC_CONN_LOCAL_ERROR, /* Conn terminated by local error */
RXRPC_CONN__NR_STATES
};
@@ -344,7 +342,6 @@ struct rxrpc_connection {
enum rxrpc_conn_proto_state state : 8; /* current state of connection */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
- int error; /* local error incurred */
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
unsigned int hi_serial; /* highest serial number received */
@@ -411,13 +408,22 @@ enum rxrpc_call_state {
RXRPC_CALL_SERVER_ACK_REQUEST, /* - server pending ACK of request */
RXRPC_CALL_SERVER_SEND_REPLY, /* - server sending reply */
RXRPC_CALL_SERVER_AWAIT_ACK, /* - server awaiting final ACK */
- RXRPC_CALL_COMPLETE, /* - call completed */
+ RXRPC_CALL_COMPLETE, /* - call complete */
+ RXRPC_CALL_DEAD, /* - call is dead */
+ NR__RXRPC_CALL_STATES
+};
+
+/*
+ * Call completion condition (state == RXRPC_CALL_COMPLETE).
+ */
+enum rxrpc_call_completion {
+ RXRPC_CALL_SUCCEEDED, /* - Normal termination */
RXRPC_CALL_SERVER_BUSY, /* - call rejected by busy server */
RXRPC_CALL_REMOTELY_ABORTED, /* - call aborted by peer */
RXRPC_CALL_LOCALLY_ABORTED, /* - call aborted locally on error or close */
+ RXRPC_CALL_LOCAL_ERROR, /* - call failed due to local error */
RXRPC_CALL_NETWORK_ERROR, /* - call terminated by network error */
- RXRPC_CALL_DEAD, /* - call is dead */
- NR__RXRPC_CALL_STATES
+ NR__RXRPC_CALL_COMPLETIONS
};
/*
@@ -451,14 +457,13 @@ struct rxrpc_call {
unsigned long events;
spinlock_t lock;
rwlock_t state_lock; /* lock for state transition */
+ u32 abort_code; /* Local/remote abort code */
+ int error; /* Local error incurred */
+ enum rxrpc_call_state state : 8; /* current state of call */
+ enum rxrpc_call_completion completion : 8; /* Call completion condition */
atomic_t usage;
atomic_t skb_count; /* Outstanding packets on this call */
atomic_t sequence; /* Tx data packet sequence counter */
- u32 local_abort; /* local abort code */
- u32 remote_abort; /* remote abort code */
- int error_report; /* Network error (ICMP/local transport) */
- int error; /* Local error incurred */
- enum rxrpc_call_state state : 8; /* current state of call */
u16 service_id; /* service ID */
u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */
@@ -493,20 +498,6 @@ struct rxrpc_call {
unsigned long ackr_window[RXRPC_ACKR_WINDOW_ASZ + 1];
};
-/*
- * locally abort an RxRPC call
- */
-static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
-{
- write_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_COMPLETE) {
- call->local_abort = abort_code;
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
- }
- write_unlock_bh(&call->state_lock);
-}
-
#include <trace/events/rxrpc.h>
/*
@@ -534,6 +525,8 @@ void rxrpc_process_call(struct work_struct *);
/*
* call_object.c
*/
+extern const char *const rxrpc_call_states[];
+extern const char *const rxrpc_call_completions[];
extern unsigned int rxrpc_max_call_lifetime;
extern unsigned int rxrpc_dead_call_expiry;
extern struct kmem_cache *rxrpc_call_jar;
@@ -550,7 +543,11 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *,
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-void __rxrpc_put_call(struct rxrpc_call *);
+void rxrpc_see_call(struct rxrpc_call *);
+void rxrpc_get_call(struct rxrpc_call *);
+void rxrpc_put_call(struct rxrpc_call *);
+void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
void __exit rxrpc_destroy_all_calls(void);
static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
@@ -564,6 +561,78 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
}
/*
+ * Transition a call to the complete state.
+ */
+static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call,
+ enum rxrpc_call_completion compl,
+ u32 abort_code,
+ int error)
+{
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ call->abort_code = abort_code;
+ call->error = error;
+ call->completion = compl,
+ call->state = RXRPC_CALL_COMPLETE;
+ return true;
+ }
+ return false;
+}
+
+static inline bool rxrpc_set_call_completion(struct rxrpc_call *call,
+ enum rxrpc_call_completion compl,
+ u32 abort_code,
+ int error)
+{
+ int ret;
+
+ write_lock_bh(&call->state_lock);
+ ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
+ write_unlock_bh(&call->state_lock);
+ return ret;
+}
+
+/*
+ * Record that a call successfully completed.
+ */
+static inline void __rxrpc_call_completed(struct rxrpc_call *call)
+{
+ __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
+}
+
+static inline void rxrpc_call_completed(struct rxrpc_call *call)
+{
+ write_lock_bh(&call->state_lock);
+ __rxrpc_call_completed(call);
+ write_unlock_bh(&call->state_lock);
+}
+
+/*
+ * Record that a call is locally aborted.
+ */
+static inline bool __rxrpc_abort_call(struct rxrpc_call *call,
+ u32 abort_code, int error)
+{
+ if (__rxrpc_set_call_completion(call,
+ RXRPC_CALL_LOCALLY_ABORTED,
+ abort_code, error)) {
+ set_bit(RXRPC_CALL_EV_ABORT, &call->events);
+ return true;
+ }
+ return false;
+}
+
+static inline bool rxrpc_abort_call(struct rxrpc_call *call,
+ u32 abort_code, int error)
+{
+ bool ret;
+
+ write_lock_bh(&call->state_lock);
+ ret = __rxrpc_abort_call(call, abort_code, error);
+ write_unlock_bh(&call->state_lock);
+ return ret;
+}
+
+/*
* conn_client.c
*/
extern unsigned int rxrpc_max_client_connections;
@@ -778,7 +847,6 @@ static inline void rxrpc_put_peer(struct rxrpc_peer *peer)
/*
* proc.c
*/
-extern const char *const rxrpc_call_states[];
extern const struct file_operations rxrpc_call_seq_fops;
extern const struct file_operations rxrpc_connection_seq_fops;
@@ -958,16 +1026,3 @@ do { \
} while (0)
#endif /* __KDEBUGALL */
-
-
-#define rxrpc_get_call(CALL) \
-do { \
- CHECK_SLAB_OKAY(&(CALL)->usage); \
- if (atomic_inc_return(&(CALL)->usage) == 1) \
- BUG(); \
-} while (0)
-
-#define rxrpc_put_call(CALL) \
-do { \
- __rxrpc_put_call(CALL); \
-} while (0)
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 669ac79..03af88f 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -129,8 +129,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
_debug("conn ready");
call->state = RXRPC_CALL_SERVER_ACCEPTING;
list_add_tail(&call->accept_link, &rx->acceptq);
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, notification);
nsp = rxrpc_skb(notification);
nsp->call = call;
@@ -323,18 +322,15 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk);
+ rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_SERVER_ACCEPTING:
call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
break;
- case RXRPC_CALL_REMOTELY_ABORTED:
- case RXRPC_CALL_LOCALLY_ABORTED:
- ret = -ECONNABORTED;
- goto out_release;
- case RXRPC_CALL_NETWORK_ERROR:
- ret = call->conn->error;
+ case RXRPC_CALL_COMPLETE:
+ ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
@@ -399,21 +395,19 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk);
+ rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_SERVER_ACCEPTING:
- call->state = RXRPC_CALL_SERVER_BUSY;
+ __rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY,
+ 0, ECONNABORTED);
if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
rxrpc_queue_call(call);
ret = 0;
goto out_release;
- case RXRPC_CALL_REMOTELY_ABORTED:
- case RXRPC_CALL_LOCALLY_ABORTED:
- ret = -ECONNABORTED;
- goto out_release;
- case RXRPC_CALL_NETWORK_ERROR:
- ret = call->conn->error;
+ case RXRPC_CALL_COMPLETE:
+ ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 5292bcf..de72de6 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -95,7 +95,7 @@ cancel_timer:
_debug("cancel timer %%%u", serial);
try_to_del_timer_sync(&call->ack_timer);
read_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE &&
+ if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
read_unlock_bh(&call->state_lock);
@@ -123,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
unsigned long resend_at)
{
read_lock_bh(&call->state_lock);
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (call->state == RXRPC_CALL_COMPLETE)
resend = 0;
if (resend & 1) {
@@ -230,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call)
_enter("%d,%d,%d",
call->acks_tail, call->acks_unacked, call->acks_head);
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (call->state == RXRPC_CALL_COMPLETE)
return;
resend = 0;
@@ -465,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
skb->destructor = rxrpc_packet_destructor;
ASSERTCMP(sp->call, ==, NULL);
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
/* insert into the buffer in sequence order */
spin_lock_bh(&call->lock);
@@ -552,7 +551,7 @@ static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
- peer = call->conn->params.peer;
+ peer = call->peer;
if (mtu < peer->maxdata) {
spin_lock_bh(&peer->lock);
peer->maxdata = mtu;
@@ -711,7 +710,7 @@ all_acked:
break;
case RXRPC_CALL_SERVER_AWAIT_ACK:
_debug("srv complete");
- call->state = RXRPC_CALL_COMPLETE;
+ __rxrpc_call_completed(call);
post_ACK = true;
break;
case RXRPC_CALL_CLIENT_SEND_REQUEST:
@@ -741,8 +740,7 @@ all_acked:
_debug("post ACK");
skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
BUG();
@@ -801,8 +799,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
memset(sp, 0, sizeof(*sp));
sp->error = error;
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
@@ -834,6 +831,8 @@ void rxrpc_process_call(struct work_struct *work)
u32 serial, abort_code = RX_PROTOCOL_ERROR;
u8 *acks = NULL;
+ rxrpc_see_call(call);
+
//printk("\n--------------------\n");
_enter("{%d,%s,%lx} [%lu]",
call->debug_id, rxrpc_call_states[call->state], call->events,
@@ -844,8 +843,8 @@ void rxrpc_process_call(struct work_struct *work)
/* there's a good chance we're going to have to send a message, so set
* one up in advance */
- msg.msg_name = &call->conn->params.peer->srx.transport;
- msg.msg_namelen = call->conn->params.peer->srx.transport_len;
+ msg.msg_name = &call->peer->srx.transport;
+ msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
@@ -875,24 +874,22 @@ skip_msg_init:
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
- error = call->error_report;
- if (error < RXRPC_LOCAL_ERROR_OFFSET) {
+ if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
mark = RXRPC_SKB_MARK_NET_ERROR;
_debug("post net error %d", error);
} else {
mark = RXRPC_SKB_MARK_LOCAL_ERROR;
- error -= RXRPC_LOCAL_ERROR_OFFSET;
_debug("post net local error %d", error);
}
- if (rxrpc_post_message(call, mark, error, true) < 0)
+ if (rxrpc_post_message(call, mark, call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
goto kill_ACKs;
}
if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
- ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
@@ -900,7 +897,7 @@ skip_msg_init:
_debug("post conn abort");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
- call->conn->error, true) < 0)
+ call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
goto kill_ACKs;
@@ -913,13 +910,13 @@ skip_msg_init:
}
if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
- ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
- ECONNABORTED, true) < 0)
+ call->error, true) < 0)
goto no_mem;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
- data = htonl(call->local_abort);
+ data = htonl(call->abort_code);
iov[1].iov_base = &data;
iov[1].iov_len = sizeof(data);
genbit = RXRPC_CALL_EV_ABORT;
@@ -979,13 +976,7 @@ skip_msg_init:
}
if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
- write_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_CALL_TIMEOUT;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
- }
- write_unlock_bh(&call->state_lock);
+ rxrpc_abort_call(call, RX_CALL_TIMEOUT, ETIME);
_debug("post timeout");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
@@ -998,7 +989,8 @@ skip_msg_init:
/* deal with assorted inbound messages */
if (!skb_queue_empty(&call->rx_queue)) {
- switch (rxrpc_process_rx_queue(call, &abort_code)) {
+ ret = rxrpc_process_rx_queue(call, &abort_code);
+ switch (ret) {
case 0:
case -EAGAIN:
break;
@@ -1007,7 +999,7 @@ skip_msg_init:
case -EKEYEXPIRED:
case -EKEYREJECTED:
case -EPROTO:
- rxrpc_abort_call(call, abort_code);
+ rxrpc_abort_call(call, abort_code, -ret);
goto kill_ACKs;
}
}
@@ -1159,8 +1151,8 @@ skip_msg_init:
send_ACK_with_skew:
ack.maxSkew = htons(call->ackr_skew);
send_ACK:
- mtu = call->conn->params.peer->if_mtu;
- mtu -= call->conn->params.peer->hdrsize;
+ mtu = call->peer->if_mtu;
+ mtu -= call->peer->hdrsize;
ackinfo.maxMTU = htonl(mtu);
ackinfo.rwind = htonl(rxrpc_rx_window_size);
@@ -1232,10 +1224,7 @@ send_message_2:
goto kill_ACKs;
case RXRPC_CALL_EV_ACK_FINAL:
- write_lock_bh(&call->state_lock);
- if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
- call->state = RXRPC_CALL_COMPLETE;
- write_unlock_bh(&call->state_lock);
+ rxrpc_call_completed(call);
goto kill_ACKs;
default:
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index e7cbcc4..104ee8b 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -30,7 +30,7 @@ unsigned int rxrpc_max_call_lifetime = 60 * HZ;
unsigned int rxrpc_dead_call_expiry = 2 * HZ;
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
- [RXRPC_CALL_UNINITIALISED] = "Uninit",
+ [RXRPC_CALL_UNINITIALISED] = "Uninit ",
[RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn",
[RXRPC_CALL_CLIENT_SEND_REQUEST] = "ClSndReq",
[RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl",
@@ -43,11 +43,16 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
[RXRPC_CALL_SERVER_SEND_REPLY] = "SvSndRpl",
[RXRPC_CALL_SERVER_AWAIT_ACK] = "SvAwtACK",
[RXRPC_CALL_COMPLETE] = "Complete",
+ [RXRPC_CALL_DEAD] = "Dead ",
+};
+
+const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
+ [RXRPC_CALL_SUCCEEDED] = "Complete",
[RXRPC_CALL_SERVER_BUSY] = "SvBusy ",
[RXRPC_CALL_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CALL_LOCALLY_ABORTED] = "LocAbort",
+ [RXRPC_CALL_LOCAL_ERROR] = "LocError",
[RXRPC_CALL_NETWORK_ERROR] = "NetError",
- [RXRPC_CALL_DEAD] = "Dead ",
};
struct kmem_cache *rxrpc_call_jar;
@@ -214,6 +219,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
{
struct rxrpc_call *call, *xcall;
struct rb_node *parent, **pp;
+ const void *here = __builtin_return_address(0);
int ret;
_enter("%p,%lx", rx, user_call_ID);
@@ -224,6 +230,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call;
}
+ trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
+ (const void *)user_call_ID);
+
/* Publish the call, even though it is incompletely set up as yet */
call->user_call_ID = user_call_ID;
__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
@@ -303,6 +312,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_call *call, *candidate;
+ const void *here = __builtin_return_address(0);
u32 call_id, chan;
_enter(",%d", conn->debug_id);
@@ -313,6 +323,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
if (!candidate)
return ERR_PTR(-EBUSY);
+ trace_rxrpc_call(candidate, 1, atomic_read(&candidate->usage),
+ 0, here, NULL);
+
chan = sp->hdr.cid & RXRPC_CHANNELMASK;
candidate->socket = rx;
candidate->conn = conn;
@@ -358,7 +371,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
_debug("CALL: %u { %s }",
call->debug_id, rxrpc_call_states[call->state]);
- if (call->state >= RXRPC_CALL_COMPLETE) {
+ if (call->state == RXRPC_CALL_COMPLETE) {
__rxrpc_disconnect_call(conn, call);
} else {
spin_unlock(&conn->channel_lock);
@@ -426,6 +439,44 @@ old_call:
}
/*
+ * Note the re-emergence of a call.
+ */
+void rxrpc_see_call(struct rxrpc_call *call)
+{
+ const void *here = __builtin_return_address(0);
+ if (call) {
+ int n = atomic_read(&call->usage);
+ int m = atomic_read(&call->skb_count);
+
+ trace_rxrpc_call(call, 2, n, m, here, 0);
+ }
+}
+
+/*
+ * Note the addition of a ref on a call.
+ */
+void rxrpc_get_call(struct rxrpc_call *call)
+{
+ const void *here = __builtin_return_address(0);
+ int n = atomic_inc_return(&call->usage);
+ int m = atomic_read(&call->skb_count);
+
+ trace_rxrpc_call(call, 3, n, m, here, 0);
+}
+
+/*
+ * Note the addition of a ref on a call for a socket buffer.
+ */
+void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
+{
+ const void *here = __builtin_return_address(0);
+ int n = atomic_inc_return(&call->usage);
+ int m = atomic_inc_return(&call->skb_count);
+
+ trace_rxrpc_call(call, 4, n, m, here, skb);
+}
+
+/*
* detach a call from a socket and set up for release
*/
void rxrpc_release_call(struct rxrpc_call *call)
@@ -438,6 +489,8 @@ void rxrpc_release_call(struct rxrpc_call *call)
atomic_read(&call->ackr_not_idle),
call->rx_first_oos);
+ rxrpc_see_call(call);
+
spin_lock_bh(&call->lock);
if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
BUG();
@@ -472,8 +525,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
if (call->state < RXRPC_CALL_COMPLETE &&
call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
_debug("+++ ABORTING STATE %d +++\n", call->state);
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_CALL_DEAD;
+ __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
}
write_unlock_bh(&call->state_lock);
@@ -522,6 +574,7 @@ static void rxrpc_dead_call_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
+ rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
call->state = RXRPC_CALL_DEAD;
write_unlock_bh(&call->state_lock);
@@ -536,22 +589,16 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
{
bool sched;
+ rxrpc_see_call(call);
write_lock(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD) {
- sched = false;
- if (call->state < RXRPC_CALL_COMPLETE) {
- _debug("abort call %p", call);
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_CALL_DEAD;
- if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
- sched = true;
- }
+ sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
sched = true;
- if (sched)
- rxrpc_queue_call(call);
}
write_unlock(&call->state_lock);
+ if (sched)
+ rxrpc_queue_call(call);
}
/*
@@ -588,21 +635,43 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
/*
* release a call
*/
-void __rxrpc_put_call(struct rxrpc_call *call)
+void rxrpc_put_call(struct rxrpc_call *call)
{
- ASSERT(call != NULL);
+ const void *here = __builtin_return_address(0);
+ int n, m;
- _enter("%p{u=%d}", call, atomic_read(&call->usage));
+ ASSERT(call != NULL);
- ASSERTCMP(atomic_read(&call->usage), >, 0);
+ n = atomic_dec_return(&call->usage);
+ m = atomic_read(&call->skb_count);
+ trace_rxrpc_call(call, 5, n, m, here, NULL);
+ ASSERTCMP(n, >=, 0);
+ if (n == 0) {
+ _debug("call %d dead", call->debug_id);
+ WARN_ON(m != 0);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
+ rxrpc_queue_work(&call->destroyer);
+ }
+}
- if (atomic_dec_and_test(&call->usage)) {
+/*
+ * Release a call ref held by a socket buffer.
+ */
+void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
+{
+ const void *here = __builtin_return_address(0);
+ int n, m;
+
+ n = atomic_dec_return(&call->usage);
+ m = atomic_dec_return(&call->skb_count);
+ trace_rxrpc_call(call, 6, n, m, here, skb);
+ ASSERTCMP(n, >=, 0);
+ if (n == 0) {
_debug("call %d dead", call->debug_id);
- WARN_ON(atomic_read(&call->skb_count) != 0);
+ WARN_ON(m != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer);
}
- _leave("");
}
/*
@@ -708,6 +777,7 @@ void __exit rxrpc_destroy_all_calls(void)
call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
_debug("Zapping call %p", call);
+ rxrpc_see_call(call);
list_del_init(&call->link);
switch (atomic_read(&call->usage)) {
@@ -749,16 +819,14 @@ static void rxrpc_call_life_expired(unsigned long _call)
{
struct rxrpc_call *call = (struct rxrpc_call *) _call;
+ _enter("{%d}", call->debug_id);
+
+ rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
- _enter("{%d}", call->debug_id);
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_COMPLETE) {
- set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
- rxrpc_queue_call(call);
- }
- read_unlock_bh(&call->state_lock);
+ set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
+ rxrpc_queue_call(call);
}
/*
@@ -771,6 +839,7 @@ static void rxrpc_resend_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
+ rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
@@ -788,12 +857,10 @@ static void rxrpc_ack_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
+ rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_COMPLETE &&
- !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
+ if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
}
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 349402b..4b213bc 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -537,6 +537,7 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
struct rxrpc_call, chan_wait_link);
u32 call_id = chan->call_counter + 1;
+ rxrpc_see_call(call);
list_del_init(&call->chan_wait_link);
conn->active_chans |= 1 << channel;
call->peer = rxrpc_get_peer(conn->params.peer);
@@ -741,7 +742,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
* terminal retransmission without requiring access to the call.
*/
if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
- _debug("exposed %u,%u", call->call_id, call->local_abort);
+ _debug("exposed %u,%u", call->call_id, call->abort_code);
__rxrpc_disconnect_call(conn, call);
}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 6296374..bc9b059 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -27,8 +27,8 @@
/*
* Retransmit terminal ACK or ABORT of the previous call.
*/
-static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
- struct sk_buff *skb)
+static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
+ struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_channel *chan;
@@ -135,33 +135,39 @@ static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
/*
* pass a connection-level abort onto all calls on that connection
*/
-static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
- u32 abort_code)
+static void rxrpc_abort_calls(struct rxrpc_connection *conn,
+ enum rxrpc_call_completion compl,
+ u32 abort_code, int error)
{
struct rxrpc_call *call;
- int i;
+ bool queue;
+ int i, bit;
_enter("{%d},%x", conn->debug_id, abort_code);
+ if (compl == RXRPC_CALL_LOCALLY_ABORTED)
+ bit = RXRPC_CALL_EV_CONN_ABORT;
+ else
+ bit = RXRPC_CALL_EV_RCVD_ABORT;
+
spin_lock(&conn->channel_lock);
for (i = 0; i < RXRPC_MAXCALLS; i++) {
call = rcu_dereference_protected(
conn->channels[i].call,
lockdep_is_held(&conn->channel_lock));
- write_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = state;
- if (state == RXRPC_CALL_LOCALLY_ABORTED) {
- call->local_abort = conn->local_abort;
- set_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
- } else {
- call->remote_abort = conn->remote_abort;
- set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
+ if (call) {
+ rxrpc_see_call(call);
+ write_lock_bh(&call->state_lock);
+ if (rxrpc_set_call_completion(call, compl, abort_code,
+ error)) {
+ set_bit(bit, &call->events);
+ queue = true;
}
- rxrpc_queue_call(call);
+ write_unlock_bh(&call->state_lock);
+ if (queue)
+ rxrpc_queue_call(call);
}
- write_unlock_bh(&call->state_lock);
}
spin_unlock(&conn->channel_lock);
@@ -186,17 +192,16 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
/* generate a connection-level abort */
spin_lock_bh(&conn->state_lock);
- if (conn->state < RXRPC_CONN_REMOTELY_ABORTED) {
- conn->state = RXRPC_CONN_LOCALLY_ABORTED;
- conn->error = error;
- spin_unlock_bh(&conn->state_lock);
- } else {
+ if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) {
spin_unlock_bh(&conn->state_lock);
_leave(" = 0 [already dead]");
return 0;
}
- rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code);
+ conn->state = RXRPC_CONN_LOCALLY_ABORTED;
+ spin_unlock_bh(&conn->state_lock);
+
+ rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code, error);
msg.msg_name = &conn->params.peer->srx.transport;
msg.msg_namelen = conn->params.peer->srx.transport_len;
@@ -276,7 +281,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
case RXRPC_PACKET_TYPE_ACK:
- rxrpc_conn_retransmit(conn, skb);
+ rxrpc_conn_retransmit_call(conn, skb);
rxrpc_free_skb(skb);
return 0;
@@ -287,7 +292,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
_proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code);
conn->state = RXRPC_CONN_REMOTELY_ABORTED;
- rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED,
+ rxrpc_abort_calls(conn, 0, RXRPC_CALL_REMOTELY_ABORTED,
abort_code);
return -ECONNABORTED;
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 5b45b6c..9c6685b 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -165,8 +165,8 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
* through the channel, whilst disposing of the actual call record.
*/
chan->last_service_id = call->service_id;
- if (call->local_abort) {
- chan->last_abort = call->local_abort;
+ if (call->abort_code) {
+ chan->last_abort = call->abort_code;
chan->last_type = RXRPC_PACKET_TYPE_ABORT;
} else {
chan->last_seq = call->rx_data_eaten;
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 5e683dd..86bea9a 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -196,8 +196,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
goto enqueue_packet;
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
terminal = ((flags & RXRPC_LAST_PACKET) &&
!(flags & RXRPC_CLIENT_INITIATED));
ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
@@ -341,14 +340,13 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
abort_code = ntohl(wtmp);
_proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code);
- write_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_REMOTELY_ABORTED;
- call->remote_abort = abort_code;
+ if (__rxrpc_set_call_completion(call,
+ RXRPC_CALL_REMOTELY_ABORTED,
+ abort_code, ECONNABORTED)) {
set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
rxrpc_queue_call(call);
}
- goto free_packet_unlock;
+ goto free_packet;
case RXRPC_PACKET_TYPE_BUSY:
_proto("Rx BUSY %%%u", sp->hdr.serial);
@@ -359,7 +357,9 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
- call->state = RXRPC_CALL_SERVER_BUSY;
+ __rxrpc_set_call_completion(call,
+ RXRPC_CALL_SERVER_BUSY,
+ 0, EBUSY);
set_bit(RXRPC_CALL_EV_RCVD_BUSY, &call->events);
rxrpc_queue_call(call);
case RXRPC_CALL_SERVER_BUSY:
@@ -415,12 +415,8 @@ protocol_error:
_debug("protocol error");
write_lock_bh(&call->state_lock);
protocol_error_locked:
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_PROTOCOL_ERROR;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
+ if (__rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO))
rxrpc_queue_call(call);
- }
free_packet_unlock:
write_unlock_bh(&call->state_lock);
free_packet:
@@ -486,14 +482,8 @@ protocol_error:
_debug("protocol error");
rxrpc_free_skb(part);
rxrpc_free_skb(jumbo);
- write_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_PROTOCOL_ERROR;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
+ if (rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO))
rxrpc_queue_call(call);
- }
- write_unlock_bh(&call->state_lock);
_leave("");
}
@@ -514,26 +504,28 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
read_lock(&call->state_lock);
switch (call->state) {
- case RXRPC_CALL_LOCALLY_ABORTED:
- if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
- rxrpc_queue_call(call);
- goto free_unlock;
- }
- case RXRPC_CALL_REMOTELY_ABORTED:
- case RXRPC_CALL_NETWORK_ERROR:
case RXRPC_CALL_DEAD:
goto dead_call;
+
case RXRPC_CALL_COMPLETE:
- case RXRPC_CALL_CLIENT_FINAL_ACK:
- /* complete server call */
- if (rxrpc_conn_is_service(call->conn))
+ switch (call->completion) {
+ case RXRPC_CALL_LOCALLY_ABORTED:
+ if (!test_and_set_bit(RXRPC_CALL_EV_ABORT,
+ &call->events)) {
+ rxrpc_queue_call(call);
+ goto free_unlock;
+ }
+ default:
goto dead_call;
- /* resend last packet of a completed call */
- _debug("final ack again");
- rxrpc_get_call(call);
- set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
- rxrpc_queue_call(call);
- goto free_unlock;
+ case RXRPC_CALL_SUCCEEDED:
+ if (rxrpc_conn_is_service(call->conn))
+ goto dead_call;
+ goto resend_final_ack;
+ }
+
+ case RXRPC_CALL_CLIENT_FINAL_ACK:
+ goto resend_final_ack;
+
default:
break;
}
@@ -550,6 +542,13 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
rxrpc_put_call(call);
goto done;
+resend_final_ack:
+ _debug("final ack again");
+ rxrpc_get_call(call);
+ set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+ rxrpc_queue_call(call);
+ goto free_unlock;
+
dead_call:
if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) {
skb->priority = RX_CALL_DEAD;
@@ -748,6 +747,7 @@ void rxrpc_data_ready(struct sock *sk)
if (!call || atomic_read(&call->usage) == 0)
goto cant_route_call;
+ rxrpc_see_call(call);
rxrpc_post_packet_to_call(call, skb);
goto out_unlock;
}
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 8a9917cb..b1e708a 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -115,12 +115,12 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg,
*/
static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
{
+ if (call->state >= RXRPC_CALL_COMPLETE)
+ return;
+
write_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = abort_code;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
+ if (__rxrpc_abort_call(call, abort_code, ECONNABORTED)) {
del_timer_sync(&call->resend_timer);
del_timer_sync(&call->ack_timer);
clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events);
@@ -207,12 +207,13 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
return PTR_ERR(call);
}
+ rxrpc_see_call(call);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
if (call->state >= RXRPC_CALL_COMPLETE) {
/* it's too late for this call */
- ret = -ECONNRESET;
+ ret = -ESHUTDOWN;
} else if (cmd == RXRPC_CMD_SEND_ABORT) {
rxrpc_send_abort(call, abort_code);
ret = 0;
@@ -238,6 +239,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
/**
* rxrpc_kernel_send_data - Allow a kernel service to send data on a call
+ * @sock: The socket the call is on
* @call: The call to send data through
* @msg: The data to send
* @len: The amount of data to send
@@ -247,8 +249,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
* nor should an address be supplied. MSG_MORE should be flagged if there's
* more data to come, otherwise this data will end the transmission phase.
*/
-int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
- size_t len)
+int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
+ struct msghdr *msg, size_t len)
{
int ret;
@@ -257,7 +259,7 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
ASSERTCMP(msg->msg_name, ==, NULL);
ASSERTCMP(msg->msg_control, ==, NULL);
- lock_sock(&call->socket->sk);
+ lock_sock(sock->sk);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
@@ -269,36 +271,36 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
ret = -EPROTO; /* request phase complete for this client call */
} else {
- ret = rxrpc_send_data(call->socket, call, msg, len);
+ ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
}
- release_sock(&call->socket->sk);
+ release_sock(sock->sk);
_leave(" = %d", ret);
return ret;
}
-
EXPORT_SYMBOL(rxrpc_kernel_send_data);
/**
* rxrpc_kernel_abort_call - Allow a kernel service to abort a call
+ * @sock: The socket the call is on
* @call: The call to be aborted
* @abort_code: The abort code to stick into the ABORT packet
*
* Allow a kernel service to abort a call, if it's still in an abortable state.
*/
-void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code)
+void rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
+ u32 abort_code)
{
_enter("{%d},%d", call->debug_id, abort_code);
- lock_sock(&call->socket->sk);
+ lock_sock(sock->sk);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
- if (call->state < RXRPC_CALL_COMPLETE)
- rxrpc_send_abort(call, abort_code);
+ rxrpc_send_abort(call, abort_code);
- release_sock(&call->socket->sk);
+ release_sock(sock->sk);
_leave("");
}
@@ -640,8 +642,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
/* check for the far side aborting the call or a network error
* occurring */
- if (call->state > RXRPC_CALL_COMPLETE)
- goto call_aborted;
+ if (call->state == RXRPC_CALL_COMPLETE)
+ goto call_terminated;
/* add the packet to the send queue if it's now full */
if (sp->remain <= 0 ||
@@ -702,15 +704,9 @@ out:
_leave(" = %d", ret);
return ret;
-call_aborted:
+call_terminated:
rxrpc_free_skb(skb);
- if (call->state == RXRPC_CALL_NETWORK_ERROR)
- ret = call->error_report < RXRPC_LOCAL_ERROR_OFFSET ?
- call->error_report :
- call->error_report - RXRPC_LOCAL_ERROR_OFFSET;
- else
- ret = -ECONNABORTED;
- _leave(" = %d", ret);
+ _leave(" = %d", -call->error);
return ret;
maybe_error:
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index 8940674..27b9eca 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -248,13 +248,21 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
struct rxrpc_peer *peer =
container_of(work, struct rxrpc_peer, error_distributor);
struct rxrpc_call *call;
- int error_report;
+ enum rxrpc_call_completion compl;
+ bool queue;
+ int error;
_enter("");
- error_report = READ_ONCE(peer->error_report);
+ error = READ_ONCE(peer->error_report);
+ if (error < RXRPC_LOCAL_ERROR_OFFSET) {
+ compl = RXRPC_CALL_NETWORK_ERROR;
+ } else {
+ compl = RXRPC_CALL_LOCAL_ERROR;
+ error -= RXRPC_LOCAL_ERROR_OFFSET;
+ }
- _debug("ISSUE ERROR %d", error_report);
+ _debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);
spin_lock_bh(&peer->lock);
@@ -262,16 +270,17 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
call = hlist_entry(peer->error_targets.first,
struct rxrpc_call, error_link);
hlist_del_init(&call->error_link);
+ rxrpc_see_call(call);
+ queue = false;
write_lock(&call->state_lock);
- if (call->state != RXRPC_CALL_COMPLETE &&
- call->state < RXRPC_CALL_NETWORK_ERROR) {
- call->error_report = error_report;
- call->state = RXRPC_CALL_NETWORK_ERROR;
+ if (__rxrpc_set_call_completion(call, compl, 0, error)) {
set_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
- rxrpc_queue_call(call);
+ queue = true;
}
write_unlock(&call->state_lock);
+ if (queue)
+ rxrpc_queue_call(call);
}
spin_unlock_bh(&peer->lock);
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 538e983..aebc73a 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -313,3 +313,18 @@ void __rxrpc_put_peer(struct rxrpc_peer *peer)
kfree_rcu(peer, rcu);
}
+
+/**
+ * rxrpc_kernel_get_peer - Get the peer address of a call
+ * @sock: The socket on which the call is in progress.
+ * @call: The call to query
+ * @_srx: Where to place the result
+ *
+ * Get the address of the remote peer in a call.
+ */
+void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call,
+ struct sockaddr_rxrpc *_srx)
+{
+ *_srx = call->peer->srx;
+}
+EXPORT_SYMBOL(rxrpc_kernel_get_peer);
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 060fb48..82c6405 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -22,7 +22,6 @@ static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
[RXRPC_CONN_SERVICE] = "SvSecure",
[RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort",
- [RXRPC_CONN_NETWORK_ERROR] = "NetError",
};
/*
@@ -94,7 +93,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
rxrpc_is_service_call(call) ? "Svc" : "Clt",
atomic_read(&call->usage),
rxrpc_call_states[call->state],
- call->remote_abort ?: call->local_abort,
+ call->abort_code,
call->user_call_ID);
return 0;
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index b964c2d..c9b38c7 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -115,6 +115,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
sp = rxrpc_skb(skb);
call = sp->call;
ASSERT(call != NULL);
+ rxrpc_see_call(call);
_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
@@ -294,12 +295,17 @@ receive_non_data_message:
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
break;
case RXRPC_SKB_MARK_REMOTE_ABORT:
- abort_code = call->remote_abort;
+ abort_code = call->abort_code;
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
break;
case RXRPC_SKB_MARK_LOCAL_ABORT:
- abort_code = call->local_abort;
+ abort_code = call->abort_code;
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
+ if (call->error) {
+ abort_code = call->error;
+ ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
+ &abort_code);
+ }
break;
case RXRPC_SKB_MARK_NET_ERROR:
_debug("RECV NET ERROR %d", sp->error);
@@ -392,9 +398,8 @@ u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
switch (skb->mark) {
case RXRPC_SKB_MARK_REMOTE_ABORT:
- return sp->call->remote_abort;
case RXRPC_SKB_MARK_LOCAL_ABORT:
- return sp->call->local_abort;
+ return sp->call->abort_code;
default:
BUG();
}
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index fbd8c74..2052920 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -140,9 +140,7 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
_enter("%p{%p}", skb, call);
if (call) {
- if (atomic_dec_return(&call->skb_count) < 0)
- BUG();
- rxrpc_put_call(call);
+ rxrpc_put_call_for_skb(call, skb);
sp->call = NULL;
}