Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
24261cc9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
24261cc9
编写于
8月 24, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream): refine stream backend interface
上级
c064bc38
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
216 addition
and
104 deletion
+216
-104
examples/c/stream_demo.c
examples/c/stream_demo.c
+1
-1
include/common/tcommon.h
include/common/tcommon.h
+24
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+9
-8
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+11
-5
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+36
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+98
-71
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+36
-18
未找到文件。
examples/c/stream_demo.c
浏览文件 @
24261cc9
...
...
@@ -96,7 +96,7 @@ int32_t create_stream() {
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start,
k from st1 partition by tbname state_window(k
)"
);
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start,
avg(k) from st1 partition by tbname interval(10s
)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/common/tcommon.h
浏览文件 @
24261cc9
...
...
@@ -44,6 +44,30 @@ enum {
)
// clang-format on
typedef
struct
{
TSKEY
ts
;
uint64_t
groupId
;
}
SWinKey
;
static
inline
int
SWinKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
)
{
SWinKey
*
pWin1
=
(
SWinKey
*
)
pKey1
;
SWinKey
*
pWin2
=
(
SWinKey
*
)
pKey2
;
if
(
pWin1
->
groupId
>
pWin2
->
groupId
)
{
return
1
;
}
else
if
(
pWin1
->
groupId
<
pWin2
->
groupId
)
{
return
-
1
;
}
if
(
pWin1
->
ts
>
pWin2
->
ts
)
{
return
1
;
}
else
if
(
pWin1
->
ts
<
pWin2
->
ts
)
{
return
-
1
;
}
return
0
;
}
enum
{
TMQ_MSG_TYPE__DUMMY
=
0
,
TMQ_MSG_TYPE__POLL_RSP
,
...
...
include/libs/stream/tstream.h
浏览文件 @
24261cc9
...
...
@@ -551,16 +551,17 @@ typedef struct {
}
SStreamStateCur
;
#if 1
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
void
streamFreeVal
(
void
*
val
);
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
void
streamStateFreeCur
(
SStreamStateCur
*
pCur
);
int32_t
stream
GetKVByCur
(
SStreamStateCur
*
pCur
,
void
**
pKey
,
int32_t
*
pKLen
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
stream
StateGetKVByCur
(
SStreamStateCur
*
pCur
,
SWinKey
*
pKey
,
const
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateSeekFirst
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
int32_t
streamStateSeekLast
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
24261cc9
...
...
@@ -652,27 +652,33 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
}
SReadHandle
handle
=
{
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initTqReader
=
1
,
.
pStateBackend
=
pTask
->
pState
,
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
executor
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
}
SReadHandle
mgHandle
=
{
.
vnode
=
NULL
,
.
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTask
->
childEpInfo
),
.
pStateBackend
=
pTask
->
pState
,
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
mgHandle
);
ASSERT
(
pTask
->
exec
.
executor
);
}
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
}
// sink
/*pTask->ahandle = pTq->pVnode;*/
if
(
pTask
->
outputType
==
TASK_OUTPUT__SMA
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
24261cc9
...
...
@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
}
#endif
qDebug
(
"stream scan called"
);
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__LOG
)
{
while
(
1
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
24261cc9
...
...
@@ -15,6 +15,7 @@
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
...
...
@@ -27,11 +28,6 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal)
typedef
struct
SWinRes
{
TSKEY
ts
;
uint64_t
groupId
;
}
SWinRes
;
typedef
struct
SPullWindowInfo
{
STimeWindow
window
;
uint64_t
groupId
;
...
...
@@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
setNotInterpoWindowKey
(
pSup
->
pCtx
,
numOfExprs
,
RESULT_ROW_START_INTERP
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
0
,
pBlock
->
info
.
rows
,
numOfExprs
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
0
,
pBlock
->
info
.
rows
,
numOfExprs
);
if
(
isResultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
))
{
closeResultRow
(
pr
);
...
...
@@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
int32_t
compareResKey
(
void
*
pKey
,
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SResKeyPos
*
pos
=
taosArrayGetP
(
res
,
index
);
SWin
Res
*
pData
=
(
SWinRes
*
)
pKey
;
SWin
Key
*
pData
=
(
SWinKey
*
)
pKey
;
if
(
pData
->
ts
==
*
(
int64_t
*
)
pos
->
key
)
{
if
(
pData
->
groupId
>
pos
->
groupId
)
{
return
1
;
...
...
@@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) {
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
SWin
Res
data
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
SWin
Key
data
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
int32_t
index
=
binarySearchCom
(
pUpdated
,
size
,
&
data
,
TSDB_ORDER_DESC
,
compareResKey
);
if
(
index
==
-
1
)
{
index
=
0
;
...
...
@@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
newPos
->
groupId
=
groupId
;
newPos
->
pos
=
(
SResultRowPosition
){.
pageId
=
pageId
,
.
offset
=
offset
};
*
(
int64_t
*
)
newPos
->
key
=
ts
;
SWin
Res
key
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
if
(
taosHashPut
(
pUpdatedMap
,
&
key
,
sizeof
(
SWin
Res
),
&
newPos
,
sizeof
(
void
*
))
!=
TSDB_CODE_SUCCESS
)
{
SWin
Key
key
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
if
(
taosHashPut
(
pUpdatedMap
,
&
key
,
sizeof
(
SWin
Key
),
&
newPos
,
sizeof
(
void
*
))
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
newPos
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
static
void
removeResults
(
SArray
*
pWins
,
SHashObj
*
pUpdatedMap
)
{
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWin
Res
*
pW
=
taosArrayGet
(
pWins
,
i
);
taosHashRemove
(
pUpdatedMap
,
pW
,
sizeof
(
SWin
Res
));
SWin
Key
*
pW
=
taosArrayGet
(
pWins
,
i
);
taosHashRemove
(
pUpdatedMap
,
pW
,
sizeof
(
SWin
Key
));
}
}
int64_t
getWinReskey
(
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SWin
Res
*
pos
=
taosArrayGet
(
res
,
index
);
SWin
Key
*
pos
=
taosArrayGet
(
res
,
index
);
return
pos
->
ts
;
}
int32_t
compareWinRes
(
void
*
pKey
,
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SWin
Res
*
pos
=
taosArrayGetP
(
res
,
index
);
SWin
Key
*
pos
=
taosArrayGetP
(
res
,
index
);
SResKeyPos
*
pData
=
(
SResKeyPos
*
)
pKey
;
if
(
*
(
int64_t
*
)
pData
->
key
==
pos
->
ts
)
{
if
(
pData
->
groupId
>
pos
->
groupId
)
{
...
...
@@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
...
...
@@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pSup
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
}
...
...
@@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
false
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
p
RowSup
->
numOfRows
,
p
Block
->
info
.
rows
,
numOfOutput
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
static
SSDataBlock
*
doStateWindowAgg
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsStarts
[
i
],
pInterval
,
TSDB_ORDER_ASC
);
doDeleteIntervalWindow
(
pAggSup
,
win
.
skey
,
groupIds
[
i
]);
if
(
pUpWins
)
{
SWin
Res
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
groupIds
[
i
]};
SWin
Key
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
groupIds
[
i
]};
taosArrayPush
(
pUpWins
,
&
winRes
);
}
}
...
...
@@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
uint64_t
winGpId
=
pGpDatas
?
pGpDatas
[
startPos
]
:
pBlock
->
info
.
groupId
;
bool
res
=
doClearWindow
(
pAggSup
,
pSup1
,
(
char
*
)
&
win
.
skey
,
sizeof
(
TSKEY
),
winGpId
,
numOfOutput
);
if
(
pUpWins
&&
res
)
{
SWin
Res
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
SWin
Key
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
taosArrayPush
(
pUpWins
,
&
winRes
);
}
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
...
...
@@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
STimeWindow
win
;
win
.
skey
=
ts
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
)
-
1
;
SWin
Res
winRe
=
{
SWin
Key
winRe
=
{
.
ts
=
win
.
skey
,
.
groupId
=
groupId
,
};
void
*
chIds
=
taosHashGet
(
pPullDataMap
,
&
winRe
,
sizeof
(
SWin
Res
));
void
*
chIds
=
taosHashGet
(
pPullDataMap
,
&
winRe
,
sizeof
(
SWin
Key
));
if
(
isCloseWindow
(
&
win
,
pSup
))
{
if
(
chIds
&&
pPullDataMap
)
{
SArray
*
chAy
=
*
(
SArray
**
)
chIds
;
...
...
@@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
for
(
int32_t
i
=
*
index
;
i
<
size
;
i
++
)
{
SWin
Res
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SWin
Key
*
pWin
=
taosArrayGet
(
pWins
,
i
);
colDataAppend
(
pTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
&
pWin
->
ts
,
false
);
colDataAppend
(
pGroupCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
&
pWin
->
groupId
,
false
);
pBlock
->
info
.
rows
++
;
...
...
@@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SStreamState
*
pState
=
pTaskInfo
->
streamInfo
.
pState
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdatedMap
);
}
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
...
...
@@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
}
pInfo
->
pRecycledPages
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWin
Res
));
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWin
Key
));
pInfo
->
delIndex
=
0
;
pInfo
->
pDelRes
=
createSpecialDataBlock
(
STREAM_DELETE_RESULT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
...
...
@@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
false
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
p
RowSup
->
numOfRows
,
p
Block
->
info
.
rows
,
numOfOutput
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
static
SSDataBlock
*
doSessionWindowAgg
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
return
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWin
Res
*
pWinRes
=
taosArrayGet
(
pWinArray
,
i
);
SWin
Key
*
pWinRes
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
STimeWindow
ParentWin
=
{.
skey
=
pWinRes
->
ts
,
.
ekey
=
pWinRes
->
ts
+
1
};
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
ParentWin
,
true
,
&
pCurResult
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
...
...
@@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY*
return
getNextQualifiedWindow
(
pInterval
,
pNextWin
,
pBlockInfo
,
tsCols
,
prevEndPos
,
TSDB_ORDER_ASC
);
}
void
addPullWindow
(
SHashObj
*
pMap
,
SWin
Res
*
pWinRes
,
int32_t
size
)
{
void
addPullWindow
(
SHashObj
*
pMap
,
SWin
Key
*
pWinRes
,
int32_t
size
)
{
SArray
*
childIds
=
taosArrayInit
(
8
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
taosArrayPush
(
childIds
,
&
i
);
}
taosHashPut
(
pMap
,
pWinRes
,
sizeof
(
SWin
Res
),
&
childIds
,
sizeof
(
void
*
));
taosHashPut
(
pMap
,
pWinRes
,
sizeof
(
SWin
Key
),
&
childIds
,
sizeof
(
void
*
));
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
...
...
@@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
if
(
IS_FINAL_OP
(
pInfo
)
&&
isClosed
&&
pInfo
->
pChildren
)
{
bool
ignore
=
true
;
SWin
Res
winRes
=
{
SWin
Key
winRes
=
{
.
ts
=
nextWin
.
skey
,
.
groupId
=
tableGroupId
,
};
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWin
Res
));
void
*
chIds
=
taosHashGet
(
pInfo
->
pPullDataMap
,
&
winRes
,
sizeof
(
SWin
Key
));
if
(
isDeletedWindow
(
&
nextWin
,
tableGroupId
,
&
pInfo
->
aggSup
)
&&
!
chIds
)
{
SPullWindowInfo
pull
=
{.
window
=
nextWin
,
.
groupId
=
tableGroupId
};
// add pull data request
...
...
@@ -3039,8 +3068,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
uint64_t
*
groupIdData
=
(
uint64_t
*
)
pGroupCol
->
pData
;
int32_t
chId
=
getChildIndex
(
pBlock
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
SWin
Res
winRes
=
{.
ts
=
tsData
[
i
],
.
groupId
=
groupIdData
[
i
]};
void
*
chIds
=
taosHashGet
(
pMap
,
&
winRes
,
sizeof
(
SWin
Res
));
SWin
Key
winRes
=
{.
ts
=
tsData
[
i
],
.
groupId
=
groupIdData
[
i
]};
void
*
chIds
=
taosHashGet
(
pMap
,
&
winRes
,
sizeof
(
SWin
Key
));
if
(
chIds
)
{
SArray
*
chArray
=
*
(
SArray
**
)
chIds
;
int32_t
index
=
taosArraySearchIdx
(
chArray
,
&
chId
,
compareInt32Val
,
TD_EQ
);
...
...
@@ -3049,7 +3078,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove
(
chArray
,
index
);
if
(
taosArrayGetSize
(
chArray
)
==
0
)
{
// pull data is over
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWin
Res
));
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWin
Key
));
}
}
}
...
...
@@ -3133,7 +3162,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWin
Res
));
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWin
Key
));
doClearWindows
(
&
pInfo
->
aggSup
,
pSup
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
pUpWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
...
...
@@ -3171,7 +3200,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
&&
!
IS_FINAL_OP
(
pInfo
))
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWin
Res
));
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWin
Key
));
doClearWindows
(
&
pInfo
->
aggSup
,
pSup
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
pUpWins
);
removeResults
(
pUpWins
,
pUpdatedMap
);
taosArrayDestroy
(
pUpWins
);
...
...
@@ -3386,7 +3415,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
ignoreExpiredData
=
pIntervalPhyNode
->
window
.
igExpired
;
pInfo
->
pDelRes
=
createSpecialDataBlock
(
STREAM_DELETE_RESULT
);
pInfo
->
delIndex
=
0
;
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWin
Res
));
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWin
Key
));
pInfo
->
pRecycledPages
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
pOperator
->
operatorType
=
pPhyNode
->
type
;
...
...
@@ -3721,8 +3750,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
}
if
(
pWinInfo
->
win
.
skey
>
pStartTs
[
i
])
{
if
(
pStDeleted
&&
pWinInfo
->
isOutput
)
{
SWin
Res
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Res
));
SWin
Key
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Key
));
pWinInfo
->
isOutput
=
false
;
}
pWinInfo
->
win
.
skey
=
pStartTs
[
i
];
...
...
@@ -3840,8 +3869,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
compactFunctions
(
pSup
->
pCtx
,
pInfo
->
pDummyCtx
,
numOfOutput
,
pTaskInfo
);
taosHashRemove
(
pStUpdated
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
));
if
(
pWinInfo
->
isOutput
)
{
SWin
Res
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Res
));
SWin
Key
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Key
));
pWinInfo
->
isOutput
=
false
;
}
taosArrayRemove
(
pInfo
->
streamAggSup
.
pCurWins
,
i
);
...
...
@@ -3903,8 +3932,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
pCurWin
->
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pStUpdated
)
{
SWin
Res
value
=
{.
ts
=
pCurWin
->
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pStUpdated
,
&
pCurWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWin
Res
));
SWin
Key
value
=
{.
ts
=
pCurWin
->
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pStUpdated
,
&
pCurWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWin
Key
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -3956,8 +3985,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t
step
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
gpCols
[
i
],
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
gpCols
[
i
],
gap
,
&
winIndex
);
if
(
!
pCurWin
||
pCurWin
->
pos
.
pageId
==
-
1
)
{
// window has been closed.
step
=
1
;
...
...
@@ -3982,9 +4010,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
if
(
pos
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
pos
->
groupId
=
((
SWin
Res
*
)
pData
)
->
groupId
;
pos
->
groupId
=
((
SWin
Key
*
)
pData
)
->
groupId
;
pos
->
pos
=
*
(
SResultRowPosition
*
)
key
;
*
(
int64_t
*
)
pos
->
key
=
((
SWin
Res
*
)
pData
)
->
ts
;
*
(
int64_t
*
)
pos
->
key
=
((
SWin
Key
*
)
pData
)
->
ts
;
taosArrayPush
(
pUpdated
,
&
pos
);
}
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
...
...
@@ -4000,7 +4028,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity
(
pBlock
,
size
);
size_t
keyLen
=
0
;
while
(((
*
Ite
)
=
taosHashIterate
(
pStDeleted
,
*
Ite
))
!=
NULL
)
{
SWin
Res
*
res
=
*
Ite
;
SWin
Key
*
res
=
*
Ite
;
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
colDataAppend
(
pTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
&
res
->
ts
,
false
);
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
...
...
@@ -4131,8 +4159,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t
size
=
taosArrayGetSize
(
pResWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pWinInfo
=
taosArrayGet
(
pResWins
,
i
);
SWin
Res
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
pWinInfo
->
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Res
));
SWin
Key
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
pWinInfo
->
groupId
};
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWin
Key
));
}
}
...
...
@@ -4170,14 +4198,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
pWins
);
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
p
Operator
->
exprSupp
.
numOfExprs
,
0
,
p
Wins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pBlock
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
);
}
taosArrayDestroy
(
pWins
);
...
...
@@ -4581,7 +4609,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_
}
int32_t
updateStateWindowInfo
(
SArray
*
pWinInfos
,
int32_t
winIndex
,
TSKEY
*
pTs
,
uint64_t
groupId
,
SColumnInfoData
*
pKeyCol
,
int32_t
rows
,
int32_t
start
,
bool
*
allEqual
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pKeyCol
,
int32_t
rows
,
int32_t
start
,
bool
*
allEqual
,
SHashObj
*
pSeDeleted
)
{
*
allEqual
=
true
;
SStateWindowInfo
*
pWinInfo
=
taosArrayGet
(
pWinInfos
,
winIndex
);
for
(
int32_t
i
=
start
;
i
<
rows
;
++
i
)
{
...
...
@@ -4602,9 +4631,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
}
if
(
pWinInfo
->
winInfo
.
win
.
skey
>
pTs
[
i
])
{
if
(
pSeDeleted
&&
pWinInfo
->
winInfo
.
isOutput
)
{
SWinRes
res
=
{.
ts
=
pWinInfo
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pSeDeleted
,
&
pWinInfo
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWinRes
));
SWinKey
res
=
{.
ts
=
pWinInfo
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pSeDeleted
,
&
pWinInfo
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
res
,
sizeof
(
SWinKey
));
pWinInfo
->
winInfo
.
isOutput
=
false
;
}
pWinInfo
->
winInfo
.
win
.
skey
=
pTs
[
i
];
...
...
@@ -4617,14 +4645,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
return
rows
-
start
;
}
static
void
doClearStateWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
static
void
doClearStateWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pGroupColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsColInfo
->
pData
;
bool
allEqual
=
false
;
int32_t
step
=
1
;
uint64_t
*
gpCol
=
(
uint64_t
*
)
pGroupColInfo
->
pData
;
uint64_t
*
gpCol
=
(
uint64_t
*
)
pGroupColInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
gpCol
[
i
],
&
winIndex
);
...
...
@@ -4668,13 +4696,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
bool
allEqual
=
true
;
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
,
tsCols
[
i
],
groupId
,
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCols
,
groupId
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pStDeleted
);
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
,
tsCols
[
i
],
groupId
,
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCols
,
groupId
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pStDeleted
);
if
(
!
allEqual
)
{
appendOneRow
(
pAggSup
->
pScanBlock
,
&
pCurWin
->
winInfo
.
win
.
skey
,
&
pCurWin
->
winInfo
.
win
.
ekey
,
GROUPID_COLUMN_INDEX
,
&
groupId
);
appendOneRow
(
pAggSup
->
pScanBlock
,
&
pCurWin
->
winInfo
.
win
.
skey
,
&
pCurWin
->
winInfo
.
win
.
ekey
,
GROUPID_COLUMN_INDEX
,
&
groupId
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
destroyStateWinInfo
);
continue
;
...
...
@@ -4685,8 +4712,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
pCurWin
->
winInfo
.
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
SWin
Res
value
=
{.
ts
=
pCurWin
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWin
Res
));
SWin
Key
value
=
{.
ts
=
pCurWin
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWin
Key
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
24261cc9
...
...
@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
FAIL_SHUFFLE_DISPATCH:
if
(
pReqs
)
{
for
(
int32_t
i
=
0
;
i
<
vgSz
;
i
++
)
{
taosArrayDestroy
(
pReqs
[
i
].
data
);
taosArrayDestroy
P
(
pReqs
[
i
].
data
,
taosMemoryFree
);
taosArrayDestroy
(
pReqs
[
i
].
dataLen
);
}
taosMemoryFree
(
pReqs
);
...
...
source/libs/stream/src/streamState.c
浏览文件 @
24261cc9
...
...
@@ -15,6 +15,7 @@
#include "executor.h"
#include "streamInc.h"
#include "tcommon.h"
#include "ttimer.h"
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
)
{
...
...
@@ -23,14 +24,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
char
statePath
[
2
00
];
char
statePath
[
3
00
];
sprintf
(
statePath
,
"%s/%d"
,
path
,
pTask
->
taskId
);
if
(
tdbOpen
(
statePath
,
16
*
1024
,
1
,
&
pState
->
db
)
<
0
)
{
if
(
tdbOpen
(
statePath
,
4096
,
256
,
&
pState
->
db
)
<
0
)
{
goto
_err
;
}
// open state storage backend
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pState
->
db
,
&
pState
->
pStateDb
)
<
0
)
{
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
SWinKey
),
-
1
,
SWinKeyCmpr
,
pState
->
db
,
&
pState
->
pStateDb
)
<
0
)
{
goto
_err
;
}
if
(
streamStateBegin
(
pState
)
<
0
)
{
goto
_err
;
}
...
...
@@ -60,6 +65,7 @@ int32_t streamStateBegin(SStreamState* pState) {
}
if
(
tdbBegin
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
tdbTxnClose
(
&
pState
->
txn
);
return
-
1
;
}
return
0
;
...
...
@@ -95,33 +101,39 @@ int32_t streamStateAbort(SStreamState* pState) {
return
0
;
}
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
const
void
*
value
,
int32_t
vLen
)
{
return
tdbTbUpsert
(
pState
->
pStateDb
,
key
,
kLen
,
value
,
vLen
,
&
pState
->
txn
);
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
return
tdbTbUpsert
(
pState
->
pStateDb
,
key
,
sizeof
(
SWinKey
)
,
value
,
vLen
,
&
pState
->
txn
);
}
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbGet
(
pState
->
pStateDb
,
key
,
kLen
,
pVal
,
pVLen
);
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbGet
(
pState
->
pStateDb
,
key
,
sizeof
(
SWinKey
)
,
pVal
,
pVLen
);
}
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
return
tdbTbDelete
(
pState
->
pStateDb
,
key
,
kLen
,
&
pState
->
txn
);
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
return
tdbTbDelete
(
pState
->
pStateDb
,
key
,
sizeof
(
SWinKey
)
,
&
pState
->
txn
);
}
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
;
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
);
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
)
,
&
c
);
if
(
c
!=
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
return
0
;
return
pCur
;
}
int32_t
streamGetKVByCur
(
SStreamStateCur
*
pCur
,
void
**
pKey
,
int32_t
*
pKLen
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbcGet
(
pCur
->
pCur
,
(
const
void
**
)
pKey
,
pKLen
,
(
const
void
**
)
pVal
,
pVLen
);
int32_t
streamStateGetKVByCur
(
SStreamStateCur
*
pCur
,
SWinKey
*
pKey
,
const
void
**
pVal
,
int32_t
*
pVLen
)
{
const
SWinKey
*
pKTmp
=
NULL
;
int32_t
kLen
;
if
(
tdbTbcGet
(
pCur
->
pCur
,
(
const
void
**
)
&
pKTmp
,
&
kLen
,
pVal
,
pVLen
)
<
0
)
{
return
-
1
;
}
*
pKey
=
*
pKTmp
;
return
0
;
}
int32_t
streamStateSeekFirst
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
)
{
...
...
@@ -134,14 +146,14 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
return
tdbTbcMoveToLast
(
pCur
->
pCur
);
}
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
int32_t
c
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
)
<
0
)
{
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
)
,
&
c
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
...
...
@@ -155,14 +167,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, i
return
pCur
;
}
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
int32_t
c
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
)
<
0
)
{
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
)
,
&
c
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
...
...
@@ -185,3 +197,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
//
return
tdbTbcMoveToPrev
(
pCur
->
pCur
);
}
void
streamStateFreeCur
(
SStreamStateCur
*
pCur
)
{
tdbTbcClose
(
pCur
->
pCur
);
taosMemoryFree
(
pCur
);
}
void
streamFreeVal
(
void
*
val
)
{
tdbFree
(
val
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录