提交 820d6d13 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 d2f8a330
...@@ -113,7 +113,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -113,7 +113,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
...@@ -121,6 +121,6 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); ...@@ -121,6 +121,6 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor ...@@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window SResultRowPosition pos; // current active time window
// SVariant tag;
} STableQueryInfo; } STableQueryInfo;
typedef struct SLimit { typedef struct SLimit {
...@@ -156,8 +155,6 @@ typedef struct STaskAttr { ...@@ -156,8 +155,6 @@ typedef struct STaskAttr {
} STaskAttr; } STaskAttr;
struct SOperatorInfo; struct SOperatorInfo;
//struct SAggSupporter;
//struct SOptrBasicInfo;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
...@@ -199,25 +196,15 @@ typedef struct STaskRuntimeEnv { ...@@ -199,25 +196,15 @@ typedef struct STaskRuntimeEnv {
STaskAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object. STSCursor cur;
char** prevRow; char* tagVal; // tag value of current data block
STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
char* tagVal; // tag value of current data block
// STableGroupInfo tableqinfoGroupInfo; // this is a table list
struct SOperatorInfo* proot; struct SOperatorInfo* proot;
SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value
int64_t currentOffset; // dynamic offset value
STableQueryInfo* current; STableQueryInfo* current;
SResultInfo resultInfo; SResultInfo resultInfo;
struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
...@@ -243,7 +230,7 @@ typedef struct SOperatorInfo { ...@@ -243,7 +230,7 @@ typedef struct SOperatorInfo {
bool blocking; // block operator or not bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfExprs; // number of columns of the current operator results int32_t numOfExprs; // number of columns of the current operator results
char* name; // name, used to show the query execution plan char* name; // name, for debug purpose
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
...@@ -308,6 +295,13 @@ typedef struct SSampleExecInfo { ...@@ -308,6 +295,13 @@ typedef struct SSampleExecInfo {
uint32_t seed; // random seed value uint32_t seed; // random seed value
} SSampleExecInfo; } SSampleExecInfo;
typedef struct SExecSupp {
SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
} SExecSupp;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void* dataReader; void* dataReader;
SReadHandle readHandle; SReadHandle readHandle;
...@@ -320,17 +314,13 @@ typedef struct STableScanInfo { ...@@ -320,17 +314,13 @@ typedef struct STableScanInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset; int32_t* rowEntryInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SArray* pColMatchInfo; SArray* pColMatchInfo;
int32_t numOfOutput; int32_t numOfOutput;
SExprInfo* pPseudoExpr; SExecSupp pseudoSup;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
// int32_t* rowCellInfoOffset;
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
...@@ -436,19 +426,20 @@ typedef struct SBlockDistInfo { ...@@ -436,19 +426,20 @@ typedef struct SBlockDistInfo {
void* pHandle; void* pHandle;
} SBlockDistInfo; } SBlockDistInfo;
// todo remove this
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info int32_t* rowEntryInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx; SqlFunctionCtx* pCtx;
SSDataBlock* pRes; SSDataBlock* pRes;
} SOptrBasicInfo; } SOptrBasicInfo;
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset // TODO move the resultrowsiz together with SOptrBasicInfo:rowEntryInfoOffset
typedef struct SAggSupporter { typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row, todo remove it
} SAggSupporter; } SAggSupporter;
typedef struct STimeWindowSupp { typedef struct STimeWindowSupp {
...@@ -503,7 +494,7 @@ typedef struct SAggOperatorInfo { ...@@ -503,7 +494,7 @@ typedef struct SAggOperatorInfo {
SExprInfo *pScalarExprInfo; SExprInfo *pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied
SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct. SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct.
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t *rowEntryInfoOffset; // offset value for each row result cell info
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
...@@ -528,11 +519,7 @@ typedef struct SIndefOperatorInfo { ...@@ -528,11 +519,7 @@ typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SArray* pPseudoColInfo; SArray* pPseudoColInfo;
SExecSupp scalarSup;
SExprInfo* pScalarExpr;
int32_t numOfScalarExpr;
SqlFunctionCtx* pScalarCtx;
int32_t* rowCellInfoOffset;
} SIndefOperatorInfo; } SIndefOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
...@@ -544,13 +531,6 @@ typedef struct SFillOperatorInfo { ...@@ -544,13 +531,6 @@ typedef struct SFillOperatorInfo {
bool multigroupResult; bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SScalarSupp {
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
} SScalarSupp;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
...@@ -563,7 +543,7 @@ typedef struct SGroupbyOperatorInfo { ...@@ -563,7 +543,7 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SScalarSupp scalarSup; SExecSupp scalarSup;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
...@@ -587,7 +567,7 @@ typedef struct SPartitionOperatorInfo { ...@@ -587,7 +567,7 @@ typedef struct SPartitionOperatorInfo {
void* pGroupIter; // group iterator void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes; SSDataBlock* pUpdateRes;
SScalarSupp scalarSupp; SExecSupp scalarSup;
} SPartitionOperatorInfo; } SPartitionOperatorInfo;
typedef struct SWindowRowsSup { typedef struct SWindowRowsSup {
...@@ -764,9 +744,11 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); ...@@ -764,9 +744,11 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
void cleanupExecSupp(SExecSupp* pSupp);
SSDataBlock* loadNextDataBlock(void* param); SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId, char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
...@@ -885,12 +867,13 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -885,12 +867,13 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size); int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap,
int32_t start, int64_t gap, SHashObj* pStDeleted); SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
......
...@@ -42,21 +42,6 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -42,21 +42,6 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
} }
} }
void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// SResultRow *pWindowRes = pResultRowInfo->pResult[i];
// clearResultRow(pRuntimeEnv, pWindowRes);
int32_t groupIndex = 0;
int64_t uid = 0;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, &groupIndex, sizeof(groupIndex), uid);
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
}
pResultRowInfo->size = 0;
}
void closeAllResultRows(SResultRowInfo *pResultRowInfo) { void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
// do nothing // do nothing
} }
...@@ -518,14 +503,14 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -518,14 +503,14 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset) {
SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) { if (pFuncCtx == NULL) {
return NULL; return NULL;
} }
*rowCellInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t)); *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
if (*rowCellInfoOffset == 0) { if (*rowEntryInfoOffset == 0) {
taosMemoryFreeClear(pFuncCtx); taosMemoryFreeClear(pFuncCtx);
return NULL; return NULL;
} }
...@@ -584,8 +569,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -584,8 +569,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
(*rowCellInfoOffset)[i] = (*rowEntryInfoOffset)[i] =
(int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
} }
setSelectValueColumnInfo(pFuncCtx, numOfOutput); setSelectValueColumnInfo(pFuncCtx, numOfOutput);
......
...@@ -147,32 +147,6 @@ static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); ...@@ -147,32 +147,6 @@ static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SArray* getOrderCheckColumns(STaskAttr* pQuery);
typedef struct SRowCompSupporter {
STaskRuntimeEnv* pRuntimeEnv;
int16_t dataOffset;
__compar_fn_t comFunc;
} SRowCompSupporter;
static int compareRowData(const void* a, const void* b, const void* userData) {
const SResultRow* pRow1 = (const SResultRow*)a;
const SResultRow* pRow2 = (const SResultRow*)b;
SRowCompSupporter* supporter = (SRowCompSupporter*)userData;
STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv;
SFilePage* page1 = getBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId);
SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
int16_t offset = supporter->dataOffset;
return 0;
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
// setup the output buffer for each operator // setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
...@@ -410,7 +384,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes ...@@ -410,7 +384,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, STimeWindow* win, static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, STimeWindow* win,
bool masterscan, SResultRow** pResult, int64_t groupId, SqlFunctionCtx* pCtx, bool masterscan, SResultRow** pResult, int64_t groupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) { int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId); return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
} }
...@@ -1248,17 +1222,17 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1248,17 +1222,17 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
if (pQueryAttr->pointInterpQuery) { if (pQueryAttr->pointInterpQuery) {
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset); pTableScanInfo->rowEntryInfoOffset);
} else { } else {
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
pRuntimeEnv->current->groupIndex); pRuntimeEnv->current->groupIndex);
} }
...@@ -1303,7 +1277,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1303,7 +1277,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
...@@ -1389,7 +1363,7 @@ void initResultRow(SResultRow* pResultRow) { ...@@ -1389,7 +1363,7 @@ void initResultRow(SResultRow* pResultRow) {
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx; SqlFunctionCtx* pCtx = pInfo->pCtx;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; int32_t* rowEntryInfoOffset = pInfo->rowEntryInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
initResultRowInfo(pResultRowInfo); initResultRowInfo(pResultRowInfo);
...@@ -1400,7 +1374,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t ...@@ -1400,7 +1374,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
pTaskInfo, false, pSup); pTaskInfo, false, pSup);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowCellInfoOffset); struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
cleanupResultRowEntry(pEntry); cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry; pCtx[i].resultInfo = pEntry;
...@@ -1441,9 +1415,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { ...@@ -1441,9 +1415,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
// cleanupResultRowInfo(&pTableQueryInfo->resInfo); // cleanupResultRowInfo(&pTableQueryInfo->resInfo);
} }
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
...@@ -1543,7 +1517,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u ...@@ -1543,7 +1517,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; int32_t* rowEntryInfoOffset = pAggInfo->binfo.rowEntryInfoOffset;
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup); sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
...@@ -1561,7 +1535,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u ...@@ -1561,7 +1535,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
} }
} }
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
} }
void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) { void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) {
...@@ -1725,7 +1699,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -1725,7 +1699,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
int32_t numOfExprs = pOperator->numOfExprs; int32_t numOfExprs = pOperator->numOfExprs;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; int32_t* rowCellOffset = pbInfo->rowEntryInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes; SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx; SqlFunctionCtx* pCtx = pbInfo->pCtx;
...@@ -1740,7 +1714,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -1740,7 +1714,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
} }
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset) { int32_t* rowEntryInfoOffset) {
// update the number of result for each, only update the number of rows for the corresponding window result. // update the number of result for each, only update the number of rows for the corresponding window result.
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return; // return;
...@@ -1755,7 +1729,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -1755,7 +1729,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
continue; continue;
} }
SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowCellInfoOffset); SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset);
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
} }
} }
...@@ -2909,7 +2883,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -2909,7 +2883,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error; goto _error;
} }
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowEntryInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) { if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
...@@ -3561,7 +3535,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -3561,7 +3535,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowEntryInfoOffset);
pBasicInfo->pRes = pResultBlock; pBasicInfo->pRes = pResultBlock;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
...@@ -3607,7 +3581,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3607,7 +3581,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr; pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) { if (pInfo->pScalarExprInfo != NULL) {
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowEntryInfoOffset);
} }
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
...@@ -3659,7 +3633,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { ...@@ -3659,7 +3633,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
assert(pInfo != NULL); assert(pInfo != NULL);
destroySqlFunctionCtx(pInfo->pCtx, numOfOutput); destroySqlFunctionCtx(pInfo->pCtx, numOfOutput);
taosMemoryFreeClear(pInfo->rowCellInfoOffset); taosMemoryFreeClear(pInfo->rowEntryInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo); cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
...@@ -3692,17 +3666,20 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3692,17 +3666,20 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
} }
void cleanupExecSupp(SExecSupp* pSupp) {
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param; SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExecSupp(&pInfo->scalarSup);
destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput);
destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr);
taosMemoryFree(pInfo->rowCellInfoOffset);
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -3829,9 +3806,10 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { ...@@ -3829,9 +3806,10 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
} }
// there is an scalar expression that needs to be calculated before apply the group aggregation. // there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pIndefInfo->pScalarExpr != NULL) { SExecSupp* pScalarSup = &pIndefInfo->scalarSup;
code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx, if (pScalarSup->pExprInfo != NULL) {
pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo); code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -3872,8 +3850,9 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3872,8 +3850,9 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
int32_t numOfScalarExpr = 0; int32_t numOfScalarExpr = 0;
if (pPhyNode->pExprs != NULL) { if (pPhyNode->pExprs != NULL) {
pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr); SExecSupp* pSup = &pInfo->scalarSup;
pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset); pSup->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, numOfScalarExpr, &pSup->rowEntryInfoOffset);
} }
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
...@@ -3893,7 +3872,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3893,7 +3872,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->numOfScalarExpr = numOfScalarExpr;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
......
...@@ -37,6 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -37,6 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pInfo->keyBuf); taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
cleanupExecSupp(&pInfo->scalarSup);
} }
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
...@@ -322,8 +323,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -322,8 +323,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pScalarExprInfo != NULL) { if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL); pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
...@@ -386,9 +387,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -386,9 +387,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList; pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo; pInfo->scalarSup.pExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr; pInfo->scalarSup.numOfExprs = numOfScalarExpr;
pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset); pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -645,8 +646,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { ...@@ -645,8 +646,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
} }
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSupp.pScalarExprInfo != NULL) { if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL); pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
...@@ -666,14 +667,18 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -666,14 +667,18 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
taosMemoryFree(key.pData); taosMemoryFree(key.pData);
} }
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
taosMemoryFree(pInfo->keyBuf); taosMemoryFree(pInfo->keyBuf);
taosHashCleanup(pInfo->pGroupSet); taosHashCleanup(pInfo->pGroupSet);
taosMemoryFree(pInfo->columnOffset); taosMemoryFree(pInfo->columnOffset);
cleanupExecSupp(&pInfo->scalarSup);
} }
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
...@@ -691,10 +696,10 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -691,10 +696,10 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) { if (pPartNode->pExprs != NULL) {
pInfo->scalarSupp.numOfScalarExpr = 0; pInfo->scalarSup.numOfExprs = 0;
pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr); pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs);
pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx( pInfo->scalarSup.pCtx = createSqlFunctionCtx(
pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset); pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset);
} }
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
...@@ -752,6 +757,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* ...@@ -752,6 +757,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup); doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
assert(pResultRow != NULL); assert(pResultRow != NULL);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -261,9 +261,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -261,9 +261,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, SExecSupp* pSup = &pTableScanInfo->pseudoSup;
pBlock); addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
...@@ -538,8 +538,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -538,8 +538,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); SExecSupp* pSup = &pInfo->pseudoSup;
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
...@@ -1869,7 +1870,7 @@ typedef struct STableMergeScanInfo { ...@@ -1869,7 +1870,7 @@ typedef struct STableMergeScanInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset; int32_t* rowEntryInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SArray* pColMatchInfo; SArray* pColMatchInfo;
...@@ -1878,7 +1879,7 @@ typedef struct STableMergeScanInfo { ...@@ -1878,7 +1879,7 @@ typedef struct STableMergeScanInfo {
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx; SqlFunctionCtx* pPseudoCtx;
// int32_t* rowCellInfoOffset; // int32_t* rowEntryInfoOffset;
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
...@@ -2257,7 +2258,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2257,7 +2258,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
......
...@@ -39,7 +39,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -39,7 +39,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SArray* pColMatchColInfo = SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
......
...@@ -103,7 +103,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -103,7 +103,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan, static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
...@@ -118,7 +118,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo ...@@ -118,7 +118,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
pResultRow->win = (*win); pResultRow->win = (*win);
*pResult = pResultRow; *pResult = pResultRow;
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -656,7 +656,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -656,7 +656,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
STimeWindow w = pr->win; STimeWindow w = pr->win;
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -812,7 +812,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -812,7 +812,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -836,7 +836,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -836,7 +836,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// restore current time window // restore current time window
ret = ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -861,7 +861,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -861,7 +861,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowEntryInfoOffset,
&pInfo->aggSup, pTaskInfo); &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1042,7 +1042,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1042,7 +1042,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pRowSup->win.ekey = pRowSup->win.skey; pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1068,7 +1068,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1068,7 +1068,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1166,7 +1166,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1166,7 +1166,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
// todo merged with the build group result. // todo merged with the build group result.
static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
int32_t* rowCellInfoOffset) { int32_t* rowEntryInfoOffset) {
size_t num = taosArrayGetSize(pUpdateList); size_t num = taosArrayGetSize(pUpdateList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
...@@ -1176,7 +1176,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr ...@@ -1176,7 +1176,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowCellInfoOffset); SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryInfoOffset);
if (pRow->numOfRows < pEntry->numOfRes) { if (pRow->numOfRows < pEntry->numOfRes) {
pRow->numOfRows = pEntry->numOfRes; pRow->numOfRows = pEntry->numOfRes;
} }
...@@ -1199,7 +1199,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrB ...@@ -1199,7 +1199,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrB
SResultRow* pResult = getResultRowByPos(pResultBuf, p1); SResultRow* pResult = getResultRowByPos(pResultBuf, p1);
SqlFunctionCtx* pCtx = pBinfo->pCtx; SqlFunctionCtx* pCtx = pBinfo->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pBinfo->rowCellInfoOffset); pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pBinfo->rowEntryInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue; continue;
...@@ -1338,7 +1338,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1338,7 +1338,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
...@@ -1603,7 +1603,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1603,7 +1603,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
pRowSup->win.ekey = pRowSup->win.skey; pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1623,7 +1623,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1623,7 +1623,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -2068,14 +2068,14 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra ...@@ -2068,14 +2068,14 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
STimeWindow* pParentWin = taosArrayGet(pWinArray, i); STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pInfo->binfo.pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SIntervalAggOperatorInfo* pChInfo = pChildOp->info; SIntervalAggOperatorInfo* pChInfo = pChildOp->info;
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChInfo->binfo.pCtx, setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChInfo->binfo.pCtx,
pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, &pChInfo->aggSup, pTaskInfo); pChildOp->numOfExprs, pChInfo->binfo.rowEntryInfoOffset, &pChInfo->aggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
} }
} }
...@@ -2121,7 +2121,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2121,7 +2121,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
} }
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -2267,7 +2267,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2267,7 +2267,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
} }
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
...@@ -2398,7 +2398,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2398,7 +2398,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
} }
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowEntryInfoOffset);
pBasicInfo->pRes = pResultBlock; pBasicInfo->pRes = pResultBlock;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = NULL; pBasicInfo->pCtx[i].pBuf = NULL;
...@@ -2592,7 +2592,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t ...@@ -2592,7 +2592,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t
} }
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, uint64_t groupId, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
assert(pWinInfo->win.skey <= pWinInfo->win.ekey); assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query // too many time window in query
...@@ -2621,7 +2621,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes ...@@ -2621,7 +2621,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
// set time window for current result // set time window for current result
(*pResult)->win = pWinInfo->win; (*pResult)->win = pWinInfo->win;
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowCellInfoOffset); setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2632,7 +2632,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre ...@@ -2632,7 +2632,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, tsColId); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, tsColId);
TSKEY* tsCols = (int64_t*)pColDataInfo->pData; TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
int32_t code = setWindowOutputBuf(pCurWin, pResult, pBinfo->pCtx, pSDataBlock->info.groupId, numOutput, int32_t code = setWindowOutputBuf(pCurWin, pResult, pBinfo->pCtx, pSDataBlock->info.groupId, numOutput,
pBinfo->rowCellInfoOffset, pAggSup, pTaskInfo); pBinfo->rowEntryInfoOffset, pAggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -2674,7 +2674,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -2674,7 +2674,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset,
&pInfo->streamAggSup, pTaskInfo); &pInfo->streamAggSup, pTaskInfo);
num += startIndex + 1; num += startIndex + 1;
ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pCurWins)); ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pCurWins));
...@@ -2682,7 +2682,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -2682,7 +2682,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
for (int32_t i = startIndex + 1; i < num; i++) { for (int32_t i = startIndex + 1; i < num; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pCurWins, i); SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pCurWins, i);
SResultRow* pWinResult = NULL; SResultRow* pWinResult = NULL;
setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset,
&pInfo->streamAggSup, pTaskInfo); &pInfo->streamAggSup, pTaskInfo);
pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey); pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey);
compactFunctions(pInfo->binfo.pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); compactFunctions(pInfo->binfo.pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
...@@ -2815,7 +2815,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin ...@@ -2815,7 +2815,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset,
&pInfo->streamAggSup, pTaskInfo); &pInfo->streamAggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
...@@ -2829,7 +2829,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin ...@@ -2829,7 +2829,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) {
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId, numOfOutput, setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId, numOfOutput,
pChInfo->binfo.rowCellInfoOffset, &pChInfo->streamAggSup, pTaskInfo); pChInfo->binfo.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
continue; continue;
} }
...@@ -2956,7 +2956,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2956,7 +2956,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
...@@ -3310,7 +3310,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3310,7 +3310,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
...@@ -3428,7 +3428,7 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t ...@@ -3428,7 +3428,7 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, pOperatorInfo->numOfExprs, iaInfo->binfo.rowEntryInfoOffset, pResultBlock,
pTaskInfo); pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
...@@ -3456,7 +3456,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -3456,7 +3456,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
//TODO: remove the hash table usage (groupid + winkey => result row position) //TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); numOfOutput, iaInfo->binfo.rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -3483,7 +3483,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -3483,7 +3483,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
startPos = currPos; startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); numOfOutput, iaInfo->binfo.rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册