提交 dd3f9e70 编写于 作者: J Jon Paul Maloy 提交者: David S. Miller

tipc: add packet sequence number at instant of transmission

Currently, the packet sequence number is updated and added to each
packet at the moment a packet is added to the link backlog queue.
This is wasteful, since it forces the code to traverse the send
packet list packet by packet when adding them to the backlog queue.
It would be better to just splice the whole packet list into the
backlog queue when that is the right action to do.

In this commit, we do this change. Also, since the sequence numbers
cannot now be assigned to the packets at the moment they are added
the backlog queue, we do instead calculate and add them at the moment
of transmission, when the backlog queue has to be traversed anyway.
We do this in the function tipc_link_push_packet().
Reviewed-by: NErik Hugne <erik.hugne@ericsson.com>
Reviewed-by: NYing Xue <ying.xue@windriver.com>
Signed-off-by: NJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
上级 f21e897e
...@@ -115,12 +115,8 @@ static void bclink_set_last_sent(struct net *net) ...@@ -115,12 +115,8 @@ static void bclink_set_last_sent(struct net *net)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl; struct tipc_link *bcl = tn->bcl;
struct sk_buff *skb = skb_peek(&bcl->backlogq);
if (skb) bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
bcl->silent_intv_cnt = mod(buf_seqno(skb) - 1);
else
bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
} }
u32 tipc_bclink_get_last_sent(struct net *net) u32 tipc_bclink_get_last_sent(struct net *net)
......
...@@ -653,7 +653,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, ...@@ -653,7 +653,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct tipc_media_addr *addr = &link->media_addr; struct tipc_media_addr *addr = &link->media_addr;
struct sk_buff_head *transmq = &link->transmq; struct sk_buff_head *transmq = &link->transmq;
struct sk_buff_head *backlogq = &link->backlogq; struct sk_buff_head *backlogq = &link->backlogq;
struct sk_buff *skb, *tmp; struct sk_buff *skb, *bskb;
/* Match msg importance against this and all higher backlog limits: */ /* Match msg importance against this and all higher backlog limits: */
for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
...@@ -665,32 +665,36 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, ...@@ -665,32 +665,36 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
return -EMSGSIZE; return -EMSGSIZE;
} }
/* Prepare each packet for sending, and add to relevant queue: */ /* Prepare each packet for sending, and add to relevant queue: */
skb_queue_walk_safe(list, skb, tmp) { while (skb_queue_len(list)) {
__skb_unlink(skb, list); skb = skb_peek(list);
msg = buf_msg(skb); msg = buf_msg(skb);
msg_set_seqno(msg, seqno); msg_set_seqno(msg, seqno);
msg_set_ack(msg, ack); msg_set_ack(msg, ack);
msg_set_bcast_ack(msg, bc_last_in); msg_set_bcast_ack(msg, bc_last_in);
if (likely(skb_queue_len(transmq) < maxwin)) { if (likely(skb_queue_len(transmq) < maxwin)) {
__skb_dequeue(list);
__skb_queue_tail(transmq, skb); __skb_queue_tail(transmq, skb);
tipc_bearer_send(net, link->bearer_id, skb, addr); tipc_bearer_send(net, link->bearer_id, skb, addr);
link->rcv_unacked = 0; link->rcv_unacked = 0;
seqno++; seqno++;
continue; continue;
} }
if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) { if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
kfree_skb(__skb_dequeue(list));
link->stats.sent_bundled++; link->stats.sent_bundled++;
continue; continue;
} }
if (tipc_msg_make_bundle(&skb, mtu, link->addr)) { if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
kfree_skb(__skb_dequeue(list));
__skb_queue_tail(backlogq, bskb);
link->backlog[msg_importance(buf_msg(bskb))].len++;
link->stats.sent_bundled++; link->stats.sent_bundled++;
link->stats.sent_bundles++; link->stats.sent_bundles++;
imp = msg_importance(buf_msg(skb)); continue;
} }
__skb_queue_tail(backlogq, skb); link->backlog[imp].len += skb_queue_len(list);
link->backlog[imp].len++; skb_queue_splice_tail_init(list, backlogq);
seqno++;
} }
link->snd_nxt = seqno; link->snd_nxt = seqno;
return 0; return 0;
...@@ -822,6 +826,7 @@ void tipc_link_push_packets(struct tipc_link *link) ...@@ -822,6 +826,7 @@ void tipc_link_push_packets(struct tipc_link *link)
{ {
struct sk_buff *skb; struct sk_buff *skb;
struct tipc_msg *msg; struct tipc_msg *msg;
u16 seqno = link->snd_nxt;
u16 ack = mod(link->rcv_nxt - 1); u16 ack = mod(link->rcv_nxt - 1);
while (skb_queue_len(&link->transmq) < link->window) { while (skb_queue_len(&link->transmq) < link->window) {
...@@ -831,12 +836,15 @@ void tipc_link_push_packets(struct tipc_link *link) ...@@ -831,12 +836,15 @@ void tipc_link_push_packets(struct tipc_link *link)
msg = buf_msg(skb); msg = buf_msg(skb);
link->backlog[msg_importance(msg)].len--; link->backlog[msg_importance(msg)].len--;
msg_set_ack(msg, ack); msg_set_ack(msg, ack);
msg_set_seqno(msg, seqno);
seqno = mod(seqno + 1);
msg_set_bcast_ack(msg, link->owner->bclink.last_in); msg_set_bcast_ack(msg, link->owner->bclink.last_in);
link->rcv_unacked = 0; link->rcv_unacked = 0;
__skb_queue_tail(&link->transmq, skb); __skb_queue_tail(&link->transmq, skb);
tipc_bearer_send(link->owner->net, link->bearer_id, tipc_bearer_send(link->owner->net, link->bearer_id,
skb, &link->media_addr); skb, &link->media_addr);
} }
link->snd_nxt = seqno;
} }
void tipc_link_reset_all(struct tipc_node *node) void tipc_link_reset_all(struct tipc_node *node)
...@@ -1526,6 +1534,11 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) ...@@ -1526,6 +1534,11 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);
skb_queue_walk(&l_ptr->backlogq, skb) {
msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
}
skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
tipc_link_purge_backlog(l_ptr); tipc_link_purge_backlog(l_ptr);
msgcount = skb_queue_len(&l_ptr->transmq); msgcount = skb_queue_len(&l_ptr->transmq);
...@@ -1586,6 +1599,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *link, ...@@ -1586,6 +1599,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *link,
struct tipc_msg tnl_hdr; struct tipc_msg tnl_hdr;
struct sk_buff_head *queue = &link->transmq; struct sk_buff_head *queue = &link->transmq;
int mcnt; int mcnt;
u16 seqno;
tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
SYNCH_MSG, INT_H_SIZE, link->addr); SYNCH_MSG, INT_H_SIZE, link->addr);
...@@ -1617,6 +1631,11 @@ void tipc_link_dup_queue_xmit(struct tipc_link *link, ...@@ -1617,6 +1631,11 @@ void tipc_link_dup_queue_xmit(struct tipc_link *link,
} }
if (queue == &link->backlogq) if (queue == &link->backlogq)
return; return;
seqno = link->snd_nxt;
skb_queue_walk(&link->backlogq, skb) {
msg_set_seqno(buf_msg(skb), seqno);
seqno = mod(seqno + 1);
}
queue = &link->backlogq; queue = &link->backlogq;
goto tunnel_queue; goto tunnel_queue;
} }
......
...@@ -331,16 +331,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, ...@@ -331,16 +331,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
/** /**
* tipc_msg_bundle(): Append contents of a buffer to tail of an existing one * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
* @bskb: the buffer to append to ("bundle") * @skb: the buffer to append to ("bundle")
* @skb: buffer to be appended * @msg: message to be appended
* @mtu: max allowable size for the bundle buffer * @mtu: max allowable size for the bundle buffer
* Consumes buffer if successful * Consumes buffer if successful
* Returns true if bundling could be performed, otherwise false * Returns true if bundling could be performed, otherwise false
*/ */
bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu)
{ {
struct tipc_msg *bmsg; struct tipc_msg *bmsg;
struct tipc_msg *msg = buf_msg(skb);
unsigned int bsz; unsigned int bsz;
unsigned int msz = msg_size(msg); unsigned int msz = msg_size(msg);
u32 start, pad; u32 start, pad;
...@@ -348,9 +347,9 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) ...@@ -348,9 +347,9 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
if (likely(msg_user(msg) == MSG_FRAGMENTER)) if (likely(msg_user(msg) == MSG_FRAGMENTER))
return false; return false;
if (!bskb) if (!skb)
return false; return false;
bmsg = buf_msg(bskb); bmsg = buf_msg(skb);
bsz = msg_size(bmsg); bsz = msg_size(bmsg);
start = align(bsz); start = align(bsz);
pad = start - bsz; pad = start - bsz;
...@@ -359,9 +358,9 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) ...@@ -359,9 +358,9 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
return false; return false;
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
return false; return false;
if (likely(msg_user(bmsg) != MSG_BUNDLER)) if (unlikely(msg_user(bmsg) != MSG_BUNDLER))
return false; return false;
if (unlikely(skb_tailroom(bskb) < (pad + msz))) if (unlikely(skb_tailroom(skb) < (pad + msz)))
return false; return false;
if (unlikely(max < (start + msz))) if (unlikely(max < (start + msz)))
return false; return false;
...@@ -369,11 +368,10 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) ...@@ -369,11 +368,10 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
(msg_importance(bmsg) == TIPC_SYSTEM_IMPORTANCE)) (msg_importance(bmsg) == TIPC_SYSTEM_IMPORTANCE))
return false; return false;
skb_put(bskb, pad + msz); skb_put(skb, pad + msz);
skb_copy_to_linear_data_offset(bskb, start, skb->data, msz); skb_copy_to_linear_data_offset(skb, start, msg, msz);
msg_set_size(bmsg, start + msz); msg_set_size(bmsg, start + msz);
msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
kfree_skb(skb);
return true; return true;
} }
...@@ -419,18 +417,18 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos) ...@@ -419,18 +417,18 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
/** /**
* tipc_msg_make_bundle(): Create bundle buf and append message to its tail * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
* @list: the buffer chain * @list: the buffer chain, where head is the buffer to replace/append
* @skb: buffer to be appended and replaced * @skb: buffer to be created, appended to and returned in case of success
* @msg: message to be appended
* @mtu: max allowable size for the bundle buffer, inclusive header * @mtu: max allowable size for the bundle buffer, inclusive header
* @dnode: destination node for message. (Not always present in header) * @dnode: destination node for message. (Not always present in header)
* Replaces buffer if successful
* Returns true if success, otherwise false * Returns true if success, otherwise false
*/ */
bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode) bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
u32 mtu, u32 dnode)
{ {
struct sk_buff *bskb; struct sk_buff *_skb;
struct tipc_msg *bmsg; struct tipc_msg *bmsg;
struct tipc_msg *msg = buf_msg(*skb);
u32 msz = msg_size(msg); u32 msz = msg_size(msg);
u32 max = mtu - INT_H_SIZE; u32 max = mtu - INT_H_SIZE;
...@@ -443,12 +441,12 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode) ...@@ -443,12 +441,12 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
if (msz > (max / 2)) if (msz > (max / 2))
return false; return false;
bskb = tipc_buf_acquire(max); _skb = tipc_buf_acquire(max);
if (!bskb) if (!_skb)
return false; return false;
skb_trim(bskb, INT_H_SIZE); skb_trim(_skb, INT_H_SIZE);
bmsg = buf_msg(bskb); bmsg = buf_msg(_skb);
tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0, tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
INT_H_SIZE, dnode); INT_H_SIZE, dnode);
if (msg_isdata(msg)) if (msg_isdata(msg))
...@@ -458,8 +456,8 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode) ...@@ -458,8 +456,8 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
msg_set_seqno(bmsg, msg_seqno(msg)); msg_set_seqno(bmsg, msg_seqno(msg));
msg_set_ack(bmsg, msg_ack(msg)); msg_set_ack(bmsg, msg_ack(msg));
msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
tipc_msg_bundle(bskb, *skb, mtu); tipc_msg_bundle(_skb, msg, mtu);
*skb = bskb; *skb = _skb;
return true; return true;
} }
......
...@@ -776,9 +776,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, ...@@ -776,9 +776,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
uint data_sz, u32 dnode, u32 onode, uint data_sz, u32 dnode, u32 onode,
u32 dport, u32 oport, int errcode); u32 dport, u32 oport, int errcode);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu); bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode); u32 mtu, u32 dnode);
bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list); int offset, int dsz, int mtu, struct sk_buff_head *list);
......
/* /*
* net/tipc/node.c: TIPC node management routines * net/tipc/node.c: TIPC node management routines
* *
* Copyright (c) 2000-2006, 2012-2014, Ericsson AB * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved. * All rights reserved.
* *
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册