diff --git a/include/util/tarray2.h b/include/util/tarray2.h index 5a90ca1847dd63dc95f6a90b932be109df623a74..c8b0163a8f081af03e68d2f31119f3ffacb5098c 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -138,8 +138,7 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int #define TARRAY2_APPEND(a, e) TARRAY2_APPEND_PTR(a, &(e)) // return (TYPE *) -#define TARRAY2_SEARCH(a, ep, cmp, flag) \ - (typeof((a)->data))tarray2Search(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag) +#define TARRAY2_SEARCH(a, ep, cmp, flag) tarray2Search(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag) #define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \ tarray2SearchIdx(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 7f5139caa5de33bd565afcfd959a41b8f5e2b0b6..170700aeb863a7039884fadfd24b7a33f4f6b3ec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -112,6 +112,8 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { committer->ctx->hasTSData = false; + committer->ctx->tbid->suid = 0; + committer->ctx->tbid->uid = 0; for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { if (row->uid != committer->ctx->tbid->uid) { committer->ctx->tbid->suid = row->suid; @@ -149,14 +151,28 @@ _exit: } static int32_t tsdbCommitTombData(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SMetaInfo info; if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) { return 0; } + committer->ctx->tbid->suid = 0; + committer->ctx->tbid->uid = 0; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + if (record->uid != committer->ctx->tbid->uid) { + committer->ctx->tbid->suid = record->suid; + committer->ctx->tbid->uid = record->uid; + + if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + } + if (record->ekey < committer->ctx->minKey) { goto _next; } else if (record->skey > committer->ctx->maxKey) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 82d1530ad5f0880dc3aa51497039e86cb32f79bb..13a88d10118357685b1a15d412741573e45e9893 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -791,9 +791,9 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32 } // check if same task is on - if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) { - return 0; - } + // if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) { + // return 0; + // } for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) { if (task->type == type) { @@ -867,6 +867,29 @@ int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) { taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex); } + taosThreadMutexUnlock(fs->mutex); + return 0; +} + +static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) { + fs->stop = true; + + if (fs->bgTaskRunning) { + tsdbDoWaitBgTask(fs, fs->bgTaskRunning); + } + return 0; +} + +int32_t tsdbFSDisableBgTask(STFileSystem *fs) { + taosThreadMutexLock(fs->mutex); + int32_t code = tsdbFSDoDisableBgTask(fs); + taosThreadMutexUnlock(fs->mutex); + return code; +} + +int32_t tsdbFSEnableBgTask(STFileSystem *fs) { + taosThreadMutexLock(fs->mutex); + fs->stop = false; taosThreadMutexUnlock(fs->mutex); return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 24966a71272fb2f4b0e7c8956b2c2856981b9aac..e740d5b735146c91f4fa4ff6c68d67dc9a6bb8f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -62,6 +62,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs); int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid); int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid); int32_t tsdbFSWaitAllBgTask(STFileSystem *fs); +int32_t tsdbFSDisableBgTask(STFileSystem *fs); +int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index d7daa9cde4a2341ebd0cad96b83524a8bef34247..778121ae4ac534e1dc29b89f527c7adf1c8eb8c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -281,21 +281,34 @@ _exit: } static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SMetaInfo info; tBlockDataReset(reader->blockData); + TABLEID tbid[1] = {0}; for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) { + // skip dropped table + if (row->uid != tbid->uid) { + tbid->suid = row->suid; + tbid->uid = row->uid; + if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(reader->dataIterMerger, tbid); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + } + if (reader->blockData->suid == 0 && reader->blockData->uid == 0) { code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb); TSDB_CHECK_CODE(code, lino, _exit); - TABLEID tbid = { + TABLEID tbid1 = { .suid = row->suid, .uid = row->suid ? 0 : row->uid, }; - code = tBlockDataInit(reader->blockData, &tbid, reader->skmTb->pTSchema, NULL, 0); + code = tBlockDataInit(reader->blockData, &tbid1, reader->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } @@ -360,12 +373,24 @@ _exit: } static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SMetaInfo info; tTombBlockClear(reader->tombBlock); + TABLEID tbid[1] = {0}; for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) { + if (record->uid != tbid->uid) { + tbid->suid = record->suid; + tbid->uid = record->uid; + if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(reader->tombIterMerger, tbid); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + } + code = tTombBlockPut(reader->tombBlock, record); TSDB_CHECK_CODE(code, lino, _exit); @@ -1000,6 +1025,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); + tsdbFSDisableBgTask(pTsdb->pFS); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -1050,6 +1077,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); } + tsdbFSEnableBgTask(tsdb->pFS); tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger); tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);