diff --git a/src/MQTTClient.c b/src/MQTTClient.c index 57ecf3242b4e60adf26fcd04233b27caab197175..a5fab47588d1b1773f99972d5de4f7e624a119a4 100644 --- a/src/MQTTClient.c +++ b/src/MQTTClient.c @@ -250,7 +250,7 @@ static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER; static mutex_type mqttclient_mutex = &mqttclient_mutex_store; static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER; -static mutex_type socket_mutex = &socket_mutex_store; +mutex_type socket_mutex = &socket_mutex_store; static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER; static mutex_type subscribe_mutex = &subscribe_mutex_store; diff --git a/src/Socket.c b/src/Socket.c index 57cdf99c29024f76f9f8a552fd08e5fa2c7d12e0..9a91c58d9dc42ccbeea886fb64d50408510b6a9d 100644 --- a/src/Socket.c +++ b/src/Socket.c @@ -74,6 +74,8 @@ Sockets mod_s; static fd_set wset; #endif +extern mutex_type socket_mutex; + /** * Set a socket non-blocking, OS independently * @param sock the socket to set non-blocking @@ -268,6 +270,7 @@ int Socket_addSocket(SOCKET newSd) int rc = 0; FUNC_ENTRY; + Thread_lock_mutex(socket_mutex); mod_s.nfds++; if (mod_s.fds_read) mod_s.fds_read = realloc(mod_s.fds_read, mod_s.nfds * sizeof(mod_s.fds_read[0])); @@ -307,6 +310,7 @@ int Socket_addSocket(SOCKET newSd) Log(LOG_ERROR, -1, "addSocket: setnonblocking"); exit: + Thread_unlock_mutex(socket_mutex); FUNC_EXIT_RC(rc); return rc; } @@ -1169,6 +1173,7 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock) if (rc == EINPROGRESS || rc == EWOULDBLOCK) { SOCKET* pnewSd = (SOCKET*)malloc(sizeof(SOCKET)); + ListElement* result = NULL; if (!pnewSd) { @@ -1176,7 +1181,10 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock) goto exit; } *pnewSd = *sock; - if (!ListAppend(mod_s.connect_pending, pnewSd, sizeof(SOCKET))) + Thread_lock_mutex(socket_mutex); + result = ListAppend(mod_s.connect_pending, pnewSd, sizeof(SOCKET)); + Thread_unlock_mutex(socket_mutex); + if (!result) { free(pnewSd); rc = PAHO_MEMORY_ERROR; @@ -1189,7 +1197,9 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock) as reported in https://github.com/eclipse/paho.mqtt.c/issues/135 */ if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK)) { + Thread_lock_mutex(socket_mutex); Socket_close(*sock); /* close socket and remove from our list of sockets */ + Thread_unlock_mutex(socket_mutex); *sock = SOCKET_ERROR; /* as initialized before */ } }