提交 035d5157 编写于 作者: X xywang

[TS-1029]<fix>(query): added some memory allocation checks

上级 3b2a37fe
...@@ -1861,6 +1861,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { ...@@ -1861,6 +1861,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self);
if (pQueryInfo->pQInfo == NULL) {
taosHashCleanup(tableGroupInfo.map);
taosArrayDestroy(&group);
tscAsyncResultOnError(pSql);
pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pRes->code;
}
} }
uint64_t localQueryId = pSql->self; uint64_t localQueryId = pSql->self;
......
...@@ -3902,8 +3902,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr ...@@ -3902,8 +3902,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
STsBufInfo bufInfo = {0}; STsBufInfo bufInfo = {0};
SQueryParam param = {.pOperator = pa}; SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger); int32_t code = initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger);
taosArrayDestroy(&pa); taosArrayDestroy(&pa);
if (code != TSDB_CODE_SUCCESS) {
goto _cleanup;
}
return pQInfo; return pQInfo;
......
...@@ -340,9 +340,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -340,9 +340,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
const static int32_t minSize = 8; const static int32_t minSize = 8;
SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput; if (res == NULL) {
qError("failed to allocate for output buffer");
goto _clean;
}
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
if (res->pDataBlock == NULL) {
qError("failed to init arrary for data block of output buffer");
goto _clean;
}
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
idata.info.type = pExpr[i].base.resType; idata.info.type = pExpr[i].base.resType;
...@@ -351,10 +359,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -351,10 +359,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
int32_t size = MAX(idata.info.bytes * numOfRows, minSize); int32_t size = MAX(idata.info.bytes * numOfRows, minSize);
idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform
if (idata.pData == NULL) {
qError("failed to allocate column buffer for output buffer");
goto _clean;
}
taosArrayPush(res->pDataBlock, &idata); taosArrayPush(res->pDataBlock, &idata);
res->info.numOfCols++;
} }
return res; return res;
_clean:
destroyOutputBuf(res);
return NULL;
} }
void* destroyOutputBuf(SSDataBlock* pBlock) { void* destroyOutputBuf(SSDataBlock* pBlock) {
...@@ -1816,7 +1834,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1816,7 +1834,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// denote the order type // denote the order type
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) { if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
// if param[2] is set value, input data come from client, order is no relation with pQueryAttr->order , so always return true // if param[2] is set value, input data come from client, order is no relation with pQueryAttr->order, so always return true
if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT) if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT)
return true; return true;
return pCtx->param[0].i64 == pQueryAttr->order.order; return pCtx->param[0].i64 == pQueryAttr->order.order;
...@@ -2072,17 +2090,26 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2072,17 +2090,26 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
switch (*op) { switch (*op) {
case OP_TagScan: { case OP_TagScan: {
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_MultiTableTimeInterval: { case OP_MultiTableTimeInterval: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2092,6 +2119,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2092,6 +2119,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_TimeEvery: { case OP_TimeEvery: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeEveryOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createTimeEveryOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2101,7 +2131,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2101,7 +2131,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Groupby: { case OP_Groupby: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2111,6 +2143,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2111,6 +2143,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_SessionWindow: { case OP_SessionWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2120,13 +2155,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2120,13 +2155,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiTableAggregate: { case OP_MultiTableAggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_Aggregate: { case OP_Aggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2146,11 +2186,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2146,11 +2186,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
assert(pQueryAttr->pExpr2 != NULL); assert(pQueryAttr->pExpr2 != NULL);
pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
} }
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_StateWindow: { case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2160,6 +2207,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2160,6 +2207,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Limit: { case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2171,12 +2221,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2171,12 +2221,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
} else { } else {
SColumnInfo* pColInfo = SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
} }
break; break;
...@@ -2185,11 +2241,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2185,11 +2241,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Fill: { case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot; SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_MultiwayMergeSort: { case OP_MultiwayMergeSort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 200, merger); // TD-10899 pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 200, merger); // TD-10899
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2201,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2201,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2208,11 +2273,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2208,11 +2273,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
int32_t num = pRuntimeEnv->proot->numOfOutput; int32_t num = pRuntimeEnv->proot->numOfOutput;
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_Distinct: { case OP_Distinct: {
pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2224,6 +2295,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2224,6 +2295,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, &pQueryAttr->order); pQueryAttr->numOfOutput, &pQueryAttr->order);
} }
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -4824,18 +4898,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4824,18 +4898,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
switch(tbScanner) { switch(tbScanner) {
case OP_TableBlockInfoScan: { case OP_TableBlockInfoScan: {
pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_TableSeqScan: { case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_DataBlocksOptScan: { case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_TableScan: { case OP_TableScan: {
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
default: { // do nothing default: { // do nothing
...@@ -5152,6 +5238,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5152,6 +5238,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
...@@ -5159,6 +5249,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5159,6 +5249,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
pInfo->current = 0; pInfo->current = 0;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo);
return NULL;
}
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5173,6 +5268,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5173,6 +5268,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = 1; pInfo->times = 1;
...@@ -5183,6 +5281,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5183,6 +5281,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pRuntimeEnv->enableGroupData = true; pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo);
return NULL;
}
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan; pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5197,9 +5300,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5197,9 +5300,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
if (pInfo->block.pDataBlock == NULL) {
goto _clean;
}
SColumnInfoData infoData = {{0}}; SColumnInfoData infoData = {{0}};
infoData.info.type = TSDB_DATA_TYPE_BINARY; infoData.info.type = TSDB_DATA_TYPE_BINARY;
...@@ -5208,6 +5317,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu ...@@ -5208,6 +5317,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
taosArrayPush(pInfo->block.pDataBlock, &infoData); taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
taosArrayDestroy(&pInfo->block.pDataBlock);
goto _clean;
}
pOperator->name = "TableBlockInfoScanOperator"; pOperator->name = "TableBlockInfoScanOperator";
pOperator->operatorType = OP_TableBlockInfoScan; pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5218,6 +5332,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu ...@@ -5218,6 +5332,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
pOperator->exec = doBlockInfoScan; pOperator->exec = doBlockInfoScan;
return pOperator; return pOperator;
_clean:
tfree(pInfo);
return NULL;
} }
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
...@@ -5285,6 +5404,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5285,6 +5404,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
...@@ -5296,6 +5419,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5296,6 +5419,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
} }
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
if (pOptr == NULL) {
tfree(pInfo);
return NULL;
}
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan; pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->pRuntimeEnv = pRuntimeEnv;
...@@ -5317,6 +5445,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { ...@@ -5317,6 +5445,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
} }
if (pOrderColumns == NULL) {
return NULL;
}
if (pQuery->interval.interval > 0) { if (pQuery->interval.interval > 0) {
if (pOrderColumns == NULL) { if (pOrderColumns == NULL) {
pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); pOrderColumns = taosArrayInit(1, sizeof(SColIndex));
...@@ -5356,7 +5488,11 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { ...@@ -5356,7 +5488,11 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
} }
for(int32_t i = 0; i < numOfCols; ++i) { if (pOrderColumns == NULL) {
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex* index = taosArrayGet(pOrderColumns, i); SColIndex* index = taosArrayGet(pOrderColumns, i);
bool found = false; bool found = false;
...@@ -5384,21 +5520,45 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5384,21 +5520,45 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param; SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param;
destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput);
if (pInfo->orderColumnList) {
taosArrayDestroy(&pInfo->orderColumnList); taosArrayDestroy(&pInfo->orderColumnList);
}
if (pInfo->groupColumnList) {
taosArrayDestroy(&pInfo->groupColumnList); taosArrayDestroy(&pInfo->groupColumnList);
}
if (pInfo->prevRow) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
}
if (pInfo->currentGroupColData) {
tfree(pInfo->currentGroupColData); tfree(pInfo->currentGroupColData);
}
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
if (pInfo->orderColumnList) {
taosArrayDestroy(&pInfo->orderColumnList); taosArrayDestroy(&pInfo->orderColumnList);
}
if (pInfo->pRes) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
if (pInfo->prevRow) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
}
} }
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->resultRowFactor = pInfo->resultRowFactor =
(int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
...@@ -5414,6 +5574,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5414,6 +5574,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->orderColumnList == NULL || pInfo->groupColumnList == NULL) {
goto _clean;
}
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
...@@ -5433,6 +5597,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5433,6 +5597,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0; numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0;
pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len));
if (pInfo->currentGroupColData == NULL) {
goto _clean;
}
offset = POINTER_BYTES * numOfCols; offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
...@@ -5443,11 +5611,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5443,11 +5611,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
pInfo->seed = rand(); pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
return NULL;
}
pOperator->name = "GlobalAggregate"; pOperator->name = "GlobalAggregate";
pOperator->operatorType = OP_GlobalAggregate; pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -5462,17 +5637,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5462,17 +5637,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyGlobalAggOperatorInfo((void *) pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger) { int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pMerge = merger; pInfo->pMerge = merger;
pInfo->bufCapacity = numOfRows; pInfo->bufCapacity = numOfRows;
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
if (pInfo->orderColumnList == NULL || pInfo->binfo.pRes == NULL) {
goto _clean;
}
{ // todo extract method to create prev compare buffer { // todo extract method to create prev compare buffer
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
...@@ -5492,6 +5680,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5492,6 +5680,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "MultiwaySortOperator"; pOperator->name = "MultiwaySortOperator";
pOperator->operatorType = OP_MultiwayMergeSort; pOperator->operatorType = OP_MultiwayMergeSort;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5503,6 +5695,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5503,6 +5695,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pOperator->exec = doMultiwayMergeSort; pOperator->exec = doMultiwayMergeSort;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
return pOperator; return pOperator;
_clean:
destroyGlobalAggOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
...@@ -5579,11 +5777,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5579,11 +5777,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
{ {
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
if (pDataBlock == NULL) {
goto _clean;
}
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) { if (pDataBlock->pDataBlock == NULL) {
goto _clean;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData col = {{0}}; SColumnInfoData col = {{0}};
col.info.colId = pExpr[i].base.colInfo.colId; col.info.colId = pExpr[i].base.colInfo.colId;
col.info.bytes = pExpr[i].base.resBytes; col.info.bytes = pExpr[i].base.resBytes;
...@@ -5601,6 +5810,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5601,6 +5810,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "InMemoryOrder"; pOperator->name = "InMemoryOrder";
pOperator->operatorType = OP_Order; pOperator->operatorType = OP_Order;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -5612,6 +5825,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5612,6 +5825,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyOrderOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
...@@ -5891,8 +6110,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5891,8 +6110,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
} } else if(srows > 0) {
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) { if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
...@@ -6982,6 +7200,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -6982,6 +7200,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
...@@ -6991,10 +7212,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -6991,10 +7212,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
pInfo->seed = rand(); pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = OP_Aggregate; pOperator->operatorType = OP_Aggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7009,31 +7238,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -7009,31 +7238,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyAggOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
assert(pInfo != NULL); assert(pInfo != NULL);
if (pInfo->pCtx) {
destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); destroySQLFunctionCtx(pInfo->pCtx, numOfOutput);
}
if (pInfo->rowCellInfoOffset) {
tfree(pInfo->rowCellInfoOffset); tfree(pInfo->rowCellInfoOffset);
}
if (pInfo->resultRowInfo.pResult) {
cleanupResultRowInfo(&pInfo->resultRowInfo); cleanupResultRowInfo(&pInfo->resultRowInfo);
}
if (pInfo->pRes) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
} }
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput); doDestroyBasicInfo(pInfo, numOfOutput);
} }
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
if (pInfo->prevData) {
tfree(pInfo->prevData); tfree(pInfo->prevData);
}
} }
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param; SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
...@@ -7041,15 +7292,27 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7041,15 +7292,27 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
if (pInfo->pFillInfo) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
}
if (pInfo->pRes) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
if (pInfo->p) {
tfree(pInfo->p); tfree(pInfo->p);
}
} }
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
if (pInfo->prevData) {
tfree(pInfo->prevData); tfree(pInfo->prevData);
}
} }
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -7060,18 +7323,27 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7060,18 +7323,27 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTimeEveryOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTimeEveryOperatorInfo(void* param, int32_t numOfOutput) {
STimeEveryOperatorInfo* pInfo = (STimeEveryOperatorInfo*) param; STimeEveryOperatorInfo* pInfo = (STimeEveryOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
if (pInfo->rangeStart) {
taosHashCleanup(pInfo->rangeStart); taosHashCleanup(pInfo->rangeStart);
}
} }
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param; STagScanInfo* pInfo = (STagScanInfo*) param;
if (pInfo->pRes) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
if (pInfo->pDataBlock) {
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
}
} }
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -7081,14 +7353,29 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7081,14 +7353,29 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param;
if (pInfo->pSet) {
taosHashCleanup(pInfo->pSet); taosHashCleanup(pInfo->pSet);
}
if (pInfo->buf) {
tfree(pInfo->buf); tfree(pInfo->buf);
}
if (pInfo->pDistinctDataInfo) {
taosArrayDestroy(&pInfo->pDistinctDataInfo); taosArrayDestroy(&pInfo->pDistinctDataInfo);
}
if (pInfo->pRes) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
} }
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
...@@ -7096,7 +7383,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -7096,7 +7383,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "MultiTableAggregate"; pOperator->name = "MultiTableAggregate";
pOperator->operatorType = OP_MultiTableAggregate; pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7111,10 +7406,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -7111,10 +7406,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyAggOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->seed = rand(); pInfo->seed = rand();
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
...@@ -7124,9 +7428,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7124,9 +7428,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = OP_Project; pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7141,6 +7454,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7141,6 +7454,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyProjectOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
...@@ -7175,12 +7494,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 ...@@ -7175,12 +7494,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
assert(numOfFilter > 0 && pCols != NULL); assert(numOfFilter > 0 && pCols != NULL);
doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0);
pInfo->numOfFilterCols = numOfFilter; pInfo->numOfFilterCols = numOfFilter;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "FilterOperator"; pOperator->name = "FilterOperator";
pOperator->operatorType = OP_Filter; pOperator->operatorType = OP_Filter;
...@@ -7195,13 +7520,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7195,13 +7520,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyConditionOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo);
return NULL;
}
pOperator->name = "LimitOperator"; pOperator->name = "LimitOperator";
pOperator->operatorType = OP_Limit; pOperator->operatorType = OP_Limit;
...@@ -7217,12 +7556,22 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -7217,12 +7556,22 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeWindow; pOperator->operatorType = OP_TimeWindow;
...@@ -7237,12 +7586,22 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -7237,12 +7586,22 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyBasicOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STimeEveryOperatorInfo* pInfo = calloc(1, sizeof(STimeEveryOperatorInfo)); STimeEveryOperatorInfo* pInfo = calloc(1, sizeof(STimeEveryOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; if (pInfo == NULL) {
return NULL;
}
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->seed = rand(); pInfo->seed = rand();
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
...@@ -7258,9 +7617,20 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -7258,9 +7617,20 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
} }
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pBInfo->pRes == NULL || pBInfo->pCtx == NULL || pBInfo->resultRowInfo.pResult == NULL ||
(pQueryAttr->needReverseScan && pInfo->rangeStart == NULL))
{
goto _clean;
}
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "TimeEveryOperator"; pOperator->name = "TimeEveryOperator";
pOperator->operatorType = OP_TimeEvery; pOperator->operatorType = OP_TimeEvery;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7275,18 +7645,36 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -7275,18 +7645,36 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyTimeEveryOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "StateWindowOperator"; pOperator->name = "StateWindowOperator";
pOperator->operatorType = OP_StateWindow; pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7300,17 +7688,34 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe ...@@ -7300,17 +7688,34 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyStateWindowOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = OP_SessionWindow; pOperator->operatorType = OP_SessionWindow;
...@@ -7325,16 +7730,33 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7325,16 +7730,33 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyStateWindowOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->pCtx == NULL || pInfo->pRes == NULL || pInfo->resultRowInfo.pResult == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "MultiTableTimeIntervalOperator"; pOperator->name = "MultiTableTimeIntervalOperator";
pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
...@@ -7349,14 +7771,22 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -7349,14 +7771,22 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyBasicOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index if (pInfo == NULL) {
return NULL;
}
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -7367,7 +7797,15 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7367,7 +7797,15 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
...@@ -7381,16 +7819,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7381,16 +7819,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyGroupbyOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL) {
goto _clean;
}
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
{ {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal);
if (pColInfo == NULL) {
goto _clean;
}
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
...@@ -7401,11 +7857,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -7401,11 +7857,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
if (pInfo->pFillInfo == NULL) {
goto _clean;
}
pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES);
if (pInfo->p == NULL) {
goto _clean;
}
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7420,14 +7885,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -7420,14 +7885,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroySFillOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
if (pInfo->orderColumnList == NULL) {
goto _clean;
}
pInfo->slimit = pQueryAttr->slimit; pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit; pInfo->limit = pQueryAttr->limit;
pInfo->capacity = pRuntimeEnv->resultInfo.capacity; pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
...@@ -7444,6 +7922,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7444,6 +7922,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
if (pInfo->prevRow == NULL) {
goto _clean;
}
int32_t offset = POINTER_BYTES * numOfCols; int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
...@@ -7457,6 +7938,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7457,6 +7938,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo->pRes == NULL || pOperator == NULL) {
goto _clean;
}
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
pOperator->operatorType = OP_SLimit; pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7468,6 +7953,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7468,6 +7953,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroySlimitOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static SSDataBlock* doTagScan(void* param, bool* newgroup) { static SSDataBlock* doTagScan(void* param, bool* newgroup) {
...@@ -7618,7 +8109,14 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { ...@@ -7618,7 +8109,14 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL) {
goto _clean;
}
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1); assert(numOfGroup == 0 || numOfGroup == 1);
...@@ -7627,6 +8125,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -7627,6 +8125,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pInfo->curPos = 0; pInfo->curPos = 0;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "SeqTableTagScan"; pOperator->name = "SeqTableTagScan";
pOperator->operatorType = OP_TagScan; pOperator->operatorType = OP_TagScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -7639,7 +8141,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -7639,7 +8141,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pOperator->cleanup = destroyTagScanOperatorInfo; pOperator->cleanup = destroyTagScanOperatorInfo;
return pOperator; return pOperator;
_clean:
destroyTagScanOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited // distinct info already inited
...@@ -7756,6 +8265,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -7756,6 +8265,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->totalBytes = 0; pInfo->totalBytes = 0;
pInfo->buf = NULL; pInfo->buf = NULL;
pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold
...@@ -7764,8 +8277,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -7764,8 +8277,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
if (pInfo->pDistinctDataInfo == NULL || pInfo->pSet == NULL || pInfo->pRes == NULL) {
goto _clean;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
goto _clean;
}
pOperator->name = "DistinctOperator"; pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
...@@ -7780,6 +8300,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -7780,6 +8300,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
_clean:
destroyDistinctOperatorInfo((void *)pInfo, numOfOutput);
tfree(pInfo);
return NULL;
} }
static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) {
......
...@@ -354,6 +354,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 ...@@ -354,6 +354,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
} }
SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
if (pFillInfo == NULL) {
return NULL;
}
taosResetFillInfo(pFillInfo, skey); taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order; pFillInfo->order = order;
...@@ -371,6 +375,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 ...@@ -371,6 +375,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->interval.slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
if (pFillInfo->pData == NULL) {
tfree(pFillInfo);
return NULL;
}
// if (numOfTags > 0) { // if (numOfTags > 0) {
pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo)); pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册