Commit 5a50a927 authored by David S. Miller

Merge branch 'tipc-next'

Ying Xue says:

====================
tipc: purge signal handler infrastructure

Deferring actions so that they run later in an asynchronous context adds
unnecessary code complexity and makes their behaviour unpredictable and
non-deterministic. Moreover, because the signal handler infrastructure
is stopped first when the tipc module is removed, it creates a potential
risk: even after the signal handler has been stopped, some tipc
components may still submit signal requests to it, which can leave some
resources never released or freed.

This series therefore converts all actions that were performed
asynchronously in tasklet context through the signal handler interface
so that they execute synchronously, and then deletes the whole signal
handler infrastructure.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
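[Editor's note] The conversion pattern at the heart of the series is easiest to see in miniature. The sketch below is illustrative only — plain userspace C with pthreads, with invented names, not TIPC code: instead of handing a callback to a deferred-execution queue, the caller records a flag while holding the lock, and the unlock path performs the pending action itself once the lock has been dropped. This mirrors the shape of the new tipc_bclink_unlock()/tipc_node_unlock() helpers in the diff below.

/* Illustrative userspace model of the series' replacement pattern;
 * all names here are hypothetical. Build with: cc demo.c -lpthread
 */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned int flags;		/* models bclink->flags */
#define NEED_RESET 1			/* models TIPC_BCLINK_RESET */

static void reset_all_links(void)	/* models tipc_link_reset_all() */
{
	printf("resetting links\n");
}

/* The deferred action now runs synchronously in the unlock path. */
static void bclink_unlock(void)
{
	int do_reset = 0;

	if (flags & NEED_RESET) {
		flags &= ~NEED_RESET;
		do_reset = 1;
	}
	pthread_mutex_unlock(&lock);
	if (do_reset)
		reset_all_links();	/* safe: no locks held here */
}

int main(void)
{
	pthread_mutex_lock(&lock);
	flags |= NEED_RESET;		/* models tipc_bclink_set_flags() */
	bclink_unlock();
	return 0;
}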
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -5,7 +5,7 @@
obj-$(CONFIG_TIPC) := tipc.o
tipc-y += addr.o bcast.o bearer.o config.o \
- core.o handler.o link.o discover.o msg.o \
+ core.o link.o discover.o msg.o \
name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \
socket.o log.o eth_media.o server.o
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -71,7 +71,7 @@ struct tipc_bcbearer_pair {
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
- * prevented through use of the spinlock "bc_lock".
+ * prevented through use of the spinlock "bclink_lock".
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
@@ -84,28 +84,27 @@ struct tipc_bcbearer {
/**
* struct tipc_bclink - link used for broadcast messages
+ * @lock: spinlock governing access to structure
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
+ * @flags: represent bclink states
* @bcast_nodes: map of broadcast-capable nodes
* @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
struct tipc_bclink {
+ spinlock_t lock;
struct tipc_link link;
struct tipc_node node;
+ unsigned int flags;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
};
- static struct tipc_bcbearer bcast_bearer;
- static struct tipc_bclink bcast_link;
- static struct tipc_bcbearer *bcbearer = &bcast_bearer;
- static struct tipc_bclink *bclink = &bcast_link;
- static struct tipc_link *bcl = &bcast_link.link;
- static DEFINE_SPINLOCK(bc_lock);
+ static struct tipc_bcbearer *bcbearer;
+ static struct tipc_bclink *bclink;
+ static struct tipc_link *bcl;
const char tipc_bclink_name[] = "broadcast-link";
@@ -115,6 +114,35 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
+ static void tipc_bclink_lock(void)
+ {
+ spin_lock_bh(&bclink->lock);
+ }
+ static void tipc_bclink_unlock(void)
+ {
+ struct tipc_node *node = NULL;
+ if (likely(!bclink->flags)) {
+ spin_unlock_bh(&bclink->lock);
+ return;
+ }
+ if (bclink->flags & TIPC_BCLINK_RESET) {
+ bclink->flags &= ~TIPC_BCLINK_RESET;
+ node = tipc_bclink_retransmit_to();
+ }
+ spin_unlock_bh(&bclink->lock);
+ if (node)
+ tipc_link_reset_all(node);
+ }
+ void tipc_bclink_set_flags(unsigned int flags)
+ {
+ bclink->flags |= flags;
+ }
static u32 bcbuf_acks(struct sk_buff *buf)
{
return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
@@ -132,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
void tipc_bclink_add_node(u32 addr)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_nmap_add(&bclink->bcast_nodes, addr);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
void tipc_bclink_remove_node(u32 addr)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_nmap_remove(&bclink->bcast_nodes, addr);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
static void bclink_set_last_sent(void)
@@ -167,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
- * Called with bc_lock locked
+ * Called with bclink_lock locked
*/
struct tipc_node *tipc_bclink_retransmit_to(void)
{
@@ -179,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
*
- * Called with bc_lock locked
+ * Called with bclink_lock locked
*/
static void bclink_retransmit_pkt(u32 after, u32 to)
{
@@ -196,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged
*
- * Node is locked, bc_lock unlocked.
+ * Node is locked, bclink_lock unlocked.
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
@@ -204,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
struct sk_buff *next;
unsigned int released = 0;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
/* Bail out if tx queue is empty (no clean up is required) */
crs = bcl->first_out;
if (!crs)
@@ -269,7 +296,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
if (unlikely(released && !list_empty(&bcl->waiting_ports)))
tipc_link_wakeup_ports(bcl, 0);
exit:
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
/**
@@ -322,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
? buf_seqno(n_ptr->bclink.deferred_head) - 1
: n_ptr->bclink.last_sent);
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_bearer_send(MAX_BEARERS, buf, NULL);
bcl->stats.sent_nacks++;
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
kfree_skb(buf);
n_ptr->bclink.oos_state++;
@@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
{
int res;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
if (!bclink->bcast_nodes.count) {
res = msg_data_sz(buf_msg(buf));
@@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf)
bcl->stats.accu_queue_sz += bcl->out_queue_size;
}
exit:
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return res;
}
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
- * Called with both sending node's lock and bc_lock taken.
+ * Called with both sending node's lock and bclink_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
@@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
if (msg_destnode(msg) == tipc_own_addr) {
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node);
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bcl->stats.recv_nacks++;
bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
} else {
tipc_node_unlock(node);
bclink_peek_nack(msg);
@@ -462,20 +489,20 @@ void tipc_bclink_rcv(struct sk_buff *buf)
/* Deliver message to destination */
if (likely(msg_isdata(msg))) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_port_mcast_rcv(buf, NULL);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
@@ -485,28 +512,28 @@ void tipc_bclink_rcv(struct sk_buff *buf)
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
if (ret == LINK_REASM_COMPLETE) {
bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
goto receive;
}
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_named_rcv(buf);
} else {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
kfree_skb(buf);
}
@@ -552,14 +579,14 @@ void tipc_bclink_rcv(struct sk_buff *buf)
} else
deferred = 0;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++;
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
unlock:
tipc_node_unlock(node);
@@ -663,7 +690,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
int b_index;
int pri;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
if (action)
tipc_nmap_add(nm_ptr, node);
@@ -710,7 +737,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
bp_curr++;
}
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
@@ -722,7 +749,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
if (!bcl)
return 0;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
s = &bcl->stats;
@@ -751,7 +778,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return ret;
}
@@ -760,9 +787,9 @@ int tipc_bclink_reset_stats(void)
if (!bcl)
return -ENOPROTOOPT;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
memset(&bcl->stats, 0, sizeof(bcl->stats));
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
@@ -773,18 +800,30 @@ int tipc_bclink_set_queue_limits(u32 limit)
if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
return -EINVAL;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_link_set_queue_limits(bcl, limit);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
- void tipc_bclink_init(void)
+ int tipc_bclink_init(void)
{
+ bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
+ if (!bcbearer)
+ return -ENOMEM;
+ bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
+ if (!bclink) {
+ kfree(bcbearer);
+ return -ENOMEM;
+ }
+ bcl = &bclink->link;
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
+ spin_lock_init(&bclink->lock);
INIT_LIST_HEAD(&bcl->waiting_ports);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
@@ -795,17 +834,19 @@ void tipc_bclink_init(void)
rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
+ return 0;
}
void tipc_bclink_stop(void)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_link_purge_queues(bcl);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
- memset(bclink, 0, sizeof(*bclink));
- memset(bcbearer, 0, sizeof(*bcbearer));
+ synchronize_net();
+ kfree(bcbearer);
+ kfree(bclink);
}
/**
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -39,6 +39,7 @@
#define MAX_NODES 4096
#define WSIZE 32
+ #define TIPC_BCLINK_RESET 1
/**
* struct tipc_node_map - set of node identifiers
@@ -81,8 +82,9 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port);
void tipc_port_list_free(struct tipc_port_list *pl_ptr);
- void tipc_bclink_init(void);
+ int tipc_bclink_init(void);
void tipc_bclink_stop(void);
+ void tipc_bclink_set_flags(unsigned int flags);
void tipc_bclink_add_node(u32 addr);
void tipc_bclink_remove_node(u32 addr);
struct tipc_node *tipc_bclink_retransmit_to(void);
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -177,8 +177,10 @@ static struct sk_buff *cfg_set_own_addr(void)
if (tipc_own_addr)
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (cannot change node address once assigned)");
- tipc_net_start(addr);
- return tipc_cfg_reply_none();
+ if (!tipc_net_start(addr))
+ return tipc_cfg_reply_none();
+ return tipc_cfg_reply_error_string("cannot change to network mode");
}
static struct sk_buff *cfg_set_max_ports(void)
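[Editor's note] Because tipc_bclink_init() can now fail, the error has to travel up the whole call chain (tipc_bclink_init -> tipc_net_start -> cfg_set_own_addr), as the hunk above shows. A minimal sketch of that propagation pattern, with purely hypothetical names, not the actual TIPC functions:

/* Illustrative error-propagation sketch; names are invented. */
#include <errno.h>
#include <stdlib.h>

static void *state;

static int subsys_init(void)		/* like tipc_bclink_init() */
{
	state = malloc(64);
	if (!state)
		return -ENOMEM;		/* report, don't panic */
	return 0;
}

static int net_start(void)		/* like tipc_net_start() */
{
	int res = subsys_init();

	if (res)
		return res;		/* propagate, don't ignore */
	/* ... continue bring-up ... */
	return 0;
}

int main(void)
{
	return net_start() ? 1 : 0;	/* topmost caller reports failure */
}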
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -80,7 +80,6 @@ struct sk_buff *tipc_buf_acquire(u32 size)
*/
static void tipc_core_stop(void)
{
- tipc_handler_stop();
tipc_net_stop();
tipc_bearer_cleanup();
tipc_netlink_stop();
@@ -100,10 +99,6 @@ static int tipc_core_start(void)
get_random_bytes(&tipc_random, sizeof(tipc_random));
- err = tipc_handler_start();
- if (err)
- goto out_handler;
err = tipc_ref_table_init(tipc_max_ports, tipc_random);
if (err)
goto out_reftbl;
@@ -146,8 +141,6 @@ static int tipc_core_start(void)
out_nametbl:
tipc_ref_table_stop();
out_reftbl:
- tipc_handler_stop();
- out_handler:
return err;
}
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -89,8 +89,6 @@ extern int tipc_random __read_mostly;
/*
* Routines available to privileged subsystems
*/
- int tipc_handler_start(void);
- void tipc_handler_stop(void);
int tipc_netlink_start(void);
void tipc_netlink_stop(void);
int tipc_socket_init(void);
@@ -109,12 +107,10 @@ void tipc_unregister_sysctl(void);
#endif
/*
- * TIPC timer and signal code
+ * TIPC timer code
*/
typedef void (*Handler) (unsigned long);
- u32 tipc_k_signal(Handler routine, unsigned long argument);
/**
* k_init_timer - initialize a timer
* @timer: pointer to timer structure
--- a/net/tipc/handler.c
+++ /dev/null
/*
* net/tipc/handler.c: TIPC signal handling
*
* Copyright (c) 2000-2006, Ericsson AB
* Copyright (c) 2005, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "core.h"
struct queue_item {
struct list_head next_signal;
void (*handler) (unsigned long);
unsigned long data;
};
static struct kmem_cache *tipc_queue_item_cache;
static struct list_head signal_queue_head;
static DEFINE_SPINLOCK(qitem_lock);
static int handler_enabled __read_mostly;
static void process_signal_queue(unsigned long dummy);
static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0);
unsigned int tipc_k_signal(Handler routine, unsigned long argument)
{
struct queue_item *item;
spin_lock_bh(&qitem_lock);
if (!handler_enabled) {
spin_unlock_bh(&qitem_lock);
return -ENOPROTOOPT;
}
item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC);
if (!item) {
pr_err("Signal queue out of memory\n");
spin_unlock_bh(&qitem_lock);
return -ENOMEM;
}
item->handler = routine;
item->data = argument;
list_add_tail(&item->next_signal, &signal_queue_head);
spin_unlock_bh(&qitem_lock);
tasklet_schedule(&tipc_tasklet);
return 0;
}
static void process_signal_queue(unsigned long dummy)
{
struct queue_item *__volatile__ item;
struct list_head *l, *n;
spin_lock_bh(&qitem_lock);
list_for_each_safe(l, n, &signal_queue_head) {
item = list_entry(l, struct queue_item, next_signal);
list_del(&item->next_signal);
spin_unlock_bh(&qitem_lock);
item->handler(item->data);
spin_lock_bh(&qitem_lock);
kmem_cache_free(tipc_queue_item_cache, item);
}
spin_unlock_bh(&qitem_lock);
}
int tipc_handler_start(void)
{
tipc_queue_item_cache =
kmem_cache_create("tipc_queue_items", sizeof(struct queue_item),
0, SLAB_HWCACHE_ALIGN, NULL);
if (!tipc_queue_item_cache)
return -ENOMEM;
INIT_LIST_HEAD(&signal_queue_head);
tasklet_enable(&tipc_tasklet);
handler_enabled = 1;
return 0;
}
void tipc_handler_stop(void)
{
struct list_head *l, *n;
struct queue_item *item;
spin_lock_bh(&qitem_lock);
if (!handler_enabled) {
spin_unlock_bh(&qitem_lock);
return;
}
handler_enabled = 0;
spin_unlock_bh(&qitem_lock);
tasklet_kill(&tipc_tasklet);
spin_lock_bh(&qitem_lock);
list_for_each_safe(l, n, &signal_queue_head) {
item = list_entry(l, struct queue_item, next_signal);
list_del(&item->next_signal);
kmem_cache_free(tipc_queue_item_cache, item);
}
spin_unlock_bh(&qitem_lock);
kmem_cache_destroy(tipc_queue_item_cache);
}
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -297,14 +297,14 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
+ tipc_node_lock(n_ptr);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr) {
tipc_link_reset(l_ptr);
if (shutting_down || !tipc_node_is_up(n_ptr)) {
tipc_node_detach_link(l_ptr->owner, l_ptr);
tipc_link_reset_fragments(l_ptr);
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
/* Nobody else can access this link now: */
del_timer_sync(&l_ptr->timer);
@@ -312,12 +312,12 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
} else {
/* Detach/delete when failover is finished: */
l_ptr->flags |= LINK_STOPPED;
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
del_timer_sync(&l_ptr->timer);
}
continue;
}
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
}
rcu_read_unlock();
}
@@ -474,11 +474,11 @@ void tipc_link_reset_list(unsigned int bearer_id)
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
+ tipc_node_lock(n_ptr);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr)
tipc_link_reset(l_ptr);
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
}
rcu_read_unlock();
}
@@ -1259,29 +1259,24 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
} while (!res);
}
- static void link_reset_all(unsigned long addr)
+ void tipc_link_reset_all(struct tipc_node *node)
{
- struct tipc_node *n_ptr;
char addr_string[16];
u32 i;
- n_ptr = tipc_node_find((u32)addr);
- if (!n_ptr)
- return; /* node no longer exists */
- tipc_node_lock(n_ptr);
+ tipc_node_lock(node);
pr_warn("Resetting all links to %s\n",
- tipc_addr_string_fill(addr_string, n_ptr->addr));
+ tipc_addr_string_fill(addr_string, node->addr));
for (i = 0; i < MAX_BEARERS; i++) {
- if (n_ptr->links[i]) {
- link_print(n_ptr->links[i], "Resetting link\n");
- tipc_link_reset(n_ptr->links[i]);
+ if (node->links[i]) {
+ link_print(node->links[i], "Resetting link\n");
+ tipc_link_reset(node->links[i]);
}
}
- tipc_node_unlock(n_ptr);
+ tipc_node_unlock(node);
}
static void link_retransmit_failure(struct tipc_link *l_ptr,
@@ -1318,10 +1313,9 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
n_ptr->bclink.oos_state,
n_ptr->bclink.last_sent);
- tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
tipc_node_unlock(n_ptr);
+ tipc_bclink_set_flags(TIPC_BCLINK_RESET);
l_ptr->stale_count = 0;
}
}
@@ -1495,14 +1489,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
goto unlock_discard;
/* Verify that communication with node is currently allowed */
- if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
- msg_user(msg) == LINK_PROTOCOL &&
- (msg_type(msg) == RESET_MSG ||
- msg_type(msg) == ACTIVATE_MSG) &&
- !msg_redundant_link(msg))
- n_ptr->block_setup &= ~WAIT_PEER_DOWN;
- if (n_ptr->block_setup)
+ if ((n_ptr->flags & TIPC_NODE_DOWN) &&
+ msg_user(msg) == LINK_PROTOCOL &&
+ (msg_type(msg) == RESET_MSG ||
+ msg_type(msg) == ACTIVATE_MSG) &&
+ !msg_redundant_link(msg))
+ n_ptr->flags &= ~TIPC_NODE_DOWN;
+ if (tipc_node_blocked(n_ptr))
goto unlock_discard;
/* Validate message sequence number info */
@@ -1744,7 +1738,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
return;
/* Abort non-RESET send if communication with node is prohibited */
- if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
+ if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
return;
/* Create protocol message with "out-of-sequence" sequence number */
@@ -1859,7 +1853,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
* peer has lost contact -- don't allow peer's links
* to reactivate before we recognize loss & clean up
*/
- l_ptr->owner->block_setup = WAIT_NODE_DOWN;
+ l_ptr->owner->flags = TIPC_NODE_RESET;
}
link_state_event(l_ptr, RESET_MSG);
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -230,6 +230,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
int req_tlv_space);
struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
int req_tlv_space);
+ void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -38,34 +38,6 @@
#include "link.h"
#include "name_distr.h"
- #define ITEM_SIZE sizeof(struct distr_item)
- /**
- * struct distr_item - publication info distributed to other nodes
- * @type: name sequence type
- * @lower: name sequence lower bound
- * @upper: name sequence upper bound
- * @ref: publishing port reference
- * @key: publication key
- *
- * ===> All fields are stored in network byte order. <===
- *
- * First 3 fields identify (name or) name sequence being published.
- * Reference field uniquely identifies port that published name sequence.
- * Key field uniquely identifies publication, in the event a port has
- * multiple publications of the same name sequence.
- *
- * Note: There is no field that identifies the publishing node because it is
- * the same for all items contained within a publication message.
- */
- struct distr_item {
- __be32 type;
- __be32 lower;
- __be32 upper;
- __be32 ref;
- __be32 key;
- };
/**
* struct publ_list - list of publications made by this node
* @list: circular list of publications
@@ -135,18 +107,18 @@ void named_cluster_distribute(struct sk_buff *buf)
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
+ tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[n_ptr->addr & 1];
if (l_ptr) {
buf_copy = skb_copy(buf, GFP_ATOMIC);
if (!buf_copy) {
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
break;
}
msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
__tipc_link_xmit(l_ptr, buf_copy);
}
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
}
rcu_read_unlock();
@@ -239,29 +211,9 @@ static void named_distribute(struct list_head *message_list, u32 node,
/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
- void tipc_named_node_up(unsigned long nodearg)
+ void tipc_named_node_up(u32 max_item_buf, u32 node)
{
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct list_head message_list;
- u32 node = (u32)nodearg;
- u32 max_item_buf = 0;
- /* compute maximum amount of publication data to send per message */
- n_ptr = tipc_node_find(node);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[0];
- if (l_ptr)
- max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
- ITEM_SIZE) * ITEM_SIZE;
- tipc_node_unlock(n_ptr);
- }
- if (!max_item_buf)
- return;
- /* create list of publication messages, then send them as a unit */
- INIT_LIST_HEAD(&message_list);
+ LIST_HEAD(message_list);
read_lock_bh(&tipc_nametbl_lock);
named_distribute(&message_list, node, &publ_cluster, max_item_buf);
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -39,10 +39,38 @@
#include "name_table.h"
+ #define ITEM_SIZE sizeof(struct distr_item)
+ /**
+ * struct distr_item - publication info distributed to other nodes
+ * @type: name sequence type
+ * @lower: name sequence lower bound
+ * @upper: name sequence upper bound
+ * @ref: publishing port reference
+ * @key: publication key
+ *
+ * ===> All fields are stored in network byte order. <===
+ *
+ * First 3 fields identify (name or) name sequence being published.
+ * Reference field uniquely identifies port that published name sequence.
+ * Key field uniquely identifies publication, in the event a port has
+ * multiple publications of the same name sequence.
+ *
+ * Note: There is no field that identifies the publishing node because it is
+ * the same for all items contained within a publication message.
+ */
+ struct distr_item {
+ __be32 type;
+ __be32 lower;
+ __be32 upper;
+ __be32 ref;
+ __be32 key;
+ };
struct sk_buff *tipc_named_publish(struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct publication *publ);
void named_cluster_distribute(struct sk_buff *buf);
- void tipc_named_node_up(unsigned long node);
+ void tipc_named_node_up(u32 max_item_buf, u32 node);
void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void);
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -164,20 +164,25 @@ void tipc_net_route_msg(struct sk_buff *buf)
tipc_link_xmit(buf, dnode, msg_link_selector(msg));
}
- void tipc_net_start(u32 addr)
+ int tipc_net_start(u32 addr)
{
char addr_string[16];
+ int res;
tipc_own_addr = addr;
tipc_named_reinit();
tipc_port_reinit();
- tipc_bclink_init();
+ res = tipc_bclink_init();
+ if (res)
+ return res;
tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr,
TIPC_ZONE_SCOPE, 0, tipc_own_addr);
pr_info("Started in network mode\n");
pr_info("Own node address %s, network identity %u\n",
tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id);
+ return 0;
}
void tipc_net_stop(void)
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -39,7 +39,7 @@
void tipc_net_route_msg(struct sk_buff *buf);
- void tipc_net_start(u32 addr);
+ int tipc_net_start(u32 addr);
void tipc_net_stop(void);
#endif
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -108,7 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr)
break;
}
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
- n_ptr->block_setup = WAIT_PEER_DOWN;
+ n_ptr->flags = TIPC_NODE_DOWN;
n_ptr->signature = INVALID_NODE_SIG;
tipc_num_nodes++;
@@ -267,24 +267,12 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
static void node_established_contact(struct tipc_node *n_ptr)
{
- tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
+ n_ptr->flags |= TIPC_NODE_UP;
n_ptr->bclink.oos_state = 0;
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
}
- static void node_name_purge_complete(unsigned long node_addr)
- {
- struct tipc_node *n_ptr;
- n_ptr = tipc_node_find(node_addr);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- n_ptr->block_setup &= ~WAIT_NAMES_GONE;
- tipc_node_unlock(n_ptr);
- }
- }
static void node_lost_contact(struct tipc_node *n_ptr)
{
char addr_string[16];
@@ -320,12 +308,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_link_reset_fragments(l_ptr);
}
- /* Notify subscribers */
- tipc_nodesub_notify(n_ptr);
- /* Prevent re-contact with node until cleanup is done */
- n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE;
- tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);
+ /* Notify subscribers and prevent re-contact with node until
+ * cleanup is done.
+ */
+ n_ptr->flags = TIPC_NODE_DOWN | TIPC_NODE_LOST;
}
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
@@ -465,3 +451,36 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
tipc_node_unlock(node);
return -EINVAL;
}
+ void tipc_node_unlock(struct tipc_node *node)
+ {
+ LIST_HEAD(nsub_list);
+ struct tipc_link *link;
+ int pkt_sz = 0;
+ u32 addr = 0;
+ if (likely(!node->flags)) {
+ spin_unlock_bh(&node->lock);
+ return;
+ }
+ if (node->flags & TIPC_NODE_LOST) {
+ list_replace_init(&node->nsub, &nsub_list);
+ node->flags &= ~TIPC_NODE_LOST;
+ }
+ if (node->flags & TIPC_NODE_UP) {
+ link = node->active_links[0];
+ node->flags &= ~TIPC_NODE_UP;
+ if (link) {
+ pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
+ ITEM_SIZE;
+ addr = node->addr;
+ }
+ }
+ spin_unlock_bh(&node->lock);
+ if (!list_empty(&nsub_list))
+ tipc_nodesub_notify(&nsub_list);
+ if (pkt_sz)
+ tipc_named_node_up(pkt_sz, addr);
+ }
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,62 +47,78 @@
*/
#define INVALID_NODE_SIG 0x10000
- /* Flags used to block (re)establishment of contact with a neighboring node */
- #define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */
- #define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */
- #define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */
+ /* Flags used to block (re)establishment of contact with a neighboring node
+ * TIPC_NODE_DOWN: indicate node is down and it's used to block the node's
+ * links until RESET or ACTIVE message arrives
+ * TIPC_NODE_RESET: indicate node is reset
+ * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions
+ * when node lock is released
+ * TIPC_NODE_UP: indicate node is up and it's used to deliver local name table
+ * when node lock is released
+ */
+ enum {
+ TIPC_NODE_DOWN = (1 << 1),
+ TIPC_NODE_RESET = (1 << 2),
+ TIPC_NODE_LOST = (1 << 3),
+ TIPC_NODE_UP = (1 << 4)
+ };
+ /**
+ * struct tipc_node_bclink - TIPC node bclink structure
+ * @acked: sequence # of last outbound b'cast message acknowledged by node
+ * @last_in: sequence # of last in-sequence b'cast message received from node
+ * @last_sent: sequence # of last b'cast message sent by node
+ * @oos_state: state tracker for handling OOS b'cast messages
+ * @deferred_size: number of OOS b'cast messages in deferred queue
+ * @deferred_head: oldest OOS b'cast message received from node
+ * @deferred_tail: newest OOS b'cast message received from node
+ * @reasm_head: broadcast reassembly queue head from node
+ * @reasm_tail: last broadcast fragment received from node
+ * @recv_permitted: true if node is allowed to receive b'cast messages
+ */
+ struct tipc_node_bclink {
+ u32 acked;
+ u32 last_in;
+ u32 last_sent;
+ u32 oos_state;
+ u32 deferred_size;
+ struct sk_buff *deferred_head;
+ struct sk_buff *deferred_tail;
+ struct sk_buff *reasm_head;
+ struct sk_buff *reasm_tail;
+ bool recv_permitted;
+ };
/**
* struct tipc_node - TIPC node structure
* @addr: network address of node
* @lock: spinlock governing access to structure
* @hash: links to adjacent nodes in unsorted hash chain
- * @list: links to adjacent nodes in sorted list of cluster's nodes
- * @nsub: list of "node down" subscriptions monitoring node
* @active_links: pointers to active links to node
* @links: pointers to all links to node
+ * @flags: bit mask of conditions preventing link establishment to node
+ * @bclink: broadcast-related info
+ * @list: links to adjacent nodes in sorted list of cluster's nodes
* @working_links: number of working links to node (both active and standby)
- * @block_setup: bit mask of conditions preventing link establishment to node
* @link_cnt: number of links to node
* @signature: node instance identifier
- * @bclink: broadcast-related info
+ * @nsub: list of "node down" subscriptions monitoring node
+ * @rcu: rcu struct for tipc_node
- * @acked: sequence # of last outbound b'cast message acknowledged by node
- * @last_in: sequence # of last in-sequence b'cast message received from node
- * @last_sent: sequence # of last b'cast message sent by node
- * @oos_state: state tracker for handling OOS b'cast messages
- * @deferred_size: number of OOS b'cast messages in deferred queue
- * @deferred_head: oldest OOS b'cast message received from node
- * @deferred_tail: newest OOS b'cast message received from node
- * @reasm_head: broadcast reassembly queue head from node
- * @reasm_tail: last broadcast fragment received from node
- * @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
u32 addr;
spinlock_t lock;
struct hlist_node hash;
- struct list_head list;
- struct list_head nsub;
struct tipc_link *active_links[2];
struct tipc_link *links[MAX_BEARERS];
+ unsigned int flags;
+ struct tipc_node_bclink bclink;
+ struct list_head list;
int link_cnt;
int working_links;
- int block_setup;
u32 signature;
+ struct list_head nsub;
struct rcu_head rcu;
- struct {
- u32 acked;
- u32 last_in;
- u32 last_sent;
- u32 oos_state;
- u32 deferred_size;
- struct sk_buff *deferred_head;
- struct sk_buff *deferred_tail;
- struct sk_buff *reasm_head;
- struct sk_buff *reasm_tail;
- bool recv_permitted;
- } bclink;
};
extern struct list_head tipc_node_list;
@@ -119,15 +135,17 @@ int tipc_node_is_up(struct tipc_node *n_ptr);
struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space);
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
+ void tipc_node_unlock(struct tipc_node *node);
- static inline void tipc_node_lock(struct tipc_node *n_ptr)
+ static inline void tipc_node_lock(struct tipc_node *node)
{
- spin_lock_bh(&n_ptr->lock);
+ spin_lock_bh(&node->lock);
}
- static inline void tipc_node_unlock(struct tipc_node *n_ptr)
+ static inline bool tipc_node_blocked(struct tipc_node *node)
{
- spin_unlock_bh(&n_ptr->lock);
+ return (node->flags & (TIPC_NODE_DOWN | TIPC_NODE_LOST |
+ TIPC_NODE_RESET));
}
#endif
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -81,14 +81,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
*
* Note: node is locked by caller
*/
- void tipc_nodesub_notify(struct tipc_node *node)
+ void tipc_nodesub_notify(struct list_head *nsub_list)
{
- struct tipc_node_subscr *ns;
+ struct tipc_node_subscr *ns, *safe;
- list_for_each_entry(ns, &node->nsub, nodesub_list) {
+ list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
if (ns->handle_node_down) {
- tipc_k_signal((Handler)ns->handle_node_down,
- (unsigned long)ns->usr_handle);
+ ns->handle_node_down(ns->usr_handle);
ns->handle_node_down = NULL;
}
}
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -58,6 +58,6 @@ struct tipc_node_subscr {
void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
void *usr_handle, net_ev_handler handle_down);
void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
- void tipc_nodesub_notify(struct tipc_node *node);
+ void tipc_nodesub_notify(struct list_head *nsub_list);
#endif