提交 2eee0c99 编写于 作者: H Hongze Cheng

fix: meta get latest schema

上级 d5e07c66
...@@ -85,7 +85,7 @@ struct STable { ...@@ -85,7 +85,7 @@ struct STable {
#define TABLE_TID(t) (t)->tid #define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid #define TABLE_UID(t) (t)->uid
int tsdbPrepareCommit(STsdb *pTsdb); int tsdbPrepareCommit(STsdb *pTsdb);
typedef enum { typedef enum {
TSDB_FILE_HEAD = 0, // .head TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data TSDB_FILE_DATA, // .data
...@@ -181,7 +181,6 @@ int tsdbUnlockRepo(STsdb *pTsdb); ...@@ -181,7 +181,6 @@ int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
int32_t version) { int32_t version) {
if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) { if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) {
taosMemoryFreeClear(pTable->pSchema); taosMemoryFreeClear(pTable->pSchema);
pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
......
...@@ -178,6 +178,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -178,6 +178,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
if (me.type == TSDB_SUPER_TABLE) { if (me.type == TSDB_SUPER_TABLE) {
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow); pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
} else if (me.type == TSDB_NORMAL_TABLE) { } else if (me.type == TSDB_NORMAL_TABLE) {
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -84,8 +84,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -84,8 +84,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
static void tsdbResetCommitTable(SCommitH *pCommith); static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
TSKEY maxKey, int maxRows, int8_t update); SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
...@@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { ...@@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pTbData = (STbData *)pNode->pData; pTbData = (STbData *)pNode->pData;
pCommitIter = pCommith->iters + i; pCommitIter = pCommith->iters + i;
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 1); // TODO: schema version pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); // TODO: schema version
if (pTSchema) { if (pTSchema) {
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
...@@ -948,7 +948,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { ...@@ -948,7 +948,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
} }
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith),pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1);
pCommith->pTable = pTable; pCommith->pTable = pTable;
...@@ -1422,8 +1422,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -1422,8 +1422,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
int biter = 0; int biter = 0;
while (true) { while (true) {
tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols,
pCfg->update); keyLimit, defaultRows, pCfg->update);
if (pCommith->pDataCols->numOfRows == 0) break; if (pCommith->pDataCols->numOfRows == 0) break;
...@@ -1447,8 +1447,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -1447,8 +1447,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
return 0; return 0;
} }
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
TSKEY maxKey, int maxRows, int8_t update) { SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX; TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX; TSKEY key2 = INT64_MAX;
TSKEY lastKey = TSKEY_INITIAL_VAL; TSKEY lastKey = TSKEY_INITIAL_VAL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册