提交 450a9f3c 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/tsdb_optimize' into enh/tsdb_optimize

......@@ -138,8 +138,7 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int
#define TARRAY2_APPEND(a, e) TARRAY2_APPEND_PTR(a, &(e))
// return (TYPE *)
#define TARRAY2_SEARCH(a, ep, cmp, flag) \
(typeof((a)->data))tarray2Search(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag)
#define TARRAY2_SEARCH(a, ep, cmp, flag) tarray2Search(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag)
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \
tarray2SearchIdx(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag)
......
......@@ -112,6 +112,8 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
committer->ctx->hasTSData = false;
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = row->suid;
......@@ -149,14 +151,28 @@ _exit:
}
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t lino = 0;
SMetaInfo info;
if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
return 0;
}
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
if (record->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = record->suid;
committer->ctx->tbid->uid = record->uid;
if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
}
if (record->ekey < committer->ctx->minKey) {
goto _next;
} else if (record->skey > committer->ctx->maxKey) {
......
......@@ -791,9 +791,9 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32
}
// check if same task is on
if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) {
return 0;
}
// if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) {
// return 0;
// }
for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) {
if (task->type == type) {
......@@ -867,6 +867,29 @@ int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) {
taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex);
}
taosThreadMutexUnlock(fs->mutex);
return 0;
}
static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) {
fs->stop = true;
if (fs->bgTaskRunning) {
tsdbDoWaitBgTask(fs, fs->bgTaskRunning);
}
return 0;
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
int32_t code = tsdbFSDoDisableBgTask(fs);
taosThreadMutexUnlock(fs->mutex);
return code;
}
int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
fs->stop = false;
taosThreadMutexUnlock(fs->mutex);
return 0;
}
\ No newline at end of file
......@@ -62,6 +62,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid);
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
......
......@@ -281,21 +281,34 @@ _exit:
}
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t lino = 0;
SMetaInfo info;
tBlockDataReset(reader->blockData);
TABLEID tbid[1] = {0};
for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) {
// skip dropped table
if (row->uid != tbid->uid) {
tbid->suid = row->suid;
tbid->uid = row->uid;
if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(reader->dataIterMerger, tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
}
if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
TABLEID tbid = {
TABLEID tbid1 = {
.suid = row->suid,
.uid = row->suid ? 0 : row->uid,
};
code = tBlockDataInit(reader->blockData, &tbid, reader->skmTb->pTSchema, NULL, 0);
code = tBlockDataInit(reader->blockData, &tbid1, reader->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -360,12 +373,24 @@ _exit:
}
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t lino = 0;
SMetaInfo info;
tTombBlockClear(reader->tombBlock);
TABLEID tbid[1] = {0};
for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) {
if (record->uid != tbid->uid) {
tbid->suid = record->suid;
tbid->uid = record->uid;
if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(reader->tombIterMerger, tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
}
code = tTombBlockPut(reader->tombBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -1000,6 +1025,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSDisableBgTask(pTsdb->pFS);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
......@@ -1050,6 +1077,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock);
}
tsdbFSEnableBgTask(tsdb->pFS);
tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册