diff --git a/src/dnode/inc/dnodeVWrite.h b/src/dnode/inc/dnodeVWrite.h index 7da701a8e270239ee2f48e090138f048cc81f880..323405143fd10b7b4f73bac5103ad63ed870044f 100644 --- a/src/dnode/inc/dnodeVWrite.h +++ b/src/dnode/inc/dnodeVWrite.h @@ -20,9 +20,12 @@ extern "C" { #endif -int32_t dnodeInitVnodeWrite(); -void dnodeCleanupVnodeWrite(); -void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg); +int32_t dnodeInitVWrite(); +void dnodeCleanupVWrite(); +void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg); +void * dnodeAllocVWriteQueue(void *pVnode); +void dnodeFreeVWriteQueue(void *wqueue); +void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index c9cb59150c30aec5ee25680c08887f577f63b040..b4c174e29bc15be2796c6fab60e4ba0ab9e7b621 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = { {"wal", walInit, walCleanUp}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, - {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, + {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index a7c76a3473e16afa212800c86249b48ed006b62e..c6fc2b9e36e1c51374d8a7b164ff32a97c0cf9d0 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL; static void *tsDnodeClientRpc = NULL; int32_t dnodeInitServer() { - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 35ea6ae8b34b2fe2bbb3f6498ecccb56a4590742..33cda607937bac0032ca202ff1e746aec401f618 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index cd913cc100386f0145280b648adf193b0c508954..abf3cb527d3cc918161e2df10fd9f939adabfc30 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { } } -void *dnodeAllocateVnodeRqueue(void *pVnode) { +void *dnodeAllocVReadQueue(void *pVnode) { pthread_mutex_lock(&readPool.mutex); taos_queue queue = taosOpenQueue(); if (queue == NULL) { @@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) { return queue; } -void dnodeFreeVnodeRqueue(void *rqueue) { +void dnodeFreeVReadQueue(void *rqueue) { taosCloseQueue(rqueue); // dynamically adjust the number of threads diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index d36c140f43668ec64a098873752ae60191b02b02..3a820f180c73b856a4693d668f91d675deb5b75b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -15,74 +15,65 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" -#include "taoserror.h" -#include "tutil.h" +#include "tglobal.h" #include "tqueue.h" -#include "trpc.h" #include "tsdb.h" #include "twal.h" -#include "tdataformat.h" -#include "tglobal.h" #include "tsync.h" #include "vnode.h" -#include "dnodeInt.h" #include "syncInt.h" -#include "dnodeVWrite.h" -#include "dnodeMgmt.h" +#include "dnodeInt.h" typedef struct { - taos_qall qall; - taos_qset qset; // queue set - pthread_t thread; // thread - int32_t workerId; // worker ID + taos_qall qall; + taos_qset qset; // queue set + int32_t workerId; // worker ID + pthread_t thread; // thread } SWriteWorker; typedef struct { - SRspRet rspRet; - int32_t processedCount; - int32_t code; - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; + SRspRet rspRet; + SRpcMsg rpcMsg; + int32_t processedCount; + int32_t code; + int32_t contLen; + void * pCont; } SWriteMsg; typedef struct { - int32_t max; // max number of workers - int32_t nextId; // from 0 to max-1, cyclic - SWriteWorker *writeWorker; + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic + SWriteWorker *worker; pthread_mutex_t mutex; } SWriteWorkerPool; +static SWriteWorkerPool tsVWriteWP; static void *dnodeProcessWriteQueue(void *param); -static void dnodeHandleIdleWorker(SWriteWorker *pWorker); -SWriteWorkerPool wWorkerPool; +int32_t dnodeInitVWrite() { + tsVWriteWP.max = tsNumOfCores; + tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max); + if (tsVWriteWP.worker == NULL) return -1; + pthread_mutex_init(&tsVWriteWP.mutex, NULL); -int32_t dnodeInitVnodeWrite() { - wWorkerPool.max = tsNumOfCores; - wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); - if (wWorkerPool.writeWorker == NULL) return -1; - pthread_mutex_init(&wWorkerPool.mutex, NULL); - - for (int32_t i = 0; i < wWorkerPool.max; ++i) { - wWorkerPool.writeWorker[i].workerId = i; + for (int32_t i = 0; i < tsVWriteWP.max; ++i) { + tsVWriteWP.worker[i].workerId = i; } - dInfo("dnode write is initialized, max worker %d", wWorkerPool.max); + dInfo("dnode vwrite is initialized, max worker %d", tsVWriteWP.max); return 0; } -void dnodeCleanupVnodeWrite() { - for (int32_t i = 0; i < wWorkerPool.max; ++i) { - SWriteWorker *pWorker = wWorkerPool.writeWorker + i; +void dnodeCleanupVWrite() { + for (int32_t i = 0; i < tsVWriteWP.max; ++i) { + SWriteWorker *pWorker = tsVWriteWP.worker + i; if (pWorker->thread) { taosQsetThreadResume(pWorker->qset); } } - for (int32_t i = 0; i < wWorkerPool.max; ++i) { - SWriteWorker *pWorker = wWorkerPool.writeWorker + i; + for (int32_t i = 0; i < tsVWriteWP.max; ++i) { + SWriteWorker *pWorker = tsVWriteWP.worker + i; if (pWorker->thread) { pthread_join(pWorker->thread, NULL); taosFreeQall(pWorker->qall); @@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() { } } - pthread_mutex_destroy(&wWorkerPool.mutex); - free(wWorkerPool.writeWorker); - dInfo("dnode write is closed"); + pthread_mutex_destroy(&tsVWriteWP.mutex); + tfree(tsVWriteWP.worker); + dInfo("dnode vwrite is closed"); } -void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { - char *pCont = (char *)pMsg->pCont; +void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) { + char *pCont = pMsg->pCont; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { SMsgDesc *pDesc = (SMsgDesc *)pCont; @@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { taos_queue queue = vnodeAcquireWqueue(pHead->vgId); if (queue) { // put message into queue - SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); + SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg)); pWrite->rpcMsg = *pMsg; pWrite->pCont = pCont; pWrite->contLen = pHead->contLen; @@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { } } -void *dnodeAllocateVnodeWqueue(void *pVnode) { - pthread_mutex_lock(&wWorkerPool.mutex); - SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; +void *dnodeAllocVWriteQueue(void *pVnode) { + pthread_mutex_lock(&tsVWriteWP.mutex); + SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; void *queue = taosOpenQueue(); if (queue == NULL) { - pthread_mutex_unlock(&wWorkerPool.mutex); + pthread_mutex_unlock(&tsVWriteWP.mutex); return NULL; } @@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { pWorker->qset = taosOpenQset(); if (pWorker->qset == NULL) { taosCloseQueue(queue); - pthread_mutex_unlock(&wWorkerPool.mutex); + pthread_mutex_unlock(&tsVWriteWP.mutex); return NULL; } @@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { if (pWorker->qall == NULL) { taosCloseQset(pWorker->qset); taosCloseQueue(queue); - pthread_mutex_unlock(&wWorkerPool.mutex); + pthread_mutex_unlock(&tsVWriteWP.mutex); return NULL; } pthread_attr_t thAttr; @@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { - dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + dError("failed to create thread to process vwrite queue since %s", strerror(errno)); taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); taosCloseQueue(queue); queue = NULL; } else { - dDebug("write worker:%d is launched", pWorker->workerId); - wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; + dDebug("dnode vwrite worker:%d is launched", pWorker->workerId); + tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max; } pthread_attr_destroy(&thAttr); } else { taosAddIntoQset(pWorker->qset, queue, pVnode); - wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; + tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max; } - pthread_mutex_unlock(&wWorkerPool.mutex); - dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue); + pthread_mutex_unlock(&tsVWriteWP.mutex); + dDebug("pVnode:%p, dnode vwrite queue:%p is allocated", pVnode, queue); return queue; } -void dnodeFreeVnodeWqueue(void *wqueue) { +void dnodeFreeVWriteQueue(void *wqueue) { taosCloseQueue(wqueue); - - // dynamically adjust the number of threads } -void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { - SWriteMsg *pWrite = (SWriteMsg *)param; - if (pWrite == NULL) return; +void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { + if (param == NULL) return; + SWriteMsg *pWrite = param; if (code < 0) pWrite->code = code; int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); @@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; SWriteMsg * pWrite; SWalHead * pHead; - int32_t numOfMsgs; - int type; - void * pVnode, *item; SRspRet * pRspRet; + void * pVnode; + void * pItem; + int32_t numOfMsgs; + int32_t qtype; - dDebug("write worker:%d is running", pWorker->workerId); + dDebug("dnode vwrite worker:%d is running", pWorker->workerId); while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); if (numOfMsgs == 0) { - dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset); + dDebug("qset:%p, dnode vwrite got no message from qset, exiting", pWorker->qset); break; } for (int32_t i = 0; i < numOfMsgs; ++i) { pWrite = NULL; pRspRet = NULL; - taosGetQitem(pWorker->qall, &type, &item); - if (type == TAOS_QTYPE_RPC) { - pWrite = (SWriteMsg *)item; + taosGetQitem(pWorker->qall, &qtype, &pItem); + if (qtype == TAOS_QTYPE_RPC) { + pWrite = pItem; pRspRet = &pWrite->rspRet; - pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); + pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead)); pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); - } else if (type == TAOS_QTYPE_CQ) { - pHead = (SWalHead *)((char*)item + sizeof(SSyncHead)); + } else if (qtype == TAOS_QTYPE_CQ) { + pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead)); dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version); } else { - pHead = (SWalHead *)item; + pHead = pItem; dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version); } - int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); + int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet); dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], pHead->version, tstrerror(code)); @@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) { // browse all items, and process them one by one taosResetQitems(pWorker->qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(pWorker->qall, &type, &item); - if (type == TAOS_QTYPE_RPC) { - pWrite = (SWriteMsg *)item; - dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); - } else if (type == TAOS_QTYPE_FWD) { - pHead = (SWalHead *)item; + taosGetQitem(pWorker->qall, &qtype, &pItem); + if (qtype == TAOS_QTYPE_RPC) { + pWrite = pItem; + dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code); + } else if (qtype == TAOS_QTYPE_FWD) { + pHead = pItem; vnodeConfirmForward(pVnode, pHead->version, 0); - taosFreeQitem(item); + taosFreeQitem(pItem); vnodeRelease(pVnode); } else { - taosFreeQitem(item); + taosFreeQitem(pItem); vnodeRelease(pVnode); } } @@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) { return NULL; } - -UNUSED_FUNC -static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { - int32_t num = taosGetQueueNumber(pWorker->qset); - - if (num > 0) { - usleep(30000); - sched_yield(); - } else { - taosFreeQall(pWorker->qall); - taosCloseQset(pWorker->qset); - pWorker->qset = NULL; - dDebug("write worker:%d is released", pWorker->workerId); - pthread_exit(NULL); - } -} diff --git a/src/inc/dnode.h b/src/inc/dnode.h index f7ebed31cf5df71b3024fd507a9d74163e912f42..b4973cc672546dc99531e3f6349d9761067f94f8 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); -void *dnodeAllocateVnodeWqueue(void *pVnode); -void dnodeFreeVnodeWqueue(void *queue); -void *dnodeAllocateVnodeRqueue(void *pVnode); -void dnodeFreeVnodeRqueue(void *rqueue); -void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); +void *dnodeAllocVWriteQueue(void *pVnode); +void dnodeFreeVWriteQueue(void *wqueue); +void *dnodeAllocVReadQueue(void *pVnode); +void dnodeFreeVReadQueue(void *rqueue); +void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); void dnodeFreeMnodePqueue(); diff --git a/src/os/inc/osAlloc.h b/src/os/inc/osAlloc.h index 77559bb29b5821eb42bc27e71b75281e0309f081..2d970174800373300832706b2b16225c106e5585 100644 --- a/src/os/inc/osAlloc.h +++ b/src/os/inc/osAlloc.h @@ -22,14 +22,14 @@ extern "C" { #ifndef TAOS_OS_FUNC_ALLOC #define tmalloc(size) malloc(size) - #define tcalloc(size) calloc(1, size) + #define tcalloc(nmemb, size) calloc(nmemb, size) #define trealloc(p, size) realloc(p, size) #define tmemalign(alignment, size) malloc(size) #define tfree(p) free(p) #define tmemzero(p, size) memset(p, 0, size) #else void *tmalloc(int32_t size); - void *tcalloc(int32_t size); + void *tcalloc(int32_t nmemb, int32_t size); void *trealloc(void *p, int32_t size); void *tmemalign(int32_t alignment, int32_t size); void tfree(void *p); diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index 7e18c79fe8b7df179c6220bbf050e66a77c6b7ff..62e44d8eb0b70fb1526693895847637daa72247a 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -31,10 +31,14 @@ extern "C" { } \ } -int64_t taosRead(int32_t fd, void *buf, int64_t count); -int64_t taosWrite(int32_t fd, void *buf, int64_t count); -int64_t taosLSeek(int32_t fd, int64_t offset, int32_t whence); +int64_t taosReadImp(int32_t fd, void *buf, int64_t count); +int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); +int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath); + +#define taosRead(fd, buf, count) taosReadImp(fd, buf, count) +#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) +#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence) #define taosClose(x) tclose(x) // TAOS_OS_FUNC_FILE_SENDIFLE @@ -42,12 +46,12 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); #ifdef TAOS_RANDOM_FILE_FAIL - void taosSetRandomFileFailFactor(int factor); + void taosSetRandomFileFailFactor(int32_t factor); void taosSetRandomFileFailOutput(const char *path); #ifdef TAOS_RANDOM_FILE_FAIL_TEST - ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line); - ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line); - off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line); + int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line); + int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line); + int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line); #undef taosRead #undef taosWrite #undef taosLSeek diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index 6c7361d26e0b4b22fb64806f6b5d0c96540bff58..cbfdedef485f73f0005b2da1e1d9cc3adf9dd377 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -42,10 +42,10 @@ extern "C" { #ifdef TAOS_RANDOM_NETWORK_FAIL #ifdef TAOS_RANDOM_NETWORK_FAIL_TEST - ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags); - ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen); - ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count); - ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count); + int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags); + int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen); + int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count); + int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count); #undef taosSend #undef taosSendto #undef taosReadSocket diff --git a/src/os/src/detail/osAlloc.c b/src/os/src/detail/osAlloc.c index dd7a96792aceb4576dabe855a76644a778a2f72b..4ca35793e7bd16e024c4dddbfdcc9138ab70d3f8 100644 --- a/src/os/src/detail/osAlloc.c +++ b/src/os/src/detail/osAlloc.c @@ -32,11 +32,11 @@ void *tmalloc(int32_t size) { return p; } -void *tcalloc(int32_t size) { - void *p = calloc(1, size); +void *tcalloc(int32_t nmemb, int32_t size) { + void *p = calloc(nmemb, size); if (p == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno)); + uError("failed to calloc memory, nmemb:%d size:%d reason:%s", nmemb, size, strerror(errno)); } return p; diff --git a/src/os/src/detail/osFail.c b/src/os/src/detail/osFail.c index 5b870ba6cb3a21246484c504b6c9d54393e0c452..a99bcd01dbccd0290de1fc38a1220b573ab74b22 100644 --- a/src/os/src/detail/osFail.c +++ b/src/os/src/detail/osFail.c @@ -20,7 +20,7 @@ #ifdef TAOS_RANDOM_NETWORK_FAIL -ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) { +int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { errno = ECONNRESET; return -1; @@ -29,8 +29,8 @@ ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t return send(sockfd, buf, len, flags); } -ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, - socklen_t addrlen) { +int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, + const struct sockaddr *dest_addr, socklen_t addrlen) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { errno = ECONNRESET; return -1; @@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_ return sendto(sockfd, buf, len, flags, dest_addr, addrlen); } -ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) { +int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { errno = ECONNRESET; return -1; @@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) { return read(fd, buf, count); } -ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) { +int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { errno = EINTR; return -1; @@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) { sigaction(SIGILL, &act, NULL); } -ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) { +int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) { if (random_file_fail_factor > 0) { if (rand() % random_file_fail_factor == 0) { errno = EIO; @@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char * } } - return taosRead(fd, buf, count); + return taosReadImp(fd, buf, count); } -ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) { +int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) { if (random_file_fail_factor > 0) { if (rand() % random_file_fail_factor == 0) { errno = EIO; @@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char } } - return taosWrite(fd, buf, count); + return taosWriteImp(fd, buf, count); } -off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char *file, uint32_t line) { +int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line) { if (random_file_fail_factor > 0) { if (rand() % random_file_fail_factor == 0) { errno = EIO; @@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char * } } - return taosLSeek(fd, offset, whence); + return taosLSeekImp(fd, offset, whence); } #endif //TAOS_RANDOM_FILE_FAIL diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index c97879e55586fa93e584791a98f1c31a845ea3a9..6eb4515f3098f89991640d2fdee2b0aca47c1703 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -71,7 +71,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP return rename(fullPath, *dstPath); } -int64_t taosRead(int32_t fd, void *buf, int64_t count) { +int64_t taosReadImp(int32_t fd, void *buf, int64_t count) { int64_t leftbytes = count; int64_t readbytes; char * tbuf = (char *)buf; @@ -95,7 +95,7 @@ int64_t taosRead(int32_t fd, void *buf, int64_t count) { return count; } -int64_t taosWrite(int32_t fd, void *buf, int64_t n) { +int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) { int64_t nleft = n; int64_t nwritten = 0; char * tbuf = (char *)buf; @@ -115,6 +115,10 @@ int64_t taosWrite(int32_t fd, void *buf, int64_t n) { return n; } +int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { + return (int64_t)tlseek(fd, (long)offset, whence); +} + #ifndef TAOS_OS_FUNC_FILE_SENDIFLE int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 238631885faa21225cb5b0a24b7f473a4c12857f..3a5416dd30b61f3e8925b3ef67c5533d18d19021 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -13,10 +13,8 @@ * along with this program. If not, see . */ #define _DEFAULT_SOURCE -#include - #define TAOS_RANDOM_FILE_FAIL_TEST - +#include #include "os.h" #include "talgo.h" #include "tchecksum.h" diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 6cbd11996187e5caa50e8de2639f8a0de81284dc..98f815f78acb5f8beb77b27bc43c88e93bf1b917 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -14,9 +14,7 @@ */ #define _DEFAULT_SOURCE - #define TAOS_RANDOM_FILE_FAIL_TEST - #include "os.h" #include "talgo.h" #include "tchecksum.h" diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index c5aa02941642340f50c9a9ef94dd0c50898551af..0806c29ff84b2a1e44792964ed959b7370611373 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -14,9 +14,7 @@ */ #define _DEFAULT_SOURCE - #define TAOS_RANDOM_FILE_FAIL_TEST - #include "os.h" #include "hash.h" #include "taoserror.h" diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9a9782d53b5a89972b46d0d3289516d248c30269..81e6bf0b4729b89021d21f3fc14263fd4a2acb7d 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -255,8 +255,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->fversion = pVnode->version; - pVnode->wqueue = dnodeAllocateVnodeWqueue(pVnode); - pVnode->rqueue = dnodeAllocateVnodeRqueue(pVnode); + pVnode->wqueue = dnodeAllocVWriteQueue(pVnode); + pVnode->rqueue = dnodeAllocVReadQueue(pVnode); if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { vnodeCleanUp(pVnode); return terrno; @@ -322,7 +322,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.writeToCache = vnodeWriteToQueue; - syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp; + syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; @@ -409,12 +409,12 @@ void vnodeRelease(void *pVnodeRaw) { } if (pVnode->wqueue) { - dnodeFreeVnodeWqueue(pVnode->wqueue); + dnodeFreeVWriteQueue(pVnode->wqueue); pVnode->wqueue = NULL; } if (pVnode->rqueue) { - dnodeFreeVnodeRqueue(pVnode->rqueue); + dnodeFreeVReadQueue(pVnode->rqueue); pVnode->rqueue = NULL; } diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index fa5cbed20f36fd965e60aba098bbbcd081204037..c8f0274174dc518ea36500d093023ec53c5fb75d 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -55,7 +55,7 @@ void walCleanUp() { } void *walOpen(char *path, SWalCfg *pCfg) { - SWal *pWal = tcalloc(sizeof(SWal)); + SWal *pWal = tcalloc(1, sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 8a5911da7b07dbb29211dea636feb6dbc1854afc..e57cb0e042f321ed4d4d3c463a84e6917d4e0fc7 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE +#define TAOS_RANDOM_FILE_FAIL_TEST #include "os.h" #include "taoserror.h" #include "tchecksum.h"