diff --git a/src/MQTTProtocolClient.c b/src/MQTTProtocolClient.c index 78f4d7f21e7bbffbb68d2827a98593bbb3aa308a..8ed220ed3563784e7c2a134482268582df772d37 100644 --- a/src/MQTTProtocolClient.c +++ b/src/MQTTProtocolClient.c @@ -325,10 +325,8 @@ int MQTTProtocol_handlePublishes(void* pack, int sock) rc = SOCKET_ERROR; /* queue acks? */ else if (publish->header.bits.qos == 1) { - /* send puback before processing the publications because a lot of return publications could fill up the socket buffer */ - rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); - /* if we get a socket error from sending the puback, should we ignore the publication? */ Protocol_processPublication(publish, client, 1); + rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); } else if (publish->header.bits.qos == 2) { @@ -364,7 +362,6 @@ int MQTTProtocol_handlePublishes(void* pack, int sock) already_received = 1; } else ListAppend(client->inboundMsgs, m, sizeof(Messages) + len); - rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0) { Publish publish1; @@ -394,6 +391,7 @@ int MQTTProtocol_handlePublishes(void* pack, int sock) } memcpy(m->publish->payload, temp, m->publish->payloadlen); } + rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); publish->topic = NULL; } exit: @@ -558,8 +556,6 @@ int MQTTProtocol_handlePubrels(void* pack, int sock) Publish publish; memset(&publish, '\0', sizeof(publish)); - /* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */ - rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID); publish.header.bits.qos = m->qos; publish.header.bits.retain = m->retain; publish.msgId = m->msgid; @@ -586,6 +582,7 @@ int MQTTProtocol_handlePubrels(void* pack, int sock) ListRemove(&(state.publications), m->publish); ListRemove(client->inboundMsgs, m); ++(state.msgs_received); + rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID); } } if (pubrel->MQTTVersion >= MQTTVERSION_5)