Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
febe6031
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
febe6031
编写于
9月 07, 2022
作者:
D
dapan1121
提交者:
GitHub
9月 07, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16716 from taosdata/feature/TD-18820
feat(stream):change single interval disc buff and interval delete
上级
69f717ec
1f5d9a38
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
914 addition
and
200 deletion
+914
-200
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+9
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+106
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+134
-122
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+79
-24
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+140
-50
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+23
-0
tests/script/tsim/stream/basic1.sim
tests/script/tsim/stream/basic1.sim
+2
-2
tests/script/tsim/stream/deleteInterval.sim
tests/script/tsim/stream/deleteInterval.sim
+419
-0
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
febe6031
...
...
@@ -554,6 +554,8 @@ typedef struct {
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
int32_t
streamStateAddIfNotExist
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateReleaseBuf
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
*
pVal
);
void
streamFreeVal
(
void
*
val
);
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
febe6031
...
...
@@ -411,7 +411,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
STREAM_SCAN_FROM_UPDATERES
,
STREAM_SCAN_FROM_DELETE
RES
,
STREAM_SCAN_FROM_DELETE
_DATA
,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
,
STREAM_SCAN_FROM_DATAREADER_RANGE
,
}
EStreamScanMode
;
...
...
@@ -794,6 +794,7 @@ typedef struct SStreamPartitionOperatorInfo {
void
*
parIte
;
SSDataBlock
*
pInputDataBlock
;
int32_t
tsColIndex
;
SSDataBlock
*
pDelRes
;
}
SStreamPartitionOperatorInfo
;
typedef
struct
STimeSliceOperatorInfo
{
...
...
@@ -1108,6 +1109,13 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
bool
groupbyTbname
(
SNodeList
*
pGroupList
);
int32_t
generateGroupIdMap
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SNodeList
*
groupKey
);
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
int32_t
buildDataBlockFromGroupRes
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
releaseOutputBuf
(
SExecTaskInfo
*
pTaskInfo
,
SWinKey
*
pKey
,
SResultRow
*
pResult
);
int32_t
saveOutput
(
SExecTaskInfo
*
pTaskInfo
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
febe6031
...
...
@@ -3938,7 +3938,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pIntervalPhyNode
,
pTaskInfo
,
isStream
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
pOptr
=
createStreamIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
type
)
{
SMergeAlignedIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeAlignedIntervalPhysiNode
*
)
pPhyNode
;
...
...
@@ -4410,3 +4410,108 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
return
code
;
}
int32_t
setOutputBuf
(
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
,
SExecTaskInfo
*
pTaskInfo
)
{
SWinKey
key
=
{
.
ts
=
win
->
skey
,
.
groupId
=
tableGroupId
,
};
char
*
value
=
NULL
;
int32_t
size
=
pAggSup
->
resultRowSize
;
if
(
streamStateAddIfNotExist
(
pTaskInfo
->
streamInfo
.
pState
,
&
key
,
(
void
**
)
&
value
,
&
size
)
<
0
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
*
pResult
=
(
SResultRow
*
)
value
;
ASSERT
(
*
pResult
);
// set time window for current result
(
*
pResult
)
->
win
=
(
*
win
);
setResultRowInitCtx
(
*
pResult
,
pCtx
,
numOfOutput
,
rowEntryInfoOffset
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
releaseOutputBuf
(
SExecTaskInfo
*
pTaskInfo
,
SWinKey
*
pKey
,
SResultRow
*
pResult
)
{
streamStateReleaseBuf
(
pTaskInfo
->
streamInfo
.
pState
,
pKey
,
pResult
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
saveOutput
(
SExecTaskInfo
*
pTaskInfo
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
)
{
streamStatePut
(
pTaskInfo
->
streamInfo
.
pState
,
pKey
,
pResult
,
resSize
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildDataBlockFromGroupRes
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
)
{
SExprInfo
*
pExprInfo
=
pSup
->
pExprInfo
;
int32_t
numOfExprs
=
pSup
->
numOfExprs
;
int32_t
*
rowEntryOffset
=
pSup
->
rowEntryInfoOffset
;
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
for
(
int32_t
i
=
pGroupResInfo
->
index
;
i
<
numOfRows
;
i
+=
1
)
{
SResKeyPos
*
pPos
=
taosArrayGetP
(
pGroupResInfo
->
pRows
,
i
);
int32_t
size
=
0
;
void
*
pVal
=
NULL
;
SWinKey
key
=
{
.
ts
=
*
(
TSKEY
*
)
pPos
->
key
,
.
groupId
=
pPos
->
groupId
,
};
int32_t
code
=
streamStateGet
(
pTaskInfo
->
streamInfo
.
pState
,
&
key
,
&
pVal
,
&
size
);
ASSERT
(
code
==
0
);
SResultRow
*
pRow
=
(
SResultRow
*
)
pVal
;
doUpdateNumOfRows
(
pCtx
,
pRow
,
numOfExprs
,
rowEntryOffset
);
// no results, continue to check the next one
if
(
pRow
->
numOfRows
==
0
)
{
pGroupResInfo
->
index
+=
1
;
releaseOutputBuf
(
pTaskInfo
,
&
key
,
pRow
);
continue
;
}
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pPos
->
groupId
;
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pPos
->
groupId
)
{
releaseOutputBuf
(
pTaskInfo
,
&
key
,
pRow
);
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
ASSERT
(
pBlock
->
info
.
rows
>
0
);
releaseOutputBuf
(
pTaskInfo
,
&
key
,
pRow
);
break
;
}
pGroupResInfo
->
index
+=
1
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntryOffset
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
releaseOutputBuf
(
pTaskInfo
,
&
key
,
pRow
);
pBlock
->
info
.
rows
+=
pRow
->
numOfRows
;
}
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/groupoperator.c
浏览文件 @
febe6031
...
...
@@ -13,26 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorInt.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "executorInt.h"
static
void
*
getCurrentDataGroupInfo
(
const
SPartitionOperatorInfo
*
pInfo
,
SDataGroupInfo
**
pGroupInfo
,
int32_t
len
);
static
int32_t
*
setupColumnOffset
(
const
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
);
static
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SAggSupporter
*
pAggSup
);
static
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SAggSupporter
*
pAggSup
);
static
void
freeGroupKey
(
void
*
param
)
{
SGroupKeys
*
pKey
=
(
SGroupKeys
*
)
param
;
SGroupKeys
*
pKey
=
(
SGroupKeys
*
)
param
;
taosMemoryFree
(
pKey
->
pData
);
}
...
...
@@ -62,13 +62,13 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pGroupColList
,
i
);
(
*
keyLen
)
+=
pCol
->
bytes
;
// actual data + null_flag
(
*
keyLen
)
+=
pCol
->
bytes
;
// actual data + null_flag
SGroupKeys
key
=
{
0
};
key
.
bytes
=
pCol
->
bytes
;
key
.
type
=
pCol
->
type
;
key
.
bytes
=
pCol
->
bytes
;
key
.
type
=
pCol
->
type
;
key
.
isNull
=
false
;
key
.
pData
=
taosMemoryCalloc
(
1
,
pCol
->
bytes
);
key
.
pData
=
taosMemoryCalloc
(
1
,
pCol
->
bytes
);
if
(
key
.
pData
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -87,7 +87,8 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
return
TSDB_CODE_SUCCESS
;
}
static
bool
groupKeyCompare
(
SArray
*
pGroupCols
,
SArray
*
pGroupColVals
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
,
int32_t
numOfGroupCols
)
{
static
bool
groupKeyCompare
(
SArray
*
pGroupCols
,
SArray
*
pGroupColVals
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
,
int32_t
numOfGroupCols
)
{
SColumnDataAgg
*
pColAgg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pGroupCols
,
i
);
...
...
@@ -112,7 +113,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
if
(
pkey
->
type
==
TSDB_DATA_TYPE_JSON
)
{
int32_t
dataLen
=
getJsonValueLen
(
val
);
if
(
memcmp
(
pkey
->
pData
,
val
,
dataLen
)
==
0
){
if
(
memcmp
(
pkey
->
pData
,
val
,
dataLen
)
==
0
)
{
continue
;
}
else
{
return
false
;
...
...
@@ -154,7 +155,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
pkey
->
isNull
=
false
;
char
*
val
=
colDataGetData
(
pColInfoData
,
rowIndex
);
if
(
pkey
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
tTagIsJson
(
val
))
{
if
(
tTagIsJson
(
val
))
{
terrno
=
TSDB_CODE_QRY_JSON_IN_GROUP_ERROR
;
return
;
}
...
...
@@ -198,13 +199,13 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
}
}
return
(
int32_t
)
(
pStart
-
(
char
*
)
pKey
);
return
(
int32_t
)(
pStart
-
(
char
*
)
pKey
);
}
// assign the group keys or user input constant values if required
static
void
doAssignGroupKeys
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
totalRows
,
int32_t
rowIndex
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
pCtx
[
i
].
functionId
==
-
1
)
{
// select count(*),key from t group by key.
if
(
pCtx
[
i
].
functionId
==
-
1
)
{
// select count(*),key from t group by key.
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
&
pCtx
[
i
]);
SColumnInfoData
*
pColInfoData
=
pCtx
[
i
].
input
.
pData
[
0
];
...
...
@@ -221,7 +222,7 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
}
else
{
memcpy
(
dest
,
data
,
pColInfoData
->
info
.
bytes
);
}
}
else
{
// it is a NULL value
}
else
{
// it is a NULL value
pEntryInfo
->
isNullRes
=
1
;
}
...
...
@@ -275,7 +276,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
setGroupResultOutputBuf
(
pOperator
,
&
(
pInfo
->
binfo
),
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
keyBuf
,
len
,
pBlock
->
info
.
groupId
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
aggSup
);
int32_t
ret
=
setGroupResultOutputBuf
(
pOperator
,
&
(
pInfo
->
binfo
),
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
keyBuf
,
len
,
pBlock
->
info
.
groupId
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -291,9 +293,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if
(
num
>
0
)
{
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
setGroupResultOutputBuf
(
pOperator
,
&
(
pInfo
->
binfo
),
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
keyBuf
,
len
,
pBlock
->
info
.
groupId
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
aggSup
);
int32_t
ret
=
setGroupResultOutputBuf
(
pOperator
,
&
(
pInfo
->
binfo
),
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
keyBuf
,
len
,
pBlock
->
info
.
groupId
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -308,7 +309,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
while
(
1
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
);
...
...
@@ -323,7 +324,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
}
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
}
static
SSDataBlock
*
hashGroupbyAggregate
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -334,7 +335,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
buildGroupResultDataBlock
(
pOperator
);
...
...
@@ -343,7 +344,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
...
...
@@ -362,7 +363,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -403,8 +405,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
goto
_error
;
}
pInfo
->
pGroupCols
=
pGroupColList
;
pInfo
->
pCondition
=
pCondition
;
pInfo
->
pGroupCols
=
pGroupColList
;
pInfo
->
pCondition
=
pCondition
;
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pScalarExprInfo
,
numOfScalarExpr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -425,14 +427,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initBasicInfo
(
&
pInfo
->
binfo
,
pResultBlock
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = OP_Groupby;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
NULL
,
destroyGroupOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
NULL
,
destroyGroupOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -440,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
destroyGroupOperatorInfo
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
...
...
@@ -448,7 +451,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
}
static
void
doHashPartition
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -457,7 +460,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
SDataGroupInfo
*
pGroupInfo
=
NULL
;
void
*
pPage
=
getCurrentDataGroupInfo
(
pInfo
,
&
pGroupInfo
,
len
);
void
*
pPage
=
getCurrentDataGroupInfo
(
pInfo
,
&
pGroupInfo
,
len
);
pGroupInfo
->
numOfRows
+=
1
;
...
...
@@ -467,32 +470,32 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
// number of rows
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
size_t
numOfCols
=
pOperator
->
exprSupp
.
numOfExprs
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExpr
=
&
pOperator
->
exprSupp
.
pExprInfo
[
i
];
int32_t
slotId
=
pExpr
->
base
.
pParam
[
0
].
pCol
->
slotId
;
int32_t
slotId
=
pExpr
->
base
.
pParam
[
0
].
pCol
->
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
int32_t
bytes
=
pColInfoData
->
info
.
bytes
;
int32_t
startOffset
=
pInfo
->
columnOffset
[
i
];
int32_t
*
columnLen
=
NULL
;
int32_t
*
columnLen
=
NULL
;
int32_t
contentLen
=
0
;
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
int32_t
*
offset
=
(
int32_t
*
)((
char
*
)
pPage
+
startOffset
);
columnLen
=
(
int32_t
*
)
((
char
*
)
pPage
+
startOffset
+
sizeof
(
int32_t
)
*
pInfo
->
rowCapacity
);
char
*
data
=
(
char
*
)((
char
*
)
columnLen
+
sizeof
(
int32_t
));
columnLen
=
(
int32_t
*
)
((
char
*
)
pPage
+
startOffset
+
sizeof
(
int32_t
)
*
pInfo
->
rowCapacity
);
char
*
data
=
(
char
*
)((
char
*
)
columnLen
+
sizeof
(
int32_t
));
if
(
colDataIsNull_s
(
pColInfoData
,
j
))
{
offset
[(
*
rows
)]
=
-
1
;
contentLen
=
0
;
}
else
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
)
{
}
else
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
)
{
offset
[
*
rows
]
=
(
*
columnLen
);
char
*
src
=
colDataGetData
(
pColInfoData
,
j
);
char
*
src
=
colDataGetData
(
pColInfoData
,
j
);
int32_t
dataLen
=
getJsonValueLen
(
src
);
memcpy
(
data
+
(
*
columnLen
),
src
,
dataLen
);
...
...
@@ -511,8 +514,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
else
{
char
*
bitmap
=
(
char
*
)
pPage
+
startOffset
;
columnLen
=
(
int32_t
*
)
((
char
*
)
pPage
+
startOffset
+
BitmapLen
(
pInfo
->
rowCapacity
));
char
*
data
=
(
char
*
)
columnLen
+
sizeof
(
int32_t
);
columnLen
=
(
int32_t
*
)
((
char
*
)
pPage
+
startOffset
+
BitmapLen
(
pInfo
->
rowCapacity
));
char
*
data
=
(
char
*
)
columnLen
+
sizeof
(
int32_t
);
bool
isNull
=
colDataIsNull_f
(
pColInfoData
->
nullbitmap
,
j
);
if
(
isNull
)
{
...
...
@@ -539,7 +542,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
SDataGroupInfo
*
p
=
taosHashGet
(
pInfo
->
pGroupSet
,
pInfo
->
keyBuf
,
len
);
void
*
pPage
=
NULL
;
if
(
p
==
NULL
)
{
// it is a new group
if
(
p
==
NULL
)
{
// it is a new group
SDataGroupInfo
gi
=
{
0
};
gi
.
pPageList
=
taosArrayInit
(
100
,
sizeof
(
int32_t
));
taosHashPut
(
pInfo
->
pGroupSet
,
pInfo
->
keyBuf
,
len
,
&
gi
,
sizeof
(
SDataGroupInfo
));
...
...
@@ -550,12 +553,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
pPage
=
getNewBufPage
(
pInfo
->
pBuf
,
&
pageId
);
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
*
(
int32_t
*
)
pPage
=
0
;
*
(
int32_t
*
)
pPage
=
0
;
}
else
{
int32_t
*
curId
=
taosArrayGetLast
(
p
->
pPageList
);
pPage
=
getBufPage
(
pInfo
->
pBuf
,
*
curId
);
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
if
(
*
rows
>=
pInfo
->
rowCapacity
)
{
// release buffer
releaseBufPage
(
pInfo
->
pBuf
,
pPage
);
...
...
@@ -585,17 +588,18 @@ uint64_t calcGroupId(char* pData, int32_t len) {
}
int32_t
*
setupColumnOffset
(
const
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
)
{
size_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
size_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
int32_t
*
offset
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
int32_t
));
offset
[
0
]
=
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
);
// the number of rows in current page, ref to SSDataBlock paged serialization format
offset
[
0
]
=
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
);
// the number of rows in current page, ref to SSDataBlock paged serialization format
for
(
int32_t
i
=
0
;
i
<
numOfCols
-
1
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
-
1
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
int32_t
bytes
=
pColInfoData
->
info
.
bytes
;
int32_t
payloadLen
=
bytes
*
rowCapacity
;
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
// offset segment + content length + payload
offset
[
i
+
1
]
=
rowCapacity
*
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
payloadLen
+
offset
[
i
];
...
...
@@ -609,9 +613,9 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
}
static
void
clearPartitionOperator
(
SPartitionOperatorInfo
*
pInfo
)
{
void
*
ite
=
NULL
;
while
(
(
ite
=
taosHashIterate
(
pInfo
->
pGroupSet
,
ite
))
!=
NULL
)
{
taosArrayDestroy
(
((
SDataGroupInfo
*
)
ite
)
->
pPageList
);
void
*
ite
=
NULL
;
while
((
ite
=
taosHashIterate
(
pInfo
->
pGroupSet
,
ite
))
!=
NULL
)
{
taosArrayDestroy
(
((
SDataGroupInfo
*
)
ite
)
->
pPageList
);
}
taosArrayClear
(
pInfo
->
sortedGroupArray
);
clearDiskbasedBuf
(
pInfo
->
pBuf
);
...
...
@@ -626,13 +630,14 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
return
0
;
}
return
(
pGroupInfo1
->
groupId
<
pGroupInfo2
->
groupId
)
?
-
1
:
1
;
return
(
pGroupInfo1
->
groupId
<
pGroupInfo2
->
groupId
)
?
-
1
:
1
;
}
static
SSDataBlock
*
buildPartitionResult
(
SOperatorInfo
*
pOperator
)
{
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SDataGroupInfo
*
pGroupInfo
=
(
pInfo
->
groupIndex
!=
-
1
)
?
taosArrayGet
(
pInfo
->
sortedGroupArray
,
pInfo
->
groupIndex
)
:
NULL
;
SDataGroupInfo
*
pGroupInfo
=
(
pInfo
->
groupIndex
!=
-
1
)
?
taosArrayGet
(
pInfo
->
sortedGroupArray
,
pInfo
->
groupIndex
)
:
NULL
;
if
(
pInfo
->
groupIndex
==
-
1
||
pInfo
->
pageIndex
>=
taosArrayGetSize
(
pGroupInfo
->
pPageList
))
{
// try next group data
++
pInfo
->
groupIndex
;
...
...
@@ -647,7 +652,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
}
int32_t
*
pageId
=
taosArrayGet
(
pGroupInfo
->
pPageList
,
pInfo
->
pageIndex
);
void
*
page
=
getBufPage
(
pInfo
->
pBuf
,
*
pageId
);
void
*
page
=
getBufPage
(
pInfo
->
pBuf
,
*
pageId
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pInfo
->
rowCapacity
);
blockDataFromBuf1
(
pInfo
->
binfo
.
pRes
,
page
,
pInfo
->
rowCapacity
);
...
...
@@ -670,14 +675,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
blockDataCleanup
(
pRes
);
return
buildPartitionResult
(
pOperator
);
}
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
...
...
@@ -688,7 +693,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -727,7 +733,7 @@ static void destroyPartitionOperatorInfo(void* param) {
cleanupBasicInfo
(
&
pInfo
->
binfo
);
taosArrayDestroy
(
pInfo
->
pGroupCols
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pGroupColVals
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pGroupColVals
);
i
++
)
{
SGroupKeys
key
=
*
(
SGroupKeys
*
)
taosArrayGet
(
pInfo
->
pGroupColVals
,
i
);
taosMemoryFree
(
key
.
pData
);
}
...
...
@@ -743,24 +749,25 @@ static void destroyPartitionOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SPartitionOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SPartitionOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPartNode
->
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
numOfCols
);
pInfo
->
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
if
(
pPartNode
->
pExprs
!=
NULL
)
{
int32_t
num
=
0
;
int32_t
num
=
0
;
SExprInfo
*
pExprInfo1
=
createExprInfo
(
pPartNode
->
pExprs
,
NULL
,
&
num
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pExprInfo1
,
num
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pExprInfo1
,
num
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -772,7 +779,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto
_error
;
}
uint32_t
defaultPgsz
=
0
;
uint32_t
defaultPgsz
=
0
;
uint32_t
defaultBufsz
=
0
;
getBufferPgSize
(
pResBlock
->
info
.
rowSize
,
&
defaultPgsz
,
&
defaultBufsz
);
...
...
@@ -794,15 +801,15 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto
_error
;
}
pOperator
->
name
=
"PartitionOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
name
=
"PartitionOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PARTITION
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashPartition
,
NULL
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
@@ -810,16 +817,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SAggSupporter
*
pAggSup
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SAggSupporter
*
pAggSup
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SResultRowInfo
*
pResultRowInfo
=
&
binfo
->
resultRowInfo
;
SqlFunctionCtx
*
pCtx
=
pOperator
->
exprSupp
.
pCtx
;
...
...
@@ -833,37 +840,36 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
)
{
if
(
pExprSup
->
pExprInfo
!=
NULL
)
{
int32_t
code
=
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
int32_t
code
=
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"calaculate group id error, code:%d"
,
code
);
}
}
recordNewGroupKeys
(
pParSup
->
pGroupCols
,
pParSup
->
pGroupColVals
,
pBlock
,
rowId
);
int32_t
len
=
buildGroupKeys
(
pParSup
->
keyBuf
,
pParSup
->
pGroupColVals
);
int32_t
len
=
buildGroupKeys
(
pParSup
->
keyBuf
,
pParSup
->
pGroupColVals
);
uint64_t
groupId
=
calcGroupId
(
pParSup
->
keyBuf
,
len
);
return
groupId
;
}
static
bool
hasRemainPartion
(
SStreamPartitionOperatorInfo
*
pInfo
)
{
return
pInfo
->
parIte
!=
NULL
;
}
static
bool
hasRemainPartion
(
SStreamPartitionOperatorInfo
*
pInfo
)
{
return
pInfo
->
parIte
!=
NULL
;
}
static
SSDataBlock
*
buildStreamPartitionResult
(
SOperatorInfo
*
pOperator
)
{
SStreamPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pDest
=
pInfo
->
binfo
.
pRes
;
SSDataBlock
*
pDest
=
pInfo
->
binfo
.
pRes
;
ASSERT
(
hasRemainPartion
(
pInfo
));
SPartitionDataInfo
*
pParInfo
=
(
SPartitionDataInfo
*
)
pInfo
->
parIte
;
blockDataCleanup
(
pDest
);
int32_t
rows
=
taosArrayGetSize
(
pParInfo
->
rowIds
);
int32_t
rows
=
taosArrayGetSize
(
pParInfo
->
rowIds
);
SSDataBlock
*
pSrc
=
pInfo
->
pInputDataBlock
;
for
(
int32_t
i
=
0
;
i
<
rows
;
i
++
)
{
int32_t
rowIndex
=
*
(
int32_t
*
)
taosArrayGet
(
pParInfo
->
rowIds
,
i
);
for
(
int32_t
j
=
0
;
j
<
pOperator
->
exprSupp
.
numOfExprs
;
j
++
)
{
int32_t
slotId
=
pOperator
->
exprSupp
.
pExprInfo
[
j
].
base
.
pParam
[
0
].
pCol
->
slotId
;
int32_t
slotId
=
pOperator
->
exprSupp
.
pExprInfo
[
j
].
base
.
pParam
[
0
].
pCol
->
slotId
;
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
pSrc
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
j
);
bool
isNull
=
colDataIsNull
(
pSrcCol
,
pSrc
->
info
.
rows
,
rowIndex
,
NULL
);
char
*
pSrcData
=
colDataGetData
(
pSrcCol
,
rowIndex
);
bool
isNull
=
colDataIsNull
(
pSrcCol
,
pSrc
->
info
.
rows
,
rowIndex
,
NULL
);
char
*
pSrcData
=
colDataGetData
(
pSrcCol
,
rowIndex
);
colDataAppend
(
pDestCol
,
pDest
->
info
.
rows
,
pSrcData
,
isNull
);
}
pDest
->
info
.
rows
++
;
...
...
@@ -881,9 +887,9 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
pInfo
->
pInputDataBlock
=
pBlock
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
recordNewGroupKeys
(
pInfo
->
partitionSup
.
pGroupCols
,
pInfo
->
partitionSup
.
pGroupColVals
,
pBlock
,
i
);
int32_t
keyLen
=
buildGroupKeys
(
pInfo
->
partitionSup
.
keyBuf
,
pInfo
->
partitionSup
.
pGroupColVals
);
SPartitionDataInfo
*
pParData
=
(
SPartitionDataInfo
*
)
taosHashGet
(
pInfo
->
pPartitions
,
pInfo
->
partitionSup
.
keyBuf
,
keyLen
);
int32_t
keyLen
=
buildGroupKeys
(
pInfo
->
partitionSup
.
keyBuf
,
pInfo
->
partitionSup
.
pGroupColVals
);
SPartitionDataInfo
*
pParData
=
(
SPartitionDataInfo
*
)
taosHashGet
(
pInfo
->
pPartitions
,
pInfo
->
partitionSup
.
keyBuf
,
keyLen
);
if
(
pParData
)
{
taosArrayPush
(
pParData
->
rowIds
,
&
i
);
}
else
{
...
...
@@ -891,8 +897,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
newParData
.
groupId
=
calcGroupId
(
pInfo
->
partitionSup
.
keyBuf
,
keyLen
);
newParData
.
rowIds
=
taosArrayInit
(
64
,
sizeof
(
int32_t
));
taosArrayPush
(
newParData
.
rowIds
,
&
i
);
taosHashPut
(
pInfo
->
pPartitions
,
pInfo
->
partitionSup
.
keyBuf
,
keyLen
,
&
newParData
,
sizeof
(
SPartitionDataInfo
));
taosHashPut
(
pInfo
->
pPartitions
,
pInfo
->
partitionSup
.
keyBuf
,
keyLen
,
&
newParData
,
sizeof
(
SPartitionDataInfo
));
}
}
}
...
...
@@ -902,13 +907,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
hasRemainPartion
(
pInfo
))
{
return
buildStreamPartitionResult
(
pOperator
);
}
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
{
pInfo
->
pInputDataBlock
=
NULL
;
...
...
@@ -924,14 +929,18 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_INVALID
:
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
break
;
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
}
break
;
default:
return
pBlock
;
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -940,7 +949,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
doStreamHashPartitionImpl
(
pInfo
,
pBlock
);
}
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pInfo
->
parIte
=
taosHashIterate
(
pInfo
->
pPartitions
,
NULL
);
return
buildStreamPartitionResult
(
pOperator
);
}
...
...
@@ -950,7 +959,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
cleanupBasicInfo
(
&
pInfo
->
binfo
);
taosArrayDestroy
(
pInfo
->
partitionSup
.
pGroupCols
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
partitionSup
.
pGroupColVals
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
partitionSup
.
pGroupColVals
);
i
++
)
{
SGroupKeys
key
=
*
(
SGroupKeys
*
)
taosArrayGet
(
pInfo
->
partitionSup
.
pGroupColVals
,
i
);
taosMemoryFree
(
key
.
pData
);
}
...
...
@@ -958,6 +967,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
taosMemoryFree
(
pInfo
->
partitionSup
.
keyBuf
);
cleanupExprSupp
(
&
pInfo
->
scalarSup
);
blockDataDestroy
(
pInfo
->
pDelRes
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -970,7 +980,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
pScanInfo
->
pPartScalarSup
=
pExpr
;
}
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamPartitionOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamPartitionOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -980,7 +991,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo
->
partitionSup
.
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
if
(
pPartNode
->
pExprs
!=
NULL
)
{
int32_t
num
=
0
;
int32_t
num
=
0
;
SExprInfo
*
pCalExprInfo
=
createExprInfo
(
pPartNode
->
pExprs
,
NULL
,
&
num
);
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pCalExprInfo
,
num
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -989,7 +1000,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
int32_t
keyLen
=
0
;
code
=
initGroupOptrInfo
(
&
pInfo
->
partitionSup
.
pGroupColVals
,
&
keyLen
,
&
pInfo
->
partitionSup
.
keyBuf
,
pInfo
->
partitionSup
.
pGroupCols
);
code
=
initGroupOptrInfo
(
&
pInfo
->
partitionSup
.
pGroupColVals
,
&
keyLen
,
&
pInfo
->
partitionSup
.
keyBuf
,
pInfo
->
partitionSup
.
pGroupCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -1000,35 +1012,35 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
goto
_error
;
}
blockDataEnsureCapacity
(
pResBlock
,
4096
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
parIte
=
NULL
;
pInfo
->
pInputDataBlock
=
NULL
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
parIte
=
NULL
;
pInfo
->
pInputDataBlock
=
NULL
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pPartitions
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
tsColIndex
=
0
;
pInfo
->
pPartitions
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
tsColIndex
=
0
;
pInfo
->
pDelRes
=
createSpecialDataBlock
(
STREAM_DELETE_RESULT
);
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
numOfCols
);
pOperator
->
name
=
"StreamPartitionOperator"
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
name
=
"StreamPartitionOperator"
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamHashPartition
,
NULL
,
NULL
,
destroyStreamPartitionOperatorInfo
,
NULL
,
NULL
,
NULL
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamHashPartition
,
NULL
,
NULL
,
destroyStreamPartitionOperatorInfo
,
NULL
,
NULL
,
NULL
);
initParDownStream
(
downstream
,
&
pInfo
->
partitionSup
,
&
pInfo
->
scalarSup
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFreeClear
(
pInfo
);
destroyStreamPartitionOperatorInfo
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
source/libs/executor/src/scanoperator.c
浏览文件 @
febe6031
...
...
@@ -1057,24 +1057,24 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
return
true
;
}
static
STimeWindow
getSlidingWindow
(
TSKEY
*
tsCol
,
SInterval
*
pInterval
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
*
pRowIndex
,
bool
hasGroup
)
{
static
STimeWindow
getSlidingWindow
(
TSKEY
*
startTsCol
,
TSKEY
*
endTsCol
,
SInterval
*
pInterval
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
*
pRowIndex
,
bool
hasGroup
)
{
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
t
sCol
[
*
pRowIndex
],
pInterval
,
TSDB_ORDER_ASC
);
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
startT
sCol
[
*
pRowIndex
],
pInterval
,
TSDB_ORDER_ASC
);
STimeWindow
endWin
=
win
;
STimeWindow
preWin
=
win
;
while
(
1
)
{
if
(
hasGroup
)
{
(
*
pRowIndex
)
+=
1
;
}
else
{
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
pDataBlockInfo
,
tsCol
,
*
pRowIndex
,
endWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
pDataBlockInfo
,
startTsCol
,
*
pRowIndex
,
endWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
}
do
{
preWin
=
endWin
;
getNextTimeWindow
(
pInterval
,
&
endWin
,
TSDB_ORDER_ASC
);
}
while
(
t
sCol
[(
*
pRowIndex
)
-
1
]
>=
endWin
.
skey
);
}
while
(
endT
sCol
[(
*
pRowIndex
)
-
1
]
>=
endWin
.
skey
);
endWin
=
preWin
;
if
(
win
.
ekey
==
endWin
.
ekey
||
(
*
pRowIndex
)
==
pDataBlockInfo
->
rows
)
{
win
.
ekey
=
endWin
.
ekey
;
...
...
@@ -1102,6 +1102,11 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pResult
,
NULL
);
if
(
pResult
->
info
.
rows
==
0
)
{
continue
;
}
if
(
pInfo
->
partitionSup
.
needCalc
)
{
SSDataBlock
*
tmpBlock
=
createOneDataBlock
(
pResult
,
true
);
blockDataCleanup
(
pResult
);
...
...
@@ -1188,13 +1193,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return
code
;
}
SColumnInfoData
*
pSrcTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
SColumnInfoData
*
pSrcGpCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
ASSERT
(
pSrcTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pSrcTsCol
->
pData
;
ASSERT
(
pSrcStartTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
TSKEY
*
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDeUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
...
...
@@ -1204,12 +1211,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
rows
;)
{
uint64_t
srcUid
=
srcUidData
[
i
];
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
srcUid
,
t
sCol
[
i
],
version
);
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
srcUid
,
srcStartT
sCol
[
i
],
version
);
uint64_t
srcGpId
=
srcGp
[
i
];
TSKEY
calStartTs
=
t
sCol
[
i
];
TSKEY
calStartTs
=
srcStartT
sCol
[
i
];
colDataAppend
(
pCalStartTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
calStartTs
),
false
);
STimeWindow
win
=
getSlidingWindow
(
tsCol
,
&
pInfo
->
interval
,
&
pSrcBlock
->
info
,
&
i
,
pInfo
->
partitionSup
.
needCalc
);
TSKEY
calEndTs
=
tsCol
[
i
-
1
];
STimeWindow
win
=
getSlidingWindow
(
srcStartTsCol
,
srcEndTsCol
,
&
pInfo
->
interval
,
&
pSrcBlock
->
info
,
&
i
,
pInfo
->
partitionSup
.
needCalc
);
TSKEY
calEndTs
=
srcStartTsCol
[
i
-
1
];
colDataAppend
(
pCalEndTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
calEndTs
),
false
);
colDataAppend
(
pDeUidCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
srcUid
),
false
);
colDataAppend
(
pStartTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
win
.
skey
),
false
);
...
...
@@ -1229,11 +1237,49 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
generateDeleteResultBlock
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pSrcBlock
,
SSDataBlock
*
pDestBlock
)
{
if
(
pSrcBlock
->
info
.
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
blockDataCleanup
(
pDestBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
pSrcBlock
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
ASSERT
(
taosArrayGetSize
(
pSrcBlock
->
pDataBlock
)
>=
3
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startData
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
endData
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
uidCol
=
(
uint64_t
*
)
pUidCol
->
pData
;
SColumnInfoData
*
pDestStartCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestEndCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
uidCol
[
i
],
startData
[
i
],
version
);
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)(
startData
+
i
),
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)(
endData
+
i
),
false
);
colDataAppendNULL
(
pDestUidCol
,
i
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppendNULL
(
pDestCalStartTsCol
,
i
);
colDataAppendNULL
(
pDestCalEndTsCol
,
i
);
pDestBlock
->
info
.
rows
++
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
generateScanRange
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pSrcBlock
,
SSDataBlock
*
pDestBlock
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
isIntervalWindow
(
pInfo
))
{
code
=
generateIntervalScanRange
(
pInfo
,
pSrcBlock
,
pDestBlock
);
}
else
{
}
else
if
(
isSessionWindow
(
pInfo
)
||
isStateWindow
(
pInfo
))
{
code
=
generateSessionScanRange
(
pInfo
,
pSrcBlock
,
pDestBlock
);
}
pDestBlock
->
info
.
type
=
STREAM_CLEAR
;
...
...
@@ -1510,14 +1556,23 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoAddCloseWindowSBF
(
pInfo
->
pUpdateInfo
);
}
break
;
case
STREAM_DELETE_DATA
:
{
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
updateResIndex
=
0
;
generateScanRange
(
pInfo
,
pBlock
,
pInfo
->
pUpdateRes
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
return
pInfo
->
pDeleteDataRes
;
printDataBlock
(
pBlock
,
"stream scan delete recv"
);
if
(
!
isIntervalWindow
(
pInfo
)
&&
!
isSessionWindow
(
pInfo
)
&&
!
isStateWindow
(
pInfo
))
{
generateDeleteResultBlock
(
pInfo
,
pBlock
,
pInfo
->
pDeleteDataRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
printDataBlock
(
pBlock
,
"stream scan delete result"
);
return
pInfo
->
pDeleteDataRes
;
}
else
{
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
updateResIndex
=
0
;
generateScanRange
(
pInfo
,
pBlock
,
pInfo
->
pUpdateRes
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
printDataBlock
(
pBlock
,
"stream scan delete data"
);
return
pInfo
->
pDeleteDataRes
;
}
}
break
;
default:
break
;
...
...
@@ -1532,7 +1587,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
return
pInfo
->
pRes
;
}
break
;
case
STREAM_SCAN_FROM_DELETE
RES
:
{
case
STREAM_SCAN_FROM_DELETE
_DATA
:
{
generateScanRange
(
pInfo
,
pInfo
->
pUpdateDataRes
,
pInfo
->
pUpdateRes
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
...
...
@@ -1646,7 +1701,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
return
pInfo
->
pUpdateDataRes
;
}
else
if
(
pInfo
->
pUpdateDataRes
->
info
.
type
==
STREAM_DELETE_DATA
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DELETE
RES
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DELETE
_DATA
;
}
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
febe6031
...
...
@@ -955,8 +955,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
inputOrder
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -983,7 +983,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
numOfOutput
);
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
...
...
@@ -1406,20 +1406,25 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
SHashObj
*
pUpdatedMap
)
{
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
tsStarts
=
(
TSKEY
*
)
pStartCol
->
pData
;
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
tsEnds
=
(
TSKEY
*
)
pEndCol
->
pData
;
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
groupIds
=
(
uint64_t
*
)
pGroupCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsStarts
[
i
],
pInterval
,
TSDB_ORDER_ASC
);
doDeleteIntervalWindow
(
pAggSup
,
win
.
skey
,
groupIds
[
i
]);
SWinKey
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
groupIds
[
i
]};
if
(
pDelWins
)
{
taosArrayPush
(
pDelWins
,
&
winRes
);
}
if
(
pUpdatedMap
)
{
taosHashRemove
(
pUpdatedMap
,
&
winRes
,
sizeof
(
SWinKey
));
}
do
{
doDeleteIntervalWindow
(
pAggSup
,
win
.
skey
,
groupIds
[
i
]);
SWinKey
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
groupIds
[
i
]};
if
(
pDelWins
)
{
taosArrayPush
(
pDelWins
,
&
winRes
);
}
if
(
pUpdatedMap
)
{
taosHashRemove
(
pUpdatedMap
,
&
winRes
,
sizeof
(
SWinKey
));
}
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
}
while
(
win
.
skey
<
tsEnds
[
i
]);
}
}
...
...
@@ -2775,7 +2780,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
parentWin
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pChildSup
->
pCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
}
if
(
num
>
1
&&
pUpdatedMap
)
{
if
(
num
>
0
&&
pUpdatedMap
)
{
saveWinResultRow
(
pCurResult
,
pWinRes
->
groupId
,
pUpdatedMap
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
.
cur
);
}
...
...
@@ -2807,15 +2812,14 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
static
void
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
uint64_t
tableGroupId
,
SHashObj
*
pUpdatedMap
)
{
static
void
doHashInterval
Agg
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
uint64_t
tableGroupId
,
SHashObj
*
pUpdatedMap
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
(
SStreamFinalIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SResultRowInfo
*
pResultRowInfo
=
&
(
pInfo
->
binfo
.
resultRowInfo
);
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
step
=
1
;
bool
ascScan
=
true
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
int32_t
forwardRows
=
0
;
...
...
@@ -2824,7 +2828,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
int32_t
startPos
=
ascScan
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
)
;
int32_t
startPos
=
0
;
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
);
STimeWindow
nextWin
=
{
0
};
if
(
IS_FINAL_OP
(
pInfo
))
{
...
...
@@ -3165,7 +3169,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
doHashInterval
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdatedMap
);
doHashInterval
Agg
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdatedMap
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
...
...
@@ -3183,7 +3187,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
SStreamFinalIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
setInputDataBlock
(
pChildOp
,
pChildOp
->
exprSupp
.
pCtx
,
pBlock
,
pChInfo
->
order
,
MAIN_SCAN
,
true
);
doHashInterval
(
pChildOp
,
pBlock
,
pBlock
->
info
.
groupId
,
NULL
);
doHashInterval
Agg
(
pChildOp
,
pBlock
,
pBlock
->
info
.
groupId
,
NULL
);
}
}
...
...
@@ -5468,25 +5472,24 @@ _error:
return
NULL
;
}
static
void
doStreamIntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SHashObj
*
pUpdatedMap
)
{
static
void
doStreamIntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SHashObj
*
pUpdatedMap
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
(
SStreamIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
true
;
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
true
;
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
TSDB_ORDER_ASC
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
TSDB_ORDER_ASC
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
...
...
@@ -5547,11 +5550,88 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo
}
}
static
void
doStreamIntervalAggImpl2
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
uint64_t
tableGroupId
,
SHashObj
*
pUpdatedMap
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
(
SStreamIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SResultRowInfo
*
pResultRowInfo
=
&
(
pInfo
->
binfo
.
resultRowInfo
);
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
step
=
1
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
int32_t
forwardRows
=
0
;
int32_t
aa
=
4
;
ASSERT
(
pSDataBlock
->
pDataBlock
!=
NULL
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
int32_t
startPos
=
0
;
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
);
STimeWindow
nextWin
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
TSDB_ORDER_ASC
);
while
(
1
)
{
bool
isClosed
=
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
);
if
((
pInfo
->
ignoreExpiredData
&&
isClosed
)
||
!
inSlidingWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
))
{
startPos
=
getNexWindowPos
(
&
pInfo
->
interval
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
&
nextWin
);
if
(
startPos
<
0
)
{
break
;
}
continue
;
}
int32_t
code
=
setOutputBuf
(
&
nextWin
,
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdatedMap
)
{
saveWinResultRow
(
pResult
,
tableGroupId
,
pUpdatedMap
);
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pSDataBlock
->
info
.
rows
,
numOfOutput
);
SWinKey
key
=
{
.
ts
=
nextWin
.
skey
,
.
groupId
=
tableGroupId
,
};
saveOutput
(
pTaskInfo
,
&
key
,
pResult
,
pInfo
->
aggSup
.
resultRowSize
);
releaseOutputBuf
(
pTaskInfo
,
&
key
,
pResult
);
int32_t
prevEndPos
=
(
forwardRows
-
1
)
*
step
+
startPos
;
ASSERT
(
pSDataBlock
->
info
.
window
.
skey
>
0
&&
pSDataBlock
->
info
.
window
.
ekey
>
0
);
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
,
tsCols
,
prevEndPos
,
TSDB_ORDER_ASC
);
if
(
startPos
<
0
)
{
break
;
}
}
}
void
doBuildResult
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
SGroupResInfo
*
pGroupResInfo
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
// set output datablock version
pBlock
->
info
.
version
=
pTaskInfo
->
version
;
blockDataCleanup
(
pBlock
);
if
(
!
hasRemainResults
(
pGroupResInfo
))
{
return
;
}
// clear the existed group id
pBlock
->
info
.
groupId
=
0
;
buildDataBlockFromGroupRes
(
pTaskInfo
,
pBlock
,
&
pOperator
->
exprSupp
,
pGroupResInfo
);
}
static
SSDataBlock
*
doStreamIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int64_t
maxTs
=
INT64_MIN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int64_t
maxTs
=
INT64_MIN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
...
...
@@ -5622,6 +5702,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
doStreamIntervalAggImpl
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdatedMap
);
// new disc buf
// doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
...
...
@@ -5664,6 +5746,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
// new disc buf
finalizeUpdatedResult
(
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -5676,6 +5759,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
// new disc buf
// doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
}
...
...
@@ -5697,25 +5782,29 @@ void destroyStreamIntervalOperatorInfo(void* param) {
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamIntervalOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
SStreamIntervalPhysiNode
*
pIntervalPhyNode
=
(
SStreamIntervalPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
ASSERT
(
numOfCols
>
0
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
,
};
STimeWindowAggSupp
twAggSupp
=
{.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
};
SInterval
interval
=
{
.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
,
};
STimeWindowAggSupp
twAggSupp
=
{
.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
};
ASSERT
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
interval
=
interval
;
...
...
@@ -5732,11 +5821,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
}
}
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
;
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
int32_t
code
=
initAggInfo
(
pSup
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
int32_t
code
=
initAggInfo
(
pSup
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -5758,8 +5847,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
destroyStreamIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
destroyStreamIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
source/libs/stream/src/streamState.c
浏览文件 @
febe6031
...
...
@@ -112,6 +112,29 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return
tdbTbDelete
(
pState
->
pStateDb
,
key
,
sizeof
(
SWinKey
),
&
pState
->
txn
);
}
int32_t
streamStateAddIfNotExist
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
// todo refactor
int32_t
size
=
*
pVLen
;
if
(
streamStateGet
(
pState
,
key
,
pVal
,
pVLen
)
==
0
)
{
return
0
;
}
void
*
tmp
=
taosMemoryCalloc
(
1
,
size
);
if
(
streamStatePut
(
pState
,
key
,
&
tmp
,
size
)
==
0
)
{
taosMemoryFree
(
tmp
);
int32_t
code
=
streamStateGet
(
pState
,
key
,
pVal
,
pVLen
);
ASSERT
(
code
==
0
);
return
code
;
}
taosMemoryFree
(
tmp
);
return
-
1
;
}
int32_t
streamStateReleaseBuf
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
*
pVal
)
{
// todo refactor
streamFreeVal
(
pVal
);
return
0
;
}
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
...
...
tests/script/tsim/stream/basic1.sim
浏览文件 @
febe6031
...
...
@@ -5,7 +5,7 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql create database test vgroups 1
;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
...
...
@@ -13,7 +13,7 @@ endi
print $data00 $data01 $data02
sql use test
sql use test
;
sql create table t1(ts timestamp, a int, b int , c int, d double);
...
...
tests/script/tsim/stream/deleteInterval.sim
0 → 100644
浏览文件 @
febe6031
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 200
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop0:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != NULL then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213001,2,2,2,2.0);
sql insert into t1 values(1648791213002,3,3,3,3.0);
sql insert into t1 values(1648791213003,4,4,4,4.0);
sleep 200
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
$loop_count = 0
loop3:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 4 then
print =====data02=$data02
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop4
endi
sleep 200
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop5:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data02 != 4 then
print =====data02=$data02
goto loop5
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213005,2,2,2,2.0);
sql insert into t1 values(1648791213006,3,3,3,3.0);
sql insert into t1 values(1648791213007,4,4,4,4.0);
sql insert into t1 values(1648791223000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,2.0);
sql insert into t1 values(1648791223002,3,3,3,3.0);
sql insert into t1 values(1648791223003,4,4,4,4.0);
sql insert into t1 values(1648791233000,1,1,1,1.0);
sql insert into t1 values(1648791233001,2,2,2,2.0);
sql insert into t1 values(1648791233008,3,3,3,3.0);
sql insert into t1 values(1648791233009,4,4,4,4.0);
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop6:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 2 then
print =====data11=$data11
goto loop6
endi
if $data12 != 4 then
print =====data12=$data12
goto loop6
endi
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop7
endi
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop8:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != NULL then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
$loop_count = 0
loop11:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop11
endi
if $data02 != NULL then
print =====data02=$data02
goto loop11
endi
if $data11 != 6 then
print =====data11=$data11
goto loop11
endi
if $data12 != 3 then
print =====data12=$data12
goto loop11
endi
sleep 200
sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop12:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop12
endi
if $data01 != 1 then
print =====data01=$data01
goto loop12
endi
if $data02 != NULL then
print =====data02=$data02
goto loop12
endi
sql insert into t1 values(1648791213004,3,2,3,1.0);
sql insert into t1 values(1648791213005,3,2,3,1.0);
sql insert into t1 values(1648791213006,3,2,3,1.0);
sql insert into t1 values(1648791223004,1,2,3,1.0);
sql insert into t2 values(1648791213004,3,2,3,1.0);
sql insert into t2 values(1648791213005,3,2,3,1.0);
sql insert into t2 values(1648791213006,3,2,3,1.0);
sql insert into t2 values(1648791223004,1,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
$loop_count = 0
loop13:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop13
endi
if $data01 != 4 then
print =====data01=$data01
goto loop13
endi
if $data02 != 3 then
print =====data02=$data02
goto loop13
endi
if $data11 != 2 then
print =====data11=$data11
goto loop13
endi
if $data12 != 1 then
print =====data12=$data12
goto loop13
endi
sql insert into t1 values(1648791223005,1,2,3,1.0);
sql insert into t1 values(1648791223006,1,2,3,1.0);
sql insert into t2 values(1648791223005,1,2,3,1.0);
sql insert into t2 values(1648791223006,1,2,3,1.0);
sql insert into t1 values(1648791233005,4,2,3,1.0);
sql insert into t1 values(1648791233006,2,2,3,1.0);
sql insert into t2 values(1648791233005,5,2,3,1.0);
sql insert into t2 values(1648791233006,3,2,3,1.0);
sleep 200
sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop14:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
if $data02 != NULL then
print =====data02=$data02
goto loop14
endi
if $data11 != 2 then
print =====data11=$data11
goto loop14
endi
if $data12 != 3 then
print =====data12=$data12
goto loop14
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录