diff --git a/net/tipc/link.c b/net/tipc/link.c index 96a8072f73cccb078c200fd01990ed8bfcbaf843..a081e7d08d22aefec08f2e661cbdee29c03e1502 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) { struct tipc_port *p_ptr; + struct tipc_sock *tsk; spin_lock_bh(&tipc_port_list_lock); p_ptr = tipc_port_lock(origport); if (p_ptr) { if (!list_empty(&p_ptr->wait_list)) goto exit; - p_ptr->congested = 1; + tsk = tipc_port_to_sock(p_ptr); + tsk->link_cong = 1; p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); l_ptr->stats.link_congs++; @@ -352,6 +354,7 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) { struct tipc_port *p_ptr; + struct tipc_sock *tsk; struct tipc_port *temp_p_ptr; int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; @@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) wait_list) { if (win <= 0) break; + tsk = tipc_port_to_sock(p_ptr); list_del_init(&p_ptr->wait_list); spin_lock_bh(p_ptr->lock); - p_ptr->congested = 0; - tipc_port_wakeup(p_ptr); + tsk->link_cong = 0; + tipc_sock_wakeup(tsk); win -= p_ptr->waiting_pkts; spin_unlock_bh(p_ptr->lock); } diff --git a/net/tipc/port.c b/net/tipc/port.c index 9f53d5ac35e169db2c692b3008314c7987f6ea1d..0d09dcb6da182ff281659b1b14cc098436a55dc2 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -186,12 +186,6 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) tipc_port_list_free(dp); } - -void tipc_port_wakeup(struct tipc_port *port) -{ - tipc_sock_wakeup(tipc_port_to_sock(port)); -} - /* tipc_port_init - intiate TIPC port and lock it * * Returns obtained reference if initialization is successful, zero otherwise @@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr, } p_ptr->max_pkt = MAX_PKT_DEFAULT; - p_ptr->sent = 1; p_ptr->ref = ref; INIT_LIST_HEAD(&p_ptr->wait_list); INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); @@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack) p_ptr = tipc_port_lock(ref); if (!p_ptr) return; - if (p_ptr->connected) { - p_ptr->conn_unacked -= ack; + if (p_ptr->connected) buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); - } + tipc_port_unlock(p_ptr); if (!buf) return; diff --git a/net/tipc/port.h b/net/tipc/port.h index 3a4f808135c3041ed1ab7f12f2dcc74d35a7d981..0e47052daa29ea85f818d3bc3950ae123896349e 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -53,17 +53,13 @@ * @connected: non-zero if port is currently connected to a peer port * @conn_type: TIPC type used when connection was established * @conn_instance: TIPC instance used when connection was established - * @conn_unacked: number of unacknowledged messages received from peer port * @published: non-zero if port has one or more associated names - * @congested: non-zero if cannot send because of link or port congestion * @max_pkt: maximum packet size "hint" used when building messages sent by port * @ref: unique reference to port in TIPC object registry * @phdr: preformatted message header used when sending messages * @port_list: adjacent ports in TIPC's global list of ports * @wait_list: adjacent ports in list of ports waiting on link congestion * @waiting_pkts: - * @sent: # of non-empty messages sent by port - * @acked: # of non-empty message acknowledgements from connected port's peer * @publications: list of publications for port * @pub_count: total # of publications port has made during its lifetime * @probing_state: @@ -76,17 +72,13 @@ struct tipc_port { int connected; u32 conn_type; u32 conn_instance; - u32 conn_unacked; int published; - u32 congested; u32 max_pkt; u32 ref; struct tipc_msg phdr; struct list_head port_list; struct list_head wait_list; u32 waiting_pkts; - u32 sent; - u32 acked; struct list_head publications; u32 pub_count; u32 probing_state; @@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref); int tipc_port_shutdown(u32 ref); -void tipc_port_wakeup(struct tipc_port *port); - /* * The following routines require that the port be locked on entry */ @@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) spin_unlock_bh(p_ptr->lock); } -static inline int tipc_port_congested(struct tipc_port *p_ptr) -{ - return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN); -} - - static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) { return msg_destnode(&p_ptr->phdr); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 1762323156af262cb9cfdbd10d7c33fe51249288..ede78b144dcfee5a37ca900c8581612bd890319e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; - tsk->port.sent = 0; + tsk->sent_unacked = 0; atomic_set(&tsk->dupl_rcvcnt, 0); tipc_port_unlock(port); @@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch ((int)sock->state) { case SS_UNCONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong) mask |= POLLOUT; break; case SS_READY: case SS_CONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong && !tipc_sk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case SS_CONNECTING: @@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); struct tipc_port *port = &tsk->port; - int wakeable; + int conn_cong; /* Ignore if connection cannot be validated: */ if (!port->connected || !tipc_port_peer_msg(port, msg)) @@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf) port->probing_state = TIPC_CONN_OK; if (msg_type(msg) == CONN_ACK) { - wakeable = tipc_port_congested(port) && port->congested; - port->acked += msg_msgcnt(msg); - if (!tipc_port_congested(port)) { - port->congested = 0; - if (wakeable) - tipc_port_wakeup(port); - } + conn_cong = tipc_sk_conn_cong(tsk); + tsk->sent_unacked -= msg_msgcnt(msg); + if (conn_cong) + tipc_sock_wakeup(tsk); } else if (msg_type(msg) == CONN_PROBE) { if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) return TIPC_OK; @@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) return sock_intr_errno(*timeo_p); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); - done = sk_wait_event(sk, timeo_p, !tsk->port.congested); + done = sk_wait_event(sk, timeo_p, !tsk->link_cong); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; @@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; DEFINE_WAIT(wait); int done; @@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); done = sk_wait_event(sk, timeo_p, - (!port->congested || !port->connected)); + (!tsk->link_cong && + !tipc_sk_conn_cong(tsk)) || + !tsk->port.connected); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; @@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, if (unlikely(dest)) { rc = tipc_sendmsg(iocb, sock, m, dsz); if (dsz && (dsz == rc)) - tsk->port.sent = 1; + tsk->sent_unacked = 1; return rc; } if (dsz > (uint)INT_MAX) @@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); dnode = tipc_port_peernode(port); - port->congested = 1; next: mtu = port->max_pkt; @@ -884,11 +881,10 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, if (unlikely(rc < 0)) goto exit; do { - port->congested = 1; - if (likely(!tipc_port_congested(port))) { + if (likely(!tipc_sk_conn_cong(tsk))) { rc = tipc_link_xmit2(buf, dnode, ref); if (likely(!rc)) { - port->sent++; + tsk->sent_unacked++; sent += send; if (sent == dsz) break; @@ -903,8 +899,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, } rc = tipc_wait_for_sndpkt(sock, &timeo); } while (!rc); - - port->congested = 0; exit: if (iocb) release_sock(sk); @@ -1169,8 +1163,10 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); + (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_acknowledge(port->ref, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } advance_rx_queue(sk); } exit: @@ -1278,8 +1274,10 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); + if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_acknowledge(port->ref, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } advance_rx_queue(sk); } diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 69fd06bce68a40393bdfddf356a65c48424e8772..2cdede9eda1ba48bf9b4da79ec3b03a8c0b859c3 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -48,6 +48,9 @@ * @peer_name: the peer of the connection, if any * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue + * @link_cong: non-zero if owner must sleep because of link congestion + * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @rcv_unacked: # messages read by user, but not yet acked back to peer */ struct tipc_sock { @@ -55,6 +58,9 @@ struct tipc_sock { struct tipc_port port; unsigned int conn_timeout; atomic_t dupl_rcvcnt; + int link_cong; + uint sent_unacked; + uint rcv_unacked; }; static inline struct tipc_sock *tipc_sk(const struct sock *sk) @@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) tsk->sk.sk_write_space(&tsk->sk); } +static inline int tipc_sk_conn_cong(struct tipc_sock *tsk) +{ + return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; +} + int tipc_sk_rcv(struct sk_buff *buf); #endif