Diffstat (limited to 'net')
 net/tipc/Kconfig      |  13
 net/tipc/bcast.c      |  27
 net/tipc/bearer.c     | 110
 net/tipc/bearer.h     |  24
 net/tipc/core.c       |   5
 net/tipc/discover.c   |   2
 net/tipc/link.c       | 188
 net/tipc/link.h       |   4
 net/tipc/name_distr.c |   2
 net/tipc/node.c       |  15
 net/tipc/node.h       |   6
 net/tipc/socket.c     |  58
 12 files changed, 185 insertions(+), 269 deletions(-)
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig index 5854601..bc41bd3 100644 --- a/net/tipc/Kconfig +++ b/net/tipc/Kconfig @@ -20,18 +20,9 @@ menuconfig TIPC If in doubt, say N. -if TIPC - -config TIPC_ADVANCED - bool "Advanced TIPC configuration" - default n - help - Saying Y here will open some advanced configuration for TIPC. - Most users do not need to bother; if unsure, just say N. - config TIPC_PORTS int "Maximum number of ports in a node" - depends on TIPC_ADVANCED + depends on TIPC range 127 65535 default "8191" help @@ -40,5 +31,3 @@ config TIPC_PORTS Setting this to a smaller value saves some memory, setting it to higher allows for more ports. - -endif # TIPC diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e4e6d8c..54f89f9 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_lock(n_ptr); - if (n_ptr->bclink.supported && + if (n_ptr->bclink.recv_permitted && (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && (n_ptr->bclink.last_in == msg_bcgap_after(msg))) n_ptr->bclink.oos_state = 2; @@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) goto exit; tipc_node_lock(node); - if (unlikely(!node->bclink.supported)) + if (unlikely(!node->bclink.recv_permitted)) goto unlock; /* Handle broadcast protocol message */ @@ -564,7 +564,7 @@ exit: u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) { - return (n_ptr->bclink.supported && + return (n_ptr->bclink.recv_permitted && (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); } @@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, if (bcbearer->remains_new.count == bcbearer->remains.count) continue; /* bearer pair doesn't add anything */ - if (p->blocked || - p->media->send_msg(buf, p, &p->media->bcast_addr)) { + if (!tipc_bearer_blocked(p)) + tipc_bearer_send(p, buf, &p->media->bcast_addr); + else if (s && !tipc_bearer_blocked(s)) /* unable to send on primary bearer */ - if (!s || s->blocked || - s->media->send_msg(buf, s, - &s->media->bcast_addr)) { - /* unable to send on either bearer */ - continue; - } - } + tipc_bearer_send(s, buf, &s->media->bcast_addr); + else + /* unable to send on either bearer */ + continue; if (s) { bcbearer->bpairs[bp_index].primary = s; @@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) " TX naks:%u acks:%u dups:%u\n", s->sent_nacks, s->sent_acks, s->retransmitted); ret += tipc_snprintf(buf + ret, buf_size - ret, - " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", - s->bearer_congs, s->link_congs, s->max_queue_sz, + " Congestion link:%u Send queue max:%u avg:%u\n", + s->link_congs, s->max_queue_sz, s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); @@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit) void tipc_bclink_init(void) { - INIT_LIST_HEAD(&bcbearer->bearer.cong_links); bcbearer->bearer.media = &bcbearer->media; bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 4ec5c80..aa62f93 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -279,116 +279,31 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest) } /* - * bearer_push(): Resolve bearer congestion. Force the waiting - * links to push out their unsent packets, one packet per link - * per iteration, until all packets are gone or congestion reoccurs. 
- * 'tipc_net_lock' is read_locked when this function is called - * bearer.lock must be taken before calling - * Returns binary true(1) ore false(0) - */ -static int bearer_push(struct tipc_bearer *b_ptr) -{ - u32 res = 0; - struct tipc_link *ln, *tln; - - if (b_ptr->blocked) - return 0; - - while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) { - list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) { - res = tipc_link_push_packet(ln); - if (res == PUSH_FAILED) - break; - if (res == PUSH_FINISHED) - list_move_tail(&ln->link_list, &b_ptr->links); - } - } - return list_empty(&b_ptr->cong_links); -} - -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr) -{ - spin_lock_bh(&b_ptr->lock); - bearer_push(b_ptr); - spin_unlock_bh(&b_ptr->lock); -} - - -/* - * Interrupt enabling new requests after bearer congestion or blocking: + * Interrupt enabling new requests after bearer blocking: * See bearer_send(). */ -void tipc_continue(struct tipc_bearer *b_ptr) +void tipc_continue(struct tipc_bearer *b) { - spin_lock_bh(&b_ptr->lock); - if (!list_empty(&b_ptr->cong_links)) - tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr); - b_ptr->blocked = 0; - spin_unlock_bh(&b_ptr->lock); + spin_lock_bh(&b->lock); + b->blocked = 0; + spin_unlock_bh(&b->lock); } /* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here - * bearer.lock is busy + * tipc_bearer_blocked - determines if bearer is currently blocked */ -static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr) +int tipc_bearer_blocked(struct tipc_bearer *b) { - list_move_tail(&l_ptr->link_list, &b_ptr->cong_links); -} - -/* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here, - * bearer.lock is free - */ -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ - spin_lock_bh(&b_ptr->lock); - tipc_bearer_schedule_unlocked(b_ptr, l_ptr); - spin_unlock_bh(&b_ptr->lock); -} - + int res; -/* - * tipc_bearer_resolve_congestion(): Check if there is bearer congestion, - * and if there is, try to resolve it before returning. 
- * 'tipc_net_lock' is read_locked when this function is called - */ -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr) -{ - int res = 1; + spin_lock_bh(&b->lock); + res = b->blocked; + spin_unlock_bh(&b->lock); - if (list_empty(&b_ptr->cong_links)) - return 1; - spin_lock_bh(&b_ptr->lock); - if (!bearer_push(b_ptr)) { - tipc_bearer_schedule_unlocked(b_ptr, l_ptr); - res = 0; - } - spin_unlock_bh(&b_ptr->lock); return res; } /** - * tipc_bearer_congested - determines if bearer is currently congested - */ -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ - if (unlikely(b_ptr->blocked)) - return 1; - if (likely(list_empty(&b_ptr->cong_links))) - return 0; - return !tipc_bearer_resolve_congestion(b_ptr, l_ptr); -} - -/** * tipc_enable_bearer - enable bearer with the given name */ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority) @@ -489,7 +404,6 @@ restart: b_ptr->net_plane = bearer_id + 'A'; b_ptr->active = 1; b_ptr->priority = priority; - INIT_LIST_HEAD(&b_ptr->cong_links); INIT_LIST_HEAD(&b_ptr->links); spin_lock_init(&b_ptr->lock); @@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name) pr_info("Blocking bearer <%s>\n", name); spin_lock_bh(&b_ptr->lock); b_ptr->blocked = 1; - list_splice_init(&b_ptr->cong_links, &b_ptr->links); list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { struct tipc_node *n_ptr = l_ptr->owner; @@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr) spin_lock_bh(&b_ptr->lock); b_ptr->blocked = 1; b_ptr->media->disable_bearer(b_ptr); - list_splice_init(&b_ptr->cong_links, &b_ptr->links); list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { tipc_link_delete(l_ptr); } diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dd4c2ab..39f1192 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -120,7 +120,6 @@ struct tipc_media { * @identity: array index of this bearer within TIPC bearer array * @link_req: ptr to (optional) structure making periodic link setup requests * @links: list of non-congested links associated with bearer - * @cong_links: list of congested links associated with bearer * @active: non-zero if bearer structure is represents a bearer * @net_plane: network plane ('A' through 'H') currently associated with bearer * @nodes: indicates which nodes in cluster can be reached through bearer @@ -143,7 +142,6 @@ struct tipc_bearer { u32 identity; struct tipc_link_req *link_req; struct list_head links; - struct list_head cong_links; int active; char net_plane; struct tipc_node_map nodes; @@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void); struct sk_buff *tipc_bearer_get_names(void); void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest); void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr); struct tipc_bearer *tipc_bearer_find(const char *name); struct tipc_bearer *tipc_bearer_find_interface(const char *if_name); struct tipc_media *tipc_media_find(const char *name); -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr); -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr); +int tipc_bearer_blocked(struct tipc_bearer *b_ptr); void tipc_bearer_stop(void); -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr); - /** * tipc_bearer_send- sends buffer to destination over bearer * - * Returns true (1) if successful, or false (0) if 
unable to send - * * IMPORTANT: * The media send routine must not alter the buffer being passed in * as it may be needed for later retransmission! - * - * If the media send routine returns a non-zero value (indicating that - * it was unable to send the buffer), it must: - * 1) mark the bearer as blocked, - * 2) call tipc_continue() once the bearer is able to send again. - * Media types that are unable to meet these two critera must ensure their - * send routine always returns success -- even if the buffer was not sent -- - * and let TIPC's link code deal with the undelivered message. */ -static inline int tipc_bearer_send(struct tipc_bearer *b_ptr, - struct sk_buff *buf, +static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, struct tipc_media_addr *dest) { - return !b_ptr->media->send_msg(buf, b_ptr, dest); + b->media->send_msg(buf, b, dest); } #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/core.c b/net/tipc/core.c index bfe8af8..fc05cec 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -42,11 +42,6 @@ #include <linux/module.h> -#ifndef CONFIG_TIPC_PORTS -#define CONFIG_TIPC_PORTS 8191 -#endif - - /* global variables used by multiple sub-systems within TIPC */ int tipc_random __read_mostly; diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 50eaa40..1074b95 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) { rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); if (rbuf) { - b_ptr->media->send_msg(rbuf, b_ptr, &media_addr); + tipc_bearer_send(b_ptr, rbuf, &media_addr); kfree_skb(rbuf); } } diff --git a/net/tipc/link.c b/net/tipc/link.c index a79c755..87bf5aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@ /* * net/tipc/link.c: TIPC link code * - * Copyright (c) 1996-2007, Ericsson AB + * Copyright (c) 1996-2007, 2012, Ericsson AB * Copyright (c) 2004-2007, 2010-2011, Wind River Systems * All rights reserved. * @@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); static void link_start(struct tipc_link *l_ptr); static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_send_sync(struct tipc_link *l); +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); /* * Simple link routines @@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) link_activate(l_ptr); tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; + if (l_ptr->owner->working_links == 1) + tipc_link_send_sync(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: @@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) link_activate(l_ptr); tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; + if (l_ptr->owner->working_links == 1) + tipc_link_send_sync(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: @@ -872,17 +878,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) return link_send_long_buf(l_ptr, buf); /* Packet can be queued or sent. 
*/ - if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && + if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) && !link_congested(l_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); - if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { - l_ptr->unacked_window = 0; - } else { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->next_out = buf; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->unacked_window = 0; return dsz; } /* Congestion: can message be bundled ? */ @@ -891,10 +892,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) /* Try adding message to an existing bundle */ if (l_ptr->next_out && - link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { - tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); + link_bundle_buf(l_ptr, l_ptr->last_out, buf)) return dsz; - } /* Try creating a new bundle */ if (size <= max_packet * 2 / 3) { @@ -917,7 +916,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) if (!l_ptr->next_out) l_ptr->next_out = buf; link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); return dsz; } @@ -949,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) return res; } -/** +/* + * tipc_link_send_sync - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + * + * Called with node locked + */ +static void tipc_link_send_sync(struct tipc_link *l) +{ + struct sk_buff *buf; + struct tipc_msg *msg; + + buf = tipc_buf_acquire(INT_H_SIZE); + if (!buf) + return; + + msg = buf_msg(buf); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); + msg_set_last_bcast(msg, l->owner->bclink.acked); + link_add_chain_to_outqueue(l, buf, 0); + tipc_link_push_queue(l); +} + +/* + * tipc_link_recv_sync - synchronize broadcast link endpoints. + * Receive the sequence number where we should start receiving and + * acking broadcast packets from a newly added peer node, and open + * up for reception of such packets. 
+ * + * Called with node locked + */ +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + + n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); + n->bclink.recv_permitted = true; + kfree_skb(buf); +} + +/* * tipc_link_send_names - send name table entries to new neighbor * * Send routine for bulk delivery of name table messages when contact @@ -1006,16 +1045,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, if (likely(!link_congested(l_ptr))) { if (likely(msg_size(msg) <= l_ptr->max_pkt)) { - if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { + if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); - if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, - &l_ptr->media_addr))) { - l_ptr->unacked_window = 0; - return res; - } - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->next_out = buf; + tipc_bearer_send(l_ptr->b_ptr, buf, + &l_ptr->media_addr); + l_ptr->unacked_window = 0; return res; } } else @@ -1106,7 +1140,7 @@ exit: /* Exit if link (or bearer) is congested */ if (link_congested(l_ptr) || - !list_empty(&l_ptr->b_ptr->cong_links)) { + tipc_bearer_blocked(l_ptr->b_ptr)) { res = link_schedule_port(l_ptr, sender->ref, res); goto exit; @@ -1329,15 +1363,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (r_q_size && buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->retransm_queue_head = mod(++r_q_head); - l_ptr->retransm_queue_size = --r_q_size; - l_ptr->stats.retransmitted++; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->retransm_queue_head = mod(++r_q_head); + l_ptr->retransm_queue_size = --r_q_size; + l_ptr->stats.retransmitted++; + return 0; } /* Send deferred protocol message, if any: */ @@ -1345,15 +1375,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->unacked_window = 0; - kfree_skb(buf); - l_ptr->proto_msg_queue = NULL; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->unacked_window = 0; + kfree_skb(buf); + l_ptr->proto_msg_queue = NULL; + return 0; } /* Send one deferred data message, if send window not full: */ @@ -1366,18 +1392,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (mod(next - first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); - l_ptr->next_out = buf->next; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + if (msg_user(msg) == MSG_BUNDLER) + msg_set_type(msg, CLOSED_MSG); + l_ptr->next_out = buf->next; + return 0; } } - return PUSH_FINISHED; + return 1; } /* @@ -1388,15 +1410,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr) { u32 res; - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) + if 
(tipc_bearer_blocked(l_ptr->b_ptr)) return; do { res = tipc_link_push_packet(l_ptr); } while (!res); - - if (res == PUSH_FAILED) - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); } static void link_reset_all(unsigned long addr) @@ -1454,9 +1473,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_addr_string_fill(addr_string, n_ptr->addr); pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Supportable: %d, Supported: %d, Acked: %u\n", - n_ptr->bclink.supportable, - n_ptr->bclink.supported, + pr_info("Reception permitted: %d, Acked: %u\n", + n_ptr->bclink.recv_permitted, n_ptr->bclink.acked); pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", n_ptr->bclink.last_in, @@ -1481,7 +1499,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { + if (tipc_bearer_blocked(l_ptr->b_ptr)) { if (l_ptr->retransm_queue_size == 0) { l_ptr->retransm_queue_head = msg_seqno(msg); l_ptr->retransm_queue_size = retransmits; @@ -1491,7 +1509,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, } return; } else { - /* Detect repeated retransmit failures on uncongested bearer */ + /* Detect repeated retransmit failures on unblocked bearer */ if (l_ptr->last_retransmitted == msg_seqno(msg)) { if (++l_ptr->stale_count > 100) { link_retransmit_failure(l_ptr, buf); @@ -1507,17 +1525,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - buf = buf->next; - retransmits--; - l_ptr->stats.retransmitted++; - } else { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->retransm_queue_head = buf_seqno(buf); - l_ptr->retransm_queue_size = retransmits; - return; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + buf = buf->next; + retransmits--; + l_ptr->stats.retransmitted++; } l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; @@ -1676,7 +1687,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ackd = msg_ack(msg); /* Release acked messages */ - if (n_ptr->bclink.supported) + if (n_ptr->bclink.recv_permitted) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); crs = l_ptr->first_out; @@ -1727,9 +1738,14 @@ deliver: tipc_link_recv_bundle(buf); continue; case NAME_DISTRIBUTOR: + n_ptr->bclink.recv_permitted = true; tipc_node_unlock(n_ptr); tipc_named_recv(buf); continue; + case BCAST_PROTOCOL: + tipc_link_recv_sync(n_ptr, buf); + tipc_node_unlock(n_ptr); + continue; case CONN_MANAGER: tipc_node_unlock(n_ptr); tipc_port_recv_proto_msg(buf); @@ -1772,16 +1788,19 @@ deliver: continue; } + /* Link is not in state WORKING_WORKING */ if (msg_user(msg) == LINK_PROTOCOL) { link_recv_proto_msg(l_ptr, buf); head = link_insert_deferred_queue(l_ptr, head); tipc_node_unlock(n_ptr); continue; } + + /* Traffic message. 
Conditionally activate link */ link_state_event(l_ptr, TRAFFIC_MSG_EVT); if (link_working_working(l_ptr)) { - /* Re-insert in front of queue */ + /* Re-insert buffer in front of queue */ buf->next = head; head = buf; tipc_node_unlock(n_ptr); @@ -1972,21 +1991,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - /* Defer message if bearer is already congested */ - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { - l_ptr->proto_msg_queue = buf; - return; - } - - /* Defer message if attempting to send results in bearer congestion */ - if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); + /* Defer message if bearer is already blocked */ + if (tipc_bearer_blocked(l_ptr->b_ptr)) { l_ptr->proto_msg_queue = buf; - l_ptr->stats.bearer_congs++; return; } - /* Discard message if it was sent successfully */ + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; kfree_skb(buf); } @@ -2057,7 +2068,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } else { l_ptr->max_pkt = l_ptr->max_pkt_target; } - l_ptr->owner->bclink.supportable = (max_pkt_info != 0); /* Synchronize broadcast link info, if not done previously */ if (!tipc_node_is_up(l_ptr->owner)) { @@ -2112,7 +2122,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } /* Protocol message before retransmits, reduce loss risk */ - if (l_ptr->owner->bclink.supported) + if (l_ptr->owner->bclink.recv_permitted) tipc_bclink_update_link_state(l_ptr->owner, msg_last_bcast(msg)); @@ -2937,8 +2947,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) s->sent_nacks, s->sent_acks, s->retransmitted); ret += tipc_snprintf(buf + ret, buf_size - ret, - " Congestion bearer:%u link:%u Send queue" - " max:%u avg:%u\n", s->bearer_congs, s->link_congs, + " Congestion link:%u Send queue" + " max:%u avg:%u\n", s->link_congs, s->max_queue_sz, s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); diff --git a/net/tipc/link.h b/net/tipc/link.h index 6e92112..c048ed1 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -40,9 +40,6 @@ #include "msg.h" #include "node.h" -#define PUSH_FAILED 1 -#define PUSH_FINISHED 2 - /* * Out-of-range value for link sequence numbers */ @@ -82,7 +79,6 @@ struct tipc_stats { u32 recv_fragmented; u32 recv_fragments; u32 link_congs; /* # port sends blocked by congestion */ - u32 bearer_congs; u32 deferred_recv; u32 duplicates; u32 max_queue_sz; /* send queue size high water mark */ diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 55d3928..e0d0805 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg) named_distribute(&message_list, node, &publ_zone, max_item_buf); read_unlock_bh(&tipc_nametbl_lock); - tipc_link_send_names(&message_list, (u32)node); + tipc_link_send_names(&message_list, node); } /** diff --git a/net/tipc/node.c b/net/tipc/node.c index d21db20..48f39dd 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@ /* * net/tipc/node.c: TIPC node management routines * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2012 Ericsson AB * Copyright (c) 2005-2006, 2010-2011, Wind River Systems * All rights reserved. 
* @@ -263,12 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) static void node_established_contact(struct tipc_node *n_ptr) { tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); - - if (n_ptr->bclink.supportable) { - n_ptr->bclink.acked = tipc_bclink_get_last_sent(); - tipc_bclink_add_node(n_ptr->addr); - n_ptr->bclink.supported = 1; - } + n_ptr->bclink.oos_state = 0; + n_ptr->bclink.acked = tipc_bclink_get_last_sent(); + tipc_bclink_add_node(n_ptr->addr); } static void node_name_purge_complete(unsigned long node_addr) @@ -294,7 +291,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_addr_string_fill(addr_string, n_ptr->addr)); /* Flush broadcast link info associated with lost node */ - if (n_ptr->bclink.supported) { + if (n_ptr->bclink.recv_permitted) { while (n_ptr->bclink.deferred_head) { struct sk_buff *buf = n_ptr->bclink.deferred_head; n_ptr->bclink.deferred_head = buf->next; @@ -310,7 +307,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); - n_ptr->bclink.supported = 0; + n_ptr->bclink.recv_permitted = false; } /* Abort link changeover */ diff --git a/net/tipc/node.h b/net/tipc/node.h index cfcaf4d..3c189b3 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -67,8 +67,6 @@ * @permit_changeover: non-zero if node has redundant links to this system * @signature: node instance identifier * @bclink: broadcast-related info - * @supportable: non-zero if node supports TIPC b'cast link capability - * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node * @last_sent: sequence # of last b'cast message sent by node @@ -77,6 +75,7 @@ * @deferred_head: oldest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node * @defragm: list of partially reassembled b'cast message fragments from node + * @recv_permitted: true if node is allowed to receive b'cast messages */ struct tipc_node { u32 addr; @@ -92,8 +91,6 @@ struct tipc_node { int permit_changeover; u32 signature; struct { - u8 supportable; - u8 supported; u32 acked; u32 last_in; u32 last_sent; @@ -102,6 +99,7 @@ struct tipc_node { struct sk_buff *deferred_head; struct sk_buff *deferred_tail; struct sk_buff *defragm; + bool recv_permitted; } bclink; }; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index fd5f042..1a720c8 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -62,6 +62,8 @@ struct tipc_sock { static int backlog_rcv(struct sock *sk, struct sk_buff *skb); static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); static void wakeupdispatch(struct tipc_port *tport); +static void tipc_data_ready(struct sock *sk, int len); +static void tipc_write_space(struct sock *sk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, sock_init_data(sock, sk); sk->sk_backlog_rcv = backlog_rcv; sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; + sk->sk_data_ready = tipc_data_ready; + sk->sk_write_space = tipc_write_space; tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; @@ -408,7 +412,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr, * socket state flags set * ------------ --------- * 
unconnected no read flags - * no write flags + * POLLOUT if port is not congested * * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue * no write flags @@ -435,9 +439,13 @@ static unsigned int poll(struct file *file, struct socket *sock, struct sock *sk = sock->sk; u32 mask = 0; - poll_wait(file, sk_sleep(sk), wait); + sock_poll_wait(file, sk_sleep(sk), wait); switch ((int)sock->state) { + case SS_UNCONNECTED: + if (!tipc_sk_port(sk)->congested) + mask |= POLLOUT; + break; case SS_READY: case SS_CONNECTED: if (!tipc_sk_port(sk)->congested) @@ -1126,6 +1134,39 @@ exit: } /** + * tipc_write_space - wake up thread if port congestion is released + * @sk: socket + */ +static void tipc_write_space(struct sock *sk) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLOUT | + POLLWRNORM | POLLWRBAND); + rcu_read_unlock(); +} + +/** + * tipc_data_ready - wake up threads to indicate messages have been received + * @sk: socket + * @len: the length of messages + */ +static void tipc_data_ready(struct sock *sk, int len) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLIN | + POLLRDNORM | POLLRDBAND); + rcu_read_unlock(); +} + +/** * rx_queue_full - determine if receive queue can accept another message * @msg: message to be added to queue * @queue_size: current size of queue @@ -1222,8 +1263,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) tipc_disconnect_port(tipc_sk_port(sk)); } - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_data_ready(sk, 0); return TIPC_OK; } @@ -1290,8 +1330,7 @@ static void wakeupdispatch(struct tipc_port *tport) { struct sock *sk = (struct sock *)tport->usr_handle; - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_write_space(sk); } /** @@ -1556,10 +1595,11 @@ restart: case SS_DISCONNECTING: - /* Discard any unreceived messages; wake up sleeping tasks */ + /* Discard any unreceived messages */ discard_rx_queue(sk); - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + + /* Wake up anyone sleeping in poll */ + sk->sk_state_change(sk); res = 0; break; |
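The socket.c hunks above change poll() so that an unconnected TIPC socket reports POLLOUT as soon as its port is not congested, with wakeups now driven through the standard sk->sk_write_space()/sk->sk_data_ready() callbacks (tipc_write_space()/tipc_data_ready()) instead of open-coded waitqueue checks. A minimal userspace sketch of the visible effect follows; it is an illustration only (assumes a kernel with CONFIG_TIPC; the SOCK_STREAM choice and one-second timeout are arbitrary), not part of the patch:

/*
 * Sketch: with this patch, poll() on a not-yet-connected TIPC stream
 * socket reports POLLOUT whenever the port is not congested; previously
 * the unconnected state returned no write flags at all.
 */
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>

int main(void)
{
	struct pollfd pfd = { .events = POLLOUT };
	int sd = socket(AF_TIPC, SOCK_STREAM, 0);

	if (sd < 0) {
		perror("socket(AF_TIPC)");
		return 1;
	}
	pfd.fd = sd;
	if (poll(&pfd, 1, 1000) > 0 && (pfd.revents & POLLOUT))
		printf("unconnected TIPC socket is writable (port not congested)\n");
	return 0;
}

Routing the wakeups through sk_write_space()/sk_data_ready() also means poll(), epoll() and friends observe state changes via the normal socket wakeup paths rather than TIPC-private waitqueue handling.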