diff --git a/src/inc/trpc.h b/src/inc/trpc.h index e545abfed378f661c6ab58278590784c985b2672..a34d10747441757cb29bf07963702dc1585d9b40 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 78952e22095c6a5cebf6bb4b6de4c25009e9f3bd..21ce7ee60b951a5bed391ef68e204c5508fb824e 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -83,6 +83,9 @@ typedef struct { int8_t oldInUse; // server IP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + SRpcMsg *pRsp; // for synchronous API + tsem_t *pSem; // for synchronous API + SRpcIpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -183,6 +186,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); @@ -415,12 +419,12 @@ void rpcSendResponse(SRpcMsg *pMsg) { rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; - if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; rpcUnlockConn(pConn); taosTmrStopA(&pConn->pTimer); - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -456,6 +460,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SRpcReqContext *pContext; + pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + + memset(pRsp, 0, sizeof(SRpcMsg)); + + tsem_t sem; + tsem_init(&sem, 0, 0); + pContext->pSem = &sem; + pContext->pRsp = pRsp; + pContext->pSet = pIpSet; + + rpcSendRequest(shandle, pIpSet, pMsg); + + tsem_wait(&sem); + tsem_destroy(&sem); + + return; +} + static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); @@ -661,8 +685,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); - rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + if (pHead->code == 0) { + tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + } else { + // do nothing, it is heart beat from client + } } else if (pConn->inType == 0) { tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->inTranId); @@ -703,22 +731,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_INVALID_RESPONSE_TYPE; } - if (*pHead->content == TSDB_CODE_NOT_READY) { + if (pHead->code == TSDB_CODE_NOT_READY) { return TSDB_CODE_ALREADY_PROCESSED; } taosTmrStopA(&pConn->pTimer); pConn->retry = 0; - if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { + if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - pConn->tretry++; tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->tretry++; + rpcSendReqHead(pConn); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - *pHead->content = TSDB_CODE_TOO_SLOW; + return TSDB_CODE_TOO_SLOW; } } @@ -779,6 +808,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if ( rpcIsReq(pHead->msgType) ) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -800,6 +830,18 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { pContext->code = TSDB_CODE_NETWORK_UNAVAIL; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } + + if (pConn->inType) { + // if there are pending request, notify the app + tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); + SRpcMsg rpcMsg; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; + (*(pRpc->cfp))(&rpcMsg); + } rpcCloseConn(pConn); } @@ -824,7 +866,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } @@ -845,6 +887,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { return pConn; } +static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { + SRpcInfo *pRpc = pContext->pRpc; + + if (pContext->pRsp) { + // for synchronous API + tsem_post(pContext->pSem); + memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); + memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); + } else { + // for asynchronous API + if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) + (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet + + (*pRpc->cfp)(pMsg); + } + + // free the request message + rpcFreeCont(pContext->pCont); +} + static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; @@ -877,10 +939,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); } else { - if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) - (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet - (*pRpc->cfp)(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + rpcNotifyClient(pContext, &rpcMsg); } } } @@ -894,7 +953,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead = (SRpcHead *)msg; pHead->version = 1; pHead->msgType = pConn->inType+1; - pHead->spi = 0; + pHead->spi = pConn->spi; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; @@ -903,7 +962,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); - rpcSendMsgToPeer(pConn, msg, 0); + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); + pConn->secured = 1; // connection shall be secured +} + +static void rpcSendReqHead(SRpcConn *pConn) { + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgType = pConn->outType; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = pConn->outTranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->code = 1; + + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); } static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { @@ -999,9 +1080,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", + tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); @@ -1027,8 +1108,8 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; - (*(pRpc->cfp))(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + + rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP pContext->ipSet.inUse++; @@ -1079,6 +1160,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->user[0]) { tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + if (pConn->inType && pRpc->cfp) { + // if there are pending request, notify the app + tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + SRpcMsg rpcMsg; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; + (*(pRpc->cfp))(&rpcMsg); + } rpcCloseConn(pConn); } else { tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt index 15780a396c1b5de49f7a2c7af408369b67b6e088..b519ae757829f47c8d8467abba69834e35453686 100644 --- a/src/rpc/test/CMakeLists.txt +++ b/src/rpc/test/CMakeLists.txt @@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_EXECUTABLE(rclient ${CLIENT_SRC}) TARGET_LINK_LIBRARIES(rclient trpc) + LIST(APPEND SCLIENT_SRC ./rsclient.c) + ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) + TARGET_LINK_LIBRARIES(rsclient trpc) + LIST(APPEND SERVER_SRC ./rserver.c) ADD_EXECUTABLE(rserver ${SERVER_SRC}) TARGET_LINK_LIBRARIES(rserver trpc) diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 562d0fff96acab47d8121e21ed10e9fb984c105f..f000ab91a273251972e0145ff431a21bc0c8b519 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -40,7 +40,7 @@ typedef struct { void *pRpc; } SInfo; -void processResponse(SRpcMsg *pMsg) { +static void processResponse(SRpcMsg *pMsg) { SInfo *pInfo = (SInfo *)pMsg->handle; tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); @@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) { sem_post(&pInfo->rspSem); } -void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)handle; tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); pInfo->ipSet = *pIpSet; } -int tcount = 0; +static int tcount = 0; -void *sendRequest(void *param) { +static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c new file mode 100644 index 0000000000000000000000000000000000000000..b99387e097df6930810d2176e2526d76f823f3fa --- /dev/null +++ b/src/rpc/test/rsclient.c @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include "taoserror.h" +#include +#include + +typedef struct { + int index; + SRpcIpSet ipSet; + int num; + int numOfReqs; + int msgSize; + sem_t rspSem; + sem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { + SInfo *pInfo = (SInfo *)handle; + + tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); + pInfo->ipSet = *pIpSet; +} + +static int tcount = 0; +static int terror = 0; + +static void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg, rspMsg; + + tTrace("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.handle = pInfo; + rpcMsg.msgType = 1; + tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + + rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); + + // handle response + if (rspMsg.code != 0) terror++; + + tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code); + + rpcFreeCont(rspMsg.pCont); + + if ( pInfo->num % 20000 == 0 ) + tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + } + + tTrace("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcIpSet ipSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + ipSet.numOfIps = 1; + ipSet.inUse = 0; + ipSet.port = 7000; + ipSet.ip[0] = inet_addr(serverIp); + ipSet.ip[1] = inet_addr("192.168.0.1"); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + // rpcInit.cfp = processResponse; + rpcInit.ufp = processUpdateIpSet; + rpcInit.sessions = 100; + rpcInit.idleTime = tsShellActivityTimer*1000; + rpcInit.user = "michael"; + rpcInit.secret = "mypassword"; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i=1; iindex = i; + pInfo->ipSet = ipSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + sem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds + + tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror); + tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + + taosCloseLogger(); + + return 0; +} + + diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 6c5b320809b8c714a3f30caefa952e289b57a36d..48ae02a1d57089852ad5964a058f0ad145410b8e 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -72,7 +72,7 @@ void processShellMsg() { rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 1; + rpcMsg.code = 0; rpcSendResponse(&rpcMsg); taosFreeQitem(pRpcMsg); @@ -126,9 +126,10 @@ void processRequestMsg(SRpcMsg *pMsg) { int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; + char localIp[40] = "0.0.0.0"; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = "0.0.0.0"; + rpcInit.localIp = localIp; rpcInit.localPort = 7000; rpcInit.label = "SER"; rpcInit.numOfThreads = 1; @@ -201,5 +202,3 @@ int main(int argc, char *argv[]) { return 0; } - - diff --git a/src/vnode/wal/CMakeLists.txt b/src/vnode/wal/CMakeLists.txt index 1de958f84e65198c94f8a946d5379f02ce0d2fa0..d77a235bb952cb736f63d6b6f87195b321af61c5 100644 --- a/src/vnode/wal/CMakeLists.txt +++ b/src/vnode/wal/CMakeLists.txt @@ -1,4 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) -ADD_LIBRARY(wal ${SRC}) -TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file +ADD_LIBRARY(twal ${SRC}) +TARGET_LINK_LIBRARIES(twal tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/vnode/wal/inc/vnodeWal.h b/src/vnode/wal/inc/twal.h similarity index 54% rename from src/vnode/wal/inc/vnodeWal.h rename to src/vnode/wal/inc/twal.h index 7753e4ecca5401bfc8434465f90cdb415d5757a7..49fcde9e28548da05c73a0eca658640a99eb67c4 100644 --- a/src/vnode/wal/inc/vnodeWal.h +++ b/src/vnode/wal/inc/twal.h @@ -14,19 +14,37 @@ */ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ -#include #ifdef __cplusplus extern "C" { #endif -typedef void walh; // WAL HANDLE +#define TAOS_WAL_NOLOG 0 +#define TAOS_WAL_WRITE 1 +#define TAOS_WAL_FSYNC 2 + +typedef struct { + int8_t msgType; + int8_t reserved[3]; + int32_t len; + uint64_t version; + uint32_t signature; + uint32_t cksum; + char cont[]; +} SWalHead; + +typedef void* twal_h; // WAL HANDLE + +twal_h walOpen(char *path, int max, int level); +void walClose(twal_h); +int walRenew(twal_h); +int walWrite(twal_h, SWalHead *); +void walFsync(twal_h); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); +int walGetWalFile(twal_h, char *name, uint32_t *index); + +extern int wDebugFlag; -walh *vnodeOpenWal(int vnode, uint8_t op); -int vnodeCloseWal(walh *pWal); -int vnodeRenewWal(walh *pWal); -int vnodeWriteWal(walh *pWal, void *cont, int contLen); -int vnodeSyncWal(walh *pWal); #ifdef __cplusplus } diff --git a/src/vnode/wal/src/vnodeWal.c b/src/vnode/wal/src/vnodeWal.c deleted file mode 100644 index 528cc97ed6030bbbf7941b05a3ff9c4cf5b81e75..0000000000000000000000000000000000000000 --- a/src/vnode/wal/src/vnodeWal.c +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include "vnodeWal.h" - -typedef struct { - /* TODO */ -} SWal; - -walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; } -int vnodeCloseWal(walh *pWal) { return 0; } -int vnodeRenewWal(walh *pWal) { return 0; } -int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; } -int vnodeSyncWal(walh *pWal) { return 0; } \ No newline at end of file diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c new file mode 100644 index 0000000000000000000000000000000000000000..f327c28ce32e554a694f26d1adb4a4eb0c046407 --- /dev/null +++ b/src/vnode/wal/src/walMain.c @@ -0,0 +1,357 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "os.h" +#include "tlog.h" +#include "tchecksum.h" +#include "tutil.h" +#include "twal.h" + +#define walPrefix "wal" +#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} +#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);} +#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);} +#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int fd; + int level; + int max; // maximum number of wal files + uint32_t id; // increase continuously + int num; // number of wal files + char path[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN]; + pthread_mutex_t mutex; +} SWal; + +int wDebugFlag = 135; + +static uint32_t walSignature = 0xFAFBFDFE; +static int walHandleExistingFiles(char *path); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)); +static int walRemoveWalFiles(char *path); + +void *walOpen(char *path, int max, int level) { + SWal *pWal = calloc(sizeof(SWal), 1); + if (pWal == NULL) return NULL; + + pWal->fd = -1; + pWal->max = max; + pWal->id = 0; + pWal->num = 0; + pWal->level = level; + strcpy(pWal->path, path); + pthread_mutex_init(&pWal->mutex, NULL); + + if (access(path, F_OK) != 0) mkdir(path, 0755); + + if (walHandleExistingFiles(path) == 0) + walRenew(pWal); + + if (pWal->fd <0) { + wError("wal:%s, failed to open", path); + pthread_mutex_destroy(&pWal->mutex); + free(pWal); + pWal = NULL; + } + + return pWal; +} + +void walClose(void *handle) { + + SWal *pWal = (SWal *)handle; + + close(pWal->fd); + + // remove all files in the directory + for (int i=0; inum; ++i) { + sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id-i); + if (remove(pWal->name) <0) { + wError("wal:%s, failed to remove", pWal->name); + } else { + wTrace("wal:%s, it is removed", pWal->name); + } + } + + pthread_mutex_destroy(&pWal->mutex); + + free(pWal); +} + +int walRenew(twal_h handle) { + SWal *pWal = (SWal *)handle; + int code = 0; + + pthread_mutex_lock(&pWal->mutex); + + if (pWal->fd >=0) { + close(pWal->fd); + pWal->id++; + wTrace("wal:%s, it is closed", pWal->name); + } + + pWal->num++; + + sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + + if (pWal->fd < 0) { + wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); + code = -1; + } else { + wTrace("wal:%s, it is created", pWal->name); + + if (pWal->num > pWal->max) { + // remove the oldest wal file + char name[TSDB_FILENAME_LEN]; + sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + } else { + wTrace("wal:%s, it is removed", name); + } + + pWal->num--; + } + } + + pthread_mutex_unlock(&pWal->mutex); + + return code; +} + +int walWrite(void *handle, SWalHead *pHead) { + SWal *pWal = (SWal *)handle; + int code = 0; + + // no wal + if (pWal->level == TAOS_WAL_NOLOG) return 0; + + pHead->signature = walSignature; + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal)); + int contLen = pHead->len + sizeof(SWalHead); + + if(write(pWal->fd, pHead, contLen) != contLen) { + wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); + code = -1; + } + + return code; +} + +void walFsync(void *handle) { + + SWal *pWal = (SWal *)handle; + + if (pWal->level == TAOS_WAL_FSYNC) + fsync(pWal->fd); +} + +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { + SWal *pWal = (SWal *)handle; + int code = 0; + struct dirent *ent; + int count = 0; + uint32_t maxId = 0, minId = -1, index =0; + + int plen = strlen(walPrefix); + char opath[TSDB_FILENAME_LEN]; + sprintf(opath, "%s/old", pWal->path); + + // is there old directory? + if (access(opath, F_OK)) return 0; + + DIR *dir = opendir(opath); + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + index = atol(ent->d_name + plen); + if (index > maxId) maxId = index; + if (index < minId) minId = index; + count++; + } + } + + if ( count != (maxId-minId+1) ) { + wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); + code = -1; + } else { + wTrace("wal:%s, %d files will be restored", opath, count); + + for (index = minId; index<=maxId; ++index) { + sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index); + code = walRestoreWalFile(pWal->name, pVnode, writeFp); + if (code < 0) break; + } + } + + if (code == 0) { + code = walRemoveWalFiles(opath); + if (code == 0) { + if (remove(opath) < 0) { + wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + code = -1; + } + } + } + + closedir(dir); + + return code; +} + +int walGetWalFile(void *handle, char *name, uint32_t *index) { + SWal *pWal = (SWal *)handle; + int code = 1; + int32_t first = 0; + + name[0] = 0; + if (pWal == NULL || pWal->num == 0) return 0; + + pthread_mutex_lock(&(pWal->mutex)); + + first = pWal->id + 1 - pWal->num; + if (*index == 0) *index = first; // set to first one + + if (*index < first && *index > pWal->id) { + code = -1; // index out of range + } else { + sprintf(name, "%s/%s%d", pWal->path, walPrefix, *index); + code = (*index == pWal->id) ? 0:1; + } + + pthread_mutex_unlock(&(pWal->mutex)); + + return code; +} + +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { + SWalHead walHead; + int code = 0; + + int fd = open(name, O_RDONLY); + if (fd < 0) { + wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + return -1; + } + + wTrace("wal:%s, start to restore", name); + + while (1) { + int ret = read(fd, &walHead, sizeof(walHead)); + if ( ret == 0) { code = 0; break;} + + if (ret != sizeof(walHead)) { + wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); + break; + } + + if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) { + wWarn("wal:%s, cksum is messed up, skip the rest of file", name); + break; + } + + char *buffer = malloc(sizeof(SWalHead) + walHead.len); + memcpy(buffer, &walHead, sizeof(walHead)); + + ret = read(fd, buffer+sizeof(walHead), walHead.len); + if ( ret != walHead.len) { + wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret); + break; + } + + // write into queue + (*writeFp)(pVnode, buffer); + } + + return code; +} + +int walHandleExistingFiles(char *path) { + int code = 0; + char oname[TSDB_FILENAME_LEN]; + char nname[TSDB_FILENAME_LEN]; + char opath[TSDB_FILENAME_LEN]; + + sprintf(opath, "%s/old", path); + + struct dirent *ent; + DIR *dir = opendir(path); + int plen = strlen(walPrefix); + + if (access(opath, F_OK) == 0) { + // old directory is there, it means restore process is not finished + walRemoveWalFiles(path); + + } else { + // move all files to old directory + int count = 0; + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + if (access(opath, F_OK) != 0) mkdir(opath, 0755); + + sprintf(oname, "%s/%s", path, ent->d_name); + sprintf(nname, "%s/old/%s", path, ent->d_name); + if (rename(oname, nname) < 0) { + wError("wal:%s, failed to move to new:%s", oname, nname); + code = -1; + break; + } + + count++; + } + } + + wTrace("wal:%s, %d files are moved for restoration", path, count); + } + + closedir(dir); + return code; +} + +static int walRemoveWalFiles(char *path) { + int plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN]; + int code = 0; + + if (access(path, F_OK) != 0) return 0; + + struct dirent *ent; + DIR *dir = opendir(path); + + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + sprintf(name, "%s/%s", path, ent->d_name); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + code = -1; break; + } + } + } + + closedir(dir); + + return code; +} + + diff --git a/src/vnode/wal/test/CMakeLists.txt b/src/vnode/wal/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..06591def40ab0c7a8013553e8187ee8ad2d0ba8a --- /dev/null +++ b/src/vnode/wal/test/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND WALTEST_SRC ./waltest.c) + ADD_EXECUTABLE(waltest ${WALTEST_SRC}) + TARGET_LINK_LIBRARIES(waltest twal) + +ENDIF () + + diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c new file mode 100644 index 0000000000000000000000000000000000000000..e90b54d1f326789d846bc65bf5b909d90e8eb3dc --- /dev/null +++ b/src/vnode/wal/test/waltest.c @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include +#include "tlog.h" +#include "twal.h" + +int64_t ver = 0; +void *pWal = NULL; + +int writeToQueue(void *pVnode, void *data) { + SWalHead *pHead = (SWalHead *)data; + + // do nothing + if (pHead->version > ver) + ver = pHead->version; + + walWrite(pWal, pHead); + + free(data); + + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "/home/jhtao/test/wal"; + int max = 3; + int level = 2; + int total = 5; + int rows = 10000; + int size = 128; + + for (int i=1; iversion = ++ver; + pHead->len = size; + walWrite(pWal, pHead); + } + + printf("renew a wal, i:%d\n", i); + walRenew(pWal); + } + + printf("%d wal files are written\n", total); + + uint32_t index = 0; + char name[256]; + + while (1) { + int code = walGetWalFile(pWal, name, &index); + if (code == -1) { + printf("failed to get wal file, index:%d\n", index); + break; + } + + printf("index:%d wal:%s\n", index, name); + if (code == 0) break; + + index++; + } + + getchar(); + + walClose(pWal); + + return 0; +}