From 0271f943944786bad3c96b7d494e8f0adef43221 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 31 Jul 2020 14:02:20 +0800 Subject: [PATCH] refact FGroup Iter make it multi-thread safe --- src/tsdb/inc/tsdbMain.h | 6 ++-- src/tsdb/src/tsdbFile.c | 72 ++++++++++++++++++++++++++--------------- src/tsdb/src/tsdbRead.c | 2 ++ 3 files changed, 51 insertions(+), 29 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 6a54cb2399..d6f73ee1be 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -183,10 +183,10 @@ typedef struct { } STsdbFileH; typedef struct { - int numOfFGroups; - SFileGroup *base; - SFileGroup *pFileGroup; int direction; + STsdbFileH* pFileH; + int fileId; + int index; } SFileGroupIter; // ------------------ tsdbMain.c diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 6e7b39830e..7d8a68182b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -156,8 +156,10 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { goto _err; } + pthread_rwlock_wrlock(&pFileH->fhlock); pFileH->pFGroup[pFileH->nFGroups++] = fGroup; qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); + pthread_rwlock_unlock(&pFileH->fhlock); return tsdbSearchFGroup(pFileH, fid, TD_EQ); } @@ -168,54 +170,72 @@ _err: return NULL; } -void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { + pIter->pFileH = pFileH; pIter->direction = direction; - pIter->base = pFileH->pFGroup; - pIter->numOfFGroups = pFileH->nFGroups; + if (pFileH->nFGroups == 0) { - pIter->pFileGroup = NULL; + pIter->index = -1; + pIter->fileId = -1; } else { if (direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->pFileGroup = pFileH->pFGroup; + pIter->index = 0; } else { - pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1; + pIter->index = pFileH->nFGroups - 1; } + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; } } -void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO - if (pIter->numOfFGroups == 0) { - assert(pIter->pFileGroup == NULL); +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + STsdbFileH *pFileH = pIter->pFileH; + + if (pFileH->nFGroups == 0) { + pIter->index = -1; + pIter->fileId = -1; return; } int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); + void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); if (ptr == NULL) { - pIter->pFileGroup = NULL; + pIter->index = -1; + pIter->fileId = -1; } else { - pIter->pFileGroup = (SFileGroup *)ptr; + pIter->index = POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup); + pIter->fileId = ((SFileGroup *)ptr)->fileId; } } -SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO - SFileGroup *ret = pIter->pFileGroup; - if (ret == NULL) return NULL; +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { + STsdbFileH *pFileH = pIter->pFileH; + SFileGroup *pFGroup = NULL; + + if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + if (pFGroup->fileId != pIter->fileId) { + tsdbSeekFileGroupIter(pIter, pIter->fileId); + } + + if (pIter->index < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + ASSERT(pFGroup->fileId == pIter->fileId); if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) { - if ((pIter->pFileGroup + 1) == (pIter->base + pIter->numOfFGroups)) { - pIter->pFileGroup = NULL; - } else { - pIter->pFileGroup += 1; - } + pIter->index++; } else { - if (pIter->pFileGroup == pIter->base) { - pIter->pFileGroup = NULL; - } else { - pIter->pFileGroup -= 1; - } + pIter->index--; } - return ret; + + if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) { + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; + } else { + pIter->fileId = -1; + } + + return pFGroup; } int tsdbOpenFile(SFile *pFile, int oflag) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 4494b86209..8515fb52e3 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1539,8 +1539,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); + pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); + pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); return getDataBlocksInFilesImpl(pQueryHandle, exists); } else { -- GitLab