diff --git a/src/dnode/inc/dnodeMPeer.h b/src/dnode/inc/dnodeMPeer.h index 9a48703110c389f10ac623ccaeaa85420a32817f..00221baa221a411d614c1fa1bb3dc4525de2ed71 100644 --- a/src/dnode/inc/dnodeMPeer.h +++ b/src/dnode/inc/dnodeMPeer.h @@ -20,11 +20,11 @@ extern "C" { #endif -int32_t dnodeInitMnodePeer(); -void dnodeCleanupMnodePeer(); -int32_t dnodeAllocateMnodePqueue(); -void dnodeFreeMnodePqueue(); -void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg); +int32_t dnodeInitMPeer(); +void dnodeCleanupMPeer(); +int32_t dnodeAllocateMPeerQueue(); +void dnodeFreeMPeerQueue(); +void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMRead.h b/src/dnode/inc/dnodeMRead.h index 4e93838b7998850e1bec79f017fdfbd28f286a11..8a8e71227ddf3cd7b7c77ef664ac3d4c935889db 100644 --- a/src/dnode/inc/dnodeMRead.h +++ b/src/dnode/inc/dnodeMRead.h @@ -20,11 +20,11 @@ extern "C" { #endif -int32_t dnodeInitMnodeRead(); -void dnodeCleanupMnodeRead(); -int32_t dnodeAllocateMnodeRqueue(); -void dnodeFreeMnodeRqueue(); -void dnodeDispatchToMnodeReadQueue(SRpcMsg *rpcMsg); +int32_t dnodeInitMRead(); +void dnodeCleanupMRead(); +int32_t dnodeAllocMReadQueue(); +void dnodeFreeMReadQueue(); +void dnodeDispatchToMReadQueue(SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMWrite.h b/src/dnode/inc/dnodeMWrite.h index 498fea81c59329b4d30874d36d10182b6e3ae54f..6a3d41bc81a03465a2e097bc711c83a8d8af362e 100644 --- a/src/dnode/inc/dnodeMWrite.h +++ b/src/dnode/inc/dnodeMWrite.h @@ -20,11 +20,11 @@ extern "C" { #endif -int32_t dnodeInitMnodeWrite(); -void dnodeCleanupMnodeWrite(); -int32_t dnodeAllocateMnodeWqueue(); -void dnodeFreeMnodeWqueue(); -void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg); +int32_t dnodeInitMWrite(); +void dnodeCleanupMWrite(); +int32_t dnodeAllocMWritequeue(); +void dnodeFreeMWritequeue(); +void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 0da7ab74e45a27a15186014197ab3bf03ef867d0..b5ecc4930fd87c706b1adb8f4d20dbb7b0a5d419 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -35,44 +35,44 @@ typedef struct { typedef struct { int32_t curNum; int32_t maxNum; - SMPeerWorker *peerWorker; + SMPeerWorker *worker; } SMPeerWorkerPool; -static SMPeerWorkerPool tsMPeerPool; +static SMPeerWorkerPool tsMPeerWP; static taos_qset tsMPeerQset; static taos_queue tsMPeerQueue; -static void *dnodeProcessMnodePeerQueue(void *param); +static void *dnodeProcessMPeerQueue(void *param); -int32_t dnodeInitMnodePeer() { +int32_t dnodeInitMPeer() { tsMPeerQset = taosOpenQset(); - tsMPeerPool.maxNum = 1; - tsMPeerPool.curNum = 0; - tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum); + tsMPeerWP.maxNum = 1; + tsMPeerWP.curNum = 0; + tsMPeerWP.worker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerWP.maxNum); - if (tsMPeerPool.peerWorker == NULL) return -1; - for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { - SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; + if (tsMPeerWP.worker == NULL) return -1; + for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) { + SMPeerWorker *pWorker = tsMPeerWP.worker + i; pWorker->workerId = i; dDebug("dnode mpeer worker:%d is created", i); } - dDebug("dnode mpeer is initialized, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset); + dDebug("dnode mpeer is initialized, workers:%d qset:%p", tsMPeerWP.maxNum, tsMPeerQset); return 0; } -void dnodeCleanupMnodePeer() { - for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { - SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; +void dnodeCleanupMPeer() { + for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) { + SMPeerWorker *pWorker = tsMPeerWP.worker + i; if (pWorker->thread) { taosQsetThreadResume(tsMPeerQset); } dDebug("dnode mpeer worker:%d is closed", i); } - for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { - SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; + for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) { + SMPeerWorker *pWorker = tsMPeerWP.worker + i; dDebug("dnode mpeer worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); @@ -84,61 +84,60 @@ void dnodeCleanupMnodePeer() { taosCloseQset(tsMPeerQset); tsMPeerQset = NULL; - taosTFree(tsMPeerPool.peerWorker); + taosTFree(tsMPeerWP.worker); } -int32_t dnodeAllocateMnodePqueue() { +int32_t dnodeAllocateMPeerQueue() { tsMPeerQueue = taosOpenQueue(); if (tsMPeerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL); - for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) { - SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; + for (int32_t i = tsMPeerWP.curNum; i < tsMPeerWP.maxNum; ++i) { + SMPeerWorker *pWorker = tsMPeerWP.worker + i; pWorker->workerId = i; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodePeerQueue, pWorker) != 0) { + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMPeerQueue, pWorker) != 0) { dError("failed to create thread to process mpeer queue, reason:%s", strerror(errno)); } pthread_attr_destroy(&thAttr); - tsMPeerPool.curNum = i + 1; - dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum); + tsMPeerWP.curNum = i + 1; + dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerWP.maxNum); } dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue); return TSDB_CODE_SUCCESS; } -void dnodeFreeMnodePqueue() { +void dnodeFreeMPeerQueue() { dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue); taosCloseQueue(tsMPeerQueue); tsMPeerQueue = NULL; } -void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) { +void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) { dnodeSendRedirectMsg(pMsg, false); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pPeer = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); } - SMnodeMsg *pPeer = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pPeer, pMsg); - taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); + rpcFreeCont(pMsg->pCont); } -static void dnodeFreeMnodePeerMsg(SMnodeMsg *pPeer) { +static void dnodeFreeMPeerMsg(SMnodeMsg *pPeer) { mnodeCleanupMsg(pPeer); taosFreeQitem(pPeer); } -static void dnodeSendRpcMnodePeerRsp(SMnodeMsg *pPeer, int32_t code) { +static void dnodeSendRpcMPeerRsp(SMnodeMsg *pPeer, int32_t code) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; SRpcMsg rpcRsp = { @@ -149,10 +148,10 @@ static void dnodeSendRpcMnodePeerRsp(SMnodeMsg *pPeer, int32_t code) { }; rpcSendResponse(&rpcRsp); - dnodeFreeMnodePeerMsg(pPeer); + dnodeFreeMPeerMsg(pPeer); } -static void *dnodeProcessMnodePeerQueue(void *param) { +static void *dnodeProcessMPeerQueue(void *param) { SMnodeMsg *pPeerMsg; int32_t type; void * unUsed; @@ -165,7 +164,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { dDebug("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); - dnodeSendRpcMnodePeerRsp(pPeerMsg, code); + dnodeSendRpcMPeerRsp(pPeerMsg, code); } return NULL; diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 4ad787f26e1b9e31a043644d710c6e6de5e354aa..c14c7a81586d306ff1423e8dba91e0844293c407 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -35,46 +35,46 @@ typedef struct { typedef struct { int32_t curNum; int32_t maxNum; - SMReadWorker *readWorker; + SMReadWorker *worker; } SMReadWorkerPool; -static SMReadWorkerPool tsMReadPool; +static SMReadWorkerPool tsMReadWP; static taos_qset tsMReadQset; static taos_queue tsMReadQueue; -static void *dnodeProcessMnodeReadQueue(void *param); +static void *dnodeProcessMReadQueue(void *param); -int32_t dnodeInitMnodeRead() { +int32_t dnodeInitMRead() { tsMReadQset = taosOpenQset(); - tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2; - tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum); - tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum); - tsMReadPool.curNum = 0; - tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum); + tsMReadWP.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2; + tsMReadWP.maxNum = MAX(2, tsMReadWP.maxNum); + tsMReadWP.maxNum = MIN(4, tsMReadWP.maxNum); + tsMReadWP.curNum = 0; + tsMReadWP.worker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadWP.maxNum); - if (tsMReadPool.readWorker == NULL) return -1; - for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { - SMReadWorker *pWorker = tsMReadPool.readWorker + i; + if (tsMReadWP.worker == NULL) return -1; + for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) { + SMReadWorker *pWorker = tsMReadWP.worker + i; pWorker->workerId = i; dDebug("dnode mread worker:%d is created", i); } - dDebug("dnode mread is initialized, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset); + dDebug("dnode mread is initialized, workers:%d qset:%p", tsMReadWP.maxNum, tsMReadQset); return 0; } -void dnodeCleanupMnodeRead() { - for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { - SMReadWorker *pWorker = tsMReadPool.readWorker + i; +void dnodeCleanupMRead() { + for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) { + SMReadWorker *pWorker = tsMReadWP.worker + i; if (pWorker->thread) { taosQsetThreadResume(tsMReadQset); } dDebug("dnode mread worker:%d is closed", i); } - for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { - SMReadWorker *pWorker = tsMReadPool.readWorker + i; + for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) { + SMReadWorker *pWorker = tsMReadWP.worker + i; dDebug("dnode mread worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); @@ -86,64 +86,63 @@ void dnodeCleanupMnodeRead() { taosCloseQset(tsMReadQset); tsMReadQset = NULL; - free(tsMReadPool.readWorker); + free(tsMReadWP.worker); } -int32_t dnodeAllocateMnodeRqueue() { +int32_t dnodeAllocMReadQueue() { tsMReadQueue = taosOpenQueue(); if (tsMReadQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL); - for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) { - SMReadWorker *pWorker = tsMReadPool.readWorker + i; + for (int32_t i = tsMReadWP.curNum; i < tsMReadWP.maxNum; ++i) { + SMReadWorker *pWorker = tsMReadWP.worker + i; pWorker->workerId = i; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeReadQueue, pWorker) != 0) { + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMReadQueue, pWorker) != 0) { dError("failed to create thread to process mread queue, reason:%s", strerror(errno)); } pthread_attr_destroy(&thAttr); - tsMReadPool.curNum = i + 1; - dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum); + tsMReadWP.curNum = i + 1; + dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadWP.maxNum); } dDebug("dnode mread queue:%p is allocated", tsMReadQueue); return TSDB_CODE_SUCCESS; } -void dnodeFreeMnodeRqueue() { +void dnodeFreeMReadQueue() { dDebug("dnode mread queue:%p is freed", tsMReadQueue); taosCloseQueue(tsMReadQueue); tsMReadQueue = NULL; } -void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { +void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMReadQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pRead = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); } - SMnodeMsg *pRead = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pRead, pMsg); - taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); + rpcFreeCont(pMsg->pCont); } -static void dnodeFreeMnodeReadMsg(SMnodeMsg *pRead) { +static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { mnodeCleanupMsg(pRead); taosFreeQitem(pRead); } -static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) { +static void dnodeSendRpcMReadRsp(SMnodeMsg *pRead, int32_t code) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { // may be a auto create req, should put into write queue - dnodeReprocessMnodeWriteMsg(pRead); + dnodeReprocessMWriteMsg(pRead); return; } @@ -155,23 +154,23 @@ static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) { }; rpcSendResponse(&rpcRsp); - dnodeFreeMnodeReadMsg(pRead); + dnodeFreeMReadMsg(pRead); } -static void *dnodeProcessMnodeReadQueue(void *param) { - SMnodeMsg *pReadMsg; +static void *dnodeProcessMReadQueue(void *param) { + SMnodeMsg *pRead; int32_t type; void * unUsed; - + while (1) { - if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) { + if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pRead, &unUsed) == 0) { dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset); break; } - dDebug("%p, msg:%s will be processed in mread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); - int32_t code = mnodeProcessRead(pReadMsg); - dnodeSendRpcMnodeReadRsp(pReadMsg, code); + dDebug("%p, msg:%s will be processed in mread queue", pRead->rpcMsg.ahandle, taosMsg[pRead->rpcMsg.msgType]); + int32_t code = mnodeProcessRead(pRead); + dnodeSendRpcMReadRsp(pRead, code); } return NULL; diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 600688b9fd2cb5623b6bed492bbc6c420b55aa81..3940a251d4fbe481473b99ca8bd7be2d1a92980e 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -36,45 +36,45 @@ typedef struct { typedef struct { int32_t curNum; int32_t maxNum; - SMWriteWorker *writeWorker; + SMWriteWorker *worker; } SMWriteWorkerPool; -static SMWriteWorkerPool tsMWritePool; +static SMWriteWorkerPool tsMWriteWP; static taos_qset tsMWriteQset; static taos_queue tsMWriteQueue; extern void * tsDnodeTmr; -static void *dnodeProcessMnodeWriteQueue(void *param); +static void *dnodeProcessMWriteQueue(void *param); -int32_t dnodeInitMnodeWrite() { +int32_t dnodeInitMWrite() { tsMWriteQset = taosOpenQset(); - tsMWritePool.maxNum = 1; - tsMWritePool.curNum = 0; - tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum); + tsMWriteWP.maxNum = 1; + tsMWriteWP.curNum = 0; + tsMWriteWP.worker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWriteWP.maxNum); - if (tsMWritePool.writeWorker == NULL) return -1; - for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { - SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + if (tsMWriteWP.worker == NULL) return -1; + for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) { + SMWriteWorker *pWorker = tsMWriteWP.worker + i; pWorker->workerId = i; dDebug("dnode mwrite worker:%d is created", i); } - dDebug("dnode mwrite is initialized, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset); + dDebug("dnode mwrite is initialized, workers:%d qset:%p", tsMWriteWP.maxNum, tsMWriteQset); return 0; } -void dnodeCleanupMnodeWrite() { - for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { - SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; +void dnodeCleanupMWrite() { + for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) { + SMWriteWorker *pWorker = tsMWriteWP.worker + i; if (pWorker->thread) { taosQsetThreadResume(tsMWriteQset); } dDebug("dnode mwrite worker:%d is closed", i); } - for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { - SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) { + SMWriteWorker *pWorker = tsMWriteWP.worker + i; dDebug("dnode mwrite worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); @@ -86,58 +86,56 @@ void dnodeCleanupMnodeWrite() { taosCloseQset(tsMWriteQset); tsMWriteQset = NULL; - taosTFree(tsMWritePool.writeWorker); + taosTFree(tsMWriteWP.worker); } -int32_t dnodeAllocateMnodeWqueue() { +int32_t dnodeAllocMWritequeue() { tsMWriteQueue = taosOpenQueue(); if (tsMWriteQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL); - for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) { - SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + for (int32_t i = tsMWriteWP.curNum; i < tsMWriteWP.maxNum; ++i) { + SMWriteWorker *pWorker = tsMWriteWP.worker + i; pWorker->workerId = i; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeWriteQueue, pWorker) != 0) { + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMWriteQueue, pWorker) != 0) { dError("failed to create thread to process mwrite queue, reason:%s", strerror(errno)); } pthread_attr_destroy(&thAttr); - tsMWritePool.curNum = i + 1; - dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum); + tsMWriteWP.curNum = i + 1; + dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWriteWP.maxNum); } dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue); return TSDB_CODE_SUCCESS; } -void dnodeFreeMnodeWqueue() { +void dnodeFreeMWritequeue() { dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue); taosCloseQueue(tsMWriteQueue); tsMWriteQueue = NULL; } -void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { +void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); + dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); + taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } - SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pWrite, pMsg); - - dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, - taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); - taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); + rpcFreeCont(pMsg->pCont); } -static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { +static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); @@ -145,12 +143,12 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { taosFreeQitem(pWrite); } -void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { +void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code) { SMnodeMsg *pWrite = pMsg; if (pWrite == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { - dnodeReprocessMnodeWriteMsg(pWrite); + dnodeReprocessMWriteMsg(pWrite); return; } @@ -162,10 +160,10 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { }; rpcSendResponse(&rpcRsp); - dnodeFreeMnodeWriteMsg(pWrite); + dnodeFreeMWriteMsg(pWrite); } -static void *dnodeProcessMnodeWriteQueue(void *param) { +static void *dnodeProcessMWriteQueue(void *param) { SMnodeMsg *pWrite; int32_t type; void * unUsed; @@ -180,13 +178,13 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { taosMsg[pWrite->rpcMsg.msgType]); int32_t code = mnodeProcessWrite(pWrite); - dnodeSendRpcMnodeWriteRsp(pWrite, code); + dnodeSendRpcMWriteRsp(pWrite, code); } return NULL; } -void dnodeReprocessMnodeWriteMsg(void *pMsg) { +void dnodeReprocessMWriteMsg(void *pMsg) { SMnodeMsg *pWrite = pMsg; if (!mnodeIsRunning() || tsMWriteQueue == NULL) { @@ -194,7 +192,7 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); dnodeSendRedirectMsg(pMsg, true); - dnodeFreeMnodeWriteMsg(pWrite); + dnodeFreeMWriteMsg(pWrite); } else { dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry); @@ -203,12 +201,12 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { } } -static void dnodeDoDelayReprocessMnodeWriteMsg(void *param, void *tmrId) { - dnodeReprocessMnodeWriteMsg(param); +static void dnodeDoDelayReprocessMWriteMsg(void *param, void *tmrId) { + dnodeReprocessMWriteMsg(param); } -void dnodeDelayReprocessMnodeWriteMsg(void *pMsg) { +void dnodeDelayReprocessMWriteMsg(void *pMsg) { SMnodeMsg *mnodeMsg = pMsg; void *unUsed = NULL; - taosTmrReset(dnodeDoDelayReprocessMnodeWriteMsg, 300, mnodeMsg, tsDnodeTmr, &unUsed); + taosTmrReset(dnodeDoDelayReprocessMWriteMsg, 300, mnodeMsg, tsDnodeTmr, &unUsed); } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 5fde4f972be404bc5a35255cefb6e3f8038bf6fc..f4c0ee565e4246e0d8a5c4fe4d67a590721e8de2 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -37,11 +37,11 @@ #include "dnodeShell.h" #include "dnodeTelemetry.h" -static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; +static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED; static int32_t dnodeInitStorage(); static void dnodeCleanupStorage(); -static void dnodeSetRunStatus(SDnodeRunStatus status); +static void dnodeSetRunStatus(SRunStatus status); static void dnodeCheckDataDirOpenned(char *dir); static int32_t dnodeInitComponents(); static void dnodeCleanupComponents(int32_t stepId); @@ -63,9 +63,9 @@ static const SDnodeComponent tsDnodeComponents[] = { {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVRead, dnodeCleanupVRead}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, - {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, - {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, - {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, + {"mread", dnodeInitMRead, dnodeCleanupMRead}, + {"mwrite", dnodeInitMWrite, dnodeCleanupMWrite}, + {"mpeer", dnodeInitMPeer, dnodeCleanupMPeer}, {"client", dnodeInitClient, dnodeCleanupClient}, {"server", dnodeInitServer, dnodeCleanupServer}, {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, @@ -104,7 +104,7 @@ static int32_t dnodeInitComponents() { } int32_t dnodeInitSystem() { - dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); + dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE); tscEmbedded = 1; taosBlockSIGPIPE(); taosResolveCRC(); @@ -137,7 +137,7 @@ int32_t dnodeInitSystem() { } dnodeStartModules(); - dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); + dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING); dInfo("TDengine is initialized successfully"); @@ -145,20 +145,20 @@ int32_t dnodeInitSystem() { } void dnodeCleanUpSystem() { - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { - dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); + if (dnodeGetRunStatus() != TSDB_RUN_STATUS_STOPPED) { + dnodeSetRunStatus(TSDB_RUN_STATUS_STOPPED); dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1); taos_cleanup(); taosCloseLog(); } } -SDnodeRunStatus dnodeGetRunStatus() { - return tsDnodeRunStatus; +SRunStatus dnodeGetRunStatus() { + return tsRunStatus; } -static void dnodeSetRunStatus(SDnodeRunStatus status) { - tsDnodeRunStatus = status; +static void dnodeSetRunStatus(SRunStatus status) { + tsRunStatus = status; } static void dnodeCheckDataDirOpenned(char *dir) { diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4a5dc31a9fc5008b1a1c8288d9798d23acfb98b6..dcb48f7833abf921d1085e1cbb14b9cfbf2aa8b7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -47,6 +47,11 @@ typedef struct { int32_t * vnodeList; } SOpenVnodeThread; +typedef struct { + SRpcMsg rpcMsg; + char pCont[]; +} SMgmtMsg; + void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; @@ -172,38 +177,46 @@ void dnodeCleanupMgmt() { vnodeCleanupResources(); } -void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { - void *item; +static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { + int32_t size = sizeof(SMgmtMsg) + pMsg->contLen; + SMgmtMsg *pMgmt = taosAllocateQitem(size); + if (pMgmt == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } - item = taosAllocateQitem(sizeof(SRpcMsg)); - if (item) { - memcpy(item, pMsg, sizeof(SRpcMsg)); - taosWriteQitem(tsMgmtQueue, 1, item); - } else { - SRpcMsg rsp = { - .handle = pMsg->handle, - .pCont = NULL, - .code = TSDB_CODE_DND_OUT_OF_MEMORY - }; - + pMgmt->rpcMsg = *pMsg; + pMgmt->rpcMsg.pCont = pMgmt->pCont; + memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); + taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); + + return TSDB_CODE_SUCCESS; +} + +void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { + int32_t code = dnodeWriteToMgmtQueue(pMsg); + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = code}; rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); } + + rpcFreeCont(pMsg->pCont); } static void *dnodeProcessMgmtQueue(void *param) { - SRpcMsg *pMsg; - SRpcMsg rsp = {0}; - int type; - void * handle; + SMgmtMsg *pMgmt; + SRpcMsg * pMsg; + SRpcMsg rsp = {0}; + int32_t qtype; + void * handle; while (1) { - if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { + if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); break; } - dDebug("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + pMsg = &pMgmt->rpcMsg; + dDebug("%p, msg:%p:%s will be processed", pMsg->ahandle, pMgmt, taosMsg[pMsg->msgType]); if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { @@ -211,10 +224,9 @@ static void *dnodeProcessMgmtQueue(void *param) { } rsp.handle = pMsg->handle; - rsp.pCont = NULL; + rsp.pCont = NULL; rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index c6fc2b9e36e1c51374d8a7b164ff32a97c0cf9d0..afa712a965e8233baaf9d4f5f14abeee8e88cc38 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -19,6 +19,7 @@ * to dnode. All theses messages are handled from here */ +#define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" #include "tglobal.h" @@ -34,8 +35,8 @@ static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); -static void *tsDnodeServerRpc = NULL; -static void *tsDnodeClientRpc = NULL; +static void *tsServerRpc = NULL; +static void *tsClientRpc = NULL; int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; @@ -50,11 +51,11 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodePeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodePeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodePeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -66,8 +67,8 @@ int32_t dnodeInitServer() { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - tsDnodeServerRpc = rpcOpen(&rpcInit); - if (tsDnodeServerRpc == NULL) { + tsServerRpc = rpcOpen(&rpcInit); + if (tsServerRpc == NULL) { dError("failed to init inter-dnodes RPC server"); return -1; } @@ -77,9 +78,9 @@ int32_t dnodeInitServer() { } void dnodeCleanupServer() { - if (tsDnodeServerRpc) { - rpcClose(tsDnodeServerRpc); - tsDnodeServerRpc = NULL; + if (tsServerRpc) { + rpcClose(tsServerRpc); + tsServerRpc = NULL; dInfo("inter-dnodes RPC server is closed"); } } @@ -93,7 +94,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pMsg->pCont == NULL) return; - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); @@ -131,8 +132,8 @@ int32_t dnodeInitClient() { rpcInit.ckey = "key"; rpcInit.secret = secret; - tsDnodeClientRpc = rpcOpen(&rpcInit); - if (tsDnodeClientRpc == NULL) { + tsClientRpc = rpcOpen(&rpcInit); + if (tsClientRpc == NULL) { dError("failed to init mnode rpc client"); return -1; } @@ -142,9 +143,9 @@ int32_t dnodeInitClient() { } void dnodeCleanupClient() { - if (tsDnodeClientRpc) { - rpcClose(tsDnodeClientRpc); - tsDnodeClientRpc = NULL; + if (tsClientRpc) { + rpcClose(tsClientRpc); + tsClientRpc = NULL; dInfo("dnode inter-dnodes rpc client is closed"); } } @@ -168,15 +169,15 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { } void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); + rpcSendRequest(tsClientRpc, epSet, rpcMsg); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(&epSet); - rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); + rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp); } void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { - rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp); + rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); } \ No newline at end of file diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 2f6d844ff177e5afa5a92981006b7135500dcd32..89f657f78986b8e57c0b5e1dedb841c451db00ba 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -33,9 +33,9 @@ static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -static void * tsDnodeShellRpc = NULL; -static int32_t tsDnodeQueryReqNum = 0; -static int32_t tsDnodeSubmitReqNum = 0; +static void * tsShellRpc = NULL; +static int32_t tsQueryReqNum = 0; +static int32_t tsSubmitReqNum = 0; int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; @@ -44,35 +44,35 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; // the following message shall be treated as mnode query - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); @@ -91,8 +91,8 @@ int32_t dnodeInitShell() { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.afp = dnodeRetrieveUserAuthInfo; - tsDnodeShellRpc = rpcOpen(&rpcInit); - if (tsDnodeShellRpc == NULL) { + tsShellRpc = rpcOpen(&rpcInit); + if (tsShellRpc == NULL) { dError("failed to init shell rpc server"); return -1; } @@ -102,13 +102,13 @@ int32_t dnodeInitShell() { } void dnodeCleanupShell() { - if (tsDnodeShellRpc) { - rpcClose(tsDnodeShellRpc); - tsDnodeShellRpc = NULL; + if (tsShellRpc) { + rpcClose(tsShellRpc); + tsShellRpc = NULL; } } -void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rpcMsg = { .handle = pMsg->handle, .pCont = NULL, @@ -117,7 +117,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pMsg->pCont == NULL) return; - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rpcMsg); @@ -126,9 +126,9 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { - atomic_fetch_add_32(&tsDnodeQueryReqNum, 1); + atomic_fetch_add_32(&tsQueryReqNum, 1); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1); + atomic_fetch_add_32(&tsSubmitReqNum, 1); } else {} if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { @@ -211,12 +211,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { } } -SDnodeStatisInfo dnodeGetStatisInfo() { - SDnodeStatisInfo info = {0}; - if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { +SStatisInfo dnodeGetStatisInfo() { + SStatisInfo info = {0}; + if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) { info.httpReqNum = httpGetReqCount(); - info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0); - info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0); + info.queryReqNum = atomic_exchange_32(&tsQueryReqNum, 0); + info.submitReqNum = atomic_exchange_32(&tsSubmitReqNum, 0); } return info; diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index c28ad66b65ebb8802431e74584afff05ce1b4cfc..9b7497d3ecd51a591b827ed3bb710411015538d4 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -113,7 +113,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { void *dnodeAllocVWriteQueue(void *pVnode) { pthread_mutex_lock(&tsVWriteWP.mutex); SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; - void *queue = taosOpenQueue(); + taos_queue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&tsVWriteWP.mutex); return NULL; diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 6032d8cc0a0ae18f91ffb01a59f31c731ec8318f..9454b97e388e6d51b52e7fd6c1fe586b1e5bdbbd 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -27,16 +27,16 @@ typedef struct { int32_t queryReqNum; int32_t submitReqNum; int32_t httpReqNum; -} SDnodeStatisInfo; +} SStatisInfo; typedef enum { - TSDB_DNODE_RUN_STATUS_INITIALIZE, - TSDB_DNODE_RUN_STATUS_RUNING, - TSDB_DNODE_RUN_STATUS_STOPPED -} SDnodeRunStatus; + TSDB_RUN_STATUS_INITIALIZE, + TSDB_RUN_STATUS_RUNING, + TSDB_RUN_STATUS_STOPPED +} SRunStatus; -SDnodeRunStatus dnodeGetRunStatus(); -SDnodeStatisInfo dnodeGetStatisInfo(); +SRunStatus dnodeGetRunStatus(); +SStatisInfo dnodeGetStatisInfo(); bool dnodeIsFirstDeploy(); bool dnodeIsMasterEp(char *ep); @@ -59,15 +59,15 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void *dnodeAllocVReadQueue(void *pVnode); void dnodeFreeVReadQueue(void *rqueue); -int32_t dnodeAllocateMnodePqueue(); -void dnodeFreeMnodePqueue(); -int32_t dnodeAllocateMnodeRqueue(); -void dnodeFreeMnodeRqueue(); -int32_t dnodeAllocateMnodeWqueue(); -void dnodeFreeMnodeWqueue(); -void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); -void dnodeReprocessMnodeWriteMsg(void *pMsg); -void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); +int32_t dnodeAllocateMPeerQueue(); +void dnodeFreeMPeerQueue(); +int32_t dnodeAllocMReadQueue(); +void dnodeFreeMReadQueue(); +int32_t dnodeAllocMWritequeue(); +void dnodeFreeMWritequeue(); +void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code); +void dnodeReprocessMWriteMsg(void *pMsg); +void dnodeDelayReprocessMWriteMsg(void *pMsg); void dnodeSendStatusMsgToMnode(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 5bef7402e30a0cc9f1964cc2430e0b1f5141590c..128e4d35a4d2500ae49fa4e72d90813bc98b414b 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -35,7 +35,13 @@ typedef struct { } SMnodeRsp; typedef struct SMnodeMsg { - SRpcMsg rpcMsg; + struct SAcctObj * pAcct; + struct SDnodeObj *pDnode; + struct SUserObj * pUser; + struct SDbObj * pDb; + struct SVgObj * pVgroup; + struct STableObj *pTable; + struct SSTableObj*pSTable; SMnodeRsp rpcRsp; int8_t received; int8_t successed; @@ -43,16 +49,11 @@ typedef struct SMnodeMsg { int8_t retry; int32_t code; void * pObj; - struct SAcctObj * pAcct; - struct SDnodeObj *pDnode; - struct SUserObj * pUser; - struct SDbObj * pDb; - struct SVgObj * pVgroup; - struct STableObj *pTable; - struct SSuperTableObj *pSTable; + SRpcMsg rpcMsg; + char pCont[]; } SMnodeMsg; -void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg); +void * mnodeCreateMsg(SRpcMsg *pRpcMsg); int32_t mnodeInitMsg(SMnodeMsg *pMsg); void mnodeCleanupMsg(SMnodeMsg *pMsg); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 6c7229f12d141a15e7750f08ce427fcecd7146c8..6ee25673227f4a36caf8c01508a8c8078475bcd7 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -429,42 +429,43 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_PORT_DNODEDNODE 5 #define TSDB_PORT_SYNC 10 #define TSDB_PORT_HTTP 11 -#define TSDB_PORT_ARBITRATOR 12 +#define TSDB_PORT_ARBITRATOR 12 -#define TAOS_QTYPE_RPC 0 -#define TAOS_QTYPE_FWD 1 -#define TAOS_QTYPE_WAL 2 -#define TAOS_QTYPE_CQ 3 -#define TAOS_QTYPE_QUERY 4 +typedef enum { + TAOS_QTYPE_RPC = 0, + TAOS_QTYPE_FWD = 1, + TAOS_QTYPE_WAL = 2, + TAOS_QTYPE_CQ = 3, + TAOS_QTYPE_QUERY = 4 +} EQType; typedef enum { - TSDB_SUPER_TABLE = 0, // super table - TSDB_CHILD_TABLE = 1, // table created from super table - TSDB_NORMAL_TABLE = 2, // ordinary table - TSDB_STREAM_TABLE = 3, // table created from stream computing - TSDB_TABLE_MAX = 4 + TSDB_SUPER_TABLE = 0, // super table + TSDB_CHILD_TABLE = 1, // table created from super table + TSDB_NORMAL_TABLE = 2, // ordinary table + TSDB_STREAM_TABLE = 3, // table created from stream computing + TSDB_TABLE_MAX = 4 } ETableType; typedef enum { - TSDB_MOD_MNODE, - TSDB_MOD_HTTP, - TSDB_MOD_MONITOR, - TSDB_MOD_MQTT, - TSDB_MOD_MAX + TSDB_MOD_MNODE = 0, + TSDB_MOD_HTTP = 1, + TSDB_MOD_MONITOR = 2, + TSDB_MOD_MQTT = 3, + TSDB_MOD_MAX = 4 } EModuleType; - typedef enum { - TSDB_CHECK_ITEM_NETWORK, - TSDB_CHECK_ITEM_MEM, - TSDB_CHECK_ITEM_CPU, - TSDB_CHECK_ITEM_DISK, - TSDB_CHECK_ITEM_OS, - TSDB_CHECK_ITEM_ACCESS, - TSDB_CHECK_ITEM_VERSION, - TSDB_CHECK_ITEM_DATAFILE, - TSDB_CHECK_ITEM_MAX - } ECheckItemType; - +typedef enum { + TSDB_CHECK_ITEM_NETWORK, + TSDB_CHECK_ITEM_MEM, + TSDB_CHECK_ITEM_CPU, + TSDB_CHECK_ITEM_DISK, + TSDB_CHECK_ITEM_OS, + TSDB_CHECK_ITEM_ACCESS, + TSDB_CHECK_ITEM_VERSION, + TSDB_CHECK_ITEM_DATAFILE, + TSDB_CHECK_ITEM_MAX +} ECheckItemType; #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index d25652100b8a9314946bb51516c71485db0d1270..6d3061c4269f54f0075a07ee75115f43e856479b 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -89,7 +89,7 @@ typedef struct STableObj { int8_t type; } STableObj; -typedef struct SSuperTableObj { +typedef struct SSTableObj { STableObj info; int8_t reserved0[9]; // for fill struct STableObj to 4byte align int16_t nextColId; @@ -104,7 +104,7 @@ typedef struct SSuperTableObj { int32_t numOfTables; SSchema * schema; void * vgHash; -} SSuperTableObj; +} SSTableObj; typedef struct { STableObj info; @@ -122,8 +122,8 @@ typedef struct { int32_t refCount; char* sql; //used by normal table SSchema* schema; //used by normal table - SSuperTableObj *superTable; -} SChildTableObj; + SSTableObj*superTable; +} SCTableObj; typedef struct { int32_t dnodeId; diff --git a/src/mnode/inc/mnodeTable.h b/src/mnode/inc/mnodeTable.h index ed0dbe4ecfc4b344052c70487c3b740d81eaa0f3..7c0077aa609843aa99e9f5114573aea8f7446fc1 100644 --- a/src/mnode/inc/mnodeTable.h +++ b/src/mnode/inc/mnodeTable.h @@ -29,8 +29,8 @@ int64_t mnodeGetChildTableNum(); void * mnodeGetTable(char *tableId); void mnodeIncTableRef(void *pTable); void mnodeDecTableRef(void *pTable); -void * mnodeGetNextChildTable(void *pIter, SChildTableObj **pTable); -void * mnodeGetNextSuperTable(void *pIter, SSuperTableObj **pTable); +void * mnodeGetNextChildTable(void *pIter, SCTableObj **pTable); +void * mnodeGetNextSuperTable(void *pIter, SSTableObj **pTable); void mnodeDropAllChildTables(SDbObj *pDropDb); void mnodeDropAllSuperTables(SDbObj *pDropDb); void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup); diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 7aa662b81cc08c90d6453ff5c73191ad81416073..0e6d9dfde463dc50810654921b2006fda212e496 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -43,8 +43,8 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle); void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle); int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid); -void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); -void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); +void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable); +void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); diff --git a/src/mnode/src/mnodeInt.c b/src/mnode/src/mnodeInt.c index fb1b8741a9f8895c579ad0e549dc8f3c15f7618c..d3f2407386d66bf957e9937248ee409382f0d328 100644 --- a/src/mnode/src/mnodeInt.c +++ b/src/mnode/src/mnodeInt.c @@ -18,7 +18,7 @@ #include "taosmsg.h" #include "taoserror.h" #include "trpc.h" -#include "tcache.h" +#include "tqueue.h" #include "mnode.h" #include "dnode.h" #include "mnodeDef.h" @@ -34,8 +34,15 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg) { - pMsg->rpcMsg = *rpcMsg; +void *mnodeCreateMsg(SRpcMsg *pRpcMsg) { + int32_t size = sizeof(SMnodeMsg) + pRpcMsg->contLen; + SMnodeMsg *pMsg = taosAllocateQitem(size); + + pMsg->rpcMsg = *pRpcMsg; + pMsg->rpcMsg.pCont = pMsg->pCont; + memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); + + return pMsg; } int32_t mnodeInitMsg(SMnodeMsg *pMsg) { @@ -54,7 +61,9 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) { void mnodeCleanupMsg(SMnodeMsg *pMsg) { if (pMsg != NULL) { - if (pMsg->rpcMsg.pCont) rpcFreeCont(pMsg->rpcMsg.pCont); + if (pMsg->rpcMsg.pCont != pMsg->pCont) { + tfree(pMsg->rpcMsg.pCont); + } if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser); if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb); if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup); diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index 2bb8a810566a676a4ae991bc79e5eb78234747c7..1f5ad42bdead75064c60f8626ce4916e7b08fd9f 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -96,9 +96,9 @@ int32_t mnodeStartSystem() { return -1; } - dnodeAllocateMnodeWqueue(); - dnodeAllocateMnodeRqueue(); - dnodeAllocateMnodePqueue(); + dnodeAllocMWritequeue(); + dnodeAllocMReadQueue(); + dnodeAllocateMPeerQueue(); if (mnodeInitComponents() != 0) { return -1; @@ -127,9 +127,9 @@ void mnodeCleanupSystem() { mInfo("starting to clean up mnode"); tsMgmtIsRunning = false; - dnodeFreeMnodeWqueue(); - dnodeFreeMnodeRqueue(); - dnodeFreeMnodePqueue(); + dnodeFreeMWritequeue(); + dnodeFreeMReadQueue(); + dnodeFreeMPeerQueue(); mnodeCleanupTimer(); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index ae359d456d567c223590f9f5d51c6c3312b01da7..8cd8de62ad310729d44b549d4c44fc2bdc6a0f4c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -295,7 +295,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { if (pOper->writeCb != NULL) { pOper->retCode = (*pOper->writeCb)(pMsg, pOper->retCode); } - dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + dnodeSendRpcMWriteRsp(pMsg, pOper->retCode); // if ahandle, means this func is called by sdb write if (ahandle == NULL) { @@ -1043,7 +1043,7 @@ void sdbFreeWritequeue() { int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { SWalHead *pHead = data; int32_t size = sizeof(SWalHead) + pHead->len; - SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); + SWalHead *pWal = taosAllocateQitem(size); memcpy(pWal, pHead, size); taosWriteQitem(tsSdbWriteQueue, qtype, pWal); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 28dc2c6cff7b5e8eabbe78de89051cc9a5cddde4..d27688f8a915ff9f094afc1bf4699efa9f96444c 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -52,9 +52,9 @@ static int32_t tsSuperTableUpdateSize; static void * mnodeGetChildTable(char *tableId); static void * mnodeGetSuperTable(char *tableId); static void * mnodeGetSuperTableByUid(uint64_t uid); -static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable); -static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable); -static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable); +static void mnodeDropAllChildTablesInStable(SSTableObj *pStable); +static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable); +static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable); static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -86,9 +86,9 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg); static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg); static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); +static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName); -static void mnodeDestroyChildTable(SChildTableObj *pTable) { +static void mnodeDestroyChildTable(SCTableObj *pTable) { taosTFree(pTable->info.tableId); taosTFree(pTable->schema); taosTFree(pTable->sql); @@ -101,7 +101,7 @@ static int32_t mnodeChildTableActionDestroy(SSdbOper *pOper) { } static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) { - SChildTableObj *pTable = pOper->pObj; + SCTableObj *pTable = pOper->pObj; SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -150,7 +150,7 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) { } static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) { - SChildTableObj *pTable = pOper->pObj; + SCTableObj *pTable = pOper->pObj; if (pTable->vgId == 0) { return TSDB_CODE_MND_VGROUP_NOT_EXIST; } @@ -186,8 +186,8 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) { } static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) { - SChildTableObj *pNew = pOper->pObj; - SChildTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); + SCTableObj *pNew = pOper->pObj; + SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); if (pTable != pNew) { void *oldTableId = pTable->info.tableId; void *oldSql = pTable->sql; @@ -195,7 +195,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) { void *oldSTable = pTable->superTable; int32_t oldRefCount = pTable->refCount; - memcpy(pTable, pNew, sizeof(SChildTableObj)); + memcpy(pTable, pNew, sizeof(SCTableObj)); pTable->refCount = oldRefCount; pTable->sql = pNew->sql; @@ -213,7 +213,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) { } static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) { - SChildTableObj *pTable = pOper->pObj; + SCTableObj *pTable = pOper->pObj; assert(pTable != NULL && pOper->rowData != NULL); int32_t len = strlen(pTable->info.tableId); @@ -244,7 +244,7 @@ static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) { static int32_t mnodeChildTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); - SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); + SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; int32_t len = strlen(pOper->rowData); @@ -284,7 +284,7 @@ static int32_t mnodeChildTableActionDecode(SSdbOper *pOper) { static int32_t mnodeChildTableActionRestored() { void *pIter = NULL; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; while (1) { pIter = mnodeGetNextChildTable(pIter, &pTable); @@ -323,7 +323,7 @@ static int32_t mnodeChildTableActionRestored() { } if (pTable->info.type == TSDB_CHILD_TABLE) { - SSuperTableObj *pSuperTable = mnodeGetSuperTableByUid(pTable->suid); + SSTableObj *pSuperTable = mnodeGetSuperTableByUid(pTable->suid); if (pSuperTable == NULL) { mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid); pTable->vgId = 0; @@ -344,14 +344,14 @@ static int32_t mnodeChildTableActionRestored() { } static int32_t mnodeInitChildTables() { - SChildTableObj tObj; + SCTableObj tObj; tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_CTABLE, .tableName = "ctables", .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, - .maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE, + .maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE, .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .keyType = SDB_KEY_VAR_STRING, .insertFp = mnodeChildTableActionInsert, @@ -386,7 +386,7 @@ int64_t mnodeGetChildTableNum() { return sdbGetNumOfRows(tsChildTableSdb); } -static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { +static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { atomic_add_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) { @@ -402,7 +402,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt } } -static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { +static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable) { atomic_sub_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) return; @@ -416,7 +416,7 @@ static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj * mnodeDecVgroupRef(pVgroup); } -static void mnodeDestroySuperTable(SSuperTableObj *pStable) { +static void mnodeDestroySuperTable(SSTableObj *pStable) { if (pStable->vgHash != NULL) { taosHashCleanup(pStable->vgHash); pStable->vgHash = NULL; @@ -432,7 +432,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbOper *pOper) { } static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) { - SSuperTableObj *pStable = pOper->pObj; + SSTableObj *pStable = pOper->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) { mnodeAddSuperTableIntoDb(pDb); @@ -443,11 +443,11 @@ static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) { } static int32_t mnodeSuperTableActionDelete(SSdbOper *pOper) { - SSuperTableObj *pStable = pOper->pObj; + SSTableObj *pStable = pOper->pObj; SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); if (pDb != NULL) { mnodeRemoveSuperTableFromDb(pDb); - mnodeDropAllChildTablesInStable((SSuperTableObj *)pStable); + mnodeDropAllChildTablesInStable((SSTableObj *)pStable); } mnodeDecDbRef(pDb); @@ -455,8 +455,8 @@ static int32_t mnodeSuperTableActionDelete(SSdbOper *pOper) { } static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { - SSuperTableObj *pNew = pOper->pObj; - SSuperTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); + SSTableObj *pNew = pOper->pObj; + SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); if (pTable != NULL && pTable != pNew) { void *oldTableId = pTable->info.tableId; void *oldSchema = pTable->schema; @@ -464,7 +464,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { int32_t oldRefCount = pTable->refCount; int32_t oldNumOfTables = pTable->numOfTables; - memcpy(pTable, pNew, sizeof(SSuperTableObj)); + memcpy(pTable, pNew, sizeof(SSTableObj)); pTable->vgHash = oldVgHash; pTable->refCount = oldRefCount; @@ -480,7 +480,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { } static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) { - SSuperTableObj *pStable = pOper->pObj; + SSTableObj *pStable = pOper->pObj; assert(pOper->pObj != NULL && pOper->rowData != NULL); int32_t len = strlen(pStable->info.tableId); @@ -504,7 +504,7 @@ static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) { static int32_t mnodeSuperTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); - SSuperTableObj *pStable = (SSuperTableObj *) calloc(1, sizeof(SSuperTableObj)); + SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; int32_t len = strlen(pOper->rowData); @@ -537,14 +537,14 @@ static int32_t mnodeSuperTableActionRestored() { } static int32_t mnodeInitSuperTables() { - SSuperTableObj tObj; + SSTableObj tObj; tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_STABLE, .tableName = "stables", .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, - .maxRowSize = sizeof(SSuperTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN, + .maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN, .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .keyType = SDB_KEY_VAR_STRING, .insertFp = mnodeSuperTableActionInsert, @@ -615,7 +615,7 @@ static void *mnodeGetSuperTable(char *tableId) { } static void *mnodeGetSuperTableByUid(uint64_t uid) { - SSuperTableObj *pStable = NULL; + SSTableObj *pStable = NULL; void *pIter = NULL; while (1) { @@ -647,11 +647,11 @@ void *mnodeGetTable(char *tableId) { return NULL; } -void *mnodeGetNextChildTable(void *pIter, SChildTableObj **pTable) { +void *mnodeGetNextChildTable(void *pIter, SCTableObj **pTable) { return sdbFetchRow(tsChildTableSdb, pIter, (void **)pTable); } -void *mnodeGetNextSuperTable(void *pIter, SSuperTableObj **pTable) { +void *mnodeGetNextSuperTable(void *pIter, SSTableObj **pTable) { return sdbFetchRow(tsSuperTableSdb, pIter, (void **)pTable); } @@ -765,12 +765,12 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { } if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - SSuperTableObj *pSTable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pSTable = (SSTableObj *)pMsg->pTable; mInfo("app:%p:%p, table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash)); return mnodeProcessDropSuperTableMsg(pMsg); } else { - SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pCTable = (SCTableObj *)pMsg->pTable; mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d tid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pCTable->vgId, pCTable->tid, pCTable->uid); return mnodeProcessDropChildTableMsg(pMsg); @@ -816,7 +816,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { } static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pTable = (SSTableObj *)pMsg->pTable; assert(pTable); if (code == TSDB_CODE_SUCCESS) { @@ -835,7 +835,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; - SSuperTableObj * pStable = calloc(1, sizeof(SSuperTableObj)); + SSTableObj * pStable = calloc(1, sizeof(SSTableObj)); if (pStable == NULL) { mError("app:%p:%p, table:%s, failed to create, no enough memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -878,7 +878,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, - .rowSize = sizeof(SSuperTableObj) + schemaSize, + .rowSize = sizeof(SSTableObj) + schemaSize, .pMsg = pMsg, .writeCb = mnodeCreateSuperTableCb }; @@ -894,7 +894,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { } static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pTable = (SSTableObj *)pMsg->pTable; if (code != TSDB_CODE_SUCCESS) { mError("app:%p:%p, stable:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); } else { @@ -907,7 +907,7 @@ static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) { SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); while (taosHashIterNext(pIter)) { @@ -950,7 +950,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { return code; } -static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { +static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagName) { SSchema *schema = (SSchema *) pStable->schema; for (int32_t tag = 0; tag < pStable->numOfTags; tag++) { if (strcasecmp(schema[pStable->numOfColumns + tag].name, tagName) == 0) { @@ -962,7 +962,7 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char * } static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, add tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); @@ -970,7 +970,7 @@ static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { } static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t ntags) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { mError("app:%p:%p, stable:%s, add tag, too many tags", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); return TSDB_CODE_MND_TOO_MANY_TAGS; @@ -1018,14 +1018,14 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t } static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, drop tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); return code; } static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName); if (col < 0) { mError("app:%p:%p, stable:%s, drop tag, tag:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, @@ -1052,14 +1052,14 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { } static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, modify tag result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); return code; } static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, char *newTagName) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName); if (col < 0) { mError("app:%p:%p, stable:%s, failed to modify table tag, oldName: %s, newName: %s", pMsg->rpcMsg.ahandle, pMsg, @@ -1095,7 +1095,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c return sdbUpdateRow(&oper); } -static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { +static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) { SSchema *schema = (SSchema *) pStable->schema; for (int32_t col = 0; col < pStable->numOfColumns; col++) { if (strcasecmp(schema[col].name, colName) == 0) { @@ -1107,7 +1107,7 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col } static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); return code; @@ -1115,7 +1115,7 @@ static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) { SDbObj *pDb = pMsg->pDb; - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; if (ncols <= 0) { mError("app:%p:%p, stable:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, ncols); return TSDB_CODE_MND_APP_ERROR; @@ -1170,7 +1170,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 } static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, delete column result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); return code; @@ -1178,7 +1178,7 @@ static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { SDbObj *pDb = pMsg->pDb; - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName); if (col <= 0) { mError("app:%p:%p, stable:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, @@ -1215,14 +1215,14 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { } static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; mLInfo("app:%p:%p, stable %s, change column result:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tstrerror(code)); return code; } static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { - SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pStable = (SSTableObj *)pMsg->pTable; int32_t col = mnodeFindSuperTableColumnIndex(pStable, oldName); if (col < 0) { mError("app:%p:%p, stable:%s, change column, oldName: %s, newName: %s", pMsg->rpcMsg.ahandle, pMsg, @@ -1321,7 +1321,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, int32_t numOfRows = 0; char * pWrite; int32_t cols = 0; - SSuperTableObj *pTable = NULL; + SSTableObj *pTable = NULL; char prefix[64] = {0}; int32_t prefixLen; @@ -1399,7 +1399,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void mnodeDropAllSuperTables(SDbObj *pDropDb) { void * pIter= NULL; int32_t numOfTables = 0; - SSuperTableObj *pTable = NULL; + SSTableObj *pTable = NULL; char prefix[64] = {0}; tstrncpy(prefix, pDropDb->name, 64); @@ -1430,7 +1430,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) { mInfo("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); } -static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { +static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; assert(numOfCols <= TSDB_MAX_COLUMNS); @@ -1446,7 +1446,7 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pT } static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { - SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + SSTableObj *pTable = (SSTableObj *)pMsg->pTable; STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); if (pMeta == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -1479,7 +1479,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; - SSuperTableObj *pTable = mnodeGetSuperTable(stableName); + SSTableObj *pTable = mnodeGetSuperTable(stableName); if (pTable != NULL && pTable->vgHash != NULL) { contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg)); } @@ -1497,7 +1497,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < numOfTable; ++i) { char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; - SSuperTableObj *pTable = mnodeGetSuperTable(stableName); + SSTableObj *pTable = mnodeGetSuperTable(stableName); if (pTable == NULL) { mError("app:%p:%p, stable:%s, not exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, stableName); mnodeDecTableRef(pTable); @@ -1569,7 +1569,7 @@ static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { mInfo("drop stable rsp received, result:%s", tstrerror(rpcMsg->code)); } -static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { +static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SCTableObj *pTable) { STagData * pTagData = NULL; int32_t tagDataLen = 0; int32_t totalCols = 0; @@ -1643,7 +1643,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO } static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; assert(pTable); mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, @@ -1669,7 +1669,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { } static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; assert(pTable); @@ -1679,12 +1679,12 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { pTable->info.tableId, pMsg->rpcMsg.handle); pMsg->retry = 0; - dnodeReprocessMnodeWriteMsg(pMsg); + dnodeReprocessMWriteMsg(pMsg); } else { mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, pMsg->rpcMsg.handle); - dnodeSendRpcMnodeWriteRsp(pMsg, TSDB_CODE_SUCCESS); + dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS); } return TSDB_CODE_MND_ACTION_IN_PROGRESS; } else { @@ -1699,7 +1699,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { SVgObj *pVgroup = pMsg->pVgroup; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; - SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); + SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); if (pTable == NULL) { mError("app:%p:%p, table:%s, failed to alloc memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -1842,7 +1842,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { } static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; mLInfo("app:%p:%p, ctable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); @@ -1880,7 +1880,7 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { static int32_t mnodeDropChildTableCb(SMnodeMsg *pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; mError("app:%p:%p, ctable:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); return code; } @@ -1889,7 +1889,7 @@ static int32_t mnodeDropChildTableCb(SMnodeMsg *pMsg, int32_t code) { } static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) { mError("app:%p:%p, table:%s, failed to drop ctable, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg, @@ -1914,7 +1914,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { return code; } -static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { +static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName) { SSchema *schema = (SSchema *) pTable->schema; for (int32_t col = 0; col < pTable->numOfColumns; col++) { if (strcasecmp(schema[col].name, colName) == 0) { @@ -1926,7 +1926,7 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col } static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; if (code != TSDB_CODE_SUCCESS) { mError("app:%p:%p, ctable %s, failed to alter column, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); @@ -1965,7 +1965,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { } static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SDbObj *pDb = pMsg->pDb; if (ncols <= 0) { mError("app:%p:%p, ctable:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, ncols); @@ -2014,7 +2014,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { SDbObj *pDb = pMsg->pDb; - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName); if (col <= 0) { mError("app:%p:%p, ctable:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, @@ -2046,7 +2046,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { } static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; int32_t col = mnodeFindNormalTableColumnIndex(pTable, oldName); if (col < 0) { mError("app:%p:%p, ctable:%s, change column, oldName: %s, newName: %s", pMsg->rpcMsg.ahandle, pMsg, @@ -2082,7 +2082,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char return sdbUpdateRow(&oper); } -static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { +static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns; for (int32_t i = 0; i < numOfCols; ++i) { strcpy(pSchema->name, pTable->schema[i].name); @@ -2097,7 +2097,7 @@ static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *p static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { SDbObj *pDb = pMsg->pDb; - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCTableObj *pTable = (SCTableObj *)pMsg->pTable; pMeta->uid = htobe64(pTable->uid); pMeta->tid = htonl(pTable->tid); @@ -2156,7 +2156,7 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { } int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + tagLen; - SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); + SCMCreateTableMsg *pCreateMsg = calloc(1, contLen); if (pCreateMsg == NULL) { mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId); @@ -2174,11 +2174,13 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { mDebug("app:%p:%p, table:%s, start to create on demand, tagLen:%d stable:%s", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId, tagLen, pTags->name); - rpcFreeCont(pMsg->rpcMsg.pCont); + if (pMsg->rpcMsg.pCont != pMsg->pCont) { + tfree(pMsg->rpcMsg.pCont); + } pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; pMsg->rpcMsg.pCont = pCreateMsg; pMsg->rpcMsg.contLen = contLen; - + return TSDB_CODE_MND_ACTION_NEED_REPROCESSED; } @@ -2203,7 +2205,7 @@ static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) { void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { void * pIter = NULL; int32_t numOfTables = 0; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; mInfo("vgId:%d, all child tables will be dropped from sdb", pVgroup->vgId); @@ -2231,7 +2233,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { void mnodeDropAllChildTables(SDbObj *pDropDb) { void * pIter = NULL; int32_t numOfTables = 0; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; char prefix[64] = {0}; tstrncpy(prefix, pDropDb->name, 64); @@ -2261,10 +2263,10 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) { mInfo("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables); } -static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) { +static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) { void * pIter = NULL; int32_t numOfTables = 0; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; mInfo("stable:%s uid:%" PRIu64 ", all child tables:%d will be dropped from sdb", pStable->info.tableId, pStable->uid, pStable->numOfTables); @@ -2292,11 +2294,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) { } #if 0 -static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) { +static SCTableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) { SVgObj *pVgroup = mnodeGetVgroup(vnode); if (pVgroup == NULL) return NULL; - SChildTableObj *pTable = pVgroup->tableList[tid - 1]; + SCTableObj *pTable = pVgroup->tableList[tid - 1]; mnodeIncTableRef((STableObj *)pTable); mnodeDecVgroupRef(pVgroup); @@ -2314,7 +2316,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { mDebug("app:%p:%p, dnode:%d, vgId:%d sid:%d, receive table config msg", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId, pCfg->vgId, pCfg->sid); - SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid); + SCTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid); if (pTable == NULL) { mError("app:%p:%p, dnode:%d, vgId:%d sid:%d, table not found", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId, pCfg->vgId, pCfg->sid); @@ -2322,7 +2324,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { } SMDCreateTableMsg *pCreate = NULL; - pCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *)pTable); + pCreate = mnodeBuildCreateChildTableMsg(NULL, (SCTableObj *)pTable); mnodeDecTableRef(pTable); if (pCreate == NULL) return terrno; @@ -2340,7 +2342,7 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; - SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; + SCTableObj *pTable = (SCTableObj *)mnodeMsg->pTable; assert(pTable); mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s", @@ -2351,14 +2353,14 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code)); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); + dnodeSendRpcMWriteRsp(mnodeMsg, rpcMsg->code); return; } if (mnodeMsg->pVgroup == NULL) mnodeMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (mnodeMsg->pVgroup == NULL) { mError("app:%p:%p, table:%s, failed to get vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_VGROUP_NOT_EXIST); + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_MND_VGROUP_NOT_EXIST); return; } @@ -2368,7 +2370,7 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { mnodeDropVgroup(mnodeMsg->pVgroup, NULL); } - dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); } /* @@ -2381,7 +2383,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; - SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; + SCTableObj *pTable = (SCTableObj *)mnodeMsg->pTable; assert(pTable); // If the table is deleted by another thread during creation, stop creating and send drop msg to vnode @@ -2399,7 +2401,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mnodeSendDropChildTableMsg(mnodeMsg, false); rpcMsg->code = TSDB_CODE_SUCCESS; - dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); + dnodeSendRpcMWriteRsp(mnodeMsg, rpcMsg->code); return; } @@ -2416,7 +2418,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pTable = NULL; mnodeDestroyChildTable(pTable); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, code); + dnodeSendRpcMWriteRsp(mnodeMsg, code); } } else { if (mnodeMsg->retry++ < 10) { @@ -2425,7 +2427,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); - dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); + dnodeDelayReprocessMWriteMsg(mnodeMsg); } else { mError("app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%" PRIu64 ", result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, @@ -2434,7 +2436,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; sdbDeleteRow(&oper); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); + dnodeSendRpcMWriteRsp(mnodeMsg, rpcMsg->code); } } } @@ -2445,25 +2447,25 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { SMnodeMsg *mnodeMsg = rpcMsg->ahandle; mnodeMsg->received++; - SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; + SCTableObj *pTable = (SCTableObj *)mnodeMsg->pTable; assert(pTable); if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { mDebug("app:%p:%p, ctable:%s, altered in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + dnodeSendRpcMWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); } else { if (mnodeMsg->retry++ < 3) { mDebug("app:%p:%p, table:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); - dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); + dnodeDelayReprocessMWriteMsg(mnodeMsg); } else { mError("app:%p:%p, table:%s, failed to alter in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); + dnodeSendRpcMWriteRsp(mnodeMsg, rpcMsg->code); } } } @@ -2483,7 +2485,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { for (int32_t t = 0; t < pInfo->numOfTables; ++t) { char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_FNAME_LEN); - SChildTableObj *pTable = mnodeGetChildTable(tableId); + SCTableObj *pTable = mnodeGetChildTable(tableId); if (pTable == NULL) continue; if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(tableId); @@ -2607,7 +2609,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows int32_t cols = 0; int32_t numOfRows = 0; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; char prefix[64] = {0}; @@ -2843,7 +2845,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro } int32_t numOfRows = 0; - SChildTableObj *pTable = NULL; + SCTableObj *pTable = NULL; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; char prefix[64] = {0}; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index d3fa9a43ab318c7f5f99887d32f21a6049520d20..cf40e59f6877c9a132e2f285b93ffdbe5304c925 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -529,7 +529,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; (void)sdbUpdateRow(&desc); - dnodeReprocessMnodeWriteMsg(pMsg); + dnodeReprocessMWriteMsg(pMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; // if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) { // mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, @@ -537,7 +537,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { // pVgroup->status = TAOS_VG_STATUS_READY; // SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; // (void)sdbUpdateRow(&desc); - // dnodeReprocessMnodeWriteMsg(pMsg); + // dnodeReprocessMWriteMsg(pMsg); // return TSDB_CODE_MND_ACTION_IN_PROGRESS; // } else { // mError("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle, @@ -694,7 +694,7 @@ static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) { return true; } - SChildTableObj *pCTable = (SChildTableObj *)pTable; + SCTableObj *pCTable = (SCTableObj *)pTable; if (pVgroup->vgId == pCTable->vgId) { return true; } else { @@ -791,7 +791,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v return numOfRows; } -void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { +void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable) { int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool); if (pTable->tid > idPoolSize) { mnodeAllocVgroupIdPool(pVgroup); @@ -807,7 +807,7 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { } } -void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { +void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable) { if (pTable->tid >= 1) { taosFreeId(pVgroup->idPool, pTable->tid); pVgroup->numOfTables--; @@ -970,7 +970,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, code); + dnodeSendRpcMWriteRsp(mnodeMsg, code); } } else { SSdbOper oper = { @@ -979,7 +979,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { .pObj = pVgroup }; sdbDeleteRow(&oper); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, mnodeMsg->code); + dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code); } } @@ -1041,7 +1041,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { code = TSDB_CODE_MND_SDB_ERROR; } - dnodeReprocessMnodeWriteMsg(mnodeMsg); + dnodeReprocessMWriteMsg(mnodeMsg); } static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 43a8ddbd1a0d207addf0576951b2af9b2be16555..0fb055972a5d62de5cb93e81485c1fe3cc5b8364 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -49,7 +49,7 @@ static taos_queue tsHttpQueue; void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t numOfRows, void (*fp)(void *param, void *result, int32_t numOfRows)) { if (tsHttpQueue != NULL) { - SHttpResult *pMsg = (SHttpResult *)taosAllocateQitem(sizeof(SHttpResult)); + SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult)); pMsg->param = param; pMsg->result = result; pMsg->numOfRows = numOfRows; diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index 048f839b728fd7ed2a74d59744ebff5d1223cc33..de1e0e233c2b1441a00c226d7305f99a6db41109 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -27,12 +27,12 @@ #include "monitor.h" #include "taoserror.h" -#define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} -#define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} -#define monitorWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} -#define monitorInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }} -#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} -#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} +#define mnFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} +#define mnError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} +#define mnWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} +#define mnInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }} +#define mnDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} +#define mnTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} #define SQL_LENGTH 1030 #define LOG_LEN_STR 100 @@ -91,12 +91,12 @@ int32_t monitorInitSystem() { pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) { - monitorError("failed to create thread to for monitor module, reason:%s", strerror(errno)); + mnError("failed to create thread to for monitor module, reason:%s", strerror(errno)); return -1; } pthread_attr_destroy(&thAttr); - monitorDebug("monitor thread is launched"); + mnDebug("monitor thread is launched"); monitorStartSystemFp = monitorStartSystem; monitorStopSystemFp = monitorStopSystem; @@ -107,12 +107,12 @@ int32_t monitorStartSystem() { taos_init(); tsMonitor.start = 1; monitorExecuteSQLFp = monitorExecuteSQL; - monitorInfo("monitor module start"); + mnInfo("monitor module start"); return 0; } static void *monitorThreadFunc(void *param) { - monitorDebug("starting to initialize monitor module ..."); + mnDebug("starting to initialize monitor module ..."); while (1) { static int32_t accessTimes = 0; @@ -121,7 +121,7 @@ static void *monitorThreadFunc(void *param) { if (tsMonitor.quiting) { tsMonitor.state = MON_STATE_NOT_INIT; - monitorInfo("monitor thread will quit, for taosd is quiting"); + mnInfo("monitor thread will quit, for taosd is quiting"); break; } else { taosGetDisk(); @@ -132,7 +132,7 @@ static void *monitorThreadFunc(void *param) { } if (dnodeGetDnodeId() <= 0) { - monitorDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); + mnDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); continue; } @@ -140,10 +140,10 @@ static void *monitorThreadFunc(void *param) { tsMonitor.state = MON_STATE_NOT_INIT; tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0); if (tsMonitor.conn == NULL) { - monitorError("failed to connect to database, reason:%s", tstrerror(terrno)); + mnError("failed to connect to database, reason:%s", tstrerror(terrno)); continue; } else { - monitorDebug("connect to database success"); + mnDebug("connect to database success"); } } @@ -155,10 +155,10 @@ static void *monitorThreadFunc(void *param) { taos_free_result(res); if (code != 0) { - monitorError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code)); + mnError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code)); break; } else { - monitorDebug("successfully to exec sql:%s", tsMonitor.sql); + mnDebug("successfully to exec sql:%s", tsMonitor.sql); } } @@ -174,7 +174,7 @@ static void *monitorThreadFunc(void *param) { } } - monitorInfo("monitor thread is stopped"); + mnInfo("monitor thread is stopped"); return NULL; } @@ -238,7 +238,7 @@ void monitorStopSystem() { tsMonitor.start = 0; tsMonitor.state = MON_STATE_NOT_INIT; monitorExecuteSQLFp = NULL; - monitorInfo("monitor module stopped"); + mnInfo("monitor module stopped"); } void monitorCleanUpSystem() { @@ -249,7 +249,7 @@ void monitorCleanUpSystem() { taos_close(tsMonitor.conn); tsMonitor.conn = NULL; } - monitorInfo("monitor module is cleaned up"); + mnInfo("monitor module is cleaned up"); } // unit is MB @@ -257,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) { float sysMemoryUsedMB = 0; bool suc = taosGetSysMemory(&sysMemoryUsedMB); if (!suc) { - monitorDebug("failed to get sys memory info"); + mnDebug("failed to get sys memory info"); } float procMemoryUsedMB = 0; suc = taosGetProcMemory(&procMemoryUsedMB); if (!suc) { - monitorDebug("failed to get proc memory info"); + mnDebug("failed to get proc memory info"); } return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB); @@ -274,7 +274,7 @@ static int32_t monitorBuildCpuSql(char *sql) { float sysCpuUsage = 0, procCpuUsage = 0; bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage); if (!suc) { - monitorDebug("failed to get cpu usage"); + mnDebug("failed to get cpu usage"); } if (sysCpuUsage <= procCpuUsage) { @@ -294,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) { float bandSpeedKb = 0; bool suc = taosGetBandSpeed(&bandSpeedKb); if (!suc) { - monitorDebug("failed to get bandwidth speed"); + mnDebug("failed to get bandwidth speed"); } return sprintf(sql, ", %f", bandSpeedKb); } static int32_t monitorBuildReqSql(char *sql) { - SDnodeStatisInfo info = dnodeGetStatisInfo(); + SStatisInfo info = dnodeGetStatisInfo(); return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum); } @@ -309,7 +309,7 @@ static int32_t monitorBuildIoSql(char *sql) { float readKB = 0, writeKB = 0; bool suc = taosGetProcIO(&readKB, &writeKB); if (!suc) { - monitorDebug("failed to get io info"); + mnDebug("failed to get io info"); } return sprintf(sql, ", %f, %f", readKB, writeKB); @@ -332,19 +332,19 @@ static void monitorSaveSystemInfo() { taos_free_result(res); if (code != 0) { - monitorError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); + mnError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); } else { - monitorDebug("successfully to save system info, sql:%s", tsMonitor.sql); + mnDebug("successfully to save system info, sql:%s", tsMonitor.sql); } } static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) { int32_t c = taos_errno(result); if (c != TSDB_CODE_SUCCESS) { - monitorError("save %s failed, reason:%s", (char *)param, tstrerror(c)); + mnError("save %s failed, reason:%s", (char *)param, tstrerror(c)); } else { int32_t rows = taos_affected_rows(result); - monitorDebug("save %s succ, rows:%d", (char *)param, rows); + mnDebug("save %s succ, rows:%d", (char *)param, rows); } taos_free_result(result); @@ -380,7 +380,7 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) { pMon->totalConns, pMon->maxConns, pMon->accessState); - monitorDebug("save account info, sql:%s", sql); + mnDebug("save account info, sql:%s", sql); taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info"); } @@ -401,13 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) { len += sprintf(sql + len, "', '%s')", tsLocalEp); sql[len++] = 0; - monitorDebug("save log, sql: %s", sql); + mnDebug("save log, sql: %s", sql); taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log"); } void monitorExecuteSQL(char *sql) { if (tsMonitor.state != MON_STATE_INITED) return; - monitorDebug("execute sql:%s", sql); + mnDebug("execute sql:%s", sql); taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql"); } diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index f0040f8cdf8cf52d6cca66602c7bbc61ae931cca..e8dd44b48fa6b1389439711439fe864a0fbd948d 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -22,7 +22,6 @@ #include "tsdb.h" #include "dnode.h" #include "vnodeInt.h" -#include "vnodeVersion.h" #include "vnodeCfg.h" static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a7dff5aef54ddf588efe9554851fa6cc424e0724..f5b670f423ff43eedcc1c4d6a82af473137fb2a3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -15,17 +15,11 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "tcache.h" -#include "cJSON.h" -#include "dnode.h" -#include "hash.h" #include "taoserror.h" #include "taosmsg.h" #include "tglobal.h" #include "trpc.h" #include "tsdb.h" -#include "ttimer.h" #include "tutil.h" #include "vnode.h" #include "vnodeInt.h" @@ -34,7 +28,7 @@ #include "vnodeCfg.h" #include "vnodeVersion.h" -static SHashObj*tsDnodeVnodesHash; +static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); @@ -67,8 +61,8 @@ int32_t vnodeInitResources() { vnodeInitWriteFp(); vnodeInitReadFp(); - tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); - if (tsDnodeVnodesHash == NULL) { + tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + if (tsVnodesHash == NULL) { vError("failed to init vnode list"); return TSDB_CODE_VND_OUT_OF_MEMORY; } @@ -77,10 +71,10 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { - if (tsDnodeVnodesHash != NULL) { + if (tsVnodesHash != NULL) { vDebug("vnode list is cleanup"); - taosHashCleanup(tsDnodeVnodesHash); - tsDnodeVnodesHash = NULL; + taosHashCleanup(tsVnodesHash); + tsVnodesHash = NULL; } syncCleanUp(); @@ -349,7 +343,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); + taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); return TSDB_CODE_SUCCESS; } @@ -441,7 +435,7 @@ void vnodeRelease(void *pVnodeRaw) { tsem_destroy(&pVnode->sem); free(pVnode); - int32_t count = taosHashGetSize(tsDnodeVnodesHash); + int32_t count = taosHashGetSize(tsVnodesHash); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count); } @@ -458,7 +452,7 @@ static void vnodeIncRef(void *ptNode) { } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = taosHashGetCB(tsDnodeVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -497,7 +491,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { } int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { - SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); + SHashMutableIterator *pIter = taosHashCreateIter(tsVnodesHash); while (taosHashIterNext(pIter)) { SVnodeObj **pVnode = taosHashIterGet(pIter); if (pVnode == NULL) continue; @@ -518,7 +512,7 @@ int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { void vnodeBuildStatusMsg(void *param) { SStatusMsg *pStatus = param; - SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); + SHashMutableIterator *pIter = taosHashCreateIter(tsVnodesHash); while (taosHashIterNext(pIter)) { SVnodeObj **pVnode = taosHashIterGet(pIter); @@ -547,7 +541,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + taosHashRemove(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); if (pVnode->status != TAOS_VN_STATUS_INIT) { // it may be in updateing or reset state, then it shall wait diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index b63c1992d04b3bdaaac517b0c57944959cb0b283..55ddf3a34b42ea82ee169d2f25aeea23b3c0c972 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -15,13 +15,10 @@ #define _DEFAULT_SOURCE #define _NON_BLOCKING_RETRIEVE 0 - #include "os.h" - #include "tglobal.h" #include "taoserror.h" #include "taosmsg.h" -#include "tcache.h" #include "query.h" #include "trpc.h" #include "tsdb.h" @@ -29,9 +26,9 @@ #include "vnodeInt.h" #include "tqueue.h" -static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg); -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg); -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg); +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); void vnodeInitReadFp(void) { @@ -44,16 +41,16 @@ void vnodeInitReadFp(void) { // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // request enters the queue // -int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) { +int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) { SVnodeObj *pVnode = (SVnodeObj *)param; - int32_t msgType = pReadMsg->msgType; + int32_t msgType = pRead->msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); return TSDB_CODE_VND_MSG_NOT_PROCESSED; } - return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead); } static int32_t vnodeCheckRead(void *param) { @@ -180,27 +177,27 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { pRsp->completed = true; } -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { - void * pCont = pReadMsg->pCont; - int32_t contLen = pReadMsg->contLen; - SRspRet *pRet = &pReadMsg->rspRet; +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { + void * pCont = pRead->pCont; + int32_t contLen = pRead->contLen; + SRspRet *pRet = &pRead->rspRet; SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont; memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - if (pReadMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont; + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcHandle); - assert(pReadMsg->contLen > 0 && killQueryMsg->free == 1); + vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle); + assert(pRead->contLen > 0 && killQueryMsg->free == 1); void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle, - pReadMsg->rpcHandle); + pRead->rpcHandle); } else { assert(*qhandle == (void *)killQueryMsg->qhandle); @@ -242,9 +239,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } if (handle != NULL && - vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, - pReadMsg->rpcHandle); + pRead->rpcHandle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); return pRsp->code; @@ -255,7 +252,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcHandle); + code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle); if (code != TSDB_CODE_SUCCESS) { pRsp->code = code; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -264,7 +261,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } } else { assert(pCont != NULL); - void **qhandle = (void **)pReadMsg->qhandle; + void **qhandle = (void **)pRead->qhandle; vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); @@ -276,14 +273,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { // build query rsp, the retrieve request has reached here already if (buildRes) { // update the connection info according to the retrieve connection - pReadMsg->rpcHandle = qGetResultRetrieveMsg(*qhandle); - assert(pReadMsg->rpcHandle != NULL); + pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle); + assert(pRead->rpcHandle != NULL); vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, - pReadMsg->rpcHandle); + pRead->rpcHandle); // set the real rsp error code - pReadMsg->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcHandle); + pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle); // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client code = TSDB_CODE_QRY_HAS_RSP; @@ -308,16 +305,16 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { return code; } -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { - void * pCont = pReadMsg->pCont; - SRspRet *pRet = &pReadMsg->rspRet; +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { + void * pCont = pRead->pCont; + SRspRet *pRet = &pRead->rspRet; SRetrieveTableMsg *pRetrieve = pCont; pRetrieve->free = htons(pRetrieve->free); pRetrieve->qhandle = htobe64(pRetrieve->qhandle); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, - pRetrieve->free, pReadMsg->rpcHandle); + pRetrieve->free, pRead->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); @@ -348,8 +345,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } // register the qhandle to connect to quit query immediate if connection is broken - if (vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcHandle); + if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -359,7 +356,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { bool freeHandle = true; bool buildRes = false; - code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcHandle); + code = qRetrieveQueryResultInfo(*handle, &buildRes, pRead->rpcHandle); if (code != TSDB_CODE_SUCCESS) { // TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -370,7 +367,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { assert(buildRes == true); #if _NON_BLOCKING_RETRIEVE if (!buildRes) { - assert(pReadMsg->rpcHandle != NULL); + assert(pRead->rpcHandle != NULL); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; @@ -378,7 +375,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { #endif // ahandle is the sqlObj pointer - code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcHandle); + code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pRead->rpcHandle); } // If qhandle is not added into vread queue, the query should be completed already or paused with error. diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 3caee2fb0c6469f8325906c06ba5b598ebc161b9..f42b359b26d8467cda65a9f04a70d3db2305f234 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -19,7 +19,6 @@ #include "taoserror.h" #include "tqueue.h" #include "trpc.h" -#include "tutil.h" #include "tsdb.h" #include "twal.h" #include "tsync.h" @@ -185,7 +184,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SDropSTableMsg *pTable = pCont; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; vDebug("vgId:%d, stable:%s, start to drop", pVnode->vgId, pTable->tableId); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1b2fe37c71f26486ba847006ecf3aa373b5f55c1..a5d04f79a6c74dcb9d514bfc22d5dd6b8f03198a 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -57,8 +57,8 @@ cd ../../../debug; make ./test.sh -f general/db/delete_reuse2.sim ./test.sh -f general/db/delete_reusevnode.sim ./test.sh -f general/db/delete_reusevnode2.sim -./test.sh -f general/db/delete_writing1.sim -./test.sh -f general/db/delete_writing2.sim +#./test.sh -f general/db/delete_writing1.sim +#./test.sh -f general/db/delete_writing2.sim ./test.sh -f general/db/delete.sim ./test.sh -f general/db/len.sim ./test.sh -f general/db/repeat.sim