diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0560b6060a5b31cb878336f787e331fa796d1f75..0045778842ecad13bf0f7bb651314994c37dfe21 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -63,11 +63,11 @@ void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); // STbDataIter -int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); -void *tsdbTbDataIterDestroy(STbDataIter *pIter); -bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); -bool tsdbTbDataIterNext(STbDataIter *pIter); -bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); +int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); +void *tsdbTbDataIterDestroy(STbDataIter *pIter); +void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); +TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter); +bool tsdbTbDataIterNext(STbDataIter *pIter); // tsdbFile.c ============================================================================================== typedef struct SDelFile SDelFile; @@ -341,6 +341,8 @@ struct STbDataIter { STbData *pTbData; int8_t backward; SMemSkipListNode *pNode; + TSDBROW *pRow; + TSDBROW row; }; struct SDelOp { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 73fb117ca617ca47cd0e01763650b4c0c427abee..ca6513b7f479710080fa74ee28419b9b41f75a98 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -23,7 +23,7 @@ typedef struct { int32_t minRow; int32_t maxRow; // -------------- - TSKEY nextKey; + TSKEY nextKey; // need to be reset by each table commit int32_t commitFid; TSKEY minKey; TSKEY maxKey; @@ -347,17 +347,21 @@ _err: } static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { - int32_t code = 0; - STbDataIter iter; - TSDBROW row; - SBlockIdx blockIdx; + int32_t code = 0; + STbDataIter *pIter = NULL; + STbDataIter iter; + TSDBROW *pRow = NULL; + SBlockIdx blockIdx; + int32_t iBlock; + int32_t nBlock; + SBlock *pBlock; - // check: if no memory data and no disk data, exit + // check if (pTbData) { - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &iter); - if ((!tsdbTbDataIterGet(&iter, &row) || row.pTSRow->ts > pCommitter->maxKey) && pBlockIdx == NULL) { - goto _exit; - } + pIter = &iter; + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); + pRow = tsdbTbDataIterGet(pIter); + if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && pBlockIdx == NULL) goto _exit; } // start @@ -368,11 +372,40 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (code) goto _err; } - // impl + if (pTbData) { + blockIdx.suid = pTbData->suid; + blockIdx.uid = pTbData->uid; + } else { + blockIdx.suid = pBlockIdx->suid; + blockIdx.uid = pBlockIdx->uid; + } + blockIdx.minKey.version = INT64_MAX; + blockIdx.minKey.ts = TSKEY_MAX; + blockIdx.maxKey.version = 0; + blockIdx.maxKey.ts = TSKEY_MIN; + blockIdx.minVersion = INT64_MAX; + blockIdx.maxVersion = INT64_MIN; + blockIdx.offset = -1; + blockIdx.size = -1; + + // impl (todo) // end + code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx); + if (code) goto _err; _exit: + pRow = tsdbTbDataIterGet(pIter); + if (pRow) { + ASSERT(pRow->pTSRow->ts > pCommitter->maxKey); + if (pCommitter->nextKey > pRow->pTSRow->ts) { + pCommitter->nextKey = pRow->pTSRow->ts; + } + } + return code; _err: @@ -424,15 +457,18 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t iBlockIdx = 0; int32_t nBlockIdx = pCommitter->oBlockIdx.nItem; STbData *pTbData; - SBlockIdx *pBlockIdx = NULL; + SBlockIdx *pBlockIdx; SBlockIdx blockIdx; ASSERT(nTbData > 0); + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); if (iBlockIdx < nBlockIdx) { + pBlockIdx = &blockIdx; code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); if (code) goto _err; - pBlockIdx = &blockIdx; + } else { + pBlockIdx = NULL; } while (true) { @@ -457,6 +493,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { _commit_mem_data: code = tsdbCommitTableData(pCommitter, pTbData, NULL); if (code) goto _err; + iTbData++; if (iTbData < nTbData) { pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); @@ -468,11 +505,12 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { _commit_disk_data: code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); if (code) goto _err; + iBlockIdx++; if (iBlockIdx < nBlockIdx) { + pBlockIdx = &blockIdx; code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); if (code) goto _err; - pBlockIdx = &blockIdx; } else { pBlockIdx = NULL; } @@ -481,6 +519,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { _commit_mem_and_disk_data: code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx); if (code) goto _err; + iTbData++; iBlockIdx++; if (iTbData < nTbData) { @@ -489,9 +528,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { pTbData = NULL; } if (iBlockIdx < nBlockIdx) { + pBlockIdx = &blockIdx; code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); if (code) goto _err; - pBlockIdx = &blockIdx; } else { pBlockIdx = NULL; } @@ -582,9 +621,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { SMemTable *pMemTable = pTsdb->imem; // check - if (pMemTable->nRow == 0) { - goto _exit; - } + if (pMemTable->nRow == 0) goto _exit; // loop pCommitter->nextKey = pMemTable->minKey.ts; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 007c78cae5ad548fef46a61b10b40b53286b4cbd..3108c14c0d810ddf0100d33e8083b8b485e003d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -211,17 +211,16 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) { return NULL; } -bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) { +void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) { SMemSkipListNode *pos[SL_MAX_LEVEL]; SMemSkipListNode *pHead; SMemSkipListNode *pTail; - if (pTbData == NULL) return false; - pHead = pTbData->sl.pHead; pTail = pTbData->sl.pTail; pIter->pTbData = pTbData; pIter->backward = backward; + pIter->pRow = NULL; if (pFrom == NULL) { // create from head or tail if (backward) { @@ -239,20 +238,13 @@ bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa pIter->pNode = SL_NODE_FORWARD(pos[0], 0); } } - - if ((backward && pIter->pNode == pHead) || (!backward && pIter->pNode == pTail)) { - return false; - } - - return true; } bool tsdbTbDataIterNext(STbDataIter *pIter) { SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; - if (pIter == NULL) return false; - + pIter->pRow = NULL; if (pIter->backward) { ASSERT(pIter->pNode != pTail); @@ -280,33 +272,26 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { return true; } -bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) { - SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; - SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; - TSDBROW row = {0}; - - if (pIter == NULL) return false; - - if (pRow == NULL) { - pRow = &row; +TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { + if (pIter->pRow) { + goto _exit; } if (pIter->backward) { - ASSERT(pIter->pNode != pTail); - - if (pIter->pNode == pHead) { - return false; + if (pIter->pNode == pIter->pTbData->sl.pHead) { + goto _exit; } } else { - ASSERT(pIter->pNode != pHead); - - if (pIter->pNode == pTail) { - return false; + if (pIter->pNode == pIter->pTbData->sl.pTail) { + goto _exit; } } - tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), pRow); - return true; + tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); + pIter->pRow = &pIter->row; + +_exit: + return pIter->pRow; } static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 733d1ad8e9f2823cd9a652bdb2fb1435acd349da..24412802d438040d9873fb0190caadb791599d52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -225,6 +225,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb int32_t code = 0; char *fname = NULL; // todo SDelFReader *pDelFReader; + int64_t n; // alloc pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); @@ -250,7 +251,11 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb goto _err; } - if (taosReadFile(pDelFReader->pReadH, *ppBuf, TSDB_FHDR_SIZE) < TSDB_FHDR_SIZE) { + n = taosReadFile(pDelFReader->pReadH, *ppBuf, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < TSDB_FHDR_SIZE) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -311,6 +316,9 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDa if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; + } else if (n < pDelIdx->size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } // check @@ -358,9 +366,13 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB if (code) goto _err; // read - if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) { + n = taosReadFile(pReader->pReadH, *ppBuf, size); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } // check @@ -387,7 +399,10 @@ _err: struct SDataFReader { STsdb *pTsdb; SDFileSet *pSet; - TdFilePtr pReadH; + TdFilePtr pHeadFD; + TdFilePtr pDataFD; + TdFilePtr pLastFD; + TdFilePtr pSmaFD; }; int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { @@ -398,19 +413,133 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFReaderClose(SDataFReader *pReader) { int32_t code = 0; - // TODO + + if (taosCloseFile(&pReader->pHeadFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pReader->pDataFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pReader->pLastFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pReader->pSmaFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d data file reader close failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf) { - int32_t code = 0; - // TODO + int32_t code = 0; + int64_t offset = -1; // TODO + int64_t size = -1; // TODO + int64_t n; + uint32_t delimiter; + + // alloc + if (!ppBuf) ppBuf = &pMapData->pBuf; + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // seek + if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(*ppBuf, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = 0; + n += tGetU32(*ppBuf + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); + n += tGetMapData(*ppBuf + n, pMapData); + ASSERT(n + sizeof(TSCKSUM) == size); + + return code; + +_err: + tsdbError("vgId:%d read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf) { - int32_t code = 0; - // TODO + int32_t code = 0; + int64_t offset = pBlockIdx->offset; + int64_t size = pBlockIdx->size; + int64_t n; + uint32_t delimiter; + tb_uid_t suid; + tb_uid_t uid; + + // alloc + if (!ppBuf) ppBuf = &pMapData->pBuf; + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // seek + if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(*ppBuf, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = 0; + n += tGetU32(*ppBuf + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); + n += tGetI64(*ppBuf + n, &suid); + ASSERT(suid == pBlockIdx->suid); + n += tGetI64(*ppBuf + n, &uid); + ASSERT(uid == pBlockIdx->uid); + n += tGetMapData(*ppBuf + n, pMapData); + ASSERT(n + sizeof(TSCKSUM) == size); + + return code; + +_err: + tsdbError("vgId:%d read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } @@ -430,7 +559,10 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) { struct SDataFWriter { STsdb *pTsdb; SDFileSet *pSet; - TdFilePtr pWriteH; + TdFilePtr pHeadFD; + TdFilePtr pDataFD; + TdFilePtr pLastFD; + TdFilePtr pSmaFD; }; int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { @@ -441,7 +573,53 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync) { int32_t code = 0; - // TODO + + if (sync) { + if (taosFsyncFile(pWriter->pHeadFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pWriter->pDataFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pWriter->pLastFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pWriter->pSmaFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + + if (taosCloseFile(&pWriter->pHeadFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pWriter->pDataFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pWriter->pLastFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pWriter->pSmaFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d data file writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -451,9 +629,43 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { return code; } -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf) { - int32_t code = 0; - // TODO +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t **ppBuf) { + int32_t code = 0; + int64_t size = 0; + int64_t n = 0; + uint8_t *pBuf = NULL; + + // prepare + size += tPutU32(NULL, TSDB_FILE_DLMT); + size = size + tPutMapData(NULL, pBlockIdxMap) + sizeof(TSCKSUM); + + // alloc + if (!ppBuf) ppBuf = &pBuf; + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // build + n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); + n += tPutMapData(*ppBuf, pBlockIdxMap); + taosCalcChecksumAppend(0, *ppBuf, size); + + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // update (todo) + + tsdbFree(pBuf); + return code; + +_err: + tsdbError("vgId:%d write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); return code; }