提交 a0703951 编写于 作者: S Shengliang Guan

Merge branch 'hotfix/boundary' of https://github.com/taosdata/TDengine into hotfix/boundary

...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "dnodeMain.h" #include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
static sem_t exitSem;
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
// Set global configuration file // Set global configuration file
...@@ -65,6 +66,11 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -65,6 +66,11 @@ int32_t main(int32_t argc, char *argv[]) {
#endif #endif
} }
if (sem_init(&exitSem, 0, 0) != 0) {
printf("failed to create exit semphore\n");
exit(EXIT_FAILURE);
}
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
...@@ -90,9 +96,19 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -90,9 +96,19 @@ int32_t main(int32_t argc, char *argv[]) {
syslog(LOG_INFO, "Started TDengine service successfully."); syslog(LOG_INFO, "Started TDengine service successfully.");
while (1) { for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) {
sleep(1000); if (res != EINTR) {
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
break;
}
} }
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
return EXIT_SUCCESS;
} }
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
...@@ -104,14 +120,21 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { ...@@ -104,14 +120,21 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
taosCfgDynamicOptions("resetlog"); taosCfgDynamicOptions("resetlog");
return; return;
} }
syslog(LOG_INFO, "Shut down signal is %d", signum); syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service..."); syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system. // clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog // protect the application from receive another signal
syslog(LOG_INFO, "Shut down TDengine service successfully"); struct sigaction act = {{0}};
dPrint("TDengine is shut down!"); act.sa_handler = SIG_IGN;
closelog(); sigaction(SIGTERM, &act, NULL);
exit(EXIT_SUCCESS); sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// inform main thread to exit
sem_post(&exitSem);
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "twal.h" #include "twal.h"
...@@ -71,11 +72,16 @@ int32_t dnodeInitRead() { ...@@ -71,11 +72,16 @@ int32_t dnodeInitRead() {
} }
void dnodeCleanupRead() { void dnodeCleanupRead() {
for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(readQset);
}
}
for (int i=0; i < readPool.max; ++i) { for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
...@@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
SReadWorker *pWorker = param;
SReadMsg *pReadMsg; SReadMsg *pReadMsg;
int type; int type;
void *pVnode; void *pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dnodeHandleIdleReadWorker(pWorker); dTrace("dnodeProcessReadQueee: got no message from qset, exiting...");
continue; break;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
...@@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) { static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset); int32_t num = taosGetQueueNumber(readQset);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -67,11 +68,16 @@ int32_t dnodeInitWrite() { ...@@ -67,11 +68,16 @@ int32_t dnodeInitWrite() {
} }
void dnodeCleanupWrite() { void dnodeCleanupWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
...@@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs ==0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
continue; break;
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
...@@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset); int32_t num = taosGetQueueNumber(pWorker->qset);
......
...@@ -53,6 +53,7 @@ extern "C" { ...@@ -53,6 +53,7 @@ extern "C" {
#include <string.h> #include <string.h>
#include <strings.h> #include <strings.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/file.h> #include <sys/file.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/mman.h> #include <sys/mman.h>
......
...@@ -199,7 +199,7 @@ typedef struct HttpThread { ...@@ -199,7 +199,7 @@ typedef struct HttpThread {
pthread_t thread; pthread_t thread;
HttpContext * pHead; HttpContext * pHead;
pthread_mutex_t threadMutex; pthread_mutex_t threadMutex;
pthread_cond_t fdReady; bool stop;
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
...@@ -212,6 +212,8 @@ typedef struct HttpServer { ...@@ -212,6 +212,8 @@ typedef struct HttpServer {
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
uint32_t serverIp; uint32_t serverIp;
uint16_t serverPort; uint16_t serverPort;
bool online;
int fd;
int cacheContext; int cacheContext;
int sessionExpire; int sessionExpire;
int numOfThreads; int numOfThreads;
...@@ -226,7 +228,6 @@ typedef struct HttpServer { ...@@ -226,7 +228,6 @@ typedef struct HttpServer {
bool (*processData)(HttpContext *pContext); bool (*processData)(HttpContext *pContext);
int requestNum; int requestNum;
void *timerHandle; void *timerHandle;
bool online;
} HttpServer; } HttpServer;
// http util method // http util method
......
...@@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) { ...@@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) {
httpCloseContextByServer(pContext->pThread, pContext); httpCloseContextByServer(pContext->pThread, pContext);
} }
void httpCleanUpConnect(HttpServer *pServer) {
int i;
HttpThread *pThread;
if (pServer == NULL) return; static void httpStopThread(HttpThread* pThread) {
pThread->stop = true;
pthread_cancel(pServer->thread); // signal the thread to stop, try graceful method first,
pthread_join(pServer->thread, NULL); // and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThread->thread);
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThread->thread);
}
for (i = 0; i < pServer->numOfThreads; ++i) { pthread_join(pThread->thread, NULL);
pThread = pServer->pThreads + i; if (fd != -1) {
if (pThread == NULL) continue; close(fd);
//taosCloseSocket(pThread->pollFd); }
close(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex));
//while (pThread->pHead) { //while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0); // httpCleanUpContext(pThread->pHead, 0);
//} //}
}
pthread_cancel(pThread->thread);
pthread_join(pThread->thread, NULL); void httpCleanUpConnect(HttpServer *pServer) {
pthread_cond_destroy(&(pThread->fdReady)); if (pServer == NULL) return;
pthread_mutex_destroy(&(pThread->threadMutex));
shutdown(pServer->fd, SHUT_RD);
pthread_join(pServer->thread, NULL);
for (int i = 0; i < pServer->numOfThreads; ++i) {
HttpThread* pThread = pServer->pThreads + i;
if (pThread != NULL) {
httpStopThread(pThread);
}
} }
tfree(pServer->pThreads); tfree(pServer->pThreads);
...@@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) { ...@@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) {
pthread_sigmask(SIG_SETMASK, &set, NULL); pthread_sigmask(SIG_SETMASK, &set, NULL);
while (1) { while (1) {
pthread_mutex_lock(&pThread->threadMutex);
if (pThread->numOfFds < 1) {
pthread_cond_wait(&pThread->fdReady, &pThread->threadMutex);
}
pthread_mutex_unlock(&pThread->threadMutex);
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
if (pThread->stop) {
httpTrace("%p, http thread get stop event, exiting...", pThread);
break;
}
if (fdNum <= 0) continue; if (fdNum <= 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
...@@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) { ...@@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) {
} }
} }
void httpAcceptHttpConnection(void *arg) { void* httpAcceptHttpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int sockFd;
int threadId = 0; int threadId = 0;
HttpThread * pThread; HttpThread * pThread;
HttpServer * pServer; HttpServer * pServer;
...@@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) { ...@@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) {
sigaddset(&set, SIGPIPE); sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL); pthread_sigmask(SIG_SETMASK, &set, NULL);
sockFd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
if (sockFd < 0) { if (pServer->fd < 0) {
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp), httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp),
pServer->serverPort, strerror(errno)); pServer->serverPort, strerror(errno));
return; return NULL;
} else { } else {
httpPrint("http service init success at %u", pServer->serverPort); httpPrint("http service init success at %u", pServer->serverPort);
pServer->online = true; pServer->online = true;
...@@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) { ...@@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) {
while (1) { while (1) {
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
connFd = (int)accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd == -1) {
if (connFd < 3) { if (errno == EINVAL) {
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label);
break;
}
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno)); httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno));
continue; continue;
} }
...@@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) { ...@@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) {
pThread->pHead = pContext; pThread->pHead = pContext;
pThread->numOfFds++; pThread->numOfFds++;
pthread_cond_signal(&pThread->fdReady);
pthread_mutex_unlock(&(pThread->threadMutex)); pthread_mutex_unlock(&(pThread->threadMutex));
...@@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) { ...@@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) {
threadId++; threadId++;
threadId = threadId % pServer->numOfThreads; threadId = threadId % pServer->numOfThreads;
} }
close(pServer->fd);
return NULL;
} }
bool httpInitConnect(HttpServer *pServer) { bool httpInitConnect(HttpServer *pServer) {
...@@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) { ...@@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) {
return false; return false;
} }
if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) {
httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno));
return false;
}
pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
httpError("http thread:%s, failed to create HTTP epoll", pThread->label); httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
......
...@@ -889,12 +889,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -889,12 +889,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
rpcSendErrorMsgToPeer(pRecv, code); rpcSendErrorMsgToPeer(pRecv, code);
tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
} }
} }
if (code) rpcFreeMsg(pRecv->msg); if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed
return pConn; return pConn;
} }
......
...@@ -39,8 +39,8 @@ typedef struct SThreadObj { ...@@ -39,8 +39,8 @@ typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
SFdObj * pHead; SFdObj * pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t fdReady;
uint32_t ip; uint32_t ip;
bool stop;
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
...@@ -50,6 +50,7 @@ typedef struct SThreadObj { ...@@ -50,6 +50,7 @@ typedef struct SThreadObj {
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
int fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[12];
...@@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param); ...@@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void taosFreeFdObj(SFdObj *pFdObj); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void taosAcceptTcpConnection(void *arg); static void* taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
...@@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;; break;;
} }
code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
if (code != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
break;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
...@@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThreadObj->thread);
}
pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd);
if (fd != -1) {
close(fd);
}
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
}
void taosCleanUpTcpServer(void *handle) { void taosCleanUpTcpServer(void *handle) {
SServerObj *pServerObj = handle; SServerObj *pServerObj = handle;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
pthread_cancel(pServerObj->thread); shutdown(pServerObj->fd, SHUT_RD);
pthread_join(pServerObj->thread, NULL); pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
taosStopTcpThread(pThreadObj);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
pthread_cond_destroy(&(pThreadObj->fdReady));
pthread_mutex_destroy(&(pThreadObj->mutex)); pthread_mutex_destroy(&(pThreadObj->mutex));
} }
...@@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj); tfree(pServerObj);
} }
static void taosAcceptTcpConnection(void *arg) { static void* taosAcceptTcpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in caddr; struct sockaddr_in caddr;
int sockFd;
int threadId = 0; int threadId = 0;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
SServerObj *pServerObj; SServerObj *pServerObj;
pServerObj = (SServerObj *)arg; pServerObj = (SServerObj *)arg;
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) return; if (pServerObj->fd < 0) return NULL;
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) { while (1) {
socklen_t addrlen = sizeof(caddr); socklen_t addrlen = sizeof(caddr);
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen); connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
if (connFd == -1) {
if (connFd < 0) { if (errno == EINVAL) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break;
}
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
continue; continue;
} }
...@@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) { ...@@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) {
threadId++; threadId++;
threadId = threadId % pServerObj->numOfThreads; threadId = threadId % pServerObj->numOfThreads;
} }
close(pServerObj->fd);
return NULL;
} }
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
...@@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return NULL; return NULL;
} }
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
...@@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) {
SThreadObj *pThreadObj = chandle; SThreadObj *pThreadObj = chandle;
if (pThreadObj == NULL) return; if (pThreadObj == NULL) return;
while (pThreadObj->pHead) { taosStopTcpThread(pThreadObj);
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
tTrace (":%s, all connections are cleaned up", pThreadObj->label); tTrace (":%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj); tfree(pThreadObj);
...@@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) { ...@@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) {
SRpcHead rpcHead; SRpcHead rpcHead;
while (1) { while (1) {
pthread_mutex_lock(&pThreadObj->mutex);
if (pThreadObj->numOfFds < 1) {
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
}
pthread_mutex_unlock(&pThreadObj->mutex);
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
if (pThreadObj->stop) {
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
break;
}
if (fdNum < 0) continue; if (fdNum < 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
...@@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) { ...@@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj; pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++; pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->mutex)); pthread_mutex_unlock(&(pThreadObj->mutex));
return pFdObj; return pFdObj;
...@@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
...@@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pConn->signature = NULL; pConn->signature = NULL;
free(pConn->buffer); // shutdown to signal the thread to exit
pthread_cancel(pConn->thread); shutdown(pConn->fd, SHUT_RD);
taosCloseSocket(pConn->fd);
} }
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pthread_join(pConn->thread, NULL); pthread_join(pConn->thread, NULL);
free(pConn->buffer);
taosCloseSocket(pConn->fd);
tTrace("chandle:%p is closed", pConn); tTrace("chandle:%p is closed", pConn);
} }
...@@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) { ...@@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) {
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if(dataLen == 0) {
tTrace("data length is 0, socket was closed, exiting");
break;
}
port = ntohs(sourceAdd.sin_port); port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) { if (dataLen < sizeof(SRpcHead)) {
......
...@@ -39,6 +39,7 @@ void taosResetQitems(taos_qall); ...@@ -39,6 +39,7 @@ void taosResetQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
void taosCloseQset(); void taosCloseQset();
void taosQsetThreadResume(taos_qset param);
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);
......
...@@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) { ...@@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) {
free(qset); free(qset);
} }
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
tsem_post(&qset->sem);
}
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;
......
...@@ -31,7 +31,7 @@ typedef struct { ...@@ -31,7 +31,7 @@ typedef struct {
int numOfThreads; int numOfThreads;
pthread_t * qthread; pthread_t * qthread;
SSchedMsg * queue; SSchedMsg * queue;
bool stop;
void* pTmrCtrl; void* pTmrCtrl;
void* pTimer; void* pTimer;
} SSchedQueue; } SSchedQueue;
...@@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { ...@@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
return NULL; return NULL;
} }
pSched->stop = false;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
...@@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) { ...@@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) {
} }
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
} }
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0) if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
...@@ -185,14 +189,17 @@ void taosCleanUpScheduler(void *param) { ...@@ -185,14 +189,17 @@ void taosCleanUpScheduler(void *param) {
SSchedQueue *pSched = (SSchedQueue *)param; SSchedQueue *pSched = (SSchedQueue *)param;
if (pSched == NULL) return; if (pSched == NULL) return;
pSched->stop = true;
for (int i = 0; i < pSched->numOfThreads; ++i) { for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i]) if (pSched->qthread[i]) {
pthread_cancel(pSched->qthread[i]); tsem_post(&pSched->fullSem);
}
} }
for (int i = 0; i < pSched->numOfThreads; ++i) { for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i]) if (pSched->qthread[i]) {
pthread_join(pSched->qthread[i], NULL); pthread_join(pSched->qthread[i], NULL);
} }
}
tsem_destroy(&pSched->emptySem); tsem_destroy(&pSched->emptySem);
tsem_destroy(&pSched->fullSem); tsem_destroy(&pSched->fullSem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册