Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
09fbb8eb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
09fbb8eb
编写于
12月 27, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'main' into FIX/TD-21535-main
上级
a7bfeb5f
56cf8863
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
41 addition
and
43 deletion
+41
-43
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+2
-2
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+6
-3
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+7
-27
source/libs/sync/inc/syncPipeline.h
source/libs/sync/inc/syncPipeline.h
+2
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+8
-1
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+13
-10
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/util/taoserror.h
浏览文件 @
09fbb8eb
...
@@ -520,6 +520,7 @@ int32_t* taosGetErrno();
...
@@ -520,6 +520,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) //
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
// tq
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
09fbb8eb
...
@@ -391,9 +391,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
...
@@ -391,9 +391,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
vGTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
", weak:%d, code:%d, state:%d %s, type:%s
code:0x%x
"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
pMsg
->
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
pMsg
->
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
)
,
pMsg
->
code
);
return
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
pMsg
);
return
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
pMsg
);
}
}
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
09fbb8eb
...
@@ -471,17 +471,20 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
...
@@ -471,17 +471,20 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
}
}
int32_t
ctgInitTask
(
SCtgJob
*
pJob
,
CTG_TASK_TYPE
type
,
void
*
param
,
int32_t
*
taskId
)
{
int32_t
ctgInitTask
(
SCtgJob
*
pJob
,
CTG_TASK_TYPE
type
,
void
*
param
,
int32_t
*
taskId
)
{
int32_t
code
=
0
;
int32_t
tid
=
atomic_fetch_add_32
(
&
pJob
->
taskIdx
,
1
);
int32_t
tid
=
atomic_fetch_add_32
(
&
pJob
->
taskIdx
,
1
);
CTG_LOCK
(
CTG_WRITE
,
&
pJob
->
taskLock
);
CTG_LOCK
(
CTG_WRITE
,
&
pJob
->
taskLock
);
CTG_ERR_RET
((
*
gCtgAsyncFps
[
type
].
initFp
)(
pJob
,
tid
,
param
));
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
type
].
initFp
)(
pJob
,
tid
,
param
));
CTG_UNLOCK
(
CTG_WRITE
,
&
pJob
->
taskLock
);
if
(
taskId
)
{
if
(
taskId
)
{
*
taskId
=
tid
;
*
taskId
=
tid
;
}
}
return
TSDB_CODE_SUCCESS
;
_return:
CTG_UNLOCK
(
CTG_WRITE
,
&
pJob
->
taskLock
);
return
code
;
}
}
int32_t
ctgInitJob
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgJob
**
job
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
int32_t
ctgInitJob
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgJob
**
job
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
09fbb8eb
...
@@ -2500,6 +2500,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
...
@@ -2500,6 +2500,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
if
(
NULL
==
pCache
->
pMeta
)
{
if
(
NULL
==
pCache
->
pMeta
)
{
CTG_UNLOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
ctgDebug
(
"tb %s meta not in cache, dbFName:%s"
,
pName
->
tname
,
dbFName
);
ctgDebug
(
"tb %s meta not in cache, dbFName:%s"
,
pName
->
tname
,
dbFName
);
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArraySetSize
(
ctx
->
pResList
,
taosArrayGetSize
(
ctx
->
pResList
)
+
1
);
taosArraySetSize
(
ctx
->
pResList
,
taosArrayGetSize
(
ctx
->
pResList
)
+
1
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
09fbb8eb
...
@@ -907,7 +907,7 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
...
@@ -907,7 +907,7 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
}
}
bool
isOverdue
(
TSKEY
ekey
,
STimeWindowAggSupp
*
pTwSup
)
{
bool
isOverdue
(
TSKEY
ekey
,
STimeWindowAggSupp
*
pTwSup
)
{
ASSERT
(
pTwSup
->
maxTs
==
INT64_MIN
||
pTwSup
->
maxTs
>
0
);
ASSERT
S
(
pTwSup
->
maxTs
==
INT64_MIN
||
pTwSup
->
maxTs
>
0
,
"maxts should greater than 0"
);
return
pTwSup
->
maxTs
!=
INT64_MIN
&&
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
;
return
pTwSup
->
maxTs
!=
INT64_MIN
&&
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
;
}
}
...
@@ -1396,7 +1396,6 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
...
@@ -1396,7 +1396,6 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
while
((
pIte
=
tSimpleHashIterate
(
pHashMap
,
pIte
,
&
iter
))
!=
NULL
)
{
while
((
pIte
=
tSimpleHashIterate
(
pHashMap
,
pIte
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
int32_t
code
=
saveWinResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
int32_t
code
=
saveWinResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
...
@@ -1547,7 +1546,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
...
@@ -1547,7 +1546,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pChildren
,
i
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pChildren
,
i
);
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
ASSERT
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
);
ASSERT
S
(
pChInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
,
"children trigger type should be at once"
);
pChInfo
->
twAggSup
.
maxTs
=
TMAX
(
pChInfo
->
twAggSup
.
maxTs
,
maxTs
);
pChInfo
->
twAggSup
.
maxTs
=
TMAX
(
pChInfo
->
twAggSup
.
maxTs
,
maxTs
);
closeStreamIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
closeStreamIntervalWindow
(
pChInfo
->
aggSup
.
pResultRowHashTable
,
&
pChInfo
->
twAggSup
,
&
pChInfo
->
interval
,
NULL
,
NULL
,
NULL
,
pOperator
);
NULL
,
pOperator
);
...
@@ -1767,8 +1766,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
...
@@ -1767,8 +1766,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
.
maxTs
=
INT64_MIN
,
.
maxTs
=
INT64_MIN
,
};
};
ASSERT
(
as
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
...
@@ -2252,7 +2249,6 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
...
@@ -2252,7 +2249,6 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
return
;
return
;
}
}
blockDataEnsureCapacity
(
pBlock
,
size
-
(
*
pIndex
));
blockDataEnsureCapacity
(
pBlock
,
size
-
(
*
pIndex
));
ASSERT
(
3
<=
taosArrayGetSize
(
pBlock
->
pDataBlock
));
SColumnInfoData
*
pStartTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pStartTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTs
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pGroupId
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pGroupId
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
...
@@ -2359,7 +2355,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
...
@@ -2359,7 +2355,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
SResultRow
*
pResult
=
NULL
;
SResultRow
*
pResult
=
NULL
;
int32_t
forwardRows
=
0
;
int32_t
forwardRows
=
0
;
ASSERT
(
pSDataBlock
->
pDataBlock
!=
NULL
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
...
@@ -2482,7 +2477,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2482,7 +2477,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
return
pInfo
->
pPullDataRes
;
return
pInfo
->
pPullDataRes
;
}
}
...
@@ -2543,7 +2537,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2543,7 +2537,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo
->
numOfDatapack
++
;
pInfo
->
numOfDatapack
++
;
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval final recv"
:
"interval semi recv"
);
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval final recv"
:
"interval semi recv"
);
ASSERT
(
pBlock
->
info
.
type
!=
STREAM_INVERT
);
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
...
@@ -2633,7 +2626,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2633,7 +2626,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
return
pInfo
->
pPullDataRes
;
return
pInfo
->
pPullDataRes
;
}
}
...
@@ -2688,7 +2680,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2688,7 +2680,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.
deleteMarkSaved
=
0
,
.
deleteMarkSaved
=
0
,
.
calTriggerSaved
=
0
,
.
calTriggerSaved
=
0
,
};
};
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
ASSERT
S
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
...
@@ -2713,7 +2705,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2713,7 +2705,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initStreamFunciton
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
);
initStreamFunciton
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
);
ASSERT
(
numOfCols
>
0
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
pInfo
->
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
pInfo
->
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
...
@@ -2724,6 +2715,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2724,6 +2715,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo
->
pChildren
=
NULL
;
pInfo
->
pChildren
=
NULL
;
if
(
numOfChild
>
0
)
{
if
(
numOfChild
>
0
)
{
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
void
*
));
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
void
*
));
if
(
!
pInfo
->
pChildren
)
{
goto
_error
;
}
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
if
(
pChildOp
)
{
if
(
pChildOp
)
{
...
@@ -2746,7 +2740,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2746,7 +2740,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// semi interval operator does not catch result
// semi interval operator does not catch result
pInfo
->
isFinal
=
false
;
pInfo
->
isFinal
=
false
;
pOperator
->
name
=
"StreamSemiIntervalOperator"
;
pOperator
->
name
=
"StreamSemiIntervalOperator"
;
ASSERT
(
pInfo
->
aggSup
.
currentPageId
==
-
1
);
}
}
if
(
!
IS_FINAL_OP
(
pInfo
)
||
numOfChild
==
0
)
{
if
(
!
IS_FINAL_OP
(
pInfo
)
||
numOfChild
==
0
)
{
...
@@ -3162,15 +3155,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
...
@@ -3162,15 +3155,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
}
}
void
deleteWindow
(
SArray
*
pWinInfos
,
int32_t
index
,
FDelete
fp
)
{
ASSERT
(
index
>=
0
&&
index
<
taosArrayGetSize
(
pWinInfos
));
if
(
fp
)
{
void
*
ptr
=
taosArrayGet
(
pWinInfos
,
index
);
fp
(
ptr
);
}
taosArrayRemove
(
pWinInfos
,
index
);
}
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SArray
*
result
)
{
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
SArray
*
result
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
...
@@ -3218,7 +3202,6 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
...
@@ -3218,7 +3202,6 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
int32_t
iter
=
0
;
int32_t
iter
=
0
;
while
((
pIte
=
tSimpleHashIterate
(
pStUpdated
,
pIte
,
&
iter
))
!=
NULL
)
{
while
((
pIte
=
tSimpleHashIterate
(
pStUpdated
,
pIte
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
ASSERT
(
keyLen
==
sizeof
(
SSessionKey
));
taosArrayPush
(
pUpdated
,
key
);
taosArrayPush
(
pUpdated
,
key
);
}
}
taosArraySort
(
pUpdated
,
sessionKeyCompareAsc
);
taosArraySort
(
pUpdated
,
sessionKeyCompareAsc
);
...
@@ -3279,7 +3262,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
...
@@ -3279,7 +3262,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SSessionKey
*
pWinKey
=
taosArrayGet
(
pWinArray
,
i
);
SSessionKey
*
pWinKey
=
taosArrayGet
(
pWinArray
,
i
);
...
@@ -3380,7 +3362,6 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
...
@@ -3380,7 +3362,6 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
void
initGroupResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
)
{
void
initGroupResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
)
{
pGroupResInfo
->
pRows
=
pArrayList
;
pGroupResInfo
->
pRows
=
pArrayList
;
pGroupResInfo
->
index
=
0
;
pGroupResInfo
->
index
=
0
;
ASSERT
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
}
void
doBuildSessionResult
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SGroupResInfo
*
pGroupResInfo
,
void
doBuildSessionResult
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SGroupResInfo
*
pGroupResInfo
,
...
@@ -4811,7 +4792,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -4811,7 +4792,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
ASSERT
(
numOfCols
>
0
);
SSDataBlock
*
pResBlock
=
createDataBlockFromDescNode
(
pPhyNode
->
pOutputDataBlockDesc
);
SSDataBlock
*
pResBlock
=
createDataBlockFromDescNode
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{
SInterval
interval
=
{
...
@@ -4831,7 +4811,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -4831,7 +4811,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.
deleteMark
=
getDeleteMark
(
pIntervalPhyNode
),
.
deleteMark
=
getDeleteMark
(
pIntervalPhyNode
),
};
};
ASSERT
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
ASSERT
S
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
,
"trigger type should not be max delay"
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
interval
=
interval
;
pInfo
->
interval
=
interval
;
...
...
source/libs/sync/inc/syncPipeline.h
浏览文件 @
09fbb8eb
...
@@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
...
@@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
int32_t
syncLogBufferValidate
(
SSyncLogBuffer
*
pBuf
);
int32_t
syncLogBufferValidate
(
SSyncLogBuffer
*
pBuf
);
int32_t
syncLogBufferRollback
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SyncIndex
toIndex
);
int32_t
syncLogBufferRollback
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SyncIndex
toIndex
);
int32_t
syncLogFsmExecute
(
SSyncNode
*
pNode
,
SSyncFSM
*
pFsm
,
ESyncState
role
,
SyncTerm
term
,
SSyncRaftEntry
*
pEntry
,
int32_t
applyCode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
09fbb8eb
...
@@ -2393,7 +2393,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
...
@@ -2393,7 +2393,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer
// append to log buffer
if
(
syncLogBufferAppend
(
ths
->
pLogBuf
,
ths
,
pEntry
)
<
0
)
{
if
(
syncLogBufferAppend
(
ths
->
pLogBuf
,
ths
,
pEntry
)
<
0
)
{
sError
(
"vgId:%d, failed to enqueue sync log buffer. index:%"
PRId64
""
,
ths
->
vgId
,
pEntry
->
index
);
sError
(
"vgId:%d, failed to enqueue sync log buffer, index:%"
PRId64
,
ths
->
vgId
,
pEntry
->
index
);
terrno
=
TSDB_CODE_SYN_BUFFER_FULL
;
(
void
)
syncLogFsmExecute
(
ths
,
ths
->
pFsm
,
ths
->
state
,
ths
->
pRaftStore
->
currentTerm
,
pEntry
,
TSDB_CODE_SYN_BUFFER_FULL
);
syncEntryDestroy
(
pEntry
);
return
-
1
;
return
-
1
;
}
}
...
@@ -2697,6 +2701,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
...
@@ -2697,6 +2701,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
}
}
int32_t
code
=
syncNodeAppend
(
ths
,
pEntry
);
int32_t
code
=
syncNodeAppend
(
ths
,
pEntry
);
if
(
code
<
0
)
{
sNError
(
ths
,
"failed to append blocking msg"
);
}
return
code
;
return
code
;
}
else
{
}
else
{
syncEntryDestroy
(
pEntry
);
syncEntryDestroy
(
pEntry
);
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
09fbb8eb
...
@@ -26,6 +26,11 @@
...
@@ -26,6 +26,11 @@
#include "syncSnapshot.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncUtil.h"
static
bool
syncIsMsgBlock
(
tmsg_t
type
)
{
return
(
type
==
TDMT_VND_CREATE_TABLE
)
||
(
type
==
TDMT_VND_ALTER_TABLE
)
||
(
type
==
TDMT_VND_DROP_TABLE
)
||
(
type
==
TDMT_VND_UPDATE_TAG_VAL
)
||
(
type
==
TDMT_VND_ALTER_CONFIRM
);
}
int64_t
syncLogBufferGetEndIndex
(
SSyncLogBuffer
*
pBuf
)
{
int64_t
syncLogBufferGetEndIndex
(
SSyncLogBuffer
*
pBuf
)
{
taosThreadMutexLock
(
&
pBuf
->
mutex
);
taosThreadMutexLock
(
&
pBuf
->
mutex
);
int64_t
index
=
pBuf
->
endIndex
;
int64_t
index
=
pBuf
->
endIndex
;
...
@@ -441,26 +446,25 @@ _out:
...
@@ -441,26 +446,25 @@ _out:
return
matchIndex
;
return
matchIndex
;
}
}
int32_t
syncLogFsmExecute
(
SSyncNode
*
pNode
,
SSyncFSM
*
pFsm
,
ESyncState
role
,
SyncTerm
term
,
SSyncRaftEntry
*
pEntry
)
{
int32_t
syncLogFsmExecute
(
SSyncNode
*
pNode
,
SSyncFSM
*
pFsm
,
ESyncState
role
,
SyncTerm
term
,
SSyncRaftEntry
*
pEntry
,
ASSERTS
(
pFsm
->
FpCommitCb
!=
NULL
,
"No commit cb registered for the FSM"
);
int32_t
applyCode
)
{
if
((
pNode
->
replicaNum
==
1
)
&&
pNode
->
restoreFinish
&&
pNode
->
vgId
!=
1
)
{
if
((
pNode
->
replicaNum
==
1
)
&&
pNode
->
restoreFinish
&&
pNode
->
vgId
!=
1
)
{
return
0
;
return
0
;
}
}
if
(
pNode
->
vgId
!=
1
&&
vnode
IsMsgBlock
(
pEntry
->
originalRpcType
))
{
if
(
pNode
->
vgId
!=
1
&&
sync
IsMsgBlock
(
pEntry
->
originalRpcType
))
{
sTrace
(
"vgId:%d, blocking msg ready to execute
. index:%"
PRId64
", term: %"
PRId64
", type: %s"
,
pNode
->
vgId
,
sTrace
(
"vgId:%d, blocking msg ready to execute
, index:%"
PRId64
", term:%"
PRId64
", type:%s code:0x%x"
,
p
Entry
->
index
,
pEntry
->
term
,
TMSG_INFO
(
pEntry
->
originalRpcType
)
);
p
Node
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
applyCode
);
}
}
SRpcMsg
rpcMsg
=
{
0
};
SRpcMsg
rpcMsg
=
{
.
code
=
applyCode
};
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
SFsmCbMeta
cbMeta
=
{
0
};
SFsmCbMeta
cbMeta
=
{
0
};
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
pNode
,
pEntry
->
index
);
cbMeta
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
pNode
,
pEntry
->
index
);
cbMeta
.
isWeak
=
pEntry
->
isWeak
;
cbMeta
.
isWeak
=
pEntry
->
isWeak
;
cbMeta
.
code
=
0
;
cbMeta
.
code
=
applyCode
;
cbMeta
.
state
=
role
;
cbMeta
.
state
=
role
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
term
=
pEntry
->
term
;
...
@@ -469,7 +473,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
...
@@ -469,7 +473,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
(
void
)
syncRespMgrGetAndDel
(
pNode
->
pSyncRespMgr
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
(
void
)
syncRespMgrGetAndDel
(
pNode
->
pSyncRespMgr
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
int32_t
code
=
pFsm
->
FpCommitCb
(
pFsm
,
&
rpcMsg
,
&
cbMeta
);
int32_t
code
=
pFsm
->
FpCommitCb
(
pFsm
,
&
rpcMsg
,
&
cbMeta
);
ASSERT
(
rpcMsg
.
pCont
==
NULL
);
return
code
;
return
code
;
}
}
...
@@ -520,7 +523,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
...
@@ -520,7 +523,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry
->
term
,
TMSG_INFO
(
pEntry
->
originalRpcType
));
pEntry
->
term
,
TMSG_INFO
(
pEntry
->
originalRpcType
));
}
}
if
(
syncLogFsmExecute
(
pNode
,
pFsm
,
role
,
term
,
pEntry
)
!=
0
)
{
if
(
syncLogFsmExecute
(
pNode
,
pFsm
,
role
,
term
,
pEntry
,
0
)
!=
0
)
{
sError
(
"vgId:%d, failed to execute sync log entry. index:%"
PRId64
", term:%"
PRId64
sError
(
"vgId:%d, failed to execute sync log entry. index:%"
PRId64
", term:%"
PRId64
", role: %d, current term: %"
PRId64
,
", role: %d, current term: %"
PRId64
,
vgId
,
pEntry
->
index
,
pEntry
->
term
,
role
,
term
);
vgId
,
pEntry
->
index
,
pEntry
->
term
,
role
,
term
);
...
...
source/util/src/terror.c
浏览文件 @
09fbb8eb
...
@@ -407,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st
...
@@ -407,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_BATCH_ERROR
,
"Sync batch error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_BATCH_ERROR
,
"Sync batch error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_RESTORING
,
"Sync is restoring"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_RESTORING
,
"Sync is restoring"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG
,
"Sync invalid snapshot msg"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG
,
"Sync invalid snapshot msg"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_BUFFER_FULL
,
"Sync buffer is full"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
//tq
//tq
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录