未验证 提交 6fbde2d1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17254 from taosdata/fix/TD-19416

fix(meta): use rwlock to favor writers
...@@ -107,8 +107,8 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR ...@@ -107,8 +107,8 @@ int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pR
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema); int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage); int metaAlterCache(SMeta* pMeta, int32_t nPage);
...@@ -116,8 +116,8 @@ int metaAlterCache(SMeta* pMeta, int32_t nPage); ...@@ -116,8 +116,8 @@ int metaAlterCache(SMeta* pMeta, int32_t nPage);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name); tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int64_t metaGetTbNum(SMeta* pMeta); int64_t metaGetTbNum(SMeta* pMeta);
int64_t metaGetTimeSeriesNum(SMeta* pMeta); int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid); SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur); void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid); SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseStbCursor(SMStbCursor* pStbCur); void metaCloseStbCursor(SMStbCursor* pStbCur);
......
...@@ -297,15 +297,16 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { ...@@ -297,15 +297,16 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
return 0; return 0;
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
void *pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int64_t version; int64_t version;
SSchemaWrapper schema = {0}; SSchemaWrapper schema = {0};
SSchemaWrapper *pSchema = NULL; SSchemaWrapper *pSchema = NULL;
SDecoder dc = {0}; SDecoder dc = {0};
if (lock) {
metaRLock(pMeta); metaRLock(pMeta);
}
_query: _query:
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) { if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
goto _err; goto _err;
...@@ -384,13 +385,17 @@ _query: ...@@ -384,13 +385,17 @@ _query:
_exit: _exit:
tDecoderClear(&dc); tDecoderClear(&dc);
metaULock(pMeta); if (lock) {
metaULock(pMeta);
}
tdbFree(pData); tdbFree(pData);
return pSchema; return pSchema;
_err: _err:
tDecoderClear(&dc); tDecoderClear(&dc);
metaULock(pMeta); if (lock) {
metaULock(pMeta);
}
tdbFree(pData); tdbFree(pData);
return NULL; return NULL;
} }
...@@ -436,7 +441,7 @@ struct SMCtbCursor { ...@@ -436,7 +441,7 @@ struct SMCtbCursor {
int vLen; int vLen;
}; };
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
SMCtbCursor *pCtbCur = NULL; SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey; SCtbIdxKey ctbIdxKey;
int ret = 0; int ret = 0;
...@@ -449,7 +454,9 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { ...@@ -449,7 +454,9 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
pCtbCur->pMeta = pMeta; pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid; pCtbCur->suid = uid;
metaRLock(pMeta); if (lock) {
metaRLock(pMeta);
}
ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) { if (ret < 0) {
...@@ -469,9 +476,9 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { ...@@ -469,9 +476,9 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return pCtbCur; return pCtbCur;
} }
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) { void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
if (pCtbCur) { if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta); if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) { if (pCtbCur->pCur) {
tdbTbcClose(pCtbCur->pCur); tdbTbcClose(pCtbCur->pCur);
...@@ -566,14 +573,14 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) { ...@@ -566,14 +573,14 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
return *(tb_uid_t *)pStbCur->pKey; return *(tb_uid_t *)pStbCur->pKey;
} }
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
// SMetaReader mr = {0}; // SMetaReader mr = {0};
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0}; STSchemaBuilder sb = {0};
SSchema *pSchema; SSchema *pSchema;
pSW = metaGetTableSchema(pMeta, uid, sver, 0); pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL; if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, pSW->version); tdInitTSchemaBuilder(&sb, pSW->version);
...@@ -1181,7 +1188,7 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHas ...@@ -1181,7 +1188,7 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHas
} }
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) { int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid); SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
SHashObj *uHash = NULL; SHashObj *uHash = NULL;
size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids
...@@ -1208,7 +1215,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj ...@@ -1208,7 +1215,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
} }
taosHashCleanup(uHash); taosHashCleanup(uHash);
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -411,7 +411,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -411,7 +411,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1, 1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _err; goto _err;
......
...@@ -188,7 +188,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -188,7 +188,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto _err; goto _err;
} }
pTsmaStat->pTSma = pTSma; pTsmaStat->pTSma = pTSma;
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1); pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1);
if (!pTsmaStat->pTSchema) { if (!pTsmaStat->pTSchema) {
smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno)); indexUid, tstrerror(terrno));
...@@ -204,8 +204,9 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -204,8 +204,9 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, SSubmitReq *pSubmitReq =
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq); tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
...@@ -413,7 +413,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { ...@@ -413,7 +413,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
pReader->cachedSchemaSuid != pReader->msgIter.suid) { pReader->cachedSchemaSuid != pReader->msgIter.suid) {
if (pReader->pSchema) taosMemoryFree(pReader->pSchema); if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion); pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
...@@ -423,7 +423,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { ...@@ -423,7 +423,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
} }
if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper); if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->msgIter.uid, pReader->cachedSchemaVer); pReader->msgIter.uid, pReader->cachedSchemaVer);
......
...@@ -182,7 +182,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST ...@@ -182,7 +182,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
if (row->ts == cacheRow->ts) { if (row->ts == cacheRow->ts) {
STSRow *mergedRow = NULL; STSRow *mergedRow = NULL;
SRowMerger merger = {0}; SRowMerger merger = {0};
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema); tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema);
...@@ -249,7 +249,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb ...@@ -249,7 +249,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
getTableCacheKey(uid, 1, key, &keyLen); getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) { if (h) {
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
TSKEY keyTs = row->ts; TSKEY keyTs = row->ts;
bool invalidate = false; bool invalidate = false;
...@@ -418,9 +418,9 @@ typedef enum { ...@@ -418,9 +418,9 @@ typedef enum {
} SFSLASTNEXTROWSTATES; } SFSLASTNEXTROWSTATES;
typedef struct { typedef struct {
SFSLASTNEXTROWSTATES state; // [input] SFSLASTNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input] STsdb *pTsdb; // [input]
STSchema *pTSchema;// [input] STSchema *pTSchema; // [input]
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
int32_t nFileSet; int32_t nFileSet;
...@@ -456,10 +456,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -456,10 +456,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err; if (code) goto _err;
SSttBlockLoadInfo* pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0); SSttBlockLoadInfo *pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0);
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo,true, NULL); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL);
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
...@@ -1034,7 +1034,7 @@ _err: ...@@ -1034,7 +1034,7 @@ _err:
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
int32_t code = 0; int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nCol = pTSchema->numOfCols; int16_t nCol = pTSchema->numOfCols;
int16_t iCol = 0; int16_t iCol = 0;
int16_t noneCol = 0; int16_t noneCol = 0;
...@@ -1131,7 +1131,7 @@ _err: ...@@ -1131,7 +1131,7 @@ _err:
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
int32_t code = 0; int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nCol = pTSchema->numOfCols; int16_t nCol = pTSchema->numOfCols;
int16_t iCol = 0; int16_t iCol = 0;
int16_t noneCol = 0; int16_t noneCol = 0;
......
...@@ -79,7 +79,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList ...@@ -79,7 +79,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
} }
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
......
...@@ -78,7 +78,7 @@ typedef struct SBlockLoadSuppInfo { ...@@ -78,7 +78,7 @@ typedef struct SBlockLoadSuppInfo {
SArray* pColAgg; SArray* pColAgg;
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
SColumnDataAgg** plist; SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data int16_t* colIds; // column ids for loading file block data
int32_t numOfCols; int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
...@@ -355,7 +355,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb ...@@ -355,7 +355,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
if (pLReader->pInfo == NULL) { if (pLReader->pInfo == NULL) {
// here we ignore the first column, which is always be the primary timestamp column // here we ignore the first column, which is always be the primary timestamp column
pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1); pLReader->pInfo =
tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
if (pLReader->pInfo == NULL) { if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno; return terrno;
...@@ -841,13 +842,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -841,13 +842,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) { if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pBlock->minKey.ts) { if (asc && pReader->window.skey <= pBlock->minKey.ts) {
//pDumpInfo->rowIndex = 0; // pDumpInfo->rowIndex = 0;
} else } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { // pDumpInfo->rowIndex = pBlock->nRow - 1;
//pDumpInfo->rowIndex = pBlock->nRow - 1;
} else { } else {
int32_t pos = asc ? pBlock->nRow - 1 : 0; int32_t pos = asc ? pBlock->nRow - 1 : 0;
int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
...@@ -959,12 +958,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -959,12 +958,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = uid}; TABLEID tid = {.suid = pReader->suid, .uid = uid};
int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols-1); int32_t code =
tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1495,7 +1496,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas ...@@ -1495,7 +1496,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema // always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
} }
if (pReader->pSchema && sversion == pReader->pSchema->version) { if (pReader->pSchema && sversion == pReader->pSchema->version) {
...@@ -2011,9 +2012,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan ...@@ -2011,9 +2012,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
w.ekey = pScanInfo->lastKey + step; w.ekey = pScanInfo->lastKey + step;
} }
int32_t code = int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr); pLBlockReader->pInfo, false, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
...@@ -3368,14 +3369,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3368,14 +3369,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
if (pCond->suid != 0) { if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, -1, tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, -1,
pReader->idStr); pReader->idStr);
} }
} else if (taosArrayGetSize(pTableList) > 0) { } else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0); STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, -1, pReader->idStr); tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, -1, pReader->idStr);
} }
...@@ -3908,7 +3909,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3908,7 +3909,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
} }
metaReaderClear(&mr); metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion); *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -393,7 +393,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { ...@@ -393,7 +393,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
} }
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid); SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1);
while (1) { while (1) {
tb_uid_t id = metaCtbCursorNext(pCur); tb_uid_t id = metaCtbCursorNext(pCur);
...@@ -405,7 +405,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { ...@@ -405,7 +405,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -413,7 +413,7 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo ...@@ -413,7 +413,7 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
return 0; return 0;
} }
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) { int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid); SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 1);
while (1) { while (1) {
tb_uid_t id = metaCtbCursorNext(pCur); tb_uid_t id = metaCtbCursorNext(pCur);
...@@ -424,7 +424,7 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) { ...@@ -424,7 +424,7 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
taosArrayPush(list, &id); taosArrayPush(list, &id);
} }
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -448,7 +448,7 @@ int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) { ...@@ -448,7 +448,7 @@ int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
} }
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid); SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 0);
if (!pCur) { if (!pCur) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -463,12 +463,12 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { ...@@ -463,12 +463,12 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
++(*num); ++(*num);
} }
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
STSchema *pTSchema = metaGetTbTSchema(pVnode->pMeta, suid, -1); STSchema *pTSchema = metaGetTbTSchema(pVnode->pMeta, suid, -1, 0);
// metaGetTbTSchemaEx(pVnode->pMeta, suid, suid, -1, &pTSchema); // metaGetTbTSchemaEx(pVnode->pMeta, suid, suid, -1, &pTSchema);
if (pTSchema) { if (pTSchema) {
......
...@@ -770,7 +770,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, ...@@ -770,7 +770,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
if (pSchema) { if (pSchema) {
taosMemoryFreeClear(pSchema); taosMemoryFreeClear(pSchema);
} }
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row)); // TODO: use the real schema pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1); // TODO: use the real schema
if (pSchema) { if (pSchema) {
suid = msgIter->suid; suid = msgIter->suid;
rv = TD_ROW_SVER(blkIter.row); rv = TD_ROW_SVER(blkIter.row);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册