diff --git a/net/tipc/link.c b/net/tipc/link.c index 52d23b3ffaf5c450747276376978d58c724042b3..3cb9f326ee6f1b5421364896a242f410a4d8c902 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -151,6 +151,7 @@ struct tipc_link { /* Failover/synch */ u16 drop_point; struct sk_buff *failover_reasm_skb; + struct sk_buff_head failover_deferdq; /* Max packet negotiation */ u16 mtu; @@ -209,6 +210,7 @@ enum { }; #define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */ +#define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1)) /* * Interval between NACKs when packets arrive out of order @@ -246,6 +248,10 @@ static int tipc_link_build_nack_msg(struct tipc_link *l, static void tipc_link_build_bc_init_msg(struct tipc_link *l, struct sk_buff_head *xmitq); static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); +static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data); +static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, + struct tipc_gap_ack_blks *ga, + struct sk_buff_head *xmitq); /* * Simple non-static link routines (i.e. referenced outside this file) @@ -493,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, __skb_queue_head_init(&l->transmq); __skb_queue_head_init(&l->backlogq); __skb_queue_head_init(&l->deferdq); + __skb_queue_head_init(&l->failover_deferdq); skb_queue_head_init(&l->wakeupq); skb_queue_head_init(l->inputq); return true; @@ -883,6 +890,7 @@ void tipc_link_reset(struct tipc_link *l) __skb_queue_purge(&l->transmq); __skb_queue_purge(&l->deferdq); __skb_queue_purge(&l->backlogq); + __skb_queue_purge(&l->failover_deferdq); l->backlog[TIPC_LOW_IMPORTANCE].len = 0; l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; @@ -1154,34 +1162,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, * Consumes buffer */ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *inputq) + struct sk_buff_head *inputq, + struct sk_buff **reasm_skb) { struct tipc_msg *hdr = buf_msg(skb); - struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; struct sk_buff_head tmpq; int usr = msg_user(hdr); - int rc = 0; int pos = 0; - int ipos = 0; - - if (unlikely(usr == TUNNEL_PROTOCOL)) { - if (msg_type(hdr) == SYNCH_MSG) { - __skb_queue_purge(&l->deferdq); - goto drop; - } - if (!tipc_msg_extract(skb, &iskb, &ipos)) - return rc; - kfree_skb(skb); - skb = iskb; - hdr = buf_msg(skb); - if (less(msg_seqno(hdr), l->drop_point)) - goto drop; - if (tipc_data_input(l, skb, inputq)) - return rc; - usr = msg_user(hdr); - reasm_skb = &l->failover_reasm_skb; - } if (usr == MSG_BUNDLER) { skb_queue_head_init(&tmpq); @@ -1206,11 +1194,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); tipc_bcast_unlock(l->net); } -drop: + kfree_skb(skb); return 0; } +/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the + * inner message along with the ones in the old link's + * deferdq + * @l: tunnel link + * @skb: TUNNEL_PROTOCOL message + * @inputq: queue to put messages ready for delivery + */ +static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *inputq) +{ + struct sk_buff **reasm_skb = &l->failover_reasm_skb; + struct sk_buff_head *fdefq = &l->failover_deferdq; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff *iskb; + int ipos = 0; + int rc = 0; + u16 seqno; + + /* SYNCH_MSG */ + if (msg_type(hdr) == SYNCH_MSG) + goto drop; + + /* FAILOVER_MSG */ + if (!tipc_msg_extract(skb, &iskb, &ipos)) { + pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n", + skb_queue_len(fdefq)); + return rc; + } + + do { + seqno = buf_seqno(iskb); + + if (unlikely(less(seqno, l->drop_point))) { + kfree_skb(iskb); + continue; + } + + if (unlikely(seqno != l->drop_point)) { + __tipc_skb_queue_sorted(fdefq, seqno, iskb); + continue; + } + + l->drop_point++; + + if (!tipc_data_input(l, iskb, inputq)) + rc |= tipc_link_input(l, iskb, inputq, reasm_skb); + if (unlikely(rc)) + break; + } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point))); + +drop: + kfree_skb(skb); + return rc; +} + static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) { bool released = false; @@ -1226,6 +1269,106 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) return released; } +/* tipc_build_gap_ack_blks - build Gap ACK blocks + * @l: tipc link that data have come with gaps in sequence if any + * @data: data buffer to store the Gap ACK blocks after built + * + * returns the actual allocated memory size + */ +static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data) +{ + struct sk_buff *skb = skb_peek(&l->deferdq); + struct tipc_gap_ack_blks *ga = data; + u16 len, expect, seqno = 0; + u8 n = 0; + + if (!skb) + goto exit; + + expect = buf_seqno(skb); + skb_queue_walk(&l->deferdq, skb) { + seqno = buf_seqno(skb); + if (unlikely(more(seqno, expect))) { + ga->gacks[n].ack = htons(expect - 1); + ga->gacks[n].gap = htons(seqno - expect); + if (++n >= MAX_GAP_ACK_BLKS) { + pr_info_ratelimited("Too few Gap ACK blocks!\n"); + goto exit; + } + } else if (unlikely(less(seqno, expect))) { + pr_warn("Unexpected skb in deferdq!\n"); + continue; + } + expect = seqno + 1; + } + + /* last block */ + ga->gacks[n].ack = htons(seqno); + ga->gacks[n].gap = 0; + n++; + +exit: + len = tipc_gap_ack_blks_sz(n); + ga->len = htons(len); + ga->gack_cnt = n; + return len; +} + +/* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing + * acked packets, also doing retransmissions if + * gaps found + * @l: tipc link with transmq queue to be advanced + * @acked: seqno of last packet acked by peer without any gaps before + * @gap: # of gap packets + * @ga: buffer pointer to Gap ACK blocks from peer + * @xmitq: queue for accumulating the retransmitted packets if any + */ +static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, + struct tipc_gap_ack_blks *ga, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *_skb, *tmp; + struct tipc_msg *hdr; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; + u16 ack = l->rcv_nxt - 1; + u16 seqno; + u16 n = 0; + + skb_queue_walk_safe(&l->transmq, skb, tmp) { + seqno = buf_seqno(skb); + +next_gap_ack: + if (less_eq(seqno, acked)) { + /* release skb */ + __skb_unlink(skb, &l->transmq); + kfree_skb(skb); + } else if (less_eq(seqno, acked + gap)) { + /* retransmit skb */ + if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) + continue; + TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME; + + _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); + if (!_skb) + continue; + hdr = buf_msg(_skb); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); + _skb->priority = TC_PRIO_CONTROL; + __skb_queue_tail(xmitq, _skb); + l->stats.retransmitted++; + } else { + /* retry with Gap ACK blocks if any */ + if (!ga || n >= ga->gack_cnt) + break; + acked = ntohs(ga->gacks[n].ack); + gap = ntohs(ga->gacks[n].gap); + n++; + goto next_gap_ack; + } + } +} + /* tipc_link_build_state_msg: prepare link state message for transmission * * Note that sending of broadcast ack is coordinated among nodes, to reduce @@ -1280,6 +1423,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) { u32 def_cnt = ++l->stats.deferred_recv; + u32 defq_len = skb_queue_len(&l->deferdq); int match1, match2; if (link_is_bc_rcvlink(l)) { @@ -1290,7 +1434,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l, return 0; } - if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) + if (defq_len >= 3 && !((defq_len - 3) % 16)) tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq); return 0; } @@ -1304,29 +1448,29 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { struct sk_buff_head *defq = &l->deferdq; - struct tipc_msg *hdr; + struct tipc_msg *hdr = buf_msg(skb); u16 seqno, rcv_nxt, win_lim; int rc = 0; + /* Verify and update link state */ + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + return tipc_link_proto_rcv(l, skb, xmitq); + + /* Don't send probe at next timeout expiration */ + l->silent_intv_cnt = 0; + do { hdr = buf_msg(skb); seqno = msg_seqno(hdr); rcv_nxt = l->rcv_nxt; win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; - /* Verify and update link state */ - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) - return tipc_link_proto_rcv(l, skb, xmitq); - if (unlikely(!link_is_up(l))) { if (l->state == LINK_ESTABLISHING) rc = TIPC_LINK_UP_EVT; goto drop; } - /* Don't send probe at next timeout expiration */ - l->silent_intv_cnt = 0; - /* Drop if outside receive window */ if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) { l->stats.duplicates++; @@ -1351,13 +1495,16 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Deliver packet */ l->rcv_nxt++; l->stats.recv_pkts++; - if (!tipc_data_input(l, skb, l->inputq)) - rc |= tipc_link_input(l, skb, l->inputq); + + if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL)) + rc |= tipc_link_tnl_rcv(l, skb, l->inputq); + else if (!tipc_data_input(l, skb, l->inputq)) + rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf); if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) rc |= tipc_link_build_state_msg(l, xmitq); if (unlikely(rc & ~TIPC_LINK_SND_STATE)) break; - } while ((skb = __skb_dequeue(defq))); + } while ((skb = __tipc_skb_dequeue(defq, l->rcv_nxt))); return rc; drop: @@ -1378,6 +1525,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, struct tipc_mon_state *mstate = &l->mon_state; int dlen = 0; void *data; + u16 glen = 0; /* Don't send protocol message during reset or link failover */ if (tipc_link_is_blocked(l)) @@ -1390,8 +1538,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt; skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE, - tipc_max_domain_size, l->addr, - tipc_own_addr(l->net), 0, 0, 0); + tipc_max_domain_size + MAX_GAP_ACK_BLKS_SZ, + l->addr, tipc_own_addr(l->net), 0, 0, 0); if (!skb) return; @@ -1418,9 +1566,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl)); msg_set_probe(hdr, probe); msg_set_is_keepalive(hdr, probe || probe_reply); - tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id); - msg_set_size(hdr, INT_H_SIZE + dlen); - skb_trim(skb, INT_H_SIZE + dlen); + if (l->peer_caps & TIPC_GAP_ACK_BLOCK) + glen = tipc_build_gap_ack_blks(l, data); + tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id); + msg_set_size(hdr, INT_H_SIZE + glen + dlen); + skb_trim(skb, INT_H_SIZE + glen + dlen); l->stats.sent_states++; l->rcv_unacked = 0; } else { @@ -1479,6 +1629,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l, void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, int mtyp, struct sk_buff_head *xmitq) { + struct sk_buff_head *fdefq = &tnl->failover_deferdq; struct sk_buff *skb, *tnlskb; struct tipc_msg *hdr, tnlhdr; struct sk_buff_head *queue = &l->transmq; @@ -1506,7 +1657,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, /* Initialize reusable tunnel packet header */ tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); - pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); + if (mtyp == SYNCH_MSG) + pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq)); + else + pktcnt = skb_queue_len(&l->transmq); + pktcnt += skb_queue_len(&l->backlogq); msg_set_msgcnt(&tnlhdr, pktcnt); msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); tnl: @@ -1537,6 +1692,14 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, tnl->drop_point = l->rcv_nxt; tnl->failover_reasm_skb = l->reasm_buf; l->reasm_buf = NULL; + + /* Failover the link's deferdq */ + if (unlikely(!skb_queue_empty(fdefq))) { + pr_warn("Link failover deferdq not empty: %d!\n", + skb_queue_len(fdefq)); + __skb_queue_purge(fdefq); + } + skb_queue_splice_init(&l->deferdq, fdefq); } } @@ -1590,6 +1753,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { struct tipc_msg *hdr = buf_msg(skb); + struct tipc_gap_ack_blks *ga = NULL; u16 rcvgap = 0; u16 ack = msg_ack(hdr); u16 gap = msg_seq_gap(hdr); @@ -1600,6 +1764,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, u16 dlen = msg_data_sz(hdr); int mtyp = msg_type(hdr); bool reply = msg_probe(hdr); + u16 glen = 0; void *data; char *if_name; int rc = 0; @@ -1697,7 +1862,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, rc = TIPC_LINK_UP_EVT; break; } - tipc_mon_rcv(l->net, data, dlen, l->addr, + + /* Receive Gap ACK blocks from peer if any */ + if (l->peer_caps & TIPC_GAP_ACK_BLOCK) { + ga = (struct tipc_gap_ack_blks *)data; + glen = ntohs(ga->len); + /* sanity check: if failed, ignore Gap ACK blocks */ + if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt)) + ga = NULL; + } + + tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr, &l->mon_state, l->bearer_id); /* Send NACK if peer has sent pkts we haven't received yet */ @@ -1706,13 +1881,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (rcvgap || reply) tipc_link_build_proto_msg(l, STATE_MSG, 0, reply, rcvgap, 0, 0, xmitq); - tipc_link_release_pkts(l, ack); + + tipc_link_advance_transmq(l, ack, gap, ga, xmitq); /* If NACK, retransmit will now start at right position */ - if (gap) { - rc = tipc_link_retrans(l, l, ack + 1, ack + gap, xmitq); + if (gap) l->stats.recv_nacks++; - } tipc_link_advance_backlog(l, xmitq); if (unlikely(!skb_queue_empty(&l->wakeupq))) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 528ba9241acc23aeaceed68c74527e1827f5e40d..8de02ad6e352cc17b0c825530ba613e586eebc09 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -117,6 +117,37 @@ struct tipc_msg { __be32 hdr[15]; }; +/* struct tipc_gap_ack - TIPC Gap ACK block + * @ack: seqno of the last consecutive packet in link deferdq + * @gap: number of gap packets since the last ack + * + * E.g: + * link deferdq: 1 2 3 4 10 11 13 14 15 20 + * --> Gap ACK blocks: <4, 5>, <11, 1>, <15, 4>, <20, 0> + */ +struct tipc_gap_ack { + __be16 ack; + __be16 gap; +}; + +/* struct tipc_gap_ack_blks + * @len: actual length of the record + * @gack_cnt: number of Gap ACK blocks in the record + * @gacks: array of Gap ACK blocks + */ +struct tipc_gap_ack_blks { + __be16 len; + u8 gack_cnt; + u8 reserved; + struct tipc_gap_ack gacks[]; +}; + +#define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \ + sizeof(struct tipc_gap_ack) * (n)) + +#define MAX_GAP_ACK_BLKS 32 +#define MAX_GAP_ACK_BLKS_SZ tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS) + static inline struct tipc_msg *buf_msg(struct sk_buff *skb) { return (struct tipc_msg *)skb->data; @@ -1120,4 +1151,25 @@ static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list, tipc_skb_queue_splice_tail(&tmp, head); } +/* __tipc_skb_dequeue() - dequeue the head skb according to expected seqno + * @list: list to be dequeued from + * @seqno: seqno of the expected msg + * + * returns skb dequeued from the list if its seqno is less than or equal to + * the expected one, otherwise the skb is still hold + * + * Note: must be used with appropriate locks held only + */ +static inline struct sk_buff *__tipc_skb_dequeue(struct sk_buff_head *list, + u16 seqno) +{ + struct sk_buff *skb = skb_peek(list); + + if (skb && less_eq(buf_seqno(skb), seqno)) { + __skb_unlink(skb, list); + return skb; + } + return NULL; +} + #endif diff --git a/net/tipc/node.h b/net/tipc/node.h index 2404225c5d58ba84ead791f1a1343712cd845566..c0bf49ea3de46ce91ba1757c8efe912d077da92d 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -52,7 +52,8 @@ enum { TIPC_BCAST_RCAST = (1 << 4), TIPC_NODE_ID128 = (1 << 5), TIPC_LINK_PROTO_SEQNO = (1 << 6), - TIPC_MCAST_RBCTL = (1 << 7) + TIPC_MCAST_RBCTL = (1 << 7), + TIPC_GAP_ACK_BLOCK = (1 << 8) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -62,7 +63,8 @@ enum { TIPC_BLOCK_FLOWCTL | \ TIPC_NODE_ID128 | \ TIPC_LINK_PROTO_SEQNO | \ - TIPC_MCAST_RBCTL) + TIPC_MCAST_RBCTL | \ + TIPC_GAP_ACK_BLOCK) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net);