提交 0ff9275a 编写于 作者: D David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: new unicast transmission code

As a step towards making the data transmission code more maintainable
and performant, we introduce a number of new functions, both for
building, sending and rejecting messages. The new functions will
eventually be used for alla data transmission, user data unicast,
service internal messaging, and multicast/broadcast.

We start with this series, where we introduce the functions, and
let user data unicast and the internal connection protocol use them.
The remaining users will come in a later series.

There are only minor changes to data structures, and no protocol
changes, so the older functions can still be used in parallel for
some time. Until the old functions are removed, we use temporary
names for the new functions, such as tipc_build_msg2, tipc_link_xmit2.

It should be noted that the first two commits are unrelated to the
rest of the series.
====================
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
......@@ -82,9 +82,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destnode);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
......@@ -335,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
{
struct tipc_port *p_ptr;
struct tipc_sock *tsk;
spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport);
if (p_ptr) {
if (!list_empty(&p_ptr->wait_list))
goto exit;
p_ptr->congested = 1;
tsk = tipc_port_to_sock(p_ptr);
tsk->link_cong = 1;
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
l_ptr->stats.link_congs++;
......@@ -355,6 +354,7 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
{
struct tipc_port *p_ptr;
struct tipc_sock *tsk;
struct tipc_port *temp_p_ptr;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
......@@ -370,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
wait_list) {
if (win <= 0)
break;
tsk = tipc_port_to_sock(p_ptr);
list_del_init(&p_ptr->wait_list);
spin_lock_bh(p_ptr->lock);
p_ptr->congested = 0;
tipc_port_wakeup(p_ptr);
tsk->link_cong = 0;
tipc_sock_wakeup(tsk);
win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock);
}
......@@ -850,6 +851,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
return res;
}
/* tipc_link_cong: determine return value and how to treat the
* sent buffer during link congestion.
* - For plain, errorless user data messages we keep the buffer and
* return -ELINKONG.
* - For all other messages we discard the buffer and return -EHOSTUNREACH
* - For TIPC internal messages we also reset the link
*/
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
uint psz = msg_size(msg);
uint imp = tipc_msg_tot_importance(msg);
u32 oport = msg_tot_origport(msg);
if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
link_schedule_port(link, oport, psz);
return -ELINKCONG;
}
} else {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
}
kfree_skb_list(buf);
return -EHOSTUNREACH;
}
/**
* __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
* @link: link to use
* @buf: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
* user data messages) or -EHOSTUNREACH (all other messages/senders)
* Only the socket functions tipc_send_stream() and tipc_send_packet() need
* to act on the return value, since they may need to do more send attempts.
*/
int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
uint psz = msg_size(msg);
uint qsz = link->out_queue_size;
uint sndlim = link->queue_limit[0];
uint imp = tipc_msg_tot_importance(msg);
uint mtu = link->max_pkt;
uint ack = mod(link->next_in_no - 1);
uint seqno = link->next_out_no;
uint bc_last_in = link->owner->bclink.last_in;
struct tipc_media_addr *addr = &link->media_addr;
struct sk_buff *next = buf->next;
/* Match queue limits against msg importance: */
if (unlikely(qsz >= link->queue_limit[imp]))
return tipc_link_cong(link, buf);
/* Has valid packet limit been used ? */
if (unlikely(psz > mtu)) {
kfree_skb_list(buf);
return -EMSGSIZE;
}
/* Prepare each packet for sending, and add to outqueue: */
while (buf) {
next = buf->next;
msg = buf_msg(buf);
msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
msg_set_bcast_ack(msg, bc_last_in);
if (!link->first_out) {
link->first_out = buf;
} else if (qsz < sndlim) {
link->last_out->next = buf;
} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
link->stats.sent_bundled++;
buf = next;
next = buf->next;
continue;
} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
link->stats.sent_bundled++;
link->stats.sent_bundles++;
link->last_out->next = buf;
if (!link->next_out)
link->next_out = buf;
} else {
link->last_out->next = buf;
if (!link->next_out)
link->next_out = buf;
}
/* Send packet if possible: */
if (likely(++qsz <= sndlim)) {
tipc_bearer_send(link->bearer_id, buf, addr);
link->next_out = next;
link->unacked_window = 0;
}
seqno++;
link->last_out = buf;
buf = next;
}
link->next_out_no = seqno;
link->out_queue_size = qsz;
return 0;
}
/**
* tipc_link_xmit2() is the general link level function for message sending
* @buf: chain of buffers containing message
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
{
struct tipc_link *link = NULL;
struct tipc_node *node;
int rc = -EHOSTUNREACH;
node = tipc_node_find(dnode);
if (node) {
tipc_node_lock(node);
link = node->active_links[selector & 1];
if (link)
rc = __tipc_link_xmit2(link, buf);
tipc_node_unlock(node);
}
if (link)
return rc;
if (likely(in_own_node(dnode)))
return tipc_sk_rcv(buf);
kfree_skb_list(buf);
return rc;
}
/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*
......@@ -932,252 +1071,6 @@ void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
}
}
/*
* tipc_link_xmit_fast: Entry for data messages where the
* destination link is known and the header is complete,
* inclusive total message length. Very time critical.
* Link is locked. Returns user data length.
*/
static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
u32 *used_max_pkt)
{
struct tipc_msg *msg = buf_msg(buf);
int res = msg_data_sz(msg);
if (likely(!link_congested(l_ptr))) {
if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
link_add_to_outqueue(l_ptr, buf, msg);
tipc_bearer_send(l_ptr->bearer_id, buf,
&l_ptr->media_addr);
l_ptr->unacked_window = 0;
return res;
}
else
*used_max_pkt = l_ptr->max_pkt;
}
return __tipc_link_xmit(l_ptr, buf); /* All other cases */
}
/*
* tipc_link_iovec_xmit_fast: Entry for messages where the
* destination processor is known and the header is complete,
* except for total message length.
* Returns user data length or errno.
*/
int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destaddr)
{
struct tipc_msg *hdr = &sender->phdr;
struct tipc_link *l_ptr;
struct sk_buff *buf;
struct tipc_node *node;
int res;
u32 selector = msg_origport(hdr) & 1;
again:
/*
* Try building message using port's max_pkt hint.
* (Must not hold any locks while building message.)
*/
res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
/* Exit if build request was invalid */
if (unlikely(res < 0))
return res;
node = tipc_node_find(destaddr);
if (likely(node)) {
tipc_node_lock(node);
l_ptr = node->active_links[selector];
if (likely(l_ptr)) {
if (likely(buf)) {
res = tipc_link_xmit_fast(l_ptr, buf,
&sender->max_pkt);
exit:
tipc_node_unlock(node);
return res;
}
/* Exit if link (or bearer) is congested */
if (link_congested(l_ptr)) {
res = link_schedule_port(l_ptr,
sender->ref, res);
goto exit;
}
/*
* Message size exceeds max_pkt hint; update hint,
* then re-try fast path or fragment the message
*/
sender->max_pkt = l_ptr->max_pkt;
tipc_node_unlock(node);
if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
goto again;
return tipc_link_iovec_long_xmit(sender, msg_sect,
len, destaddr);
}
tipc_node_unlock(node);
}
/* Couldn't find a link to the destination node */
kfree_skb(buf);
tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
return -ENETUNREACH;
}
/*
* tipc_link_iovec_long_xmit(): Entry for long messages where the
* destination node is known and the header is complete,
* inclusive total message length.
* Link and bearer congestion status have been checked to be ok,
* and are ignored if they change.
*
* Note that fragments do not use the full link MTU so that they won't have
* to undergo refragmentation if link changeover causes them to be sent
* over another link with an additional tunnel header added as prefix.
* (Refragmentation will still occur if the other link has a smaller MTU.)
*
* Returns user data length or errno.
*/
static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destaddr)
{
struct tipc_link *l_ptr;
struct tipc_node *node;
struct tipc_msg *hdr = &sender->phdr;
u32 dsz = len;
u32 max_pkt, fragm_sz, rest;
struct tipc_msg fragm_hdr;
struct sk_buff *buf, *buf_chain, *prev;
u32 fragm_crs, fragm_rest, hsz, sect_rest;
const unchar __user *sect_crs;
int curr_sect;
u32 fragm_no;
int res = 0;
again:
fragm_no = 1;
max_pkt = sender->max_pkt - INT_H_SIZE;
/* leave room for tunnel header in case of link changeover */
fragm_sz = max_pkt - INT_H_SIZE;
/* leave room for fragmentation header in each fragment */
rest = dsz;
fragm_crs = 0;
fragm_rest = 0;
sect_rest = 0;
sect_crs = NULL;
curr_sect = -1;
/* Prepare reusable fragment header */
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, msg_destnode(hdr));
msg_set_size(&fragm_hdr, max_pkt);
msg_set_fragm_no(&fragm_hdr, 1);
/* Prepare header of first fragment */
buf_chain = buf = tipc_buf_acquire(max_pkt);
if (!buf)
return -ENOMEM;
buf->next = NULL;
skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
hsz = msg_hdr_sz(hdr);
skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
/* Chop up message */
fragm_crs = INT_H_SIZE + hsz;
fragm_rest = fragm_sz - hsz;
do { /* For all sections */
u32 sz;
if (!sect_rest) {
sect_rest = msg_sect[++curr_sect].iov_len;
sect_crs = msg_sect[curr_sect].iov_base;
}
if (sect_rest < fragm_rest)
sz = sect_rest;
else
sz = fragm_rest;
if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
res = -EFAULT;
error:
kfree_skb_list(buf_chain);
return res;
}
sect_crs += sz;
sect_rest -= sz;
fragm_crs += sz;
fragm_rest -= sz;
rest -= sz;
if (!fragm_rest && rest) {
/* Initiate new fragment: */
if (rest <= fragm_sz) {
fragm_sz = rest;
msg_set_type(&fragm_hdr, LAST_FRAGMENT);
} else {
msg_set_type(&fragm_hdr, FRAGMENT);
}
msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
msg_set_fragm_no(&fragm_hdr, ++fragm_no);
prev = buf;
buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
if (!buf) {
res = -ENOMEM;
goto error;
}
buf->next = NULL;
prev->next = buf;
skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
fragm_crs = INT_H_SIZE;
fragm_rest = fragm_sz;
}
} while (rest > 0);
/*
* Now we have a buffer chain. Select a link and check
* that packet size is still OK
*/
node = tipc_node_find(destaddr);
if (likely(node)) {
tipc_node_lock(node);
l_ptr = node->active_links[sender->ref & 1];
if (!l_ptr) {
tipc_node_unlock(node);
goto reject;
}
if (l_ptr->max_pkt < max_pkt) {
sender->max_pkt = l_ptr->max_pkt;
tipc_node_unlock(node);
kfree_skb_list(buf_chain);
goto again;
}
} else {
reject:
kfree_skb_list(buf_chain);
tipc_port_iovec_reject(sender, hdr, msg_sect, len,
TIPC_ERR_NO_NODE);
return -ENETUNREACH;
}
/* Append chain of fragments to send queue & send them */
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
l_ptr->stats.sent_fragmented++;
tipc_link_push_queue(l_ptr);
tipc_node_unlock(node);
return dsz;
}
/*
* tipc_link_push_packet: Push one unsent packet to the media
*/
......@@ -1238,7 +1131,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
tipc_bearer_send(l_ptr->bearer_id, buf,
&l_ptr->media_addr);
if (msg_user(msg) == MSG_BUNDLER)
msg_set_type(msg, CLOSED_MSG);
msg_set_type(msg, BUNDLE_CLOSED);
l_ptr->next_out = buf->next;
return 0;
}
......@@ -1590,6 +1483,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_sk_rcv(buf);
continue;
......@@ -1604,10 +1498,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
tipc_node_unlock(n_ptr);
tipc_named_rcv(buf);
continue;
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_port_proto_rcv(buf);
continue;
case BCAST_PROTOCOL:
tipc_link_sync_rcv(n_ptr, buf);
break;
......@@ -2217,6 +2107,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
u32 msgcount = msg_msgcnt(buf_msg(buf));
u32 pos = INT_H_SIZE;
struct sk_buff *obuf;
struct tipc_msg *omsg;
while (msgcount--) {
obuf = buf_extract(buf, pos);
......@@ -2224,8 +2115,16 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
pr_warn("Link unable to unbundle message(s)\n");
break;
}
pos += align(msg_size(buf_msg(obuf)));
tipc_net_route_msg(obuf);
omsg = buf_msg(obuf);
pos += align(msg_size(omsg));
if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
tipc_sk_rcv(obuf);
} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
tipc_named_rcv(obuf);
} else {
pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
kfree_skb(obuf);
}
}
kfree_skb(buf);
}
......
......@@ -227,8 +227,10 @@ void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
......
......@@ -36,21 +36,16 @@
#include "core.h"
#include "msg.h"
#include "addr.h"
#include "name_table.h"
u32 tipc_msg_tot_importance(struct tipc_msg *m)
#define MAX_FORWARD_SIZE 1024
static unsigned int align(unsigned int i)
{
if (likely(msg_isdata(m))) {
if (likely(msg_orignode(m) == tipc_own_addr))
return msg_importance(m);
return msg_importance(m) + 4;
}
if ((msg_user(m) == MSG_FRAGMENTER) &&
(msg_type(m) == FIRST_FRAGMENT))
return msg_importance(msg_get_wrapped(m));
return msg_importance(m);
return (i + 3) & ~3u;
}
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode)
{
......@@ -152,3 +147,268 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
kfree_skb(*buf);
return 0;
}
/**
* tipc_msg_build2 - create buffer chain containing specified header and data
* @mhdr: Message header, to be prepended to data
* @iov: User data
* @offset: Posision in iov to start copying from
* @dsz: Total length of user data
* @pktmax: Max packet size that can be used
* @chain: Buffer or chain of buffers to be returned to caller
* Returns message data size or errno: -ENOMEM, -EFAULT
*/
int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int pktmax , struct sk_buff **chain)
{
int mhsz = msg_hdr_sz(mhdr);
int msz = mhsz + dsz;
int pktno = 1;
int pktsz;
int pktrem = pktmax;
int drem = dsz;
struct tipc_msg pkthdr;
struct sk_buff *buf, *prev;
char *pktpos;
int rc;
msg_set_size(mhdr, msz);
/* No fragmentation needed? */
if (likely(msz <= pktmax)) {
buf = tipc_buf_acquire(msz);
*chain = buf;
if (unlikely(!buf))
return -ENOMEM;
skb_copy_to_linear_data(buf, mhdr, mhsz);
pktpos = buf->data + mhsz;
if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
return dsz;
rc = -EFAULT;
goto error;
}
/* Prepare reusable fragment header */
tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, msg_destnode(mhdr));
msg_set_size(&pkthdr, pktmax);
msg_set_fragm_no(&pkthdr, pktno);
/* Prepare first fragment */
*chain = buf = tipc_buf_acquire(pktmax);
if (!buf)
return -ENOMEM;
pktpos = buf->data;
skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
pktpos += INT_H_SIZE;
pktrem -= INT_H_SIZE;
skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz);
pktpos += mhsz;
pktrem -= mhsz;
do {
if (drem < pktrem)
pktrem = drem;
if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) {
rc = -EFAULT;
goto error;
}
drem -= pktrem;
offset += pktrem;
if (!drem)
break;
/* Prepare new fragment: */
if (drem < (pktmax - INT_H_SIZE))
pktsz = drem + INT_H_SIZE;
else
pktsz = pktmax;
prev = buf;
buf = tipc_buf_acquire(pktsz);
if (!buf) {
rc = -ENOMEM;
goto error;
}
prev->next = buf;
msg_set_type(&pkthdr, FRAGMENT);
msg_set_size(&pkthdr, pktsz);
msg_set_fragm_no(&pkthdr, ++pktno);
skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
pktpos = buf->data + INT_H_SIZE;
pktrem = pktsz - INT_H_SIZE;
} while (1);
msg_set_type(buf_msg(buf), LAST_FRAGMENT);
return dsz;
error:
kfree_skb_list(*chain);
*chain = NULL;
return rc;
}
/**
* tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
* @bbuf: the existing buffer ("bundle")
* @buf: buffer to be appended
* @mtu: max allowable size for the bundle buffer
* Consumes buffer if successful
* Returns true if bundling could be performed, otherwise false
*/
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
{
struct tipc_msg *bmsg = buf_msg(bbuf);
struct tipc_msg *msg = buf_msg(buf);
unsigned int bsz = msg_size(bmsg);
unsigned int msz = msg_size(msg);
u32 start = align(bsz);
u32 max = mtu - INT_H_SIZE;
u32 pad = start - bsz;
if (likely(msg_user(msg) == MSG_FRAGMENTER))
return false;
if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
return false;
if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
return false;
if (likely(msg_user(bmsg) != MSG_BUNDLER))
return false;
if (likely(msg_type(bmsg) != BUNDLE_OPEN))
return false;
if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
return false;
if (unlikely(max < (start + msz)))
return false;
skb_put(bbuf, pad + msz);
skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
msg_set_size(bmsg, start + msz);
msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
bbuf->next = buf->next;
kfree_skb(buf);
return true;
}
/**
* tipc_msg_make_bundle(): Create bundle buf and append message to its tail
* @buf: buffer to be appended and replaced
* @mtu: max allowable size for the bundle buffer, inclusive header
* @dnode: destination node for message. (Not always present in header)
* Replaces buffer if successful
* Returns true if sucess, otherwise false
*/
bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
{
struct sk_buff *bbuf;
struct tipc_msg *bmsg;
struct tipc_msg *msg = buf_msg(*buf);
u32 msz = msg_size(msg);
u32 max = mtu - INT_H_SIZE;
if (msg_user(msg) == MSG_FRAGMENTER)
return false;
if (msg_user(msg) == CHANGEOVER_PROTOCOL)
return false;
if (msg_user(msg) == BCAST_PROTOCOL)
return false;
if (msz > (max / 2))
return false;
bbuf = tipc_buf_acquire(max);
if (!bbuf)
return false;
skb_trim(bbuf, INT_H_SIZE);
bmsg = buf_msg(bbuf);
tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
msg_set_seqno(bmsg, msg_seqno(msg));
msg_set_ack(bmsg, msg_ack(msg));
msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
bbuf->next = (*buf)->next;
tipc_msg_bundle(bbuf, *buf, mtu);
*buf = bbuf;
return true;
}
/**
* tipc_msg_reverse(): swap source and destination addresses and add error code
* @buf: buffer containing message to be reversed
* @dnode: return value: node where to send message after reversal
* @err: error code to be set in message
* Consumes buffer if failure
* Returns true if success, otherwise false
*/
bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
{
struct tipc_msg *msg = buf_msg(buf);
uint imp = msg_importance(msg);
struct tipc_msg ohdr;
uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
if (skb_linearize(buf))
goto exit;
if (msg_dest_droppable(msg))
goto exit;
if (msg_errcode(msg))
goto exit;
memcpy(&ohdr, msg, msg_hdr_sz(msg));
imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
if (msg_isdata(msg))
msg_set_importance(msg, imp);
msg_set_errcode(msg, err);
msg_set_origport(msg, msg_destport(&ohdr));
msg_set_destport(msg, msg_origport(&ohdr));
msg_set_prevnode(msg, tipc_own_addr);
if (!msg_short(msg)) {
msg_set_orignode(msg, msg_destnode(&ohdr));
msg_set_destnode(msg, msg_orignode(&ohdr));
}
msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
skb_trim(buf, msg_size(msg));
skb_orphan(buf);
*dnode = msg_orignode(&ohdr);
return true;
exit:
kfree_skb(buf);
return false;
}
/**
* tipc_msg_eval: determine fate of message that found no destination
* @buf: the buffer containing the message.
* @dnode: return value: next-hop node, if message to be forwarded
* @err: error code to use, if message to be rejected
*
* Does not consume buffer
* Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
* code if message to be rejected
*/
int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
{
struct tipc_msg *msg = buf_msg(buf);
u32 dport;
if (msg_type(msg) != TIPC_NAMED_MSG)
return -TIPC_ERR_NO_PORT;
if (skb_linearize(buf))
return -TIPC_ERR_NO_NAME;
if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
return -TIPC_ERR_NO_NAME;
if (msg_reroute_cnt(msg) > 0)
return -TIPC_ERR_NO_NAME;
*dnode = addr_domain(msg_lookup_scope(msg));
dport = tipc_nametbl_translate(msg_nametype(msg),
msg_nameinst(msg),
dnode);
if (!dport)
return -TIPC_ERR_NO_NAME;
msg_incr_reroute_cnt(msg);
msg_set_destnode(msg, *dnode);
msg_set_destport(msg, dport);
return TIPC_OK;
}
......@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define FRAGMENT 1
#define LAST_FRAGMENT 2
/* Bundling protocol message types
*/
#define BUNDLE_OPEN 0
#define BUNDLE_CLOSED 1
/*
* Link management protocol message types
*/
......@@ -706,12 +711,37 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
u32 tipc_msg_tot_importance(struct tipc_msg *m);
static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
{
if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
return msg_importance(msg_get_wrapped(m));
return msg_importance(m);
}
static inline u32 msg_tot_origport(struct tipc_msg *m)
{
if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
return msg_origport(msg_get_wrapped(m));
return msg_origport(m);
}
bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode);
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
unsigned int len, int max_size, struct sk_buff **buf);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int mtu , struct sk_buff **chain);
#endif
/*
* net/tipc/net.c: TIPC network routing code
*
* Copyright (c) 1995-2006, Ericsson AB
* Copyright (c) 1995-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
......@@ -104,67 +104,6 @@
* - A local spin_lock protecting the queue of subscriber events.
*/
static void net_route_named_msg(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
u32 dnode;
u32 dport;
if (!msg_named(msg)) {
kfree_skb(buf);
return;
}
dnode = addr_domain(msg_lookup_scope(msg));
dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode);
if (dport) {
msg_set_destnode(msg, dnode);
msg_set_destport(msg, dport);
tipc_net_route_msg(buf);
return;
}
tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
}
void tipc_net_route_msg(struct sk_buff *buf)
{
struct tipc_msg *msg;
u32 dnode;
if (!buf)
return;
msg = buf_msg(buf);
/* Handle message for this node */
dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
if (tipc_in_scope(dnode, tipc_own_addr)) {
if (msg_isdata(msg)) {
if (msg_mcast(msg))
tipc_port_mcast_rcv(buf, NULL);
else if (msg_destport(msg))
tipc_sk_rcv(buf);
else
net_route_named_msg(buf);
return;
}
switch (msg_user(msg)) {
case NAME_DISTRIBUTOR:
tipc_named_rcv(buf);
break;
case CONN_MANAGER:
tipc_port_proto_rcv(buf);
break;
default:
kfree_skb(buf);
}
return;
}
/* Handle message for another node */
skb_trim(buf, msg_size(msg));
tipc_link_xmit(buf, dnode, msg_link_selector(msg));
}
int tipc_net_start(u32 addr)
{
char addr_string[16];
......
......@@ -37,8 +37,6 @@
#ifndef _TIPC_NET_H
#define _TIPC_NET_H
void tipc_net_route_msg(struct sk_buff *buf);
int tipc_net_start(u32 addr);
void tipc_net_stop(void);
......
/*
* net/tipc/node.c: TIPC node management routines
*
* Copyright (c) 2000-2006, 2012 Ericsson AB
* Copyright (c) 2000-2006, 2012-2014, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved.
*
......@@ -155,21 +155,25 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
if (!active[0]) {
active[0] = active[1] = l_ptr;
node_established_contact(n_ptr);
return;
goto exit;
}
if (l_ptr->priority < active[0]->priority) {
pr_info("New link <%s> becomes standby\n", l_ptr->name);
return;
goto exit;
}
tipc_link_dup_queue_xmit(active[0], l_ptr);
if (l_ptr->priority == active[0]->priority) {
active[0] = l_ptr;
return;
goto exit;
}
pr_info("Old link <%s> becomes standby\n", active[0]->name);
if (active[1] != active[0])
pr_info("Old link <%s> becomes standby\n", active[1]->name);
active[0] = active[1] = l_ptr;
exit:
/* Leave room for changeover header when returning 'mtu' to users: */
n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
}
/**
......@@ -229,6 +233,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
tipc_link_failover_send_queue(l_ptr);
else
node_lost_contact(n_ptr);
/* Leave room for changeover header when returning 'mtu' to users: */
if (active[0]) {
n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
return;
}
/* Loopback link went down? No fragmentation needed from now on. */
if (n_ptr->addr == tipc_own_addr) {
n_ptr->act_mtus[0] = MAX_MSG_SIZE;
n_ptr->act_mtus[1] = MAX_MSG_SIZE;
}
}
int tipc_node_active_links(struct tipc_node *n_ptr)
......
......@@ -41,6 +41,7 @@
#include "addr.h"
#include "net.h"
#include "bearer.h"
#include "msg.h"
/*
* Out-of-range value for node signature
......@@ -105,6 +106,7 @@ struct tipc_node {
spinlock_t lock;
struct hlist_node hash;
struct tipc_link *active_links[2];
u32 act_mtus[2];
struct tipc_link *links[MAX_BEARERS];
unsigned int action_flags;
struct tipc_node_bclink bclink;
......@@ -143,4 +145,19 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
}
static inline uint tipc_node_get_mtu(u32 addr, u32 selector)
{
struct tipc_node *node;
u32 mtu;
node = tipc_node_find(addr);
if (likely(node))
mtu = node->act_mtus[selector & 1];
else
mtu = MAX_MSG_SIZE;
return mtu;
}
#endif
......@@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
void tipc_nodesub_notify(struct list_head *nsub_list)
{
struct tipc_node_subscr *ns, *safe;
net_ev_handler handle_node_down;
list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
if (ns->handle_node_down) {
ns->handle_node_down(ns->usr_handle);
handle_node_down = ns->handle_node_down;
if (handle_node_down) {
ns->handle_node_down = NULL;
handle_node_down(ns->usr_handle);
}
}
}
......@@ -42,8 +42,6 @@
/* Connection management: */
#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
#define CONFIRMED 0
#define PROBING 1
#define MAX_REJECT_SIZE 1024
......@@ -188,12 +186,6 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
tipc_port_list_free(dp);
}
void tipc_port_wakeup(struct tipc_port *port)
{
tipc_sock_wakeup(tipc_port_to_sock(port));
}
/* tipc_port_init - intiate TIPC port and lock it
*
* Returns obtained reference if initialization is successful, zero otherwise
......@@ -235,6 +227,8 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
void tipc_port_destroy(struct tipc_port *p_ptr)
{
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
u32 peer;
tipc_withdraw(p_ptr, 0, NULL);
......@@ -246,14 +240,15 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
if (p_ptr->connected) {
buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
tipc_nodesub_unsubscribe(&p_ptr->subscription);
msg = buf_msg(buf);
peer = msg_destnode(msg);
tipc_link_xmit2(buf, peer, msg_link_selector(msg));
}
spin_lock_bh(&tipc_port_list_lock);
list_del(&p_ptr->port_list);
list_del(&p_ptr->wait_list);
spin_unlock_bh(&tipc_port_list_lock);
k_term_timer(&p_ptr->timer);
tipc_net_route_msg(buf);
}
/*
......@@ -275,100 +270,16 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
msg_set_destport(msg, tipc_port_peerport(p_ptr));
msg_set_origport(msg, p_ptr->ref);
msg_set_msgcnt(msg, ack);
buf->next = NULL;
}
return buf;
}
int tipc_reject_msg(struct sk_buff *buf, u32 err)
{
struct tipc_msg *msg = buf_msg(buf);
struct sk_buff *rbuf;
struct tipc_msg *rmsg;
int hdr_sz;
u32 imp;
u32 data_sz = msg_data_sz(msg);
u32 src_node;
u32 rmsg_sz;
/* discard rejected message if it shouldn't be returned to sender */
if (WARN(!msg_isdata(msg),
"attempt to reject message with user=%u", msg_user(msg))) {
dump_stack();
goto exit;
}
if (msg_errcode(msg) || msg_dest_droppable(msg))
goto exit;
/*
* construct returned message by copying rejected message header and
* data (or subset), then updating header fields that need adjusting
*/
hdr_sz = msg_hdr_sz(msg);
rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
rbuf = tipc_buf_acquire(rmsg_sz);
if (rbuf == NULL)
goto exit;
rmsg = buf_msg(rbuf);
skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
if (msg_connected(rmsg)) {
imp = msg_importance(rmsg);
if (imp < TIPC_CRITICAL_IMPORTANCE)
msg_set_importance(rmsg, ++imp);
}
msg_set_non_seq(rmsg, 0);
msg_set_size(rmsg, rmsg_sz);
msg_set_errcode(rmsg, err);
msg_set_prevnode(rmsg, tipc_own_addr);
msg_swap_words(rmsg, 4, 5);
if (!msg_short(rmsg))
msg_swap_words(rmsg, 6, 7);
/* send self-abort message when rejecting on a connected port */
if (msg_connected(msg)) {
struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
if (p_ptr) {
struct sk_buff *abuf = NULL;
if (p_ptr->connected)
abuf = port_build_self_abort_msg(p_ptr, err);
tipc_port_unlock(p_ptr);
tipc_net_route_msg(abuf);
}
}
/* send returned message & dispose of rejected message */
src_node = msg_prevnode(msg);
if (in_own_node(src_node))
tipc_sk_rcv(rbuf);
else
tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
exit:
kfree_skb(buf);
return data_sz;
}
int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
struct iovec const *msg_sect, unsigned int len,
int err)
{
struct sk_buff *buf;
int res;
res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
if (!buf)
return res;
return tipc_reject_msg(buf, err);
}
static void port_timeout(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
......@@ -379,15 +290,16 @@ static void port_timeout(unsigned long ref)
}
/* Last probe answered ? */
if (p_ptr->probing_state == PROBING) {
if (p_ptr->probing_state == TIPC_CONN_PROBING) {
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
} else {
buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
p_ptr->probing_state = PROBING;
p_ptr->probing_state = TIPC_CONN_PROBING;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
}
tipc_port_unlock(p_ptr);
tipc_net_route_msg(buf);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
}
......@@ -395,12 +307,14 @@ static void port_handle_node_down(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
tipc_port_unlock(p_ptr);
tipc_net_route_msg(buf);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
}
......@@ -412,6 +326,7 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er
struct tipc_msg *msg = buf_msg(buf);
msg_swap_words(msg, 4, 5);
msg_swap_words(msg, 6, 7);
buf->next = NULL;
}
return buf;
}
......@@ -436,60 +351,11 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
if (imp < TIPC_CRITICAL_IMPORTANCE)
msg_set_importance(msg, ++imp);
msg_set_errcode(msg, err);
buf->next = NULL;
}
return buf;
}
void tipc_port_proto_rcv(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port *p_ptr;
struct sk_buff *r_buf = NULL;
u32 destport = msg_destport(msg);
int wakeable;
/* Validate connection */
p_ptr = tipc_port_lock(destport);
if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
r_buf = tipc_buf_acquire(BASIC_H_SIZE);
if (r_buf) {
msg = buf_msg(r_buf);
tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
BASIC_H_SIZE, msg_orignode(msg));
msg_set_errcode(msg, TIPC_ERR_NO_PORT);
msg_set_origport(msg, destport);
msg_set_destport(msg, msg_origport(msg));
}
if (p_ptr)
tipc_port_unlock(p_ptr);
goto exit;
}
/* Process protocol message sent by peer */
switch (msg_type(msg)) {
case CONN_ACK:
wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;
p_ptr->acked += msg_msgcnt(msg);
if (!tipc_port_congested(p_ptr)) {
p_ptr->congested = 0;
if (wakeable)
tipc_port_wakeup(p_ptr);
}
break;
case CONN_PROBE:
r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
break;
default:
/* CONN_PROBE_REPLY or unrecognized - no action required */
break;
}
p_ptr->probing_state = CONFIRMED;
tipc_port_unlock(p_ptr);
exit:
tipc_net_route_msg(r_buf);
kfree_skb(buf);
}
static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
{
struct publication *publ;
......@@ -581,16 +447,19 @@ void tipc_acknowledge(u32 ref, u32 ack)
{
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
struct tipc_msg *msg;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return;
if (p_ptr->connected) {
p_ptr->conn_unacked -= ack;
if (p_ptr->connected)
buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
}
tipc_port_unlock(p_ptr);
tipc_net_route_msg(buf);
if (!buf)
return;
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
}
int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
......@@ -689,7 +558,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
msg_set_hdr_sz(msg, SHORT_H_SIZE);
p_ptr->probing_interval = PROBING_INTERVAL;
p_ptr->probing_state = CONFIRMED;
p_ptr->probing_state = TIPC_CONN_OK;
p_ptr->connected = 1;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
......@@ -698,7 +567,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
(net_ev_handler)port_handle_node_down);
res = 0;
exit:
p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
return res;
}
......@@ -741,6 +610,7 @@ int tipc_port_disconnect(u32 ref)
*/
int tipc_port_shutdown(u32 ref)
{
struct tipc_msg *msg;
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
......@@ -750,149 +620,7 @@ int tipc_port_shutdown(u32 ref)
buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
tipc_port_unlock(p_ptr);
tipc_net_route_msg(buf);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
return tipc_port_disconnect(ref);
}
/*
* tipc_port_iovec_rcv: Concatenate and deliver sectioned
* message for this node.
*/
static int tipc_port_iovec_rcv(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len)
{
struct sk_buff *buf;
int res;
res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
if (likely(buf))
tipc_sk_rcv(buf);
return res;
}
/**
* tipc_send - send message sections on connection
*/
int tipc_send(struct tipc_port *p_ptr,
struct iovec const *msg_sect,
unsigned int len)
{
u32 destnode;
int res;
if (!p_ptr->connected)
return -EINVAL;
p_ptr->congested = 1;
if (!tipc_port_congested(p_ptr)) {
destnode = tipc_port_peernode(p_ptr);
if (likely(!in_own_node(destnode)))
res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
destnode);
else
res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
if (likely(res != -ELINKCONG)) {
p_ptr->congested = 0;
if (res > 0)
p_ptr->sent++;
return res;
}
}
if (tipc_port_unreliable(p_ptr)) {
p_ptr->congested = 0;
return len;
}
return -ELINKCONG;
}
/**
* tipc_send2name - send message sections to port name
*/
int tipc_send2name(struct tipc_port *p_ptr,
struct tipc_name const *name,
unsigned int domain,
struct iovec const *msg_sect,
unsigned int len)
{
struct tipc_msg *msg;
u32 destnode = domain;
u32 destport;
int res;
if (p_ptr->connected)
return -EINVAL;
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_NAMED_MSG);
msg_set_hdr_sz(msg, NAMED_H_SIZE);
msg_set_nametype(msg, name->type);
msg_set_nameinst(msg, name->instance);
msg_set_lookup_scope(msg, tipc_addr_scope(domain));
destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
msg_set_destnode(msg, destnode);
msg_set_destport(msg, destport);
if (likely(destport || destnode)) {
if (likely(in_own_node(destnode)))
res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
else if (tipc_own_addr)
res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
destnode);
else
res = tipc_port_iovec_reject(p_ptr, msg, msg_sect,
len, TIPC_ERR_NO_NODE);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
return res;
}
if (tipc_port_unreliable(p_ptr))
return len;
return -ELINKCONG;
}
return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
TIPC_ERR_NO_NAME);
}
/**
* tipc_send2port - send message sections to port identity
*/
int tipc_send2port(struct tipc_port *p_ptr,
struct tipc_portid const *dest,
struct iovec const *msg_sect,
unsigned int len)
{
struct tipc_msg *msg;
int res;
if (p_ptr->connected)
return -EINVAL;
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_destnode(msg, dest->node);
msg_set_destport(msg, dest->ref);
msg_set_hdr_sz(msg, BASIC_H_SIZE);
if (in_own_node(dest->node))
res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
else if (tipc_own_addr)
res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
dest->node);
else
res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
TIPC_ERR_NO_NODE);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
return res;
}
if (tipc_port_unreliable(p_ptr))
return len;
return -ELINKCONG;
}
......@@ -53,17 +53,13 @@
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established
* @conn_unacked: number of unacknowledged messages received from peer port
* @published: non-zero if port has one or more associated names
* @congested: non-zero if cannot send because of link or port congestion
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
* @sent: # of non-empty messages sent by port
* @acked: # of non-empty message acknowledgements from connected port's peer
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
......@@ -76,17 +72,13 @@ struct tipc_port {
int connected;
u32 conn_type;
u32 conn_instance;
u32 conn_unacked;
int published;
u32 congested;
u32 max_pkt;
u32 ref;
struct tipc_msg phdr;
struct list_head port_list;
struct list_head wait_list;
u32 waiting_pkts;
u32 sent;
u32 acked;
struct list_head publications;
u32 pub_count;
u32 probing_state;
......@@ -104,8 +96,6 @@ struct tipc_port_list;
u32 tipc_port_init(struct tipc_port *p_ptr,
const unsigned int importance);
int tipc_reject_msg(struct sk_buff *buf, u32 err);
void tipc_acknowledge(u32 port_ref, u32 ack);
void tipc_port_destroy(struct tipc_port *p_ptr);
......@@ -122,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref);
void tipc_port_wakeup(struct tipc_port *port);
/*
* The following routines require that the port be locked on entry
*/
......@@ -136,34 +124,12 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
* TIPC messaging routines
*/
int tipc_send(struct tipc_port *port,
struct iovec const *msg_sect,
unsigned int len);
int tipc_send2name(struct tipc_port *port,
struct tipc_name const *name,
u32 domain,
struct iovec const *msg_sect,
unsigned int len);
int tipc_send2port(struct tipc_port *port,
struct tipc_portid const *dest,
struct iovec const *msg_sect,
unsigned int len);
int tipc_port_mcast_xmit(struct tipc_port *port,
struct tipc_name_seq const *seq,
struct iovec const *msg,
unsigned int len);
int tipc_port_iovec_reject(struct tipc_port *p_ptr,
struct tipc_msg *hdr,
struct iovec const *msg_sect,
unsigned int len,
int err);
struct sk_buff *tipc_port_get_ports(void);
void tipc_port_proto_rcv(struct sk_buff *buf);
void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
void tipc_port_reinit(void);
......@@ -185,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
spin_unlock_bh(p_ptr->lock);
}
static inline int tipc_port_congested(struct tipc_port *p_ptr)
{
return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
}
static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
{
return msg_destnode(&p_ptr->phdr);
......
此差异已折叠。
......@@ -38,6 +38,9 @@
#include "port.h"
#include <net/sock.h>
#define TIPC_CONN_OK 0
#define TIPC_CONN_PROBING 1
/**
* struct tipc_sock - TIPC socket structure
* @sk: socket - interacts with 'port' and with user via the socket API
......@@ -45,6 +48,9 @@
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
*/
struct tipc_sock {
......@@ -52,6 +58,9 @@ struct tipc_sock {
struct tipc_port port;
unsigned int conn_timeout;
atomic_t dupl_rcvcnt;
int link_cong;
uint sent_unacked;
uint rcv_unacked;
};
static inline struct tipc_sock *tipc_sk(const struct sock *sk)
......@@ -69,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
tsk->sk.sk_write_space(&tsk->sk);
}
static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
{
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
}
int tipc_sk_rcv(struct sk_buff *buf);
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册