提交 33c3f34f 编写于 作者: H Hongze Cheng

more work

上级 b56aedff
......@@ -137,8 +137,10 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
......@@ -262,17 +264,6 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl
int32_t nColId);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId,
int32_t nColId);
#if 0
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2);
#endif
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
......
......@@ -450,9 +450,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
// tBlockDataClear(&state->blockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
if (state->pBlockData) {
tBlockDataClear(state->pBlockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
......@@ -494,7 +494,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (!state->pBlockData) {
state->pBlockData = &state->blockData;
tBlockDataInit(&state->blockData);
tBlockDataCreate(&state->blockData);
}
}
case SFSNEXTROW_BLOCKDATA:
......@@ -552,8 +552,8 @@ _err:
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
......@@ -579,8 +579,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
......
......@@ -1270,7 +1270,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
goto _exit;
}
code = tBlockDataInit(&pCommitter->dReader.bData);
code = tBlockDataCreate(&pCommitter->dReader.bData);
if (code) goto _exit;
pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL));
......@@ -1279,7 +1279,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
goto _exit;
}
code = tBlockDataInit(&pCommitter->dReader.bDatal);
code = tBlockDataCreate(&pCommitter->dReader.bDatal);
if (code) goto _exit;
// Writer
......@@ -1295,10 +1295,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
goto _exit;
}
code = tBlockDataInit(&pCommitter->dWriter.bData);
code = tBlockDataCreate(&pCommitter->dWriter.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->dWriter.bDatal);
code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
if (code) goto _exit;
_exit:
......@@ -1309,16 +1309,16 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
// Reader
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataClear(&pCommitter->dReader.bData, 1);
tBlockDataDestroy(&pCommitter->dReader.bData, 1);
taosArrayDestroy(pCommitter->dReader.aBlockL);
tBlockDataClear(&pCommitter->dReader.bDatal, 1);
tBlockDataDestroy(&pCommitter->dReader.bDatal, 1);
// Writer
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
taosArrayDestroy(pCommitter->dWriter.aBlockL);
tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataClear(&pCommitter->dWriter.bData, 1);
tBlockDataClear(&pCommitter->dWriter.bDatal, 1);
tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}
......
......@@ -438,7 +438,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
code = tBlockDataInit(&pReader->status.fileBlockData);
code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _end;
......@@ -2670,7 +2670,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
}
taosMemoryFree(pSupInfo->buildBuf);
tBlockDataClear(&pReader->status.fileBlockData, true);
tBlockDataDestroy(&pReader->status.fileBlockData, true);
cleanupDataBlockIterator(&pReader->status.blockIter);
......@@ -2874,7 +2874,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
tBlockDataClearData(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData, 1);
tBlockDataDestroy(&pStatus->fileBlockData, 1);
terrno = code;
return NULL;
......
......@@ -846,63 +846,6 @@ _err:
}
#if 0
static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0;
#if 0
int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
int64_t n;
if (!taosCheckChecksumWhole(pBuf, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow);
if (code) goto _err;
code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->szVersion == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->szTSKEY == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION
memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion);
// TSKEY
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tRealloc(ppBuf, size);
if (code) goto _err;
}
// VERSION
n = tsDecompressBigint(pBuf, pSubBlock->szVersion, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// TSKEY
n = tsDecompressTimestamp(pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY, pSubBlock->nRow,
(char *)pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf,
size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
}
return code;
_err:
#endif
return code;
}
static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf,
uint8_t **ppBuf) {
int32_t code = 0;
......@@ -998,80 +941,6 @@ _err:
return code;
}
#if 0
static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr *pHdr, SArray *aBlockCol) {
int32_t code = 0;
int32_t n = 0;
SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol;
// checksum
if (!taosCheckChecksumWhole(pBuf, szBlockCol + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// hdr
*pHdr = *(SDiskDataHdr *)pBuf;
n += sizeof(SDiskDataHdr);
// aBlockCol
while (n < szBlockCol) {
n += tGetBlockCol(pBuf + n, pBlockCol);
if (taosArrayPush(aBlockCol, pBlockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n == szBlockCol);
return code;
_err:
return code;
}
#endif
static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg,
uint8_t **ppOut, uint8_t **ppBuf) {
int32_t code = 0;
int32_t size;
// size
if (IS_VAR_DATA_TYPE(type)) {
size = nEle;
} else {
size = tDataTypes[type].bytes * nEle;
}
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// decode
if (cmprAlg == NO_COMPRESSION) {
ASSERT(szInput == size);
memcpy(*ppOut, pInput, size);
} else {
if (cmprAlg == TWO_STAGE_COMP) {
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _exit;
int32_t n =
tDataTypes[type].decompFunc(pInput, szInput, nEle, *ppOut, size, cmprAlg, *ppBuf, size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _exit;
}
}
}
_exit:
return code;
}
static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
......@@ -1210,29 +1079,29 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
SBlockData *pBlockData1 = &(SBlockData){0};
SBlockData *pBlockData2 = &(SBlockData){0};
tBlockDataInit(pBlockData1);
tBlockDataInit(pBlockData2);
tBlockDataCreate(pBlockData1);
tBlockDataCreate(pBlockData2);
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
if (code) goto _err;
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
}
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
}
tFree(pBuf1);
......@@ -1349,34 +1218,34 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl
SBlockData *pBlockData1 = &(SBlockData){0};
SBlockData *pBlockData2 = &(SBlockData){0};
tBlockDataInit(pBlockData1);
tBlockDataInit(pBlockData2);
tBlockDataCreate(pBlockData1);
tBlockDataCreate(pBlockData2);
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) {
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
// merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
}
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
}
ASSERT(pBlock->nRow == pBlockData->nRow);
......
......@@ -289,9 +289,9 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
goto _err;
}
pReader->mBlock = tMapDataInit();
code = tBlockDataInit(&pReader->oBlockData);
code = tBlockDataCreate(&pReader->oBlockData);
if (code) goto _err;
code = tBlockDataInit(&pReader->nBlockData);
code = tBlockDataCreate(&pReader->nBlockData);
if (code) goto _err;
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
......@@ -327,8 +327,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
}
taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock);
tBlockDataClear(&pReader->oBlockData, 1);
tBlockDataClear(&pReader->nBlockData, 1);
tBlockDataDestroy(&pReader->oBlockData, 1);
tBlockDataDestroy(&pReader->nBlockData, 1);
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
......@@ -1123,7 +1123,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file
code = tBlockDataInit(&pWriter->bData);
code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
......@@ -1131,7 +1131,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataR);
code = tBlockDataCreate(&pWriter->bDataR);
if (code) goto _err;
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
......@@ -1139,7 +1139,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataW);
code = tBlockDataCreate(&pWriter->bDataW);
if (code) goto _err;
// for del file
......
......@@ -1107,7 +1107,7 @@ static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
}
// SBlockData ======================================================
int32_t tBlockDataInit(SBlockData *pBlockData) {
int32_t tBlockDataCreate(SBlockData *pBlockData) {
int32_t code = 0;
pBlockData->suid = 0;
......@@ -1132,7 +1132,7 @@ _exit:
return code;
}
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aUid);
tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册