提交 ae236fb2 编写于 作者: J Jon Maloy 提交者: David S. Miller

tipc: receive group membership events via member socket

Like with any other service, group members' availability can be
subscribed for by connecting to be topology server. However, because
the events arrive via a different socket than the member socket, there
is a real risk that membership events my arrive out of synch with the
actual JOIN/LEAVE action. I.e., it is possible to receive the first
messages from a new member before the corresponding JOIN event arrives,
just as it is possible to receive the last messages from a leaving
member after the LEAVE event has already been received.

Since each member socket is internally also subscribing for membership
events, we now fix this problem by passing those events on to the user
via the member socket. We leverage the already present member synch-
ronization protocol to guarantee correct message/event order. An event
is delivered to the user as an empty message where the two source
addresses identify the new/lost member. Furthermore, we set the MSG_OOB
bit in the message flags to mark it as an event. If the event is an
indication about a member loss we also set the MSG_EOR bit, so it can
be distinguished from a member addition event.
Signed-off-by: NJon Maloy <jon.maloy@ericsson.com>
Acked-by: NYing Xue <ying.xue@windriver.com>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
上级 31c82a2d
...@@ -238,6 +238,7 @@ struct sockaddr_tipc { ...@@ -238,6 +238,7 @@ struct sockaddr_tipc {
* Flag values * Flag values
*/ */
#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ #define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */
#define TIPC_GROUP_MEMBER_EVTS 0x2 /* Receive membership events in socket */
struct tipc_group_req { struct tipc_group_req {
__u32 type; /* group id */ __u32 type; /* group id */
......
...@@ -59,6 +59,7 @@ enum mbr_state { ...@@ -59,6 +59,7 @@ enum mbr_state {
struct tipc_member { struct tipc_member {
struct rb_node tree_node; struct rb_node tree_node;
struct list_head list; struct list_head list;
struct sk_buff *event_msg;
u32 node; u32 node;
u32 port; u32 port;
u32 instance; u32 instance;
...@@ -79,6 +80,7 @@ struct tipc_group { ...@@ -79,6 +80,7 @@ struct tipc_group {
u16 member_cnt; u16 member_cnt;
u16 bc_snd_nxt; u16 bc_snd_nxt;
bool loopback; bool loopback;
bool events;
}; };
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
...@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, ...@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
grp->instance = mreq->instance; grp->instance = mreq->instance;
grp->scope = mreq->scope; grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
return grp; return grp;
kfree(grp); kfree(grp);
...@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!msg_in_group(hdr)) if (!msg_in_group(hdr))
goto drop; goto drop;
if (mtyp == TIPC_GRP_MEMBER_EVT) {
if (!grp->events)
goto drop;
__skb_queue_tail(inputq, skb);
return;
}
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m)) if (!tipc_group_is_receiver(m))
goto drop; goto drop;
...@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, ...@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
} }
void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
u32 node = msg_orignode(hdr); u32 node = msg_orignode(hdr);
...@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, ...@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
/* Wait until PUBLISH event is received */ /* Wait until PUBLISH event is received */
if (m->state == MBR_DISCOVERED) if (m->state == MBR_DISCOVERED) {
m->state = MBR_JOINING; m->state = MBR_JOINING;
else if (m->state == MBR_PUBLISHED) } else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED; m->state = MBR_JOINED;
__skb_queue_tail(inputq, m->event_msg);
}
return; return;
case GRP_LEAVE_MSG: case GRP_LEAVE_MSG:
if (!m) if (!m)
...@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, ...@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
return; return;
} }
/* Otherwise deliver already received WITHDRAW event */ /* Otherwise deliver already received WITHDRAW event */
__skb_queue_tail(inputq, m->event_msg);
tipc_group_delete_member(grp, m); tipc_group_delete_member(grp, m);
return; return;
default: default:
...@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, ...@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
} }
} }
/* tipc_group_member_evt() - receive and handle a member up/down event
*/
void tipc_group_member_evt(struct tipc_group *grp, void tipc_group_member_evt(struct tipc_group *grp,
struct sk_buff *skb, struct sk_buff *skb,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr); struct tipc_event *evt = (void *)msg_data(hdr);
u32 instance = evt->found_lower;
u32 node = evt->port.node; u32 node = evt->port.node;
u32 port = evt->port.ref; u32 port = evt->port.ref;
int event = evt->event;
struct tipc_member *m; struct tipc_member *m;
struct net *net; struct net *net;
u32 self; u32 self;
...@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp,
if (!grp->loopback && node == self && port == grp->portid) if (!grp->loopback && node == self && port == grp->portid)
goto drop; goto drop;
/* Convert message before delivery to user */
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
msg_set_origport(hdr, port);
msg_set_orignode(hdr, node);
msg_set_nametype(hdr, grp->type);
msg_set_grp_evt(hdr, event);
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (evt->event == TIPC_PUBLISHED) { if (event == TIPC_PUBLISHED) {
if (!m) if (!m)
m = tipc_group_create_member(grp, node, port, m = tipc_group_create_member(grp, node, port,
MBR_DISCOVERED); MBR_DISCOVERED);
if (!m) if (!m)
goto drop; goto drop;
/* Wait if JOIN message not yet received */ /* Hold back event if JOIN message not yet received */
if (m->state == MBR_DISCOVERED) if (m->state == MBR_DISCOVERED) {
m->event_msg = skb;
m->state = MBR_PUBLISHED; m->state = MBR_PUBLISHED;
else } else {
__skb_queue_tail(inputq, skb);
m->state = MBR_JOINED; m->state = MBR_JOINED;
m->instance = evt->found_lower; }
m->instance = instance;
TIPC_SKB_CB(skb)->orig_member = m->instance;
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
} else if (evt->event == TIPC_WITHDRAWN) { } else if (event == TIPC_WITHDRAWN) {
if (!m) if (!m)
goto drop; goto drop;
/* Keep back event if more messages might be expected */ TIPC_SKB_CB(skb)->orig_member = m->instance;
if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
/* Hold back event if more messages might be expected */
if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
m->event_msg = skb;
m->state = MBR_LEAVING; m->state = MBR_LEAVING;
else } else {
__skb_queue_tail(inputq, skb);
tipc_group_delete_member(grp, m); tipc_group_delete_member(grp, m);
}
} }
return;
drop: drop:
kfree_skb(skb); kfree_skb(skb);
} }
...@@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, ...@@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_group_member_evt(struct tipc_group *grp, void tipc_group_member_evt(struct tipc_group *grp,
struct sk_buff *skb, struct sk_buff *skb,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_group_proto_rcv(struct tipc_group *grp, void tipc_group_proto_rcv(struct tipc_group *grp,
struct tipc_msg *hdr, struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_group_update_bc_members(struct tipc_group *grp); void tipc_group_update_bc_members(struct tipc_group *grp);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
......
...@@ -65,7 +65,8 @@ struct plist; ...@@ -65,7 +65,8 @@ struct plist;
#define TIPC_MCAST_MSG 1 #define TIPC_MCAST_MSG 1
#define TIPC_NAMED_MSG 2 #define TIPC_NAMED_MSG 2
#define TIPC_DIRECT_MSG 3 #define TIPC_DIRECT_MSG 3
#define TIPC_GRP_BCAST_MSG 4 #define TIPC_GRP_MEMBER_EVT 4
#define TIPC_GRP_BCAST_MSG 5
/* /*
* Internal message users * Internal message users
...@@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) ...@@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
static inline int msg_in_group(struct tipc_msg *m) static inline int msg_in_group(struct tipc_msg *m)
{ {
return (msg_type(m) == TIPC_GRP_BCAST_MSG); int mtyp = msg_type(m);
return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
}
static inline bool msg_is_grp_evt(struct tipc_msg *m)
{
return msg_type(m) == TIPC_GRP_MEMBER_EVT;
} }
static inline u32 msg_named(struct tipc_msg *m) static inline u32 msg_named(struct tipc_msg *m)
...@@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) ...@@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
/* Word 10 /* Word 10
*/ */
static inline u16 msg_grp_evt(struct tipc_msg *m)
{
return msg_bits(m, 10, 0, 0x3);
}
static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
{
msg_set_bits(m, 10, 0, 0x3, n);
}
static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
{ {
return msg_bits(m, 10, 16, 0xffff); return msg_bits(m, 10, 16, 0xffff);
......
...@@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, ...@@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
poll_table *wait) poll_table *wait)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group; struct tipc_group *grp = tsk->group;
u32 mask = 0; u32 revents = 0;
sock_poll_wait(file, sk_sleep(sk), wait); sock_poll_wait(file, sk_sleep(sk), wait);
if (sk->sk_shutdown & RCV_SHUTDOWN) if (sk->sk_shutdown & RCV_SHUTDOWN)
mask |= POLLRDHUP | POLLIN | POLLRDNORM; revents |= POLLRDHUP | POLLIN | POLLRDNORM;
if (sk->sk_shutdown == SHUTDOWN_MASK) if (sk->sk_shutdown == SHUTDOWN_MASK)
mask |= POLLHUP; revents |= POLLHUP;
switch (sk->sk_state) { switch (sk->sk_state) {
case TIPC_ESTABLISHED: case TIPC_ESTABLISHED:
if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
mask |= POLLOUT; revents |= POLLOUT;
/* fall thru' */ /* fall thru' */
case TIPC_LISTEN: case TIPC_LISTEN:
case TIPC_CONNECTING: case TIPC_CONNECTING:
if (!skb_queue_empty(&sk->sk_receive_queue)) if (skb)
mask |= (POLLIN | POLLRDNORM); revents |= POLLIN | POLLRDNORM;
break; break;
case TIPC_OPEN: case TIPC_OPEN:
if (!grp || tipc_group_size(grp)) if (!grp || tipc_group_size(grp))
if (!tsk->cong_link_cnt) if (!tsk->cong_link_cnt)
mask |= POLLOUT; revents |= POLLOUT;
if (tipc_sk_type_connectionless(sk) && if (!tipc_sk_type_connectionless(sk))
(!skb_queue_empty(&sk->sk_receive_queue))) break;
mask |= (POLLIN | POLLRDNORM); if (!skb)
break;
revents |= POLLIN | POLLRDNORM;
break; break;
case TIPC_DISCONNECTING: case TIPC_DISCONNECTING:
mask = (POLLIN | POLLRDNORM | POLLHUP); revents = POLLIN | POLLRDNORM | POLLHUP;
break; break;
} }
return revents;
return mask;
} }
/** /**
...@@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, ...@@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
size_t buflen, int flags) size_t buflen, int flags)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct sk_buff *skb;
struct tipc_msg *hdr;
bool connected = !tipc_sk_type_connectionless(sk); bool connected = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy; int rc, err, hlen, dlen, copy;
struct tipc_msg *hdr;
struct sk_buff *skb;
bool grp_evt;
long timeout; long timeout;
/* Catch invalid receive requests */ /* Catch invalid receive requests */
...@@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, ...@@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
dlen = msg_data_sz(hdr); dlen = msg_data_sz(hdr);
hlen = msg_hdr_sz(hdr); hlen = msg_hdr_sz(hdr);
err = msg_errcode(hdr); err = msg_errcode(hdr);
grp_evt = msg_is_grp_evt(hdr);
if (likely(dlen || err)) if (likely(dlen || err))
break; break;
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
...@@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, ...@@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
if (unlikely(rc)) if (unlikely(rc))
goto exit; goto exit;
/* Mark message as group event if applicable */
if (unlikely(grp_evt)) {
if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
m->msg_flags |= MSG_EOR;
m->msg_flags |= MSG_OOB;
copy = 0;
}
/* Caption of data or error code/rejected data was successful */ /* Caption of data or error code/rejected data was successful */
if (unlikely(flags & MSG_PEEK)) if (unlikely(flags & MSG_PEEK))
goto exit; goto exit;
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
if (likely(!connected)) if (likely(!connected))
goto exit; goto exit;
...@@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk, ...@@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk,
sk->sk_write_space(sk); sk->sk_write_space(sk);
break; break;
case GROUP_PROTOCOL: case GROUP_PROTOCOL:
tipc_group_proto_rcv(grp, hdr, xmitq); tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
break; break;
case TOP_SRV: case TOP_SRV:
tipc_group_member_evt(tsk->group, skb, xmitq); tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
skb = NULL; skb = NULL;
break; break;
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册