diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/bcast.c | 87 | ||||
-rw-r--r-- | net/tipc/bcast.h | 5 | ||||
-rw-r--r-- | net/tipc/link.c | 794 | ||||
-rw-r--r-- | net/tipc/link.h | 7 | ||||
-rw-r--r-- | net/tipc/msg.c | 381 | ||||
-rw-r--r-- | net/tipc/msg.h | 35 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 76 | ||||
-rw-r--r-- | net/tipc/name_distr.h | 2 | ||||
-rw-r--r-- | net/tipc/net.c | 63 | ||||
-rw-r--r-- | net/tipc/net.h | 2 | ||||
-rw-r--r-- | net/tipc/node.c | 38 | ||||
-rw-r--r-- | net/tipc/node.h | 17 | ||||
-rw-r--r-- | net/tipc/node_subscr.c | 6 | ||||
-rw-r--r-- | net/tipc/port.c | 440 | ||||
-rw-r--r-- | net/tipc/port.h | 50 | ||||
-rw-r--r-- | net/tipc/socket.c | 551 | ||||
-rw-r--r-- | net/tipc/socket.h | 16 |
17 files changed, 1112 insertions, 1458 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 55c6c9d..dd13bfa 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@ /* * net/tipc/bcast.c: TIPC broadcast code * - * Copyright (c) 2004-2006, Ericsson AB + * Copyright (c) 2004-2006, 2014, Ericsson AB * Copyright (c) 2004, Intel Corporation. * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. @@ -38,6 +38,8 @@ #include "core.h" #include "link.h" #include "port.h" +#include "socket.h" +#include "msg.h" #include "bcast.h" #include "name_distr.h" @@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void) tipc_link_reset_all(node); } +uint tipc_bclink_get_mtu(void) +{ + return MAX_PKT_DEFAULT_MCAST; +} + void tipc_bclink_set_flags(unsigned int flags) { bclink->flags |= flags; @@ -382,30 +389,50 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_unlock(n_ptr); } -/* - * tipc_bclink_xmit - broadcast a packet to all nodes in cluster +/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster + * and to identified node local sockets + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ int tipc_bclink_xmit(struct sk_buff *buf) { - int res; + int rc = 0; + int bc = 0; + struct sk_buff *clbuf; - tipc_bclink_lock(); - - if (!bclink->bcast_nodes.count) { - res = msg_data_sz(buf_msg(buf)); - kfree_skb(buf); - goto exit; + /* Prepare clone of message for local node */ + clbuf = tipc_msg_reassemble(buf); + if (unlikely(!clbuf)) { + kfree_skb_list(buf); + return -EHOSTUNREACH; } - res = __tipc_link_xmit(bcl, buf); - if (likely(res >= 0)) { - bclink_set_last_sent(); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += bcl->out_queue_size; + /* Broadcast to all other nodes */ + if (likely(bclink)) { + tipc_bclink_lock(); + if (likely(bclink->bcast_nodes.count)) { + rc = __tipc_link_xmit(bcl, buf); + if (likely(!rc)) { + bclink_set_last_sent(); + bcl->stats.queue_sz_counts++; + bcl->stats.accu_queue_sz += bcl->out_queue_size; + } + bc = 1; + } + tipc_bclink_unlock(); } -exit: - tipc_bclink_unlock(); - return res; + + if (unlikely(!bc)) + kfree_skb_list(buf); + + /* Deliver message clone */ + if (likely(!rc)) + tipc_sk_mcast_rcv(clbuf); + else + kfree_skb(clbuf); + + return rc; } /** @@ -443,7 +470,7 @@ void tipc_bclink_rcv(struct sk_buff *buf) struct tipc_node *node; u32 next_in; u32 seqno; - int deferred; + int deferred = 0; /* Screen out unwanted broadcast messages */ @@ -494,7 +521,7 @@ receive: tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) - tipc_port_mcast_rcv(buf, NULL); + tipc_sk_mcast_rcv(buf); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { @@ -573,8 +600,7 @@ receive: node->bclink.deferred_size += deferred; bclink_update_last_sent(node, seqno); buf = NULL; - } else - deferred = 0; + } tipc_bclink_lock(); @@ -611,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, struct tipc_media_addr *unused2) { int bp_index; + struct tipc_msg *msg = buf_msg(buf); /* Prepare broadcast link message for reliable transmission, * if first time trying to send it; @@ -618,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, * since they are sent in an unreliable manner and don't need it */ if (likely(!msg_non_seq(buf_msg(buf)))) { - struct tipc_msg *msg; - bcbuf_set_acks(buf, bclink->bcast_nodes.count); - msg = buf_msg(buf); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); bcl->stats.sent_info++; @@ -638,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; - struct tipc_bearer *b = p; + struct tipc_bearer *bp[2] = {p, s}; + struct tipc_bearer *b = bp[msg_link_selector(msg)]; struct sk_buff *tbuf; if (!p) break; /* No more bearers to try */ - + if (!b) + b = p; tipc_nmap_diff(&bcbearer->remains, &b->nodes, &bcbearer->remains_new); if (bcbearer->remains_new.count == bcbearer->remains.count) @@ -660,13 +686,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); kfree_skb(tbuf); /* Bearer keeps a clone */ } - - /* Swap bearers for next packet */ - if (s) { - bcbearer->bpairs[bp_index].primary = s; - bcbearer->bpairs[bp_index].secondary = p; - } - if (bcbearer->remains_new.count == 0) break; /* All targets reached */ diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 00330c4..4875d95 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -1,7 +1,7 @@ /* * net/tipc/bcast.h: Include file for TIPC broadcast code * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -int tipc_bclink_xmit(struct sk_buff *buf); void tipc_bclink_rcv(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); @@ -98,5 +97,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); +uint tipc_bclink_get_mtu(void); +int tipc_bclink_xmit(struct sk_buff *buf); #endif diff --git a/net/tipc/link.c b/net/tipc/link.c index ad2c57f..fb1485d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -82,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf); static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, struct sk_buff **buf); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); -static int tipc_link_iovec_long_xmit(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf); +static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf); /* * Simple link routines @@ -335,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) { struct tipc_port *p_ptr; + struct tipc_sock *tsk; spin_lock_bh(&tipc_port_list_lock); p_ptr = tipc_port_lock(origport); if (p_ptr) { if (!list_empty(&p_ptr->wait_list)) goto exit; - p_ptr->congested = 1; + tsk = tipc_port_to_sock(p_ptr); + tsk->link_cong = 1; p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); l_ptr->stats.link_congs++; @@ -355,6 +355,7 @@ exit: void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) { struct tipc_port *p_ptr; + struct tipc_sock *tsk; struct tipc_port *temp_p_ptr; int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; @@ -370,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) wait_list) { if (win <= 0) break; + tsk = tipc_port_to_sock(p_ptr); list_del_init(&p_ptr->wait_list); spin_lock_bh(p_ptr->lock); - p_ptr->congested = 0; - tipc_port_wakeup(p_ptr); + tsk->link_cong = 0; + tipc_sock_wakeup(tsk); win -= p_ptr->waiting_pkts; spin_unlock_bh(p_ptr->lock); } @@ -676,178 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) } } -/* - * link_bundle_buf(): Append contents of a buffer to - * the tail of an existing one. +/* tipc_link_cong: determine return value and how to treat the + * sent buffer during link congestion. + * - For plain, errorless user data messages we keep the buffer and + * return -ELINKONG. + * - For all other messages we discard the buffer and return -EHOSTUNREACH + * - For TIPC internal messages we also reset the link */ -static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler, - struct sk_buff *buf) +static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) { - struct tipc_msg *bundler_msg = buf_msg(bundler); struct tipc_msg *msg = buf_msg(buf); - u32 size = msg_size(msg); - u32 bundle_size = msg_size(bundler_msg); - u32 to_pos = align(bundle_size); - u32 pad = to_pos - bundle_size; - - if (msg_user(bundler_msg) != MSG_BUNDLER) - return 0; - if (msg_type(bundler_msg) != OPEN_MSG) - return 0; - if (skb_tailroom(bundler) < (pad + size)) - return 0; - if (l_ptr->max_pkt < (to_pos + size)) - return 0; - - skb_put(bundler, pad + size); - skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); - msg_set_size(bundler_msg, to_pos + size); - msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); - kfree_skb(buf); - l_ptr->stats.sent_bundled++; - return 1; -} - -static void link_add_to_outqueue(struct tipc_link *l_ptr, - struct sk_buff *buf, - struct tipc_msg *msg) -{ - u32 ack = mod(l_ptr->next_in_no - 1); - u32 seqno = mod(l_ptr->next_out_no++); + uint psz = msg_size(msg); + uint imp = tipc_msg_tot_importance(msg); + u32 oport = msg_tot_origport(msg); - msg_set_word(msg, 2, ((ack << 16) | seqno)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - buf->next = NULL; - if (l_ptr->first_out) { - l_ptr->last_out->next = buf; - l_ptr->last_out = buf; - } else - l_ptr->first_out = l_ptr->last_out = buf; - - l_ptr->out_queue_size++; - if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) - l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; -} - -static void link_add_chain_to_outqueue(struct tipc_link *l_ptr, - struct sk_buff *buf_chain, - u32 long_msgno) -{ - struct sk_buff *buf; - struct tipc_msg *msg; - - if (!l_ptr->next_out) - l_ptr->next_out = buf_chain; - while (buf_chain) { - buf = buf_chain; - buf_chain = buf_chain->next; - - msg = buf_msg(buf); - msg_set_long_msgno(msg, long_msgno); - link_add_to_outqueue(l_ptr, buf, msg); + if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { + if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) { + link_schedule_port(link, oport, psz); + return -ELINKCONG; + } + } else { + pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); + tipc_link_reset(link); } + kfree_skb_list(buf); + return -EHOSTUNREACH; } -/* - * tipc_link_xmit() is the 'full path' for messages, called from - * inside TIPC when the 'fast path' in tipc_send_xmit - * has failed, and from link_send() +/** + * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked + * @link: link to use + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket + * user data messages) or -EHOSTUNREACH (all other messages/senders) + * Only the socket functions tipc_send_stream() and tipc_send_packet() need + * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); - u32 size = msg_size(msg); - u32 dsz = msg_data_sz(msg); - u32 queue_size = l_ptr->out_queue_size; - u32 imp = tipc_msg_tot_importance(msg); - u32 queue_limit = l_ptr->queue_limit[imp]; - u32 max_packet = l_ptr->max_pkt; - - /* Match msg importance against queue limits: */ - if (unlikely(queue_size >= queue_limit)) { - if (imp <= TIPC_CRITICAL_IMPORTANCE) { - link_schedule_port(l_ptr, msg_origport(msg), size); - kfree_skb(buf); - return -ELINKCONG; - } - kfree_skb(buf); - if (imp > CONN_MANAGER) { - pr_warn("%s<%s>, send queue full", link_rst_msg, - l_ptr->name); - tipc_link_reset(l_ptr); - } - return dsz; + uint psz = msg_size(msg); + uint qsz = link->out_queue_size; + uint sndlim = link->queue_limit[0]; + uint imp = tipc_msg_tot_importance(msg); + uint mtu = link->max_pkt; + uint ack = mod(link->next_in_no - 1); + uint seqno = link->next_out_no; + uint bc_last_in = link->owner->bclink.last_in; + struct tipc_media_addr *addr = &link->media_addr; + struct sk_buff *next = buf->next; + + /* Match queue limits against msg importance: */ + if (unlikely(qsz >= link->queue_limit[imp])) + return tipc_link_cong(link, buf); + + /* Has valid packet limit been used ? */ + if (unlikely(psz > mtu)) { + kfree_skb_list(buf); + return -EMSGSIZE; } - /* Fragmentation needed ? */ - if (size > max_packet) - return tipc_link_frag_xmit(l_ptr, buf); - - /* Packet can be queued or sent. */ - if (likely(!link_congested(l_ptr))) { - link_add_to_outqueue(l_ptr, buf, msg); + /* Prepare each packet for sending, and add to outqueue: */ + while (buf) { + next = buf->next; + msg = buf_msg(buf); + msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); + msg_set_bcast_ack(msg, bc_last_in); + + if (!link->first_out) { + link->first_out = buf; + } else if (qsz < sndlim) { + link->last_out->next = buf; + } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { + link->stats.sent_bundled++; + buf = next; + next = buf->next; + continue; + } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { + link->stats.sent_bundled++; + link->stats.sent_bundles++; + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } else { + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return dsz; - } - /* Congestion: can message be bundled ? */ - if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && - (msg_user(msg) != MSG_FRAGMENTER)) { - - /* Try adding message to an existing bundle */ - if (l_ptr->next_out && - link_bundle_buf(l_ptr, l_ptr->last_out, buf)) - return dsz; - - /* Try creating a new bundle */ - if (size <= max_packet * 2 / 3) { - struct sk_buff *bundler = tipc_buf_acquire(max_packet); - struct tipc_msg bundler_hdr; - - if (bundler) { - tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, - INT_H_SIZE, l_ptr->addr); - skb_copy_to_linear_data(bundler, &bundler_hdr, - INT_H_SIZE); - skb_trim(bundler, INT_H_SIZE); - link_bundle_buf(l_ptr, bundler, buf); - buf = bundler; - msg = buf_msg(buf); - l_ptr->stats.sent_bundles++; - } + /* Send packet if possible: */ + if (likely(++qsz <= sndlim)) { + tipc_bearer_send(link->bearer_id, buf, addr); + link->next_out = next; + link->unacked_window = 0; } + seqno++; + link->last_out = buf; + buf = next; } - if (!l_ptr->next_out) - l_ptr->next_out = buf; - link_add_to_outqueue(l_ptr, buf, msg); - return dsz; + link->next_out_no = seqno; + link->out_queue_size = qsz; + return 0; } -/* - * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use - * has not been selected yet, and the the owner node is not locked - * Called by TIPC internal users, e.g. the name distributor +/** + * tipc_link_xmit() is the general link level function for message sending + * @buf: chain of buffers containing message + * @dsz: amount of user data to be sent + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) +int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) { - struct tipc_link *l_ptr; - struct tipc_node *n_ptr; - int res = -ELINKCONG; + struct tipc_link *link = NULL; + struct tipc_node *node; + int rc = -EHOSTUNREACH; - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[selector & 1]; - if (l_ptr) - res = __tipc_link_xmit(l_ptr, buf); - else - kfree_skb(buf); - tipc_node_unlock(n_ptr); - } else { - kfree_skb(buf); + node = tipc_node_find(dnode); + if (node) { + tipc_node_lock(node); + link = node->active_links[selector & 1]; + if (link) + rc = __tipc_link_xmit(link, buf); + tipc_node_unlock(node); } - return res; + + if (link) + return rc; + + if (likely(in_own_node(dnode))) + return tipc_sk_rcv(buf); + + kfree_skb_list(buf); + return rc; } /* @@ -858,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) * * Called with node locked */ -static void tipc_link_sync_xmit(struct tipc_link *l) +static void tipc_link_sync_xmit(struct tipc_link *link) { struct sk_buff *buf; struct tipc_msg *msg; @@ -868,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l) return; msg = buf_msg(buf); - tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); - msg_set_last_bcast(msg, l->owner->bclink.acked); - link_add_chain_to_outqueue(l, buf, 0); - tipc_link_push_queue(l); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); + msg_set_last_bcast(msg, link->owner->bclink.acked); + __tipc_link_xmit(link, buf); } /* @@ -892,293 +857,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) } /* - * tipc_link_names_xmit - send name table entries to new neighbor - * - * Send routine for bulk delivery of name table messages when contact - * with a new neighbor occurs. No link congestion checking is performed - * because name table messages *must* be delivered. The messages must be - * small enough not to require fragmentation. - * Called without any locks held. - */ -void tipc_link_names_xmit(struct list_head *message_list, u32 dest) -{ - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *buf; - struct sk_buff *temp_buf; - - if (list_empty(message_list)) - return; - - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[0]; - if (l_ptr) { - /* convert circular list to linear list */ - ((struct sk_buff *)message_list->prev)->next = NULL; - link_add_chain_to_outqueue(l_ptr, - (struct sk_buff *)message_list->next, 0); - tipc_link_push_queue(l_ptr); - INIT_LIST_HEAD(message_list); - } - tipc_node_unlock(n_ptr); - } - - /* discard the messages if they couldn't be sent */ - list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { - list_del((struct list_head *)buf); - kfree_skb(buf); - } -} - -/* - * tipc_link_xmit_fast: Entry for data messages where the - * destination link is known and the header is complete, - * inclusive total message length. Very time critical. - * Link is locked. Returns user data length. - */ -static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf, - u32 *used_max_pkt) -{ - struct tipc_msg *msg = buf_msg(buf); - int res = msg_data_sz(msg); - - if (likely(!link_congested(l_ptr))) { - if (likely(msg_size(msg) <= l_ptr->max_pkt)) { - link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_send(l_ptr->bearer_id, buf, - &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return res; - } - else - *used_max_pkt = l_ptr->max_pkt; - } - return __tipc_link_xmit(l_ptr, buf); /* All other cases */ -} - -/* - * tipc_link_iovec_xmit_fast: Entry for messages where the - * destination processor is known and the header is complete, - * except for total message length. - * Returns user data length or errno. - */ -int tipc_link_iovec_xmit_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) -{ - struct tipc_msg *hdr = &sender->phdr; - struct tipc_link *l_ptr; - struct sk_buff *buf; - struct tipc_node *node; - int res; - u32 selector = msg_origport(hdr) & 1; - -again: - /* - * Try building message using port's max_pkt hint. - * (Must not hold any locks while building message.) - */ - res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf); - /* Exit if build request was invalid */ - if (unlikely(res < 0)) - return res; - - node = tipc_node_find(destaddr); - if (likely(node)) { - tipc_node_lock(node); - l_ptr = node->active_links[selector]; - if (likely(l_ptr)) { - if (likely(buf)) { - res = tipc_link_xmit_fast(l_ptr, buf, - &sender->max_pkt); -exit: - tipc_node_unlock(node); - return res; - } - - /* Exit if link (or bearer) is congested */ - if (link_congested(l_ptr)) { - res = link_schedule_port(l_ptr, - sender->ref, res); - goto exit; - } - - /* - * Message size exceeds max_pkt hint; update hint, - * then re-try fast path or fragment the message - */ - sender->max_pkt = l_ptr->max_pkt; - tipc_node_unlock(node); - - - if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) - goto again; - - return tipc_link_iovec_long_xmit(sender, msg_sect, - len, destaddr); - } - tipc_node_unlock(node); - } - - /* Couldn't find a link to the destination node */ - kfree_skb(buf); - tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE); - return -ENETUNREACH; -} - -/* - * tipc_link_iovec_long_xmit(): Entry for long messages where the - * destination node is known and the header is complete, - * inclusive total message length. - * Link and bearer congestion status have been checked to be ok, - * and are ignored if they change. - * - * Note that fragments do not use the full link MTU so that they won't have - * to undergo refragmentation if link changeover causes them to be sent - * over another link with an additional tunnel header added as prefix. - * (Refragmentation will still occur if the other link has a smaller MTU.) - * - * Returns user data length or errno. - */ -static int tipc_link_iovec_long_xmit(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) -{ - struct tipc_link *l_ptr; - struct tipc_node *node; - struct tipc_msg *hdr = &sender->phdr; - u32 dsz = len; - u32 max_pkt, fragm_sz, rest; - struct tipc_msg fragm_hdr; - struct sk_buff *buf, *buf_chain, *prev; - u32 fragm_crs, fragm_rest, hsz, sect_rest; - const unchar __user *sect_crs; - int curr_sect; - u32 fragm_no; - int res = 0; - -again: - fragm_no = 1; - max_pkt = sender->max_pkt - INT_H_SIZE; - /* leave room for tunnel header in case of link changeover */ - fragm_sz = max_pkt - INT_H_SIZE; - /* leave room for fragmentation header in each fragment */ - rest = dsz; - fragm_crs = 0; - fragm_rest = 0; - sect_rest = 0; - sect_crs = NULL; - curr_sect = -1; - - /* Prepare reusable fragment header */ - tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, - INT_H_SIZE, msg_destnode(hdr)); - msg_set_size(&fragm_hdr, max_pkt); - msg_set_fragm_no(&fragm_hdr, 1); - - /* Prepare header of first fragment */ - buf_chain = buf = tipc_buf_acquire(max_pkt); - if (!buf) - return -ENOMEM; - buf->next = NULL; - skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); - hsz = msg_hdr_sz(hdr); - skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz); - - /* Chop up message */ - fragm_crs = INT_H_SIZE + hsz; - fragm_rest = fragm_sz - hsz; - - do { /* For all sections */ - u32 sz; - - if (!sect_rest) { - sect_rest = msg_sect[++curr_sect].iov_len; - sect_crs = msg_sect[curr_sect].iov_base; - } - - if (sect_rest < fragm_rest) - sz = sect_rest; - else - sz = fragm_rest; - - if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { - res = -EFAULT; -error: - kfree_skb_list(buf_chain); - return res; - } - sect_crs += sz; - sect_rest -= sz; - fragm_crs += sz; - fragm_rest -= sz; - rest -= sz; - - if (!fragm_rest && rest) { - - /* Initiate new fragment: */ - if (rest <= fragm_sz) { - fragm_sz = rest; - msg_set_type(&fragm_hdr, LAST_FRAGMENT); - } else { - msg_set_type(&fragm_hdr, FRAGMENT); - } - msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); - msg_set_fragm_no(&fragm_hdr, ++fragm_no); - prev = buf; - buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (!buf) { - res = -ENOMEM; - goto error; - } - - buf->next = NULL; - prev->next = buf; - skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); - fragm_crs = INT_H_SIZE; - fragm_rest = fragm_sz; - } - } while (rest > 0); - - /* - * Now we have a buffer chain. Select a link and check - * that packet size is still OK - */ - node = tipc_node_find(destaddr); - if (likely(node)) { - tipc_node_lock(node); - l_ptr = node->active_links[sender->ref & 1]; - if (!l_ptr) { - tipc_node_unlock(node); - goto reject; - } - if (l_ptr->max_pkt < max_pkt) { - sender->max_pkt = l_ptr->max_pkt; - tipc_node_unlock(node); - kfree_skb_list(buf_chain); - goto again; - } - } else { -reject: - kfree_skb_list(buf_chain); - tipc_port_iovec_reject(sender, hdr, msg_sect, len, - TIPC_ERR_NO_NODE); - return -ENETUNREACH; - } - - /* Append chain of fragments to send queue & send them */ - l_ptr->long_msg_seq_no++; - link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); - l_ptr->stats.sent_fragments += fragm_no; - l_ptr->stats.sent_fragmented++; - tipc_link_push_queue(l_ptr); - tipc_node_unlock(node); - return dsz; -} - -/* * tipc_link_push_packet: Push one unsent packet to the media */ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) @@ -1238,7 +916,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); + msg_set_type(msg, BUNDLE_CLOSED); l_ptr->next_out = buf->next; return 0; } @@ -1527,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(!list_empty(&l_ptr->waiting_ports))) tipc_link_wakeup_ports(l_ptr, 0); - if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { - l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); - } - /* Process the incoming packet */ if (unlikely(!link_working_working(l_ptr))) { if (msg_user(msg) == LINK_PROTOCOL) { @@ -1565,57 +1238,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); - /* Deliver packet/message to correct user: */ - if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) { - if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { - tipc_node_unlock(n_ptr); - continue; - } - msg = buf_msg(buf); - } else if (msg_user(msg) == MSG_FRAGMENTER) { - l_ptr->stats.recv_fragments++; - if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) { - l_ptr->stats.recv_fragmented++; - msg = buf_msg(buf); - } else { - if (!l_ptr->reasm_buf) - tipc_link_reset(l_ptr); - tipc_node_unlock(n_ptr); - continue; - } + if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { + l_ptr->stats.sent_acks++; + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } - switch (msg_user(msg)) { - case TIPC_LOW_IMPORTANCE: - case TIPC_MEDIUM_IMPORTANCE: - case TIPC_HIGH_IMPORTANCE: - case TIPC_CRITICAL_IMPORTANCE: + if (tipc_link_prepare_input(l_ptr, &buf)) { tipc_node_unlock(n_ptr); - tipc_sk_rcv(buf); continue; - case MSG_BUNDLER: - l_ptr->stats.recv_bundles++; - l_ptr->stats.recv_bundled += msg_msgcnt(msg); - tipc_node_unlock(n_ptr); - tipc_link_bundle_rcv(buf); - continue; - case NAME_DISTRIBUTOR: - n_ptr->bclink.recv_permitted = true; - tipc_node_unlock(n_ptr); - tipc_named_rcv(buf); - continue; - case CONN_MANAGER: - tipc_node_unlock(n_ptr); - tipc_port_proto_rcv(buf); - continue; - case BCAST_PROTOCOL: - tipc_link_sync_rcv(n_ptr, buf); - break; - default: - kfree_skb(buf); - break; } tipc_node_unlock(n_ptr); + msg = buf_msg(buf); + if (tipc_link_input(l_ptr, buf) != 0) + goto discard; continue; unlock_discard: tipc_node_unlock(n_ptr); @@ -1625,6 +1260,80 @@ discard: } /** + * tipc_link_prepare_input - process TIPC link messages + * + * returns nonzero if the message was consumed + * + * Node lock must be held + */ +static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf) +{ + struct tipc_node *n; + struct tipc_msg *msg; + int res = -EINVAL; + + n = l->owner; + msg = buf_msg(*buf); + switch (msg_user(msg)) { + case CHANGEOVER_PROTOCOL: + if (tipc_link_tunnel_rcv(n, buf)) + res = 0; + break; + case MSG_FRAGMENTER: + l->stats.recv_fragments++; + if (tipc_buf_append(&l->reasm_buf, buf)) { + l->stats.recv_fragmented++; + res = 0; + } else if (!l->reasm_buf) { + tipc_link_reset(l); + } + break; + case MSG_BUNDLER: + l->stats.recv_bundles++; + l->stats.recv_bundled += msg_msgcnt(msg); + res = 0; + break; + case NAME_DISTRIBUTOR: + n->bclink.recv_permitted = true; + res = 0; + break; + case BCAST_PROTOCOL: + tipc_link_sync_rcv(n, *buf); + break; + default: + res = 0; + } + return res; +} +/** + * tipc_link_input - Deliver message too higher layers + */ +static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + int res = 0; + + switch (msg_user(msg)) { + case TIPC_LOW_IMPORTANCE: + case TIPC_MEDIUM_IMPORTANCE: + case TIPC_HIGH_IMPORTANCE: + case TIPC_CRITICAL_IMPORTANCE: + case CONN_MANAGER: + tipc_sk_rcv(buf); + break; + case NAME_DISTRIBUTOR: + tipc_named_rcv(buf); + break; + case MSG_BUNDLER: + tipc_link_bundle_rcv(buf); + break; + default: + res = -EINVAL; + } + return res; +} + +/** * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue * * Returns increase in queue length (i.e. 0 or 1) @@ -2217,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf) u32 msgcount = msg_msgcnt(buf_msg(buf)); u32 pos = INT_H_SIZE; struct sk_buff *obuf; + struct tipc_msg *omsg; while (msgcount--) { obuf = buf_extract(buf, pos); @@ -2224,82 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf) pr_warn("Link unable to unbundle message(s)\n"); break; } - pos += align(msg_size(buf_msg(obuf))); - tipc_net_route_msg(obuf); - } - kfree_skb(buf); -} - -/* - * Fragmentation/defragmentation: - */ - -/* - * tipc_link_frag_xmit: Entry for buffers needing fragmentation. - * The buffer is complete, inclusive total message length. - * Returns user data length. - */ -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) -{ - struct sk_buff *buf_chain = NULL; - struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; - struct tipc_msg *inmsg = buf_msg(buf); - struct tipc_msg fragm_hdr; - u32 insize = msg_size(inmsg); - u32 dsz = msg_data_sz(inmsg); - unchar *crs = buf->data; - u32 rest = insize; - u32 pack_sz = l_ptr->max_pkt; - u32 fragm_sz = pack_sz - INT_H_SIZE; - u32 fragm_no = 0; - u32 destaddr; - - if (msg_short(inmsg)) - destaddr = l_ptr->addr; - else - destaddr = msg_destnode(inmsg); - - /* Prepare reusable fragment header: */ - tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, - INT_H_SIZE, destaddr); - - /* Chop up message: */ - while (rest > 0) { - struct sk_buff *fragm; - - if (rest <= fragm_sz) { - fragm_sz = rest; - msg_set_type(&fragm_hdr, LAST_FRAGMENT); - } - fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (fragm == NULL) { - kfree_skb(buf); - kfree_skb_list(buf_chain); - return -ENOMEM; + omsg = buf_msg(obuf); + pos += align(msg_size(omsg)); + if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) { + tipc_sk_rcv(obuf); + } else if (msg_user(omsg) == NAME_DISTRIBUTOR) { + tipc_named_rcv(obuf); + } else { + pr_warn("Illegal bundled msg: %u\n", msg_user(omsg)); + kfree_skb(obuf); } - msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); - fragm_no++; - msg_set_fragm_no(&fragm_hdr, fragm_no); - skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, - fragm_sz); - buf_chain_tail->next = fragm; - buf_chain_tail = fragm; - - rest -= fragm_sz; - crs += fragm_sz; - msg_set_type(&fragm_hdr, FRAGMENT); } kfree_skb(buf); - - /* Append chain of fragments to send queue & send them */ - l_ptr->long_msg_seq_no++; - link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); - l_ptr->stats.sent_fragments += fragm_no; - l_ptr->stats.sent_fragmented++; - tipc_link_push_queue(l_ptr); - - return dsz; } static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) diff --git a/net/tipc/link.h b/net/tipc/link.h index 200d518..782983c 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -227,13 +227,8 @@ void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); -void tipc_link_names_xmit(struct list_head *message_list, u32 dest); -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); -int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); -int tipc_link_iovec_xmit_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 0a37a47..9680be6 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -36,21 +36,16 @@ #include "core.h" #include "msg.h" +#include "addr.h" +#include "name_table.h" -u32 tipc_msg_tot_importance(struct tipc_msg *m) +#define MAX_FORWARD_SIZE 1024 + +static unsigned int align(unsigned int i) { - if (likely(msg_isdata(m))) { - if (likely(msg_orignode(m) == tipc_own_addr)) - return msg_importance(m); - return msg_importance(m) + 4; - } - if ((msg_user(m) == MSG_FRAGMENTER) && - (msg_type(m) == FIRST_FRAGMENT)) - return msg_importance(msg_get_wrapped(m)); - return msg_importance(m); + return (i + 3) & ~3u; } - void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode) { @@ -65,41 +60,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, msg_set_destnode(m, destnode); } -/** - * tipc_msg_build - create message using specified header and data - * - * Note: Caller must not hold any locks in case copy_from_user() is interrupted! - * - * Returns message data size or errno - */ -int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - unsigned int len, int max_size, struct sk_buff **buf) -{ - int dsz, sz, hsz; - unsigned char *to; - - dsz = len; - hsz = msg_hdr_sz(hdr); - sz = hsz + dsz; - msg_set_size(hdr, sz); - if (unlikely(sz > max_size)) { - *buf = NULL; - return dsz; - } - - *buf = tipc_buf_acquire(sz); - if (!(*buf)) - return -ENOMEM; - skb_copy_to_linear_data(*buf, hdr, hsz); - to = (*buf)->data + hsz; - if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) { - kfree_skb(*buf); - *buf = NULL; - return -EFAULT; - } - return dsz; -} - /* tipc_buf_append(): Append a buffer to the fragment list of another buffer * @*headbuf: in: NULL for first frag, otherwise value returned from prev call * out: set when successful non-complete reassembly, otherwise NULL @@ -112,27 +72,38 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) struct sk_buff *head = *headbuf; struct sk_buff *frag = *buf; struct sk_buff *tail; - struct tipc_msg *msg = buf_msg(frag); - u32 fragid = msg_type(msg); - bool headstolen; + struct tipc_msg *msg; + u32 fragid; int delta; + bool headstolen; + if (!frag) + goto err; + + msg = buf_msg(frag); + fragid = msg_type(msg); + frag->next = NULL; skb_pull(frag, msg_hdr_sz(msg)); if (fragid == FIRST_FRAGMENT) { - if (head || skb_unclone(frag, GFP_ATOMIC)) - goto out_free; + if (unlikely(head)) + goto err; + if (unlikely(skb_unclone(frag, GFP_ATOMIC))) + goto err; head = *headbuf = frag; skb_frag_list_init(head); + TIPC_SKB_CB(head)->tail = NULL; *buf = NULL; return 0; } + if (!head) - goto out_free; - tail = TIPC_SKB_CB(head)->tail; + goto err; + if (skb_try_coalesce(head, frag, &headstolen, &delta)) { kfree_skb_partial(frag, headstolen); } else { + tail = TIPC_SKB_CB(head)->tail; if (!skb_has_frag_list(head)) skb_shinfo(head)->frag_list = frag; else @@ -142,6 +113,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) head->len += frag->len; TIPC_SKB_CB(head)->tail = frag; } + if (fragid == LAST_FRAGMENT) { *buf = head; TIPC_SKB_CB(head)->tail = NULL; @@ -150,10 +122,311 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) } *buf = NULL; return 0; -out_free: + +err: pr_warn_ratelimited("Unable to build fragment list\n"); kfree_skb(*buf); kfree_skb(*headbuf); *buf = *headbuf = NULL; return 0; } + + +/** + * tipc_msg_build - create buffer chain containing specified header and data + * @mhdr: Message header, to be prepended to data + * @iov: User data + * @offset: Posision in iov to start copying from + * @dsz: Total length of user data + * @pktmax: Max packet size that can be used + * @chain: Buffer or chain of buffers to be returned to caller + * Returns message data size or errno: -ENOMEM, -EFAULT + */ +int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, + int offset, int dsz, int pktmax , struct sk_buff **chain) +{ + int mhsz = msg_hdr_sz(mhdr); + int msz = mhsz + dsz; + int pktno = 1; + int pktsz; + int pktrem = pktmax; + int drem = dsz; + struct tipc_msg pkthdr; + struct sk_buff *buf, *prev; + char *pktpos; + int rc; + + msg_set_size(mhdr, msz); + + /* No fragmentation needed? */ + if (likely(msz <= pktmax)) { + buf = tipc_buf_acquire(msz); + *chain = buf; + if (unlikely(!buf)) + return -ENOMEM; + skb_copy_to_linear_data(buf, mhdr, mhsz); + pktpos = buf->data + mhsz; + if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) + return dsz; + rc = -EFAULT; + goto error; + } + + /* Prepare reusable fragment header */ + tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT, + INT_H_SIZE, msg_destnode(mhdr)); + msg_set_size(&pkthdr, pktmax); + msg_set_fragm_no(&pkthdr, pktno); + + /* Prepare first fragment */ + *chain = buf = tipc_buf_acquire(pktmax); + if (!buf) + return -ENOMEM; + pktpos = buf->data; + skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); + pktpos += INT_H_SIZE; + pktrem -= INT_H_SIZE; + skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); + pktpos += mhsz; + pktrem -= mhsz; + + do { + if (drem < pktrem) + pktrem = drem; + + if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) { + rc = -EFAULT; + goto error; + } + drem -= pktrem; + offset += pktrem; + + if (!drem) + break; + + /* Prepare new fragment: */ + if (drem < (pktmax - INT_H_SIZE)) + pktsz = drem + INT_H_SIZE; + else + pktsz = pktmax; + prev = buf; + buf = tipc_buf_acquire(pktsz); + if (!buf) { + rc = -ENOMEM; + goto error; + } + prev->next = buf; + msg_set_type(&pkthdr, FRAGMENT); + msg_set_size(&pkthdr, pktsz); + msg_set_fragm_no(&pkthdr, ++pktno); + skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); + pktpos = buf->data + INT_H_SIZE; + pktrem = pktsz - INT_H_SIZE; + + } while (1); + + msg_set_type(buf_msg(buf), LAST_FRAGMENT); + return dsz; +error: + kfree_skb_list(*chain); + *chain = NULL; + return rc; +} + +/** + * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one + * @bbuf: the existing buffer ("bundle") + * @buf: buffer to be appended + * @mtu: max allowable size for the bundle buffer + * Consumes buffer if successful + * Returns true if bundling could be performed, otherwise false + */ +bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) +{ + struct tipc_msg *bmsg = buf_msg(bbuf); + struct tipc_msg *msg = buf_msg(buf); + unsigned int bsz = msg_size(bmsg); + unsigned int msz = msg_size(msg); + u32 start = align(bsz); + u32 max = mtu - INT_H_SIZE; + u32 pad = start - bsz; + + if (likely(msg_user(msg) == MSG_FRAGMENTER)) + return false; + if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) + return false; + if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) + return false; + if (likely(msg_user(bmsg) != MSG_BUNDLER)) + return false; + if (likely(msg_type(bmsg) != BUNDLE_OPEN)) + return false; + if (unlikely(skb_tailroom(bbuf) < (pad + msz))) + return false; + if (unlikely(max < (start + msz))) + return false; + + skb_put(bbuf, pad + msz); + skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); + msg_set_size(bmsg, start + msz); + msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); + bbuf->next = buf->next; + kfree_skb(buf); + return true; +} + +/** + * tipc_msg_make_bundle(): Create bundle buf and append message to its tail + * @buf: buffer to be appended and replaced + * @mtu: max allowable size for the bundle buffer, inclusive header + * @dnode: destination node for message. (Not always present in header) + * Replaces buffer if successful + * Returns true if sucess, otherwise false + */ +bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) +{ + struct sk_buff *bbuf; + struct tipc_msg *bmsg; + struct tipc_msg *msg = buf_msg(*buf); + u32 msz = msg_size(msg); + u32 max = mtu - INT_H_SIZE; + + if (msg_user(msg) == MSG_FRAGMENTER) + return false; + if (msg_user(msg) == CHANGEOVER_PROTOCOL) + return false; + if (msg_user(msg) == BCAST_PROTOCOL) + return false; + if (msz > (max / 2)) + return false; + + bbuf = tipc_buf_acquire(max); + if (!bbuf) + return false; + + skb_trim(bbuf, INT_H_SIZE); + bmsg = buf_msg(bbuf); + tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode); + msg_set_seqno(bmsg, msg_seqno(msg)); + msg_set_ack(bmsg, msg_ack(msg)); + msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); + bbuf->next = (*buf)->next; + tipc_msg_bundle(bbuf, *buf, mtu); + *buf = bbuf; + return true; +} + +/** + * tipc_msg_reverse(): swap source and destination addresses and add error code + * @buf: buffer containing message to be reversed + * @dnode: return value: node where to send message after reversal + * @err: error code to be set in message + * Consumes buffer if failure + * Returns true if success, otherwise false + */ +bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err) +{ + struct tipc_msg *msg = buf_msg(buf); + uint imp = msg_importance(msg); + struct tipc_msg ohdr; + uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE); + + if (skb_linearize(buf)) + goto exit; + if (msg_dest_droppable(msg)) + goto exit; + if (msg_errcode(msg)) + goto exit; + + memcpy(&ohdr, msg, msg_hdr_sz(msg)); + imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE); + if (msg_isdata(msg)) + msg_set_importance(msg, imp); + msg_set_errcode(msg, err); + msg_set_origport(msg, msg_destport(&ohdr)); + msg_set_destport(msg, msg_origport(&ohdr)); + msg_set_prevnode(msg, tipc_own_addr); + if (!msg_short(msg)) { + msg_set_orignode(msg, msg_destnode(&ohdr)); + msg_set_destnode(msg, msg_orignode(&ohdr)); + } + msg_set_size(msg, msg_hdr_sz(msg) + rdsz); + skb_trim(buf, msg_size(msg)); + skb_orphan(buf); + *dnode = msg_orignode(&ohdr); + return true; +exit: + kfree_skb(buf); + return false; +} + +/** + * tipc_msg_eval: determine fate of message that found no destination + * @buf: the buffer containing the message. + * @dnode: return value: next-hop node, if message to be forwarded + * @err: error code to use, if message to be rejected + * + * Does not consume buffer + * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error + * code if message to be rejected + */ +int tipc_msg_eval(struct sk_buff *buf, u32 *dnode) +{ + struct tipc_msg *msg = buf_msg(buf); + u32 dport; + + if (msg_type(msg) != TIPC_NAMED_MSG) + return -TIPC_ERR_NO_PORT; + if (skb_linearize(buf)) + return -TIPC_ERR_NO_NAME; + if (msg_data_sz(msg) > MAX_FORWARD_SIZE) + return -TIPC_ERR_NO_NAME; + if (msg_reroute_cnt(msg) > 0) + return -TIPC_ERR_NO_NAME; + + *dnode = addr_domain(msg_lookup_scope(msg)); + dport = tipc_nametbl_translate(msg_nametype(msg), + msg_nameinst(msg), + dnode); + if (!dport) + return -TIPC_ERR_NO_NAME; + msg_incr_reroute_cnt(msg); + msg_set_destnode(msg, *dnode); + msg_set_destport(msg, dport); + return TIPC_OK; +} + +/* tipc_msg_reassemble() - clone a buffer chain of fragments and + * reassemble the clones into one message + */ +struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) +{ + struct sk_buff *buf = chain; + struct sk_buff *frag = buf; + struct sk_buff *head = NULL; + int hdr_sz; + + /* Copy header if single buffer */ + if (!buf->next) { + hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); + return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); + } + + /* Clone all fragments and reassemble */ + while (buf) { + frag = skb_clone(buf, GFP_ATOMIC); + if (!frag) + goto error; + frag->next = NULL; + if (tipc_buf_append(&head, &frag)) + break; + if (!head) + goto error; + buf = buf->next; + } + return frag; +error: + pr_warn("Failed do clone local mcast rcv buffer\n"); + kfree_skb(head); + return NULL; +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 5035119..462fa19 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) #define FRAGMENT 1 #define LAST_FRAGMENT 2 +/* Bundling protocol message types + */ +#define BUNDLE_OPEN 0 +#define BUNDLE_CLOSED 1 + /* * Link management protocol message types */ @@ -706,12 +711,36 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } -u32 tipc_msg_tot_importance(struct tipc_msg *m); +static inline u32 tipc_msg_tot_importance(struct tipc_msg *m) +{ + if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) + return msg_importance(msg_get_wrapped(m)); + return msg_importance(m); +} + +static inline u32 msg_tot_origport(struct tipc_msg *m) +{ + if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) + return msg_origport(msg_get_wrapped(m)); + return msg_origport(m); +} + +bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err); + +int tipc_msg_eval(struct sk_buff *buf, u32 *dnode); + void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); -int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - unsigned int len, int max_size, struct sk_buff **buf); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); +bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); + +bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); + +int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, + int offset, int dsz, int mtu , struct sk_buff **chain); + +struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); + #endif diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 8ce7309..dcc15bc 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) void named_cluster_distribute(struct sk_buff *buf) { - struct sk_buff *buf_copy; - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; + struct sk_buff *obuf; + struct tipc_node *node; + u32 dnode; rcu_read_lock(); - list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[n_ptr->addr & 1]; - if (l_ptr) { - buf_copy = skb_copy(buf, GFP_ATOMIC); - if (!buf_copy) { - tipc_node_unlock(n_ptr); - break; - } - msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); - __tipc_link_xmit(l_ptr, buf_copy); - } - tipc_node_unlock(n_ptr); + list_for_each_entry_rcu(node, &tipc_node_list, list) { + dnode = node->addr; + if (in_own_node(dnode)) + continue; + if (!tipc_node_active_links(node)) + continue; + obuf = skb_copy(buf, GFP_ATOMIC); + if (!obuf) + break; + msg_set_destnode(buf_msg(obuf), dnode); + tipc_link_xmit(obuf, dnode, dnode); } rcu_read_unlock(); @@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ) return buf; } -/* +/** * named_distribute - prepare name info for bulk distribution to another node + * @msg_list: list of messages (buffers) to be returned from this function + * @dnode: node to be updated + * @pls: linked list of publication items to be packed into buffer chain */ -static void named_distribute(struct list_head *message_list, u32 node, - struct publ_list *pls, u32 max_item_buf) +static void named_distribute(struct list_head *msg_list, u32 dnode, + struct publ_list *pls) { struct publication *publ; struct sk_buff *buf = NULL; struct distr_item *item = NULL; - u32 left = 0; - u32 rest = pls->size * ITEM_SIZE; + uint dsz = pls->size * ITEM_SIZE; + uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; + uint rem = dsz; + uint msg_rem = 0; list_for_each_entry(publ, &pls->list, local_list) { + /* Prepare next buffer: */ if (!buf) { - left = (rest <= max_item_buf) ? rest : max_item_buf; - rest -= left; - buf = named_prepare_buf(PUBLICATION, left, node); + msg_rem = min_t(uint, rem, msg_dsz); + rem -= msg_rem; + buf = named_prepare_buf(PUBLICATION, msg_rem, dnode); if (!buf) { pr_warn("Bulk publication failure\n"); return; } item = (struct distr_item *)msg_data(buf_msg(buf)); } + + /* Pack publication into message: */ publ_to_item(item, publ); item++; - left -= ITEM_SIZE; - if (!left) { - list_add_tail((struct list_head *)buf, message_list); + msg_rem -= ITEM_SIZE; + + /* Append full buffer to list: */ + if (!msg_rem) { + list_add_tail((struct list_head *)buf, msg_list); buf = NULL; } } @@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node, /** * tipc_named_node_up - tell specified node about all publications by this node */ -void tipc_named_node_up(u32 max_item_buf, u32 node) +void tipc_named_node_up(u32 dnode) { - LIST_HEAD(message_list); + LIST_HEAD(msg_list); + struct sk_buff *buf_chain; read_lock_bh(&tipc_nametbl_lock); - named_distribute(&message_list, node, &publ_cluster, max_item_buf); - named_distribute(&message_list, node, &publ_zone, max_item_buf); + named_distribute(&msg_list, dnode, &publ_cluster); + named_distribute(&msg_list, dnode, &publ_zone); read_unlock_bh(&tipc_nametbl_lock); - tipc_link_names_xmit(&message_list, node); + /* Convert circular list to linear list and send: */ + buf_chain = (struct sk_buff *)msg_list.next; + ((struct sk_buff *)msg_list.prev)->next = NULL; + tipc_link_xmit(buf_chain, dnode, dnode); } /** diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index b2eed4e..8afe32b 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -70,7 +70,7 @@ struct distr_item { struct sk_buff *tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ); void named_cluster_distribute(struct sk_buff *buf); -void tipc_named_node_up(u32 max_item_buf, u32 node); +void tipc_named_node_up(u32 dnode); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); diff --git a/net/tipc/net.c b/net/tipc/net.c index f64375e..7fcc949 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -1,7 +1,7 @@ /* * net/tipc/net.c: TIPC network routing code * - * Copyright (c) 1995-2006, Ericsson AB + * Copyright (c) 1995-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -104,67 +104,6 @@ * - A local spin_lock protecting the queue of subscriber events. */ -static void net_route_named_msg(struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - u32 dnode; - u32 dport; - - if (!msg_named(msg)) { - kfree_skb(buf); - return; - } - - dnode = addr_domain(msg_lookup_scope(msg)); - dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode); - if (dport) { - msg_set_destnode(msg, dnode); - msg_set_destport(msg, dport); - tipc_net_route_msg(buf); - return; - } - tipc_reject_msg(buf, TIPC_ERR_NO_NAME); -} - -void tipc_net_route_msg(struct sk_buff *buf) -{ - struct tipc_msg *msg; - u32 dnode; - - if (!buf) - return; - msg = buf_msg(buf); - - /* Handle message for this node */ - dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg); - if (tipc_in_scope(dnode, tipc_own_addr)) { - if (msg_isdata(msg)) { - if (msg_mcast(msg)) - tipc_port_mcast_rcv(buf, NULL); - else if (msg_destport(msg)) - tipc_sk_rcv(buf); - else - net_route_named_msg(buf); - return; - } - switch (msg_user(msg)) { - case NAME_DISTRIBUTOR: - tipc_named_rcv(buf); - break; - case CONN_MANAGER: - tipc_port_proto_rcv(buf); - break; - default: - kfree_skb(buf); - } - return; - } - - /* Handle message for another node */ - skb_trim(buf, msg_size(msg)); - tipc_link_xmit(buf, dnode, msg_link_selector(msg)); -} - int tipc_net_start(u32 addr) { char addr_string[16]; diff --git a/net/tipc/net.h b/net/tipc/net.h index c6c2b46..59ef338 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -37,8 +37,6 @@ #ifndef _TIPC_NET_H #define _TIPC_NET_H -void tipc_net_route_msg(struct sk_buff *buf); - int tipc_net_start(u32 addr); void tipc_net_stop(void); diff --git a/net/tipc/node.c b/net/tipc/node.c index 5b44c30..f706929 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@ /* * net/tipc/node.c: TIPC node management routines * - * Copyright (c) 2000-2006, 2012 Ericsson AB + * Copyright (c) 2000-2006, 2012-2014, Ericsson AB * Copyright (c) 2005-2006, 2010-2014, Wind River Systems * All rights reserved. * @@ -155,21 +155,25 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) if (!active[0]) { active[0] = active[1] = l_ptr; node_established_contact(n_ptr); - return; + goto exit; } if (l_ptr->priority < active[0]->priority) { pr_info("New link <%s> becomes standby\n", l_ptr->name); - return; + goto exit; } tipc_link_dup_queue_xmit(active[0], l_ptr); if (l_ptr->priority == active[0]->priority) { active[0] = l_ptr; - return; + goto exit; } pr_info("Old link <%s> becomes standby\n", active[0]->name); if (active[1] != active[0]) pr_info("Old link <%s> becomes standby\n", active[1]->name); active[0] = active[1] = l_ptr; +exit: + /* Leave room for changeover header when returning 'mtu' to users: */ + n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; + n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; } /** @@ -229,6 +233,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) tipc_link_failover_send_queue(l_ptr); else node_lost_contact(n_ptr); + + /* Leave room for changeover header when returning 'mtu' to users: */ + if (active[0]) { + n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; + n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; + return; + } + + /* Loopback link went down? No fragmentation needed from now on. */ + if (n_ptr->addr == tipc_own_addr) { + n_ptr->act_mtus[0] = MAX_MSG_SIZE; + n_ptr->act_mtus[1] = MAX_MSG_SIZE; + } } int tipc_node_active_links(struct tipc_node *n_ptr) @@ -457,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) void tipc_node_unlock(struct tipc_node *node) { LIST_HEAD(nsub_list); - struct tipc_link *link; - int pkt_sz = 0; u32 addr = 0; if (likely(!node->action_flags)) { @@ -471,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node) node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; } if (node->action_flags & TIPC_NOTIFY_NODE_UP) { - link = node->active_links[0]; node->action_flags &= ~TIPC_NOTIFY_NODE_UP; - if (link) { - pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) * - ITEM_SIZE; - addr = node->addr; - } + addr = node->addr; } spin_unlock_bh(&node->lock); if (!list_empty(&nsub_list)) tipc_nodesub_notify(&nsub_list); - if (pkt_sz) - tipc_named_node_up(pkt_sz, addr); + if (addr) + tipc_named_node_up(addr); } diff --git a/net/tipc/node.h b/net/tipc/node.h index 9087063..b61716a 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -41,6 +41,7 @@ #include "addr.h" #include "net.h" #include "bearer.h" +#include "msg.h" /* * Out-of-range value for node signature @@ -105,6 +106,7 @@ struct tipc_node { spinlock_t lock; struct hlist_node hash; struct tipc_link *active_links[2]; + u32 act_mtus[2]; struct tipc_link *links[MAX_BEARERS]; unsigned int action_flags; struct tipc_node_bclink bclink; @@ -143,4 +145,19 @@ static inline bool tipc_node_blocked(struct tipc_node *node) TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN)); } +static inline uint tipc_node_get_mtu(u32 addr, u32 selector) +{ + struct tipc_node *node; + u32 mtu; + + node = tipc_node_find(addr); + + if (likely(node)) + mtu = node->act_mtus[selector & 1]; + else + mtu = MAX_MSG_SIZE; + + return mtu; +} + #endif diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 7c59ab1..2d13eea 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) void tipc_nodesub_notify(struct list_head *nsub_list) { struct tipc_node_subscr *ns, *safe; + net_ev_handler handle_node_down; list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { - if (ns->handle_node_down) { - ns->handle_node_down(ns->usr_handle); + handle_node_down = ns->handle_node_down; + if (handle_node_down) { ns->handle_node_down = NULL; + handle_node_down(ns->usr_handle); } } } diff --git a/net/tipc/port.c b/net/tipc/port.c index 5fd7acc..7e096a5 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -42,8 +42,6 @@ /* Connection management: */ #define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ -#define CONFIRMED 0 -#define PROBING 1 #define MAX_REJECT_SIZE 1024 @@ -76,124 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) (!peernode && (orignode == tipc_own_addr)); } -/** - * tipc_port_mcast_xmit - send a multicast message to local and remote - * destinations - */ -int tipc_port_mcast_xmit(struct tipc_port *oport, - struct tipc_name_seq const *seq, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *hdr; - struct sk_buff *buf; - struct sk_buff *ibuf = NULL; - struct tipc_port_list dports = {0, NULL, }; - int ext_targets; - int res; - - /* Create multicast message */ - hdr = &oport->phdr; - msg_set_type(hdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(hdr, 0); - msg_set_destnode(hdr, 0); - msg_set_nametype(hdr, seq->type); - msg_set_namelower(hdr, seq->lower); - msg_set_nameupper(hdr, seq->upper); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); - res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (unlikely(!buf)) - return res; - - /* Figure out where to send multicast message */ - ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, - TIPC_NODE_SCOPE, &dports); - - /* Send message to destinations (duplicate it only if necessary) */ - if (ext_targets) { - if (dports.count != 0) { - ibuf = skb_copy(buf, GFP_ATOMIC); - if (ibuf == NULL) { - tipc_port_list_free(&dports); - kfree_skb(buf); - return -ENOMEM; - } - } - res = tipc_bclink_xmit(buf); - if ((res < 0) && (dports.count != 0)) - kfree_skb(ibuf); - } else { - ibuf = buf; - } - - if (res >= 0) { - if (ibuf) - tipc_port_mcast_rcv(ibuf, &dports); - } else { - tipc_port_list_free(&dports); - } - return res; -} - -/** - * tipc_port_mcast_rcv - deliver multicast message to all destination ports - * - * If there is no port list, perform a lookup to create one - */ -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) -{ - struct tipc_msg *msg; - struct tipc_port_list dports = {0, NULL, }; - struct tipc_port_list *item = dp; - int cnt = 0; - - msg = buf_msg(buf); - - /* Create destination port list, if one wasn't supplied */ - if (dp == NULL) { - tipc_nametbl_mc_translate(msg_nametype(msg), - msg_namelower(msg), - msg_nameupper(msg), - TIPC_CLUSTER_SCOPE, - &dports); - item = dp = &dports; - } - - /* Deliver a copy of message to each destination port */ - if (dp->count != 0) { - msg_set_destnode(msg, tipc_own_addr); - if (dp->count == 1) { - msg_set_destport(msg, dp->ports[0]); - tipc_sk_rcv(buf); - tipc_port_list_free(dp); - return; - } - for (; cnt < dp->count; cnt++) { - int index = cnt % PLSIZE; - struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); - - if (b == NULL) { - pr_warn("Unable to deliver multicast message(s)\n"); - goto exit; - } - if ((index == 0) && (cnt != 0)) - item = item->next; - msg_set_destport(buf_msg(b), item->ports[index]); - tipc_sk_rcv(b); - } - } -exit: - kfree_skb(buf); - tipc_port_list_free(dp); -} - - -void tipc_port_wakeup(struct tipc_port *port) -{ - tipc_sock_wakeup(tipc_port_to_sock(port)); -} - /* tipc_port_init - intiate TIPC port and lock it * * Returns obtained reference if initialization is successful, zero otherwise @@ -235,6 +115,8 @@ u32 tipc_port_init(struct tipc_port *p_ptr, void tipc_port_destroy(struct tipc_port *p_ptr) { struct sk_buff *buf = NULL; + struct tipc_msg *msg = NULL; + u32 peer; tipc_withdraw(p_ptr, 0, NULL); @@ -246,14 +128,15 @@ void tipc_port_destroy(struct tipc_port *p_ptr) if (p_ptr->connected) { buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); tipc_nodesub_unsubscribe(&p_ptr->subscription); + msg = buf_msg(buf); + peer = msg_destnode(msg); + tipc_link_xmit(buf, peer, msg_link_selector(msg)); } - spin_lock_bh(&tipc_port_list_lock); list_del(&p_ptr->port_list); list_del(&p_ptr->wait_list); spin_unlock_bh(&tipc_port_list_lock); k_term_timer(&p_ptr->timer); - tipc_net_route_msg(buf); } /* @@ -275,100 +158,16 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, msg_set_destport(msg, tipc_port_peerport(p_ptr)); msg_set_origport(msg, p_ptr->ref); msg_set_msgcnt(msg, ack); + buf->next = NULL; } return buf; } -int tipc_reject_msg(struct sk_buff *buf, u32 err) -{ - struct tipc_msg *msg = buf_msg(buf); - struct sk_buff *rbuf; - struct tipc_msg *rmsg; - int hdr_sz; - u32 imp; - u32 data_sz = msg_data_sz(msg); - u32 src_node; - u32 rmsg_sz; - - /* discard rejected message if it shouldn't be returned to sender */ - if (WARN(!msg_isdata(msg), - "attempt to reject message with user=%u", msg_user(msg))) { - dump_stack(); - goto exit; - } - if (msg_errcode(msg) || msg_dest_droppable(msg)) - goto exit; - - /* - * construct returned message by copying rejected message header and - * data (or subset), then updating header fields that need adjusting - */ - hdr_sz = msg_hdr_sz(msg); - rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE); - - rbuf = tipc_buf_acquire(rmsg_sz); - if (rbuf == NULL) - goto exit; - - rmsg = buf_msg(rbuf); - skb_copy_to_linear_data(rbuf, msg, rmsg_sz); - - if (msg_connected(rmsg)) { - imp = msg_importance(rmsg); - if (imp < TIPC_CRITICAL_IMPORTANCE) - msg_set_importance(rmsg, ++imp); - } - msg_set_non_seq(rmsg, 0); - msg_set_size(rmsg, rmsg_sz); - msg_set_errcode(rmsg, err); - msg_set_prevnode(rmsg, tipc_own_addr); - msg_swap_words(rmsg, 4, 5); - if (!msg_short(rmsg)) - msg_swap_words(rmsg, 6, 7); - - /* send self-abort message when rejecting on a connected port */ - if (msg_connected(msg)) { - struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); - - if (p_ptr) { - struct sk_buff *abuf = NULL; - - if (p_ptr->connected) - abuf = port_build_self_abort_msg(p_ptr, err); - tipc_port_unlock(p_ptr); - tipc_net_route_msg(abuf); - } - } - - /* send returned message & dispose of rejected message */ - src_node = msg_prevnode(msg); - if (in_own_node(src_node)) - tipc_sk_rcv(rbuf); - else - tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); -exit: - kfree_skb(buf); - return data_sz; -} - -int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr, - struct iovec const *msg_sect, unsigned int len, - int err) -{ - struct sk_buff *buf; - int res; - - res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (!buf) - return res; - - return tipc_reject_msg(buf, err); -} - static void port_timeout(unsigned long ref) { struct tipc_port *p_ptr = tipc_port_lock(ref); struct sk_buff *buf = NULL; + struct tipc_msg *msg = NULL; if (!p_ptr) return; @@ -379,15 +178,16 @@ static void port_timeout(unsigned long ref) } /* Last probe answered ? */ - if (p_ptr->probing_state == PROBING) { + if (p_ptr->probing_state == TIPC_CONN_PROBING) { buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); } else { buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); - p_ptr->probing_state = PROBING; + p_ptr->probing_state = TIPC_CONN_PROBING; k_start_timer(&p_ptr->timer, p_ptr->probing_interval); } tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); + msg = buf_msg(buf); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } @@ -395,12 +195,14 @@ static void port_handle_node_down(unsigned long ref) { struct tipc_port *p_ptr = tipc_port_lock(ref); struct sk_buff *buf = NULL; + struct tipc_msg *msg = NULL; if (!p_ptr) return; buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); + msg = buf_msg(buf); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } @@ -412,6 +214,7 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er struct tipc_msg *msg = buf_msg(buf); msg_swap_words(msg, 4, 5); msg_swap_words(msg, 6, 7); + buf->next = NULL; } return buf; } @@ -436,60 +239,11 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er if (imp < TIPC_CRITICAL_IMPORTANCE) msg_set_importance(msg, ++imp); msg_set_errcode(msg, err); + buf->next = NULL; } return buf; } -void tipc_port_proto_rcv(struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - struct tipc_port *p_ptr; - struct sk_buff *r_buf = NULL; - u32 destport = msg_destport(msg); - int wakeable; - - /* Validate connection */ - p_ptr = tipc_port_lock(destport); - if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) { - r_buf = tipc_buf_acquire(BASIC_H_SIZE); - if (r_buf) { - msg = buf_msg(r_buf); - tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG, - BASIC_H_SIZE, msg_orignode(msg)); - msg_set_errcode(msg, TIPC_ERR_NO_PORT); - msg_set_origport(msg, destport); - msg_set_destport(msg, msg_origport(msg)); - } - if (p_ptr) - tipc_port_unlock(p_ptr); - goto exit; - } - - /* Process protocol message sent by peer */ - switch (msg_type(msg)) { - case CONN_ACK: - wakeable = tipc_port_congested(p_ptr) && p_ptr->congested; - p_ptr->acked += msg_msgcnt(msg); - if (!tipc_port_congested(p_ptr)) { - p_ptr->congested = 0; - if (wakeable) - tipc_port_wakeup(p_ptr); - } - break; - case CONN_PROBE: - r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0); - break; - default: - /* CONN_PROBE_REPLY or unrecognized - no action required */ - break; - } - p_ptr->probing_state = CONFIRMED; - tipc_port_unlock(p_ptr); -exit: - tipc_net_route_msg(r_buf); - kfree_skb(buf); -} - static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id) { struct publication *publ; @@ -581,16 +335,19 @@ void tipc_acknowledge(u32 ref, u32 ack) { struct tipc_port *p_ptr; struct sk_buff *buf = NULL; + struct tipc_msg *msg; p_ptr = tipc_port_lock(ref); if (!p_ptr) return; - if (p_ptr->connected) { - p_ptr->conn_unacked -= ack; + if (p_ptr->connected) buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); - } + tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); + if (!buf) + return; + msg = buf_msg(buf); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, @@ -689,7 +446,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, msg_set_hdr_sz(msg, SHORT_H_SIZE); p_ptr->probing_interval = PROBING_INTERVAL; - p_ptr->probing_state = CONFIRMED; + p_ptr->probing_state = TIPC_CONN_OK; p_ptr->connected = 1; k_start_timer(&p_ptr->timer, p_ptr->probing_interval); @@ -698,7 +455,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, (net_ev_handler)port_handle_node_down); res = 0; exit: - p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); + p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref); return res; } @@ -741,6 +498,7 @@ int tipc_port_disconnect(u32 ref) */ int tipc_port_shutdown(u32 ref) { + struct tipc_msg *msg; struct tipc_port *p_ptr; struct sk_buff *buf = NULL; @@ -750,149 +508,7 @@ int tipc_port_shutdown(u32 ref) buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); + msg = buf_msg(buf); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); return tipc_port_disconnect(ref); } - -/* - * tipc_port_iovec_rcv: Concatenate and deliver sectioned - * message for this node. - */ -static int tipc_port_iovec_rcv(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len) -{ - struct sk_buff *buf; - int res; - - res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (likely(buf)) - tipc_sk_rcv(buf); - return res; -} - -/** - * tipc_send - send message sections on connection - */ -int tipc_send(struct tipc_port *p_ptr, - struct iovec const *msg_sect, - unsigned int len) -{ - u32 destnode; - int res; - - if (!p_ptr->connected) - return -EINVAL; - - p_ptr->congested = 1; - if (!tipc_port_congested(p_ptr)) { - destnode = tipc_port_peernode(p_ptr); - if (likely(!in_own_node(destnode))) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - destnode); - else - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - - if (likely(res != -ELINKCONG)) { - p_ptr->congested = 0; - if (res > 0) - p_ptr->sent++; - return res; - } - } - if (tipc_port_unreliable(p_ptr)) { - p_ptr->congested = 0; - return len; - } - return -ELINKCONG; -} - -/** - * tipc_send2name - send message sections to port name - */ -int tipc_send2name(struct tipc_port *p_ptr, - struct tipc_name const *name, - unsigned int domain, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *msg; - u32 destnode = domain; - u32 destport; - int res; - - if (p_ptr->connected) - return -EINVAL; - - msg = &p_ptr->phdr; - msg_set_type(msg, TIPC_NAMED_MSG); - msg_set_hdr_sz(msg, NAMED_H_SIZE); - msg_set_nametype(msg, name->type); - msg_set_nameinst(msg, name->instance); - msg_set_lookup_scope(msg, tipc_addr_scope(domain)); - destport = tipc_nametbl_translate(name->type, name->instance, &destnode); - msg_set_destnode(msg, destnode); - msg_set_destport(msg, destport); - - if (likely(destport || destnode)) { - if (likely(in_own_node(destnode))) - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - else if (tipc_own_addr) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - destnode); - else - res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, - len, TIPC_ERR_NO_NODE); - if (likely(res != -ELINKCONG)) { - if (res > 0) - p_ptr->sent++; - return res; - } - if (tipc_port_unreliable(p_ptr)) - return len; - - return -ELINKCONG; - } - return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, - TIPC_ERR_NO_NAME); -} - -/** - * tipc_send2port - send message sections to port identity - */ -int tipc_send2port(struct tipc_port *p_ptr, - struct tipc_portid const *dest, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *msg; - int res; - - if (p_ptr->connected) - return -EINVAL; - - msg = &p_ptr->phdr; - msg_set_type(msg, TIPC_DIRECT_MSG); - msg_set_lookup_scope(msg, 0); - msg_set_destnode(msg, dest->node); - msg_set_destport(msg, dest->ref); - msg_set_hdr_sz(msg, BASIC_H_SIZE); - - if (in_own_node(dest->node)) - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - else if (tipc_own_addr) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - dest->node); - else - res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, - TIPC_ERR_NO_NODE); - if (likely(res != -ELINKCONG)) { - if (res > 0) - p_ptr->sent++; - return res; - } - if (tipc_port_unreliable(p_ptr)) - return len; - - return -ELINKCONG; -} diff --git a/net/tipc/port.h b/net/tipc/port.h index cf4ca5b..3f93454 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -53,17 +53,13 @@ * @connected: non-zero if port is currently connected to a peer port * @conn_type: TIPC type used when connection was established * @conn_instance: TIPC instance used when connection was established - * @conn_unacked: number of unacknowledged messages received from peer port * @published: non-zero if port has one or more associated names - * @congested: non-zero if cannot send because of link or port congestion * @max_pkt: maximum packet size "hint" used when building messages sent by port * @ref: unique reference to port in TIPC object registry * @phdr: preformatted message header used when sending messages * @port_list: adjacent ports in TIPC's global list of ports * @wait_list: adjacent ports in list of ports waiting on link congestion * @waiting_pkts: - * @sent: # of non-empty messages sent by port - * @acked: # of non-empty message acknowledgements from connected port's peer * @publications: list of publications for port * @pub_count: total # of publications port has made during its lifetime * @probing_state: @@ -76,17 +72,13 @@ struct tipc_port { int connected; u32 conn_type; u32 conn_instance; - u32 conn_unacked; int published; - u32 congested; u32 max_pkt; u32 ref; struct tipc_msg phdr; struct list_head port_list; struct list_head wait_list; u32 waiting_pkts; - u32 sent; - u32 acked; struct list_head publications; u32 pub_count; u32 probing_state; @@ -104,8 +96,6 @@ struct tipc_port_list; u32 tipc_port_init(struct tipc_port *p_ptr, const unsigned int importance); -int tipc_reject_msg(struct sk_buff *buf, u32 err); - void tipc_acknowledge(u32 port_ref, u32 ack); void tipc_port_destroy(struct tipc_port *p_ptr); @@ -122,8 +112,6 @@ int tipc_port_disconnect(u32 portref); int tipc_port_shutdown(u32 ref); -void tipc_port_wakeup(struct tipc_port *port); - /* * The following routines require that the port be locked on entry */ @@ -132,39 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, struct tipc_portid const *peer); int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); -/* - * TIPC messaging routines - */ - -int tipc_send(struct tipc_port *port, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_send2name(struct tipc_port *port, - struct tipc_name const *name, - u32 domain, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_send2port(struct tipc_port *port, - struct tipc_portid const *dest, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_port_mcast_xmit(struct tipc_port *port, - struct tipc_name_seq const *seq, - struct iovec const *msg, - unsigned int len); - -int tipc_port_iovec_reject(struct tipc_port *p_ptr, - struct tipc_msg *hdr, - struct iovec const *msg_sect, - unsigned int len, - int err); - struct sk_buff *tipc_port_get_ports(void); -void tipc_port_proto_rcv(struct sk_buff *buf); -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp); void tipc_port_reinit(void); /** @@ -185,12 +141,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) spin_unlock_bh(p_ptr->lock); } -static inline int tipc_port_congested(struct tipc_port *p_ptr) -{ - return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN); -} - - static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) { return msg_destnode(&p_ptr->phdr); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ef04755..7d423ee 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -36,20 +36,23 @@ #include "core.h" #include "port.h" +#include "name_table.h" #include "node.h" - +#include "link.h" #include <linux/export.h> #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define TIPC_FWD_MSG 1 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -123,9 +126,12 @@ static void advance_rx_queue(struct sock *sk) static void reject_rx_queue(struct sock *sk) { struct sk_buff *buf; + u32 dnode; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); + } } /** @@ -201,6 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + tsk->sent_unacked = 0; atomic_set(&tsk->dupl_rcvcnt, 0); tipc_port_unlock(port); @@ -303,6 +310,7 @@ static int tipc_release(struct socket *sock) struct tipc_sock *tsk; struct tipc_port *port; struct sk_buff *buf; + u32 dnode; /* * Exit if socket isn't fully initialized (occurs when a failed accept() @@ -331,7 +339,8 @@ static int tipc_release(struct socket *sock) sock->state = SS_DISCONNECTING; tipc_port_disconnect(port->ref); } - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); } } @@ -504,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch ((int)sock->state) { case SS_UNCONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong) mask |= POLLOUT; break; case SS_READY: case SS_CONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong && !tipc_sk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case SS_CONNECTING: @@ -526,6 +535,136 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, } /** + * tipc_sendmcast - send multicast message + * @sock: socket structure + * @seq: destination address + * @iov: message data to send + * @dsz: total length of message data + * @timeo: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, + struct iovec *iov, size_t dsz, long timeo) +{ + struct sock *sk = sock->sk; + struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr; + struct sk_buff *buf; + uint mtu; + int rc; + + msg_set_type(mhdr, TIPC_MCAST_MSG); + msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(mhdr, 0); + msg_set_destnode(mhdr, 0); + msg_set_nametype(mhdr, seq->type); + msg_set_namelower(mhdr, seq->lower); + msg_set_nameupper(mhdr, seq->upper); + msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + +new_mtu: + mtu = tipc_bclink_get_mtu(); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (unlikely(rc < 0)) + return rc; + + do { + rc = tipc_bclink_xmit(buf); + if (likely(rc >= 0)) { + rc = dsz; + break; + } + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) + break; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); + return rc; +} + +/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets + */ +void tipc_sk_mcast_rcv(struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + struct tipc_port_list dports = {0, NULL, }; + struct tipc_port_list *item; + struct sk_buff *b; + uint i, last, dst = 0; + u32 scope = TIPC_CLUSTER_SCOPE; + + if (in_own_node(msg_orignode(msg))) + scope = TIPC_NODE_SCOPE; + + /* Create destination port list: */ + tipc_nametbl_mc_translate(msg_nametype(msg), + msg_namelower(msg), + msg_nameupper(msg), + scope, + &dports); + last = dports.count; + if (!last) { + kfree_skb(buf); + return; + } + + for (item = &dports; item; item = item->next) { + for (i = 0; i < PLSIZE && ++dst <= last; i++) { + b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; + if (!b) { + pr_warn("Failed do clone mcast rcv buffer\n"); + continue; + } + msg_set_destport(msg, item->ports[i]); + tipc_sk_rcv(b); + } + } + tipc_port_list_free(&dports); +} + +/** + * tipc_sk_proto_rcv - receive a connection mng protocol message + * @tsk: receiving socket + * @dnode: node to send response message to, if any + * @buf: buffer containing protocol message + * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if + * (CONN_PROBE_REPLY) message should be forwarded. + */ +static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, + struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + struct tipc_port *port = &tsk->port; + int conn_cong; + + /* Ignore if connection cannot be validated: */ + if (!port->connected || !tipc_port_peer_msg(port, msg)) + goto exit; + + port->probing_state = TIPC_CONN_OK; + + if (msg_type(msg) == CONN_ACK) { + conn_cong = tipc_sk_conn_cong(tsk); + tsk->sent_unacked -= msg_msgcnt(msg); + if (conn_cong) + tipc_sock_wakeup(tsk); + } else if (msg_type(msg) == CONN_PROBE) { + if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) + return TIPC_OK; + msg_set_type(msg, CONN_PROBE_REPLY); + return TIPC_FWD_MSG; + } + /* Do nothing if msg_type() == CONN_PROBE_REPLY */ +exit: + kfree_skb(buf); + return TIPC_OK; +} + +/** * dest_name_check - verify user is permitted to send to specified port name * @dest: destination address * @m: descriptor for message to be sent @@ -539,6 +678,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) { struct tipc_cfg_msg_hdr hdr; + if (unlikely(dest->addrtype == TIPC_ADDR_ID)) + return 0; if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES)) return 0; if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) @@ -575,19 +716,18 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) return sock_intr_errno(*timeo_p); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); - done = sk_wait_event(sk, timeo_p, !tsk->port.congested); + done = sk_wait_event(sk, timeo_p, !tsk->link_cong); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } - /** * tipc_sendmsg - send message in connectionless manner * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure * @m: message to send - * @total_len: length of message + * @dsz: amount of user data to be sent * * Message must have an destination specified explicitly. * Used for SOCK_RDM and SOCK_DGRAM messages, @@ -597,100 +737,123 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) * Returns the number of bytes sent on success, or errno otherwise */ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) + struct msghdr *m, size_t dsz) { + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_port *port = &tsk->port; - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int needs_conn; + struct tipc_msg *mhdr = &port->phdr; + struct iovec *iov = m->msg_iov; + u32 dnode, dport; + struct sk_buff *buf; + struct tipc_name_seq *seq = &dest->addr.nameseq; + u32 mtu; long timeo; - int res = -EINVAL; + int rc = -EINVAL; if (unlikely(!dest)) return -EDESTADDRREQ; + if (unlikely((m->msg_namelen < sizeof(*dest)) || (dest->family != AF_TIPC))) return -EINVAL; - if (total_len > TIPC_MAX_USER_MSG_SIZE) + + if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; if (iocb) lock_sock(sk); - needs_conn = (sock->state != SS_READY); - if (unlikely(needs_conn)) { + if (unlikely(sock->state != SS_READY)) { if (sock->state == SS_LISTENING) { - res = -EPIPE; + rc = -EPIPE; goto exit; } if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; + rc = -EISCONN; goto exit; } if (tsk->port.published) { - res = -EOPNOTSUPP; + rc = -EOPNOTSUPP; goto exit; } if (dest->addrtype == TIPC_ADDR_NAME) { tsk->port.conn_type = dest->addr.name.name.type; tsk->port.conn_instance = dest->addr.name.name.instance; } - - /* Abort any pending connection attempts (very unlikely) */ - reject_rx_queue(sk); } + rc = dest_name_check(dest, m); + if (rc) + goto exit; timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - do { - if (dest->addrtype == TIPC_ADDR_NAME) { - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_send2name(port, - &dest->addr.name.name, - dest->addr.name.domain, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_ID) { - res = tipc_send2port(port, - &dest->addr.id, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_MCAST) { - if (needs_conn) { - res = -EOPNOTSUPP; - break; - } - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_port_mcast_xmit(port, - &dest->addr.nameseq, - m->msg_iov, - total_len); + + if (dest->addrtype == TIPC_ADDR_MCAST) { + rc = tipc_sendmcast(sock, seq, iov, dsz, timeo); + goto exit; + } else if (dest->addrtype == TIPC_ADDR_NAME) { + u32 type = dest->addr.name.name.type; + u32 inst = dest->addr.name.name.instance; + u32 domain = dest->addr.name.domain; + + dnode = domain; + msg_set_type(mhdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(mhdr, NAMED_H_SIZE); + msg_set_nametype(mhdr, type); + msg_set_nameinst(mhdr, inst); + msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + dport = tipc_nametbl_translate(type, inst, &dnode); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dport); + if (unlikely(!dport && !dnode)) { + rc = -EHOSTUNREACH; + goto exit; } - if (likely(res != -ELINKCONG)) { - if (needs_conn && (res >= 0)) + } else if (dest->addrtype == TIPC_ADDR_ID) { + dnode = dest->addr.id.node; + msg_set_type(mhdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(mhdr, 0); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dest->addr.id.ref); + msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + } + +new_mtu: + mtu = tipc_node_get_mtu(dnode, tsk->port.ref); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (rc < 0) + goto exit; + + do { + rc = tipc_link_xmit(buf, dnode, tsk->port.ref); + if (likely(rc >= 0)) { + if (sock->state != SS_READY) sock->state = SS_CONNECTING; + rc = dsz; break; } - res = tipc_wait_for_sndmsg(sock, &timeo); - if (res) + if (rc == -EMSGSIZE) + goto new_mtu; + + if (rc != -ELINKCONG) break; - } while (1); + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + + return rc; } static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; DEFINE_WAIT(wait); int done; @@ -709,37 +872,49 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); done = sk_wait_event(sk, timeo_p, - (!port->congested || !port->connected)); + (!tsk->link_cong && + !tipc_sk_conn_cong(tsk)) || + !tsk->port.connected); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } /** - * tipc_send_packet - send a connection-oriented message - * @iocb: if NULL, indicates that socket lock is already held + * tipc_send_stream - send stream-oriented data + * @iocb: (unused) * @sock: socket structure - * @m: message to send - * @total_len: length of message + * @m: data to send + * @dsz: total length of data to be transmitted * - * Used for SOCK_SEQPACKET messages and SOCK_STREAM data. + * Used for SOCK_STREAM data. * - * Returns the number of bytes sent on success, or errno otherwise + * Returns the number of bytes sent on success (or partial success), + * or errno if no data sent */ -static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; + struct tipc_msg *mhdr = &port->phdr; + struct sk_buff *buf; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int res = -EINVAL; + u32 ref = port->ref; + int rc = -EINVAL; long timeo; + u32 dnode; + uint mtu, send, sent = 0; /* Handle implied connection establishment */ - if (unlikely(dest)) - return tipc_sendmsg(iocb, sock, m, total_len); - - if (total_len > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dest)) { + rc = tipc_sendmsg(iocb, sock, m, dsz); + if (dsz && (dsz == rc)) + tsk->sent_unacked = 1; + return rc; + } + if (dsz > (uint)INT_MAX) return -EMSGSIZE; if (iocb) @@ -747,123 +922,66 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, if (unlikely(sock->state != SS_CONNECTED)) { if (sock->state == SS_DISCONNECTING) - res = -EPIPE; + rc = -EPIPE; else - res = -ENOTCONN; + rc = -ENOTCONN; goto exit; } timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + dnode = tipc_port_peernode(port); + +next: + mtu = port->max_pkt; + send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); + if (unlikely(rc < 0)) + goto exit; do { - res = tipc_send(&tsk->port, m->msg_iov, total_len); - if (likely(res != -ELINKCONG)) - break; - res = tipc_wait_for_sndpkt(sock, &timeo); - if (res) - break; - } while (1); + if (likely(!tipc_sk_conn_cong(tsk))) { + rc = tipc_link_xmit(buf, dnode, ref); + if (likely(!rc)) { + tsk->sent_unacked++; + sent += send; + if (sent == dsz) + break; + goto next; + } + if (rc == -EMSGSIZE) { + port->max_pkt = tipc_node_get_mtu(dnode, ref); + goto next; + } + if (rc != -ELINKCONG) + break; + } + rc = tipc_wait_for_sndpkt(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + return sent ? sent : rc; } /** - * tipc_send_stream - send stream-oriented data - * @iocb: (unused) + * tipc_send_packet - send a connection-oriented message + * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure - * @m: data to send - * @total_len: total length of data to be sent + * @m: message to send + * @dsz: length of data to be transmitted * - * Used for SOCK_STREAM data. + * Used for SOCK_SEQPACKET messages. * - * Returns the number of bytes sent on success (or partial success), - * or errno if no data sent + * Returns the number of bytes sent on success, or errno otherwise */ -static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct msghdr my_msg; - struct iovec my_iov; - struct iovec *curr_iov; - int curr_iovlen; - char __user *curr_start; - u32 hdr_size; - int curr_left; - int bytes_to_send; - int bytes_sent; - int res; - - lock_sock(sk); - - /* Handle special cases where there is no connection */ - if (unlikely(sock->state != SS_CONNECTED)) { - if (sock->state == SS_UNCONNECTED) - res = tipc_send_packet(NULL, sock, m, total_len); - else - res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN; - goto exit; - } - - if (unlikely(m->msg_name)) { - res = -EISCONN; - goto exit; - } - - if (total_len > (unsigned int)INT_MAX) { - res = -EMSGSIZE; - goto exit; - } - - /* - * Send each iovec entry using one or more messages - * - * Note: This algorithm is good for the most likely case - * (i.e. one large iovec entry), but could be improved to pass sets - * of small iovec entries into send_packet(). - */ - curr_iov = m->msg_iov; - curr_iovlen = m->msg_iovlen; - my_msg.msg_iov = &my_iov; - my_msg.msg_iovlen = 1; - my_msg.msg_flags = m->msg_flags; - my_msg.msg_name = NULL; - bytes_sent = 0; - - hdr_size = msg_hdr_sz(&tsk->port.phdr); - - while (curr_iovlen--) { - curr_start = curr_iov->iov_base; - curr_left = curr_iov->iov_len; - - while (curr_left) { - bytes_to_send = tsk->port.max_pkt - hdr_size; - if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE) - bytes_to_send = TIPC_MAX_USER_MSG_SIZE; - if (curr_left < bytes_to_send) - bytes_to_send = curr_left; - my_iov.iov_base = curr_start; - my_iov.iov_len = bytes_to_send; - res = tipc_send_packet(NULL, sock, &my_msg, - bytes_to_send); - if (res < 0) { - if (bytes_sent) - res = bytes_sent; - goto exit; - } - curr_left -= bytes_to_send; - curr_start += bytes_to_send; - bytes_sent += bytes_to_send; - } + if (dsz > TIPC_MAX_USER_MSG_SIZE) + return -EMSGSIZE; - curr_iov++; - } - res = bytes_sent; -exit: - release_sock(sk); - return res; + return tipc_send_stream(iocb, sock, m, dsz); } /** @@ -1104,8 +1222,10 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); + (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_acknowledge(port->ref, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } advance_rx_queue(sk); } exit: @@ -1213,8 +1333,10 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); + if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_acknowledge(port->ref, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } advance_rx_queue(sk); } @@ -1269,17 +1391,16 @@ static void tipc_data_ready(struct sock *sk) * @tsk: TIPC socket * @msg: message * - * Returns TIPC error status code and socket error status code - * once it encounters some errors + * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise */ -static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) +static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) { struct sock *sk = &tsk->sk; struct tipc_port *port = &tsk->port; struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(*buf); - u32 retval = TIPC_ERR_NO_PORT; + int retval = -TIPC_ERR_NO_PORT; int res; if (msg_mcast(msg)) @@ -1382,32 +1503,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) * * Called with socket lock already taken; port lock may also be taken. * - * Returns TIPC error status code (TIPC_OK if message is not to be rejected) + * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message + * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded */ -static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) +static int filter_rcv(struct sock *sk, struct sk_buff *buf) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *msg = buf_msg(buf); unsigned int limit = rcvbuf_limit(sk, buf); - u32 res = TIPC_OK; + u32 onode; + int rc = TIPC_OK; + + if (unlikely(msg_user(msg) == CONN_MANAGER)) + return tipc_sk_proto_rcv(tsk, &onode, buf); /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; if (sock->state == SS_READY) { if (msg_connected(msg)) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; } else { - res = filter_connect(tsk, &buf); - if (res != TIPC_OK || buf == NULL) - return res; + rc = filter_connect(tsk, &buf); + if (rc != TIPC_OK || buf == NULL) + return rc; } /* Reject message if there isn't room to queue it */ if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) - return TIPC_ERR_OVERLOAD; + return -TIPC_ERR_OVERLOAD; /* Enqueue message */ TIPC_SKB_CB(buf)->handle = NULL; @@ -1429,16 +1555,23 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) */ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) { - u32 res; + int rc; + u32 onode; struct tipc_sock *tsk = tipc_sk(sk); uint truesize = buf->truesize; - res = filter_rcv(sk, buf); - if (unlikely(res)) - tipc_reject_msg(buf, res); + rc = filter_rcv(sk, buf); - if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) - atomic_add(truesize, &tsk->dupl_rcvcnt); + if (likely(!rc)) { + if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(truesize, &tsk->dupl_rcvcnt); + return 0; + } + + if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) + return 0; + + tipc_link_xmit(buf, onode, 0); return 0; } @@ -1455,19 +1588,14 @@ int tipc_sk_rcv(struct sk_buff *buf) struct tipc_port *port; struct sock *sk; u32 dport = msg_destport(buf_msg(buf)); - int err = TIPC_OK; + int rc = TIPC_OK; uint limit; + u32 dnode; - /* Forward unresolved named message */ - if (unlikely(!dport)) { - tipc_net_route_msg(buf); - return 0; - } - - /* Validate destination */ + /* Validate destination and message */ port = tipc_port_lock(dport); if (unlikely(!port)) { - err = TIPC_ERR_NO_PORT; + rc = tipc_msg_eval(buf, &dnode); goto exit; } @@ -1478,23 +1606,25 @@ int tipc_sk_rcv(struct sk_buff *buf) bh_lock_sock(sk); if (!sock_owned_by_user(sk)) { - err = filter_rcv(sk, buf); + rc = filter_rcv(sk, buf); } else { if (sk->sk_backlog.len == 0) atomic_set(&tsk->dupl_rcvcnt, 0); limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); if (sk_add_backlog(sk, buf, limit)) - err = TIPC_ERR_OVERLOAD; + rc = -TIPC_ERR_OVERLOAD; } - bh_unlock_sock(sk); tipc_port_unlock(port); - if (likely(!err)) + if (likely(!rc)) return 0; exit: - tipc_reject_msg(buf, err); - return -EHOSTUNREACH; + if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) + return -EHOSTUNREACH; + + tipc_link_xmit(buf, dnode, 0); + return (rc < 0) ? -EHOSTUNREACH : 0; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -1758,6 +1888,7 @@ static int tipc_shutdown(struct socket *sock, int how) struct tipc_sock *tsk = tipc_sk(sk); struct tipc_port *port = &tsk->port; struct sk_buff *buf; + u32 peer; int res; if (how != SHUT_RDWR) @@ -1778,7 +1909,8 @@ restart: goto restart; } tipc_port_disconnect(port->ref); - tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); + if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN)) + tipc_link_xmit(buf, peer, 0); } else { tipc_port_shutdown(port->ref); } @@ -1936,7 +2068,7 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, return put_user(sizeof(value), ol); } -int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) +static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) { struct tipc_sioc_ln_req lnr; void __user *argp = (void __user *)arg; @@ -1952,7 +2084,6 @@ int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) return 0; } return -EADDRNOTAVAIL; - break; default: return -ENOIOCTLCMD; } diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 3afcd2a..43b75b3 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -38,6 +38,9 @@ #include "port.h" #include <net/sock.h> +#define TIPC_CONN_OK 0 +#define TIPC_CONN_PROBING 1 + /** * struct tipc_sock - TIPC socket structure * @sk: socket - interacts with 'port' and with user via the socket API @@ -45,6 +48,9 @@ * @peer_name: the peer of the connection, if any * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue + * @link_cong: non-zero if owner must sleep because of link congestion + * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @rcv_unacked: # messages read by user, but not yet acked back to peer */ struct tipc_sock { @@ -52,6 +58,9 @@ struct tipc_sock { struct tipc_port port; unsigned int conn_timeout; atomic_t dupl_rcvcnt; + int link_cong; + uint sent_unacked; + uint rcv_unacked; }; static inline struct tipc_sock *tipc_sk(const struct sock *sk) @@ -69,6 +78,13 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) tsk->sk.sk_write_space(&tsk->sk); } +static inline int tipc_sk_conn_cong(struct tipc_sock *tsk) +{ + return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; +} + int tipc_sk_rcv(struct sk_buff *buf); +void tipc_sk_mcast_rcv(struct sk_buff *buf); + #endif |