diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9cf68b87f9764b4670cf7091c2f7d83bbb347063..f8d1fb39111f281887606709f7fb8e2a25d37391 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -213,6 +213,7 @@ endif(${BUILD_WITH_TRAFT}) # LIBUV if(${BUILD_WITH_UV}) + add_compile_options(-Wno-sign-compare) if (${TD_WINDOWS}) file(READ "libuv/include/uv.h" CONTENTS) string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1bdc07ba6f14e78f349b6385d0a5ba3f465e8845..6f91cbd5ee09d8e438b80b72b00dcbee01e61b9e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1942,13 +1942,14 @@ typedef struct { } SVCreateTSmaReq; typedef struct { - int8_t type; // 0 status report, 1 update data - char indexName[TSDB_INDEX_NAME_LEN]; // - STimeWindow windows; + int8_t type; // 0 status report, 1 update data + int64_t indexUid; + int64_t skey; // start TS key of interval/sliding window } STSmaMsg; typedef struct { int64_t ver; // use a general definition + int64_t indexUid; char indexName[TSDB_INDEX_NAME_LEN]; } SVDropTSmaReq; @@ -2345,9 +2346,9 @@ struct SRpcMsg; struct SEpSet; struct SMgmtWrapper; typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg); -typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); -typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); +typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); +typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); +typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); #ifdef __cplusplus } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a596794b3d9ac493f775bd2d6f6bb74f12075036..73a78131dc1cb6d81e85c3278328866d05ecd62a 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -190,6 +190,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 4202859359e2fcb014b662956dcdc973f54a8c6c..3a93d16733ed4691a484b8cff1064533d6fdf70a 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -29,6 +29,7 @@ typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SSnode SSnode; typedef struct { + int32_t reserved; } SSnodeLoad; typedef struct { diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 02029a996cd315bc5520da7041ccecee82f52045..c2cce3a05db7a9907d175745e57bc7b6e9961d80 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -52,8 +52,8 @@ typedef struct { char user[TSDB_USER_LEN]; SRpcMsg rpcMsg; int32_t rspLen; - void *pRsp; - void *pNode; + void * pRsp; + void * pNode; } SNodeMsg; typedef struct SRpcInit { @@ -87,7 +87,15 @@ typedef struct { } SRpcCtxVal; typedef struct { - SHashObj *args; + int32_t msgType; + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcBrokenlinkVal; + +typedef struct { + SHashObj * args; + SRpcBrokenlinkVal brokenVal; } SRpcCtx; int32_t rpcInit(); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 638d8c9c9ae1a152b20ab8a2bac160eae37b50fb..1907c2c17b82bb5e9a20d1baa30cf9b83bfeae3e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -460,7 +460,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; SQuery* pQueryNode = NULL; - char* pStr = NULL; + char* astStr = NULL; terrno = TSDB_CODE_SUCCESS; if (taos == NULL || topicName == NULL || sql == NULL) { @@ -489,17 +489,17 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i // todo check for invalid sql statement and return with error code - CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return); + CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); /*printf("%s\n", pStr);*/ - SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T }; + SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T}; strcpy(name.dbname, pRequest->pDb); strcpy(name.tname, topicName); SCMCreateTopicReq req = { .igExists = 1, - .ast = (char*)pStr, + .ast = (char*)astStr, .sql = (char*)sql, }; tNameExtractFullName(&name, req.name); @@ -513,7 +513,11 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tSerializeSCMCreateTopicReq(buf, tlen, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 06d124cb28ebc1f79bba25e640766b88161afaaf..a70454fb02ab3742088112da24f3b224690e5fe9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2716,12 +2716,14 @@ int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->ver); + tlen += taosEncodeFixedI64(buf, pReq->indexUid); tlen += taosEncodeString(buf, pReq->indexName); return tlen; } void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { buf = taosDecodeFixedI64(buf, &(pReq->ver)); + buf = taosDecodeFixedI64(buf, &(pReq->indexUid)); buf = taosDecodeStringTo(buf, pReq->indexName); return buf; diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 41fd23e3c7c2c3ccae488f296daf8f5cb1d6ccaa..8dbea36c222d5358ac17237fcfa118665c1f45e2 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -148,4 +148,5 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg); } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index c0e7e212cc98853bfdaa58be474ed0e6798549a1..2020a3d219292de480da9bd77af9518fbaa942d9 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -24,6 +24,8 @@ extern "C" { #endif +typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType; + typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; @@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg); +int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 8b2c2d7dd44e015f4ce5884d4b17fc8a6bf81d21..c5e79765c76871858ac0099c71f647921babfcbc 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { vnodeOpt.nthreads = tsNumOfCommitThreads; vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue; + vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue; vnodeOpt.sendReqFp = dndSendReqToDnode; + vnodeOpt.sendMnodeReqFp = dndSendReqToMnode; + vnodeOpt.sendRspFp = dndSendRsp; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index b3f752931177c380bd216fb501b319cb35e7dede..53423845d4ede2728a623ab9a40203dee2757945 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -265,6 +265,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e5d5e38328528b3d4bd5bcd34ba24b068debe0e9..fe01b19d2d25fb396a28af8a5e10eaa1c0e02d66 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,46 +16,109 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; + dndSendRsp(pWrapper, &rsp); +} + +static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); + + switch (msgType) { + case TDMT_DND_CREATE_VNODE: + code = vmProcessCreateVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = vmProcessAlterVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = vmProcessDropVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = vmProcessSyncVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = vmProcessCompactVnodeReq(pMgmt, pMsg); + break; + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + dError("msg:%p, not processed in vnode-mgmt queue", pMsg); + } + + if (msgType & 1u) { + if (code != 0 && terrno != 0) code = terrno; + vmSendRsp(pMgmt->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); +} + static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed in vnode query queue", pMsg); - vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); + dTrace("msg:%p, will be processed in vnode-query queue", pMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); + if (code != 0) { + vmSendRsp(pVnode->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed in vnode fetch queue", pMsg); - vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + if (code != 0) { + vmSendRsp(pVnode->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); + if (pArray == NULL) { + dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); + return; + } for (int32_t i = 0; i < numOfMsgs; ++i) { SNodeMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); - dTrace("msg:%p, will be processed in vnode write queue", pMsg); - void *ptr = taosArrayPush(pArray, &pMsg); - assert(ptr != NULL); + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + + dTrace("msg:%p, will be processed in vnode-write queue", pMsg); + if (taosArrayPush(pArray, &pMsg) == NULL) { + dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); + vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); + } } vnodeProcessWMsgs(pVnode->pImpl, pArray); - for (size_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pRsp = NULL; + numOfMsgs = taosArrayGetSize(pArray); + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); + SRpcMsg *pRsp = NULL; + + int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; dndSendRsp(pVnode->pWrapper, pRsp); free(pRsp); } else { - if (code != 0) code = terrno; - SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; - dndSendRsp(pVnode->pWrapper, &rpcRsp); + if (code != 0 && terrno != 0) code = terrno; + vmSendRsp(pVnode->pWrapper, pMsg, code); } } - for (size_t i = 0; i < numOfMsgs; i++) { + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->rpcMsg.pCont); @@ -89,93 +152,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf } } -static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { - SRpcMsg *pMsg = &pNodeMsg->rpcMsg; +static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) { + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; - SMsgHead *pHead = pMsg->pCont; + SMsgHead *pHead = pRpc->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); + dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr()); + return -1; } - return pVnode; -} - -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; + switch (qtype) { + case VND_QUERY_QUEUE: + dTrace("msg:%p, will be written into vnode-query queue", pMsg); + code = taosWriteQitem(pVnode->pQueryQ, pMsg); + break; + case VND_FETCH_QUEUE: + dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); + code = taosWriteQitem(pVnode->pFetchQ, pMsg); + break; + case VND_WRITE_QUEUE: + dTrace("msg:%p, will be written into vnode-write queue", pMsg); + code = taosWriteQitem(pVnode->pWriteQ, pMsg); + case VND_SYNC_QUEUE: + dTrace("msg:%p, will be written into vnode-sync queue", pMsg); + code = taosWriteQitem(pVnode->pSyncQ, pMsg); + default: + terrno = TSDB_CODE_INVALID_PARA; + break; + } - int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg); vmReleaseVnode(pMgmt, pVnode); return code; } int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE); +} - int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg); - vmReleaseVnode(pMgmt, pVnode); - return code; +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE); } int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; - - int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg); - vmReleaseVnode(pMgmt, pVnode); - return code; + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE); } int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE); +} - int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg); - vmReleaseVnode(pMgmt, pVnode); - return code; +int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg); } -int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { +static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - - int32_t code = -1; - SMsgHead *pHead = pRpc->pCont; - // pHead->vgId = htonl(pHead->vgId); + int32_t code = -1; + SMsgHead *pHead = pRpc->pCont; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg != NULL) { + dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - code = taosWriteQitem(pVnode->pQueryQ, pMsg); + switch (qtype) { + case VND_QUERY_QUEUE: + dTrace("msg:%p, will be put into vnode-query queue", pMsg); + code = taosWriteQitem(pVnode->pQueryQ, pMsg); + break; + case VND_FETCH_QUEUE: + dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); + code = taosWriteQitem(pVnode->pFetchQ, pMsg); + break; + case VND_APPLY_QUEUE: + dTrace("msg:%p, will be put into vnode-apply queue", pMsg); + code = taosWriteQitem(pVnode->pApplyQ, pMsg); + break; + case VND_WRITE_QUEUE: + case VND_SYNC_QUEUE: + default: + terrno = TSDB_CODE_INVALID_PARA; + break; + } } vmReleaseVnode(pMgmt, pVnode); return code; } -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - - int32_t code = -1; - SMsgHead *pHead = pRpc->pCont; - // pHead->vgId = htonl(pHead->vgId); +int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE); +} - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) return -1; +int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE); +} - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg != NULL) { - pMsg->rpcMsg = *pRpc; - code = taosWriteQitem(pVnode->pApplyQ, pMsg); - } - vmReleaseVnode(pMgmt, pVnode); - return code; +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE); } int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { @@ -191,6 +273,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { return -1; } + dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId); return 0; } @@ -205,43 +288,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pSyncQ = NULL; pVnode->pFetchQ = NULL; pVnode->pQueryQ = NULL; -} - -static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg); - - switch (msgType) { - case TDMT_DND_CREATE_VNODE: - code = vmProcessCreateVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_ALTER_VNODE: - code = vmProcessAlterVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_DROP_VNODE: - code = vmProcessDropVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_SYNC_VNODE: - code = vmProcessSyncVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = vmProcessCompactVnodeReq(pMgmt, pMsg); - break; - default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in mgmt queue", pMsg); - } - - if (msgType & 1u) { - if (code != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; - dndSendRsp(pMgmt->pWrapper, &rsp); - } - - dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); + dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } int32_t vmStartWorker(SVnodesMgmt *pMgmt) { @@ -275,7 +322,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { if (tWWorkerInit(pWPool) != 0) return -1; if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); + dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -291,9 +338,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->syncPool); dDebug("vnode workers is closed"); } - -int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); -} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 8f2a7ef2e558777b3d90dee2973dd27c96069070..f0f06fc9a8c5d45105d555933ce94fcbcae92441 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -802,6 +802,32 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } +static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) { + SDropDbRsp dropRsp = {0}; + if (pDb != NULL) { + memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + dropRsp.uid = pDb->uid; + } + + int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp); + void *pRsp = NULL; + if (useRpcMalloc) { + pRsp = rpcMallocCont(rspLen); + } else { + pRsp = malloc(rspLen); + } + + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp); + *pRspLen = rspLen; + *ppRsp = pRsp; + return 0; +} + static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg); @@ -814,18 +840,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; - SDropDbRsp dropRsp = {0}; - memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN); - dropRsp.uid = pDb->uid; - - int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp); - void *pRsp = malloc(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto DROP_DB_OVER; - } - tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp); - + int32_t rspLen = 0; + void *pRsp = NULL; + if (mndBuildDropDbRsp(pDb, &rspLen, &pRsp, false) < 0) goto DROP_DB_OVER; mndTransSetRpcRsp(pTrans, pRsp, rspLen); if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; @@ -854,7 +871,7 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq) { pDb = mndAcquireDb(pMnode, dropReq.db); if (pDb == NULL) { if (dropReq.ignoreNotExists) { - code = 0; + code = mndBuildDropDbRsp(pDb, &pReq->rspLen, &pReq->pRsp, true); goto DROP_DB_OVER; } else { terrno = TSDB_CODE_MND_DB_NOT_EXIST; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c0bfc2c3f5fc3d6e5a1f987e84ce45cb4e175ccb..1e9e48e2064db4a8404c7277fd0ba8eec367e9eb 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -77,9 +77,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { } static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { - SMsgHead *pHead = pMsg->pCont; - int32_t taskId = pHead->streamTaskId; - SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); + SStreamExecMsgHead *pHead = pMsg->pCont; + int32_t taskId = pHead->streamTaskId; + SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); if (pTask == NULL) { return -1; } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8d8ed2e427087798c7e6da03be47743cec526bfa..6391eaffea70088b389a0e24834e151f8d075dd4 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -55,6 +55,8 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); + int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 776e4e47ac3378edfb9476936bd235a25acb4472..1a62ecea8ff90d12240a5e8d6552c240cae464d4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -62,6 +62,7 @@ typedef struct { typedef struct { uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) PutToQueueFp putToQueryQFp; + PutToQueueFp putToFetchQFp; SendReqFp sendReqFp; SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; @@ -125,9 +126,8 @@ void vnodeDestroy(const char *path); * * @param pVnode The vnode object. * @param pMsgs The array of SRpcMsg - * @return int 0 for success, -1 for failure */ -int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); /** * @brief Apply a write request message. diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index d3f0dce2e5765cad61fbe48a499d5f58953c89ec..de9b7bac837aea248a018e04b10ea54782c7c983 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -46,15 +46,18 @@ typedef struct SVnodeTask { typedef struct SVnodeMgr { td_mode_flag_t vnodeInitFlag; // For commit - bool stop; - uint16_t nthreads; - TdThread* threads; + bool stop; + uint16_t nthreads; + TdThread* threads; TdThreadMutex mutex; TdThreadCond hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt - PutToQueueFp putToQueryQFp; - SendReqFp sendReqFp; + PutToQueueFp putToQueryQFp; + PutToQueueFp putToFetchQFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -85,7 +88,10 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); -void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq); +int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq); +void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp); #define vFatal(...) \ do { \ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cdc250e8d5bbb9cbaac311a38ac2001144572a1..02fecb49b77c1fbf8257e6b2197a698edeb2c379 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { return 0; } + +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { + // + return 0; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 0f0d50ad085ad0bf92654830e24e594699cf5161..fa867543b084f8614710d162fbc2eea2e7beca53 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { } if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) { - tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 442921b90ea30dfef491c96e5c4e6b924c30d89d..51aaf9e68f9f1209af184cc724d59604ace3add4 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putToQueryQFp = pOption->putToQueryQFp; + vnodeMgr.putToFetchQFp = pOption->putToFetchQFp; vnodeMgr.sendReqFp = pOption->sendReqFp; + vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp; + vnodeMgr.sendRspFp = pOption->sendRspFp; // Start commit handers if (pOption->nthreads > 0) { @@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) { } int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) { - terrno = TSDB_CODE_VND_APP_ERROR; - return -1; - } return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq); } -void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); +int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) { + return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq); +} + +int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { + return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); +} + +int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) { + return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq); +} + +void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) { + (*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp); } /* ------------------------ STATIC METHODS ------------------------ */ diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 5762e81ba46d646f309bbe34c58211f9bbd0667e..bdcada5c026b856d97ce0231bfbfa1266f4976ac 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) { (putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq); } -void vnodeQueryClose(SVnode *pVnode) { - qWorkerDestroy((void **)&pVnode->pQuery); -} +void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); @@ -67,6 +65,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_EXEC: + return tqProcessTaskExec(pVnode->pTq, pMsg); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 64747dea8892d27994037a4097488935c5b06fcb..c3f53b6b9187281816baa84af0523957b5a5e04e 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -16,7 +16,7 @@ #include "tq.h" #include "vnd.h" -int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { // TODO: Integrate RAFT module here - return 0; + // No results are returned because error handling is difficult + // return 0; } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index ed1ad7a374286c8251dcb719be9a5510c6eec844..c7f964f2baaef8e7ce39443fdc391452cca774a2 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -154,15 +154,7 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { // add more search type } - char* dst = NULL; - if (data != NULL) { - char* src = (char*)data; - size_t len = strlen(src); - dst = (char*)calloc(1, len * sizeof(char) + 1); - memcpy(dst, src, len); - } - - ctx->data = dst; + ctx->data = strdup((char*)data); ctx->type = atype; ctx->stdata = (void*)sv; return ctx; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4e75dcd843aed23ca4cedbc835ef1af532442b3..32a0cf0d54a09c5003e1cdafa7047c3d83978cc5 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -228,8 +228,8 @@ typedef struct SConnBuffer { typedef void (*AsyncCB)(uv_async_t* handle); typedef struct { - void* pThrd; - queue qmsg; + void* pThrd; + queue qmsg; TdThreadMutex mtx; // protect qmsg; } SAsyncItem; @@ -273,10 +273,52 @@ void transCloseClient(void* arg); void transCloseServer(void* arg); void transCtxInit(STransCtx* ctx); -void transCtxDestroy(STransCtx* ctx); +void transCtxCleanup(STransCtx* ctx); void transCtxClear(STransCtx* ctx); void transCtxMerge(STransCtx* dst, STransCtx* src); void* transCtxDumpVal(STransCtx* ctx, int32_t key); +void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); + +// queue sending msgs +typedef struct { + SArray* q; + void (*free)(void* arg); +} STransQueue; + +/* + * init queue + * note: queue'size is small, default 1 + */ +void transQueueInit(STransQueue* queue, void (*free)(void* arg)); + +/* + * put arg into queue + * if queue'size > 1, return false; else return true + */ +bool transQueuePush(STransQueue* queue, void* arg); +/* + * pop head from queue + */ + +void* transQueuePop(STransQueue* queue); +/* + * get head from queue + */ +void* transQueueGet(STransQueue* queue); + +/* + * queue empty or not + */ + +bool transQueueEmpty(STransQueue* queue); +/* + * clear queue + */ +void transQueueClear(STransQueue* queue); +/* + * destroy queue + */ +void transQueueDestroy(STransQueue* queue); #ifdef __cplusplus } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fe5d8bd7f5de8ac85a8c4f34c98e3f0d2b3efb8c..ea73e07c80b6f3bdfd2952c38741abe6a5e36a39 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,13 +25,14 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; void* data; - SArray* cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - bool broken; // link broken or not - STransCtx ctx; - + // SArray* cliMsgs; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + int hThrdIdx; + STransCtx ctx; + + bool broken; // link broken or not ConnStatus status; // int release; // 1: release // spi configure @@ -56,14 +57,14 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrdObj { - TdThread thread; + TdThread thread; uv_loop_t* loop; SAsyncPool* asyncPool; uv_timer_t timer; void* pool; // conn pool // msg queue - queue msg; + queue msg; TdThreadMutex msgMtx; uint64_t nextTimeout; // next timeout @@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { - if (taosArrayGetSize(conn->cliMsgs) > 0) { + if (!transQueueEmpty(&conn->cliMsgs)) { cliSend(conn); return true; - } else { - return false; } + return false; } void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; @@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -204,15 +205,14 @@ void cliHandleResp(SCliConn* conn) { CONN_SHOULD_RELEASE(conn, pHead); - SCliMsg* pMsg = NULL; - if (taosArrayGetSize(conn->cliMsgs) > 0) { - pMsg = taosArrayGetP(conn->cliMsgs, 0); - taosArrayRemove(conn->cliMsgs, 0); - } + SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -264,7 +264,7 @@ _RETURN: } void cliHandleExcept(SCliConn* pConn) { - if (taosArrayGetSize(pConn->cliMsgs) == 0) { + if (transQueueEmpty(&pConn->cliMsgs)) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { transUnrefCliHandle(pConn); return; @@ -274,11 +274,7 @@ void cliHandleExcept(SCliConn* pConn) { STrans* pTransInst = pThrd->pTransInst; do { - SCliMsg* pMsg = NULL; - if (taosArrayGetSize(pConn->cliMsgs) > 0) { - pMsg = taosArrayGetP(pConn->cliMsgs, 0); - taosArrayRemove(pConn->cliMsgs, 0); - } + SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; @@ -289,6 +285,9 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); + } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -303,7 +302,7 @@ void cliHandleExcept(SCliConn* pConn) { } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); - } while (taosArrayGetSize(pConn->cliMsgs) > 0); + } while (!transQueueEmpty(&pConn->cliMsgs)); transUnrefCliHandle(pConn); } @@ -380,21 +379,20 @@ static void addConnToPool(void* pool, SCliConn* conn) { SCliThrdObj* thrd = conn->hostThrd; CONN_HANDLE_THREAD_QUIT(thrd); - char key[128] = {0}; + STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + transCtxCleanup(&conn->ctx); + transQueueClear(&conn->cliMsgs); + conn->status = ConnNormal; - transCtxDestroy(&conn->ctx); + char key[128] = {0}; tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); - STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - conn->status = ConnNormal; // list already create before assert(plist != NULL); - taosArrayClear(conn->cliMsgs); QUEUE_PUSH(&plist->conn, &conn->conn); assert(!QUEUE_IS_EMPTY(&plist->conn)); } @@ -445,7 +443,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; - conn->cliMsgs = taosArrayInit(2, sizeof(void*)); + + transQueueInit(&conn->cliMsgs, NULL); QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -465,18 +464,18 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); - transCtxDestroy(&conn->ctx); - taosArrayDestroy(conn->cliMsgs); + transCtxCleanup(&conn->ctx); + transQueueDestroy(&conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } static bool cliHandleNoResp(SCliConn* conn) { - bool res = false; - SArray* msgs = conn->cliMsgs; - if (taosArrayGetSize(msgs) > 0) { - SCliMsg* pMsg = taosArrayGetP(msgs, 0); + bool res = false; + if (!transQueueEmpty(&conn->cliMsgs)) { + SCliMsg* pMsg = transQueueGet(&conn->cliMsgs); if (REQUEST_NO_RESP(&pMsg->msg)) { - taosArrayRemove(msgs, 0); + transQueuePop(&conn->cliMsgs); + // taosArrayRemove(msgs, 0); destroyCmsg(pMsg); res = true; } @@ -509,8 +508,9 @@ static void cliSendCb(uv_write_t* req, int status) { void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); - assert(taosArrayGetSize(pConn->cliMsgs) > 0); - SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0); + // assert(taosArrayGetSize(pConn->cliMsgs) > 0); + assert(!transQueueEmpty(&pConn->cliMsgs)); + SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs); STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; @@ -600,9 +600,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (T_REF_VAL_GET(conn) == 2) { transUnrefCliHandle(conn); - taosArrayPush(conn->cliMsgs, &pMsg); - if (taosArrayGetSize(conn->cliMsgs) >= 2) { - return; // send one by one + if (!transQueuePush(&conn->cliMsgs, pMsg)) { + return; } cliSend(conn); } else { @@ -643,17 +642,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->hThrdIdx = pCtx->hThrdIdx; transCtxMerge(&conn->ctx, &pCtx->appCtx); - if (taosArrayGetSize(conn->cliMsgs) > 0) { - taosArrayPush(conn->cliMsgs, &pMsg); + if (!transQueuePush(&conn->cliMsgs, pMsg)) { return; } - - taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); - taosArrayPush(conn->cliMsgs, &pMsg); + transQueuePush(&conn->cliMsgs, pMsg); conn->hThrdIdx = pCtx->hThrdIdx; conn->ip = strdup(pMsg->ctx->ip); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 6c16113d46892e4773b5a43505859017e7bd1e7e..5684c332c0bde9fc5a3dc0c2001b30c197adb76a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) { // init transCtx ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); } -void transCtxDestroy(STransCtx* ctx) { +void transCtxCleanup(STransCtx* ctx) { if (ctx->args == NULL) { return; } @@ -238,12 +238,14 @@ void transCtxDestroy(STransCtx* ctx) { iter->free(iter->val); iter = taosHashIterate(ctx->args, iter); } + taosHashCleanup(ctx->args); } void transCtxMerge(STransCtx* dst, STransCtx* src) { if (dst->args == NULL) { dst->args = src->args; + dst->brokenVal = src->brokenVal; src->args = NULL; return; } @@ -275,5 +277,58 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { memcpy(ret, (char*)cVal->val, cVal->len); return (void*)ret; } +void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { + char* ret = calloc(1, ctx->brokenVal.len); + + memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len); + *msgType = ctx->brokenVal.msgType; + + return (void*)ret; +} + +void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { + queue->q = taosArrayInit(2, sizeof(void*)); + queue->free = free; +} +bool transQueuePush(STransQueue* queue, void* arg) { + taosArrayPush(queue->q, &arg); + if (taosArrayGetSize(queue->q) > 1) { + return false; + } + return true; +} +void* transQueuePop(STransQueue* queue) { + if (taosArrayGetSize(queue->q) == 0) { + return NULL; + } + void* ptr = taosArrayGetP(queue->q, 0); + taosArrayRemove(queue->q, 0); + return ptr; +} + +void* transQueueGet(STransQueue* queue) { + if (taosArrayGetSize(queue->q) == 0) { + return NULL; + } + void* ptr = taosArrayGetP(queue->q, 0); + return ptr; +} +bool transQueueEmpty(STransQueue* queue) { + // + return taosArrayGetSize(queue->q) == 0; +} +void transQueueClear(STransQueue* queue) { + if (queue->free != NULL) { + for (int i = 0; i < taosArrayGetSize(queue->q); i++) { + void* p = taosArrayGetP(queue->q, i); + queue->free(p); + } + } + taosArrayClear(queue->q); +} +void transQueueDestroy(STransQueue* queue) { + transQueueClear(queue); + taosArrayDestroy(queue->q); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 3daac3e6f5cbc44250363de1ba2aa98fa9ba1ccb..c6032a956980923bc00bd7e7a679c069e9ac32e6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -37,7 +37,7 @@ typedef struct SSrvConn { void* pTransInst; // rpc init void* ahandle; // void* hostThrd; - SArray* srvMsgs; + STransQueue srvMsgs; SSrvRegArg regArg; bool broken; // conn broken; @@ -62,12 +62,12 @@ typedef struct SSrvMsg { } SSrvMsg; typedef struct SWorkThrdObj { - TdThread thread; - uv_pipe_t* pipe; - uv_os_fd_t fd; - uv_loop_t* loop; - SAsyncPool* asyncPool; - queue msg; + TdThread thread; + uv_pipe_t* pipe; + uv_os_fd_t fd; + uv_loop_t* loop; + SAsyncPool* asyncPool; + queue msg; TdThreadMutex msgMtx; queue conn; @@ -76,7 +76,7 @@ typedef struct SWorkThrdObj { } SWorkThrdObj; typedef struct SServerObj { - TdThread thread; + TdThread thread; uv_tcp_t server; uv_loop_t* loop; @@ -106,8 +106,7 @@ static const char* notify = "a"; srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ - taosArrayPush(conn->srvMsgs, &srvMsg); \ - if (taosArrayGetSize(conn->srvMsgs) > 1) { \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ uvStartSendRespInternal(srvMsg); \ @@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) { transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); - if (conn->srvMsgs != NULL) { - assert(taosArrayGetSize(conn->srvMsgs) >= 1); - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); - tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); - taosArrayRemove(conn->srvMsgs, 0); + if (!transQueueEmpty(&conn->srvMsgs)) { + SSrvMsg* msg = transQueuePop(&conn->srvMsgs); if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); } destroySmsg(msg); // send second data, just use for push - if (taosArrayGetSize(conn->srvMsgs) > 0) { - tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); - msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + if (!transQueueEmpty(&conn->srvMsgs)) { + msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs); if (msg->type == Register && conn->status == ConnAcquire) { conn->regArg.notifyCount = 0; conn->regArg.init = 1; @@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) { (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); } - taosArrayRemove(conn->srvMsgs, 0); + transQueuePop(&conn->srvMsgs); free(msg); } else { uvStartSendRespInternal(msg); @@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { transUnrefSrvHandle(pConn); } - taosArrayPush(pConn->srvMsgs, &smsg); - if (taosArrayGetSize(pConn->srvMsgs) > 1) { - tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + if (!transQueuePush(&pConn->srvMsgs, smsg)) { return; } uvStartSendRespInternal(smsg); @@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_INIT(&pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue); - pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // - tTrace("server conn %p created", pConn); + + transQueueInit(&pConn->srvMsgs, NULL); memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; transRefSrvHandle(pConn); + tTrace("server conn %p created", pConn); return pConn; } @@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { } transDestroyBuffer(&conn->readBuf); - for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); - destroySmsg(msg); - } - conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); + transQueueDestroy(&conn->srvMsgs); if (clear) { tTrace("server conn %p to be destroyed", conn); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); @@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { // release handle to rpc init SSrvConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - taosArrayPush(conn->srvMsgs, &msg); - if (taosArrayGetSize(conn->srvMsgs) > 1) { + if (!transQueuePush(&conn->srvMsgs, msg)) { return; } uvStartSendRespInternal(msg); @@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; tDebug("server conn %p register brokenlink callback", conn); if (conn->status == ConnAcquire) { - if (taosArrayGetSize(conn->srvMsgs) > 0) { - taosArrayPush(conn->srvMsgs, &msg); + if (!transQueuePush(&conn->srvMsgs, msg)) { return; } conn->regArg.notifyCount = 0; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 0b1b1834dfb0fae47e666f7806705a6658816b8e..89cb66e9cfb174a8d40cc03a4cbbb2445c3ce0e6 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -364,9 +364,12 @@ TEST_F(TransEnv, srvReleaseHandle) { SRpcMsg resp = {0}; tr->SetSrvContinueSend(processReleaseHandleCb); // tr->Restart(processReleaseHandleCb); - void *handle = NULL; + void * handle = NULL; + SRpcMsg req = {0}; for (int i = 0; i < 1; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -378,8 +381,11 @@ TEST_F(TransEnv, srvReleaseHandle) { } TEST_F(TransEnv, cliReleaseHandleExcept) { SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 3; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -396,8 +402,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) { } TEST_F(TransEnv, srvContinueSend) { tr->SetSrvContinueSend(processContinueSend); + SRpcMsg req = {0}, resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {0}, resp = {0}; + memset(&req, 0, sizeof(req)); + memset(&resp, 0, sizeof(resp)); req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -410,8 +418,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -428,8 +438,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -450,8 +462,11 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, queryExcept) { tr->SetSrvContinueSend(processRegisterFailure); SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + memset(&req, 0, sizeof(req)); + req.handle = resp.handle; + req.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -466,8 +481,10 @@ TEST_F(TransEnv, queryExcept) { } TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; + SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { - SRpcMsg req = {.noResp = 1}; + memset(&req, 0, sizeof(req)); + req.noResp = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 1f8c8e8ff20d3ae57d120e02dfd04c8e5a84850d..65d930299423963f2793d229384a1131d94762bc 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test { // TODO } virtual void TearDown() { - transCtxDestroy(ctx); + transCtxCleanup(ctx); // formate } STransCtx *ctx; diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh index c6c92bf72458c110ffa25a0127468d0bf2723c99..dd6382992ac8cfa08a6ec01a921059da61e0683f 100755 --- a/tests/script/sh/massiveTable/compileVersion.sh +++ b/tests/script/sh/massiveTable/compileVersion.sh @@ -68,7 +68,7 @@ gitPullBranchInfo $TDengineBrVer compileTDengineVersion taos_dir=${projectDir}/debug/tools/shell -taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon +taosd_dir=${projectDir}/debug/source/dnode/mgmt/main exec_process_dir=${projectDir}/debug/tests/test/c rm -f /usr/bin/taos diff --git a/tests/script/tsim/insert/null.sim b/tests/script/tsim/insert/null.sim new file mode 100644 index 0000000000000000000000000000000000000000..9dcc435486e8c8a87f19eaac2322870db58fd463 --- /dev/null +++ b/tests/script/tsim/insert/null.sim @@ -0,0 +1,358 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database d0 +sql show databases +if $rows != 1 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use d0 + +print =============== create super table, include column type for count/sum/min/max/first +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 bigint) tags (t1 int unsigned) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags(1000) +sql create table ct2 using stb tags(2000) +sql create table ct3 using stb tags(3000) + +sql show tables +if $rows != 3 then + return -1 +endi + +print =============== insert data, include NULL +sql insert into ct1 values (now+0s, 10, 2.0, 3.0, 90)(now+1s, NULL, NULL, NULL, NULL)(now+2s, NULL, 2.1, 3.1, 91)(now+3s, 11, NULL, 3.2, 92)(now+4s, 12, 2.2, NULL, 93)(now+5s, 13, 2.3, 3.3, NULL) +sql insert into ct1 values (now+6s, NULL, 2.4, 3.4, 94) +sql insert into ct1 values (now+7s, 14, NULL, 3.5, 95) +sql insert into ct1 values (now+8s, 15, 2.5, NULL, 96) +sql insert into ct1 values (now+9s, 16, 2.6, 3.6, NULL) +sql insert into ct1 values (now+10s, NULL, NULL, NULL, NULL) +sql insert into ct1 values (now+11s, -2147483648, 2.7, 3.7, 97) + +#=================================================================== +#=================================================================== +print =============== query data from child table +sql select * from ct1 +print ===> select * from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +print ===> rows1: $data10 $data11 $data12 $data13 $data14 +print ===> rows2: $data20 $data21 $data22 $data23 $data24 +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +if $rows != 12 then + return -1 +endi +if $data01 != 10 then + return -1 +endi +if $data02 != 2.00000 then + return -1 +endi +if $data03 != 3.000000000 then + return -1 +endi +#if $data41 != -14 then +# return -1 +#endi +#if $data42 != -2.40000 then +# return -1 +#endi +#if $data43 != -3.400000000 then +# return -1 +#endi + + +print =============== select count(*) from child table +sql select count(*) from ct1 +print ===> select count(*) from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +if $rows != 1 then + return -1 +endi + +print $data00 $data01 $data02 +if $data00 != 4 then + return -1 +endi + +print =============== select count(column) from child table +sql select count(ts), count(c1), count(c2), count(c3) from ct1 +print ===> select count(ts), count(c1), count(c2), count(c3) from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 + +if $data00 != 4 then + return -1 +endi +if $data01 != 4 then + return -1 +endi +if $data02 != 4 then + return -1 +endi +if $data03 != 4 then + return -1 +endi + +#print =============== select first(*)/first(column) from child table +#sql select first(*) from ct1 +#sql select first(ts), first(c1), first(c2), first(c3) from ct1 + +print =============== select min(column) from child table +sql select min(c1), min(c2), min(c3) from ct1 +print ===> select min(c1), min(c2), min(c3) from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +if $rows != 1 then + return -1 +endi +if $data00 != 10 then + return -1 +endi +if $data01 != 2.00000 then + return -1 +endi +if $data02 != 3.000000000 then + return -1 +endi + +print =============== select max(column) from child table +sql select max(c1), max(c2), max(c3) from ct1 +print ===> select max(c1), max(c2), max(c3) from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +if $rows != 1 then + return -1 +endi +if $data00 != 13 then + return -1 +endi +if $data01 != 2.30000 then + return -1 +endi +if $data02 != 3.300000000 then + return -1 +endi + +print =============== select sum(column) from child table +sql select sum(c1), sum(c2), sum(c3) from ct1 +print ===> select sum(c1), sum(c2), sum(c3) from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +if $rows != 1 then + return -1 +endi +if $data00 != 46 then + return -1 +endi +if $data01 != 8.599999905 then + return -1 +endi +if $data02 != 12.600000000 then + return -1 +endi + +print =============== select column, from child table +sql select c1, c2, c3 from ct1 +print ===> select c1, c2, c3 from ct1 +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +#if $rows != 4 then +# return -1 +#endi +#if $data00 != 10 then +# return -1 +#endi +#if $data01 != 2.00000 then +# return -1 +#endi +#if $data02 != 3.000000000 then +# return -1 +#endi +#if $data10 != 11 then +# return -1 +#endi +#if $data11 != 2.10000 then +# return -1 +#endi +#if $data12 != 3.100000000 then +# return -1 +#endi +#if $data30 != 13 then +# return -1 +#endi +#if $data31 != 2.30000 then +# return -1 +#endi +#if $data32 != 3.300000000 then +# return -1 +#endi +#=================================================================== +#=================================================================== + + +return + +#print =============== query data from stb +#sql select * from stb +#print ===> +#print ===> rows: $rows +#print ===> rows0: $data00 $data01 $data02 $data03 $data04 +#if $rows != 4 then +# return -1 +#endi +#print =============== select count(*) from supter table +#sql select count(*) from stb +#if $rows != 1 then +# return -1 +#endi +# +#print $data00 $data01 $data02 +#if $data00 != 8 then +# return -1 +#endi +# +#print =============== select count(column) from supter table +#sql select count(ts), count(c1), count(c2), count(c3) from stb +#print $data00 $data01 $data02 $data03 +#if $data00 != 8 then +# return -1 +#endi +#if $data01 != 8 then +# return -1 +#endi +#if $data02 != 8 then +# return -1 +#endi +#if $data03 != 8 then +# return -1 +#endi + + +#=================================================================== +#=================================================================== + +print =============== stop and restart taosd, then again do query above +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +sleep 2000 +sql select * from ct1 +if $rows != 4 then # after fix bug, modify 4 to 7 + return -1 +endi +if $data01 != 10 then + return -1 +endi +if $data02 != 2.00000 then + return -1 +endi +if $data03 != 3.000000000 then + return -1 +endi +#if $data41 != -14 then +# return -1 +#endi +#if $data42 != -2.40000 then +# return -1 +#endi +#if $data43 != -3.400000000 then +# return -1 +#endi + + +print =============== select count(*) from child table +sql select count(*) from ct1 +if $rows != 1 then + return -1 +endi + +print $data00 $data01 $data02 +if $data00 != 4 then + return -1 +endi + +print =============== select count(column) from child table +sql select count(ts), count(c1), count(c2), count(c3) from ct1 +print $data00 $data01 $data02 $data03 +if $data00 != 4 then + return -1 +endi +if $data01 != 4 then + return -1 +endi +if $data02 != 4 then + return -1 +endi +if $data03 != 4 then + return -1 +endi + +#print =============== select first(*)/first(column) from child table +#sql select first(*) from ct1 +#sql select first(ts), first(c1), first(c2), first(c3) from ct1 + +print =============== select min(column) from child table +sql select min(c1), min(c2), min(c3) from ct1 +print $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != 10 then + return -1 +endi +if $data01 != 2.00000 then + return -1 +endi +if $data02 != 3.000000000 then + return -1 +endi + +print =============== select max(column) from child table +sql select max(c1), max(c2), max(c3) from ct1 +print $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != 13 then + return -1 +endi +if $data01 != 2.30000 then + return -1 +endi +if $data02 != 3.300000000 then + return -1 +endi + +print =============== select sum(column) from child table +sql select sum(c1), sum(c2), sum(c3) from ct1 +print $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != 46 then + return -1 +endi +if $data01 != 8.599999905 then + return -1 +endi +if $data02 != 12.600000000 then + return -1 +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index 08a10f26e4b61d840e3abee9d55ff0d82dcd96ca..a463d69fe59cea15a28f0d49ade3995347b5f3fd 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -31,60 +31,91 @@ if $rows != 2 then return -1 endi -print =============== insert data into child table +print =============== insert data into child table ct1 (s) sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:06.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:10.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:16.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:20.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:26.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:30.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:36.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:40.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:46.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:50.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:01:56.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:00.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:06.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:10.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:16.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:20.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:26.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:30.000', 1 ) -sql insert into ct1 values ( '2022-01-01 01:02:36.000', 1 ) +sql insert into ct1 values ( '2022-01-01 01:01:06.000', 2 ) +sql insert into ct1 values ( '2022-01-01 01:01:10.000', 3 ) +sql insert into ct1 values ( '2022-01-01 01:01:16.000', 4 ) +sql insert into ct1 values ( '2022-01-01 01:01:20.000', 5 ) +sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 ) +sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 ) +sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 ) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) -print ===> $rows $data00 $data01 $data02 $data03 $data04 -if $rows != 10 then +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +print ===> rows1: $data10 $data11 $data12 $data13 $data14 +print ===> rows2: $data20 $data21 $data22 $data23 $data24 +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +if $rows != 5 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data40 != 1 then + return -1 +endi + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s) +print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s) +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +print ===> rows1: $data10 $data11 $data12 $data13 $data14 +print ===> rows2: $data20 $data21 $data22 $data23 $data24 +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +if $rows != 5 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data40 != 1 then + return -1 +endi + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s) +print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s) +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +print ===> rows1: $data10 $data11 $data12 $data13 $data14 +print ===> rows2: $data20 $data21 $data22 $data23 $data24 +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +print ===> rows5: $data50 $data51 $data52 $data53 $data54 +print ===> rows6: $data60 $data61 $data62 $data63 $data64 +print ===> rows7: $data70 $data71 $data72 $data73 $data74 +if $rows != 8 then return -1 endi if $data00 != 2 then return -1 endi -if $data04 != 2 then +if $data70 != 1 then return -1 endi +print =============== insert data into child table ct2 (d) sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-01 12:00:01.000', 2 ) -sql insert into ct2 values ( '2022-01-01 23:00:01.000', 3 ) -sql insert into ct2 values ( '2022-01-02 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-03 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-04 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-05 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-06 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-07 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-08 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-09 10:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-10 10:00:01.000', 1 ) +sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 ) +sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 ) +sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 ) +sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 ) +sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 ) +sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 ) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) -print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2w) +print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) +print ===> rows: $rows print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows2: $data20 $data21 $data22 $data23 $data24 -if $rows != 11 then +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +if $rows != 4 then return -1 endi if $data00 != 1 then @@ -94,8 +125,37 @@ if $data10 != 2 then return -1 endi +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h) +print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h) +print ===> rows: $rows +print ===> rows0: $data00 $data01 $data02 $data03 $data04 +print ===> rows1: $data10 $data11 $data12 $data13 $data14 +print ===> rows2: $data20 $data21 $data22 $data23 $data24 +print ===> rows3: $data30 $data31 $data32 $data33 $data34 +print ===> rows4: $data40 $data41 $data42 $data43 $data44 +print ===> rows5: $data50 $data51 $data52 $data53 $data54 +print ===> rows6: $data60 $data61 $data62 $data63 $data64 +print ===> rows7: $data70 $data71 $data72 $data73 $data74 +if $rows != 7 then + return -1 +endi +if $data00 != 2 then + return -1 +endi +if $data60 != 1 then + return -1 +endi + return + + + + + + + + sql select count(*) from car interval(1n, 10d) order by ts desc # tdSql.checkData(0, 1, 1) # tdSql.checkData(1, 1, 2) diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 609f8d6b694d69218b8a54eddd4cbf435628d37d..08e49a7efe04f962ef43c2e61c9a7d704332a49b 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -306,8 +306,9 @@ int32_t init_env() { } //const char* sql = "select * from tu1"; - sprintf(sqlStr, "select * from %s%d", g_stConfInfo.stbName, 0); - pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr)); + sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0); + /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ + pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); return -1;