Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8efd7e87
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8efd7e87
编写于
10月 18, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream):replace disk buff
上级
fe5b898e
变更
15
展开全部
显示空白变更内容
内联
并排
Showing
15 changed file
with
1078 addition
and
1095 deletion
+1078
-1095
include/common/tcommon.h
include/common/tcommon.h
+8
-3
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+16
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+4
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+44
-47
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+79
-37
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+9
-10
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+2
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+612
-960
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+7
-4
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+282
-15
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+7
-1
tests/script/tsim/stream/session0.sim
tests/script/tsim/stream/session0.sim
+2
-2
tests/script/tsim/stream/session1.sim
tests/script/tsim/stream/session1.sim
+3
-3
tests/script/tsim/stream/state0.sim
tests/script/tsim/stream/state0.sim
+1
-1
tests/script/tsim/stream/triggerSession0.sim
tests/script/tsim/stream/triggerSession0.sim
+2
-7
未找到文件。
include/common/tcommon.h
浏览文件 @
8efd7e87
...
...
@@ -44,12 +44,17 @@ enum {
)
// clang-format on
typedef
struct
{
typedef
struct
SWinKey
{
uint64_t
groupId
;
TSKEY
ts
;
}
SWinKey
;
static
inline
int
sWinKeyCmprImpl
(
const
void
*
pKey1
,
const
void
*
pKey2
)
{
typedef
struct
SSessionKey
{
STimeWindow
win
;
uint64_t
groupId
;
}
SSessionKey
;
static
inline
int
winKeyCmprImpl
(
const
void
*
pKey1
,
const
void
*
pKey2
)
{
SWinKey
*
pWin1
=
(
SWinKey
*
)
pKey1
;
SWinKey
*
pWin2
=
(
SWinKey
*
)
pKey2
;
...
...
@@ -69,7 +74,7 @@ static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
}
static
inline
int
winKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
)
{
return
sW
inKeyCmprImpl
(
pKey1
,
pKey2
);
return
w
inKeyCmprImpl
(
pKey1
,
pKey2
);
}
typedef
struct
{
...
...
include/libs/stream/streamState.h
浏览文件 @
8efd7e87
...
...
@@ -25,6 +25,8 @@ extern "C" {
typedef
struct
SStreamTask
SStreamTask
;
typedef
bool
(
*
state_key_cmpr_fn
)(
void
*
pKey1
,
void
*
pKey2
);
// incremental state storage
typedef
struct
{
SStreamTask
*
pOwner
;
...
...
@@ -32,6 +34,7 @@ typedef struct {
TTB
*
pStateDb
;
TTB
*
pFuncStateDb
;
TTB
*
pFillStateDb
;
// todo refactor
TTB
*
pSessionStateDb
;
TXN
txn
;
int32_t
number
;
}
SStreamState
;
...
...
@@ -57,6 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t
streamStateClear
(
SStreamState
*
pState
);
void
streamStateSetNumber
(
SStreamState
*
pState
,
int32_t
number
);
int32_t
streamStateSessionAddIfNotExist
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateSessionPut
(
SStreamState
*
pState
,
const
SSessionKey
*
key
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateSessionGet
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateSessionDel
(
SStreamState
*
pState
,
const
SSessionKey
*
key
);
int32_t
streamStateSessionClear
(
SStreamState
*
pState
);
int32_t
streamStateSessionGetKVByCur
(
SStreamStateCur
*
pCur
,
SSessionKey
*
pKey
,
const
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateStateAddIfNotExist
(
SStreamState
*
pState
,
SSessionKey
*
key
,
char
*
pKeyData
,
int32_t
keyDataLen
,
state_key_cmpr_fn
fn
,
void
**
pVal
,
int32_t
*
pVLen
);
SStreamStateCur
*
streamStateSessionSeekKeyNext
(
SStreamState
*
pState
,
const
SSessionKey
*
key
);
SStreamStateCur
*
streamStateSessionSeekKeyPrev
(
SStreamState
*
pState
,
const
SSessionKey
*
key
);
SStreamStateCur
*
streamStateSessionGetCur
(
SStreamState
*
pState
,
const
SSessionKey
*
key
);
int32_t
streamStateFillPut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateFillGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateFillDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
...
...
source/common/src/tdatablock.c
浏览文件 @
8efd7e87
...
...
@@ -1892,12 +1892,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
)
||
!
pColInfoData
->
pData
)
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
"NULL"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
continue
;
}
void
*
var
=
colDataGetData
(
pColInfoData
,
j
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
memset
(
pBuf
,
0
,
sizeof
(
pBuf
));
...
...
@@ -1926,8 +1927,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15lf |"
,
*
(
double
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
//
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
//
if (len >= size - 1) return dumpBuf;
break
;
case
TSDB_DATA_TYPE_BOOL
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15d |"
,
*
(
bool
*
)
var
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
8efd7e87
...
...
@@ -53,6 +53,11 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
enum
{
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED
=
0x1u
,
...
...
@@ -434,15 +439,15 @@ typedef struct SCatchSupporter {
}
SCatchSupporter
;
typedef
struct
SStreamAggSupporter
{
SHashObj
*
pResultRows
;
SArray
*
pCurWins
;
int32_t
valueSize
;
int32_t
keySize
;
char
*
pKeyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
int32_t
currentPageId
;
// buffer page that is active
SSDataBlock
*
pScanBlock
;
SStreamState
*
pState
;
int64_t
gap
;
// stream session window gap
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSHashObj
*
pResultRows
;
int32_t
stateKeySize
;
int16_t
stateKeyType
;
SDiskbasedBuf
*
pResultBuf
;
}
SStreamAggSupporter
;
typedef
struct
SWindowSupporter
{
...
...
@@ -736,16 +741,14 @@ typedef struct SSessionAggOperatorInfo {
}
SSessionAggOperatorInfo
;
typedef
struct
SResultWindowInfo
{
SResultRowPosition
pos
;
STimeWindow
win
;
uint64_t
groupId
;
void
*
pOutputBuf
;
SSessionKey
sessionWin
;
bool
isOutput
;
bool
isClosed
;
}
SResultWindowInfo
;
typedef
struct
SStateWindowInfo
{
SResultWindowInfo
winInfo
;
SStateKeys
s
tateKey
;
SStateKeys
*
pS
tateKey
;
}
SStateWindowInfo
;
typedef
struct
SStreamSessionAggOperatorInfo
{
...
...
@@ -753,17 +756,15 @@ typedef struct SStreamSessionAggOperatorInfo {
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int64_t
gap
;
// session window gap
int32_t
primaryTsIndex
;
// primary timestamp slot id
int32_t
endTsIndex
;
// window end timestamp slot id
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SSDataBlock
*
pWinBlock
;
// window result
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSDataBlock
*
pDelRes
;
// delete result
SSDataBlock
*
pUpdateRes
;
// update window
bool
returnUpdate
;
S
HashObj
*
pStDeleted
;
S
SHashObj
*
pStDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result; final stream operator
SPhysiNode
*
pPhyNode
;
// create new child
...
...
@@ -772,6 +773,22 @@ typedef struct SStreamSessionAggOperatorInfo {
SHashObj
*
pGroupIdTbNameMap
;
}
SStreamSessionAggOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
SColumn
stateCol
;
SSDataBlock
*
pDelRes
;
SSHashObj
*
pSeDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
bool
ignoreExpiredData
;
SHashObj
*
pGroupIdTbNameMap
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SStreamPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SPartitionBySupporter
partitionSup
;
...
...
@@ -834,24 +851,6 @@ typedef struct SStateWindowOperatorInfo {
const
SNode
*
pCondition
;
}
SStateWindowOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SColumn
stateCol
;
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSDataBlock
*
pDelRes
;
SHashObj
*
pSeDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
bool
ignoreExpiredData
;
SHashObj
*
pGroupIdTbNameMap
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
...
...
@@ -1064,13 +1063,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
size
);
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int32_t
*
currentPageId
,
int32_t
interBufSize
);
SResultWindowInfo
*
getSessionTimeWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
);
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
);
void
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
SSessionKey
*
pKey
);
bool
isInTimeWindow
(
STimeWindow
*
pWin
,
TSKEY
ts
,
int64_t
gap
);
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
...
...
@@ -1100,6 +1094,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
int32_t
buildDataBlockFromGroupRes
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
);
int32_t
buildSessionResultDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
);
int32_t
releaseOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8efd7e87
...
...
@@ -4192,42 +4192,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return
TSDB_CODE_SUCCESS
;
}
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
size
)
{
pSup
->
currentPageId
=
-
1
;
pSup
->
resultRowSize
=
getResultRowSize
(
pCtx
,
numOfOutput
);
pSup
->
keySize
=
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
);
pSup
->
pKeyBuf
=
taosMemoryCalloc
(
1
,
pSup
->
keySize
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pSup
->
pResultRows
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
if
(
pSup
->
pKeyBuf
==
NULL
||
pSup
->
pResultRows
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSup
->
valueSize
=
size
;
pSup
->
pScanBlock
=
createSpecialDataBlock
(
STREAM_CLEAR
);
int32_t
pageSize
=
4096
;
while
(
pageSize
<
pSup
->
resultRowSize
*
4
)
{
pageSize
<<=
1u
;
}
// at least four pages need to be in buffer
int32_t
bufSize
=
4096
*
256
;
if
(
bufSize
<=
pageSize
)
{
bufSize
=
pageSize
*
4
;
}
if
(
!
osTempSpaceAvailable
())
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
qError
(
"Init stream agg supporter failed since %s"
,
terrstr
(
terrno
));
return
terrno
;
}
int32_t
code
=
createDiskbasedBuf
(
&
pSup
->
pResultBuf
,
pageSize
,
bufSize
,
pKey
,
tsTempDir
);
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
pCtx
[
i
].
saveHandle
.
pBuf
=
pSup
->
pResultBuf
;
}
return
code
;
}
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
SWinKey
key
=
{
...
...
@@ -4237,7 +4201,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
char
*
value
=
NULL
;
int32_t
size
=
pAggSup
->
resultRowSize
;
tSimpleHashPut
(
pAggSup
->
pResultRowHashTable
,
&
key
,
sizeof
(
SWinKey
),
NULL
,
0
);
if
(
streamStateAddIfNotExist
(
pState
,
&
key
,
(
void
**
)
&
value
,
&
size
)
<
0
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
...
...
@@ -4342,3 +4305,82 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
)
{
streamStateSessionPut
(
pState
,
key
,
(
const
void
*
)
buf
,
size
);
releaseOutputBuf
(
pState
,
NULL
,
(
SResultRow
*
)
buf
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildSessionResultDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
)
{
SExprInfo
*
pExprInfo
=
pSup
->
pExprInfo
;
int32_t
numOfExprs
=
pSup
->
numOfExprs
;
int32_t
*
rowEntryOffset
=
pSup
->
rowEntryInfoOffset
;
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
for
(
int32_t
i
=
pGroupResInfo
->
index
;
i
<
numOfRows
;
i
+=
1
)
{
SSessionKey
*
pKey
=
taosArrayGet
(
pGroupResInfo
->
pRows
,
i
);
int32_t
size
=
0
;
void
*
pVal
=
NULL
;
int32_t
code
=
streamStateSessionGet
(
pState
,
pKey
,
&
pVal
,
&
size
);
ASSERT
(
code
==
0
);
SResultRow
*
pRow
=
(
SResultRow
*
)
pVal
;
doUpdateNumOfRows
(
pCtx
,
pRow
,
numOfExprs
,
rowEntryOffset
);
// no results, continue to check the next one
if
(
pRow
->
numOfRows
==
0
)
{
pGroupResInfo
->
index
+=
1
;
releaseOutputBuf
(
pState
,
NULL
,
pRow
);
continue
;
}
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pKey
->
groupId
;
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pKey
->
groupId
)
{
releaseOutputBuf
(
pState
,
NULL
,
pRow
);
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
ASSERT
(
pBlock
->
info
.
rows
>
0
);
releaseOutputBuf
(
pState
,
NULL
,
pRow
);
break
;
}
pGroupResInfo
->
index
+=
1
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntryOffset
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code1
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code1
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code1
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code1
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
pBlock
->
info
.
rows
+=
pRow
->
numOfRows
;
// saveSessionDiscBuf(pState, pKey, pVal, size);
releaseOutputBuf
(
pState
,
NULL
,
pRow
);
}
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/executor/src/scanoperator.c
浏览文件 @
8efd7e87
...
...
@@ -1190,23 +1190,22 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
uidCol
[
i
],
startData
[
i
],
version
);
// gap must be 0.
S
ResultWindowInfo
*
pStartWin
=
getCurSessionWindow
(
pInfo
->
windowSup
.
pStreamAggSup
,
startData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
if
(
!
pStartWin
)
{
S
SessionKey
startWin
=
{
0
};
getCurSessionWindow
(
pInfo
->
windowSup
.
pStreamAggSup
,
startData
[
i
],
endData
[
i
],
groupId
,
&
startWin
);
if
(
IS_INVALID_SESSION_WIN_KEY
(
startWin
)
)
{
// window has been closed.
continue
;
}
S
ResultWindowInfo
*
pEndWin
=
getCurSessionWindow
(
pInfo
->
windowSup
.
pStreamAggSup
,
endData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
ASSERT
(
pEndWin
);
TSKEY
ts
=
INT64_MIN
;
colDataAppend
(
pDest
StartCol
,
i
,
(
const
char
*
)
&
pStartWin
->
win
.
s
key
,
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)
&
pEndWin
->
win
.
ekey
,
false
);
S
SessionKey
endWin
=
{
0
};
getCurSessionWindow
(
pInfo
->
windowSup
.
pStreamAggSup
,
endData
[
i
],
endData
[
i
],
groupId
,
&
endWin
);
ASSERT
(
!
IS_INVALID_SESSION_WIN_KEY
(
endWin
)
);
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)
&
startWin
.
win
.
skey
,
false
)
;
colDataAppend
(
pDest
EndCol
,
i
,
(
const
char
*
)
&
endWin
.
win
.
e
key
,
false
);
colDataAppendNULL
(
pDestUidCol
,
i
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppendNULL
(
pDestCalStartTsCol
,
i
);
...
...
source/libs/executor/src/tfill.c
浏览文件 @
8efd7e87
...
...
@@ -693,7 +693,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup
->
pAllColInfo
=
destroyFillColumnInfo
(
pFillSup
->
pAllColInfo
,
pFillSup
->
numOfFillCols
,
pFillSup
->
numOfAllCols
);
tSimpleHashCleanup
(
pFillSup
->
pResMap
);
pFillSup
->
pResMap
=
NULL
;
streamStateReleaseBuf
(
NULL
,
NULL
,
pFillSup
->
cur
.
pRowVal
);
releaseOutputBuf
(
NULL
,
NULL
,
(
SResultRow
*
)
pFillSup
->
cur
.
pRowVal
);
pFillSup
->
cur
.
pRowVal
=
NULL
;
taosMemoryFree
(
pFillSup
);
...
...
@@ -736,7 +736,7 @@ static void resetFillWindow(SResultRowData* pRowData) {
void
resetPrevAndNextWindow
(
SStreamFillSupporter
*
pFillSup
,
SStreamState
*
pState
)
{
resetFillWindow
(
&
pFillSup
->
prev
);
streamStateReleaseBuf
(
NULL
,
NULL
,
pFillSup
->
cur
.
pRowVal
);
releaseOutputBuf
(
NULL
,
NULL
,
(
SResultRow
*
)
pFillSup
->
cur
.
pRowVal
);
resetFillWindow
(
&
pFillSup
->
cur
);
resetFillWindow
(
&
pFillSup
->
next
);
resetFillWindow
(
&
pFillSup
->
nextNext
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
8efd7e87
此差异已折叠。
点击以展开。
source/libs/function/src/builtinsimpl.c
浏览文件 @
8efd7e87
...
...
@@ -2527,6 +2527,8 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t
start
=
pInput
->
startRowIndex
;
if
(
pInfo
->
algo
==
APERCT_ALGO_TDIGEST
)
{
buildTDigestInfo
(
pInfo
);
tdigestAutoFill
(
pInfo
->
pTDigest
,
COMPRESSION
);
for
(
int32_t
i
=
start
;
i
<
pInput
->
numOfRows
+
start
;
++
i
)
{
if
(
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
...
...
@@ -2540,12 +2542,11 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
tdigestAdd
(
pInfo
->
pTDigest
,
v
,
w
);
}
}
else
{
qDebug
(
"%s before add %d elements into histogram, total:%d, numOfEntry:%d, pHisto:%p, elems: %p"
,
__FUNCTION__
,
numOfElems
,
pInfo
->
pHisto
->
numOfElems
,
pInfo
->
pHisto
->
numOfEntries
,
pInfo
->
pHisto
,
pInfo
->
pHisto
->
elems
);
// might be a race condition here that pHisto can be overwritten or setup function
// has not been called, need to relink the buffer pHisto points to.
buildHistogramInfo
(
pInfo
);
qDebug
(
"%s before add %d elements into histogram, total:%d, numOfEntry:%d, pHisto:%p, elems: %p"
,
__FUNCTION__
,
numOfElems
,
pInfo
->
pHisto
->
numOfElems
,
pInfo
->
pHisto
->
numOfEntries
,
pInfo
->
pHisto
,
pInfo
->
pHisto
->
elems
);
for
(
int32_t
i
=
start
;
i
<
pInput
->
numOfRows
+
start
;
++
i
)
{
if
(
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
...
...
@@ -2579,8 +2580,9 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
buildTDigestInfo
(
pOutput
);
TDigest
*
pTDigest
=
pOutput
->
pTDigest
;
tdigestAutoFill
(
pTDigest
,
COMPRESSION
);
if
(
pTDigest
->
num_centroids
<=
0
)
{
if
(
pTDigest
->
num_centroids
<=
0
&&
pTDigest
->
num_buffered_pts
==
0
)
{
memcpy
(
pTDigest
,
pInput
->
pTDigest
,
(
size_t
)
TDIGEST_SIZE
(
COMPRESSION
));
tdigestAutoFill
(
pTDigest
,
COMPRESSION
);
}
else
{
...
...
@@ -2652,6 +2654,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if
(
pInfo
->
algo
==
APERCT_ALGO_TDIGEST
)
{
buildTDigestInfo
(
pInfo
);
tdigestAutoFill
(
pInfo
->
pTDigest
,
COMPRESSION
);
if
(
pInfo
->
pTDigest
->
size
>
0
)
{
pInfo
->
result
=
tdigestQuantile
(
pInfo
->
pTDigest
,
pInfo
->
percent
/
100
);
}
else
{
// no need to free
...
...
source/libs/stream/src/streamState.c
浏览文件 @
8efd7e87
...
...
@@ -24,6 +24,40 @@ typedef struct SStateKey {
int64_t
opNum
;
}
SStateKey
;
typedef
struct
SStateSessionKey
{
SSessionKey
key
;
int64_t
opNum
;
}
SStateSessionKey
;
static
inline
int
sessionKeyCmpr
(
const
SSessionKey
*
pWin1
,
const
SSessionKey
*
pWin2
)
{
if
(
pWin1
->
groupId
>
pWin2
->
groupId
)
{
return
1
;
}
else
if
(
pWin1
->
groupId
<
pWin2
->
groupId
)
{
return
-
1
;
}
if
(
pWin1
->
win
.
skey
>
pWin2
->
win
.
ekey
)
{
return
1
;
}
else
if
(
pWin1
->
win
.
ekey
<
pWin2
->
win
.
skey
)
{
return
-
1
;
}
return
0
;
}
static
inline
int
stateSessionKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
)
{
SStateSessionKey
*
pWin1
=
(
SStateSessionKey
*
)
pKey1
;
SStateSessionKey
*
pWin2
=
(
SStateSessionKey
*
)
pKey2
;
if
(
pWin1
->
opNum
>
pWin2
->
opNum
)
{
return
1
;
}
else
if
(
pWin1
->
opNum
<
pWin2
->
opNum
)
{
return
-
1
;
}
return
sessionKeyCmpr
(
&
pWin1
->
key
,
&
pWin2
->
key
);
}
static
inline
int
stateKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
)
{
SStateKey
*
pWin1
=
(
SStateKey
*
)
pKey1
;
SStateKey
*
pWin2
=
(
SStateKey
*
)
pKey2
;
...
...
@@ -79,6 +113,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto
_err
;
}
if
(
tdbTbOpen
(
"session.state.db"
,
sizeof
(
SStateSessionKey
),
-
1
,
stateSessionKeyCmpr
,
pState
->
db
,
&
pState
->
pSessionStateDb
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"func.state.db"
,
sizeof
(
STupleKey
),
-
1
,
STupleKeyCmpr
,
pState
->
db
,
&
pState
->
pFuncStateDb
)
<
0
)
{
goto
_err
;
}
...
...
@@ -95,6 +134,7 @@ _err:
tdbTbClose
(
pState
->
pStateDb
);
tdbTbClose
(
pState
->
pFuncStateDb
);
tdbTbClose
(
pState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pSessionStateDb
);
tdbClose
(
pState
->
db
);
taosMemoryFree
(
pState
);
return
NULL
;
...
...
@@ -105,6 +145,7 @@ void streamStateClose(SStreamState* pState) {
tdbTbClose
(
pState
->
pStateDb
);
tdbTbClose
(
pState
->
pFuncStateDb
);
tdbTbClose
(
pState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pSessionStateDb
);
tdbClose
(
pState
->
db
);
taosMemoryFree
(
pState
);
...
...
@@ -241,11 +282,11 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
;
int32_t
c
=
0
;
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
tdbTbcMoveTo
(
pCur
->
pCur
,
&
sKey
,
sizeof
(
SStateKey
),
&
c
);
if
(
c
!=
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
...
...
@@ -257,7 +298,7 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key)
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
;
int32_t
c
=
0
;
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
),
&
c
);
if
(
c
!=
0
)
{
streamStateFreeCur
(
pCur
);
...
...
@@ -348,21 +389,21 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
int32_t
c
;
int32_t
c
=
0
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
&
sKey
,
sizeof
(
SStateKey
),
&
c
)
<
0
)
{
tdbTbcClose
(
pCur
->
pCur
);
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
if
(
c
>
0
)
return
pCur
;
if
(
tdbTbcMoveToNext
(
pCur
->
pCur
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -375,20 +416,20 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
return
NULL
;
}
if
(
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
int32_t
c
;
int32_t
c
=
0
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
),
&
c
)
<
0
)
{
tdbTbcClose
(
pCur
->
pCur
);
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
if
(
c
>
0
)
return
pCur
;
if
(
tdbTbcMoveToNext
(
pCur
->
pCur
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -401,20 +442,20 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
return
NULL
;
}
if
(
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
int32_t
c
;
int32_t
c
=
0
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
),
&
c
)
<
0
)
{
tdbTbcClose
(
pCur
->
pCur
);
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
if
(
c
<
0
)
return
pCur
;
if
(
tdbTbcMoveToPrev
(
pCur
->
pCur
)
<
0
)
{
taosMemoryFree
(
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -445,3 +486,229 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
}
void
streamFreeVal
(
void
*
val
)
{
tdbFree
(
val
);
}
int32_t
streamStateSessionPut
(
SStreamState
*
pState
,
const
SSessionKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbUpsert
(
pState
->
pSessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
value
,
vLen
,
&
pState
->
txn
);
}
SStreamStateCur
*
streamStateSessionGetRanomCur
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
=
0
;
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
tdbTbcMoveTo
(
pCur
->
pCur
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
c
);
if
(
c
!=
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
return
pCur
;
}
int32_t
streamStateSessionGet
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
SStreamStateCur
*
pCur
=
streamStateSessionGetRanomCur
(
pState
,
key
);
void
*
tmp
=
NULL
;
if
(
streamStateSessionGetKVByCur
(
pCur
,
key
,
(
const
void
**
)
&
tmp
,
pVLen
)
==
0
)
{
*
pVal
=
tdbRealloc
(
NULL
,
*
pVLen
);
memcpy
(
*
pVal
,
tmp
,
*
pVLen
);
streamStateFreeCur
(
pCur
);
return
0
;
}
streamStateFreeCur
(
pCur
);
return
-
1
;
}
int32_t
streamStateSessionDel
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbDelete
(
pState
->
pSessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
pState
->
txn
);
}
SStreamStateCur
*
streamStateSessionSeekKeyPrev
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
int32_t
c
=
0
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
c
)
<
0
)
{
tdbTbcClose
(
pCur
->
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
if
(
c
>
0
)
return
pCur
;
if
(
tdbTbcMoveToPrev
(
pCur
->
pCur
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
return
pCur
;
}
SStreamStateCur
*
streamStateSessionSeekKeyNext
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
int32_t
c
=
0
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
c
)
<
0
)
{
tdbTbcClose
(
pCur
->
pCur
);
streamStateFreeCur
(
pCur
);
return
NULL
;
}
if
(
c
>
0
)
return
pCur
;
if
(
tdbTbcMoveToNext
(
pCur
->
pCur
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
return
pCur
;
}
int32_t
streamStateSessionGetKVByCur
(
SStreamStateCur
*
pCur
,
SSessionKey
*
pKey
,
const
void
**
pVal
,
int32_t
*
pVLen
)
{
if
(
!
pCur
)
{
return
-
1
;
}
const
SStateSessionKey
*
pKTmp
=
NULL
;
int32_t
kLen
;
if
(
tdbTbcGet
(
pCur
->
pCur
,
(
const
void
**
)
&
pKTmp
,
&
kLen
,
pVal
,
pVLen
)
<
0
)
{
return
-
1
;
}
if
(
pKTmp
->
opNum
!=
pCur
->
number
)
{
return
-
1
;
}
if
(
pKey
->
groupId
!=
0
&&
pKey
->
groupId
!=
pKTmp
->
key
.
groupId
)
{
return
-
1
;
}
*
pKey
=
pKTmp
->
key
;
return
0
;
}
int32_t
streamStateSessionClear
(
SStreamState
*
pState
)
{
SSessionKey
key
=
{.
win
.
skey
=
0
,
.
win
.
ekey
=
0
,
.
groupId
=
0
};
streamStateSessionPut
(
pState
,
&
key
,
NULL
,
0
);
SStreamStateCur
*
pCur
=
streamStateSessionSeekKeyNext
(
pState
,
&
key
);
while
(
1
)
{
SSessionKey
delKey
=
{
0
};
void
*
buf
=
NULL
;
int32_t
size
=
0
;
int32_t
code
=
streamStateSessionGetKVByCur
(
pCur
,
&
delKey
,
buf
,
&
size
);
if
(
code
==
0
)
{
memset
(
buf
,
0
,
size
);
streamStateSessionPut
(
pState
,
&
delKey
,
buf
,
size
);
}
else
{
break
;
}
streamStateCurNext
(
pState
,
pCur
);
}
streamStateFreeCur
(
pCur
);
streamStateSessionDel
(
pState
,
&
key
);
return
0
;
}
SStreamStateCur
*
streamStateSessionGetCur
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
pCur
=
streamStateSessionGetRanomCur
(
pState
,
key
);
SSessionKey
resKey
=
*
key
;
while
(
1
)
{
streamStateCurPrev
(
pState
,
pCur
);
SSessionKey
tmpKey
=
*
key
;
int32_t
code
=
streamStateSessionGetKVByCur
(
pCur
,
&
tmpKey
,
NULL
,
0
);
if
(
code
==
TSDB_CODE_SUCCESS
&&
sessionKeyCmpr
(
key
,
&
tmpKey
)
==
0
)
{
resKey
=
tmpKey
;
}
else
{
break
;
}
}
streamStateFreeCur
(
pCur
);
return
streamStateSessionGetRanomCur
(
pState
,
&
resKey
);
}
int32_t
streamStateSessionAddIfNotExist
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
// todo refactor
SStreamStateCur
*
pCur
=
streamStateSessionGetCur
(
pState
,
key
);
int32_t
size
=
*
pVLen
;
void
*
tmp
=
NULL
;
*
pVal
=
tdbRealloc
(
NULL
,
size
);
memset
(
*
pVal
,
0
,
size
);
if
(
streamStateSessionGetKVByCur
(
pCur
,
key
,
(
const
void
**
)
&
tmp
,
pVLen
)
==
0
)
{
memcpy
(
*
pVal
,
tmp
,
*
pVLen
);
streamStateFreeCur
(
pCur
);
return
0
;
}
streamStateFreeCur
(
pCur
);
return
1
;
}
int32_t
streamStateStateAddIfNotExist
(
SStreamState
*
pState
,
SSessionKey
*
key
,
char
*
pKeyData
,
int32_t
keyDataLen
,
state_key_cmpr_fn
fn
,
void
**
pVal
,
int32_t
*
pVLen
)
{
// todo refactor
int32_t
res
=
TSDB_CODE_SUCCESS
;
SSessionKey
tmpKey
=
*
key
;
int32_t
valSize
=
*
pVLen
;
void
*
tmp
=
tdbRealloc
(
NULL
,
valSize
);
if
(
!
tmp
)
{
return
-
1
;
}
SStreamStateCur
*
pCur
=
streamStateSessionGetRanomCur
(
pState
,
key
);
int32_t
code
=
streamStateSessionGetKVByCur
(
pCur
,
key
,
(
const
void
**
)
pVal
,
pVLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
memcpy
(
tmp
,
*
pVal
,
valSize
);
*
pVal
=
tmp
;
streamStateFreeCur
(
pCur
);
return
res
;
}
streamStateFreeCur
(
pCur
);
streamStateSessionPut
(
pState
,
key
,
NULL
,
0
);
pCur
=
streamStateSessionGetRanomCur
(
pState
,
key
);
streamStateCurPrev
(
pState
,
pCur
);
code
=
streamStateSessionGetKVByCur
(
pCur
,
key
,
(
const
void
**
)
pVal
,
pVLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
void
*
stateKey
=
(
char
*
)(
*
pVal
)
+
(
valSize
-
keyDataLen
);
if
(
fn
(
pKeyData
,
stateKey
)
==
true
)
{
memcpy
(
tmp
,
*
pVal
,
valSize
);
goto
_end
;
}
}
streamStateFreeCur
(
pCur
);
*
key
=
tmpKey
;
pCur
=
streamStateSessionSeekKeyNext
(
pState
,
key
);
code
=
streamStateSessionGetKVByCur
(
pCur
,
key
,
(
const
void
**
)
pVal
,
pVLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
void
*
stateKey
=
(
char
*
)(
*
pVal
)
+
(
valSize
-
keyDataLen
);
if
(
fn
(
pKeyData
,
stateKey
)
==
true
)
{
memcpy
(
tmp
,
*
pVal
,
valSize
);
goto
_end
;
}
}
*
key
=
tmpKey
;
res
=
1
;
memset
(
tmp
,
0
,
valSize
);
_end:
*
pVal
=
tmp
;
streamStateSessionDel
(
pState
,
&
tmpKey
);
streamStateFreeCur
(
pCur
);
return
res
;
}
tests/script/jenkins/basic.txt
浏览文件 @
8efd7e87
...
...
@@ -249,12 +249,18 @@
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/sliding.sim
#
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
#./test.sh -f tsim/stream/partitionbyColumnState.sim
#./test.sh -f tsim/stream/deleteInterval.sim
#./test.sh -f tsim/stream/deleteSession.sim
#./test.sh -f tsim/stream/deleteState.sim
#./test.sh -f tsim/stream/fillIntervalDelete0.sim
#./test.sh -f tsim/stream/fillIntervalDelete1.sim
./test.sh -f tsim/stream/fillIntervalLinear.sim
#./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
./test.sh -f tsim/stream/fillIntervalValue.sim
# ---- transaction ----
./test.sh -f tsim/trans/lossdata1.sim
...
...
tests/script/tsim/stream/session0.sim
浏览文件 @
8efd7e87
...
...
@@ -216,12 +216,12 @@ if $data02 != 3.274823935 then
goto loop2
endi
if $data03 != 1.
8
00000000 then
if $data03 != 1.
5
00000000 then
print ======$data03
return -1
endi
if $data04 != 3.
35
0000000 then
if $data04 != 3.
50
0000000 then
print ======$data04
return -1
endi
...
...
tests/script/tsim/stream/session1.sim
浏览文件 @
8efd7e87
...
...
@@ -5,15 +5,15 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql select * from information_schema.ins_databases
sql create database test vgroups 1
;
sql select * from information_schema.ins_databases
;
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql use test
;
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
...
...
tests/script/tsim/stream/state0.sim
浏览文件 @
8efd7e87
...
...
@@ -349,7 +349,7 @@ endi
if $rows != 3 then
print ====loop4=rows=$rows
#
goto loop4
goto loop4
endi
# row 0
...
...
tests/script/tsim/stream/triggerSession0.sim
浏览文件 @
8efd7e87
...
...
@@ -13,7 +13,7 @@ endi
print $data00 $data01 $data02
sql use test
sql use test
;
sql create table t2(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger window_close into streamt2 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s);
...
...
@@ -58,16 +58,11 @@ endi
sql insert into t2 values(1648791233002,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows !=
1
then
if $rows !=
0
then
print ======$rows
return -1
endi
if $data01 != 6 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791253003,1,2,3,1.0);
sleep 300
sql select * from streamt2;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录