提交 30e22a6e 编写于 作者: T Taehee Yoo 提交者: Paolo Abeni

amt: use workqueue for gateway side message handling

There are some synchronization issues(amt->status, amt->req_cnt, etc)
if the interface is in gateway mode because gateway message handlers
are processed concurrently.
This applies a work queue for processing these messages instead of
expanding the locking context.

So, the purposes of this patch are to fix exist race conditions and to make
gateway to be able to validate a gateway status more correctly.

When the AMT gateway interface is created, it tries to establish to relay.
The establishment step looks stateless, but it should be managed well.
In order to handle messages in the gateway, it saves the current
status(i.e. AMT_STATUS_XXX).
This patch makes gateway code to be worked with a single thread.

Now, all messages except the multicast are triggered(received or
delay expired), and these messages will be stored in the event
queue(amt->events).
Then, the single worker processes stored messages asynchronously one
by one.
The multicast data message type will be still processed immediately.

Now, amt->lock is only needed to access the event queue(amt->events)
if an interface is the gateway mode.

Fixes: cbc21dc1 ("amt: add data plane of amt interface")
Signed-off-by: NTaehee Yoo <ap420073@gmail.com>
Signed-off-by: NPaolo Abeni <pabeni@redhat.com>
上级 1774559f
......@@ -900,6 +900,28 @@ static void amt_send_mld_gq(struct amt_dev *amt, struct amt_tunnel_list *tunnel)
}
#endif
static bool amt_queue_event(struct amt_dev *amt, enum amt_event event,
struct sk_buff *skb)
{
int index;
spin_lock_bh(&amt->lock);
if (amt->nr_events >= AMT_MAX_EVENTS) {
spin_unlock_bh(&amt->lock);
return 1;
}
index = (amt->event_idx + amt->nr_events) % AMT_MAX_EVENTS;
amt->events[index].event = event;
amt->events[index].skb = skb;
amt->nr_events++;
amt->event_idx %= AMT_MAX_EVENTS;
queue_work(amt_wq, &amt->event_wq);
spin_unlock_bh(&amt->lock);
return 0;
}
static void amt_secret_work(struct work_struct *work)
{
struct amt_dev *amt = container_of(to_delayed_work(work),
......@@ -913,12 +935,8 @@ static void amt_secret_work(struct work_struct *work)
msecs_to_jiffies(AMT_SECRET_TIMEOUT));
}
static void amt_discovery_work(struct work_struct *work)
static void amt_event_send_discovery(struct amt_dev *amt)
{
struct amt_dev *amt = container_of(to_delayed_work(work),
struct amt_dev,
discovery_wq);
spin_lock_bh(&amt->lock);
if (amt->status > AMT_STATUS_SENT_DISCOVERY)
goto out;
......@@ -933,11 +951,19 @@ static void amt_discovery_work(struct work_struct *work)
spin_unlock_bh(&amt->lock);
}
static void amt_req_work(struct work_struct *work)
static void amt_discovery_work(struct work_struct *work)
{
struct amt_dev *amt = container_of(to_delayed_work(work),
struct amt_dev,
req_wq);
discovery_wq);
if (amt_queue_event(amt, AMT_EVENT_SEND_DISCOVERY, NULL))
mod_delayed_work(amt_wq, &amt->discovery_wq,
msecs_to_jiffies(AMT_DISCOVERY_TIMEOUT));
}
static void amt_event_send_request(struct amt_dev *amt)
{
u32 exp;
spin_lock_bh(&amt->lock);
......@@ -967,6 +993,17 @@ static void amt_req_work(struct work_struct *work)
spin_unlock_bh(&amt->lock);
}
static void amt_req_work(struct work_struct *work)
{
struct amt_dev *amt = container_of(to_delayed_work(work),
struct amt_dev,
req_wq);
if (amt_queue_event(amt, AMT_EVENT_SEND_REQUEST, NULL))
mod_delayed_work(amt_wq, &amt->req_wq,
msecs_to_jiffies(100));
}
static bool amt_send_membership_update(struct amt_dev *amt,
struct sk_buff *skb,
bool v6)
......@@ -2392,12 +2429,14 @@ static bool amt_membership_query_handler(struct amt_dev *amt,
skb->pkt_type = PACKET_MULTICAST;
skb->ip_summed = CHECKSUM_NONE;
len = skb->len;
local_bh_disable();
if (__netif_rx(skb) == NET_RX_SUCCESS) {
amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
dev_sw_netstats_rx_add(amt->dev, len);
} else {
amt->dev->stats.rx_dropped++;
}
local_bh_enable();
return false;
}
......@@ -2688,6 +2727,38 @@ static bool amt_request_handler(struct amt_dev *amt, struct sk_buff *skb)
return false;
}
static void amt_gw_rcv(struct amt_dev *amt, struct sk_buff *skb)
{
int type = amt_parse_type(skb);
int err = 1;
if (type == -1)
goto drop;
if (amt->mode == AMT_MODE_GATEWAY) {
switch (type) {
case AMT_MSG_ADVERTISEMENT:
err = amt_advertisement_handler(amt, skb);
break;
case AMT_MSG_MEMBERSHIP_QUERY:
err = amt_membership_query_handler(amt, skb);
if (!err)
return;
break;
default:
netdev_dbg(amt->dev, "Invalid type of Gateway\n");
break;
}
}
drop:
if (err) {
amt->dev->stats.rx_dropped++;
kfree_skb(skb);
} else {
consume_skb(skb);
}
}
static int amt_rcv(struct sock *sk, struct sk_buff *skb)
{
struct amt_dev *amt;
......@@ -2719,8 +2790,12 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
err = true;
goto drop;
}
err = amt_advertisement_handler(amt, skb);
break;
if (amt_queue_event(amt, AMT_EVENT_RECEIVE, skb)) {
netdev_dbg(amt->dev, "AMT Event queue full\n");
err = true;
goto drop;
}
goto out;
case AMT_MSG_MULTICAST_DATA:
if (iph->saddr != amt->remote_ip) {
netdev_dbg(amt->dev, "Invalid Relay IP\n");
......@@ -2738,11 +2813,12 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
err = true;
goto drop;
}
err = amt_membership_query_handler(amt, skb);
if (err)
if (amt_queue_event(amt, AMT_EVENT_RECEIVE, skb)) {
netdev_dbg(amt->dev, "AMT Event queue full\n");
err = true;
goto drop;
else
goto out;
}
goto out;
default:
err = true;
netdev_dbg(amt->dev, "Invalid type of Gateway\n");
......@@ -2780,6 +2856,46 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
return 0;
}
static void amt_event_work(struct work_struct *work)
{
struct amt_dev *amt = container_of(work, struct amt_dev, event_wq);
struct sk_buff *skb;
u8 event;
int i;
for (i = 0; i < AMT_MAX_EVENTS; i++) {
spin_lock_bh(&amt->lock);
if (amt->nr_events == 0) {
spin_unlock_bh(&amt->lock);
return;
}
event = amt->events[amt->event_idx].event;
skb = amt->events[amt->event_idx].skb;
amt->events[amt->event_idx].event = AMT_EVENT_NONE;
amt->events[amt->event_idx].skb = NULL;
amt->nr_events--;
amt->event_idx++;
amt->event_idx %= AMT_MAX_EVENTS;
spin_unlock_bh(&amt->lock);
switch (event) {
case AMT_EVENT_RECEIVE:
amt_gw_rcv(amt, skb);
break;
case AMT_EVENT_SEND_DISCOVERY:
amt_event_send_discovery(amt);
break;
case AMT_EVENT_SEND_REQUEST:
amt_event_send_request(amt);
break;
default:
if (skb)
kfree_skb(skb);
break;
}
}
}
static int amt_err_lookup(struct sock *sk, struct sk_buff *skb)
{
struct amt_dev *amt;
......@@ -2867,6 +2983,8 @@ static int amt_dev_open(struct net_device *dev)
amt->ready4 = false;
amt->ready6 = false;
amt->event_idx = 0;
amt->nr_events = 0;
err = amt_socket_create(amt);
if (err)
......@@ -2892,6 +3010,8 @@ static int amt_dev_stop(struct net_device *dev)
struct amt_dev *amt = netdev_priv(dev);
struct amt_tunnel_list *tunnel, *tmp;
struct socket *sock;
struct sk_buff *skb;
int i;
cancel_delayed_work_sync(&amt->req_wq);
cancel_delayed_work_sync(&amt->discovery_wq);
......@@ -2904,6 +3024,15 @@ static int amt_dev_stop(struct net_device *dev)
if (sock)
udp_tunnel_sock_release(sock);
cancel_work_sync(&amt->event_wq);
for (i = 0; i < AMT_MAX_EVENTS; i++) {
skb = amt->events[i].skb;
if (skb)
kfree_skb(skb);
amt->events[i].event = AMT_EVENT_NONE;
amt->events[i].skb = NULL;
}
amt->ready4 = false;
amt->ready6 = false;
amt->req_cnt = 0;
......@@ -3146,8 +3275,8 @@ static int amt_newlink(struct net *net, struct net_device *dev,
INIT_DELAYED_WORK(&amt->discovery_wq, amt_discovery_work);
INIT_DELAYED_WORK(&amt->req_wq, amt_req_work);
INIT_DELAYED_WORK(&amt->secret_wq, amt_secret_work);
INIT_WORK(&amt->event_wq, amt_event_work);
INIT_LIST_HEAD(&amt->tunnel_list);
return 0;
err:
dev_put(amt->stream_dev);
......@@ -3280,7 +3409,7 @@ static int __init amt_init(void)
if (err < 0)
goto unregister_notifier;
amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 1);
amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 0);
if (!amt_wq) {
err = -ENOMEM;
goto rtnl_unregister;
......
......@@ -78,6 +78,15 @@ enum amt_status {
#define AMT_STATUS_MAX (__AMT_STATUS_MAX - 1)
/* Gateway events only */
enum amt_event {
AMT_EVENT_NONE,
AMT_EVENT_RECEIVE,
AMT_EVENT_SEND_DISCOVERY,
AMT_EVENT_SEND_REQUEST,
__AMT_EVENT_MAX,
};
struct amt_header {
#if defined(__LITTLE_ENDIAN_BITFIELD)
u8 type:4,
......@@ -292,6 +301,12 @@ struct amt_group_node {
struct hlist_head sources[];
};
#define AMT_MAX_EVENTS 16
struct amt_events {
enum amt_event event;
struct sk_buff *skb;
};
struct amt_dev {
struct net_device *dev;
struct net_device *stream_dev;
......@@ -308,6 +323,7 @@ struct amt_dev {
struct delayed_work req_wq;
/* Protected by RTNL */
struct delayed_work secret_wq;
struct work_struct event_wq;
/* AMT status */
enum amt_status status;
/* Generated key */
......@@ -345,6 +361,10 @@ struct amt_dev {
/* Used only in gateway mode */
u64 mac:48,
reserved:16;
/* AMT gateway side message handler queue */
struct amt_events events[AMT_MAX_EVENTS];
u8 event_idx;
u8 nr_events;
};
#define AMT_TOS 0xc0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册