提交 ee5517d5 编写于 作者: 5 54liuyao

feat(stream): combine function spread\elapsed\histogram\hll

上级 80d2d150
...@@ -1494,10 +1494,8 @@ typedef struct { ...@@ -1494,10 +1494,8 @@ typedef struct {
int32_t code; int32_t code;
} STaskDropRsp; } STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE_SMA 0
#define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2 #define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_WINDOW_CLOSE_SMA 3
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
......
...@@ -395,13 +395,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -395,13 +395,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay; req.pRSmaParam.delay = pStb->delay;
if (pStb->ast1Len > 0) { if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
} }
if (pStb->ast2Len > 0) { if (pStb->ast2Len > 0) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
......
...@@ -458,7 +458,6 @@ typedef struct STimeWindowSupp { ...@@ -458,7 +458,6 @@ typedef struct STimeWindowSupp {
int64_t waterMark; int64_t waterMark;
TSKEY maxTs; TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
SHashObj *winMap;
} STimeWindowAggSupp; } STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
...@@ -908,8 +907,6 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap ...@@ -908,8 +907,6 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted); int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int64_t getSmaWaterMark(int64_t interval, double filesFactor);
bool isSmaStream(int8_t triggerType);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
......
...@@ -4722,18 +4722,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4722,18 +4722,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.waterMark = pIntervalPhyNode->window.watermark, .waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.winMap = NULL,
}; };
if (isSmaStream(pIntervalPhyNode->window.triggerType)) {
if (FLT_LESS(pIntervalPhyNode->window.filesFactor, 1.000000)) {
as.calTrigger = STREAM_TRIGGER_AT_ONCE_SMA;
} else {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
as.winMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
as.waterMark = getSmaWaterMark(interval.interval, pIntervalPhyNode->window.filesFactor);
as.calTrigger = STREAM_TRIGGER_WINDOW_CLOSE_SMA;
}
}
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type); bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
...@@ -5446,16 +5435,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -5446,16 +5435,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
return code; return code;
} }
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
int64_t waterMark = 0;
ASSERT(FLT_GREATEREQUAL(filesFactor, 0.000000));
waterMark = -1 * filesFactor;
return waterMark;
}
bool isSmaStream(int8_t triggerType) {
if (triggerType == STREAM_TRIGGER_AT_ONCE || triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
return false;
}
return true;
}
...@@ -1028,10 +1028,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1028,10 +1028,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto _error; goto _error;
} }
if (isSmaStream(pTableScanNode->triggerType)) {
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTableScanNode->filesFactor);
}
if (pSTInfo->interval.interval > 0 && pDataReader) { if (pSTInfo->interval.interval > 0 && pDataReader) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else { } else {
......
...@@ -818,13 +818,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -818,13 +818,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
saveResult(pResult, tableGroupId, pUpdated); saveResult(pResult, tableGroupId, pUpdated);
} }
if (pInfo->twAggSup.winMap) {
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
}
} }
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
...@@ -872,13 +868,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -872,13 +868,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
saveResult(pResult, tableGroupId, pUpdated); saveResult(pResult, tableGroupId, pUpdated);
} }
if (pInfo->twAggSup.winMap) {
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
}
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
...@@ -1264,16 +1256,9 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1264,16 +1256,9 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
if (win.ekey < pSup->maxTs - pSup->waterMark) { if (win.ekey < pSup->maxTs - pSup->waterMark) {
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
if (taosHashGet(pSup->winMap, &win.skey, sizeof(TSKEY))) {
continue;
}
}
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
if (pSup->calTrigger != STREAM_TRIGGER_AT_ONCE_SMA && pSup->calTrigger != STREAM_TRIGGER_WINDOW_CLOSE_SMA) { taosHashRemove(pHashMap, keyBuf, keyLen);
taosHashRemove(pHashMap, keyBuf, keyLen);
}
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) { if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -1281,11 +1266,10 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1281,11 +1266,10 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
pos->groupId = groupId; pos->groupId = groupId;
pos->pos = *(SResultRowPosition*)pIte; pos->pos = *(SResultRowPosition*)pIte;
*(int64_t*)pos->key = ts; *(int64_t*)pos->key = ts;
if (!taosArrayPush(closeWins, &pos)) { if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) {
taosMemoryFree(pos); taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashPut(pSup->winMap, &win.skey, sizeof(TSKEY), NULL, 0);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1340,10 +1324,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1340,10 +1324,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE || taosArrayAddAll(pUpdated, pClosed);
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
taosArrayAddAll(pUpdated, pClosed);
}
taosArrayDestroy(pClosed); taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
...@@ -2127,7 +2108,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2127,7 +2108,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.waterMark = pIntervalPhyNode->window.watermark, .waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.winMap = NULL,
}; };
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -3141,7 +3121,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3141,7 +3121,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.waterMark = pStateNode->window.watermark, .waterMark = pStateNode->window.watermark,
.calTrigger = pStateNode->window.triggerType, .calTrigger = pStateNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.winMap = NULL,
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
......
...@@ -112,6 +112,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); ...@@ -112,6 +112,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx);
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getSpreadInfoSize(); int32_t getSpreadInfoSize();
int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
...@@ -120,6 +121,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); ...@@ -120,6 +121,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx);
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getElapsedInfoSize(); int32_t getElapsedInfoSize();
int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
...@@ -128,6 +130,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); ...@@ -128,6 +130,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx);
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getHistogramInfoSize(); int32_t getHistogramInfoSize();
int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t hllFunction(SqlFunctionCtx* pCtx); int32_t hllFunction(SqlFunctionCtx* pCtx);
...@@ -135,6 +138,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx); ...@@ -135,6 +138,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx);
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getHLLInfoSize(); int32_t getHLLInfoSize();
int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
...@@ -1447,6 +1447,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1447,6 +1447,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = apercentileFunctionSetup, .initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction, .processFunc = apercentileFunction,
.finalizeFunc = apercentileFinalize, .finalizeFunc = apercentileFinalize,
.invertFunc = NULL,
.combineFunc = apercentileCombine, .combineFunc = apercentileCombine,
.pPartialFunc = "_apercentile_partial", .pPartialFunc = "_apercentile_partial",
.pMergeFunc = "_apercentile_merge" .pMergeFunc = "_apercentile_merge"
...@@ -1459,7 +1460,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1459,7 +1460,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getApercentileFuncEnv, .getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup, .initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction, .processFunc = apercentileFunction,
.finalizeFunc = apercentilePartialFinalize .finalizeFunc = apercentilePartialFinalize,
.invertFunc = NULL,
.combineFunc = apercentileCombine,
}, },
{ {
.name = "_apercentile_merge", .name = "_apercentile_merge",
...@@ -1469,7 +1472,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1469,7 +1472,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getApercentileFuncEnv, .getEnvFunc = getApercentileFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = apercentileFunctionMerge, .processFunc = apercentileFunctionMerge,
.finalizeFunc = apercentileFinalize .finalizeFunc = apercentileFinalize,
.invertFunc = NULL,
.combineFunc = apercentileCombine,
}, },
{ {
.name = "top", .name = "top",
...@@ -1503,6 +1508,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1503,6 +1508,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = spreadFunctionSetup, .initFunc = spreadFunctionSetup,
.processFunc = spreadFunction, .processFunc = spreadFunction,
.finalizeFunc = spreadFinalize, .finalizeFunc = spreadFinalize,
.invertFunc = NULL,
.combineFunc = spreadCombine,
.pPartialFunc = "_spread_partial", .pPartialFunc = "_spread_partial",
.pMergeFunc = "_spread_merge" .pMergeFunc = "_spread_merge"
}, },
...@@ -1515,7 +1522,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1515,7 +1522,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSpreadFuncEnv, .getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup, .initFunc = spreadFunctionSetup,
.processFunc = spreadFunction, .processFunc = spreadFunction,
.finalizeFunc = spreadPartialFinalize .finalizeFunc = spreadPartialFinalize,
.invertFunc = NULL,
.combineFunc = spreadCombine,
}, },
{ {
.name = "_spread_merge", .name = "_spread_merge",
...@@ -1526,7 +1535,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1526,7 +1535,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSpreadFuncEnv, .getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup, .initFunc = spreadFunctionSetup,
.processFunc = spreadFunctionMerge, .processFunc = spreadFunctionMerge,
.finalizeFunc = spreadFinalize .finalizeFunc = spreadFinalize,
.invertFunc = NULL,
.combineFunc = spreadCombine,
}, },
{ {
.name = "elapsed", .name = "elapsed",
...@@ -1538,6 +1549,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1538,6 +1549,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = elapsedFunctionSetup, .initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction, .processFunc = elapsedFunction,
.finalizeFunc = elapsedFinalize, .finalizeFunc = elapsedFinalize,
.invertFunc = NULL,
.combineFunc = elapsedCombine,
.pPartialFunc = "_elapsed_partial", .pPartialFunc = "_elapsed_partial",
.pMergeFunc = "_elapsed_merge" .pMergeFunc = "_elapsed_merge"
}, },
...@@ -1550,7 +1563,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1550,7 +1563,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getElapsedFuncEnv, .getEnvFunc = getElapsedFuncEnv,
.initFunc = elapsedFunctionSetup, .initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction, .processFunc = elapsedFunction,
.finalizeFunc = elapsedPartialFinalize .finalizeFunc = elapsedPartialFinalize,
.invertFunc = NULL,
.combineFunc = elapsedCombine,
}, },
{ {
.name = "_elapsed_merge", .name = "_elapsed_merge",
...@@ -1561,7 +1576,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1561,7 +1576,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getElapsedFuncEnv, .getEnvFunc = getElapsedFuncEnv,
.initFunc = elapsedFunctionSetup, .initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunctionMerge, .processFunc = elapsedFunctionMerge,
.finalizeFunc = elapsedFinalize .finalizeFunc = elapsedFinalize,
.invertFunc = NULL,
.combineFunc = elapsedCombine,
}, },
{ {
.name = "last_row", .name = "last_row",
...@@ -1614,8 +1631,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1614,8 +1631,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = histogramFunctionSetup, .initFunc = histogramFunctionSetup,
.processFunc = histogramFunction, .processFunc = histogramFunction,
.finalizeFunc = histogramFinalize, .finalizeFunc = histogramFinalize,
.invertFunc = NULL,
.combineFunc = histogramCombine,
.pPartialFunc = "_histogram_partial", .pPartialFunc = "_histogram_partial",
.pMergeFunc = "_histogram_merge" .pMergeFunc = "_histogram_merge",
}, },
{ {
.name = "_histogram_partial", .name = "_histogram_partial",
...@@ -1625,7 +1644,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1625,7 +1644,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHistogramFuncEnv, .getEnvFunc = getHistogramFuncEnv,
.initFunc = histogramFunctionSetup, .initFunc = histogramFunctionSetup,
.processFunc = histogramFunction, .processFunc = histogramFunction,
.finalizeFunc = histogramPartialFinalize .finalizeFunc = histogramPartialFinalize,
.invertFunc = NULL,
.combineFunc = histogramCombine,
}, },
{ {
.name = "_histogram_merge", .name = "_histogram_merge",
...@@ -1635,7 +1656,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1635,7 +1656,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHistogramFuncEnv, .getEnvFunc = getHistogramFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = histogramFunctionMerge, .processFunc = histogramFunctionMerge,
.finalizeFunc = histogramFinalize .finalizeFunc = histogramFinalize,
.invertFunc = NULL,
.combineFunc = histogramCombine,
}, },
{ {
.name = "hyperloglog", .name = "hyperloglog",
...@@ -1646,6 +1669,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1646,6 +1669,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = hllFunction, .processFunc = hllFunction,
.finalizeFunc = hllFinalize, .finalizeFunc = hllFinalize,
.invertFunc = NULL,
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_partial", .pPartialFunc = "_hyperloglog_partial",
.pMergeFunc = "_hyperloglog_merge" .pMergeFunc = "_hyperloglog_merge"
}, },
...@@ -1657,7 +1682,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1657,7 +1682,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = hllFunction, .processFunc = hllFunction,
.finalizeFunc = hllPartialFinalize .finalizeFunc = hllPartialFinalize,
.invertFunc = NULL,
.combineFunc = hllCombine,
}, },
{ {
.name = "_hyperloglog_merge", .name = "_hyperloglog_merge",
...@@ -1667,7 +1694,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1667,7 +1694,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = hllFunctionMerge, .processFunc = hllFunctionMerge,
.finalizeFunc = hllFinalize .finalizeFunc = hllFinalize,
.invertFunc = NULL,
.combineFunc = hllCombine,
}, },
{ {
.name = "diff", .name = "diff",
......
...@@ -2227,7 +2227,6 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2227,7 +2227,6 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SAPercentileInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); SAPercentileInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
...@@ -3100,6 +3099,17 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3100,6 +3099,17 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SSpreadInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
spreadTransferInfo(pDBuf, pSBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t getElapsedInfoSize() { int32_t getElapsedInfoSize() {
return (int32_t)sizeof(SElapsedInfo); return (int32_t)sizeof(SElapsedInfo);
} }
...@@ -3259,6 +3269,18 @@ int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3259,6 +3269,18 @@ int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SElapsedInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SElapsedInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
elapsedTransferInfo(pDBuf, pSBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t getHistogramInfoSize() { int32_t getHistogramInfoSize() {
return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
} }
...@@ -3554,6 +3576,18 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3554,6 +3576,18 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return 1; return 1;
} }
int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SHistoFuncInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SHistoFuncInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
histogramTransferInfo(pDBuf, pSBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t getHLLInfoSize() { int32_t getHLLInfoSize() {
return (int32_t)sizeof(SHLLInfo); return (int32_t)sizeof(SHLLInfo);
} }
...@@ -3738,6 +3772,18 @@ int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3738,6 +3772,18 @@ int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SHLLInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SHLLInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
hllTransferInfo(pDBuf, pSBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStateInfo); pEnv->calcMemSize = sizeof(SStateInfo);
return true; return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册