提交 7721353b 编写于 作者: L Liu Jicong

enh: put func state into tdb

上级 bbab0327
...@@ -138,7 +138,7 @@ typedef struct SqlFunctionCtx { ...@@ -138,7 +138,7 @@ typedef struct SqlFunctionCtx {
char *pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param // input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param; SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum // corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput; SColumnInfoData *pTsOutput;
int32_t offset; int32_t offset;
...@@ -152,6 +152,7 @@ typedef struct SqlFunctionCtx { ...@@ -152,6 +152,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle; SSerializeDataHandle saveHandle;
int32_t exprIdx;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
...@@ -182,9 +183,9 @@ struct SScalarParam { ...@@ -182,9 +183,9 @@ struct SScalarParam {
int32_t numOfQualified; // number of qualified elements in the final results int32_t numOfQualified; // number of qualified elements in the final results
}; };
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
......
...@@ -182,7 +182,7 @@ struct SExecTaskInfo { ...@@ -182,7 +182,7 @@ struct SExecTaskInfo {
SSubplan* pSubplan; SSubplan* pSubplan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
SArray* pResultBlockList;// result block list SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
}; };
...@@ -199,7 +199,7 @@ typedef struct SOperatorFpSet { ...@@ -199,7 +199,7 @@ typedef struct SOperatorFpSet {
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_encode_fn_t encodeResultRow; __optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow; __optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn; __optr_explain_fn_t getExplainFn;
...@@ -255,22 +255,22 @@ typedef struct SLimitInfo { ...@@ -255,22 +255,22 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
...@@ -305,9 +305,9 @@ typedef struct { ...@@ -305,9 +305,9 @@ typedef struct {
} SAggOptrPushDownInfo; } SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo { typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch; uint64_t metaFetch;
uint64_t cacheHit; uint64_t cacheHit;
} STableMetaCacheInfo; } STableMetaCacheInfo;
typedef struct STableScanBase { typedef struct STableScanBase {
...@@ -325,46 +325,46 @@ typedef struct STableScanBase { ...@@ -325,46 +325,46 @@ typedef struct STableScanBase {
} STableScanBase; } STableScanBase;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STableScanBase base; STableScanBase base;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
int8_t scanMode; int8_t scanMode;
int8_t assignBlockUid; int8_t assignBlockUid;
bool hasGroupByTag; bool hasGroupByTag;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond SArray* queryConds; // array of queryTableDataCond
STableScanBase base; STableScanBase base;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo* pCols; SColumnInfo* pCols;
SSDataBlock* pRes; SSDataBlock* pRes;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -499,8 +499,8 @@ typedef struct STableCountScanOperatorInfo { ...@@ -499,8 +499,8 @@ typedef struct STableCountScanOperatorInfo {
STableCountScanSupp supp; STableCountScanSupp supp;
int32_t currGrpIdx; int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo; } STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
...@@ -678,19 +678,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 ...@@ -678,19 +678,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
void destroyOperatorInfo(SOperatorInfo* pOperator); void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator); int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey, void* pState);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
...@@ -803,10 +803,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, ...@@ -803,10 +803,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model); char* sql, EOPTR_EXEC_MODEL model);
...@@ -828,8 +828,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); ...@@ -828,8 +828,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName); uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......
...@@ -608,9 +608,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB ...@@ -608,9 +608,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
} }
} }
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; }
return (0 != pTaskInfo->code) ? true : false;
}
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
...@@ -1353,7 +1351,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -1353,7 +1351,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
} }
} }
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
if (!tsCountAlwaysReturnValue) { if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1361,12 +1359,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc ...@@ -1361,12 +1359,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo *)downstream->info)->hasGroupByTag == true)) { ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false; bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
...@@ -1415,7 +1413,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc ...@@ -1415,7 +1413,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) { static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
if (!blockAllocated) { if (!blockAllocated) {
return; return;
} }
...@@ -1441,8 +1439,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1441,8 +1439,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
bool hasValidBlock = false; bool hasValidBlock = false;
bool blockAllocated = false; bool blockAllocated = false;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
...@@ -1485,7 +1483,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1485,7 +1483,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
destroyDataBlockForEmptyInput(blockAllocated, &pBlock); destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
} }
// the downstream operator may return with error code, so let's check the code before generating results. // the downstream operator may return with error code, so let's check the code before generating results.
...@@ -1641,7 +1638,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -1641,7 +1638,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
} }
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey) { const char* pkey, void* pState) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1653,7 +1650,13 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo ...@@ -1653,7 +1650,13 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; if (pState) {
pSup->pCtx[i].saveHandle.pBuf = NULL;
pSup->pCtx[i].saveHandle.pState = pState;
pSup->pCtx[i].exprIdx = i;
} else {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1738,7 +1741,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -1738,7 +1741,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1764,7 +1768,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -1764,7 +1768,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info; STableScanInfo* pTableScanInfo = downstream->info;
......
...@@ -458,7 +458,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -458,7 +458,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -102,7 +102,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -102,7 +102,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -317,7 +318,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -317,7 +318,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pProjectInfo->mergeDataBlocks) { if (pProjectInfo->mergeDataBlocks) {
if (pRes->info.rows > 0) { if (pRes->info.rows > 0) {
pFinalRes->info.id.groupId = 0; //clear groupId pFinalRes->info.id.groupId = 0; // clear groupId
pFinalRes->info.version = pRes->info.version; pFinalRes->info.version = pRes->info.version;
// continue merge data, ignore the group id // continue merge data, ignore the group id
...@@ -402,7 +403,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -402,7 +403,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
blockDataEnsureCapacity(pResBlock, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows);
int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "tfill.h" #include "tfill.h"
#include "ttime.h" #include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
...@@ -666,8 +666,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -666,8 +666,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
numOfExprs); pBlock->info.rows, numOfExprs);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr); closeResultRow(pr);
...@@ -817,13 +817,13 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { ...@@ -817,13 +817,13 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
} else { } else {
int32_t code = comparePullWinKey(pPullInfo, pPullWins, index); int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
if (code == 0) { if (code == 0) {
SPullWindowInfo* pos = taosArrayGet(pPullWins ,index); SPullWindowInfo* pos = taosArrayGet(pPullWins, index);
pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey); pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey); pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey); pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey); pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (code > 0 ){ } else if (code > 0) {
index++; index++;
} }
} }
...@@ -879,10 +879,10 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { ...@@ -879,10 +879,10 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
} else if (pRKey->groupId < pDataPos->groupId) { } else if (pRKey->groupId < pDataPos->groupId) {
return -1; return -1;
} }
if (*(int64_t*)pRKey->key > pDataPos->ts) { if (*(int64_t*)pRKey->key > pDataPos->ts) {
return 1; return 1;
} else if (*(int64_t*)pRKey->key < pDataPos->ts){ } else if (*(int64_t*)pRKey->key < pDataPos->ts) {
return -1; return -1;
} }
return 0; return 0;
...@@ -961,8 +961,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -961,8 +961,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
...@@ -996,8 +996,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -996,8 +996,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
#endif #endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
...@@ -1164,7 +1164,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1164,7 +1164,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
...@@ -1188,8 +1188,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1188,8 +1188,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
...@@ -1394,8 +1394,8 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1394,8 +1394,8 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = *(uint64_t*)key;
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte; SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins); int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
...@@ -1656,7 +1656,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt ...@@ -1656,7 +1656,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
// the primary timestamp column // the primary timestamp column
bool needed = false; bool needed = false;
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr; SExprInfo* pExpr = pCtx[i].pExpr;
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
needed = true; needed = true;
...@@ -1724,7 +1724,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor ...@@ -1724,7 +1724,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
for (int32_t i = 0; i < numOfExpr; i++) { for (int32_t i = 0; i < numOfExpr; i++) {
// pCtx[i].isStream = true; // pCtx[i].isStream = true;
} }
} }
...@@ -1748,7 +1748,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1748,7 +1748,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code =
initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1800,8 +1801,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1800,8 +1801,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1869,7 +1870,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1869,7 +1870,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
...@@ -1886,8 +1887,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1886,8 +1887,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
...@@ -2000,7 +2001,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2000,7 +2001,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2018,8 +2020,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2018,8 +2020,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2068,7 +2070,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2068,7 +2070,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2091,8 +2094,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2091,8 +2094,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2309,7 +2312,8 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { ...@@ -2309,7 +2312,8 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
if (!chIds) { if (!chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; SPullWindowInfo pull = {
.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size1 = taosArrayGetSize(pInfo->pChildren); int32_t size1 = taosArrayGetSize(pInfo->pChildren);
...@@ -2384,7 +2388,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -2384,7 +2388,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
}; };
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; SPullWindowInfo pull = {
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -2439,7 +2444,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -2439,7 +2444,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput); pSDataBlock->info.rows, numOfOutput);
SWinKey key = { SWinKey key = {
.ts = nextWin.skey, .ts = nextWin.skey,
.groupId = groupId, .groupId = groupId,
...@@ -2530,7 +2535,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2530,7 +2535,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64 , IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
break; break;
} }
...@@ -2650,7 +2656,7 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { ...@@ -2650,7 +2656,7 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
if (pIntervalPhyNode->window.deleteMark <= 0) { if (pIntervalPhyNode->window.deleteMark <= 0) {
return DEAULT_DELETE_MARK; return DEAULT_DELETE_MARK;
} }
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark); int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark);
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval); deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
return deleteMark; return deleteMark;
} }
...@@ -2698,7 +2704,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2698,7 +2704,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2763,8 +2770,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2763,8 +2770,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
} }
...@@ -3563,8 +3570,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3563,8 +3570,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo); OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
if (downstream) { if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
...@@ -4068,8 +4075,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4068,8 +4075,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4154,8 +4161,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4154,8 +4161,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
pBlock->info.rows, pSup->numOfExprs); currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
...@@ -4175,7 +4182,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4175,7 +4182,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, pSup->numOfExprs); pBlock->info.rows, pSup->numOfExprs);
} }
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
...@@ -4326,7 +4333,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4326,7 +4333,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4345,8 +4353,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4345,8 +4353,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
false, OP_NOT_OPENED, miaInfo, pTaskInfo); false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4481,7 +4489,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4481,7 +4489,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed // output previous interval results after this interval (&win) is closed
...@@ -4513,7 +4521,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4513,7 +4521,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed // output previous interval results after this interval (&nextWin) is closed
...@@ -4630,7 +4638,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -4630,7 +4638,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4650,8 +4659,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -4650,8 +4659,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4828,7 +4837,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4828,7 +4837,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -32,7 +32,7 @@ typedef struct SSumRes { ...@@ -32,7 +32,7 @@ typedef struct SSumRes {
int16_t type; int16_t type;
int64_t prevTs; int64_t prevTs;
bool isPrevTsSet; bool isPrevTsSet;
bool overflow; // if overflow is true, dsum to be used for any type; bool overflow; // if overflow is true, dsum to be used for any type;
} SSumRes; } SSumRes;
typedef struct SMinmaxResInfo { typedef struct SMinmaxResInfo {
...@@ -46,7 +46,7 @@ typedef struct SMinmaxResInfo { ...@@ -46,7 +46,7 @@ typedef struct SMinmaxResInfo {
} SMinmaxResInfo; } SMinmaxResInfo;
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc); int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc);
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey); STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
......
...@@ -41,15 +41,15 @@ ...@@ -41,15 +41,15 @@
#define HLL_BUCKET_MASK (HLL_BUCKETS - 1) #define HLL_BUCKET_MASK (HLL_BUCKETS - 1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2) #define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
//typedef struct SMinmaxResInfo { // typedef struct SMinmaxResInfo {
// bool assign; // assign the first value or not // bool assign; // assign the first value or not
// int64_t v; // int64_t v;
// STuplePos tuplePos; // STuplePos tuplePos;
// //
// STuplePos nullTuplePos; // STuplePos nullTuplePos;
// bool nullTupleSaved; // bool nullTupleSaved;
// int16_t type; // int16_t type;
//} SMinmaxResInfo; // } SMinmaxResInfo;
typedef struct STopBotResItem { typedef struct STopBotResItem {
SVariant v; SVariant v;
...@@ -2018,7 +2018,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde ...@@ -2018,7 +2018,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde
} }
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL); pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} else { } else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} }
...@@ -2236,9 +2236,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2236,9 +2236,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t round = pInput->numOfRows >> 2; int32_t round = pInput->numOfRows >> 2;
int32_t reminder = pInput->numOfRows & 0x03; int32_t reminder = pInput->numOfRows & 0x03;
for (int32_t i = pInput->startRowIndex, tick = 0; tick < round; i += 4, tick += 1) { for (int32_t i = pInput->startRowIndex, tick = 0; tick < round; i += 4, tick += 1) {
int64_t cts = pts[i]; int64_t cts = pts[i];
int32_t chosen = i; int32_t chosen = i;
if (cts < pts[i + 1]) { if (cts < pts[i + 1]) {
cts = pts[i + 1]; cts = pts[i + 1];
...@@ -2262,18 +2262,18 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2262,18 +2262,18 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
} }
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data); doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
} }
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
} }
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++; numOfElems++;
...@@ -2439,7 +2439,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2439,7 +2439,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
...@@ -2777,7 +2777,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2777,7 +2777,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2805,7 +2805,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -2805,7 +2805,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
...@@ -2863,7 +2863,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -2863,7 +2863,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple // save the data of this tuple
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
...@@ -2937,8 +2937,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid ...@@ -2937,8 +2937,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf; return buf;
} }
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) {
const STupleKey* pKey) {
STuplePos p = {0}; STuplePos p = {0};
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
...@@ -2964,20 +2963,31 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf ...@@ -2964,20 +2963,31 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else {
// other tuple save policy // other tuple save policy
if (streamStateFuncPut(pHandle->pState, pKey, pBuf, length) < 0) { if (streamStateFuncPut(pHandle->pState, &key, pBuf, length) < 0) {
ASSERT(0); ASSERT(0);
} }
p.streamTupleKey = *pKey; p.streamTupleKey = key;
} }
return p; return p;
} }
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) { STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
prepareBuf(pCtx); prepareBuf(pCtx);
STupleKey key;
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
key.exprIdx = pCtx->exprIdx;
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey); return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key);
} }
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
...@@ -3623,7 +3633,7 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn ...@@ -3623,7 +3633,7 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn
pInfo->totalCount = 0; pInfo->totalCount = 0;
pInfo->normalized = 0; pInfo->normalized = 0;
char *binTypeStr = strndup(varDataVal(pCtx->param[1].param.pz), varDataLen(pCtx->param[1].param.pz)); char* binTypeStr = strndup(varDataVal(pCtx->param[1].param.pz), varDataLen(pCtx->param[1].param.pz));
int8_t binType = getHistogramBinType(binTypeStr); int8_t binType = getHistogramBinType(binTypeStr);
taosMemoryFree(binTypeStr); taosMemoryFree(binTypeStr);
...@@ -3947,7 +3957,6 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -3947,7 +3957,6 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data); SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data);
...@@ -4423,7 +4432,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da ...@@ -4423,7 +4432,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
...@@ -4454,7 +4463,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4454,7 +4463,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }
...@@ -4758,7 +4767,7 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, ...@@ -4758,7 +4767,7 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
pItem->count += 1; pItem->count += 1;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, NULL); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock);
} }
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
...@@ -4798,7 +4807,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { ...@@ -4798,7 +4807,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }
...@@ -5299,7 +5308,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5299,7 +5308,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t len = sprintf(st + VARSTR_HEADER_SIZE, int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
"Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize/1024.0, compRatio, '%'); pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
......
...@@ -64,7 +64,7 @@ ...@@ -64,7 +64,7 @@
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) { static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256; const int32_t bitWidth = 256;
*width = (bitWidth>>3u) / bytes; *width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width); *remainder = numOfRows % (*width);
*rounds = numOfRows / (*width); *rounds = numOfRows / (*width);
} }
...@@ -92,8 +92,7 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder ...@@ -92,8 +92,7 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder
(_v) = (_sec)[j]; \ (_v) = (_sec)[j]; \
} \ } \
} }
static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) { static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) {
int8_t v = 0; int8_t v = 0;
const int8_t* p = pData; const int8_t* p = pData;
...@@ -116,7 +115,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu ...@@ -116,7 +115,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu
const int8_t* q = (const int8_t*)&initVal; const int8_t* q = (const int8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value } else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p); next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epu8(initVal, next); initVal = _mm256_max_epu8(initVal, next);
...@@ -126,7 +125,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu ...@@ -126,7 +125,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu
const uint8_t* q = (const uint8_t*)&initVal; const uint8_t* q = (const uint8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} }
} else { // min function } else { // min function
if (signVal) { if (signVal) {
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
...@@ -241,7 +240,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is ...@@ -241,7 +240,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is
// let compare the final results // let compare the final results
const int32_t* q = (const int32_t*)&initVal; const int32_t* q = (const int32_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value } else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p); next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi32(initVal, next); initVal = _mm256_max_epi32(initVal, next);
...@@ -281,7 +280,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is ...@@ -281,7 +280,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is
} }
static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) { static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) {
float v = 0; float v = 0;
const float* p = pData; const float* p = pData;
int32_t width, remain, rounds; int32_t width, remain, rounds;
...@@ -358,7 +357,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is ...@@ -358,7 +357,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is
static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows) { static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows) {
int32_t i = start; int32_t i = start;
while (i < (start + numOfRows) && (colDataIsNull_f(pCol->nullbitmap, i) == true)) { while (i < (start + numOfRows) && (colDataIsNull_f(pCol->nullbitmap, i) == true)) {
i += 1; i += 1;
} }
...@@ -495,7 +494,8 @@ static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S ...@@ -495,7 +494,8 @@ static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S
} }
} }
static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc) { static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf,
bool isMinFunc) {
float* pData = (float*)pCol->pData; float* pData = (float*)pCol->pData;
float* val = (float*)&pBuf->v; float* val = (float*)&pBuf->v;
...@@ -525,7 +525,8 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo ...@@ -525,7 +525,8 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
pBuf->assign = true; pBuf->assign = true;
} }
static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc) { static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf,
bool isMinFunc) {
double* pData = (double*)pCol->pData; double* pData = (double*)pCol->pData;
double* val = (double*)&pBuf->v; double* val = (double*)&pBuf->v;
...@@ -740,7 +741,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -740,7 +741,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} else { } else {
...@@ -754,7 +755,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -754,7 +755,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -768,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -768,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -782,7 +783,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -782,7 +783,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -798,7 +799,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -798,7 +799,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -819,7 +820,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -819,7 +820,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes); memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
numOfElems = 1; numOfElems = 1;
...@@ -883,9 +884,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -883,9 +884,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
_over: _over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pBuf->nullTupleSaved = true; pBuf->nullTupleSaved = true;
} }
return numOfElems; return numOfElems;
} }
\ No newline at end of file
...@@ -107,7 +107,7 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, ...@@ -107,7 +107,7 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
} }
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
szPage = szPage < 0 ? 4096 : szPage; szPage = szPage < 0 ? (16 * 1024) : szPage;
pages = pages < 0 ? 256 : pages; pages = pages < 0 ? 256 : pages;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册