diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 4d289147d03840c7d8edf2868d90682d32b2db75..04f67b584faca5e50fbd0d7e2fc835b497e8fddb 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -57,7 +57,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted); /** * Set multiple input data blocks for the stream scan. @@ -67,7 +67,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); * @param type * @return */ -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted); /** * Update the table id list, add or remove. diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index f304e3153ded7f26c586bffcd7f2f249480662aa..1e63dd983300ec1b644a3a64d1adb3d2449a479c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -453,7 +453,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt } } if (pStb->ast2Len > 0) { - int32_t qmsgLen2 = 0; if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &pRSmaParam->qmsg2, &pRSmaParam->qmsg2Len) != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pRSmaParam->pFuncIds); taosMemoryFreeClear(pRSmaParam->qmsg1); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2b52d333dad3193841ee107c36ccf2e5a68438f3..9f80f0beffde0b670d0c99505422e025343d17c1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -108,6 +108,7 @@ void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); +int32_t tqReadHandleSetMsgEx(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index fb875a46e0a0d0f785b5b2df0977398e59573604..2d417fd6260a6191ca3384aef44f84c585c916f0 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -51,7 +51,7 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64() #define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE -int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); +int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlHandle *pHandle); int metaDropTable(SMeta* pMeta, tb_uid_t uid); int metaCommit(SMeta* pMeta); int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); @@ -74,7 +74,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); +int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlHandle *pHandle); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 322ba215ad04d9f035c6154eec599225ddbcf34a..3aad11a6b5527fe9f75317c7b2b9e050e05b3789 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -24,8 +24,37 @@ extern "C" { #endif +typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2); + +struct STbDdlHandle { + void *ahandle; + void *result; + __tb_ddl_fn_t fp; +}; + +typedef struct { + tb_uid_t suid; + SArray *tbUids; + SHashObj *uidHash; +} STbUidStore; + +static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { + ASSERT(*pStore == NULL); + *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); + if (*pStore == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid); +void *tsdbUidStoreFree(STbUidStore *pStore); + int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); -int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, SSubmitReq *pMsg); +int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid); +int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore); +int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 23a6d97a66f901a3fb7aa8f3917a60d8bd96b59b..5566681dc018bf47db9135a5519afb4c1114787b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,6 +102,8 @@ struct SVnode { #define TD_VID(PVNODE) (PVNODE)->config.vgId +typedef struct STbDdlHandle STbDdlHandle; + // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index 149a7ca3cb0a72a5908dac3ac282ea5cb5aaf701..96e2a34b7bf0e100994a688bfd180ea012acc427 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -248,7 +248,7 @@ void metaCloseDB(SMeta *pMeta) { } } -int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { +int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlHandle *pHandle) { tb_uid_t uid; SMetaDB *pMetaDb; void *pKey; @@ -347,6 +347,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { if (ret < 0) { return -1; } + // child table handle for rsma + if (pHandle && pHandle->fp) { + if (((*pHandle->fp)(pHandle->ahandle, &pHandle->result, &ctbIdxKey.suid, &uid)) < 0) { + return -1; + }; + } } else if (pTbCfg->type == META_NORMAL_TABLE) { pKey = &uid; kLen = sizeof(uid); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7f06ba88553b0e211db7f79f0e09d87dc2958277..32e7c24b6475f02d851469e6522208159b2c6c1e 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -15,7 +15,7 @@ #include "vnodeInt.h" -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg, STbDdlHandle *pHandle) { // Validate the tbOptions // if (metaValidateTbCfg(pMeta, pTbCfg) < 0) { // // TODO: handle error @@ -24,7 +24,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { // TODO: add atomicity - if (metaSaveTableToDB(pMeta, pTbCfg) < 0) { + if (metaSaveTableToDB(pMeta, pTbCfg, pHandle) < 0) { // TODO: handle error return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 510dd324599e44858304c3707f5b1005d1b77a2b..1d693ddff5d1c7eb5ff60bf7135247c137288fe4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -287,7 +287,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -450,7 +450,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; qTaskInfo_t task = pTopic->buffer.output[workerId].task; ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock = NULL; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 02ce6c4aad299c1c569b76a581691bbb91bd1a49..3cdd88a597dc693ad86d9d2b6e7ac7aacbbe0736 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -56,6 +56,22 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t return 0; } +int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { + pReadHandle->pMsg = pMsg; + + // iterate + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; + if (pReadHandle->pBlock == NULL) break; + } + + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + pReadHandle->ver = ver; + memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); + return 0; +} + bool tqNextDataBlock(STqReadHandle* pHandle) { while (1) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 34d41b1c79ddf01b95ce706d5edaffc2664d15d4..4722e27391d53295a375fb77740dae23f06c43f2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -101,23 +101,24 @@ typedef struct { STSma *pSma; // cache schema } SSmaStatItem; +#define RSMA_MAX_LEVEL 2 +#define RSMA_TASK_INFO_HASH_SLOT 8 +struct SRSmaInfo { + void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t +}; + struct SSmaStat { union { SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; + SRSmaInfo rsmaInfo; T_REF_DECLARE() }; #define SMA_STAT_ITEMS(s) ((s)->smaStatItems) #define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) -#define RSMA_MAX_LEVEL 2 -#define RSMA_TASK_INFO_HASH_SLOT 8 -struct SRSmaInfo { - void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t -}; - static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); @@ -1120,9 +1121,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); - // key: skey + groupId - char smaKey[SMA_KEY_LEN] = {0}; - char dataBuf[512] = {0}; + char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId + char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer? void *pDataBuf = NULL; int32_t sz = taosArrayGetSize(pDataBlocks); for (int32_t i = 0; i < sz; ++i) { @@ -1716,6 +1716,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; + SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo; TASSERT(pEnv != NULL && pStat != NULL); @@ -1749,6 +1750,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { taosMemoryFree(pReadHandle); return TSDB_CODE_FAILED; } + tRSmaInfo->taskInfo[0] = pRSmaInfo->taskInfo[0]; } if (param->qmsg2) { @@ -1758,6 +1760,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { taosMemoryFree(pReadHandle); return TSDB_CODE_FAILED; } + tRSmaInfo->taskInfo[1] = pRSmaInfo->taskInfo[1]; } if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != @@ -1768,6 +1771,151 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { return TSDB_CODE_SUCCESS; } +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid) { + if ((suid == pStore->suid) || (pStore->suid == 0)) { + if (pStore->tbUids == NULL) { + if ((pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t))) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pStore->suid = suid; + } + if (taosArrayPush(pStore->tbUids, &uid) == NULL) { + return TSDB_CODE_FAILED; + } + } else { + if (pStore->uidHash == NULL) { + pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pStore->uidHash == NULL) { + return TSDB_CODE_FAILED; + } + } + SArray *hashVal = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); + if (hashVal && ((hashVal = *(SArray **)hashVal) != NULL)) { + taosArrayPush(hashVal, &uid); + } else { + SArray *pItem = taosArrayInit(1, sizeof(tb_uid_t)); + if (pItem == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (taosArrayPush(pItem, &uid) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pItem, sizeof(pItem)) != 0) { + return TSDB_CODE_FAILED; + } + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { + SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb); + if (!pEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SHashObj *infoHash = NULL; + if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) { + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + return TSDB_CODE_FAILED; + } + + // info cached when create rsma stable + if (!taosHashGet(infoHash, suid, sizeof(suid))) { + return TSDB_CODE_SUCCESS; + } + + if (*ppStore == NULL) { + if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) { + return TSDB_CODE_FAILED; + } + } + if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, *(tb_uid_t *)uid) != 0) { + *ppStore = tsdbUidStoreFree(*ppStore); + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +void *tsdbUidStoreFree(STbUidStore *pStore) { + if (pStore) { + taosArrayDestroy(pStore->tbUids); + if (pStore->uidHash) { + void *pIter = taosHashIterate(pStore->uidHash, NULL); + while (pIter != NULL) { + SArray *arr = *(SArray **)pIter; + taosArrayDestroy(arr); + pIter = taosHashIterate(pStore->uidHash, pIter); + } + } + taosMemoryFree(pStore); + } + return NULL; +} + +int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore) { + if (!pUidStore || (taosArrayGetSize(pUidStore->tbUids) == 0)) { + tsdbDebug("vgId:%d empty uidStore and no need to update", REPO_ID(pTsdb)); + return TSDB_CODE_SUCCESS; + } + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = &pStat->rsmaInfo; + + if (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], pUidStore->tbUids, true) != 0) { + tsdbUidStoreFree(pUidStore); + tsdbError("vgId:%d update tbUidList failed since %s", REPO_ID(pTsdb), terrstr(terrno)); + return TSDB_CODE_FAILED; + } + + tsdbUidStoreFree(pUidStore); + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType) { + SArray *pResult = NULL; + + pResult = taosArrayInit(0, sizeof(SSDataBlock)); + if (pResult == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = NULL; + + TASSERT(pEnv != NULL && pStat != NULL); + + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + // pRSmaInfo = taosHashGet(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t)); + // if (pRSmaInfo != NULL) { + // pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo); + // } + + SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo; + + qSetStreamInput(tRSmaInfo->taskInfo[0], pMsg, inputType, true); + while (1) { + SSDataBlock *output; + uint64_t ts; + if (qExecTask(tRSmaInfo->taskInfo[0], &output, &ts) < 0) { + ASSERT(false); + } + if (output == NULL) { + break; + } + taosArrayPush(pResult, output); + } + } + blockDebugShowData(pResult); + return TSDB_CODE_SUCCESS; +} + #if 0 /** * @brief Get the start TS key of the last data block of one interval/sliding. diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a81bbf4298f649d50acc446989b580942c8d7e54..310a2751dd486439cdc6f8e9afb5cb0a67683483 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -79,6 +79,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_SUBMIT: pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); + tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); break; case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -112,7 +113,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA - if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO } @@ -210,7 +210,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(pReq, &vCreateTbReq); - if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { + if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq), NULL) < 0) { // TODO return -1; } @@ -235,6 +235,13 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray); + + STbDdlHandle ddlHandle = { + .ahandle = pVnode->pTsdb, + .result = NULL, + .fp = tsdbFetchTbUidList, + }; + for (int i = 0; i < reqNum; i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); @@ -250,7 +257,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } - if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + if (metaCreateTable(pVnode->pMeta, pCreateTbReq, &ddlHandle) < 0) { // TODO: handle error vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); } @@ -286,6 +293,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR pRsp->contLen = contLen; } + tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result); + return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4863b03fb9f69fcba57bea23b359081fd857568f..8f7f55eb33d5c6fe731e164353d5ab1bc743d2cc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,7 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id, bool converted) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -32,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id, converted); } else { pOperator->status = OP_NOT_OPENED; @@ -46,8 +46,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("submit msg messed up when initing stream block, %s" PRIx64, id); + if (converted) { + if (tqReadHandleSetMsgEx(pInfo->readerHandle, input, 0) < 0) { + qError("converted: submit msg messed up when initing stream block, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + } else if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + qError("raw msg: submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } } else { @@ -67,11 +72,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { - return qSetMultiStreamInput(tinfo, input, 1, type); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted) { + return qSetMultiStreamInput(tinfo, input, 1, type, converted); } -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -82,7 +87,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo), converted); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 8aaaa414ca265f31f09f26f877b2ec3dea7ce5b3..1f0c269ca14bd41364e63de41b1d2e6cbd3cf962 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -110,7 +110,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in return -1; } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - qSetStreamInput(exec, input, inputType); + qSetStreamInput(exec, input, inputType, true); while (1) { SSDataBlock* output; uint64_t ts; @@ -128,7 +128,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in /*for (int32_t i = 0; i < sz; i++) {*/ /*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/ /*qSetStreamInput(exec, pBlock, inputType);*/ - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, true); while (1) { SSDataBlock* output; uint64_t ts;