提交 228cb6f9 编写于 作者: H hjxilinx

[td-98] fix compData buffer overflow

上级 90e6fb39
...@@ -14,10 +14,10 @@ typedef struct SDataStatis { ...@@ -14,10 +14,10 @@ typedef struct SDataStatis {
int16_t numOfNull; int16_t numOfNull;
} SDataStatis; } SDataStatis;
typedef struct SColumnInfoEx { typedef struct SColumnInfoData {
SColumnInfo info; SColumnInfo info;
void* pData; // the corresponding block data in memory void* pData; // the corresponding block data in memory
} SColumnInfoEx; } SColumnInfoData;
void extractTableName(const char *tableId, char *name); void extractTableName(const char *tableId, char *name);
......
...@@ -89,7 +89,7 @@ typedef struct SColumnFilterElem { ...@@ -89,7 +89,7 @@ typedef struct SColumnFilterElem {
} SColumnFilterElem; } SColumnFilterElem;
typedef struct SSingleColumnFilterInfo { typedef struct SSingleColumnFilterInfo {
SColumnInfoEx info; SColumnInfoData info;
int32_t numOfFilters; int32_t numOfFilters;
SColumnFilterElem* pFilters; SColumnFilterElem* pFilters;
void* pData; void* pData;
...@@ -130,7 +130,7 @@ typedef struct SQuery { ...@@ -130,7 +130,7 @@ typedef struct SQuery {
int32_t rowSize; int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr; SSqlGroupbyExpr* pGroupbyExpr;
SSqlFunctionExpr* pSelectExpr; SSqlFunctionExpr* pSelectExpr;
SColumnInfoEx* colList; SColumnInfoData* colList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* defaultVal; int64_t* defaultVal;
TSKEY lastKey; TSKEY lastKey;
......
...@@ -866,7 +866,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 ...@@ -866,7 +866,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
int32_t numOfCols = taosArrayGetSize(pDataBlock); int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx *p = taosArrayGet(pDataBlock, i); SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pCol->colId == p->info.colId) { if (pCol->colId == p->info.colId) {
dataBlock = p->pData; dataBlock = p->pData;
break; break;
...@@ -894,7 +894,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati ...@@ -894,7 +894,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SColumnInfoEx *pColInfo = NULL; SColumnInfoData *pColInfo = NULL;
TSKEY * primaryKeyCol = NULL; TSKEY * primaryKeyCol = NULL;
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
...@@ -2223,7 +2223,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi ...@@ -2223,7 +2223,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].resBytes * realRowId; pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
} }
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
...@@ -2292,11 +2292,12 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -2292,11 +2292,12 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
STsdbQueryCond cond = {0}; STsdbQueryCond cond = {
cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey},
cond.order = pQuery->order.order; .order = pQuery->order.order,
.colList = pQuery->colList,
cond.colList = *pQuery->colList; };
SArray *sa = taosArrayInit(1, POINTER_BYTES); SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
...@@ -2699,7 +2700,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, vo ...@@ -2699,7 +2700,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, vo
} }
// set the join tag for first column // set the join tag for first column
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1); assert(pFuncMsg->numOfParams == 1);
...@@ -3472,8 +3472,8 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { ...@@ -3472,8 +3472,8 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
bool toContinue = false;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
...@@ -3567,8 +3567,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3567,8 +3567,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan(pRuntimeEnv); doSingleMeterSupplementScan(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
// supplementary scan
pQuery->lastKey = lkey; pQuery->lastKey = lkey;
pQuery->window.ekey = ekey; pQuery->window.ekey = ekey;
...@@ -3577,7 +3576,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3577,7 +3576,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle); // tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
} }
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
...@@ -4170,7 +4169,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) ...@@ -4170,7 +4169,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
STsdbQueryCond cond = { STsdbQueryCond cond = {
.twindow = pQuery->window, .twindow = pQuery->window,
.order = pQuery->order.order, .order = pQuery->order.order,
.colList = *pQuery->colList, .colList = pQuery->colList,
}; };
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
...@@ -4450,7 +4449,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i ...@@ -4450,7 +4449,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
scanAllDataBlocks(pRuntimeEnv); scanAllDataBlocks(pRuntimeEnv);
// first/last_row query, do not invoke the finalize for super table query // first/last_row query, do not invoke the finalize for super table query
doFinalizeResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
int64_t numOfRes = getNumOfResult(pRuntimeEnv); int64_t numOfRes = getNumOfResult(pRuntimeEnv);
assert(numOfRes == 1 || numOfRes == 0); assert(numOfRes == 1 || numOfRes == 0);
...@@ -4690,7 +4689,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -4690,7 +4689,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
* Only the ts-comp query requires the finalizer function to be executed here. * Only the ts-comp query requires the finalizer function to be executed here.
*/ */
if (isTSCompQuery(pQuery)) { if (isTSCompQuery(pQuery)) {
doFinalizeResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
} }
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
...@@ -4896,12 +4895,12 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4896,12 +4895,12 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column; * select count(*) from table_name group by status_column;
*/ */
static void tableFixedOutputProcessor(SQInfo *pQInfo) { static void tableFixedOutputProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
scanAllDataBlocks(pRuntimeEnv); scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
return; return;
...@@ -4917,8 +4916,6 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { ...@@ -4917,8 +4916,6 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
doSkipResults(pRuntimeEnv); doSkipResults(pRuntimeEnv);
doRevisedResultsByLimit(pQInfo); doRevisedResultsByLimit(pQInfo);
pQuery->rec.rows = pQuery->rec.rows;
} }
static void tableMultiOutputProcess(SQInfo *pQInfo) { static void tableMultiOutputProcess(SQInfo *pQInfo) {
...@@ -4932,7 +4929,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { ...@@ -4932,7 +4929,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
while (1) { while (1) {
scanAllDataBlocks(pRuntimeEnv); scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
return; return;
...@@ -4983,7 +4980,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4983,7 +4980,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
} }
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED)); assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
doFinalizeResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation // here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query // todo handle offset, in case of top/bottom interval query
...@@ -5004,7 +5001,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5004,7 +5001,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
} }
// handle time interval query on table // handle time interval query on table
static void tableIntervalProcessor(SQInfo *pQInfo) { static void tableIntervalProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
...@@ -5076,8 +5073,6 @@ static void tableQueryImpl(SQInfo* pQInfo) { ...@@ -5076,8 +5073,6 @@ static void tableQueryImpl(SQInfo* pQInfo) {
doRevisedResultsByLimit(pQInfo); doRevisedResultsByLimit(pQInfo);
pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsInterpo += numOfInterpo;
pQuery->rec.rows += pQuery->rec.rows;
dTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); dTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
return; return;
...@@ -5119,15 +5114,14 @@ static void tableQueryImpl(SQInfo* pQInfo) { ...@@ -5119,15 +5114,14 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
tableIntervalProcessor(pQInfo); tableIntervalProcess(pQInfo);
} else if (isFixedOutputQuery(pQuery)) { } else if (isFixedOutputQuery(pQuery)) {
tableFixedOutputProcessor(pQInfo); tableFixedOutputProcess(pQInfo);
} else { // diff/add/multiply/subtract/division } else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1); assert(pQuery->checkBuffer == 1);
tableMultiOutputProcess(pQInfo); tableMultiOutputProcess(pQInfo);
} }
// record the total elapsed time // record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQInfo->elapsedTime += (taosGetTimestampUs() - st);
assert(taosArrayGetSize(pQInfo->pTableIdList) == 1); assert(taosArrayGetSize(pQInfo->pTableIdList) == 1);
...@@ -5593,7 +5587,7 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { ...@@ -5593,7 +5587,7 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
if (pQuery->colList[i].info.numOfFilters > 0) { if (pQuery->colList[i].info.numOfFilters > 0) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j]; SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j];
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoEx)); memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
pFilterInfo->info.info.filters = NULL; pFilterInfo->info.info.filters = NULL;
pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters; pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters;
......
...@@ -188,7 +188,7 @@ typedef void* tsdb_query_handle_t; // Use void to hide implementation details ...@@ -188,7 +188,7 @@ typedef void* tsdb_query_handle_t; // Use void to hide implementation details
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc/asc order to iterate the data block int32_t order; // desc/asc order to iterate the data block
SColumnInfoEx colList; SColumnInfoData* colList;
} STsdbQueryCond; } STsdbQueryCond;
typedef struct SBlockInfo { typedef struct SBlockInfo {
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) #define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC) #define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) #define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
...@@ -65,7 +65,10 @@ typedef struct STableCheckInfo { ...@@ -65,7 +65,10 @@ typedef struct STableCheckInfo {
int64_t offsetInHeaderFile; int64_t offsetInHeaderFile;
int32_t start; int32_t start;
bool checkFirstFileBlock; bool checkFirstFileBlock;
SCompInfo* pCompInfo; SCompInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols; SDataCols* pDataCols;
...@@ -106,7 +109,7 @@ typedef struct STsdbQueryHandle { ...@@ -106,7 +109,7 @@ typedef struct STsdbQueryHandle {
SCompBlock* pBlock; SCompBlock* pBlock;
int32_t numOfBlocks; int32_t numOfBlocks;
SField** pFields; SField** pFields;
SArray* pColumns; // column list, SColumnInfoEx array list SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart; bool locateStart;
int32_t realNumOfRows; int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek. bool loadDataAfterSeek; // load data after seek.
...@@ -127,7 +130,7 @@ typedef struct STsdbQueryHandle { ...@@ -127,7 +130,7 @@ typedef struct STsdbQueryHandle {
int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) { int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) {
// record the maximum column width among columns of this meter/metric // record the maximum column width among columns of this meter/metric
SColumnInfoEx* pColumn = taosArrayGet(pQueryHandle->pColumns, 0); SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
int32_t maxColWidth = pColumn->info.bytes; int32_t maxColWidth = pColumn->info.bytes;
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) { for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
...@@ -177,7 +180,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond ...@@ -177,7 +180,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
.lastKey = pQueryHandle->window.skey, .lastKey = pQueryHandle->window.skey,
.tableId = id, .tableId = id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed
.pCompInfo = calloc(1, 1024), .pCompInfo = NULL,
}; };
assert(info.pTableObj != NULL); assert(info.pTableObj != NULL);
...@@ -193,10 +196,10 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond ...@@ -193,10 +196,10 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
int32_t numOfCols = taosArrayGetSize(pColumnInfo); int32_t numOfCols = taosArrayGetSize(pColumnInfo);
size_t bufferCapacity = 4096; size_t bufferCapacity = 4096;
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoEx)); pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pCol = taosArrayGet(pColumnInfo, i); SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i);
SColumnInfoEx pDest = {{0}, 0}; SColumnInfoData pDest = {{0}, 0};
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes); pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes);
pDest.info = pCol->info; pDest.info = pCol->info;
...@@ -261,6 +264,15 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -261,6 +264,15 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
} else { } else {
if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
pCheckInfo->pCompInfo = (SCompInfo*) t;
}
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
int32_t index = 0; int32_t index = 0;
...@@ -589,7 +601,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf ...@@ -589,7 +601,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
int16_t colId = *(int16_t*)taosArrayGet(sa, i); int16_t colId = *(int16_t*)taosArrayGet(sa, i);
for (int32_t j = 0; j < numOfCols; ++j) { for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoEx* pCol = taosArrayGet(pQueryHandle->pColumns, j); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
if (pCol->info.colId == colId) { if (pCol->info.colId == colId) {
SDataCol* pDataCol = &pCols->cols[i]; SDataCol* pDataCol = &pCols->cols[i];
...@@ -610,7 +622,7 @@ static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) { ...@@ -610,7 +622,7 @@ static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t)); SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
taosArrayPush(pIdList, &pCol->info.colId); taosArrayPush(pIdList, &pCol->info.colId);
} }
...@@ -1067,7 +1079,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max ...@@ -1067,7 +1079,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
offset += pColInfo->info.bytes; offset += pColInfo->info.bytes;
} }
...@@ -1102,7 +1114,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { ...@@ -1102,7 +1114,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
/* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the /* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the
* block next function * block next function
*/ */
SColumnInfoEx* pColInfoEx = taosArrayGet(pHandle->pColumns, 0); SColumnInfoData* pColInfoEx = taosArrayGet(pHandle->pColumns, 0);
rows = pHandle->realNumOfRows; rows = pHandle->realNumOfRows;
skey = *(TSKEY*)pColInfoEx->pData; skey = *(TSKEY*)pColInfoEx->pData;
...@@ -1141,7 +1153,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t* pQueryHandle, SData ...@@ -1141,7 +1153,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t* pQueryHandle, SData
SArray* tsdbRetrieveDataBlock(tsdb_query_handle_t* pQueryHandle, SArray* pIdList) { SArray* tsdbRetrieveDataBlock(tsdb_query_handle_t* pQueryHandle, SArray* pIdList) {
/** /**
* In the following two cases, the data has been loaded to SColumnInfoEx. * In the following two cases, the data has been loaded to SColumnInfoData.
* 1. data is from cache, 2. data block is not completed qualified to query time range * 1. data is from cache, 2. data block is not completed qualified to query time range
*/ */
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
...@@ -1484,7 +1496,7 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { ...@@ -1484,7 +1496,7 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
size_t cols = taosArrayGetSize(pQueryHandle->pColumns); size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for (int32_t i = 0; i < cols; ++i) { for (int32_t i = 0; i < cols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData); tfree(pColInfo->pData);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册