提交 5dd7a1ad 编写于 作者: X xywang

[TS-1029]<fix>(query): added some memory allocation checks

上级 7bbab9b0
...@@ -2041,6 +2041,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { ...@@ -2041,6 +2041,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self);
if (pQueryInfo->pQInfo == NULL) {
taosHashCleanup(tableGroupInfo.map);
taosArrayDestroy(group);
tscAsyncResultOnError(pSql);
pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pRes->code;
}
} }
uint64_t localQueryId = pSql->self; uint64_t localQueryId = pSql->self;
......
...@@ -3873,8 +3873,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr ...@@ -3873,8 +3873,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
STsBufInfo bufInfo = {0}; STsBufInfo bufInfo = {0};
SQueryParam param = {.pOperator = pa}; SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger); int32_t code = initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger);
taosArrayDestroy(pa); taosArrayDestroy(pa);
if (code != TSDB_CODE_SUCCESS) {
goto _cleanup;
}
return pQInfo; return pQInfo;
......
...@@ -321,9 +321,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -321,9 +321,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
const static int32_t minSize = 8; const static int32_t minSize = 8;
SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput; if (res == NULL) {
qError("failed to allocate for output buffer");
goto _clean;
}
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
if (res->pDataBlock == NULL) {
qError("failed to init arrary for data block of output buffer");
goto _clean;
}
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
idata.info.type = pExpr[i].base.resType; idata.info.type = pExpr[i].base.resType;
...@@ -332,10 +340,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -332,10 +340,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
int32_t size = MAX(idata.info.bytes * numOfRows, minSize); int32_t size = MAX(idata.info.bytes * numOfRows, minSize);
idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform
if (idata.pData == NULL) {
qError("failed to allocate column buffer for output buffer");
goto _clean;
}
taosArrayPush(res->pDataBlock, &idata); taosArrayPush(res->pDataBlock, &idata);
res->info.numOfCols++;
} }
return res; return res;
_clean:
destroyOutputBuf(res);
return NULL;
} }
void* destroyOutputBuf(SSDataBlock* pBlock) { void* destroyOutputBuf(SSDataBlock* pBlock) {
...@@ -2176,23 +2194,35 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2176,23 +2194,35 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
switch (*op) { switch (*op) {
case OP_TagScan: { case OP_TagScan: {
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_MultiTableTimeInterval: { case OP_MultiTableTimeInterval: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_AllMultiTableTimeInterval: { case OP_AllMultiTableTimeInterval: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2202,6 +2232,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2202,6 +2232,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_AllTimeWindow: { case OP_AllTimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2211,6 +2244,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2211,6 +2244,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Groupby: { case OP_Groupby: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
...@@ -2221,6 +2257,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2221,6 +2257,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_SessionWindow: { case OP_SessionWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2230,12 +2269,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2230,12 +2269,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiTableAggregate: { case OP_MultiTableAggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_Aggregate: { case OP_Aggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
...@@ -2256,11 +2301,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2256,11 +2301,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
assert(pQueryAttr->pExpr2 != NULL); assert(pQueryAttr->pExpr2 != NULL);
pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
} }
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_StateWindow: { case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) { if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -2270,6 +2322,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2270,6 +2322,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Limit: { case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2280,12 +2335,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2280,12 +2335,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
} else { } else {
SColumnInfo* pColInfo = SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
} }
...@@ -2295,11 +2356,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2295,11 +2356,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Fill: { case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot; SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_MultiwayMergeSort: { case OP_MultiwayMergeSort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2311,6 +2378,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2311,6 +2378,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -2318,16 +2388,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2318,16 +2388,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
int32_t num = pRuntimeEnv->proot->numOfOutput; int32_t num = pRuntimeEnv->proot->numOfOutput;
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_Distinct: { case OP_Distinct: {
pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
case OP_Order: { case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
break; break;
} }
...@@ -4826,7 +4905,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4826,7 +4905,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb; pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) { if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4847,18 +4925,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4847,18 +4925,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
switch(tbScanner) { switch(tbScanner) {
case OP_TableBlockInfoScan: { case OP_TableBlockInfoScan: {
pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_TableSeqScan: { case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_DataBlocksOptScan: { case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
case OP_TableScan: { case OP_TableScan: {
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
if (pRuntimeEnv->proot == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
break; break;
} }
default: { // do nothing default: { // do nothing
...@@ -4923,7 +5013,6 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI ...@@ -4923,7 +5013,6 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
} }
} }
STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
STsdbQueryCond cond = { STsdbQueryCond cond = {
.colList = pQueryAttr->tableCols, .colList = pQueryAttr->tableCols,
...@@ -5164,6 +5253,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5164,6 +5253,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
...@@ -5172,6 +5265,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5172,6 +5265,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo);
return NULL;
}
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5186,6 +5284,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5186,6 +5284,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = 1; pInfo->times = 1;
...@@ -5196,6 +5297,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5196,6 +5297,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pRuntimeEnv->enableGroupData = true; pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo);
return NULL;
}
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan; pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5210,9 +5316,16 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5210,9 +5316,16 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
if (pInfo->block.pDataBlock == NULL) {
tfree(pInfo);
return NULL;
}
SColumnInfoData infoData = {{0}}; SColumnInfoData infoData = {{0}};
infoData.info.type = TSDB_DATA_TYPE_BINARY; infoData.info.type = TSDB_DATA_TYPE_BINARY;
...@@ -5221,6 +5334,12 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu ...@@ -5221,6 +5334,12 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
taosArrayPush(pInfo->block.pDataBlock, &infoData); taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
tfree(pInfo->block.pDataBlock);
tfree(pInfo);
return NULL;
}
pOperator->name = "TableBlockInfoScanOperator"; pOperator->name = "TableBlockInfoScanOperator";
pOperator->operatorType = OP_TableBlockInfoScan; pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -5293,6 +5412,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5293,6 +5412,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
if (pInfo == NULL) {
return NULL;
}
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
...@@ -5300,6 +5423,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5300,6 +5423,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
if (pOptr == NULL) {
tfree(pInfo);
return NULL;
}
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan; pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->pRuntimeEnv = pRuntimeEnv;
...@@ -5417,6 +5545,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5417,6 +5545,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
...@@ -5476,6 +5608,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5476,6 +5608,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
if (pInfo->binfo.pRes == NULL) {
return NULL;
}
{ // todo extract method to create prev compare buffer { // todo extract method to create prev compare buffer
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
...@@ -5886,8 +6022,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5886,8 +6022,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
} } else if(srows > 0) {
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) { if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
...@@ -6607,6 +6742,10 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -6607,6 +6742,10 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand(); pInfo->seed = rand();
...@@ -6705,6 +6844,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -6705,6 +6844,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6734,6 +6878,10 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6734,6 +6878,10 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
...@@ -6831,6 +6979,11 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -6831,6 +6979,11 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL || pInfo->pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6856,6 +7009,11 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -6856,6 +7009,11 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL || pInfo->pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6881,6 +7039,11 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe ...@@ -6881,6 +7039,11 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6903,6 +7066,11 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6903,6 +7066,11 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
...@@ -6929,6 +7097,11 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -6929,6 +7097,11 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL || pInfo->pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6953,6 +7126,11 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu ...@@ -6953,6 +7126,11 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL || pInfo->pCtx == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6987,6 +7165,11 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6987,6 +7165,11 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->binfo.pRes == NULL) {
return NULL;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7008,6 +7191,11 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7008,6 +7191,11 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL) {
return NULL;
}
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
{ {
...@@ -7077,6 +7265,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -7077,6 +7265,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL) {
return NULL;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
...@@ -7222,6 +7414,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -7222,6 +7414,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
if (pInfo->pRes == NULL) {
return NULL;
}
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1); assert(numOfGroup == 0 || numOfGroup == 1);
...@@ -7364,6 +7560,10 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -7364,6 +7560,10 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
if (pInfo->pRes == NULL) {
return NULL;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator"; pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册