diff --git a/include/net/tcp.h b/include/net/tcp.h index d643ee4e42495f0b28c87e79d6192c23f8cdba7f..d5d22c41191826d68a3eefeeeb2380dfc0227586 100644 --- a/include/net/tcp.h +++ b/include/net/tcp.h @@ -322,6 +322,7 @@ void tcp_shutdown(struct sock *sk, int how); int tcp_v4_early_demux(struct sk_buff *skb); int tcp_v4_rcv(struct sk_buff *skb); +void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb); int tcp_v4_tw_remember_stamp(struct inet_timewait_sock *tw); int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size); int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size); @@ -329,6 +330,8 @@ int tcp_sendpage(struct sock *sk, struct page *page, int offset, size_t size, int flags); int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset, size_t size, int flags); +struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags, + struct page *page, int offset, size_t *size); ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset, size_t size, int flags); int tcp_send_mss(struct sock *sk, int *size_goal, int flags); @@ -392,6 +395,7 @@ void tcp_update_metrics(struct sock *sk); void tcp_init_metrics(struct sock *sk); void tcp_metrics_init(void); bool tcp_peer_is_proven(struct request_sock *req, struct dst_entry *dst); +void __tcp_close(struct sock *sk, long timeout); void tcp_close(struct sock *sk, long timeout); void tcp_init_sock(struct sock *sk); void tcp_init_transfer(struct sock *sk, int bpf_op, struct sk_buff *skb); diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c index b2bc3d7fe9e806718f6e20ad9fa775ffb83806fb..b285b338a019e53c2451ac86b34db821aa6a1be7 100644 --- a/net/ipv4/tcp.c +++ b/net/ipv4/tcp.c @@ -954,7 +954,7 @@ int tcp_send_mss(struct sock *sk, int *size_goal, int flags) * importantly be able to generate EPOLLOUT for Edge Trigger epoll() * users. 
*/ -static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb) +void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb) { if (skb && !skb->len) { tcp_unlink_write_queue(skb, sk); @@ -964,6 +964,68 @@ static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb) } } +struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags, + struct page *page, int offset, size_t *size) +{ + struct sk_buff *skb = tcp_write_queue_tail(sk); + struct tcp_sock *tp = tcp_sk(sk); + bool can_coalesce; + int copy, i; + + if (!skb || (copy = size_goal - skb->len) <= 0 || + !tcp_skb_can_collapse_to(skb)) { +new_segment: + if (!sk_stream_memory_free(sk)) + return NULL; + + skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation, + tcp_rtx_and_write_queues_empty(sk)); + if (!skb) + return NULL; + +#ifdef CONFIG_TLS_DEVICE + skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED); +#endif + skb_entail(sk, skb); + copy = size_goal; + } + + if (copy > *size) + copy = *size; + + i = skb_shinfo(skb)->nr_frags; + can_coalesce = skb_can_coalesce(skb, i, page, offset); + if (!can_coalesce && i >= sysctl_max_skb_frags) { + tcp_mark_push(tp, skb); + goto new_segment; + } + if (!sk_wmem_schedule(sk, copy)) + return NULL; + + if (can_coalesce) { + skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy); + } else { + get_page(page); + skb_fill_page_desc(skb, i, page, offset, copy); + } + + if (!(flags & MSG_NO_SHARED_FRAGS)) + skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG; + + skb->len += copy; + skb->data_len += copy; + skb->truesize += copy; + sk_wmem_queued_add(sk, copy); + sk_mem_charge(sk, copy); + skb->ip_summed = CHECKSUM_PARTIAL; + WRITE_ONCE(tp->write_seq, tp->write_seq + copy); + TCP_SKB_CB(skb)->end_seq += copy; + tcp_skb_pcount_set(skb, 0); + + *size = copy; + return skb; +} + ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset, size_t size, int flags) { @@ -999,60 +1061,13 @@ ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset, goto out_err; while (size > 0) { - struct sk_buff *skb = tcp_write_queue_tail(sk); - int copy, i; - bool can_coalesce; + struct sk_buff *skb; + size_t copy = size; - if (!skb || (copy = size_goal - skb->len) <= 0 || - !tcp_skb_can_collapse_to(skb)) { -new_segment: - if (!sk_stream_memory_free(sk)) - goto wait_for_space; - - skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation, - tcp_rtx_and_write_queues_empty(sk)); - if (!skb) - goto wait_for_space; - -#ifdef CONFIG_TLS_DEVICE - skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED); -#endif - skb_entail(sk, skb); - copy = size_goal; - } - - if (copy > size) - copy = size; - - i = skb_shinfo(skb)->nr_frags; - can_coalesce = skb_can_coalesce(skb, i, page, offset); - if (!can_coalesce && i >= sysctl_max_skb_frags) { - tcp_mark_push(tp, skb); - goto new_segment; - } - if (!sk_wmem_schedule(sk, copy)) + skb = tcp_build_frag(sk, size_goal, flags, page, offset, ©); + if (!skb) goto wait_for_space; - if (can_coalesce) { - skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy); - } else { - get_page(page); - skb_fill_page_desc(skb, i, page, offset, copy); - } - - if (!(flags & MSG_NO_SHARED_FRAGS)) - skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG; - - skb->len += copy; - skb->data_len += copy; - skb->truesize += copy; - sk_wmem_queued_add(sk, copy); - sk_mem_charge(sk, copy); - skb->ip_summed = CHECKSUM_PARTIAL; - WRITE_ONCE(tp->write_seq, tp->write_seq + copy); - TCP_SKB_CB(skb)->end_seq += copy; - tcp_skb_pcount_set(skb, 0); - if (!copied) TCP_SKB_CB(skb)->tcp_flags &= 
~TCPHDR_PSH; @@ -2405,13 +2420,12 @@ bool tcp_check_oom(struct sock *sk, int shift) return too_many_orphans || out_of_socket_memory; } -void tcp_close(struct sock *sk, long timeout) +void __tcp_close(struct sock *sk, long timeout) { struct sk_buff *skb; int data_was_unread = 0; int state; - lock_sock(sk); sk->sk_shutdown = SHUTDOWN_MASK; if (sk->sk_state == TCP_LISTEN) { @@ -2575,6 +2589,12 @@ void tcp_close(struct sock *sk, long timeout) out: bh_unlock_sock(sk); local_bh_enable(); +} + +void tcp_close(struct sock *sk, long timeout) +{ + lock_sock(sk); + __tcp_close(sk, timeout); release_sock(sk); sock_put(sk); } diff --git a/net/mptcp/options.c b/net/mptcp/options.c index a044dd43411d9d9e0858e52aebb0335caf9a3ca0..f2d1e27a2bc1953c48e01ead9ee6e48a5cccce6e 100644 --- a/net/mptcp/options.c +++ b/net/mptcp/options.c @@ -492,7 +492,7 @@ static bool mptcp_established_options_dss(struct sock *sk, struct sk_buff *skb, bool ret = false; mpext = skb ? mptcp_get_ext(skb) : NULL; - snd_data_fin_enable = READ_ONCE(msk->snd_data_fin_enable); + snd_data_fin_enable = mptcp_data_fin_enabled(msk); if (!skb || (mpext && mpext->use_map) || snd_data_fin_enable) { unsigned int map_size; @@ -809,11 +809,14 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit) return cur_ack; } -static void update_una(struct mptcp_sock *msk, - struct mptcp_options_received *mp_opt) +static void ack_update_msk(struct mptcp_sock *msk, + const struct sock *ssk, + struct mptcp_options_received *mp_opt) { u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una); - u64 write_seq = READ_ONCE(msk->write_seq); + u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end); + u64 snd_nxt = READ_ONCE(msk->snd_nxt); + struct sock *sk = (struct sock *)msk; /* avoid ack expansion on update conflict, to reduce the risk of * wrongly expanding to a future ack sequence number, which is way @@ -822,15 +825,28 @@ static void update_una(struct mptcp_sock *msk, new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64); /* ACK for data not even sent yet? Ignore. 
*/ - if (after64(new_snd_una, write_seq)) + if (after64(new_snd_una, snd_nxt)) new_snd_una = old_snd_una; + new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd; + + while (after64(new_wnd_end, old_wnd_end)) { + wnd_end = old_wnd_end; + old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end, + new_wnd_end); + if (old_wnd_end == wnd_end) { + if (mptcp_send_head(sk)) + mptcp_schedule_work(sk); + break; + } + } + while (after64(new_snd_una, old_snd_una)) { snd_una = old_snd_una; old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una, new_snd_una); if (old_snd_una == snd_una) { - mptcp_data_acked((struct sock *)msk); + mptcp_data_acked(sk); break; } } @@ -930,7 +946,7 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb) * monodirectional flows will stuck */ if (mp_opt.use_ack) - update_una(msk, &mp_opt); + ack_update_msk(msk, sk, &mp_opt); /* Zero-data-length packets are dropped by the caller and not * propagated to the MPTCP layer, so the skb extension does not diff --git a/net/mptcp/pm.c b/net/mptcp/pm.c index e19e1525ecbb0086f301176fc0fdf32be72150a4..f9c88e2abb8e6d5a8f5e51b59488e26529baaba5 100644 --- a/net/mptcp/pm.c +++ b/net/mptcp/pm.c @@ -89,8 +89,7 @@ static bool mptcp_pm_schedule_work(struct mptcp_sock *msk, return false; msk->pm.status |= BIT(new_status); - if (schedule_work(&msk->work)) - sock_hold((struct sock *)msk); + mptcp_schedule_work((struct sock *)msk); return true; } diff --git a/net/mptcp/pm_netlink.c b/net/mptcp/pm_netlink.c index 446ef8f0773435ff28c8f56170c489105bbb98e0..f8a9d82a0ea82a34a15f684447ab59434be5ab72 100644 --- a/net/mptcp/pm_netlink.c +++ b/net/mptcp/pm_netlink.c @@ -416,14 +416,13 @@ void mptcp_pm_nl_rm_addr_received(struct mptcp_sock *msk) list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); int how = RCV_SHUTDOWN | SEND_SHUTDOWN; - long timeout = 0; if (msk->pm.rm_id != subflow->remote_id) continue; spin_unlock_bh(&msk->pm.lock); mptcp_subflow_shutdown(sk, ssk, how); - __mptcp_close_ssk(sk, ssk, subflow, timeout); + __mptcp_close_ssk(sk, ssk, subflow); spin_lock_bh(&msk->pm.lock); msk->pm.add_addr_accepted--; @@ -452,14 +451,13 @@ void mptcp_pm_nl_rm_subflow_received(struct mptcp_sock *msk, u8 rm_id) list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); int how = RCV_SHUTDOWN | SEND_SHUTDOWN; - long timeout = 0; if (rm_id != subflow->local_id) continue; spin_unlock_bh(&msk->pm.lock); mptcp_subflow_shutdown(sk, ssk, how); - __mptcp_close_ssk(sk, ssk, subflow, timeout); + __mptcp_close_ssk(sk, ssk, subflow); spin_lock_bh(&msk->pm.lock); msk->pm.local_addr_used--; diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c index eaa61e22749296be1195a31d11d13b8a004f4eb4..8df013daea887682a33dc4d763f1c3bf3dd80dae 100644 --- a/net/mptcp/protocol.c +++ b/net/mptcp/protocol.c @@ -21,6 +21,7 @@ #include #endif #include +#include #include "protocol.h" #include "mib.h" @@ -41,6 +42,9 @@ struct mptcp_skb_cb { static struct percpu_counter mptcp_sockets_allocated; +static void __mptcp_destroy_sock(struct sock *sk); +static void __mptcp_check_send_data_fin(struct sock *sk); + /* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not * completed yet or has failed, return the subflow socket. * Otherwise return NULL. 
@@ -53,6 +57,12 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk) return msk->subflow; } +/* Returns end sequence number of the receiver's advertised window */ +static u64 mptcp_wnd_end(const struct mptcp_sock *msk) +{ + return atomic64_read(&msk->wnd_end); +} + static bool mptcp_is_tcpsk(struct sock *sk) { struct socket *sock = sk->sk_socket; @@ -102,6 +112,7 @@ static int __mptcp_socket_create(struct mptcp_sock *msk) msk->subflow = ssock; subflow = mptcp_subflow_ctx(ssock->sk); list_add(&subflow->node, &msk->conn_list); + sock_hold(ssock->sk); subflow->request_mptcp = 1; /* accept() will wait on first subflow sk_wq, and we always wakes up @@ -169,6 +180,7 @@ static void mptcp_data_queue_ofo(struct mptcp_sock *msk, struct sk_buff *skb) if (after64(seq, max_seq)) { /* out of window */ mptcp_drop(sk, skb); + pr_debug("oow by %ld", (unsigned long)seq - (unsigned long)max_seq); MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_NODSSWINDOW); return; } @@ -323,6 +335,19 @@ static void mptcp_stop_timer(struct sock *sk) mptcp_sk(sk)->timer_ival = 0; } +static void mptcp_close_wake_up(struct sock *sk) +{ + if (sock_flag(sk, SOCK_DEAD)) + return; + + sk->sk_state_change(sk); + if (sk->sk_shutdown == SHUTDOWN_MASK || + sk->sk_state == TCP_CLOSE) + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); + else + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); +} + static void mptcp_check_data_fin_ack(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); @@ -341,20 +366,14 @@ static void mptcp_check_data_fin_ack(struct sock *sk) switch (sk->sk_state) { case TCP_FIN_WAIT1: inet_sk_state_store(sk, TCP_FIN_WAIT2); - sk->sk_state_change(sk); break; case TCP_CLOSING: case TCP_LAST_ACK: inet_sk_state_store(sk, TCP_CLOSE); - sk->sk_state_change(sk); break; } - if (sk->sk_shutdown == SHUTDOWN_MASK || - sk->sk_state == TCP_CLOSE) - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); - else - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + mptcp_close_wake_up(sk); } } @@ -388,13 +407,27 @@ static void mptcp_set_timeout(const struct sock *sk, const struct sock *ssk) mptcp_sk(sk)->timer_ival = tout > 0 ? tout : TCP_RTO_MIN; } -static void mptcp_check_data_fin(struct sock *sk) +static void mptcp_send_ack(struct mptcp_sock *msk) +{ + struct mptcp_subflow_context *subflow; + + mptcp_for_each_subflow(msk, subflow) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + lock_sock(ssk); + tcp_send_ack(ssk); + release_sock(ssk); + } +} + +static bool mptcp_check_data_fin(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); u64 rcv_data_fin_seq; + bool ret = false; if (__mptcp_check_fallback(msk) || !msk->first) - return; + return ret; /* Need to ack a DATA_FIN received from a peer while this side * of the connection is in ESTABLISHED, FIN_WAIT1, or FIN_WAIT2. @@ -410,8 +443,6 @@ static void mptcp_check_data_fin(struct sock *sk) */ if (mptcp_pending_data_fin(sk, &rcv_data_fin_seq)) { - struct mptcp_subflow_context *subflow; - WRITE_ONCE(msk->ack_seq, msk->ack_seq + 1); WRITE_ONCE(msk->rcv_data_fin, 0); @@ -428,7 +459,6 @@ static void mptcp_check_data_fin(struct sock *sk) break; case TCP_FIN_WAIT2: inet_sk_state_store(sk, TCP_CLOSE); - // @@ Close subflows now? 
break; default: /* Other states not expected */ @@ -436,23 +466,12 @@ static void mptcp_check_data_fin(struct sock *sk) break; } + ret = true; mptcp_set_timeout(sk, NULL); - mptcp_for_each_subflow(msk, subflow) { - struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - - lock_sock(ssk); - tcp_send_ack(ssk); - release_sock(ssk); - } - - sk->sk_state_change(sk); - - if (sk->sk_shutdown == SHUTDOWN_MASK || - sk->sk_state == TCP_CLOSE) - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); - else - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + mptcp_send_ack(msk); + mptcp_close_wake_up(sk); } + return ret; } static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk, @@ -620,9 +639,8 @@ static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk) * this is not a good place to change state. Let the workqueue * do it. */ - if (mptcp_pending_data_fin(sk, NULL) && - schedule_work(&msk->work)) - sock_hold(sk); + if (mptcp_pending_data_fin(sk, NULL)) + mptcp_schedule_work(sk); } spin_unlock_bh(&sk->sk_lock.slock); @@ -692,6 +710,10 @@ static void mptcp_reset_timer(struct sock *sk) struct inet_connection_sock *icsk = inet_csk(sk); unsigned long tout; + /* prevent rescheduling on close */ + if (unlikely(inet_sk_state_load(sk) == TCP_CLOSE)) + return; + /* should never be called with mptcp level timer cleared */ tout = READ_ONCE(mptcp_sk(sk)->timer_ival); if (WARN_ON_ONCE(!tout)) @@ -699,23 +721,33 @@ static void mptcp_reset_timer(struct sock *sk) sk_reset_timer(sk, &icsk->icsk_retransmit_timer, jiffies + tout); } +bool mptcp_schedule_work(struct sock *sk) +{ + if (inet_sk_state_load(sk) != TCP_CLOSE && + schedule_work(&mptcp_sk(sk)->work)) { + /* each subflow already holds a reference to the sk, and the + * workqueue is invoked by a subflow, so sk can't go away here. 
+ */ + sock_hold(sk); + return true; + } + return false; +} + void mptcp_data_acked(struct sock *sk) { mptcp_reset_timer(sk); - if ((!test_bit(MPTCP_SEND_SPACE, &mptcp_sk(sk)->flags) || - (inet_sk_state_load(sk) != TCP_ESTABLISHED)) && - schedule_work(&mptcp_sk(sk)->work)) - sock_hold(sk); + if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) || + mptcp_send_head(sk) || + (inet_sk_state_load(sk) != TCP_ESTABLISHED))) + mptcp_schedule_work(sk); } void mptcp_subflow_eof(struct sock *sk) { - struct mptcp_sock *msk = mptcp_sk(sk); - - if (!test_and_set_bit(MPTCP_WORK_EOF, &msk->flags) && - schedule_work(&msk->work)) - sock_hold(sk); + if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags)) + mptcp_schedule_work(sk); } static void mptcp_check_for_eof(struct mptcp_sock *msk) @@ -726,8 +758,10 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk) mptcp_for_each_subflow(msk, subflow) receivers += !subflow->rx_eof; + if (receivers) + return; - if (!receivers && !(sk->sk_shutdown & RCV_SHUTDOWN)) { + if (!(sk->sk_shutdown & RCV_SHUTDOWN)) { /* hopefully temporary hack: propagate shutdown status * to msk, when all subflows agree on it */ @@ -737,6 +771,19 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk) set_bit(MPTCP_DATA_READY, &msk->flags); sk->sk_data_ready(sk); } + + switch (sk->sk_state) { + case TCP_ESTABLISHED: + inet_sk_state_store(sk, TCP_CLOSE_WAIT); + break; + case TCP_FIN_WAIT1: + /* fallback sockets skip TCP_CLOSING - TCP will take care */ + inet_sk_state_store(sk, TCP_CLOSE); + break; + default: + return; + } + mptcp_close_wake_up(sk); } static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) @@ -783,6 +830,7 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk, const struct mptcp_data_frag *df) { return df && pfrag->page == df->page && + pfrag->size - pfrag->offset > 0 && df->data_seq + df->data_len == msk->write_seq; } @@ -801,20 +849,6 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag) put_page(dfrag->page); } -static bool mptcp_is_writeable(struct mptcp_sock *msk) -{ - struct mptcp_subflow_context *subflow; - - if (!sk_stream_is_writeable((struct sock *)msk)) - return false; - - mptcp_for_each_subflow(msk, subflow) { - if (sk_stream_is_writeable(subflow->tcp_sock)) - return true; - } - return false; -} - static void mptcp_clean_una(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); @@ -826,13 +860,16 @@ static void mptcp_clean_una(struct sock *sk) * plain TCP */ if (__mptcp_check_fallback(msk)) - atomic64_set(&msk->snd_una, msk->write_seq); + atomic64_set(&msk->snd_una, msk->snd_nxt); + snd_una = atomic64_read(&msk->snd_una); list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) { if (after64(dfrag->data_seq + dfrag->data_len, snd_una)) break; + if (WARN_ON_ONCE(dfrag == msk->first_pending)) + break; dfrag_clear(sk, dfrag); cleaned = true; } @@ -841,12 +878,13 @@ static void mptcp_clean_una(struct sock *sk) if (dfrag && after64(snd_una, dfrag->data_seq)) { u64 delta = snd_una - dfrag->data_seq; - if (WARN_ON_ONCE(delta > dfrag->data_len)) + if (WARN_ON_ONCE(delta > dfrag->already_sent)) goto out; dfrag->data_seq += delta; dfrag->offset += delta; dfrag->data_len -= delta; + dfrag->already_sent -= delta; dfrag_uncharge(sk, delta); cleaned = true; @@ -864,13 +902,8 @@ static void mptcp_clean_una_wakeup(struct sock *sk) mptcp_clean_una(sk); /* Only wake up writers if a subflow is ready */ - if (mptcp_is_writeable(msk)) { - set_bit(MPTCP_SEND_SPACE, &msk->flags); - smp_mb__after_atomic(); - - /* set SEND_SPACE 
before sk_stream_write_space clears - * NOSPACE - */ + if (sk_stream_is_writeable(sk)) { + clear_bit(MPTCP_NOSPACE, &msk->flags); sk_stream_write_space(sk); } } @@ -880,12 +913,23 @@ static void mptcp_clean_una_wakeup(struct sock *sk) */ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag) { + struct mptcp_subflow_context *subflow; + struct mptcp_sock *msk = mptcp_sk(sk); + bool first = true; + if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag), pfrag, sk->sk_allocation))) return true; - sk->sk_prot->enter_memory_pressure(sk); sk_stream_moderate_sndbuf(sk); + mptcp_for_each_subflow(msk, subflow) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + if (first) + tcp_enter_memory_pressure(ssk); + sk_stream_moderate_sndbuf(ssk); + first = false; + } return false; } @@ -901,149 +945,109 @@ mptcp_carve_data_frag(const struct mptcp_sock *msk, struct page_frag *pfrag, dfrag->data_seq = msk->write_seq; dfrag->overhead = offset - orig_offset + sizeof(struct mptcp_data_frag); dfrag->offset = offset + sizeof(struct mptcp_data_frag); + dfrag->already_sent = 0; dfrag->page = pfrag->page; return dfrag; } +struct mptcp_sendmsg_info { + int mss_now; + int size_goal; + u16 limit; + u16 sent; + unsigned int flags; +}; + +static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq, + int avail_size) +{ + u64 window_end = mptcp_wnd_end(msk); + + if (__mptcp_check_fallback(msk)) + return avail_size; + + if (!before64(data_seq + avail_size, window_end)) { + u64 allowed_size = window_end - data_seq; + + return min_t(unsigned int, allowed_size, avail_size); + } + + return avail_size; +} + static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, - struct msghdr *msg, struct mptcp_data_frag *dfrag, - long *timeo, int *pmss_now, - int *ps_goal) + struct mptcp_data_frag *dfrag, + struct mptcp_sendmsg_info *info) { - int mss_now, avail_size, size_goal, offset, ret, frag_truesize = 0; - bool dfrag_collapsed, can_collapse = false; + u64 data_seq = dfrag->data_seq + info->sent; struct mptcp_sock *msk = mptcp_sk(sk); + bool zero_window_probe = false; struct mptcp_ext *mpext = NULL; - bool retransmission = !!dfrag; struct sk_buff *skb, *tail; - struct page_frag *pfrag; - struct page *page; - u64 *write_seq; - size_t psize; - - /* use the mptcp page cache so that we can easily move the data - * from one substream to another, but do per subflow memory accounting - * Note: pfrag is used only !retransmission, but the compiler if - * fooled into a warning if we don't init here - */ - pfrag = sk_page_frag(sk); - if (!retransmission) { - write_seq = &msk->write_seq; - page = pfrag->page; - } else { - write_seq = &dfrag->data_seq; - page = dfrag->page; - } + bool can_collapse = false; + int avail_size; + size_t ret; + + pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d", + msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent); - /* compute copy limit */ - mss_now = tcp_send_mss(ssk, &size_goal, msg->msg_flags); - *pmss_now = mss_now; - *ps_goal = size_goal; - avail_size = size_goal; + /* compute send limit */ + info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags); + avail_size = info->size_goal; skb = tcp_write_queue_tail(ssk); if (skb) { - mpext = skb_ext_find(skb, SKB_EXT_MPTCP); - /* Limit the write to the size available in the * current skb, if any, so that we create at most a new skb. 
* Explicitly tells TCP internals to avoid collapsing on later * queue management operation, to avoid breaking the ext <-> * SSN association set here */ - can_collapse = (size_goal - skb->len > 0) && - mptcp_skb_can_collapse_to(*write_seq, skb, mpext); + mpext = skb_ext_find(skb, SKB_EXT_MPTCP); + can_collapse = (info->size_goal - skb->len > 0) && + mptcp_skb_can_collapse_to(data_seq, skb, mpext); if (!can_collapse) TCP_SKB_CB(skb)->eor = 1; else - avail_size = size_goal - skb->len; + avail_size = info->size_goal - skb->len; } - if (!retransmission) { - /* reuse tail pfrag, if possible, or carve a new one from the - * page allocator - */ - dfrag = mptcp_rtx_tail(sk); - offset = pfrag->offset; - dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag); - if (!dfrag_collapsed) { - dfrag = mptcp_carve_data_frag(msk, pfrag, offset); - offset = dfrag->offset; - frag_truesize = dfrag->overhead; - } - psize = min_t(size_t, pfrag->size - offset, avail_size); - - /* Copy to page */ - pr_debug("left=%zu", msg_data_left(msg)); - psize = copy_page_from_iter(pfrag->page, offset, - min_t(size_t, msg_data_left(msg), - psize), - &msg->msg_iter); - pr_debug("left=%zu", msg_data_left(msg)); - if (!psize) - return -EINVAL; - - if (!sk_wmem_schedule(sk, psize + dfrag->overhead)) { - iov_iter_revert(&msg->msg_iter, psize); - return -ENOMEM; - } - } else { - offset = dfrag->offset; - psize = min_t(size_t, dfrag->data_len, avail_size); + /* Zero window and all data acked? Probe. */ + avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size); + if (avail_size == 0) { + if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt) + return 0; + zero_window_probe = true; + data_seq = atomic64_read(&msk->snd_una) - 1; + avail_size = 1; } - /* tell the TCP stack to delay the push so that we can safely - * access the skb after the sendpages call - */ - ret = do_tcp_sendpages(ssk, page, offset, psize, - msg->msg_flags | MSG_SENDPAGE_NOTLAST | MSG_DONTWAIT); - if (ret <= 0) { - if (!retransmission) - iov_iter_revert(&msg->msg_iter, psize); - return ret; - } + if (WARN_ON_ONCE(info->sent > info->limit || + info->limit > dfrag->data_len)) + return 0; - frag_truesize += ret; - if (!retransmission) { - if (unlikely(ret < psize)) - iov_iter_revert(&msg->msg_iter, psize - ret); - - /* send successful, keep track of sent data for mptcp-level - * retransmission - */ - dfrag->data_len += ret; - if (!dfrag_collapsed) { - get_page(dfrag->page); - list_add_tail(&dfrag->list, &msk->rtx_queue); - sk_wmem_queued_add(sk, frag_truesize); - } else { - sk_wmem_queued_add(sk, ret); - } - - /* charge data on mptcp rtx queue to the master socket - * Note: we charge such data both to sk and ssk - */ - sk->sk_forward_alloc -= frag_truesize; + ret = info->limit - info->sent; + tail = tcp_build_frag(ssk, avail_size, info->flags, dfrag->page, + dfrag->offset + info->sent, &ret); + if (!tail) { + tcp_remove_empty_skb(sk, tcp_write_queue_tail(ssk)); + return -ENOMEM; } - /* if the tail skb extension is still the cached one, collapsing - * really happened. Note: we can't check for 'same skb' as the sk_buff - * hdr on tail can be transmitted, freed and re-allocated by the - * do_tcp_sendpages() call + /* if the tail skb is still the cached one, collapsing really happened. 
*/ - tail = tcp_write_queue_tail(ssk); - if (mpext && tail && mpext == skb_ext_find(tail, SKB_EXT_MPTCP)) { + if (skb == tail) { WARN_ON_ONCE(!can_collapse); mpext->data_len += ret; + WARN_ON_ONCE(zero_window_probe); goto out; } - skb = tcp_write_queue_tail(ssk); - mpext = __skb_ext_set(skb, SKB_EXT_MPTCP, msk->cached_ext); + mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext); msk->cached_ext = NULL; memset(mpext, 0, sizeof(*mpext)); - mpext->data_seq = *write_seq; + mpext->data_seq = data_seq; mpext->subflow_seq = mptcp_subflow_ctx(ssk)->rel_write_seq; mpext->data_len = ret; mpext->use_map = 1; @@ -1053,12 +1057,14 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, mpext->data_seq, mpext->subflow_seq, mpext->data_len, mpext->dsn64); + if (zero_window_probe) { + mptcp_subflow_ctx(ssk)->rel_write_seq += ret; + mpext->frozen = 1; + ret = 0; + tcp_push_pending_frames(ssk); + } out: - if (!retransmission) - pfrag->offset += frag_truesize; - WRITE_ONCE(*write_seq, *write_seq + ret); mptcp_subflow_ctx(ssk)->rel_write_seq += ret; - return ret; } @@ -1066,17 +1072,25 @@ static void mptcp_nospace(struct mptcp_sock *msk) { struct mptcp_subflow_context *subflow; - clear_bit(MPTCP_SEND_SPACE, &msk->flags); + set_bit(MPTCP_NOSPACE, &msk->flags); smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */ mptcp_for_each_subflow(msk, subflow) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + bool ssk_writeable = sk_stream_is_writeable(ssk); struct socket *sock = READ_ONCE(ssk->sk_socket); + if (ssk_writeable || !sock) + continue; + /* enables ssk->write_space() callbacks */ - if (sock) - set_bit(SOCK_NOSPACE, &sock->flags); + set_bit(SOCK_NOSPACE, &sock->flags); } + + /* mptcp_data_acked() could run just before we set the NOSPACE bit, + * so explicitly check for snd_una value + */ + mptcp_clean_una((struct sock *)msk); } static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow) @@ -1180,21 +1194,86 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk, return NULL; } -static void ssk_check_wmem(struct mptcp_sock *msk) +static void mptcp_push_release(struct sock *sk, struct sock *ssk, + struct mptcp_sendmsg_info *info) { - if (unlikely(!mptcp_is_writeable(msk))) - mptcp_nospace(msk); + mptcp_set_timeout(sk, ssk); + tcp_push(ssk, 0, info->mss_now, tcp_sk(ssk)->nonagle, info->size_goal); + release_sock(ssk); +} + +static void mptcp_push_pending(struct sock *sk, unsigned int flags) +{ + struct sock *prev_ssk = NULL, *ssk = NULL; + struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_sendmsg_info info = { + .flags = flags, + }; + struct mptcp_data_frag *dfrag; + int len, copied = 0; + u32 sndbuf; + + while ((dfrag = mptcp_send_head(sk))) { + info.sent = dfrag->already_sent; + info.limit = dfrag->data_len; + len = dfrag->data_len - dfrag->already_sent; + while (len > 0) { + int ret = 0; + + prev_ssk = ssk; + __mptcp_flush_join_list(msk); + ssk = mptcp_subflow_get_send(msk, &sndbuf); + + /* do auto tuning */ + if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && + sndbuf > READ_ONCE(sk->sk_sndbuf)) + WRITE_ONCE(sk->sk_sndbuf, sndbuf); + + /* try to keep the subflow socket lock across + * consecutive xmit on the same socket + */ + if (ssk != prev_ssk && prev_ssk) + mptcp_push_release(sk, prev_ssk, &info); + if (!ssk) + goto out; + + if (ssk != prev_ssk || !prev_ssk) + lock_sock(ssk); + + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) { + mptcp_push_release(sk, ssk, &info); + goto out; + } + + info.sent += ret; + 
dfrag->already_sent += ret; + msk->snd_nxt += ret; + msk->snd_burst -= ret; + copied += ret; + len -= ret; + } + WRITE_ONCE(msk->first_pending, mptcp_send_next(sk)); + } + + /* at this point we held the socket lock for the last subflow we used */ + if (ssk) + mptcp_push_release(sk, ssk, &info); + +out: + /* start the timer, if it's not pending */ + if (!mptcp_timer_pending(sk)) + mptcp_reset_timer(sk); + if (copied) + __mptcp_check_send_data_fin(sk); } static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) { - int mss_now = 0, size_goal = 0, ret = 0; struct mptcp_sock *msk = mptcp_sk(sk); struct page_frag *pfrag; size_t copied = 0; - struct sock *ssk; - u32 sndbuf; - bool tx_ok; + int ret = 0; long timeo; if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL)) @@ -1211,130 +1290,92 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) } pfrag = sk_page_frag(sk); -restart: mptcp_clean_una(sk); - if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) { - ret = -EPIPE; - goto out; - } - - __mptcp_flush_join_list(msk); - ssk = mptcp_subflow_get_send(msk, &sndbuf); - while (!sk_stream_memory_free(sk) || - !ssk || - !mptcp_page_frag_refill(ssk, pfrag)) { - if (ssk) { - /* make sure retransmit timer is - * running before we wait for memory. - * - * The retransmit timer might be needed - * to make the peer send an up-to-date - * MPTCP Ack. - */ - mptcp_set_timeout(sk, ssk); - if (!mptcp_timer_pending(sk)) - mptcp_reset_timer(sk); - } - - mptcp_nospace(msk); - ret = sk_stream_wait_memory(sk, &timeo); - if (ret) - goto out; + while (msg_data_left(msg)) { + struct mptcp_data_frag *dfrag; + int frag_truesize = 0; + bool dfrag_collapsed; + size_t psize, offset; - mptcp_clean_una(sk); - - ssk = mptcp_subflow_get_send(msk, &sndbuf); - if (list_empty(&msk->conn_list)) { - ret = -ENOTCONN; + if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) { + ret = -EPIPE; goto out; } - } - - /* do auto tuning */ - if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && - sndbuf > READ_ONCE(sk->sk_sndbuf)) - WRITE_ONCE(sk->sk_sndbuf, sndbuf); - pr_debug("conn_list->subflow=%p", ssk); - - lock_sock(ssk); - tx_ok = msg_data_left(msg); - while (tx_ok) { - ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now, - &size_goal); - if (ret < 0) { - if (ret == -EAGAIN && timeo > 0) { - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; + /* reuse tail pfrag, if possible, or carve a new one from the + * page allocator + */ + dfrag = mptcp_pending_tail(sk); + dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag); + if (!dfrag_collapsed) { + if (!sk_stream_memory_free(sk)) { + mptcp_push_pending(sk, msg->msg_flags); + if (!sk_stream_memory_free(sk)) + goto wait_for_memory; } - break; + if (!mptcp_page_frag_refill(sk, pfrag)) + goto wait_for_memory; + + dfrag = mptcp_carve_data_frag(msk, pfrag, pfrag->offset); + frag_truesize = dfrag->overhead; } - /* burst can be negative, we will try move to the next subflow - * at selection time, if possible. + /* we do not bound vs wspace, to allow a single packet. 
+ * memory accounting will prevent execessive memory usage + * anyway */ - msk->snd_burst -= ret; - copied += ret; - - tx_ok = msg_data_left(msg); - if (!tx_ok) - break; - - if (!sk_stream_memory_free(ssk) || - !mptcp_page_frag_refill(ssk, pfrag) || - !mptcp_ext_cache_refill(msk)) { - tcp_push(ssk, msg->msg_flags, mss_now, - tcp_sk(ssk)->nonagle, size_goal); - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; + offset = dfrag->offset + dfrag->data_len; + psize = pfrag->size - offset; + psize = min_t(size_t, psize, msg_data_left(msg)); + if (!sk_wmem_schedule(sk, psize + frag_truesize)) + goto wait_for_memory; + + if (copy_page_from_iter(dfrag->page, offset, psize, + &msg->msg_iter) != psize) { + ret = -EFAULT; + goto out; } - /* memory is charged to mptcp level socket as well, i.e. - * if msg is very large, mptcp socket may run out of buffer - * space. mptcp_clean_una() will release data that has - * been acked at mptcp level in the mean time, so there is - * a good chance we can continue sending data right away. - * - * Normally, when the tcp subflow can accept more data, then - * so can the MPTCP socket. However, we need to cope with - * peers that might lag behind in their MPTCP-level - * acknowledgements, i.e. data might have been acked at - * tcp level only. So, we must also check the MPTCP socket - * limits before we send more data. + /* data successfully copied into the write queue */ + copied += psize; + dfrag->data_len += psize; + frag_truesize += psize; + pfrag->offset += frag_truesize; + WRITE_ONCE(msk->write_seq, msk->write_seq + psize); + + /* charge data on mptcp pending queue to the msk socket + * Note: we charge such data both to sk and ssk */ - if (unlikely(!sk_stream_memory_free(sk))) { - tcp_push(ssk, msg->msg_flags, mss_now, - tcp_sk(ssk)->nonagle, size_goal); - mptcp_clean_una(sk); - if (!sk_stream_memory_free(sk)) { - /* can't send more for now, need to wait for - * MPTCP-level ACKs from peer. - * - * Wakeup will happen via mptcp_clean_una(). - */ - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; - } + sk_wmem_queued_add(sk, frag_truesize); + sk->sk_forward_alloc -= frag_truesize; + if (!dfrag_collapsed) { + get_page(dfrag->page); + list_add_tail(&dfrag->list, &msk->rtx_queue); + if (!msk->first_pending) + WRITE_ONCE(msk->first_pending, dfrag); } - } + pr_debug("msk=%p dfrag at seq=%lld len=%d sent=%d new=%d", msk, + dfrag->data_seq, dfrag->data_len, dfrag->already_sent, + !dfrag_collapsed); - mptcp_set_timeout(sk, ssk); - if (copied) { - tcp_push(ssk, msg->msg_flags, mss_now, tcp_sk(ssk)->nonagle, - size_goal); + if (!mptcp_ext_cache_refill(msk)) + goto wait_for_memory; + continue; - /* start the timer, if it's not pending */ - if (!mptcp_timer_pending(sk)) +wait_for_memory: + mptcp_nospace(msk); + if (mptcp_timer_pending(sk)) mptcp_reset_timer(sk); + ret = sk_stream_wait_memory(sk, &timeo); + if (ret) + goto out; } - release_sock(ssk); + if (copied) + mptcp_push_pending(sk, msg->msg_flags); + out: - ssk_check_wmem(msk); release_sock(sk); return copied ? 
: ret; } @@ -1513,7 +1554,8 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk) } while (!done); if (mptcp_ofo_queue(msk) || moved > 0) { - mptcp_check_data_fin((struct sock *)msk); + if (!mptcp_check_data_fin((struct sock *)msk)) + mptcp_send_ack(msk); return true; } return false; @@ -1625,12 +1667,11 @@ static void mptcp_retransmit_handler(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->write_seq)) { + if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) { mptcp_stop_timer(sk); } else { set_bit(MPTCP_WORK_RTX, &msk->flags); - if (schedule_work(&msk->work)) - sock_hold(sk); + mptcp_schedule_work(sk); } } @@ -1653,6 +1694,13 @@ static void mptcp_retransmit_timer(struct timer_list *t) sock_put(sk); } +static void mptcp_timeout_timer(struct timer_list *t) +{ + struct sock *sk = from_timer(sk, t, sk_timer); + + mptcp_schedule_work(sk); +} + /* Find an idle subflow. Return NULL if there is unacked data at tcp * level. * @@ -1666,7 +1714,7 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk) sock_owned_by_me((const struct sock *)msk); if (__mptcp_check_fallback(msk)) - return msk->first; + return NULL; mptcp_for_each_subflow(msk, subflow) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); @@ -1699,20 +1747,43 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk) * parent socket. */ void __mptcp_close_ssk(struct sock *sk, struct sock *ssk, - struct mptcp_subflow_context *subflow, - long timeout) + struct mptcp_subflow_context *subflow) { - struct socket *sock = READ_ONCE(ssk->sk_socket); + bool dispose_socket = false; + struct socket *sock; list_del(&subflow->node); - if (sock && sock != sk->sk_socket) { - /* outgoing subflow */ - sock_release(sock); + lock_sock(ssk); + + /* if we are invoked by the msk cleanup code, the subflow is + * already orphaned + */ + sock = ssk->sk_socket; + if (sock) { + dispose_socket = sock != sk->sk_socket; + sock_orphan(ssk); + } + + /* if ssk hit tcp_done(), tcp_cleanup_ulp() cleared the related ops + * the ssk has been already destroyed, we just need to release the + * reference owned by msk; + */ + if (!inet_csk(ssk)->icsk_ulp_ops) { + kfree_rcu(subflow, rcu); } else { - /* incoming subflow */ - tcp_close(ssk, timeout); + /* otherwise ask tcp do dispose of ssk and subflow ctx */ + subflow->disposable = 1; + __tcp_close(ssk, 0); + + /* close acquired an extra ref */ + __sock_put(ssk); } + release_sock(ssk); + if (dispose_socket) + iput(SOCK_INODE(sock)); + + sock_put(ssk); } static unsigned int mptcp_sync_mss(struct sock *sk, u32 pmtu) @@ -1757,24 +1828,44 @@ static void __mptcp_close_subflow(struct mptcp_sock *msk) if (inet_sk_state_load(ssk) != TCP_CLOSE) continue; - __mptcp_close_ssk((struct sock *)msk, ssk, subflow, 0); + __mptcp_close_ssk((struct sock *)msk, ssk, subflow); } } +static bool mptcp_check_close_timeout(const struct sock *sk) +{ + s32 delta = tcp_jiffies32 - inet_csk(sk)->icsk_mtup.probe_timestamp; + struct mptcp_subflow_context *subflow; + + if (delta >= TCP_TIMEWAIT_LEN) + return true; + + /* if all subflows are in closed status don't bother with additional + * timeout + */ + mptcp_for_each_subflow(mptcp_sk(sk), subflow) { + if (inet_sk_state_load(mptcp_subflow_tcp_sock(subflow)) != + TCP_CLOSE) + return false; + } + return true; +} + static void mptcp_worker(struct work_struct *work) { struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work); struct sock *ssk, *sk = &msk->sk.icsk_inet.sk; - int orig_len, 
orig_offset, mss_now = 0, size_goal = 0; + struct mptcp_sendmsg_info info = {}; struct mptcp_data_frag *dfrag; - u64 orig_write_seq; size_t copied = 0; - struct msghdr msg = { - .msg_flags = MSG_DONTWAIT, - }; - long timeo = 0; + int state, ret; lock_sock(sk); + set_bit(MPTCP_WORKER_RUNNING, &msk->flags); + state = sk->sk_state; + if (unlikely(state == TCP_CLOSE)) + goto unlock; + mptcp_clean_una_wakeup(sk); mptcp_check_data_fin_ack(sk); __mptcp_flush_join_list(msk); @@ -1782,6 +1873,8 @@ static void mptcp_worker(struct work_struct *work) __mptcp_close_subflow(msk); __mptcp_move_skbs(msk); + if (mptcp_send_head(sk)) + mptcp_push_pending(sk, 0); if (msk->pm.status) pm_work(msk); @@ -1791,6 +1884,18 @@ static void mptcp_worker(struct work_struct *work) mptcp_check_data_fin(sk); + /* if the msk data is completely acked, or the socket timedout, + * there is no point in keeping around an orphaned sk + */ + if (sock_flag(sk, SOCK_DEAD) && + (mptcp_check_close_timeout(sk) || + (state != sk->sk_state && + ((1 << inet_sk_state_load(sk)) & (TCPF_CLOSE | TCPF_FIN_WAIT2))))) { + inet_sk_state_store(sk, TCP_CLOSE); + __mptcp_destroy_sock(sk); + goto unlock; + } + if (!test_and_clear_bit(MPTCP_WORK_RTX, &msk->flags)) goto unlock; @@ -1807,30 +1912,24 @@ static void mptcp_worker(struct work_struct *work) lock_sock(ssk); - orig_len = dfrag->data_len; - orig_offset = dfrag->offset; - orig_write_seq = dfrag->data_seq; - while (dfrag->data_len > 0) { - int ret = mptcp_sendmsg_frag(sk, ssk, &msg, dfrag, &timeo, - &mss_now, &size_goal); - if (ret < 0) + /* limit retransmission to the bytes already sent on some subflows */ + info.sent = 0; + info.limit = dfrag->already_sent; + while (info.sent < dfrag->already_sent) { + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) break; MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS); copied += ret; - dfrag->data_len -= ret; - dfrag->offset += ret; + info.sent += ret; if (!mptcp_ext_cache_refill(msk)) break; } if (copied) - tcp_push(ssk, msg.msg_flags, mss_now, tcp_sk(ssk)->nonagle, - size_goal); - - dfrag->data_seq = orig_write_seq; - dfrag->offset = orig_offset; - dfrag->data_len = orig_len; + tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle, + info.size_goal); mptcp_set_timeout(sk, ssk); release_sock(ssk); @@ -1840,6 +1939,7 @@ static void mptcp_worker(struct work_struct *work) mptcp_reset_timer(sk); unlock: + clear_bit(MPTCP_WORKER_RUNNING, &msk->flags); release_sock(sk); sock_put(sk); } @@ -1853,9 +1953,9 @@ static int __mptcp_init_sock(struct sock *sk) INIT_LIST_HEAD(&msk->conn_list); INIT_LIST_HEAD(&msk->join_list); INIT_LIST_HEAD(&msk->rtx_queue); - __set_bit(MPTCP_SEND_SPACE, &msk->flags); INIT_WORK(&msk->work, mptcp_worker); msk->out_of_order_queue = RB_ROOT; + msk->first_pending = NULL; msk->first = NULL; inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss; @@ -1864,7 +1964,7 @@ static int __mptcp_init_sock(struct sock *sk) /* re-use the csk retrans timer for MPTCP-level retrans */ timer_setup(&msk->sk.icsk_retransmit_timer, mptcp_retransmit_timer, 0); - + timer_setup(&sk->sk_timer, mptcp_timeout_timer, 0); return 0; } @@ -1901,6 +2001,7 @@ static void __mptcp_clear_xmit(struct sock *sk) sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer); + WRITE_ONCE(msk->first_pending, NULL); list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) dfrag_clear(sk, dfrag); } @@ -1909,8 +2010,12 @@ static void mptcp_cancel_work(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (cancel_work_sync(&msk->work)) - sock_put(sk); + /* if called by the 
work itself, do not try to cancel the work, or + * we will hang. + */ + if (!test_bit(MPTCP_WORKER_RUNNING, &msk->flags) && + cancel_work_sync(&msk->work)) + __sock_put(sk); } void mptcp_subflow_shutdown(struct sock *sk, struct sock *ssk, int how) @@ -1968,42 +2073,61 @@ static int mptcp_close_state(struct sock *sk) return next & TCP_ACTION_FIN; } -static void mptcp_close(struct sock *sk, long timeout) +static void __mptcp_check_send_data_fin(struct sock *sk) { - struct mptcp_subflow_context *subflow, *tmp; + struct mptcp_subflow_context *subflow; struct mptcp_sock *msk = mptcp_sk(sk); - LIST_HEAD(conn_list); - lock_sock(sk); - sk->sk_shutdown = SHUTDOWN_MASK; + pr_debug("msk=%p snd_data_fin_enable=%d pending=%d snd_nxt=%llu write_seq=%llu", + msk, msk->snd_data_fin_enable, !!mptcp_send_head(sk), + msk->snd_nxt, msk->write_seq); - if (sk->sk_state == TCP_LISTEN) { + /* we still need to enqueue subflows or not really shutting down, + * skip this + */ + if (!msk->snd_data_fin_enable || msk->snd_nxt + 1 != msk->write_seq || + mptcp_send_head(sk)) + return; + + WRITE_ONCE(msk->snd_nxt, msk->write_seq); + + /* fallback socket will not get data_fin/ack, can move to close now */ + if (__mptcp_check_fallback(msk) && sk->sk_state == TCP_LAST_ACK) { inet_sk_state_store(sk, TCP_CLOSE); - goto cleanup; - } else if (sk->sk_state == TCP_CLOSE) { - goto cleanup; + mptcp_close_wake_up(sk); } - if (__mptcp_check_fallback(msk)) { - goto update_state; - } else if (mptcp_close_state(sk)) { - pr_debug("Sending DATA_FIN sk=%p", sk); - WRITE_ONCE(msk->write_seq, msk->write_seq + 1); - WRITE_ONCE(msk->snd_data_fin_enable, 1); - - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); + __mptcp_flush_join_list(msk); + mptcp_for_each_subflow(msk, subflow) { + struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - mptcp_subflow_shutdown(sk, tcp_sk, SHUTDOWN_MASK); - } + mptcp_subflow_shutdown(sk, tcp_sk, SEND_SHUTDOWN); } +} - sk_stream_wait_close(sk, timeout); +static void __mptcp_wr_shutdown(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); -update_state: - inet_sk_state_store(sk, TCP_CLOSE); + pr_debug("msk=%p snd_data_fin_enable=%d shutdown=%x state=%d pending=%d", + msk, msk->snd_data_fin_enable, sk->sk_shutdown, sk->sk_state, + !!mptcp_send_head(sk)); + + /* will be ignored by fallback sockets */ + WRITE_ONCE(msk->write_seq, msk->write_seq + 1); + WRITE_ONCE(msk->snd_data_fin_enable, 1); + + __mptcp_check_send_data_fin(sk); +} + +static void __mptcp_destroy_sock(struct sock *sk) +{ + struct mptcp_subflow_context *subflow, *tmp; + struct mptcp_sock *msk = mptcp_sk(sk); + LIST_HEAD(conn_list); + + pr_debug("msk=%p", msk); -cleanup: /* be sure to always acquire the join list lock, to sync vs * mptcp_finish_join(). 
*/ @@ -2013,19 +2137,74 @@ static void mptcp_close(struct sock *sk, long timeout) list_splice_init(&msk->conn_list, &conn_list); __mptcp_clear_xmit(sk); - - release_sock(sk); + sk_stop_timer(sk, &sk->sk_timer); + msk->pm.status = 0; list_for_each_entry_safe(subflow, tmp, &conn_list, node) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - __mptcp_close_ssk(sk, ssk, subflow, timeout); + __mptcp_close_ssk(sk, ssk, subflow); } - mptcp_cancel_work(sk); + sk->sk_prot->destroy(sk); - __skb_queue_purge(&sk->sk_receive_queue); + sk_stream_kill_queues(sk); + xfrm_sk_free_policy(sk); + sk_refcnt_debug_release(sk); + sock_put(sk); +} - sk_common_release(sk); +static void mptcp_close(struct sock *sk, long timeout) +{ + struct mptcp_subflow_context *subflow; + bool do_cancel_work = false; + + lock_sock(sk); + sk->sk_shutdown = SHUTDOWN_MASK; + + if ((1 << sk->sk_state) & (TCPF_LISTEN | TCPF_CLOSE)) { + inet_sk_state_store(sk, TCP_CLOSE); + goto cleanup; + } + + if (mptcp_close_state(sk)) + __mptcp_wr_shutdown(sk); + + sk_stream_wait_close(sk, timeout); + +cleanup: + /* orphan all the subflows */ + inet_csk(sk)->icsk_mtup.probe_timestamp = tcp_jiffies32; + list_for_each_entry(subflow, &mptcp_sk(sk)->conn_list, node) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + bool slow, dispose_socket; + struct socket *sock; + + slow = lock_sock_fast(ssk); + sock = ssk->sk_socket; + dispose_socket = sock && sock != sk->sk_socket; + sock_orphan(ssk); + unlock_sock_fast(ssk, slow); + + /* for the outgoing subflows we additionally need to free + * the associated socket + */ + if (dispose_socket) + iput(SOCK_INODE(sock)); + } + sock_orphan(sk); + + sock_hold(sk); + pr_debug("msk=%p state=%d", sk, sk->sk_state); + if (sk->sk_state == TCP_CLOSE) { + __mptcp_destroy_sock(sk); + do_cancel_work = true; + } else { + sk_reset_timer(sk, &sk->sk_timer, jiffies + TCP_TIMEWAIT_LEN); + } + release_sock(sk); + if (do_cancel_work) + mptcp_cancel_work(sk); + sock_put(sk); } static void mptcp_copy_inaddrs(struct sock *msk, const struct sock *ssk) @@ -2096,7 +2275,10 @@ struct sock *mptcp_sk_clone(const struct sock *sk, WRITE_ONCE(msk->fully_established, false); msk->write_seq = subflow_req->idsn + 1; + msk->snd_nxt = msk->write_seq; atomic64_set(&msk->snd_una, msk->write_seq); + atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd); + if (mp_opt->mp_capable) { msk->can_ack = true; msk->remote_key = mp_opt->sndr_key; @@ -2129,6 +2311,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk) TCP_INIT_CWND * tp->advmss); if (msk->rcvq_space.space == 0) msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT; + + atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd); } static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, @@ -2177,6 +2361,7 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, newsk = new_mptcp_sock; mptcp_copy_inaddrs(newsk, ssk); list_add(&subflow->node, &msk->conn_list); + sock_hold(ssk); mptcp_rcv_space_init(msk, ssk); bh_unlock_sock(new_mptcp_sock); @@ -2343,7 +2528,8 @@ static void mptcp_release_cb(struct sock *sk) struct sock *ssk; ssk = mptcp_subflow_recv_lookup(msk); - if (!ssk || !schedule_work(&msk->work)) + if (!ssk || sk->sk_state == TCP_CLOSE || + !schedule_work(&msk->work)) __sock_put(sk); } @@ -2404,6 +2590,7 @@ void mptcp_finish_connect(struct sock *ssk) WRITE_ONCE(msk->remote_key, subflow->remote_key); WRITE_ONCE(msk->local_key, subflow->local_key); WRITE_ONCE(msk->write_seq, subflow->idsn + 1); + 
WRITE_ONCE(msk->snd_nxt, msk->write_seq); WRITE_ONCE(msk->ack_seq, ack_seq); WRITE_ONCE(msk->can_ack, 1); atomic64_set(&msk->snd_una, msk->write_seq); @@ -2422,9 +2609,9 @@ static void mptcp_sock_graft(struct sock *sk, struct socket *parent) write_unlock_bh(&sk->sk_callback_lock); } -bool mptcp_finish_join(struct sock *sk) +bool mptcp_finish_join(struct sock *ssk) { - struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk); + struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk); struct mptcp_sock *msk = mptcp_sk(subflow->conn); struct sock *parent = (void *)msk; struct socket *parent_sock; @@ -2445,12 +2632,14 @@ bool mptcp_finish_join(struct sock *sk) /* active connections are already on conn_list, and we can't acquire * msk lock here. * use the join list lock as synchronization point and double-check - * msk status to avoid racing with mptcp_close() + * msk status to avoid racing with __mptcp_destroy_sock() */ spin_lock_bh(&msk->join_list_lock); ret = inet_sk_state_load(parent) == TCP_ESTABLISHED; - if (ret && !WARN_ON_ONCE(!list_empty(&subflow->node))) + if (ret && !WARN_ON_ONCE(!list_empty(&subflow->node))) { list_add_tail(&subflow->node, &msk->join_list); + sock_hold(ssk); + } spin_unlock_bh(&msk->join_list_lock); if (!ret) return false; @@ -2459,19 +2648,12 @@ bool mptcp_finish_join(struct sock *sk) * at close time */ parent_sock = READ_ONCE(parent->sk_socket); - if (parent_sock && !sk->sk_socket) - mptcp_sock_graft(sk, parent_sock); + if (parent_sock && !ssk->sk_socket) + mptcp_sock_graft(ssk, parent_sock); subflow->map_seq = READ_ONCE(msk->ack_seq); return true; } -static bool mptcp_memory_free(const struct sock *sk, int wake) -{ - struct mptcp_sock *msk = mptcp_sk(sk); - - return wake ? test_bit(MPTCP_SEND_SPACE, &msk->flags) : true; -} - static struct proto mptcp_prot = { .name = "MPTCP", .owner = THIS_MODULE, @@ -2492,7 +2674,6 @@ static struct proto mptcp_prot = { .sockets_allocated = &mptcp_sockets_allocated, .memory_allocated = &tcp_memory_allocated, .memory_pressure = &tcp_memory_pressure, - .stream_memory_free = mptcp_memory_free, .sysctl_wmem_offset = offsetof(struct net, ipv4.sysctl_tcp_wmem), .sysctl_rmem_offset = offsetof(struct net, ipv4.sysctl_tcp_rmem), .sysctl_mem = sysctl_tcp_mem, @@ -2666,6 +2847,39 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk) 0; } +static bool __mptcp_check_writeable(struct mptcp_sock *msk) +{ + struct sock *sk = (struct sock *)msk; + bool mptcp_writable; + + mptcp_clean_una(sk); + mptcp_writable = sk_stream_is_writeable(sk); + if (!mptcp_writable) + mptcp_nospace(msk); + + return mptcp_writable; +} + +static __poll_t mptcp_check_writeable(struct mptcp_sock *msk) +{ + struct sock *sk = (struct sock *)msk; + __poll_t ret = 0; + bool slow; + + if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN)) + return 0; + + if (sk_stream_is_writeable(sk)) + return EPOLLOUT | EPOLLWRNORM; + + slow = lock_sock_fast(sk); + if (__mptcp_check_writeable(msk)) + ret = EPOLLOUT | EPOLLWRNORM; + + unlock_sock_fast(sk, slow); + return ret; +} + static __poll_t mptcp_poll(struct file *file, struct socket *sock, struct poll_table_struct *wait) { @@ -2684,8 +2898,7 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock, if (state != TCP_SYN_SENT && state != TCP_SYN_RECV) { mask |= mptcp_check_readable(msk); - if (test_bit(MPTCP_SEND_SPACE, &msk->flags)) - mask |= EPOLLOUT | EPOLLWRNORM; + mask |= mptcp_check_writeable(msk); } if (sk->sk_shutdown & RCV_SHUTDOWN) mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP; @@ -2696,12 +2909,12 @@ 
static __poll_t mptcp_poll(struct file *file, struct socket *sock, static int mptcp_shutdown(struct socket *sock, int how) { struct mptcp_sock *msk = mptcp_sk(sock->sk); - struct mptcp_subflow_context *subflow; + struct sock *sk = sock->sk; int ret = 0; pr_debug("sk=%p, how=%d", msk, how); - lock_sock(sock->sk); + lock_sock(sk); how++; if ((how & ~SHUTDOWN_MASK) || !how) { @@ -2710,45 +2923,22 @@ static int mptcp_shutdown(struct socket *sock, int how) } if (sock->state == SS_CONNECTING) { - if ((1 << sock->sk->sk_state) & + if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV | TCPF_CLOSE)) sock->state = SS_DISCONNECTING; else sock->state = SS_CONNECTED; } - /* If we've already sent a FIN, or it's a closed state, skip this. */ - if (__mptcp_check_fallback(msk)) { - if (how == SHUT_WR || how == SHUT_RDWR) - inet_sk_state_store(sock->sk, TCP_FIN_WAIT1); - - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - - mptcp_subflow_shutdown(sock->sk, tcp_sk, how); - } - } else if ((how & SEND_SHUTDOWN) && - ((1 << sock->sk->sk_state) & - (TCPF_ESTABLISHED | TCPF_SYN_SENT | - TCPF_SYN_RECV | TCPF_CLOSE_WAIT)) && - mptcp_close_state(sock->sk)) { - __mptcp_flush_join_list(msk); - - WRITE_ONCE(msk->write_seq, msk->write_seq + 1); - WRITE_ONCE(msk->snd_data_fin_enable, 1); - - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - - mptcp_subflow_shutdown(sock->sk, tcp_sk, how); - } - } + sk->sk_shutdown |= how; + if ((how & SEND_SHUTDOWN) && mptcp_close_state(sk)) + __mptcp_wr_shutdown(sk); /* Wake up anyone sleeping in poll. */ - sock->sk->sk_state_change(sock->sk); + sk->sk_state_change(sk); out_unlock: - release_sock(sock->sk); + release_sock(sk); return ret; } diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h index 278c88c405e847ad51c94fe95b7b6c67369e730d..b4c8dbe9236bf0f1948490383f5c7086dda5181e 100644 --- a/net/mptcp/protocol.h +++ b/net/mptcp/protocol.h @@ -86,11 +86,19 @@ /* MPTCP socket flags */ #define MPTCP_DATA_READY 0 -#define MPTCP_SEND_SPACE 1 +#define MPTCP_NOSPACE 1 #define MPTCP_WORK_RTX 2 #define MPTCP_WORK_EOF 3 #define MPTCP_FALLBACK_DONE 4 #define MPTCP_WORK_CLOSE_SUBFLOW 5 +#define MPTCP_WORKER_RUNNING 6 + +static inline bool before64(__u64 seq1, __u64 seq2) +{ + return (__s64)(seq1 - seq2) < 0; +} + +#define after64(seq2, seq1) before64(seq1, seq2) struct mptcp_options_received { u64 sndr_key; @@ -187,9 +195,10 @@ struct mptcp_pm_data { struct mptcp_data_frag { struct list_head list; u64 data_seq; - int data_len; - int offset; - int overhead; + u16 data_len; + u16 offset; + u16 overhead; + u16 already_sent; struct page *page; }; @@ -200,11 +209,13 @@ struct mptcp_sock { u64 local_key; u64 remote_key; u64 write_seq; + u64 snd_nxt; u64 ack_seq; u64 rcv_data_fin_seq; struct sock *last_snd; int snd_burst; atomic64_t snd_una; + atomic64_t wnd_end; unsigned long timer_ival; u32 token; unsigned long flags; @@ -219,6 +230,7 @@ struct mptcp_sock { struct rb_root out_of_order_queue; struct list_head conn_list; struct list_head rtx_queue; + struct mptcp_data_frag *first_pending; struct list_head join_list; struct skb_ext *cached_ext; /* for the next sendmsg */ struct socket *subflow; /* outgoing connect/listener/!mp_capable */ @@ -240,11 +252,41 @@ static inline struct mptcp_sock *mptcp_sk(const struct sock *sk) return (struct mptcp_sock *)sk; } +static inline struct mptcp_data_frag *mptcp_send_head(const struct sock *sk) +{ + const struct mptcp_sock *msk = mptcp_sk(sk); + + return 
READ_ONCE(msk->first_pending); +} + +static inline struct mptcp_data_frag *mptcp_send_next(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_data_frag *cur; + + cur = msk->first_pending; + return list_is_last(&cur->list, &msk->rtx_queue) ? NULL : + list_next_entry(cur, list); +} + +static inline struct mptcp_data_frag *mptcp_pending_tail(const struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (!msk->first_pending) + return NULL; + + if (WARN_ON_ONCE(list_empty(&msk->rtx_queue))) + return NULL; + + return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list); +} + static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (list_empty(&msk->rtx_queue)) + if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una))) return NULL; return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list); @@ -312,7 +354,8 @@ struct mptcp_subflow_context { mpc_map : 1, backup : 1, rx_eof : 1, - can_ack : 1; /* only after processing the remote a key */ + can_ack : 1, /* only after processing the remote a key */ + disposable : 1; /* ctx can be free at ulp release time */ enum mptcp_data_avail data_avail; u32 remote_nonce; u64 thmac; @@ -369,8 +412,7 @@ bool mptcp_subflow_data_available(struct sock *sk); void __init mptcp_subflow_init(void); void mptcp_subflow_shutdown(struct sock *sk, struct sock *ssk, int how); void __mptcp_close_ssk(struct sock *sk, struct sock *ssk, - struct mptcp_subflow_context *subflow, - long timeout); + struct mptcp_subflow_context *subflow); void mptcp_subflow_reset(struct sock *ssk); /* called with sk socket lock held */ @@ -408,9 +450,16 @@ static inline bool mptcp_is_fully_established(struct sock *sk) void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk); void mptcp_data_ready(struct sock *sk, struct sock *ssk); bool mptcp_finish_join(struct sock *sk); +bool mptcp_schedule_work(struct sock *sk); void mptcp_data_acked(struct sock *sk); void mptcp_subflow_eof(struct sock *sk); bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit); +static inline bool mptcp_data_fin_enabled(const struct mptcp_sock *msk) +{ + return READ_ONCE(msk->snd_data_fin_enable) && + READ_ONCE(msk->write_seq) == READ_ONCE(msk->snd_nxt); +} + void mptcp_destroy_common(struct mptcp_sock *msk); void __init mptcp_token_init(void); @@ -495,13 +544,6 @@ static inline struct mptcp_ext *mptcp_get_ext(struct sk_buff *skb) return (struct mptcp_ext *)skb_ext_find(skb, SKB_EXT_MPTCP); } -static inline bool before64(__u64 seq1, __u64 seq2) -{ - return (__s64)(seq1 - seq2) < 0; -} - -#define after64(seq2, seq1) before64(seq1, seq2) - void mptcp_diag_subflow_init(struct tcp_ulp_ops *ops); static inline bool __mptcp_check_fallback(const struct mptcp_sock *msk) diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c index ac4a1fe3550bd0ddee6e4f3d808aa4e48003dac6..7942597891945bbdd5d2819b09bde2bd3433eee0 100644 --- a/net/mptcp/subflow.c +++ b/net/mptcp/subflow.c @@ -997,17 +997,16 @@ static void subflow_data_ready(struct sock *sk) static void subflow_write_space(struct sock *sk) { struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk); + struct socket *sock = READ_ONCE(sk->sk_socket); struct sock *parent = subflow->conn; if (!sk_stream_is_writeable(sk)) return; - if (sk_stream_is_writeable(parent)) { - set_bit(MPTCP_SEND_SPACE, &mptcp_sk(parent)->flags); - smp_mb__after_atomic(); - /* set SEND_SPACE before sk_stream_write_space clears NOSPACE */ - 
sk_stream_write_space(parent); - } + if (sock && sk_stream_is_writeable(parent)) + clear_bit(SOCK_NOSPACE, &sock->flags); + + sk_stream_write_space(parent); } static struct inet_connection_sock_af_ops * @@ -1125,6 +1124,7 @@ int __mptcp_subflow_connect(struct sock *sk, const struct mptcp_addr_info *loc, if (err && err != -EINPROGRESS) goto failed; + sock_hold(ssk); spin_lock_bh(&msk->join_list_lock); list_add_tail(&subflow->node, &msk->join_list); spin_unlock_bh(&msk->join_list_lock); @@ -1132,6 +1132,7 @@ int __mptcp_subflow_connect(struct sock *sk, const struct mptcp_addr_info *loc, return err; failed: + subflow->disposable = 1; sock_release(sf); return err; } @@ -1254,7 +1255,6 @@ static void subflow_state_change(struct sock *sk) mptcp_data_ready(parent, sk); if (__mptcp_check_fallback(mptcp_sk(parent)) && - !(parent->sk_shutdown & RCV_SHUTDOWN) && !subflow->rx_eof && subflow_is_done(sk)) { subflow->rx_eof = 1; mptcp_subflow_eof(parent); @@ -1297,17 +1297,26 @@ static int subflow_ulp_init(struct sock *sk) return err; } -static void subflow_ulp_release(struct sock *sk) +static void subflow_ulp_release(struct sock *ssk) { - struct mptcp_subflow_context *ctx = mptcp_subflow_ctx(sk); + struct mptcp_subflow_context *ctx = mptcp_subflow_ctx(ssk); + bool release = true; + struct sock *sk; if (!ctx) return; - if (ctx->conn) - sock_put(ctx->conn); + sk = ctx->conn; + if (sk) { + /* if the msk has been orphaned, keep the ctx + * alive, will be freed by mptcp_done() + */ + release = ctx->disposable; + sock_put(sk); + } - kfree_rcu(ctx, rcu); + if (release) + kfree_rcu(ctx, rcu); } static void subflow_ulp_clone(const struct request_sock *req,
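---

Notes (below the cut; not part of the applied patch). Each note pairs a mechanism from the diff with a small standalone C sketch. The sketches are userspace analogues under stated assumptions, not kernel code; any name not present in the patch is hypothetical.

The patch moves before64()/after64() from the tail of protocol.h up among the socket-flag definitions so the new snd_nxt/wnd_end tracking can use them. They are the usual serial-number comparison: interpret the unsigned difference as signed, which stays correct across 64-bit wraparound.

#include <assert.h>
#include <stdint.h>

/* negative signed difference <=> seq1 precedes seq2, even across wrap */
static int before64(uint64_t seq1, uint64_t seq2)
{
        return (int64_t)(seq1 - seq2) < 0;
}
#define after64(seq2, seq1) before64(seq1, seq2)

int main(void)
{
        assert(before64(1, 2));
        assert(after64(2, 1));
        assert(before64(UINT64_MAX, 0));        /* still "before" across the wrap */
        return 0;
}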
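ack_update_msk() (renamed from update_una()) advances both msk->snd_una and the new msk->wnd_end with the same lock-free discipline: read the old value, check the update still moves the counter forward, atomic64_cmpxchg() it in, and retry on conflict. A minimal sketch of that monotonic-advance pattern with C11 atomics (advance64 is a made-up name):

#include <stdatomic.h>
#include <stdint.h>

static int after64(uint64_t seq2, uint64_t seq1)
{
        return (int64_t)(seq1 - seq2) < 0;
}

/* Advance *seq to new_val only while that still moves it forward;
 * a concurrent updater that got further simply wins.
 */
static void advance64(_Atomic uint64_t *seq, uint64_t new_val)
{
        uint64_t old = atomic_load(seq);

        while (after64(new_val, old)) {
                /* on failure, 'old' is reloaded and the loop re-checks */
                if (atomic_compare_exchange_weak(seq, &old, new_val))
                        break;
        }
}

int main(void)
{
        _Atomic uint64_t snd_una = 100;

        advance64(&snd_una, 150);       /* moves forward */
        advance64(&snd_una, 120);       /* stale ack: ignored */
        return snd_una == 150 ? 0 : 1;
}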
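tcp_build_frag() is carved out of do_tcp_sendpages() so the MPTCP TX path can stage page fragments on a subflow directly. Its contract: extend the tail skb up to size_goal if it can collapse, otherwise open a new segment; clamp the copy to *size; return the skb and pass the staged byte count back through *size, or return NULL when memory is short so the caller can wait. A hypothetical userspace analogue of just that calling convention (struct buf and build_frag are invented; only the in/out *size shape mirrors the helper):

#include <stddef.h>
#include <stdlib.h>
#include <string.h>

struct buf {
        struct buf *next;
        size_t len;
        size_t goal;            /* stands in for size_goal */
        char data[4096];
};

static struct buf *build_frag(struct buf *tail, const char *src, size_t *size)
{
        size_t copy;

        if (!tail || tail->len >= tail->goal) {
                /* "new segment" path */
                struct buf *nb = calloc(1, sizeof(*nb));

                if (!nb)
                        return NULL;    /* caller waits for memory */
                nb->goal = sizeof(nb->data);
                if (tail)
                        tail->next = nb;
                tail = nb;
        }
        copy = tail->goal - tail->len;
        if (copy > *size)
                copy = *size;
        memcpy(tail->data + tail->len, src, copy);
        tail->len += copy;
        *size = copy;           /* report what was actually staged */
        return tail;
}

int main(void)
{
        size_t size = 10;
        struct buf *tail = build_frag(NULL, "hello, tcp", &size);

        return tail && size == 10 ? 0 : 1;
}

With the helper factored out, the do_tcp_sendpages() loop shrinks to "copy = size; skb = tcp_build_frag(...); if (!skb) goto wait_for_space;", and mptcp_sendmsg_frag() calls the same helper instead of bouncing through do_tcp_sendpages().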
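Exporting __tcp_close() splits tcp_close() into a lock-taking wrapper plus a core that assumes the socket lock, the kernel's usual double-underscore convention; __mptcp_close_ssk() needs the core because it already holds the subflow's lock when it disposes of the subflow. The shape of the split, sketched with a pthread mutex:

#include <pthread.h>

static pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;

/* core: caller must already hold sk_lock, as __tcp_close() assumes */
static void __close_core(void)
{
        /* ... tear down protocol state ... */
}

/* locked wrapper: what plain tcp_close() remains */
static void close_sock(void)
{
        pthread_mutex_lock(&sk_lock);
        __close_core();
        pthread_mutex_unlock(&sk_lock);
}

int main(void)
{
        close_sock();
        return 0;
}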
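mptcp_check_allowed_size() enforces the MPTCP-level window the patch starts tracking in msk->wnd_end: a send at data_seq may not run past the advertised end. When the allowance hits zero and all in-flight data is acked, mptcp_sendmsg_frag() instead sends a one-byte probe at snd_una - 1, the classic TCP zero-window probe restated in data-sequence space. The clamp arithmetic in isolation:

#include <stdint.h>

static int before64(uint64_t s1, uint64_t s2)
{
        return (int64_t)(s1 - s2) < 0;
}

/* Clamp an intended send of 'avail' bytes at 'data_seq' to the
 * receiver's advertised window end.
 */
static unsigned int check_allowed_size(uint64_t data_seq, unsigned int avail,
                                       uint64_t wnd_end)
{
        if (!before64(data_seq + avail, wnd_end)) {
                uint64_t allowed = wnd_end - data_seq;

                return allowed < avail ? (unsigned int)allowed : avail;
        }
        return avail;
}

int main(void)
{
        /* window ends at 1000: a 100-byte send at 950 is clamped to 50 */
        return check_allowed_size(950, 100, 1000) == 50 ? 0 : 1;
}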
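The scattered "if (schedule_work(&msk->work)) sock_hold(sk)" call sites collapse into mptcp_schedule_work(), which additionally refuses to schedule once the msk reached TCP_CLOSE. The invariant being centralized: a queued work item owns exactly one socket reference, released when the worker finishes. A hypothetical userspace rendering of that invariant (the atomic flag stands in for the workqueue's own queued state, the refcount for sock_hold/sock_put):

#include <stdatomic.h>
#include <stdbool.h>

enum { TCP_CLOSE = 7 };

struct msk {
        _Atomic int refcnt;
        _Atomic bool work_pending;
        int state;
};

static bool schedule_msk_work(struct msk *m)
{
        if (m->state == TCP_CLOSE)
                return false;                   /* closing: never reschedule */
        if (atomic_exchange(&m->work_pending, true))
                return false;                   /* already queued, no extra ref */
        atomic_fetch_add(&m->refcnt, 1);        /* keep m alive for the worker */
        return true;
}

static void msk_worker(struct msk *m)
{
        atomic_store(&m->work_pending, false);
        /* ... deferred protocol work ... */
        atomic_fetch_sub(&m->refcnt, 1);        /* the put the worker owes */
}

int main(void)
{
        struct msk m = { .state = 0 };

        if (schedule_msk_work(&m))
                msk_worker(&m);
        return m.refcnt == 0 ? 0 : 1;
}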
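The TX-path rework stops consuming dfrags while sending: mptcp_sendmsg() only appends to msk->rtx_queue, the new msk->first_pending pointer marks the first fragment with unsent bytes, and the new already_sent field records per-fragment progress, so the worker can both continue transmission and limit retransmission to bytes that actually left the host. The bookkeeping in miniature (plain C, singly linked for brevity):

#include <stddef.h>
#include <stdint.h>

struct data_frag {
        struct data_frag *next;
        uint64_t data_seq;
        uint16_t data_len;
        uint16_t already_sent;
};

struct msk_tx {
        struct data_frag *rtx_queue;     /* everything not yet acked */
        struct data_frag *first_pending; /* first frag with unsent bytes */
};

/* Account 'sent' more on-the-wire bytes of the send head, advancing
 * first_pending once the fragment is fully transmitted.
 */
static void tx_advance(struct msk_tx *tx, uint16_t sent)
{
        struct data_frag *df = tx->first_pending;

        if (!df)
                return;
        df->already_sent += sent;
        if (df->already_sent >= df->data_len)
                tx->first_pending = df->next;   /* cf. mptcp_send_next() */
}

int main(void)
{
        struct data_frag df = { .data_seq = 0, .data_len = 1000 };
        struct msk_tx tx = { .rtx_queue = &df, .first_pending = &df };

        tx_advance(&tx, 400);
        tx_advance(&tx, 600);   /* frag fully sent: head moves on */
        return tx.first_pending == NULL ? 0 : 1;
}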
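MPTCP_SEND_SPACE flips polarity into MPTCP_NOSPACE, and poll stops trusting a cached bit: mptcp_check_writeable() re-tests writeability after reclaiming acked data under the socket lock, and only arms the no-space machinery when that still fails. The control flow with the socket internals stubbed out (every field and helper here is a stand-in, not the kernel API):

#include <stdbool.h>

#define EPOLLOUT 0x004u

struct msk {
        bool nospace;
        long acked, queued, sndbuf;
};

static void clean_una(struct msk *m)    /* reclaim acked payload */
{
        m->queued -= m->acked;
        m->acked = 0;
}

static bool stream_writeable(const struct msk *m)
{
        return m->queued < m->sndbuf;
}

static unsigned int check_writeable(struct msk *m)
{
        if (stream_writeable(m))
                return EPOLLOUT;
        clean_una(m);                   /* acks may have freed room */
        if (stream_writeable(m))
                return EPOLLOUT;
        m->nospace = true;              /* arm write_space wakeups */
        return 0;
}

int main(void)
{
        struct msk m = { .acked = 50, .queued = 100, .sndbuf = 80 };

        /* 100 queued vs 80 sndbuf: writeable only after reaping acks */
        return check_writeable(&m) == EPOLLOUT && !m.nospace ? 0 : 1;
}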
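mptcp_close() no longer tears the socket down synchronously: the msk is orphaned, sk_timer (repurposed through mptcp_timeout_timer) keeps kicking the worker, and mptcp_check_close_timeout() reuses the icsk_mtup.probe_timestamp slot to bound the orphan's lifetime by TCP_TIMEWAIT_LEN. The timestamp check uses the signed-delta idiom so jiffies wraparound stays harmless:

#include <stdbool.h>
#include <stdint.h>

#define TCP_TIMEWAIT_LEN 60000u         /* 60s, in 1 ms ticks here */

static bool close_timed_out(uint32_t now, uint32_t close_stamp)
{
        int32_t delta = (int32_t)(now - close_stamp);

        return delta >= (int32_t)TCP_TIMEWAIT_LEN;
}

int main(void)
{
        uint32_t stamp = 0xffffff00u;                   /* close just before wrap */
        uint32_t now = stamp + TCP_TIMEWAIT_LEN;        /* wraps past zero */

        return close_timed_out(now, stamp) ? 0 : 1;
}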
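Finally, DATA_FIN emission is gated on transmission progress rather than on queueing: __mptcp_wr_shutdown() bumps write_seq by one to account for the FIN, __mptcp_check_send_data_fin() waits for snd_nxt + 1 == write_seq (all real data pushed) before moving snd_nxt forward, and mptcp_data_fin_enabled() then lets the DSS option code actually emit it. The predicate in isolation:

#include <stdbool.h>
#include <stdint.h>

struct msk_seq {
        uint64_t write_seq;     /* last queued byte, +1 once the FIN is pended */
        uint64_t snd_nxt;       /* next data-sequence byte to transmit */
        bool snd_data_fin_enable;
};

static bool data_fin_enabled(const struct msk_seq *m)
{
        /* only meaningful once all pending data hit the wire */
        return m->snd_data_fin_enable && m->write_seq == m->snd_nxt;
}

int main(void)
{
        struct msk_seq m = { .write_seq = 101, .snd_nxt = 100,
                             .snd_data_fin_enable = true };

        if (data_fin_enabled(&m))       /* not yet: data still pending */
                return 1;
        m.snd_nxt = m.write_seq;        /* __mptcp_check_send_data_fin() step */
        return data_fin_enabled(&m) ? 0 : 1;
}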