diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 7f86b8f5e2dbc11811a290430e586765113e12d2..c4c2c89fdb230288fe9a83928f276d71aa3c79c1 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -42,7 +42,7 @@ typedef struct { typedef struct { void* tqReader; - void* meta; +// void* meta; void* config; void* vnode; void* mnd; @@ -54,7 +54,7 @@ typedef struct { int32_t numOfVgroups; void* sContext; // SSnapContext* - void* pStateBackend; + void* pStateBackend; struct SStorageAPI api; } SReadHandle; diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 8c79ba050026f52ab857ee32501051623ada590f..c0fd6c34fc9e9aee7db2599744bfe515b5c27827 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -243,6 +243,7 @@ typedef struct TsdReader { void (*tsdReaderNotifyClosing)(); } TsdReader; + /** * int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, @@ -262,7 +263,6 @@ typedef struct SStoreCacheReader { /*------------------------------------------------------------------------------------------------------------------*/ /* - * void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); @@ -552,7 +552,7 @@ typedef struct SStateStore { typedef struct SStorageAPI { SStoreMeta metaFn; // todo: refactor - TsdReader tsdReader; + TsdReader tsdReader; SStoreMetaReader metaReaderFn; SStoreCacheReader cacheFn; SStoreSnapshotFn snapshotFn; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fe1ccf90c872c090e6d0957e742b7879b4f18640..0f08b632942039d222777cfeb5a658dba464a3cd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -19,23 +19,23 @@ #include "vnode.h" #include "vnodeInt.h" -static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) { int32_t code = 0; @@ -301,32 +301,32 @@ _exit: return code; } -int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { +int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) { void *ptr = NULL; void *pReq; int32_t len; int32_t ret; - if (version <= pVnode->state.applied) { - vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, + if (ver <= pVnode->state.applied) { + vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver, pVnode->state.applied); terrno = TSDB_CODE_VND_DUP_REQUEST; return -1; } vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), - version); + ver); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); - ASSERT(pVnode->state.applied + 1 == version); + ASSERT(pVnode->state.applied + 1 == ver); - atomic_store_64(&pVnode->state.applied, version); + atomic_store_64(&pVnode->state.applied, ver); atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm); if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) { - if (tqCheckLogInWal(pVnode->pTq, version)) return 0; + if (tqCheckLogInWal(pVnode->pTq, ver)) return 0; } // skip header @@ -337,123 +337,123 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp switch (pMsg->msgType) { /* META */ case TDMT_VND_CREATE_STB: - if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_STB: - if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_STB: - if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_TABLE: - if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_TABLE: - if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TABLE: - if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TTL_TABLE: - if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_TRIM: - if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_SMA: - if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; /* TSDB */ case TDMT_VND_SUBMIT: - if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; + if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; break; case TDMT_VND_DELETE: - if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_BATCH_DEL: - if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; /* TQ */ case TDMT_VND_TMQ_SUBSCRIBE: - if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) { + if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } break; case TDMT_VND_TMQ_DELETE_SUB: - if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } break; case TDMT_VND_TMQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_TMQ_SEEK_TO_OFFSET: - if (tqProcessSeekReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_TMQ_ADD_CHECKINFO: - if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) { + if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } break; case TDMT_VND_TMQ_DEL_CHECKINFO: - if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) { + if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { - if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) { + if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_PAUSE: { - if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_RESUME: { - if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: { - if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_CHECK_RSP: { - if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) { + if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; - if (vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp) < 0) { + if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { goto _err; } break; case TDMT_VND_ALTER_CONFIG: - vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); + vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp); break; case TDMT_VND_COMMIT: needCommit = true; break; case TDMT_VND_CREATE_INDEX: - vnodeProcessCreateIndexReq(pVnode, version, pReq, len, pRsp); + vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp); break; case TDMT_VND_DROP_INDEX: - vnodeProcessDropIndexReq(pVnode, version, pReq, len, pRsp); + vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; case TDMT_VND_COMPACT: - vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp); + vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp); goto _exit; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); @@ -461,18 +461,18 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code, - version); + ver); - walApplyVer(pVnode->pWal, version); + walApplyVer(pVnode->pWal, ver); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } // commit if need if (needCommit) { - vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); + vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver); if (vnodeAsyncCommit(pVnode) < 0) { vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -489,8 +489,8 @@ _exit: return 0; _err: - vError("vgId:%d, process %s request failed since %s, version:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), - tstrerror(terrno), version); + vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + tstrerror(terrno), ver); return -1; } @@ -514,7 +514,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: case TDMT_SCH_MERGE_QUERY: @@ -603,7 +603,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { } extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now); -static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVTrimDbReq trimReq = {0}; @@ -624,7 +624,7 @@ _exit: return code; } -static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -650,7 +650,7 @@ end: return ret; } -static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder coder; @@ -667,7 +667,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p goto _err; } - if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) { + if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) { pRsp->code = terrno; goto _err; } @@ -685,7 +685,7 @@ _err: return -1; } -static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SDecoder decoder = {0}; SEncoder encoder = {0}; int32_t rcode = 0; @@ -742,7 +742,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR } // do create table - if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) { + if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) { if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { cRsp.code = TSDB_CODE_SUCCESS; } else { @@ -790,7 +790,7 @@ _exit: return rcode; } -static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder dc = {0}; @@ -808,7 +808,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pR return -1; } - if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) { + if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) { pRsp->code = terrno; tDecoderClear(&dc); return -1; @@ -819,7 +819,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pR return 0; } -static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVDropStbReq req = {0}; int32_t rcode = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; @@ -839,7 +839,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe // process request tbUidList = taosArrayInit(8, sizeof(int64_t)); if (tbUidList == NULL) goto _exit; - if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) { + if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) { rcode = terrno; goto _exit; } @@ -862,7 +862,7 @@ _exit: return 0; } -static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVAlterTbReq vAlterTbReq = {0}; SVAlterTbRsp vAlterTbRsp = {0}; SDecoder dc = {0}; @@ -887,7 +887,7 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pRe } // process - if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) { + if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) { vAlterTbRsp.code = terrno; tDecoderClear(&dc); rcode = -1; @@ -912,7 +912,7 @@ _exit: return 0; } -static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVDropTbBatchReq req = {0}; SVDropTbBatchRsp rsp = {0}; SDecoder decoder = {0}; @@ -946,7 +946,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq tb_uid_t tbUid = 0; /* code */ - ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, &tbUid); + ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid); if (ret < 0) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) { dropTbRsp.code = TSDB_CODE_SUCCESS; @@ -1189,7 +1189,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { return code; } -static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; terrno = 0; @@ -1203,7 +1203,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq void *pAllocMsg = NULL; SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq; - if (0 == pMsg->version) { + if (0 == pMsg->ver) { code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq); if (TSDB_CODE_SUCCESS == code) { code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq); @@ -1251,7 +1251,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) { if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), version); + vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); goto _exit; } } @@ -1263,7 +1263,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq for (int32_t iRow = 0; iRow < nRow; ++iRow) { if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), version); + vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); goto _exit; } } @@ -1350,7 +1350,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); // create table - if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) { + if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) { // create table success if (newTbUids == NULL && @@ -1376,7 +1376,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq // insert data int32_t affectedRows; - code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &affectedRows); + code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows); if (code) goto _exit; pSubmitRsp->affectedRows += affectedRows; @@ -1404,7 +1404,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); if (code == 0) { atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); - tdProcessRSmaSubmit(pVnode->pSma, version, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); + tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); } // clear @@ -1419,7 +1419,7 @@ _exit: return code; } -static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; SDecoder coder = {0}; @@ -1439,20 +1439,20 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void * goto _err; } - if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) { + if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) { if (pRsp) pRsp->code = terrno; goto _err; } tDecoderClear(&coder); vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode), - req.indexName, req.indexUid, version, req.tableUid); + req.indexName, req.indexUid, ver, req.tableUid); return 0; _err: tDecoderClear(&coder); vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s", - TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr()); + TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr()); return -1; } @@ -1468,28 +1468,28 @@ int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) { return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL); } -static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t version) { +static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) { int32_t code = TSDB_CODE_SUCCESS; vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode), - pVnode->config.hashBegin, pVnode->config.hashEnd, version); + pVnode->config.hashBegin, pVnode->config.hashEnd, ver); // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd] return code; } -static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode)); int32_t code = TSDB_CODE_SUCCESS; if (!pVnode->config.hashChange) { goto _exit; } - code = vnodeConsolidateAlterHashRange(pVnode, version); + code = vnodeConsolidateAlterHashRange(pVnode, ver); if (code < 0) { vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), - version); + ver); goto _exit; } pVnode->config.hashChange = false; @@ -1503,7 +1503,7 @@ _exit: return code; } -static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { bool walChanged = false; bool tsdbChanged = false; @@ -1606,7 +1606,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void return 0; } -static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SBatchDeleteReq deleteReq; SDecoder decoder; tDecoderInit(&decoder, pReq, len); @@ -1626,7 +1626,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void int64_t uid = mr.me.uid; - int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); + int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); if (code < 0) { terrno = code; vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, @@ -1640,7 +1640,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void return 0; } -static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SDecoder *pCoder = &(SDecoder){0}; SDeleteRes *pRes = &(SDeleteRes){0}; @@ -1661,7 +1661,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0)); for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { - code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), + code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), pRes->skey, pRes->ekey); if (code) goto _err; } @@ -1682,7 +1682,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq _err: return code; } -static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder dc = {0}; @@ -1698,7 +1698,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void tDecoderClear(&dc); return -1; } - if (metaAddIndexToSTable(pVnode->pMeta, version, &req) < 0) { + if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) { pRsp->code = terrno; goto _err; } @@ -1708,7 +1708,7 @@ _err: tDecoderClear(&dc); return -1; } -static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { SDropIndexReq req = {0}; pRsp->msgType = TDMT_VND_DROP_INDEX_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -1720,21 +1720,21 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p return -1; } - if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) { + if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) { pRsp->code = terrno; return -1; } return TSDB_CODE_SUCCESS; } -extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp); +static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { + return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); } #ifndef TD_ENTERPRISE -int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { +int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index ec23f7b72413a50c306d08039dfd87105de39e16..b24fcade79a7068872f130e625aa4a4a6d52ebb1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2055,7 +2055,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = 1; } } else { - code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo, digest, pAPI); + code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 73a6532f73aeb9e9c196e59c1fc1204acac142b3..cad98348d04ec82a403e3b6e6330a32786a33f2f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -340,7 +340,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0); for (int32_t i = 0; i < numOfUids; ++i) { uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i); @@ -372,7 +372,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S if (pScanInfo->pTagCond != NULL) { bool qualified = false; STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid}; - code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified, pAPI); + code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI); if (code != TSDB_CODE_SUCCESS) { qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr); continue; @@ -437,7 +437,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI if (assignUid) { keyInfo.groupId = keyInfo.uid; } else { - code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, + code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, &keyInfo.groupId, &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(keyBuf); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index a48de1d1581a09d7dae289799827842de6c9b842..a684a6bf94169129fc09382cb04e9db8fceecf11 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -122,7 +122,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo SStorageAPI* pAPI = &pTaskInfo->storageAPI; - pAPI->metaReaderFn.initReader(&mr, pHandle->meta, 0); + pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0); int32_t code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8c880bb9b12184df2641725bf50fffb8451a3676..23e029e780ca4807e40ae33d4202ff74ccc68472 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -531,7 +531,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->meta, 0); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed. @@ -560,7 +560,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { - pHandle->api.metaReaderFn.initReader(&mr, pHandle->meta, 0); + pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -2556,7 +2556,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); @@ -3396,7 +3396,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca pRes->info.id.groupId = groupId; int64_t dbTableCount = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.meta, &dbTableCount); + pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &dbTableCount); fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes); setOperatorCompleted(pOperator); } @@ -3410,18 +3410,18 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO if (strlen(pSupp->dbNameFilter) != 0) { if (strlen(pSupp->stbNameFilter) != 0) { tb_uid_t uid = 0; - pAPI->metaFn.getTableUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter, &uid); + pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid); SMetaStbStats stats = {0}; ASSERT(0); -// metaGetStbStats(pInfo->readHandle.meta, uid, &stats); +// metaGetStbStats(pInfo->readHandle.vnode, uid, &stats); int64_t ctbNum = stats.ctbNum; fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes); } else { - int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.meta); + int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); } } else { - int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.meta); + int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode); pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); } @@ -3437,7 +3437,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName)); pRes->info.id.groupId = groupId; - int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.meta); + int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.vnode); ASSERT(0); if (ntbNum != 0) { fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes); @@ -3448,7 +3448,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) { char stbName[TSDB_TABLE_NAME_LEN] = {0}; ASSERT(0); -// metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName); +// metaGetTableSzNameByUid(pInfo->readHandle.vnode, stbUid, stbName); char fullStbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pSupp->groupByDbName) { @@ -3461,7 +3461,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S pRes->info.id.groupId = groupId; SMetaStbStats stats = {0}; -// metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats); +// metaGetStbStats(pInfo->readHandle.vnode, stbUid, &stats); int64_t ctbNum = stats.ctbNum; fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 76302a4eb456514a1be8a73d0826f963ae6fa7c7..a9450fb18366948b542dd7a9c31013d371c0f393 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -466,7 +466,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->req.filterTb); SMetaReader smrTable = {0}; - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -486,7 +486,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { if (smrTable.me.type == TSDB_CHILD_TABLE) { int64_t suid = smrTable.me.ctbEntry.suid; pAPI->metaReaderFn.clearReader(&smrTable); - pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0); code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -522,7 +522,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta); + pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); } if (pInfo->pSchema == NULL) { @@ -569,7 +569,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { schemaRow = *(SSchemaWrapper**)schema; } else { SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0); int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -658,7 +658,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, condTableName); SMetaReader smrChildTable = {0}; - pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0); int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrChildTable, condTableName); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly @@ -676,7 +676,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK); code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by pAPI->metaReaderFn.getTableEntryByUid @@ -702,7 +702,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta); + pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); } bool blockFull = false; @@ -715,7 +715,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); SMetaReader smrSuperTable = {0}; - pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1131,7 +1131,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { tb_uid_t* uid = taosArrayGet(pIdx->uids, i); SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, 0); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0); ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid); if (ret < 0) { pAPI->metaReaderFn.clearReader(&mr); @@ -1159,7 +1159,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); SMetaReader mr1 = {0}; - pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.meta, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK); int64_t suid = mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr1, suid); @@ -1292,7 +1292,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta); + pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); } blockDataCleanup(pInfo->pRes); @@ -1338,7 +1338,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); SMetaReader mr = {0}; - pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, META_READER_NOLOCK); + pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid); @@ -1486,7 +1486,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { } else { if (pInfo->showRewrite == false) { if (pCondition != NULL && pInfo->pIdx == NULL) { - SSTabFltArg arg = {.pMeta = pInfo->readHandle.meta, .pVnode = pInfo->readHandle.vnode}; + SSTabFltArg arg = {.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode}; SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); idx->init = 0; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 1a3a740b34918c695dfb79848f8df2c3500b0f3a..231e597724f7bb03c4e1a726bb9e59e18660d532 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -440,11 +440,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int64_t rId = msg.refId; int32_t eId = msg.execId; - SQWMsg qwMsg = {.node = node, - .msg = msg.msg, - .msgLen = msg.msgLen, - .connInfo = pMsg->info, - .msgType = pMsg->msgType}; + SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; qwMsg.msgInfo.explain = msg.explain; qwMsg.msgInfo.taskType = msg.taskType; qwMsg.msgInfo.needFetch = msg.needFetch;