From 49fde0b1a87937a3c6c94551c51f2ad94d6c95af Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 8 Aug 2022 07:27:44 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 12 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 559 +----------------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 202 +++---- 3 files changed, 120 insertions(+), 653 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2606d7a8d0..0e10d1b000 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -141,17 +141,17 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); int32_t tBlockDataCreate(SBlockData *pBlockData); void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema); -int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId); +int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); +int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); +int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); #if 1 int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); -int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); -int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); #endif @@ -261,10 +261,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, int16_t *aColId, - int32_t nColId); -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId, - int32_t nColId); +int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); +int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 48023858cc..f73883d050 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -835,583 +835,72 @@ _err: return code; } -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, int16_t *aColId, - int32_t nColId) { +int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData) { int32_t code = 0; code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData); if (code) goto _err; - for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - // TODO - ASSERT(0); - } - - return code; - -_err: - tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId, - int32_t nColId) { - int32_t code = 0; - - code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -#if 0 -static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, - uint8_t **ppBuf) { - int32_t code = 0; -#if 0 - int64_t size; - int64_t n; - - if (!taosCheckChecksumWhole(pBuf, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - pColData->nVal = pSubBlock->nRow; - pColData->flag = pBlockCol->flag; - - // BITMAP - if (pBlockCol->flag != HAS_VALUE) { - ASSERT(pBlockCol->szBitmap); - - size = BIT2_SIZE(pColData->nVal); - code = tRealloc(&pColData->pBitMap, size); - if (code) goto _err; - - code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); - if (code) goto _err; - - n = tsDecompressTinyint(pBuf, pBlockCol->szBitmap, size, pColData->pBitMap, size, TWO_STAGE_COMP, *ppBuf, - size + COMP_OVERFLOW_BYTES); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - ASSERT(n == size); - } else { - ASSERT(pBlockCol->szBitmap == 0); - } - pBuf = pBuf + pBlockCol->szBitmap; - - // OFFSET - if (IS_VAR_DATA_TYPE(pColData->type)) { - ASSERT(pBlockCol->szOffset); - - size = sizeof(int32_t) * pColData->nVal; - code = tRealloc((uint8_t **)&pColData->aOffset, size); - if (code) goto _err; - - code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); - if (code) goto _err; - - n = tsDecompressInt(pBuf, pBlockCol->szOffset, pColData->nVal, (char *)pColData->aOffset, size, TWO_STAGE_COMP, - *ppBuf, size + COMP_OVERFLOW_BYTES); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - ASSERT(n == size); - } else { - ASSERT(pBlockCol->szOffset == 0); - } - pBuf = pBuf + pBlockCol->szOffset; - - // VALUE - pColData->nData = pBlockCol->szOrigin; - - code = tRealloc(&pColData->pData, pColData->nData); - if (code) goto _err; - - if (pSubBlock->cmprAlg == NO_COMPRESSION) { - memcpy(pColData->pData, pBuf, pColData->nData); - } else { - if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES); - if (code) goto _err; - } - - n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->szValue, pSubBlock->nRow, pColData->pData, - pColData->nData, pSubBlock->cmprAlg, *ppBuf, - pColData->nData + COMP_OVERFLOW_BYTES); - if (n < 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - ASSERT(n == pColData->nData); - } - - return code; - -_err: -#endif - return code; -} - -static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, - int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { - int32_t code = 0; -#if 0 - TdFilePtr pFD = pReader->pDataFD; - SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SArray *aBlockCol = NULL; - int64_t offset; - int64_t size; - int64_t n; - - tBlockDataReset(pBlockData); - pBlockData->nRow = pSubBlock->nRow; - - // TSDBKEY and SBlockCol - if (nCol == 1) { - offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM); - size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - } else { - offset = pSubBlock->offset; - size = pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - } - - code = tRealloc(ppBuf1, size); - if (code) goto _err; - - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - if (nCol == 1) { - code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); - if (code) goto _err; - - goto _exit; - } else { - aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); - if (aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + if (pBlock->nSubBlock > 1) { + SBlockData bData1; + SBlockData bData2; - code = tsdbReadBlockCol(*ppBuf1, pSubBlock->szBlock, NULL /*todo*/, aBlockCol); + // create + code = tBlockDataCreate(&bData1); if (code) goto _err; - - code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2); + code = tBlockDataCreate(&bData2); if (code) goto _err; - } - - for (int32_t iCol = 1; iCol < nCol; iCol++) { - void *p = taosArraySearch(aBlockCol, &(SBlockCol){.cid = aColId[iCol]}, tBlockColCmprFn, TD_EQ); - - if (p) { - SBlockCol *pBlockCol = (SBlockCol *)p; - SColData *pColData; - - ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); - if (code) goto _err; - - tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn); - if (pBlockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + - pSubBlock->szTSKEY + sizeof(TSCKSUM) + pBlockCol->offset; - size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); - - code = tRealloc(ppBuf1, size); - if (code) goto _err; - - // seek - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); - if (code) goto _err; - } - } - } - -_exit: - taosArrayDestroy(aBlockCol); - return code; - -_err: - taosArrayDestroy(aBlockCol); -#endif - return code; -} - -int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, - SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - ASSERT(aColId[0] == PRIMARYKEY_TIMESTAMP_COL_ID); + // init + tBlockDataInitEx(&bData1, pBlockData); + tBlockDataInitEx(&bData2, pBlockData); - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; - - code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); - if (code) goto _err; - - if (pBlock->nSubBlock > 1) { - SBlockData *pBlockData1 = &(SBlockData){0}; - SBlockData *pBlockData2 = &(SBlockData){0}; - - tBlockDataCreate(pBlockData1); - tBlockDataCreate(pBlockData2); for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); - if (code) goto _err; - - code = tBlockDataCopy(pBlockData, pBlockData2); + code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[iSubBlock], 0, &bData1); if (code) { - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); goto _err; } - code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); + code = tBlockDataCopy(pBlockData, &bData2); if (code) { - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); goto _err; } - } - - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); - } - - tFree(pBuf1); - tFree(pBuf2); - return code; - -_err: - tsdbError("vgId:%d, tsdb read col data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf1); - tFree(pBuf2); - return code; -} - -static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData, - uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; -#if 0 - uint8_t *p; - int64_t size; - int64_t n; - TdFilePtr pFD = pReader->pDataFD; - SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SArray *aBlockCol = NULL; - - tBlockDataReset(pBlockData); - - // realloc - code = tRealloc(ppBuf1, pSubBlock->szBlock); - if (code) goto _err; - - // seek - n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, pSubBlock->szBlock); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pSubBlock->szBlock) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - pBlockData->nRow = pSubBlock->nRow; - - // TSDBKEY - p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM); - code = tsdbReadBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); - if (code) goto _err; - - // COLUMNS - aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); - if (aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol); - if (code) goto _err; - - for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) { - SColData *pColData; - SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol); - - ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData); - if (code) goto _err; - - tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn); - if (pBlockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + - sizeof(TSCKSUM) + pBlockCol->offset; - code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, p, ppBuf2); - if (code) goto _err; - } - } - - taosArrayDestroy(aBlockCol); - return code; - -_err: - tsdbError("vgId:%d, tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - taosArrayDestroy(aBlockCol); -#endif - return code; -} - -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { - int32_t code = 0; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; - - // read the first sub-block - int32_t iSubBlock = 0; - code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); - if (code) goto _err; - - // read remain block data and do merg - if (pBlock->nSubBlock > 1) { - SBlockData *pBlockData1 = &(SBlockData){0}; - SBlockData *pBlockData2 = &(SBlockData){0}; - tBlockDataCreate(pBlockData1); - tBlockDataCreate(pBlockData2); - for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); + code = tBlockDataMerge(&bData1, &bData2, pBlockData); if (code) { - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); - goto _err; - } - - code = tBlockDataCopy(pBlockData, pBlockData2); - if (code) { - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); - goto _err; - } - - // merge two block data - code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); - if (code) { - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); goto _err; } } - tBlockDataDestroy(pBlockData1, 1); - tBlockDataDestroy(pBlockData2, 1); + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); } - ASSERT(pBlock->nRow == pBlockData->nRow); - ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0); - ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0); - - tFree(pBuf1); - tFree(pBuf2); return code; _err: - tsdbError("vgId:%d, tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf1); - tFree(pBuf2); + tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { +int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData) { int32_t code = 0; -#if 0 - - tBlockDataReset(pBlockData); - - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; - - // realloc - code = tRealloc(ppBuf1, pBlockL->szBlock); - if (code) goto _err; - // seek - int64_t n = taosLSeekFile(pReader->pLastFD, pBlockL->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pReader->pLastFD, *ppBuf1, pBlockL->szBlock); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pBlockL->szBlock) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // decode block col - SDiskDataHdr hdr; - SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); - uint8_t *p = *ppBuf1; - if (aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - code = tsdbReadBlockCol(p, pBlockL->szBlockCol, &hdr, aBlockCol); - if (code) goto _err; - p += pBlockL->szBlockCol + sizeof(TSCKSUM); - - // checksum - if (!taosCheckChecksumWhole(p, pBlockL->szUid + pBlockL->szVer + pBlockL->szTSKEY + sizeof(TSCKSUM))) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // UID - if (hdr.uid == 0) { - code = tsdbReadDataArray(p, pBlockL->szUid, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg, - (uint8_t **)&pBlockData->aUid, ppBuf2); - if (code) goto _err; - } else { - ASSERT(pBlockL->szUid == 0); - } - p += pBlockL->szUid; - - // VERSION - code = tsdbReadDataArray(p, pBlockL->szVer, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg, - (uint8_t **)&pBlockData->aVersion, ppBuf2); - if (code) goto _err; - p += pBlockL->szVer; - - // TSKEY - code = tsdbReadDataArray(p, pBlockL->szTSKEY, pBlockL->nRow, TSDB_DATA_TYPE_TIMESTAMP, pBlockL->cmprAlg, - (uint8_t **)&pBlockData->aTSKEY, ppBuf2); - if (code) goto _err; - p += pBlockL->szTSKEY; - p += sizeof(TSCKSUM); - - // COLUMN - code = tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid); + code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData); if (code) goto _err; - for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) { - SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol); - SColData *pColData; - - // checksum - if (!taosCheckChecksumWhole(p, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // add SColData - code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData); - if (code) goto _err; - tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn); - pColData->nVal = pBlockL->nRow; - pColData->flag = pBlockCol->flag; - - // bitmap - if (pBlockCol->szBitmap) { - code = tsdbReadDataArray(p, pBlockCol->szBitmap, ); - if (code) goto _err; - } - p += pBlockCol->szBitmap; - - // offset - if (pBlockCol->szOffset) { - code = tsdbReadDataArray(p, pBlockCol->szOffset, ); - if (code) goto _err; - } - p += pBlockCol->szOffset; - - // value - pColData->nData = pBlockCol->szOrigin; - if (pColData->nData) { - // TODO - } - } - - taosArrayDestroy(aBlockCol); - return code; _err: tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); -#endif return code; } -#endif // SDataFWriter ==================================================== int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e4ebc3e839..d0d354e467 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -949,13 +949,14 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) { int32_t size; ASSERT(pColDataSrc->nVal > 0); + ASSERT(pColDataDest->cid = pColDataSrc->cid); + ASSERT(pColDataDest->type = pColDataSrc->type); - pColDataDest->cid = pColDataSrc->cid; - pColDataDest->type = pColDataSrc->type; pColDataDest->smaOn = pColDataSrc->smaOn; pColDataDest->nVal = pColDataSrc->nVal; pColDataDest->flag = pColDataSrc->flag; + // bitmap if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) { size = BIT2_SIZE(pColDataSrc->nVal); code = tRealloc(&pColDataDest->pBitMap, size); @@ -963,6 +964,7 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) { memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size); } + // offset if (IS_VAR_DATA_TYPE(pColDataDest->type)) { size = sizeof(int32_t) * pColDataSrc->nVal; @@ -972,9 +974,10 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) { memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size); } + // value + pColDataDest->nData = pColDataSrc->nData; code = tRealloc(&pColDataDest->pData, pColDataSrc->nData); if (code) goto _exit; - pColDataDest->nData = pColDataSrc->nData; memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData); _exit: @@ -1169,32 +1172,25 @@ _exit: return code; } -int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) { +int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom) { int32_t code = 0; ASSERT(0); - ASSERT(suid || uid); + ASSERT(pBlockDataFrom->suid || pBlockDataFrom->uid); - pBlockData->suid = suid; - pBlockData->uid = uid; + pBlockData->suid = pBlockDataFrom->suid; + pBlockData->uid = pBlockDataFrom->uid; pBlockData->nRow = 0; taosArrayClear(pBlockData->aIdx); - if (aColId) { - int16_t lcid = -1; - for (int32_t iColId = 0; iColId < taosArrayGetSize(aColId); iColId++) { - int16_t cid = *(int16_t *)taosArrayGet(aColId, iColId); - - ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); - ASSERT(cid > lcid); - lcid = cid; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockDataFrom->aIdx); iColData++) { + SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColData); - SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColId, &pColData); - if (code) goto _exit; + SColData *pColData; + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _exit; - tColDataInit(pColData, cid, TSDB_DATA_TYPE_NULL, -1); - } + tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn); } _exit: @@ -1339,128 +1335,112 @@ _exit: int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { int32_t code = 0; - // set target - int32_t iColData1 = 0; - int32_t nColData1 = taosArrayGetSize(pBlockData1->aIdx); - int32_t iColData2 = 0; - int32_t nColData2 = taosArrayGetSize(pBlockData2->aIdx); - SColData *pColData1; - SColData *pColData2; - SColData *pColData; + ASSERT(pBlockData->suid == pBlockData1->suid); + ASSERT(pBlockData->uid == pBlockData1->uid); + ASSERT(pBlockData1->nRow > 0); + ASSERT(pBlockData2->nRow > 0); - tBlockDataReset(pBlockData); - while (iColData1 < nColData1 && iColData2 < nColData2) { - pColData1 = tBlockDataGetColDataByIdx(pBlockData1, iColData1); - pColData2 = tBlockDataGetColDataByIdx(pBlockData2, iColData2); + tBlockDataClear(pBlockData); - if (pColData1->cid == pColData2->cid) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); - if (code) goto _exit; - tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); + TSDBROW row1 = tsdbRowFromBlockData(pBlockData1, 0); + TSDBROW row2 = tsdbRowFromBlockData(pBlockData2, 0); + TSDBROW *pRow1 = &row1; + TSDBROW *pRow2 = &row2; - iColData1++; - iColData2++; - } else if (pColData1->cid < pColData2->cid) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + while (pRow1 && pRow2) { + int32_t c = tsdbRowCmprFn(pRow1, pRow2); + + if (c < 0) { + code = tBlockDataAppendRow(pBlockData, pRow1, NULL, + pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]); if (code) goto _exit; - tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn); - iColData1++; - } else { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + pRow1->iRow++; + if (pRow1->iRow < pBlockData1->nRow) { + *pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow); + } else { + pRow1 = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(pBlockData, pRow2, NULL, + pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]); if (code) goto _exit; - tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); - iColData2++; + pRow2->iRow++; + if (pRow2->iRow < pBlockData2->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow); + } else { + pRow2 = NULL; + } + } else { + ASSERT(0); } } - while (iColData1 < nColData1) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); - if (code) goto _exit; - tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn); - - iColData1++; - } - - while (iColData2 < nColData2) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + while (pRow1) { + code = tBlockDataAppendRow(pBlockData, pRow1, NULL, + pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]); if (code) goto _exit; - tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); - - iColData2++; - } - - // loop to merge - int32_t iRow1 = 0; - int32_t nRow1 = pBlockData1->nRow; - int32_t iRow2 = 0; - int32_t nRow2 = pBlockData2->nRow; - TSDBROW row1; - TSDBROW row2; - int32_t c; - while (iRow1 < nRow1 && iRow2 < nRow2) { - row1 = tsdbRowFromBlockData(pBlockData1, iRow1); - row2 = tsdbRowFromBlockData(pBlockData2, iRow2); - - c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2)); - if (c < 0) { - // code = tBlockDataAppendRow(pBlockData, &row1, NULL); - if (code) goto _exit; - iRow1++; - } else if (c > 0) { - // code = tBlockDataAppendRow(pBlockData, &row2, NULL); - if (code) goto _exit; - iRow2++; + pRow1->iRow++; + if (pRow1->iRow < pBlockData1->nRow) { + *pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow); } else { - ASSERT(0); + pRow1 = NULL; } } - while (iRow1 < nRow1) { - row1 = tsdbRowFromBlockData(pBlockData1, iRow1); - // code = tBlockDataAppendRow(pBlockData, &row1, NULL); + while (pRow2) { + code = tBlockDataAppendRow(pBlockData, pRow2, NULL, + pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]); if (code) goto _exit; - iRow1++; - } - while (iRow2 < nRow2) { - row2 = tsdbRowFromBlockData(pBlockData2, iRow2); - // code = tBlockDataAppendRow(pBlockData, &row2, NULL); - if (code) goto _exit; - iRow2++; + pRow2->iRow++; + if (pRow2->iRow < pBlockData2->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow); + } else { + pRow2 = NULL; + } } _exit: return code; } -int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) { - int32_t code = 0; - SColData *pColDataSrc; - SColData *pColDataDest; +int32_t tBlockDataCopy(SBlockData *pSrc, SBlockData *pDest) { + int32_t code = 0; + + tBlockDataClear(pDest); - ASSERT(pBlockDataSrc->nRow > 0); + ASSERT(pDest->suid == pSrc->suid); + ASSERT(pDest->uid == pSrc->uid); + ASSERT(pSrc->nRow == pDest->nRow); + ASSERT(taosArrayGetSize(pSrc->aIdx) == taosArrayGetSize(pDest->aIdx)); - tBlockDataReset(pBlockDataDest); + pDest->nRow = pSrc->nRow; - pBlockDataDest->nRow = pBlockDataSrc->nRow; - // TSDBKEY - code = tRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow); + if (pSrc->uid == 0) { + code = tRealloc((uint8_t **)&pDest->aUid, sizeof(int64_t) * pDest->nRow); + if (code) goto _exit; + memcpy(pDest->aUid, pSrc->aUid, sizeof(int64_t) * pDest->nRow); + } + + code = tRealloc((uint8_t **)&pDest->aVersion, sizeof(int64_t) * pDest->nRow); if (code) goto _exit; - code = tRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow); + memcpy(pDest->aVersion, pSrc->aVersion, sizeof(int64_t) * pDest->nRow); + + code = tRealloc((uint8_t **)&pDest->aTSKEY, sizeof(TSKEY) * pDest->nRow); if (code) goto _exit; - memcpy(pBlockDataDest->aVersion, pBlockDataSrc->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow); - memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow); + memcpy(pDest->aTSKEY, pSrc->aTSKEY, sizeof(TSKEY) * pDest->nRow); - // other - for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aIdx); iColData++) { - pColDataSrc = tBlockDataGetColDataByIdx(pBlockDataSrc, iColData); - code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest); - if (code) goto _exit; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pSrc->aIdx); iColData++) { + SColData *pColSrc = tBlockDataGetColDataByIdx(pSrc, iColData); + SColData *pColDest = tBlockDataGetColDataByIdx(pDest, iColData); + + ASSERT(pColSrc->cid == pColDest->cid); + ASSERT(pColSrc->type == pColDest->type); - code = tColDataCopy(pColDataSrc, pColDataDest); + code = tColDataCopy(pColSrc, pColDest); if (code) goto _exit; } -- GitLab