diff --git a/include/net/sock.h b/include/net/sock.h index 80469c2c448d4c9f10086ee960711c4ca9858169..f59764614e308e6222f2132c52ed9e400d0e7993 100644 --- a/include/net/sock.h +++ b/include/net/sock.h @@ -1590,6 +1590,7 @@ static inline void lock_sock(struct sock *sk) lock_sock_nested(sk, 0); } +void __lock_sock(struct sock *sk); void __release_sock(struct sock *sk); void release_sock(struct sock *sk); diff --git a/net/core/sock.c b/net/core/sock.c index 9badbe7bb4e48880c1f9ca1c1ef5ab85c96316a9..f0f0968528766a4aaa39e78c61b6e477b2872318 100644 --- a/net/core/sock.c +++ b/net/core/sock.c @@ -2486,7 +2486,7 @@ bool sk_page_frag_refill(struct sock *sk, struct page_frag *pfrag) } EXPORT_SYMBOL(sk_page_frag_refill); -static void __lock_sock(struct sock *sk) +void __lock_sock(struct sock *sk) __releases(&sk->sk_lock.slock) __acquires(&sk->sk_lock.slock) { diff --git a/net/mptcp/mptcp_diag.c b/net/mptcp/mptcp_diag.c index 5f390a97f556de8a0dcfaf985ac14e09bc66d086..b70ae4ba300085ffcfe1d8afa18c594e9016e851 100644 --- a/net/mptcp/mptcp_diag.c +++ b/net/mptcp/mptcp_diag.c @@ -140,7 +140,7 @@ static void mptcp_diag_get_info(struct sock *sk, struct inet_diag_msg *r, info->mptcpi_flags = flags; info->mptcpi_token = READ_ONCE(msk->token); info->mptcpi_write_seq = READ_ONCE(msk->write_seq); - info->mptcpi_snd_una = atomic64_read(&msk->snd_una); + info->mptcpi_snd_una = READ_ONCE(msk->snd_una); info->mptcpi_rcv_nxt = READ_ONCE(msk->ack_seq); unlock_sock_fast(sk, slow); } diff --git a/net/mptcp/options.c b/net/mptcp/options.c index 8a59b3e445999d978c2cd7c87b4824c48be66572..6b7b4b67f18c8c9316cf6209ed1270b066b7ff95 100644 --- a/net/mptcp/options.c +++ b/net/mptcp/options.c @@ -830,18 +830,20 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit) } static void ack_update_msk(struct mptcp_sock *msk, - const struct sock *ssk, + struct sock *ssk, struct mptcp_options_received *mp_opt) { - u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una); - u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end); - u64 snd_nxt = READ_ONCE(msk->snd_nxt); + u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt); struct sock *sk = (struct sock *)msk; + u64 old_snd_una; + + mptcp_data_lock(sk); /* avoid ack expansion on update conflict, to reduce the risk of * wrongly expanding to a future ack sequence number, which is way * more dangerous than missing an ack */ + old_snd_una = msk->snd_una; new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64); /* ACK for data not even sent yet? Ignore. */ @@ -850,26 +852,16 @@ static void ack_update_msk(struct mptcp_sock *msk, new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd; - while (after64(new_wnd_end, old_wnd_end)) { - wnd_end = old_wnd_end; - old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end, - new_wnd_end); - if (old_wnd_end == wnd_end) { - if (mptcp_send_head(sk)) - mptcp_schedule_work(sk); - break; - } + if (after64(new_wnd_end, msk->wnd_end)) { + msk->wnd_end = new_wnd_end; + __mptcp_wnd_updated(sk, ssk); } - while (after64(new_snd_una, old_snd_una)) { - snd_una = old_snd_una; - old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una, - new_snd_una); - if (old_snd_una == snd_una) { - mptcp_data_acked(sk); - break; - } + if (after64(new_snd_una, old_snd_una)) { + msk->snd_una = new_snd_una; + __mptcp_data_acked(sk); } + mptcp_data_unlock(sk); } bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit) @@ -922,8 +914,19 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb) struct mptcp_options_received mp_opt; struct mptcp_ext *mpext; - if (__mptcp_check_fallback(msk)) + if (__mptcp_check_fallback(msk)) { + /* Keep it simple and unconditionally trigger send data cleanup and + * pending queue spooling. We will need to acquire the data lock + * for more accurate checks, and once the lock is acquired, such + * helpers are cheap. + */ + mptcp_data_lock(subflow->conn); + if (mptcp_send_head(subflow->conn)) + __mptcp_wnd_updated(subflow->conn, sk); + __mptcp_data_acked(subflow->conn); + mptcp_data_unlock(subflow->conn); return; + } mptcp_get_options(skb, &mp_opt); if (!check_fully_established(msk, sk, subflow, skb, &mp_opt)) diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c index 16e9cb1c79ccf79e329d6b8a1f4e620fe5409f8a..221f7cdd416bdb681968bf1b3ff2ed1b03cea3ce 100644 --- a/net/mptcp/protocol.c +++ b/net/mptcp/protocol.c @@ -60,7 +60,7 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk) /* Returns end sequence number of the receiver's advertised window */ static u64 mptcp_wnd_end(const struct mptcp_sock *msk) { - return atomic64_read(&msk->wnd_end); + return READ_ONCE(msk->wnd_end); } static bool mptcp_is_tcpsk(struct sock *sk) @@ -348,17 +348,22 @@ static void mptcp_close_wake_up(struct sock *sk) sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); } -static void mptcp_check_data_fin_ack(struct sock *sk) +static bool mptcp_pending_data_fin_ack(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (__mptcp_check_fallback(msk)) - return; + return !__mptcp_check_fallback(msk) && + ((1 << sk->sk_state) & + (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) && + msk->write_seq == READ_ONCE(msk->snd_una); +} + +static void mptcp_check_data_fin_ack(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); /* Look for an acknowledged DATA_FIN */ - if (((1 << sk->sk_state) & - (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) && - msk->write_seq == atomic64_read(&msk->snd_una)) { + if (mptcp_pending_data_fin_ack(sk)) { mptcp_stop_timer(sk); WRITE_ONCE(msk->snd_data_fin_enable, 0); @@ -453,15 +458,15 @@ static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk) static void mptcp_cleanup_rbuf(struct mptcp_sock *msk) { + struct sock *ack_hint = READ_ONCE(msk->ack_hint); struct mptcp_subflow_context *subflow; /* if the hinted ssk is still active, try to use it */ - if (likely(msk->ack_hint)) { + if (likely(ack_hint)) { mptcp_for_each_subflow(msk, subflow) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - if (msk->ack_hint == ssk && - mptcp_subflow_cleanup_rbuf(ssk)) + if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk)) return; } } @@ -614,13 +619,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk, break; } } while (more_data_avail); - msk->ack_hint = ssk; + WRITE_ONCE(msk->ack_hint, ssk); *bytes += moved; return done; } -static bool mptcp_ofo_queue(struct mptcp_sock *msk) +static bool __mptcp_ofo_queue(struct mptcp_sock *msk) { struct sock *sk = (struct sock *)msk; struct sk_buff *skb, *tail; @@ -666,34 +671,27 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk) /* In most cases we will be able to lock the mptcp socket. If its already * owned, we need to defer to the work queue to avoid ABBA deadlock. */ -static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk) +static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk) { struct sock *sk = (struct sock *)msk; unsigned int moved = 0; - if (READ_ONCE(sk->sk_lock.owned)) - return false; - - if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock))) - return false; - - /* must re-check after taking the lock */ - if (!READ_ONCE(sk->sk_lock.owned)) { - __mptcp_move_skbs_from_subflow(msk, ssk, &moved); - mptcp_ofo_queue(msk); + if (inet_sk_state_load(sk) == TCP_CLOSE) + return; - /* If the moves have caught up with the DATA_FIN sequence number - * it's time to ack the DATA_FIN and change socket state, but - * this is not a good place to change state. Let the workqueue - * do it. - */ - if (mptcp_pending_data_fin(sk, NULL)) - mptcp_schedule_work(sk); - } + mptcp_data_lock(sk); - spin_unlock_bh(&sk->sk_lock.slock); + __mptcp_move_skbs_from_subflow(msk, ssk, &moved); + __mptcp_ofo_queue(msk); - return moved > 0; + /* If the moves have caught up with the DATA_FIN sequence number + * it's time to ack the DATA_FIN and change socket state, but + * this is not a good place to change state. Let the workqueue + * do it. + */ + if (mptcp_pending_data_fin(sk, NULL)) + mptcp_schedule_work(sk); + mptcp_data_unlock(sk); } void mptcp_data_ready(struct sock *sk, struct sock *ssk) @@ -771,16 +769,6 @@ bool mptcp_schedule_work(struct sock *sk) return false; } -void mptcp_data_acked(struct sock *sk) -{ - mptcp_reset_timer(sk); - - if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) || - mptcp_send_head(sk) || - (inet_sk_state_load(sk) != TCP_ESTABLISHED))) - mptcp_schedule_work(sk); -} - void mptcp_subflow_eof(struct sock *sk) { if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags)) @@ -825,16 +813,6 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk) mptcp_close_wake_up(sk); } -static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) -{ - const struct sock *sk = (const struct sock *)msk; - - if (!msk->cached_ext) - msk->cached_ext = __skb_ext_alloc(sk->sk_allocation); - - return !!msk->cached_ext; -} - static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk) { struct mptcp_subflow_context *subflow; @@ -873,6 +851,121 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk, df->data_seq + df->data_len == msk->write_seq; } +static int mptcp_wmem_with_overhead(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + int ret, skbs; + + ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT); + skbs = (msk->tx_pending_data + size) / msk->size_goal_cache; + if (skbs < msk->skb_tx_cache.qlen) + return ret; + + return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER); +} + +static void __mptcp_wmem_reserve(struct sock *sk, int size) +{ + int amount = mptcp_wmem_with_overhead(sk, size); + struct mptcp_sock *msk = mptcp_sk(sk); + + WARN_ON_ONCE(msk->wmem_reserved); + if (amount <= sk->sk_forward_alloc) + goto reserve; + + /* under memory pressure try to reserve at most a single page + * otherwise try to reserve the full estimate and fallback + * to a single page before entering the error path + */ + if ((tcp_under_memory_pressure(sk) && amount > PAGE_SIZE) || + !sk_wmem_schedule(sk, amount)) { + if (amount <= PAGE_SIZE) + goto nomem; + + amount = PAGE_SIZE; + if (!sk_wmem_schedule(sk, amount)) + goto nomem; + } + +reserve: + msk->wmem_reserved = amount; + sk->sk_forward_alloc -= amount; + return; + +nomem: + /* we will wait for memory on next allocation */ + msk->wmem_reserved = -1; +} + +static void __mptcp_update_wmem(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (!msk->wmem_reserved) + return; + + if (msk->wmem_reserved < 0) + msk->wmem_reserved = 0; + if (msk->wmem_reserved > 0) { + sk->sk_forward_alloc += msk->wmem_reserved; + msk->wmem_reserved = 0; + } +} + +static bool mptcp_wmem_alloc(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + /* check for pre-existing error condition */ + if (msk->wmem_reserved < 0) + return false; + + if (msk->wmem_reserved >= size) + goto account; + + mptcp_data_lock(sk); + if (!sk_wmem_schedule(sk, size)) { + mptcp_data_unlock(sk); + return false; + } + + sk->sk_forward_alloc -= size; + msk->wmem_reserved += size; + mptcp_data_unlock(sk); + +account: + msk->wmem_reserved -= size; + return true; +} + +static void mptcp_wmem_uncharge(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (msk->wmem_reserved < 0) + msk->wmem_reserved = 0; + msk->wmem_reserved += size; +} + +static void mptcp_mem_reclaim_partial(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + /* if we are experiencing a transint allocation error, + * the forward allocation memory has been already + * released + */ + if (msk->wmem_reserved < 0) + return; + + mptcp_data_lock(sk); + sk->sk_forward_alloc += msk->wmem_reserved; + sk_mem_reclaim_partial(sk); + msk->wmem_reserved = sk->sk_forward_alloc; + sk->sk_forward_alloc = 0; + mptcp_data_unlock(sk); +} + static void dfrag_uncharge(struct sock *sk, int len) { sk_mem_uncharge(sk, len); @@ -888,7 +981,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag) put_page(dfrag->page); } -static void mptcp_clean_una(struct sock *sk) +static void __mptcp_clean_una(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_data_frag *dtmp, *dfrag; @@ -899,10 +992,9 @@ static void mptcp_clean_una(struct sock *sk) * plain TCP */ if (__mptcp_check_fallback(msk)) - atomic64_set(&msk->snd_una, msk->snd_nxt); - - snd_una = atomic64_read(&msk->snd_una); + msk->snd_una = READ_ONCE(msk->snd_nxt); + snd_una = msk->snd_una; list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) { if (after64(dfrag->data_seq + dfrag->data_len, snd_una)) break; @@ -930,36 +1022,34 @@ static void mptcp_clean_una(struct sock *sk) } out: - if (cleaned) - sk_mem_reclaim_partial(sk); -} - -static void mptcp_clean_una_wakeup(struct sock *sk) -{ - struct mptcp_sock *msk = mptcp_sk(sk); + if (cleaned) { + if (tcp_under_memory_pressure(sk)) { + __mptcp_update_wmem(sk); + sk_mem_reclaim_partial(sk); + } - mptcp_clean_una(sk); + if (sk_stream_is_writeable(sk)) { + /* pairs with memory barrier in mptcp_poll */ + smp_mb(); + if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags)) + sk_stream_write_space(sk); + } + } - /* Only wake up writers if a subflow is ready */ - if (sk_stream_is_writeable(sk)) { - clear_bit(MPTCP_NOSPACE, &msk->flags); - sk_stream_write_space(sk); + if (snd_una == READ_ONCE(msk->snd_nxt)) { + if (msk->timer_ival) + mptcp_stop_timer(sk); + } else { + mptcp_reset_timer(sk); } } -/* ensure we get enough memory for the frag hdr, beyond some minimal amount of - * data - */ -static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag) +static void mptcp_enter_memory_pressure(struct sock *sk) { struct mptcp_subflow_context *subflow; struct mptcp_sock *msk = mptcp_sk(sk); bool first = true; - if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag), - pfrag, sk->sk_allocation))) - return true; - sk_stream_moderate_sndbuf(sk); mptcp_for_each_subflow(msk, subflow) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); @@ -969,6 +1059,18 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag) sk_stream_moderate_sndbuf(ssk); first = false; } +} + +/* ensure we get enough memory for the frag hdr, beyond some minimal amount of + * data + */ +static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag) +{ + if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag), + pfrag, sk->sk_allocation))) + return true; + + mptcp_enter_memory_pressure(sk); return false; } @@ -1015,6 +1117,128 @@ static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq, return avail_size; } +static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp) +{ + struct skb_ext *mpext = __skb_ext_alloc(gfp); + + if (!mpext) + return false; + __skb_ext_set(skb, SKB_EXT_MPTCP, mpext); + return true; +} + +static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp) +{ + struct sk_buff *skb; + + skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp); + if (likely(skb)) { + if (likely(__mptcp_add_ext(skb, gfp))) { + skb_reserve(skb, MAX_TCP_HEADER); + skb->reserved_tailroom = skb->end - skb->tail; + return skb; + } + __kfree_skb(skb); + } else { + mptcp_enter_memory_pressure(sk); + } + return NULL; +} + +static bool mptcp_tx_cache_refill(struct sock *sk, int size, + struct sk_buff_head *skbs, int *total_ts) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct sk_buff *skb; + int space_needed; + + if (unlikely(tcp_under_memory_pressure(sk))) { + mptcp_mem_reclaim_partial(sk); + + /* under pressure pre-allocate at most a single skb */ + if (msk->skb_tx_cache.qlen) + return true; + space_needed = msk->size_goal_cache; + } else { + space_needed = msk->tx_pending_data + size - + msk->skb_tx_cache.qlen * msk->size_goal_cache; + } + + while (space_needed > 0) { + skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation); + if (unlikely(!skb)) { + /* under memory pressure, try to pass the caller a + * single skb to allow forward progress + */ + while (skbs->qlen > 1) { + skb = __skb_dequeue_tail(skbs); + __kfree_skb(skb); + } + return skbs->qlen > 0; + } + + *total_ts += skb->truesize; + __skb_queue_tail(skbs, skb); + space_needed -= msk->size_goal_cache; + } + return true; +} + +static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct sk_buff *skb; + + if (ssk->sk_tx_skb_cache) { + skb = ssk->sk_tx_skb_cache; + if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) && + !__mptcp_add_ext(skb, gfp))) + return false; + return true; + } + + skb = skb_peek(&msk->skb_tx_cache); + if (skb) { + if (likely(sk_wmem_schedule(ssk, skb->truesize))) { + skb = __skb_dequeue(&msk->skb_tx_cache); + if (WARN_ON_ONCE(!skb)) + return false; + + mptcp_wmem_uncharge(sk, skb->truesize); + ssk->sk_tx_skb_cache = skb; + return true; + } + + /* over memory limit, no point to try to allocate a new skb */ + return false; + } + + skb = __mptcp_do_alloc_tx_skb(sk, gfp); + if (!skb) + return false; + + if (likely(sk_wmem_schedule(ssk, skb->truesize))) { + ssk->sk_tx_skb_cache = skb; + return true; + } + kfree_skb(skb); + return false; +} + +static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk) +{ + return !ssk->sk_tx_skb_cache && + !skb_peek(&mptcp_sk(sk)->skb_tx_cache) && + tcp_under_memory_pressure(sk); +} + +static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk) +{ + if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) + mptcp_mem_reclaim_partial(sk); + return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation); +} + static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, struct mptcp_data_frag *dfrag, struct mptcp_sendmsg_info *info) @@ -1026,7 +1250,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, struct sk_buff *skb, *tail; bool can_collapse = false; int avail_size; - size_t ret; + size_t ret = 0; pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d", msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent); @@ -1034,6 +1258,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, /* compute send limit */ info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags); avail_size = info->size_goal; + msk->size_goal_cache = info->size_goal; skb = tcp_write_queue_tail(ssk); if (skb) { /* Limit the write to the size available in the @@ -1054,10 +1279,12 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, /* Zero window and all data acked? Probe. */ avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size); if (avail_size == 0) { - if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt) + u64 snd_una = READ_ONCE(msk->snd_una); + + if (skb || snd_una != msk->snd_nxt) return 0; zero_window_probe = true; - data_seq = atomic64_read(&msk->snd_una) - 1; + data_seq = snd_una - 1; avail_size = 1; } @@ -1082,8 +1309,11 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, goto out; } - mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext); - msk->cached_ext = NULL; + mpext = skb_ext_find(tail, SKB_EXT_MPTCP); + if (WARN_ON_ONCE(!mpext)) { + /* should never reach here, stream corrupted */ + return -EINVAL; + } memset(mpext, 0, sizeof(*mpext)); mpext->data_seq = data_seq; @@ -1107,31 +1337,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, return ret; } -static void mptcp_nospace(struct mptcp_sock *msk) -{ - struct mptcp_subflow_context *subflow; - - set_bit(MPTCP_NOSPACE, &msk->flags); - smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */ - - mptcp_for_each_subflow(msk, subflow) { - struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - bool ssk_writeable = sk_stream_is_writeable(ssk); - struct socket *sock = READ_ONCE(ssk->sk_socket); - - if (ssk_writeable || !sock) - continue; - - /* enables ssk->write_space() callbacks */ - set_bit(SOCK_NOSPACE, &sock->flags); - } - - /* mptcp_data_acked() could run just before we set the NOSPACE bit, - * so explicitly check for snd_una value - */ - mptcp_clean_una((struct sock *)msk); -} - #define MPTCP_SEND_BURST_SIZE ((1 << 16) - \ sizeof(struct tcphdr) - \ MAX_TCP_OPTION_SPACE - \ @@ -1156,9 +1361,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk, sock_owned_by_me((struct sock *)msk); *sndbuf = 0; - if (!mptcp_ext_cache_refill(msk)) - return NULL; - if (__mptcp_check_fallback(msk)) { if (!msk->first) return NULL; @@ -1267,6 +1469,15 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags) if (ssk != prev_ssk || !prev_ssk) lock_sock(ssk); + /* keep it simple and always provide a new skb for the + * subflow, even if we will not use it when collapsing + * on the pending one + */ + if (!mptcp_alloc_tx_skb(sk, ssk)) { + mptcp_push_release(sk, ssk, &info); + goto out; + } + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); if (ret <= 0) { mptcp_push_release(sk, ssk, &info); @@ -1277,6 +1488,7 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags) dfrag->already_sent += ret; msk->snd_nxt += ret; msk->snd_burst -= ret; + msk->tx_pending_data -= ret; copied += ret; len -= ret; } @@ -1296,6 +1508,63 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags) } } +static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_sendmsg_info info; + struct mptcp_data_frag *dfrag; + int len, copied = 0; + + info.flags = 0; + while ((dfrag = mptcp_send_head(sk))) { + info.sent = dfrag->already_sent; + info.limit = dfrag->data_len; + len = dfrag->data_len - dfrag->already_sent; + while (len > 0) { + int ret = 0; + + /* do auto tuning */ + if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && + ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf)) + WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf); + + if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) { + __mptcp_update_wmem(sk); + sk_mem_reclaim_partial(sk); + } + if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC)) + goto out; + + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) + goto out; + + info.sent += ret; + dfrag->already_sent += ret; + msk->snd_nxt += ret; + msk->snd_burst -= ret; + msk->tx_pending_data -= ret; + copied += ret; + len -= ret; + } + WRITE_ONCE(msk->first_pending, mptcp_send_next(sk)); + } + +out: + /* __mptcp_alloc_tx_skb could have released some wmem and we are + * not going to flush it via release_sock() + */ + __mptcp_update_wmem(sk); + if (copied) { + mptcp_set_timeout(sk, ssk); + tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle, + info.size_goal); + if (msk->snd_data_fin_enable && + msk->snd_nxt + 1 == msk->write_seq) + mptcp_schedule_work(sk); + } +} + static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) { struct mptcp_sock *msk = mptcp_sk(sk); @@ -1307,7 +1576,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL)) return -EOPNOTSUPP; - lock_sock(sk); + mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len)); timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT); @@ -1318,11 +1587,11 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) } pfrag = sk_page_frag(sk); - mptcp_clean_una(sk); while (msg_data_left(msg)) { + int total_ts, frag_truesize = 0; struct mptcp_data_frag *dfrag; - int frag_truesize = 0; + struct sk_buff_head skbs; bool dfrag_collapsed; size_t psize, offset; @@ -1337,11 +1606,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) dfrag = mptcp_pending_tail(sk); dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag); if (!dfrag_collapsed) { - if (!sk_stream_memory_free(sk)) { - mptcp_push_pending(sk, msg->msg_flags); - if (!sk_stream_memory_free(sk)) - goto wait_for_memory; - } + if (!sk_stream_memory_free(sk)) + goto wait_for_memory; + if (!mptcp_page_frag_refill(sk, pfrag)) goto wait_for_memory; @@ -1356,11 +1623,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) offset = dfrag->offset + dfrag->data_len; psize = pfrag->size - offset; psize = min_t(size_t, psize, msg_data_left(msg)); - if (!sk_wmem_schedule(sk, psize + frag_truesize)) + total_ts = psize + frag_truesize; + __skb_queue_head_init(&skbs); + if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts)) goto wait_for_memory; + if (!mptcp_wmem_alloc(sk, total_ts)) { + __skb_queue_purge(&skbs); + goto wait_for_memory; + } + + skb_queue_splice_tail(&skbs, &msk->skb_tx_cache); if (copy_page_from_iter(dfrag->page, offset, psize, &msg->msg_iter) != psize) { + mptcp_wmem_uncharge(sk, psize + frag_truesize); ret = -EFAULT; goto out; } @@ -1376,7 +1652,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) * Note: we charge such data both to sk and ssk */ sk_wmem_queued_add(sk, frag_truesize); - sk->sk_forward_alloc -= frag_truesize; if (!dfrag_collapsed) { get_page(dfrag->page); list_add_tail(&dfrag->list, &msk->rtx_queue); @@ -1387,21 +1662,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) dfrag->data_seq, dfrag->data_len, dfrag->already_sent, !dfrag_collapsed); - if (!mptcp_ext_cache_refill(msk)) - goto wait_for_memory; continue; wait_for_memory: - mptcp_nospace(msk); - if (mptcp_timer_pending(sk)) - mptcp_reset_timer(sk); + set_bit(MPTCP_NOSPACE, &msk->flags); + mptcp_push_pending(sk, msg->msg_flags); ret = sk_stream_wait_memory(sk, &timeo); if (ret) goto out; } - if (copied) + if (copied) { + msk->tx_pending_data += copied; mptcp_push_pending(sk, msg->msg_flags); + } out: release_sock(sk); @@ -1427,11 +1701,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk, struct msghdr *msg, size_t len) { - struct sock *sk = (struct sock *)msk; struct sk_buff *skb; int copied = 0; - while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) { + while ((skb = skb_peek(&msk->receive_queue)) != NULL) { u32 offset = MPTCP_SKB_CB(skb)->offset; u32 data_len = skb->len - offset; u32 count = min_t(size_t, len - copied, data_len); @@ -1451,7 +1724,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk, break; } - __skb_unlink(skb, &sk->sk_receive_queue); + /* we will bulk release the skb memory later */ + skb->destructor = NULL; + msk->rmem_released += skb->truesize; + __skb_unlink(skb, &msk->receive_queue); __kfree_skb(skb); if (copied >= len) @@ -1559,25 +1835,47 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied) msk->rcvq_space.time = mstamp; } +static void __mptcp_update_rmem(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (!msk->rmem_released) + return; + + atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc); + sk_mem_uncharge(sk, msk->rmem_released); + msk->rmem_released = 0; +} + +static void __mptcp_splice_receive_queue(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue); +} + static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv) { + struct sock *sk = (struct sock *)msk; unsigned int moved = 0; - bool done; - - /* avoid looping forever below on racing close */ - if (((struct sock *)msk)->sk_state == TCP_CLOSE) - return false; + bool ret, done; __mptcp_flush_join_list(msk); do { struct sock *ssk = mptcp_subflow_recv_lookup(msk); bool slowpath; - if (!ssk) + /* we can have data pending in the subflows only if the msk + * receive buffer was full at subflow_data_ready() time, + * that is an unlikely slow path. + */ + if (likely(!ssk)) break; slowpath = lock_sock_fast(ssk); + mptcp_data_lock(sk); done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved); + mptcp_data_unlock(sk); if (moved && rcv) { WRITE_ONCE(msk->rmem_pending, min(rcv, moved)); tcp_cleanup_rbuf(ssk, 1); @@ -1586,11 +1884,19 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv) unlock_sock_fast(ssk, slowpath); } while (!done); - if (mptcp_ofo_queue(msk) || moved > 0) { - mptcp_check_data_fin((struct sock *)msk); - return true; + /* acquire the data lock only if some input data is pending */ + ret = moved > 0; + if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) || + !skb_queue_empty_lockless(&sk->sk_receive_queue)) { + mptcp_data_lock(sk); + __mptcp_update_rmem(sk); + ret |= __mptcp_ofo_queue(msk); + __mptcp_splice_receive_queue(sk); + mptcp_data_unlock(sk); } - return false; + if (ret) + mptcp_check_data_fin((struct sock *)msk); + return !skb_queue_empty(&msk->receive_queue); } static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, @@ -1604,7 +1910,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT)) return -EOPNOTSUPP; - lock_sock(sk); + mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk)); if (unlikely(sk->sk_state == TCP_LISTEN)) { copied = -ENOTCONN; goto out_err; @@ -1614,7 +1920,6 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, len = min_t(size_t, len, INT_MAX); target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); - __mptcp_flush_join_list(msk); for (;;) { int bytes_read, old_space; @@ -1628,7 +1933,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, copied += bytes_read; - if (skb_queue_empty(&sk->sk_receive_queue) && + if (skb_queue_empty(&msk->receive_queue) && __mptcp_move_skbs(msk, len - copied)) continue; @@ -1659,8 +1964,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags)) mptcp_check_for_eof(msk); - if (sk->sk_shutdown & RCV_SHUTDOWN) + if (sk->sk_shutdown & RCV_SHUTDOWN) { + /* race breaker: the shutdown could be after the + * previous receive queue check + */ + if (__mptcp_move_skbs(msk, len - copied)) + continue; break; + } if (sk->sk_state == TCP_CLOSE) { copied = -ENOTCONN; @@ -1682,7 +1993,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, mptcp_wait_data(sk, &timeo); } - if (skb_queue_empty(&sk->sk_receive_queue)) { + if (skb_queue_empty_lockless(&sk->sk_receive_queue) && + skb_queue_empty(&msk->receive_queue)) { /* entire backlog drained, clear DATA_READY. */ clear_bit(MPTCP_DATA_READY, &msk->flags); @@ -1698,7 +2010,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, out_err: pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d", msk, test_bit(MPTCP_DATA_READY, &msk->flags), - skb_queue_empty(&sk->sk_receive_queue), copied); + skb_queue_empty_lockless(&sk->sk_receive_queue), copied); mptcp_rcv_space_adjust(msk, copied); release_sock(sk); @@ -1709,12 +2021,8 @@ static void mptcp_retransmit_handler(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) { - mptcp_stop_timer(sk); - } else { - set_bit(MPTCP_WORK_RTX, &msk->flags); - mptcp_schedule_work(sk); - } + set_bit(MPTCP_WORK_RTX, &msk->flags); + mptcp_schedule_work(sk); } static void mptcp_retransmit_timer(struct timer_list *t) @@ -1915,21 +2223,18 @@ static void mptcp_worker(struct work_struct *work) if (unlikely(state == TCP_CLOSE)) goto unlock; - mptcp_clean_una_wakeup(sk); mptcp_check_data_fin_ack(sk); __mptcp_flush_join_list(msk); if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags)) __mptcp_close_subflow(msk); - if (mptcp_send_head(sk)) - mptcp_push_pending(sk, 0); - if (msk->pm.status) pm_work(msk); if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags)) mptcp_check_for_eof(msk); + __mptcp_check_send_data_fin(sk); mptcp_check_data_fin(sk); /* if the msk data is completely acked, or the socket timedout, @@ -1951,9 +2256,6 @@ static void mptcp_worker(struct work_struct *work) if (!dfrag) goto unlock; - if (!mptcp_ext_cache_refill(msk)) - goto reset_unlock; - ssk = mptcp_subflow_get_retrans(msk); if (!ssk) goto reset_unlock; @@ -1964,6 +2266,9 @@ static void mptcp_worker(struct work_struct *work) info.sent = 0; info.limit = dfrag->already_sent; while (info.sent < dfrag->already_sent) { + if (!mptcp_alloc_tx_skb(sk, ssk)) + break; + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); if (ret <= 0) break; @@ -1971,9 +2276,6 @@ static void mptcp_worker(struct work_struct *work) MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS); copied += ret; info.sent += ret; - - if (!mptcp_ext_cache_refill(msk)) - break; } if (copied) tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle, @@ -2001,8 +2303,14 @@ static int __mptcp_init_sock(struct sock *sk) INIT_LIST_HEAD(&msk->join_list); INIT_LIST_HEAD(&msk->rtx_queue); INIT_WORK(&msk->work, mptcp_worker); + __skb_queue_head_init(&msk->receive_queue); + __skb_queue_head_init(&msk->skb_tx_cache); msk->out_of_order_queue = RB_ROOT; msk->first_pending = NULL; + msk->wmem_reserved = 0; + msk->rmem_released = 0; + msk->tx_pending_data = 0; + msk->size_goal_cache = TCP_BASE_MSS; msk->ack_hint = NULL; msk->first = NULL; @@ -2046,12 +2354,15 @@ static void __mptcp_clear_xmit(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_data_frag *dtmp, *dfrag; - - sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer); + struct sk_buff *skb; WRITE_ONCE(msk->first_pending, NULL); list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) dfrag_clear(sk, dfrag); + while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) { + sk->sk_forward_alloc += skb->truesize; + kfree_skb(skb); + } } static void mptcp_cancel_work(struct sock *sk) @@ -2186,7 +2497,7 @@ static void __mptcp_destroy_sock(struct sock *sk) spin_unlock_bh(&msk->join_list_lock); list_splice_init(&msk->conn_list, &conn_list); - __mptcp_clear_xmit(sk); + sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer); sk_stop_timer(sk, &sk->sk_timer); msk->pm.status = 0; @@ -2197,6 +2508,8 @@ static void __mptcp_destroy_sock(struct sock *sk) sk->sk_prot->destroy(sk); + WARN_ON_ONCE(msk->wmem_reserved); + WARN_ON_ONCE(msk->rmem_released); sk_stream_kill_queues(sk); xfrm_sk_free_policy(sk); sk_refcnt_debug_release(sk); @@ -2326,8 +2639,8 @@ struct sock *mptcp_sk_clone(const struct sock *sk, msk->write_seq = subflow_req->idsn + 1; msk->snd_nxt = msk->write_seq; - atomic64_set(&msk->snd_una, msk->write_seq); - atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd); + msk->snd_una = msk->write_seq; + msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd; if (mp_opt->mp_capable) { msk->can_ack = true; @@ -2363,7 +2676,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk) if (msk->rcvq_space.space == 0) msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT; - atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd); + WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd); } static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, @@ -2414,6 +2727,13 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, void mptcp_destroy_common(struct mptcp_sock *msk) { + struct sock *sk = (struct sock *)msk; + + __mptcp_clear_xmit(sk); + + /* move to sk_receive_queue, sk_stream_kill_queues will purge it */ + skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue); + skb_rbtree_purge(&msk->out_of_order_queue); mptcp_token_destroy(msk); mptcp_pm_free_anno_list(msk); @@ -2423,9 +2743,6 @@ static void mptcp_destroy(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (msk->cached_ext) - __skb_ext_put(msk->cached_ext); - mptcp_destroy_common(msk); sk_sockets_allocated_dec(sk); } @@ -2540,15 +2857,58 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname, return -EOPNOTSUPP; } +void __mptcp_data_acked(struct sock *sk) +{ + if (!sock_owned_by_user(sk)) + __mptcp_clean_una(sk); + else + set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags); + + if (mptcp_pending_data_fin_ack(sk)) + mptcp_schedule_work(sk); +} + +void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk) +{ + if (!mptcp_send_head(sk)) + return; + + if (!sock_owned_by_user(sk)) + __mptcp_subflow_push_pending(sk, ssk); + else + set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags); +} + #define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED) -/* this is very alike tcp_release_cb() but we must handle differently a - * different set of events - */ +/* processes deferred events and flush wmem */ static void mptcp_release_cb(struct sock *sk) { unsigned long flags, nflags; + /* push_pending may touch wmem_reserved, do it before the later + * cleanup + */ + if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags)) + __mptcp_clean_una(sk); + if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) { + /* mptcp_push_pending() acquires the subflow socket lock + * + * 1) can't be invoked in atomic scope + * 2) must avoid ABBA deadlock with msk socket spinlock: the RX + * datapath acquires the msk socket spinlock while helding + * the subflow socket lock + */ + + spin_unlock_bh(&sk->sk_lock.slock); + mptcp_push_pending(sk, 0); + spin_lock_bh(&sk->sk_lock.slock); + } + + /* clear any wmem reservation and errors */ + __mptcp_update_wmem(sk); + __mptcp_update_rmem(sk); + do { flags = sk->sk_tsq_flags; if (!(flags & MPTCP_DEFERRED_ALL)) @@ -2619,7 +2979,7 @@ void mptcp_finish_connect(struct sock *ssk) WRITE_ONCE(msk->ack_seq, ack_seq); WRITE_ONCE(msk->rcv_wnd_sent, ack_seq); WRITE_ONCE(msk->can_ack, 1); - atomic64_set(&msk->snd_una, msk->write_seq); + WRITE_ONCE(msk->snd_una, msk->write_seq); mptcp_pm_new_connection(msk, 0); @@ -2880,24 +3240,9 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk) 0; } -static bool __mptcp_check_writeable(struct mptcp_sock *msk) -{ - struct sock *sk = (struct sock *)msk; - bool mptcp_writable; - - mptcp_clean_una(sk); - mptcp_writable = sk_stream_is_writeable(sk); - if (!mptcp_writable) - mptcp_nospace(msk); - - return mptcp_writable; -} - static __poll_t mptcp_check_writeable(struct mptcp_sock *msk) { struct sock *sk = (struct sock *)msk; - __poll_t ret = 0; - bool slow; if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN)) return 0; @@ -2905,12 +3250,12 @@ static __poll_t mptcp_check_writeable(struct mptcp_sock *msk) if (sk_stream_is_writeable(sk)) return EPOLLOUT | EPOLLWRNORM; - slow = lock_sock_fast(sk); - if (__mptcp_check_writeable(msk)) - ret = EPOLLOUT | EPOLLWRNORM; + set_bit(MPTCP_NOSPACE, &msk->flags); + smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */ + if (sk_stream_is_writeable(sk)) + return EPOLLOUT | EPOLLWRNORM; - unlock_sock_fast(sk, slow); - return ret; + return 0; } static __poll_t mptcp_poll(struct file *file, struct socket *sock, diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h index 82d5626323b18665f83feafea43be70b0c1fcb92..fc56e730fb35c01f104e64eb70891debac5744f5 100644 --- a/net/mptcp/protocol.h +++ b/net/mptcp/protocol.h @@ -91,6 +91,8 @@ #define MPTCP_WORK_EOF 3 #define MPTCP_FALLBACK_DONE 4 #define MPTCP_WORK_CLOSE_SUBFLOW 5 +#define MPTCP_PUSH_PENDING 6 +#define MPTCP_CLEAN_UNA 7 static inline bool before64(__u64 seq1, __u64 seq2) { @@ -218,14 +220,16 @@ struct mptcp_sock { u64 ack_seq; u64 rcv_wnd_sent; u64 rcv_data_fin_seq; + int wmem_reserved; struct sock *last_snd; int snd_burst; int old_wspace; - atomic64_t snd_una; - atomic64_t wnd_end; + u64 snd_una; + u64 wnd_end; unsigned long timer_ival; u32 token; int rmem_pending; + int rmem_released; unsigned long flags; bool can_ack; bool fully_established; @@ -237,11 +241,14 @@ struct mptcp_sock { struct work_struct work; struct sk_buff *ooo_last_skb; struct rb_root out_of_order_queue; + struct sk_buff_head receive_queue; + struct sk_buff_head skb_tx_cache; /* this is wmem accounted */ + int tx_pending_data; + int size_goal_cache; struct list_head conn_list; struct list_head rtx_queue; struct mptcp_data_frag *first_pending; struct list_head join_list; - struct skb_ext *cached_ext; /* for the next sendmsg */ struct socket *subflow; /* outgoing connect/listener/!mp_capable */ struct sock *first; struct mptcp_pm_data pm; @@ -253,6 +260,22 @@ struct mptcp_sock { } rcvq_space; }; +#define mptcp_lock_sock(___sk, cb) do { \ + struct sock *__sk = (___sk); /* silence macro reuse warning */ \ + might_sleep(); \ + spin_lock_bh(&__sk->sk_lock.slock); \ + if (__sk->sk_lock.owned) \ + __lock_sock(__sk); \ + cb; \ + __sk->sk_lock.owned = 1; \ + spin_unlock(&__sk->sk_lock.slock); \ + mutex_acquire(&__sk->sk_lock.dep_map, 0, 0, _RET_IP_); \ + local_bh_enable(); \ +} while (0) + +#define mptcp_data_lock(sk) spin_lock_bh(&(sk)->sk_lock.slock) +#define mptcp_data_unlock(sk) spin_unlock_bh(&(sk)->sk_lock.slock) + #define mptcp_for_each_subflow(__msk, __subflow) \ list_for_each_entry(__subflow, &((__msk)->conn_list), node) @@ -300,7 +323,7 @@ static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una))) + if (!before64(msk->snd_nxt, READ_ONCE(msk->snd_una))) return NULL; return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list); @@ -474,7 +497,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk); void mptcp_data_ready(struct sock *sk, struct sock *ssk); bool mptcp_finish_join(struct sock *sk); bool mptcp_schedule_work(struct sock *sk); -void mptcp_data_acked(struct sock *sk); +void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk); +void __mptcp_data_acked(struct sock *sk); void mptcp_subflow_eof(struct sock *sk); bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit); void __mptcp_flush_join_list(struct mptcp_sock *msk); diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c index 2e5c3f4da3a4bf36db0110758e6fe4fad58c560b..023c856424b708df2016ee555d89c6afd685a79e 100644 --- a/net/mptcp/subflow.c +++ b/net/mptcp/subflow.c @@ -995,19 +995,9 @@ static void subflow_data_ready(struct sock *sk) mptcp_data_ready(parent, sk); } -static void subflow_write_space(struct sock *sk) +static void subflow_write_space(struct sock *ssk) { - struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk); - struct socket *sock = READ_ONCE(sk->sk_socket); - struct sock *parent = subflow->conn; - - if (!sk_stream_is_writeable(sk)) - return; - - if (sock && sk_stream_is_writeable(parent)) - clear_bit(SOCK_NOSPACE, &sock->flags); - - sk_stream_write_space(parent); + /* we take action in __mptcp_clean_una() */ } static struct inet_connection_sock_af_ops *