diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7a6735291bea625806c627ba6902d852d04f0884..5045c341d6161c140a1182d521618a1a6623dc44 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -447,13 +447,11 @@ typedef struct { int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); -// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); void tsdbDestroyHelper(SRWHelper *pHelper); void tsdbResetHelper(SRWHelper *pHelper); // --------- For set operations int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); -// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema); void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo); int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 888ee069ae40a0e98e4dbe0990aef359fc2a515c..d1ee5113fd3a8b94900087904663daa7b0ff2f4b 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -131,6 +131,11 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t // Init block part if (tsdbInitHelperBlock(pHelper) < 0) goto _err; + pHelper->blockBuffer = + tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + + pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); + if (pHelper->blockBuffer == NULL) goto _err; + return 0; _err: @@ -149,6 +154,8 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { void tsdbDestroyHelper(SRWHelper *pHelper) { if (pHelper) { + tzfree(pHelper->blockBuffer); + tzfree(pHelper->compBuffer); tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperTable(pHelper); tsdbDestroyHelperBlock(pHelper); @@ -563,6 +570,8 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 void *pStart = NULL; if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints; + } else { + pStart = pDataCol->pData; } // TODO: get rid of INT32_MAX here pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfPoints, pStart, @@ -597,8 +606,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - pHelper->blockBuffer = trealloc(pHelper->blockBuffer, pCompBlock->len); - if (pHelper->blockBuffer == NULL) return -1; + ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len); SCompData *pCompData = (SCompData *)pHelper->blockBuffer; @@ -627,9 +635,13 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa SCompCol *pCompCol = &(pCompData->cols[ccol]); if (pCompCol->colId == pDataCol->colId) { + if (pCompBlock->algorithm == TWO_STAGE_COMP) { + pHelper->compBuffer = trealloc(pHelper->compBuffer, pCompCol->len + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) goto _err; + } if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, - pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, pHelper->compBuffer, - tsizeof(pHelper->compBuffer)) < 0) + pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, + pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) goto _err; dcol++; ccol++; @@ -731,6 +743,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa // TODO: compresee the data if (pHelper->config.compress) { + if (pHelper->config.compress == TWO_STAGE_COMP) { + pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) goto _err; + } + pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( (char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer));