未验证 提交 35e234f1 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14187 from taosdata/feature/stream

fix(stream): memory leak
......@@ -71,8 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && \
(*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data)))
#define IS_JSON_NULL(type, data) \
((type) == TSDB_DATA_TYPE_JSON && (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data)))
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (!pColumnInfoData->hasNull) {
......@@ -186,7 +186,8 @@ int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, uint32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
......@@ -222,6 +223,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
......
......@@ -344,10 +344,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
......
......@@ -263,7 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
pColumnInfoData->varmeta.length = len + oldLen;
} else {
if (finalNumOfRows > *capacity) {
ASSERT(finalNumOfRows*pColumnInfoData->info.bytes);
ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
if (tmp == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
......@@ -293,7 +293,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
return numOfRow1 + numOfRow2;
}
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo) {
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo) {
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
if (numOfRows <= 0) {
return numOfRows;
......@@ -327,9 +328,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
return taosArrayGetSize(pBlock->pDataBlock);
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
......@@ -396,7 +395,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
int32_t pageSize) {
ASSERT(pBlock != NULL && stopIndex != NULL);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfRows = pBlock->info.rows;
int32_t bitmapChar = 1;
......@@ -512,7 +511,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
// write the number of rows
*(uint32_t*)buf = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfRows = pBlock->info.rows;
char* pStart = buf + sizeof(uint32_t);
......@@ -542,7 +541,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
pBlock->info.rows = *(int32_t*)buf;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
const char* pStart = buf + sizeof(uint32_t);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -734,7 +733,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
int32_t tupleIndex) {
int32_t code = 0;
size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = &pDstCols[i];
......@@ -794,7 +793,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
int32_t rows = pDataBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
if (pCols == NULL) {
......@@ -902,7 +901,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
} else { // var data type
}
} else if (numOfCols == 2) {
}
}
......@@ -1103,14 +1101,14 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
}
}
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *pBlockInfo, uint32_t numOfRows) {
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) {
ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
if (numOfRows < pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
}
// todo temp disable it
// ASSERT(pColumn->info.bytes != 0);
// ASSERT(pColumn->info.bytes != 0);
int32_t existedRows = pBlockInfo->rows;
......@@ -1141,7 +1139,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
pColumn->pData = tmp;
}
......@@ -1197,6 +1195,40 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
taosMemoryFreeClear(pBlock);
return NULL;
}
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
ASSERT(src != NULL);
dst->info = src->info;
dst->info.rows = 0;
dst->info.capacity = 0;
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
blockDataAppendColInfo(dst, &colInfo);
}
int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.capacity = src->info.rows;
return 0;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) {
......@@ -1272,7 +1304,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
}
// todo disable it temporarily
// ASSERT(pColInfoData->info.type != 0);
// ASSERT(pColInfoData->info.type != 0);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pBlock->info.hasVarCol = true;
}
......@@ -1284,7 +1316,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
SColumnInfoData col = {.hasNull = true};
col.info.colId = colId;
col.info.type = type;
col.info.type = type;
col.info.bytes = bytes;
return col;
......@@ -1552,9 +1584,9 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
......@@ -1633,8 +1665,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
......
......@@ -66,6 +66,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp
if (qStreamScanSnapshot(task) < 0) {
ASSERT(0);
}
// set version
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
......
......@@ -44,6 +44,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
taosMemoryFree(fname);
if (pFile != NULL) {
STqOffsetHead head = {0};
int64_t code;
......@@ -77,7 +78,6 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
taosCloseFile(&pFile);
taosMemoryFree(fname);
}
return pStore;
}
......@@ -102,6 +102,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
// TODO file name should be with a version
char* fname = buildFileName(pStore->pTq->path);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosMemoryFree(fname);
if (pFile == NULL) {
ASSERT(0);
return -1;
......@@ -140,6 +141,5 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
}
// close and rename file
taosCloseFile(&pFile);
taosMemoryFree(fname);
return 0;
}
......@@ -151,6 +151,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema);
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
......@@ -161,6 +162,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
}
// this interface use suid instead of uid
if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
......@@ -184,7 +186,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
while (colMeta < pSchemaWrapper->nCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
......@@ -207,7 +209,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
colNeed++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
......@@ -251,8 +253,8 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
}
return 0;
FAIL: // todo refactor here
// if (*ppCols) taosArrayDestroy(*ppCols);
FAIL: // todo refactor here
// if (*ppCols) taosArrayDestroy(*ppCols);
return -1;
}
......
......@@ -336,7 +336,6 @@ typedef struct SStreamBlockScanInfo {
int32_t numOfPseudoExpr;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode;
......
......@@ -1341,7 +1341,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows;
int32_t totalRows = pBlock->info.rows;
SSDataBlock* px = createOneDataBlock(pBlock, true);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
......@@ -3954,7 +3954,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
}
int32_t len = (int32_t)(pStart - (char*)keyBuf);
int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len);
......@@ -3980,8 +3980,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL && terrno != 0) {
pTaskInfo->code = terrno;
return NULL;
......@@ -4007,16 +4006,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
SOperatorInfo* pOperator =
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
......@@ -4031,20 +4034,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->initTsdbReader) {
// for stream
ASSERT(pHandle->vnode);
pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
} else {
// for tq
ASSERT(pHandle->meta);
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
}
}
#if 0
if (pDataReader == NULL && terrno != 0) {
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
// return NULL;
} else {
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
#endif
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
......@@ -4057,9 +4062,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
......@@ -4070,6 +4077,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
......@@ -4343,7 +4351,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error;
}
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
cleanupQueryTableDataCond(&cond);
return pReader;
......@@ -4487,18 +4495,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList);
(*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
goto _complete;
}
if ((*pTaskInfo)->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}
return code;
_complete:
......
......@@ -967,9 +967,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock(pInfo->streamBlockReader)) {
SSDataBlock block = {0};
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
// todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows);
......@@ -1022,6 +1022,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
// TODO refactor @liao
taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pOperator->status = OP_EXEC_DONE;
......@@ -1061,12 +1064,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult) {
return pResult->info.rows > 0 ? pResult : NULL;
}
return NULL;
return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else {
ASSERT(0);
return NULL;
......@@ -1161,7 +1163,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0;
......@@ -1343,7 +1344,8 @@ static SSDataBlock* buildSysTableMetaBlock() {
SSDataBlock* pBlock = createDataBlock();
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
SColumnInfoData colInfoData =
createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
blockDataAppendColInfo(pBlock, &colInfoData);
}
......
......@@ -34,7 +34,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
......
......@@ -172,7 +172,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
int32_t numOfCols = (int32_t) taosArrayGetSize(pBlock->pDataBlock);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0;
......@@ -185,7 +185,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(data->blocks);
......@@ -307,6 +307,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
return 0;
......
......@@ -53,9 +53,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
// TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true);
outputCopy->info.childId = pTask->selfChildId;
taosArrayPush(pRes, outputCopy);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
/*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/
/*outputCopy->info.childId = pTask->selfChildId;*/
/*taosArrayPush(pRes, outputCopy);*/
}
return 0;
}
......@@ -68,6 +72,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
streamTaskExecImpl(pTask, data, pRes);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
return NULL;
}
......@@ -82,25 +87,25 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(qRes);
return NULL;
}
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock));
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
}
return pRes;
......@@ -125,14 +130,14 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return 0;
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
return 0;
} else {
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册