提交 f0f5c28e 编写于 作者: G Guillaume Nault 提交者: Dmitry Kozlov

l2tp: separate send and retransmission queues

Don't send messages immediately, store them in conn->send_queue
instead, so we have control over how many and when messages are
sent on the network. Once a message is sent, it's removed from
the send queue and added to the retransmission queue.

Retransmission queue is automatically updated based on acknowledgements
received from peer.

For now, packets in the send queue are pushed on the network after
each incoming packet processing. So functions called by l2tp_conn_read()
don't have to call l2tp_tunnel_push_sendqueue().
Other functions (e.g. triton callbacks) have to manually push packets
out of the send queue.
The same applies for disconnection. The l2tp_tunnel_disconnect_push()
and l2tp_session_disconnect_push() functions have been defined for use
in functions that don't call l2tp_tunnel_push_sendqueue() automatically.
Signed-off-by: NGuillaume Nault <g.nault@alphalink.fr>
上级 c924d549
......@@ -141,7 +141,9 @@ struct l2tp_conn_t
int retransmit;
uint16_t Ns, Nr;
uint16_t peer_Nr;
struct list_head send_queue;
struct list_head rtms_queue;
unsigned int ref_count;
int state;
......@@ -158,10 +160,6 @@ static mempool_t l2tp_sess_pool;
static void l2tp_tunnel_timeout(struct triton_timer_t *t);
static void l2tp_rtimeout(struct triton_timer_t *t);
static void l2tp_send_HELLO(struct triton_timer_t *t);
static int l2tp_tunnel_send(struct l2tp_conn_t *conn,
struct l2tp_packet_t *pack);
static int l2tp_session_send(struct l2tp_sess_t *sess,
struct l2tp_packet_t *pack);
static int l2tp_conn_read(struct triton_md_handler_t *);
static void l2tp_session_free(struct l2tp_sess_t *sess);
static void l2tp_tunnel_free(struct l2tp_conn_t *conn);
......@@ -435,6 +433,173 @@ static int l2tp_tunnel_checkchallresp(uint8_t msgident,
return 0;
}
static int __l2tp_tunnel_send(const struct l2tp_conn_t *conn,
struct l2tp_packet_t *pack)
{
const struct l2tp_attr_t *msg_type;
void (*log_func)(const char *fmt, ...);
pack->hdr.Nr = htons(conn->Nr);
if (conf_verbose) {
if (l2tp_packet_is_ZLB(pack)) {
log_func = log_debug;
} else {
msg_type = list_first_entry(&pack->attrs,
typeof(*msg_type), entry);
if (msg_type->val.uint16 == Message_Type_Hello)
log_func = log_debug;
else
log_func = log_info2;
}
log_tunnel(log_func, conn, "send ");
l2tp_packet_print(pack, log_func);
}
return l2tp_packet_send(conn->hnd.fd, pack);
}
/* Drop acknowledged packets from tunnel's retransmission queue */
static int l2tp_tunnel_clean_rtmsqueue(struct l2tp_conn_t *conn)
{
struct l2tp_packet_t *pack;
unsigned int pkt_freed = 0;
while (!list_empty(&conn->rtms_queue)) {
pack = list_first_entry(&conn->rtms_queue, typeof(*pack),
entry);
if (nsnr_cmp(ntohs(pack->hdr.Ns), conn->peer_Nr) >= 0)
break;
list_del(&pack->entry);
l2tp_packet_free(pack);
++pkt_freed;
}
log_tunnel(log_debug, conn, "%u message%s acked by peer\n", pkt_freed,
pkt_freed > 1 ? "s" : "");
if (pkt_freed == 0)
return 0;
/* Oldest message from retransmission queue has been acknowledged,
* reset retransmission counter and timer.
*/
conn->retransmit = 0;
/* Stop timer if retransmission queue is empty */
if (list_empty(&conn->rtms_queue)) {
if (conn->rtimeout_timer.tpd)
triton_timer_del(&conn->rtimeout_timer);
return 0;
}
/* Some messages haven't been acknowledged yet, restart timer */
if (conn->rtimeout_timer.tpd) {
if (triton_timer_mod(&conn->rtimeout_timer, 0) < 0) {
log_tunnel(log_error, conn,
"impossible to clean retransmission queue:"
" updating retransmission timer failed\n");
return -1;
}
} else {
if (triton_timer_add(&conn->ctx,
&conn->rtimeout_timer, 0) < 0) {
log_tunnel(log_error, conn,
"impossible to clean retransmission queue:"
" starting retransmission timer failed\n");
return -1;
}
}
return 0;
}
static int l2tp_tunnel_push_sendqueue(struct l2tp_conn_t *conn)
{
struct l2tp_packet_t *pack;
unsigned int pkt_sent = 0;
while (!list_empty(&conn->send_queue)) {
pack = list_first_entry(&conn->send_queue, typeof(*pack),
entry);
pack->hdr.Ns = htons(conn->Ns);
if (__l2tp_tunnel_send(conn, pack) < 0) {
log_tunnel(log_error, conn,
"impossible to process the send queue:"
" sending packet %hu failed\n", conn->Ns);
return -1;
}
list_move_tail(&pack->entry, &conn->rtms_queue);
++conn->Ns;
++pkt_sent;
}
log_tunnel(log_debug, conn, "%u message%s sent from send queue\n",
pkt_sent, pkt_sent > 1 ? "s" : "");
if (pkt_sent == 0)
return 0;
/* At least one message sent, restart retransmission timer if necessary
* (timer may be stopped, e.g. because there was no message left in the
* retransmission queue).
*/
if (conn->rtimeout_timer.tpd == NULL) {
if (triton_timer_add(&conn->ctx,
&conn->rtimeout_timer, 0) < 0) {
log_tunnel(log_error, conn,
"impossible to process the send queue:"
" setting retransmission timer failed\n");
return -1;
}
}
return 0;
}
static int l2tp_tunnel_send(struct l2tp_conn_t *conn,
struct l2tp_packet_t *pack)
{
if (conn->state == STATE_FIN || conn->state == STATE_CLOSE) {
log_tunnel(log_info2, conn,
"discarding outgoing message, tunnel is closing\n");
l2tp_packet_free(pack);
return -1;
}
pack->hdr.tid = htons(conn->peer_tid);
list_add_tail(&pack->entry, &conn->send_queue);
return 0;
}
static int l2tp_session_send(struct l2tp_sess_t *sess,
struct l2tp_packet_t *pack)
{
if (sess->state1 == STATE_CLOSE) {
log_session(log_info2, sess,
"discarding outgoing message,"
" session is closing\n");
l2tp_packet_free(pack);
return -1;
}
pack->hdr.sid = htons(sess->peer_sid);
return l2tp_tunnel_send(sess->paren_conn, pack);
}
static int l2tp_send_StopCCN(struct l2tp_conn_t *conn,
uint16_t res, uint16_t err)
{
......@@ -630,6 +795,25 @@ static int l2tp_tunnel_disconnect(struct l2tp_conn_t *conn,
return 0;
}
static int l2tp_tunnel_disconnect_push(struct l2tp_conn_t *conn,
uint16_t res, uint16_t err)
{
if (l2tp_tunnel_disconnect(conn, res, err) < 0)
return -1;
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn,
"impossible to notify peer of tunnel disconnection:"
" transmitting messages from send queue failed,"
" deleting tunnel anyway\n");
l2tp_tunnel_free(conn);
return -1;
}
return 0;
}
static void __tunnel_destroy(struct l2tp_conn_t *conn)
{
pthread_mutex_destroy(&conn->ctx_lock);
......@@ -770,7 +954,7 @@ static void l2tp_session_free(struct l2tp_sess_t *sess)
case STATE_ESTB:
log_tunnel(log_info1, sess->paren_conn,
"no more session, disconnecting tunnel\n");
l2tp_tunnel_disconnect(sess->paren_conn, 1, 0);
l2tp_tunnel_disconnect_push(sess->paren_conn, 1, 0);
break;
case STATE_FIN:
case STATE_CLOSE:
......@@ -837,6 +1021,12 @@ static void l2tp_tunnel_free(struct l2tp_conn_t *conn)
if (conn->hello_timer.tpd)
triton_timer_del(&conn->hello_timer);
while (!list_empty(&conn->rtms_queue)) {
pack = list_first_entry(&conn->rtms_queue, typeof(*pack),
entry);
list_del(&pack->entry);
l2tp_packet_free(pack);
}
while (!list_empty(&conn->send_queue)) {
pack = list_entry(conn->send_queue.next, typeof(*pack), entry);
list_del(&pack->entry);
......@@ -866,6 +1056,22 @@ static void l2tp_session_disconnect(struct l2tp_sess_t *sess,
l2tp_session_free(sess);
}
static void l2tp_session_disconnect_push(struct l2tp_sess_t *sess,
uint16_t res, uint16_t err)
{
if (l2tp_send_CDN(sess, res, err) < 0)
log_session(log_error, sess,
"impossible to notify peer of session disconnection,"
" sending CDN failed, deleting session anyway\n");
else if (l2tp_tunnel_push_sendqueue(sess->paren_conn) < 0)
log_session(log_error, sess,
"impossible to notify peer of session disconnection:"
" transmitting messages from send queue failed,"
" deleting session anyway\n");
l2tp_session_free(sess);
}
static void l2tp_session_apses_finished(void *data)
{
struct l2tp_conn_t *conn = l2tp_tunnel_self();
......@@ -884,7 +1090,7 @@ static void l2tp_session_apses_finished(void *data)
if (sess->state1 == STATE_ESTB) {
log_session(log_info1, sess,
"data channel closed, disconnecting session\n");
l2tp_session_disconnect(sess, 2, 0);
l2tp_session_disconnect_push(sess, 2, 0);
} else {
log_session(log_warn, sess,
"avoiding disconnection of session with no data channel:"
......@@ -1070,7 +1276,7 @@ static void l2tp_session_timeout(struct triton_timer_t *t)
triton_timer_del(t);
log_session(log_info1, sess, "session establishment timeout,"
" disconnecting session\n");
l2tp_session_disconnect(sess, 10, 0);
l2tp_session_disconnect_push(sess, 10, 0);
}
static struct l2tp_sess_t *l2tp_tunnel_new_session(struct l2tp_conn_t *conn)
......@@ -1167,7 +1373,7 @@ static void l2tp_conn_close(struct triton_context_t *ctx)
log_tunnel(log_info1, conn, "context thread is closing,"
" disconnecting tunnel\n");
l2tp_tunnel_disconnect(conn, 0, 0);
l2tp_tunnel_disconnect_push(conn, 0, 0);
}
static int l2tp_tunnel_start(struct l2tp_conn_t *conn,
......@@ -1231,6 +1437,7 @@ static struct l2tp_conn_t *l2tp_tunnel_alloc(const struct sockaddr_in *peer,
memset(conn, 0, sizeof(*conn));
pthread_mutex_init(&conn->ctx_lock, NULL);
INIT_LIST_HEAD(&conn->send_queue);
INIT_LIST_HEAD(&conn->rtms_queue);
conn->hnd.fd = socket(PF_INET, SOCK_DGRAM, 0);
if (conn->hnd.fd < 0) {
......@@ -1671,15 +1878,14 @@ static int l2tp_retransmit(struct l2tp_conn_t *conn)
{
struct l2tp_packet_t *pack;
pack = list_entry(conn->send_queue.next, typeof(*pack), entry);
pack->hdr.Nr = htons(conn->Nr);
pack = list_first_entry(&conn->rtms_queue, typeof(*pack), entry);
log_tunnel(log_info2, conn, "retransmitting packet %hu\n",
ntohs(pack->hdr.Ns));
if (conf_verbose) {
log_tunnel(log_info2, conn, "retransmit (duplicate) ");
l2tp_packet_print(pack, log_info2);
}
if (l2tp_packet_send(conn->hnd.fd, pack) < 0) {
if (__l2tp_tunnel_send(conn, pack) < 0) {
log_tunnel(log_error, conn,
"packet retransmission failure\n");
return -1;
......@@ -1693,10 +1899,10 @@ static void l2tp_rtimeout(struct triton_timer_t *t)
struct l2tp_conn_t *conn = container_of(t, typeof(*conn), rtimeout_timer);
struct l2tp_packet_t *pack;
if (!list_empty(&conn->send_queue)) {
if (!list_empty(&conn->rtms_queue)) {
if (++conn->retransmit <= conf_retransmit) {
pack = list_entry(conn->send_queue.next, typeof(*pack), entry);
pack->hdr.Nr = htons(conn->Nr);
pack = list_first_entry(&conn->rtms_queue,
typeof(*pack), entry);
log_tunnel(log_info2, conn,
"retransmission %i (packet %hu)\n",
conn->retransmit, ntohs(pack->hdr.Ns));
......@@ -1705,7 +1911,7 @@ static void l2tp_rtimeout(struct triton_timer_t *t)
"retransmit (timeout) ");
l2tp_packet_print(pack, log_info2);
}
if (l2tp_packet_send(conn->hnd.fd, pack) < 0)
if (__l2tp_tunnel_send(conn, pack) < 0)
log_tunnel(log_error, conn,
"packet retransmission failure\n");
} else {
......@@ -1724,72 +1930,13 @@ static void l2tp_tunnel_timeout(struct triton_timer_t *t)
triton_timer_del(t);
log_tunnel(log_info1, conn, "tunnel establishment timeout,"
" disconnecting tunnel\n");
l2tp_tunnel_disconnect(conn, 1, 0);
}
static int l2tp_tunnel_send(struct l2tp_conn_t *conn,
struct l2tp_packet_t *pack)
{
const struct l2tp_attr_t *msg_type = NULL;
void (*log_func)(const char *fmt, ...) = NULL;
conn->retransmit = 0;
pack->hdr.tid = htons(conn->peer_tid);
pack->hdr.Nr = htons(conn->Nr);
pack->hdr.Ns = htons(conn->Ns);
if (conf_verbose) {
if (list_empty(&pack->attrs))
log_func = log_debug;
else {
msg_type = list_entry(pack->attrs.next,
typeof(*msg_type), entry);
if (msg_type->val.uint16 == Message_Type_Hello)
log_func = log_debug;
else
log_func = log_info2;
}
log_tunnel(log_func, conn, "send ");
l2tp_packet_print(pack, log_func);
}
if (l2tp_packet_send(conn->hnd.fd, pack)) {
log_tunnel(log_error, conn, "packet transmission failure\n");
goto out_err;
}
if (!list_empty(&pack->attrs)) {
conn->Ns++;
list_add_tail(&pack->entry, &conn->send_queue);
if (!conn->rtimeout_timer.tpd)
if (triton_timer_add(&conn->ctx,
&conn->rtimeout_timer, 0) < 0)
log_tunnel(log_warn, conn,
"setting retransmission timer for"
" packet %hu failed\n",
conn->Ns - 1);
} else
l2tp_packet_free(pack);
return 0;
out_err:
l2tp_packet_free(pack);
return -1;
}
static int l2tp_session_send(struct l2tp_sess_t *sess,
struct l2tp_packet_t *pack)
{
pack->hdr.sid = htons(sess->peer_sid);
return l2tp_tunnel_send(sess->paren_conn, pack);
l2tp_tunnel_disconnect_push(conn, 1, 0);
}
static int l2tp_send_ZLB(struct l2tp_conn_t *conn)
{
struct l2tp_packet_t *pack;
int res;
log_tunnel(log_debug, conn, "sending ZLB\n");
......@@ -1800,13 +1947,22 @@ static int l2tp_send_ZLB(struct l2tp_conn_t *conn)
return -1;
}
if (l2tp_tunnel_send(conn, pack) < 0) {
/* ZLB messages are special: they take no slot in the control message
* sequence number space and never have to be retransmitted. So they're
* sent directly by __l2tp_tunnel_send(), thus bypassing the send and
* retransmission queues.
*/
pack->hdr.tid = htons(conn->peer_tid);
pack->hdr.Ns = htons(conn->Ns);
res = __l2tp_tunnel_send(conn, pack);
if (res < 0)
log_tunnel(log_error, conn, "impossible to send ZLB:"
" sending packet failed\n");
return -1;
}
return 0;
l2tp_packet_free(pack);
return res;
}
static void l2tp_send_HELLO(struct triton_timer_t *t)
......@@ -1821,13 +1977,23 @@ static void l2tp_send_HELLO(struct triton_timer_t *t)
conn->secret_len);
if (!pack) {
log_tunnel(log_error, conn, "impossible to send HELLO:"
" packet allocation failed\n");
return;
" packet allocation failed, deleting tunnel\n");
goto err;
}
if (l2tp_tunnel_send(conn, pack) < 0)
l2tp_tunnel_send(conn, pack);
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn, "impossible to send HELLO:"
" sending packet failed\n");
" transmitting messages from send queue failed,"
" deleting tunnel\n");
goto err;
}
return;
err:
l2tp_tunnel_free(conn);
}
static void l2tp_send_SCCRQ(void *peer_addr)
......@@ -1895,9 +2061,11 @@ static void l2tp_send_SCCRQ(void *peer_addr)
goto pack_err;
}
if (l2tp_tunnel_send(conn, pack) < 0) {
l2tp_tunnel_send(conn, pack);
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn, "impossible to send SCCRQ:"
" sending packet failed\n");
" transmitting messages from send queue failed\n");
goto err;
}
......@@ -1982,9 +2150,11 @@ static void l2tp_send_SCCRP(struct l2tp_conn_t *conn)
goto out_err;
}
if (l2tp_tunnel_send(conn, pack) < 0) {
l2tp_tunnel_send(conn, pack);
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn, "impossible to send SCCRP:"
" sending packet failed\n");
" transmitting messages from send queue failed\n");
goto out;
}
......@@ -3562,6 +3732,14 @@ static void l2tp_tunnel_create_session(void *data)
return;
}
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn, "impossible to create session:"
" transmitting messages from send queue failed\n");
l2tp_session_free(sess);
return;
}
log_tunnel(log_info1, conn, "new session %hu created following"
" request from command line interface\n", sid);
}
......@@ -3674,7 +3852,7 @@ static int l2tp_conn_read(struct triton_md_handler_t *h)
{
struct l2tp_conn_t *conn = container_of(h, typeof(*conn), hnd);
struct l2tp_sess_t *sess = NULL;
struct l2tp_packet_t *pack, *p;
struct l2tp_packet_t *pack;
const struct l2tp_attr_t *msg_type;
uint16_t m_type;
uint16_t m_sid;
......@@ -3727,6 +3905,17 @@ static int l2tp_conn_read(struct triton_md_handler_t *h)
continue;
}
if (nsnr_cmp(ntohs(pack->hdr.Nr), conn->peer_Nr) > 0)
conn->peer_Nr = ntohs(pack->hdr.Nr);
/* Drop acknowledged packets from retransmission queue */
if (l2tp_tunnel_clean_rtmsqueue(conn) < 0) {
log_tunnel(log_error, conn,
"impossible to handle incoming message:"
" cleaning retransmission queue failed\n");
goto drop;
}
res = nsnr_cmp(ntohs(pack->hdr.Ns), conn->Nr);
if (res < 0) {
/* Duplicate message */
......@@ -3735,7 +3924,7 @@ static int l2tp_conn_read(struct triton_md_handler_t *h)
" %hu/%hu, tunnel Ns/Nr: %hu/%hu)\n",
ntohs(pack->hdr.Ns), ntohs(pack->hdr.Nr),
conn->Ns, conn->Nr);
if (!list_empty(&conn->send_queue))
if (!list_empty(&conn->rtms_queue))
res = l2tp_retransmit(conn);
else
res = l2tp_send_ZLB(conn);
......@@ -3757,26 +3946,9 @@ static int l2tp_conn_read(struct triton_md_handler_t *h)
} else {
if (!list_empty(&pack->attrs))
conn->Nr++;
while (!list_empty(&conn->send_queue)) {
/* Flush retransmission queue up
to the last acknowledged message */
p = list_entry(conn->send_queue.next,
typeof(*pack), entry);
if (nsnr_cmp(ntohs(p->hdr.Ns),
ntohs(pack->hdr.Nr)) >= 0)
break;
list_del(&p->entry);
l2tp_packet_free(p);
conn->retransmit = 0;
}
if (!list_empty(&conn->send_queue))
triton_timer_mod(&conn->rtimeout_timer, 0);
else {
if (conn->rtimeout_timer.tpd)
triton_timer_del(&conn->rtimeout_timer);
if (conn->state == STATE_FIN)
goto drop;
}
if (conn->state == STATE_FIN)
goto drop;
}
if (list_empty(&pack->attrs)) {
......@@ -3824,6 +3996,15 @@ static int l2tp_conn_read(struct triton_md_handler_t *h)
} else {
l2tp_tunnel_recv(conn, pack, m_type, msg_type->M);
}
if (l2tp_tunnel_push_sendqueue(conn) < 0) {
log_tunnel(log_error, conn,
"impossible to reply to incoming message:"
" transmitting messages from send queue failed,"
" deleting tunnel\n");
goto drop;
}
l2tp_packet_free(pack);
}
......
......@@ -76,6 +76,11 @@ struct l2tp_packet_t
extern int conf_verbose;
extern int conf_avp_permissive;
static inline int l2tp_packet_is_ZLB(const struct l2tp_packet_t *pack)
{
return list_empty(&pack->attrs);
}
struct l2tp_dict_attr_t *l2tp_dict_find_attr_by_name(const char *name);
struct l2tp_dict_attr_t *l2tp_dict_find_attr_by_id(int id);
const struct l2tp_dict_value_t *l2tp_dict_find_value(const struct l2tp_dict_attr_t *attr,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册