提交 1d7581f6 编写于 作者: wmmhello's avatar wmmhello

fix:conflicts from main

......@@ -1035,6 +1035,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return TSDB_CODE_SUCCESS;
}
#if 0
typedef struct SHelper {
int32_t index;
union {
......@@ -1083,59 +1084,20 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
// SSDataBlock* pDataBlock = pHelper->pDataBlock;
SHelper* left = (SHelper*)p1;
SHelper* right = (SHelper*)p2;
SArray* pInfo = pHelper->orderInfo;
int32_t offset = 0;
// for(int32_t i = 0; i < pInfo->size; ++i) {
// SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0);
// SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
// if (pColInfoData->hasNull) {
// bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg);
// bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg);
// if (leftNull && rightNull) {
// continue; // continue to next slot
// }
//
// if (rightNull) {
// return pHelper->nullFirst? 1:-1;
// }
//
// if (leftNull) {
// return pHelper->nullFirst? -1:1;
// }
// }
// void* left1 = colDataGetData(pColInfoData, left);
// void* right1 = colDataGetData(pColInfoData, right);
// switch(pColInfoData->info.type) {
// case TSDB_DATA_TYPE_INT: {
int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset);
int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset);
// offset += pColInfoData->info.bytes;
if (leftx == rightx) {
// break;
return 0;
} else {
// if (pOrder->order == TSDB_ORDER_ASC) {
return (leftx < rightx) ? -1 : 1;
// } else {
// return (leftx < rightx)? 1:-1;
// }
}
// }
// default:
// assert(0);
// }
// }
}
return 0;
}
......@@ -1179,6 +1141,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// destroyTupleIndex(index);
return 0;
}
#endif
void blockDataCleanup(SSDataBlock* pDataBlock) {
blockDataEmpty(pDataBlock);
......@@ -1887,6 +1850,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
#if 0
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, &pBlock);
......@@ -1979,6 +1943,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
}
}
#endif
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
......
......@@ -175,7 +175,6 @@ struct SExecTaskInfo {
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo* pTableInfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -323,6 +322,8 @@ typedef struct STableScanBase {
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableListInfo;
} STableScanBase;
typedef struct STableScanInfo {
......@@ -363,11 +364,12 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
} STagScanInfo;
typedef enum EStreamScanMode {
......@@ -483,10 +485,11 @@ typedef struct SStreamScanInfo {
} SStreamScanInfo;
typedef struct {
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
STableListInfo* pTableListInfo;
} SStreamRawScanInfo;
typedef struct STableCountScanSupp {
......@@ -515,6 +518,16 @@ typedef struct SOptrBasicInfo {
bool mergeResultBlock;
} SOptrBasicInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
} SAggOperatorInfo;
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
......@@ -731,6 +744,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
......@@ -753,11 +767,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// clang-format off
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
......@@ -773,7 +787,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
......@@ -787,9 +801,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
......@@ -834,9 +848,11 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
char* buildTaskId(uint64_t taskId, uint64_t queryId);
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
......@@ -850,7 +866,6 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
typedef struct {
bool hasAgg;
int32_t numOfRows;
int32_t startOffset;
} SFunctionCtxStatus;
static void destroyAggOperatorInfo(void* param);
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator);
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
pInfo->groupId = UINT64_MAX;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
cleanupExprSupp(&pOperator->exprSupp);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void destroyAggOperatorInfo(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarExprSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
// this is a blocking operator
int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs();
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
bool hasValidBlock = false;
bool blockAllocated = false;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (!hasValidBlock) {
createDataBlockForEmptyInput(pOperator, &pBlock);
if (pBlock == NULL) {
break;
}
blockAllocated = true;
} else {
break;
}
}
hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
}
// the downstream operator may return with error code, so let's check the code before generating results.
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return pTaskInfo->code;
}
SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
setOperatorCompleted(pOperator);
return NULL;
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
setOperatorCompleted(pOperator);
break;
}
if (pInfo->pRes->info.rows > 0) {
break;
}
}
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pInfo->pRes;
}
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process == NULL) {
continue;
}
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupKeyOptimized) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS;
}
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
(strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
hasCountFunc = true;
break;
}
}
if (!hasCountFunc) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = createDataBlock();
pBlock->info.rows = 1;
pBlock->info.capacity = 0;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData colInfo = {0};
colInfo.hasNull = true;
colInfo.info.type = TSDB_DATA_TYPE_NULL;
colInfo.info.bytes = 1;
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId;
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
if (slotId >= numOfCols) {
taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
for (int32_t k = numOfCols; k < slotId + 1; ++k) {
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
}
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
// do nothing
}
}
}
blockDataEnsureCapacity(pBlock, pBlock->info.rows);
*ppBlock = pBlock;
return TSDB_CODE_SUCCESS;
}
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
if (!blockAllocated) {
return;
}
blockDataDestroy(*ppBlock);
*ppBlock = NULL;
}
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
return;
}
doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
// record the current active group id
pAggInfo->groupId = groupId;
}
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
}
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
// a new buffer page for each table. Needs to opt this design
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
if (pWindowRes->pageId != -1) {
return 0;
}
SFilePage* pData = NULL;
// in the first scan, new space needed for results
int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
}
}
}
if (pData == NULL) {
return -1;
}
// set the number of rows in current disk page
if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pageId = pageId;
pWindowRes->offset = (int32_t)pData->num;
pData->num += size;
}
return 0;
}
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) {
int32_t code = 0;
// _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
return code;
}
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
return code;
}
return code;
}
void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfCols; ++i) {
if (pState) {
pSup->pCtx[i].saveHandle.pBuf = NULL;
pSup->pCtx[i].saveHandle.pState = pState;
pSup->pCtx[i].exprIdx = i;
} else {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
}
}
return TSDB_CODE_SUCCESS;
}
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily
SFunctionCtxStatus status = {0};
functionCtxSave(&pCtx[k], &status);
pCtx[k].input.startRowIndex = offset;
pCtx[k].input.numOfRows = forwardStep;
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
pCtx[k].input.colDataSMAIsSet = false;
}
if (pCtx[k].isPseudoFunc) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0};
idata.info.type = TSDB_DATA_TYPE_BIGINT;
idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
idata.pData = p;
SScalarParam out = {.columnData = &idata};
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1;
} else {
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
T_LONG_JMP(taskInfo->env, code);
}
}
// restore it
functionCtxRestore(&pCtx[k], &status);
}
}
}
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
pStatus->numOfRows = pCtx->input.numOfRows;
pStatus->startOffset = pCtx->input.startRowIndex;
}
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
\ No newline at end of file
......@@ -37,6 +37,7 @@ typedef struct SCacheRowsScanInfo {
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
STableListInfo* pTableList;
} SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
......@@ -47,15 +48,17 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
pInfo->pTableList = pTableListInfo;
pInfo->readHandle = *readHandle;
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
......@@ -75,20 +78,18 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error;
}
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
int32_t totalTables = tableListGetSize(pTableList);
int32_t totalTables = tableListGetSize(pTableListInfo);
int32_t capacity = 0;
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname
if (oneTableForEachGroup(pTableList) || (totalTables == 1)) {
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
STableKeyInfo* pList = tableListGetInfo(pTableList, 0);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
uint64_t suid = tableListGetSuid(pTableList);
uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
......@@ -136,7 +137,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
......@@ -281,6 +282,7 @@ void destroyCacheScanOperator(void* param) {
taosMemoryFree(pInfo->pSlotIds);
taosArrayDestroy(pInfo->pUidList);
taosArrayDestroy(pInfo->matchInfo.pList);
tableListDestroy(pInfo->pTableList);
if (pInfo->pLastrowReader != NULL) {
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
......
......@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
for (int32_t i = 0; i < totalSources; ++i) {
......@@ -576,7 +576,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
return TSDB_CODE_SUCCESS;
......@@ -629,7 +629,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
......
......@@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
......@@ -485,9 +485,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
......@@ -507,7 +505,12 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (handle) {
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
SArray* pInfoList = getTableListInfo(*pTask);
STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
taosArrayDestroy(pInfoList);
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTableListInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
goto _error;
......@@ -1083,7 +1086,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
// if pOffset equal to current offset, means continue consume
......@@ -1092,17 +1094,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
if (subType == TOPIC_SUB_TYPE__COLUMN) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream != 1) {
qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
return -1;
}
pOperator = pOperator->pDownstream[0];
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
if (pOffset->type == TMQ_OFFSET__LOG) {
tsdbReaderClose(pScanBaseInfo->dataReader);
......@@ -1187,9 +1186,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
return -1;
......
......@@ -34,9 +34,7 @@
#include "ttypes.h"
#include "vnode.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#if 0
......@@ -74,34 +72,21 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
} SAggOperatorInfo;
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
static void releaseQueryBuf(size_t numOfTables);
static void destroyAggOperatorInfo(void* param);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);
static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
......@@ -266,57 +251,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
return pResult;
}
// a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
if (pWindowRes->pageId != -1) {
return 0;
}
SFilePage* pData = NULL;
// in the first scan, new space needed for results
int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
}
}
}
if (pData == NULL) {
return -1;
}
// set the number of rows in current disk page
if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pageId = pageId;
pWindowRes->offset = (int32_t)pData->num;
pData->num += size;
}
return 0;
}
// query_range_start, query_range_end, window_duration, window_start, window_end
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
......@@ -332,72 +266,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
}
typedef struct {
bool hasAgg;
int32_t numOfRows;
int32_t startOffset;
} SFunctionCtxStatus;
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
pStatus->numOfRows = pCtx->input.numOfRows;
pStatus->startOffset = pCtx->input.startRowIndex;
}
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily
SFunctionCtxStatus status = {0};
functionCtxSave(&pCtx[k], &status);
pCtx[k].input.startRowIndex = offset;
pCtx[k].input.numOfRows = forwardStep;
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
pCtx[k].input.colDataSMAIsSet = false;
}
if (pCtx[k].isPseudoFunc) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0};
idata.info.type = TSDB_DATA_TYPE_BIGINT;
idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
idata.pData = p;
SScalarParam out = {.columnData = &idata};
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1;
} else {
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
T_LONG_JMP(taskInfo->env, code);
}
}
// restore it
functionCtxRestore(&pCtx[k], &status);
}
}
}
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
......@@ -512,25 +380,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
return code;
}
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process == NULL) {
continue;
}
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......@@ -654,149 +503,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
return win;
}
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NOT_LOAD;
pBlock->pDataBlock = NULL;
pBlock->pBlockAgg = NULL;
// int64_t groupId = pRuntimeEnv->current->groupIndex;
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
STaskCostInfo* pCost = &pTaskInfo->cost;
// pCost->totalBlocks += 1;
// pCost->totalRows += pBlock->info.rows;
#if 0
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
(*status) = BLK_DATA_DATA_LOAD;
}
// check if this data block is required to load
if ((*status) != BLK_DATA_DATA_LOAD) {
bool needFilter = true;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
SResultRow* pResult = NULL;
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (pQueryAttr->pointInterpQuery) {
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset);
} else {
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
pRuntimeEnv->current->groupIndex);
}
if (needFilter) {
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
} else {
(*status) = BLK_DATA_DATA_LOAD;
}
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
// *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);
if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
// pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
} else if ((*status) == BLK_DATA_SMA_LOAD) {
// this function never returns error?
pCost->loadBlockStatis += 1;
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
pCost->totalCheckedRows += pBlock->info.rows;
}
} else {
assert((*status) == BLK_DATA_DATA_LOAD);
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
{ // set previous window
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
SResultRow* pResult = NULL;
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
}
bool load = false;
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pTableScanInfo->pCtx[i].functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
// load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
// (char*)&(pBlock->pBlockAgg[i].max));
if (!load) { // current block has been discard due to filter applied
pCost->skipBlocks += 1;
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
}
}
}
// current block has been discard due to filter applied
// if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
// pCost->skipBlocks += 1;
// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
// pBlockInfo->window.ekey, pBlockInfo->rows);
// (*status) = BLK_DATA_FILTEROUT;
// return TSDB_CODE_SUCCESS;
// }
pCost->totalCheckedRows += pBlockInfo->rows;
pCost->loadBlocks += 1;
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
// if (pBlock->pDataBlock == NULL) {
// return terrno;
// }
// if (pQueryAttr->pFilters != NULL) {
// filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
// }
// if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
// }
}
#endif
return TSDB_CODE_SUCCESS;
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
if (status == TASK_NOT_COMPLETED) {
pTaskInfo->status = status;
......@@ -1027,43 +733,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
}
}
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
}
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
return;
}
doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
// record the current active group id
pAggInfo->groupId = groupId;
}
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
......@@ -1281,161 +950,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//
// STimeWindow tw = *win;
// getNextTimeWindow(pQueryAttr, &tw);
//
// if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
// (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
//
// // load the data block and check data remaining in current data block
// // TODO optimize performance
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// tw = *win;
// int32_t startPos =
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//
// // set the abort info
// pQueryAttr->pos = startPos;
//
// // reset the query start timestamp
// pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
// pQueryAttr->window.skey = pTableQueryInfo->win.skey;
// TSKEY key = pTableQueryInfo->win.skey;
//
// pWindowResInfo->prevSKey = tw.skey;
// int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index
//
// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
// lastKey:%" PRId64,
// GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
// pQueryAttr->current->lastKey);
//
// return key;
// } else { // do nothing
// pQueryAttr->window.skey = tw.skey;
// pWindowResInfo->prevSKey = tw.skey;
// pTableQueryInfo->lastKey = tw.skey;
//
// return tw.skey;
// }
//
// return true;
// }
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
// // if queried with value filter, do NOT forward query start position
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
// pRuntimeEnv->pFillInfo != NULL) {
// return true;
// }
//
// /*
// * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
// * pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
// value is
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
// */
// STimeWindow w = TSWINDOW_INITIALIZER;
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
//
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
//
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
//
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
// &w); pWindowResInfo->prevSKey = w.skey;
// }
// } else {
// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
// pWindowResInfo->prevSKey = w.skey;
// }
//
// // the first time window
// STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
//
// while (pQueryAttr->limit.offset > 0) {
// STimeWindow tw = win;
//
// if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
// pQueryAttr->limit.offset -= 1;
// pWindowResInfo->prevSKey = win.skey;
//
// // current time window is aligned with blockInfo.window.ekey
// // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
// if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
// pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
// }
// }
//
// if (pQueryAttr->limit.offset == 0) {
// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
// return true;
// }
//
// // current window does not ended in current data block, try next data block
// getNextTimeWindow(pQueryAttr, &tw);
//
// /*
// * If the next time window still starts from current data block,
// * load the primary timestamp column first, and then find the start position for the next queried time window.
// * Note that only the primary timestamp column is required.
// * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
// required
// * time window resides in current data block.
// */
// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
//
// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
// pQueryAttr->limit.offset -= 1;
// }
//
// if (pQueryAttr->limit.offset == 0) {
// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
// return true;
// } else {
// tw = win;
// int32_t startPos =
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
// // set the abort info
// pQueryAttr->pos = startPos;
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
// pWindowResInfo->prevSKey = tw.skey;
// win = tw;
// }
// } else {
// break; // offset is not 0, and next time window begins or ends in the next block.
// }
// }
// }
//
// // check for error
// if (terrno != TSDB_CODE_SUCCESS) {
// T_LONG_JMP(pRuntimeEnv->env, terrno);
// }
//
// return true;
// }
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
if (p->pDownstream == NULL) {
......@@ -1481,191 +995,23 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupKeyOptimized) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS;
}
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
(strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
hasCountFunc = true;
break;
}
}
if (!hasCountFunc) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = createDataBlock();
pBlock->info.rows = 1;
pBlock->info.capacity = 0;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData colInfo = {0};
colInfo.hasNull = true;
colInfo.info.type = TSDB_DATA_TYPE_NULL;
colInfo.info.bytes = 1;
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId;
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
if (slotId >= numOfCols) {
taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
for (int32_t k = numOfCols; k < slotId + 1; ++k) {
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
}
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
// do nothing
}
}
}
blockDataEnsureCapacity(pBlock, pBlock->info.rows);
*ppBlock = pBlock;
return TSDB_CODE_SUCCESS;
}
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
if (!blockAllocated) {
return;
}
blockDataDestroy(*ppBlock);
*ppBlock = NULL;
}
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs();
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
bool hasValidBlock = false;
bool blockAllocated = false;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (!hasValidBlock) {
createDataBlockForEmptyInput(pOperator, &pBlock);
if (pBlock == NULL) {
break;
}
blockAllocated = true;
} else {
break;
}
}
hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
}
// the downstream operator may return with error code, so let's check the code before generating results.
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return pTaskInfo->code;
}
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
setOperatorCompleted(pOperator);
//QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
return NULL;
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
setOperatorCompleted(pOperator);
break;
if (pOperator->operatorType == type) {
return pOperator;
} else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
return NULL;
}
if (pInfo->pRes->info.rows > 0) {
break;
}
return extractOperatorInTree(pOperator->pDownstream[0], type, id);
}
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pInfo->pRes;
}
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
......@@ -1732,70 +1078,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
return 0;
}
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) {
int32_t code = 0;
// _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
return code;
}
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
return code;
}
return code;
}
void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfCols; ++i) {
if (pState) {
pSup->pCtx[i].saveHandle.pBuf = NULL;
pSup->pCtx[i].saveHandle.pState = pState;
pSup->pCtx[i].exprIdx = i;
} else {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
}
}
return TSDB_CODE_SUCCESS;
}
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
if (numOfRows == 0) {
numOfRows = 4096;
......@@ -1814,7 +1096,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
initResultRowInfo(&pInfo->resultRowInfo);
}
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
if (pCtx == NULL) {
return NULL;
}
......@@ -1866,99 +1148,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
pInfo->groupId = UINT64_MAX;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
cleanupExprSupp(&pOperator->exprSupp);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); }
static void freeItem(void* pItem) {
void** p = pItem;
if (*p != NULL) {
taosMemoryFreeClear(*p);
}
}
void destroyAggOperatorInfo(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarExprSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
char* buildTaskId(uint64_t taskId, uint64_t queryId) {
char* p = taosMemoryMalloc(64);
......@@ -1986,7 +1177,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
......@@ -1997,8 +1187,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v
return pTaskInfo;
}
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
......@@ -2101,7 +1289,6 @@ bool groupbyTbname(SNodeList* pGroupList) {
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser) {
int32_t type = nodeType(pPhyNode);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* idstr = GET_TASKID(pTaskInfo);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
......@@ -2115,11 +1302,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableScanNode->groupSort = true;
}
STableListInfo* pTableListInfo = tableListCreate();
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL;
}
......@@ -2127,10 +1316,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
tableListDestroy(pTableListInfo);
return NULL;
}
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
return NULL;
......@@ -2140,11 +1330,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
return NULL;
}
......@@ -2152,12 +1344,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
tableListDestroy(pTableListInfo);
return NULL;
}
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
tableListDestroy(pTableListInfo);
return NULL;
}
......@@ -2168,29 +1362,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pHandle->vnode) {
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
return NULL;
}
#ifndef NDEBUG
int32_t sz = tableListGetSize(pTableListInfo);
qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
}
#endif
}
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
......@@ -2199,7 +1386,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2208,9 +1395,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
......@@ -2231,9 +1419,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
}
pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo);
......@@ -2248,7 +1437,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
......@@ -2256,7 +1445,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
if (pOperator != NULL) {
if (pOperator != NULL) { // todo moved away
pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
}
......@@ -2397,9 +1586,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return -1;
}
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle) {
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
......@@ -2416,8 +1603,9 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);
int32_t tbNum = tableListGetSize(pTableListInfo);
pDeleterParam->suid = tableListGetSuid(pTableListInfo);
// TODO extract uid list
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
......@@ -2427,11 +1615,12 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
}
*pParam = pDeleterParam;
break;
}
default:
......@@ -2481,8 +1670,6 @@ static void freeBlock(void* pParam) {
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
......@@ -2809,3 +1996,21 @@ void qStreamCloseTsdbReader(void* task) {
}
}
}
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = pOperator->info;
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
} else {
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
extractTableList(pList, pOperator->pDownstream[0]);
}
}
}
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
}
\ No newline at end of file
......@@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
uint32_t status = 0;
......@@ -771,7 +771,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList);
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableListInfo);
STableKeyInfo tInfo = {0};
while (1) {
......@@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
......@@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
......@@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
} else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
......@@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result;
}
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
......@@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
tsdbSetTableList(pInfo->base.dataReader, pList, num);
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
......@@ -866,25 +866,30 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
return 0;
}
static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
static void destroyTableScanBase(STableScanBase* pBase) {
cleanupQueryTableDataCond(&pBase->cond);
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
tsdbReaderClose(pBase->dataReader);
pBase->dataReader = NULL;
if (pTableScanInfo->base.matchInfo.pList != NULL) {
taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
if (pBase->matchInfo.pList != NULL) {
taosArrayDestroy(pBase->matchInfo.pList);
}
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
tableListDestroy(pBase->pTableListInfo);
taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
cleanupExprSupp(&pBase->pseudoSup);
}
static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
destroyTableScanBase(&pTableScanInfo->base);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = 0;
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -941,6 +946,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->base.pTableListInfo = pTableListInfo;
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
......@@ -1060,7 +1066,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
if (hasNext) {
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
tsdbReaderClose(pReader);
......@@ -1081,7 +1087,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid);
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
}
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
......@@ -1555,7 +1562,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version;
pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
......@@ -2137,19 +2145,19 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
if (code) {
tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
if (pBlock == NULL) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
......@@ -2239,6 +2247,7 @@ static void destroyRawScanOperatorInfo(void* param) {
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
tsdbReaderClose(pRawScan->dataReader);
destroySnapContext(pRawScan->sContext);
tableListDestroy(pRawScan->pTableListInfo);
taosMemoryFree(pRawScan);
}
......@@ -2260,6 +2269,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
goto _end;
}
pInfo->pTableListInfo = tableListCreate();
pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext;
......@@ -2278,9 +2288,11 @@ _end:
static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
destroyOperatorInfo(pStreamScan->pTableScanOp);
}
if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader);
}
......@@ -2307,13 +2319,14 @@ static void destroyStreamScanOperatorInfo(void* param) {
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SArray* pColIds = NULL;
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
......@@ -2327,6 +2340,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
int32_t code =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
tableListDestroy(pTableListInfo);
goto _error;
}
......@@ -2346,11 +2360,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
tableListDestroy(pTableListInfo);
goto _error;
}
}
......@@ -2360,10 +2377,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
}
......@@ -2371,11 +2390,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error;
}
if (pHandle->vnode) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) {
pTSInfo->base.cond.endVersion = pHandle->version;
......@@ -2383,7 +2403,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
STableKeyInfo* pList = NULL;
int32_t num = 0;
tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
tableListGetGroupList(pTableListInfo, 0, &pList, &num);
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
......@@ -2413,16 +2433,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
}
taosArrayDestroy(tableIdList);
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
} else {
taosArrayDestroy(pColIds);
tableListDestroy(pTableListInfo);
pColIds = NULL;
}
......@@ -2457,7 +2479,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
......@@ -2488,7 +2510,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
......@@ -2500,7 +2522,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2561,10 +2583,11 @@ static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo,
SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -2587,6 +2610,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error;
}
pInfo->pTableListInfo = pTableListInfo;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
......@@ -2620,7 +2644,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
......@@ -2677,7 +2701,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue;
}
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -2733,10 +2757,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{
size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
......@@ -2870,7 +2894,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList);
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
......@@ -2879,7 +2903,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
......@@ -2904,7 +2928,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
......@@ -2926,9 +2950,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
p->dataReader = NULL;
}
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
......@@ -2937,20 +2958,14 @@ void destroyTableMergeScanOperatorInfo(void* param) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
if (pTableScanInfo->base.matchInfo.pList != NULL) {
taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
destroyTableScanBase(&pTableScanInfo->base);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
taosMemoryFreeClear(param);
}
......@@ -2969,7 +2984,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -3011,6 +3026,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->base.limitInfo.limit.limit = -1;
pInfo->base.limitInfo.slimit.limit = -1;
pInfo->base.pTableListInfo = pTableListInfo;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
......
......@@ -83,10 +83,11 @@ typedef struct MergeIndex {
} MergeIndex;
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t uid; // table uid
} SBlockDistInfo;
static int32_t sysChkFilter__Comm(SNode* pNode);
......@@ -2214,6 +2215,7 @@ static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
tsdbReaderClose(pDistInfo->pHandle);
tableListDestroy(pDistInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
......@@ -2245,7 +2247,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -2263,9 +2265,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
pInfo->pTableListInfo = pTableListInfo;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
cleanupQueryTableDataCond(&cond);
......
......@@ -938,6 +938,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
......@@ -2175,13 +2176,6 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
}
}
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
......
......@@ -757,7 +757,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) {
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType ||
FUNCTION_TYPE_IROWTS == pFunc->funcType) {
return true;
}
}
......@@ -3119,6 +3120,19 @@ static const char* getPrecisionStr(uint8_t precision) {
return "unknown";
}
static void convertVarDuration(SValueNode* pOffset, uint8_t precision) {
const int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
const int8_t units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND};
if (pOffset->unit == 'n') {
pOffset->datum.i = pOffset->datum.i * 31 * (NANOSECOND_PER_DAY / factors[precision]);
} else {
pOffset->datum.i = pOffset->datum.i * 365 * (NANOSECOND_PER_DAY / factors[precision]);
}
pOffset->unit = units[precision];
}
static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
......@@ -3143,6 +3157,10 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG);
}
if (pOffset->unit == 'n' || pOffset->unit == 'y') {
convertVarDuration(pOffset, precision);
}
}
if (NULL != pInterval->pSliding) {
......
......@@ -212,10 +212,10 @@ print ===> rows2: $data20 $data21 $data22 $data23 $data24
if $rows != 3 then
return -1
endi
if $data01 != 2 then
if $data01 != 4 then
return -1
endi
if $data04 != 2 then
if $data04 != 4 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册