diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a29dc3a90afa6b50b6e34f1eb42f4e49dfdddacf..f9d8fc0de19fccd48a7d3e115f7e33e3a328ef03 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -170,6 +170,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); */ int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName); +int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables); + /** * Force refresh a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5d3a139aeb92c74861bec174d6110b9b3715df75..c3ccecb0d364b5b713ed9f2f6b7b4787ef274cc1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -361,6 +361,17 @@ _return: return code; } +void freeRequestRes(SRequestObj* pRequest, void* res) { + if (NULL == res) { + return; + } + + if (TDMT_VND_SUBMIT == pRequest->type) { + tFreeSSubmitRsp((SSubmitRsp*)res); + } else if (TDMT_VND_QUERY == pRequest->type) { + + } +} SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { void* pRes = NULL; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4e97ebbe47e3179b1e698837ff8095e28ad40cbb..278fc2c193830dfa199f07c4598c80ce8576b4ec 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4087,10 +4087,8 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1; if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; - if (pBlock->hashMeta) { - if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; - if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1; - } + if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; + if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1; @@ -4104,12 +4102,10 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1; if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; - if (pBlock->hashMeta) { - if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; - pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); - if (NULL == pBlock->tblFName) return -1; - if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; - } + if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; + pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); + if (NULL == pBlock->tblFName) return -1; + if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 2fdbfdd2064dbc78c713df891433e02c7aba57cd..d8426db12719f4bc27915c07b6ec9e5235b5e47c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -320,6 +320,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } + strcat(pRsp->tblFName, mr.me.name); + if (mr.me.type == TSDB_NORMAL_TABLE) { sverNew = mr.me.ntbEntry.schema.sver; } else { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 464331319e893a8115c122a9901fd79f5e8a2673..371ade4f764112fc93a492893c697e9d347f3ea1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -676,6 +676,9 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); tDecoderClear(&decoder); + } else { + submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN); + sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname); } if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a620b84f6d1495336914cf4f9848fbb5928ebf32..23957d1a6bfdc0cd87dd6f94727d0e1dc76adb57 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -2883,8 +2883,107 @@ _return: CTG_API_LEAVE(code); } + + +int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) { + *sver = -1; + + if (NULL == pCtg->dbCache) { + ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname); + return TSDB_CODE_SUCCESS; + } + + SCtgDBCache *dbCache = NULL; + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); + + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + if (NULL == dbCache) { + ctgDebug("db %s not in cache", pTableName->tname); + return TSDB_CODE_SUCCESS; + } + + int32_t tbType = 0; + uint64_t suid = 0; + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); + if (tbMeta) { + tbType = tbMeta->tableType; + suid = tbMeta->suid; + if (tbType != TSDB_CHILD_TABLE) { + *sver = tbMeta->sversion; + } + } + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + + if (NULL == tbMeta) { + ctgReleaseDBCache(pCtg, dbCache); + return TSDB_CODE_SUCCESS; + } + + if (tbType != TSDB_CHILD_TABLE) { + ctgReleaseDBCache(pCtg, dbCache); + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + + return TSDB_CODE_SUCCESS; + } + + ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid); + + CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock); + + STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid)); + if (NULL == stbMeta || NULL == *stbMeta) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); + ctgReleaseDBCache(pCtg, dbCache); + ctgDebug("stb not in stbCache, suid:%"PRIx64, suid); + return TSDB_CODE_SUCCESS; + } + + if ((*stbMeta)->suid != suid) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); + ctgReleaseDBCache(pCtg, dbCache); + ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + *sver = (*stbMeta)->sversion; + + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); + + ctgReleaseDBCache(pCtg, dbCache); + + ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname); + + return TSDB_CODE_SUCCESS; +} + + int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTables) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + SName name; + int32_t sver = 0; + int32_t tbNum = taosArrayGetSize(pTables); + for (int32_t i = 0; i < tbNum; ++i) { + STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i); + tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + if (CTG_IS_SYS_DBNAME(name.dbname)) { + continue; + } + + ctgGetTbSverFromCache(pCtg, &name, &sver); + if (sver >= 0 && sver < pTb->sver) { + catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE + } + } + + CTG_API_LEAVE(TSDB_CODE_SUCCESS); }