diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 23f59adda7cf34ee6613e00d2461bad8fae17e34..8988459637bb486fd5357981d8eed695aaa3c7a9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -55,8 +55,12 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) \ - (((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0) +#define TMSG_INFO(TYPE) \ + ((TYPE) >= 0 && \ + ((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \ + (TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \ + ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ + : 0 #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 8b93228f07f7d09dde9eb8c188c1f4c1ad097a2f..806c0b51225ee1b7b3228a12aa3955d35adeb1bd 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -82,6 +82,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -164,6 +165,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -199,6 +201,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) @@ -210,6 +213,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) @@ -218,6 +222,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) @@ -228,6 +233,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) @@ -252,6 +258,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index ff0c2df25e3a5458934f8fae7beed2f0e0f76fd3..0fe4274091cd085f2079628910dd7d31d5ba9027 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -279,7 +279,6 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { } pRequest->body.queryFp(pRequest->body.param, pRequest, code); - // pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 136ee3495051b60ec29887b29c06a9854b340d2b..a30d60a589150944c0cf45ee3b38fc202805360a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -665,8 +665,6 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { } void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { - tscDebug("enter meta callback, code %s", tstrerror(code)); - SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; SQuery * pQuery = pWrapper->pQuery; SRequestObj * pRequest = pWrapper->pRequest; @@ -686,10 +684,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { TSWAP(pRequest->tableList, (pQuery)->pTableList); destorySqlParseWrapper(pWrapper); + + tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); - tscDebug("error happens, code:%d", code); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 613f3fb994da4fca90842b971ca65b3e6db7fbc7..90f852eed17bcdc69513593b56a81c7594044566 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); // dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); - pVnodes[num] = (*ppVnode); - num++; + pVnodes[num++] = (*ppVnode); pIter = taosHashIterate(pMgmt->hash, pIter); } else { taosHashCancelIterate(pMgmt->hash, pIter); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 681440dec46afd415d21d4c723e7ca6d8446e409..dc1bcbd258847ed409fa6e4dd5be25c5a21d8c36 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId); + dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); @@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) { } static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { - pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (pMgmt->hash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; dError("failed to init vnode hash since %s", terrstr()); @@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.totalVnodes = numOfVnodes; - int32_t threadNum = 1; + int32_t threadNum = tsNumOfCores / 2; + if (threadNum < 1) threadNum = 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ecd02ae8dc3f7b00d7c181849024a7a8141be2c2..3913e3fda8a2598ddc4546899f57c904c07f8c69 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -107,35 +107,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); // no response here + int32_t code = vnodeProcessSyncMsg(pVnode->pImpl, pMsg, NULL); // no response here dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } -static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - - for (int32_t i = 0; i < numOfMsgs; ++i) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - const STraceId *trace = &pMsg->info.traceId; - dGTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg); - - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); - if (code != 0) { - if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr()); - vmSendRsp(pMsg, code); - } - - dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; SMsgHead *pHead = pMsg->pCont; @@ -207,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) return -1; + if (pMsg == NULL) { + rpcFreeCont(pMsg->pCont); + pRpc->pCont = NULL; + return -1; + } SMsgHead *pHead = pRpc->pCont; dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); @@ -215,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - return vmPutMsgToQueue(pMgmt, pMsg, qtype); + + int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); + if (code != 0) { + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pMsg->pCont); + pRpc->pCont = NULL; + } + + return code; } int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index ee27f27f06fe0ce502fdfd729a2b573ddb221c37..d70ed09920584c666a188e19e3c7e42beb29baa2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); + rpcCleanup(); dDebug("dnode is closed, ptr:%p", pDnode); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index df3c9c4e88f1d59ac23f1f56080236c40fdb729a..70439915250b130539bb0e7e6f4ad5658c4f42c6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -185,6 +185,7 @@ _OVER: taosFreeQitem(pMsg); } rpcFreeCont(pRpc->pCont); + pRpc->pCont = NULL; } dmReleaseWrapper(pWrapper); @@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray * pArray = (*pWrapper->func.getHandlesFp)(); + SArray *pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle * pMgmt = taosArrayGet(pArray, i); + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3a3331a0b3921e2e0f71b8cff7826534d7249f52..85f1ce6843c30d7ba603478851f160e0d67b271f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -739,9 +739,12 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) { pDb = mndAcquireDb(pMnode, pVgroup->dbName); } - int64_t vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024; - if (pDb->cfg.cacheLastRow > 0) { - vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024; + int64_t vgroupMemroy = 0; + if (pDb != NULL) { + vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024; + if (pDb->cfg.cacheLastRow > 0) { + vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024; + } } if (pDbInput == NULL) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index de2905fe966475283fea158384d634a6ad4f715c..0498d889d68e0e111815e2bbd4af9545391a7907 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,10 +51,10 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3e2d7c614f90a1e8267f9a17b683a66921d81dd8..ae659dc396c4d34ab713126c5009741a338cd1eb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,7 +242,7 @@ struct SVnode { SSink* pSink; tsem_t canCommit; int64_t sync; - int32_t syncCount; + int32_t blockCount; tsem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index ef61897f914d1a95d1973a05291f59f4a39986a6..8561314431cc7e33e411408c4d68f3e6480552bc 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -85,6 +85,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { void tqOffsetClose(STqOffsetStore* pStore) { tqOffsetSnapshot(pStore); taosHashCleanup(pStore->pHash); + taosMemoryFree(pStore); } STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ec88ec3119aab3b16f97ffe1024cef98970cfa4e..4267dd9b1f560ae10f83fb8b27d79b211490961e 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -83,7 +83,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.commitID = info.state.commitID; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; - pVnode->syncCount = 0; + pVnode->blockCount = 0; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a274c8e9dc471c78cf7003248869c50381a1a24c..4530eee4a5b9f1272d49f5213612ce8f0539b5a1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; SDecoder dc = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 0445eda7af3811b882c40f0e4c6df69b43496a12..add8c6069a132dcacca5a8e018e6463c791b9842 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -25,12 +25,12 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) { if (!vnodeIsMsgBlock(type)) return; - int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1); + int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1); vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); } static inline void vnodeWaitBlockMsg(SVnode *pVnode) { - int32_t count = atomic_load_32(&pVnode->syncCount); + int32_t count = atomic_load_32(&pVnode->blockCount); if (count <= 0) return; vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); @@ -40,10 +40,10 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) { static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { if (!vnodeIsMsgBlock(type)) return; - int32_t count = atomic_load_32(&pVnode->syncCount); + int32_t count = atomic_load_32(&pVnode->blockCount); if (count <= 0) return; - count = atomic_sub_fetch_32(&pVnode->syncCount, 1); + count = atomic_sub_fetch_32(&pVnode->blockCount, 1); vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); if (count <= 0) { tsem_post(&pVnode->syncSem); @@ -84,8 +84,10 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { terrno = TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG; } - STraceId *trace = &pMsg->info.traceId; + + const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle); + SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; for (int32_t r = 0; r < req.replica; ++r) { SNodeInfo *pNode = &cfg.nodeInfo[r]; @@ -126,32 +128,23 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t m = 0; m < numOfMsgs; m++) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - STraceId *trace = &pMsg->info.traceId; + const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); - if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { - code = vnodeProcessAlterReplicaReq(pVnode, pMsg); + code = vnodePreProcessReq(pVnode, pMsg); + if (code != 0) { + vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); } else { - code = vnodePreprocessReq(pVnode, pMsg); - if (code != 0) { - vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr()); + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + code = vnodeProcessAlterReplicaReq(pVnode, pMsg); } else { code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); - if (code == 1) { - do { - static int32_t cnt = 0; - if (cnt++ % 1000 == 1) { - vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg, - pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType); - } - } while (0); - + if (code > 0) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { rsp.code = terrno; - vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); + vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); } - if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } @@ -161,33 +154,27 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (code == 0) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); - } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { - SEpSet newEpSet = {0}; - syncGetRetryEpSet(pVnode->sync, &newEpSet); - - /* - syncGetEpSet(pVnode->sync, &newEpSet); - SEp *pEp = &newEpSet.eps[newEpSet.inUse]; - if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { - newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; - } - */ - - vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, - newEpSet.inUse); - for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { - vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); - } - pMsg->info.hasEpSet = 1; - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - tmsgSendRedirectRsp(&rsp, &newEpSet); - } else { - if (code != 1) { + } else if (code < 0) { + if (terrno == TSDB_CODE_SYN_NOT_LEADER) { + SEpSet newEpSet = {0}; + syncGetRetryEpSet(pVnode->sync, &newEpSet); + vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, + newEpSet.inUse); + for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { + vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); + } + pMsg->info.hasEpSet = 1; + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + tmsgSendRedirectRsp(&rsp, &newEpSet); + } else { if (terrno != 0) code = terrno; vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } } + } else { } vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); @@ -206,7 +193,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - STraceId *trace = &pMsg->info.traceId; + const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle); @@ -229,172 +216,150 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } } -int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = 0; - - if (syncEnvIsStart()) { - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); - assert(pSyncNode != NULL); +int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + int32_t code = 0; + const STraceId *trace = &pMsg->info.traceId; - SMsgHead *pHead = pMsg->pCont; - STraceId *trace = &pMsg->info.traceId; + if (!syncEnvIsStart()) { + vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId); + terrno = TSDB_CODE_APP_ERROR; + return -1; + } - do { - char *syncNodeStr = sync2SimpleStr(pVnode->sync); - static int64_t vndTick = 0; - if (++vndTick % 10 == 1) { - vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); - } - if (gRaftDetailLog) { - char logBuf[512] = {0}; - snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, - syncNodeStr); - syncRpcMsgLog2(logBuf, pMsg); - } - taosMemoryFree(syncNodeStr); - } while (0); - - SRpcMsg *pRpcMsg = pMsg; - - // ToDo: ugly! use function pointer - // use different strategy - if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { - if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - ret = vnodeSetStandBy(pVnode); - if (ret != 0 && terrno != 0) ret = terrno; - SRpcMsg rsp = {.code = ret, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = -1; - } + SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); + if (pSyncNode == NULL) { + vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } +#if 1 + char *syncNodeStr = sync2SimpleStr(pVnode->sync); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); + } + if (gRaftDetailLog) { + char logBuf[512] = {0}; + snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + } + taosMemoryFree(syncNodeStr); +#endif + + if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + code = vnodeSetStandBy(pVnode); + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); } else { - // use wal first strategy - - if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { - SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - syncClientRequestBatchDestroyDeep(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { - SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesBatchDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - ret = vnodeSetStandBy(pVnode); - if (ret != 0 && terrno != 0) ret = terrno; - SRpcMsg rsp = {.code = ret, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = -1; - } + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; } - - syncNodeRelease(pSyncNode); } else { - vError("==vnodeProcessSyncReq== error syncEnv stop"); - ret = -1; + // use wal first strategy + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { + SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); + syncClientRequestBatchDestroyDeep(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { + SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesBatchDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + code = vnodeSetStandBy(pVnode); + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; + } } - if (ret != 0 && terrno == 0) { + syncNodeRelease(pSyncNode); + if (code != 0 && terrno == 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } - return ret; + return code; } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { @@ -427,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info); rpcMsg.info.conn.applyIndex = cbMeta.index; - STraceId *trace = (STraceId *)&pMsg->info.traceId; + const STraceId *trace = (STraceId *)&pMsg->info.traceId; vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); if (rpcMsg.info.handle != NULL) { @@ -444,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - beginIndex); + "commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; @@ -459,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index cd454c075b9430d72753d9e7bba2f355cbbebd1e..fc3e3cbc8af5f55a51f3b25f6521de73bec7589f 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -548,19 +548,17 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { } static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + SSDataBlock* pBlock = createDataBlock(); if (NULL == pBlock) { return TSDB_CODE_OUT_OF_MEMORY; } - pBlock->pDataBlock = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SColumnInfoData)); - SNode* pProj = NULL; FOREACH(pProj, pProjects) { SColumnInfoData infoData = {0}; infoData.info.type = ((SExprNode*)pProj)->resType.type; infoData.info.bytes = ((SExprNode*)pProj)->resType.bytes; - taosArrayPush(pBlock->pDataBlock, &infoData); + blockDataAppendColInfo(pBlock, &infoData); } *pOutput = pBlock; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index fc9b480233969133d4e94d6a81b78ffc63457a80..7f84b4c1743e49c8cd6c42d452c970bc5ecb49a7 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -27,6 +27,10 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } +static void cleanupRefPool() { + int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); + taosCloseRef(ref); +} int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { @@ -34,7 +38,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); - + atexit(cleanupRefPool); int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c0e7a7533a1c726a0012cc330534630d946e5f6b..175e686ca0871b59ff11bf3eaea8d10ff323abc5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1367,6 +1367,7 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { } taosArrayDestroy(pInfo->scanCols); + taosMemoryFreeClear(pInfo->pUser); } static int32_t getSysTableDbNameColId(const char* pTable) { @@ -1788,8 +1789,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); pInfo->pRes->info.rows = p->info.rows; + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); blockDataDestroy(p); return pInfo->pRes->info.rows; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 22c4c3ed8d43c94c52433c7c6361d315ace8357c..10095189703b3e7ff9b48bb2b58f956d7bd8a7d7 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -48,13 +48,19 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) { return TIME_UNIT_INVALID; } - if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) { + if (TSDB_TIME_PRECISION_MILLI == dbPrec && (0 == strcasecmp(pVal->literal, "1u") || + 0 == strcasecmp(pVal->literal, "1b"))) { return TIME_UNIT_TOO_SMALL; } - if (pVal->literal[0] != '1' || - (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' && - pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w')) { + if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) { + return TIME_UNIT_TOO_SMALL; + } + + if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && + pVal->literal[1] != 's' && pVal->literal[1] != 'm' && + pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && + pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) { return TIME_UNIT_INVALID; } @@ -700,9 +706,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "ELAPSED function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } @@ -1223,9 +1228,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "STATEDURATION function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } @@ -1735,9 +1739,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "TIMETRUNCATE function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } addDbPrecisonParam(&pFunc->pParameterList, dbPrec); @@ -1775,9 +1778,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "TIMEDIFF function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bc0010570c0143b497fbd936e734807600c1f307..202e590955443cf97fa586b5c2f60b80ecf9030e 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -490,18 +490,7 @@ static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicN } static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { - switch (nodeType(pChild)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: - return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond); - case QUERY_NODE_LOGIC_PLAN_PROJECT: - return pushDownCondOptPushCondToProject(pCxt, (SProjectLogicNode*)pChild, pCond); - case QUERY_NODE_LOGIC_PLAN_JOIN: - return pushDownCondOptPushCondToJoin(pCxt, (SJoinLogicNode*)pChild, pCond); - default: - break; - } - planError("pushDownCondOptPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild))); - return TSDB_CODE_PLAN_INTERNAL_ERROR; + return pushDownCondOptAppendCond(&pChild->pConditions, pCond); } static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) { @@ -802,10 +791,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg return TSDB_CODE_SUCCESS; } // TODO: remove it after full implementation of pushing down to child - if (1 != LIST_LENGTH(pAgg->node.pChildren) || - QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && - QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && - QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { + if (1 != LIST_LENGTH(pAgg->node.pChildren)) { return TSDB_CODE_SUCCESS; } @@ -832,6 +818,77 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg return code; } +typedef struct SRewriteProjCondContext { + SProjectLogicNode* pProj; + int32_t errCode; +}SRewriteProjCondContext; + +static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) { + SRewriteProjCondContext* pCxt = pContext; + SProjectLogicNode* pProj = pCxt->pProj; + if (QUERY_NODE_COLUMN == nodeType(*ppNode)) { + SNode* pTarget = NULL; + FOREACH(pTarget, pProj->node.pTargets) { + if (nodesEqualNode(pTarget, *ppNode)) { + SNode* pProjection = NULL; + FOREACH(pProjection, pProj->pProjections) { + if (0 == strcmp(((SExprNode*)pProjection)->aliasName, ((SColumnNode*)(*ppNode))->colName)) { + SNode* pExpr = nodesCloneNode(pProjection); + if (pExpr == NULL) { + pCxt->errCode = terrno; + return DEAL_RES_ERROR; + } + nodesDestroyNode(*ppNode); + *ppNode = pExpr; + } // end if expr alias name equal column name + } // end for each project + } // end if target node equals cond column node + } // end for each targets + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + +static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) { + SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS}; + SNode* pProjectCond = pProject->node.pConditions; + nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt); + *ppProjectCond = pProjectCond; + pProject->node.pConditions = NULL; + return cxt.errCode; +} + +static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) { + if (NULL == pProject->node.pConditions || + OPTIMIZE_FLAG_TEST_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { + return TSDB_CODE_SUCCESS; + } + // TODO: remove it after full implementation of pushing down to child + if (1 != LIST_LENGTH(pProject->node.pChildren)) { + return TSDB_CODE_SUCCESS; + } + + if (NULL != pProject->node.pLimit || NULL != pProject->node.pSlimit) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + SNode* pProjCond = NULL; + code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond); + if (TSDB_CODE_SUCCESS == code) { + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0); + code = pushDownCondOptPushCondToChild(pCxt, pChild, &pProjCond); + } + + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); + pCxt->optimized = true; + } else { + nodesDestroyNode(pProjCond); + } + return code; +} + static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pLogicNode)) { @@ -844,6 +901,9 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog case QUERY_NODE_LOGIC_PLAN_AGG: code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode); break; + case QUERY_NODE_LOGIC_PLAN_PROJECT: + code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode); + break; default: break; } diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 3994db09024f355f0a1581b596c85cc85650cdbb..e4019292d81fa5c5578c1943367992fff7f60dbc 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -81,3 +81,8 @@ TEST_F(PlanOptimizeTest, eliminateProjection) { run("SELECT c1 FROM st1s3"); // run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first"); } + +TEST_F(PlanOptimizeTest, pushDownProjectCond) { + useDb("root", "test"); + run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first"); +} \ No newline at end of file diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e6b7c755644f8a59dfc27a7ff12829de9bed74bd..df5df127f066beef7a64b40f95dc0909ed6d58a3 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1174,7 +1174,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - timeUnit = timeUnit * 1000 / factor; + int64_t unit = timeUnit * 1000 / factor; for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { @@ -1209,12 +1209,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf); int32_t tsDigits = (int32_t)strlen(buf); - switch (timeUnit) { - case 0: { /* 1u */ + switch (unit) { + case 0: { /* 1u or 1b */ if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000 * 1000; - //} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - // //timeVal = timeVal / 1000; + if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) { + timeVal = timeVal * 1; + } else { + timeVal = timeVal / 1000 * 1000; + } } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor; } else { @@ -1366,8 +1368,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - timeUnit = timeUnit * 1000 / factor; - int32_t numOfRows = 0; for (int32_t i = 0; i < inputNum; ++i) { if (pInput[i].numOfRows > numOfRows) { @@ -1447,9 +1447,14 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } } } else { - switch(timeUnit) { - case 0: { /* 1u */ - result = result / 1000; + int64_t unit = timeUnit * 1000 / factor; + switch(unit) { + case 0: { /* 1u or 1b */ + if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) { + result = result / 1; + } else { + result = result / 1000; + } break; } case 1: { /* 1a */ diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5ce073081d51974e6ad1e4e732a31b1ba581e7bf..f699df68839648114b1141ea0d3692a13a4cbf97 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,8 +96,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) +#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define rpcIsReq(type) (type & 1U) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) -#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) -#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) +#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) +#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) -#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); -#define transIsReq(type) (type & 1U) +#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); +#define transIsReq(type) (type & 1U) #define transLabel(trans) ((STrans*)trans)->label diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4c2af32be34bf99b6660b0f0d5df4c986938c7d7..30dd7bbf3360dbb831b6e01dda6615155cc379e2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn, + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen); } else { - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn), - pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, - transMsg.code); + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), + transMsg.contLen, pHead->noResp, transMsg.code); // no ref here } @@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; - tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn, - pConn->refId); + tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pTransInst), transMsg.info.handle, + pConn, pConn->refId); assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { @@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); - STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); } @@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt SSvrConn* conn = cli->data; SConnBuffer* pBuf = &conn->readBuf; + STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; - tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread); + tTrace("%s conn %p total read: %d, current read: %d", transLabel(pTransInst), conn, pBuf->len, (int)nread); if (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); uvHandleReq(conn); } else { - tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn); } return; } @@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread)); + tError("%s conn %p read error: %s", transLabel(pTransInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { - tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn); STrans* pTransInst = conn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); @@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pMsg->info.traceId; - tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn, + tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pTransInst), pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len); pHead->msgLen = htonl(len); @@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) { exh->refId = transAddExHandle(transGetRefMgt(), exh); transAcquireExHandle(transGetRefMgt(), exh->refId); + STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; transRefSrvHandle(pConn); - tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId); + tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; } @@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); - tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); + STrans* pTransInst = thrd->pTransInst; + tDebug("%s conn %p destroy", transLabel(pTransInst), conn); + + for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { + SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); + destroySmsg(msg); + } transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); @@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) { m->msg = tmsg; m->type = Register; - tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); + STrans* pTransInst = pThrd->pTransInst; + tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); return; diff --git a/tests/system-test/2-query/timetruncate.py b/tests/system-test/2-query/timetruncate.py index 7fcdee3d603022af936a8ce08a28cbcb7d14d874..ebc8e6b6f91acc6c5435ff6766c92dd968d55b07 100644 --- a/tests/system-test/2-query/timetruncate.py +++ b/tests/system-test/2-query/timetruncate.py @@ -24,7 +24,8 @@ class TDTestCase: ] self.db_param_precision = ['ms','us','ns'] self.time_unit = ['1w','1d','1h','1m','1s','1a','1u'] - self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1'] + #self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1'] + self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1'] self.ntbname = 'ntb' self.stbname = 'stb' self.ctbname = 'ctb' @@ -92,7 +93,7 @@ class TDTestCase: elif unit.lower() == '1d': for i in range(len(self.ts_str)): ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0])) - tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000) + tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000) elif unit.lower() == '1w': for i in range(len(self.ts_str)): ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0])) @@ -121,7 +122,7 @@ class TDTestCase: elif unit.lower() == '1d': for i in range(len(self.ts_str)): ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0])) - tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 ) + tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 ) elif unit.lower() == '1w': for i in range(len(self.ts_str)): ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0])) @@ -144,21 +145,21 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60)*60*60*1000*1000*1000 ) elif unit.lower() == '1d': for i in range(len(self.ts_str)): - tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 ) + tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 ) elif unit.lower() == '1w': for i in range(len(self.ts_str)): tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000) def data_check(self,date_time,precision,tb_type): for unit in self.time_unit: if (unit.lower() == '1u' and precision.lower() == 'ms') or () : - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.ctbname}') elif tb_type.lower() == 'stb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.stbname}') elif precision.lower() == 'ms': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') @@ -167,7 +168,7 @@ class TDTestCase: tdSql.checkRows(len(self.ts_str)) self.check_ms_timestamp(unit,date_time) elif precision.lower() == 'us': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') @@ -176,7 +177,7 @@ class TDTestCase: tdSql.checkRows(len(self.ts_str)) self.check_us_timestamp(unit,date_time) elif precision.lower() == 'ns': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 3a0a9b01b25dd4cacde20b9f5f30220e24507bd5..889b3165685259a83291e2796421a8315b42c8fe 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -157,7 +157,7 @@ python3 ./test.py -f 7-tmq/tmqCheckData1.py python3 ./test.py -f 7-tmq/tmqUdf.py #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 python3 ./test.py -f 7-tmq/tmqConsumerGroup.py -#python3 ./test.py -f 7-tmq/tmqShow.py +python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py diff --git a/tests/system-test/simpletest.bat b/tests/system-test/simpletest.bat index 656828aa1ea4b1b2681dab2edac2c94178b5176a..b7e10f423b0aa78175e4cd8a59efff2e7947a8a1 100644 --- a/tests/system-test/simpletest.bat +++ b/tests/system-test/simpletest.bat @@ -1,12 +1,12 @@ -python3 .\test.py -f 0-others\taosShell.py +@REM python3 .\test.py -f 0-others\taosShell.py python3 .\test.py -f 0-others\taosShellError.py python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udf_create.py -python3 .\test.py -f 0-others\udf_restart_taosd.py +@REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\user_control.py