未验证 提交 29608b08 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11610 from taosdata/feature/3.0_liaohj

fix(query): avoid the output result overlap within the project operator buffer.
......@@ -196,7 +196,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
......
......@@ -203,6 +203,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
if (pSource->hasNull) {
pColumnInfoData->hasNull = pSource->hasNull;
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
// Handle the bitmap
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
......@@ -1075,8 +1076,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
}
}
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (0 == numOfRows) {
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) {
if (0 == numOfRows || numOfRows <= existRows) {
return TSDB_CODE_SUCCESS;
}
......@@ -1087,19 +1088,16 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
}
pColumn->varmeta.offset = (int32_t*)tmp;
memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
pColumn->varmeta.length = 0;
pColumn->varmeta.allocLen = 0;
taosMemoryFreeClear(pColumn->pData);
memset(&pColumn->varmeta.offset[existRows], 0, sizeof(int32_t) * (numOfRows - existRows));
} else {
char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t oldLen = BitmapLen(existRows);
pColumn->nullbitmap = tmp;
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
if (pColumn->info.type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
......@@ -1135,7 +1133,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = colInfoDataEnsureCapacity(p, numOfRows);
code = colInfoDataEnsureCapacity(p, pDataBlock->info.rows, numOfRows);
if (code) {
return code;
}
......@@ -1180,7 +1178,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows);
int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
......
......@@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq);
} else {
if (pIter->len >= pIter->totalLen) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
ASSERT(pIter->len > 0);
......
......@@ -141,7 +141,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) {
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
......
......@@ -392,7 +392,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......
......@@ -342,7 +342,7 @@ typedef struct STableScanInfo {
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
double sampleRatio; // data block sample ratio
double sampleRatio; // data block sample ratio, 1 by default
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
} STableScanInfo;
......@@ -395,7 +395,6 @@ typedef struct SOptrBasicInfo {
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
int32_t capacity; // TODO remove it
} SOptrBasicInfo;
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
......@@ -408,6 +407,12 @@ typedef struct SAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
......@@ -421,7 +426,7 @@ typedef struct STableIntervalOperatorInfo {
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
STimeWindowAggSupp twAggSup;
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -505,7 +510,6 @@ typedef struct SPartitionOperatorInfo {
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo;
......@@ -524,7 +528,7 @@ typedef struct SSessionAggOperatorInfo {
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
......@@ -541,7 +545,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t colIndex; // start row index
bool hasKey;
SStateKeys stateKey;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
STimeWindowAggSupp twAggSup;
// bool reptScan;
} SStateWindowOperatorInfo;
......@@ -602,7 +606,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
......@@ -638,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
......@@ -654,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
......
......@@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else {
pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info;
// the block type can not be changed in the streamscan operators
......
......@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -307,11 +307,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
// pInfo->binfo.rowCellInfoOffset);
// }
blockDataEnsureCapacity(pRes, pInfo->binfo.capacity);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) {
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......@@ -348,7 +348,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
goto _error;
}
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pOperator->name = "GroupbyAggOperator";
......
......@@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
......@@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code;
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (pBlockInfo->rows == 0) {
return NULL;
break;
}
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
......@@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return NULL;
}
......@@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows;
if (pBlockInfo->rows == 0) {
pOperator->status = OP_EXEC_DONE;
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
}
}
......
......@@ -767,17 +767,21 @@ void percentileFinalize(SqlFunctionCtx* pCtx) {
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes;
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
return true;
}
// TODO fix this
// This ordinary first function only handle the data block in ascending order
int32_t firstFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) {
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
if (pTsColInfo == NULL) {
return 0;
}
return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex);
}
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
int32_t firstFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -786,29 +790,75 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes;
// All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
return 0;
}
// Check for the first not null data
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
if (ts < startKey) {
return TSDB_CODE_SUCCESS;
}
}
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
memcpy(buf, data, pInputCol->info.bytes);
// TODO handle the subsidary value
// if (pCtx->ptsList != NULL) {
// TSKEY k = GET_TS_DATA(pCtx, i);
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
// }
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
}
pResInfo->complete = true;
numOfElems++;
break;
}
} else {
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
// all data needs to be check.
if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes);
if (ts < endKey) {
return TSDB_CODE_SUCCESS;
}
}
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
}
numOfElems++;
}
}
SET_VAL(pResInfo, numOfElems, 1);
......@@ -816,10 +866,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
}
int32_t lastFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order != TSDB_ORDER_DESC) {
return 0;
}
int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -829,7 +875,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
return 0;
}
......
......@@ -461,8 +461,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
return buildSyntaxErrMsg(pMsgBuf, "primary timestamp can not be null", pToken->z);
}
return func(pMsgBuf, NULL, 0, param);
......
......@@ -30,7 +30,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pColumnData);
......@@ -45,7 +45,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
colInfoDataEnsureCapacity(out->columnData, 1);
colInfoDataEnsureCapacity(out->columnData, 0, 1);
int32_t code = vectorConvertImpl(&in, out);
sclFreeParam(&in);
......
......@@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
res->info.numOfCols++;
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
colInfoDataEnsureCapacity(pColumn, rowNum);
colInfoDataEnsureCapacity(pColumn, 0, rowNum);
for (int32_t i = 0; i < rowNum; ++i) {
colDataAppend(pColumn, i, (const char *)value, false);
......
......@@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
SColumnInfoData idata = {0};
idata.info = *colInfo;
colInfoDataEnsureCapacity(&idata, rows);
colInfoDataEnsureCapacity(&idata, 0, rows);
taosArrayPush(res->pDataBlock, &idata);
......@@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
res->info.numOfCols++;
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
colInfoDataEnsureCapacity(pColumn, rowNum);
colInfoDataEnsureCapacity(pColumn, 0, rowNum);
for (int32_t i = 0; i < rowNum; ++i) {
colDataAppend(pColumn, i, (const char *)value, false);
......@@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t
input->numOfRows = num;
input->columnData->info = createColumnInfo(0, type, bytes);
colInfoDataEnsureCapacity(input->columnData, num);
colInfoDataEnsureCapacity(input->columnData, 0, num);
if (setVal) {
for (int32_t i = 0; i < num; ++i) {
......
......@@ -31,8 +31,11 @@
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
#define FREE_HASH_NODE(_n) \
#define FREE_HASH_NODE(_fp, _n) \
do { \
/* if (_fp != NULL) { \
(_fp)(_n); \
}*/ \
taosMemoryFreeClear(_n); \
} while (0);
......@@ -195,7 +198,7 @@ static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SH
if (pNode->refCount <= 0) {
pNewNode->next = pNode->next;
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pHashObj->freeFp, pNode);
} else {
pNewNode->next = pNode;
pe->num++;
......@@ -523,7 +526,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pHashObj->freeFp, pNode);
}
} else {
prevNode = pNode;
......@@ -558,7 +561,7 @@ void taosHashClear(SHashObj *pHashObj) {
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pHashObj->freeFp, pNode);
pNode = pNext;
}
......@@ -769,7 +772,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pOld);
FREE_HASH_NODE(pHashObj->freeFp, pOld);
}
} else {
// uError("pNode:%p data:%p is not there!!!", pNode, p);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册