未验证 提交 2f19dc4b 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #13240 from taosdata/feature/TD-11274-3.0

fix: support fetching specific schema version from table
...@@ -605,6 +605,10 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols ...@@ -605,6 +605,10 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
* @param pCols * @param pCols
*/ */
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printf("%s:%d ts: %" PRIi64 " sver:%d maxCols:%" PRIi16 " nCols:%" PRIi16 ", nRows:%d\n", __func__, __LINE__,
TD_ROW_KEY(pRow), TD_ROW_SVER(pRow), pCols->maxCols, pCols->numOfCols, pCols->numOfRows);
#endif
if (TD_IS_TP_ROW(pRow)) { if (TD_IS_TP_ROW(pRow)) {
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge); return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
} else if (TD_IS_KV_ROW(pRow)) { } else if (TD_IS_KV_ROW(pRow)) {
......
...@@ -79,7 +79,8 @@ struct STsdb { ...@@ -79,7 +79,8 @@ struct STsdb {
struct STable { struct STable {
uint64_t tid; uint64_t tid;
uint64_t uid; uint64_t uid;
STSchema *pSchema; STSchema *pSchema; // latest schema
STSchema *pCacheSchema; // cached cache
}; };
#define TABLE_TID(t) (t)->tid #define TABLE_TID(t) (t)->tid
...@@ -181,12 +182,15 @@ int tsdbUnlockRepo(STsdb *pTsdb); ...@@ -181,12 +182,15 @@ int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
int32_t version) { int32_t version) {
if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) { if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) {
taosMemoryFreeClear(pTable->pSchema); return pTable->pSchema;
pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
} }
return pTable->pSchema; if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) {
taosMemoryFreeClear(pTable->pCacheSchema);
pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
}
return pTable->pCacheSchema;
} }
// tsdbMemTable.h // tsdbMemTable.h
......
...@@ -300,7 +300,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -300,7 +300,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW = metaGetTableSchema(pMeta, quid, sver, 0); pSW = metaGetTableSchema(pMeta, quid, sver, 0);
if (!pSW) return NULL; if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, sver); tdInitTSchemaBuilder(&sb, pSW->version);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes); tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
......
...@@ -441,7 +441,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb ...@@ -441,7 +441,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 1); STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { ...@@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pTbData = (STbData *)pNode->pData; pTbData = (STbData *)pNode->pData;
pCommitIter = pCommith->iters + i; pCommitIter = pCommith->iters + i;
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); // TODO: schema version pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1);
if (pTSchema) { if (pTSchema) {
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
...@@ -475,7 +475,8 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { ...@@ -475,7 +475,8 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
pCommitIter->pTable->uid = pTbData->uid; pCommitIter->pTable->uid = pTbData->uid;
pCommitIter->pTable->tid = pTbData->uid; pCommitIter->pTable->tid = pTbData->uid;
pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); pCommitIter->pTable->pSchema = pTSchema;
pCommitIter->pTable->pCacheSchema = NULL;
} }
} }
tSkipListDestroyIter(pSlIter); tSkipListDestroyIter(pSlIter);
...@@ -490,6 +491,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { ...@@ -490,6 +491,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
tSkipListDestroyIter(pCommith->iters[i].pIter); tSkipListDestroyIter(pCommith->iters[i].pIter);
if (pCommith->iters[i].pTable) { if (pCommith->iters[i].pTable) {
tdFreeSchema(pCommith->iters[i].pTable->pSchema); tdFreeSchema(pCommith->iters[i].pTable->pSchema);
tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema);
taosMemoryFreeClear(pCommith->iters[i].pTable); taosMemoryFreeClear(pCommith->iters[i].pTable);
} }
} }
...@@ -914,7 +916,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { ...@@ -914,7 +916,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
while (bidx < nBlocks) { while (bidx < nBlocks) {
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) { if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
// Set commit table // Set commit table
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1); // TODO: schema version
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册