diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e82bb580ec1b8aa8e4f4f4781ce5d028f4d4f1d1..866141614baf68a969367b258e58c4aa77163edf 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -608,6 +608,7 @@ struct STsdbRepo { }; #define REPO_ID(r) (r)->config.tsdbId +#define REPO_CFG(r) (&((r)->config)) #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 9c1b8798aa5b2f065948179f7267757f7a6fa79b..6d031e06359bb3172cc0bef88b83a70599a116a8 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -46,6 +46,7 @@ typedef struct { #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh)) +#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { @@ -469,23 +470,70 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) { TSKEY nextKey = tsdbNextIterKey(pIter->pIter); int cidx = 0; void * ptr = NULL; - SBlock *pBlock = NULL; + SBlock *pBlock; + + if (cidx < nBlocks) { + pBlock = pch->readh.pBlkInfo->blocks + cidx; + } else { + pBlock = NULL; + } while (true) { - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= nBlocks)) break; + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (pBlock == NULL)) break; if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) || - ((cidx < nBlocks) && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { + (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { // TODO: move the block + ASSERT(pBlock->numOfSubBlocks > 0); + if (pBlock->numOfSubBlocks == 1) { // move super block + if (taosArrayPush(pch->aSupBlk, (void *)pBlock) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + } else { + + } + cidx++; + if (cidx < nBlocks) { + pBlock = pch->readh.pBlkInfo->blocks + cidx; + } else { + pBlock = NULL; + } } else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // TODO: process merge commit + TSKEY keyLimit; + if (cidx == nBlocks - 1) { + keyLimit = pch->maxKey; + } else { + keyLimit = pBlock[1].keyFirst - 1; + } + + if (tsdbMergeMemData(pch, pIter, pBlock, keyLimit) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + cidx++; + if (cidx < nBlocks) { + pBlock = pch->readh.pBlkInfo->blocks + cidx; + } else { + pBlock = NULL; + } + nextKey = tsdbNextIterKey(pIter->pIter); } else { if (pBlock == NULL) { - // commit memory data until pch->maxKey and write to the appropriate file + if (tsdbCommitMemData(pch, pIter, pch->maxKey, false) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + nextKey = tsdbNextIterKey(pIter->pIter); } else { - // commit memory data until pBlock->keyFirst and write to only data file + if (tsdbCommitMemData(pch, pIter, pBlock->keyFirst-1, true) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + nextKey = tsdbNextIterKey(pIter->pIter); } } @@ -512,35 +560,6 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) { #endif } - // if (pIter->pIter == NULL) { - // // No memory data but has disk data - // // TODO - // } else { - // TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - // int cidx = 0; - // SBlock *pBlock = NULL; - - // void *ptr = taosbsearch((void *)(&nextKey), pch->readh.pBlkInfo->blocks, pch->readh.pBlkIdx->numOfBlocks, - // sizeof(SBlock), tsdbComparKeyBlock, TD_GE); - - // while (true) { - // if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= pch->readh.pBlkIdx->numOfBlocks)) - // break; - - // if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) < 0) { - // if (pBlock->last) { - // // merge with the last block - // } else { - // // Commit until pch->maxKey or (pBlock[1].keyFirst-1) - // } - // } else if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0) { // merge the block - - // } else { - - // } - // } - // } - TSDB_RUNLOCK_TABLE(pIter->pTable); if (tsdbWriteBlockInfo(pch) < 0) return -1; @@ -825,5 +844,82 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) { pHeadf->info.offset = offset; pHeadf->info.len = tlen; + return 0; +} + +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + SMergeInfo mInfo; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + SDFile * pDFile; + bool isLast; + SBlock block; + + while (true) { + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, + pCfg->update, &mInfo); + + if (pCommith->pDataCols->numOfRows <= 0) break; + + if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } else { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + + if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + return 0; +} + +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, SBlock *pBlock, TSKEY keyLimit) { + // TODO + return 0; +} + +static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks+bidx; + SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith); + SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh)); + SBlock block; + + if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) { + if (pBlock->numOfSubBlocks == 1) { + if (taosArrayPush(pCommith->aSupBlk, (void *)pBlock) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } else { + block = *pBlock; + block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock); + + if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (taosArrayPushBatch(pCommith->aSubBlk, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; + if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + return 0; } \ No newline at end of file diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index 268d5b89dfbe1d0cef0bf76425cf95070a783021..1f6ea935f918f2d5f9856b16fd4664f7050d1323 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -46,9 +46,20 @@ void* taosArrayInit(size_t size, size_t elemSize); * * @param pArray * @param pData + * @param nEles * @return */ -void* taosArrayPush(SArray* pArray, const void* pData); +void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles); + +/** + * + * @param pArray + * @param pData + * @return + */ +static FORCE_INLINE void* taosArrayPush(SArray* pArray, const void* pData) { + return taosArrayPushBatch(pArray, pData, 1); +} /** * diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 232b46cf8346d551448056aca231fcc0a85fe331..b82b2a1ed0b0390ff97f84126ffef463807de09f 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -55,24 +55,29 @@ static int32_t taosArrayResize(SArray* pArray) { return 0; } -void* taosArrayPush(SArray* pArray, const void* pData) { +void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) { if (pArray == NULL || pData == NULL) { return NULL; } - if (pArray->size >= pArray->capacity) { - int32_t ret = taosArrayResize(pArray); - - // failed to push data into buffer due to the failure of memory allocation - if (ret != 0) { + if (pArray->size + nEles > pArray->capacity) { + size_t tsize = (pArray->capacity << 1u); + while (pArray->size + nEles > tsize) { + tsize = (tsize << 1u); + } + + pArray->pData = realloc(pArray->pData, tsize * pArray->elemSize); + if (pArray->pData == NULL) { return NULL; } + + pArray->capacity = tsize; } void* dst = TARRAY_GET_ELEM(pArray, pArray->size); - memcpy(dst, pData, pArray->elemSize); + memcpy(dst, pData, pArray->elemSize * nEles); - pArray->size += 1; + pArray->size += nEles; return dst; }