提交 5aa8dbbd 编写于 作者: D David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: Merge port and socket layer code

After the removal of the TIPC native interface, there is no reason to
keep a distinction between a "generic" port layer and a "specific"
socket layer in the code. Throughout the last months, we have posted
several series that aimed at facilitating removal of the port layer,
and in particular the port_lock spinlock, which in reality duplicates
the role normally kept by lock_sock()/bh_lock_sock().

In this series, we finalize this work, by making a significant number of
changes to the link, node, port and socket code, all with the aim of
reducing dependencies between the layers. In the final commits, we then
remove the port spinlock, port.c and port.h altogether.

After this series, we have a socket layer that has only few dependencies
to the rest of the stack, so that it should be possible to continue
cleanups of its code without significantly affecting other code.
====================
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
...@@ -7,7 +7,7 @@ obj-$(CONFIG_TIPC) := tipc.o ...@@ -7,7 +7,7 @@ obj-$(CONFIG_TIPC) := tipc.o
tipc-y += addr.o bcast.o bearer.o config.o \ tipc-y += addr.o bcast.o bearer.o config.o \
core.o link.o discover.o msg.o \ core.o link.o discover.o msg.o \
name_distr.o subscr.o name_table.o net.o \ name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \ netlink.o node.o node_subscr.o \
socket.o log.o eth_media.o server.o socket.o log.o eth_media.o server.o
tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
......
...@@ -37,7 +37,6 @@ ...@@ -37,7 +37,6 @@
#include "core.h" #include "core.h"
#include "link.h" #include "link.h"
#include "port.h"
#include "socket.h" #include "socket.h"
#include "msg.h" #include "msg.h"
#include "bcast.h" #include "bcast.h"
...@@ -300,8 +299,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) ...@@ -300,8 +299,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
tipc_link_push_queue(bcl); tipc_link_push_queue(bcl);
bclink_set_last_sent(); bclink_set_last_sent();
} }
if (unlikely(released && !list_empty(&bcl->waiting_ports))) if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
tipc_link_wakeup_ports(bcl, 0); bclink->node.action_flags |= TIPC_WAKEUP_USERS;
exit: exit:
tipc_bclink_unlock(); tipc_bclink_unlock();
} }
...@@ -840,9 +839,10 @@ int tipc_bclink_init(void) ...@@ -840,9 +839,10 @@ int tipc_bclink_init(void)
sprintf(bcbearer->media.name, "tipc-broadcast"); sprintf(bcbearer->media.name, "tipc-broadcast");
spin_lock_init(&bclink->lock); spin_lock_init(&bclink->lock);
INIT_LIST_HEAD(&bcl->waiting_ports); __skb_queue_head_init(&bcl->waiting_sks);
bcl->next_out_no = 1; bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock); spin_lock_init(&bclink->node.lock);
__skb_queue_head_init(&bclink->node.waiting_sks);
bcl->owner = &bclink->node; bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
*/ */
#include "core.h" #include "core.h"
#include "port.h" #include "socket.h"
#include "name_table.h" #include "name_table.h"
#include "config.h" #include "config.h"
#include "server.h" #include "server.h"
...@@ -266,7 +266,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area ...@@ -266,7 +266,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
rep_tlv_buf = tipc_media_get_names(); rep_tlv_buf = tipc_media_get_names();
break; break;
case TIPC_CMD_SHOW_PORTS: case TIPC_CMD_SHOW_PORTS:
rep_tlv_buf = tipc_port_get_ports(); rep_tlv_buf = tipc_sk_socks_show();
break; break;
case TIPC_CMD_SHOW_STATS: case TIPC_CMD_SHOW_STATS:
rep_tlv_buf = tipc_show_stats(); rep_tlv_buf = tipc_show_stats();
......
...@@ -35,11 +35,10 @@ ...@@ -35,11 +35,10 @@
*/ */
#include "core.h" #include "core.h"
#include "ref.h"
#include "name_table.h" #include "name_table.h"
#include "subscr.h" #include "subscr.h"
#include "config.h" #include "config.h"
#include "port.h" #include "socket.h"
#include <linux/module.h> #include <linux/module.h>
...@@ -85,7 +84,7 @@ static void tipc_core_stop(void) ...@@ -85,7 +84,7 @@ static void tipc_core_stop(void)
tipc_netlink_stop(); tipc_netlink_stop();
tipc_subscr_stop(); tipc_subscr_stop();
tipc_nametbl_stop(); tipc_nametbl_stop();
tipc_ref_table_stop(); tipc_sk_ref_table_stop();
tipc_socket_stop(); tipc_socket_stop();
tipc_unregister_sysctl(); tipc_unregister_sysctl();
} }
...@@ -99,7 +98,7 @@ static int tipc_core_start(void) ...@@ -99,7 +98,7 @@ static int tipc_core_start(void)
get_random_bytes(&tipc_random, sizeof(tipc_random)); get_random_bytes(&tipc_random, sizeof(tipc_random));
err = tipc_ref_table_init(tipc_max_ports, tipc_random); err = tipc_sk_ref_table_init(tipc_max_ports, tipc_random);
if (err) if (err)
goto out_reftbl; goto out_reftbl;
...@@ -139,7 +138,7 @@ static int tipc_core_start(void) ...@@ -139,7 +138,7 @@ static int tipc_core_start(void)
out_netlink: out_netlink:
tipc_nametbl_stop(); tipc_nametbl_stop();
out_nametbl: out_nametbl:
tipc_ref_table_stop(); tipc_sk_ref_table_stop();
out_reftbl: out_reftbl:
return err; return err;
} }
......
...@@ -187,8 +187,11 @@ static inline void k_term_timer(struct timer_list *timer) ...@@ -187,8 +187,11 @@ static inline void k_term_timer(struct timer_list *timer)
struct tipc_skb_cb { struct tipc_skb_cb {
void *handle; void *handle;
bool deferred;
struct sk_buff *tail; struct sk_buff *tail;
bool deferred;
bool wakeup_pending;
u16 chain_sz;
u16 chain_imp;
}; };
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include "core.h" #include "core.h"
#include "link.h" #include "link.h"
#include "port.h"
#include "socket.h" #include "socket.h"
#include "name_distr.h" #include "name_distr.h"
#include "discover.h" #include "discover.h"
...@@ -275,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, ...@@ -275,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
link_init_max_pkt(l_ptr); link_init_max_pkt(l_ptr);
l_ptr->next_out_no = 1; l_ptr->next_out_no = 1;
INIT_LIST_HEAD(&l_ptr->waiting_ports); __skb_queue_head_init(&l_ptr->waiting_sks);
link_reset_statistics(l_ptr); link_reset_statistics(l_ptr);
...@@ -322,66 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) ...@@ -322,66 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
} }
/** /**
* link_schedule_port - schedule port for deferred sending * link_schedule_user - schedule user for wakeup after congestion
* @l_ptr: pointer to link * @link: congested link
* @origport: reference to sending port * @oport: sending port
* @sz: amount of data to be sent * @chain_sz: size of buffer chain that was attempted sent
* * @imp: importance of message attempted sent
* Schedules port for renewed sending of messages after link congestion * Create pseudo msg to send back to user when congestion abates
* has abated.
*/ */
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) static bool link_schedule_user(struct tipc_link *link, u32 oport,
uint chain_sz, uint imp)
{ {
struct tipc_port *p_ptr; struct sk_buff *buf;
struct tipc_sock *tsk;
spin_lock_bh(&tipc_port_list_lock); buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
p_ptr = tipc_port_lock(origport); tipc_own_addr, oport, 0, 0);
if (p_ptr) { if (!buf)
if (!list_empty(&p_ptr->wait_list)) return false;
goto exit; TIPC_SKB_CB(buf)->chain_sz = chain_sz;
tsk = tipc_port_to_sock(p_ptr); TIPC_SKB_CB(buf)->chain_imp = imp;
tsk->link_cong = 1; __skb_queue_tail(&link->waiting_sks, buf);
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); link->stats.link_congs++;
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); return true;
l_ptr->stats.link_congs++;
exit:
tipc_port_unlock(p_ptr);
}
spin_unlock_bh(&tipc_port_list_lock);
return -ELINKCONG;
} }
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) /**
* link_prepare_wakeup - prepare users for wakeup after congestion
* @link: congested link
* Move a number of waiting users, as permitted by available space in
* the send queue, from link wait queue to node wait queue for wakeup
*/
static void link_prepare_wakeup(struct tipc_link *link)
{ {
struct tipc_port *p_ptr; struct sk_buff_head *wq = &link->waiting_sks;
struct tipc_sock *tsk; struct sk_buff *buf;
struct tipc_port *temp_p_ptr; uint pend_qsz = link->out_queue_size;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
if (all) if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
win = 100000;
if (win <= 0)
return;
if (!spin_trylock_bh(&tipc_port_list_lock))
return;
if (link_congested(l_ptr))
goto exit;
list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
wait_list) {
if (win <= 0)
break; break;
tsk = tipc_port_to_sock(p_ptr); pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
list_del_init(&p_ptr->wait_list); __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
spin_lock_bh(p_ptr->lock);
tsk->link_cong = 0;
tipc_sock_wakeup(tsk);
win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock);
} }
exit:
spin_unlock_bh(&tipc_port_list_lock);
} }
/** /**
...@@ -423,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) ...@@ -423,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
u32 prev_state = l_ptr->state; u32 prev_state = l_ptr->state;
u32 checkpoint = l_ptr->next_in_no; u32 checkpoint = l_ptr->next_in_no;
int was_active_link = tipc_link_is_active(l_ptr); int was_active_link = tipc_link_is_active(l_ptr);
struct tipc_node *owner = l_ptr->owner;
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
...@@ -450,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr) ...@@ -450,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
kfree_skb(l_ptr->proto_msg_queue); kfree_skb(l_ptr->proto_msg_queue);
l_ptr->proto_msg_queue = NULL; l_ptr->proto_msg_queue = NULL;
kfree_skb_list(l_ptr->oldest_deferred_in); kfree_skb_list(l_ptr->oldest_deferred_in);
if (!list_empty(&l_ptr->waiting_ports)) if (!skb_queue_empty(&l_ptr->waiting_sks)) {
tipc_link_wakeup_ports(l_ptr, 1); skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
owner->action_flags |= TIPC_WAKEUP_USERS;
}
l_ptr->retransm_queue_head = 0; l_ptr->retransm_queue_head = 0;
l_ptr->retransm_queue_size = 0; l_ptr->retransm_queue_size = 0;
l_ptr->last_out = NULL; l_ptr->last_out = NULL;
...@@ -688,19 +670,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) ...@@ -688,19 +670,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
uint psz = msg_size(msg);
uint imp = tipc_msg_tot_importance(msg); uint imp = tipc_msg_tot_importance(msg);
u32 oport = msg_tot_origport(msg); u32 oport = msg_tot_origport(msg);
if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
link_schedule_port(link, oport, psz);
return -ELINKCONG;
}
} else {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link); tipc_link_reset(link);
goto drop;
} }
if (unlikely(msg_errcode(msg)))
goto drop;
if (unlikely(msg_reroute_cnt(msg)))
goto drop;
if (TIPC_SKB_CB(buf)->wakeup_pending)
return -ELINKCONG;
if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
return -ELINKCONG;
drop:
kfree_skb_list(buf); kfree_skb_list(buf);
return -EHOSTUNREACH; return -EHOSTUNREACH;
} }
...@@ -1202,8 +1188,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1202,8 +1188,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(l_ptr->next_out)) if (unlikely(l_ptr->next_out))
tipc_link_push_queue(l_ptr); tipc_link_push_queue(l_ptr);
if (unlikely(!list_empty(&l_ptr->waiting_ports))) if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
tipc_link_wakeup_ports(l_ptr, 0); link_prepare_wakeup(l_ptr);
l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
}
/* Process the incoming packet */ /* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) { if (unlikely(!link_working_working(l_ptr))) {
......
/* /*
* net/tipc/link.h: Include file for TIPC link code * net/tipc/link.h: Include file for TIPC link code
* *
* Copyright (c) 1995-2006, 2013, Ericsson AB * Copyright (c) 1995-2006, 2013-2014, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -133,7 +133,7 @@ struct tipc_stats { ...@@ -133,7 +133,7 @@ struct tipc_stats {
* @retransm_queue_size: number of messages to retransmit * @retransm_queue_size: number of messages to retransmit
* @retransm_queue_head: sequence number of first message to retransmit * @retransm_queue_head: sequence number of first message to retransmit
* @next_out: ptr to first unsent outbound message in queue * @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate * @waiting_sks: linked list of sockets waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages * @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @reasm_buf: head of partially reassembled inbound message fragments * @reasm_buf: head of partially reassembled inbound message fragments
* @stats: collects statistics regarding link activity * @stats: collects statistics regarding link activity
...@@ -194,7 +194,7 @@ struct tipc_link { ...@@ -194,7 +194,7 @@ struct tipc_link {
u32 retransm_queue_size; u32 retransm_queue_size;
u32 retransm_queue_head; u32 retransm_queue_head;
struct sk_buff *next_out; struct sk_buff *next_out;
struct list_head waiting_ports; struct sk_buff_head waiting_sks;
/* Fragmentation/reassembly */ /* Fragmentation/reassembly */
u32 long_msg_seq_no; u32 long_msg_seq_no;
...@@ -235,7 +235,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, ...@@ -235,7 +235,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
void tipc_link_push_queue(struct tipc_link *l_ptr); void tipc_link_push_queue(struct tipc_link *l_ptr);
u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff *buf); struct sk_buff *buf);
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all);
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
void tipc_link_retransmit(struct tipc_link *l_ptr, void tipc_link_retransmit(struct tipc_link *l_ptr,
struct sk_buff *start, u32 retransmits); struct sk_buff *start, u32 retransmits);
......
...@@ -56,8 +56,35 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, ...@@ -56,8 +56,35 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
msg_set_size(m, hsize); msg_set_size(m, hsize);
msg_set_prevnode(m, tipc_own_addr); msg_set_prevnode(m, tipc_own_addr);
msg_set_type(m, type); msg_set_type(m, type);
msg_set_orignode(m, tipc_own_addr); if (hsize > SHORT_H_SIZE) {
msg_set_destnode(m, destnode); msg_set_orignode(m, tipc_own_addr);
msg_set_destnode(m, destnode);
}
}
struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
uint data_sz, u32 dnode, u32 onode,
u32 dport, u32 oport, int errcode)
{
struct tipc_msg *msg;
struct sk_buff *buf;
buf = tipc_buf_acquire(hdr_sz + data_sz);
if (unlikely(!buf))
return NULL;
msg = buf_msg(buf);
tipc_msg_init(msg, user, type, hdr_sz, dnode);
msg_set_size(msg, hdr_sz + data_sz);
msg_set_prevnode(msg, onode);
msg_set_origport(msg, oport);
msg_set_destport(msg, dport);
msg_set_errcode(msg, errcode);
if (hdr_sz > SHORT_H_SIZE) {
msg_set_orignode(msg, onode);
msg_set_destnode(msg, dnode);
}
return buf;
} }
/* tipc_buf_append(): Append a buffer to the fragment list of another buffer /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
...@@ -155,7 +182,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, ...@@ -155,7 +182,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
struct sk_buff *buf, *prev; struct sk_buff *buf, *prev;
char *pktpos; char *pktpos;
int rc; int rc;
uint chain_sz = 0;
msg_set_size(mhdr, msz); msg_set_size(mhdr, msz);
/* No fragmentation needed? */ /* No fragmentation needed? */
...@@ -166,6 +193,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, ...@@ -166,6 +193,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
return -ENOMEM; return -ENOMEM;
skb_copy_to_linear_data(buf, mhdr, mhsz); skb_copy_to_linear_data(buf, mhdr, mhsz);
pktpos = buf->data + mhsz; pktpos = buf->data + mhsz;
TIPC_SKB_CB(buf)->chain_sz = 1;
if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
return dsz; return dsz;
rc = -EFAULT; rc = -EFAULT;
...@@ -182,6 +210,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, ...@@ -182,6 +210,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
*chain = buf = tipc_buf_acquire(pktmax); *chain = buf = tipc_buf_acquire(pktmax);
if (!buf) if (!buf)
return -ENOMEM; return -ENOMEM;
chain_sz = 1;
pktpos = buf->data; pktpos = buf->data;
skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
pktpos += INT_H_SIZE; pktpos += INT_H_SIZE;
...@@ -215,6 +244,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, ...@@ -215,6 +244,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
rc = -ENOMEM; rc = -ENOMEM;
goto error; goto error;
} }
chain_sz++;
prev->next = buf; prev->next = buf;
msg_set_type(&pkthdr, FRAGMENT); msg_set_type(&pkthdr, FRAGMENT);
msg_set_size(&pkthdr, pktsz); msg_set_size(&pkthdr, pktsz);
...@@ -224,7 +254,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, ...@@ -224,7 +254,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
pktrem = pktsz - INT_H_SIZE; pktrem = pktsz - INT_H_SIZE;
} while (1); } while (1);
TIPC_SKB_CB(*chain)->chain_sz = chain_sz;
msg_set_type(buf_msg(buf), LAST_FRAGMENT); msg_set_type(buf_msg(buf), LAST_FRAGMENT);
return dsz; return dsz;
error: error:
......
...@@ -442,6 +442,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) ...@@ -442,6 +442,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define NAME_DISTRIBUTOR 11 #define NAME_DISTRIBUTOR 11
#define MSG_FRAGMENTER 12 #define MSG_FRAGMENTER 12
#define LINK_CONFIG 13 #define LINK_CONFIG 13
#define SOCK_WAKEUP 14 /* pseudo user */
/* /*
* Connection management protocol message types * Connection management protocol message types
...@@ -732,6 +733,10 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode); ...@@ -732,6 +733,10 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode); u32 destnode);
struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
uint data_sz, u32 dnode, u32 onode,
u32 dport, u32 oport, int errcode);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
......
...@@ -39,7 +39,6 @@ ...@@ -39,7 +39,6 @@
#include "name_table.h" #include "name_table.h"
#include "name_distr.h" #include "name_distr.h"
#include "subscr.h" #include "subscr.h"
#include "port.h"
#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
......
...@@ -38,7 +38,6 @@ ...@@ -38,7 +38,6 @@
#include "net.h" #include "net.h"
#include "name_distr.h" #include "name_distr.h"
#include "subscr.h" #include "subscr.h"
#include "port.h"
#include "socket.h" #include "socket.h"
#include "node.h" #include "node.h"
#include "config.h" #include "config.h"
...@@ -111,7 +110,7 @@ int tipc_net_start(u32 addr) ...@@ -111,7 +110,7 @@ int tipc_net_start(u32 addr)
tipc_own_addr = addr; tipc_own_addr = addr;
tipc_named_reinit(); tipc_named_reinit();
tipc_port_reinit(); tipc_sk_reinit();
res = tipc_bclink_init(); res = tipc_bclink_init();
if (res) if (res)
return res; return res;
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "config.h" #include "config.h"
#include "node.h" #include "node.h"
#include "name_distr.h" #include "name_distr.h"
#include "socket.h"
#define NODE_HTABLE_SIZE 512 #define NODE_HTABLE_SIZE 512
...@@ -50,6 +51,13 @@ static u32 tipc_num_nodes; ...@@ -50,6 +51,13 @@ static u32 tipc_num_nodes;
static u32 tipc_num_links; static u32 tipc_num_links;
static DEFINE_SPINLOCK(node_list_lock); static DEFINE_SPINLOCK(node_list_lock);
struct tipc_sock_conn {
u32 port;
u32 peer_port;
u32 peer_node;
struct list_head list;
};
/* /*
* A trivial power-of-two bitmask technique is used for speed, since this * A trivial power-of-two bitmask technique is used for speed, since this
* operation is done for every incoming TIPC packet. The number of hash table * operation is done for every incoming TIPC packet. The number of hash table
...@@ -100,6 +108,8 @@ struct tipc_node *tipc_node_create(u32 addr) ...@@ -100,6 +108,8 @@ struct tipc_node *tipc_node_create(u32 addr)
INIT_HLIST_NODE(&n_ptr->hash); INIT_HLIST_NODE(&n_ptr->hash);
INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->nsub); INIT_LIST_HEAD(&n_ptr->nsub);
INIT_LIST_HEAD(&n_ptr->conn_sks);
__skb_queue_head_init(&n_ptr->waiting_sks);
hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
...@@ -136,6 +146,71 @@ void tipc_node_stop(void) ...@@ -136,6 +146,71 @@ void tipc_node_stop(void)
spin_unlock_bh(&node_list_lock); spin_unlock_bh(&node_list_lock);
} }
int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port)
{
struct tipc_node *node;
struct tipc_sock_conn *conn;
if (in_own_node(dnode))
return 0;
node = tipc_node_find(dnode);
if (!node) {
pr_warn("Connecting sock to node 0x%x failed\n", dnode);
return -EHOSTUNREACH;
}
conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
if (!conn)
return -EHOSTUNREACH;
conn->peer_node = dnode;
conn->port = port;
conn->peer_port = peer_port;
tipc_node_lock(node);
list_add_tail(&conn->list, &node->conn_sks);
tipc_node_unlock(node);
return 0;
}
void tipc_node_remove_conn(u32 dnode, u32 port)
{
struct tipc_node *node;
struct tipc_sock_conn *conn, *safe;
if (in_own_node(dnode))
return;
node = tipc_node_find(dnode);
if (!node)
return;
tipc_node_lock(node);
list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
if (port != conn->port)
continue;
list_del(&conn->list);
kfree(conn);
}
tipc_node_unlock(node);
}
void tipc_node_abort_sock_conns(struct list_head *conns)
{
struct tipc_sock_conn *conn, *safe;
struct sk_buff *buf;
list_for_each_entry_safe(conn, safe, conns, list) {
buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
SHORT_H_SIZE, 0, tipc_own_addr,
conn->peer_node, conn->port,
conn->peer_port, TIPC_ERR_NO_NODE);
if (likely(buf))
tipc_sk_rcv(buf);
list_del(&conn->list);
kfree(conn);
}
}
/** /**
* tipc_node_link_up - handle addition of link * tipc_node_link_up - handle addition of link
* *
...@@ -474,6 +549,8 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) ...@@ -474,6 +549,8 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
void tipc_node_unlock(struct tipc_node *node) void tipc_node_unlock(struct tipc_node *node)
{ {
LIST_HEAD(nsub_list); LIST_HEAD(nsub_list);
LIST_HEAD(conn_sks);
struct sk_buff_head waiting_sks;
u32 addr = 0; u32 addr = 0;
if (likely(!node->action_flags)) { if (likely(!node->action_flags)) {
...@@ -481,8 +558,14 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -481,8 +558,14 @@ void tipc_node_unlock(struct tipc_node *node)
return; return;
} }
__skb_queue_head_init(&waiting_sks);
if (node->action_flags & TIPC_WAKEUP_USERS) {
skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
node->action_flags &= ~TIPC_WAKEUP_USERS;
}
if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) { if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
list_replace_init(&node->nsub, &nsub_list); list_replace_init(&node->nsub, &nsub_list);
list_replace_init(&node->conn_sks, &conn_sks);
node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
} }
if (node->action_flags & TIPC_NOTIFY_NODE_UP) { if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
...@@ -491,8 +574,15 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -491,8 +574,15 @@ void tipc_node_unlock(struct tipc_node *node)
} }
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
while (!skb_queue_empty(&waiting_sks))
tipc_sk_rcv(__skb_dequeue(&waiting_sks));
if (!list_empty(&conn_sks))
tipc_node_abort_sock_conns(&conn_sks);
if (!list_empty(&nsub_list)) if (!list_empty(&nsub_list))
tipc_nodesub_notify(&nsub_list); tipc_nodesub_notify(&nsub_list);
if (addr) if (addr)
tipc_named_node_up(addr); tipc_named_node_up(addr);
} }
...@@ -58,7 +58,8 @@ enum { ...@@ -58,7 +58,8 @@ enum {
TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
TIPC_NOTIFY_NODE_DOWN = (1 << 3), TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4) TIPC_NOTIFY_NODE_UP = (1 << 4),
TIPC_WAKEUP_USERS = (1 << 5)
}; };
/** /**
...@@ -115,6 +116,8 @@ struct tipc_node { ...@@ -115,6 +116,8 @@ struct tipc_node {
int working_links; int working_links;
u32 signature; u32 signature;
struct list_head nsub; struct list_head nsub;
struct sk_buff_head waiting_sks;
struct list_head conn_sks;
struct rcu_head rcu; struct rcu_head rcu;
}; };
...@@ -133,6 +136,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) ...@@ -133,6 +136,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len); int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node); void tipc_node_unlock(struct tipc_node *node);
int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(u32 dnode, u32 port);
static inline void tipc_node_lock(struct tipc_node *node) static inline void tipc_node_lock(struct tipc_node *node)
{ {
......
/*
* net/tipc/port.c: TIPC port code
*
* Copyright (c) 1992-2007, 2014, Ericsson AB
* Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "core.h"
#include "config.h"
#include "port.h"
#include "name_table.h"
#include "socket.h"
/* Connection management: */
#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
#define MAX_REJECT_SIZE 1024
DEFINE_SPINLOCK(tipc_port_list_lock);
static LIST_HEAD(ports);
static void port_handle_node_down(unsigned long ref);
static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
static void port_timeout(unsigned long ref);
/**
* tipc_port_peer_msg - verify message was sent by connected port's peer
*
* Handles cases where the node's network address has changed from
* the default of <0.0.0> to its configured setting.
*/
int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
{
u32 peernode;
u32 orignode;
if (msg_origport(msg) != tipc_port_peerport(p_ptr))
return 0;
orignode = msg_orignode(msg);
peernode = tipc_port_peernode(p_ptr);
return (orignode == peernode) ||
(!orignode && (peernode == tipc_own_addr)) ||
(!peernode && (orignode == tipc_own_addr));
}
/* tipc_port_init - intiate TIPC port and lock it
*
* Returns obtained reference if initialization is successful, zero otherwise
*/
u32 tipc_port_init(struct tipc_port *p_ptr,
const unsigned int importance)
{
struct tipc_msg *msg;
u32 ref;
ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
if (!ref) {
pr_warn("Port registration failed, ref. table exhausted\n");
return 0;
}
p_ptr->max_pkt = MAX_PKT_DEFAULT;
p_ptr->ref = ref;
INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
INIT_LIST_HEAD(&p_ptr->publications);
INIT_LIST_HEAD(&p_ptr->port_list);
/*
* Must hold port list lock while initializing message header template
* to ensure a change to node's own network address doesn't result
* in template containing out-dated network address information
*/
spin_lock_bh(&tipc_port_list_lock);
msg = &p_ptr->phdr;
tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
msg_set_origport(msg, ref);
list_add_tail(&p_ptr->port_list, &ports);
spin_unlock_bh(&tipc_port_list_lock);
return ref;
}
void tipc_port_destroy(struct tipc_port *p_ptr)
{
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
u32 peer;
tipc_withdraw(p_ptr, 0, NULL);
spin_lock_bh(p_ptr->lock);
tipc_ref_discard(p_ptr->ref);
spin_unlock_bh(p_ptr->lock);
k_cancel_timer(&p_ptr->timer);
if (p_ptr->connected) {
buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
tipc_nodesub_unsubscribe(&p_ptr->subscription);
msg = buf_msg(buf);
peer = msg_destnode(msg);
tipc_link_xmit(buf, peer, msg_link_selector(msg));
}
spin_lock_bh(&tipc_port_list_lock);
list_del(&p_ptr->port_list);
list_del(&p_ptr->wait_list);
spin_unlock_bh(&tipc_port_list_lock);
k_term_timer(&p_ptr->timer);
}
/*
* port_build_proto_msg(): create connection protocol message for port
*
* On entry the port must be locked and connected.
*/
static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
u32 type, u32 ack)
{
struct sk_buff *buf;
struct tipc_msg *msg;
buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
msg = buf_msg(buf);
tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
tipc_port_peernode(p_ptr));
msg_set_destport(msg, tipc_port_peerport(p_ptr));
msg_set_origport(msg, p_ptr->ref);
msg_set_msgcnt(msg, ack);
buf->next = NULL;
}
return buf;
}
static void port_timeout(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
if (!p_ptr->connected) {
tipc_port_unlock(p_ptr);
return;
}
/* Last probe answered ? */
if (p_ptr->probing_state == TIPC_CONN_PROBING) {
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
} else {
buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
p_ptr->probing_state = TIPC_CONN_PROBING;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
}
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
static void port_handle_node_down(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
{
struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
if (buf) {
struct tipc_msg *msg = buf_msg(buf);
msg_swap_words(msg, 4, 5);
msg_swap_words(msg, 6, 7);
buf->next = NULL;
}
return buf;
}
static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
{
struct sk_buff *buf;
struct tipc_msg *msg;
u32 imp;
if (!p_ptr->connected)
return NULL;
buf = tipc_buf_acquire(BASIC_H_SIZE);
if (buf) {
msg = buf_msg(buf);
memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
msg_set_hdr_sz(msg, BASIC_H_SIZE);
msg_set_size(msg, BASIC_H_SIZE);
imp = msg_importance(msg);
if (imp < TIPC_CRITICAL_IMPORTANCE)
msg_set_importance(msg, ++imp);
msg_set_errcode(msg, err);
buf->next = NULL;
}
return buf;
}
static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
{
struct publication *publ;
int ret;
if (full_id)
ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
tipc_zone(tipc_own_addr),
tipc_cluster(tipc_own_addr),
tipc_node(tipc_own_addr), p_ptr->ref);
else
ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
if (p_ptr->connected) {
u32 dport = tipc_port_peerport(p_ptr);
u32 destnode = tipc_port_peernode(p_ptr);
ret += tipc_snprintf(buf + ret, len - ret,
" connected to <%u.%u.%u:%u>",
tipc_zone(destnode),
tipc_cluster(destnode),
tipc_node(destnode), dport);
if (p_ptr->conn_type != 0)
ret += tipc_snprintf(buf + ret, len - ret,
" via {%u,%u}", p_ptr->conn_type,
p_ptr->conn_instance);
} else if (p_ptr->published) {
ret += tipc_snprintf(buf + ret, len - ret, " bound to");
list_for_each_entry(publ, &p_ptr->publications, pport_list) {
if (publ->lower == publ->upper)
ret += tipc_snprintf(buf + ret, len - ret,
" {%u,%u}", publ->type,
publ->lower);
else
ret += tipc_snprintf(buf + ret, len - ret,
" {%u,%u,%u}", publ->type,
publ->lower, publ->upper);
}
}
ret += tipc_snprintf(buf + ret, len - ret, "\n");
return ret;
}
struct sk_buff *tipc_port_get_ports(void)
{
struct sk_buff *buf;
struct tlv_desc *rep_tlv;
char *pb;
int pb_len;
struct tipc_port *p_ptr;
int str_len = 0;
buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
if (!buf)
return NULL;
rep_tlv = (struct tlv_desc *)buf->data;
pb = TLV_DATA(rep_tlv);
pb_len = ULTRA_STRING_MAX_LEN;
spin_lock_bh(&tipc_port_list_lock);
list_for_each_entry(p_ptr, &ports, port_list) {
spin_lock_bh(p_ptr->lock);
str_len += port_print(p_ptr, pb, pb_len, 0);
spin_unlock_bh(p_ptr->lock);
}
spin_unlock_bh(&tipc_port_list_lock);
str_len += 1; /* for "\0" */
skb_put(buf, TLV_SPACE(str_len));
TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
return buf;
}
void tipc_port_reinit(void)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg;
spin_lock_bh(&tipc_port_list_lock);
list_for_each_entry(p_ptr, &ports, port_list) {
msg = &p_ptr->phdr;
msg_set_prevnode(msg, tipc_own_addr);
msg_set_orignode(msg, tipc_own_addr);
}
spin_unlock_bh(&tipc_port_list_lock);
}
void tipc_acknowledge(u32 ref, u32 ack)
{
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
struct tipc_msg *msg;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return;
if (p_ptr->connected)
buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
tipc_port_unlock(p_ptr);
if (!buf)
return;
msg = buf_msg(buf);
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
struct tipc_name_seq const *seq)
{
struct publication *publ;
u32 key;
if (p_ptr->connected)
return -EINVAL;
key = p_ptr->ref + p_ptr->pub_count + 1;
if (key == p_ptr->ref)
return -EADDRINUSE;
publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
scope, p_ptr->ref, key);
if (publ) {
list_add(&publ->pport_list, &p_ptr->publications);
p_ptr->pub_count++;
p_ptr->published = 1;
return 0;
}
return -EINVAL;
}
int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope,
struct tipc_name_seq const *seq)
{
struct publication *publ;
struct publication *tpubl;
int res = -EINVAL;
if (!seq) {
list_for_each_entry_safe(publ, tpubl,
&p_ptr->publications, pport_list) {
tipc_nametbl_withdraw(publ->type, publ->lower,
publ->ref, publ->key);
}
res = 0;
} else {
list_for_each_entry_safe(publ, tpubl,
&p_ptr->publications, pport_list) {
if (publ->scope != scope)
continue;
if (publ->type != seq->type)
continue;
if (publ->lower != seq->lower)
continue;
if (publ->upper != seq->upper)
break;
tipc_nametbl_withdraw(publ->type, publ->lower,
publ->ref, publ->key);
res = 0;
break;
}
}
if (list_empty(&p_ptr->publications))
p_ptr->published = 0;
return res;
}
int tipc_port_connect(u32 ref, struct tipc_portid const *peer)
{
struct tipc_port *p_ptr;
int res;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
res = __tipc_port_connect(ref, p_ptr, peer);
tipc_port_unlock(p_ptr);
return res;
}
/*
* __tipc_port_connect - connect to a remote peer
*
* Port must be locked.
*/
int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer)
{
struct tipc_msg *msg;
int res = -EINVAL;
if (p_ptr->published || p_ptr->connected)
goto exit;
if (!peer->ref)
goto exit;
msg = &p_ptr->phdr;
msg_set_destnode(msg, peer->node);
msg_set_destport(msg, peer->ref);
msg_set_type(msg, TIPC_CONN_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_hdr_sz(msg, SHORT_H_SIZE);
p_ptr->probing_interval = PROBING_INTERVAL;
p_ptr->probing_state = TIPC_CONN_OK;
p_ptr->connected = 1;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
(void *)(unsigned long)ref,
(net_ev_handler)port_handle_node_down);
res = 0;
exit:
p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
return res;
}
/*
* __tipc_disconnect - disconnect port from peer
*
* Port must be locked.
*/
int __tipc_port_disconnect(struct tipc_port *tp_ptr)
{
if (tp_ptr->connected) {
tp_ptr->connected = 0;
/* let timer expire on it's own to avoid deadlock! */
tipc_nodesub_unsubscribe(&tp_ptr->subscription);
return 0;
}
return -ENOTCONN;
}
/*
* tipc_port_disconnect(): Disconnect port form peer.
* This is a node local operation.
*/
int tipc_port_disconnect(u32 ref)
{
struct tipc_port *p_ptr;
int res;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
res = __tipc_port_disconnect(p_ptr);
tipc_port_unlock(p_ptr);
return res;
}
/*
* tipc_port_shutdown(): Send a SHUTDOWN msg to peer and disconnect
*/
int tipc_port_shutdown(u32 ref)
{
struct tipc_msg *msg;
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
return tipc_port_disconnect(ref);
}
/*
* net/tipc/port.h: Include file for TIPC port code
*
* Copyright (c) 1994-2007, 2014, Ericsson AB
* Copyright (c) 2004-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _TIPC_PORT_H
#define _TIPC_PORT_H
#include "ref.h"
#include "net.h"
#include "msg.h"
#include "node_subscr.h"
#define TIPC_CONNACK_INTV 256
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
/**
* struct tipc_port - TIPC port structure
* @lock: pointer to spinlock for controlling access to port
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established
* @published: non-zero if port has one or more associated names
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
* @probing_interval:
* @timer_ref:
* @subscription: "node down" subscription used to terminate failed connections
*/
struct tipc_port {
spinlock_t *lock;
int connected;
u32 conn_type;
u32 conn_instance;
int published;
u32 max_pkt;
u32 ref;
struct tipc_msg phdr;
struct list_head port_list;
struct list_head wait_list;
u32 waiting_pkts;
struct list_head publications;
u32 pub_count;
u32 probing_state;
u32 probing_interval;
struct timer_list timer;
struct tipc_node_subscr subscription;
};
extern spinlock_t tipc_port_list_lock;
struct tipc_port_list;
/*
* TIPC port manipulation routines
*/
u32 tipc_port_init(struct tipc_port *p_ptr,
const unsigned int importance);
void tipc_acknowledge(u32 port_ref, u32 ack);
void tipc_port_destroy(struct tipc_port *p_ptr);
int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
struct tipc_name_seq const *name_seq);
int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope,
struct tipc_name_seq const *name_seq);
int tipc_port_connect(u32 portref, struct tipc_portid const *port);
int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref);
/*
* The following routines require that the port be locked on entry
*/
int __tipc_port_disconnect(struct tipc_port *tp_ptr);
int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer);
int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
struct sk_buff *tipc_port_get_ports(void);
void tipc_port_reinit(void);
/**
* tipc_port_lock - lock port instance referred to and return its pointer
*/
static inline struct tipc_port *tipc_port_lock(u32 ref)
{
return (struct tipc_port *)tipc_ref_lock(ref);
}
/**
* tipc_port_unlock - unlock a port instance
*
* Can use pointer instead of tipc_ref_unlock() since port is already locked.
*/
static inline void tipc_port_unlock(struct tipc_port *p_ptr)
{
spin_unlock_bh(p_ptr->lock);
}
static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
{
return msg_destnode(&p_ptr->phdr);
}
static inline u32 tipc_port_peerport(struct tipc_port *p_ptr)
{
return msg_destport(&p_ptr->phdr);
}
static inline bool tipc_port_unreliable(struct tipc_port *port)
{
return msg_src_droppable(&port->phdr) != 0;
}
static inline void tipc_port_set_unreliable(struct tipc_port *port,
bool unreliable)
{
msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0);
}
static inline bool tipc_port_unreturnable(struct tipc_port *port)
{
return msg_dest_droppable(&port->phdr) != 0;
}
static inline void tipc_port_set_unreturnable(struct tipc_port *port,
bool unreturnable)
{
msg_set_dest_droppable(&port->phdr, unreturnable ? 1 : 0);
}
static inline int tipc_port_importance(struct tipc_port *port)
{
return msg_importance(&port->phdr);
}
static inline int tipc_port_set_importance(struct tipc_port *port, int imp)
{
if (imp > TIPC_CRITICAL_IMPORTANCE)
return -EINVAL;
msg_set_importance(&port->phdr, (u32)imp);
return 0;
}
#endif
/*
* net/tipc/ref.c: TIPC object registry code
*
* Copyright (c) 1991-2006, Ericsson AB
* Copyright (c) 2004-2007, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "core.h"
#include "ref.h"
/**
* struct reference - TIPC object reference entry
* @object: pointer to object associated with reference entry
* @lock: spinlock controlling access to object
* @ref: reference value for object (combines instance & array index info)
*/
struct reference {
void *object;
spinlock_t lock;
u32 ref;
};
/**
* struct tipc_ref_table - table of TIPC object reference entries
* @entries: pointer to array of reference entries
* @capacity: array index of first unusable entry
* @init_point: array index of first uninitialized entry
* @first_free: array index of first unused object reference entry
* @last_free: array index of last unused object reference entry
* @index_mask: bitmask for array index portion of reference values
* @start_mask: initial value for instance value portion of reference values
*/
struct ref_table {
struct reference *entries;
u32 capacity;
u32 init_point;
u32 first_free;
u32 last_free;
u32 index_mask;
u32 start_mask;
};
/*
* Object reference table consists of 2**N entries.
*
* State Object ptr Reference
* ----- ---------- ---------
* In use non-NULL XXXX|own index
* (XXXX changes each time entry is acquired)
* Free NULL YYYY|next free index
* (YYYY is one more than last used XXXX)
* Uninitialized NULL 0
*
* Entry 0 is not used; this allows index 0 to denote the end of the free list.
*
* Note that a reference value of 0 does not necessarily indicate that an
* entry is uninitialized, since the last entry in the free list could also
* have a reference value of 0 (although this is unlikely).
*/
static struct ref_table tipc_ref_table;
static DEFINE_SPINLOCK(ref_table_lock);
/**
* tipc_ref_table_init - create reference table for objects
*/
int tipc_ref_table_init(u32 requested_size, u32 start)
{
struct reference *table;
u32 actual_size;
/* account for unused entry, then round up size to a power of 2 */
requested_size++;
for (actual_size = 16; actual_size < requested_size; actual_size <<= 1)
/* do nothing */ ;
/* allocate table & mark all entries as uninitialized */
table = vzalloc(actual_size * sizeof(struct reference));
if (table == NULL)
return -ENOMEM;
tipc_ref_table.entries = table;
tipc_ref_table.capacity = requested_size;
tipc_ref_table.init_point = 1;
tipc_ref_table.first_free = 0;
tipc_ref_table.last_free = 0;
tipc_ref_table.index_mask = actual_size - 1;
tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
return 0;
}
/**
* tipc_ref_table_stop - destroy reference table for objects
*/
void tipc_ref_table_stop(void)
{
vfree(tipc_ref_table.entries);
tipc_ref_table.entries = NULL;
}
/**
* tipc_ref_acquire - create reference to an object
*
* Register an object pointer in reference table and lock the object.
* Returns a unique reference value that is used from then on to retrieve the
* object pointer, or to determine that the object has been deregistered.
*
* Note: The object is returned in the locked state so that the caller can
* register a partially initialized object, without running the risk that
* the object will be accessed before initialization is complete.
*/
u32 tipc_ref_acquire(void *object, spinlock_t **lock)
{
u32 index;
u32 index_mask;
u32 next_plus_upper;
u32 ref;
struct reference *entry = NULL;
if (!object) {
pr_err("Attempt to acquire ref. to non-existent obj\n");
return 0;
}
if (!tipc_ref_table.entries) {
pr_err("Ref. table not found in acquisition attempt\n");
return 0;
}
/* take a free entry, if available; otherwise initialize a new entry */
spin_lock_bh(&ref_table_lock);
if (tipc_ref_table.first_free) {
index = tipc_ref_table.first_free;
entry = &(tipc_ref_table.entries[index]);
index_mask = tipc_ref_table.index_mask;
next_plus_upper = entry->ref;
tipc_ref_table.first_free = next_plus_upper & index_mask;
ref = (next_plus_upper & ~index_mask) + index;
} else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
index = tipc_ref_table.init_point++;
entry = &(tipc_ref_table.entries[index]);
spin_lock_init(&entry->lock);
ref = tipc_ref_table.start_mask + index;
} else {
ref = 0;
}
spin_unlock_bh(&ref_table_lock);
/*
* Grab the lock so no one else can modify this entry
* While we assign its ref value & object pointer
*/
if (entry) {
spin_lock_bh(&entry->lock);
entry->ref = ref;
entry->object = object;
*lock = &entry->lock;
/*
* keep it locked, the caller is responsible
* for unlocking this when they're done with it
*/
}
return ref;
}
/**
* tipc_ref_discard - invalidate references to an object
*
* Disallow future references to an object and free up the entry for re-use.
* Note: The entry's spin_lock may still be busy after discard
*/
void tipc_ref_discard(u32 ref)
{
struct reference *entry;
u32 index;
u32 index_mask;
if (!tipc_ref_table.entries) {
pr_err("Ref. table not found during discard attempt\n");
return;
}
index_mask = tipc_ref_table.index_mask;
index = ref & index_mask;
entry = &(tipc_ref_table.entries[index]);
spin_lock_bh(&ref_table_lock);
if (!entry->object) {
pr_err("Attempt to discard ref. to non-existent obj\n");
goto exit;
}
if (entry->ref != ref) {
pr_err("Attempt to discard non-existent reference\n");
goto exit;
}
/*
* mark entry as unused; increment instance part of entry's reference
* to invalidate any subsequent references
*/
entry->object = NULL;
entry->ref = (ref & ~index_mask) + (index_mask + 1);
/* append entry to free entry list */
if (tipc_ref_table.first_free == 0)
tipc_ref_table.first_free = index;
else
tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
tipc_ref_table.last_free = index;
exit:
spin_unlock_bh(&ref_table_lock);
}
/**
* tipc_ref_lock - lock referenced object and return pointer to it
*/
void *tipc_ref_lock(u32 ref)
{
if (likely(tipc_ref_table.entries)) {
struct reference *entry;
entry = &tipc_ref_table.entries[ref &
tipc_ref_table.index_mask];
if (likely(entry->ref != 0)) {
spin_lock_bh(&entry->lock);
if (likely((entry->ref == ref) && (entry->object)))
return entry->object;
spin_unlock_bh(&entry->lock);
}
}
return NULL;
}
/*
* net/tipc/ref.h: Include file for TIPC object registry code
*
* Copyright (c) 1991-2006, Ericsson AB
* Copyright (c) 2005-2006, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _TIPC_REF_H
#define _TIPC_REF_H
int tipc_ref_table_init(u32 requested_size, u32 start);
void tipc_ref_table_stop(void);
u32 tipc_ref_acquire(void *object, spinlock_t **lock);
void tipc_ref_discard(u32 ref);
void *tipc_ref_lock(u32 ref);
#endif
此差异已折叠。
...@@ -35,56 +35,17 @@ ...@@ -35,56 +35,17 @@
#ifndef _TIPC_SOCK_H #ifndef _TIPC_SOCK_H
#define _TIPC_SOCK_H #define _TIPC_SOCK_H
#include "port.h"
#include <net/sock.h> #include <net/sock.h>
#define TIPC_CONN_OK 0 #define TIPC_CONNACK_INTV 256
#define TIPC_CONN_PROBING 1 #define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
/** SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
* struct tipc_sock - TIPC socket structure
* @sk: socket - interacts with 'port' and with user via the socket API
* @port: port - interacts with 'sk' and with the rest of the TIPC stack
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
*/
struct tipc_sock {
struct sock sk;
struct tipc_port port;
unsigned int conn_timeout;
atomic_t dupl_rcvcnt;
int link_cong;
uint sent_unacked;
uint rcv_unacked;
};
static inline struct tipc_sock *tipc_sk(const struct sock *sk)
{
return container_of(sk, struct tipc_sock, sk);
}
static inline struct tipc_sock *tipc_port_to_sock(const struct tipc_port *port)
{
return container_of(port, struct tipc_sock, port);
}
static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
{
tsk->sk.sk_write_space(&tsk->sk);
}
static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
{
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
}
int tipc_sk_rcv(struct sk_buff *buf); int tipc_sk_rcv(struct sk_buff *buf);
struct sk_buff *tipc_sk_socks_show(void);
void tipc_sk_mcast_rcv(struct sk_buff *buf); void tipc_sk_mcast_rcv(struct sk_buff *buf);
void tipc_sk_reinit(void);
int tipc_sk_ref_table_init(u32 requested_size, u32 start);
void tipc_sk_ref_table_stop(void);
#endif #endif
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include "core.h" #include "core.h"
#include "name_table.h" #include "name_table.h"
#include "port.h"
#include "subscr.h" #include "subscr.h"
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册