/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);

// STsdbDataIter2 ========================================
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER  2
#define TSDB_TOMB_FILE_DATA_ITER 3

typedef struct STsdbDataIter2  STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo;

/* One tombstone record together with the table it belongs to. */
typedef struct {
  int64_t  suid;
  int64_t  uid;
  SDelData delData;
} SDelInfo;

/*
 * Generic iterator over one data source. `type` selects which member of the
 * anonymous union is active. Row iterators publish their current position in
 * `rowInfo`, the tombstone iterator in `delInfo`; an all-zero suid/uid pair
 * means the iterator is exhausted.
 */
struct STsdbDataIter2 {
  STsdbDataIter2* next;  // link in the owner's iterList (used for cleanup)
  SRBTreeNode     rbtn;  // node in the owner's merge tree

  int32_t  type;
  SRowInfo rowInfo;
  SDelInfo delInfo;
  union {
    // TSDB_MEM_TABLE_DATA_ITER
    struct {
      SMemTable* pMemTable;
    } mIter;

    // TSDB_DATA_FILE_DATA_ITER
    struct {
      SDataFReader* pReader;
      SArray*       aBlockIdx;  // SArray<SBlockIdx>
      SMapData      mDataBlk;
      SBlockData    bData;
      int32_t       iBlockIdx;
      int32_t       iDataBlk;
      int32_t       iRow;
    } dIter;

    // TSDB_STT_FILE_DATA_ITER
    struct {
      SDataFReader* pReader;
      int32_t       iStt;
      SArray*       aSttBlk;  // SArray<SSttBlk>
      SBlockData    bData;
      int32_t       iSttBlk;
      int32_t       iRow;
    } sIter;

    // TSDB_TOMB_FILE_DATA_ITER
    struct {
      SDelFReader* pReader;
      SArray*      aDelIdx;   // SArray<SDelIdx>
      SArray*      aDelData;  // SArray<SDelData>
      int32_t      iDelIdx;
      int32_t      iDelData;
    } tIter;
  };
};

#define TSDB_FILTER_FLAG_BY_VERSION 0x1

/* Optional filter applied while iterating; with BY_VERSION only [sver, ever] is kept. */
struct STsdbFilterInfo {
  int32_t flag;
  int64_t sver;
  int64_t ever;
};

/* Recover the enclosing iterator from a pointer to its embedded tree node. */
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)(pNode)) - offsetof(STsdbDataIter2, rbtn)))

/* open */

/*
 * Create an iterator over the data file behind `pReader`.
 * On success *ppIter is the new iterator, or NULL when the file has no block
 * index entries. On failure *ppIter is NULL and an error code is returned.
 */
static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_DATA_FILE_DATA_ITER;
  pIter->dIter.pReader = pReader;
  if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->dIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pIter->dIter.iBlockIdx = 0;
  pIter->dIter.iDataBlk = 0;
  pIter->dIter.iRow = 0;

  // read data
  code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // nothing to iterate: release the handle but report success
  if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      tBlockDataDestroy(&pIter->dIter.bData, 1);
      taosArrayDestroy(pIter->dIter.aBlockIdx);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

/*
 * Create an iterator over stt file number `iStt` of the file set behind
 * `pReader`. Same result contract as tsdbOpenDataFileDataIter: *ppIter is NULL
 * on failure and also when the stt file has no blocks.
 */
static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_STT_FILE_DATA_ITER;
  pIter->sIter.pReader = pReader;
  pIter->sIter.iStt = iStt;
  pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pIter->sIter.aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->sIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pIter->sIter.iSttBlk = 0;
  pIter->sIter.iRow = 0;

  // read data
  code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // nothing to iterate: release the handle but report success
  if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      taosArrayDestroy(pIter->sIter.aSttBlk);
      tBlockDataDestroy(&pIter->sIter.bData, 1);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

/*
 * Create an iterator over the tombstone (del) file behind `pReader`.
 * *ppIter is NULL on failure and also when the file has no del index entries.
 */
static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pIter->type = TSDB_TOMB_FILE_DATA_ITER;

  pIter->tIter.pReader = pReader;
  if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // nothing to iterate: release the handle but report success
  if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear;

  pIter->tIter.iDelIdx = 0;
  pIter->tIter.iDelData = 0;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      taosArrayDestroy(pIter->tIter.aDelIdx);
      taosArrayDestroy(pIter->tIter.aDelData);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

/* close */

/* Release a data-file iterator and everything it owns (not the file reader). */
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
  tBlockDataDestroy(&pIter->dIter.bData, 1);
  tMapDataClear(&pIter->dIter.mDataBlk);
  taosArrayDestroy(pIter->dIter.aBlockIdx);
  taosMemoryFree(pIter);
}

/* Release an stt-file iterator and everything it owns (not the file reader). */
static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
  tBlockDataDestroy(&pIter->sIter.bData, 1);
  taosArrayDestroy(pIter->sIter.aSttBlk);
  taosMemoryFree(pIter);
}

/* Release a tombstone iterator and everything it owns (not the file reader). */
static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) {
  taosArrayDestroy(pIter->tIter.aDelData);
  taosArrayDestroy(pIter->tIter.aDelIdx);
  taosMemoryFree(pIter);
}

/* Dispatch to the close routine matching pIter->type; mem-table iterators are not supported here. */
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
  if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
    ASSERT(0);
  } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
    tsdbCloseDataFileDataIter(pIter);
  } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
    tsdbCloseSttFileDataIter(pIter);
  } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
    tsdbCloseTombFileDataIter(pIter);
  } else {
    ASSERT(0);
  }
}

/* cmpr */

/* Tree comparator: orders iterators by the row they currently point at. */
static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
  STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
  STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
  return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}

/* seek */

/* iter next */

/*
 * Advance a data-file iterator to the next row that passes `pFilterInfo`
 * (NULL means no filtering) and publish it in pIter->rowInfo. When the file is
 * exhausted rowInfo is zeroed. Returns 0 or an error code from block reads.
 */
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    // 1) rows left in the block that is currently loaded
    while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
      if (pFilterInfo) {
        if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
          if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
              pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
            pIter->dIter.iRow++;
            continue;
          }
        }
      }

      pIter->rowInfo.suid = pIter->dIter.bData.suid;
      pIter->rowInfo.uid = pIter->dIter.bData.uid;
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
      pIter->dIter.iRow++;
      goto _exit;
    }

    for (;;) {
      // 2) load the next data block of the current table
      while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

        // filter
        if (pFilterInfo) {
          if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
            if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
              pIter->dIter.iDataBlk++;
              continue;
            }
          }
        }

        code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->dIter.iDataBlk++;
        pIter->dIter.iRow = 0;

        break;
      }

      if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break;

      // 3) move on to the next table's block map, or finish
      for (;;) {
        if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
          SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);

          code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
          TSDB_CHECK_CODE(code, lino, _exit);

          pIter->dIter.iBlockIdx++;
          pIter->dIter.iDataBlk = 0;

          break;
        } else {
          pIter->rowInfo = (SRowInfo){0};
          goto _exit;
        }
      }
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Advance an stt-file iterator to the next row that passes `pFilterInfo`
 * (NULL means no filtering). When the file is exhausted rowInfo is zeroed.
 */
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
      if (pFilterInfo) {
        if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
          if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
              pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
            pIter->sIter.iRow++;
            continue;
          }
        }
      }

      pIter->rowInfo.suid = pIter->sIter.bData.suid;
      // a block-level uid of 0 means the per-row aUid array carries the uid
      pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
      pIter->sIter.iRow++;
      goto _exit;
    }

    for (;;) {
      if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
        SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);

        if (pFilterInfo) {
          if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
            if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
              pIter->sIter.iSttBlk++;
              continue;
            }
          }
        }

        code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->sIter.iRow = 0;
        pIter->sIter.iSttBlk++;
        break;
      } else {
        pIter->rowInfo = (SRowInfo){0};
        goto _exit;
      }
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Advance a tombstone iterator to the next delete record that passes
 * `pFilterInfo` (NULL means no filtering) and publish it in pIter->delInfo.
 * When the file is exhausted delInfo is zeroed.
 */
static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) {
      SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData);

      if (pFilterInfo) {
        if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
          if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) {
            pIter->tIter.iDelData++;
            continue;
          }
        }
      }

      pIter->delInfo.delData = *pDelData;
      pIter->tIter.iDelData++;
      goto _exit;
    }

    for (;;) {
      if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) {
        SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx);

        code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->delInfo.suid = pDelIdx->suid;
        pIter->delInfo.uid = pDelIdx->uid;
        pIter->tIter.iDelData = 0;
        pIter->tIter.iDelIdx++;
        break;
      } else {
        pIter->delInfo = (SDelInfo){0};
        goto _exit;
      }
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

/* Dispatch to the "next" routine matching pIter->type; mem-table iterators are not supported here. */
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
  int32_t code = 0;

  if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
    ASSERT(0);
    return code;
  } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
    return tsdbDataFileDataIterNext(pIter, pFilterInfo);
  } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
    return tsdbSttFileDataIterNext(pIter, pFilterInfo);
  } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
    return tsdbTombFileDataIterNext(pIter, pFilterInfo);
  } else {
    ASSERT(0);
    return code;
  }
}

/* get */

// STsdbSnapReader ========================================
struct STsdbSnapReader {
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
  int8_t   type;
  uint8_t* aBuf[5];

  STsdbFS  fs;
  TABLEID  tbid;
  SSkmInfo skmTable;

  // timeseries data
  int8_t  dataDone;
  int32_t fid;

  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;  // every live iterator of the current file set
  STsdbDataIter2* pIter;     // iterator holding the current row (not in rbt)
  SRBTree         rbt;       // remaining iterators, ordered by current row
  SBlockData      bData;

  // tombstone data
  int8_t          delDone;
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;
  SArray*         aDelData;  // SArray<SDelData>
};

/*
 * Take ownership of the freshly opened iterator in pReader->pIter (may be
 * NULL): advance it to its first row inside [sver, ever] and either register
 * it (merge tree + iterList) or close it when it has no such row or when the
 * advance fails. pReader->pIter is always NULL on return, so it never dangles
 * and a failed iterator is never leaked.
 */
static int32_t tsdbSnapReadAdoptIter(STsdbSnapReader* pReader) {
  STsdbDataIter2* pIter = pReader->pIter;
  pReader->pIter = NULL;

  if (pIter == NULL) return 0;

  // iter to next with filter info (sver, ever)
  int32_t code = tsdbDataIterNext2(pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION,  // flag
                                                            .sver = pReader->sver,
                                                            .ever = pReader->ever});
  if (code) {
    tsdbCloseDataIter2(pIter);
    return code;
  }

  if (pIter->rowInfo.suid || pIter->rowInfo.uid) {
    // add to rbtree
    tRBTreePut(&pReader->rbt, &pIter->rbtn);

    // add to iterList
    pIter->next = pReader->iterList;
    pReader->iterList = pIter;
  } else {
    tsdbCloseDataIter2(pIter);
  }

  return 0;
}

/*
 * Open the first file set whose fid is greater than pReader->fid and build the
 * merge tree of its data-file and stt-file iterators. When no such file set
 * exists, fid becomes INT32_MAX and pDataFReader stays NULL.
 */
static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) {
  int32_t code = 0;
  int32_t lino = 0;

  SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
  if (pSet == NULL) {
    pReader->fid = INT32_MAX;
    goto _exit;
  }

  pReader->fid = pSet->fid;

  tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn);

  code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadAdoptIter(pReader);
  TSDB_CHECK_CODE(code, lino, _exit);

  for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
    code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // iter to valid row
    code = tsdbSnapReadAdoptIter(pReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pReader->pIter = NULL;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid);
  }
  return code;
}

/* Close every iterator of the current file set and the file reader itself. */
static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) {
  while (pReader->iterList) {
    STsdbDataIter2* pIter = pReader->iterList;
    pReader->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }

  tsdbDataFReaderClose(&pReader->pDataFReader);
}

/*
 * Move the merge to the next row. The current iterator is advanced and, if
 * another iterator now holds a smaller row, swapped with the tree minimum.
 * *ppRowInfo (when ppRowInfo is non-NULL) receives the new current row, or
 * NULL when the file set is exhausted.
 */
static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pReader->pIter) {
    code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION,  // flag
                                                                .sver = pReader->sver,
                                                                .ever = pReader->ever});
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) {
      // exhausted; it stays in iterList and is closed with the file set
      pReader->pIter = NULL;
    } else {
      SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
      if (pNode) {
        int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode);
        if (c > 0) {
          tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
          pReader->pIter = NULL;
        } else if (c == 0) {
          ASSERT(0);
        }
      }
    }
  }

  if (pReader->pIter == NULL) {
    SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
    if (pNode) {
      tRBTreeDrop(&pReader->rbt, pNode);
      pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
    }
  }

  if (ppRowInfo) {
    if (pReader->pIter) {
      *ppRowInfo = &pReader->pIter->rowInfo;
    } else {
      *ppRowInfo = NULL;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/* Return the current row without consuming it; selects one first if none is current. */
static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
  if (pReader->pIter) {
    *ppRowInfo = &pReader->pIter->rowInfo;
    return 0;
  }

  return tsdbSnapReadNextRow(pReader, ppRowInfo);
}

/*
 * Compress pReader->bData into a newly allocated SNAP_DATA_TSDB message.
 * The payload is the concatenation of buffers 3, 2, 1, 0 produced by
 * tCmprBlockData. On success the caller owns *ppData.
 */
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

  ASSERT(pReader->bData.nRow);

  int32_t aBufN[5] = {0};
  code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
  if (code) goto _exit;

  int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*ppData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
  pHdr->type = SNAP_DATA_TSDB;
  pHdr->size = size;

  memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
  memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
  if (aBufN[1]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
  }
  if (aBufN[0]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
  }

_exit:
  return code;
}

/*
 * Produce the next batch of time-series rows (rows sharing one schema, at
 * least 4096 rows trigger a cut) as a compressed message in *ppData. *ppData
 * is left untouched when no rows remain in any file set.
 */
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pReader->pTsdb;

  tBlockDataReset(&pReader->bData);

  for (;;) {
    // start a new file read if need
    if (pReader->pDataFReader == NULL) {
      code = tsdbSnapReadFileDataStart(pReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // still no reader: there is no further file set
    if (pReader->pDataFReader == NULL) break;

    SRowInfo* pRowInfo;
    code = tsdbSnapReadGetRow(pReader, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRowInfo == NULL) {
      tsdbSnapReadFileDataEnd(pReader);
      continue;
    }

    code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);

    do {
      if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break;

      // a second table joins the block: switch from block-level uid to per-row aUid
      if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) {
        code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1));
        TSDB_CHECK_CODE(code, lino, _exit);

        for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) {
          pReader->bData.aUid[iRow] = pReader->bData.uid;
        }
        pReader->bData.uid = 0;
      }

      code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbSnapReadNextRow(pReader, &pRowInfo);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pReader->bData.nRow >= 4096) break;
    } while (pRowInfo);

    ASSERT(pReader->bData.nRow > 0);

    break;
  }

  if (pReader->bData.nRow > 0) {
    code = tsdbSnapCmprData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Encode pReader->tbid followed by every entry of pReader->aDelData into a
 * newly allocated SNAP_DATA_DEL message. *ppData is NULL on failure.
 */
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  // first pass: compute the payload size
  int64_t size = sizeof(TABLEID);
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
    size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData));
  }

  uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (pData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
  pHdr->type = SNAP_DATA_DEL;
  pHdr->size = size;

  TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr));
  *pId = pReader->tbid;

  // second pass: `size` is reused as the write offset into pData
  size = sizeof(SSnapDataHdr) + sizeof(TABLEID);
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
    size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData));
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
    if (pData) {
      taosMemoryFree(pData);
      pData = NULL;
    }
  }
  *ppData = pData;
  return code;
}

/* Return the tombstone iterator's current record, or NULL when there is no iterator or it is exhausted. */
static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
  if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) {
    *ppDelInfo = NULL;
  } else {
    *ppDelInfo = &pReader->pTIter->delInfo;
  }
}

/* Advance the tombstone iterator within [sver, ever]; optionally report the new current record. */
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbDataIterNext2(
      pReader->pTIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, .sver = pReader->sver, .ever = pReader->ever});
  TSDB_CHECK_CODE(code, lino, _exit);

  if (ppDelInfo) {
    tsdbSnapReadGetTombData(pReader, ppDelInfo);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Produce one message holding all tombstones of the next table.
 * *ppData is left untouched when there is no del file or no records remain.
 */
static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pReader->pTsdb;

  // open tombstone data iter if need
  if (pReader->pDelFReader == NULL) {
    if (pReader->fs.pDelFile == NULL) goto _exit;

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pReader->pTIter) {
      code = tsdbSnapReadNextTombData(pReader, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // loop to get tombstone data
  SDelInfo* pDelInfo;
  tsdbSnapReadGetTombData(pReader, &pDelInfo);

  if (pDelInfo == NULL) goto _exit;

  pReader->tbid = *(TABLEID*)pDelInfo;

  if (pReader->aDelData) {
    taosArrayClear(pReader->aDelData);
  } else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
    // taosArrayPush reports failure with a NULL result (a `< 0` test on it can never fire)
    if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapReadNextTombData(pReader, &pDelInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // encode tombstone data
  if (taosArrayGetSize(pReader->aDelData) > 0) {
    code = tsdbSnapCmprTombData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Create a snapshot reader over `pTsdb` restricted to versions [sver, ever].
 * The current file-system state is referenced under the tsdb read lock.
 * On failure *ppReader is NULL.
 */
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
  int32_t code = 0;
  int32_t lino = 0;

  // alloc
  STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  taosThreadRwlockRdlock(&pTsdb->rwLock);
  code = tsdbFSRef(pTsdb, &pReader->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  // init
  pReader->fid = INT32_MIN;  // so the first TD_GT search finds the smallest fid

  code = tBlockDataCreate(&pReader->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode),
              __func__, lino, tstrerror(code), sver, ever, type);
    if (pReader) {
      tBlockDataDestroy(&pReader->bData, 1);
      tsdbFSUnref(pTsdb, &pReader->fs);
      taosMemoryFree(pReader);
      pReader = NULL;
    }
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever,
             type);
  }
  *ppReader = pReader;
  return code;
}

/* Release every resource held by the reader and set *ppReader to NULL. */
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbSnapReader* pReader = *ppReader;
  STsdb*           pTsdb = pReader->pTsdb;

  // tombstone
  if (pReader->pTIter) {
    tsdbCloseDataIter2(pReader->pTIter);
    pReader->pTIter = NULL;
  }
  if (pReader->pDelFReader) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }
  taosArrayDestroy(pReader->aDelData);

  // timeseries
  while (pReader->iterList) {
    STsdbDataIter2* pIter = pReader->iterList;
    pReader->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }
  if (pReader->pDataFReader) {
    tsdbDataFReaderClose(&pReader->pDataFReader);
  }
  tBlockDataDestroy(&pReader->bData, 1);

  // other
  tDestroyTSchema(pReader->skmTable.pTSchema);
  tsdbFSUnref(pReader->pTsdb, &pReader->fs);
  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }
  taosMemoryFree(pReader);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  *ppReader = NULL;
  return code;
}

/*
 * Fetch the next snapshot message: time-series batches first, then tombstone
 * batches. *ppData is NULL once both streams are exhausted. On success with
 * a non-NULL *ppData the buffer was allocated by this call.
 */
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppData = NULL;

  // read data file
  if (!pReader->dataDone) {
    code = tsdbSnapReadTimeSeriesData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->dataDone = 1;
    }
  }

  // read del file
  if (!pReader->delDone) {
    code = tsdbSnapReadTombData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->delDone = 1;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
  }
  return code;
}

// STsdbSnapWriter ========================================
struct STsdbSnapWriter {
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
  int32_t  minutes;
  int8_t   precision;
  int32_t  minRow;
  int32_t  maxRow;
  int8_t   cmprAlg;
  int64_t  commitID;
  uint8_t* aBuf[5];

  STsdbFS fs;
  TABLEID tbid;

  // time-series data
  SBlockData inData;

  int32_t  fid;
  SSkmInfo skmTable;

  /* reader */
  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pDIter;
  STsdbDataIter2* pSIter;
  SRBTree         rbt;  // iterators ordered by tsdbDataIterCmprFn

  /* writer */
  SDataFWriter* pDataFWriter;
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // map of SDataBlk for the table being written
  SArray*       aSttBlk;    // SArray<SSttBlk>
  SBlockData    bData;
  SBlockData    sData;

  // tombstone data
  /* reader */
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;

  /* writer */
  SDelFWriter* pDelFWriter;
  SArray*      aDelIdx;
  SArray*      aDelData;
};

// SNAP_DATA_TSDB

/*
 * Begin writing table `pId` (NULL means "end of file": tbid becomes the
 * maximum id). Tables of the existing data file that sort before the new
 * table are carried over unchanged; if the existing file also holds the new
 * table, its block map is loaded so rows can be merged.
 */
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pId) {
    pWriter->tbid = *pId;
  } else {
    pWriter->tbid = (TABLEID){INT64_MAX, INT64_MAX};
  }

  if (pWriter->pDIter) {
    STsdbDataIter2* pIter = pWriter->pDIter;

    // assert last table data end
    ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow);
    ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem);

    for (;;) {
      if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
        pWriter->pDIter = NULL;
        break;
      }

      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);

      int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid);
      if (c < 0) {
        // existing table before the new one: copy its block map as-is
        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
        if (pNewBlockIdx == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        pNewBlockIdx->suid = pBlockIdx->suid;
        pNewBlockIdx->uid = pBlockIdx->uid;

        code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->dIter.iBlockIdx++;
      } else if (c == 0) {
        // same table: load its blocks for merging
        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->dIter.iDataBlk = 0;
        pIter->dIter.iBlockIdx++;

        break;
      } else {
        // existing table sorts after: mark the iterator's block map as consumed
        pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;
        break;
      }
    }
  }

  if (pId) {
    code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    tMapDataReset(&pWriter->mDataBlk);

    code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (!TABLE_SAME_SCHEMA(pWriter->tbid.suid, pWriter->tbid.uid, pWriter->sData.suid, pWriter->sData.uid)) {
    // schema changed: flush pending stt rows before re-initialising sData
    if ((pWriter->sData.nRow > 0)) {
      code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (pId) {
      TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid};
      code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->tbid.suid, pWriter->tbid.uid);
  }
  return code;
}

/* Append one row to the pending data block and flush the block once it holds maxRow rows. */
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->bData.nRow >= pWriter->maxRow) {
    code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Write `pRow` for the current table, first emitting every existing row and
 * block of that table whose key sorts before it. A NULL `pRow` acts as a key
 * of TSDBKEY_MAX, i.e. it drains all remaining existing data of the table.
 */
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;

  if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
                                  pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
    goto _write_row;
  } else {
    for (;;) {
      // rows of the currently loaded existing block
      while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
        TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);

        int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
        if (c < 0) {
          goto _write_row;
        } else if (c > 0) {
          code = tsdbSnapWriteTableRowImpl(pWriter, &row);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow++;
        } else {
          ASSERT(0);
        }
      }

      for (;;) {
        if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;

        // FIXME: Here can be slow, use array instead
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

        int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});

        if (c > 0) {
          goto _write_row;
        } else if (c < 0) {
          // existing block lies wholly before inKey: flush pending rows, then
          // carry the block over to the WRITER's block map. (Appending it to
          // the iterator's own map would grow the map being iterated.)
          if (pWriter->bData.nRow > 0) {
            code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          code = tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iDataBlk++;
        } else {
          // block overlaps inKey: load it and merge row by row
          code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow = 0;
          pWriter->pDIter->dIter.iDataBlk++;
          break;
        }
      }
    }
  }

_write_row:
  if (pRow) {
    code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Finish the current table: drain remaining existing rows, flush the pending
 * block (to the stt block when it has fewer than minRow rows, otherwise as a
 * data block) and write the table's block map if it is non-empty.
 */
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  // write a NULL row to end current table data write
  code = tsdbSnapWriteTableRow(pWriter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->bData.nRow > 0) {
    if (pWriter->bData.nRow < pWriter->minRow) {
      ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid));
      for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
        code =
            tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (pWriter->sData.nRow >= pWriter->maxRow) {
          code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      tBlockDataClear(&pWriter->bData);
    } else {
      code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (pWriter->mDataBlk.nItem) {
    SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
    if (pBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pBlockIdx->suid = pWriter->tbid.suid;
    pBlockIdx->uid = pWriter->tbid.uid;

    code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Begin writing file set `fid`: open iterators over the existing file set (if
 * any), open the file-set writer and reset the per-file writer state.
 */
static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);

  STsdb* pTsdb = pWriter->pTsdb;

  pWriter->fid = fid;
  pWriter->tbid = (TABLEID){0};
  SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);

  // open reader
  pWriter->pDataFReader = NULL;
  pWriter->iterList = NULL;
  pWriter->pDIter = NULL;
  pWriter->pSIter = NULL;
  tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);
  if (pSet) {
    code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (pWriter->pDIter) {
      pWriter->pDIter->next = pWriter->iterList;
      pWriter->iterList = pWriter->pDIter;
    }

    for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
      code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (pWriter->pSIter) {
        code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL);
        if (code) {
          // not yet linked into iterList: close it here so it is not leaked
          tsdbCloseDataIter2(pWriter->pSIter);
          pWriter->pSIter = NULL;
        }
        TSDB_CHECK_CODE(code, lino, _exit);

        // add to tree
        tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);

        // add to list
        pWriter->pSIter->next = pWriter->iterList;
        pWriter->iterList = pWriter->pSIter;
      }
    }

    pWriter->pSIter = NULL;
  }

  // open writer
  SDiskID diskId;
  if (pSet) {
    diskId = pSet->diskId;
  } else {
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId);
  }
  // head and stt files are always new; data and sma files are reused when the set exists
  SDFileSet wSet = {.diskId = diskId,
                    .fid = fid,
                    .pHeadF = &(SHeadFile){.commitID = pWriter->commitID},
                    .pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID},
                    .pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID},
                    .nSttF = 1,
                    .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}};
  code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->aBlockIdx) {
    taosArrayClear(pWriter->aBlockIdx);
  } else if ((pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tMapDataReset(&pWriter->mDataBlk);

  if (pWriter->aSttBlk) {
    taosArrayClear(pWriter->aSttBlk);
  } else if ((pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tBlockDataReset(&pWriter->bData);
  tBlockDataReset(&pWriter->sData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
              fid);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid);
  }
  return code;
}

/*
 * Write one row, switching tables first when the row's uid differs from the
 * current table. A NULL `pRowInfo` ends the current table and carries over
 * whatever remains in the existing file.
 */
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // switch to new table if need
  if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
    if (pWriter->tbid.uid) {
      code = tsdbSnapWriteTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pRowInfo == NULL) goto _exit;

  code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWriter->pSIter) {
    code = tsdbDataIterNext2(pWriter->pSIter, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0)
{ pWriter->pSIter = NULL; } else { SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); if (pNode) { int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pNode); if (c > 0) { tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn); pWriter->pSIter = NULL; } else if (c == 0) { ASSERT(0); } } } } if (pWriter->pSIter == NULL) { SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); if (pNode) { tRBTreeDrop(&pWriter->rbt, pNode); pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pNode); } } if (ppRowInfo) { if (pWriter->pSIter) { *ppRowInfo = &pWriter->pSIter->rowInfo; } else { *ppRowInfo = NULL; } } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) { int32_t code = 0; int32_t lino = 0; if (pWriter->pSIter) { *ppRowInfo = &pWriter->pSIter->rowInfo; goto _exit; } code = tsdbSnapWriteNextRow(pWriter, ppRowInfo); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; ASSERT(pWriter->pDataFWriter); // consume remain data and end with a NULL table row SRowInfo* pRowInfo; code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); for (;;) { code = tsdbSnapWriteTableData(pWriter, pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); if (pRowInfo == NULL) break; code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); } // do file-level updates code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter); TSDB_CHECK_CODE(code, lino, _exit); 
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->pDataFReader) { code = tsdbDataFReaderClose(&pWriter->pDataFReader); TSDB_CHECK_CODE(code, lino, _exit); } // clear sources while (pWriter->iterList) { STsdbDataIter2* pIter = pWriter->iterList; pWriter->iterList = pIter->next; tsdbCloseDataIter2(pIter); } _exit: if (code) { tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); } else { tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__); } return code; } static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { int32_t code = 0; int32_t lino = 0; code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf); TSDB_CHECK_CODE(code, lino, _exit); ASSERT(pWriter->inData.nRow > 0); // switch to new data file if need int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision); if (pWriter->fid != fid) { if (pWriter->pDataFWriter) { code = tsdbSnapWriteFileDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapWriteFileDataStart(pWriter, fid); TSDB_CHECK_CODE(code, lino, _exit); } // loop write each row SRowInfo* pRowInfo; code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) { SRowInfo rInfo = {.suid = pWriter->inData.suid, .uid = pWriter->inData.uid ? 
pWriter->inData.uid : pWriter->inData.aUid[iRow], .row = tsdbRowFromBlockData(&pWriter->inData, iRow)}; for (;;) { if (pRowInfo == NULL) { code = tsdbSnapWriteTableData(pWriter, &rInfo); TSDB_CHECK_CODE(code, lino, _exit); break; } else { int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo); if (c < 0) { code = tsdbSnapWriteTableData(pWriter, &rInfo); TSDB_CHECK_CODE(code, lino, _exit); break; } else if (c > 0) { code = tsdbSnapWriteTableData(pWriter, pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); } else { ASSERT(0); } } } } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__, pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow); } return code; } // SNAP_DATA_DEL static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { int32_t code = 0; int32_t lino = 0; if (pId) { pWriter->tbid = *pId; } else { pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}; } taosArrayClear(pWriter->aDelData); if (pWriter->pTIter) { while (pWriter->pTIter->tIter.iDelIdx < taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) { SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx); int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid); if (c < 0) { code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData); TSDB_CHECK_CODE(code, lino, _exit); SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1); if (pDelIdxNew == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } pDelIdxNew->suid = pDelIdx->suid; pDelIdxNew->uid = pDelIdx->uid; code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->pTIter->tIter.aDelData, pDelIdxNew); TSDB_CHECK_CODE(code, lino, _exit); 
pWriter->pTIter->tIter.iDelIdx++; } else if (c == 0) { code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); TSDB_CHECK_CODE(code, lino, _exit); pWriter->pTIter->tIter.iDelIdx++; break; } else { break; } } } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid, pId->uid); } return code; } static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; if (taosArrayGetSize(pWriter->aDelData) > 0) { SDelIdx* pDelIdx = taosArrayReserve(pWriter->aDelIdx, 1); if (pDelIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } pDelIdx->suid = pWriter->tbid.suid; pDelIdx->uid = pWriter->tbid.uid; code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pDelIdx); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); } return code; } static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) { int32_t code = 0; int32_t lino = 0; if (pId == NULL || pId->uid != pWriter->tbid.uid) { if (pWriter->tbid.uid) { code = tsdbSnapWriteDelTableDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapWriteDelTableDataStart(pWriter, pId); TSDB_CHECK_CODE(code, lino, _exit); } if (pId == NULL) goto _exit; int64_t n = 0; while (n < size) { SDelData delData; n += tGetDelData(pData + n, &delData); if (taosArrayPush(pWriter->aDelData, &delData) < 0) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } } ASSERT(n == size); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", 
TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; STsdb* pTsdb = pWriter->pTsdb; SDelFile* pDelFile = pWriter->fs.pDelFile; pWriter->tbid = (TABLEID){0}; // reader if (pDelFile) { code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter); TSDB_CHECK_CODE(code, lino, _exit); } // writer code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &(SDelFile){.commitID = pWriter->commitID}, pTsdb); TSDB_CHECK_CODE(code, lino, _exit); if ((pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } if ((pWriter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; STsdb* pTsdb = pWriter->pTsdb; // end remaining table with NULL data code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); // update file-level info code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->pDelFReader) { code = tsdbDelFReaderClose(&pWriter->pDelFReader); TSDB_CHECK_CODE(code, lino, _exit); } if (pWriter->pTIter) { 
tsdbCloseDataIter2(pWriter->pTIter); pWriter->pTIter = NULL; } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { int32_t code = 0; int32_t lino = 0; STsdb* pTsdb = pWriter->pTsdb; // start to write del data if need if (pWriter->pDelFWriter == NULL) { code = tsdbSnapWriteDelDataStart(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } // do write del data code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID), pHdr->size - sizeof(TABLEID)); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code)); } else { tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } // APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; int32_t lino = 0; // alloc STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } pWriter->pTsdb = pTsdb; pWriter->sver = sver; pWriter->ever = ever; pWriter->minutes = pTsdb->keepCfg.days; pWriter->precision = pTsdb->keepCfg.precision; pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->commitID = pTsdb->pVnode->state.commitID; code = tsdbFSCopy(pTsdb, &pWriter->fs); TSDB_CHECK_CODE(code, lino, _exit); // SNAP_DATA_TSDB code = tBlockDataCreate(&pWriter->inData); TSDB_CHECK_CODE(code, lino, _exit); pWriter->fid = INT32_MIN; code = tBlockDataCreate(&pWriter->bData); TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataCreate(&pWriter->sData); 
TSDB_CHECK_CODE(code, lino, _exit); // SNAP_DATA_DEL _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); if (pWriter) { tBlockDataDestroy(&pWriter->sData, 1); tBlockDataDestroy(&pWriter->bData, 1); tBlockDataDestroy(&pWriter->inData, 1); tsdbFSDestroy(&pWriter->fs); pWriter = NULL; } } else { tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever); } *ppWriter = pWriter; return code; } int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; if (pWriter->pDataFWriter) { code = tsdbSnapWriteFileDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } if (pWriter->pDelFWriter) { code = tsdbSnapWriteDelDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); } return code; } int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; int32_t lino = 0; STsdbSnapWriter* pWriter = *ppWriter; STsdb* pTsdb = pWriter->pTsdb; if (rollback) { tsdbRollbackCommit(pWriter->pTsdb); } else { // lock taosThreadRwlockWrlock(&pTsdb->rwLock); code = tsdbFSCommit(pWriter->pTsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); } // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); } // SNAP_DATA_DEL taosArrayDestroy(pWriter->aDelData); taosArrayDestroy(pWriter->aDelIdx); // SNAP_DATA_TSDB tBlockDataDestroy(&pWriter->sData, 1); tBlockDataDestroy(&pWriter->bData, 1); taosArrayDestroy(pWriter->aSttBlk); tMapDataClear(&pWriter->mDataBlk); taosArrayDestroy(pWriter->aBlockIdx); tDestroyTSchema(pWriter->skmTable.pTSchema); tBlockDataDestroy(&pWriter->inData, 
1); for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); } tsdbFSDestroy(&pWriter->fs); taosMemoryFree(pWriter); *ppWriter = NULL; _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { int32_t code = 0; int32_t lino = 0; if (pHdr->type == SNAP_DATA_TSDB) { code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr); TSDB_CHECK_CODE(code, lino, _exit); goto _exit; } else if (pWriter->pDataFWriter) { code = tsdbSnapWriteFileDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } if (pHdr->type == SNAP_DATA_DEL) { code = tsdbSnapWriteDelData(pWriter, pHdr); TSDB_CHECK_CODE(code, lino, _exit); goto _exit; } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size); } else { tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pHdr->type, pHdr->index, pHdr->size); } return code; }