未验证 提交 661cfc71 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4156 from taosdata/feature/wal

Feature/wal
...@@ -13,18 +13,22 @@ ...@@ -13,18 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tulog.h" #include "tulog.h"
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h" #include "taosTcpPool.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
bool stop; bool stop;
int pollFd; int32_t pollFd;
int numOfFds; int32_t numOfFds;
struct SPoolObj *pPool; struct SPoolObj *pPool;
} SThreadObj; } SThreadObj;
...@@ -32,15 +36,15 @@ typedef struct SPoolObj { ...@@ -32,15 +36,15 @@ typedef struct SPoolObj {
SPoolInfo info; SPoolInfo info;
SThreadObj **pThread; SThreadObj **pThread;
pthread_t thread; pthread_t thread;
int nextId; int32_t nextId;
int acceptFd; // FD for accept new connection int32_t acceptFd; // FD for accept new connection
} SPoolObj; } SPoolObj;
typedef struct { typedef struct {
SThreadObj *pThread; SThreadObj *pThread;
void *ahandle; void * ahandle;
int fd; int32_t fd;
int closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
static void *taosAcceptPeerTcpConnection(void *argv); static void *taosAcceptPeerTcpConnection(void *argv);
...@@ -53,66 +57,66 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -53,66 +57,66 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
if (pPool == NULL) { if (pPool == NULL) {
uError("TCP server, no enough memory"); sError("failed to alloc pool for TCP server since no enough memory");
return NULL; return NULL;
} }
pPool->info = *pInfo; pPool->info = *pInfo;
pPool->pThread = (SThreadObj **)calloc(sizeof(SThreadObj *), pInfo->numOfThreads); pPool->pThread = calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
if (pPool->pThread == NULL) { if (pPool->pThread == NULL) {
uError("TCP server, no enough memory"); sError("failed to alloc pool thread for TCP server since no enough memory");
free(pPool); tfree(pPool);
return NULL; return NULL;
} }
pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port); pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
if (pPool->acceptFd < 0) { if (pPool->acceptFd < 0) {
free(pPool->pThread); tfree(pPool->pThread);
free(pPool); tfree(pPool);
uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); sError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
return NULL; return NULL;
} }
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) {
uError("TCP server, failed to create accept thread, reason:%s", strerror(errno)); sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd); close(pPool->acceptFd);
free(pPool->pThread); tfree(pPool->pThread);
free(pPool); tfree(pPool);
return NULL; return NULL;
} }
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
uDebug("%p TCP pool is created", pPool); sDebug("%p TCP pool is created", pPool);
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) { void taosCloseTcpThreadPool(void *param) {
SPoolObj * pPool = (SPoolObj *)param; SPoolObj * pPool = param;
SThreadObj *pThread; SThreadObj *pThread;
shutdown(pPool->acceptFd, SHUT_RD); shutdown(pPool->acceptFd, SHUT_RD);
pthread_join(pPool->thread, NULL); pthread_join(pPool->thread, NULL);
for (int i = 0; i < pPool->info.numOfThreads; ++i) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i]; pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread); if (pThread) taosStopPoolThread(pThread);
} }
uDebug("%p TCP pool is closed", pPool); sDebug("%p TCP pool is closed", pPool);
taosTFree(pPool->pThread); taosTFree(pPool->pThread);
free(pPool); tfree(pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = param;
SConnObj *pConn = (SConnObj *)calloc(sizeof(SConnObj), 1); SConnObj *pConn = calloc(sizeof(SConnObj), 1);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
...@@ -120,7 +124,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { ...@@ -120,7 +124,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
SThreadObj *pThread = taosGetTcpThread(pPool); SThreadObj *pThread = taosGetTcpThread(pPool);
if (pThread == NULL) { if (pThread == NULL) {
free(pConn); tfree(pConn);
return NULL; return NULL;
} }
...@@ -133,13 +137,13 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { ...@@ -133,13 +137,13 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
event.data.ptr = pConn; event.data.ptr = pConn;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
uError("failed to add fd:%d(%s)", connFd, strerror(errno)); sError("failed to add fd:%d since %s", connFd, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
free(pConn); tfree(pConn);
pConn = NULL; pConn = NULL;
} else { } else {
pThread->numOfFds++; pThread->numOfFds++;
uDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
} }
return pConn; return pConn;
...@@ -149,7 +153,7 @@ void taosFreeTcpConn(void *param) { ...@@ -149,7 +153,7 @@ void taosFreeTcpConn(void *param) {
SConnObj * pConn = (SConnObj *)param; SConnObj * pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
pConn->closedByApp = 1; pConn->closedByApp = 1;
shutdown(pConn->fd, SHUT_WR); shutdown(pConn->fd, SHUT_WR);
} }
...@@ -164,9 +168,9 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -164,9 +168,9 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
pThread->numOfFds--; pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
uDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
taosClose(pConn->fd); taosClose(pConn->fd);
free(pConn); tfree(pConn);
} }
#define maxEvents 10 #define maxEvents 10
...@@ -183,18 +187,18 @@ static void *taosProcessTcpData(void *param) { ...@@ -183,18 +187,18 @@ static void *taosProcessTcpData(void *param) {
while (1) { while (1) {
if (pThread->stop) break; if (pThread->stop) break;
int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); int32_t fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) { if (pThread->stop) {
uDebug("%p TCP epoll thread is exiting...", pThread); sDebug("%p TCP epoll thread is exiting...", pThread);
break; break;
} }
if (fdNum < 0) { if (fdNum < 0) {
uError("epoll_wait failed (%s)", strerror(errno)); sError("epoll_wait failed since %s", strerror(errno));
continue; continue;
} }
for (int i = 0; i < fdNum; ++i) { for (int32_t i = 0; i < fdNum; ++i) {
pConn = events[i].data.ptr; pConn = events[i].data.ptr;
assert(pConn); assert(pConn);
...@@ -219,17 +223,16 @@ static void *taosProcessTcpData(void *param) { ...@@ -219,17 +223,16 @@ static void *taosProcessTcpData(void *param) {
continue; continue;
} }
} }
} }
if (pThread->stop) break; if (pThread->stop) break;
} }
uDebug("%p TCP epoll thread exits", pThread); sDebug("%p TCP epoll thread exits", pThread);
close(pThread->pollFd); close(pThread->pollFd);
free(pThread); tfree(pThread);
free(buffer); tfree(buffer);
return NULL; return NULL;
} }
...@@ -242,18 +245,18 @@ static void *taosAcceptPeerTcpConnection(void *argv) { ...@@ -242,18 +245,18 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
while (1) { while (1) {
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
int connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); int32_t connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 0) { if (connFd < 0) {
if (errno == EINVAL) { if (errno == EINVAL) {
uDebug("%p TCP server accept is exiting...", pPool); sDebug("%p TCP server accept is exiting...", pPool);
break; break;
} else { } else {
uError("TCP accept failure, reason:%s", strerror(errno)); sError("TCP accept failure since %s", strerror(errno));
continue; continue;
} }
} }
// uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); // sDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
(*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
} }
...@@ -273,23 +276,23 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -273,23 +276,23 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pThread->pPool = pPool; pThread->pPool = pPool;
pThread->pollFd = epoll_create(10); // size does not matter pThread->pollFd = epoll_create(10); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
free(pThread); tfree(pThread);
return NULL; return NULL;
} }
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
close(pThread->pollFd); close(pThread->pollFd);
free(pThread); tfree(pThread);
return NULL; return NULL;
} }
uDebug("%p TCP epoll thread is created", pThread); sDebug("%p TCP epoll thread is created", pThread);
pPool->pThread[pPool->nextId] = pThread; pPool->pThread[pPool->nextId] = pThread;
pPool->nextId++; pPool->nextId++;
pPool->nextId = pPool->nextId % pPool->info.numOfThreads; pPool->nextId = pPool->nextId % pPool->info.numOfThreads;
...@@ -314,12 +317,12 @@ static void taosStopPoolThread(SThreadObj *pThread) { ...@@ -314,12 +317,12 @@ static void taosStopPoolThread(SThreadObj *pThread) {
eventfd_t fd = eventfd(1, 0); eventfd_t fd = eventfd(1, 0);
if (fd == -1) { if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption // failed to create eventfd, call pthread_cancel instead, which may result in data corruption
uError("failed to create eventfd(%s)", strerror(errno)); sError("failed to create eventfd since %s", strerror(errno));
pthread_cancel(pThread->thread); pthread_cancel(pThread->thread);
pThread->stop = true; pThread->stop = true;
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption
uError("failed to call epoll_ctl(%s)", strerror(errno)); sError("failed to call epoll_ctl since %s", strerror(errno));
pthread_cancel(pThread->thread); pthread_cancel(pThread->thread);
} }
......
...@@ -128,7 +128,7 @@ SScript *simProcessCallOver(SScript *script) { ...@@ -128,7 +128,7 @@ SScript *simProcessCallOver(SScript *script) {
exit(0); exit(0);
} }
return NULL; return simScriptList[simScriptPos];
} }
} else { } else {
simInfo("script:%s, is stopped by main script", script->fileName); simInfo("script:%s, is stopped by main script", script->fileName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册