提交 8c47a18d 编写于 作者: 5 54liuyao

feat(stream): semi session operator

上级 c0074588
......@@ -154,6 +154,7 @@ typedef struct SWindowLogicNode {
int8_t slidingUnit;
int64_t sessionGap;
SNode* pTspk;
SNode* pTsEnd;
SNode* pStateExpr;
int8_t triggerType;
int64_t watermark;
......@@ -338,6 +339,7 @@ typedef struct SWinodwPhysiNode {
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
SNode* pTspk; // timestamp primary key
SNode* pTsEnd; // window end timestamp
int8_t triggerType;
int64_t watermark;
double filesFactor;
......
......@@ -423,6 +423,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray* pChildren;
SSDataBlock* pUpdateRes;
SPhysiNode* pPhyNode; // create new child
bool isFinal;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -547,14 +548,18 @@ typedef struct SStreamSessionAggOperatorInfo {
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
......@@ -813,10 +818,10 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap,
SHashObj* pStDeleted);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
......
......@@ -706,11 +706,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], pSDB->info.groupId, gap, &winIndex);
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN,
pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
pInfo->updateResIndex +=
updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL);
......
......@@ -70,11 +70,21 @@ int32_t fmFuncMgtInit() {
}
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
if (NULL != pVal) {
pFunc->funcId = *(int32_t*)pVal;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
if (NULL != gFunMgtService.pFuncNameHashTable) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
if (NULL != pVal) {
pFunc->funcId = *(int32_t*)pVal;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
......@@ -233,17 +243,7 @@ bool fmIsSameInOutType(int32_t funcId) {
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) {
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, msg, sizeof(msg));
}
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
......
......@@ -423,6 +423,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(slidingUnit);
COPY_SCALAR_FIELD(sessionGap);
CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
CLONE_NODE_FIELD(pStateExpr);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
......@@ -522,6 +523,7 @@ static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pD
CLONE_NODE_LIST_FIELD(pExprs);
CLONE_NODE_LIST_FIELD(pFuncs);
CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
......
......@@ -1784,6 +1784,7 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor";
......@@ -1801,6 +1802,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsEnd, nodeToJson, pNode->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
}
......@@ -1827,6 +1831,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsEnd, (SNode**)&pNode->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);
;
......
......@@ -352,6 +352,7 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
nodesDestroyList(pNode->pExprs);
nodesDestroyList(pNode->pFuncs);
nodesDestroyNode(pNode->pTspk);
nodesDestroyNode(pNode->pTsEnd);
}
static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
......@@ -718,6 +719,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pFuncs);
nodesDestroyNode(pLogicNode->pTspk);
nodesDestroyNode(pLogicNode->pTsEnd);
break;
}
case QUERY_NODE_LOGIC_PLAN_FILL: {
......
......@@ -559,6 +559,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->pTsEnd = nodesCloneNode((SNode*)pSession->pCol);
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
......
......@@ -961,6 +961,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
}
if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
......
......@@ -176,7 +176,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType) ) {
return false;
}
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
......@@ -257,6 +257,34 @@ static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
return code;
}
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
int32_t index = 0;
SNode* pFunc = NULL;
FOREACH(pFunc, pWin->pFuncs) {
if (FUNCTION_TYPE_WENDTS == ((SFunctionNode*)pFunc)->funcType) {
*pIndex = index;
return TSDB_CODE_SUCCESS;
}
++index;
}
SFunctionNode* pWEnd = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pWEnd) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pWEnd->functionName, "_wendts");
snprintf(pWEnd->node.aliasName, sizeof(pWEnd->node.aliasName), "%s.%p", pWEnd->functionName, pWEnd);
int32_t code = fmGetFuncInfo(pWEnd, NULL, 0);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
}
*pIndex = index;
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
}
return code;
}
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
SNodeList* pFunc = pMergeWindow->pFuncs;
pMergeWindow->pFuncs = NULL;
......@@ -425,8 +453,18 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL;
SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
int32_t index = 0;
int32_t code = stbSplAppendWEnd(pPartWin, &index);
if (TSDB_CODE_SUCCESS == code) {
pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
if (NULL == pMergeWin->pTsEnd) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -231,9 +231,10 @@ sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s);
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
# sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s);
# sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
# sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4);
......@@ -269,13 +270,13 @@ if $rows == 0 then
goto loop3
endi
sql select * from streamt5;
#sql select * from streamt5;
if $rows == 0 then
print ======$rows
goto loop3
# goto loop3
endi
sql select * from streamt6;
# sql select * from streamt6;
if $rows == 0 then
print ======$rows
goto loop3
......
......@@ -85,7 +85,7 @@ sql insert into t2 values(1648791243003,1,2,3,1.0) (1648791243002,1,2,3,1.0) (16
sleep 500
sql select * from streamt2;
if $rows != 3 then
print ======$rows
print =====rows=$rows
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册