提交 09838cfd 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into patch/TD-1632

Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
...@@ -62,7 +62,7 @@ typedef struct { ...@@ -62,7 +62,7 @@ typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
SThreadObj *pThreadObj; SThreadObj **pThreadObj;
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
...@@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
pServerObj->numOfThreads = numOfThreads; pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
if (pServerObj->pThreadObj == NULL) { if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// initialize parameters in case it may encounter error later // initialize parameters in case it may encounter error later
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
pServerObj->pThreadObj[i] = pThreadObj;
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
taosResetPthread(&pThreadObj->thread); taosResetPthread(&pThreadObj->thread);
pThreadObj->processData = fp; pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
pThreadObj++;
} }
// initialize mutex, thread, fd which may fail // initialize mutex, thread, fd which may fail
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj[i];
code = pthread_mutex_init(&(pThreadObj->mutex), NULL); code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) { if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
...@@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
pThreadObj->threadId = i; pThreadObj->threadId = i;
pThreadObj++;
} }
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
...@@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { ...@@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true; pThreadObj->stop = true;
eventfd_t fd = -1; eventfd_t fd = -1;
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
// signal the thread to stop, try graceful method first, // signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed // and use pthread_cancel when failed
...@@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { ...@@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
} }
} }
if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); pthread_join(pThreadObj->thread, NULL);
if (fd != -1) taosCloseSocket(fd);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
} }
if (fd != -1) taosCloseSocket(fd);
} }
void taosStopTcpServer(void *handle) { void taosStopTcpServer(void *handle) {
...@@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) { ...@@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL);
if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) {
pthread_detach(pthread_self());
} else {
pthread_join(pServerObj->thread, NULL);
}
}
tDebug("%s TCP server is stopped", pServerObj->label); tDebug("%s TCP server is stopped", pServerObj->label);
} }
...@@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj[i];
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
pthread_mutex_destroy(&(pThreadObj->mutex));
} }
tDebug("%s TCP server is cleaned up", pServerObj->label); tDebug("%s TCP server is cleaned up", pServerObj->label);
...@@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj + threadId; pThreadObj = pServerObj->pThreadObj[threadId];
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) { if (pFdObj) {
...@@ -329,8 +344,6 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -329,8 +344,6 @@ void taosCleanUpTcpClient(void *chandle) {
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
tDebug ("%s TCP client is cleaned up", pThreadObj->label); tDebug ("%s TCP client is cleaned up", pThreadObj->label);
taosTFree(pThreadObj);
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
...@@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) { ...@@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
} }
if (pThreadObj->stop) break;
}
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
} }
pthread_mutex_destroy(&(pThreadObj->mutex));
tDebug("%s TCP thread exits ...", pThreadObj->label);
taosTFree(pThreadObj);
return NULL; return NULL;
} }
......
...@@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { ...@@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
continue; continue;
} }
} }
} }
if (pThread->stop) break;
} }
uDebug("%p TCP epoll thread exits", pThread); uDebug("%p TCP epoll thread exits", pThread);
...@@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { ...@@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
} }
pthread_join(thread, NULL); pthread_join(thread, NULL);
taosClose(fd); if (fd >= 0) taosClose(fd);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册