提交 97a20ee8 编写于 作者: D dapan1121

update meta based on sversion

上级 0c382080
...@@ -170,6 +170,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); ...@@ -170,6 +170,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
*/ */
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName); int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName);
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables);
/** /**
* Force refresh a table's local cached meta data. * Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
......
...@@ -361,6 +361,17 @@ _return: ...@@ -361,6 +361,17 @@ _return:
return code; return code;
} }
void freeRequestRes(SRequestObj* pRequest, void* res) {
if (NULL == res) {
return;
}
if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) {
}
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
void* pRes = NULL; void* pRes = NULL;
......
...@@ -4087,10 +4087,8 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl ...@@ -4087,10 +4087,8 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1; if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1;
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) { if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
}
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1; if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1;
...@@ -4104,12 +4102,10 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { ...@@ -4104,12 +4102,10 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1; if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) { if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); if (NULL == pBlock->tblFName) return -1;
if (NULL == pBlock->tblFName) return -1; if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
}
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
......
...@@ -320,6 +320,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -320,6 +320,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1; return -1;
} }
strcat(pRsp->tblFName, mr.me.name);
if (mr.me.type == TSDB_NORMAL_TABLE) { if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver; sverNew = mr.me.ntbEntry.schema.sver;
} else { } else {
......
...@@ -676,6 +676,9 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -676,6 +676,9 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
tDecoderClear(&decoder); tDecoderClear(&decoder);
} else {
submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
} }
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
......
...@@ -2883,8 +2883,107 @@ _return: ...@@ -2883,8 +2883,107 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) {
*sver = -1;
if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t tbType = 0;
uint64_t suid = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
if (tbMeta) {
tbType = tbMeta->tableType;
suid = tbMeta->suid;
if (tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion;
}
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == tbMeta) {
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
if (tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("stb not in stbCache, suid:%"PRIx64, suid);
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*sver = (*stbMeta)->sversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTables) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SName name;
int32_t sver = 0;
int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) {
continue;
}
ctgGetTbSverFromCache(pCtg, &name, &sver);
if (sver >= 0 && sver < pTb->sver) {
catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE
}
}
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册