Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
89425dc4
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
89425dc4
编写于
1月 03, 2023
作者:
S
Shengliang Guan
提交者:
GitHub
1月 03, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19333 from taosdata/enh/TD-21207
enh: remove assert from mnode
上级
3d30466c
9df158e1
变更
11
显示空白变更内容
内联
并排
Showing
11 changed file
with
85 addition
and
64 deletion
+85
-64
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+4
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+1
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+0
-1
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+1
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+11
-7
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+12
-4
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+47
-43
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+5
-3
source/util/src/terror.c
source/util/src/terror.c
+2
-1
未找到文件。
include/util/taoserror.h
浏览文件 @
89425dc4
...
...
@@ -345,6 +345,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED)
#define TSDB_CODE_MND_INVALID_SUB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03EE)
#define TSDB_CODE_MND_IN_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x03EF)
// mnode-stream
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
89425dc4
...
...
@@ -157,8 +157,10 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
smPutNodeMsgToWriteQueue
(
pMgmt
,
pMsg
);
break
;
default:
ASSERTS
(
0
,
"msg:%p failed to put into snode queue since %s, type:%s qtype:%d"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
),
qtype
);
terrno
=
TSDB_CODE_INVALID_PARA
;
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
return
-
1
;
}
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
89425dc4
...
...
@@ -112,7 +112,6 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8
(
pRaw
,
dataPos
,
pDb
->
cfg
.
hashMethod
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pDb
->
cfg
.
numOfRetensions
,
_OVER
)
for
(
int32_t
i
=
0
;
i
<
pDb
->
cfg
.
numOfRetensions
;
++
i
)
{
ASSERT
(
taosArrayGetSize
(
pDb
->
cfg
.
pRetensions
)
==
pDb
->
cfg
.
numOfRetensions
);
SRetention
*
pRetension
=
taosArrayGet
(
pDb
->
cfg
.
pRetensions
,
i
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pRetension
->
freq
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pRetension
->
keep
,
_OVER
)
...
...
@@ -364,6 +363,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if
(
pCfg
->
hashPrefix
<
TSDB_MIN_HASH_PREFIX
||
pCfg
->
hashPrefix
>
TSDB_MAX_HASH_PREFIX
)
return
-
1
;
if
(
pCfg
->
hashSuffix
<
TSDB_MIN_HASH_SUFFIX
||
pCfg
->
hashSuffix
>
TSDB_MAX_HASH_SUFFIX
)
return
-
1
;
if
(
pCfg
->
tsdbPageSize
<
TSDB_MIN_TSDB_PAGESIZE
||
pCfg
->
tsdbPageSize
>
TSDB_MAX_TSDB_PAGESIZE
)
return
-
1
;
if
(
taosArrayGetSize
(
pCfg
->
pRetensions
)
!=
pCfg
->
numOfRetensions
)
return
-
1
;
terrno
=
0
;
return
terrno
;
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
89425dc4
...
...
@@ -489,7 +489,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pConsumerEp
);
cnt
++
;
}
ASSERT
(
cnt
==
sz
)
;
if
(
cnt
!=
sz
)
return
-
1
;
tlen
+=
taosEncodeArray
(
buf
,
pSub
->
unassignedVgs
,
(
FEncode
)
tEncodeSMqVgEp
);
tlen
+=
taosEncodeString
(
buf
,
pSub
->
dbName
);
return
tlen
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
89425dc4
...
...
@@ -763,7 +763,6 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
mInfo
(
"vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d"
,
readyMnodes
,
updatingMnodes
);
return
;
}
// ASSERT(0);
if
(
cfg
.
myIndex
==
-
1
)
{
#if 1
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
89425dc4
...
...
@@ -111,7 +111,7 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_PRIVILEGES
,
len
)
==
0
)
{
type
=
TSDB_MGMT_TABLE_PRIVILEGES
;
}
else
{
// ASSERT(0
);
mError
(
"invalid show name:%s len:%d"
,
name
,
len
);
}
return
type
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
89425dc4
...
...
@@ -488,7 +488,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy
(
smaObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
smaObj
.
createdTime
=
taosGetTimestampMs
();
smaObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
ASSERT
(
smaObj
.
uid
!=
0
);
char
resultTbName
[
TSDB_TABLE_FNAME_LEN
+
16
]
=
{
0
};
snprintf
(
resultTbName
,
TSDB_TABLE_FNAME_LEN
+
16
,
"%s_td_tsma_rst_tb"
,
pCreate
->
name
);
memcpy
(
smaObj
.
dstTbName
,
resultTbName
,
TSDB_TABLE_FNAME_LEN
);
...
...
@@ -558,13 +558,15 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
streamObj
.
ast
,
&
pAst
)
<
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_SMA_OPTION
;
mError
(
"sma:%s, failed to create since parse ast error"
,
smaObj
.
name
);
return
-
1
;
}
// extract output schema from ast
if
(
qExtractResultSchema
(
pAst
,
(
int32_t
*
)
&
streamObj
.
outputSchema
.
nCols
,
&
streamObj
.
outputSchema
.
pSchema
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_SMA_OPTION
;
mError
(
"sma:%s, failed to create since extract result schema error"
,
smaObj
.
name
);
return
-
1
;
}
...
...
@@ -579,15 +581,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
};
if
(
qCreateQueryPlan
(
&
cxt
,
&
pPlan
,
NULL
)
<
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_SMA_OPTION
;
mError
(
"sma:%s, failed to create since create query plan error"
,
smaObj
.
name
);
return
-
1
;
}
// save physcial plan
if
(
nodesNodeToString
((
SNode
*
)
pPlan
,
false
,
&
streamObj
.
physicalPlan
,
NULL
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_SMA_OPTION
;
mError
(
"sma:%s, failed to create since save physcial plan error"
,
smaObj
.
name
);
return
-
1
;
}
if
(
pAst
!=
NULL
)
nodesDestroyNode
(
pAst
);
nodesDestroyNode
((
SNode
*
)
pPlan
);
...
...
@@ -826,14 +831,13 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if
(
mndDropStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
mError
(
"stream:%s, failed to drop task since %s"
,
pStream
->
name
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
ASSERT
(
0
);
goto
_OVER
;
}
// drop stream
if
(
mndPersistDropStreamLog
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
mError
(
"stream:%s, failed to drop log since %s"
,
pStream
->
name
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
ASSERT
(
0
);
goto
_OVER
;
}
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
89425dc4
...
...
@@ -1177,7 +1177,9 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pTopic
->
ast
,
&
pAst
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC
;
mError
(
"topic:%s, create ast error"
,
pTopic
->
name
);
sdbRelease
(
pSdb
,
pTopic
);
return
-
1
;
}
...
...
@@ -1222,7 +1224,9 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pStream
->
ast
,
&
pAst
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_STREAM_OPTION
;
mError
(
"stream:%s, create ast error"
,
pStream
->
name
);
sdbRelease
(
pSdb
,
pStream
);
return
-
1
;
}
...
...
@@ -2094,7 +2098,9 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pTopic
->
ast
,
&
pAst
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
mError
(
"topic:%s, create ast error"
,
pTopic
->
name
);
sdbRelease
(
pSdb
,
pTopic
);
return
-
1
;
}
...
...
@@ -2141,7 +2147,9 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pStream
->
ast
,
&
pAst
)
!=
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_MND_INVALID_STREAM_OPTION
;
mError
(
"stream:%s, create ast error"
,
pStream
->
name
);
sdbRelease
(
pSdb
,
pStream
);
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
89425dc4
...
...
@@ -96,18 +96,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub
->
subType
=
pTopic
->
subType
;
pSub
->
withMeta
=
pTopic
->
withMeta
;
ASSERT
(
pSub
->
unassignedVgs
->
size
==
0
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
0
);
if
(
mndSchedInitSubEp
(
pMnode
,
pTopic
,
pSub
)
<
0
)
{
tDeleteSubscribeObj
(
pSub
);
taosMemoryFree
(
pSub
);
return
NULL
;
}
ASSERT
(
pSub
->
unassignedVgs
->
size
>
0
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
0
);
return
pSub
;
}
...
...
@@ -144,7 +138,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static
int32_t
mndPersistSubChangeVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqSubscribeObj
*
pSub
,
const
SMqRebOutputVg
*
pRebVg
)
{
ASSERT
(
pRebVg
->
oldConsumerId
!=
pRebVg
->
newConsumerId
);
if
(
pRebVg
->
oldConsumerId
==
pRebVg
->
newConsumerId
)
{
terrno
=
TSDB_CODE_MND_INVALID_SUB_OPTION
;
return
-
1
;
}
void
*
buf
;
int32_t
tlen
;
...
...
@@ -155,8 +152,8 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
int32_t
vgId
=
pRebVg
->
pVgEp
->
vgId
;
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
if
(
pVgObj
==
NULL
)
{
ASSERT
(
0
);
taosMemoryFree
(
buf
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -207,8 +204,8 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static
int32_t
mndDoRebalance
(
SMnode
*
pMnode
,
const
SMqRebInputObj
*
pInput
,
SMqRebOutputObj
*
pOutput
)
{
int32_t
totalVgNum
=
pOutput
->
pSub
->
vgNum
;
mInfo
(
"
mq rebalance: subscription: %s, vgNum: %d"
,
pOutput
->
pSub
->
key
,
pOutput
->
pSub
->
vgNum
);
const
char
*
sub
=
pOutput
->
pSub
->
key
;
mInfo
(
"
sub:%s, mq rebalance vgNum:%d"
,
sub
,
pOutput
->
pSub
->
vgNum
);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -218,11 +215,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t
actualRemoved
=
0
;
for
(
int32_t
i
=
0
;
i
<
removedNum
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pInput
->
pRebInfo
->
removedConsumers
,
i
);
ASSERT
(
consumerId
>
0
);
SMqConsumerEp
*
pConsumerEp
=
taosHashGet
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
ASSERT
(
pConsumerEp
);
if
(
pConsumerEp
)
{
ASSERT
(
consumerId
==
pConsumerEp
->
consumerId
);
actualRemoved
++
;
int32_t
consumerVgNum
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
for
(
int32_t
j
=
0
;
j
<
consumerVgNum
;
j
++
)
{
...
...
@@ -233,7 +229,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"
mq rebalance: remove vgId:%d from consumer:%"
PRId64
,
pVgEp
->
vgId
,
consumerId
);
mInfo
(
"
sub:%s, mq rebalance remove vgId:%d from consumer:%"
PRId64
,
sub
,
pVgEp
->
vgId
,
consumerId
);
}
taosArrayDestroy
(
pConsumerEp
->
vgs
);
taosHashRemove
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
...
...
@@ -241,7 +237,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush
(
pOutput
->
removedConsumers
,
&
consumerId
);
}
}
ASSERT
(
removedNum
==
actualRemoved
);
if
(
removedNum
!=
actualRemoved
)
{
mError
(
"sub:%s, mq rebalance removedNum:%d not matched with actual:%d"
,
sub
,
removedNum
,
actualRemoved
);
}
// if previously no consumer, there are vgs not assigned
{
...
...
@@ -254,7 +253,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
rebOutput
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"
mq rebalance: remove vgId:%d from unassigned"
,
pVgEp
->
vgId
);
mInfo
(
"
sub:%s, mq rebalance remove vgId:%d from unassigned"
,
sub
,
pVgEp
->
vgId
);
}
}
...
...
@@ -268,8 +267,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt
=
totalVgNum
/
afterRebConsumerNum
;
imbConsumerNum
=
totalVgNum
%
afterRebConsumerNum
;
}
mInfo
(
"
mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg"
,
afterRebConsumerNum
,
minVgCnt
,
imbConsumerNum
);
mInfo
(
"
sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg"
,
sub
,
afterRebConsumerNum
,
minVgCnt
,
imbConsumerNum
);
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t
imbCnt
=
0
;
...
...
@@ -278,7 +277,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter
=
taosHashIterate
(
pOutput
->
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
ASSERT
(
pConsumerEp
->
consumerId
>
0
);
int32_t
consumerVgNum
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
// all old consumers still existing are touched
// TODO optimize: touch only consumer whose vgs changed
...
...
@@ -298,7 +297,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"
mq rebalance: remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
pVgEp
->
vgId
,
mInfo
(
"
sub:%s, mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
...
...
@@ -313,7 +312,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"
mq rebalance: remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
pVgEp
->
vgId
,
mInfo
(
"
sub:%s, mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
...
...
@@ -325,13 +324,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t
consumerNum
=
taosArrayGetSize
(
pInput
->
pRebInfo
->
newConsumers
);
for
(
int32_t
i
=
0
;
i
<
consumerNum
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pInput
->
pRebInfo
->
newConsumers
,
i
);
ASSERT
(
consumerId
>
0
);
SMqConsumerEp
newConsumerEp
;
newConsumerEp
.
consumerId
=
consumerId
;
newConsumerEp
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosHashPut
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
),
&
newConsumerEp
,
sizeof
(
SMqConsumerEp
));
taosArrayPush
(
pOutput
->
newConsumers
,
&
consumerId
);
mInfo
(
"
mq rebalance: add new consumer:%"
PRId64
,
consumerId
);
mInfo
(
"
sub:%s, mq rebalance add new consumer:%"
PRId64
,
sub
,
consumerId
);
}
}
...
...
@@ -344,13 +343,16 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter
=
taosHashIterate
(
pOutput
->
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
ASSERT
(
pConsumerEp
->
consumerId
>
0
);
// push until equal minVg
while
(
taosArrayGetSize
(
pConsumerEp
->
vgs
)
<
minVgCnt
)
{
// iter hash and find one vg
pRemovedIter
=
taosHashIterate
(
pHash
,
pRemovedIter
);
ASSERT
(
pRemovedIter
);
if
(
pRemovedIter
==
NULL
)
{
mError
(
"sub:%s, removed iter is null"
,
sub
);
continue
;
}
pRebVg
=
(
SMqRebOutputVg
*
)
pRemovedIter
;
// push
taosArrayPush
(
pConsumerEp
->
vgs
,
&
pRebVg
->
pVgEp
);
...
...
@@ -361,7 +363,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
ASSERT
(
pIter
==
NULL
);
// 7. handle unassigned vg
if
(
taosHashGetSize
(
pOutput
->
pSub
->
consumerHash
)
!=
0
)
{
// if has consumer, assign all left vg
...
...
@@ -377,9 +378,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
while
(
1
)
{
pIter
=
taosHashIterate
(
pOutput
->
pSub
->
consumerHash
,
pIter
);
ASSERT
(
pIter
);
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
ASSERT
(
pConsumerEp
->
consumerId
>
0
);
if
(
taosArrayGetSize
(
pConsumerEp
->
vgs
)
==
minVgCnt
)
{
break
;
}
...
...
@@ -404,19 +404,19 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter
=
taosHashIterate
(
pHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
pRebOutput
=
(
SMqRebOutputVg
*
)
pIter
;
ASSERT
(
pRebOutput
->
newConsumerId
==
-
1
);
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
taosArrayPush
(
pOutput
->
rebVgs
,
pRebOutput
);
mInfo
(
"
mq rebalance: unassign vgId:%d (second scan)"
,
pRebOutput
->
pVgEp
->
vgId
);
mInfo
(
"
sub:%s, mq rebalance unassign vgId:%d (second scan)"
,
sub
,
pRebOutput
->
pVgEp
->
vgId
);
}
}
// 8. generate logs
mInfo
(
"
mq rebalance: calculation completed, rebalanced vg:"
);
mInfo
(
"
sub:%s, mq rebalance calculation completed, rebalanced vg"
,
sub
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
mInfo
(
"
mq rebalance: vgId:%d, moved from consumer:%"
PRId64
", to consumer:%"
PRId64
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
mInfo
(
"
sub:%s, mq rebalance vgId:%d, moved from consumer:%"
PRId64
", to consumer:%"
PRId64
,
sub
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
}
{
void
*
pIter
=
NULL
;
...
...
@@ -425,10 +425,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
int32_t
sz
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
mInfo
(
"
mq rebalance: final cfg: consumer %"
PRId64
" has %d vg"
,
pConsumerEp
->
consumerId
,
sz
);
mInfo
(
"
sub:%s, mq rebalance final cfg: consumer %"
PRId64
" has %d vg"
,
sub
,
pConsumerEp
->
consumerId
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
pConsumerEp
->
vgs
,
i
);
mInfo
(
"mq rebalance: final cfg: vg %d to consumer %"
PRId64
""
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
mInfo
(
"sub:%s, mq rebalance final cfg: vg %d to consumer %"
PRId64
""
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
}
...
...
@@ -487,7 +488,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
consumerNum
=
taosArrayGetSize
(
pOutput
->
newConsumers
);
for
(
int32_t
i
=
0
;
i
<
consumerNum
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pOutput
->
newConsumers
,
i
);
ASSERT
(
consumerId
>
0
);
SMqConsumerObj
*
pConsumerOld
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
SMqConsumerObj
*
pConsumerNew
=
tNewSMqConsumerObj
(
pConsumerOld
->
consumerId
,
pConsumerOld
->
cgroup
);
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__ADD
;
...
...
@@ -497,7 +498,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush
(
pConsumerNew
->
rebNewTopics
,
&
topic
);
mndReleaseConsumer
(
pMnode
,
pConsumerOld
);
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
{
ASSERT
(
0
);
tDeleteSMqConsumerObj
(
pConsumerNew
);
taosMemoryFree
(
pConsumerNew
);
goto
REB_FAIL
;
...
...
@@ -510,7 +510,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
consumerNum
=
taosArrayGetSize
(
pOutput
->
removedConsumers
);
for
(
int32_t
i
=
0
;
i
<
consumerNum
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pOutput
->
removedConsumers
,
i
);
ASSERT
(
consumerId
>
0
);
SMqConsumerObj
*
pConsumerOld
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
SMqConsumerObj
*
pConsumerNew
=
tNewSMqConsumerObj
(
pConsumerOld
->
consumerId
,
pConsumerOld
->
cgroup
);
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__REMOVE
;
...
...
@@ -520,7 +520,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush
(
pConsumerNew
->
rebRemovedTopics
,
&
topic
);
mndReleaseConsumer
(
pMnode
,
pConsumerOld
);
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
{
ASSERT
(
0
);
tDeleteSMqConsumerObj
(
pConsumerNew
);
taosMemoryFree
(
pConsumerNew
);
goto
REB_FAIL
;
...
...
@@ -577,7 +576,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pRebInfo
->
key
,
topic
,
cgroup
,
true
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
/*ASSERT(pTopic);*/
if
(
pTopic
==
NULL
)
{
mError
(
"mq rebalance %s failed since topic %s not exist, abort"
,
pRebInfo
->
key
,
topic
);
continue
;
...
...
@@ -585,8 +583,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosRLockLatch
(
&
pTopic
->
lock
);
rebOutput
.
pSub
=
mndCreateSub
(
pMnode
,
pTopic
,
pRebInfo
->
key
);
if
(
rebOutput
.
pSub
==
NULL
)
{
mError
(
"mq rebalance %s failed create sub since %s, abort"
,
pRebInfo
->
key
,
terrstr
());
taosRUnLockLatch
(
&
pTopic
->
lock
);
mndReleaseTopic
(
pMnode
,
pTopic
);
continue
;
}
memcpy
(
rebOutput
.
pSub
->
dbName
,
pTopic
->
db
,
TSDB_DB_FNAME_LEN
);
ASSERT
(
taosHashGetSize
(
rebOutput
.
pSub
->
consumerHash
)
==
0
);
taosRUnLockLatch
(
&
pTopic
->
lock
);
mndReleaseTopic
(
pMnode
,
pTopic
);
...
...
@@ -606,7 +610,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// if add more consumer to balanced subscribe,
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
if
(
mndPersistRebResult
(
pMnode
,
pMsg
,
&
rebOutput
)
<
0
)
{
mError
(
"mq rebalance persist rebalance output error, possibly vnode splitted or dropped"
);
...
...
@@ -693,6 +696,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
int32_t
tlen
=
tEncodeSubscribeObj
(
NULL
,
pSub
);
if
(
tlen
<=
0
)
goto
SUB_ENCODE_OVER
;
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_SUBSCRIBE_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SUBSCRIBE
,
MND_SUBSCRIBE_VER_NUMBER
,
size
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
89425dc4
...
...
@@ -384,7 +384,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj
.
subType
=
pCreate
->
subType
;
topicObj
.
withMeta
=
pCreate
->
withMeta
;
if
(
topicObj
.
withMeta
)
{
ASSERT
(
topicObj
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
);
if
(
topicObj
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
if
(
pCreate
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
...
...
@@ -499,7 +503,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if
(
code
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
mndTransDrop
(
pTrans
);
ASSERT
(
0
);
return
-
1
;
}
void
*
buf
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgHead
)
+
len
);
...
...
@@ -723,7 +726,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
// TODO check if rebalancing
if
(
mndDropSubByTopic
(
pMnode
,
pTrans
,
dropReq
.
name
)
<
0
)
{
/*ASSERT(0);*/
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
mndTransDrop
(
pTrans
);
mndReleaseTopic
(
pMnode
,
pTopic
);
...
...
source/util/src/terror.c
浏览文件 @
89425dc4
...
...
@@ -276,8 +276,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_OFFSET_NOT_EXIST
,
"Offset not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_CONSUMER_NOT_READY
,
"Consumer not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOPIC_SUBSCRIBED
,
"Topic subscribed cannot be dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOPIC_MUST_BE_DELETED
,
"Topic must be dropped first"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_CGROUP_USED
,
"Consumer group being used by some consumer"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOPIC_MUST_BE_DELETED
,
"Topic must be dropped first"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_SUB_OPTION
,
"Invalid subscribe option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_IN_REBALANCE
,
"Topic being rebalanced"
)
// mnode-stream
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录