diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 6a54cb23998ff48ffeb2acb552e6ce5f7a6cbeac..d6f73ee1be8e8ae812c6b47279c17b31642e2871 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -183,10 +183,10 @@ typedef struct { } STsdbFileH; typedef struct { - int numOfFGroups; - SFileGroup *base; - SFileGroup *pFileGroup; int direction; + STsdbFileH* pFileH; + int fileId; + int index; } SFileGroupIter; // ------------------ tsdbMain.c diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 6e7b39830eaea94b322cc44963f8569a453f019c..7d8a68182b1d2d29d6c23a96e69037efdf334932 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -156,8 +156,10 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { goto _err; } + pthread_rwlock_wrlock(&pFileH->fhlock); pFileH->pFGroup[pFileH->nFGroups++] = fGroup; qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); + pthread_rwlock_unlock(&pFileH->fhlock); return tsdbSearchFGroup(pFileH, fid, TD_EQ); } @@ -168,54 +170,72 @@ _err: return NULL; } -void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { + pIter->pFileH = pFileH; pIter->direction = direction; - pIter->base = pFileH->pFGroup; - pIter->numOfFGroups = pFileH->nFGroups; + if (pFileH->nFGroups == 0) { - pIter->pFileGroup = NULL; + pIter->index = -1; + pIter->fileId = -1; } else { if (direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->pFileGroup = pFileH->pFGroup; + pIter->index = 0; } else { - pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1; + pIter->index = pFileH->nFGroups - 1; } + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; } } -void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO - if (pIter->numOfFGroups == 0) { - assert(pIter->pFileGroup == NULL); +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + STsdbFileH *pFileH = pIter->pFileH; + + if (pFileH->nFGroups == 0) { + pIter->index = -1; + pIter->fileId = -1; return; } int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); + void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); if (ptr == NULL) { - pIter->pFileGroup = NULL; + pIter->index = -1; + pIter->fileId = -1; } else { - pIter->pFileGroup = (SFileGroup *)ptr; + pIter->index = POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup); + pIter->fileId = ((SFileGroup *)ptr)->fileId; } } -SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO - SFileGroup *ret = pIter->pFileGroup; - if (ret == NULL) return NULL; +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { + STsdbFileH *pFileH = pIter->pFileH; + SFileGroup *pFGroup = NULL; + + if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + if (pFGroup->fileId != pIter->fileId) { + tsdbSeekFileGroupIter(pIter, pIter->fileId); + } + + if (pIter->index < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + ASSERT(pFGroup->fileId == pIter->fileId); if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) { - if ((pIter->pFileGroup + 1) == (pIter->base + pIter->numOfFGroups)) { - pIter->pFileGroup = NULL; - } else { - pIter->pFileGroup += 1; - } + pIter->index++; } else { - if (pIter->pFileGroup == pIter->base) { - pIter->pFileGroup = NULL; - } else { - pIter->pFileGroup -= 1; - } + pIter->index--; } - return ret; + + if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) { + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; + } else { + pIter->fileId = -1; + } + + return pFGroup; } int tsdbOpenFile(SFile *pFile, int oflag) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b04b0be9bad1b7cac44227a0ade32ed77175f0ea..2b6dc66e2db779083047bdb0c2089096b249c6d6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -787,6 +787,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC); while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) { if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err; + if (tsdbLoadCompIdx(&rhelper, NULL) < 0) goto _err; for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 506f3c11c104bed4a84641f7d95538a62bdbfec7..d5e90bcd601774e5b1bfa0b89fabf7ca585abd8e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -603,6 +603,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe goto _err; } + if (tsdbLoadCompIdx(pHelper, NULL) < 0) { + tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + // Loop to commit data in each table for (int tid = 1; tid < pMem->maxTables; tid++) { SCommitIter *pIter = iters + tid; @@ -651,12 +656,20 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe tsdbCloseHelperFile(pHelper, 0); pthread_rwlock_wrlock(&(pFileH->fhlock)); + #ifdef TSDB_IDX - pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper)); + rename(helperNewIdxF(pHelper)->fname, helperIdxF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_IDX].info = helperNewIdxF(pHelper)->info; #endif - pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper)); - pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper)); - pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper)); + + rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; + + rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; + + pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; + pthread_rwlock_unlock(&(pFileH->fhlock)); return 0; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 8d3908666ed0cd9a517fabec3b7e7ea20a0885c7..b04d08f850a7d8cc49cea8586cfa60c9458e4561 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -118,12 +118,12 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Open the files #ifdef TSDB_IDX - if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) return -1; #endif - if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err; - if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1; + if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1; #ifdef TSDB_IDX // Create and open .i file @@ -144,23 +144,20 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { pFile = helperNewLastF(pHelper); - if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err; + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; pFile->info.size = TSDB_FILE_HEAD_SIZE; pFile->info.magic = TSDB_FILE_INIT_MAGIC; pFile->info.len = 0; if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; } } else { - if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err; - if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) return -1; + if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) return -1; } helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); - return tsdbLoadCompIdx(pHelper, NULL); - -_err: - return -1; + return 0; } int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { @@ -183,8 +180,12 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { pFile = helperDataF(pHelper); if (pFile->fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbUpdateFileHeader(pFile, 0); - fsync(pFile->fd); + if (!hasError) { + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + } else { + // TODO: shrink back to origin + } } close(pFile->fd); pFile->fd = -1; @@ -193,7 +194,12 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { pFile = helperLastF(pHelper); if (pFile->fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { - fsync(pFile->fd); + if (!hasError) { + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + } else { + // TODO: shrink back to origin + } } close(pFile->fd); pFile->fd = -1; @@ -203,60 +209,36 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { #ifdef TSDB_IDX pFile = helperNewIdxF(pHelper); if (pFile->fd > 0) { - if (!hasError) tsdbUpdateFileHeader(pFile, 0); - fsync(pFile->fd); + if (!hasError) { + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + } close(pFile->fd); pFile->fd = -1; - if (hasError) { - (void)remove(pFile->fname); - } else { - if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - helperIdxF(pHelper)->info = pFile->info; - } + if (hasError) (void)remove(pFile->fname); } #endif pFile = helperNewHeadF(pHelper); if (pFile->fd > 0) { - if (!hasError) tsdbUpdateFileHeader(pFile, 0); - fsync(pFile->fd); + if (!hasError) { + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + } close(pFile->fd); pFile->fd = -1; - if (hasError) { - (void)remove(pFile->fname); - } else { - if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - helperHeadF(pHelper)->info = pFile->info; - } + if (hasError) (void)remove(pFile->fname); } pFile = helperNewLastF(pHelper); if (pFile->fd > 0) { - if (!hasError) tsdbUpdateFileHeader(pFile, 0); - fsync(pFile->fd); + if (!hasError) { + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + } close(pFile->fd); pFile->fd = -1; - if (hasError) { - (void)remove(pFile->fname); - } else { - if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - helperLastF(pHelper)->info = helperNewLastF(pHelper)->info; - } + if (hasError) (void)remove(pFile->fname); } } return 0; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 0e3a657fde547f186623734e9dcf24b9087e9f27..8515fb52e3015fa8e8ee6c27b0f34f3361f0b99b 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -555,16 +555,6 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK } static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) { - SFileGroup* fileGroup = pQueryHandle->pFileGroup; - assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); - - int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup); - - //open file failed, return error code to client - if (code != TSDB_CODE_SUCCESS) { - return code; - } - // load all the comp offset value for all tables in this file *numOfBlocks = 0; size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -1461,12 +1451,20 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STimeWindow win = TSWINDOW_INITIALIZER; - while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { + while (true) { + pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + + if ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) == NULL) { + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + break; + } + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo); pQueryHandle->pFileGroup = NULL; @@ -1474,6 +1472,19 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex break; } + if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + code = terrno; + break; + } + + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + + if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) { + code = terrno; + break; + } + if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } @@ -1528,8 +1539,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); + pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); return getDataBlocksInFilesImpl(pQueryHandle, exists); } else {