diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6af1a1a79b96d2f1c6a74ff4009f07a7fc2fa6b7..e1bbc3278dabd783ed708485aeacfff9b67db291 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -188,6 +188,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol uint8_t **ppBuf); int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, uint8_t **ppBuf); +int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 4f02299c7e3ba194fe6cee747932633b57c9b6d4..94d957eac770eed9fe7f1cb842becef8898d1863 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -401,6 +401,7 @@ struct SDataFReader { uint8_t *pBuf1; uint8_t *pBuf2; + uint8_t *pBuf3; }; int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { @@ -485,6 +486,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { tFree((*ppReader)->pBuf1); tFree((*ppReader)->pBuf2); + tFree((*ppReader)->pBuf3); taosMemoryFree(*ppReader); _exit: @@ -729,94 +731,75 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo int32_t nColId, SBlockData *pBlockData) { int32_t code = 0; + // TODO tBlockDataReset(pBlockData); TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; - // seek - int64_t n = taosLSeekFile(pFD, pBlkInfo->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - code = tRealloc(&pReader->pBuf1, pBlkInfo->szBlock); + // uid + version + tskey + code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey); if (code) goto _err; - - n = taosReadFile(pFD, pReader->pBuf1, pBlkInfo->szBlock); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pBlkInfo->szBlock) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - uint8_t *p = pReader->pBuf1; - // check & decode SDiskDataHdr hdr; - if (!taosCheckChecksumWhole(p, pBlkInfo->szKey)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - p += tGetDiskDataHdr(p, &hdr); + uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr); + + ASSERT(hdr.delimiter == TSDB_FILE_DLMT); + ASSERT(hdr.suid || hdr.uid); - tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid); + pBlockData->suid = hdr.suid; + pBlockData->uid = hdr.uid; pBlockData->nRow = hdr.nRow; + // uid if (hdr.uid == 0) { ASSERT(hdr.szUid); code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); if (code) goto _err; } else { - ASSERT(hdr.szUid == 0); + ASSERT(!hdr.szUid); } p += hdr.szUid; + // version code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); if (code) goto _err; p += hdr.szVer; - code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, + // TSKEY + code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * hdr.nRow, &pReader->pBuf2); if (code) goto _err; p += hdr.szKey; - p += sizeof(TSCKSUM); - // SBlockCol + ASSERT(p - pReader->pBuf1 == sizeof(TSCKSUM)); + + // read and decode columns if (hdr.szBlkCol > 0) { - if (!taosCheckChecksumWhole(p, hdr.szBlkCol + sizeof(TSCKSUM))) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = + tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, hdr.szBlkCol + sizeof(TSCKSUM)); + if (code) goto _err; - int32_t iColData = 0; - uint8_t *pt = p + hdr.szBlkCol + sizeof(TSCKSUM); - n = 0; + int32_t n = 0; while (n < hdr.szBlkCol) { SBlockCol blockCol; - n += tGetBlockCol(p + n, &blockCol); + n += tGetBlockCol(pReader->pBuf1 + n, &blockCol); ASSERT(blockCol.flag && blockCol.flag != HAS_NONE); - SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColData, &pColData); - if (code) goto _err; - iColData++; + // TODO: merge with the column IDs - tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn); + SColData *pColData = NULL; // (todo) if (blockCol.flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type)); - if (code) goto _err; - } + // TODO: make a hdr.nRow COL_VAL_NULL(); } else { - code = tsdbDecmprColData(pt + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf2); + code = tsdbReadAndCheckFile( + pFD, pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &pReader->pBuf2, + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM)); + + code = tsdbDecmprColData(pReader->pBuf2, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3); if (code) goto _err; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index dbb35532e823166ff1bb6727b2924907a30f651f..6a27b10cf1a7f268de07c2501c9830a9814c742c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1865,6 +1865,40 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in } p += pBlockCol->szValue; +_exit: + return code; +} + +int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size) { + int32_t code = 0; + + // alloc + code = tRealloc(ppOut, size); + if (code) goto _exit; + + // seek + int64_t n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + // read + n = taosReadFile(pFD, *ppOut, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + // check + if (!taosCheckChecksumWhole(*ppOut, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + _exit: return code; } \ No newline at end of file