提交 75aca0b0 编写于 作者: H Haojun Liao

fix(query): set correct fill output column index, fix some memory leak, and...

fix(query): set correct fill output column index,  fix some memory leak, and do some internal refactor,
上级 6fd4684a
......@@ -162,7 +162,6 @@ typedef struct SQueryTableDataCond {
int64_t endVersion;
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
......@@ -170,19 +169,6 @@ int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
colDataDestroy(pColInfoData);
}
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFreeClear(pBlock->pBlockAgg);
}
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SColumn {
......
......@@ -226,8 +226,12 @@ int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock();
void* blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
......
......@@ -340,12 +340,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
......
......@@ -1197,15 +1197,28 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
return TSDB_CODE_SUCCESS;
}
void blockDataFreeRes(SSDataBlock* pBlock) {
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
colDataDestroy(pColInfoData);
}
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFreeClear(pBlock->pBlockAgg);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}
void* blockDataDestroy(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
blockDestroyInner(pBlock);
blockDataFreeRes(pBlock);
taosMemoryFreeClear(pBlock);
return NULL;
}
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL);
......
......@@ -319,7 +319,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
return 0;
FAIL:
tDeleteSSDataBlock(pBlock);
blockDataFreeRes(pBlock);
return -1;
}
......
......@@ -209,10 +209,10 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
p->iterInit = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
tsdbTbDataIterDestroy(p->iter.iter);
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
taosArrayDestroy(p->delSkyline);
p->delSkyline = taosArrayDestroy(p->delSkyline);
}
}
......@@ -224,18 +224,15 @@ static void destroyBlockScanInfo(SHashObj* pTableMap) {
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
tsdbTbDataIterDestroy(p->iter.iter);
p->iter.iter = NULL;
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
if (p->iiter.iter != NULL) {
tsdbTbDataIterDestroy(p->iiter.iter);
p->iiter.iter = NULL;
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
}
taosArrayDestroy(p->delSkyline);
taosArrayDestroy(p->pBlockList);
p->delSkyline = NULL;
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
}
taosHashCleanup(pTableMap);
......
......@@ -1193,8 +1193,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
taosArrayDestroy(pBlock->pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
// currently only the tbname pseudo column
......@@ -1202,12 +1200,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes((SSDataBlock*) pBlock);
longjmp(pTaskInfo->env, code);
}
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*) pBlock);
return 0;
}
......
......@@ -71,13 +71,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
SColumnInfoData* pCol0 = taosArrayGet(pBlock->pDataBlock, pFillInfo->tsSlotId);
char* val = colDataGetData(pCol0, index);
// set the primary timestamp value
*(TSKEY*)val = pFillInfo->currentKey;
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
// set the other values
if (pFillInfo->type == TSDB_FILL_PREV) {
......@@ -92,7 +87,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstColInfoData, index, (const char*)&ts, false);
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
} else {
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstColInfoData, index, pKey);
......@@ -101,41 +96,51 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
// todo refactor: start from 0 not 1
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
doSetVal(pDstColInfoData, index, pKey);
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
} else {
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstColInfoData, index, pKey);
}
}
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (outOfBound) {
setNullRow(pBlock, pFillInfo->currentKey, index);
} else {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
int16_t type = pCol->pExpr->base.resSchema.type;
int16_t type = pDstCol->info.type;
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
continue;
}
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
int64_t prevTs = *(int64_t*)pKey1->pData;
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
int64_t prevTs = *(int64_t*)pKey1->pData;
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index);
......@@ -153,7 +158,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
setNullRow(pBlock, pFillInfo->currentKey, index);
} else { // fill with user specified value for each column
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
......@@ -176,6 +181,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else { // varchar/nchar data
colDataAppendNULL(pDst, index);
}
}
}
......@@ -234,8 +241,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
pFillInfo->numOfCurrent = 0;
// todo make sure the first column is always the primary timestamp column?
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->tsSlotId);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
......
......@@ -296,8 +296,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
tDeleteSSDataBlock(&call->block);
tDeleteSSDataBlock(&subRsp->resultData);
blockDataFreeRes(&call->block);
blockDataFreeRes(&subRsp->resultData);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
......@@ -305,7 +305,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
tDeleteSSDataBlock(&call->block);
blockDataFreeRes(&call->block);
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
......
......@@ -116,7 +116,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
......
......@@ -313,7 +313,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
......
......@@ -106,7 +106,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qDebug("stream task %d exec end", pTask->taskId);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return NULL;
}
......@@ -121,7 +121,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return NULL;
}
......@@ -155,7 +155,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
qDebug("stream exec, return result");
return 0;
......@@ -163,7 +163,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
continue;
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
} else {
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册