Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
72f19ae5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
72f19ae5
编写于
5月 26, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/valgrind
上级
510b5fce
e5a15402
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
367 addition
and
67 deletion
+367
-67
.gitignore
.gitignore
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+4
-0
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
source/common/src/systable.c
source/common/src/systable.c
+2
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-1
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+5
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+115
-7
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+26
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-21
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+196
-31
未找到文件。
.gitignore
浏览文件 @
72f19ae5
...
...
@@ -110,3 +110,4 @@ contrib/*
!contrib/CMakeLists.txt
!contrib/test
sql
debug*/
include/common/tmsg.h
浏览文件 @
72f19ae5
...
...
@@ -1010,6 +1010,10 @@ typedef struct {
SReplica
replicas
[
TSDB_MAX_REPLICA
];
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
// SRetention
// for tsma
int8_t
isTsma
;
}
SCreateVnodeReq
;
int32_t
tSerializeSCreateVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateVnodeReq
*
pReq
);
...
...
include/libs/nodes/nodes.h
浏览文件 @
72f19ae5
...
...
@@ -214,6 +214,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_FILL
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
...
...
source/common/src/systable.c
浏览文件 @
72f19ae5
...
...
@@ -196,12 +196,14 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.
name
=
"status"
,
.
bytes
=
12
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"nfiles"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"file_size"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tsma"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
};
static
const
SSysDbTableSchema
smaSchema
[]
=
{
{.
name
=
"sma_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"stable_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SSysDbTableSchema
transSchema
[]
=
{
...
...
source/common/src/tmsg.c
浏览文件 @
72f19ae5
...
...
@@ -2917,6 +2917,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
if
(
tEncodeI8
(
&
encoder
,
pReq
->
isTsma
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -2979,6 +2981,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
}
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
isTsma
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
72f19ae5
...
...
@@ -183,7 +183,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
dDebug
(
"vgId:%d, create vnode req is received
"
,
createReq
.
vgId
);
dDebug
(
"vgId:%d, create vnode req is received
, tsma:%d"
,
createReq
.
vgId
,
createReq
.
isTsma
);
SVnodeCfg
vnodeCfg
=
{
0
};
vmGenerateVnodeCfg
(
&
createReq
,
&
vnodeCfg
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
72f19ae5
...
...
@@ -330,6 +330,7 @@ typedef struct {
int64_t
compStorage
;
int64_t
pointsWritten
;
int8_t
compact
;
int8_t
isTsma
;
int8_t
replica
;
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
}
SVgObj
;
...
...
@@ -588,7 +589,8 @@ typedef struct {
int8_t
status
;
int8_t
createdBy
;
// STREAM_CREATED_BY__USER or SMA
int32_t
fixedSinkVgId
;
// 0 for shuffle
int64_t
smaId
;
// 0 for unused
SVgObj
fixedSinkVg
;
int64_t
smaId
;
// 0 for unused
int8_t
trigger
;
int32_t
triggerParam
;
int64_t
waterMark
;
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
72f19ae5
...
...
@@ -30,6 +30,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
const
SVgObj
*
pVgroup
);
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
int32_t
mndAllocSmaVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
SArray
*
mndBuildDnodesArray
(
SMnode
*
pMnode
);
int32_t
mndAddVnodeToVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
72f19ae5
...
...
@@ -1155,7 +1155,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
NULL
==
pDb
||
pVgroup
->
dbUid
==
pDb
->
uid
)
{
if
(
(
NULL
==
pDb
||
pVgroup
->
dbUid
==
pDb
->
uid
)
&&
!
pVgroup
->
isTsma
)
{
SVgroupInfo
vgInfo
=
{
0
};
vgInfo
.
vgId
=
pVgroup
->
vgId
;
vgInfo
.
hashBegin
=
pVgroup
->
hashBegin
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
72f19ae5
...
...
@@ -237,11 +237,14 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
taosArrayPush
(
tasks
,
&
pTask
);
pTask
->
nodeId
=
pStream
->
fixedSinkVgId
;
#if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) {
return -1;
}
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif
pTask
->
epSet
=
mndGetVgroupEpset
(
pMnode
,
&
pStream
->
fixedSinkVg
);
// source
pTask
->
sourceType
=
TASK_SOURCE__MERGE
;
pTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
...
...
@@ -263,7 +266,8 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
// dispatch
pTask
->
dispatchType
=
TASK_DISPATCH__NONE
;
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_VND_TASK_DEPLOY
,
pVgroup
->
vgId
);
/*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_VND_TASK_DEPLOY
,
pStream
->
fixedSinkVg
.
vgId
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
72f19ae5
...
...
@@ -295,9 +295,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
}
static
void
*
mndBuildVDropSmaReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SSmaObj
*
pSma
,
int32_t
*
pContLen
)
{
SEncoder
encoder
=
{
0
};
int32_t
contLen
;
SName
name
=
{
0
};
SEncoder
encoder
=
{
0
};
int32_t
contLen
;
SName
name
=
{
0
};
tNameFromString
(
&
name
,
pSma
->
name
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
SVDropTSmaReq
req
=
{
0
};
...
...
@@ -354,6 +354,22 @@ static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj
return
0
;
}
static
int32_t
mndSetCreateSmaVgroupRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSmaVgroupCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pVgRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSmaRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
...
...
@@ -393,6 +409,34 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return
0
;
}
static
int32_t
mndSetCreateSmaVgroupRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
0
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
// todo add sma info here
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_CREATE_VNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndCreateSma
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SMCreateSmaReq
*
pCreate
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
SSmaObj
smaObj
=
{
0
};
memcpy
(
smaObj
.
name
,
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
...
...
@@ -448,9 +492,14 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj
.
version
=
1
;
streamObj
.
sql
=
pCreate
->
sql
;
streamObj
.
createdBy
=
STREAM_CREATED_BY__SMA
;
streamObj
.
fixedSinkVgId
=
smaObj
.
dstVgId
;
streamObj
.
smaId
=
smaObj
.
uid
;
/*streamObj.physicalPlan = "";*/
if
(
mndAllocSmaVgroup
(
pMnode
,
pDb
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
{
mError
(
"sma:%s, failed to create since %s"
,
smaObj
.
name
,
terrstr
());
return
-
1
;
}
smaObj
.
dstVgId
=
streamObj
.
fixedSinkVg
.
vgId
;
streamObj
.
fixedSinkVgId
=
smaObj
.
dstVgId
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_SMA
,
pReq
);
...
...
@@ -460,8 +509,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetDbInfo
(
pTrans
,
pDb
);
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupCommitLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
STREAM_TRIGGER_AT_ONCE
,
0
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
...
@@ -480,7 +532,6 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
if
(
pCreate
->
intervalUnit
<
0
)
return
-
1
;
if
(
pCreate
->
slidingUnit
<
0
)
return
-
1
;
if
(
pCreate
->
timezone
<
0
)
return
-
1
;
if
(
pCreate
->
dstVgId
<
0
)
return
-
1
;
if
(
pCreate
->
interval
<
0
)
return
-
1
;
if
(
pCreate
->
offset
<
0
)
return
-
1
;
if
(
pCreate
->
sliding
<
0
)
return
-
1
;
...
...
@@ -602,6 +653,24 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *
return
0
;
}
static
int32_t
mndSetDropSmaVgroupRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSmaVgroupCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pVgRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSmaRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
...
...
@@ -643,23 +712,59 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
return
0
;
}
static
int32_t
mndSetDropSmaVgroupRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
0
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildDropVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_DROP_VNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndDropSma
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_SMA
,
pReq
);
SVgObj
*
pVgroup
=
NULL
;
STrans
*
pTrans
=
NULL
;
pVgroup
=
mndAcquireVgroup
(
pMnode
,
pSma
->
dstVgId
);
if
(
pVgroup
==
NULL
)
goto
_OVER
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_SMA
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
mndTransSetDbInfo
(
pTrans
,
pDb
);
if
(
mndSetDropSmaRedoLogs
(
pMnode
,
pTrans
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaVgroupRedoLogs
(
pMnode
,
pTrans
,
pVgroup
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaCommitLogs
(
pMnode
,
pTrans
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaVgroupCommitLogs
(
pMnode
,
pTrans
,
pVgroup
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
pVgroup
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
_OVER:
mndTransDrop
(
pTrans
);
mndReleaseVgroup
(
pMnode
,
pVgroup
);
return
code
;
}
...
...
@@ -846,6 +951,9 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
n1
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pSma
->
dstVgId
,
false
);
numOfRows
++
;
sdbRelease
(
pSdb
,
pSma
);
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
72f19ae5
...
...
@@ -829,7 +829,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
sendRsp
=
true
;
}
}
else
{
if
(
pTrans
->
stage
==
TRN_STAGE_REDO_ACTION
&&
pTrans
->
failedTimes
>
0
)
{
if
(
pTrans
->
stage
==
TRN_STAGE_REDO_ACTION
&&
pTrans
->
failedTimes
>
6
)
{
if
(
code
==
0
)
code
=
TSDB_CODE_MND_TRANS_UNKNOW_ERROR
;
sendRsp
=
true
;
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
72f19ae5
...
...
@@ -80,6 +80,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
hashEnd
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
dbUid
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgroup
->
isTsma
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgroup
->
replica
,
_OVER
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
...
...
@@ -127,6 +128,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
hashEnd
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
dbUid
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pVgroup
->
isTsma
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pVgroup
->
replica
,
_OVER
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
...
...
@@ -167,6 +169,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pOld
->
hashBegin
=
pNew
->
hashBegin
;
pOld
->
hashEnd
=
pNew
->
hashEnd
;
pOld
->
replica
=
pNew
->
replica
;
pOld
->
isTsma
=
pNew
->
isTsma
;
memcpy
(
pOld
->
vnodeGid
,
pNew
->
vnodeGid
,
TSDB_MAX_REPLICA
*
sizeof
(
SVnodeGid
));
return
0
;
}
...
...
@@ -426,6 +429,25 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
return
0
;
}
int32_t
mndAllocSmaVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SArray
*
pArray
=
mndBuildDnodesArray
(
pMnode
);
if
(
pArray
==
NULL
)
return
-
1
;
pVgroup
->
vgId
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_VGROUP
);
pVgroup
->
isTsma
=
1
;
pVgroup
->
createdTime
=
taosGetTimestampMs
();
pVgroup
->
updateTime
=
pVgroup
->
createdTime
;
pVgroup
->
version
=
1
;
memcpy
(
pVgroup
->
dbName
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
pVgroup
->
dbUid
=
pDb
->
uid
;
pVgroup
->
replica
=
1
;
if
(
mndGetAvailableDnode
(
pMnode
,
pVgroup
,
pArray
)
!=
0
)
return
-
1
;
mInfo
(
"db:%s, sma vgId:%d is alloced"
,
pDb
->
name
,
pVgroup
->
vgId
);
return
0
;
}
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
)
{
int32_t
code
=
-
1
;
SArray
*
pArray
=
NULL
;
...
...
@@ -702,9 +724,12 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppendNULL
(
pColInfo
,
numOfRows
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppendNULL
(
pColInfo
,
numOfRows
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
isTsma
,
false
);
numOfRows
++
;
sdbRelease
(
pSdb
,
pVgroup
);
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
72f19ae5
...
...
@@ -465,6 +465,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SAggSupporter
aggSup
;
// aggregate supporter
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SArray
*
pChildren
;
}
SStreamFinalIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
...
...
@@ -581,6 +582,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock
*
pDelRes
;
SHashObj
*
pStDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
}
SStreamSessionAggOperatorInfo
;
typedef
struct
STimeSliceOperatorInfo
{
...
...
@@ -722,7 +724,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
STimeWindowAggSupp
*
pTwAggSupp
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
...
...
@@ -797,7 +799,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
int32_t
initCa
tch
Supporter
(
SCatchSupporter
*
pCatchSup
,
size_t
rowSize
,
const
char
*
pKey
,
int32_t
initCa
che
Supporter
(
SCatchSupporter
*
pCatchSup
,
size_t
rowSize
,
const
char
*
pKey
,
const
char
*
pDir
);
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
);
SResultRow
*
getNewResultRow_rv
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
72f19ae5
...
...
@@ -829,33 +829,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_UPDATERES
)
{
SSDataBlock
*
pDB
=
getDataFromCatch
(
pInfo
);
if
(
pDB
!=
NULL
)
{
return
pDB
;
}
else
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
}
if
(
pInfo
->
validBlockIndex
>=
total
)
{
doClearBufferedBlocks
(
pInfo
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
int32_t
current
=
pInfo
->
validBlockIndex
++
;
SSDataBlock
*
pBlock
=
taosArrayGetP
(
pInfo
->
pBlockLists
,
current
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_UPDATERES
;
}
else
{
int32_t
code
=
catchDatablock
(
pBlock
,
&
pInfo
->
childAggSup
,
pInfo
->
primaryTsIndex
,
0
);
if
(
code
!=
TDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
return
pBlock
;
int32_t
current
=
pInfo
->
validBlockIndex
++
;
return
taosArrayGetP
(
pInfo
->
pBlockLists
,
current
);
}
else
{
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_RES
)
{
blockDataDestroy
(
pInfo
->
pUpdateRes
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
72f19ae5
...
...
@@ -1067,7 +1067,8 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
}
static
void
doClearWindows
(
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pBinfo
,
SInterval
*
pIntrerval
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
SSDataBlock
*
pBlock
)
{
SInterval
*
pIntrerval
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
SSDataBlock
*
pBlock
,
SArray
*
pUpWins
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
int32_t
step
=
0
;
...
...
@@ -1079,6 +1080,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
step
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
i
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
doClearWindow
(
pSup
,
pBinfo
,
(
char
*
)
&
win
.
skey
,
sizeof
(
TKEY
),
pBlock
->
info
.
groupId
,
numOfOutput
);
if
(
pUpWins
)
{
taosArrayPush
(
pUpWins
,
&
win
);
}
}
}
...
...
@@ -1119,7 +1123,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
0
,
pOperator
->
numOfExprs
,
pBlock
);
pOperator
->
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
...
...
@@ -1154,6 +1158,15 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo
*
pInfo
=
(
SStreamFinalIntervalOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
if
(
pInfo
->
pChildren
)
{
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyIntervalOperatorInfo
(
pChildOp
->
info
,
numOfOutput
);
taosMemoryFreeClear
(
pChildOp
->
info
);
taosMemoryFreeClear
(
pChildOp
);
}
}
}
bool
allInvertible
(
SqlFunctionCtx
*
pFCtx
,
int32_t
numOfCols
)
{
...
...
@@ -1228,32 +1241,38 @@ _error:
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamFinalIntervalOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
pInfo
->
primaryTsIndex
=
primaryTsSlotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
int32_t
code
=
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
pResBlock
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
int32_t
numOfChild
=
8
;
// Todo(liuyao) get it from phy plan
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
SOperatorInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SSDataBlock
*
chRes
=
createOneDataBlock
(
pResBlock
,
false
);
SOperatorInfo
*
pChildOp
=
createIntervalOperatorInfo
(
NULL
,
pExprInfo
,
numOfCols
,
chRes
,
pInterval
,
primaryTsSlotId
,
pTwAggSupp
,
NULL
,
pTaskInfo
);
if
(
pChildOp
&&
chRes
)
{
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
continue
;
}
goto
_error
;
}
pOperator
->
name
=
"StreamFinalIntervalOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
...
...
@@ -1703,6 +1722,51 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
return
pUpdated
;
}
bool
isFinalInterval
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
return
pInfo
->
pChildren
!=
NULL
;
}
void
compactFunctions
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
if
(
fmIsWindowPseudoColumnFunc
(
pDestCtx
[
k
].
functionId
))
{
continue
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
functionNeedToExecute
(
&
pDestCtx
[
k
])
&&
pDestCtx
[
k
].
fpSet
.
combine
!=
NULL
)
{
code
=
pDestCtx
[
k
].
fpSet
.
combine
(
&
pDestCtx
[
k
],
&
pSourceCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s apply functions error, code: %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
pTaskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
}
}
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
STimeWindow
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pCurResult
,
0
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SIntervalAggOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
SResultRow
*
pChResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pChResult
,
0
,
pChInfo
->
binfo
.
pCtx
,
pChildOp
->
numOfExprs
,
pChInfo
->
binfo
.
rowCellInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
compactFunctions
(
pInfo
->
binfo
.
pCtx
,
pChInfo
->
binfo
.
pCtx
,
numOfOutput
,
pTaskInfo
);
}
}
}
static
SSDataBlock
*
doStreamFinalIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -1726,10 +1790,26 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
STimeWindow
));
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
pInfo
->
primaryTsIndex
,
pOperator
->
numOfExprs
,
pBlock
);
pInfo
->
primaryTsIndex
,
pOperator
->
numOfExprs
,
pBlock
,
pUpWins
);
if
(
isFinalInterval
(
pInfo
))
{
int32_t
childIndex
=
0
;
//Todo(liuyao) get child id from SSDataBlock
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SIntervalAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearWindows
(
&
pChildInfo
->
aggSup
,
&
pChildInfo
->
binfo
,
&
pChildInfo
->
interval
,
pChildInfo
->
primaryTsIndex
,
pChildOp
->
numOfExprs
,
pBlock
,
NULL
);
rebuildIntervalWindow
(
pInfo
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
}
taosArrayDestroy
(
pUpWins
);
continue
;
}
if
(
isFinalInterval
(
pInfo
))
{
int32_t
chIndex
=
1
;
//Todo(liuyao) get it from SSDataBlock
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
doStreamIntervalAgg
(
pChildOp
);
}
pUpdated
=
doHashInterval
(
pOperator
,
pBlock
,
0
);
}
...
...
@@ -1752,6 +1832,16 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
destroyStreamAggSupporter
(
&
pInfo
->
streamAggSup
);
cleanupGroupResInfo
(
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
pChildren
!=
NULL
)
{
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
destroyStreamSessionAggOperatorInfo
(
pChInfo
,
numOfOutput
);
taosMemoryFreeClear
(
pChild
);
taosMemoryFreeClear
(
pChInfo
);
}
}
}
int32_t
initBiasicInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SExprInfo
*
pExprInfo
,
...
...
@@ -1780,6 +1870,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pI
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SStreamSessionAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamSessionAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -1789,7 +1880,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
initResultSizeInfo
(
pOperator
,
4096
);
int32_t
code
=
initStreamAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
code
=
initStreamAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -1820,6 +1911,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
pInfo
->
pDelIterator
=
NULL
;
pInfo
->
pDelRes
=
createOneDataBlock
(
pResBlock
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pDelRes
,
64
);
pInfo
->
pChildren
=
NULL
;
pOperator
->
name
=
"StreamSessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
;
...
...
@@ -2068,24 +2160,6 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
return
size
-
startIndex
-
1
;
}
void
compactFunctions
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
if
(
fmIsWindowPseudoColumnFunc
(
pDestCtx
[
k
].
functionId
))
{
continue
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
functionNeedToExecute
(
&
pDestCtx
[
k
])
&&
pDestCtx
[
k
].
fpSet
.
combine
!=
NULL
)
{
code
=
pDestCtx
[
k
].
fpSet
.
combine
(
&
pDestCtx
[
k
],
&
pSourceCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s apply functions error, code: %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
pTaskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
}
}
void
compactTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
int32_t
startIndex
,
int32_t
num
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
)
{
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pInfo
->
streamAggSup
.
pResultRows
,
startIndex
);
...
...
@@ -2164,7 +2238,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
}
static
void
doClearSessionWindows
(
SStreamAggSupporter
*
pAggSup
,
SOptrBasicInfo
*
pBinfo
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
int64_t
gap
)
{
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
int32_t
step
=
0
;
...
...
@@ -2173,7 +2247,11 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
],
gap
,
&
winIndex
);
step
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pBlock
->
info
.
rows
,
i
,
gap
,
NULL
);
ASSERT
(
isInWindow
(
pCurWin
,
tsCols
[
i
],
gap
));
doClearWindowImpl
(
&
pCurWin
->
pos
,
pAggSup
->
pResultBuf
,
pBinfo
,
numOfOutput
);
if
(
result
)
{
taosArrayPush
(
result
,
pCurWin
);
}
}
}
...
...
@@ -2215,6 +2293,42 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
}
static
void
rebuildTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
setWindowOutputBuf
(
pParentWin
,
&
pCurResult
,
pInfo
->
binfo
.
pCtx
,
groupId
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
SArray
*
pChWins
=
pChInfo
->
streamAggSup
.
pResultRows
;
int32_t
chWinSize
=
taosArrayGetSize
(
pChWins
);
int32_t
index
=
binarySearch
(
pChWins
,
chWinSize
,
pParentWin
->
win
.
skey
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
for
(
int32_t
k
=
index
;
k
>
0
&&
k
<
chWinSize
;
k
++
)
{
SResultWindowInfo
*
pcw
=
taosArrayGet
(
pChWins
,
k
);
if
(
pParentWin
->
win
.
skey
<=
pcw
->
win
.
skey
&&
pcw
->
win
.
ekey
<=
pParentWin
->
win
.
ekey
)
{
SResultRow
*
pChResult
=
NULL
;
setWindowOutputBuf
(
pcw
,
&
pChResult
,
pChInfo
->
binfo
.
pCtx
,
groupId
,
numOfOutput
,
pChInfo
->
binfo
.
rowCellInfoOffset
,
&
pChInfo
->
streamAggSup
,
pTaskInfo
);
compactFunctions
(
pInfo
->
binfo
.
pCtx
,
pChInfo
->
binfo
.
pCtx
,
numOfOutput
,
pTaskInfo
);
continue
;
}
break
;
}
}
}
}
bool
isFinalSession
(
SStreamSessionAggOperatorInfo
*
pInfo
)
{
return
pInfo
->
pChildren
!=
NULL
;
}
static
SSDataBlock
*
doStreamSessionWindowAgg
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
...
...
@@ -2247,10 +2361,25 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pInfo
->
binfo
,
pBlock
,
0
,
pOperator
->
numOfExprs
,
pInfo
->
gap
);
pOperator
->
numOfExprs
,
pInfo
->
gap
,
pWins
);
if
(
isFinalSession
(
pInfo
))
{
int32_t
childIndex
=
0
;
//Todo(liuyao) get child id from SSDataBlock
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildInfo
->
binfo
,
pBlock
,
0
,
pChildOp
->
numOfExprs
,
pChildInfo
->
gap
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
}
taosArrayDestroy
(
pWins
);
continue
;
}
if
(
isFinalSession
(
pInfo
))
{
int32_t
childIndex
=
0
;
//Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
doStreamSessionWindowAggImpl
(
pOperator
,
pBlock
,
NULL
,
NULL
);
}
doStreamSessionWindowAggImpl
(
pOperator
,
pBlock
,
pStUpdated
,
pInfo
->
pStDeleted
);
}
...
...
@@ -2271,3 +2400,39 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
pInfo
->
streamAggSup
.
pResultBuf
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SStreamSessionAggOperatorInfo
*
pInfo
=
NULL
;
SOperatorInfo
*
pOperator
=
createStreamSessionAggOperatorInfo
(
downstream
,
pExprInfo
,
numOfCols
,
pResBlock
,
gap
,
tsSlotId
,
pTwAggSupp
,
pTaskInfo
);
if
(
pOperator
==
NULL
)
{
goto
_error
;
}
pOperator
->
name
=
"StreamFinalSessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW
;
int32_t
numOfChild
=
1
;
//Todo(liuyao) get it from phy plan
pInfo
=
pOperator
->
info
;
pInfo
->
pChildren
=
taosArrayInit
(
8
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChild
=
createStreamSessionAggOperatorInfo
(
NULL
,
pExprInfo
,
numOfCols
,
NULL
,
gap
,
tsSlotId
,
pTwAggSupp
,
pTaskInfo
);
if
(
pChild
==
NULL
)
{
goto
_error
;
}
taosArrayPush
(
pInfo
->
pChildren
,
&
pChild
);
}
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroyStreamSessionAggOperatorInfo
(
pInfo
,
numOfCols
);
}
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录