提交 80d780eb 编写于 作者: C Cary Xu

fix: use schema version of row when commit

上级 42cc609a
...@@ -40,8 +40,8 @@ typedef struct STable STable; ...@@ -40,8 +40,8 @@ typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
// tsdbCommit ================ // tsdbCommit ================
...@@ -179,7 +179,14 @@ struct STsdbFS { ...@@ -179,7 +179,14 @@ struct STsdbFS {
int tsdbLockRepo(STsdb *pTsdb); int tsdbLockRepo(STsdb *pTsdb);
int tsdbUnlockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
int32_t version) {
if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) {
taosMemoryFreeClear(pTable->pSchema);
pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
}
return pTable->pSchema; return pTable->pSchema;
} }
......
...@@ -84,7 +84,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -84,7 +84,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
static void tsdbResetCommitTable(SCommitH *pCommith); static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
...@@ -301,7 +301,8 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { ...@@ -301,7 +301,8 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
SCommitIter *pIter = pCommith->iters + i; SCommitIter *pIter = pCommith->iters + i;
if (pIter->pTable == NULL || pIter->pIter == NULL) continue; if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0,
true, NULL);
} }
} }
...@@ -947,7 +948,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { ...@@ -947,7 +948,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
} }
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith),pTable, false, false, -1);
pCommith->pTable = pTable; pCommith->pTable = pTable;
...@@ -1254,8 +1255,8 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi ...@@ -1254,8 +1255,8 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
SBlock block; SBlock block;
while (true) { while (true) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, defaultRows,
pCfg->update, &mInfo); pCommith->pDataCols, NULL, 0, pCfg->update, &mInfo);
if (pCommith->pDataCols->numOfRows <= 0) break; if (pCommith->pDataCols->numOfRows <= 0) break;
...@@ -1298,8 +1299,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { ...@@ -1298,8 +1299,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
SSkipListIterator titer = *(pIter->pIter); SSkipListIterator titer = *(pIter->pIter);
if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1; if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1;
tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, &titer, keyLimit, INT32_MAX, NULL,
pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
&mInfo);
if (mInfo.nOperations == 0) { if (mInfo.nOperations == 0) {
// no new data to insert (all updates denied) // no new data to insert (all updates denied)
...@@ -1313,9 +1315,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { ...@@ -1313,9 +1315,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
*(pIter->pIter) = titer; *(pIter->pIter) = titer;
} else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
// Add a sub-block // Add a sub-block
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, INT32_MAX,
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData,
&mInfo); pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
if (pBlock->last) { if (pBlock->last) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith); pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
} else { } else {
...@@ -1420,7 +1422,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -1420,7 +1422,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
int biter = 0; int biter = 0;
while (true) { while (true) {
tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
pCfg->update); pCfg->update);
if (pCommith->pDataCols->numOfRows == 0) break; if (pCommith->pDataCols->numOfRows == 0) break;
...@@ -1445,7 +1447,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols ...@@ -1445,7 +1447,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
return 0; return 0;
} }
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update) { TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX; TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX; TSKEY key2 = INT64_MAX;
...@@ -1487,7 +1489,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1487,7 +1489,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
++(*iter); ++(*iter);
} else if (key1 > key2) { } else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
...@@ -1527,7 +1529,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1527,7 +1529,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (TD_SUPPORT_UPDATE(update)) { if (TD_SUPPORT_UPDATE(update)) {
// copy mem data(Multi-Version) // copy mem data(Multi-Version)
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
......
...@@ -20,7 +20,8 @@ static void tsdbFreeTbData(STbData *pTbData); ...@@ -20,7 +20,8 @@ static void tsdbFreeTbData(STbData *pTbData);
static char *tsdbGetTsTupleKey(const void *data); static char *tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2); static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char *tsdbTbDataGetUid(const void *arg); static char *tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge); static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
bool merge);
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
STsdbMemTable *pMemTable; STsdbMemTable *pMemTable;
...@@ -88,8 +89,8 @@ void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) { ...@@ -88,8 +89,8 @@ void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
* *
* The function tries to procceed AS MUCH AS POSSIBLE. * The function tries to procceed AS MUCH AS POSSIBLE.
*/ */
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
STSchema *pSchema = NULL; STSchema *pSchema = NULL;
...@@ -222,12 +223,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -222,12 +223,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (lastKey != TSKEY_INITIAL_VAL) { if (lastKey != TSKEY_INITIAL_VAL) {
++pCols->numOfRows; ++pCols->numOfRows;
} }
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
} }
lastKey = rowKey; lastKey = rowKey;
} else { } else {
if (keepDup) { if (keepDup) {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
} else { } else {
// discard // discard
} }
...@@ -249,7 +250,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -249,7 +250,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsDeleteSucceed++; pMergeInfo->rowsDeleteSucceed++;
pMergeInfo->nOperations++; pMergeInfo->nOperations++;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
} else { } else {
if (keepDup) { if (keepDup) {
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
...@@ -262,11 +263,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -262,11 +263,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (lastKey != TSKEY_INITIAL_VAL) { if (lastKey != TSKEY_INITIAL_VAL) {
++pCols->numOfRows; ++pCols->numOfRows;
} }
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
} }
lastKey = rowKey; lastKey = rowKey;
} else { } else {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
} }
} else { } else {
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
...@@ -321,7 +322,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -321,7 +322,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
return -1; return -1;
} }
strcat(pRsp->tblFName, mr.me.name); strcat(pRsp->tblFName, mr.me.name);
if (mr.me.type == TSDB_NORMAL_TABLE) { if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver; sverNew = mr.me.ntbEntry.schema.sver;
} else { } else {
...@@ -431,10 +432,12 @@ static char *tsdbTbDataGetUid(const void *arg) { ...@@ -431,10 +432,12 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg; STbData *pTbData = (STbData *)arg;
return (char *)(&(pTbData->uid)); return (char *)(&(pTbData->uid));
} }
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) {
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
bool merge) {
if (pCols) { if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row)); *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
if (*ppSchema == NULL) { if (*ppSchema == NULL) {
ASSERT(false); ASSERT(false);
return -1; return -1;
......
...@@ -157,7 +157,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) { ...@@ -157,7 +157,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
} }
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_READ_REPO(pReadh), pTable, false, false, -1);
pReadh->pTable = pTable; pReadh->pTable = pTable;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册